diff --git a/.bazelrc b/.bazelrc
index 61356f086fda5..11c7c8f9ad2ba 100644
--- a/.bazelrc
+++ b/.bazelrc
@@ -1,4 +1,4 @@
-startup --host_jvm_args=-Xmx8g
+startup --host_jvm_args=-Xmx4g
 startup --unlimit_coredumps
 
 run:ci --color=yes
diff --git a/Makefile b/Makefile
index ce49ce9f45913..6e00960f7dbda 100644
--- a/Makefile
+++ b/Makefile
@@ -414,6 +414,10 @@ bazel_coverage_test: failpoint-enable bazel_ci_prepare
 		--build_event_json_file=bazel_1.json --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest \
 		-- //... -//cmd/... -//tests/graceshutdown/... \
 		-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...
+	bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) \
+		--build_event_json_file=bazel_1.json --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest,distributereorg \
+		-- //... -//cmd/... -//tests/graceshutdown/... \
+		-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...
 
 bazel_build: bazel_ci_prepare
 	mkdir -p bin
diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel
index daa9c7eea4b29..a117b052d0651 100644
--- a/ddl/BUILD.bazel
+++ b/ddl/BUILD.bazel
@@ -24,6 +24,8 @@ go_library(
         "ddl_workerpool.go",
         "delete_range.go",
         "delete_range_util.go",
+        "dist_backfilling.go",
+        "dist_owner.go",
         "foreign_key.go",
         "generated_column.go",
         "index.go",
@@ -81,6 +83,8 @@ go_library(
         "//parser/terror",
         "//parser/types",
        "//privilege",
+        "//resourcemanager/pooltask",
+        "//resourcemanager/util",
         "//sessionctx",
         "//sessionctx/binloginfo",
         "//sessionctx/stmtctx",
@@ -106,6 +110,7 @@ go_library(
         "//util/filter",
         "//util/gcutil",
         "//util/generic",
+        "//util/gpool/spmc",
         "//util/hack",
         "//util/intest",
         "//util/logutil",
@@ -118,6 +123,7 @@ go_library(
         "//util/slice",
         "//util/sqlexec",
         "//util/stringutil",
+        "//util/syncutil",
         "//util/timeutil",
         "//util/topsql",
         "//util/topsql/state",
diff --git a/ddl/attributes_sql_test.go b/ddl/attributes_sql_test.go
index 2d2685ffd06b2..6950b44645ecf 100644
--- a/ddl/attributes_sql_test.go
+++ b/ddl/attributes_sql_test.go
@@ -73,6 +73,8 @@ func TestAlterTableAttributes(t *testing.T) {
 	// without equal
 	tk.MustExec(`alter table alter_t attributes " merge_option=allow ";`)
 	tk.MustExec(`alter table alter_t attributes " merge_option=allow , key=value ";`)
+
+	tk.MustExec("drop table alter_t")
 }
 
 func TestAlterTablePartitionAttributes(t *testing.T) {
@@ -134,6 +136,8 @@ PARTITION BY RANGE (c) (
 	require.Len(t, rows4, 1)
 	require.NotEqual(t, rows3[0][3], rows4[0][3])
 	require.NotEqual(t, rows[0][3], rows4[0][3])
+
+	tk.MustExec("drop table alter_p")
 }
 
 func TestTruncateTable(t *testing.T) {
diff --git a/ddl/backfilling.go b/ddl/backfilling.go
index 5036e56c096d8..4455801b7cc6d 100644
--- a/ddl/backfilling.go
+++ b/ddl/backfilling.go
@@ -65,11 +65,13 @@ const (
 	updateInstanceLease = 25 * time.Second
 	genTaskBatch        = 4096
 	minGenTaskBatch     = 1024
-	minDistTaskCnt      = 16
-	retrySQLTimes       = 3
-	retrySQLInterval    = 500 * time.Millisecond
+	minDistTaskCnt      = 32
+	retrySQLTimes       = 10
 )
 
+// RetrySQLInterval is exported for test.
+var RetrySQLInterval = 300 * time.Millisecond
+
 func (bT backfillerType) String() string {
 	switch bT {
 	case typeAddIndexWorker:
@@ -122,7 +124,7 @@ func GetOracleTimeWithStartTS(se *session) (time.Time, error) {
 	return oracle.GetTimeFromTS(txn.StartTS()).UTC(), nil
 }
 
-// GetOracleTime returns the current time from TS.
+// GetOracleTime returns the current time from TS without txn.
 func GetOracleTime(store kv.Storage) (time.Time, error) {
 	currentVer, err := store.CurrentVersion(kv.GlobalTxnScope)
 	if err != nil {
@@ -189,7 +191,20 @@ func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time {
 // Instead, it is divided into batches, each time a kv transaction completes the backfilling
 // of a partial batch.
 
+// backfillTaskContext is the context of the batch adding indices or updating column values.
+// After finishing the batch adding indices or updating column values, result in backfillTaskContext will be merged into backfillResult.
+type backfillTaskContext struct {
+	nextKey       kv.Key
+	done          bool
+	addedCount    int
+	scanCount     int
+	warnings      map[errors.ErrorID]*terror.Error
+	warningsCount map[errors.ErrorID]int64
+	finishTS      uint64
+}
+
 type backfillCtx struct {
+	id int
 	*ddlCtx
 	reorgTp    model.ReorgType
 	sessCtx    sessionctx.Context
@@ -198,9 +213,13 @@ type backfillCtx struct {
 	batchCnt   int
 }
 
-func newBackfillCtx(ctx *ddlCtx, sessCtx sessionctx.Context, reorgTp model.ReorgType,
+func newBackfillCtx(ctx *ddlCtx, id int, sessCtx sessionctx.Context, reorgTp model.ReorgType,
 	schemaName string, tbl table.Table) *backfillCtx {
+	if id == 0 {
+		id = int(backfillContextID.Add(1))
+	}
 	return &backfillCtx{
+		id:         id,
 		ddlCtx:     ctx,
 		sessCtx:    sessCtx,
 		reorgTp:    reorgTp,
@@ -213,7 +232,7 @@ func newBackfillCtx(ctx *ddlCtx, sessCtx sessionctx.Context, reorgTp model.Reorg
 type backfiller interface {
 	BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error)
 	AddMetricInfo(float64)
-	GetTask() (*BackfillJob, error)
+	GetTasks() ([]*BackfillJob, error)
 	UpdateTask(bfJob *BackfillJob) error
 	FinishTask(bfJob *BackfillJob) error
 	GetCtx() *backfillCtx
@@ -228,17 +247,6 @@ type backfillResult struct {
 	err        error
 }
 
-// backfillTaskContext is the context of the batch adding indices or updating column values.
-// After finishing the batch adding indices or updating column values, result in backfillTaskContext will be merged into backfillResult.
-type backfillTaskContext struct {
-	nextKey       kv.Key
-	done          bool
-	addedCount    int
-	scanCount     int
-	warnings      map[errors.ErrorID]*terror.Error
-	warningsCount map[errors.ErrorID]int64
-}
-
 type reorgBackfillTask struct {
 	bfJob         *BackfillJob
 	physicalTable table.PhysicalTable
@@ -288,7 +296,6 @@ func mergeBackfillCtxToResult(taskCtx *backfillTaskContext, result *backfillResu
 }
 
 type backfillWorker struct {
-	id int
 	backfiller
 	taskCh   chan *reorgBackfillTask
 	resultCh chan *backfillResult
@@ -296,11 +303,10 @@ type backfillWorker struct {
 	cancel   func()
 }
 
-func newBackfillWorker(ctx context.Context, id int, bf backfiller) *backfillWorker {
+func newBackfillWorker(ctx context.Context, bf backfiller) *backfillWorker {
 	bfCtx, cancel := context.WithCancel(ctx)
 	return &backfillWorker{
 		backfiller: bf,
-		id:         id,
 		taskCh:     make(chan *reorgBackfillTask, 1),
 		resultCh:   make(chan *backfillResult, 1),
 		ctx:        bfCtx,
@@ -320,15 +326,11 @@ func (w *backfillWorker) updateLease(execID string, bfJob *BackfillJob, nextKey
 }
 
 func (w *backfillWorker) finishJob(bfJob *BackfillJob) error {
-	bfJob.State = model.JobStateDone
 	return w.backfiller.FinishTask(bfJob)
 }
 
 func (w *backfillWorker) String() string {
-	if w.backfiller == nil {
-		return fmt.Sprintf("worker %d", w.id)
-	}
-	return fmt.Sprintf("worker %d, tp %s", w.id, w.backfiller.String())
+	return fmt.Sprintf("backfill-worker %d, tp %s", w.GetCtx().id, w.backfiller.String())
 }
 
 func (w *backfillWorker) Close() {
@@ -363,12 +365,13 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
 	jobID := task.getJobID()
 	rc := d.getReorgCtx(jobID)
 
+	isDistReorg := task.bfJob != nil
 	for {
 		// Give job chance to be canceled, if we not check it here,
 		// if there is panic in bf.BackfillDataInTxn we will never cancel the job.
 		// Because reorgRecordTask may run a long time,
 		// we should check whether this ddl job is still runnable.
-		err := d.isReorgRunnable(jobID)
+		err := d.isReorgRunnable(jobID, isDistReorg)
 		if err != nil {
 			result.err = err
 			return result
@@ -396,7 +399,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
 
 		if num := result.scanCount - lastLogCount; num >= 90000 {
 			lastLogCount = result.scanCount
-			logutil.BgLogger().Info("[ddl] backfill worker back fill index",
+			logutil.BgLogger().Info("[ddl] backfill worker back fill index", zap.Stringer("worker", w),
 				zap.Int("addedCount", result.addedCount), zap.Int("scanCount", result.scanCount),
 				zap.String("next key", hex.EncodeToString(taskCtx.nextKey)),
 				zap.Float64("speed(rows/s)", float64(num)/time.Since(lastLogTime).Seconds()))
@@ -408,7 +411,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
 			break
 		}
 
-		if task.bfJob != nil {
+		if isDistReorg {
 			// TODO: Adjust the updating lease frequency by batch processing time carefully.
 			if time.Since(batchStartTime) < updateInstanceLease {
 				continue
 			}
@@ -423,18 +426,63 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
 		}
 	}
 	logutil.BgLogger().Info("[ddl] backfill worker finish task",
-		zap.Stringer("worker", w),
-		zap.Stringer("task", task),
+		zap.Stringer("worker", w), zap.Stringer("task", task),
 		zap.Int("added count", result.addedCount),
 		zap.Int("scan count", result.scanCount),
 		zap.String("next key", hex.EncodeToString(result.nextKey)),
-		zap.String("take time", time.Since(startTime).String()))
+		zap.Stringer("take time", time.Since(startTime)))
 	if ResultCounterForTest != nil && result.err == nil {
 		ResultCounterForTest.Add(1)
 	}
 	return result
 }
 
+func (w *backfillWorker) runTask(task *reorgBackfillTask) (result *backfillResult) {
+	logutil.BgLogger().Info("[ddl] backfill worker start", zap.Stringer("worker", w), zap.String("task", task.String()))
+	defer util.Recover(metrics.LabelDDL, "backfillWorker.runTask", func() {
+		result = &backfillResult{taskID: task.id, err: dbterror.ErrReorgPanic}
+	}, false)
+	defer w.GetCtx().setDDLLabelForTopSQL(task.jobID, task.sqlQuery)
+
+	failpoint.Inject("mockBackfillRunErr", func() {
+		if w.GetCtx().id == 0 {
+			result := &backfillResult{taskID: task.id, addedCount: 0, nextKey: nil, err: errors.Errorf("mock backfill error")}
+			failpoint.Return(result)
+		}
+	})
+	failpoint.Inject("mockHighLoadForAddIndex", func() {
+		sqlPrefixes := []string{"alter"}
+		topsql.MockHighCPULoad(task.sqlQuery, sqlPrefixes, 5)
+	})
+	failpoint.Inject("mockBackfillSlow", func() {
+		time.Sleep(100 * time.Millisecond)
+	})
+
+	// Change the batch size dynamically.
+	w.GetCtx().batchCnt = int(variable.GetDDLReorgBatchSize())
+	result = w.handleBackfillTask(w.GetCtx().ddlCtx, task, w.backfiller)
+	task.bfJob.RowCount = int64(result.addedCount)
+	if result.err != nil {
+		logutil.BgLogger().Warn("[ddl] backfill worker runTask failed",
+			zap.Stringer("worker", w), zap.String("backfillJob", task.bfJob.AbbrStr()), zap.Error(result.err))
+		if dbterror.ErrDDLJobNotFound.Equal(result.err) {
+			result.err = nil
+			return result
+		}
+		task.bfJob.State = model.JobStateCancelled
+		task.bfJob.Meta.Error = toTError(result.err)
+		if err := w.finishJob(task.bfJob); err != nil {
+			logutil.BgLogger().Info("[ddl] backfill worker runTask, finishJob failed",
+				zap.Stringer("worker", w), zap.String("backfillJob", task.bfJob.AbbrStr()), zap.Error(err))
+			result.err = err
+		}
+	} else {
+		task.bfJob.State = model.JobStateDone
+		result.err = w.finishJob(task.bfJob)
+	}
+	return result
+}
+
 func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
 	logutil.BgLogger().Info("[ddl] backfill worker start", zap.Stringer("worker", w))
 	var curTaskID int
@@ -443,22 +491,20 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
 	}, false)
 	for {
 		if util.HasCancelled(w.ctx) {
-			logutil.BgLogger().Info("[ddl] backfill worker exit on context done",
-				zap.Stringer("worker", w), zap.Int("workerID", w.id))
+			logutil.BgLogger().Info("[ddl] backfill worker exit on context done", zap.Stringer("worker", w))
 			return
 		}
 		task, more := <-w.taskCh
 		if !more {
-			logutil.BgLogger().Info("[ddl] backfill worker exit",
-				zap.Stringer("worker", w), zap.Int("workerID", w.id))
+			logutil.BgLogger().Info("[ddl] backfill worker exit", zap.Stringer("worker", w))
 			return
 		}
 		curTaskID = task.id
 		d.setDDLLabelForTopSQL(job.ID, job.Query)
 
-		logutil.BgLogger().Debug("[ddl] backfill worker got task", zap.Int("workerID", w.id), zap.String("task", task.String()))
+		logutil.BgLogger().Debug("[ddl] backfill worker got task", zap.Int("workerID", w.GetCtx().id), zap.String("task", task.String()))
 		failpoint.Inject("mockBackfillRunErr", func() {
-			if w.id == 0 {
+			if w.GetCtx().id == 0 {
 				result := &backfillResult{taskID: task.id, addedCount: 0, nextKey: nil, err: errors.Errorf("mock backfill error")}
 				w.resultCh <- result
 				failpoint.Continue()
@@ -480,7 +526,7 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
 		w.resultCh <- result
 		if result.err != nil {
 			logutil.BgLogger().Info("[ddl] backfill worker exit on error",
-				zap.Stringer("worker", w), zap.Int("workerID", w.id), zap.Error(result.err))
+				zap.Stringer("worker", w), zap.Error(result.err))
 			return
 		}
 	}
@@ -580,7 +626,7 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
 	nextKey, taskAddedCount, err := waitTaskResults(scheduler, batchTasks, totalAddedCount)
 	elapsedTime := time.Since(startTime)
 	if err == nil {
-		err = dc.isReorgRunnable(reorgInfo.Job.ID)
+		err = dc.isReorgRunnable(reorgInfo.Job.ID, false)
 	}
 
 	// Update the reorg handle that has been processed.
@@ -631,6 +677,8 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
 	}
 	// Build reorg tasks.
 	job := reorgInfo.Job
+	//nolint:forcetypeassert
+	phyTbl := t.(table.PhysicalTable)
 	jobCtx := reorgInfo.d.jobContext(reorgInfo.Job.ID)
 	for i, keyRange := range kvRanges {
 		startKey := keyRange.StartKey
@@ -650,8 +698,6 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
 			endKey = prefix.PrefixNext()
 		}
 
-		//nolint:forcetypeassert
-		phyTbl := t.(table.PhysicalTable)
 		task := &reorgBackfillTask{
 			id:    i,
 			jobID: reorgInfo.Job.ID,
@@ -729,11 +775,16 @@ func makeupDecodeColMap(sessCtx sessionctx.Context, dbName model.CIStr, t table.
 	return decodeColMap, nil
 }
 
-func setSessCtxLocation(sctx sessionctx.Context, info *reorgInfo) error {
+func setSessCtxLocation(sctx sessionctx.Context, tzLocation *model.TimeZoneLocation) error {
 	// It is set to SystemLocation to be compatible with nil LocationInfo.
-	*sctx.GetSessionVars().TimeZone = *timeutil.SystemLocation()
-	if info.ReorgMeta.Location != nil {
-		loc, err := info.ReorgMeta.Location.GetLocation()
+	tz := *timeutil.SystemLocation()
+	if sctx.GetSessionVars().TimeZone == nil {
+		sctx.GetSessionVars().TimeZone = &tz
+	} else {
+		*sctx.GetSessionVars().TimeZone = tz
+	}
+	if tzLocation != nil {
+		loc, err := tzLocation.GetLocation()
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -782,15 +833,26 @@ func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sessio
 func (b *backfillScheduler) newSessCtx() (sessionctx.Context, error) {
 	reorgInfo := b.reorgInfo
 	sessCtx := newContext(reorgInfo.d.store)
+	if err := initSessCtx(sessCtx, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location); err != nil {
+		return nil, errors.Trace(err)
+	}
+	return sessCtx, nil
+}
+
+func initSessCtx(sessCtx sessionctx.Context, sqlMode mysql.SQLMode, tzLocation *model.TimeZoneLocation) error {
+	// Unify the TimeZone settings in newContext.
+	if sessCtx.GetSessionVars().StmtCtx.TimeZone == nil {
+		tz := *time.UTC
+		sessCtx.GetSessionVars().StmtCtx.TimeZone = &tz
+	}
 	sessCtx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true
 	// Set the row encode format version.
 	rowFormat := variable.GetDDLReorgRowFormat()
 	sessCtx.GetSessionVars().RowEncoder.Enable = rowFormat != variable.DefTiDBRowFormatV1
 	// Simulate the sql mode environment in the worker sessionCtx.
-	sqlMode := reorgInfo.ReorgMeta.SQLMode
 	sessCtx.GetSessionVars().SQLMode = sqlMode
-	if err := setSessCtxLocation(sessCtx, reorgInfo); err != nil {
-		return nil, errors.Trace(err)
+	if err := setSessCtxLocation(sessCtx, tzLocation); err != nil {
+		return errors.Trace(err)
 	}
 	sessCtx.GetSessionVars().StmtCtx.BadNullAsWarning = !sqlMode.HasStrictMode()
 	sessCtx.GetSessionVars().StmtCtx.TruncateAsWarning = !sqlMode.HasStrictMode()
@@ -799,7 +861,7 @@ func (b *backfillScheduler) newSessCtx() (sessionctx.Context, error) {
 	sessCtx.GetSessionVars().StmtCtx.DividedByZeroAsWarning = !sqlMode.HasStrictMode()
 	sessCtx.GetSessionVars().StmtCtx.IgnoreZeroInDate = !sqlMode.HasStrictMode() || sqlMode.HasAllowInvalidDatesMode()
 	sessCtx.GetSessionVars().StmtCtx.NoZeroDate = sqlMode.HasStrictMode()
-	return sessCtx, nil
+	return nil
 }
 
 func (b *backfillScheduler) setMaxWorkerSize(maxSize int) {
@@ -836,32 +898,32 @@ func (b *backfillScheduler) adjustWorkerSize() error {
 	)
 	switch b.tp {
 	case typeAddIndexWorker:
-		backfillCtx := newBackfillCtx(reorgInfo.d, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl)
-		idxWorker, err := newAddIndexWorker(b.decodeColMap, i, b.tbl, backfillCtx,
+		backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl)
+		idxWorker, err := newAddIndexWorker(b.decodeColMap, b.tbl, backfillCtx,
 			jc, job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
 		if err != nil {
-			if b.canSkipError(err) {
+			if canSkipError(b.reorgInfo.ID, len(b.workers), err) {
 				continue
 			}
 			return err
 		}
 		idxWorker.copReqSenderPool = b.copReqSenderPool
-		runner = newBackfillWorker(jc.ddlJobCtx, i, idxWorker)
+		runner = newBackfillWorker(jc.ddlJobCtx, idxWorker)
 		worker = idxWorker
 	case typeAddIndexMergeTmpWorker:
-		backfillCtx := newBackfillCtx(reorgInfo.d, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl)
+		backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl)
 		tmpIdxWorker := newMergeTempIndexWorker(backfillCtx, i, b.tbl, reorgInfo.currElement.ID, jc)
-		runner = newBackfillWorker(jc.ddlJobCtx, i, tmpIdxWorker)
+		runner = newBackfillWorker(jc.ddlJobCtx, tmpIdxWorker)
 		worker = tmpIdxWorker
 	case typeUpdateColumnWorker:
 		// Setting InCreateOrAlterStmt tells the difference between SELECT casting and ALTER COLUMN casting.
 		sessCtx.GetSessionVars().StmtCtx.InCreateOrAlterStmt = true
-		updateWorker := newUpdateColumnWorker(sessCtx, b.tbl, b.decodeColMap, reorgInfo, jc)
-		runner = newBackfillWorker(jc.ddlJobCtx, i, updateWorker)
+		updateWorker := newUpdateColumnWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc)
+		runner = newBackfillWorker(jc.ddlJobCtx, updateWorker)
 		worker = updateWorker
 	case typeCleanUpIndexWorker:
-		idxWorker := newCleanUpIndexWorker(sessCtx, b.tbl, b.decodeColMap, reorgInfo, jc)
-		runner = newBackfillWorker(jc.ddlJobCtx, i, idxWorker)
+		idxWorker := newCleanUpIndexWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc)
+		runner = newBackfillWorker(jc.ddlJobCtx, idxWorker)
 		worker = idxWorker
 	default:
 		return errors.New("unknown backfill type")
@@ -907,14 +969,14 @@ func (b *backfillScheduler) initCopReqSenderPool() {
 	b.copReqSenderPool = newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore())
 }
 
-func (b *backfillScheduler) canSkipError(err error) bool {
-	if len(b.workers) > 0 {
+func canSkipError(jobID int64, workerCnt int, err error) bool {
+	if workerCnt > 0 {
 		// The error can be skipped because the rest workers can handle the tasks.
 		return true
 	}
 	logutil.BgLogger().Warn("[ddl] create add index backfill worker failed",
-		zap.Int("current worker count", len(b.workers)),
-		zap.Int64("job ID", b.reorgInfo.ID), zap.Error(err))
+		zap.Int("current worker count", workerCnt),
+		zap.Int64("job ID", jobID), zap.Error(err))
 	return false
 }
 
@@ -953,7 +1015,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
 		return errors.Trace(err)
 	}
 
-	if err := dc.isReorgRunnable(reorgInfo.Job.ID); err != nil {
+	if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
 		return errors.Trace(err)
 	}
 	if startKey == nil && endKey == nil {
@@ -1050,6 +1112,7 @@ func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo
 	if notDistTask {
 		instanceID = reorgInfo.d.uuid
 	}
+	// TODO: Adjust the number of ranges(region) for each task.
 	for _, task := range batchTasks {
 		bm := &model.BackfillMeta{
@@ -1062,6 +1125,7 @@ func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo
 			JobMeta: &model.JobMeta{
 				SchemaID: reorgInfo.Job.SchemaID,
 				TableID:  reorgInfo.Job.TableID,
+				Type:     reorgInfo.Job.Type,
 				Query:    reorgInfo.Job.Query,
 			},
 		}
@@ -1087,7 +1151,7 @@ func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo
 	return nil
 }
 
-func (*ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, pTbl table.PhysicalTable, isUnique bool,
+func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, pTbl table.PhysicalTable, isUnique bool,
 	bfWorkerType backfillerType, startKey kv.Key, currBackfillJobID int64) error {
 	endKey := reorgInfo.EndKey
 	isFirstOps := true
@@ -1108,7 +1172,7 @@ func (*ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, pTb
 		isFirstOps = false
 
 		remains := kvRanges[len(batchTasks):]
-		// TODO: After adding backfillCh do asyncNotify(dc.backfillJobCh).
+		dc.asyncNotifyWorker(dc.backfillJobCh, addingBackfillJob, reorgInfo.Job.ID, "backfill_job")
 		logutil.BgLogger().Info("[ddl] split backfill jobs to the backfill table",
 			zap.Int("batchTasksCnt", len(batchTasks)),
 			zap.Int("totalRegionCnt", len(kvRanges)),
@@ -1128,235 +1192,13 @@ func (*ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, pTb
 			if bJobCnt < minGenTaskBatch {
 				break
 			}
-			time.Sleep(retrySQLInterval)
+			time.Sleep(RetrySQLInterval)
 		}
 		startKey = remains[0].StartKey
 	}
 	return nil
 }
 
-func (dc *ddlCtx) controlWritePhysicalTableRecord(sess *session, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) error {
-	startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
-	if startKey == nil && endKey == nil {
-		return nil
-	}
-
-	if err := dc.isReorgRunnable(reorgInfo.Job.ID); err != nil {
-		return errors.Trace(err)
-	}
-
-	currBackfillJobID := int64(1)
-	err := checkAndHandleInterruptedBackfillJobs(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	maxBfJob, err := GetMaxBackfillJob(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	if maxBfJob != nil {
-		startKey = maxBfJob.EndKey
-		currBackfillJobID = maxBfJob.ID + 1
-	}
-
-	var isUnique bool
-	if bfWorkerType == typeAddIndexWorker {
-		idxInfo := model.FindIndexInfoByID(t.Meta().Indices, reorgInfo.currElement.ID)
-		isUnique = idxInfo.Unique
-	}
-	err = dc.splitTableToBackfillJobs(sess, reorgInfo, t, isUnique, bfWorkerType, startKey, currBackfillJobID)
-	if err != nil {
-		return errors.Trace(err)
-	}
-
-	var backfillJobFinished bool
-	jobID := reorgInfo.Job.ID
-	ticker := time.NewTicker(300 * time.Millisecond)
-	defer ticker.Stop()
-	for {
-		if err := dc.isReorgRunnable(reorgInfo.Job.ID); err != nil {
-			return errors.Trace(err)
-		}
-
-		select {
-		case <-ticker.C:
-			if !backfillJobFinished {
-				err := checkAndHandleInterruptedBackfillJobs(sess, jobID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
-				if err != nil {
-					logutil.BgLogger().Warn("[ddl] finish interrupted backfill jobs", zap.Int64("job ID", jobID), zap.Error(err))
-					return errors.Trace(err)
-				}
-
-				bfJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey, false)
-				if err != nil {
-					logutil.BgLogger().Info("[ddl] getBackfillJobWithRetry failed", zap.Int64("job ID", jobID), zap.Error(err))
-					return errors.Trace(err)
-				}
-				if bfJob == nil {
-					backfillJobFinished = true
-					logutil.BgLogger().Info("[ddl] finish backfill jobs", zap.Int64("job ID", jobID))
-				}
-			}
-			if backfillJobFinished {
-				// TODO: Consider whether these backfill jobs are always out of sync.
-				isSynced, err := checkJobIsSynced(sess, jobID)
-				if err != nil {
-					logutil.BgLogger().Warn("[ddl] checkJobIsSynced failed", zap.Int64("job ID", jobID), zap.Error(err))
-					return errors.Trace(err)
-				}
-				if isSynced {
-					logutil.BgLogger().Info("[ddl] sync backfill jobs", zap.Int64("job ID", jobID))
-					return nil
-				}
-			}
-		case <-dc.ctx.Done():
-			return dc.ctx.Err()
-		}
-	}
-}
-
-func checkJobIsSynced(sess *session, jobID int64) (bool, error) {
-	var err error
-	var unsyncedInstanceIDs []string
-	for i := 0; i < retrySQLTimes; i++ {
-		unsyncedInstanceIDs, err = getUnsyncedInstanceIDs(sess, jobID, "check_backfill_history_job_sync")
-		if err == nil && len(unsyncedInstanceIDs) == 0 {
-			return true, nil
-		}
-
-		logutil.BgLogger().Info("[ddl] checkJobIsSynced failed",
-			zap.Strings("unsyncedInstanceIDs", unsyncedInstanceIDs), zap.Int("tryTimes", i), zap.Error(err))
-		time.Sleep(retrySQLInterval)
-	}
-
-	return false, errors.Trace(err)
-}
-
-func checkAndHandleInterruptedBackfillJobs(sess *session, jobID, currEleID int64, currEleKey []byte) (err error) {
-	var bJobs []*BackfillJob
-	for i := 0; i < retrySQLTimes; i++ {
-		bJobs, err = GetInterruptedBackfillJobsForOneEle(sess, jobID, currEleID, currEleKey)
-		if err == nil {
-			break
-		}
-		logutil.BgLogger().Info("[ddl] getInterruptedBackfillJobsForOneEle failed", zap.Error(err))
-		time.Sleep(retrySQLInterval)
-	}
-	if err != nil {
-		return errors.Trace(err)
-	}
-	if len(bJobs) == 0 {
-		return nil
-	}
-
-	for i := 0; i < retrySQLTimes; i++ {
-		err = MoveBackfillJobsToHistoryTable(sess, bJobs[0])
-		if err == nil {
-			return errors.Errorf(bJobs[0].Meta.ErrMsg)
-		}
-		logutil.BgLogger().Info("[ddl] MoveBackfillJobsToHistoryTable failed", zap.Error(err))
-		time.Sleep(retrySQLInterval)
-	}
-	return errors.Trace(err)
-}
-
-func checkBackfillJobCount(sess *session, jobID, currEleID int64, currEleKey []byte) (backfillJobCnt int, err error) {
-	err = checkAndHandleInterruptedBackfillJobs(sess, jobID, currEleID, currEleKey)
-	if err != nil {
-		return 0, errors.Trace(err)
-	}
-
-	backfillJobCnt, err = GetBackfillJobCount(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'",
-		jobID, currEleID, currEleKey), "check_backfill_job_count")
-	if err != nil {
-		return 0, errors.Trace(err)
-	}
-
-	return backfillJobCnt, nil
-}
-
-func getBackfillJobWithRetry(sess *session, tableName string, jobID, currEleID int64, currEleKey []byte, isDesc bool) (*BackfillJob, error) {
-	var err error
-	var bJobs []*BackfillJob
-	descStr := ""
-	if isDesc {
-		descStr = "order by id desc"
-	}
-	for i := 0; i < retrySQLTimes; i++ {
-		bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s' %s limit 1",
-			jobID, currEleID, currEleKey, descStr), "check_backfill_job_state")
-		if err != nil {
-			logutil.BgLogger().Warn("[ddl] GetBackfillJobs failed", zap.Error(err))
-			continue
-		}
-
-		if len(bJobs) != 0 {
-			return bJobs[0], nil
-		}
-		break
-	}
-	return nil, errors.Trace(err)
-}
-
-// GetMaxBackfillJob gets the max backfill job in BackfillTable and BackfillHistoryTable.
-func GetMaxBackfillJob(sess *session, jobID, currEleID int64, currEleKey []byte) (*BackfillJob, error) {
-	bfJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, currEleID, currEleKey, true)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	hJob, err := getBackfillJobWithRetry(sess, BackfillHistoryTable, jobID, currEleID, currEleKey, true)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-
-	if bfJob == nil {
-		return hJob, nil
-	}
-	if hJob == nil {
-		return bfJob, nil
-	}
-	if bfJob.ID > hJob.ID {
-		return bfJob, nil
-	}
-	return hJob, nil
-}
-
-// MoveBackfillJobsToHistoryTable moves backfill table jobs to the backfill history table.
-func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, bfJob *BackfillJob) error {
-	s, ok := sctx.(*session)
-	if !ok {
-		return errors.Errorf("sess ctx:%#v convert session failed", sctx)
-	}
-
-	return s.runInTxn(func(se *session) error {
-		// TODO: Consider batch by batch update backfill jobs and insert backfill history jobs.
-		bJobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'",
-			bfJob.JobID, bfJob.EleID, bfJob.EleKey), "update_backfill_job")
-		if err != nil {
-			return errors.Trace(err)
-		}
-		if len(bJobs) == 0 {
-			return nil
-		}
-
-		txn, err := se.txn()
-		if err != nil {
-			return errors.Trace(err)
-		}
-		startTS := txn.StartTS()
-		err = RemoveBackfillJob(se, true, bJobs[0])
-		if err == nil {
-			for _, bj := range bJobs {
-				bj.State = model.JobStateCancelled
-				bj.FinishTS = startTS
-			}
-			err = AddBackfillHistoryJob(se, bJobs)
-		}
-		logutil.BgLogger().Info("[ddl] move backfill jobs to history table", zap.Int("job count", len(bJobs)))
-		return errors.Trace(err)
-	})
-}
-
 // recordIterFunc is used for low-level record iteration.
 type recordIterFunc func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (more bool, err error)
 
diff --git a/ddl/column.go b/ddl/column.go
index 833dd7a631f94..3c5eeb995c0c4 100644
--- a/ddl/column.go
+++ b/ddl/column.go
@@ -1126,8 +1126,7 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
 		err := reorgInfo.UpdateReorgMeta(reorgInfo.StartKey, w.sessPool)
 		logutil.BgLogger().Info("[ddl] update column and indexes",
 			zap.Int64("job ID", reorgInfo.Job.ID),
-			zap.ByteString("element type", reorgInfo.currElement.TypeKey),
-			zap.Int64("element ID", reorgInfo.currElement.ID),
+			zap.Stringer("element", reorgInfo.currElement),
 			zap.String("start key", hex.EncodeToString(reorgInfo.StartKey)),
 			zap.String("end key", hex.EncodeToString(reorgInfo.EndKey)))
 		if err != nil {
@@ -1157,7 +1156,7 @@ type updateColumnWorker struct {
 	jobContext *JobContext
 }
 
-func newUpdateColumnWorker(sessCtx sessionctx.Context, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker {
+func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker {
 	if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) {
 		logutil.BgLogger().Error("Element type for updateColumnWorker incorrect", zap.String("jobQuery", reorgInfo.Query),
 			zap.String("reorgInfo", reorgInfo.String()))
@@ -1173,7 +1172,7 @@ func newUpdateColumnWorker(sessCtx sessionctx.Context, t table.PhysicalTable, de
 	}
 	rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
 	return &updateColumnWorker{
-		backfillCtx:   newBackfillCtx(reorgInfo.d, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t),
+		backfillCtx:   newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t),
 		oldColInfo:    oldCol,
 		newColInfo:    newCol,
 		metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("update_col_rate", reorgInfo.SchemaName, t.Meta().Name.String())),
@@ -1191,7 +1190,7 @@ func (*updateColumnWorker) String() string {
 	return typeUpdateColumnWorker.String()
 }
 
-func (*updateColumnWorker) GetTask() (*BackfillJob, error) {
+func (*updateColumnWorker) GetTasks() ([]*BackfillJob, error) {
 	panic("[ddl] update column worker GetTask function doesn't implement")
 }
 
diff --git a/ddl/constant.go b/ddl/constant.go
index 3fe6bf4a04ee6..f9de82e2e6dad 100644
--- a/ddl/constant.go
+++ b/ddl/constant.go
@@ -58,7 +58,7 @@ const (
 		store_id bigint,
 		type int,
 		exec_id blob default null,
-		exec_lease Time,
+		exec_lease timestamp,
 		state int,
 		curr_key blob,
 		start_key blob,
@@ -77,7 +77,7 @@ const (
 		store_id bigint,
 		type int,
 		exec_id blob default null,
-		exec_lease Time,
+		exec_lease timestamp,
 		state int,
 		curr_key blob,
 		start_key blob,
diff --git a/ddl/db_test.go b/ddl/db_test.go
index 4bfe194d5c626..916cdc85ef367 100644
--- a/ddl/db_test.go
+++ b/ddl/db_test.go
@@ -991,7 +991,11 @@ func TestAddIndexFailOnCaseWhenCanExit(t *testing.T) {
 	tk.MustExec("drop table if exists t")
 	tk.MustExec("create table t(a int, b int)")
 	tk.MustExec("insert into t values(1, 1)")
-	tk.MustGetErrMsg("alter table t add index idx(b)", "[ddl:-1]DDL job rollback, error msg: job.ErrCount:1, mock unknown type: ast.whenClause.")
+	if variable.DDLEnableDistributeReorg.Load() {
+		tk.MustGetErrMsg("alter table t add index idx(b)", "[ddl:-1]job.ErrCount:0, mock unknown type: ast.whenClause.")
+	} else {
idx(b)", "[ddl:-1]DDL job rollback, error msg: job.ErrCount:1, mock unknown type: ast.whenClause.") + } tk.MustExec("drop table if exists t") } diff --git a/ddl/ddl.go b/ddl/ddl.go index 0d2e63fc7d6e1..a9e0f4d96478c 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -48,6 +48,7 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" + rmutil "github.com/pingcap/tidb/resourcemanager/util" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/sessionctx/variable" @@ -59,9 +60,11 @@ import ( "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/gcutil" + "github.com/pingcap/tidb/util/gpool/spmc" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mathutil" "github.com/pingcap/tidb/util/sqlexec" + "github.com/pingcap/tidb/util/syncutil" "github.com/tikv/client-go/v2/tikvrpc" clientv3 "go.etcd.io/etcd/client/v3" atomicutil "go.uber.org/atomic" @@ -82,8 +85,9 @@ const ( batchAddingJobs = 10 - reorgWorkerCnt = 10 - generalWorkerCnt = 1 + reorgWorkerCnt = 10 + generalWorkerCnt = 1 + backfillWorkerCnt = 32 // checkFlagIndexInJobArgs is the recoverCheckFlag index used in RecoverTable/RecoverSchema job arg list. checkFlagIndexInJobArgs = 1 @@ -274,6 +278,8 @@ type ddl struct { // used in the concurrency ddl. reorgWorkerPool *workerPool generalDDLWorkerPool *workerPool + backfillCtxPool *backfillCtxPool + backfillWorkerPool *spmc.Pool[*reorgBackfillTask, *backfillResult, int, *backfillWorker, *backfillWorkerContext] // get notification if any DDL coming. ddlJobCh chan struct{} } @@ -329,6 +335,8 @@ type ddlCtx struct { statsHandle *handle.Handle tableLockCkr util.DeadTableLockChecker etcdCli *clientv3.Client + // backfillJobCh gets notification if any backfill jobs coming. + backfillJobCh chan struct{} *waitSchemaSyncedController *schemaVersionManager @@ -340,10 +348,11 @@ type ddlCtx struct { // It holds the running DDL jobs ID. runningJobIDs []string // reorgCtx is used for reorganization. - reorgCtx struct { - sync.RWMutex - // reorgCtxMap maps job ID to reorg context. - reorgCtxMap map[int64]*reorgCtx + reorgCtx reorgContexts + // backfillCtx is used for backfill workers. 
+	backfillCtx struct {
+		syncutil.RWMutex
+		jobCtxMap map[int64]*JobContext
 	}
 
 	jobCtx struct {
@@ -430,15 +439,15 @@ func (dc *ddlCtx) setDDLLabelForTopSQL(jobID int64, jobQuery string) {
 	ctx.setDDLLabelForTopSQL(jobQuery)
 }
 
-func (dc *ddlCtx) setDDLSourceForDiagnosis(job *model.Job) {
+func (dc *ddlCtx) setDDLSourceForDiagnosis(jobID int64, jobType model.ActionType) {
 	dc.jobCtx.Lock()
 	defer dc.jobCtx.Unlock()
-	ctx, exists := dc.jobCtx.jobCtxMap[job.ID]
+	ctx, exists := dc.jobCtx.jobCtxMap[jobID]
 	if !exists {
 		ctx = NewJobContext()
-		dc.jobCtx.jobCtxMap[job.ID] = ctx
+		dc.jobCtx.jobCtxMap[jobID] = ctx
 	}
-	ctx.setDDLLabelForDiagnosis(job)
+	ctx.setDDLLabelForDiagnosis(jobType)
 }
 
 func (dc *ddlCtx) getResourceGroupTaggerForTopSQL(jobID int64) tikvrpc.ResourceGroupTagger {
@@ -466,27 +475,79 @@ func (dc *ddlCtx) jobContext(jobID int64) *JobContext {
 	return NewJobContext()
 }
 
+func (dc *ddlCtx) removeBackfillCtxJobCtx(jobID int64) {
+	dc.backfillCtx.Lock()
+	delete(dc.backfillCtx.jobCtxMap, jobID)
+	dc.backfillCtx.Unlock()
+}
+
+func (dc *ddlCtx) backfillCtxJobIDs() []int64 {
+	dc.backfillCtx.Lock()
+	defer dc.backfillCtx.Unlock()
+
+	runningJobIDs := make([]int64, 0, len(dc.backfillCtx.jobCtxMap))
+	for id := range dc.backfillCtx.jobCtxMap {
+		runningJobIDs = append(runningJobIDs, id)
+	}
+	return runningJobIDs
+}
+
+func (dc *ddlCtx) setBackfillCtxJobContext(jobID int64, jobQuery string, jobType model.ActionType) (*JobContext, bool) {
+	dc.backfillCtx.Lock()
+	defer dc.backfillCtx.Unlock()
+
+	jobCtx, existent := dc.backfillCtx.jobCtxMap[jobID]
+	if !existent {
+		dc.setDDLLabelForTopSQL(jobID, jobQuery)
+		dc.setDDLSourceForDiagnosis(jobID, jobType)
+		jobCtx = dc.jobContext(jobID)
+		dc.backfillCtx.jobCtxMap[jobID] = jobCtx
+	}
+	return jobCtx, existent
+}
+
+type reorgContexts struct {
+	sync.RWMutex
+	// reorgCtxMap maps job ID to reorg context.
+	reorgCtxMap map[int64]*reorgCtx
+}
+
+func getReorgCtx(reorgCtxs *reorgContexts, jobID int64) *reorgCtx {
+	reorgCtxs.RLock()
+	defer reorgCtxs.RUnlock()
+	return reorgCtxs.reorgCtxMap[jobID]
+}
+
+// TODO: Use getReorgCtx instead of dc.getReorgCtx.
 func (dc *ddlCtx) getReorgCtx(jobID int64) *reorgCtx {
 	dc.reorgCtx.RLock()
 	defer dc.reorgCtx.RUnlock()
 	return dc.reorgCtx.reorgCtxMap[jobID]
 }
 
-func (dc *ddlCtx) newReorgCtx(r *reorgInfo) *reorgCtx {
+func (dc *ddlCtx) newReorgCtx(jobID int64, startKey []byte, currElement *meta.Element, rowCount int64) *reorgCtx {
 	rc := &reorgCtx{}
 	rc.doneCh = make(chan error, 1)
 	// initial reorgCtx
-	rc.setRowCount(r.Job.GetRowCount())
-	rc.setNextKey(r.StartKey)
-	rc.setCurrentElement(r.currElement)
+	rc.setRowCount(rowCount)
+	rc.setNextKey(startKey)
+	rc.setCurrentElement(currElement)
 	rc.mu.warnings = make(map[errors.ErrorID]*terror.Error)
 	rc.mu.warningsCount = make(map[errors.ErrorID]int64)
 	dc.reorgCtx.Lock()
 	defer dc.reorgCtx.Unlock()
-	dc.reorgCtx.reorgCtxMap[r.Job.ID] = rc
+	dc.reorgCtx.reorgCtxMap[jobID] = rc
 	return rc
 }
 
+func (dc *ddlCtx) setReorgCtxForBackfill(bfJob *BackfillJob) {
+	rc := dc.getReorgCtx(bfJob.JobID)
+	if rc == nil {
+		ele := &meta.Element{ID: bfJob.EleID, TypeKey: bfJob.EleKey}
+		dc.newReorgCtx(bfJob.JobID, bfJob.StartKey, ele, bfJob.RowCount)
+	}
+}
+
 func (dc *ddlCtx) removeReorgCtx(job *model.Job) {
 	dc.reorgCtx.Lock()
 	defer dc.reorgCtx.Unlock()
@@ -675,6 +736,24 @@ func (d *ddl) prepareWorkers4ConcurrencyDDL() {
 	d.wg.Run(d.startDispatchLoop)
 }
 
+func (d *ddl) prepareBackfillWorkers() error {
+	workerFactory := func() (pools.Resource, error) {
+		bk := newBackfillWorker(context.Background(), nil)
+		metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_backfill_worker", metrics.CreateDDL)).Inc()
+		return bk, nil
+	}
+	d.backfillCtxPool = newBackfillContextPool(pools.NewResourcePool(workerFactory, backfillWorkerCnt, backfillWorkerCnt, 0))
+	var err error
+	d.backfillWorkerPool, err = spmc.NewSPMCPool[*reorgBackfillTask, *backfillResult, int, *backfillWorker,
+		*backfillWorkerContext]("backfill", int32(backfillWorkerCnt), rmutil.DDL)
+	if err != nil {
+		return err
+	}
+	d.backfillJobCh = make(chan struct{}, 1)
+	d.wg.Run(d.startDispatchBackfillJobsLoop)
+	return nil
+}
+
 // Start implements DDL.Start interface.
 func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
 	logutil.BgLogger().Info("[ddl] start DDL", zap.String("ID", d.uuid), zap.Bool("runWorker", config.GetGlobalConfig().Instance.TiDBEnableDDL.Load()))
@@ -698,12 +777,17 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
 		go d.startCleanDeadTableLock()
 	}
 
-	// If tidb_enable_ddl is true, we need campaign owner and do DDL job.
+	// If tidb_enable_ddl is true, we need campaign owner and do DDL jobs. Besides, we also can do backfill jobs.
 	// Otherwise, we needn't do that.
 	if config.GetGlobalConfig().Instance.TiDBEnableDDL.Load() {
 		if err := d.EnableDDL(); err != nil {
 			return err
 		}
+
+		// TODO: Currently, it is only processed during initialization and is expected to be added to EnableDDL later.
+		if err := d.prepareBackfillWorkers(); err != nil {
+			return err
+		}
 	}
 
 	variable.RegisterStatistics(d)
@@ -776,6 +860,12 @@ func (d *ddl) close() {
 	if d.generalDDLWorkerPool != nil {
 		d.generalDDLWorkerPool.close()
 	}
+	if d.backfillCtxPool != nil {
+		d.backfillCtxPool.close()
+	}
+	if d.backfillWorkerPool != nil {
+		d.backfillWorkerPool.ReleaseAndWait()
+	}
 
 	// d.delRangeMgr using sessions from d.sessPool.
 	// Put it before d.sessPool.close to reduce the time spent by d.sessPool.close.
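
Note: prepareBackfillWorkers above wires together two pools: a pools.ResourcePool of reusable backfill worker shells (wrapped by backfillCtxPool) and an spmc task pool sized by backfillWorkerCnt. The sketch below illustrates the get/put contract the resource-pool side relies on; demoResource and all values here are illustrative stand-ins, not part of the patch.

	package pooldemo

	import (
		"fmt"

		"github.com/ngaut/pools"
	)

	// demoResource stands in for backfillWorker; Close satisfies pools.Resource.
	type demoResource struct{ id int }

	func (r *demoResource) Close() {}

	func demo() {
		next := 0
		factory := func() (pools.Resource, error) {
			next++
			return &demoResource{id: next}, nil
		}
		// Capacity 2, max capacity 2, no idle timeout -- arbitrary values.
		p := pools.NewResourcePool(factory, 2, 2, 0)
		defer p.Close()

		res, err := p.TryGet() // non-blocking; returns nil once the pool is drained
		if err != nil || res == nil {
			fmt.Println("pool drained or failed:", err)
			return
		}
		fmt.Println("got resource", res.(*demoResource).id)
		p.Put(res) // always hand the resource back, as backfillCtxPool.put does
	}

As in backfillCtxPool.get and batchGet below, a nil resource from TryGet signals "pool exhausted" rather than an error, which is why callers in this patch treat a nil worker as a soft failure.
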
@@ -891,15 +981,16 @@ func getJobCheckInterval(job *model.Job, i int) (time.Duration, bool) {
 	}
 }
 
-func (d *ddl) asyncNotifyWorker(job *model.Job) {
+func (dc *ddlCtx) asyncNotifyWorker(ch chan struct{}, etcdPath string, jobID int64, jobType string) {
 	// If the workers don't run, we needn't notify workers.
+	// TODO: It does not affect informing the backfill worker.
 	if !config.GetGlobalConfig().Instance.TiDBEnableDDL.Load() {
 		return
 	}
-	if d.isOwner() {
-		asyncNotify(d.ddlJobCh)
+	if dc.isOwner() {
+		asyncNotify(ch)
 	} else {
-		d.asyncNotifyByEtcd(addingDDLJobConcurrent, job)
+		dc.asyncNotifyByEtcd(etcdPath, jobID, jobType)
 	}
 }
 
@@ -968,7 +1059,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
 	sessVars.StmtCtx.IsDDLJobInQueue = true
 
 	// Notice worker that we push a new job and wait the job done.
-	d.asyncNotifyWorker(job)
+	d.asyncNotifyWorker(d.ddlJobCh, addingDDLJobConcurrent, job.ID, job.Type.String())
 	logutil.BgLogger().Info("[ddl] start DDL job", zap.String("job", job.String()), zap.String("query", job.Query))
 
 	var historyJob *model.Job
@@ -1582,6 +1673,18 @@ func (s *session) runInTxn(f func(*session) error) (err error) {
 	if err != nil {
 		return err
 	}
+	failpoint.Inject("NotifyBeginTxnCh", func(val failpoint.Value) {
+		//nolint:forcetypeassert
+		v := val.(int)
+		if v == 1 {
+			mockDDLErrOnce = 1
+			TestNotifyBeginTxnCh <- struct{}{}
+		} else if v == 2 && mockDDLErrOnce == 1 {
+			<-TestNotifyBeginTxnCh
+			mockDDLErrOnce = 0
+		}
+	})
+
 	err = f(s)
 	if err != nil {
 		s.rollback()
diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go
index 6b210d2445c26..dc19436afd9b9 100644
--- a/ddl/ddl_test.go
+++ b/ddl/ddl_test.go
@@ -58,6 +58,11 @@ var JobNeedGCForTest = jobNeedGC
 // NewSession is only used for test.
 var NewSession = newSession
 
+// GetDDLCtx returns ddlCtx for test.
+func GetDDLCtx(d DDL) *ddlCtx {
+	return d.(*ddl).ddlCtx
+}
+
 // GetMaxRowID is used for test.
 func GetMaxRowID(store kv.Storage, priority int, t table.Table, startHandle, endHandle kv.Key) (kv.Key, error) {
 	return getRangeEndKey(NewJobContext(), store, priority, t.RecordPrefix(), startHandle, endHandle)
diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index 5c700f6273a3b..68b67836216d9 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -52,10 +52,14 @@ import (
 var (
 	// ddlWorkerID is used for generating the next DDL worker ID.
 	ddlWorkerID = atomicutil.NewInt32(0)
+	// backfillContextID is used for generating the next backfill context ID.
+	backfillContextID = atomicutil.NewInt32(0)
 	// WaitTimeWhenErrorOccurred is waiting interval when processing DDL jobs encounter errors.
 	WaitTimeWhenErrorOccurred = int64(1 * time.Second)
 
 	mockDDLErrOnce = int64(0)
+	// TestNotifyBeginTxnCh is used to notify that a txn begins in runInTxn.
+	TestNotifyBeginTxnCh = make(chan struct{})
 )
 
 // GetWaitTimeWhenErrorOccurred return waiting interval when processing DDL jobs encounter errors.
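
Note: asyncNotifyWorker now takes the target channel and etcd path as parameters, so the same helper can wake either the DDL job loop (d.ddlJobCh) or the new backfill job loop (d.backfillJobCh, created with capacity 1 in prepareBackfillWorkers). A sketch of the non-blocking wake-up pattern it builds on, with a schematic receiver loop for context; the dispatchLoop shape here is an assumption for illustration, not the exact dispatch code:

	package notifydemo

	import "time"

	// asyncNotify never blocks: with a buffered channel of capacity 1,
	// concurrent notifications collapse into a single pending signal.
	func asyncNotify(ch chan struct{}) {
		select {
		case ch <- struct{}{}:
		default:
		}
	}

	// dispatchLoop is a schematic receiver: it wakes on a signal or on a
	// periodic tick, so a coalesced or lost notification only delays pickup.
	func dispatchLoop(ch chan struct{}, interval time.Duration, handle func()) {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ch:
			case <-ticker.C:
			}
			handle()
		}
	}
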
@@ -162,18 +166,19 @@ func (w *worker) Close() {
 	logutil.Logger(w.logCtx).Info("[ddl] DDL worker closed", zap.Duration("take time", time.Since(startTime)))
 }
 
-func (d *ddl) asyncNotifyByEtcd(addingDDLJobKey string, job *model.Job) {
+func (d *ddlCtx) asyncNotifyByEtcd(etcdPath string, jobID int64, jobType string) {
 	if d.etcdCli == nil {
 		return
 	}
 
-	jobID := strconv.FormatInt(job.ID, 10)
+	jobIDStr := strconv.FormatInt(jobID, 10)
 	timeStart := time.Now()
-	err := util.PutKVToEtcd(d.ctx, d.etcdCli, 1, addingDDLJobKey, jobID)
+	err := util.PutKVToEtcd(d.ctx, d.etcdCli, 1, etcdPath, jobIDStr)
 	if err != nil {
-		logutil.BgLogger().Info("[ddl] notify handling DDL job failed", zap.String("jobID", jobID), zap.Error(err))
+		logutil.BgLogger().Info("[ddl] notify handling DDL job failed",
+			zap.String("etcdPath", etcdPath), zap.Int64("jobID", jobID), zap.String("type", jobType), zap.Error(err))
 	}
-	metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerNotifyDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(timeStart).Seconds())
+	metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerNotifyDDLJob, jobType, metrics.RetLabel(err)).Observe(time.Since(timeStart).Seconds())
 }
 
 func asyncNotify(ch chan struct{}) {
@@ -673,18 +678,18 @@ var DDLBackfillers = map[model.ActionType]string{
 	model.ActionDropIndex:           "drop_index",
 }
 
-func getDDLRequestSource(job *model.Job) string {
-	if tp, ok := DDLBackfillers[job.Type]; ok {
+func getDDLRequestSource(jobType model.ActionType) string {
+	if tp, ok := DDLBackfillers[jobType]; ok {
 		return kv.InternalTxnBackfillDDLPrefix + tp
 	}
 	return kv.InternalTxnDDL
 }
 
-func (w *JobContext) setDDLLabelForDiagnosis(job *model.Job) {
+func (w *JobContext) setDDLLabelForDiagnosis(jobType model.ActionType) {
 	if w.tp != "" {
 		return
 	}
-	w.tp = getDDLRequestSource(job)
+	w.tp = getDDLRequestSource(jobType)
 	w.ddlJobCtx = kv.WithInternalSourceType(w.ddlJobCtx, w.ddlJobSourceType())
 }
 
@@ -737,7 +742,7 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
 		txn.SetDiskFullOpt(kvrpcpb.DiskFullOpt_NotAllowedOnFull)
 	}
 	w.setDDLLabelForTopSQL(job.ID, job.Query)
-	w.setDDLSourceForDiagnosis(job)
+	w.setDDLSourceForDiagnosis(job.ID, job.Type)
 	jobContext := w.jobContext(job.ID)
 	if tagger := w.getResourceGroupTaggerForTopSQL(job.ID); tagger != nil {
 		txn.SetOption(kv.ResourceGroupTagger, tagger)
diff --git a/ddl/ddl_workerpool.go b/ddl/ddl_workerpool.go
index de709b6faeb3b..3ed4d8f499fa9 100644
--- a/ddl/ddl_workerpool.go
+++ b/ddl/ddl_workerpool.go
@@ -88,38 +88,38 @@ func (wp *workerPool) tp() jobType {
 	return wp.t
 }
 
-// backfilWorkerPool is used to new backfill worker.
-type backfilWorkerPool struct {
+// backfillCtxPool is used to new backfill context.
+type backfillCtxPool struct {
 	exit    atomic.Bool
 	resPool *pools.ResourcePool
 }
 
-func newBackfillWorkerPool(resPool *pools.ResourcePool) *backfilWorkerPool {
-	return &backfilWorkerPool{
+func newBackfillContextPool(resPool *pools.ResourcePool) *backfillCtxPool {
+	return &backfillCtxPool{
 		exit:    *atomic.NewBool(false),
 		resPool: resPool,
 	}
 }
 
 // setCapacity changes the capacity of the pool.
-// A setCapacity of 0 is equivalent to closing the backfilWorkerPool.
-func (bwp *backfilWorkerPool) setCapacity(capacity int) error {
-	return bwp.resPool.SetCapacity(capacity)
+// A setCapacity of 0 is equivalent to closing the backfillCtxPool.
+func (bcp *backfillCtxPool) setCapacity(capacity int) error {
+	return bcp.resPool.SetCapacity(capacity)
 }
 
-// get gets backfilWorkerPool from context resource pool.
-// Please remember to call put after you finished using backfilWorkerPool.
-func (bwp *backfilWorkerPool) get() (*backfillWorker, error) {
-	if bwp.resPool == nil {
+// get gets a backfillWorker from the context resource pool.
+// Please remember to call put after you finish using the backfillCtxPool.
+func (bcp *backfillCtxPool) get() (*backfillWorker, error) {
+	if bcp.resPool == nil {
 		return nil, nil
 	}
-	if bwp.exit.Load() {
+	if bcp.exit.Load() {
 		return nil, errors.Errorf("backfill worker pool is closed")
 	}
-	// no need to protect bwp.resPool
-	resource, err := bwp.resPool.TryGet()
+	// no need to protect bcp.resPool
+	resource, err := bcp.resPool.TryGet()
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -131,24 +131,55 @@ func (bwp *backfilWorkerPool) get() (*backfillWorker, error) {
 	return worker, nil
 }
 
+// batchGet gets a batch of backfillWorkers from the context resource pool.
+// Please remember to call put after you finish using the backfillCtxPool.
+func (bcp *backfillCtxPool) batchGet(cnt int) ([]*backfillWorker, error) {
+	if bcp.resPool == nil {
+		return nil, nil
+	}
+
+	if bcp.exit.Load() {
+		return nil, errors.Errorf("backfill worker pool is closed")
+	}
+
+	workers := make([]*backfillWorker, 0, cnt)
+	for i := 0; i < cnt; i++ {
+		// no need to protect bcp.resPool
+		res, err := bcp.resPool.TryGet()
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		if res == nil {
+			if len(workers) == 0 {
+				return nil, nil
+			}
+			return workers, nil
+		}
+		worker := res.(*backfillWorker)
+		workers = append(workers, worker)
+	}
+
+	return workers, nil
+}
+
 // put returns workerPool to context resource pool.
-func (bwp *backfilWorkerPool) put(wk *backfillWorker) {
-	if bwp.resPool == nil || bwp.exit.Load() {
+func (bcp *backfillCtxPool) put(wk *backfillWorker) {
+	if bcp.resPool == nil || bcp.exit.Load() {
 		return
 	}
 
-	// No need to protect bwp.resPool, even the bwp.resPool is closed, the ctx still need to
+	// No need to protect bcp.resPool, even the bcp.resPool is closed, the ctx still need to
 	// put into resPool, because when resPool is closing, it will wait all the ctx returns, then resPool finish closing.
-	bwp.resPool.Put(wk)
+	bcp.resPool.Put(wk)
 }
 
-// close clean up the backfilWorkerPool.
-func (bwp *backfilWorkerPool) close() {
+// close cleans up the backfillCtxPool.
+func (bcp *backfillCtxPool) close() {
 	// Prevent closing resPool twice.
-	if bwp.resPool == nil || bwp.exit.Load() {
+	if bcp.resPool == nil || bcp.exit.Load() {
 		return
 	}
-	bwp.exit.Store(true)
+	bcp.exit.Store(true)
 	logutil.BgLogger().Info("[ddl] closing workerPool")
-	bwp.resPool.Close()
+	bcp.resPool.Close()
 }
diff --git a/ddl/ddl_workerpool_test.go b/ddl/ddl_workerpool_test.go
index 123d05abb1d86..8f86e816507d5 100644
--- a/ddl/ddl_workerpool_test.go
+++ b/ddl/ddl_workerpool_test.go
@@ -37,36 +37,54 @@ func TestDDLWorkerPool(t *testing.T) {
 func TestBackfillWorkerPool(t *testing.T) {
 	f := func() func() (pools.Resource, error) {
 		return func() (pools.Resource, error) {
-			wk := newBackfillWorker(context.Background(), 1, nil)
+			wk := newBackfillWorker(context.Background(), nil)
 			return wk, nil
 		}
 	}
-	pool := newBackfillWorkerPool(pools.NewResourcePool(f(), 1, 2, 0))
-	bwp, err := pool.get()
+	pool := newBackfillContextPool(pools.NewResourcePool(f(), 3, 4, 0))
+	bc, err := pool.get()
 	require.NoError(t, err)
-	require.Equal(t, 1, bwp.id)
+	require.NotNil(t, bc)
+	require.Nil(t, bc.backfiller)
+	bcs, err := pool.batchGet(3)
+	require.NoError(t, err)
+	require.Len(t, bcs, 2)
 	// test it to reach the capacity
-	bwp1, err := pool.get()
+	bc1, err := pool.get()
+	require.NoError(t, err)
+	require.Nil(t, bc1)
+	bcs1, err := pool.batchGet(1)
 	require.NoError(t, err)
-	require.Nil(t, bwp1)
+	require.Nil(t, bcs1)
 
 	// test setCapacity
-	err = pool.setCapacity(2)
+	err = pool.setCapacity(5)
+	require.Equal(t, "capacity 5 is out of range", err.Error())
+	err = pool.setCapacity(4)
 	require.NoError(t, err)
-	bwp1, err = pool.get()
+	bc1, err = pool.get()
 	require.NoError(t, err)
-	require.Equal(t, 1, bwp1.id)
-	pool.put(bwp)
-	pool.put(bwp1)
+	require.NotNil(t, bc)
+	require.Nil(t, bc.backfiller)
+	pool.put(bc)
+	pool.put(bc1)
+	for _, bc := range bcs {
+		pool.put(bc)
+	}
 
 	// test close
 	pool.close()
 	pool.close()
 	require.Equal(t, true, pool.exit.Load())
-	pool.put(bwp1)
+	pool.put(bc1)
+
+	bc, err = pool.get()
+	require.Error(t, err)
+	require.Equal(t, "backfill worker pool is closed", err.Error())
+	require.Nil(t, bc)
 
-	bwp, err = pool.get()
+	bcs, err = pool.batchGet(1)
 	require.Error(t, err)
 	require.Equal(t, "backfill worker pool is closed", err.Error())
-	require.Nil(t, bwp)
+	require.Nil(t, bcs)
 }
diff --git a/ddl/delete_range.go b/ddl/delete_range.go
index 669ff286ea9dd..899bb1bb7143e 100644
--- a/ddl/delete_range.go
+++ b/ddl/delete_range.go
@@ -266,7 +266,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
 		return errors.Trace(err)
 	}
 
-	ctx = kv.WithInternalSourceType(ctx, getDDLRequestSource(job))
+	ctx = kv.WithInternalSourceType(ctx, getDDLRequestSource(job.Type))
 	s := sctx.(sqlexec.SQLExecutor)
 	switch job.Type {
 	case model.ActionDropSchema:
diff --git a/ddl/dist_backfilling.go b/ddl/dist_backfilling.go
new file mode 100644
index 0000000000000..79b1250d0e98c
--- /dev/null
+++ b/ddl/dist_backfilling.go
@@ -0,0 +1,282 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ddl
+
+import (
+	"encoding/hex"
+	"sync"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/ddl/ingest"
+	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/resourcemanager/pooltask"
+	"github.com/pingcap/tidb/sessionctx"
+	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/table"
+	"github.com/pingcap/tidb/util"
+	"github.com/pingcap/tidb/util/dbterror"
+	"github.com/pingcap/tidb/util/gpool/spmc"
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/zap"
+)
+
+type backfillWorkerContext struct {
+	currID          int
+	mu              sync.Mutex
+	sessCtxs        []sessionctx.Context
+	backfillWorkers []*backfillWorker
+}
+
+type newBackfillerFunc func(bfCtx *backfillCtx) (bf backfiller, err error)
+
+func newBackfillWorkerContext(d *ddl, schemaName string, tbl table.Table, workerCnt int, jobID int64, bfMeta *model.BackfillMeta,
+	bfFunc newBackfillerFunc) (*backfillWorkerContext, error) {
+	if workerCnt <= 0 {
+		return nil, nil
+	}
+
+	bwCtx := &backfillWorkerContext{backfillWorkers: make([]*backfillWorker, 0, workerCnt), sessCtxs: make([]sessionctx.Context, 0, workerCnt)}
+	var err error
+	defer func() {
+		if err != nil {
+			bwCtx.close(d)
+		}
+	}()
+
+	for i := 0; i < workerCnt; i++ {
+		var se sessionctx.Context
+		se, err = d.sessPool.get()
+		if err != nil {
+			logutil.BgLogger().Error("[ddl] new backfill worker context, get a session failed", zap.Int64("jobID", jobID), zap.Error(err))
+			return nil, errors.Trace(err)
+		}
+		bwCtx.sessCtxs = append(bwCtx.sessCtxs, se)
+		err = initSessCtx(se, bfMeta.SQLMode, bfMeta.Location)
+		if err != nil {
+			logutil.BgLogger().Error("[ddl] new backfill worker context, init the session ctx failed", zap.Int64("jobID", jobID), zap.Error(err))
+			return nil, errors.Trace(err)
+		}
+
+		var bf backfiller
+		bf, err = bfFunc(newBackfillCtx(d.ddlCtx, 0, se, bfMeta.ReorgTp, schemaName, tbl))
+		if err != nil {
+			if canSkipError(jobID, len(bwCtx.backfillWorkers), err) {
+				err = nil
+				continue
+			}
+			logutil.BgLogger().Error("[ddl] new backfill worker context, do bfFunc failed", zap.Int64("jobID", jobID), zap.Error(err))
+			return nil, errors.Trace(err)
+		}
+		var bCtx *backfillWorker
+		bCtx, err = d.backfillCtxPool.get()
+		if err != nil || bCtx == nil {
+			logutil.BgLogger().Info("[ddl] new backfill worker context, get backfill context failed", zap.Int64("jobID", jobID), zap.Error(err))
+			err = nil
+			break
+		}
+		bCtx.backfiller = bf
+		bwCtx.backfillWorkers = append(bwCtx.backfillWorkers, bCtx)
+	}
+	return bwCtx, nil
+}
+
+func (bwCtx *backfillWorkerContext) GetContext() *backfillWorker {
+	bwCtx.mu.Lock()
+	// TODO: Special considerations are required if the number of consumers we get from the backfillWorkerPool is increased.
+	offset := bwCtx.currID % len(bwCtx.backfillWorkers)
+	// To prevent different workers from using the same session.
+	bw := bwCtx.backfillWorkers[offset]
+	logutil.BgLogger().Info("[ddl] backfill worker get context", zap.Int("workerCount", len(bwCtx.backfillWorkers)),
+		zap.Int("currID", bwCtx.currID), zap.Int("offset", offset), zap.Stringer("backfill worker", bw))
+	bwCtx.currID++
+	bwCtx.mu.Unlock()
+	return bw
+}
+
+func runBackfillJobs(d *ddl, ingestBackendCtx *ingest.BackendContext, bJob *BackfillJob, jobCtx *JobContext) (table.Table, error) {
+	dbInfo, tbl, err := d.getTableByTxn(d.store, bJob.Meta.SchemaID, bJob.Meta.TableID)
+	if err != nil {
+		logutil.BgLogger().Warn("[ddl] runBackfillJobs gets table failed", zap.String("bfJob", bJob.AbbrStr()), zap.Error(err))
+		return nil, err
+	}
+	se, err := d.sessPool.get()
+	if err != nil {
+		logutil.BgLogger().Warn("[ddl] run backfill jobs get session failed", zap.Error(err))
+		return nil, err
+	}
+	defer d.sessPool.put(se)
+	sess := newSession(se)
+
+	workerCnt := int(variable.GetDDLReorgWorkerCounter())
+	// TODO: Let different worker types use different newBackfillerFuncs.
+	workerCtx, err := newAddIndexWorkerContext(d, dbInfo.Name, tbl, workerCnt, bJob, jobCtx)
+	if err != nil || workerCtx == nil {
+		logutil.BgLogger().Info("[ddl] new adding index worker context failed", zap.Reflect("workerCtx", workerCtx), zap.Error(err))
+		return nil, errors.Trace(err)
+	}
+	workerCnt = len(workerCtx.backfillWorkers)
+	bwMgr := newBackfilWorkerManager(workerCtx)
+	d.backfillWorkerPool.SetConsumerFunc(func(task *reorgBackfillTask, _ int, bfWorker *backfillWorker) *backfillResult {
+		return bfWorker.runTask(task)
+	})
+
+	proFunc := func() ([]*reorgBackfillTask, error) {
+		// TODO: After BackfillJob replaces reorgBackfillTask, use backfiller's GetTasks instead of it.
+		return GetTasks(d.ddlCtx, sess, tbl, bJob.JobID, workerCnt+5)
+	}
+	// Add new tasks.
+	resultCh, control := d.backfillWorkerPool.AddProduceBySlice(proFunc, 0, workerCtx, spmc.WithConcurrency(workerCnt))
+	bwMgr.waitFinalResult(resultCh, ingestBackendCtx, bJob.EleID, control)
+
+	// Wait for the tasks to finish.
+	control.Wait()
+	err = bwMgr.close(d)
+
+	return tbl, err
+}
+
+func (bwCtx *backfillWorkerContext) close(d *ddl) {
+	for _, s := range bwCtx.sessCtxs {
+		d.sessPool.put(s)
+	}
+	for _, w := range bwCtx.backfillWorkers {
+		d.backfillCtxPool.put(w)
+	}
+}
+
+type backfilWorkerManager struct {
+	bwCtx     *backfillWorkerContext
+	wg        util.WaitGroupWrapper
+	unsyncErr error
+	exitCh    chan struct{}
+}
+
+func newBackfilWorkerManager(bwCtx *backfillWorkerContext) *backfilWorkerManager {
+	return &backfilWorkerManager{
+		bwCtx:  bwCtx,
+		exitCh: make(chan struct{}),
+	}
+}
+
+func (bwm *backfilWorkerManager) waitFinalResult(resultCh <-chan *backfillResult, ingestBackendCtx *ingest.BackendContext, eleID int64,
+	tControl pooltask.TaskController[*reorgBackfillTask, *backfillResult, int, *backfillWorker, *backfillWorkerContext]) {
+	bwm.wg.Run(func() {
+		i := 0
+		workerCnt := len(bwm.bwCtx.backfillWorkers)
+
+		for {
+			select {
+			case result, ok := <-resultCh:
+				if !ok {
+					return
+				}
+				if result.err != nil {
+					logutil.BgLogger().Warn("handle backfill task failed", zap.Error(result.err))
+					bwm.unsyncErr = result.err
+					tControl.Stop()
+					return
+				}
+
+				if ingestBackendCtx != nil && i%workerCnt == 0 {
+					err := ingestBackendCtx.Flush(eleID)
+					if err != nil {
+						bwm.unsyncErr = err
+						return
+					}
+				}
+				i++
+			case <-bwm.exitCh:
+				return
+			}
+		}
+	})
+}
+
+func (bwm *backfilWorkerManager) close(d *ddl) error {
+	close(bwm.exitCh)
+	bwm.wg.Wait()
+
+	bwm.bwCtx.close(d)
+
+	return bwm.unsyncErr
+}
+
+// backfillJob2Task builds reorg task.
+func (dc *ddlCtx) backfillJob2Task(t table.Table, bfJob *BackfillJob) (*reorgBackfillTask, error) {
+	pt := t.(table.PhysicalTable)
+	if tbl, ok := t.(table.PartitionedTable); ok {
+		pt = tbl.GetPartition(bfJob.Meta.PhysicalTableID)
+		if pt == nil {
+			return nil, dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", bfJob.Meta.PhysicalTableID, t.Meta().ID)
+		}
+	}
+	endKey := bfJob.EndKey
+	// TODO: Check reorgInfo.mergingTmpIdx
+	endK, err := getRangeEndKey(dc.jobContext(bfJob.JobID), dc.store, bfJob.Meta.Priority, pt.RecordPrefix(), bfJob.StartKey, endKey)
+	if err != nil {
+		logutil.BgLogger().Info("[ddl] convert backfill job to task, get reverse key failed", zap.String("backfill job", bfJob.AbbrStr()), zap.Error(err))
+	} else {
+		logutil.BgLogger().Info("[ddl] convert backfill job to task, change end key", zap.String("backfill job",
+			bfJob.AbbrStr()), zap.String("current key", hex.EncodeToString(bfJob.StartKey)), zap.Bool("end include", bfJob.Meta.EndInclude),
+			zap.String("end key", hex.EncodeToString(endKey)), zap.String("current end key", hex.EncodeToString(endK)))
+		endKey = endK
+	}
+
+	return &reorgBackfillTask{
+		bfJob:         bfJob,
+		physicalTable: pt,
+		// TODO: Remove these fields after removing the old logic.
+		sqlQuery:   bfJob.Meta.Query,
+		startKey:   bfJob.StartKey,
+		endKey:     endKey,
+		endInclude: bfJob.Meta.EndInclude,
+		priority:   bfJob.Meta.Priority}, nil
+}
+
+// GetTasks gets up to `concurrency` unclaimed backfill jobs of the given DDL job and converts them to reorg tasks.
+func GetTasks(d *ddlCtx, sess *session, tbl table.Table, runningJobID int64, concurrency int) ([]*reorgBackfillTask, error) {
+	// TODO: At present, only add index is processed. In the future, different elements need to be distinguished.
+	var err error
+	var bJobs []*BackfillJob
+	for i := 0; i < retrySQLTimes; i++ {
+		bJobs, err = GetAndMarkBackfillJobsForOneEle(sess, concurrency, runningJobID, d.uuid, InstanceLease)
+		if err != nil {
+			// TODO: Add a test for the case where one TiDB node marks a backfill job but fails to handle it, so no other node can ever claim it.
+			if dbterror.ErrDDLJobNotFound.Equal(err) {
+				logutil.BgLogger().Info("no backfill job, handle backfill task finished")
+				return nil, err
+			}
+			if kv.ErrWriteConflict.Equal(err) {
+				logutil.BgLogger().Info("GetAndMarkBackfillJobsForOneEle failed", zap.Error(err))
+				time.Sleep(RetrySQLInterval)
+				continue
+			}
+			// Other errors are not retryable; surface them instead of returning an empty task list with a nil error.
+			return nil, errors.Trace(err)
+		}
+
+		tasks := make([]*reorgBackfillTask, 0, len(bJobs))
+		for _, bJ := range bJobs {
+			task, err := d.backfillJob2Task(tbl, bJ)
+			if err != nil {
+				return nil, err
+			}
+			tasks = append(tasks, task)
+		}
+		return tasks, nil
+	}
+
+	return nil, err
+}
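GetTasks above retries only on write conflicts, sleeps RetrySQLInterval between attempts, and gives up after retrySQLTimes tries; every other error surfaces immediately. A generic sketch of that retry policy follows, assuming illustrative names (`withRetry`, `isRetryable`) rather than anything in the ddl package:

package main

import (
	"errors"
	"fmt"
	"time"
)

var errWriteConflict = errors.New("write conflict")

// withRetry retries fn up to attempts times, but only while the error is
// classified as retryable; a non-retryable error is returned at once, and
// the last error is returned when the retry budget is exhausted.
func withRetry[T any](attempts int, interval time.Duration,
	isRetryable func(error) bool, fn func() (T, error)) (T, error) {
	var zero T
	var err error
	for i := 0; i < attempts; i++ {
		var v T
		v, err = fn()
		if err == nil {
			return v, nil
		}
		if !isRetryable(err) {
			return zero, err
		}
		time.Sleep(interval)
	}
	return zero, err // retry budget exhausted
}

func main() {
	calls := 0
	v, err := withRetry(3, 10*time.Millisecond,
		func(err error) bool { return errors.Is(err, errWriteConflict) },
		func() (int, error) {
			calls++
			if calls < 3 {
				return 0, errWriteConflict
			}
			return 42, nil
		})
	fmt.Println(v, err) // 42 <nil>
}

diff --git a/ddl/dist_owner.go b/ddl/dist_owner.go
new file mode 100644
index 0000000000000..d10491fd0da3e
--- /dev/null
+++ b/ddl/dist_owner.go
@@ -0,0 +1,315 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.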
+
+package ddl
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/meta"
+	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/parser/mysql"
+	"github.com/pingcap/tidb/sessionctx"
+	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/table"
+	"github.com/pingcap/tidb/util/dbterror"
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/zap"
+)
+
+// CheckBackfillJobFinishInterval is exported for testing.
+var CheckBackfillJobFinishInterval = 300 * time.Millisecond
+
+func initDistReorg(reorgMeta *model.DDLReorgMeta, store kv.Storage, schemaID int64, tblInfo *model.TableInfo) error {
+	tbl, err := getTable(store, schemaID, tblInfo)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	isDistReorg := variable.DDLEnableDistributeReorg.Load()
+	// TODO: Support partitioned tables.
+	if _, ok := tbl.(table.PartitionedTable); ok {
+		isDistReorg = false
+	}
+	reorgMeta.IsDistReorg = isDistReorg
+	return nil
+}
+
+func (dc *ddlCtx) controlWritePhysicalTableRecord(sess *session, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) error {
+	startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
+	if startKey == nil && endKey == nil {
+		return nil
+	}
+
+	ddlJobID := reorgInfo.Job.ID
+	if err := dc.isReorgRunnable(ddlJobID, true); err != nil {
+		return errors.Trace(err)
+	}
+
+	currEle := reorgInfo.currElement
+	defaultSQLMode := sess.GetSessionVars().SQLMode
+	defer func() {
+		sess.GetSessionVars().SQLMode = defaultSQLMode
+	}()
+	// Allow zero timestamps to be inserted into timestamp-typed columns.
+	sess.GetSessionVars().SQLMode = mysql.ModeNone
+	currBackfillJobID := int64(1)
+	err := checkAndHandleInterruptedBackfillJobs(sess, ddlJobID, currEle.ID, currEle.TypeKey)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	maxBfJob, err := GetMaxBackfillJob(sess, ddlJobID, currEle.ID, currEle.TypeKey)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if maxBfJob != nil {
+		startKey = maxBfJob.EndKey
+		currBackfillJobID = maxBfJob.ID + 1
+	}
+
+	var isUnique bool
+	if bfWorkerType == typeAddIndexWorker {
+		idxInfo := model.FindIndexInfoByID(t.Meta().Indices, currEle.ID)
+		isUnique = idxInfo.Unique
+	}
+	err = dc.splitTableToBackfillJobs(sess, reorgInfo, t, isUnique, bfWorkerType, startKey, currBackfillJobID)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	return checkReorgJobFinished(dc.ctx, sess, &dc.reorgCtx, ddlJobID, currEle)
+}
+
+func checkReorgJobFinished(ctx context.Context, sess *session, reorgCtxs *reorgContexts, ddlJobID int64, currEle *meta.Element) error {
+	var times int64
+	var bfJob *BackfillJob
+	var backfillJobFinished bool
+	ticker := time.NewTicker(CheckBackfillJobFinishInterval)
+	defer ticker.Stop()
+	for {
+		if getReorgCtx(reorgCtxs, ddlJobID).isReorgCanceled() {
+			// Job is cancelled. So it can't be done.
+			return dbterror.ErrCancelledDDLJob
+		}
+
+		select {
+		case <-ticker.C:
+			times++
+			// Print this log every 5 min.
+			if times%1000 == 0 {
+				logutil.BgLogger().Info("[ddl] check whether all backfill jobs are finished",
+					zap.Int64("job ID", ddlJobID), zap.Bool("isFinished", backfillJobFinished), zap.Reflect("bfJob", bfJob))
+			}
+			if !backfillJobFinished {
+				err := checkAndHandleInterruptedBackfillJobs(sess, ddlJobID, currEle.ID, currEle.TypeKey)
+				if err != nil {
+					logutil.BgLogger().Warn("[ddl] finish interrupted backfill jobs", zap.Int64("job ID", ddlJobID), zap.Error(err))
+					return errors.Trace(err)
+				}
+
+				bfJob, err = getBackfillJobWithRetry(sess, BackfillTable, ddlJobID, currEle.ID, currEle.TypeKey, false)
+				if err != nil {
+					logutil.BgLogger().Info("[ddl] getBackfillJobWithRetry failed", zap.Int64("job ID", ddlJobID), zap.Error(err))
+					return errors.Trace(err)
+				}
+				if bfJob == nil {
+					backfillJobFinished = true
+					logutil.BgLogger().Info("[ddl] finish all backfill jobs", zap.Int64("job ID", ddlJobID))
+				}
+			}
+			if backfillJobFinished {
+				// TODO: Consider whether these backfill jobs are always out of sync.
+				isSynced, err := checkJobIsFinished(sess, ddlJobID)
+				if err != nil {
+					logutil.BgLogger().Warn("[ddl] checkJobIsFinished failed", zap.Int64("job ID", ddlJobID), zap.Error(err))
+					return errors.Trace(err)
+				}
+				if isSynced {
+					logutil.BgLogger().Info("[ddl] finish all backfill jobs and put them to history", zap.Int64("job ID", ddlJobID))
+					return GetBackfillErr(sess, ddlJobID, currEle.ID, currEle.TypeKey)
+				}
+			}
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}
+
+func checkJobIsFinished(sess *session, ddlJobID int64) (bool, error) {
+	var err error
+	var unsyncedInstanceIDs []string
+	for i := 0; i < retrySQLTimes; i++ {
+		unsyncedInstanceIDs, err = getUnsyncedInstanceIDs(sess, ddlJobID, "check_backfill_history_job_sync")
+		if err == nil && len(unsyncedInstanceIDs) == 0 {
+			return true, nil
+		}
+
+		logutil.BgLogger().Info("[ddl] checkJobIsFinished failed",
+			zap.Strings("unsyncedInstanceIDs", unsyncedInstanceIDs), zap.Int("tryTimes", i), zap.Error(err))
+		time.Sleep(RetrySQLInterval)
+	}
+
+	return false, errors.Trace(err)
+}
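checkReorgJobFinished above is a ticker-driven poll: re-check state on every tick, rate-limit progress logging by counting ticks, and exit on completion or context cancellation. A condensed, runnable sketch of that loop shape follows; `pollDone` and its parameters are illustrative assumptions, not ddl package APIs:

package main

import (
	"context"
	"fmt"
	"time"
)

// pollDone wakes on every tick, asks isDone for the current state, and
// returns when the work completes or ctx is cancelled. Every logEvery
// ticks it reports progress, mirroring the "print every 5 min" pattern.
func pollDone(ctx context.Context, interval time.Duration, logEvery int64,
	isDone func() (bool, error)) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	var ticks int64
	for {
		select {
		case <-ticker.C:
			ticks++
			if ticks%logEvery == 0 {
				fmt.Println("still waiting after", ticks, "ticks")
			}
			done, err := isDone()
			if err != nil {
				return err
			}
			if done {
				return nil
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	deadline := time.Now().Add(30 * time.Millisecond)
	err := pollDone(context.Background(), 5*time.Millisecond, 2,
		func() (bool, error) { return time.Now().After(deadline), nil })
	fmt.Println("finished:", err)
}

+
+// GetBackfillErr gets the first error recorded in the backfill history jobs of one element.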
+func GetBackfillErr(sess *session, ddlJobID, currEleID int64, currEleKey []byte) error {
+	var err error
+	var metas []*model.BackfillMeta
+	for i := 0; i < retrySQLTimes; i++ {
+		metas, err = GetBackfillMetas(sess, BackfillHistoryTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s",
+			ddlJobID, currEleID, wrapKey2String(currEleKey)), "get_backfill_job_metas")
+		if err == nil {
+			for _, m := range metas {
+				if m.Error != nil {
+					return m.Error
+				}
+			}
+			return nil
+		}
+
+		logutil.BgLogger().Info("[ddl] GetBackfillMetas failed in GetBackfillErr", zap.Int("tryTimes", i), zap.Error(err))
+		time.Sleep(RetrySQLInterval)
+	}
+
+	return errors.Trace(err)
+}
+
+func checkAndHandleInterruptedBackfillJobs(sess *session, ddlJobID, currEleID int64, currEleKey []byte) (err error) {
+	var bJobs []*BackfillJob
+	for i := 0; i < retrySQLTimes; i++ {
+		bJobs, err = GetInterruptedBackfillJobForOneEle(sess, ddlJobID, currEleID, currEleKey)
+		if err == nil {
+			break
+		}
+		logutil.BgLogger().Info("[ddl] GetInterruptedBackfillJobForOneEle failed", zap.Error(err))
+		time.Sleep(RetrySQLInterval)
+	}
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if len(bJobs) == 0 {
+		return nil
+	}
+
+	for i := 0; i < retrySQLTimes; i++ {
+		err = MoveBackfillJobsToHistoryTable(sess, bJobs[0])
+		if err == nil {
+			return bJobs[0].Meta.Error
+		}
+		logutil.BgLogger().Info("[ddl] MoveBackfillJobsToHistoryTable failed", zap.Error(err))
+		time.Sleep(RetrySQLInterval)
+	}
+	return errors.Trace(err)
+}
+
+func checkBackfillJobCount(sess *session, ddlJobID, currEleID int64, currEleKey []byte) (backfillJobCnt int, err error) {
+	err = checkAndHandleInterruptedBackfillJobs(sess, ddlJobID, currEleID, currEleKey)
+	if err != nil {
+		return 0, errors.Trace(err)
+	}
+
+	backfillJobCnt, err = GetBackfillJobCount(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s",
+		ddlJobID, currEleID, wrapKey2String(currEleKey)), "check_backfill_job_count")
+	if err != nil {
+		return 0, errors.Trace(err)
+	}
+
+	return backfillJobCnt, nil
+}
+
+func getBackfillJobWithRetry(sess *session, tableName string, ddlJobID, currEleID int64, currEleKey []byte, isDesc bool) (*BackfillJob, error) {
+	var err error
+	var bJobs []*BackfillJob
+	descStr := ""
+	if isDesc {
+		descStr = "order by id desc"
+	}
+	for i := 0; i < retrySQLTimes; i++ {
+		bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s %s limit 1",
+			ddlJobID, currEleID, wrapKey2String(currEleKey), descStr), "check_backfill_job_state")
+		if err != nil {
+			logutil.BgLogger().Warn("[ddl] GetBackfillJobs failed", zap.Error(err))
+			continue
+		}
+
+		if len(bJobs) != 0 {
+			return bJobs[0], nil
+		}
+		break
+	}
+	return nil, errors.Trace(err)
+}
+
+// GetMaxBackfillJob gets the backfill job with the maximum ID across the BackfillTable and the BackfillHistoryTable.
+func GetMaxBackfillJob(sess *session, ddlJobID, currEleID int64, currEleKey []byte) (*BackfillJob, error) {
+	bfJob, err := getBackfillJobWithRetry(sess, BackfillTable, ddlJobID, currEleID, currEleKey, true)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	hJob, err := getBackfillJobWithRetry(sess, BackfillHistoryTable, ddlJobID, currEleID, currEleKey, true)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	if bfJob == nil {
+		return hJob, nil
+	}
+	if hJob == nil {
+		return bfJob, nil
+	}
+	if bfJob.ID > hJob.ID {
+		return bfJob, nil
+	}
+	return hJob, nil
+}
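The conditions above consistently pass binary keys through wrapKey2String instead of quoting them with '%s', so arbitrary key bytes cannot corrupt the SQL literal. The helper itself is defined outside the hunks shown here; a plausible sketch, assuming it emits a hex literal for non-empty keys:

package main

import "fmt"

// wrapKey2String renders a binary key as a SQL literal: an empty string
// literal for empty keys, otherwise a hex literal (0x...) that MySQL
// treats as a binary string, so arbitrary key bytes survive the round
// trip. This is a sketch; the real helper in the ddl package may differ.
func wrapKey2String(key []byte) string {
	if len(key) == 0 {
		return "''"
	}
	return fmt.Sprintf("0x%x", key)
}

func main() {
	fmt.Println(wrapKey2String(nil))                // ''
	fmt.Println(wrapKey2String([]byte{0x74, 0x80})) // 0x7480
}

+
+// MoveBackfillJobsToHistoryTable moves backfill table jobs to the backfill history table.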
+func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, bfJob *BackfillJob) error {
+	s, ok := sctx.(*session)
+	if !ok {
+		return errors.Errorf("failed to convert sessionctx %#v to session", sctx)
+	}
+
+	return s.runInTxn(func(se *session) error {
+		// TODO: Consider updating the backfill jobs and inserting the backfill history jobs batch by batch.
+		bJobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s",
+			bfJob.JobID, bfJob.EleID, wrapKey2String(bfJob.EleKey)), "update_backfill_job")
+		if err != nil {
+			return errors.Trace(err)
+		}
+		if len(bJobs) == 0 {
+			return nil
+		}
+
+		txn, err := se.txn()
+		if err != nil {
+			return errors.Trace(err)
+		}
+		startTS := txn.StartTS()
+		err = RemoveBackfillJob(se, true, bJobs[0])
+		if err == nil {
+			for _, bj := range bJobs {
+				bj.State = model.JobStateCancelled
+				bj.FinishTS = startTS
+			}
+			err = AddBackfillHistoryJob(se, bJobs)
+		}
+		logutil.BgLogger().Info("[ddl] move backfill jobs to history table", zap.Int("job count", len(bJobs)))
+		return errors.Trace(err)
+	})
+}
diff --git a/ddl/failtest/fail_db_test.go b/ddl/failtest/fail_db_test.go
index 4a938e5fd2ad4..6f33f6b2107dd 100644
--- a/ddl/failtest/fail_db_test.go
+++ b/ddl/failtest/fail_db_test.go
@@ -333,6 +333,10 @@ func TestRunDDLJobPanicDisableClusteredIndex(t *testing.T) {
 }
 
 func testAddIndexWorkerNum(t *testing.T, s *failedSuite, test func(*testkit.TestKit)) {
+	if variable.DDLEnableDistributeReorg.Load() {
+		t.Skip("dist reorg doesn't support checkBackfillWorkerNum, skip this test")
+	}
+
 	tk := testkit.NewTestKit(t, s.store)
 	tk.MustExec("create database if not exists test_db")
 	tk.MustExec("use test_db")
diff --git a/ddl/index.go b/ddl/index.go
index cc3a3d3795242..312d7fe2cf305 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -41,7 +41,6 @@ import (
 	"github.com/pingcap/tidb/parser/mysql"
 	"github.com/pingcap/tidb/parser/terror"
 	"github.com/pingcap/tidb/sessionctx"
-	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/table/tables"
 	"github.com/pingcap/tidb/tablecodec"
@@ -656,6 +655,12 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
 		// Initialize SnapshotVer to 0 for later reorganization check.
job.SnapshotVer = 0 job.SchemaState = model.StateWriteReorganization + + if job.MultiSchemaInfo == nil { + if err := initDistReorg(job.ReorgMeta, d.store, schemaID, tblInfo); err != nil { + return ver, errors.Trace(err) + } + } case model.StateWriteReorganization: // reorganization -> public tbl, err := getTable(d.store, schemaID, tblInfo) @@ -667,7 +672,11 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo if job.MultiSchemaInfo != nil { done, ver, err = doReorgWorkForCreateIndexMultiSchema(w, d, t, job, tbl, indexInfo) } else { - done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo) + if job.ReorgMeta.IsDistReorg { + done, ver, err = doReorgWorkForCreateIndexWithDistReorg(w, d, t, job, tbl, indexInfo) + } else { + done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo) + } } if !done { return ver, err @@ -886,6 +895,56 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo } } +func doReorgWorkForCreateIndexWithDistReorg(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, + tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) { + bfProcess := pickBackfillType(w, job) + if !bfProcess.NeedMergeProcess() { + return runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) + } + switch indexInfo.BackfillState { + case model.BackfillStateRunning: + logutil.BgLogger().Info("[ddl] index backfill state running", + zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O), + zap.Bool("ingest mode", bfProcess == model.ReorgTypeLitMerge), + zap.String("index", indexInfo.Name.O)) + switch bfProcess { + case model.ReorgTypeLitMerge: + done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) + if err != nil { + logutil.BgLogger().Warn("[ddl] dist lightning import error", zap.Error(err)) + return false, ver, errors.Trace(err) + } + if !done { + return false, ver, nil + } + case model.ReorgTypeTxnMerge: + done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) + if err != nil || !done { + return false, ver, errors.Trace(err) + } + } + indexInfo.BackfillState = model.BackfillStateReadyToMerge + ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) + return false, ver, errors.Trace(err) + case model.BackfillStateReadyToMerge: + logutil.BgLogger().Info("[ddl] index backfill state ready to merge", zap.Int64("job ID", job.ID), + zap.String("table", tbl.Meta().Name.O), zap.String("index", indexInfo.Name.O)) + indexInfo.BackfillState = model.BackfillStateMerging + job.SnapshotVer = 0 // Reset the snapshot version for merge index reorg. + ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) + return false, ver, errors.Trace(err) + case model.BackfillStateMerging: + done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, true) + if !done { + return false, ver, err + } + indexInfo.BackfillState = model.BackfillStateInapplicable // Prevent double-write on this index. + return true, ver, nil + default: + return false, 0, dbterror.ErrInvalidDDLState.GenWithStackByArgs("backfill", indexInfo.BackfillState) + } +} + func convertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *model.TableInfo) error { tErr, ok := errors.Cause(originErr).(*terror.Error) if !ok { @@ -934,7 +993,9 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, // if timeout, we should return, check for the owner and re-wait job done. 
 		return false, ver, nil
 	}
-	if kv.ErrKeyExists.Equal(err) || dbterror.ErrCancelledDDLJob.Equal(err) || dbterror.ErrCantDecodeRecord.Equal(err) {
+	if kv.ErrKeyExists.Equal(err) || dbterror.ErrCancelledDDLJob.Equal(err) || dbterror.ErrCantDecodeRecord.Equal(err) ||
+		// TODO: Remove this check to make the job retryable. The related test is TestModifyColumnReorgInfo.
+		job.ReorgMeta.IsDistReorg {
 		logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err))
 		ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err)
 		if err1 := rh.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
@@ -1249,7 +1310,7 @@ type addIndexWorker struct {
 	distinctCheckFlags []bool
 }
 
-func newAddIndexWorker(decodeColMap map[int64]decoder.Column, id int, t table.PhysicalTable, bfCtx *backfillCtx, jc *JobContext, jobID, eleID int64, eleTypeKey []byte) (*addIndexWorker, error) {
+func newAddIndexWorker(decodeColMap map[int64]decoder.Column, t table.PhysicalTable, bfCtx *backfillCtx, jc *JobContext, jobID, eleID int64, eleTypeKey []byte) (*addIndexWorker, error) {
 	if !bytes.Equal(eleTypeKey, meta.IndexElementKey) {
 		logutil.BgLogger().Error("Element type for addIndexWorker incorrect", zap.String("jobQuery", jc.cacheSQL),
 			zap.Int64("job ID", jobID), zap.ByteString("element type", eleTypeKey), zap.Int64("element ID", eleID))
@@ -1269,7 +1330,7 @@ func newAddIndexWorker(decodeColMap map[int64]decoder.Column, id int, t table.Ph
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
-		lwCtx, err = ei.NewWriterCtx(id, indexInfo.Unique)
+		lwCtx, err = ei.NewWriterCtx(bfCtx.id, indexInfo.Unique)
 		if err != nil {
 			return nil, err
 		}
@@ -1294,7 +1355,7 @@ func (w *baseIndexWorker) AddMetricInfo(cnt float64) {
 	w.metricCounter.Add(cnt)
 }
 
-func (*baseIndexWorker) GetTask() (*BackfillJob, error) {
+func (*baseIndexWorker) GetTasks() ([]*BackfillJob, error) {
 	return nil, nil
 }
 
@@ -1306,8 +1367,8 @@ func (w *baseIndexWorker) UpdateTask(bfJob *BackfillJob) error {
 	s := newSession(w.backfillCtx.sessCtx)
 
 	return s.runInTxn(func(se *session) error {
-		jobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s' and id = %d",
-			bfJob.JobID, bfJob.EleID, bfJob.EleKey, bfJob.ID), "update_backfill_task")
+		jobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s and id = %d",
+			bfJob.JobID, bfJob.EleID, wrapKey2String(bfJob.EleKey), bfJob.ID), "update_backfill_task")
 		if err != nil {
 			return err
 		}
@@ -1347,6 +1408,22 @@ func (w *baseIndexWorker) GetCtx() *backfillCtx {
 	return w.backfillCtx
 }
 
+func newAddIndexWorkerContext(d *ddl, schemaName model.CIStr, tbl table.Table, workerCnt int,
+	bfJob *BackfillJob, jobCtx *JobContext) (*backfillWorkerContext, error) {
+	//nolint:forcetypeassert
+	phyTbl := tbl.(table.PhysicalTable)
+	return newBackfillWorkerContext(d, schemaName.O, tbl, workerCnt, bfJob.JobID, bfJob.Meta,
+		func(bfCtx *backfillCtx) (backfiller, error) {
+			decodeColMap, err := makeupDecodeColMap(bfCtx.sessCtx, schemaName, phyTbl)
+			if err != nil {
+				logutil.BgLogger().Error("[ddl] make up decode column map failed", zap.Error(err))
+				return nil, errors.Trace(err)
+			}
+			bf, err1 := newAddIndexWorker(decodeColMap, phyTbl, bfCtx, jobCtx, bfJob.JobID, bfJob.EleID, bfJob.EleKey)
+			return bf, err1
+		})
+}
+
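newAddIndexWorkerContext above injects a type-specific constructor into the generic newBackfillWorkerContext as a closure, so the context builder never needs to know which backfiller it is creating. A stripped-down sketch of that inversion follows; every name below is an illustrative stand-in, not a real ddl package type:

package main

import "fmt"

type workerCtx struct{ id int }

type worker interface{ Name() string }

type indexWorker struct{ ctx *workerCtx }

func (w *indexWorker) Name() string { return fmt.Sprintf("index-worker-%d", w.ctx.id) }

// newWorkerContexts stays agnostic of the worker type: the caller injects
// a factory that turns each per-worker context into a concrete worker.
func newWorkerContexts(n int, build func(*workerCtx) (worker, error)) ([]worker, error) {
	ws := make([]worker, 0, n)
	for i := 0; i < n; i++ {
		w, err := build(&workerCtx{id: i})
		if err != nil {
			return nil, err // one failed worker aborts context construction
		}
		ws = append(ws, w)
	}
	return ws, nil
}

func main() {
	ws, _ := newWorkerContexts(2, func(c *workerCtx) (worker, error) {
		return &indexWorker{ctx: c}, nil
	})
	for _, w := range ws {
		fmt.Println(w.Name())
	}
}

// mockNotOwnerErrOnce is used to make sure `notOwnerErr` mocks the error only once.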
var mockNotOwnerErrOnce uint32 @@ -1405,7 +1482,7 @@ func (w *baseIndexWorker) getNextKey(taskRange reorgBackfillTask, taskDone bool) if !taskDone { // The task is not done. So we need to pick the last processed entry's handle and add one. lastHandle := w.idxRecords[len(w.idxRecords)-1].handle - recordKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), lastHandle) + recordKey := tablecodec.EncodeRecordKey(taskRange.physicalTable.RecordPrefix(), lastHandle) return recordKey.Next() } if taskRange.endInclude { @@ -1481,7 +1558,7 @@ func (w *baseIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBac taskDone = true } - logutil.BgLogger().Debug("[ddl] txn fetches handle info", zap.Uint64("txnStartTS", txn.StartTS()), + logutil.BgLogger().Debug("[ddl] txn fetches handle info", zap.Stringer("worker", w), zap.Uint64("txnStartTS", txn.StartTS()), zap.String("taskRange", taskRange.String()), zap.Duration("takeTime", time.Since(startTime))) return w.idxRecords, w.getNextKey(taskRange, taskDone), taskDone, errors.Trace(err) } @@ -1619,6 +1696,7 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC jobID := handleRange.getJobID() ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType()) errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) (err error) { + taskCtx.finishTS = txn.StartTS() taskCtx.addedCount = 0 taskCtx.scanCount = 0 txn.SetOption(kv.Priority, handleRange.priority) @@ -1744,9 +1822,8 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error { } else { //nolint:forcetypeassert phyTbl := t.(table.PhysicalTable) - // TODO: Support typeAddIndexMergeTmpWorker and partitionTable. - isDistReorg := variable.DDLEnableDistributeReorg.Load() - if isDistReorg && !reorgInfo.mergingTmpIdx { + // TODO: Support typeAddIndexMergeTmpWorker. 
+		if reorgInfo.Job.ReorgMeta.IsDistReorg && !reorgInfo.mergingTmpIdx {
 			sCtx, err := w.sessPool.get()
 			if err != nil {
 				return errors.Trace(err)
 			}
@@ -1809,8 +1886,7 @@ func (w *worker) updateReorgInfo(t table.PartitionedTable, reorg *reorgInfo) (bo
 	err = reorg.UpdateReorgMeta(reorg.StartKey, w.sessPool)
 	logutil.BgLogger().Info("[ddl] job update reorgInfo",
 		zap.Int64("jobID", reorg.Job.ID),
-		zap.ByteString("elementType", reorg.currElement.TypeKey),
-		zap.Int64("elementID", reorg.currElement.ID),
+		zap.Stringer("element", reorg.currElement),
 		zap.Int64("partitionTableID", pid),
 		zap.String("startKey", hex.EncodeToString(reorg.StartKey)),
 		zap.String("endKey", hex.EncodeToString(reorg.EndKey)), zap.Error(err))
@@ -1868,7 +1944,7 @@ type cleanUpIndexWorker struct {
 	baseIndexWorker
 }
 
-func newCleanUpIndexWorker(sessCtx sessionctx.Context, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *cleanUpIndexWorker {
+func newCleanUpIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *cleanUpIndexWorker {
 	indexes := make([]table.Index, 0, len(t.Indices()))
 	rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
 	for _, index := range t.Indices() {
@@ -1878,7 +1954,7 @@ func newCleanUpIndexWorker(sessCtx sessionctx.Context, t table.PhysicalTable, de
 	}
 	return &cleanUpIndexWorker{
 		baseIndexWorker: baseIndexWorker{
-			backfillCtx: newBackfillCtx(reorgInfo.d, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t),
+			backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t),
 			indexes:     indexes,
 			rowDecoder:  rowDecoder,
 			defaultVals: make([]types.Datum, len(t.WritableCols())),
@@ -2001,6 +2077,32 @@ func (w *worker) updateReorgInfoForPartitions(t table.PartitionedTable, reorg *r
 	return false, errors.Trace(err)
 }
 
+func runBackfillJobsWithLightning(d *ddl, bfJob *BackfillJob, jobCtx *JobContext) error {
+	bc, err := ingest.LitBackCtxMgr.Register(d.ctx, bfJob.Meta.IsUnique, bfJob.JobID, bfJob.Meta.SQLMode)
+	if err != nil {
+		logutil.BgLogger().Warn("[ddl] lightning register error", zap.Error(err))
+		return err
+	}
+
+	tbl, err := runBackfillJobs(d, bc, bfJob, jobCtx)
+	if err != nil {
+		logutil.BgLogger().Warn("[ddl] runBackfillJobs error", zap.Error(err))
+		ingest.LitBackCtxMgr.Unregister(bfJob.JobID)
+		return err
+	}
+
+	bc.EngMgr.ResetWorkers(bc, bfJob.JobID, bfJob.EleID)
+	err = bc.FinishImport(bfJob.EleID, bfJob.Meta.IsUnique, tbl)
+	if err != nil {
+		logutil.BgLogger().Warn("[ddl] lightning import error", zap.String("first backfill job", bfJob.AbbrStr()), zap.Error(err))
+		ingest.LitBackCtxMgr.Unregister(bfJob.JobID)
+		return err
+	}
+	// Unregister with the DDL job ID, matching the Register call above.
+	ingest.LitBackCtxMgr.Unregister(bfJob.JobID)
+	bc.SetDone()
+	return nil
+}
+
// changingIndex is used to store the index that needs to be changed during modifying column.
 type changingIndex struct {
 	IndexInfo *model.IndexInfo
diff --git a/ddl/index_merge_tmp.go b/ddl/index_merge_tmp.go
index 302bb6a50a620..6e3e98b04380b 100644
--- a/ddl/index_merge_tmp.go
+++ b/ddl/index_merge_tmp.go
@@ -209,7 +209,7 @@ func (*mergeIndexWorker) String() string {
 	return typeAddIndexMergeTmpWorker.String()
 }
 
-func (*mergeIndexWorker) GetTask() (*BackfillJob, error) {
+func (*mergeIndexWorker) GetTasks() ([]*BackfillJob, error) {
 	panic("[ddl] merge index worker GetTasks function is not implemented")
 }
 
diff --git a/ddl/indexmergetest/merge_test.go b/ddl/indexmergetest/merge_test.go
index 20bc251c77089..f63164bafe871 100644
--- a/ddl/indexmergetest/merge_test.go
+++ b/ddl/indexmergetest/merge_test.go
@@ -469,6 +469,11 @@ func TestAddIndexMergeConflictWithPessimistic(t *testing.T) {
 	tk.MustExec(`CREATE TABLE t (id int primary key, a int);`)
 	tk.MustExec(`INSERT INTO t VALUES (1, 1);`)
 
+	// Shorten the time it takes to transition from ReorgTypeLitMerge to BackfillStateReadyToMerge.
+	interval := ddl.CheckBackfillJobFinishInterval
+	ddl.CheckBackfillJobFinishInterval = 50 * time.Millisecond
+	defer func() { ddl.CheckBackfillJobFinishInterval = interval }()
+
 	// Force onCreateIndex use the txn-merge process.
 	ingest.LitInitialized = false
 	tk.MustExec("set @@global.tidb_ddl_enable_fast_reorg = 1;")
diff --git a/ddl/ingest/engine.go b/ddl/ingest/engine.go
index 0c9409bf7657e..24779ef9d7718 100644
--- a/ddl/ingest/engine.go
+++ b/ddl/ingest/engine.go
@@ -203,7 +203,7 @@ func (ei *engineInfo) newWriterContext(workerID int, unique bool) (*WriterContex
 func (ei *engineInfo) closeWriters() error {
 	var firstErr error
-	for wid := range ei.writerCache.Keys() {
+	for _, wid := range ei.writerCache.Keys() {
 		if w, ok := ei.writerCache.Load(wid); ok {
 			_, err := w.Close(ei.ctx)
 			if err != nil {
diff --git a/ddl/job_table.go b/ddl/job_table.go
index 782abcc8b5765..d6927be673336 100644
--- a/ddl/job_table.go
+++ b/ddl/job_table.go
@@ -26,11 +26,15 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
+	"github.com/pingcap/tidb/ddl/ingest"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/meta"
 	"github.com/pingcap/tidb/metrics"
 	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/table"
+	"github.com/pingcap/tidb/types"
+	tidbutil "github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/dbterror"
 	"github.com/pingcap/tidb/util/logutil"
 	clientv3 "go.etcd.io/etcd/client/v3"
@@ -40,6 +44,7 @@ import (
 
 var (
 	addingDDLJobConcurrent = "/tidb/ddl/add_ddl_job_general"
+	addingBackfillJob      = "/tidb/ddl/add_backfill_job"
 )
 
 func (dc *ddlCtx) insertRunningDDLJobMap(id int64) {
@@ -309,6 +314,118 @@ func (d *ddl) markJobProcessing(sess *session, job *model.Job) error {
 	return errors.Trace(err)
 }
 
+func (d *ddl) startDispatchBackfillJobsLoop() {
+	d.backfillCtx.jobCtxMap = make(map[int64]*JobContext)
+
+	var notifyBackfillJobByEtcdCh clientv3.WatchChan
+	if d.etcdCli != nil {
+		notifyBackfillJobByEtcdCh = d.etcdCli.Watch(d.ctx, addingBackfillJob)
+	}
+	ticker := time.NewTicker(500 * time.Millisecond)
+	defer ticker.Stop()
+	for {
+		if isChanClosed(d.ctx.Done()) {
+			return
+		}
+		select {
+		case <-d.backfillJobCh:
+		case <-ticker.C:
+		case _, ok := <-notifyBackfillJobByEtcdCh:
+			if !ok {
+				logutil.BgLogger().Warn("[ddl] start backfill worker watch channel closed", zap.String("watch key", addingBackfillJob))
+				notifyBackfillJobByEtcdCh = d.etcdCli.Watch(d.ctx, addingBackfillJob)
+				time.Sleep(time.Second)
+				continue
+			}
+		case <-d.ctx.Done():
+			return
+		}
+		d.loadBackfillJobAndRun()
+	}
+}
+
+func (d *ddl) getTableByTxn(store kv.Storage, schemaID, tableID int64) (*model.DBInfo, table.Table, error) {
+	var tbl table.Table
+	var dbInfo *model.DBInfo
+	err := kv.RunInNewTxn(d.ctx, store, false, func(ctx context.Context, txn kv.Transaction) error {
+		t := meta.NewMeta(txn)
+		var err1 error
+		dbInfo, err1 = t.GetDatabase(schemaID)
+		if err1 != nil {
+			return errors.Trace(err1)
+		}
+		tblInfo, err1 := getTableInfo(t, tableID, schemaID)
+		if err1 != nil {
+			return errors.Trace(err1)
+		}
+		tbl, err1 = getTable(store, schemaID, tblInfo)
+		return errors.Trace(err1)
+	})
+	return dbInfo, tbl, err
+}
+
+func (d *ddl) loadBackfillJobAndRun() {
+	isDistReorg := variable.DDLEnableDistributeReorg.Load()
+	if !isDistReorg {
+		return
+	}
+	se, err := d.sessPool.get()
+	if err != nil {
+		logutil.BgLogger().Fatal("dispatch backfill jobs loop failed to get a session; this should not happen, please try restarting TiDB", zap.Error(err))
+	}
+	defer d.sessPool.put(se)
+	sess := newSession(se)
+
+	runningJobIDs := d.backfillCtxJobIDs()
+	if len(runningJobIDs) >= reorgWorkerCnt {
+		return
+	}
+
+	// TODO: Add ele info to distinguish backfill jobs.
+	// Get a backfill job to obtain the reorg info, such as the element info, the schema ID and so on.
+	bfJob, err := GetBackfillJobForOneEle(sess, runningJobIDs, InstanceLease)
+	if err != nil || bfJob == nil {
+		if err != nil {
+			logutil.BgLogger().Warn("[ddl] get backfill jobs failed in this instance", zap.Error(err))
+		} else {
+			logutil.BgLogger().Debug("[ddl] get no backfill job in this instance")
+		}
+		return
+	}
+
+	jobCtx, existent := d.setBackfillCtxJobContext(bfJob.JobID, bfJob.Meta.Query, bfJob.Meta.Type)
+	if existent {
+		logutil.BgLogger().Warn("[ddl] the backfill job for this DDL job is already running in this instance", zap.String("backfill job", bfJob.AbbrStr()))
+		return
+	}
+	// TODO: Adjust how the non-owner uses ReorgCtx.
+	d.setReorgCtxForBackfill(bfJob)
+	d.wg.Run(func() {
+		defer func() {
+			d.removeBackfillCtxJobCtx(bfJob.JobID)
+			tidbutil.Recover(metrics.LabelBackfillWorker, "runBackfillJobs", nil, false)
+		}()
+
+		if bfJob.Meta.ReorgTp == model.ReorgTypeLitMerge {
+			if !ingest.LitInitialized {
+				logutil.BgLogger().Warn("[ddl] we can't do ingest in this instance",
+					zap.Bool("LitInitialized", ingest.LitInitialized), zap.String("bfJob", bfJob.AbbrStr()))
+				return
+			}
+			logutil.BgLogger().Info("[ddl] run backfill jobs with ingest in this instance", zap.String("bfJob", bfJob.AbbrStr()))
+			err = runBackfillJobsWithLightning(d, bfJob, jobCtx)
+		} else {
+			logutil.BgLogger().Info("[ddl] run backfill jobs with txn-merge in this instance", zap.String("bfJob", bfJob.AbbrStr()))
+			_, err = runBackfillJobs(d, nil, bfJob, jobCtx)
+		}
+
+		if err == nil {
+			err = syncBackfillHistoryJobs(sess, d.uuid, bfJob)
+		}
+		logutil.BgLogger().Info("[ddl] run backfill jobs finished in this instance", zap.Stringer("reorg type", bfJob.Meta.ReorgTp), zap.Error(err))
+	})
+}
+
 const (
 	addDDLJobSQL    = "insert into mysql.tidb_ddl_job(job_id, reorg, schema_ids, table_ids, job_meta, type, processing) values"
 	updateDDLJobSQL = "update mysql.tidb_ddl_job set job_meta = %s where job_id = %d"
@@ -398,7 +515,7 @@ func updateDDLJob2Table(sctx *session, job *model.Job, updateRawArgs bool) error
 // getDDLReorgHandle gets DDL reorg handle.
 func getDDLReorgHandle(sess *session, job *model.Job) (element *meta.Element, startKey, endKey kv.Key, physicalTableID int64, err error) {
 	sql := fmt.Sprintf("select ele_id, ele_type, start_key, end_key, physical_id from mysql.tidb_ddl_reorg where job_id = %d", job.ID)
-	ctx := kv.WithInternalSourceType(context.Background(), getDDLRequestSource(job))
+	ctx := kv.WithInternalSourceType(context.Background(), getDDLRequestSource(job.Type))
 	rows, err := sess.execute(ctx, sql, "get_handle")
 	if err != nil {
 		return nil, nil, nil, 0, err
@@ -511,26 +628,34 @@ func getJobsBySQL(sess *session, tbl, condition string) ([]*model.Job, error) {
 	return jobs, nil
 }
 
+func syncBackfillHistoryJobs(sess *session, uuid string, backfillJob *BackfillJob) error {
+	sql := fmt.Sprintf("update mysql.%s set state = %d where ddl_job_id = %d and ele_id = %d and ele_key = %s and exec_id = '%s' limit 1;",
+		BackfillHistoryTable, model.JobStateSynced, backfillJob.JobID, backfillJob.EleID, wrapKey2String(backfillJob.EleKey), uuid)
+	_, err := sess.execute(context.Background(), sql, "sync_backfill_history_job")
+	return err
+}
+
 func generateInsertBackfillJobSQL(tableName string, backfillJobs []*BackfillJob) (string, error) {
-	sqlPrefix := fmt.Sprintf("insert into mysql.%s(id, ddl_job_id, ele_id, ele_key, store_id, type, exec_id, exec_lease, state, curr_key, start_key, end_key, start_ts, finish_ts, row_count, backfill_meta) values", tableName)
-	var sql string
+	sqlBuilder := strings.Builder{}
+	sqlBuilder.WriteString("insert into mysql.")
+	sqlBuilder.WriteString(tableName)
+	sqlBuilder.WriteString("(id, ddl_job_id, ele_id, ele_key, store_id, type, exec_id, exec_lease, state, curr_key, start_key, end_key, start_ts, finish_ts, row_count, backfill_meta) values")
+	jobs := ""
 	for i, bj := range backfillJobs {
 		mateByte, err := bj.Meta.Encode()
 		if err != nil {
 			return "", errors.Trace(err)
 		}
-		if i == 0 {
-			sql = sqlPrefix + fmt.Sprintf("(%d, %d, %d, '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', %d, %d, %d, '%s')",
-				bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State,
-				bj.CurrKey, bj.StartKey, bj.EndKey, bj.StartTS, bj.FinishTS, bj.RowCount, mateByte)
-			continue
+		if i != 0 {
+			sqlBuilder.WriteString(", ")
 		}
-		sql += fmt.Sprintf(", (%d, %d, %d, '%s', %d, %d, '%s', '%s', %d, '%s', '%s', '%s', %d, %d, %d, '%s')",
-			bj.ID, bj.JobID, bj.EleID, bj.EleKey, bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State,
-			bj.CurrKey, bj.StartKey, bj.EndKey, bj.StartTS, bj.FinishTS, bj.RowCount, mateByte)
+		sqlBuilder.WriteString(fmt.Sprintf("(%d, %d, %d, %s, %d, %d, '%s', '%s', %d, %s, %s, %s, %d, %d, %d, %s)",
+			bj.ID, bj.JobID, bj.EleID, wrapKey2String(bj.EleKey), bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State, wrapKey2String(bj.CurrKey),
+			wrapKey2String(bj.StartKey), wrapKey2String(bj.EndKey), bj.StartTS, bj.FinishTS, bj.RowCount, wrapKey2String(mateByte)))
+		jobs += fmt.Sprintf("job:%#v; ", bj.AbbrStr())
 	}
-	return sql, nil
+	return sqlBuilder.String(), nil
 }
 
 // AddBackfillHistoryJob adds the backfill jobs to the tidb_ddl_backfill_history table.
@@ -567,8 +692,8 @@ func AddBackfillJobs(s *session, backfillJobs []*BackfillJob) error {
 	})
 }
 
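The generateInsertBackfillJobSQL rewrite above swaps repeated string concatenation for a strings.Builder, which appends into one growing buffer instead of re-copying the whole statement on every row. A freestanding sketch of the same multi-row VALUES construction, using a simplified stand-in row type:

package main

import (
	"fmt"
	"strings"
)

type row struct {
	id    int64
	jobID int64
	key   []byte
}

// buildBatchInsert renders one multi-row INSERT. A strings.Builder is
// amortized O(total length), while naive s += ... concatenation re-copies
// the entire statement on every appended row.
func buildBatchInsert(table string, rows []row) string {
	var b strings.Builder
	b.WriteString("insert into mysql.")
	b.WriteString(table)
	b.WriteString("(id, ddl_job_id, curr_key) values")
	for i, r := range rows {
		if i != 0 {
			b.WriteString(", ")
		}
		fmt.Fprintf(&b, "(%d, %d, 0x%x)", r.id, r.jobID, r.key)
	}
	return b.String()
}

func main() {
	fmt.Println(buildBatchInsert("tidb_ddl_backfill", []row{
		{id: 0, jobID: 1, key: []byte{0x74}},
		{id: 1, jobID: 1, key: []byte{0x75}},
	}))
}

-// GetBackfillJobsForOneEle batch gets the backfill jobs in the tblName table that contains only one element.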
-func GetBackfillJobsForOneEle(s *session, batch int, excludedJobIDs []int64, lease time.Duration) ([]*BackfillJob, error) { +// GetBackfillJobForOneEle gets the backfill jobs in the tblName table that contains only one element. +func GetBackfillJobForOneEle(s *session, excludedJobIDs []int64, lease time.Duration) (*BackfillJob, error) { eJobIDsBuilder := strings.Builder{} for i, id := range excludedJobIDs { if i == 0 { @@ -589,26 +714,18 @@ func GetBackfillJobsForOneEle(s *session, batch int, excludedJobIDs []int64, lea if err != nil { return err } + leaseStr := currTime.Add(-lease).Format(types.TimeFormat) bJobs, err = GetBackfillJobs(se, BackfillTable, - fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') %s order by ddl_job_id, ele_key, ele_id limit %d", - currTime.Add(-lease), eJobIDsBuilder.String(), batch), "get_backfill_job") + fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') %s order by ddl_job_id, ele_key, ele_id limit 1", + leaseStr, eJobIDsBuilder.String()), "get_backfill_job") return err }) if err != nil || len(bJobs) == 0 { return nil, err } - validLen := 1 - firstJobID, firstEleID, firstEleKey := bJobs[0].JobID, bJobs[0].EleID, bJobs[0].EleKey - for i := 1; i < len(bJobs); i++ { - if bJobs[i].JobID != firstJobID || bJobs[i].EleID != firstEleID || !bytes.Equal(bJobs[i].EleKey, firstEleKey) { - break - } - validLen++ - } - - return bJobs[:validLen], nil + return bJobs[0], nil } // GetAndMarkBackfillJobsForOneEle batch gets the backfill jobs in the tblName table that contains only one element, @@ -621,10 +738,11 @@ func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid st if err != nil { return err } + leaseStr := currTime.Add(-lease).Format(types.TimeFormat) bJobs, err = GetBackfillJobs(se, BackfillTable, fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') and ddl_job_id = %d order by ddl_job_id, ele_key, ele_id limit %d", - currTime.Add(-lease), jobID, batch), "get_mark_backfill_job") + leaseStr, jobID, batch), "get_mark_backfill_job") if err != nil { return err } @@ -656,10 +774,10 @@ func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid st return bJobs[:validLen], err } -// GetInterruptedBackfillJobsForOneEle gets the interrupted backfill jobs in the tblName table that contains only one element. -func GetInterruptedBackfillJobsForOneEle(sess *session, jobID, eleID int64, eleKey []byte) ([]*BackfillJob, error) { - bJobs, err := GetBackfillJobs(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s' and (state = %d or state = %d)", - jobID, eleID, eleKey, model.JobStateRollingback, model.JobStateCancelling), "get_interrupt_backfill_job") +// GetInterruptedBackfillJobForOneEle gets an interrupted backfill job that contains only one element. +func GetInterruptedBackfillJobForOneEle(sess *session, jobID, eleID int64, eleKey []byte) ([]*BackfillJob, error) { + bJobs, err := GetBackfillJobs(sess, BackfillHistoryTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s and state = %d limit 1", + jobID, eleID, wrapKey2String(eleKey), model.JobStateCancelled), "get_interrupt_backfill_job") if err != nil || len(bJobs) == 0 { return nil, err } @@ -679,9 +797,32 @@ func GetBackfillJobCount(sess *session, tblName, condition string, label string) return int(rows[0].GetInt64(0)), nil } +// GetBackfillMetas gets the backfill metas in the tblName table according to condition. 
+func GetBackfillMetas(sess *session, tblName, condition string, label string) ([]*model.BackfillMeta, error) { + rows, err := sess.execute(context.Background(), fmt.Sprintf("select backfill_meta from mysql.%s where %s", tblName, condition), label) + if err != nil { + return nil, errors.Trace(err) + } + if len(rows) == 0 { + return nil, dbterror.ErrDDLJobNotFound.FastGenByArgs(fmt.Sprintf("get wrong result cnt:%d", len(rows))) + } + + metas := make([]*model.BackfillMeta, 0, len(rows)) + for _, r := range rows { + meta := &model.BackfillMeta{} + err = meta.Decode(r.GetBytes(0)) + if err != nil { + return nil, errors.Trace(err) + } + metas = append(metas, meta) + } + + return metas, nil +} + func getUnsyncedInstanceIDs(sess *session, jobID int64, label string) ([]string, error) { - sql := fmt.Sprintf("select sum(state = %d) as tmp, exec_id from mysql.tidb_ddl_backfill_history where ddl_job_id = %d group by exec_id having tmp = 0;", - model.JobStateSynced, jobID) + sql := fmt.Sprintf("select sum((state=%d) + (state=%d)) as tmp, exec_id from mysql.tidb_ddl_backfill_history where ddl_job_id = %d group by exec_id having tmp = 0;", + model.JobStateSynced, model.JobStateCancelled, jobID) rows, err := sess.execute(context.Background(), sql, label) if err != nil { return nil, errors.Trace(err) @@ -732,8 +873,8 @@ func GetBackfillJobs(sess *session, tblName, condition string, label string) ([] // RemoveBackfillJob removes the backfill jobs from the tidb_ddl_backfill table. // If isOneEle is true, removes all jobs with backfillJob's ddl_job_id, ele_id and ele_key. Otherwise, removes the backfillJob. func RemoveBackfillJob(sess *session, isOneEle bool, backfillJob *BackfillJob) error { - sql := fmt.Sprintf("delete from mysql.tidb_ddl_backfill where ddl_job_id = %d and ele_id = %d and ele_key = '%s'", - backfillJob.JobID, backfillJob.EleID, backfillJob.EleKey) + sql := fmt.Sprintf("delete from mysql.tidb_ddl_backfill where ddl_job_id = %d and ele_id = %d and ele_key = %s", + backfillJob.JobID, backfillJob.EleID, wrapKey2String(backfillJob.EleKey)) if !isOneEle { sql += fmt.Sprintf(" and id = %d", backfillJob.ID) } @@ -746,8 +887,9 @@ func updateBackfillJob(sess *session, tableName string, backfillJob *BackfillJob if err != nil { return err } - sql := fmt.Sprintf("update mysql.%s set exec_id = '%s', exec_lease = '%s', state = %d, backfill_meta = '%s' where ddl_job_id = %d and ele_id = %d and ele_key = '%s' and id = %d", - tableName, backfillJob.InstanceID, backfillJob.InstanceLease, backfillJob.State, mate, backfillJob.JobID, backfillJob.EleID, backfillJob.EleKey, backfillJob.ID) + sql := fmt.Sprintf("update mysql.%s set exec_id = '%s', exec_lease = '%s', state = %d, curr_key = %s, row_count = %d, backfill_meta = %s where ddl_job_id = %d and ele_id = %d and ele_key = %s and id = %d", + tableName, backfillJob.InstanceID, backfillJob.InstanceLease, backfillJob.State, wrapKey2String(backfillJob.CurrKey), backfillJob.RowCount, + wrapKey2String(mate), backfillJob.JobID, backfillJob.EleID, wrapKey2String(backfillJob.EleKey), backfillJob.ID) _, err = sess.execute(context.Background(), sql, label) return err } diff --git a/ddl/job_table_test.go b/ddl/job_table_test.go index 9f1150241bbd1..38515f71f07d2 100644 --- a/ddl/job_table_test.go +++ b/ddl/job_table_test.go @@ -17,6 +17,8 @@ package ddl_test import ( "context" "fmt" + "strconv" + "strings" "sync" "testing" "time" @@ -26,7 +28,9 @@ import ( "github.com/pingcap/tidb/ddl/internal/callback" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/model" 
+ "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessiontxn" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/types" @@ -196,15 +200,16 @@ func makeAddIdxBackfillJobs(schemaID, tblID, jobID, eleID int64, cnt int, query }, } bj := &ddl.BackfillJob{ - ID: int64(i), - JobID: jobID, - EleID: eleID, - EleKey: meta.IndexElementKey, - State: model.JobStateNone, - CurrKey: sKey, - StartKey: sKey, - EndKey: eKey, - Meta: bm, + ID: int64(i), + JobID: jobID, + EleID: eleID, + EleKey: meta.IndexElementKey, + State: model.JobStateNone, + InstanceLease: types.ZeroTimestamp, + CurrKey: sKey, + StartKey: sKey, + EndKey: eKey, + Meta: bm, } bJobs = append(bJobs, bj) } @@ -224,8 +229,8 @@ func equalBackfillJob(t *testing.T, a, b *ddl.BackfillJob, lessTime types.Time) } func getIdxConditionStr(jobID, eleID int64) string { - return fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'", - jobID, eleID, meta.IndexElementKey) + return fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s", + jobID, eleID, wrapKey2String(meta.IndexElementKey)) } func readInTxn(se sessionctx.Context, f func(sessionctx.Context)) (err error) { @@ -253,27 +258,30 @@ func TestSimpleExecBackfillJobs(t *testing.T) { uuid := d.GetID() eleKey := meta.IndexElementKey instanceLease := ddl.InstanceLease + // test no backfill job - bJobs, err := ddl.GetBackfillJobsForOneEle(se, 1, []int64{jobID1, jobID2}, instanceLease) + bJob, err := ddl.GetBackfillJobForOneEle(se, []int64{jobID1, jobID2}, instanceLease) require.NoError(t, err) - require.Nil(t, bJobs) - bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID1, uuid, instanceLease) + require.Nil(t, bJob) + bJobs, err := ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID1, uuid, instanceLease) require.EqualError(t, err, dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job").Error()) require.Nil(t, bJobs) - allCnt, err := ddl.GetBackfillJobCount(se, ddl.BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'", - jobID1, eleID2, meta.IndexElementKey), "check_backfill_job_count") + allCnt, err := ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID2), "check_backfill_job_count") require.NoError(t, err) require.Equal(t, allCnt, 0) // Test some backfill jobs, add backfill jobs to the table. cnt := 2 bjTestCases := make([]*ddl.BackfillJob, 0, cnt*3) - bJobs1 := makeAddIdxBackfillJobs(1, 2, jobID1, eleID1, cnt, "alter table add index idx(a)") - bJobs2 := makeAddIdxBackfillJobs(1, 2, jobID2, eleID2, cnt, "alter table add index idx(b)") - bJobs3 := makeAddIdxBackfillJobs(1, 2, jobID2, eleID3, cnt, "alter table add index idx(c)") + bJobs1 := makeAddIdxBackfillJobs(1, 2, jobID1, eleID1, cnt, "alter table t add index idx(a)") + bJobs2 := makeAddIdxBackfillJobs(1, 2, jobID2, eleID2, cnt, "alter table t add index idx(b)") + bJobs3 := makeAddIdxBackfillJobs(1, 2, jobID2, eleID3, cnt, "alter table t add index idx(c)") bjTestCases = append(bjTestCases, bJobs1...) bjTestCases = append(bjTestCases, bJobs2...) bjTestCases = append(bjTestCases, bJobs3...) 
err = ddl.AddBackfillJobs(se, bjTestCases) + require.Equal(t, err.Error(), "[table:1292]Incorrect timestamp value: '0000-00-00 00:00:00' for column 'exec_lease' at row 1") + tk.Session().GetSessionVars().SQLMode = mysql.ModeNone + err = ddl.AddBackfillJobs(se, bjTestCases) // ID jobID eleID InstanceID // ------------------------------------- // 0 jobID1 eleID1 uuid @@ -284,14 +292,13 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 1 jobID2 eleID3 "" require.NoError(t, err) // test get some backfill jobs - bJobs, err = ddl.GetBackfillJobsForOneEle(se, 1, []int64{jobID2 - 1, jobID2 + 1}, instanceLease) + bJob, err = ddl.GetBackfillJobForOneEle(se, []int64{jobID2 - 1, jobID2 + 1}, instanceLease) require.NoError(t, err) - require.Len(t, bJobs, 1) expectJob := bjTestCases[2] - if expectJob.ID != bJobs[0].ID { + if expectJob.ID != bJob.ID { expectJob = bjTestCases[3] } - require.Equal(t, expectJob, bJobs[0]) + require.Equal(t, expectJob, bJob) previousTime, err := ddl.GetOracleTimeWithStartTS(se) require.EqualError(t, err, "[kv:8024]invalid transaction") readInTxn(se, func(sessionctx.Context) { @@ -373,13 +380,10 @@ func TestSimpleExecBackfillJobs(t *testing.T) { require.Len(t, bJobs, 1) require.Equal(t, bJobs[0].FinishTS, uint64(0)) - // test GetMaxBackfillJob and GetInterruptedBackfillJobsForOneEle + // test GetMaxBackfillJob bjob, err := ddl.GetMaxBackfillJob(se, bJobs3[0].JobID, bJobs3[0].EleID, eleKey) require.NoError(t, err) require.Nil(t, bjob) - bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, eleKey) - require.NoError(t, err) - require.Nil(t, bJobs) err = ddl.AddBackfillJobs(se, bjTestCases) require.NoError(t, err) // ID jobID eleID @@ -393,15 +397,12 @@ func TestSimpleExecBackfillJobs(t *testing.T) { bjob, err = ddl.GetMaxBackfillJob(se, jobID2, eleID2, eleKey) require.NoError(t, err) require.Equal(t, bJobs2[1], bjob) - bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, eleKey) - require.NoError(t, err) - require.Nil(t, bJobs) bJobs1[0].State = model.JobStateRollingback bJobs1[0].ID = 2 bJobs1[0].InstanceID = uuid - bJobs1[1].State = model.JobStateCancelling + bJobs1[1].State = model.JobStateCancelled bJobs1[1].ID = 3 - bJobs1[1].Meta.ErrMsg = "errMsg" + bJobs1[1].Meta.Error = dbterror.ErrCancelledDDLJob err = ddl.AddBackfillJobs(se, bJobs1) require.NoError(t, err) // ID jobID eleID state @@ -413,20 +414,25 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 0 jobID2 eleID3 JobStateNone // 1 jobID2 eleID3 JobStateNone // 2 jobID1 eleID1 JobStateRollingback - // 3 jobID1 eleID1 JobStateCancelling + // 3 jobID1 eleID1 JobStateCancelled bjob, err = ddl.GetMaxBackfillJob(se, jobID1, eleID1, eleKey) require.NoError(t, err) require.Equal(t, bJobs1[1], bjob) - bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, eleKey) - require.NoError(t, err) - require.Len(t, bJobs, 2) - equalBackfillJob(t, bJobs1[0], bJobs[0], types.ZeroTime) - equalBackfillJob(t, bJobs1[1], bJobs[1], types.ZeroTime) // test the BackfillJob's AbbrStr require.Equal(t, fmt.Sprintf("ID:2, JobID:1, EleID:11, Type:add index, State:rollingback, InstanceID:%s, InstanceLease:0000-00-00 00:00:00", uuid), bJobs1[0].AbbrStr()) - require.Equal(t, "ID:3, JobID:1, EleID:11, Type:add index, State:cancelling, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs1[1].AbbrStr()) + require.Equal(t, "ID:3, JobID:1, EleID:11, Type:add index, State:cancelled, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs1[1].AbbrStr()) require.Equal(t, "ID:0, JobID:2, EleID:33, 
Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs3[0].AbbrStr()) require.Equal(t, "ID:1, JobID:2, EleID:33, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs3[1].AbbrStr()) + // test select tidb_ddl_backfill + tk.MustQuery(fmt.Sprintf("select exec_id, exec_lease from mysql.tidb_ddl_backfill where id = %d and %s", bJobs1[0].ID, getIdxConditionStr(jobID1, eleID1))). + Check(testkit.Rows(fmt.Sprintf("%s 0000-00-00 00:00:00", uuid))) + tk.MustQuery(fmt.Sprintf("select exec_id, exec_lease from mysql.tidb_ddl_backfill where id = %d and %s", bJobs1[1].ID, getIdxConditionStr(jobID1, eleID1))). + Check(testkit.Rows(" 0000-00-00 00:00:00")) + // test GetBackfillMetas + bfErr := ddl.GetBackfillErr(se, jobID1, eleID1, eleKey) + require.Error(t, bfErr, dbterror.ErrCancelledDDLJob) + bfErr = ddl.GetBackfillErr(se, jobID2, eleID2, eleKey) + require.NoError(t, bfErr) bJobs1[0].State = model.JobStateNone bJobs1[0].ID = 5 @@ -443,7 +449,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 0 jobID2 eleID3 JobStateNone // 1 jobID2 eleID3 JobStateNone // 2 jobID1 eleID1 JobStateRollingback - // 3 jobID1 eleID1 JobStateCancelling + // 3 jobID1 eleID1 JobStateCancelled // // BackfillHistoryTable // ID jobID eleID state @@ -466,7 +472,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 0 jobID2 eleID3 JobStateNone // 1 jobID2 eleID3 JobStateNone // 2 jobID1 eleID1 JobStateRollingback - // 3 jobID1 eleID1 JobStateCancelling + // 3 jobID1 eleID1 JobStateCancelled // 6 jobID1 eleID1 JobStateNone // 7 jobID1 eleID1 JobStateNone // @@ -479,7 +485,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { require.NoError(t, err) require.Equal(t, bJobs1[1], bjob) - // test MoveBackfillJobsToHistoryTable + // test MoveBackfillJobsToHistoryTable and GetInterruptedBackfillJobForOneEle allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 2) @@ -499,7 +505,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 0 jobID2 eleID2 JobStateNone // 1 jobID2 eleID2 JobStateNone // 2 jobID1 eleID1 JobStateRollingback - // 3 jobID1 eleID1 JobStateCancelling + // 3 jobID1 eleID1 JobStateCancelled // 6 jobID1 eleID1 JobStateNone // 7 jobID1 eleID1 JobStateNone // @@ -508,8 +514,11 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // -------------------------------- // 5 jobID1 eleID1 JobStateNone // 4 jobID1 eleID1 JobStateNone - // 0 jobID2 eleID3 JobStateNone - // 1 jobID2 eleID3 JobStateNone + // 0 jobID2 eleID3 JobStateCancelled + // 1 jobID2 eleID3 JobStateCancelled + bJobs, err = ddl.GetInterruptedBackfillJobForOneEle(se, jobID1, eleID1, eleKey) + require.NoError(t, err) + require.Len(t, bJobs, 0) allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 6) @@ -521,6 +530,15 @@ func TestSimpleExecBackfillJobs(t *testing.T) { allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillHistoryTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 8) + bJobs, err = ddl.GetInterruptedBackfillJobForOneEle(se, jobID2, eleID3, eleKey) + require.NoError(t, err) + require.Len(t, bJobs, 1) + expectJob = bJobs3[0] + if expectJob.ID != bJob.ID { + expectJob = bJobs3[1] + } + expectJob.State = model.JobStateCancelled + equalBackfillJob(t, bJobs3[0], bJobs[0], types.ZeroTimestamp) // BackfillTable // ID jobID eleID state // 
-------------------------------- @@ -532,12 +550,103 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // -------------------------------- // 5 jobID1 eleID1 JobStateNone // 4 jobID1 eleID1 JobStateNone - // 0 jobID2 eleID3 JobStateNone - // 1 jobID2 eleID3 JobStateNone - // 0 jobID1 eleID1 JobStateNone - // 1 jobID1 eleID1 JobStateNone - // 2 jobID1 eleID1 JobStateRollingback - // 3 jobID1 eleID1 JobStateCancelling - // 6 jobID1 eleID1 JobStateNone - // 7 jobID1 eleID1 JobStateNone + // 0 jobID2 eleID3 JobStateCancelled + // 1 jobID2 eleID3 JobStateCancelled + // 0 jobID1 eleID1 JobStateCancelled + // 1 jobID1 eleID1 JobStateCancelled + // 2 jobID1 eleID1 JobStateCancelled + // 3 jobID1 eleID1 JobStateCancelled + // 6 jobID1 eleID1 JobStateCancelled + // 7 jobID1 eleID1 JobStateCancelled +} + +func TestGetTasks(t *testing.T) { + // TODO: update the variable of `enableDistReorg` + isDistReorg := variable.DDLEnableDistributeReorg.Load() + variable.DDLEnableDistributeReorg.Store(false) + defer func() { variable.DDLEnableDistributeReorg.Store(isDistReorg) }() + + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + se := ddl.NewSession(tk.Session()) + se.GetSessionVars().SQLMode = mysql.ModeNone + d := dom.DDL() + + jobID1 := int64(1) + eleID1 := int64(11) + uuid := d.GetID() + cnt := 3 + instanceLease := ddl.InstanceLease + bJobsTestCases := makeAddIdxBackfillJobs(1, 2, jobID1, eleID1, cnt, "alter table t add index idx(a)") + err := ddl.AddBackfillJobs(se, bJobsTestCases) + require.NoError(t, err) + + var wg util.WaitGroupWrapper + // Mock GetAndMarkBackfillJobsForOneEle gets a writing conflict error. + // Step 1: se1 begins txn1. + // Step 2: se2 begins txn2. + // Step 3: execute txn1 and txn2, then txn1 or txn2 returns a writing conflict error. + var err1 error + ch := make(chan struct{}, 1) + wg.Run(func() { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/NotifyBeginTxnCh", `return(1)`)) + ch <- struct{}{} + var bJobs []*ddl.BackfillJob + bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID1, uuid, instanceLease) + require.Len(t, bJobs, 1) + }) + <-ch + defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/NotifyBeginTxnCh")) }() + wg.Run(func() { + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + se1 := ddl.NewSession(tk1.Session()) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/NotifyBeginTxnCh", `return(2)`)) + var bJobs1 []*ddl.BackfillJob + bJobs1, err1 = ddl.GetAndMarkBackfillJobsForOneEle(se1, 1, jobID1, uuid, instanceLease) + require.Len(t, bJobs1, 1) + }) + wg.Wait() + if err == nil { + require.NotNil(t, err1) + require.True(t, strings.Contains(err1.Error(), "[kv:9007]Write conflict")) + } else { + require.Nil(t, err1) + require.True(t, strings.Contains(err.Error(), "[kv:9007]Write conflict")) + } + + // get tbl + tk.MustExec("create table t(a int, b int)") + var tableID int64 + rs := tk.MustQuery("select TIDB_TABLE_ID from information_schema.tables where table_name='t' and table_schema='test';") + tableIDi, err := strconv.Atoi(rs.Rows()[0][0].(string)) + require.Nil(t, err) + tableID = int64(tableIDi) + tbl := testGetTable(t, dom, tableID) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/NotifyBeginTxnCh", `return(0)`)) + // Mock GetAndMarkBackfillJobsForOneEle gets a writing conflict error, but getTasks is successful. + // Step 1: se1 begins txn1. + // Step 2: se2 begins txn2. 
+	// Step 3: execute txn1 and txn2, then txn1 or txn2 returns a write conflict error.
+	// Step 4: se2 begins txn3.
+	// Step 5: getTasks(txn3) executes successfully.
+	wg.Run(func() {
+		require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/NotifyBeginTxnCh", `return(1)`))
+		ch <- struct{}{}
+		bJobs, err := ddl.GetTasks(ddl.GetDDLCtx(d), se, tbl, jobID1, 1)
+		require.Nil(t, err)
+		require.Len(t, bJobs, 1)
+	})
+	<-ch
+	wg.Run(func() {
+		tk1 := testkit.NewTestKit(t, store)
+		tk1.MustExec("use test")
+		se1 := ddl.NewSession(tk1.Session())
+		require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/NotifyBeginTxnCh", `return(2)`))
+		bJobs1, err1 := ddl.GetTasks(ddl.GetDDLCtx(d), se1, tbl, jobID1, 1)
+		require.Nil(t, err1)
+		require.Len(t, bJobs1, 1)
+	})
+	wg.Wait()
+}
diff --git a/ddl/main_test.go b/ddl/main_test.go
index a10374b04f0f2..6db3ace76e6a6 100644
--- a/ddl/main_test.go
+++ b/ddl/main_test.go
@@ -40,6 +40,8 @@ func TestMain(m *testing.M) {
 	autoid.SetStep(5000)
 	ddl.ReorgWaitTimeout = 30 * time.Millisecond
+	ddl.RetrySQLInterval = 30 * time.Millisecond
+	ddl.CheckBackfillJobFinishInterval = 50 * time.Millisecond
 	ddl.RunInGoTest = true
 	ddl.SetBatchInsertDeleteRangeSize(2)
diff --git a/ddl/modify_column_test.go b/ddl/modify_column_test.go
index 583c0a435b4ec..7aa23d526bc65 100644
--- a/ddl/modify_column_test.go
+++ b/ddl/modify_column_test.go
@@ -54,8 +54,11 @@ func TestModifyColumnReorgInfo(t *testing.T) {
 	originalTimeout := ddl.ReorgWaitTimeout
 	ddl.ReorgWaitTimeout = 10 * time.Millisecond
+	limit := variable.GetDDLErrorCountLimit()
+	variable.SetDDLErrorCountLimit(5)
 	defer func() {
 		ddl.ReorgWaitTimeout = originalTimeout
+		variable.SetDDLErrorCountLimit(limit)
 	}()
 	tk := testkit.NewTestKit(t, store)
 	tk.MustExec("use test")
@@ -160,10 +163,16 @@ func TestModifyColumnReorgInfo(t *testing.T) {
 	// Test encountering a "notOwnerErr" error which caused the processing backfill job to exit halfway.
 	// During the period, the old TiDB version(do not exist the element information) is upgraded to the new TiDB version.
 	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/MockGetIndexRecordErr", `return("addIdxNotOwnerErr")`))
-	tk.MustExec("alter table t1 add index idx2(c1)")
-	expectedElements = []*meta.Element{
-		{ID: 7, TypeKey: meta.IndexElementKey}}
-	checkReorgHandle(elements, expectedElements)
+	// TODO: Remove this check once runReorgJobAndHandleErr returns a non-nil "err".
+	if variable.DDLEnableDistributeReorg.Load() {
+		err = tk.ExecToErr("alter table t1 add index idx2(c1)")
+		require.EqualError(t, err, "[ddl:8201]TiDB server is not a DDL owner")
+	} else {
+		tk.MustExec("alter table t1 add index idx2(c1)")
+		expectedElements = []*meta.Element{
+			{ID: 7, TypeKey: meta.IndexElementKey}}
+		checkReorgHandle(elements, expectedElements)
+	}
 	tk.MustExec("admin check table t1")
 	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/MockGetIndexRecordErr"))
 }
diff --git a/ddl/reorg.go b/ddl/reorg.go
index e760e43c11221..3eaa8080f1f73 100644
--- a/ddl/reorg.go
+++ b/ddl/reorg.go
@@ -204,7 +204,7 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
 		if job.IsCancelling() {
 			return dbterror.ErrCancelledDDLJob
 		}
-		rc = w.newReorgCtx(reorgInfo)
+		rc = w.newReorgCtx(reorgInfo.Job.ID, reorgInfo.StartKey, reorgInfo.currElement, reorgInfo.Job.GetRowCount())
 		w.wg.Add(1)
 		go func() {
 			defer w.wg.Done()
@@ -338,7 +338,7 @@ func getTableTotalCount(w *worker, tblInfo *model.TableInfo) int64 {
 	return rows[0].GetInt64(0)
 }

-func (dc *ddlCtx) isReorgRunnable(jobID int64) error {
+func (dc *ddlCtx) isReorgRunnable(jobID int64, isDistReorg bool) error {
 	if isChanClosed(dc.ctx.Done()) {
 		// Worker is closed. So it can't do the reorganization.
 		return dbterror.ErrInvalidWorker.GenWithStack("worker is closed")
@@ -349,6 +349,10 @@
 		return dbterror.ErrCancelledDDLJob
 	}

+	// If isDistReorg is true, we needn't check whether this instance is the DDL owner.
+	if isDistReorg {
+		return nil
+	}
 	if !dc.isOwner() {
 		// If it's not the owner, we will try later, so here just returns an error.
 		logutil.BgLogger().Info("[ddl] DDL is not the DDL owner", zap.String("ID", dc.uuid))
diff --git a/metrics/metrics.go b/metrics/metrics.go
index 68a2729f3483c..7d61607d741f2 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -46,14 +46,15 @@ var (
 // metrics labels.
 const (
-	LabelSession   = "session"
-	LabelDomain    = "domain"
-	LabelDDLOwner  = "ddl-owner"
-	LabelDDL       = "ddl"
-	LabelDDLWorker = "ddl-worker"
-	LabelDDLSyncer = "ddl-syncer"
-	LabelGCWorker  = "gcworker"
-	LabelAnalyze   = "analyze"
+	LabelSession        = "session"
+	LabelDomain         = "domain"
+	LabelDDLOwner       = "ddl-owner"
+	LabelDDL            = "ddl"
+	LabelDDLWorker      = "ddl-worker"
+	LabelBackfillWorker = "backfill-worker"
+	LabelDDLSyncer      = "ddl-syncer"
+	LabelGCWorker       = "gcworker"
+	LabelAnalyze        = "analyze"

 	LabelBatchRecvLoop = "batch-recv-loop"
 	LabelBatchSendLoop = "batch-send-loop"
diff --git a/parser/model/BUILD.bazel b/parser/model/BUILD.bazel
index f3f214ce29eeb..5387516deaf94 100644
--- a/parser/model/BUILD.bazel
+++ b/parser/model/BUILD.bazel
@@ -31,6 +31,7 @@ go_test(
     deps = [
         "//parser/charset",
         "//parser/mysql",
+        "//parser/terror",
         "//parser/types",
         "@com_github_stretchr_testify//require",
     ],
diff --git a/parser/model/ddl.go b/parser/model/ddl.go
index 8eb26ca238d3f..7ca33341258c5 100644
--- a/parser/model/ddl.go
+++ b/parser/model/ddl.go
@@ -237,6 +237,7 @@ type DDLReorgMeta struct {
 	WarningsCount map[errors.ErrorID]int64 `json:"warnings_count"`
 	Location      *TimeZoneLocation        `json:"location"`
 	ReorgTp       ReorgType                `json:"reorg_tp"`
+	IsDistReorg   bool                     `json:"is_dist_reorg"`
 }

 // ReorgType indicates which process is used for the data reorganization.
@@ -426,7 +427,9 @@ func (sub *SubJob) FromProxyJob(proxyJob *Job, ver int64) {
 type JobMeta struct {
 	SchemaID int64 `json:"schema_id"`
 	TableID  int64 `json:"table_id"`
-	// Query string of the ddl job.
+	// Type is the DDL job's type.
+	Type ActionType `json:"job_type"`
+	// Query is the DDL job's SQL string.
 	Query string `json:"query"`
 	// Priority is only used to set the operation priority of adding indices.
 	Priority int `json:"priority"`
 }

 // BackfillMeta is meta info of the backfill job.
 type BackfillMeta struct {
-	PhysicalTableID int64  `json:"physical_table_id"`
-	IsUnique        bool   `json:"is_unique"`
-	EndInclude      bool   `json:"end_include"`
-	ErrMsg          string `json:"err_msg"`
+	PhysicalTableID int64         `json:"physical_table_id"`
+	IsUnique        bool          `json:"is_unique"`
+	EndInclude      bool          `json:"end_include"`
+	Error           *terror.Error `json:"err"`

 	SQLMode  mysql.SQLMode                    `json:"sql_mode"`
 	Warnings map[errors.ErrorID]*terror.Error `json:"warnings"`
diff --git a/parser/model/ddl_test.go b/parser/model/ddl_test.go
index d67b6ac91175a..7ea82606363ce 100644
--- a/parser/model/ddl_test.go
+++ b/parser/model/ddl_test.go
@@ -18,6 +18,7 @@ import (
 	"unsafe"

 	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/parser/terror"
 	"github.com/stretchr/testify/require"
 )

@@ -61,7 +62,7 @@ func TestBackfillMetaCodec(t *testing.T) {
 	}
 	bm := &model.BackfillMeta{
 		EndInclude: true,
-		ErrMsg:     "has a err",
+		Error:      terror.ErrResultUndetermined,
 		JobMeta:    jm,
 	}
 	bmBytes, err := bm.Encode()
diff --git a/sessionctx/variable/featuretag/distributereorg/default.go b/sessionctx/variable/featuretag/distributereorg/default.go
index 910629adde825..6594d3ff726a4 100644
--- a/sessionctx/variable/featuretag/distributereorg/default.go
+++ b/sessionctx/variable/featuretag/distributereorg/default.go
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-//go:build !featuretag
+//go:build !distributereorg

 package distributereorg
diff --git a/sessionctx/variable/featuretag/distributereorg/non_default.go b/sessionctx/variable/featuretag/distributereorg/non_default.go
index f6286ba5b3409..9530be9c617d1 100644
--- a/sessionctx/variable/featuretag/distributereorg/non_default.go
+++ b/sessionctx/variable/featuretag/distributereorg/non_default.go
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
-//go:build featuretag
+//go:build distributereorg

 package distributereorg
diff --git a/tests/realtikvtest/addindextest/BUILD.bazel b/tests/realtikvtest/addindextest/BUILD.bazel
index a79f2a15f8ca7..573fb1f531abc 100644
--- a/tests/realtikvtest/addindextest/BUILD.bazel
+++ b/tests/realtikvtest/addindextest/BUILD.bazel
@@ -41,6 +41,7 @@ go_test(
         "//domain",
         "//errno",
         "//parser/model",
+        "//sessionctx/variable",
         "//testkit",
         "//tests/realtikvtest",
         "@com_github_pingcap_failpoint//:failpoint",
diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go
index ed1e4e6c85dcd..e5c147d3337a3 100644
--- a/tests/realtikvtest/addindextest/integration_test.go
+++ b/tests/realtikvtest/addindextest/integration_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/pingcap/tidb/domain"
 	"github.com/pingcap/tidb/errno"
 	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/testkit"
 	"github.com/pingcap/tidb/tests/realtikvtest"
 	"github.com/stretchr/testify/assert"
@@ -184,6 +185,9 @@ func TestIngestMVIndexOnPartitionTable(t *testing.T) {
 }

 func TestAddIndexIngestAdjustBackfillWorker(t *testing.T) {
+	if variable.DDLEnableDistributeReorg.Load() {
+		t.Skip("dist reorg doesn't support checkBackfillWorkerNum, skip this test")
+	}
 	store := realtikvtest.CreateMockStoreAndSetup(t)
 	tk := testkit.NewTestKit(t, store)
 	tk.MustExec("drop database if exists addindexlit;")
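The TestGetTasks scenarios in this change pin down the concurrency contract behind distributed backfill: when two sessions race to claim the same backfill jobs, exactly one optimistic transaction commits, the loser fails with "[kv:9007]Write conflict", and GetTasks is expected to absorb that failure by simply claiming again in a fresh transaction. The following is a minimal, self-contained sketch of that retry shape, not the actual ddl.GetTasks implementation; claimJobs, getTasksWithRetry, and the fake claimer below are hypothetical stand-ins.

package main

import (
	"fmt"
	"strings"
)

// claimJobs stands in for a single optimistic-transaction attempt such as
// GetAndMarkBackfillJobsForOneEle: it tries to mark up to num backfill jobs
// as owned by execID and returns the claimed job IDs.
type claimJobs func(execID string, num int) ([]int64, error)

// getTasksWithRetry mirrors the behavior the test expects from ddl.GetTasks:
// a write conflict from a racing claimer is retryable, while any other error
// is returned to the caller immediately.
func getTasksWithRetry(claim claimJobs, execID string, num, maxRetries int) ([]int64, error) {
	var lastErr error
	for i := 0; i < maxRetries; i++ {
		jobs, err := claim(execID, num)
		if err == nil {
			return jobs, nil
		}
		if !strings.Contains(err.Error(), "Write conflict") {
			return nil, err // not a conflict, so don't retry
		}
		lastErr = err // lost the race, claim again in a fresh txn
	}
	return nil, lastErr
}

func main() {
	// A fake claimer that loses the race once and then succeeds, like
	// txn1/txn2 followed by txn3 in the test above.
	calls := 0
	fake := func(execID string, num int) ([]int64, error) {
		calls++
		if calls == 1 {
			return nil, fmt.Errorf("[kv:9007]Write conflict")
		}
		return []int64{1}, nil
	}
	jobs, err := getTasksWithRetry(fake, "instance-uuid", 1, 3)
	fmt.Println(jobs, err) // prints: [1] <nil>
}

The tuned-down ddl.RetrySQLInterval in the main_test.go hunk above suggests the real loop also sleeps between attempts; the sketch omits the backoff for brevity.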