Skip to content

Commit

Permalink
Pass workflowTask object to completion methods of mutable state (#3897)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin authored Feb 3, 2023
1 parent 52d1ba0 commit 32a4686
Show file tree
Hide file tree
Showing 21 changed files with 208 additions and 264 deletions.
9 changes: 5 additions & 4 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func (s *engine2Suite) TestRecordWorkflowTaskStartedIfTaskAlreadyCompleted() {
tl := "testTaskQueue"

ms := s.createExecutionStartedState(workflowExecution, tl, identity, true)
addWorkflowTaskCompletedEvent(ms, int64(2), int64(3), identity)
addWorkflowTaskCompletedEvent(&s.Suite, ms, int64(2), int64(3), identity)

wfMs := workflow.TestCloneToProto(ms)
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: wfMs}
Expand Down Expand Up @@ -555,7 +555,7 @@ func (s *engine2Suite) TestRecordActivityTaskStartedSuccess() {
activityInput := payloads.EncodeString("input1")

ms := s.createExecutionStartedState(workflowExecution, tl, identity, true)
workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, int64(2), int64(3), identity)
workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, int64(2), int64(3), identity)
scheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second)

ms1 := workflow.TestCloneToProto(ms)
Expand Down Expand Up @@ -1002,6 +1002,7 @@ func (s *engine2Suite) TestRespondWorkflowTaskCompleted_StartChildWorkflow_Excee
GetMapper(tests.Namespace).
Return(&searchattribute.TestMapper{Namespace: tests.Namespace.String()}, nil).
AnyTimes()
s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil)

s.historyEngine.shard.GetConfig().NumPendingChildExecutionsLimit = func(namespace string) int {
return 5
Expand Down Expand Up @@ -1651,7 +1652,7 @@ func (s *engine2Suite) TestRecordChildExecutionCompleted() {
wt := addWorkflowTaskScheduledEvent(ms)
workflowTasksStartEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, "testTaskQueue", uuid.New())
wt.StartedEventID = workflowTasksStartEvent.GetEventId()
workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity")

initiatedEvent, _ := addStartChildWorkflowExecutionInitiatedEvent(ms, workflowTaskCompletedEvent.GetEventId(), uuid.New(),
tests.ChildNamespace, tests.ChildNamespaceID, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second, enumspb.PARENT_CLOSE_POLICY_TERMINATE)
Expand Down Expand Up @@ -1831,7 +1832,7 @@ func (s *engine2Suite) TestVerifyChildExecutionCompletionRecorded_InitiatedEvent
wt := addWorkflowTaskScheduledEvent(ms)
workflowTasksStartEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskQueueName, uuid.New())
wt.StartedEventID = workflowTasksStartEvent.GetEventId()
workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
initiatedEvent, ci := addStartChildWorkflowExecutionInitiatedEvent(ms, workflowTaskCompletedEvent.GetEventId(), uuid.New(),
tests.ChildNamespace, tests.ChildNamespaceID, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second, enumspb.PARENT_CLOSE_POLICY_TERMINATE)

Expand Down
86 changes: 45 additions & 41 deletions service/history/historyEngine_test.go

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions service/history/ndc/branch_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,7 @@ func (s *branchMgrSuite) TestFlushBufferedEvents() {
VersionHistories: versionHistories,
}).AnyTimes()
s.mockMutableState.EXPECT().AddWorkflowTaskFailedEvent(
workflowTask.ScheduledEventID,
workflowTask.StartedEventID,
workflowTask,
enumspb.WORKFLOW_TASK_FAILED_CAUSE_FAILOVER_CLOSE_COMMAND,
nil,
consts.IdentityHistoryService,
Expand Down
3 changes: 1 addition & 2 deletions service/history/ndc/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,7 @@ func (r *WorkflowImpl) failWorkflowTask(
}

if _, err := r.mutableState.AddWorkflowTaskFailedEvent(
workflowTask.ScheduledEventID,
workflowTask.StartedEventID,
workflowTask,
enumspb.WORKFLOW_TASK_FAILED_CAUSE_FAILOVER_CLOSE_COMMAND,
nil,
consts.IdentityHistoryService,
Expand Down
3 changes: 1 addition & 2 deletions service/history/ndc/workflow_resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,7 @@ func (r *workflowResetterImpl) failWorkflowTask(
}

_, err = resetMutableState.AddWorkflowTaskFailedEvent(
workflowTask.ScheduledEventID,
workflowTask.StartedEventID,
workflowTask,
enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW,
failure.NewResetWorkflowFailure(resetReason, nil),
consts.IdentityHistoryService,
Expand Down
9 changes: 3 additions & 6 deletions service/history/ndc/workflow_resetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,7 @@ func (s *workflowResetterSuite) TestFailWorkflowTask_WorkflowTaskScheduled() {
consts.IdentityHistoryService,
).Return(&historypb.HistoryEvent{}, workflowTaskStart, nil)
mutableState.EXPECT().AddWorkflowTaskFailedEvent(
workflowTaskStart.ScheduledEventID,
workflowTaskStart.StartedEventID,
workflowTaskStart,
enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW,
failure.NewResetWorkflowFailure(resetReason, nil),
consts.IdentityHistoryService,
Expand Down Expand Up @@ -433,8 +432,7 @@ func (s *workflowResetterSuite) TestFailWorkflowTask_WorkflowTaskStarted() {
}
mutableState.EXPECT().GetPendingWorkflowTask().Return(workflowTask, true).AnyTimes()
mutableState.EXPECT().AddWorkflowTaskFailedEvent(
workflowTask.ScheduledEventID,
workflowTask.StartedEventID,
workflowTask,
enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW,
failure.NewResetWorkflowFailure(resetReason, nil),
consts.IdentityHistoryService,
Expand Down Expand Up @@ -534,8 +532,7 @@ func (s *workflowResetterSuite) TestTerminateWorkflow() {
mutableState.EXPECT().GetNextEventID().Return(nextEventID).AnyTimes()
mutableState.EXPECT().GetInFlightWorkflowTask().Return(workflowTask, true)
mutableState.EXPECT().AddWorkflowTaskFailedEvent(
workflowTask.ScheduledEventID,
workflowTask.StartedEventID,
workflowTask,
enumspb.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND,
nil,
consts.IdentityHistoryService,
Expand Down
3 changes: 1 addition & 2 deletions service/history/ndc/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,7 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Terminate() {
}
s.mockMutableState.EXPECT().GetInFlightWorkflowTask().Return(inFlightWorkflowTask, true)
s.mockMutableState.EXPECT().AddWorkflowTaskFailedEvent(
inFlightWorkflowTask.ScheduledEventID,
inFlightWorkflowTask.StartedEventID,
inFlightWorkflowTask,
enumspb.WORKFLOW_TASK_FAILED_CAUSE_FAILOVER_CLOSE_COMMAND,
nil,
consts.IdentityHistoryService,
Expand Down
3 changes: 1 addition & 2 deletions service/history/timerQueueActiveTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,7 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowTaskTimeoutTask(
enumspb.TIMEOUT_TYPE_START_TO_CLOSE,
)
if _, err := mutableState.AddWorkflowTaskTimedOutEvent(
workflowTask.ScheduledEventID,
workflowTask.StartedEventID,
workflowTask,
); err != nil {
return err
}
Expand Down
30 changes: 15 additions & 15 deletions service/history/timerQueueActiveTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessUserTimerTimeout_Fire() {
wt := addWorkflowTaskScheduledEvent(mutableState)
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
wt.StartedEventID = event.GetEventId()
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")

timerID := "timer"
timerTimeout := 2 * time.Second
Expand Down Expand Up @@ -302,7 +302,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessUserTimerTimeout_Noop() {
wt := addWorkflowTaskScheduledEvent(mutableState)
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
wt.StartedEventID = event.GetEventId()
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")

timerID := "timer"
timerTimeout := 2 * time.Second
Expand Down Expand Up @@ -366,7 +366,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_NoRetryPo
wt := addWorkflowTaskScheduledEvent(mutableState)
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
wt.StartedEventID = event.GetEventId()
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")

taskqueue := "taskqueue"
activityID := "activity"
Expand Down Expand Up @@ -445,7 +445,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_NoRetryPo
wt := addWorkflowTaskScheduledEvent(mutableState)
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
wt.StartedEventID = event.GetEventId()
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")

identity := "identity"
taskqueue := "taskqueue"
Expand Down Expand Up @@ -526,7 +526,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_RetryPoli
wt := addWorkflowTaskScheduledEvent(mutableState)
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
wt.StartedEventID = event.GetEventId()
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")

identity := "identity"
taskqueue := "taskqueue"
Expand Down Expand Up @@ -619,7 +619,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_RetryPoli
wt := addWorkflowTaskScheduledEvent(mutableState)
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
wt.StartedEventID = event.GetEventId()
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")

taskqueue := "taskqueue"
activityID := "activity"
Expand Down Expand Up @@ -705,7 +705,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_RetryPoli
wt := addWorkflowTaskScheduledEvent(mutableState)
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
wt.StartedEventID = event.GetEventId()
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")

identity := "identity"
taskqueue := "taskqueue"
Expand Down Expand Up @@ -794,7 +794,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_Heartbeat
wt := addWorkflowTaskScheduledEvent(mutableState)
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
wt.StartedEventID = event.GetEventId()
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")

identity := "identity"
taskqueue := "taskqueue"
Expand Down Expand Up @@ -1033,7 +1033,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowBackoffTimer_Noop() {
wt := addWorkflowTaskScheduledEvent(mutableState)
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
wt.StartedEventID = event.GetEventId()
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")

timerTask := &tasks.WorkflowBackoffTimerTask{
WorkflowKey: definition.NewWorkflowKey(
Expand Down Expand Up @@ -1084,7 +1084,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestActivityRetryTimer_Fire() {
wt := addWorkflowTaskScheduledEvent(mutableState)
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
wt.StartedEventID = event.GetEventId()
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")

taskqueue := "taskqueue"
activityID := "activity"
Expand Down Expand Up @@ -1173,7 +1173,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestActivityRetryTimer_Noop() {
wt := addWorkflowTaskScheduledEvent(mutableState)
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
wt.StartedEventID = event.GetEventId()
event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")

identity := "identity"
taskqueue := "taskqueue"
Expand Down Expand Up @@ -1250,7 +1250,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowTimeout_Fire() {
wt := addWorkflowTaskScheduledEvent(mutableState)
startEvent := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
wt.StartedEventID = startEvent.GetEventId()
completionEvent := addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
completionEvent := addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")

timerTask := &tasks.WorkflowTimeoutTask{
WorkflowKey: definition.NewWorkflowKey(
Expand Down Expand Up @@ -1310,7 +1310,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowTimeout_Retry() {
wt := addWorkflowTaskScheduledEvent(mutableState)
startEvent := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
wt.StartedEventID = startEvent.GetEventId()
completionEvent := addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
completionEvent := addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")

timerTask := &tasks.WorkflowTimeoutTask{
WorkflowKey: definition.NewWorkflowKey(
Expand Down Expand Up @@ -1367,7 +1367,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowTimeout_Cron() {
wt := addWorkflowTaskScheduledEvent(mutableState)
startEvent := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
wt.StartedEventID = startEvent.GetEventId()
completionEvent := addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
completionEvent := addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")

timerTask := &tasks.WorkflowTimeoutTask{
WorkflowKey: definition.NewWorkflowKey(
Expand Down Expand Up @@ -1423,7 +1423,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowTimeout_WorkflowExpired(
wt := addWorkflowTaskScheduledEvent(mutableState)
startEvent := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New())
wt.StartedEventID = startEvent.GetEventId()
completionEvent := addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")
completionEvent := addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")

timerTask := &tasks.WorkflowTimeoutTask{
WorkflowKey: definition.NewWorkflowKey(
Expand Down
Loading

0 comments on commit 32a4686

Please sign in to comment.