// Patch summary (leading text before the first "diff --git" header; not part of any hunk).
//
// NOTE(review): the hunks below appear with their line breaks collapsed into spaces.
// The original newline/blank-line layout cannot be recovered from this view, so the
// patch text itself is left untouched.
//
// ttl/ttlworker/config.go
//   - Renames the constant ttlTaskGCInterval to ttlGCInterval; the value stays time.Hour.
//
// ttl/ttlworker/job_manager.go
//   - Adds ttlJobHistoryGCTemplate, a DELETE on mysql.tidb_ttl_job_history for rows whose
//     create_time is strictly earlier than CURDATE() - INTERVAL 90 DAY.
//   - In JobManager.jobLoop, replaces the taskGC ticker (built from
//     jobManagerLoopTickerInterval) with gcTicker (built from ttlGCInterval).
//   - The ticker case now builds a context with ttlInternalSQLTimeout, calls DoGC with it,
//     and then calls cancel(); the inline task-GC SQL call and its warning are removed.
//   - Adds the exported DoGC(ctx, se). It executes taskGCTemplate and then
//     ttlJobHistoryGCTemplate with the same ctx. A failure of either statement is only
//     logged as a warning; DoGC returns nothing and still attempts the second statement
//     after the first one fails.
//
// ttl/ttlworker/job_manager_integration_test.go
//   - TestGCScanTasks inserts one table-status row (table 1, current job 1) and scan tasks
//     for jobs 1, 2 and 3, calls DoGC, and expects only the tasks of job 1 to remain.
//   - TestGCTTLHistory inserts history rows created 1, 30, 60, 89, 90, 91 and 100 days
//     before CURDATE(), calls DoGC, and expects jobs 1-5 to remain. The row created
//     exactly 90 days ago is kept because the template compares with a strict "<".
//   - NOTE(review): CURDATE() is evaluated once per INSERT and again inside the GC
//     statement; presumably a run that crosses midnight could shift the 90-day boundary.
//     Confirm whether that matters for CI.
diff --git a/ttl/ttlworker/config.go b/ttl/ttlworker/config.go index 468150c3949a7..89ca9eedae010 100644 --- a/ttl/ttlworker/config.go +++ b/ttl/ttlworker/config.go @@ -32,7 +32,7 @@ const ttlJobTimeout = 6 * time.Hour const taskManagerLoopTickerInterval = time.Minute const ttlTaskHeartBeatTickerInterval = time.Minute -const ttlTaskGCInterval = time.Hour +const ttlGCInterval = time.Hour func getUpdateInfoSchemaCacheInterval() time.Duration { failpoint.Inject("update-info-schema-cache-interval", func(val failpoint.Value) time.Duration { diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 132be2e626cde..0b427e64318ac 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -58,6 +58,8 @@ const taskGCTemplate = `DELETE task FROM ON task.job_id = job.current_job_id WHERE job.table_id IS NULL` +const ttlJobHistoryGCTemplate = `DELETE FROM mysql.tidb_ttl_job_history WHERE create_time < CURDATE() - INTERVAL 90 DAY` + const timeFormat = "2006-01-02 15:04:05" func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) (string, []interface{}) { @@ -143,7 +145,7 @@ func (m *JobManager) jobLoop() error { infoSchemaCacheUpdateTicker := time.Tick(m.infoSchemaCache.GetInterval()) tableStatusCacheUpdateTicker := time.Tick(m.tableStatusCache.GetInterval()) resizeWorkersTicker := time.Tick(getResizeWorkersInterval()) - taskGC := time.Tick(jobManagerLoopTickerInterval) + gcTicker := time.Tick(ttlGCInterval) scheduleJobTicker := time.Tick(jobManagerLoopTickerInterval) jobCheckTicker := time.Tick(jobManagerLoopTickerInterval) @@ -175,12 +177,9 @@ func (m *JobManager) jobLoop() error { if err != nil { logutil.Logger(m.ctx).Warn("fail to update table status cache", zap.Error(err)) } - case <-taskGC: - taskGCCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) - _, err = se.ExecuteSQL(taskGCCtx, taskGCTemplate) - if err != nil { - logutil.Logger(m.ctx).Warn("fail to gc redundant scan task", zap.Error(err)) - } + case 
<-gcTicker: + gcCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) + DoGC(gcCtx, se) cancel() // Job Schedule loop: case <-updateJobHeartBeatTicker: @@ -777,3 +776,14 @@ func summarizeTaskResult(tasks []*cache.TTLTask) (*TTLSummary, error) { summary.SummaryText = string(buf) return summary, nil } + +// DoGC deletes some old TTL job histories and redundant scan tasks +func DoGC(ctx context.Context, se session.Session) { + if _, err := se.ExecuteSQL(ctx, taskGCTemplate); err != nil { + logutil.Logger(ctx).Warn("fail to gc redundant scan task", zap.Error(err)) + } + + if _, err := se.ExecuteSQL(ctx, ttlJobHistoryGCTemplate); err != nil { + logutil.Logger(ctx).Warn("fail to gc ttl job history", zap.Error(err)) + } +} diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go index 82b107b976b70..e2e864344fde3 100644 --- a/ttl/ttlworker/job_manager_integration_test.go +++ b/ttl/ttlworker/job_manager_integration_test.go @@ -490,3 +490,87 @@ func waitAndStopTTLManager(t *testing.T, dom *domain.Domain) { continue } } + +func TestGCScanTasks(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + addTableStatusRecord := func(tableID, parentTableID, curJobID int64) { + tk.MustExec("INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (?, ?)", tableID, parentTableID) + if curJobID == 0 { + return + } + + tk.MustExec(`UPDATE mysql.tidb_ttl_table_status + SET current_job_id = ?, + current_job_owner_id = '12345', + current_job_start_time = NOW(), + current_job_status = 'running', + current_job_status_update_time = NOW(), + current_job_ttl_expire = NOW(), + current_job_owner_hb_time = NOW() + WHERE table_id = ?`, curJobID, tableID) + } + + addScanTaskRecord := func(jobID, tableID, scanID int64) { + tk.MustExec(`INSERT INTO mysql.tidb_ttl_task SET + job_id = ?, + table_id = ?, + scan_id = ?, + expire_time = NOW(), + created_time = NOW()`, jobID, tableID, scanID) 
+ } + + addTableStatusRecord(1, 1, 1) + addScanTaskRecord(1, 1, 1) + addScanTaskRecord(1, 1, 2) + addScanTaskRecord(2, 1, 1) + addScanTaskRecord(2, 1, 2) + addScanTaskRecord(3, 2, 1) + addScanTaskRecord(3, 2, 2) + + se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {}) + ttlworker.DoGC(context.TODO(), se) + tk.MustQuery("select job_id, scan_id from mysql.tidb_ttl_task order by job_id, scan_id asc").Check(testkit.Rows("1 1", "1 2")) +} + +func TestGCTTLHistory(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + addHistory := func(jobID, createdBeforeDays int) { + tk.MustExec(fmt.Sprintf(`INSERT INTO mysql.tidb_ttl_job_history ( + job_id, + table_id, + parent_table_id, + table_schema, + table_name, + partition_name, + create_time, + finish_time, + ttl_expire, + summary_text, + expired_rows, + deleted_rows, + error_delete_rows, + status + ) + VALUES + ( + %d, 1, 1, 'test', 't1', '', + CURDATE() - INTERVAL %d DAY, + CURDATE() - INTERVAL %d DAY + INTERVAL 1 HOUR, + CURDATE() - INTERVAL %d DAY, + "", 100, 100, 0, "finished" + )`, jobID, createdBeforeDays, createdBeforeDays, createdBeforeDays)) + } + + addHistory(1, 1) + addHistory(2, 30) + addHistory(3, 60) + addHistory(4, 89) + addHistory(5, 90) + addHistory(6, 91) + addHistory(7, 100) + se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {}) + ttlworker.DoGC(context.TODO(), se) + tk.MustQuery("select job_id from mysql.tidb_ttl_job_history order by job_id asc").Check(testkit.Rows("1", "2", "3", "4", "5")) +}