From f9bccbd12f81447565178c687f99952d36df0c4c Mon Sep 17 00:00:00 2001 From: YangKeao Date: Tue, 13 Dec 2022 12:46:53 -0500 Subject: [PATCH] ttl: reschedule scan tasks after update task state (#39891) close pingcap/tidb#39890 --- ttl/ttlworker/job_manager.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index a141cfc046fb3..00e18d8c4a8fb 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -140,9 +140,13 @@ func (m *JobManager) jobLoop() error { } cancel() case <-updateScanTaskStateTicker: - m.updateTaskState() + if m.updateTaskState() { + m.rescheduleJobs(se, now) + } case <-m.notifyStateCh: - m.updateTaskState() + if m.updateTaskState() { + m.rescheduleJobs(se, now) + } case <-jobCheckTicker: m.checkFinishedJob(se, now) m.checkNotOwnJob() @@ -212,7 +216,8 @@ func (m *JobManager) resizeWorkers(workers []worker, count int, factory func() w return workers, nil } -func (m *JobManager) updateTaskState() { +// updateTaskState polls the result from scan worker and returns whether there are result polled +func (m *JobManager) updateTaskState() bool { results := m.pollScanWorkerResults() for _, result := range results { job := findJobWithTableID(m.runningJobs, result.task.tbl.ID) @@ -223,6 +228,8 @@ func (m *JobManager) updateTaskState() { job.scanTaskErr = multierr.Append(job.scanTaskErr, result.err) } } + + return len(results) > 0 } func (m *JobManager) pollScanWorkerResults() []*ttlScanTaskExecResult {