ttl, domain: setup a customized session pool with stats collector (#4…
YangKeao authored Dec 29, 2022
1 parent cf34941 commit 1a7b395
Showing 9 changed files with 185 additions and 29 deletions.
27 changes: 13 additions & 14 deletions domain/domain.go
@@ -1062,10 +1062,6 @@ func (do *Domain) Init(
 		return err
 	}
 
-	do.wg.Run(func() {
-		do.runTTLJobManager(ctx)
-	})
-
 	return nil
 }

@@ -2457,18 +2453,21 @@ func (do *Domain) serverIDKeeper() {
 	}
 }
 
-func (do *Domain) runTTLJobManager(ctx context.Context) {
-	ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store)
-	ttlJobManager.Start()
-	do.ttlJobManager = ttlJobManager
+// StartTTLJobManager creates and starts the ttl job manager
+func (do *Domain) StartTTLJobManager() {
+	do.wg.Run(func() {
+		ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store)
+		do.ttlJobManager = ttlJobManager
+		ttlJobManager.Start()
 
-	<-do.exit
+		<-do.exit
 
-	ttlJobManager.Stop()
-	err := ttlJobManager.WaitStopped(ctx, 30*time.Second)
-	if err != nil {
-		logutil.BgLogger().Warn("fail to wait until the ttl job manager stop", zap.Error(err))
-	}
+		ttlJobManager.Stop()
+		err := ttlJobManager.WaitStopped(context.Background(), 30*time.Second)
+		if err != nil {
+			logutil.BgLogger().Warn("fail to wait until the ttl job manager stop", zap.Error(err))
+		}
+	})
 }
 
 // TTLJobManager returns the ttl job manager on this domain
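The new StartTTLJobManager puts the manager's whole lifecycle into one background goroutine: construct, publish on the domain, start, block until the domain exits, then stop with a 30-second grace period. Note that switching WaitStopped from the Init ctx to context.Background() means the grace period cannot be cut short by a parent context that is already cancelled during shutdown. Below is a minimal, self-contained sketch of the same start/block/stop pattern; manager, exit, and wg are stand-ins for the real domain fields, not TiDB code:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

type manager struct{ stopped chan struct{} }

func (m *manager) Start() { /* spawn workers here */ }
func (m *manager) Stop()  { close(m.stopped) }

// WaitStopped blocks until Stop has taken effect, the timeout fires, or
// the passed context is cancelled.
func (m *manager) WaitStopped(ctx context.Context, timeout time.Duration) error {
	select {
	case <-m.stopped:
		return nil
	case <-time.After(timeout):
		return errors.New("timeout waiting for stop")
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	exit := make(chan struct{})
	var wg sync.WaitGroup

	wg.Add(1) // the domain's wg.Run helper does this Add/Done bookkeeping
	go func() {
		defer wg.Done()
		m := &manager{stopped: make(chan struct{})}
		m.Start()

		<-exit // block until the domain begins shutting down

		m.Stop()
		if err := m.WaitStopped(context.Background(), 30*time.Second); err != nil {
			fmt.Println("fail to wait until the ttl job manager stop:", err)
		}
	}()

	close(exit) // simulate domain shutdown
	wg.Wait()
}
```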
1 change: 1 addition & 0 deletions session/BUILD.bazel
@@ -65,6 +65,7 @@ go_library(
         "//table/temptable",
         "//tablecodec",
         "//telemetry",
+        "//ttl/ttlworker",
         "//types",
         "//types/parser_driver",
         "//util",
37 changes: 37 additions & 0 deletions session/session.go
@@ -86,6 +86,7 @@ import (
 	"github.com/pingcap/tidb/table/temptable"
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/telemetry"
+	"github.com/pingcap/tidb/ttl/ttlworker"
 	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/collate"
@@ -3418,6 +3419,22 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
 		return nil, err
 	}
 
+	// start the TTL job manager after the stats collector is set up,
+	// because TTL could delete a lot of rows and needs to trigger auto analyze
+	ttlworker.AttachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor {
+		if s, ok := s.(*session); ok {
+			return attachStatsCollector(s, dom)
+		}
+		return s
+	}
+	ttlworker.DetachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor {
+		if s, ok := s.(*session); ok {
+			return detachStatsCollector(s)
+		}
+		return s
+	}
+	dom.StartTTLJobManager()
+
 	analyzeCtxs, err := createSessions(store, analyzeConcurrencyQuota)
 	if err != nil {
 		return nil, err
@@ -3523,6 +3540,26 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) {
 	return s, nil
 }
 
+// attachStatsCollector attaches the stats collector in the dom for the session
+func attachStatsCollector(s *session, dom *domain.Domain) *session {
+	if dom.StatsHandle() != nil && dom.StatsUpdating() {
+		s.statsCollector = dom.StatsHandle().NewSessionStatsCollector()
+		if GetIndexUsageSyncLease() > 0 {
+			s.idxUsageCollector = dom.StatsHandle().NewSessionIndexUsageCollector()
+		}
+	}
+
+	return s
+}
+
+// detachStatsCollector removes the stats collector in the session
+func detachStatsCollector(s *session) *session {
+	s.statsCollector = nil
+	s.idxUsageCollector = nil
+
+	return s
+}
+
 // CreateSessionWithDomain creates a new Session and binds it with a Domain.
 // We need this because when we start DDL in Domain, the DDL need a session
 // to change some system tables. But at that time, we have been already in
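The two assignments in BootstrapSession are hook variables: ttl/ttlworker sits below the session package in the import graph, so it cannot call attachStatsCollector directly without creating an import cycle. Instead it exposes function-typed package variables that BootstrapSession fills in at startup. A hedged sketch of what the ttlworker side of this pattern looks like — the no-op defaults are my assumption, since only the assignments appear in this diff:

```go
package ttlworker

import "github.com/pingcap/tidb/util/sqlexec"

// AttachStatsCollector is overridden by the session package during
// BootstrapSession; the default is a no-op so the package still works
// when nothing installs the hook.
var AttachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor {
	return s
}

// DetachStatsCollector undoes AttachStatsCollector when a session goes
// back to the pool; also a no-op by default.
var DetachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor {
	return s
}
```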
3 changes: 3 additions & 0 deletions ttl/ttlworker/BUILD.bazel
@@ -31,6 +31,7 @@ go_library(
         "//util/timeutil",
         "@com_github_ngaut_pools//:pools",
         "@com_github_pingcap_errors//:errors",
+        "@com_github_pingcap_failpoint//:failpoint",
         "@org_golang_x_time//rate",
         "@org_uber_go_multierr//:multierr",
         "@org_uber_go_zap//:zap",
@@ -58,6 +59,7 @@ go_test(
         "//session",
         "//sessionctx",
         "//sessionctx/variable",
+        "//statistics/handle",
         "//testkit",
         "//ttl/cache",
         "//ttl/session",
@@ -66,6 +68,7 @@
         "//util/logutil",
         "@com_github_ngaut_pools//:pools",
         "@com_github_pingcap_errors//:errors",
+        "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_stretchr_testify//assert",
         "@com_github_stretchr_testify//require",
         "@org_golang_x_time//rate",
23 changes: 23 additions & 0 deletions ttl/ttlworker/config.go
@@ -16,6 +16,8 @@ package ttlworker
 
 import (
 	"time"
+
+	"github.com/pingcap/failpoint"
 )
 
 const jobManagerLoopTickerInterval = 10 * time.Second
@@ -27,3 +29,24 @@ const ttlInternalSQLTimeout = 30 * time.Second
 const resizeWorkersInterval = 30 * time.Second
 const splitScanCount = 64
 const ttlJobTimeout = 6 * time.Hour
+
+func getUpdateInfoSchemaCacheInterval() time.Duration {
+	failpoint.Inject("update-info-schema-cache-interval", func(val failpoint.Value) time.Duration {
+		return time.Duration(val.(int))
+	})
+	return updateInfoSchemaCacheInterval
+}
+
+func getUpdateTTLTableStatusCacheInterval() time.Duration {
+	failpoint.Inject("update-status-table-cache-interval", func(val failpoint.Value) time.Duration {
+		return time.Duration(val.(int))
+	})
+	return updateTTLTableStatusCacheInterval
+}
+
+func getResizeWorkersInterval() time.Duration {
+	failpoint.Inject("resize-workers-interval", func(val failpoint.Value) time.Duration {
+		return time.Duration(val.(int))
+	})
+	return resizeWorkersInterval
+}
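These getters use failpoint marker functions. In a normal build, failpoint.Inject is a no-op and the production constant is returned; when failpoints are enabled (via failpoint-ctl, which TiDB's tests rely on), the marker is rewritten so the closure's return value replaces the constant. Roughly, the rewritten form of the first getter behaves like this sketch — simplified, since the generated code uses internal helpers to build the full failpoint path:

```go
func getUpdateInfoSchemaCacheInterval() time.Duration {
	// failpoint.Eval returns the injected value only while the failpoint
	// is enabled, e.g. with the action `return(1000000000)` for one second.
	if val, err := failpoint.Eval("github.com/pingcap/tidb/ttl/ttlworker/update-info-schema-cache-interval"); err == nil {
		return time.Duration(val.(int))
	}
	return updateInfoSchemaCacheInterval
}
```

This is what lets the integration test below shrink the cache refresh intervals to one second without exporting test-only knobs.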
8 changes: 4 additions & 4 deletions ttl/ttlworker/job_manager.go
@@ -102,8 +102,8 @@ func NewJobManager(id string, sessPool sessionPool, store kv.Storage) (manager *
 	manager.init(manager.jobLoop)
 	manager.ctx = logutil.WithKeyValue(manager.ctx, "ttl-worker", "manager")
 
-	manager.infoSchemaCache = cache.NewInfoSchemaCache(updateInfoSchemaCacheInterval)
-	manager.tableStatusCache = cache.NewTableStatusCache(updateTTLTableStatusCacheInterval)
+	manager.infoSchemaCache = cache.NewInfoSchemaCache(getUpdateInfoSchemaCacheInterval())
+	manager.tableStatusCache = cache.NewTableStatusCache(getUpdateTTLTableStatusCacheInterval())
 
 	return
 }
@@ -125,7 +125,7 @@ func (m *JobManager) jobLoop() error {
 	updateScanTaskStateTicker := time.Tick(jobManagerLoopTickerInterval)
 	infoSchemaCacheUpdateTicker := time.Tick(m.infoSchemaCache.GetInterval())
 	tableStatusCacheUpdateTicker := time.Tick(m.tableStatusCache.GetInterval())
-	resizeWorkersTicker := time.Tick(resizeWorkersInterval)
+	resizeWorkersTicker := time.Tick(getResizeWorkersInterval())
 	for {
 		m.reportMetrics()
 		now := se.Now()
@@ -487,7 +487,7 @@ func (m *JobManager) couldTrySchedule(table *cache.TableStatus, now time.Time) b
 	hbTime := table.CurrentJobOwnerHBTime
 	// a more concrete value is `2 * max(updateTTLTableStatusCacheInterval, jobManagerLoopTickerInterval)`, but the
 	// `updateTTLTableStatusCacheInterval` is greater than `jobManagerLoopTickerInterval` in most cases.
-	if hbTime.Add(2 * updateTTLTableStatusCacheInterval).Before(now) {
+	if hbTime.Add(2 * getUpdateTTLTableStatusCacheInterval()).Before(now) {
 		logutil.Logger(m.ctx).Info("task heartbeat has stopped", zap.Int64("tableID", table.TableID), zap.Time("hbTime", hbTime), zap.Time("now", now))
 		return true
 	}
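The couldTrySchedule change keeps the heartbeat-staleness rule intact while routing the interval through the failpoint-aware getter: a job is treated as orphaned once its owner's last heartbeat is more than twice the status-cache refresh interval old, so every other manager has had a chance to refresh its cache at least once in the meantime. A tiny sketch of the predicate with a hypothetical one-minute interval (illustrative helper, not the TiDB function):

```go
// heartbeatStopped reports whether an owner's heartbeat is stale enough
// for another JobManager to take the job over.
func heartbeatStopped(hbTime, now time.Time, cacheInterval time.Duration) bool {
	return hbTime.Add(2 * cacheInterval).Before(now)
}

// With cacheInterval = time.Minute:
//   heartbeat 3 minutes old  -> true  (job can be rescheduled)
//   heartbeat 90 seconds old -> false (owner still considered alive)
```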
57 changes: 57 additions & 0 deletions ttl/ttlworker/job_manager_integration_test.go
@@ -22,11 +22,13 @@ import (
 	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/parser/ast"
 	"github.com/pingcap/tidb/parser/model"
 	dbsession "github.com/pingcap/tidb/session"
 	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/statistics/handle"
 	"github.com/pingcap/tidb/testkit"
 	"github.com/pingcap/tidb/ttl/cache"
 	"github.com/pingcap/tidb/ttl/session"
@@ -123,3 +125,58 @@ func TestFinishJob(t *testing.T) {
 
 	tk.MustQuery("select table_id, last_job_summary from mysql.tidb_ttl_table_status").Check(testkit.Rows("2 {\"total_rows\":0,\"success_rows\":0,\"error_rows\":0,\"total_scan_task\":1,\"scheduled_scan_task\":0,\"finished_scan_task\":0,\"scan_task_err\":\"\\\"'an error message contains both single and double quote'\\\"\"}"))
 }
+
+func TestTTLAutoAnalyze(t *testing.T) {
+	failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/update-info-schema-cache-interval", fmt.Sprintf("return(%d)", time.Second))
+	failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/update-status-table-cache-interval", fmt.Sprintf("return(%d)", time.Second))
+	failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/resize-workers-interval", fmt.Sprintf("return(%d)", time.Second))
+	originAutoAnalyzeMinCnt := handle.AutoAnalyzeMinCnt
+	handle.AutoAnalyzeMinCnt = 0
+	defer func() {
+		handle.AutoAnalyzeMinCnt = originAutoAnalyzeMinCnt
+	}()
+
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	tk := testkit.NewTestKit(t, store)
+
+	tk.MustExec("use test")
+	tk.MustExec("create table t (id int, created_at datetime) ttl = `created_at` + interval 1 day")
+
+	// insert ten rows; numbers 2,3,4,6,8,9,10 of them are expired
+	for i := 1; i <= 10; i++ {
+		t := time.Now()
+		if i%2 == 0 || i%3 == 0 {
+			t = t.Add(-time.Hour * 48)
+		}
+
+		tk.MustExec("insert into t values(?, ?)", i, t.Format(time.RFC3339))
+	}
+	// TODO: use a better way to pause and restart the ttl worker after analyzing the table, to make this more stable;
+	// but as the ttl worker takes several seconds to start, it's not too serious.
+	tk.MustExec("analyze table t")
+	rows := tk.MustQuery("show stats_meta").Rows()
+	require.Equal(t, rows[0][4], "0")
+	require.Equal(t, rows[0][5], "10")
+
+	retryTime := 15
+	retryInterval := time.Second * 2
+	deleted := false
+	for retryTime >= 0 {
+		retryTime--
+		time.Sleep(retryInterval)
+
+		rows := tk.MustQuery("select count(*) from t").Rows()
+		count := rows[0][0].(string)
+		if count == "3" {
+			deleted = true
+			break
+		}
+	}
+	require.True(t, deleted, "ttl should remove expired rows")
+
+	h := dom.StatsHandle()
+	is := dom.InfoSchema()
+	require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
+	require.NoError(t, h.Update(is))
+	require.True(t, h.HandleAutoAnalyze(is))
+}
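One robustness note on the test above: it enables three failpoints but never checks the error from failpoint.Enable and never disables them, so they leak into later tests in the package. A more defensive setup — a suggestion on my part, not what the commit does — would be:

```go
for _, fp := range []string{
	"github.com/pingcap/tidb/ttl/ttlworker/update-info-schema-cache-interval",
	"github.com/pingcap/tidb/ttl/ttlworker/update-status-table-cache-interval",
	"github.com/pingcap/tidb/ttl/ttlworker/resize-workers-interval",
} {
	fp := fp // capture for the cleanup closure
	require.NoError(t, failpoint.Enable(fp, fmt.Sprintf("return(%d)", time.Second)))
	t.Cleanup(func() { require.NoError(t, failpoint.Disable(fp)) })
}
```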
33 changes: 22 additions & 11 deletions ttl/ttlworker/job_manager_test.go
@@ -167,7 +167,8 @@ func newMockTTLJob(tbl *cache.PhysicalTable, status cache.JobStatus) *ttlJob {
 
 func TestReadyForNewJobTables(t *testing.T) {
 	tbl := newMockTTLTbl(t, "t1")
-	m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
+	m := NewJobManager("test-id", nil, nil)
+	m.sessPool = newMockSessionPool(t, tbl)
 	se := newMockSession(t, tbl)
 
 	cases := []struct {
@@ -295,7 +296,8 @@ func TestLockNewTable(t *testing.T) {
 		t.Run(c.name, func(t *testing.T) {
 			tbl := newMockTTLTbl(t, "t1")
 
-			m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
+			m := NewJobManager("test-id", nil, nil)
+			m.sessPool = newMockSessionPool(t, tbl)
 			sqlCounter := 0
 			se := newMockSession(t, tbl)
 			se.executeSQL = func(ctx context.Context, sql string, args ...interface{}) (rows []chunk.Row, err error) {
@@ -333,7 +335,8 @@ func TestResizeWorkers(t *testing.T) {
 	scanWorker1.Start()
 	scanWorker2 := newMockScanWorker(t)
 
-	m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
+	m := NewJobManager("test-id", nil, nil)
+	m.sessPool = newMockSessionPool(t, tbl)
 	m.SetScanWorkers4Test([]worker{
 		scanWorker1,
 	})
@@ -351,7 +354,8 @@
 	scanWorker2 = newMockScanWorker(t)
 	scanWorker2.Start()
 
-	m = NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
+	m = NewJobManager("test-id", nil, nil)
+	m.sessPool = newMockSessionPool(t, tbl)
 	m.SetScanWorkers4Test([]worker{
 		scanWorker1,
 		scanWorker2,
@@ -366,7 +370,8 @@
 	scanWorker2 = newMockScanWorker(t)
 	scanWorker2.Start()
 
-	m = NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
+	m = NewJobManager("test-id", nil, nil)
+	m.sessPool = newMockSessionPool(t, tbl)
 	m.SetScanWorkers4Test([]worker{
 		scanWorker1,
 		scanWorker2,
@@ -384,7 +389,8 @@ func TestLocalJobs(t *testing.T) {
 	tbl1.ID = 1
 	tbl2 := newMockTTLTbl(t, "t2")
 	tbl2.ID = 2
-	m := NewJobManager("test-id", newMockSessionPool(t, tbl1, tbl2), nil)
+	m := NewJobManager("test-id", nil, nil)
+	m.sessPool = newMockSessionPool(t, tbl1, tbl2)
 
 	m.runningJobs = []*ttlJob{{tbl: tbl1, id: "1", ctx: context.Background()}, {tbl: tbl2, id: "2", ctx: context.Background()}}
 	m.tableStatusCache.Tables = map[int64]*cache.TableStatus{
@@ -410,7 +416,8 @@ func TestRescheduleJobs(t *testing.T) {
 	scanWorker2.Start()
 	scanWorker2.setOneRowResult(tbl, 2022)
 
-	m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
+	m := NewJobManager("test-id", nil, nil)
+	m.sessPool = newMockSessionPool(t, tbl)
 	m.SetScanWorkers4Test([]worker{
 		scanWorker1,
 		scanWorker2,
@@ -463,7 +470,8 @@ func TestRescheduleJobsOutOfWindow(t *testing.T) {
 	scanWorker2.Start()
 	scanWorker2.setOneRowResult(tbl, 2022)
 
-	m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
+	m := NewJobManager("test-id", nil, nil)
+	m.sessPool = newMockSessionPool(t, tbl)
 	m.SetScanWorkers4Test([]worker{
 		scanWorker1,
 		scanWorker2,
@@ -508,7 +516,8 @@ func TestCheckFinishedJob(t *testing.T) {
 	se := newMockSession(t, tbl)
 
 	// cancelled job will be regarded as finished
-	m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
+	m := NewJobManager("test-id", nil, nil)
+	m.sessPool = newMockSessionPool(t, tbl)
 	m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusCancelled)}
 	m.checkFinishedJob(se, se.Now())
 	assert.Len(t, m.runningJobs, 0)
@@ -517,7 +526,8 @@
 	finishedStatistics := &ttlStatistics{}
 	finishedStatistics.TotalRows.Store(1)
 	finishedStatistics.SuccessRows.Store(1)
-	m = NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
+	m = NewJobManager("test-id", nil, nil)
+	m.sessPool = newMockSessionPool(t, tbl)
 	m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusRunning)}
 	m.runningJobs[0].statistics = finishedStatistics
 	m.runningJobs[0].tasks[0].statistics = finishedStatistics
@@ -545,7 +555,8 @@
 	// check timeout job
 	now = se.Now()
 	createTime := now.Add(-20 * time.Hour)
-	m = NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
+	m = NewJobManager("test-id", nil, nil)
+	m.sessPool = newMockSessionPool(t, tbl)
 	m.runningJobs = []*ttlJob{
 		{
 			ctx: context.Background(),
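The one mechanical change repeated through this file — NewJobManager("test-id", nil, nil) followed by a direct assignment to m.sessPool — suggests that NewJobManager now wraps a real pool, presumably with the attach/detach stats-collector hooks this commit introduces, so the bare mocks inject themselves after construction to bypass the wrapping. A hedged sketch of what such a decorating pool could look like; the type and method names are illustrative, not the commit's exact code:

```go
// wrappedPool decorates a session pool so that every session handed out
// carries a stats collector and drops it again when returned.
type wrappedPool struct {
	inner sessionPool // assumed interface: Get() (pools.Resource, error); Put(pools.Resource)
}

func (p *wrappedPool) Get() (pools.Resource, error) {
	r, err := p.inner.Get()
	if err != nil {
		return nil, err
	}
	if se, ok := r.(sqlexec.SQLExecutor); ok {
		AttachStatsCollector(se) // hook installed by the session package
	}
	return r, nil
}

func (p *wrappedPool) Put(r pools.Resource) {
	if se, ok := r.(sqlexec.SQLExecutor); ok {
		DetachStatsCollector(se)
	}
	p.inner.Put(r)
}
```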
