diff --git a/ddl/backfilling.go b/ddl/backfilling.go index c8bedf85dc5cd..a22ea5c48abe7 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -776,11 +776,16 @@ func makeupDecodeColMap(sessCtx sessionctx.Context, dbName model.CIStr, t table. return decodeColMap, nil } -func setSessCtxLocation(sctx sessionctx.Context, info *reorgInfo) error { +func setSessCtxLocation(sctx sessionctx.Context, tzLocation *model.TimeZoneLocation) error { // It is set to SystemLocation to be compatible with nil LocationInfo. - *sctx.GetSessionVars().TimeZone = *timeutil.SystemLocation() - if info.ReorgMeta.Location != nil { - loc, err := info.ReorgMeta.Location.GetLocation() + tz := *timeutil.SystemLocation() + if sctx.GetSessionVars().TimeZone == nil { + sctx.GetSessionVars().TimeZone = &tz + } else { + *sctx.GetSessionVars().TimeZone = tz + } + if tzLocation != nil { + loc, err := tzLocation.GetLocation() if err != nil { return errors.Trace(err) } @@ -829,15 +834,21 @@ func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sessio func (b *backfillScheduler) newSessCtx() (sessionctx.Context, error) { reorgInfo := b.reorgInfo sessCtx := newContext(reorgInfo.d.store) + if err := initSessCtx(sessCtx, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location); err != nil { + return nil, errors.Trace(err) + } + return sessCtx, nil +} + +func initSessCtx(sessCtx sessionctx.Context, sqlMode mysql.SQLMode, tzLocation *model.TimeZoneLocation) error { sessCtx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true // Set the row encode format version. rowFormat := variable.GetDDLReorgRowFormat() sessCtx.GetSessionVars().RowEncoder.Enable = rowFormat != variable.DefTiDBRowFormatV1 // Simulate the sql mode environment in the worker sessionCtx. - sqlMode := reorgInfo.ReorgMeta.SQLMode sessCtx.GetSessionVars().SQLMode = sqlMode - if err := setSessCtxLocation(sessCtx, reorgInfo); err != nil { - return nil, errors.Trace(err) + if err := setSessCtxLocation(sessCtx, tzLocation); err != nil { + return errors.Trace(err) } sessCtx.GetSessionVars().StmtCtx.BadNullAsWarning = !sqlMode.HasStrictMode() sessCtx.GetSessionVars().StmtCtx.TruncateAsWarning = !sqlMode.HasStrictMode() @@ -846,7 +857,7 @@ func (b *backfillScheduler) newSessCtx() (sessionctx.Context, error) { sessCtx.GetSessionVars().StmtCtx.DividedByZeroAsWarning = !sqlMode.HasStrictMode() sessCtx.GetSessionVars().StmtCtx.IgnoreZeroInDate = !sqlMode.HasStrictMode() || sqlMode.HasAllowInvalidDatesMode() sessCtx.GetSessionVars().StmtCtx.NoZeroDate = sqlMode.HasStrictMode() - return sessCtx, nil + return nil } func (b *backfillScheduler) setMaxWorkerSize(maxSize int) { @@ -1097,6 +1108,7 @@ func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo if notDistTask { instanceID = reorgInfo.d.uuid } + // TODO: Adjust the number of ranges(region) for each task. for _, task := range batchTasks { bm := &model.BackfillMeta{ @@ -1194,6 +1206,12 @@ func (dc *ddlCtx) controlWritePhysicalTableRecord(sess *session, t table.Physica return errors.Trace(err) } + defaultSQLMode := sess.GetSessionVars().SQLMode + defer func() { + sess.GetSessionVars().SQLMode = defaultSQLMode + }() + // Make timestamp type can be inserted ZeroTimestamp. + sess.GetSessionVars().SQLMode = mysql.ModeNone currBackfillJobID := int64(1) err := checkAndHandleInterruptedBackfillJobs(sess, ddlJobID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) if err != nil { diff --git a/ddl/constant.go b/ddl/constant.go index 3fe6bf4a04ee6..f9de82e2e6dad 100644 --- a/ddl/constant.go +++ b/ddl/constant.go @@ -58,7 +58,7 @@ const ( store_id bigint, type int, exec_id blob default null, - exec_lease Time, + exec_lease timestamp, state int, curr_key blob, start_key blob, @@ -77,7 +77,7 @@ const ( store_id bigint, type int, exec_id blob default null, - exec_lease Time, + exec_lease timestamp, state int, curr_key blob, start_key blob, diff --git a/ddl/dist_backfilling.go b/ddl/dist_backfilling.go index c8f5cc284c5fd..b85a53340b559 100644 --- a/ddl/dist_backfilling.go +++ b/ddl/dist_backfilling.go @@ -40,7 +40,7 @@ type backfillWorkerContext struct { type newBackfillerFunc func(bfCtx *backfillCtx) (bf backfiller, err error) -func newBackfillWorkerContext(d *ddl, schemaName string, tbl table.Table, workerCnt int, reorgTp model.ReorgType, +func newBackfillWorkerContext(d *ddl, schemaName string, tbl table.Table, workerCnt int, bfMeta *model.BackfillMeta, bfFunc newBackfillerFunc) (*backfillWorkerContext, error) { if workerCnt <= 0 { return nil, nil @@ -51,24 +51,36 @@ func newBackfillWorkerContext(d *ddl, schemaName string, tbl table.Table, worker logutil.BgLogger().Debug("[ddl] no backfill context available now", zap.Int("backfillCtx", len(bCtxs)), zap.Error(err)) return nil, errors.Trace(err) } - seCtxs := make([]sessionctx.Context, 0, len(bCtxs)) + bwCtx := &backfillWorkerContext{backfillWorkers: bCtxs, sessCtxs: make([]sessionctx.Context, 0, len(bCtxs))} + defer func() { + if err != nil { + bwCtx.close(d) + } + }() + for i := 0; i < len(bCtxs); i++ { - se, err := d.sessPool.get() + var se sessionctx.Context + se, err = d.sessPool.get() if err != nil { logutil.BgLogger().Error("[ddl] new backfill worker context, get a session failed", zap.Error(err)) return nil, errors.Trace(err) } - sess := newSession(se) - bfCtx := newBackfillCtx(d.ddlCtx, 0, sess, reorgTp, schemaName, tbl) - bf, err := bfFunc(bfCtx) + err = initSessCtx(se, bfMeta.SQLMode, bfMeta.Location) + if err != nil { + logutil.BgLogger().Error("[ddl] new backfill worker context, init the session ctx failed", zap.Error(err)) + return nil, errors.Trace(err) + } + bfCtx := newBackfillCtx(d.ddlCtx, 0, se, bfMeta.ReorgTp, schemaName, tbl) + var bf backfiller + bf, err = bfFunc(bfCtx) if err != nil { logutil.BgLogger().Error("[ddl] new backfill worker context, do bfFunc failed", zap.Error(err)) return nil, errors.Trace(err) } - seCtxs = append(seCtxs, se) + bwCtx.sessCtxs = append(bwCtx.sessCtxs, se) bCtxs[i].backfiller = bf } - return &backfillWorkerContext{backfillWorkers: bCtxs, sessCtxs: seCtxs}, nil + return bwCtx, nil } func (bwCtx *backfillWorkerContext) GetContext() *backfillWorker { @@ -84,6 +96,15 @@ func (bwCtx *backfillWorkerContext) GetContext() *backfillWorker { return bw } +func (bwCtx *backfillWorkerContext) close(d *ddl) { + for _, s := range bwCtx.sessCtxs { + d.sessPool.put(s) + } + for _, w := range bwCtx.backfillWorkers { + d.backfillCtxPool.put(w) + } +} + type backfilWorkerManager struct { bwCtx *backfillWorkerContext wg util.WaitGroupWrapper @@ -124,12 +145,7 @@ func (bwm *backfilWorkerManager) close(d *ddl) error { close(bwm.exitCh) bwm.wg.Wait() - for _, s := range bwm.bwCtx.sessCtxs { - d.sessPool.put(s) - } - for _, w := range bwm.bwCtx.backfillWorkers { - d.backfillCtxPool.put(w) - } + bwm.bwCtx.close(d) return bwm.unsyncErr } diff --git a/ddl/index.go b/ddl/index.go index 26bf4a2917d89..1ddf2ce04d8f3 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1387,7 +1387,7 @@ func newAddIndexWorkerContext(d *ddl, sess *session, schemaName model.CIStr, tbl bfJob *BackfillJob, jobCtx *JobContext) (*backfillWorkerContext, error) { //nolint:forcetypeassert phyTbl := tbl.(table.PhysicalTable) - return newBackfillWorkerContext(d, schemaName.O, tbl, workerCnt, bfJob.Meta.ReorgTp, + return newBackfillWorkerContext(d, schemaName.O, tbl, workerCnt, bfJob.Meta, func(bfCtx *backfillCtx) (backfiller, error) { decodeColMap, err := makeupDecodeColMap(sess, schemaName, phyTbl) if err != nil { diff --git a/ddl/job_table.go b/ddl/job_table.go index e973324caf195..1b613f3324814 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/types" tidbutil "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/gpool/spmc" @@ -693,6 +694,7 @@ func generateInsertBackfillJobSQL(tableName string, backfillJobs []*BackfillJob) if i != 0 { sqlBuilder.WriteString(", ") } + bj.InstanceLease.SetFsp(0) sqlBuilder.WriteString(fmt.Sprintf("(%d, %d, %d, %s, %d, %d, '%s', '%s', %d, %s, %s, %s, %d, %d, %d, %s)", bj.ID, bj.JobID, bj.EleID, wrapKey2String(bj.EleKey), bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State, wrapKey2String(bj.CurrKey), wrapKey2String(bj.StartKey), wrapKey2String(bj.EndKey), bj.StartTS, bj.FinishTS, bj.RowCount, wrapKey2String(mateByte))) @@ -757,10 +759,11 @@ func GetBackfillJobsForOneEle(s *session, batch int, excludedJobIDs []int64, lea if err != nil { return err } + leaseStr := currTime.Add(-lease).Format(types.TimeFormat) bJobs, err = GetBackfillJobs(se, BackfillTable, fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') %s order by ddl_job_id, ele_key, ele_id limit %d", - currTime.Add(-lease), eJobIDsBuilder.String(), batch), "get_backfill_job") + leaseStr, eJobIDsBuilder.String(), batch), "get_backfill_job") return err }) if err != nil || len(bJobs) == 0 { @@ -789,10 +792,11 @@ func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid st if err != nil { return err } + leaseStr := currTime.Add(-lease).Format(types.TimeFormat) bJobs, err = GetBackfillJobs(se, BackfillTable, fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') and ddl_job_id = %d order by ddl_job_id, ele_key, ele_id limit %d", - currTime.Add(-lease), jobID, batch), "get_mark_backfill_job") + leaseStr, jobID, batch), "get_mark_backfill_job") if err != nil { return err } @@ -937,6 +941,7 @@ func updateBackfillJob(sess *session, tableName string, backfillJob *BackfillJob if err != nil { return err } + backfillJob.InstanceLease.SetFsp(0) sql := fmt.Sprintf("update mysql.%s set exec_id = '%s', exec_lease = '%s', state = %d, curr_key = %s, row_count = %d, backfill_meta = %s where ddl_job_id = %d and ele_id = %d and ele_key = %s and id = %d", tableName, backfillJob.InstanceID, backfillJob.InstanceLease, backfillJob.State, wrapKey2String(backfillJob.CurrKey), backfillJob.RowCount, wrapKey2String(mate), backfillJob.JobID, backfillJob.EleID, wrapKey2String(backfillJob.EleKey), backfillJob.ID) diff --git a/ddl/job_table_test.go b/ddl/job_table_test.go index a901afc0934ba..d9223161a17f9 100644 --- a/ddl/job_table_test.go +++ b/ddl/job_table_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessiontxn" @@ -198,15 +199,16 @@ func makeAddIdxBackfillJobs(schemaID, tblID, jobID, eleID int64, cnt int, query }, } bj := &ddl.BackfillJob{ - ID: int64(i), - JobID: jobID, - EleID: eleID, - EleKey: meta.IndexElementKey, - State: model.JobStateNone, - CurrKey: sKey, - StartKey: sKey, - EndKey: eKey, - Meta: bm, + ID: int64(i), + JobID: jobID, + EleID: eleID, + EleKey: meta.IndexElementKey, + State: model.JobStateNone, + InstanceLease: types.ZeroTimestamp, + CurrKey: sKey, + StartKey: sKey, + EndKey: eKey, + Meta: bm, } bJobs = append(bJobs, bj) } @@ -263,8 +265,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID1, uuid, instanceLease) require.EqualError(t, err, dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job").Error()) require.Nil(t, bJobs) - allCnt, err := ddl.GetBackfillJobCount(se, ddl.BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'", - jobID1, eleID2, meta.IndexElementKey), "check_backfill_job_count") + allCnt, err := ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID2), "check_backfill_job_count") require.NoError(t, err) require.Equal(t, allCnt, 0) // Test some backfill jobs, add backfill jobs to the table. @@ -277,6 +278,9 @@ func TestSimpleExecBackfillJobs(t *testing.T) { bjTestCases = append(bjTestCases, bJobs2...) bjTestCases = append(bjTestCases, bJobs3...) err = ddl.AddBackfillJobs(se, bjTestCases) + require.Equal(t, err.Error(), "[table:1292]Incorrect timestamp value: '0000-00-00 00:00:00' for column 'exec_lease' at row 1") + tk.Session().GetSessionVars().SQLMode = mysql.ModeNone + err = ddl.AddBackfillJobs(se, bjTestCases) // ID jobID eleID InstanceID // ------------------------------------- // 0 jobID1 eleID1 uuid @@ -423,12 +427,17 @@ func TestSimpleExecBackfillJobs(t *testing.T) { bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, eleKey) require.NoError(t, err) require.Len(t, bJobs, 1) - equalBackfillJob(t, bJobs1[1], bJobs[0], types.ZeroTime) + equalBackfillJob(t, bJobs1[1], bJobs[0], types.ZeroTimestamp) // test the BackfillJob's AbbrStr require.Equal(t, fmt.Sprintf("ID:2, JobID:1, EleID:11, Type:add index, State:rollingback, InstanceID:%s, InstanceLease:0000-00-00 00:00:00", uuid), bJobs1[0].AbbrStr()) require.Equal(t, "ID:3, JobID:1, EleID:11, Type:add index, State:cancelled, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs1[1].AbbrStr()) require.Equal(t, "ID:0, JobID:2, EleID:33, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs3[0].AbbrStr()) require.Equal(t, "ID:1, JobID:2, EleID:33, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs3[1].AbbrStr()) + // test select tidb_ddl_backfill + tk.MustQuery(fmt.Sprintf("select exec_id, exec_lease from mysql.tidb_ddl_backfill where id = %d and %s", bJobs1[0].ID, getIdxConditionStr(jobID1, eleID1))). + Check(testkit.Rows(fmt.Sprintf("%s 0000-00-00 00:00:00", uuid))) + tk.MustQuery(fmt.Sprintf("select exec_id, exec_lease from mysql.tidb_ddl_backfill where id = %d and %s", bJobs1[1].ID, getIdxConditionStr(jobID1, eleID1))). + Check(testkit.Rows(" 0000-00-00 00:00:00")) // test GetBackfillMetas bfErr := ddl.GetBackfillErr(se, jobID1, eleID1, eleKey) require.Error(t, bfErr, dbterror.ErrCancelledDDLJob) @@ -559,6 +568,7 @@ func TestGetTasks(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") se := ddl.NewSession(tk.Session()) + se.GetSessionVars().SQLMode = mysql.ModeNone d := dom.DDL() jobID1 := int64(1) diff --git a/util/gpool/spmc/BUILD.bazel b/util/gpool/spmc/BUILD.bazel index b3fb100373a7a..1c951a219fb20 100644 --- a/util/gpool/spmc/BUILD.bazel +++ b/util/gpool/spmc/BUILD.bazel @@ -11,6 +11,7 @@ go_library( importpath = "github.com/pingcap/tidb/util/gpool/spmc", visibility = ["//visibility:public"], deps = [ + "//resourcemanager", "//resourcemanager/pooltask", "//resourcemanager/util", "//util/gpool", diff --git a/util/gpool/spmc/spmcpool.go b/util/gpool/spmc/spmcpool.go index c99f6b34515bd..b8cecb289c0e5 100644 --- a/util/gpool/spmc/spmcpool.go +++ b/util/gpool/spmc/spmcpool.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pingcap/log" + "github.com/pingcap/tidb/resourcemanager" "github.com/pingcap/tidb/resourcemanager/pooltask" "github.com/pingcap/tidb/resourcemanager/util" "github.com/pingcap/tidb/util/gpool" @@ -78,11 +79,10 @@ func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name stri result.capacity.Add(size) result.workers = newWorkerLoopQueue[T, U, C, CT, TF](int(size)) result.cond = sync.NewCond(result.lock) - // TODO: wait https://github.com/pingcap/tidb/pull/40547 - // err := resourcemanager.GlobalResourceManager.Register(result, name, component) - // if err != nil { - // return nil, err - // } + err := resourcemanager.GlobalResourceManager.Register(result, name, component) + if err != nil { + return nil, err + } // Start a goroutine to clean up expired workers periodically. go result.purgePeriodically() return result, nil