Skip to content

Commit

Permalink
disttask: fix dispatcher checksubtask flaky test (#46887)
Browse files Browse the repository at this point in the history
close #46884
  • Loading branch information
ywqzzy authored Sep 12, 2023
1 parent d583ed6 commit f85c9be
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions disttask/framework/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,8 @@ func (*numberExampleDispatcherExt) OnTick(_ context.Context, _ *proto.Task) {
}

func (n *numberExampleDispatcherExt) OnNextSubtasksBatch(_ context.Context, _ dispatcher.TaskHandle, task *proto.Task) (metas [][]byte, err error) {
if task.State == proto.TaskStatePending {
task.Step = proto.StepInit
}
switch task.Step {
case proto.StepInit:
task.Step = proto.StepOne
for i := 0; i < subtaskCnt; i++ {
metas = append(metas, []byte{'1'})
}
Expand Down Expand Up @@ -264,6 +260,17 @@ func checkDispatch(t *testing.T, taskCnt int, isSucc bool, isCancel bool) {
return tasks
}

checkSubtaskCnt := func(tasks []*proto.Task, taskIDs []int64) {
for i, taskID := range taskIDs {
require.Equal(t, int64(i+1), tasks[i].ID)
require.Eventually(t, func() bool {
cnt, err := mgr.GetSubtaskInStatesCnt(taskID, proto.TaskStatePending)
require.NoError(t, err)
return int64(subtaskCnt) == cnt
}, time.Second, 50*time.Millisecond)
}
}

// Mock add tasks.
taskIDs := make([]int64, 0, taskCnt)
for i := 0; i < taskCnt; i++ {
Expand All @@ -274,12 +281,7 @@ func checkDispatch(t *testing.T, taskCnt int, isSucc bool, isCancel bool) {
// test OnNextSubtasksBatch.
checkGetRunningTaskCnt(taskCnt)
tasks := checkTaskRunningCnt()
for i, taskID := range taskIDs {
require.Equal(t, int64(i+1), tasks[i].ID)
subtasks, err := mgr.GetSubtaskInStatesCnt(taskID, proto.TaskStatePending)
require.NoError(t, err)
require.Equal(t, int64(subtaskCnt), subtasks, fmt.Sprintf("num:%d", i))
}
checkSubtaskCnt(tasks, taskIDs)
// test parallelism control
if taskCnt == 1 {
taskID, err := mgr.AddNewGlobalTask(fmt.Sprintf("%d", taskCnt), proto.TaskTypeExample, 0, nil)
Expand Down

0 comments on commit f85c9be

Please sign in to comment.