From 827d8ff2d22ac4c93ae1b841b79d468211e1d393 Mon Sep 17 00:00:00 2001
From: YangKeao
Date: Tue, 13 Dec 2022 08:52:52 -0500
Subject: [PATCH 1/6] ttl: fix scan workers shrink (#39860)

close pingcap/tidb#39859
---
 ttl/ttlworker/job_manager.go      | 64 ++++++++++++++++++++++++-------
 ttl/ttlworker/job_manager_test.go | 20 +++++++++-
 ttl/ttlworker/scan.go             |  2 +
 ttl/ttlworker/worker.go           |  6 +++
 4 files changed, 77 insertions(+), 15 deletions(-)

diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go
index 7b80fa3165c50..10e0e8751173e 100644
--- a/ttl/ttlworker/job_manager.go
+++ b/ttl/ttlworker/job_manager.go
@@ -180,39 +180,73 @@ func (m *JobManager) reportMetrics() {
 
 func (m *JobManager) resizeScanWorkers(count int) error {
 	var err error
-	m.scanWorkers, err = m.resizeWorkers(m.scanWorkers, count, func() worker {
+	var canceledWorkers []worker
+	m.scanWorkers, canceledWorkers, err = m.resizeWorkers(m.scanWorkers, count, func() worker {
 		return newScanWorker(m.delCh, m.notifyStateCh, m.sessPool)
 	})
+	for _, w := range canceledWorkers {
+		s := w.(scanWorker)
+
+		var tableID int64
+		var scanErr error
+		result := s.PollTaskResult()
+		if result != nil {
+			tableID = result.task.tbl.ID
+			scanErr = result.err
+		} else {
+			// if the scan worker failed to poll the task, it's possible that the `WaitStopped` has timed out,
+			// we still consider the scan task as finished
+			curTask := s.CurrentTask()
+			if curTask == nil {
+				continue
+			}
+			tableID = curTask.tbl.ID
+			scanErr = errors.New("timeout to cancel scan task")
+		}
+
+		job := findJobWithTableID(m.runningJobs, tableID)
+		if job == nil {
+			logutil.Logger(m.ctx).Warn("task state changed but job not found", zap.Int64("tableID", tableID))
+			continue
+		}
+		logutil.Logger(m.ctx).Debug("scan task finished", zap.String("jobID", job.id))
+		job.finishedScanTaskCounter += 1
+		job.scanTaskErr = multierr.Append(job.scanTaskErr, scanErr)
+	}
 	return err
 }
 
 func (m *JobManager) resizeDelWorkers(count int) error {
 	var err error
-	m.delWorkers, err = m.resizeWorkers(m.delWorkers, count, func() worker {
+	m.delWorkers, _, err = m.resizeWorkers(m.delWorkers, count, func() worker {
 		return newDeleteWorker(m.delCh, m.sessPool)
 	})
 	return err
 }
 
-func (m *JobManager) resizeWorkers(workers []worker, count int, factory func() worker) ([]worker, error) {
+// resizeWorkers scales the workers, and returns the full set of workers as the first return value. If any workers
+// are stopped, the stopped workers are returned as the second return value.
+func (m *JobManager) resizeWorkers(workers []worker, count int, factory func() worker) ([]worker, []worker, error) {
 	if count < len(workers) {
 		logutil.Logger(m.ctx).Info("shrink ttl worker", zap.Int("originalCount", len(workers)), zap.Int("newCount", count))
 
 		for _, w := range workers[count:] {
 			w.Stop()
 		}
+
 		var errs error
+		ctx, cancel := context.WithTimeout(m.ctx, 30*time.Second)
 		for _, w := range workers[count:] {
-			err := w.WaitStopped(m.ctx, 30*time.Second)
+			err := w.WaitStopped(ctx, 30*time.Second)
 			if err != nil {
 				logutil.Logger(m.ctx).Warn("fail to stop ttl worker", zap.Error(err))
 				errs = multierr.Append(errs, err)
 			}
 		}
+		cancel()
 
 		// remove the existing workers, and keep the left workers
-		workers = workers[:count]
-		return workers, errs
+		return workers[:count], workers[count:], errs
 	}
 
 	if count > len(workers) {
@@ -223,29 +257,31 @@ func (m *JobManager) resizeWorkers(workers []worker, count int, factory func() w
 			w.Start()
 			workers = append(workers, w)
 		}
-		return workers, nil
+		return workers, nil, nil
 	}
 
-	return workers, nil
+	return workers, nil, nil
 }
 
 func (m *JobManager) updateTaskState() {
 	results := m.pollScanWorkerResults()
 	for _, result := range results {
 		job := findJobWithTableID(m.runningJobs, result.task.tbl.ID)
-		if job != nil {
-			logutil.Logger(m.ctx).Debug("scan task state changed", zap.String("jobID", job.id))
-
-			job.finishedScanTaskCounter += 1
-			job.scanTaskErr = multierr.Append(job.scanTaskErr, result.err)
+		if job == nil {
+			logutil.Logger(m.ctx).Warn("task state changed but job not found", zap.Int64("tableID", result.task.tbl.ID))
+			continue
 		}
+		logutil.Logger(m.ctx).Debug("scan task finished", zap.String("jobID", job.id))
+
+		job.finishedScanTaskCounter += 1
+		job.scanTaskErr = multierr.Append(job.scanTaskErr, result.err)
 	}
 }
 
 func (m *JobManager) pollScanWorkerResults() []*ttlScanTaskExecResult {
 	results := make([]*ttlScanTaskExecResult, 0, len(m.scanWorkers))
 	for _, w := range m.scanWorkers {
-		worker := w.(*ttlScanWorker)
+		worker := w.(scanWorker)
 		result := worker.PollTaskResult()
 		if result != nil {
 			results = append(results, result)
diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go
index edc9c9beee616..87bc19ca08261 100644
--- a/ttl/ttlworker/job_manager_test.go
+++ b/ttl/ttlworker/job_manager_test.go
@@ -306,7 +306,7 @@ func TestResizeWorkers(t *testing.T) {
 	m.SetScanWorkers4Test([]worker{
 		scanWorker1,
 	})
-	newWorkers, err := m.resizeWorkers(m.scanWorkers, 2, func() worker {
+	newWorkers, _, err := m.resizeWorkers(m.scanWorkers, 2, func() worker {
 		return scanWorker2
 	})
 	assert.NoError(t, err)
@@ -328,6 +328,24 @@ func TestResizeWorkers(t *testing.T) {
 
 	assert.NoError(t, m.resizeScanWorkers(1))
 	scanWorker2.checkWorkerStatus(workerStatusStopped, false, nil)
+
+	// shrink scan workers after job is run
+	scanWorker1 = newMockScanWorker(t)
+	scanWorker1.Start()
+	scanWorker2 = newMockScanWorker(t)
+	scanWorker2.Start()
+
+	m = NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
+	m.SetScanWorkers4Test([]worker{
+		scanWorker1,
+		scanWorker2,
+	})
+	m.runningJobs = append(m.runningJobs, &ttlJob{tbl: tbl})
+
+	scanWorker2.curTaskResult = &ttlScanTaskExecResult{task: &ttlScanTask{tbl: tbl}}
+	assert.NoError(t, m.resizeScanWorkers(1))
+	scanWorker2.checkWorkerStatus(workerStatusStopped, false, nil)
+	assert.Equal(t, m.runningJobs[0].finishedScanTaskCounter, 1)
 }
 
 func TestLocalJobs(t *testing.T) {
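One behavior change in the job_manager.go hunk above is easy to miss: the shrink path now creates a single context.WithTimeout that is shared by every WaitStopped call, so the total wait for a whole batch of canceled workers is bounded by roughly 30 seconds, instead of 30 seconds per worker. Below is a minimal, self-contained sketch of that pattern; the stopper interface and fakeWorker type are hypothetical stand-ins, not TiDB types.

package main

import (
	"context"
	"fmt"
	"time"
)

type stopper interface {
	Stop()
	WaitStopped(ctx context.Context, timeout time.Duration) error
}

type fakeWorker struct{ done chan struct{} }

func (w *fakeWorker) Stop() { close(w.done) }

func (w *fakeWorker) WaitStopped(ctx context.Context, timeout time.Duration) error {
	select {
	case <-w.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func stopAll(workers []stopper) error {
	for _, w := range workers {
		w.Stop()
	}
	// one shared deadline for the whole batch, mirroring the hunk above
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	for _, w := range workers {
		if err := w.WaitStopped(ctx, 30*time.Second); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	w := &fakeWorker{done: make(chan struct{})}
	fmt.Println(stopAll([]stopper{w}))
}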
diff --git a/ttl/ttlworker/scan.go b/ttl/ttlworker/scan.go
index 8a7487d973e35..242c51fb8b686 100644
--- a/ttl/ttlworker/scan.go
+++ b/ttl/ttlworker/scan.go
@@ -349,4 +349,6 @@ type scanWorker interface {
 	Idle() bool
 	Schedule(*ttlScanTask) error
+	PollTaskResult() *ttlScanTaskExecResult
+	CurrentTask() *ttlScanTask
 }
diff --git a/ttl/ttlworker/worker.go b/ttl/ttlworker/worker.go
index a04110373cdbf..783384862cacf 100644
--- a/ttl/ttlworker/worker.go
+++ b/ttl/ttlworker/worker.go
@@ -96,6 +96,12 @@ func (w *baseWorker) Error() error {
 }
 
 func (w *baseWorker) WaitStopped(ctx context.Context, timeout time.Duration) error {
+	// consider the situation when the worker has stopped, but the context has also stopped. We should
+	// return without error
+	if w.Status() == workerStatusStopped {
+		return nil
+	}
+
 	ctx, cancel := context.WithTimeout(ctx, timeout)
 	go func() {
 		w.wg.Wait()

From 222faa4346b92f608d2ee4628e41a546d180ef7a Mon Sep 17 00:00:00 2001
From: raymonder jin
Date: Tue, 13 Dec 2022 22:50:52 +0800
Subject: [PATCH 2/6] docs: fix typos (#39870)

---
 br/COMPATIBILITY_TEST.md                | 2 +-
 docs/design/2019-11-05-index-advisor.md | 2 +-
 docs/design/2020-01-24-collations.md    | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/br/COMPATIBILITY_TEST.md b/br/COMPATIBILITY_TEST.md
index b5580835baee8..44984ebcd2bfa 100644
--- a/br/COMPATIBILITY_TEST.md
+++ b/br/COMPATIBILITY_TEST.md
@@ -3,7 +3,7 @@
 ## Background
 
 We had some incompatibility issues in the past, which made BR cannot restore backed up data in some situations.
-So we need a test workflow to check the compatiblity.
+So we need a test workflow to check the compatibility.
 
 ## Goal
 
diff --git a/docs/design/2019-11-05-index-advisor.md b/docs/design/2019-11-05-index-advisor.md
index 53abab5ba0d5a..5606d9bd9a942 100644
--- a/docs/design/2019-11-05-index-advisor.md
+++ b/docs/design/2019-11-05-index-advisor.md
@@ -57,7 +57,7 @@ for {
 Note that executing `Swap and Re-evaluate` algorithm is necessary as the `reduced_cost` sometimes is a joint effect of several indexes and it's hard to tell each index's independent contribution to the final `reduced_cost`. For example, assume there is an extremely slow query in input workload and the desired indexes for this query is `a` and `b`. However, the number of allowed recommended indexes for the whole workload is limited and for some reason, `a` ranks top `n` in the final score list while `b` is not. But there are chances that without `b`, `a` can no more optimize that extremely slow query.
 
 ----------------------------------------------
-### A quick exmaple for single-column index recommendation
+### A quick example for single-column index recommendation
 
 **Workload**:
 
diff --git a/docs/design/2020-01-24-collations.md b/docs/design/2020-01-24-collations.md
index af7c9e30b2f26..d610514ed76d5 100644
--- a/docs/design/2020-01-24-collations.md
+++ b/docs/design/2020-01-24-collations.md
@@ -233,7 +233,7 @@ The following features of the general collation algorithm will be supported:
 * Tertiary Weight i.e. case
 * PAD / NOPAD
 
-All of them are supported by `text/collate` package of Go, so it is possible to map Go collations to some of UCA-based collations in MySQL like `utf8mb4_unicode_ci`/`utf8mb4_0900_ai_ci`, if we ignore the differences between UCA versions: current `text/collate` uses UCA version `6.2.0` and it is not changable. However, the collations in MySQL are with different UCA versions marked in the names, for example, `utf8mb4_0900_ai_ci` uses version `9.0`.
+All of them are supported by `text/collate` package of Go, so it is possible to map Go collations to some of UCA-based collations in MySQL like `utf8mb4_unicode_ci`/`utf8mb4_0900_ai_ci`, if we ignore the differences between UCA versions: current `text/collate` uses UCA version `6.2.0` and it is not changeable. However, the collations in MySQL are with different UCA versions marked in the names, for example, `utf8mb4_0900_ai_ci` uses version `9.0`.
 
 For non-standard UCA implementations in MySQL, i.e. the `utf8mb4_general_ci`. The implementation depends on our choice to the [Compatibility with MySQL](#compatibility-with-mysql) chapter, if a 100% compatibility of `utf8mb4_general_ci` is chosen, we need to implement it by our hands.
 

From 4b3a4424e1ea7e28cc04696a271cf68f3f41f1f1 Mon Sep 17 00:00:00 2001
From: Song Gao
Date: Tue, 13 Dec 2022 23:22:51 +0800
Subject: [PATCH 3/6] domain: support dump sql meta in plan replayer (#39863)

close pingcap/tidb#39883
---
 domain/plan_replayer.go      |  1 +
 domain/plan_replayer_dump.go | 42 +++++++++++++++++++++++++++++-----
 executor/compiler.go         | 13 +++++++----
 executor/executor_test.go    |  1 +
 executor/plan_replayer.go    | 22 +++++++++++++------
 5 files changed, 63 insertions(+), 16 deletions(-)

diff --git a/domain/plan_replayer.go b/domain/plan_replayer.go
index 66f99b96d0fe6..19b5b9144836c 100644
--- a/domain/plan_replayer.go
+++ b/domain/plan_replayer.go
@@ -527,6 +527,7 @@ type PlanReplayerDumpTask struct {
 	TblStats map[int64]interface{}
 
 	// variables used to dump the plan
+	StartTS         uint64
 	SessionBindings []*bindinfo.BindRecord
 	EncodedPlan     string
 	SessionVars     *variable.SessionVars
diff --git a/domain/plan_replayer_dump.go b/domain/plan_replayer_dump.go
index 575d0bc35264e..504fba64b7548 100644
--- a/domain/plan_replayer_dump.go
+++ b/domain/plan_replayer_dump.go
@@ -41,6 +41,8 @@ import (
 )
 
 const (
+	// planReplayerSQLMeta indicates sql meta path for plan replayer
+	planReplayerSQLMeta = "sql_meta.toml"
 	// PlanReplayerConfigFile indicates config file path for plan replayer
 	PlanReplayerConfigFile = "config.toml"
 	// PlanReplayerMetaFile meta file path for plan replayer
@@ -55,6 +57,11 @@ const (
 	PlanReplayerGlobalBindingFile = "global_bindings.sql"
 )
 
+const (
+	// PlanReplayerSQLMetaStartTS indicates the startTS in plan replayer sql meta
+	PlanReplayerSQLMetaStartTS = "startTS"
+)
+
 type tableNamePair struct {
 	DBName    string
 	TableName string
@@ -131,6 +138,7 @@ func (tne *tableNameExtractor) handleIsView(t *ast.TableName) (bool, error) {
 // DumpPlanReplayerInfo will dump the information about sqls.
 // The files will be organized into the following format:
 /*
+ |-sql_meta.toml
 |-meta.txt
 |-schema
 |	 |-db1.table1.schema.txt
@@ -164,7 +172,7 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
 	sessionVars := task.SessionVars
 	execStmts := task.ExecStmts
 	zw := zip.NewWriter(zf)
-	records := generateRecords(task)
+	var records []PlanReplayerStatusRecord
 	defer func() {
 		if err != nil {
 			logutil.BgLogger().Error("dump plan replayer failed", zap.Error(err))
@@ -183,6 +191,12 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
 		}
 		insertPlanReplayerStatus(ctx, sctx, records)
 	}()
+
+	// Dump SQLMeta
+	if err = dumpSQLMeta(zw, task); err != nil {
+		return err
+	}
+
 	// Dump config
 	if err = dumpConfig(zw); err != nil {
 		return err
@@ -244,10 +258,11 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
 	}
 
 	if len(task.EncodedPlan) > 0 {
+		records = generateRecords(task)
 		return dumpEncodedPlan(sctx, zw, task.EncodedPlan)
 	}
 	// Dump explain
-	return dumpExplain(sctx, zw, execStmts, task.Analyze)
+	return dumpExplain(sctx, zw, task, &records)
 }
 
 func generateRecords(task *PlanReplayerDumpTask) []PlanReplayerStatusRecord {
@@ -265,6 +280,19 @@ func generateRecords(task *PlanReplayerDumpTask) []PlanReplayerStatusRecord {
 	return records
 }
 
+func dumpSQLMeta(zw *zip.Writer, task *PlanReplayerDumpTask) error {
+	cf, err := zw.Create(planReplayerSQLMeta)
+	if err != nil {
+		return errors.AddStack(err)
+	}
+	varMap := make(map[string]string)
+	varMap[PlanReplayerSQLMetaStartTS] = strconv.FormatUint(task.StartTS, 10)
+	if err := toml.NewEncoder(cf).Encode(varMap); err != nil {
+		return errors.AddStack(err)
+	}
+	return nil
+}
+
 func dumpConfig(zw *zip.Writer) error {
 	cf, err := zw.Create(PlanReplayerConfigFile)
 	if err != nil {
@@ -488,12 +516,12 @@ func dumpEncodedPlan(ctx sessionctx.Context, zw *zip.Writer, encodedPlan string)
 	return nil
 }
 
-func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, execStmts []ast.StmtNode, isAnalyze bool) error {
-	for i, stmtExec := range execStmts {
+func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, task *PlanReplayerDumpTask, records *[]PlanReplayerStatusRecord) error {
+	for i, stmtExec := range task.ExecStmts {
 		sql := stmtExec.Text()
 		var recordSets []sqlexec.RecordSet
 		var err error
-		if isAnalyze {
+		if task.Analyze {
 			// Explain analyze
 			recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("explain analyze %s", sql))
 			if err != nil {
@@ -522,6 +550,10 @@ func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, execStmts []ast.StmtNod
 				return err
 			}
 		}
+		*records = append(*records, PlanReplayerStatusRecord{
+			OriginSQL: sql,
+			Token:     task.FileName,
+		})
 	}
 	return nil
 }
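For reference, the new sql_meta.toml entry written by dumpSQLMeta above holds a single startTS key, and because the value comes from a map[string]string it is encoded as a TOML string rather than an integer. A rough sketch of the same encoding, assuming the github.com/BurntSushi/toml package that the toml.NewEncoder(cf).Encode(varMap) call appears to use (adjust the import if a different TOML library is in play); the timestamp value is made up:

package main

import (
	"os"
	"strconv"

	"github.com/BurntSushi/toml"
)

func main() {
	// mirrors dumpSQLMeta: one key, value formatted as a decimal string
	varMap := map[string]string{
		"startTS": strconv.FormatUint(437770805425405953, 10),
	}
	// prints something like: startTS = "437770805425405953"
	_ = toml.NewEncoder(os.Stdout).Encode(varMap)
}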
diff --git a/executor/compiler.go b/executor/compiler.go
index ce8b487e24657..10e624d1920bd 100644
--- a/executor/compiler.go
+++ b/executor/compiler.go
@@ -157,13 +157,17 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
 		}
 	}
 	if c.Ctx.GetSessionVars().EnablePlanReplayerCapture && !c.Ctx.GetSessionVars().InRestrictedSQL {
-		checkPlanReplayerCaptureTask(c.Ctx, stmtNode)
+		startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS()
+		if err != nil {
+			return nil, err
+		}
+		checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS)
 	}
 	return stmt, nil
 }
 
-func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode) {
+func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
 	dom := domain.GetDomain(sctx)
 	if dom == nil {
 		return
@@ -178,14 +182,14 @@ func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode
 	for _, task := range tasks {
 		if task.SQLDigest == sqlDigest.String() {
 			if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() {
-				sendPlanReplayerDumpTask(sqlDigest.String(), planDigest.String(), sctx, stmtNode)
+				sendPlanReplayerDumpTask(sqlDigest.String(), planDigest.String(), sctx, stmtNode, startTS)
 				return
 			}
 		}
 	}
 }
 
-func sendPlanReplayerDumpTask(sqlDigest, planDigest string, sctx sessionctx.Context, stmtNode ast.StmtNode) {
+func sendPlanReplayerDumpTask(sqlDigest, planDigest string, sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
 	stmtCtx := sctx.GetSessionVars().StmtCtx
 	handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle)
 	dumpTask := &domain.PlanReplayerDumpTask{
@@ -193,6 +197,7 @@ func sendPlanReplayerDumpTask(sqlDigest, planDigest string, sctx sessionctx.Cont
 			SQLDigest:  sqlDigest,
 			PlanDigest: planDigest,
 		},
+		StartTS:         startTS,
 		EncodePlan:      GetEncodedPlan,
 		TblStats:        stmtCtx.TableStats,
 		SessionBindings: handle.GetAllBindRecord(),
diff --git a/executor/executor_test.go b/executor/executor_test.go
index 59e70022727d5..fe29cdf823349 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -87,6 +87,7 @@ func checkFileName(s string) bool {
 		"global_bindings.sql",
 		"sql/sql0.sql",
 		"explain/sql0.txt",
+		"sql_meta.toml",
 	}
 	for _, f := range files {
 		if strings.Compare(f, s) == 0 {
diff --git a/executor/plan_replayer.go b/executor/plan_replayer.go
index 490e65ef96dd2..6dcee1efc624c 100644
--- a/executor/plan_replayer.go
+++ b/executor/plan_replayer.go
@@ -31,6 +31,7 @@ import (
 	"github.com/pingcap/tidb/parser/ast"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/sessiontxn"
 	"github.com/pingcap/tidb/statistics/handle"
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/logutil"
@@ -133,7 +134,12 @@ func (e *PlanReplayerExec) createFile() error {
 func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) {
 	fileName := e.FileName
 	zf := e.File
+	startTS, err := sessiontxn.GetTxnManager(e.ctx).GetStmtReadTS()
+	if err != nil {
+		return err
+	}
 	task := &domain.PlanReplayerDumpTask{
+		StartTS:     startTS,
 		FileName:    fileName,
 		Zf:          zf,
 		SessionVars: e.ctx.GetSessionVars(),
@@ -375,21 +381,23 @@ func createSchemaAndItems(ctx sessionctx.Context, f *zip.File) error {
 	if err != nil {
 		return errors.AddStack(err)
 	}
-	sqls := strings.Split(buf.String(), ";")
-	if len(sqls) != 3 {
-		return errors.New("plan replayer: create schema and tables failed")
-	}
+	originText := buf.String()
+	index1 := strings.Index(originText, ";")
+	createDatabaseSQL := originText[:index1+1]
+	index2 := strings.Index(originText[index1+1:], ";")
+	useDatabaseSQL := originText[index1+1:][:index2+1]
+	createTableSQL := originText[index1+1:][index2+1:]
 	c := context.Background()
 	// create database if not exists
-	_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[0])
+	_, err = ctx.(sqlexec.SQLExecutor).Execute(c, createDatabaseSQL)
 	logutil.BgLogger().Debug("plan replayer: skip error", zap.Error(err))
 	// use database
-	_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[1])
+	_, err = ctx.(sqlexec.SQLExecutor).Execute(c, useDatabaseSQL)
 	if err != nil {
 		return err
 	}
 	// create table or view
-	_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[2])
+	_, err = ctx.(sqlexec.SQLExecutor).Execute(c, createTableSQL)
 	if err != nil {
 		return err
 	}
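The createSchemaAndItems change above stops assuming the schema file splits into exactly three ';'-separated chunks; it only locates the first two semicolons (end of CREATE DATABASE and USE), so the remaining CREATE TABLE / CREATE VIEW text may itself contain semicolons without tripping the old len(sqls) != 3 check. A standalone sketch of that split, using a hypothetical splitSchemaSQL helper (not a TiDB function) and omitting the error handling a real parser would need for malformed input:

package main

import (
	"fmt"
	"strings"
)

func splitSchemaSQL(originText string) (createDatabaseSQL, useDatabaseSQL, createTableSQL string) {
	// first ';' ends the CREATE DATABASE statement
	index1 := strings.Index(originText, ";")
	createDatabaseSQL = originText[:index1+1]
	rest := originText[index1+1:]
	// second ';' ends the USE statement; everything after it is one statement
	index2 := strings.Index(rest, ";")
	useDatabaseSQL = rest[:index2+1]
	createTableSQL = rest[index2+1:]
	return
}

func main() {
	// the trailing view text contains an extra ';', which strings.Split would
	// have broken into a fourth chunk
	text := "create database if not exists d; use d; create view d.v as select 'a;b' as s;"
	a, b, c := splitSchemaSQL(text)
	fmt.Println(a)
	fmt.Println(b)
	fmt.Println(c)
}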
From 1e7c6ddbe6493d5e3c78a0b71e6432c917e43255 Mon Sep 17 00:00:00 2001
From: TonsnakeLin <87681388+TonsnakeLin@users.noreply.github.com>
Date: Wed, 14 Dec 2022 00:08:52 +0800
Subject: [PATCH 4/6] SlowLog: Update `Wait_TS` field in slow log for every sql (#39843)

close pingcap/tidb#39713
---
 executor/executor.go                         |  1 +
 sessiontxn/isolation/readcommitted.go        |  3 ++
 sessiontxn/isolation/repeatable_read.go      |  3 ++
 sessiontxn/isolation/repeatable_read_test.go | 22 +++++++++++++++
 sessiontxn/txn_rc_tso_optimize_test.go       | 29 ++++++++++++++++++++
 5 files changed, 58 insertions(+)

diff --git a/executor/executor.go b/executor/executor.go
index cb96942e8f776..90622ce52e527 100644
--- a/executor/executor.go
+++ b/executor/executor.go
@@ -2167,6 +2167,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
 	vars.ClearStmtVars()
 	vars.PrevFoundInBinding = vars.FoundInBinding
 	vars.FoundInBinding = false
+	vars.DurationWaitTS = 0
 	return
 }
 
diff --git a/sessiontxn/isolation/readcommitted.go b/sessiontxn/isolation/readcommitted.go
index 8752eb5b35e11..ca198fda6b9e7 100644
--- a/sessiontxn/isolation/readcommitted.go
+++ b/sessiontxn/isolation/readcommitted.go
@@ -16,6 +16,7 @@ package isolation
 
 import (
 	"context"
+	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
@@ -186,9 +187,11 @@ func (p *PessimisticRCTxnContextProvider) getStmtTS() (ts uint64, err error) {
 	}
 
 	p.prepareStmtTS()
+	start := time.Now()
 	if ts, err = p.stmtTSFuture.Wait(); err != nil {
 		return 0, err
 	}
+	p.sctx.GetSessionVars().DurationWaitTS += time.Since(start)
 
 	txn.SetOption(kv.SnapshotTS, ts)
 	p.stmtTS = ts
diff --git a/sessiontxn/isolation/repeatable_read.go b/sessiontxn/isolation/repeatable_read.go
index 18fa2ebd8608c..043998384951c 100644
--- a/sessiontxn/isolation/repeatable_read.go
+++ b/sessiontxn/isolation/repeatable_read.go
@@ -16,6 +16,7 @@ package isolation
 
 import (
 	"context"
+	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
@@ -83,9 +84,11 @@ func (p *PessimisticRRTxnContextProvider) getForUpdateTs() (ts uint64, err error
 	txnCtx := p.sctx.GetSessionVars().TxnCtx
 	futureTS := newOracleFuture(p.ctx, p.sctx, txnCtx.TxnScope)
 
+	start := time.Now()
 	if ts, err = futureTS.Wait(); err != nil {
 		return 0, err
 	}
+	p.sctx.GetSessionVars().DurationWaitTS += time.Since(start)
 
 	txnCtx.SetForUpdateTS(ts)
 	txn.SetOption(kv.SnapshotTS, ts)
diff --git a/sessiontxn/isolation/repeatable_read_test.go b/sessiontxn/isolation/repeatable_read_test.go
index da798f05c2152..085b64c34cc38 100644
--- a/sessiontxn/isolation/repeatable_read_test.go
+++ b/sessiontxn/isolation/repeatable_read_test.go
@@ -678,3 +678,25 @@ func initializeRepeatableReadProvider(t *testing.T, tk *testkit.TestKit, active
 	require.NoError(t, tk.Session().PrepareTxnCtx(context.TODO()))
 	return assert.CheckAndGetProvider(t)
 }
+
+func TestRRWaitTSTimeInSlowLog(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+
+	tk := testkit.NewTestKit(t, store)
+	se := tk.Session()
+
+	tk.MustExec("use test")
+	tk.MustExec("create table t (id int primary key, v int)")
+	tk.MustExec("insert into t values (1, 1)")
+
+	tk.MustExec("begin pessimistic")
+	waitTS1 := se.GetSessionVars().DurationWaitTS
+	tk.MustExec("update t set v = v + 10 where id = 1")
+	waitTS2 := se.GetSessionVars().DurationWaitTS
+	tk.MustExec("delete from t")
+	waitTS3 := se.GetSessionVars().DurationWaitTS
+	tk.MustExec("commit")
+	require.NotEqual(t, waitTS1, waitTS2)
+	require.NotEqual(t, waitTS1, waitTS3)
+	require.NotEqual(t, waitTS2, waitTS3)
+}
diff --git a/sessiontxn/txn_rc_tso_optimize_test.go b/sessiontxn/txn_rc_tso_optimize_test.go
index 59bbf0de330bc..f321d40340496 100644
--- a/sessiontxn/txn_rc_tso_optimize_test.go
+++ b/sessiontxn/txn_rc_tso_optimize_test.go
@@ -790,3 +790,32 @@ func TestConflictErrorsUseRcWriteCheckTs(t *testing.T) {
 
 	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertPessimisticLockErr"))
 }
+
+func TestRcWaitTSInSlowLog(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+
+	tk.MustExec("set global transaction_isolation = 'READ-COMMITTED'")
+	tk.RefreshSession()
+	sctx := tk.Session()
+
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t1")
+	tk.MustExec("create table t1(id1 int, id2 int, id3 int, PRIMARY KEY(id1), UNIQUE KEY udx_id2 (id2))")
+	tk.MustExec("insert into t1 values (1, 1, 1), (2, 2, 2), (3, 3, 3)")
+
+	res := tk.MustQuery("show variables like 'transaction_isolation'")
+	require.Equal(t, "READ-COMMITTED", res.Rows()[0][1])
+	sctx.SetValue(sessiontxn.TsoRequestCount, 0)
+
+	tk.MustExec("begin pessimistic")
+	waitTs1 := sctx.GetSessionVars().DurationWaitTS
+	tk.MustExec("update t1 set id3 = id3 + 10 where id1 = 1")
+	waitTs2 := sctx.GetSessionVars().DurationWaitTS
+	tk.MustExec("update t1 set id3 = id3 + 10 where id1 > 3 and id1 < 6")
+	waitTs3 := sctx.GetSessionVars().DurationWaitTS
+	tk.MustExec("commit")
+	require.NotEqual(t, waitTs1, waitTs2)
+	require.NotEqual(t, waitTs1, waitTs3)
+	require.NotEqual(t, waitTs2, waitTs3)
+}
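The pattern added to both isolation providers above is the same: time the blocking Wait on the timestamp future and accumulate it into a per-statement counter, which ResetContextOfStmt zeroes before each SQL so the slow-log field reflects only that statement. A minimal sketch of that measurement shape; tsFuture, slowFuture, and sessionVars are simplified stand-ins, not the TiDB types.

package main

import (
	"fmt"
	"time"
)

type tsFuture interface {
	Wait() (uint64, error)
}

type slowFuture struct{}

func (slowFuture) Wait() (uint64, error) {
	time.Sleep(2 * time.Millisecond) // pretend to wait for the TSO
	return 42, nil
}

type sessionVars struct {
	DurationWaitTS time.Duration
}

func getTS(f tsFuture, vars *sessionVars) (uint64, error) {
	start := time.Now()
	ts, err := f.Wait()
	if err != nil {
		return 0, err
	}
	// accumulate per statement; reset to 0 when the next statement starts
	vars.DurationWaitTS += time.Since(start)
	return ts, nil
}

func main() {
	vars := &sessionVars{}
	ts, _ := getTS(slowFuture{}, vars)
	fmt.Println(ts, vars.DurationWaitTS > 0)
}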
From 98cef5a5b1f7a8cb7ec28b7e105e88efcd9b4cdf Mon Sep 17 00:00:00 2001
From: Lynn
Date: Wed, 14 Dec 2022 00:58:51 +0800
Subject: [PATCH 5/6] session: fix a bug for InitDDLJobTables (#39861)

ref pingcap/tidb#39854
---
 session/session.go | 86 +++++++++++++++++++++++++++-------------------
 1 file changed, 50 insertions(+), 36 deletions(-)

diff --git a/session/session.go b/session/session.go
index 40b3921a4145c..be15798c7f236 100644
--- a/session/session.go
+++ b/session/session.go
@@ -3055,22 +3055,21 @@ func loadCollationParameter(ctx context.Context, se *session) (bool, error) {
 	return false, nil
 }
 
+type tableBasicInfo struct {
+	SQL string
+	id  int64
+}
+
 var (
 	errResultIsEmpty = dbterror.ClassExecutor.NewStd(errno.ErrResultIsEmpty)
 	// DDLJobTables is a list of tables definitions used in concurrent DDL.
-	DDLJobTables = []struct {
-		SQL string
-		id  int64
-	}{
+	DDLJobTables = []tableBasicInfo{
 		{ddl.JobTableSQL, ddl.JobTableID},
 		{ddl.ReorgTableSQL, ddl.ReorgTableID},
 		{ddl.HistoryTableSQL, ddl.HistoryTableID},
 	}
 	// BackfillTables is a list of tables definitions used in dist reorg DDL.
-	BackfillTables = []struct {
-		SQL string
-		id  int64
-	}{
+	BackfillTables = []tableBasicInfo{
 		{ddl.BackfillTableSQL, ddl.BackfillTableID},
 		{ddl.BackfillHistoryTableSQL, ddl.BackfillHistoryTableID},
 	}
@@ -3091,7 +3090,7 @@ func splitAndScatterTable(store kv.Storage, tableIDs []int64) {
 	}
 }
 
-// InitDDLJobTables is to create tidb_ddl_job, tidb_ddl_reorg and tidb_ddl_history, or tidb_ddl_backfill and tidb_ddl_backfill_history.
+// InitDDLJobTables is to create tidb_ddl_job, tidb_ddl_reorg, tidb_ddl_history, tidb_ddl_backfill and tidb_ddl_backfill_history.
 func InitDDLJobTables(store kv.Storage) error {
 	return kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(ctx context.Context, txn kv.Transaction) error {
 		t := meta.NewMeta(txn)
@@ -3103,41 +3102,56 @@ func InitDDLJobTables(store kv.Storage) error {
 		if err != nil {
 			return err
 		}
-		tables := append(DDLJobTables, BackfillTables...)
 		if exists {
-			tblExist, err := t.CheckTableExists(dbID, BackfillTables[0].id)
-			if err != nil || tblExist {
-				return errors.Trace(err)
-			}
-			tables = BackfillTables
+			return initBackfillJobTables(store, t, dbID)
 		}
-		tableIDs := make([]int64, 0, len(tables))
-		for _, tbl := range tables {
-			tableIDs = append(tableIDs, tbl.id)
+
+		if err = createAndSplitTables(store, t, dbID, DDLJobTables); err != nil {
+			return err
 		}
-		splitAndScatterTable(store, tableIDs)
-		p := parser.New()
-		for _, tbl := range tables {
-			stmt, err := p.ParseOneStmt(tbl.SQL, "", "")
-			if err != nil {
-				return errors.Trace(err)
-			}
-			tblInfo, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
-			if err != nil {
-				return errors.Trace(err)
-			}
-			tblInfo.State = model.StatePublic
-			tblInfo.ID = tbl.id
-			tblInfo.UpdateTS = t.StartTS
-			err = t.CreateTableOrView(dbID, tblInfo)
-			if err != nil {
-				return errors.Trace(err)
-			}
+		if err = initBackfillJobTables(store, t, dbID); err != nil {
+			return err
 		}
 		return t.SetDDLTables()
 	})
 }
+
+// initBackfillJobTables is to create tidb_ddl_backfill and tidb_ddl_backfill_history.
+func initBackfillJobTables(store kv.Storage, t *meta.Meta, dbID int64) error {
+	tblExist, err := t.CheckTableExists(dbID, BackfillTables[0].id)
+	if err != nil || tblExist {
+		return errors.Trace(err)
+	}
+	return createAndSplitTables(store, t, dbID, BackfillTables)
+}
+
+func createAndSplitTables(store kv.Storage, t *meta.Meta, dbID int64, tables []tableBasicInfo) error {
+	tableIDs := make([]int64, 0, len(tables))
+	for _, tbl := range tables {
+		tableIDs = append(tableIDs, tbl.id)
+	}
+	splitAndScatterTable(store, tableIDs)
+	p := parser.New()
+	for _, tbl := range tables {
+		stmt, err := p.ParseOneStmt(tbl.SQL, "", "")
+		if err != nil {
+			return errors.Trace(err)
+		}
+		tblInfo, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
+		if err != nil {
+			return errors.Trace(err)
+		}
+		tblInfo.State = model.StatePublic
+		tblInfo.ID = tbl.id
+		tblInfo.UpdateTS = t.StartTS
+		err = t.CreateTableOrView(dbID, tblInfo)
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}
+	return nil
+}
+
 // InitMDLTable is to create tidb_mdl_info, which is used for metadata lock.
 func InitMDLTable(store kv.Storage) error {
 	return kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(ctx context.Context, txn kv.Transaction) error {
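The refactor above routes both the fresh-cluster path and the already-initialized path through the same createAndSplitTables helper, so the backfill tables get created even when the DDL job tables already exist (the upgrade case behind #39854). A condensed sketch of that control flow, with simplified stand-in types, placeholder table IDs, and a hypothetical initTables driver rather than the real session-package code:

package main

import "fmt"

type tableBasicInfo struct {
	SQL string
	id  int64
}

var (
	ddlJobTables   = []tableBasicInfo{{"create table tidb_ddl_job (...)", 1}}
	backfillTables = []tableBasicInfo{{"create table tidb_ddl_backfill (...)", 2}}
)

func createAndSplitTables(tables []tableBasicInfo) error {
	for _, tbl := range tables {
		fmt.Println("create and split table with id", tbl.id)
	}
	return nil
}

func initTables(ddlTablesExist, backfillTablesExist bool) error {
	if ddlTablesExist {
		if backfillTablesExist {
			return nil // nothing left to create
		}
		// upgrade case: only the backfill tables are missing
		return createAndSplitTables(backfillTables)
	}
	// fresh cluster: create both groups through the same helper
	if err := createAndSplitTables(ddlJobTables); err != nil {
		return err
	}
	return createAndSplitTables(backfillTables)
}

func main() {
	_ = initTables(true, false)
}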
From d2eca727f5c8bf8c3390740ce2c2a64d99e6e1e0 Mon Sep 17 00:00:00 2001
From: YangKeao
Date: Tue, 13 Dec 2022 12:46:53 -0500
Subject: [PATCH 6/6] ttl: reschedule scan tasks after update task state (#39891)

close pingcap/tidb#39890
---
 ttl/ttlworker/job_manager.go | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go
index 10e0e8751173e..cd3c1208f1f37 100644
--- a/ttl/ttlworker/job_manager.go
+++ b/ttl/ttlworker/job_manager.go
@@ -143,9 +143,13 @@ func (m *JobManager) jobLoop() error {
 			}
 			cancel()
 		case <-updateScanTaskStateTicker:
-			m.updateTaskState()
+			if m.updateTaskState() {
+				m.rescheduleJobs(se, now)
+			}
 		case <-m.notifyStateCh:
-			m.updateTaskState()
+			if m.updateTaskState() {
+				m.rescheduleJobs(se, now)
+			}
 		case <-jobCheckTicker:
 			m.checkFinishedJob(se, now)
 			m.checkNotOwnJob()
@@ -263,7 +267,8 @@ func (m *JobManager) resizeWorkers(workers []worker, count int, factory func() w
 	return workers, nil, nil
 }
 
-func (m *JobManager) updateTaskState() {
+// updateTaskState polls the results from the scan workers and returns whether any results were polled
+func (m *JobManager) updateTaskState() bool {
 	results := m.pollScanWorkerResults()
 	for _, result := range results {
 		job := findJobWithTableID(m.runningJobs, result.task.tbl.ID)
@@ -276,6 +281,8 @@ func (m *JobManager) updateTaskState() {
 		job.finishedScanTaskCounter += 1
 		job.scanTaskErr = multierr.Append(job.scanTaskErr, result.err)
 	}
+
+	return len(results) > 0
 }
 
 func (m *JobManager) pollScanWorkerResults() []*ttlScanTaskExecResult {
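Patch 6 changes the job loop so that finishing a scan task can trigger an immediate reschedule instead of waiting for the next reschedule tick: updateTaskState now reports whether anything was polled, and only then is rescheduleJobs called. A minimal sketch of that loop shape; pollFinished and the channels here are hypothetical stand-ins for JobManager.updateTaskState, the notify channel, and the tickers.

package main

import (
	"fmt"
	"time"
)

// pollFinished drains whatever results are ready and reports whether any were seen,
// mirroring the bool now returned by updateTaskState.
func pollFinished(results chan string) bool {
	polled := false
	for {
		select {
		case r := <-results:
			fmt.Println("finished:", r)
			polled = true
		default:
			return polled
		}
	}
}

func main() {
	results := make(chan string, 1)
	notify := make(chan struct{}, 1)
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	results <- "scan task for table t"
	notify <- struct{}{}

	for i := 0; i < 3; i++ {
		select {
		case <-ticker.C:
		case <-notify:
		}
		if pollFinished(results) {
			fmt.Println("reschedule jobs now instead of waiting for the next tick")
		}
	}
}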