Skip to content

Commit

Permalink
ddl: improve backfill more general (#41093)
Browse files Browse the repository at this point in the history
close #41002
  • Loading branch information
hawkingrei authored Feb 10, 2023
1 parent 86d10c5 commit 8531018
Show file tree
Hide file tree
Showing 17 changed files with 247 additions and 214 deletions.
24 changes: 9 additions & 15 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (bT backfillerType) String() string {
}
}

// BackfillJob is for a tidb_ddl_backfill table's record.
// BackfillJob is for a tidb_background_subtask table's record.
type BackfillJob struct {
ID int64
JobID int64
Expand All @@ -101,15 +101,9 @@ type BackfillJob struct {
State model.JobState
InstanceID string
InstanceLease types.Time
// range info
CurrKey []byte
StartKey []byte
EndKey []byte

StartTS uint64
FinishTS uint64
RowCount int64
Meta *model.BackfillMeta
StartTS uint64
StateUpdateTS uint64
Meta *model.BackfillMeta
}

// AbbrStr returns the BackfillJob's info without the Meta info.
Expand Down Expand Up @@ -321,7 +315,7 @@ func (w *backfillWorker) updateLease(execID string, bfJob *BackfillJob, nextKey
if err != nil {
return err
}
bfJob.CurrKey = nextKey
bfJob.Meta.CurrKey = nextKey
bfJob.InstanceID = execID
bfJob.InstanceLease = GetLeaseGoTime(leaseTime, InstanceLease)
return w.backfiller.UpdateTask(bfJob)
Expand Down Expand Up @@ -475,7 +469,7 @@ func (w *backfillWorker) runTask(task *reorgBackfillTask) (result *backfillResul
// Change the batch size dynamically.
w.GetCtx().batchCnt = int(variable.GetDDLReorgBatchSize())
result = w.handleBackfillTask(w.GetCtx().ddlCtx, task, w.backfiller)
task.bfJob.RowCount = int64(result.addedCount)
task.bfJob.Meta.RowCount = int64(result.addedCount)
if result.err != nil {
logutil.BgLogger().Warn("[ddl] backfill worker runTask failed",
zap.Stringer("worker", w), zap.String("backfillJob", task.bfJob.AbbrStr()), zap.Error(result.err))
Expand Down Expand Up @@ -1142,6 +1136,8 @@ func addBatchBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJob
Type: reorgInfo.Job.Type,
Query: reorgInfo.Job.Query,
},
StartKey: task.startKey,
EndKey: task.endKey,
}
bj := &BackfillJob{
ID: sJobCtx.currBackfillJobID.Add(1),
Expand All @@ -1152,11 +1148,9 @@ func addBatchBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJob
Tp: sJobCtx.bfWorkerType,
State: model.JobStateNone,
InstanceID: instanceID,
CurrKey: task.startKey,
StartKey: task.startKey,
EndKey: task.endKey,
Meta: bm,
}
bj.Meta.CurrKey = task.startKey
bJobs = append(bJobs, bj)
}
if err := AddBackfillJobs(sess, bJobs); err != nil {
Expand Down
82 changes: 37 additions & 45 deletions ddl/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ const (
ReorgTable = "tidb_ddl_reorg"
// HistoryTable stores the history DDL jobs.
HistoryTable = "tidb_ddl_history"
// BackfillTable stores the information of backfill jobs.
BackfillTable = "tidb_ddl_backfill"
// BackfillHistoryTable stores the information of history backfill jobs.
BackfillHistoryTable = "tidb_ddl_backfill_history"
// BackgroundSubtaskTable stores the information of backfill jobs.
BackgroundSubtaskTable = "tidb_background_subtask"
// BackgroundSubtaskHistoryTable stores the information of history backfill jobs.
BackgroundSubtaskHistoryTable = "tidb_background_subtask_history"

// JobTableID is the table ID of `tidb_ddl_job`.
JobTableID = meta.MaxInt48 - 1
Expand All @@ -38,53 +38,45 @@ const (
HistoryTableID = meta.MaxInt48 - 3
// MDLTableID is the table ID of `tidb_mdl_info`.
MDLTableID = meta.MaxInt48 - 4
// BackfillTableID is the table ID of `tidb_ddl_backfill`.
BackfillTableID = meta.MaxInt48 - 5
// BackfillHistoryTableID is the table ID of `tidb_ddl_backfill_history`.
BackfillHistoryTableID = meta.MaxInt48 - 6
// BackgroundSubtaskTableID is the table ID of `tidb_background_subtask`.
BackgroundSubtaskTableID = meta.MaxInt48 - 5
// BackgroundSubtaskHistoryTableID is the table ID of `tidb_background_subtask_history`.
BackgroundSubtaskHistoryTableID = meta.MaxInt48 - 6

// JobTableSQL is the CREATE TABLE SQL of `tidb_ddl_job`.
JobTableSQL = "create table " + JobTable + "(job_id bigint not null, reorg int, schema_ids text(65535), table_ids text(65535), job_meta longblob, type int, processing int, primary key(job_id))"
// ReorgTableSQL is the CREATE TABLE SQL of `tidb_ddl_reorg`.
ReorgTableSQL = "create table " + ReorgTable + "(job_id bigint not null, ele_id bigint, ele_type blob, start_key blob, end_key blob, physical_id bigint, reorg_meta longblob, unique key(job_id, ele_id, ele_type(20)))"
// HistoryTableSQL is the CREATE TABLE SQL of `tidb_ddl_history`.
HistoryTableSQL = "create table " + HistoryTable + "(job_id bigint not null, job_meta longblob, db_name char(64), table_name char(64), schema_ids text(65535), table_ids text(65535), create_time datetime, primary key(job_id))"
// BackfillTableSQL is the CREATE TABLE SQL of `tidb_ddl_backfill`.
BackfillTableSQL = "create table " + BackfillTable + `(
id bigint not null,
ddl_job_id bigint not null,
ele_id bigint not null,
ele_key blob,
ddl_physical_tid bigint,
// BackgroundSubtaskTableSQL is the CREATE TABLE SQL of `tidb_background_subtask`.
BackgroundSubtaskTableSQL = "create table " + BackgroundSubtaskTable + `(
id bigint not null auto_increment primary key,
namespace varchar(256),
task_key varchar(256),
ddl_physical_tid bigint(20),
type int,
exec_id blob default null,
exec_lease timestamp,
state int,
curr_key blob,
start_key blob,
end_key blob,
start_ts bigint,
finish_ts bigint,
row_count bigint,
backfill_meta longblob,
unique key(ddl_job_id, ele_id, ele_key(20), id))`
// BackfillHistoryTableSQL is the CREATE TABLE SQL of `tidb_ddl_backfill_history`.
BackfillHistoryTableSQL = "create table " + BackfillHistoryTable + `(
id bigint not null,
ddl_job_id bigint not null,
ele_id bigint not null,
ele_key blob,
ddl_physical_tid bigint,
type int,
exec_id blob default null,
exec_lease timestamp,
state int,
curr_key blob,
start_key blob,
end_key blob,
start_ts bigint,
finish_ts bigint,
row_count bigint,
backfill_meta longblob,
unique key(ddl_job_id, ele_id, ele_key(20), id))`
exec_id varchar(256),
exec_expired timestamp,
state varchar(64) not null,
checkpoint longblob not null,
start_time bigint,
state_update_time bigint,
meta longblob,
unique key(namespace, task_key))`
// BackgroundSubtaskHistoryTableSQL is the CREATE TABLE SQL of `tidb_background_subtask_history`.
BackgroundSubtaskHistoryTableSQL = "create table " + BackgroundSubtaskHistoryTable + `(
id bigint not null auto_increment primary key,
namespace varchar(256),
task_key varchar(256),
ddl_physical_tid bigint(20),
type int,
exec_id varchar(256),
exec_expired timestamp,
state varchar(64) not null,
checkpoint longblob not null,
start_time bigint,
state_update_time bigint,
meta longblob,
unique key(namespace, task_key))`
)
2 changes: 1 addition & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ func (dc *ddlCtx) setReorgCtxForBackfill(bfJob *BackfillJob) {
rc := dc.getReorgCtx(bfJob.JobID)
if rc == nil {
ele := &meta.Element{ID: bfJob.EleID, TypeKey: bfJob.EleKey}
dc.newReorgCtx(bfJob.JobID, bfJob.StartKey, ele, bfJob.RowCount)
dc.newReorgCtx(bfJob.JobID, bfJob.Meta.StartKey, ele, bfJob.Meta.RowCount)
} else {
rc.references.Add(1)
}
Expand Down
3 changes: 2 additions & 1 deletion ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ func TestUsingReorgCtx(t *testing.T) {
wg := util.WaitGroupWrapper{}
wg.Run(func() {
jobID := int64(1)
bfJob := &ddl.BackfillJob{JobID: jobID, EleID: 1, EleKey: nil}
m := &model.BackfillMeta{StartKey: []byte("skey"), RowCount: 1}
bfJob := &ddl.BackfillJob{JobID: jobID, EleID: 1, EleKey: nil, Meta: m}
for i := 0; i < 100; i++ {
d.(ddl.DDLForTest).SetReorgCtxForBackfill(bfJob)
d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
Expand Down
4 changes: 2 additions & 2 deletions ddl/dist_backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ func (dc *ddlCtx) backfillJob2Task(t table.Table, bfJob *BackfillJob) (*reorgBac
physicalTable: pt,
// TODO: Remove these fields after remove the old logic.
sqlQuery: bfJob.Meta.Query,
startKey: bfJob.StartKey,
endKey: bfJob.EndKey,
startKey: bfJob.Meta.StartKey,
endKey: bfJob.Meta.EndKey,
endInclude: bfJob.Meta.EndInclude,
priority: bfJob.Meta.Priority}, nil
}
Expand Down
36 changes: 17 additions & 19 deletions ddl/dist_owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func checkReorgJobFinished(ctx context.Context, sess *session, reorgCtxs *reorgC
return errors.Trace(err)
}

bfJobs, err := getBackfillJobWithRetry(sess, BackfillTable, ddlJobID, currEle.ID, currEle.TypeKey)
bfJobs, err := getBackfillJobWithRetry(sess, BackgroundSubtaskTable, ddlJobID, currEle.ID, currEle.TypeKey)
if err != nil {
logutil.BgLogger().Info("[ddl] getBackfillJobWithRetry failed", zap.Int64("job ID", ddlJobID), zap.Stringer("ele", currEle), zap.Error(err))
return errors.Trace(err)
Expand Down Expand Up @@ -380,10 +380,10 @@ func checkJobIsFinished(sess *session, ddlJobID int64) (bool, error) {
return true, nil
}

logutil.BgLogger().Info("[ddl] checkJobIsSynced failed",
zap.Strings("unsyncedInstanceIDs", unsyncedInstanceIDs), zap.Int("tryTimes", i), zap.Error(err))
time.Sleep(RetrySQLInterval)
}
logutil.BgLogger().Info("[ddl] checkJobIsSynced failed",
zap.Strings("unsyncedInstanceIDs", unsyncedInstanceIDs), zap.Int("tryTimes", retrySQLTimes), zap.Error(err))

return false, errors.Trace(err)
}
Expand All @@ -393,8 +393,8 @@ func GetBackfillErr(sess *session, ddlJobID, currEleID int64, currEleKey []byte)
var err error
var metas []*model.BackfillMeta
for i := 0; i < retrySQLTimes; i++ {
metas, err = GetBackfillMetas(sess, BackfillHistoryTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s",
ddlJobID, currEleID, wrapKey2String(currEleKey)), "get_backfill_job_metas")
metas, err = GetBackfillMetas(sess, BackgroundSubtaskHistoryTable, fmt.Sprintf("task_key like \"%d_%s_%d_%%\"",
ddlJobID, hex.EncodeToString(currEleKey), currEleID), "get_backfill_job_metas")
if err == nil {
for _, m := range metas {
if m.Error != nil {
Expand Down Expand Up @@ -445,9 +445,9 @@ func checkBackfillJobCount(sess *session, ddlJobID, currEleID int64, currEleKey
return 0, errors.Trace(err)
}

backfillJobCnt, err = GetBackfillJobCount(sess, BackfillTable,
fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s and ddl_physical_tid = %d",
ddlJobID, currEleID, wrapKey2String(currEleKey), pTblID), "check_backfill_job_count")
backfillJobCnt, err = GetBackfillJobCount(sess, BackgroundSubtaskTable,
fmt.Sprintf("task_key like \"%d_%s_%d_%%\"",
ddlJobID, hex.EncodeToString(currEleKey), currEleID), "check_backfill_job_count")
if err != nil {
return 0, errors.Trace(err)
}
Expand All @@ -459,31 +459,29 @@ func getBackfillJobWithRetry(sess *session, tableName string, ddlJobID, currEleI
var err error
var bJobs []*BackfillJob
for i := 0; i < retrySQLTimes; i++ {
bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s limit 1",
ddlJobID, currEleID, wrapKey2String(currEleKey)), "check_backfill_job_state")
bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("task_key like \"%d_%s_%d_%%\" limit 1",
ddlJobID, hex.EncodeToString(currEleKey), currEleID), "check_backfill_job_state")
if err != nil {
logutil.BgLogger().Warn("[ddl] GetBackfillJobs failed", zap.Error(err))
time.Sleep(RetrySQLInterval)
continue
}

return bJobs, nil
}
return nil, errors.Trace(err)
}

// GetPhysicalTableMetas gets the max backfill metas per physical table in BackfillTable and BackfillHistoryTable.
// GetPhysicalTableMetas gets the max backfill metas per physical table in BackgroundSubtaskTable and BackgroundSubtaskHistoryTable.
func GetPhysicalTableMetas(sess *session, ddlJobID, currEleID int64, currEleKey []byte) (map[int64]*BackfillJobRangeMeta, error) {
condition := fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s", ddlJobID, currEleID, wrapKey2String(currEleKey))
pTblMs, err := GetBackfillIDAndMetas(sess, BackfillTable, condition, "get_ptbl_metas")
condition := fmt.Sprintf("task_key like \"%d_%s_%d_%%\"", ddlJobID, hex.EncodeToString(currEleKey), currEleID)
pTblMs, err := GetBackfillIDAndMetas(sess, BackgroundSubtaskTable, condition, "get_ptbl_metas")
if err != nil {
return nil, errors.Trace(err)
}
hPTblMs, err := GetBackfillIDAndMetas(sess, BackfillHistoryTable, condition, "get_ptbl_metas")
hPTblMs, err := GetBackfillIDAndMetas(sess, BackgroundSubtaskHistoryTable, condition, "get_ptbl_metas")
if err != nil {
return nil, errors.Trace(err)
}

metaMap := make(map[int64]*BackfillJobRangeMeta, len(pTblMs)+len(hPTblMs))
for _, m := range pTblMs {
metaMap[m.PhyTblID] = m
Expand All @@ -506,8 +504,8 @@ func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, bfJob *BackfillJob)

return s.runInTxn(func(se *session) error {
// TODO: Consider batch by batch update backfill jobs and insert backfill history jobs.
bJobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s",
bfJob.JobID, bfJob.EleID, wrapKey2String(bfJob.EleKey)), "update_backfill_job")
bJobs, err := GetBackfillJobs(se, BackgroundSubtaskTable, fmt.Sprintf("task_key like \"%d_%s_%d_%%\"",
bfJob.JobID, hex.EncodeToString(bfJob.EleKey), bfJob.EleID), "update_backfill_job")
if err != nil {
return errors.Trace(err)
}
Expand All @@ -524,7 +522,7 @@ func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, bfJob *BackfillJob)
if err == nil {
for _, bj := range bJobs {
bj.State = model.JobStateCancelled
bj.FinishTS = startTS
bj.StateUpdateTS = startTS
}
err = AddBackfillHistoryJob(se, bJobs)
}
Expand Down
8 changes: 4 additions & 4 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1374,8 +1374,8 @@ func (w *baseIndexWorker) UpdateTask(bfJob *BackfillJob) error {
s := newSession(w.backfillCtx.sessCtx)

return s.runInTxn(func(se *session) error {
jobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s and id = %d",
bfJob.JobID, bfJob.EleID, wrapKey2String(bfJob.EleKey), bfJob.ID), "update_backfill_task")
jobs, err := GetBackfillJobs(se, BackgroundSubtaskTable, fmt.Sprintf("task_key = '%d_%s_%d_%d'",
bfJob.JobID, hex.EncodeToString(bfJob.EleKey), bfJob.EleID, bfJob.ID), "update_backfill_task")
if err != nil {
return err
}
Expand All @@ -1391,7 +1391,7 @@ func (w *baseIndexWorker) UpdateTask(bfJob *BackfillJob) error {
return err
}
bfJob.InstanceLease = GetLeaseGoTime(currTime, InstanceLease)
return updateBackfillJob(se, BackfillTable, bfJob, "update_backfill_task")
return updateBackfillJob(se, BackgroundSubtaskTable, bfJob, "update_backfill_task")
})
}

Expand All @@ -1402,7 +1402,7 @@ func (w *baseIndexWorker) FinishTask(bfJob *BackfillJob) error {
if err != nil {
return errors.Trace(err)
}
bfJob.FinishTS = txn.StartTS()
bfJob.StateUpdateTS = txn.StartTS()
err = RemoveBackfillJob(se, false, bfJob)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 8531018

Please sign in to comment.