diff --git a/ddl/ddl.go b/ddl/ddl.go index 81c0d01bb3d52..ced80b86c315a 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -109,13 +109,11 @@ type DDL interface { DropTable(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error) RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error) DropView(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error) - CreateIndex(ctx sessionctx.Context, tableIdent ast.Ident, keyType ast.IndexKeyType, indexName model.CIStr, - columnNames []*ast.IndexPartSpecification, indexOption *ast.IndexOption, ifNotExists bool) error - DropIndex(ctx sessionctx.Context, tableIdent ast.Ident, indexName model.CIStr, ifExists bool) error - AlterTable(ctx context.Context, sctx sessionctx.Context, tableIdent ast.Ident, spec []*ast.AlterTableSpec) error + CreateIndex(ctx sessionctx.Context, stmt *ast.CreateIndexStmt) error + DropIndex(ctx sessionctx.Context, stmt *ast.DropIndexStmt) error + AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast.AlterTableStmt) error TruncateTable(ctx sessionctx.Context, tableIdent ast.Ident) error - RenameTable(ctx sessionctx.Context, oldTableIdent, newTableIdent ast.Ident, isAlterTable bool) error - RenameTables(ctx sessionctx.Context, oldTableIdent, newTableIdent []ast.Ident, isAlterTable bool) error + RenameTable(ctx sessionctx.Context, stmt *ast.RenameTableStmt) error LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error UnlockTables(ctx sessionctx.Context, lockedTables []model.TableLockTpInfo) error CleanupTableLock(ctx sessionctx.Context, tables []*ast.TableName) error diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index d0b799731a779..2dcde0a82f03b 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -3055,8 +3055,10 @@ func checkMultiSpecs(sctx sessionctx.Context, specs []*ast.AlterTableSpec) error return nil } -func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast.Ident, specs []*ast.AlterTableSpec) (err error) { - validSpecs, err := resolveAlterTableSpec(sctx, specs) +func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast.AlterTableStmt) (err error) { + ident := ast.Ident{Schema: stmt.Table.Schema, Name: stmt.Table.Name} + + validSpecs, err := resolveAlterTableSpec(sctx, stmt.Specs) if err != nil { return errors.Trace(err) } @@ -3131,9 +3133,9 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast case ast.AlterTableDropColumn: err = d.DropColumn(sctx, ident, spec) case ast.AlterTableDropIndex: - err = d.DropIndex(sctx, ident, model.NewCIStr(spec.Name), spec.IfExists) + err = d.dropIndex(sctx, ident, model.NewCIStr(spec.Name), spec.IfExists) case ast.AlterTableDropPrimaryKey: - err = d.DropIndex(sctx, ident, model.NewCIStr(mysql.PrimaryKeyName), spec.IfExists) + err = d.dropIndex(sctx, ident, model.NewCIStr(mysql.PrimaryKeyName), spec.IfExists) case ast.AlterTableRenameIndex: err = d.RenameIndex(sctx, ident, spec) case ast.AlterTableDropPartition: @@ -3164,10 +3166,10 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast constr := spec.Constraint switch spec.Constraint.Tp { case ast.ConstraintKey, ast.ConstraintIndex: - err = d.CreateIndex(sctx, ident, ast.IndexKeyTypeNone, model.NewCIStr(constr.Name), + err = d.createIndex(sctx, ident, ast.IndexKeyTypeNone, model.NewCIStr(constr.Name), spec.Constraint.Keys, constr.Option, constr.IfNotExists) case ast.ConstraintUniq, ast.ConstraintUniqIndex, ast.ConstraintUniqKey: - err = d.CreateIndex(sctx, ident, ast.IndexKeyTypeUnique, model.NewCIStr(constr.Name), + err = d.createIndex(sctx, ident, ast.IndexKeyTypeUnique, model.NewCIStr(constr.Name), spec.Constraint.Keys, constr.Option, false) // IfNotExists should be not applied case ast.ConstraintForeignKey: // NOTE: we do not handle `symbol` and `index_name` well in the parser and we do not check ForeignKey already exists, @@ -3196,7 +3198,7 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast case ast.AlterTableRenameTable: newIdent := ast.Ident{Schema: spec.NewTable.Schema, Name: spec.NewTable.Name} isAlterTable := true - err = d.RenameTable(sctx, ident, newIdent, isAlterTable) + err = d.renameTable(sctx, ident, newIdent, isAlterTable) case ast.AlterTablePartition: // Prevent silent succeed if user executes ALTER TABLE x PARTITION BY ... err = errors.New("alter table partition is unsupported") @@ -5547,7 +5549,28 @@ func (d *ddl) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error { return nil } -func (d *ddl) RenameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Ident, isAlterTable bool) error { +func (d *ddl) RenameTable(ctx sessionctx.Context, s *ast.RenameTableStmt) error { + isAlterTable := false + var err error + if len(s.TableToTables) == 1 { + oldIdent := ast.Ident{Schema: s.TableToTables[0].OldTable.Schema, Name: s.TableToTables[0].OldTable.Name} + newIdent := ast.Ident{Schema: s.TableToTables[0].NewTable.Schema, Name: s.TableToTables[0].NewTable.Name} + err = d.renameTable(ctx, oldIdent, newIdent, isAlterTable) + } else { + oldIdents := make([]ast.Ident, 0, len(s.TableToTables)) + newIdents := make([]ast.Ident, 0, len(s.TableToTables)) + for _, tables := range s.TableToTables { + oldIdent := ast.Ident{Schema: tables.OldTable.Schema, Name: tables.OldTable.Name} + newIdent := ast.Ident{Schema: tables.NewTable.Schema, Name: tables.NewTable.Name} + oldIdents = append(oldIdents, oldIdent) + newIdents = append(newIdents, newIdent) + } + err = d.renameTables(ctx, oldIdents, newIdents, isAlterTable) + } + return err +} + +func (d *ddl) renameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Ident, isAlterTable bool) error { is := d.GetInfoSchemaWithInterceptor(ctx) tables := make(map[string]int64) schemas, tableID, err := extractTblInfos(is, oldIdent, newIdent, isAlterTable, tables) @@ -5580,7 +5603,7 @@ func (d *ddl) RenameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Ident, return errors.Trace(err) } -func (d *ddl) RenameTables(ctx sessionctx.Context, oldIdents, newIdents []ast.Ident, isAlterTable bool) error { +func (d *ddl) renameTables(ctx sessionctx.Context, oldIdents, newIdents []ast.Ident, isAlterTable bool) error { is := d.GetInfoSchemaWithInterceptor(ctx) oldTableNames := make([]*model.CIStr, 0, len(oldIdents)) tableNames := make([]*model.CIStr, 0, len(oldIdents)) @@ -5905,7 +5928,13 @@ func buildHiddenColumnInfo(ctx sessionctx.Context, indexPartSpecifications []*as return hiddenCols, nil } -func (d *ddl) CreateIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast.IndexKeyType, indexName model.CIStr, +func (d *ddl) CreateIndex(ctx sessionctx.Context, stmt *ast.CreateIndexStmt) error { + ident := ast.Ident{Schema: stmt.Table.Schema, Name: stmt.Table.Name} + return d.createIndex(ctx, ident, stmt.KeyType, model.NewCIStr(stmt.IndexName), + stmt.IndexPartSpecifications, stmt.IndexOption, stmt.IfNotExists) +} + +func (d *ddl) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast.IndexKeyType, indexName model.CIStr, indexPartSpecifications []*ast.IndexPartSpecification, indexOption *ast.IndexOption, ifNotExists bool) error { // not support Spatial and FullText index if keyType == ast.IndexKeyTypeFullText || keyType == ast.IndexKeyTypeSpatial { @@ -6167,7 +6196,16 @@ func (d *ddl) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model. return errors.Trace(err) } -func (d *ddl) DropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CIStr, ifExists bool) error { +func (d *ddl) DropIndex(ctx sessionctx.Context, stmt *ast.DropIndexStmt) error { + ti := ast.Ident{Schema: stmt.Table.Schema, Name: stmt.Table.Name} + err := d.dropIndex(ctx, ti, model.NewCIStr(stmt.IndexName), stmt.IfExists) + if (infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err)) && stmt.IfExists { + err = nil + } + return err +} + +func (d *ddl) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CIStr, ifExists bool) error { is := d.infoCache.GetLatest() schema, ok := is.SchemaByName(ti.Schema) if !ok { diff --git a/executor/ddl.go b/executor/ddl.go index 68115f2014e44..269014684d52a 100644 --- a/executor/ddl.go +++ b/executor/ddl.go @@ -222,30 +222,12 @@ func (e *DDLExec) executeTruncateTable(s *ast.TruncateTableStmt) error { } func (e *DDLExec) executeRenameTable(s *ast.RenameTableStmt) error { - isAlterTable := false - var err error - if len(s.TableToTables) == 1 { - oldIdent := ast.Ident{Schema: s.TableToTables[0].OldTable.Schema, Name: s.TableToTables[0].OldTable.Name} - if _, ok := e.getLocalTemporaryTable(oldIdent.Schema, oldIdent.Name); ok { + for _, tables := range s.TableToTables { + if _, ok := e.getLocalTemporaryTable(tables.OldTable.Schema, tables.OldTable.Name); ok { return dbterror.ErrUnsupportedLocalTempTableDDL.GenWithStackByArgs("RENAME TABLE") } - newIdent := ast.Ident{Schema: s.TableToTables[0].NewTable.Schema, Name: s.TableToTables[0].NewTable.Name} - err = domain.GetDomain(e.ctx).DDL().RenameTable(e.ctx, oldIdent, newIdent, isAlterTable) - } else { - oldIdents := make([]ast.Ident, 0, len(s.TableToTables)) - newIdents := make([]ast.Ident, 0, len(s.TableToTables)) - for _, tables := range s.TableToTables { - oldIdent := ast.Ident{Schema: tables.OldTable.Schema, Name: tables.OldTable.Name} - if _, ok := e.getLocalTemporaryTable(oldIdent.Schema, oldIdent.Name); ok { - return dbterror.ErrUnsupportedLocalTempTableDDL.GenWithStackByArgs("RENAME TABLE") - } - newIdent := ast.Ident{Schema: tables.NewTable.Schema, Name: tables.NewTable.Name} - oldIdents = append(oldIdents, oldIdent) - newIdents = append(newIdents, newIdent) - } - err = domain.GetDomain(e.ctx).DDL().RenameTables(e.ctx, oldIdents, newIdents, isAlterTable) } - return err + return domain.GetDomain(e.ctx).DDL().RenameTable(e.ctx, s) } func (e *DDLExec) executeCreateDatabase(s *ast.CreateDatabaseStmt) error { @@ -302,14 +284,11 @@ func (e *DDLExec) executeCreateView(s *ast.CreateViewStmt) error { } func (e *DDLExec) executeCreateIndex(s *ast.CreateIndexStmt) error { - ident := ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name} - if _, ok := e.getLocalTemporaryTable(ident.Schema, ident.Name); ok { + if _, ok := e.getLocalTemporaryTable(s.Table.Schema, s.Table.Name); ok { return dbterror.ErrUnsupportedLocalTempTableDDL.GenWithStackByArgs("CREATE INDEX") } - err := domain.GetDomain(e.ctx).DDL().CreateIndex(e.ctx, ident, s.KeyType, model.NewCIStr(s.IndexName), - s.IndexPartSpecifications, s.IndexOption, s.IfNotExists) - return err + return domain.GetDomain(e.ctx).DDL().CreateIndex(e.ctx, s) } func (e *DDLExec) executeDropDatabase(s *ast.DropDatabaseStmt) error { @@ -365,26 +344,19 @@ func (e *DDLExec) dropLocalTemporaryTables(localTempTables []*ast.TableName) err } func (e *DDLExec) executeDropIndex(s *ast.DropIndexStmt) error { - ti := ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name} - if _, ok := e.getLocalTemporaryTable(ti.Schema, ti.Name); ok { + if _, ok := e.getLocalTemporaryTable(s.Table.Schema, s.Table.Name); ok { return dbterror.ErrUnsupportedLocalTempTableDDL.GenWithStackByArgs("DROP INDEX") } - err := domain.GetDomain(e.ctx).DDL().DropIndex(e.ctx, ti, model.NewCIStr(s.IndexName), s.IfExists) - if (infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err)) && s.IfExists { - err = nil - } - return err + return domain.GetDomain(e.ctx).DDL().DropIndex(e.ctx, s) } func (e *DDLExec) executeAlterTable(ctx context.Context, s *ast.AlterTableStmt) error { - ti := ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name} - if _, ok := e.getLocalTemporaryTable(ti.Schema, ti.Name); ok { + if _, ok := e.getLocalTemporaryTable(s.Table.Schema, s.Table.Name); ok { return dbterror.ErrUnsupportedLocalTempTableDDL.GenWithStackByArgs("ALTER TABLE") } - err := domain.GetDomain(e.ctx).DDL().AlterTable(ctx, e.ctx, ti, s.Specs) - return err + return domain.GetDomain(e.ctx).DDL().AlterTable(ctx, e.ctx, s) } // executeRecoverTable represents a recover table executor.