Skip to content

Commit

Permalink
util/gpool/spmc, ddl: tiny update and init sess ctx
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala committed Jan 17, 2023
1 parent 0fff15a commit e813a44
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 44 deletions.
34 changes: 26 additions & 8 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,11 +776,16 @@ func makeupDecodeColMap(sessCtx sessionctx.Context, dbName model.CIStr, t table.
return decodeColMap, nil
}

func setSessCtxLocation(sctx sessionctx.Context, info *reorgInfo) error {
func setSessCtxLocation(sctx sessionctx.Context, tzLocation *model.TimeZoneLocation) error {
// It is set to SystemLocation to be compatible with nil LocationInfo.
*sctx.GetSessionVars().TimeZone = *timeutil.SystemLocation()
if info.ReorgMeta.Location != nil {
loc, err := info.ReorgMeta.Location.GetLocation()
tz := *timeutil.SystemLocation()
if sctx.GetSessionVars().TimeZone == nil {
sctx.GetSessionVars().TimeZone = &tz
} else {
*sctx.GetSessionVars().TimeZone = tz
}
if tzLocation != nil {
loc, err := tzLocation.GetLocation()
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -829,15 +834,21 @@ func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sessio
func (b *backfillScheduler) newSessCtx() (sessionctx.Context, error) {
reorgInfo := b.reorgInfo
sessCtx := newContext(reorgInfo.d.store)
if err := initSessCtx(sessCtx, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location); err != nil {
return nil, errors.Trace(err)
}
return sessCtx, nil
}

func initSessCtx(sessCtx sessionctx.Context, sqlMode mysql.SQLMode, tzLocation *model.TimeZoneLocation) error {
sessCtx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true
// Set the row encode format version.
rowFormat := variable.GetDDLReorgRowFormat()
sessCtx.GetSessionVars().RowEncoder.Enable = rowFormat != variable.DefTiDBRowFormatV1
// Simulate the sql mode environment in the worker sessionCtx.
sqlMode := reorgInfo.ReorgMeta.SQLMode
sessCtx.GetSessionVars().SQLMode = sqlMode
if err := setSessCtxLocation(sessCtx, reorgInfo); err != nil {
return nil, errors.Trace(err)
if err := setSessCtxLocation(sessCtx, tzLocation); err != nil {
return errors.Trace(err)
}
sessCtx.GetSessionVars().StmtCtx.BadNullAsWarning = !sqlMode.HasStrictMode()
sessCtx.GetSessionVars().StmtCtx.TruncateAsWarning = !sqlMode.HasStrictMode()
Expand All @@ -846,7 +857,7 @@ func (b *backfillScheduler) newSessCtx() (sessionctx.Context, error) {
sessCtx.GetSessionVars().StmtCtx.DividedByZeroAsWarning = !sqlMode.HasStrictMode()
sessCtx.GetSessionVars().StmtCtx.IgnoreZeroInDate = !sqlMode.HasStrictMode() || sqlMode.HasAllowInvalidDatesMode()
sessCtx.GetSessionVars().StmtCtx.NoZeroDate = sqlMode.HasStrictMode()
return sessCtx, nil
return nil
}

func (b *backfillScheduler) setMaxWorkerSize(maxSize int) {
Expand Down Expand Up @@ -1097,6 +1108,7 @@ func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo
if notDistTask {
instanceID = reorgInfo.d.uuid
}

// TODO: Adjust the number of ranges(region) for each task.
for _, task := range batchTasks {
bm := &model.BackfillMeta{
Expand Down Expand Up @@ -1194,6 +1206,12 @@ func (dc *ddlCtx) controlWritePhysicalTableRecord(sess *session, t table.Physica
return errors.Trace(err)
}

defaultSQLMode := sess.GetSessionVars().SQLMode
defer func() {
sess.GetSessionVars().SQLMode = defaultSQLMode
}()
// Make timestamp type can be inserted ZeroTimestamp.
sess.GetSessionVars().SQLMode = mysql.ModeNone
currBackfillJobID := int64(1)
err := checkAndHandleInterruptedBackfillJobs(sess, ddlJobID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions ddl/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ const (
store_id bigint,
type int,
exec_id blob default null,
exec_lease Time,
exec_lease timestamp,
state int,
curr_key blob,
start_key blob,
Expand All @@ -77,7 +77,7 @@ const (
store_id bigint,
type int,
exec_id blob default null,
exec_lease Time,
exec_lease timestamp,
state int,
curr_key blob,
start_key blob,
Expand Down
44 changes: 30 additions & 14 deletions ddl/dist_backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type backfillWorkerContext struct {

type newBackfillerFunc func(bfCtx *backfillCtx) (bf backfiller, err error)

func newBackfillWorkerContext(d *ddl, schemaName string, tbl table.Table, workerCnt int, reorgTp model.ReorgType,
func newBackfillWorkerContext(d *ddl, schemaName string, tbl table.Table, workerCnt int, bfMeta *model.BackfillMeta,
bfFunc newBackfillerFunc) (*backfillWorkerContext, error) {
if workerCnt <= 0 {
return nil, nil
Expand All @@ -51,24 +51,36 @@ func newBackfillWorkerContext(d *ddl, schemaName string, tbl table.Table, worker
logutil.BgLogger().Debug("[ddl] no backfill context available now", zap.Int("backfillCtx", len(bCtxs)), zap.Error(err))
return nil, errors.Trace(err)
}
seCtxs := make([]sessionctx.Context, 0, len(bCtxs))
bwCtx := &backfillWorkerContext{backfillWorkers: bCtxs, sessCtxs: make([]sessionctx.Context, 0, len(bCtxs))}
defer func() {
if err != nil {
bwCtx.close(d)
}
}()

for i := 0; i < len(bCtxs); i++ {
se, err := d.sessPool.get()
var se sessionctx.Context
se, err = d.sessPool.get()
if err != nil {
logutil.BgLogger().Error("[ddl] new backfill worker context, get a session failed", zap.Error(err))
return nil, errors.Trace(err)
}
sess := newSession(se)
bfCtx := newBackfillCtx(d.ddlCtx, 0, sess, reorgTp, schemaName, tbl)
bf, err := bfFunc(bfCtx)
err = initSessCtx(se, bfMeta.SQLMode, bfMeta.Location)
if err != nil {
logutil.BgLogger().Error("[ddl] new backfill worker context, init the session ctx failed", zap.Error(err))
return nil, errors.Trace(err)
}
bfCtx := newBackfillCtx(d.ddlCtx, 0, se, bfMeta.ReorgTp, schemaName, tbl)
var bf backfiller
bf, err = bfFunc(bfCtx)
if err != nil {
logutil.BgLogger().Error("[ddl] new backfill worker context, do bfFunc failed", zap.Error(err))
return nil, errors.Trace(err)
}
seCtxs = append(seCtxs, se)
bwCtx.sessCtxs = append(bwCtx.sessCtxs, se)
bCtxs[i].backfiller = bf
}
return &backfillWorkerContext{backfillWorkers: bCtxs, sessCtxs: seCtxs}, nil
return bwCtx, nil
}

func (bwCtx *backfillWorkerContext) GetContext() *backfillWorker {
Expand All @@ -84,6 +96,15 @@ func (bwCtx *backfillWorkerContext) GetContext() *backfillWorker {
return bw
}

func (bwCtx *backfillWorkerContext) close(d *ddl) {
for _, s := range bwCtx.sessCtxs {
d.sessPool.put(s)
}
for _, w := range bwCtx.backfillWorkers {
d.backfillCtxPool.put(w)
}
}

type backfilWorkerManager struct {
bwCtx *backfillWorkerContext
wg util.WaitGroupWrapper
Expand Down Expand Up @@ -124,12 +145,7 @@ func (bwm *backfilWorkerManager) close(d *ddl) error {
close(bwm.exitCh)
bwm.wg.Wait()

for _, s := range bwm.bwCtx.sessCtxs {
d.sessPool.put(s)
}
for _, w := range bwm.bwCtx.backfillWorkers {
d.backfillCtxPool.put(w)
}
bwm.bwCtx.close(d)

return bwm.unsyncErr
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1387,7 +1387,7 @@ func newAddIndexWorkerContext(d *ddl, sess *session, schemaName model.CIStr, tbl
bfJob *BackfillJob, jobCtx *JobContext) (*backfillWorkerContext, error) {
//nolint:forcetypeassert
phyTbl := tbl.(table.PhysicalTable)
return newBackfillWorkerContext(d, schemaName.O, tbl, workerCnt, bfJob.Meta.ReorgTp,
return newBackfillWorkerContext(d, schemaName.O, tbl, workerCnt, bfJob.Meta,
func(bfCtx *backfillCtx) (backfiller, error) {
decodeColMap, err := makeupDecodeColMap(sess, schemaName, phyTbl)
if err != nil {
Expand Down
9 changes: 7 additions & 2 deletions ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/gpool/spmc"
Expand Down Expand Up @@ -693,6 +694,7 @@ func generateInsertBackfillJobSQL(tableName string, backfillJobs []*BackfillJob)
if i != 0 {
sqlBuilder.WriteString(", ")
}
bj.InstanceLease.SetFsp(0)
sqlBuilder.WriteString(fmt.Sprintf("(%d, %d, %d, %s, %d, %d, '%s', '%s', %d, %s, %s, %s, %d, %d, %d, %s)",
bj.ID, bj.JobID, bj.EleID, wrapKey2String(bj.EleKey), bj.StoreID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State, wrapKey2String(bj.CurrKey),
wrapKey2String(bj.StartKey), wrapKey2String(bj.EndKey), bj.StartTS, bj.FinishTS, bj.RowCount, wrapKey2String(mateByte)))
Expand Down Expand Up @@ -757,10 +759,11 @@ func GetBackfillJobsForOneEle(s *session, batch int, excludedJobIDs []int64, lea
if err != nil {
return err
}
leaseStr := currTime.Add(-lease).Format(types.TimeFormat)

bJobs, err = GetBackfillJobs(se, BackfillTable,
fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') %s order by ddl_job_id, ele_key, ele_id limit %d",
currTime.Add(-lease), eJobIDsBuilder.String(), batch), "get_backfill_job")
leaseStr, eJobIDsBuilder.String(), batch), "get_backfill_job")
return err
})
if err != nil || len(bJobs) == 0 {
Expand Down Expand Up @@ -789,10 +792,11 @@ func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid st
if err != nil {
return err
}
leaseStr := currTime.Add(-lease).Format(types.TimeFormat)

bJobs, err = GetBackfillJobs(se, BackfillTable,
fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') and ddl_job_id = %d order by ddl_job_id, ele_key, ele_id limit %d",
currTime.Add(-lease), jobID, batch), "get_mark_backfill_job")
leaseStr, jobID, batch), "get_mark_backfill_job")
if err != nil {
return err
}
Expand Down Expand Up @@ -937,6 +941,7 @@ func updateBackfillJob(sess *session, tableName string, backfillJob *BackfillJob
if err != nil {
return err
}
backfillJob.InstanceLease.SetFsp(0)
sql := fmt.Sprintf("update mysql.%s set exec_id = '%s', exec_lease = '%s', state = %d, curr_key = %s, row_count = %d, backfill_meta = %s where ddl_job_id = %d and ele_id = %d and ele_key = %s and id = %d",
tableName, backfillJob.InstanceID, backfillJob.InstanceLease, backfillJob.State, wrapKey2String(backfillJob.CurrKey), backfillJob.RowCount,
wrapKey2String(mate), backfillJob.JobID, backfillJob.EleID, wrapKey2String(backfillJob.EleKey), backfillJob.ID)
Expand Down
34 changes: 22 additions & 12 deletions ddl/job_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
Expand Down Expand Up @@ -198,15 +199,16 @@ func makeAddIdxBackfillJobs(schemaID, tblID, jobID, eleID int64, cnt int, query
},
}
bj := &ddl.BackfillJob{
ID: int64(i),
JobID: jobID,
EleID: eleID,
EleKey: meta.IndexElementKey,
State: model.JobStateNone,
CurrKey: sKey,
StartKey: sKey,
EndKey: eKey,
Meta: bm,
ID: int64(i),
JobID: jobID,
EleID: eleID,
EleKey: meta.IndexElementKey,
State: model.JobStateNone,
InstanceLease: types.ZeroTimestamp,
CurrKey: sKey,
StartKey: sKey,
EndKey: eKey,
Meta: bm,
}
bJobs = append(bJobs, bj)
}
Expand Down Expand Up @@ -263,8 +265,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
bJobs, err = ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID1, uuid, instanceLease)
require.EqualError(t, err, dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job").Error())
require.Nil(t, bJobs)
allCnt, err := ddl.GetBackfillJobCount(se, ddl.BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'",
jobID1, eleID2, meta.IndexElementKey), "check_backfill_job_count")
allCnt, err := ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID2), "check_backfill_job_count")
require.NoError(t, err)
require.Equal(t, allCnt, 0)
// Test some backfill jobs, add backfill jobs to the table.
Expand All @@ -277,6 +278,9 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
bjTestCases = append(bjTestCases, bJobs2...)
bjTestCases = append(bjTestCases, bJobs3...)
err = ddl.AddBackfillJobs(se, bjTestCases)
require.Equal(t, err.Error(), "[table:1292]Incorrect timestamp value: '0000-00-00 00:00:00' for column 'exec_lease' at row 1")
tk.Session().GetSessionVars().SQLMode = mysql.ModeNone
err = ddl.AddBackfillJobs(se, bjTestCases)
// ID jobID eleID InstanceID
// -------------------------------------
// 0 jobID1 eleID1 uuid
Expand Down Expand Up @@ -423,12 +427,17 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
bJobs, err = ddl.GetInterruptedBackfillJobsForOneEle(se, jobID1, eleID1, eleKey)
require.NoError(t, err)
require.Len(t, bJobs, 1)
equalBackfillJob(t, bJobs1[1], bJobs[0], types.ZeroTime)
equalBackfillJob(t, bJobs1[1], bJobs[0], types.ZeroTimestamp)
// test the BackfillJob's AbbrStr
require.Equal(t, fmt.Sprintf("ID:2, JobID:1, EleID:11, Type:add index, State:rollingback, InstanceID:%s, InstanceLease:0000-00-00 00:00:00", uuid), bJobs1[0].AbbrStr())
require.Equal(t, "ID:3, JobID:1, EleID:11, Type:add index, State:cancelled, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs1[1].AbbrStr())
require.Equal(t, "ID:0, JobID:2, EleID:33, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs3[0].AbbrStr())
require.Equal(t, "ID:1, JobID:2, EleID:33, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs3[1].AbbrStr())
// test select tidb_ddl_backfill
tk.MustQuery(fmt.Sprintf("select exec_id, exec_lease from mysql.tidb_ddl_backfill where id = %d and %s", bJobs1[0].ID, getIdxConditionStr(jobID1, eleID1))).
Check(testkit.Rows(fmt.Sprintf("%s 0000-00-00 00:00:00", uuid)))
tk.MustQuery(fmt.Sprintf("select exec_id, exec_lease from mysql.tidb_ddl_backfill where id = %d and %s", bJobs1[1].ID, getIdxConditionStr(jobID1, eleID1))).
Check(testkit.Rows(" 0000-00-00 00:00:00"))
// test GetBackfillMetas
bfErr := ddl.GetBackfillErr(se, jobID1, eleID1, eleKey)
require.Error(t, bfErr, dbterror.ErrCancelledDDLJob)
Expand Down Expand Up @@ -559,6 +568,7 @@ func TestGetTasks(t *testing.T) {
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
se := ddl.NewSession(tk.Session())
se.GetSessionVars().SQLMode = mysql.ModeNone
d := dom.DDL()

jobID1 := int64(1)
Expand Down
1 change: 1 addition & 0 deletions util/gpool/spmc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
importpath = "github.com/pingcap/tidb/util/gpool/spmc",
visibility = ["//visibility:public"],
deps = [
"//resourcemanager",
"//resourcemanager/pooltask",
"//resourcemanager/util",
"//util/gpool",
Expand Down
10 changes: 5 additions & 5 deletions util/gpool/spmc/spmcpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/resourcemanager"
"github.com/pingcap/tidb/resourcemanager/pooltask"
"github.com/pingcap/tidb/resourcemanager/util"
"github.com/pingcap/tidb/util/gpool"
Expand Down Expand Up @@ -78,11 +79,10 @@ func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name stri
result.capacity.Add(size)
result.workers = newWorkerLoopQueue[T, U, C, CT, TF](int(size))
result.cond = sync.NewCond(result.lock)
// TODO: wait https://github.com/pingcap/tidb/pull/40547
// err := resourcemanager.GlobalResourceManager.Register(result, name, component)
// if err != nil {
// return nil, err
// }
err := resourcemanager.GlobalResourceManager.Register(result, name, component)
if err != nil {
return nil, err
}
// Start a goroutine to clean up expired workers periodically.
go result.purgePeriodically()
return result, nil
Expand Down

0 comments on commit e813a44

Please sign in to comment.