diff --git a/ddl/backfilling.go b/ddl/backfilling.go index c6d19fcc0827e..9f12759d16d51 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -90,7 +90,7 @@ func (bT backfillerType) String() string { } } -// BackfillJob is for a tidb_ddl_backfill table's record. +// BackfillJob is for a tidb_background_subtask table's record. type BackfillJob struct { ID int64 JobID int64 @@ -101,15 +101,9 @@ type BackfillJob struct { State model.JobState InstanceID string InstanceLease types.Time - // range info - CurrKey []byte - StartKey []byte - EndKey []byte - - StartTS uint64 - FinishTS uint64 - RowCount int64 - Meta *model.BackfillMeta + StartTS uint64 + StateUpdateTS uint64 + Meta *model.BackfillMeta } // AbbrStr returns the BackfillJob's info without the Meta info. @@ -321,7 +315,7 @@ func (w *backfillWorker) updateLease(execID string, bfJob *BackfillJob, nextKey if err != nil { return err } - bfJob.CurrKey = nextKey + bfJob.Meta.CurrKey = nextKey bfJob.InstanceID = execID bfJob.InstanceLease = GetLeaseGoTime(leaseTime, InstanceLease) return w.backfiller.UpdateTask(bfJob) @@ -475,7 +469,7 @@ func (w *backfillWorker) runTask(task *reorgBackfillTask) (result *backfillResul // Change the batch size dynamically. w.GetCtx().batchCnt = int(variable.GetDDLReorgBatchSize()) result = w.handleBackfillTask(w.GetCtx().ddlCtx, task, w.backfiller) - task.bfJob.RowCount = int64(result.addedCount) + task.bfJob.Meta.RowCount = int64(result.addedCount) if result.err != nil { logutil.BgLogger().Warn("[ddl] backfill worker runTask failed", zap.Stringer("worker", w), zap.String("backfillJob", task.bfJob.AbbrStr()), zap.Error(result.err)) @@ -1142,6 +1136,8 @@ func addBatchBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJob Type: reorgInfo.Job.Type, Query: reorgInfo.Job.Query, }, + StartKey: task.startKey, + EndKey: task.endKey, } bj := &BackfillJob{ ID: sJobCtx.currBackfillJobID.Add(1), @@ -1152,11 +1148,9 @@ func addBatchBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJob Tp: sJobCtx.bfWorkerType, State: model.JobStateNone, InstanceID: instanceID, - CurrKey: task.startKey, - StartKey: task.startKey, - EndKey: task.endKey, Meta: bm, } + bj.Meta.CurrKey = task.startKey bJobs = append(bJobs, bj) } if err := AddBackfillJobs(sess, bJobs); err != nil { diff --git a/ddl/constant.go b/ddl/constant.go index 8e276e7205e56..99d6498c0cea1 100644 --- a/ddl/constant.go +++ b/ddl/constant.go @@ -25,10 +25,10 @@ const ( ReorgTable = "tidb_ddl_reorg" // HistoryTable stores the history DDL jobs. HistoryTable = "tidb_ddl_history" - // BackfillTable stores the information of backfill jobs. - BackfillTable = "tidb_ddl_backfill" - // BackfillHistoryTable stores the information of history backfill jobs. - BackfillHistoryTable = "tidb_ddl_backfill_history" + // BackgroundSubtaskTable stores the information of backfill jobs. + BackgroundSubtaskTable = "tidb_background_subtask" + // BackgroundSubtaskHistoryTable stores the information of history backfill jobs. + BackgroundSubtaskHistoryTable = "tidb_background_subtask_history" // JobTableID is the table ID of `tidb_ddl_job`. JobTableID = meta.MaxInt48 - 1 @@ -38,10 +38,10 @@ const ( HistoryTableID = meta.MaxInt48 - 3 // MDLTableID is the table ID of `tidb_mdl_info`. MDLTableID = meta.MaxInt48 - 4 - // BackfillTableID is the table ID of `tidb_ddl_backfill`. - BackfillTableID = meta.MaxInt48 - 5 - // BackfillHistoryTableID is the table ID of `tidb_ddl_backfill_history`. - BackfillHistoryTableID = meta.MaxInt48 - 6 + // BackgroundSubtaskTableID is the table ID of `tidb_background_subtask`. + BackgroundSubtaskTableID = meta.MaxInt48 - 5 + // BackgroundSubtaskHistoryTableID is the table ID of `tidb_background_subtask_history`. + BackgroundSubtaskHistoryTableID = meta.MaxInt48 - 6 // JobTableSQL is the CREATE TABLE SQL of `tidb_ddl_job`. JobTableSQL = "create table " + JobTable + "(job_id bigint not null, reorg int, schema_ids text(65535), table_ids text(65535), job_meta longblob, type int, processing int, primary key(job_id))" @@ -49,42 +49,34 @@ const ( ReorgTableSQL = "create table " + ReorgTable + "(job_id bigint not null, ele_id bigint, ele_type blob, start_key blob, end_key blob, physical_id bigint, reorg_meta longblob, unique key(job_id, ele_id, ele_type(20)))" // HistoryTableSQL is the CREATE TABLE SQL of `tidb_ddl_history`. HistoryTableSQL = "create table " + HistoryTable + "(job_id bigint not null, job_meta longblob, db_name char(64), table_name char(64), schema_ids text(65535), table_ids text(65535), create_time datetime, primary key(job_id))" - // BackfillTableSQL is the CREATE TABLE SQL of `tidb_ddl_backfill`. - BackfillTableSQL = "create table " + BackfillTable + `( - id bigint not null, - ddl_job_id bigint not null, - ele_id bigint not null, - ele_key blob, - ddl_physical_tid bigint, + // BackgroundSubtaskTableSQL is the CREATE TABLE SQL of `tidb_background_subtask`. + BackgroundSubtaskTableSQL = "create table " + BackgroundSubtaskTable + `( + id bigint not null auto_increment primary key, + namespace varchar(256), + task_key varchar(256), + ddl_physical_tid bigint(20), type int, - exec_id blob default null, - exec_lease timestamp, - state int, - curr_key blob, - start_key blob, - end_key blob, - start_ts bigint, - finish_ts bigint, - row_count bigint, - backfill_meta longblob, - unique key(ddl_job_id, ele_id, ele_key(20), id))` - // BackfillHistoryTableSQL is the CREATE TABLE SQL of `tidb_ddl_backfill_history`. - BackfillHistoryTableSQL = "create table " + BackfillHistoryTable + `( - id bigint not null, - ddl_job_id bigint not null, - ele_id bigint not null, - ele_key blob, - ddl_physical_tid bigint, - type int, - exec_id blob default null, - exec_lease timestamp, - state int, - curr_key blob, - start_key blob, - end_key blob, - start_ts bigint, - finish_ts bigint, - row_count bigint, - backfill_meta longblob, - unique key(ddl_job_id, ele_id, ele_key(20), id))` + exec_id varchar(256), + exec_expired timestamp, + state varchar(64) not null, + checkpoint longblob not null, + start_time bigint, + state_update_time bigint, + meta longblob, + unique key(namespace, task_key))` + // BackgroundSubtaskHistoryTableSQL is the CREATE TABLE SQL of `tidb_background_subtask_history`. + BackgroundSubtaskHistoryTableSQL = "create table " + BackgroundSubtaskHistoryTable + `( + id bigint not null auto_increment primary key, + namespace varchar(256), + task_key varchar(256), + ddl_physical_tid bigint(20), + type int, + exec_id varchar(256), + exec_expired timestamp, + state varchar(64) not null, + checkpoint longblob not null, + start_time bigint, + state_update_time bigint, + meta longblob, + unique key(namespace, task_key))` ) diff --git a/ddl/ddl.go b/ddl/ddl.go index 11158ba790297..9f101d1562d57 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -545,7 +545,7 @@ func (dc *ddlCtx) setReorgCtxForBackfill(bfJob *BackfillJob) { rc := dc.getReorgCtx(bfJob.JobID) if rc == nil { ele := &meta.Element{ID: bfJob.EleID, TypeKey: bfJob.EleKey} - dc.newReorgCtx(bfJob.JobID, bfJob.StartKey, ele, bfJob.RowCount) + dc.newReorgCtx(bfJob.JobID, bfJob.Meta.StartKey, ele, bfJob.Meta.RowCount) } else { rc.references.Add(1) } diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go index 9a6aab44080a2..b946490e18e25 100644 --- a/ddl/ddl_worker_test.go +++ b/ddl/ddl_worker_test.go @@ -317,7 +317,8 @@ func TestUsingReorgCtx(t *testing.T) { wg := util.WaitGroupWrapper{} wg.Run(func() { jobID := int64(1) - bfJob := &ddl.BackfillJob{JobID: jobID, EleID: 1, EleKey: nil} + m := &model.BackfillMeta{StartKey: []byte("skey"), RowCount: 1} + bfJob := &ddl.BackfillJob{JobID: jobID, EleID: 1, EleKey: nil, Meta: m} for i := 0; i < 100; i++ { d.(ddl.DDLForTest).SetReorgCtxForBackfill(bfJob) d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled() diff --git a/ddl/dist_backfilling.go b/ddl/dist_backfilling.go index 59f9b17506543..9b2524beb86c5 100644 --- a/ddl/dist_backfilling.go +++ b/ddl/dist_backfilling.go @@ -229,8 +229,8 @@ func (dc *ddlCtx) backfillJob2Task(t table.Table, bfJob *BackfillJob) (*reorgBac physicalTable: pt, // TODO: Remove these fields after remove the old logic. sqlQuery: bfJob.Meta.Query, - startKey: bfJob.StartKey, - endKey: bfJob.EndKey, + startKey: bfJob.Meta.StartKey, + endKey: bfJob.Meta.EndKey, endInclude: bfJob.Meta.EndInclude, priority: bfJob.Meta.Priority}, nil } diff --git a/ddl/dist_owner.go b/ddl/dist_owner.go index 588ef036ef276..43a6dff87bc97 100644 --- a/ddl/dist_owner.go +++ b/ddl/dist_owner.go @@ -343,7 +343,7 @@ func checkReorgJobFinished(ctx context.Context, sess *session, reorgCtxs *reorgC return errors.Trace(err) } - bfJobs, err := getBackfillJobWithRetry(sess, BackfillTable, ddlJobID, currEle.ID, currEle.TypeKey) + bfJobs, err := getBackfillJobWithRetry(sess, BackgroundSubtaskTable, ddlJobID, currEle.ID, currEle.TypeKey) if err != nil { logutil.BgLogger().Info("[ddl] getBackfillJobWithRetry failed", zap.Int64("job ID", ddlJobID), zap.Stringer("ele", currEle), zap.Error(err)) return errors.Trace(err) @@ -380,10 +380,10 @@ func checkJobIsFinished(sess *session, ddlJobID int64) (bool, error) { return true, nil } - logutil.BgLogger().Info("[ddl] checkJobIsSynced failed", - zap.Strings("unsyncedInstanceIDs", unsyncedInstanceIDs), zap.Int("tryTimes", i), zap.Error(err)) time.Sleep(RetrySQLInterval) } + logutil.BgLogger().Info("[ddl] checkJobIsSynced failed", + zap.Strings("unsyncedInstanceIDs", unsyncedInstanceIDs), zap.Int("tryTimes", retrySQLTimes), zap.Error(err)) return false, errors.Trace(err) } @@ -393,8 +393,8 @@ func GetBackfillErr(sess *session, ddlJobID, currEleID int64, currEleKey []byte) var err error var metas []*model.BackfillMeta for i := 0; i < retrySQLTimes; i++ { - metas, err = GetBackfillMetas(sess, BackfillHistoryTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s", - ddlJobID, currEleID, wrapKey2String(currEleKey)), "get_backfill_job_metas") + metas, err = GetBackfillMetas(sess, BackgroundSubtaskHistoryTable, fmt.Sprintf("task_key like \"%d_%s_%d_%%\"", + ddlJobID, hex.EncodeToString(currEleKey), currEleID), "get_backfill_job_metas") if err == nil { for _, m := range metas { if m.Error != nil { @@ -445,9 +445,9 @@ func checkBackfillJobCount(sess *session, ddlJobID, currEleID int64, currEleKey return 0, errors.Trace(err) } - backfillJobCnt, err = GetBackfillJobCount(sess, BackfillTable, - fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s and ddl_physical_tid = %d", - ddlJobID, currEleID, wrapKey2String(currEleKey), pTblID), "check_backfill_job_count") + backfillJobCnt, err = GetBackfillJobCount(sess, BackgroundSubtaskTable, + fmt.Sprintf("task_key like \"%d_%s_%d_%%\"", + ddlJobID, hex.EncodeToString(currEleKey), currEleID), "check_backfill_job_count") if err != nil { return 0, errors.Trace(err) } @@ -459,31 +459,29 @@ func getBackfillJobWithRetry(sess *session, tableName string, ddlJobID, currEleI var err error var bJobs []*BackfillJob for i := 0; i < retrySQLTimes; i++ { - bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s limit 1", - ddlJobID, currEleID, wrapKey2String(currEleKey)), "check_backfill_job_state") + bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("task_key like \"%d_%s_%d_%%\" limit 1", + ddlJobID, hex.EncodeToString(currEleKey), currEleID), "check_backfill_job_state") if err != nil { logutil.BgLogger().Warn("[ddl] GetBackfillJobs failed", zap.Error(err)) time.Sleep(RetrySQLInterval) continue } - return bJobs, nil } return nil, errors.Trace(err) } -// GetPhysicalTableMetas gets the max backfill metas per physical table in BackfillTable and BackfillHistoryTable. +// GetPhysicalTableMetas gets the max backfill metas per physical table in BackgroundSubtaskTable and BackgroundSubtaskHistoryTable. func GetPhysicalTableMetas(sess *session, ddlJobID, currEleID int64, currEleKey []byte) (map[int64]*BackfillJobRangeMeta, error) { - condition := fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s", ddlJobID, currEleID, wrapKey2String(currEleKey)) - pTblMs, err := GetBackfillIDAndMetas(sess, BackfillTable, condition, "get_ptbl_metas") + condition := fmt.Sprintf("task_key like \"%d_%s_%d_%%\"", ddlJobID, hex.EncodeToString(currEleKey), currEleID) + pTblMs, err := GetBackfillIDAndMetas(sess, BackgroundSubtaskTable, condition, "get_ptbl_metas") if err != nil { return nil, errors.Trace(err) } - hPTblMs, err := GetBackfillIDAndMetas(sess, BackfillHistoryTable, condition, "get_ptbl_metas") + hPTblMs, err := GetBackfillIDAndMetas(sess, BackgroundSubtaskHistoryTable, condition, "get_ptbl_metas") if err != nil { return nil, errors.Trace(err) } - metaMap := make(map[int64]*BackfillJobRangeMeta, len(pTblMs)+len(hPTblMs)) for _, m := range pTblMs { metaMap[m.PhyTblID] = m @@ -506,8 +504,8 @@ func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, bfJob *BackfillJob) return s.runInTxn(func(se *session) error { // TODO: Consider batch by batch update backfill jobs and insert backfill history jobs. - bJobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s", - bfJob.JobID, bfJob.EleID, wrapKey2String(bfJob.EleKey)), "update_backfill_job") + bJobs, err := GetBackfillJobs(se, BackgroundSubtaskTable, fmt.Sprintf("task_key like \"%d_%s_%d_%%\"", + bfJob.JobID, hex.EncodeToString(bfJob.EleKey), bfJob.EleID), "update_backfill_job") if err != nil { return errors.Trace(err) } @@ -524,7 +522,7 @@ func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, bfJob *BackfillJob) if err == nil { for _, bj := range bJobs { bj.State = model.JobStateCancelled - bj.FinishTS = startTS + bj.StateUpdateTS = startTS } err = AddBackfillHistoryJob(se, bJobs) } diff --git a/ddl/index.go b/ddl/index.go index 07a3960eb07fc..d64316b2d3fa3 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1374,8 +1374,8 @@ func (w *baseIndexWorker) UpdateTask(bfJob *BackfillJob) error { s := newSession(w.backfillCtx.sessCtx) return s.runInTxn(func(se *session) error { - jobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s and id = %d", - bfJob.JobID, bfJob.EleID, wrapKey2String(bfJob.EleKey), bfJob.ID), "update_backfill_task") + jobs, err := GetBackfillJobs(se, BackgroundSubtaskTable, fmt.Sprintf("task_key = '%d_%s_%d_%d'", + bfJob.JobID, hex.EncodeToString(bfJob.EleKey), bfJob.EleID, bfJob.ID), "update_backfill_task") if err != nil { return err } @@ -1391,7 +1391,7 @@ func (w *baseIndexWorker) UpdateTask(bfJob *BackfillJob) error { return err } bfJob.InstanceLease = GetLeaseGoTime(currTime, InstanceLease) - return updateBackfillJob(se, BackfillTable, bfJob, "update_backfill_task") + return updateBackfillJob(se, BackgroundSubtaskTable, bfJob, "update_backfill_task") }) } @@ -1402,7 +1402,7 @@ func (w *baseIndexWorker) FinishTask(bfJob *BackfillJob) error { if err != nil { return errors.Trace(err) } - bfJob.FinishTS = txn.StartTS() + bfJob.StateUpdateTS = txn.StartTS() err = RemoveBackfillJob(se, false, bfJob) if err != nil { return err diff --git a/ddl/job_table.go b/ddl/job_table.go index 95bfd4c8a268f..710fc7ad9d0ce 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -17,6 +17,7 @@ package ddl import ( "bytes" "context" + "encoding/hex" "fmt" "math" "strconv" @@ -633,8 +634,8 @@ func getJobsBySQL(sess *session, tbl, condition string) ([]*model.Job, error) { } func syncBackfillHistoryJobs(sess *session, uuid string, backfillJob *BackfillJob) error { - sql := fmt.Sprintf("update mysql.%s set state = %d where ddl_job_id = %d and ele_id = %d and ele_key = %s and exec_id = '%s' limit 1;", - BackfillHistoryTable, model.JobStateSynced, backfillJob.JobID, backfillJob.EleID, wrapKey2String(backfillJob.EleKey), uuid) + sql := fmt.Sprintf("update mysql.%s set state = '%s' where task_key like \"%d_%s_%d_%%\" and exec_id = '%s' limit 1;", + BackgroundSubtaskHistoryTable, model.JobStateSynced.String(), backfillJob.JobID, hex.EncodeToString(backfillJob.EleKey), backfillJob.EleID, uuid) _, err := sess.execute(context.Background(), sql, "sync_backfill_history_job") return err } @@ -643,7 +644,7 @@ func generateInsertBackfillJobSQL(tableName string, backfillJobs []*BackfillJob) sqlBuilder := strings.Builder{} sqlBuilder.WriteString("insert into mysql.") sqlBuilder.WriteString(tableName) - sqlBuilder.WriteString("(id, ddl_job_id, ele_id, ele_key, ddl_physical_tid, type, exec_id, exec_lease, state, curr_key, start_key, end_key, start_ts, finish_ts, row_count, backfill_meta) values") + sqlBuilder.WriteString("(task_key, ddl_physical_tid, type, exec_id, exec_expired, state, checkpoint, start_time, state_update_time, meta) values") for i, bj := range backfillJobs { mateByte, err := bj.Meta.Encode() if err != nil { @@ -653,17 +654,17 @@ func generateInsertBackfillJobSQL(tableName string, backfillJobs []*BackfillJob) if i != 0 { sqlBuilder.WriteString(", ") } - sqlBuilder.WriteString(fmt.Sprintf("(%d, %d, %d, %s, %d, %d, '%s', '%s', %d, %s, %s, %s, %d, %d, %d, %s)", bj.ID, bj.JobID, bj.EleID, - wrapKey2String(bj.EleKey), bj.PhysicalTableID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State, wrapKey2String(bj.CurrKey), - wrapKey2String(bj.StartKey), wrapKey2String(bj.EndKey), bj.StartTS, bj.FinishTS, bj.RowCount, wrapKey2String(mateByte))) + sqlBuilder.WriteString(fmt.Sprintf("('%d_%s_%d_%d', %d, %d, '%s', '%s', '%s', %s, %d, %d, %s)", + bj.JobID, hex.EncodeToString(bj.EleKey), bj.EleID, bj.ID, bj.PhysicalTableID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State.String(), wrapKey2String(bj.Meta.CurrKey), + bj.StartTS, bj.StateUpdateTS, wrapKey2String(mateByte))) } return sqlBuilder.String(), nil } -// AddBackfillHistoryJob adds the backfill jobs to the tidb_ddl_backfill_history table. +// AddBackfillHistoryJob adds the backfill jobs to the tidb_background_subtask_history table. func AddBackfillHistoryJob(sess *session, backfillJobs []*BackfillJob) error { - label := fmt.Sprintf("add_%s_job", BackfillHistoryTable) - sql, err := generateInsertBackfillJobSQL(BackfillHistoryTable, backfillJobs) + label := fmt.Sprintf("add_%s_job", BackgroundSubtaskHistoryTable) + sql, err := generateInsertBackfillJobSQL(BackgroundSubtaskHistoryTable, backfillJobs) if err != nil { return err } @@ -671,9 +672,9 @@ func AddBackfillHistoryJob(sess *session, backfillJobs []*BackfillJob) error { return errors.Trace(err) } -// AddBackfillJobs adds the backfill jobs to the tidb_ddl_backfill table. +// AddBackfillJobs adds the backfill jobs to the tidb_background_subtask table. func AddBackfillJobs(s *session, backfillJobs []*BackfillJob) error { - label := fmt.Sprintf("add_%s_job", BackfillTable) + label := fmt.Sprintf("add_%s_job", BackgroundSubtaskTable) // Do runInTxn to get StartTS. return s.runInTxn(func(se *session) error { txn, err := se.txn() @@ -685,7 +686,7 @@ func AddBackfillJobs(s *session, backfillJobs []*BackfillJob) error { bj.StartTS = startTS } - sql, err := generateInsertBackfillJobSQL(BackfillTable, backfillJobs) + sql, err := generateInsertBackfillJobSQL(BackgroundSubtaskTable, backfillJobs) if err != nil { return err } @@ -697,16 +698,8 @@ func AddBackfillJobs(s *session, backfillJobs []*BackfillJob) error { // GetBackfillJobForOneEle gets the backfill jobs in the tblName table that contains only one element. func GetBackfillJobForOneEle(s *session, excludedJobIDs []int64, lease time.Duration) (*BackfillJob, error) { eJobIDsBuilder := strings.Builder{} - for i, id := range excludedJobIDs { - if i == 0 { - eJobIDsBuilder.WriteString(" and ddl_job_id not in (") - } - eJobIDsBuilder.WriteString(strconv.Itoa(int(id))) - if i == len(excludedJobIDs)-1 { - eJobIDsBuilder.WriteString(")") - } else { - eJobIDsBuilder.WriteString(", ") - } + for _, id := range excludedJobIDs { + eJobIDsBuilder.WriteString(fmt.Sprintf(" and task_key not like \"%d_%%\"", id)) } var err error @@ -717,9 +710,8 @@ func GetBackfillJobForOneEle(s *session, excludedJobIDs []int64, lease time.Dura return err } leaseStr := currTime.Add(-lease).Format(types.TimeFormat) - - bJobs, err = GetBackfillJobs(se, BackfillTable, - fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') %s order by ddl_job_id, ele_key, ele_id limit 1", + bJobs, err = GetBackfillJobs(se, BackgroundSubtaskTable, + fmt.Sprintf("(exec_id = '' or exec_expired < '%v') %s order by task_key limit 1", leaseStr, eJobIDsBuilder.String()), "get_backfill_job") return err }) @@ -742,13 +734,13 @@ func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid st } leaseStr := currTime.Add(-lease).Format(types.TimeFormat) - getJobsSQL := fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') and ddl_job_id = %d order by ddl_job_id, ele_id, ele_key limit %d", + getJobsSQL := fmt.Sprintf("(exec_id = '' or exec_expired < '%v') and task_key like \"%d_%%\" order by task_key limit %d", leaseStr, jobID, batch) if pTblID != getJobWithoutPartition { if pTblID == 0 { rows, err := s.execute(context.Background(), - fmt.Sprintf("select ddl_physical_tid from mysql.%s group by ddl_job_id, ele_id, ele_key, ddl_physical_tid having max(length(exec_id)) = 0 or max(exec_lease) < '%s' order by ddl_job_id, ele_key, ele_id, ddl_physical_tid limit 1", - BackfillTable, leaseStr), "get_mark_backfill_job") + fmt.Sprintf("select ddl_physical_tid from mysql.%s group by substring_index(task_key,\"_\",3), ddl_physical_tid having max(length(exec_id)) = 0 or max(exec_expired) < '%s' order by substring_index(task_key,\"_\",3), ddl_physical_tid limit 1", + BackgroundSubtaskTable, leaseStr), "get_mark_backfill_job") if err != nil { return errors.Trace(err) } @@ -759,11 +751,11 @@ func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid st pTblID = rows[0].GetInt64(0) } - getJobsSQL = fmt.Sprintf("(exec_ID = '' or exec_lease < '%s') and ddl_job_id = %d and ddl_physical_tid = %d order by ddl_job_id, ele_key, ele_id limit %d", + getJobsSQL = fmt.Sprintf("(exec_id = '' or exec_expired < '%s') and task_key like \"%d_%%\" and ddl_physical_tid = %d order by task_key limit %d", leaseStr, jobID, pTblID, batch) } - bJobs, err = GetBackfillJobs(se, BackfillTable, getJobsSQL, "get_mark_backfill_job") + bJobs, err = GetBackfillJobs(se, BackgroundSubtaskTable, getJobsSQL, "get_mark_backfill_job") if err != nil { return err } @@ -782,7 +774,7 @@ func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid st bJobs[i].InstanceID = uuid bJobs[i].InstanceLease = GetLeaseGoTime(currTime, lease) // TODO: batch update - if err = updateBackfillJob(se, BackfillTable, bJobs[i], "get_mark_backfill_job"); err != nil { + if err = updateBackfillJob(se, BackgroundSubtaskTable, bJobs[i], "get_mark_backfill_job"); err != nil { return err } } @@ -797,8 +789,8 @@ func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid st // GetInterruptedBackfillJobForOneEle gets an interrupted backfill job that contains only one element. func GetInterruptedBackfillJobForOneEle(sess *session, jobID, eleID int64, eleKey []byte) ([]*BackfillJob, error) { - bJobs, err := GetBackfillJobs(sess, BackfillHistoryTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s and state = %d limit 1", - jobID, eleID, wrapKey2String(eleKey), model.JobStateCancelled), "get_interrupt_backfill_job") + bJobs, err := GetBackfillJobs(sess, BackgroundSubtaskHistoryTable, fmt.Sprintf("task_key like \"%d_%s_%d_%%\" and state = \"%s\" limit 1", + jobID, hex.EncodeToString(eleKey), eleID, model.JobStateCancelled.String()), "get_interrupt_backfill_job") if err != nil || len(bJobs) == 0 { return nil, err } @@ -820,7 +812,7 @@ func GetBackfillJobCount(sess *session, tblName, condition string, label string) // GetBackfillMetas gets the backfill metas in the tblName table according to condition. func GetBackfillMetas(sess *session, tblName, condition string, label string) ([]*model.BackfillMeta, error) { - rows, err := sess.execute(context.Background(), fmt.Sprintf("select backfill_meta from mysql.%s where %s", tblName, condition), label) + rows, err := sess.execute(context.Background(), fmt.Sprintf("select meta from mysql.%s where %s", tblName, condition), label) if err != nil { return nil, errors.Trace(err) } @@ -843,9 +835,9 @@ func GetBackfillMetas(sess *session, tblName, condition string, label string) ([ // GetBackfillIDAndMetas gets the backfill IDs and metas in the tblName table according to condition. func GetBackfillIDAndMetas(sess *session, tblName, condition string, label string) ([]*BackfillJobRangeMeta, error) { - sql := "select tbl.id, tbl.curr_key, tbl.end_key, tbl.ddl_physical_tid from (select max(id) max_id, ddl_physical_tid " + + sql := "select tbl.task_key, tbl.meta, tbl.ddl_physical_tid from (select max(task_key) max_id, ddl_physical_tid " + fmt.Sprintf(" from mysql.%s tbl where %s group by ddl_physical_tid) tmp join mysql.%s tbl", - tblName, condition, tblName) + " on tbl.id=tmp.max_id and tbl.ddl_physical_tid=tmp.ddl_physical_tid;" + tblName, condition, tblName) + " on tbl.task_key=tmp.max_id and tbl.ddl_physical_tid=tmp.ddl_physical_tid;" rows, err := sess.execute(context.Background(), sql, label) if err != nil { return nil, errors.Trace(err) @@ -856,21 +848,31 @@ func GetBackfillIDAndMetas(sess *session, tblName, condition string, label strin pTblMetas := make([]*BackfillJobRangeMeta, 0, len(rows)) for _, r := range rows { + key := r.GetString(0) + keySlice := strings.Split(key, "_") + id, err := strconv.ParseInt(keySlice[3], 10, 64) + if err != nil { + return nil, err + } + meta := &model.BackfillMeta{} + err = meta.Decode(r.GetBytes(1)) + if err != nil { + return nil, err + } pTblMeta := BackfillJobRangeMeta{ - ID: r.GetInt64(0), - StartKey: r.GetBytes(1), - EndKey: r.GetBytes(2), - PhyTblID: r.GetInt64(3), + ID: id, + StartKey: meta.StartKey, + EndKey: meta.EndKey, + PhyTblID: r.GetInt64(2), } pTblMetas = append(pTblMetas, &pTblMeta) } - return pTblMetas, nil } func getUnsyncedInstanceIDs(sess *session, jobID int64, label string) ([]string, error) { - sql := fmt.Sprintf("select sum((state=%d) + (state=%d)) as tmp, exec_id from mysql.tidb_ddl_backfill_history where ddl_job_id = %d group by exec_id having tmp = 0;", - model.JobStateSynced, model.JobStateCancelled, jobID) + sql := fmt.Sprintf("select sum((state='%s') + (state='%s')) as tmp, exec_id from mysql.tidb_background_subtask_history where task_key like \"%d_%%\" group by exec_id having tmp = 0;", + model.JobStateSynced.String(), model.JobStateCancelled.String(), jobID) rows, err := sess.execute(context.Background(), sql, label) if err != nil { return nil, errors.Trace(err) @@ -891,40 +893,56 @@ func GetBackfillJobs(sess *session, tblName, condition string, label string) ([] } bJobs := make([]*BackfillJob, 0, len(rows)) for _, row := range rows { + key := row.GetString(2) + keySlice := strings.Split(key, "_") + jobID, err := strconv.ParseInt(keySlice[0], 10, 64) + if err != nil { + return nil, err + } + eleKey, err := hex.DecodeString(keySlice[1]) + if err != nil { + return nil, err + } + eleID, err := strconv.ParseInt(keySlice[2], 10, 64) + if err != nil { + return nil, err + } + id, err := strconv.ParseInt(keySlice[3], 10, 64) + if err != nil { + return nil, err + } bfJob := BackfillJob{ - ID: row.GetInt64(0), - JobID: row.GetInt64(1), - EleID: row.GetInt64(2), - EleKey: row.GetBytes(3), - PhysicalTableID: row.GetInt64(4), - Tp: backfillerType(row.GetInt64(5)), - InstanceID: row.GetString(6), - InstanceLease: row.GetTime(7), - State: model.JobState(row.GetInt64(8)), - CurrKey: row.GetBytes(9), - StartKey: row.GetBytes(10), - EndKey: row.GetBytes(11), - StartTS: row.GetUint64(12), - FinishTS: row.GetUint64(13), - RowCount: row.GetInt64(14), + ID: id, + JobID: jobID, + EleID: eleID, + EleKey: eleKey, + PhysicalTableID: row.GetInt64(3), + Tp: backfillerType(row.GetInt64(4)), + InstanceID: row.GetString(5), + InstanceLease: row.GetTime(6), + State: model.StrToJobState(row.GetString(7)), + StartTS: row.GetUint64(9), + StateUpdateTS: row.GetUint64(10), } bfJob.Meta = &model.BackfillMeta{} - err = bfJob.Meta.Decode(row.GetBytes(15)) + err = bfJob.Meta.Decode(row.GetBytes(11)) if err != nil { return nil, errors.Trace(err) } + bfJob.Meta.CurrKey = row.GetBytes(8) bJobs = append(bJobs, &bfJob) } return bJobs, nil } -// RemoveBackfillJob removes the backfill jobs from the tidb_ddl_backfill table. +// RemoveBackfillJob removes the backfill jobs from the tidb_background_subtask table. // If isOneEle is true, removes all jobs with backfillJob's ddl_job_id, ele_id and ele_key. Otherwise, removes the backfillJob. func RemoveBackfillJob(sess *session, isOneEle bool, backfillJob *BackfillJob) error { - sql := fmt.Sprintf("delete from mysql.tidb_ddl_backfill where ddl_job_id = %d and ele_id = %d and ele_key = %s", - backfillJob.JobID, backfillJob.EleID, wrapKey2String(backfillJob.EleKey)) + sql := "delete from mysql.tidb_background_subtask" if !isOneEle { - sql += fmt.Sprintf(" and id = %d", backfillJob.ID) + sql += fmt.Sprintf(" where task_key like \"%d_%s_%d_%d\"", backfillJob.JobID, hex.EncodeToString(backfillJob.EleKey), backfillJob.EleID, backfillJob.ID) + } else { + sql += fmt.Sprintf(" where task_key like \"%d_%s_%d_%%\"", backfillJob.JobID, hex.EncodeToString(backfillJob.EleKey), backfillJob.EleID) } _, err := sess.execute(context.Background(), sql, "remove_backfill_job") return err @@ -935,9 +953,9 @@ func updateBackfillJob(sess *session, tableName string, backfillJob *BackfillJob if err != nil { return err } - sql := fmt.Sprintf("update mysql.%s set exec_id = '%s', exec_lease = '%s', state = %d, curr_key = %s, row_count = %d, backfill_meta = %s where ddl_job_id = %d and ele_id = %d and ele_key = %s and id = %d", - tableName, backfillJob.InstanceID, backfillJob.InstanceLease, backfillJob.State, wrapKey2String(backfillJob.CurrKey), backfillJob.RowCount, - wrapKey2String(mate), backfillJob.JobID, backfillJob.EleID, wrapKey2String(backfillJob.EleKey), backfillJob.ID) + sql := fmt.Sprintf("update mysql.%s set exec_id = '%s', exec_expired = '%s', state = '%s', checkpoint = %s, meta = %s where task_key = '%d_%s_%d_%d'", + tableName, backfillJob.InstanceID, backfillJob.InstanceLease, backfillJob.State.String(), wrapKey2String(backfillJob.Meta.CurrKey), + wrapKey2String(mate), backfillJob.JobID, hex.EncodeToString(backfillJob.EleKey), backfillJob.EleID, backfillJob.ID) _, err = sess.execute(context.Background(), sql, label) return err } diff --git a/ddl/job_table_test.go b/ddl/job_table_test.go index ae04c0eb6dd85..2f7a180f7b9ca 100644 --- a/ddl/job_table_test.go +++ b/ddl/job_table_test.go @@ -16,6 +16,7 @@ package ddl_test import ( "context" + "encoding/hex" "fmt" "strconv" "strings" @@ -198,6 +199,8 @@ func makeAddIdxBackfillJobs(schemaID, tblID, jobID, eleID int64, cnt int, query TableID: tblID, Query: query, }, + StartKey: sKey, + EndKey: eKey, } bj := &ddl.BackfillJob{ ID: int64(i), @@ -207,11 +210,9 @@ func makeAddIdxBackfillJobs(schemaID, tblID, jobID, eleID int64, cnt int, query State: model.JobStateNone, PhysicalTableID: 1, InstanceLease: types.ZeroTimestamp, - CurrKey: sKey, - StartKey: sKey, - EndKey: eKey, Meta: bm, } + bj.Meta.CurrKey = sKey bJobs = append(bJobs, bj) } return bJobs @@ -230,8 +231,8 @@ func equalBackfillJob(t *testing.T, a, b *ddl.BackfillJob, lessTime types.Time) } func getIdxConditionStr(jobID, eleID int64) string { - return fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s", - jobID, eleID, wrapKey2String(meta.IndexElementKey)) + return fmt.Sprintf("task_key like \"%d_%s_%d_%%\"", + jobID, hex.EncodeToString(meta.IndexElementKey), eleID) } func readInTxn(se sessionctx.Context, f func(sessionctx.Context)) (err error) { @@ -248,8 +249,8 @@ func backfillJob2PTblMetaMap(bJob *ddl.BackfillJob) map[int64]*ddl.BackfillJobRa m := &ddl.BackfillJobRangeMeta{ ID: bJob.ID, PhyTblID: bJob.PhysicalTableID, - StartKey: bJob.StartKey, - EndKey: bJob.EndKey, + StartKey: bJob.Meta.StartKey, + EndKey: bJob.Meta.EndKey, } mMap := make(map[int64]*ddl.BackfillJobRangeMeta) mMap[m.PhyTblID] = m @@ -280,7 +281,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { bJobs, err := ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID1, uuid, noPID, instanceLease) require.EqualError(t, err, dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job").Error()) require.Nil(t, bJobs) - allCnt, err := ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID2), "check_backfill_job_count") + allCnt, err := ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID1, eleID2), "check_backfill_job_count") require.NoError(t, err) require.Equal(t, allCnt, 0) // Test some backfill jobs, add backfill jobs to the table. @@ -293,7 +294,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { bjTestCases = append(bjTestCases, bJobs2...) bjTestCases = append(bjTestCases, bJobs3...) err = ddl.AddBackfillJobs(se, bjTestCases) - require.Equal(t, err.Error(), "[table:1292]Incorrect timestamp value: '0000-00-00 00:00:00' for column 'exec_lease' at row 1") + require.Equal(t, err.Error(), "[table:1292]Incorrect timestamp value: '0000-00-00 00:00:00' for column 'exec_expired' at row 1") tk.Session().GetSessionVars().SQLMode = mysql.ModeNone err = ddl.AddBackfillJobs(se, bjTestCases) // ID jobID eleID InstanceID PhysicalTableID @@ -336,7 +337,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { }) currGoTime := ddl.GetLeaseGoTime(currTime, instanceLease) require.GreaterOrEqual(t, currGoTime.Compare(bJobs[0].InstanceLease), 0) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID2), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID2, eleID2), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, cnt) // test physical table @@ -503,10 +504,10 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 0 jobID2 eleID3 // 1 jobID2 eleID3 require.NoError(t, err) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 1) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID2), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID2, eleID2), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, cnt) // remove all backfill jobs @@ -517,10 +518,10 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 0 jobID2 eleID3 // 1 jobID2 eleID3 require.NoError(t, err) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 1) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID2), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID2, eleID2), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 0) // clean backfill job @@ -541,11 +542,11 @@ func TestSimpleExecBackfillJobs(t *testing.T) { currTime, err = ddl.GetOracleTimeWithStartTS(se) require.NoError(t, err) }) - condition := fmt.Sprintf("exec_ID = '' or exec_lease < '%v' and ddl_job_id = %d", currTime.Add(-instanceLease), jobID2) - bJobs, err = ddl.GetBackfillJobs(se, ddl.BackfillHistoryTable, condition, "test_get_bj") + condition := fmt.Sprintf("exec_id = '' or exec_expired < '%v' and task_key like \"%d_%%\"", currTime.Add(-instanceLease), jobID2) + bJobs, err = ddl.GetBackfillJobs(se, ddl.BackgroundSubtaskHistoryTable, condition, "test_get_bj") require.NoError(t, err) require.Len(t, bJobs, 1) - require.Equal(t, bJobs[0].FinishTS, uint64(0)) + require.Equal(t, bJobs[0].StateUpdateTS, uint64(0)) // test GetMaxBackfillJob pTblMeta, err := ddl.GetPhysicalTableMetas(se, bJobs3[0].JobID, bJobs3[0].EleID, eleKey) @@ -590,10 +591,10 @@ func TestSimpleExecBackfillJobs(t *testing.T) { require.Equal(t, "ID:3, JobID:1, EleID:11, Type:add index, State:cancelled, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs1[1].AbbrStr()) require.Equal(t, "ID:0, JobID:2, EleID:33, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs3[0].AbbrStr()) require.Equal(t, "ID:1, JobID:2, EleID:33, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs3[1].AbbrStr()) - // test select tidb_ddl_backfill - tk.MustQuery(fmt.Sprintf("select exec_id, exec_lease from mysql.tidb_ddl_backfill where id = %d and %s", bJobs1[0].ID, getIdxConditionStr(jobID1, eleID1))). + // test select tidb_background_subtask + tk.MustQuery(fmt.Sprintf("select exec_id, exec_expired from mysql.tidb_background_subtask where task_key like \"%%%d\" and %s", bJobs1[0].ID, getIdxConditionStr(jobID1, eleID1))). Check(testkit.Rows(fmt.Sprintf("%s 0000-00-00 00:00:00", uuid))) - tk.MustQuery(fmt.Sprintf("select exec_id, exec_lease from mysql.tidb_ddl_backfill where id = %d and %s", bJobs1[1].ID, getIdxConditionStr(jobID1, eleID1))). + tk.MustQuery(fmt.Sprintf("select exec_id, exec_expired from mysql.tidb_background_subtask where task_key like \"%%%d\" and %s", bJobs1[1].ID, getIdxConditionStr(jobID1, eleID1))). Check(testkit.Rows(" 0000-00-00 00:00:00")) // test GetBackfillMetas bfErr := ddl.GetBackfillErr(se, jobID1, eleID1, eleKey) @@ -606,7 +607,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { bJobs1[1].State = model.JobStateNone bJobs1[1].ID = 4 err = ddl.AddBackfillHistoryJob(se, bJobs1) - // BackfillTable + // BackgroundSubtaskTable // ID jobID eleID state // -------------------------------- // 0 jobID1 eleID1 JobStateNone @@ -618,18 +619,18 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 2 jobID1 eleID1 JobStateRollingback // 3 jobID1 eleID1 JobStateCancelled // - // BackfillHistoryTable + // BackgroundSubtaskHistoryTable // ID jobID eleID state // -------------------------------- // 5 jobID1 eleID1 JobStateNone // 4 jobID1 eleID1 JobStateNone pTblMeta, err = ddl.GetPhysicalTableMetas(se, jobID1, eleID1, eleKey) require.NoError(t, err) - require.Equal(t, backfillJob2PTblMetaMap(bJobs1[0]), pTblMeta) + require.Equal(t, backfillJob2PTblMetaMap(bJobs1[0]), pTblMeta) // ??????????? bJobs1[0].ID = 6 bJobs1[1].ID = 7 err = ddl.AddBackfillJobs(se, bJobs1) - // BackfillTable + // BackgroundSubtaskTable // ID jobID eleID state // -------------------------------- // 0 jobID1 eleID1 JobStateNone @@ -643,7 +644,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 6 jobID1 eleID1 JobStateNone // 7 jobID1 eleID1 JobStateNone // - // BackfillHistoryTable + // BackgroundSubtaskHistoryTable // ID jobID eleID state // -------------------------------- // 5 jobID1 eleID1 JobStateNone @@ -653,18 +654,18 @@ func TestSimpleExecBackfillJobs(t *testing.T) { require.Equal(t, backfillJob2PTblMetaMap(bJobs1[1]), pTblMeta) // test MoveBackfillJobsToHistoryTable and GetInterruptedBackfillJobForOneEle - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 2) err = ddl.MoveBackfillJobsToHistoryTable(se, bJobs3[0]) require.NoError(t, err) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 0) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillHistoryTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskHistoryTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 2) - // BackfillTable + // BackgroundSubtaskTable // ID jobID eleID state // -------------------------------- // 0 jobID1 eleID1 JobStateNone @@ -676,7 +677,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 6 jobID1 eleID1 JobStateNone // 7 jobID1 eleID1 JobStateNone // - // BackfillHistoryTable + // BackgroundSubtaskHistoryTable // ID jobID eleID state // -------------------------------- // 5 jobID1 eleID1 JobStateNone @@ -686,15 +687,15 @@ func TestSimpleExecBackfillJobs(t *testing.T) { bJobs, err = ddl.GetInterruptedBackfillJobForOneEle(se, jobID1, eleID1, eleKey) require.NoError(t, err) require.Len(t, bJobs, 0) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 6) err = ddl.MoveBackfillJobsToHistoryTable(se, bJobs1[0]) require.NoError(t, err) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 0) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillHistoryTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskHistoryTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 8) bJobs, err = ddl.GetInterruptedBackfillJobForOneEle(se, jobID2, eleID3, eleKey) @@ -706,13 +707,13 @@ func TestSimpleExecBackfillJobs(t *testing.T) { } expectJob.State = model.JobStateCancelled equalBackfillJob(t, bJobs3[0], bJobs[0], types.ZeroTimestamp) - // BackfillTable + // BackgroundSubtaskTable // ID jobID eleID state // -------------------------------- // 0 jobID2 eleID2 JobStateNone // 1 jobID2 eleID2 JobStateNone // - // BackfillHistoryTable + // BackgroundSubtaskHistoryTable // ID jobID eleID state // -------------------------------- // 5 jobID1 eleID1 JobStateNone diff --git a/meta/meta.go b/meta/meta.go index 801273a98cbf9..af02ae7882ac3 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -132,7 +132,7 @@ const ( BaseDDLTableVersion DDLTableVersion = 1 // MDLTableVersion is for support MDL tables. MDLTableVersion DDLTableVersion = 2 - // BackfillTableVersion is for support distributed reorg stage, it added tidb_ddl_backfill, tidb_ddl_backfill_history. + // BackfillTableVersion is for support distributed reorg stage, it added tidb_background_subtask, tidb_background_subtask_history. BackfillTableVersion DDLTableVersion = 3 ) diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 3eec005372384..36ada1ed9b3ea 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -446,8 +446,11 @@ type BackfillMeta struct { WarningsCount map[errors.ErrorID]int64 `json:"warnings_count"` Location *TimeZoneLocation `json:"location"` ReorgTp ReorgType `json:"reorg_tp"` - - *JobMeta `json:"job_meta"` + RowCount int64 `json:"row_count"` + StartKey []byte `json:"start_key"` + EndKey []byte `json:"end_key"` + CurrKey []byte `json:"curr_key"` + *JobMeta `json:"job_meta"` } // Encode encodes BackfillMeta with json format. @@ -952,6 +955,30 @@ func (s JobState) String() string { } } +// StrToJobState converts string to JobState. +func StrToJobState(s string) JobState { + switch s { + case "running": + return JobStateRunning + case "rollingback": + return JobStateRollingback + case "rollback done": + return JobStateRollbackDone + case "done": + return JobStateDone + case "cancelled": + return JobStateCancelled + case "cancelling": + return JobStateCancelling + case "synced": + return JobStateSynced + case "queueing": + return JobStateQueueing + default: + return JobStateNone + } +} + // SchemaDiff contains the schema modification at a particular schema version. // It is used to reduce schema reload cost. type SchemaDiff struct { diff --git a/resourcemanager/schedule.go b/resourcemanager/schedule.go index a33e0b75a764e..4ba4068ba88c2 100644 --- a/resourcemanager/schedule.go +++ b/resourcemanager/schedule.go @@ -55,14 +55,14 @@ func (*ResourceManager) exec(pool *util.PoolContainer, cmd scheduler.Command) { switch cmd { case scheduler.Downclock: concurrency := con - 1 - log.Info("[resource manager] downclock goroutine pool", + log.Debug("[resource manager] downclock goroutine pool", zap.Int("origin concurrency", con), zap.Int("concurrency", concurrency), zap.String("name", pool.Pool.Name())) pool.Pool.Tune(concurrency) case scheduler.Overclock: concurrency := con + 1 - log.Info("[resource manager] overclock goroutine pool", + log.Debug("[resource manager] overclock goroutine pool", zap.Int("origin concurrency", con), zap.Int("concurrency", concurrency), zap.String("name", pool.Pool.Name())) diff --git a/session/bootstrap_test.go b/session/bootstrap_test.go index e6013d8b150ce..d126c1f7549f9 100644 --- a/session/bootstrap_test.go +++ b/session/bootstrap_test.go @@ -217,8 +217,8 @@ func TestBootstrapWithError(t *testing.T) { require.Equal(t, []byte("True"), row.GetBytes(0)) require.NoError(t, r.Close()) - mustExec(t, se, "SELECT * from mysql.tidb_ddl_backfill") - mustExec(t, se, "SELECT * from mysql.tidb_ddl_backfill_history") + mustExec(t, se, "SELECT * from mysql.tidb_background_subtask") + mustExec(t, se, "SELECT * from mysql.tidb_background_subtask_history") // Check tidb_ttl_table_status table mustExec(t, se, "SELECT * from mysql.tidb_ttl_table_status") @@ -238,8 +238,8 @@ func TestDDLTableCreateBackfillTable(t *testing.T) { // downgrade `mDDLTableVersion` m.SetDDLTables(meta.MDLTableVersion) - mustExec(t, se, "drop table mysql.tidb_ddl_backfill") - mustExec(t, se, "drop table mysql.tidb_ddl_backfill_history") + mustExec(t, se, "drop table mysql.tidb_background_subtask") + mustExec(t, se, "drop table mysql.tidb_background_subtask_history") err = txn.Commit(context.Background()) require.NoError(t, err) @@ -249,8 +249,8 @@ func TestDDLTableCreateBackfillTable(t *testing.T) { require.NoError(t, err) se = createSessionAndSetID(t, store) - mustExec(t, se, "select * from mysql.tidb_ddl_backfill") - mustExec(t, se, "select * from mysql.tidb_ddl_backfill_history") + mustExec(t, se, "select * from mysql.tidb_background_subtask") + mustExec(t, se, "select * from mysql.tidb_background_subtask_history") dom.Close() } diff --git a/session/session.go b/session/session.go index 6d7c879dd22bf..bd99f44604802 100644 --- a/session/session.go +++ b/session/session.go @@ -3074,8 +3074,8 @@ var ( } // BackfillTables is a list of tables definitions used in dist reorg DDL. BackfillTables = []tableBasicInfo{ - {ddl.BackfillTableSQL, ddl.BackfillTableID}, - {ddl.BackfillHistoryTableSQL, ddl.BackfillHistoryTableID}, + {ddl.BackgroundSubtaskTableSQL, ddl.BackgroundSubtaskTableID}, + {ddl.BackgroundSubtaskHistoryTableSQL, ddl.BackgroundSubtaskHistoryTableID}, } mdlTable = "create table mysql.tidb_mdl_info(job_id BIGINT NOT NULL PRIMARY KEY, version BIGINT NOT NULL, table_ids text(65535));" ) @@ -3094,7 +3094,7 @@ func splitAndScatterTable(store kv.Storage, tableIDs []int64) { } } -// InitDDLJobTables is to create tidb_ddl_job, tidb_ddl_reorg and tidb_ddl_history, or tidb_ddl_backfill and tidb_ddl_backfill_history. +// InitDDLJobTables is to create tidb_ddl_job, tidb_ddl_reorg and tidb_ddl_history, or tidb_background_subtask and tidb_background_subtask_history. func InitDDLJobTables(store kv.Storage, targetVer meta.DDLTableVersion) error { targetTables := DDLJobTables if targetVer == meta.BackfillTableVersion { diff --git a/session/session_test.go b/session/session_test.go index 4edb8ddce4c86..1426c689997d6 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -45,11 +45,11 @@ func TestInitMetaTable(t *testing.T) { } tbls := map[string]struct{}{ - "tidb_ddl_job": {}, - "tidb_ddl_reorg": {}, - "tidb_ddl_history": {}, - "tidb_ddl_backfill": {}, - "tidb_ddl_backfill_history": {}, + "tidb_ddl_job": {}, + "tidb_ddl_reorg": {}, + "tidb_ddl_history": {}, + "tidb_background_subtask": {}, + "tidb_background_subtask_history": {}, } for tbl := range tbls { @@ -83,12 +83,12 @@ func TestMetaTableRegion(t *testing.T) { require.NotEqual(t, ddlJobTableRegionID, ddlReorgTableRegionID) - ddlBackfillTableRegionID := tk.MustQuery("show table mysql.tidb_ddl_backfill regions").Rows()[0][0] - ddlBackfillTableRegionStartKey := tk.MustQuery("show table mysql.tidb_ddl_backfill regions").Rows()[0][1] - require.Equal(t, ddlBackfillTableRegionStartKey, fmt.Sprintf("%s_%d_", tablecodec.TablePrefix(), ddl.BackfillTableID)) - ddlBackfillHistoryTableRegionID := tk.MustQuery("show table mysql.tidb_ddl_backfill_history regions").Rows()[0][0] - ddlBackfillHistoryTableRegionStartKey := tk.MustQuery("show table mysql.tidb_ddl_backfill_history regions").Rows()[0][1] - require.Equal(t, ddlBackfillHistoryTableRegionStartKey, fmt.Sprintf("%s_%d_", tablecodec.TablePrefix(), ddl.BackfillHistoryTableID)) + ddlBackfillTableRegionID := tk.MustQuery("show table mysql.tidb_background_subtask regions").Rows()[0][0] + ddlBackfillTableRegionStartKey := tk.MustQuery("show table mysql.tidb_background_subtask regions").Rows()[0][1] + require.Equal(t, ddlBackfillTableRegionStartKey, fmt.Sprintf("%s_%d_", tablecodec.TablePrefix(), ddl.BackgroundSubtaskTableID)) + ddlBackfillHistoryTableRegionID := tk.MustQuery("show table mysql.tidb_background_subtask_history regions").Rows()[0][0] + ddlBackfillHistoryTableRegionStartKey := tk.MustQuery("show table mysql.tidb_background_subtask_history regions").Rows()[0][1] + require.Equal(t, ddlBackfillHistoryTableRegionStartKey, fmt.Sprintf("%s_%d_", tablecodec.TablePrefix(), ddl.BackgroundSubtaskHistoryTableID)) require.NotEqual(t, ddlBackfillTableRegionID, ddlBackfillHistoryTableRegionID) } diff --git a/telemetry/BUILD.bazel b/telemetry/BUILD.bazel index 56376f0031109..8b67ae1144e7b 100644 --- a/telemetry/BUILD.bazel +++ b/telemetry/BUILD.bazel @@ -51,7 +51,7 @@ go_library( go_test( name = "telemetry_test", - timeout = "short", + timeout = "moderate", srcs = [ "data_cluster_hardware_test.go", "data_feature_usage_test.go", @@ -62,6 +62,7 @@ go_test( ], embed = [":telemetry"], flaky = True, + shard_count = 30, deps = [ "//autoid_service", "//config", diff --git a/telemetry/data_feature_usage_test.go b/telemetry/data_feature_usage_test.go index 67d3afac74d98..af746b1b7b44d 100644 --- a/telemetry/data_feature_usage_test.go +++ b/telemetry/data_feature_usage_test.go @@ -586,6 +586,7 @@ func TestAddIndexAccelerationAndMDL(t *testing.T) { } func TestDistReorgUsage(t *testing.T) { + t.Skip("skip in order to pass the test quickly") store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) usage, err := telemetry.GetFeatureUsage(tk.Session())