Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: set jobs dependency by schema and table name #49699

Merged
merged 13 commits into from
Dec 26, 2023
2 changes: 2 additions & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"ddl.go",
"ddl_algorithm.go",
"ddl_api.go",
"ddl_running_jobs.go",
"ddl_tiflash_api.go",
"ddl_worker.go",
"ddl_workerpool.go",
Expand Down Expand Up @@ -204,6 +205,7 @@ go_test(
"ddl_algorithm_test.go",
"ddl_api_test.go",
"ddl_error_test.go",
"ddl_running_jobs_test.go",
"ddl_test.go",
"ddl_worker_test.go",
"ddl_workerpool_test.go",
Expand Down
13 changes: 4 additions & 9 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,13 +362,9 @@ type ddlCtx struct {

*waitSchemaSyncedController
*schemaVersionManager
// recording the running jobs.
runningJobs struct {
sync.RWMutex
ids map[int64]struct{}
}
// It holds the running DDL jobs ID.
runningJobIDs []string

runningJobs *runningJobs

// reorgCtx is used for reorganization.
reorgCtx reorgContexts
// backfillCtx is used for backfill workers.
Expand Down Expand Up @@ -660,15 +656,14 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
autoidCli: opt.AutoIDClient,
schemaVersionManager: newSchemaVersionManager(),
waitSchemaSyncedController: newWaitSchemaSyncedController(),
runningJobIDs: make([]string, 0, jobRecordCapacity),
runningJobs: newRunningJobs(),
}
ddlCtx.reorgCtx.reorgCtxMap = make(map[int64]*reorgCtx)
ddlCtx.jobCtx.jobCtxMap = make(map[int64]*JobContext)
ddlCtx.mu.hook = opt.Hook
ddlCtx.mu.interceptor = &BaseInterceptor{}
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
ddlCtx.ctx, ddlCtx.cancel = context.WithCancel(ctx)
ddlCtx.runningJobs.ids = make(map[int64]struct{})

d := &ddl{
ddlCtx: ddlCtx,
Expand Down
121 changes: 95 additions & 26 deletions pkg/ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,8 @@ func (d *ddl) AlterTablePlacement(ctx sessionctx.Context, ident ast.Ident, place
return nil
}

if tb.Meta().TempTableType != model.TempTableNone {
tblInfo := tb.Meta()
if tblInfo.TempTableType != model.TempTableNone {
return errors.Trace(dbterror.ErrOptOnTemporaryTable.GenWithStackByArgs("placement"))
}

Expand All @@ -491,9 +492,9 @@ func (d *ddl) AlterTablePlacement(ctx sessionctx.Context, ident ast.Ident, place

job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
TableName: tb.Meta().Name.L,
TableName: tblInfo.Name.L,
Type: model.ActionAlterTablePlacement,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{placementPolicyRef},
Expand Down Expand Up @@ -656,6 +657,10 @@ func (d *ddl) RecoverSchema(ctx sessionctx.Context, recoverSchemaInfo *RecoverSc
Type: model.ActionRecoverSchema,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{recoverSchemaInfo, recoverCheckFlagNone},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: recoverSchemaInfo.Name.L,
Table: model.InvolvingAll,
}},
}
err := d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
Expand Down Expand Up @@ -2829,6 +2834,11 @@ func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context,
return errors.Trace(fmt.Errorf("except table info"))
}
args = append(args, info)
jobs.InvolvingSchemaInfo = append(jobs.InvolvingSchemaInfo,
model.InvolvingSchemaInfo{
Database: dbName.L,
Table: info.Name.L,
})
}
if len(args) == 0 {
return nil
Expand Down Expand Up @@ -2893,6 +2903,11 @@ func (d *ddl) CreatePlacementPolicyWithInfo(ctx sessionctx.Context, policy *mode
Type: model.ActionCreatePlacementPolicy,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{policy, onExist == OnExistReplace},
// CREATE PLACEMENT does not affect any schemas or tables.
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: model.InvolvingNone,
Table: model.InvolvingNone,
}},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
Expand Down Expand Up @@ -2956,6 +2971,11 @@ func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error
0, /* commitTS */
variable.On, /* tidb_ttl_job_enable */
[]kv.KeyRange{} /* flashback key_ranges */},
// FLASHBACK CLUSTER affects all schemas and tables.
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: model.InvolvingAll,
Table: model.InvolvingAll,
}},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
Expand Down Expand Up @@ -3889,10 +3909,10 @@ func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int6
if err != nil {
return errors.Trace(err)
}
tbInfo := t.Meta()
var actionType model.ActionType
switch tp {
case autoid.AutoRandomType:
tbInfo := t.Meta()
pkCol := tbInfo.GetPkColInfo()
if tbInfo.AutoRandomBits == 0 || pkCol == nil {
return errors.Trace(dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(autoid.AutoRandomRebaseNotApplicable))
Expand Down Expand Up @@ -3926,9 +3946,9 @@ func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int6
}
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
TableID: tbInfo.ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
TableName: tbInfo.Name.L,
Type: actionType,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{newBase, force},
Expand Down Expand Up @@ -3961,14 +3981,15 @@ func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint
if err != nil {
return errors.Trace(err)
}
if t.Meta().TempTableType != model.TempTableNone {
tbInfo := t.Meta()
if tbInfo.TempTableType != model.TempTableNone {
return dbterror.ErrOptOnTemporaryTable.GenWithStackByArgs("shard_row_id_bits")
}
if uVal == t.Meta().ShardRowIDBits {
if uVal == tbInfo.ShardRowIDBits {
// Nothing need to do.
return nil
}
if uVal > 0 && t.Meta().HasClusteredIndex() {
if uVal > 0 && tbInfo.HasClusteredIndex() {
return dbterror.ErrUnsupportedShardRowIDBits
}
err = verifyNoOverflowShardBits(d.sessPool, t, uVal)
Expand All @@ -3978,9 +3999,9 @@ func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint
job := &model.Job{
Type: model.ActionShardRowID,
SchemaID: schema.ID,
TableID: t.Meta().ID,
TableID: tbInfo.ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
TableName: tbInfo.Name.L,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{uVal},
}
Expand Down Expand Up @@ -4141,6 +4162,7 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab
if err != nil {
return errors.Trace(err)
}
tbInfo := t.Meta()
if err = checkAddColumnTooManyColumns(len(t.Cols()) + 1); err != nil {
return errors.Trace(err)
}
Expand All @@ -4152,16 +4174,16 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab
if col == nil {
return nil
}
err = CheckAfterPositionExists(t.Meta(), spec.Position)
err = CheckAfterPositionExists(tbInfo, spec.Position)
if err != nil {
return errors.Trace(err)
}

job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
TableID: tbInfo.ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
TableName: tbInfo.Name.L,
Type: model.ActionAddColumn,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{col, spec.Position, 0, spec.IfNotExists},
Expand Down Expand Up @@ -5044,6 +5066,10 @@ func (d *ddl) ExchangeTablePartition(ctx sessionctx.Context, ident ast.Ident, sp
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{defID, ptSchema.ID, ptMeta.ID, partName, spec.WithValidation},
CtxVars: []interface{}{[]int64{ntSchema.ID, ptSchema.ID}, []int64{ntMeta.ID, ptMeta.ID}},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw you have added one blank line since 6937, please make it all consistent.

{Database: ptSchema.Name.L, Table: ptMeta.Name.L},
{Database: ntSchema.Name.L, Table: ntMeta.Name.L},
},
}

err = d.DoDDLJob(ctx, job)
Expand Down Expand Up @@ -6849,6 +6875,10 @@ func (d *ddl) renameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Ident,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{schemas[0].ID, newIdent.Name, schemas[0].Name},
CtxVars: []interface{}{[]int64{schemas[0].ID, schemas[1].ID}, []int64{tableID}},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{
{Database: schemas[0].Name.L, Table: oldIdent.Name.L},
{Database: schemas[1].Name.L, Table: newIdent.Name.L},
},
}

err = d.DoDDLJob(ctx, job)
Expand All @@ -6864,6 +6894,7 @@ func (d *ddl) renameTables(ctx sessionctx.Context, oldIdents, newIdents []ast.Id
newSchemaIDs := make([]int64, 0, len(oldIdents))
tableIDs := make([]int64, 0, len(oldIdents))
oldSchemaNames := make([]*model.CIStr, 0, len(oldIdents))
involveSchemaInfo := make([]model.InvolvingSchemaInfo, 0, len(oldIdents)*2)

var schemas []*model.DBInfo
var tableID int64
Expand All @@ -6888,16 +6919,22 @@ func (d *ddl) renameTables(ctx sessionctx.Context, oldIdents, newIdents []ast.Id
oldSchemaIDs = append(oldSchemaIDs, schemas[0].ID)
newSchemaIDs = append(newSchemaIDs, schemas[1].ID)
oldSchemaNames = append(oldSchemaNames, &schemas[0].Name)
involveSchemaInfo = append(involveSchemaInfo, model.InvolvingSchemaInfo{
Database: schemas[0].Name.L, Table: oldIdents[i].Name.L,
}, model.InvolvingSchemaInfo{
Database: schemas[1].Name.L, Table: newIdents[i].Name.L,
})
}

job := &model.Job{
SchemaID: schemas[1].ID,
TableID: tableIDs[0],
SchemaName: schemas[1].Name.L,
Type: model.ActionRenameTables,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{oldSchemaIDs, newSchemaIDs, tableNames, tableIDs, oldSchemaNames, oldTableNames},
CtxVars: []interface{}{append(oldSchemaIDs, newSchemaIDs...), tableIDs},
SchemaID: schemas[1].ID,
TableID: tableIDs[0],
SchemaName: schemas[1].Name.L,
Type: model.ActionRenameTables,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{oldSchemaIDs, newSchemaIDs, tableNames, tableIDs, oldSchemaNames, oldTableNames},
CtxVars: []interface{}{append(oldSchemaIDs, newSchemaIDs...), tableIDs},
InvolvingSchemaInfo: involveSchemaInfo,
}

err = d.DoDDLJob(ctx, job)
Expand Down Expand Up @@ -7901,6 +7938,7 @@ func (d *ddl) LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error
SessionID: ctx.GetSessionVars().ConnectionID,
}
uniqueTableID := make(map[int64]struct{})
involveSchemaInfo := make([]model.InvolvingSchemaInfo, 0, len(stmt.TableLocks))
// Check whether the table was already locked by another.
for _, tl := range stmt.TableLocks {
tb := tl.Table
Expand All @@ -7925,6 +7963,10 @@ func (d *ddl) LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error
}
uniqueTableID[t.Meta().ID] = struct{}{}
lockTables = append(lockTables, model.TableLockTpInfo{SchemaID: schema.ID, TableID: t.Meta().ID, Tp: tl.Type})
involveSchemaInfo = append(involveSchemaInfo, model.InvolvingSchemaInfo{
Database: schema.Name.L,
Table: t.Meta().Name.L,
})
}

unlockTables := ctx.GetAllTableLocks()
Expand All @@ -7939,6 +7981,8 @@ func (d *ddl) LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error
Type: model.ActionLockTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{arg},

InvolvingSchemaInfo: involveSchemaInfo,
}
// AddTableLock here is avoiding this job was executed successfully but the session was killed before return.
ctx.AddTableLock(lockTables)
Expand Down Expand Up @@ -8567,6 +8611,10 @@ func (d *ddl) AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceG
Type: model.ActionCreateResourceGroup,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{groupInfo, false},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: model.InvolvingNone,
Table: model.InvolvingNone,
}},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
Expand Down Expand Up @@ -8613,6 +8661,10 @@ func (d *ddl) DropResourceGroup(ctx sessionctx.Context, stmt *ast.DropResourceGr
Type: model.ActionDropResourceGroup,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{groupName},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: model.InvolvingNone,
Table: model.InvolvingNone,
}},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
Expand Down Expand Up @@ -8665,6 +8717,10 @@ func (d *ddl) AlterResourceGroup(ctx sessionctx.Context, stmt *ast.AlterResource
Type: model.ActionAlterResourceGroup,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{newGroupInfo},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: model.InvolvingNone,
Table: model.InvolvingNone,
}},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
Expand Down Expand Up @@ -8725,6 +8781,10 @@ func (d *ddl) DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacemen
Type: model.ActionDropPlacementPolicy,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{policyName},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: model.InvolvingNone,
Table: model.InvolvingNone,
}},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
Expand Down Expand Up @@ -8759,6 +8819,10 @@ func (d *ddl) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacem
Type: model.ActionAlterPlacementPolicy,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{newPolicyInfo},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: model.InvolvingNone,
Table: model.InvolvingNone,
}},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
Expand Down Expand Up @@ -8958,8 +9022,9 @@ func (d *ddl) CreateCheckConstraint(ctx sessionctx.Context, ti ast.Ident, constr
}
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
TableName: tblInfo.Name.L,
Type: model.ActionAddCheckConstraint,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{constraintInfo},
Expand All @@ -8981,16 +9046,18 @@ func (d *ddl) DropCheckConstraint(ctx sessionctx.Context, ti ast.Ident, constrNa
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name))
}
tblInfo := t.Meta()

constraintInfo := t.Meta().FindConstraintInfoByName(constrName.L)
constraintInfo := tblInfo.FindConstraintInfoByName(constrName.L)
if constraintInfo == nil {
return dbterror.ErrConstraintNotFound.GenWithStackByArgs(constrName)
}

job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
TableName: tblInfo.Name.L,
Type: model.ActionDropCheckConstraint,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{constrName},
Expand All @@ -9011,16 +9078,18 @@ func (d *ddl) AlterCheckConstraint(ctx sessionctx.Context, ti ast.Ident, constrN
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name))
}
tblInfo := t.Meta()

constraintInfo := t.Meta().FindConstraintInfoByName(constrName.L)
constraintInfo := tblInfo.FindConstraintInfoByName(constrName.L)
if constraintInfo == nil {
return dbterror.ErrConstraintNotFound.GenWithStackByArgs(constrName)
}

job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
TableName: tblInfo.Name.L,
Type: model.ActionAlterCheckConstraint,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{constrName, enforced},
Expand Down
Loading