Skip to content

Commit

Permalink
ddl: CREATE/DROP INDEX, ALTER TABLE now use XXXStmt as param (#35819)
Browse files Browse the repository at this point in the history
close #35665
  • Loading branch information
lance6716 authored Jun 30, 2022
1 parent 7567f07 commit fe4de25
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 54 deletions.
10 changes: 4 additions & 6 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,11 @@ type DDL interface {
DropTable(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error)
RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error)
DropView(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error)
CreateIndex(ctx sessionctx.Context, tableIdent ast.Ident, keyType ast.IndexKeyType, indexName model.CIStr,
columnNames []*ast.IndexPartSpecification, indexOption *ast.IndexOption, ifNotExists bool) error
DropIndex(ctx sessionctx.Context, tableIdent ast.Ident, indexName model.CIStr, ifExists bool) error
AlterTable(ctx context.Context, sctx sessionctx.Context, tableIdent ast.Ident, spec []*ast.AlterTableSpec) error
CreateIndex(ctx sessionctx.Context, stmt *ast.CreateIndexStmt) error
DropIndex(ctx sessionctx.Context, stmt *ast.DropIndexStmt) error
AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast.AlterTableStmt) error
TruncateTable(ctx sessionctx.Context, tableIdent ast.Ident) error
RenameTable(ctx sessionctx.Context, oldTableIdent, newTableIdent ast.Ident, isAlterTable bool) error
RenameTables(ctx sessionctx.Context, oldTableIdent, newTableIdent []ast.Ident, isAlterTable bool) error
RenameTable(ctx sessionctx.Context, stmt *ast.RenameTableStmt) error
LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error
UnlockTables(ctx sessionctx.Context, lockedTables []model.TableLockTpInfo) error
CleanupTableLock(ctx sessionctx.Context, tables []*ast.TableName) error
Expand Down
60 changes: 49 additions & 11 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3055,8 +3055,10 @@ func checkMultiSpecs(sctx sessionctx.Context, specs []*ast.AlterTableSpec) error
return nil
}

func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast.Ident, specs []*ast.AlterTableSpec) (err error) {
validSpecs, err := resolveAlterTableSpec(sctx, specs)
func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast.AlterTableStmt) (err error) {
ident := ast.Ident{Schema: stmt.Table.Schema, Name: stmt.Table.Name}

validSpecs, err := resolveAlterTableSpec(sctx, stmt.Specs)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -3131,9 +3133,9 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast
case ast.AlterTableDropColumn:
err = d.DropColumn(sctx, ident, spec)
case ast.AlterTableDropIndex:
err = d.DropIndex(sctx, ident, model.NewCIStr(spec.Name), spec.IfExists)
err = d.dropIndex(sctx, ident, model.NewCIStr(spec.Name), spec.IfExists)
case ast.AlterTableDropPrimaryKey:
err = d.DropIndex(sctx, ident, model.NewCIStr(mysql.PrimaryKeyName), spec.IfExists)
err = d.dropIndex(sctx, ident, model.NewCIStr(mysql.PrimaryKeyName), spec.IfExists)
case ast.AlterTableRenameIndex:
err = d.RenameIndex(sctx, ident, spec)
case ast.AlterTableDropPartition:
Expand Down Expand Up @@ -3164,10 +3166,10 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast
constr := spec.Constraint
switch spec.Constraint.Tp {
case ast.ConstraintKey, ast.ConstraintIndex:
err = d.CreateIndex(sctx, ident, ast.IndexKeyTypeNone, model.NewCIStr(constr.Name),
err = d.createIndex(sctx, ident, ast.IndexKeyTypeNone, model.NewCIStr(constr.Name),
spec.Constraint.Keys, constr.Option, constr.IfNotExists)
case ast.ConstraintUniq, ast.ConstraintUniqIndex, ast.ConstraintUniqKey:
err = d.CreateIndex(sctx, ident, ast.IndexKeyTypeUnique, model.NewCIStr(constr.Name),
err = d.createIndex(sctx, ident, ast.IndexKeyTypeUnique, model.NewCIStr(constr.Name),
spec.Constraint.Keys, constr.Option, false) // IfNotExists should be not applied
case ast.ConstraintForeignKey:
// NOTE: we do not handle `symbol` and `index_name` well in the parser and we do not check ForeignKey already exists,
Expand Down Expand Up @@ -3196,7 +3198,7 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast
case ast.AlterTableRenameTable:
newIdent := ast.Ident{Schema: spec.NewTable.Schema, Name: spec.NewTable.Name}
isAlterTable := true
err = d.RenameTable(sctx, ident, newIdent, isAlterTable)
err = d.renameTable(sctx, ident, newIdent, isAlterTable)
case ast.AlterTablePartition:
// Prevent silent succeed if user executes ALTER TABLE x PARTITION BY ...
err = errors.New("alter table partition is unsupported")
Expand Down Expand Up @@ -5547,7 +5549,28 @@ func (d *ddl) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error {
return nil
}

func (d *ddl) RenameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Ident, isAlterTable bool) error {
func (d *ddl) RenameTable(ctx sessionctx.Context, s *ast.RenameTableStmt) error {
isAlterTable := false
var err error
if len(s.TableToTables) == 1 {
oldIdent := ast.Ident{Schema: s.TableToTables[0].OldTable.Schema, Name: s.TableToTables[0].OldTable.Name}
newIdent := ast.Ident{Schema: s.TableToTables[0].NewTable.Schema, Name: s.TableToTables[0].NewTable.Name}
err = d.renameTable(ctx, oldIdent, newIdent, isAlterTable)
} else {
oldIdents := make([]ast.Ident, 0, len(s.TableToTables))
newIdents := make([]ast.Ident, 0, len(s.TableToTables))
for _, tables := range s.TableToTables {
oldIdent := ast.Ident{Schema: tables.OldTable.Schema, Name: tables.OldTable.Name}
newIdent := ast.Ident{Schema: tables.NewTable.Schema, Name: tables.NewTable.Name}
oldIdents = append(oldIdents, oldIdent)
newIdents = append(newIdents, newIdent)
}
err = d.renameTables(ctx, oldIdents, newIdents, isAlterTable)
}
return err
}

func (d *ddl) renameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Ident, isAlterTable bool) error {
is := d.GetInfoSchemaWithInterceptor(ctx)
tables := make(map[string]int64)
schemas, tableID, err := extractTblInfos(is, oldIdent, newIdent, isAlterTable, tables)
Expand Down Expand Up @@ -5580,7 +5603,7 @@ func (d *ddl) RenameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Ident,
return errors.Trace(err)
}

func (d *ddl) RenameTables(ctx sessionctx.Context, oldIdents, newIdents []ast.Ident, isAlterTable bool) error {
func (d *ddl) renameTables(ctx sessionctx.Context, oldIdents, newIdents []ast.Ident, isAlterTable bool) error {
is := d.GetInfoSchemaWithInterceptor(ctx)
oldTableNames := make([]*model.CIStr, 0, len(oldIdents))
tableNames := make([]*model.CIStr, 0, len(oldIdents))
Expand Down Expand Up @@ -5905,7 +5928,13 @@ func buildHiddenColumnInfo(ctx sessionctx.Context, indexPartSpecifications []*as
return hiddenCols, nil
}

func (d *ddl) CreateIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast.IndexKeyType, indexName model.CIStr,
func (d *ddl) CreateIndex(ctx sessionctx.Context, stmt *ast.CreateIndexStmt) error {
ident := ast.Ident{Schema: stmt.Table.Schema, Name: stmt.Table.Name}
return d.createIndex(ctx, ident, stmt.KeyType, model.NewCIStr(stmt.IndexName),
stmt.IndexPartSpecifications, stmt.IndexOption, stmt.IfNotExists)
}

func (d *ddl) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast.IndexKeyType, indexName model.CIStr,
indexPartSpecifications []*ast.IndexPartSpecification, indexOption *ast.IndexOption, ifNotExists bool) error {
// not support Spatial and FullText index
if keyType == ast.IndexKeyTypeFullText || keyType == ast.IndexKeyTypeSpatial {
Expand Down Expand Up @@ -6167,7 +6196,16 @@ func (d *ddl) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model.
return errors.Trace(err)
}

func (d *ddl) DropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CIStr, ifExists bool) error {
func (d *ddl) DropIndex(ctx sessionctx.Context, stmt *ast.DropIndexStmt) error {
ti := ast.Ident{Schema: stmt.Table.Schema, Name: stmt.Table.Name}
err := d.dropIndex(ctx, ti, model.NewCIStr(stmt.IndexName), stmt.IfExists)
if (infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err)) && stmt.IfExists {
err = nil
}
return err
}

func (d *ddl) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CIStr, ifExists bool) error {
is := d.infoCache.GetLatest()
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
Expand Down
46 changes: 9 additions & 37 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,30 +222,12 @@ func (e *DDLExec) executeTruncateTable(s *ast.TruncateTableStmt) error {
}

func (e *DDLExec) executeRenameTable(s *ast.RenameTableStmt) error {
isAlterTable := false
var err error
if len(s.TableToTables) == 1 {
oldIdent := ast.Ident{Schema: s.TableToTables[0].OldTable.Schema, Name: s.TableToTables[0].OldTable.Name}
if _, ok := e.getLocalTemporaryTable(oldIdent.Schema, oldIdent.Name); ok {
for _, tables := range s.TableToTables {
if _, ok := e.getLocalTemporaryTable(tables.OldTable.Schema, tables.OldTable.Name); ok {
return dbterror.ErrUnsupportedLocalTempTableDDL.GenWithStackByArgs("RENAME TABLE")
}
newIdent := ast.Ident{Schema: s.TableToTables[0].NewTable.Schema, Name: s.TableToTables[0].NewTable.Name}
err = domain.GetDomain(e.ctx).DDL().RenameTable(e.ctx, oldIdent, newIdent, isAlterTable)
} else {
oldIdents := make([]ast.Ident, 0, len(s.TableToTables))
newIdents := make([]ast.Ident, 0, len(s.TableToTables))
for _, tables := range s.TableToTables {
oldIdent := ast.Ident{Schema: tables.OldTable.Schema, Name: tables.OldTable.Name}
if _, ok := e.getLocalTemporaryTable(oldIdent.Schema, oldIdent.Name); ok {
return dbterror.ErrUnsupportedLocalTempTableDDL.GenWithStackByArgs("RENAME TABLE")
}
newIdent := ast.Ident{Schema: tables.NewTable.Schema, Name: tables.NewTable.Name}
oldIdents = append(oldIdents, oldIdent)
newIdents = append(newIdents, newIdent)
}
err = domain.GetDomain(e.ctx).DDL().RenameTables(e.ctx, oldIdents, newIdents, isAlterTable)
}
return err
return domain.GetDomain(e.ctx).DDL().RenameTable(e.ctx, s)
}

func (e *DDLExec) executeCreateDatabase(s *ast.CreateDatabaseStmt) error {
Expand Down Expand Up @@ -302,14 +284,11 @@ func (e *DDLExec) executeCreateView(s *ast.CreateViewStmt) error {
}

func (e *DDLExec) executeCreateIndex(s *ast.CreateIndexStmt) error {
ident := ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name}
if _, ok := e.getLocalTemporaryTable(ident.Schema, ident.Name); ok {
if _, ok := e.getLocalTemporaryTable(s.Table.Schema, s.Table.Name); ok {
return dbterror.ErrUnsupportedLocalTempTableDDL.GenWithStackByArgs("CREATE INDEX")
}

err := domain.GetDomain(e.ctx).DDL().CreateIndex(e.ctx, ident, s.KeyType, model.NewCIStr(s.IndexName),
s.IndexPartSpecifications, s.IndexOption, s.IfNotExists)
return err
return domain.GetDomain(e.ctx).DDL().CreateIndex(e.ctx, s)
}

func (e *DDLExec) executeDropDatabase(s *ast.DropDatabaseStmt) error {
Expand Down Expand Up @@ -365,26 +344,19 @@ func (e *DDLExec) dropLocalTemporaryTables(localTempTables []*ast.TableName) err
}

func (e *DDLExec) executeDropIndex(s *ast.DropIndexStmt) error {
ti := ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name}
if _, ok := e.getLocalTemporaryTable(ti.Schema, ti.Name); ok {
if _, ok := e.getLocalTemporaryTable(s.Table.Schema, s.Table.Name); ok {
return dbterror.ErrUnsupportedLocalTempTableDDL.GenWithStackByArgs("DROP INDEX")
}

err := domain.GetDomain(e.ctx).DDL().DropIndex(e.ctx, ti, model.NewCIStr(s.IndexName), s.IfExists)
if (infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err)) && s.IfExists {
err = nil
}
return err
return domain.GetDomain(e.ctx).DDL().DropIndex(e.ctx, s)
}

func (e *DDLExec) executeAlterTable(ctx context.Context, s *ast.AlterTableStmt) error {
ti := ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name}
if _, ok := e.getLocalTemporaryTable(ti.Schema, ti.Name); ok {
if _, ok := e.getLocalTemporaryTable(s.Table.Schema, s.Table.Name); ok {
return dbterror.ErrUnsupportedLocalTempTableDDL.GenWithStackByArgs("ALTER TABLE")
}

err := domain.GetDomain(e.ctx).DDL().AlterTable(ctx, e.ctx, ti, s.Specs)
return err
return domain.GetDomain(e.ctx).DDL().AlterTable(ctx, e.ctx, s)
}

// executeRecoverTable represents a recover table executor.
Expand Down

0 comments on commit fe4de25

Please sign in to comment.