diff --git a/ttl/cache/ttlstatus.go b/ttl/cache/ttlstatus.go index 1657105e6c3e7..f14c1a559ae95 100644 --- a/ttl/cache/ttlstatus.go +++ b/ttl/cache/ttlstatus.go @@ -16,7 +16,6 @@ package cache import ( "context" - "fmt" "time" "github.com/pingcap/tidb/sessionctx" @@ -43,8 +42,8 @@ const ( const selectFromTTLTableStatus = "SELECT LOW_PRIORITY table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status" // SelectFromTTLTableStatusWithID returns an SQL statement to get the table status from table id -func SelectFromTTLTableStatusWithID(tableID int64) string { - return selectFromTTLTableStatus + fmt.Sprintf(" WHERE table_id = %d", tableID) +func SelectFromTTLTableStatusWithID(tableID int64) (string, []interface{}) { + return selectFromTTLTableStatus + " WHERE table_id = %?", []interface{}{tableID} } // TableStatus contains the corresponding information in the system table `mysql.tidb_ttl_table_status` diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel index 8d031e22e6e4a..527c782ed691e 100644 --- a/ttl/ttlworker/BUILD.bazel +++ b/ttl/ttlworker/BUILD.bazel @@ -51,6 +51,7 @@ go_test( flaky = True, deps = [ "//infoschema", + "//kv", "//parser/ast", "//parser/model", "//parser/mysql", diff --git a/ttl/ttlworker/job.go b/ttl/ttlworker/job.go index 1a2f351465027..8628e9a6c9746 100644 --- a/ttl/ttlworker/job.go +++ b/ttl/ttlworker/job.go @@ -17,7 +17,6 @@ package ttlworker import ( "context" "encoding/json" - "fmt" "sync" "time" @@ -29,20 +28,34 @@ import ( "go.uber.org/zap" ) -const updateJobCurrentStatusTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_status = '%s' WHERE table_id = %d AND current_job_status = '%s' AND current_job_id = '%s'" -const finishJobTemplate = "UPDATE mysql.tidb_ttl_table_status SET last_job_id = current_job_id, last_job_start_time = current_job_start_time, last_job_finish_time = '%s', last_job_ttl_expire = current_job_ttl_expire, last_job_summary = '%s', current_job_id = NULL, current_job_owner_id = NULL, current_job_owner_hb_time = NULL, current_job_start_time = NULL, current_job_ttl_expire = NULL, current_job_state = NULL, current_job_status = NULL, current_job_status_update_time = NULL WHERE table_id = %d AND current_job_id = '%s'" -const updateJobStateTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_state = '%s' WHERE table_id = %d AND current_job_id = '%s' AND current_job_owner_id = '%s'" +const updateJobCurrentStatusTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_status = %? WHERE table_id = %? AND current_job_status = %? AND current_job_id = %?" +const finishJobTemplate = `UPDATE mysql.tidb_ttl_table_status + SET last_job_id = current_job_id, + last_job_start_time = current_job_start_time, + last_job_finish_time = %?, + last_job_ttl_expire = current_job_ttl_expire, + last_job_summary = %?, + current_job_id = NULL, + current_job_owner_id = NULL, + current_job_owner_hb_time = NULL, + current_job_start_time = NULL, + current_job_ttl_expire = NULL, + current_job_state = NULL, + current_job_status = NULL, + current_job_status_update_time = NULL + WHERE table_id = %? AND current_job_id = %?` +const updateJobStateTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_state = %? WHERE table_id = %? AND current_job_id = %? AND current_job_owner_id = %?" -func updateJobCurrentStatusSQL(tableID int64, oldStatus cache.JobStatus, newStatus cache.JobStatus, jobID string) string { - return fmt.Sprintf(updateJobCurrentStatusTemplate, newStatus, tableID, oldStatus, jobID) +func updateJobCurrentStatusSQL(tableID int64, oldStatus cache.JobStatus, newStatus cache.JobStatus, jobID string) (string, []interface{}) { + return updateJobCurrentStatusTemplate, []interface{}{newStatus, tableID, oldStatus, jobID} } -func finishJobSQL(tableID int64, finishTime time.Time, summary string, jobID string) string { - return fmt.Sprintf(finishJobTemplate, finishTime.Format(timeFormat), summary, tableID, jobID) +func finishJobSQL(tableID int64, finishTime time.Time, summary string, jobID string) (string, []interface{}) { + return finishJobTemplate, []interface{}{finishTime.Format(timeFormat), summary, tableID, jobID} } -func updateJobState(tableID int64, currentJobID string, currentJobState string, currentJobOwnerID string) string { - return fmt.Sprintf(updateJobStateTemplate, currentJobState, tableID, currentJobID, currentJobOwnerID) +func updateJobState(tableID int64, currentJobID string, currentJobState string, currentJobOwnerID string) (string, []interface{}) { + return updateJobStateTemplate, []interface{}{currentJobState, tableID, currentJobID, currentJobOwnerID} } type ttlJob struct { @@ -76,9 +89,10 @@ func (job *ttlJob) changeStatus(ctx context.Context, se session.Session, status job.status = status job.statusMutex.Unlock() - _, err := se.ExecuteSQL(ctx, updateJobCurrentStatusSQL(job.tbl.ID, oldStatus, status, job.id)) + sql, args := updateJobCurrentStatusSQL(job.tbl.ID, oldStatus, status, job.id) + _, err := se.ExecuteSQL(ctx, sql, args...) if err != nil { - return errors.Trace(err) + return errors.Wrapf(err, "execute sql: %s", sql) } return nil @@ -89,9 +103,10 @@ func (job *ttlJob) updateState(ctx context.Context, se session.Session) error { if err != nil { logutil.Logger(job.ctx).Warn("fail to generate summary for ttl job", zap.Error(err)) } - _, err = se.ExecuteSQL(ctx, updateJobState(job.tbl.ID, job.id, summary, job.ownerID)) + sql, args := updateJobState(job.tbl.ID, job.id, summary, job.ownerID) + _, err = se.ExecuteSQL(ctx, sql, args...) if err != nil { - return errors.Trace(err) + return errors.Wrapf(err, "execute sql: %s", sql) } return nil @@ -115,9 +130,10 @@ func (job *ttlJob) finish(se session.Session, now time.Time) { } // at this time, the job.ctx may have been canceled (to cancel this job) // even when it's canceled, we'll need to update the states, so use another context - _, err = se.ExecuteSQL(context.TODO(), finishJobSQL(job.tbl.ID, now, summary, job.id)) + sql, args := finishJobSQL(job.tbl.ID, now, summary, job.id) + _, err = se.ExecuteSQL(context.TODO(), sql, args...) if err != nil { - logutil.Logger(job.ctx).Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id)) + logutil.Logger(job.ctx).Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id), zap.String("sql", sql), zap.Any("arguments", args)) } } diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 58c47d05efe4f..25af41e46ca58 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -16,7 +16,6 @@ package ttlworker import ( "context" - "fmt" "time" "github.com/pingcap/errors" @@ -31,22 +30,30 @@ import ( "go.uber.org/zap" ) -const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%d, %d)" -const setTableStatusOwnerTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_id = UUID(), current_job_owner_id = '%s',current_job_start_time = '%s',current_job_status = 'waiting',current_job_status_update_time = '%s',current_job_ttl_expire = '%s',current_job_owner_hb_time = '%s' WHERE table_id = %d" -const updateHeartBeatTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_owner_hb_time = '%s' WHERE table_id = %d AND current_job_owner_id = '%s'" +const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%?, %?)" +const setTableStatusOwnerTemplate = `UPDATE mysql.tidb_ttl_table_status + SET current_job_id = UUID(), + current_job_owner_id = %?, + current_job_start_time = %?, + current_job_status = 'waiting', + current_job_status_update_time = %?, + current_job_ttl_expire = %?, + current_job_owner_hb_time = %? + WHERE table_id = %?` +const updateHeartBeatTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_owner_hb_time = %? WHERE table_id = %? AND current_job_owner_id = %?" const timeFormat = "2006-01-02 15:04:05" -func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) string { - return fmt.Sprintf(insertNewTableIntoStatusTemplate, tableID, parentTableID) +func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) (string, []interface{}) { + return insertNewTableIntoStatusTemplate, []interface{}{tableID, parentTableID} } -func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) string { - return fmt.Sprintf(setTableStatusOwnerTemplate, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID) +func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) (string, []interface{}) { + return setTableStatusOwnerTemplate, []interface{}{id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID} } -func updateHeartBeatSQL(tableID int64, now time.Time, id string) string { - return fmt.Sprintf(updateHeartBeatTemplate, now.Format(timeFormat), tableID, id) +func updateHeartBeatSQL(tableID int64, now time.Time, id string) (string, []interface{}) { + return updateHeartBeatTemplate, []interface{}{now.Format(timeFormat), tableID, id} } // JobManager schedules and manages the ttl jobs on this instance @@ -503,19 +510,22 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * var expireTime time.Time err := se.RunInTxn(ctx, func() error { - rows, err := se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.ID)) + sql, args := cache.SelectFromTTLTableStatusWithID(table.ID) + rows, err := se.ExecuteSQL(ctx, sql, args...) if err != nil { - return err + return errors.Wrapf(err, "execute sql: %s", sql) } if len(rows) == 0 { // cannot find the row, insert the status row - _, err = se.ExecuteSQL(ctx, insertNewTableIntoStatusSQL(table.ID, table.TableInfo.ID)) + sql, args := insertNewTableIntoStatusSQL(table.ID, table.TableInfo.ID) + _, err = se.ExecuteSQL(ctx, sql, args...) if err != nil { - return err + return errors.Wrapf(err, "execute sql: %s", sql) } - rows, err = se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.ID)) + sql, args = cache.SelectFromTTLTableStatusWithID(table.ID) + rows, err = se.ExecuteSQL(ctx, sql, args...) if err != nil { - return err + return errors.Wrapf(err, "execute sql: %s", sql) } if len(rows) == 0 { return errors.New("table status row still doesn't exist after insertion") @@ -534,9 +544,9 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * return err } - _, err = se.ExecuteSQL(ctx, setTableStatusOwnerSQL(table.ID, now, expireTime, m.id)) - - return err + sql, args = setTableStatusOwnerSQL(table.ID, now, expireTime, m.id) + _, err = se.ExecuteSQL(ctx, sql, args...) + return errors.Wrapf(err, "execute sql: %s", sql) }) if err != nil { return nil, err @@ -599,9 +609,10 @@ func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *ca func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session) error { now := se.Now() for _, job := range m.localJobs() { - _, err := se.ExecuteSQL(ctx, updateHeartBeatSQL(job.tbl.ID, now, m.id)) + sql, args := updateHeartBeatSQL(job.tbl.ID, now, m.id) + _, err := se.ExecuteSQL(ctx, sql, args...) if err != nil { - return errors.Trace(err) + return errors.Wrapf(err, "execute sql: %s", sql) } // also updates some internal state for this job err = job.updateState(ctx, se) diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go index 8c299afcd48de..8d33a736d2b9d 100644 --- a/ttl/ttlworker/job_manager_integration_test.go +++ b/ttl/ttlworker/job_manager_integration_test.go @@ -21,6 +21,8 @@ import ( "testing" "time" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" dbsession "github.com/pingcap/tidb/session" @@ -35,10 +37,8 @@ import ( "go.uber.org/zap" ) -func TestParallelLockNewJob(t *testing.T) { - store := testkit.CreateMockStore(t) - - sessionFactory := func() session.Session { +func sessionFactory(t *testing.T, store kv.Storage) func() session.Session { + return func() session.Session { dbSession, err := dbsession.CreateSession4Test(store) require.NoError(t, err) se := session.NewSession(dbSession, dbSession, nil) @@ -50,6 +50,12 @@ func TestParallelLockNewJob(t *testing.T) { return se } +} + +func TestParallelLockNewJob(t *testing.T) { + store := testkit.CreateMockStore(t) + + sessionFactory := sessionFactory(t, store) storedTTLJobRunInterval := variable.TTLJobRunInterval.Load() variable.TTLJobRunInterval.Store(0) @@ -96,3 +102,24 @@ func TestParallelLockNewJob(t *testing.T) { successJob.Finish(se, time.Now()) } } + +func TestFinishJob(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + sessionFactory := sessionFactory(t, store) + + testTable := &cache.PhysicalTable{ID: 2, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay)}}} + + tk.MustExec("insert into mysql.tidb_ttl_table_status(table_id) values (2)") + + // finish with error + m := ttlworker.NewJobManager("test-id", nil, store) + se := sessionFactory() + job, err := m.LockNewJob(context.Background(), se, testTable, time.Now()) + require.NoError(t, err) + job.SetScanErr(errors.New(`"'an error message contains both single and double quote'"`)) + job.Finish(se, time.Now()) + + tk.MustQuery("select table_id, last_job_summary from mysql.tidb_ttl_table_status").Check(testkit.Rows("2 {\"total_rows\":0,\"success_rows\":0,\"error_rows\":0,\"total_scan_task\":1,\"scheduled_scan_task\":0,\"finished_scan_task\":0,\"scan_task_err\":\"\\\"'an error message contains both single and double quote'\\\"\"}")) +} diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go index 6718c384543fe..1f649abb065af 100644 --- a/ttl/ttlworker/job_manager_test.go +++ b/ttl/ttlworker/job_manager_test.go @@ -156,6 +156,10 @@ func (j *ttlJob) ID() string { return j.id } +func (j *ttlJob) SetScanErr(err error) { + j.scanTaskErr = err +} + func newMockTTLJob(tbl *cache.PhysicalTable, status cache.JobStatus) *ttlJob { statistics := &ttlStatistics{} return &ttlJob{tbl: tbl, ctx: context.Background(), statistics: statistics, status: status, tasks: []*ttlScanTask{{ctx: context.Background(), tbl: tbl, statistics: statistics}}} @@ -216,8 +220,18 @@ func TestLockNewTable(t *testing.T) { testPhysicalTable := &cache.PhysicalTable{ID: 1, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{ColumnName: model.NewCIStr("test"), IntervalExprStr: "5 Year"}}} + type executeInfo struct { + sql string + args []interface{} + } + getExecuteInfo := func(sql string, args []interface{}) executeInfo { + return executeInfo{ + sql, + args, + } + } type sqlExecute struct { - sql string + executeInfo rows []chunk.Row err error @@ -231,47 +245,47 @@ func TestLockNewTable(t *testing.T) { }{ {"normal lock table", testPhysicalTable, []sqlExecute{ { - cache.SelectFromTTLTableStatusWithID(1), + getExecuteInfo(cache.SelectFromTTLTableStatusWithID(1)), newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, { - setTableStatusOwnerSQL(1, now, expireTime, "test-id"), + getExecuteInfo(setTableStatusOwnerSQL(1, now, expireTime, "test-id")), nil, nil, }, { - updateStatusSQL, + getExecuteInfo(updateStatusSQL, nil), newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, }, true, false}, {"select nothing", testPhysicalTable, []sqlExecute{ { - cache.SelectFromTTLTableStatusWithID(1), + getExecuteInfo(cache.SelectFromTTLTableStatusWithID(1)), nil, nil, }, { - insertNewTableIntoStatusSQL(1, 1), + getExecuteInfo(insertNewTableIntoStatusSQL(1, 1)), nil, nil, }, { - cache.SelectFromTTLTableStatusWithID(1), + getExecuteInfo(cache.SelectFromTTLTableStatusWithID(1)), newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, { - setTableStatusOwnerSQL(1, now, expireTime, "test-id"), + getExecuteInfo(setTableStatusOwnerSQL(1, now, expireTime, "test-id")), nil, nil, }, { - updateStatusSQL, + getExecuteInfo(updateStatusSQL, nil), newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, }, true, false}, {"return error", testPhysicalTable, []sqlExecute{ { - cache.SelectFromTTLTableStatusWithID(1), + getExecuteInfo(cache.SelectFromTTLTableStatusWithID(1)), newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, { - setTableStatusOwnerSQL(1, now, expireTime, "test-id"), + getExecuteInfo(setTableStatusOwnerSQL(1, now, expireTime, "test-id")), nil, errors.New("test error message"), }, }, false, true}, @@ -287,6 +301,7 @@ func TestLockNewTable(t *testing.T) { se.executeSQL = func(ctx context.Context, sql string, args ...interface{}) (rows []chunk.Row, err error) { assert.Less(t, sqlCounter, len(c.sqls)) assert.Equal(t, sql, c.sqls[sqlCounter].sql) + assert.Equal(t, args, c.sqls[sqlCounter].args) rows = c.sqls[sqlCounter].rows err = c.sqls[sqlCounter].err @@ -509,11 +524,26 @@ func TestCheckFinishedJob(t *testing.T) { m.runningJobs[0].taskIter = 1 m.runningJobs[0].finishedScanTaskCounter = 1 - m.checkFinishedJob(se, se.Now()) + // meetArg records whether the sql statement uses the arg + meetArg := false + now := se.Now() + jobID := m.runningJobs[0].id + se.executeSQL = func(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error) { + if len(args) > 0 { + meetArg = true + expectedSQL, expectedArgs := finishJobSQL(tbl.ID, now, "{\"total_rows\":1,\"success_rows\":1,\"error_rows\":0,\"total_scan_task\":1,\"scheduled_scan_task\":1,\"finished_scan_task\":1}", jobID) + assert.Equal(t, expectedSQL, sql) + assert.Equal(t, expectedArgs, args) + } + return nil, nil + } + m.checkFinishedJob(se, now) assert.Len(t, m.runningJobs, 0) + assert.Equal(t, true, meetArg) + se.executeSQL = nil // check timeout job - now := se.Now() + now = se.Now() createTime := now.Add(-20 * time.Hour) m = NewJobManager("test-id", newMockSessionPool(t, tbl), nil) m.runningJobs = []*ttlJob{ diff --git a/ttl/ttlworker/session_test.go b/ttl/ttlworker/session_test.go index 877fd7996eaa7..712c696fa5f35 100644 --- a/ttl/ttlworker/session_test.go +++ b/ttl/ttlworker/session_test.go @@ -179,7 +179,7 @@ func (s *mockSession) ExecuteSQL(ctx context.Context, sql string, args ...interf } if s.executeSQL != nil { - return s.executeSQL(ctx, sql, args) + return s.executeSQL(ctx, sql, args...) } return s.rows, s.execErr }