diff --git a/domain/domain.go b/domain/domain.go
index 08f49ed018799..e2b4372db0ff2 100644
--- a/domain/domain.go
+++ b/domain/domain.go
@@ -1062,10 +1062,6 @@ func (do *Domain) Init(
 		return err
 	}
 
-	do.wg.Run(func() {
-		do.runTTLJobManager(ctx)
-	})
-
 	return nil
 }
 
@@ -2457,18 +2453,21 @@ func (do *Domain) serverIDKeeper() {
 	}
 }
 
-func (do *Domain) runTTLJobManager(ctx context.Context) {
-	ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store)
-	ttlJobManager.Start()
-	do.ttlJobManager = ttlJobManager
+// StartTTLJobManager creates and starts the ttl job manager
+func (do *Domain) StartTTLJobManager() {
+	do.wg.Run(func() {
+		ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store)
+		do.ttlJobManager = ttlJobManager
+		ttlJobManager.Start()
 
-	<-do.exit
+		<-do.exit
 
-	ttlJobManager.Stop()
-	err := ttlJobManager.WaitStopped(ctx, 30*time.Second)
-	if err != nil {
-		logutil.BgLogger().Warn("fail to wait until the ttl job manager stop", zap.Error(err))
-	}
+		ttlJobManager.Stop()
+		err := ttlJobManager.WaitStopped(context.Background(), 30*time.Second)
+		if err != nil {
+			logutil.BgLogger().Warn("fail to wait until the ttl job manager stop", zap.Error(err))
+		}
+	})
 }
 
 // TTLJobManager returns the ttl job manager on this domain
diff --git a/session/BUILD.bazel b/session/BUILD.bazel
index be3c8699ee6c8..dc3106abdfe63 100644
--- a/session/BUILD.bazel
+++ b/session/BUILD.bazel
@@ -65,6 +65,7 @@ go_library(
         "//table/temptable",
         "//tablecodec",
         "//telemetry",
+        "//ttl/ttlworker",
         "//types",
         "//types/parser_driver",
         "//util",
diff --git a/session/session.go b/session/session.go
index 9639db2240da7..9f707e19a1fda 100644
--- a/session/session.go
+++ b/session/session.go
@@ -86,6 +86,7 @@ import (
 	"github.com/pingcap/tidb/table/temptable"
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/telemetry"
+	"github.com/pingcap/tidb/ttl/ttlworker"
 	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/collate"
@@ -3418,6 +3419,22 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
 		return nil, err
 	}
 
+	// start the TTL job manager after setting up the stats collector,
+	// because TTL can delete a lot of rows and needs to trigger auto analyze
+	ttlworker.AttachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor {
+		if s, ok := s.(*session); ok {
+			return attachStatsCollector(s, dom)
+		}
+		return s
+	}
+	ttlworker.DetachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor {
+		if s, ok := s.(*session); ok {
+			return detachStatsCollector(s)
+		}
+		return s
+	}
+	dom.StartTTLJobManager()
+
 	analyzeCtxs, err := createSessions(store, analyzeConcurrencyQuota)
 	if err != nil {
 		return nil, err
@@ -3523,6 +3540,26 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) {
 	return s, nil
 }
 
+// attachStatsCollector attaches the stats collector of the domain to the session
+func attachStatsCollector(s *session, dom *domain.Domain) *session {
+	if dom.StatsHandle() != nil && dom.StatsUpdating() {
+		s.statsCollector = dom.StatsHandle().NewSessionStatsCollector()
+		if GetIndexUsageSyncLease() > 0 {
+			s.idxUsageCollector = dom.StatsHandle().NewSessionIndexUsageCollector()
+		}
+	}
+
+	return s
+}
+
+// detachStatsCollector removes the stats collector from the session
+func detachStatsCollector(s *session) *session {
+	s.statsCollector = nil
+	s.idxUsageCollector = nil
+
+	return s
+}
+
 // CreateSessionWithDomain creates a new Session and binds it with a Domain.
 // We need this because when we start DDL in Domain, the DDL need a session
 // to change some system tables. But at that time, we have been already in
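
The ttlworker.AttachStatsCollector / DetachStatsCollector hooks assigned above exist purely to break the import cycle between session and ttl/ttlworker: the low-level package declares no-op function variables, and BootstrapSession overwrites them once the domain is available. Below is a minimal, self-contained sketch of that registration pattern; the names are hypothetical and it is not TiDB code.

package main

import "fmt"

// attachHook stands in for ttlworker.AttachStatsCollector: a package-level
// function variable with a safe no-op default, declared in the low-level package.
var attachHook = func(s string) string { return s }

// useSession stands in for getSession: it always calls the hook, whether or not
// the high-level package has replaced it yet.
func useSession(s string) string { return attachHook(s) }

func main() {
	fmt.Println(useSession("session")) // "session" while the hook is still the no-op default

	// This assignment is what BootstrapSession does, conceptually: the high-level
	// package injects behaviour without the low-level package importing it.
	attachHook = func(s string) string { return s + "+stats-collector" }
	fmt.Println(useSession("session")) // "session+stats-collector"
}
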
diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel
index 527c782ed691e..1724cd89195e3 100644
--- a/ttl/ttlworker/BUILD.bazel
+++ b/ttl/ttlworker/BUILD.bazel
@@ -31,6 +31,7 @@ go_library(
         "//util/timeutil",
         "@com_github_ngaut_pools//:pools",
         "@com_github_pingcap_errors//:errors",
+        "@com_github_pingcap_failpoint//:failpoint",
         "@org_golang_x_time//rate",
         "@org_uber_go_multierr//:multierr",
         "@org_uber_go_zap//:zap",
@@ -58,6 +59,7 @@ go_test(
         "//session",
         "//sessionctx",
         "//sessionctx/variable",
+        "//statistics/handle",
         "//testkit",
         "//ttl/cache",
         "//ttl/session",
@@ -66,6 +68,7 @@ go_test(
         "//util/logutil",
         "@com_github_ngaut_pools//:pools",
         "@com_github_pingcap_errors//:errors",
+        "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_stretchr_testify//assert",
         "@com_github_stretchr_testify//require",
         "@org_golang_x_time//rate",
diff --git a/ttl/ttlworker/config.go b/ttl/ttlworker/config.go
index a92f362241fcf..55d005a82e6c2 100644
--- a/ttl/ttlworker/config.go
+++ b/ttl/ttlworker/config.go
@@ -16,6 +16,8 @@ package ttlworker
 
 import (
 	"time"
+
+	"github.com/pingcap/failpoint"
 )
 
 const jobManagerLoopTickerInterval = 10 * time.Second
@@ -27,3 +29,24 @@ const ttlInternalSQLTimeout = 30 * time.Second
 const resizeWorkersInterval = 30 * time.Second
 const splitScanCount = 64
 const ttlJobTimeout = 6 * time.Hour
+
+func getUpdateInfoSchemaCacheInterval() time.Duration {
+	failpoint.Inject("update-info-schema-cache-interval", func(val failpoint.Value) time.Duration {
+		return time.Duration(val.(int))
+	})
+	return updateInfoSchemaCacheInterval
+}
+
+func getUpdateTTLTableStatusCacheInterval() time.Duration {
+	failpoint.Inject("update-status-table-cache-interval", func(val failpoint.Value) time.Duration {
+		return time.Duration(val.(int))
+	})
+	return updateTTLTableStatusCacheInterval
+}
+
+func getResizeWorkersInterval() time.Duration {
+	failpoint.Inject("resize-workers-interval", func(val failpoint.Value) time.Duration {
+		return time.Duration(val.(int))
+	})
+	return resizeWorkersInterval
+}
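
These getters exist only so that tests can shrink the intervals: when one of the failpoints named above is enabled with a return(<int>) term (in a failpoint-enabled build), the injected callback receives that integer and the getter returns it as a time.Duration instead of the compile-time constant. Below is a sketch of how a test might flip all three intervals to one second; the helper name is hypothetical, the failpoint paths match the Inject calls above, and TestTTLAutoAnalyze later in this patch does essentially the same thing inline.

package ttlworker_test

import (
	"fmt"
	"time"

	"github.com/pingcap/failpoint"
)

// enableShortTTLIntervals enables the three interval failpoints with a one-second
// value and returns a cleanup function that disables them again.
func enableShortTTLIntervals() (func(), error) {
	fps := []string{
		"github.com/pingcap/tidb/ttl/ttlworker/update-info-schema-cache-interval",
		"github.com/pingcap/tidb/ttl/ttlworker/update-status-table-cache-interval",
		"github.com/pingcap/tidb/ttl/ttlworker/resize-workers-interval",
	}
	for _, fp := range fps {
		// "return(1000000000)" hands the value to failpoint.Inject as an int,
		// which the getters above convert into a time.Duration.
		if err := failpoint.Enable(fp, fmt.Sprintf("return(%d)", time.Second)); err != nil {
			return nil, err
		}
	}
	return func() {
		for _, fp := range fps {
			_ = failpoint.Disable(fp)
		}
	}, nil
}
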
diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go
index 25af41e46ca58..8a9a3c17d48cf 100644
--- a/ttl/ttlworker/job_manager.go
+++ b/ttl/ttlworker/job_manager.go
@@ -102,8 +102,8 @@ func NewJobManager(id string, sessPool sessionPool, store kv.Storage) (manager *
 	manager.init(manager.jobLoop)
 	manager.ctx = logutil.WithKeyValue(manager.ctx, "ttl-worker", "manager")
 
-	manager.infoSchemaCache = cache.NewInfoSchemaCache(updateInfoSchemaCacheInterval)
-	manager.tableStatusCache = cache.NewTableStatusCache(updateTTLTableStatusCacheInterval)
+	manager.infoSchemaCache = cache.NewInfoSchemaCache(getUpdateInfoSchemaCacheInterval())
+	manager.tableStatusCache = cache.NewTableStatusCache(getUpdateTTLTableStatusCacheInterval())
 
 	return
 }
@@ -125,7 +125,7 @@ func (m *JobManager) jobLoop() error {
 	updateScanTaskStateTicker := time.Tick(jobManagerLoopTickerInterval)
 	infoSchemaCacheUpdateTicker := time.Tick(m.infoSchemaCache.GetInterval())
 	tableStatusCacheUpdateTicker := time.Tick(m.tableStatusCache.GetInterval())
-	resizeWorkersTicker := time.Tick(resizeWorkersInterval)
+	resizeWorkersTicker := time.Tick(getResizeWorkersInterval())
 	for {
 		m.reportMetrics()
 		now := se.Now()
@@ -487,7 +487,7 @@ func (m *JobManager) couldTrySchedule(table *cache.TableStatus, now time.Time) b
 	hbTime := table.CurrentJobOwnerHBTime
 	// a more concrete value is `2 * max(updateTTLTableStatusCacheInterval, jobManagerLoopTickerInterval)`, but the
 	// `updateTTLTableStatusCacheInterval` is greater than `jobManagerLoopTickerInterval` in most cases.
-	if hbTime.Add(2 * updateTTLTableStatusCacheInterval).Before(now) {
+	if hbTime.Add(2 * getUpdateTTLTableStatusCacheInterval()).Before(now) {
 		logutil.Logger(m.ctx).Info("task heartbeat has stopped", zap.Int64("tableID", table.TableID), zap.Time("hbTime", hbTime), zap.Time("now", now))
 		return true
 	}
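
The only scheduling rule touched by this change is the takeover check in couldTrySchedule: a node may take over another owner's job once that owner's heartbeat is older than twice the table-status cache update interval. A tiny self-contained sketch of that predicate, with hypothetical names:

package main

import (
	"fmt"
	"time"
)

// heartbeatExpired mirrors the hbTime.Add(2 * interval).Before(now) check above:
// the owner is considered gone once its last heartbeat is more than two cache
// refresh periods in the past.
func heartbeatExpired(hbTime, now time.Time, cacheUpdateInterval time.Duration) bool {
	return hbTime.Add(2 * cacheUpdateInterval).Before(now)
}

func main() {
	now := time.Now()
	fmt.Println(heartbeatExpired(now.Add(-3*time.Minute), now, time.Minute))  // true: the job can be taken over
	fmt.Println(heartbeatExpired(now.Add(-30*time.Second), now, time.Minute)) // false: the owner is still alive
}
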
+ tk.MustExec("analyze table t") + rows := tk.MustQuery("show stats_meta").Rows() + require.Equal(t, rows[0][4], "0") + require.Equal(t, rows[0][5], "10") + + retryTime := 15 + retryInterval := time.Second * 2 + deleted := false + for retryTime >= 0 { + retryTime-- + time.Sleep(retryInterval) + + rows := tk.MustQuery("select count(*) from t").Rows() + count := rows[0][0].(string) + if count == "3" { + deleted = true + break + } + } + require.True(t, deleted, "ttl should remove expired rows") + + h := dom.StatsHandle() + is := dom.InfoSchema() + require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll)) + require.NoError(t, h.Update(is)) + require.True(t, h.HandleAutoAnalyze(is)) +} diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go index 1f649abb065af..c218c7ee81a08 100644 --- a/ttl/ttlworker/job_manager_test.go +++ b/ttl/ttlworker/job_manager_test.go @@ -167,7 +167,8 @@ func newMockTTLJob(tbl *cache.PhysicalTable, status cache.JobStatus) *ttlJob { func TestReadyForNewJobTables(t *testing.T) { tbl := newMockTTLTbl(t, "t1") - m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil) + m := NewJobManager("test-id", nil, nil) + m.sessPool = newMockSessionPool(t, tbl) se := newMockSession(t, tbl) cases := []struct { @@ -295,7 +296,8 @@ func TestLockNewTable(t *testing.T) { t.Run(c.name, func(t *testing.T) { tbl := newMockTTLTbl(t, "t1") - m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil) + m := NewJobManager("test-id", nil, nil) + m.sessPool = newMockSessionPool(t, tbl) sqlCounter := 0 se := newMockSession(t, tbl) se.executeSQL = func(ctx context.Context, sql string, args ...interface{}) (rows []chunk.Row, err error) { @@ -333,7 +335,8 @@ func TestResizeWorkers(t *testing.T) { scanWorker1.Start() scanWorker2 := newMockScanWorker(t) - m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil) + m := NewJobManager("test-id", nil, nil) + m.sessPool = newMockSessionPool(t, tbl) m.SetScanWorkers4Test([]worker{ scanWorker1, }) @@ -351,7 +354,8 @@ func TestResizeWorkers(t *testing.T) { scanWorker2 = newMockScanWorker(t) scanWorker2.Start() - m = NewJobManager("test-id", newMockSessionPool(t, tbl), nil) + m = NewJobManager("test-id", nil, nil) + m.sessPool = newMockSessionPool(t, tbl) m.SetScanWorkers4Test([]worker{ scanWorker1, scanWorker2, @@ -366,7 +370,8 @@ func TestResizeWorkers(t *testing.T) { scanWorker2 = newMockScanWorker(t) scanWorker2.Start() - m = NewJobManager("test-id", newMockSessionPool(t, tbl), nil) + m = NewJobManager("test-id", nil, nil) + m.sessPool = newMockSessionPool(t, tbl) m.SetScanWorkers4Test([]worker{ scanWorker1, scanWorker2, @@ -384,7 +389,8 @@ func TestLocalJobs(t *testing.T) { tbl1.ID = 1 tbl2 := newMockTTLTbl(t, "t2") tbl2.ID = 2 - m := NewJobManager("test-id", newMockSessionPool(t, tbl1, tbl2), nil) + m := NewJobManager("test-id", nil, nil) + m.sessPool = newMockSessionPool(t, tbl1, tbl2) m.runningJobs = []*ttlJob{{tbl: tbl1, id: "1", ctx: context.Background()}, {tbl: tbl2, id: "2", ctx: context.Background()}} m.tableStatusCache.Tables = map[int64]*cache.TableStatus{ @@ -410,7 +416,8 @@ func TestRescheduleJobs(t *testing.T) { scanWorker2.Start() scanWorker2.setOneRowResult(tbl, 2022) - m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil) + m := NewJobManager("test-id", nil, nil) + m.sessPool = newMockSessionPool(t, tbl) m.SetScanWorkers4Test([]worker{ scanWorker1, scanWorker2, @@ -463,7 +470,8 @@ func TestRescheduleJobsOutOfWindow(t *testing.T) { scanWorker2.Start() scanWorker2.setOneRowResult(tbl, 2022) - 
m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil) + m := NewJobManager("test-id", nil, nil) + m.sessPool = newMockSessionPool(t, tbl) m.SetScanWorkers4Test([]worker{ scanWorker1, scanWorker2, @@ -508,7 +516,8 @@ func TestCheckFinishedJob(t *testing.T) { se := newMockSession(t, tbl) // cancelled job will be regarded as finished - m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil) + m := NewJobManager("test-id", nil, nil) + m.sessPool = newMockSessionPool(t, tbl) m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusCancelled)} m.checkFinishedJob(se, se.Now()) assert.Len(t, m.runningJobs, 0) @@ -517,7 +526,8 @@ func TestCheckFinishedJob(t *testing.T) { finishedStatistics := &ttlStatistics{} finishedStatistics.TotalRows.Store(1) finishedStatistics.SuccessRows.Store(1) - m = NewJobManager("test-id", newMockSessionPool(t, tbl), nil) + m = NewJobManager("test-id", nil, nil) + m.sessPool = newMockSessionPool(t, tbl) m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusRunning)} m.runningJobs[0].statistics = finishedStatistics m.runningJobs[0].tasks[0].statistics = finishedStatistics @@ -545,7 +555,8 @@ func TestCheckFinishedJob(t *testing.T) { // check timeout job now = se.Now() createTime := now.Add(-20 * time.Hour) - m = NewJobManager("test-id", newMockSessionPool(t, tbl), nil) + m = NewJobManager("test-id", nil, nil) + m.sessPool = newMockSessionPool(t, tbl) m.runningJobs = []*ttlJob{ { ctx: context.Background(), diff --git a/ttl/ttlworker/session.go b/ttl/ttlworker/session.go index d1500b0e533ee..aa83f195aa43e 100644 --- a/ttl/ttlworker/session.go +++ b/ttl/ttlworker/session.go @@ -33,6 +33,27 @@ import ( "go.uber.org/zap" ) +// The following two functions are using `sqlexec.SQLExecutor` to represent session +// which is actually not correct. It's a work around for the cyclic dependency problem. +// It actually doesn't accept arbitrary SQLExecutor, but just `*session.session`, which means +// you cannot pass the `(ttl/session).Session` into it. +// Use `sqlexec.SQLExecutor` and `sessionctx.Session` or another other interface (including +// `interface{}`) here is the same, I just pick one small enough interface. +// Also, we cannot use the functions in `session/session.go` (to avoid cyclic dependency), so +// registering function here is really needed. + +// AttachStatsCollector attaches the stats collector for the session. +// this function is registered in BootstrapSession in /session/session.go +var AttachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor { + return s +} + +// DetachStatsCollector removes the stats collector for the session +// this function is registered in BootstrapSession in /session/session.go +var DetachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor { + return s +} + type sessionPool interface { Get() (pools.Resource, error) Put(pools.Resource) @@ -80,9 +101,13 @@ func getSession(pool sessionPool) (session.Session, error) { terror.Log(err) } + DetachStatsCollector(exec) + pool.Put(resource) }) + exec = AttachStatsCollector(exec) + // store and set the retry limit to 0 _, err = se.ExecuteSQL(context.Background(), "set tidb_retry_limit=0") if err != nil {