Skip to content

Commit

Permalink
ttl: reschedule scan tasks after update task state (#39891)
Browse files Browse the repository at this point in the history
close #39890
  • Loading branch information
YangKeao authored Dec 13, 2022
1 parent 98cef5a commit d2eca72
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,13 @@ func (m *JobManager) jobLoop() error {
}
cancel()
case <-updateScanTaskStateTicker:
m.updateTaskState()
if m.updateTaskState() {
m.rescheduleJobs(se, now)
}
case <-m.notifyStateCh:
m.updateTaskState()
if m.updateTaskState() {
m.rescheduleJobs(se, now)
}
case <-jobCheckTicker:
m.checkFinishedJob(se, now)
m.checkNotOwnJob()
Expand Down Expand Up @@ -263,7 +267,8 @@ func (m *JobManager) resizeWorkers(workers []worker, count int, factory func() w
return workers, nil, nil
}

func (m *JobManager) updateTaskState() {
// updateTaskState polls the result from scan worker and returns whether there are result polled
func (m *JobManager) updateTaskState() bool {
results := m.pollScanWorkerResults()
for _, result := range results {
job := findJobWithTableID(m.runningJobs, result.task.tbl.ID)
Expand All @@ -276,6 +281,8 @@ func (m *JobManager) updateTaskState() {
job.finishedScanTaskCounter += 1
job.scanTaskErr = multierr.Append(job.scanTaskErr, result.err)
}

return len(results) > 0
}

func (m *JobManager) pollScanWorkerResults() []*ttlScanTaskExecResult {
Expand Down

0 comments on commit d2eca72

Please sign in to comment.