diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index 21e3f9f8c4b..f6fbcf5a0ac 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/historyEngine2_test.go @@ -381,7 +381,7 @@ func (s *engine2Suite) TestRecordWorkflowTaskStartedIfTaskAlreadyCompleted() { tl := "testTaskQueue" ms := s.createExecutionStartedState(workflowExecution, tl, identity, true) - addWorkflowTaskCompletedEvent(ms, int64(2), int64(3), identity) + addWorkflowTaskCompletedEvent(&s.Suite, ms, int64(2), int64(3), identity) wfMs := workflow.TestCloneToProto(ms) gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: wfMs} @@ -555,7 +555,7 @@ func (s *engine2Suite) TestRecordActivityTaskStartedSuccess() { activityInput := payloads.EncodeString("input1") ms := s.createExecutionStartedState(workflowExecution, tl, identity, true) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, int64(2), int64(3), identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, int64(2), int64(3), identity) scheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) ms1 := workflow.TestCloneToProto(ms) @@ -1002,6 +1002,7 @@ func (s *engine2Suite) TestRespondWorkflowTaskCompleted_StartChildWorkflow_Excee GetMapper(tests.Namespace). Return(&searchattribute.TestMapper{Namespace: tests.Namespace.String()}, nil). AnyTimes() + s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil) s.historyEngine.shard.GetConfig().NumPendingChildExecutionsLimit = func(namespace string) int { return 5 @@ -1651,7 +1652,7 @@ func (s *engine2Suite) TestRecordChildExecutionCompleted() { wt := addWorkflowTaskScheduledEvent(ms) workflowTasksStartEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, "testTaskQueue", uuid.New()) wt.StartedEventID = workflowTasksStartEvent.GetEventId() - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity") initiatedEvent, _ := addStartChildWorkflowExecutionInitiatedEvent(ms, workflowTaskCompletedEvent.GetEventId(), uuid.New(), tests.ChildNamespace, tests.ChildNamespaceID, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second, enumspb.PARENT_CLOSE_POLICY_TERMINATE) @@ -1831,7 +1832,7 @@ func (s *engine2Suite) TestVerifyChildExecutionCompletionRecorded_InitiatedEvent wt := addWorkflowTaskScheduledEvent(ms) workflowTasksStartEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = workflowTasksStartEvent.GetEventId() - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity") initiatedEvent, ci := addStartChildWorkflowExecutionInitiatedEvent(ms, workflowTaskCompletedEvent.GetEventId(), uuid.New(), tests.ChildNamespace, tests.ChildNamespaceID, childWorkflowID, childWorkflowType, childTaskQueueName, nil, 1*time.Second, 1*time.Second, 1*time.Second, enumspb.PARENT_CLOSE_POLICY_TERMINATE) diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index f192d39e7e3..eb0c060b5f3 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -467,7 +467,7 @@ func (s *engineSuite) TestQueryWorkflow_RejectBasedOnCompleted() { wt := addWorkflowTaskScheduledEvent(ms) event := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskqueue, identity) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity") addCompleteWorkflowEvent(ms, event.GetEventId(), nil) wfMs := workflow.TestCloneToProto(ms) gweResponse := &persistence.GetWorkflowExecutionResponse{State: wfMs} @@ -501,7 +501,7 @@ func (s *engineSuite) TestQueryWorkflow_RejectBasedOnFailed() { wt := addWorkflowTaskScheduledEvent(ms) event := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskqueue, identity) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity") addFailWorkflowEvent(ms, event.GetEventId(), failure.NewServerFailure("failure reason", true), enumspb.RETRY_STATE_NON_RETRYABLE_FAILURE) wfMs := workflow.TestCloneToProto(ms) gweResponse := &persistence.GetWorkflowExecutionResponse{State: wfMs} @@ -548,7 +548,7 @@ func (s *engineSuite) TestQueryWorkflow_DirectlyThroughMatching() { addWorkflowExecutionStartedEvent(ms, execution, "wType", taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) startedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskqueue, identity) - addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, startedEvent.EventId, identity) + addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, startedEvent.EventId, identity) wfMs := workflow.TestCloneToProto(ms) gweResponse := &persistence.GetWorkflowExecutionResponse{State: wfMs} @@ -586,7 +586,7 @@ func (s *engineSuite) TestQueryWorkflow_WorkflowTaskDispatch_Timeout() { addWorkflowExecutionStartedEvent(ms, execution, "wType", taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) startedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskqueue, identity) - addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, startedEvent.EventId, identity) + addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, startedEvent.EventId, identity) wt = addWorkflowTaskScheduledEvent(ms) addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskqueue, identity) @@ -640,7 +640,7 @@ func (s *engineSuite) TestQueryWorkflow_ConsistentQueryBufferFull() { addWorkflowExecutionStartedEvent(ms, execution, "wType", taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) startedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskqueue, identity) - addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, startedEvent.EventId, identity) + addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, startedEvent.EventId, identity) wt = addWorkflowTaskScheduledEvent(ms) addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskqueue, identity) @@ -686,7 +686,7 @@ func (s *engineSuite) TestQueryWorkflow_WorkflowTaskDispatch_Complete() { addWorkflowExecutionStartedEvent(ms, execution, "wType", taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) startedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskqueue, identity) - addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, startedEvent.EventId, identity) + addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, startedEvent.EventId, identity) wt = addWorkflowTaskScheduledEvent(ms) addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskqueue, identity) @@ -757,7 +757,7 @@ func (s *engineSuite) TestQueryWorkflow_WorkflowTaskDispatch_Unblocked() { addWorkflowExecutionStartedEvent(ms, execution, "wType", taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) startedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskqueue, identity) - addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, startedEvent.EventId, identity) + addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, startedEvent.EventId, identity) wt = addWorkflowTaskScheduledEvent(ms) addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskqueue, identity) @@ -941,7 +941,7 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedIfTaskCompleted() { addWorkflowExecutionStartedEvent(ms, we, "wType", tq, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) startedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tq, identity) - addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, startedEvent.EventId, identity) + addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, startedEvent.EventId, identity) wfMs := workflow.TestCloneToProto(ms) gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: wfMs} @@ -1021,7 +1021,7 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedConflictOnUpdate() { addWorkflowExecutionStartedEvent(ms, we, "wType", tq, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt1 := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent1 := addWorkflowTaskStartedEvent(ms, wt1.ScheduledEventID, tq, identity) - workflowTaskCompletedEvent1 := addWorkflowTaskCompletedEvent(ms, wt1.ScheduledEventID, workflowTaskStartedEvent1.EventId, identity) + workflowTaskCompletedEvent1 := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt1.ScheduledEventID, workflowTaskStartedEvent1.EventId, identity) activity1ScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent1.EventId, activity1ID, activity1Type, tq, activity1Input, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) activity2ScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent1.EventId, activity2ID, activity2Type, tq, activity2Input, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) activity1StartedEvent := addActivityTaskStartedEvent(ms, activity1ScheduledEvent.EventId, identity) @@ -1179,7 +1179,7 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedCompleteWorkflowFailed() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 25*time.Second, 20*time.Second, 200*time.Second, identity) wt1 := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent1 := addWorkflowTaskStartedEvent(ms, wt1.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent1 := addWorkflowTaskCompletedEvent(ms, wt1.ScheduledEventID, workflowTaskStartedEvent1.EventId, identity) + workflowTaskCompletedEvent1 := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt1.ScheduledEventID, workflowTaskStartedEvent1.EventId, identity) activity1ScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent1.EventId, activity1ID, activity1Type, tl, activity1Input, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) activity2ScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent1.EventId, activity2ID, activity2Type, tl, activity2Input, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) activity1StartedEvent := addActivityTaskStartedEvent(ms, activity1ScheduledEvent.EventId, identity) @@ -1264,7 +1264,7 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedFailWorkflowFailed() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 25*time.Second, 20*time.Second, 200*time.Second, identity) wt1 := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent1 := addWorkflowTaskStartedEvent(ms, wt1.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent1 := addWorkflowTaskCompletedEvent(ms, wt1.ScheduledEventID, workflowTaskStartedEvent1.EventId, identity) + workflowTaskCompletedEvent1 := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt1.ScheduledEventID, workflowTaskStartedEvent1.EventId, identity) activity1ScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent1.EventId, activity1ID, activity1Type, tl, activity1Input, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) activity2ScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent1.EventId, activity2ID, activity2Type, tl, activity2Input, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) activity1StartedEvent := addActivityTaskStartedEvent(ms, activity1ScheduledEvent.EventId, identity) @@ -1344,7 +1344,7 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedBadCommandAttributes() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 25*time.Second, 20*time.Second, 200*time.Second, identity) wt1 := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent1 := addWorkflowTaskStartedEvent(ms, wt1.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent1 := addWorkflowTaskCompletedEvent(ms, wt1.ScheduledEventID, workflowTaskStartedEvent1.EventId, identity) + workflowTaskCompletedEvent1 := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt1.ScheduledEventID, workflowTaskStartedEvent1.EventId, identity) activity1ScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent1.EventId, activity1ID, activity1Type, tl, activity1Input, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) activity1StartedEvent := addActivityTaskStartedEvent(ms, activity1ScheduledEvent.EventId, identity) addActivityTaskCompletedEvent(ms, activity1ScheduledEvent.EventId, @@ -2614,7 +2614,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedUpdateExecutionFailed() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) @@ -2662,7 +2662,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedIfTaskCompleted() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) activityStartedEvent := addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) addActivityTaskCompletedEvent(ms, activityScheduledEvent.EventId, activityStartedEvent.EventId, @@ -2712,7 +2712,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedIfTaskNotStarted() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) wfMs := workflow.TestCloneToProto(ms) @@ -2758,7 +2758,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedConflictOnUpdate() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) @@ -2805,7 +2805,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedSuccess() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) @@ -2864,7 +2864,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedByIdSuccess() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) workflowTaskScheduledEvent := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, workflowTaskScheduledEvent.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, workflowTaskScheduledEvent.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, workflowTaskScheduledEvent.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) @@ -3091,7 +3091,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedUpdateExecutionFailed() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) @@ -3138,7 +3138,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedIfTaskCompleted() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) activityStartedEvent := addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) addActivityTaskFailedEvent(ms, activityScheduledEvent.EventId, activityStartedEvent.EventId, failure, enumspb.RETRY_STATE_NON_RETRYABLE_FAILURE, identity) @@ -3186,7 +3186,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedIfTaskNotStarted() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) wfMs := workflow.TestCloneToProto(ms) @@ -3230,7 +3230,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedConflictOnUpdate() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) @@ -3276,7 +3276,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedSuccess() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) @@ -3334,7 +3334,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedWithHeartbeatSuccess() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, activityInfo := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) @@ -3401,7 +3401,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedByIdSuccess() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) workflowTaskScheduledEvent := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, workflowTaskScheduledEvent.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, workflowTaskScheduledEvent.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, workflowTaskScheduledEvent.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 5*time.Second) addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) @@ -3460,7 +3460,7 @@ func (s *engineSuite) TestRecordActivityTaskHeartBeatSuccess_NoTimer() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 0*time.Second) addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) @@ -3508,7 +3508,7 @@ func (s *engineSuite) TestRecordActivityTaskHeartBeatSuccess_TimerRunning() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 1*time.Second) addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) @@ -3563,7 +3563,7 @@ func (s *engineSuite) TestRecordActivityTaskHeartBeatByIDSuccess() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 0*time.Second) addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) @@ -3611,7 +3611,7 @@ func (s *engineSuite) TestRespondActivityTaskCanceled_Scheduled() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 1*time.Second) wfMs := workflow.TestCloneToProto(ms) @@ -3656,7 +3656,7 @@ func (s *engineSuite) TestRespondActivityTaskCanceled_Started() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 1*time.Second) addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) _, _, err := ms.AddActivityTaskCancelRequestedEvent(workflowTaskCompletedEvent.EventId, activityScheduledEvent.EventId, identity) @@ -3715,7 +3715,7 @@ func (s *engineSuite) TestRespondActivityTaskCanceledById_Started() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) workflowTaskScheduledEvent := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, workflowTaskScheduledEvent.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, workflowTaskScheduledEvent.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, workflowTaskScheduledEvent.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 1*time.Second) addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) _, _, err := ms.AddActivityTaskCancelRequestedEvent(workflowTaskCompletedEvent.EventId, activityScheduledEvent.EventId, identity) @@ -3939,7 +3939,7 @@ func (s *engineSuite) TestRequestCancel_RespondWorkflowTaskCompleted_Scheduled() addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) _, aInfo := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 1*time.Second) wt2 := addWorkflowTaskScheduledEvent(ms) addWorkflowTaskStartedEvent(ms, wt2.ScheduledEventID, tl, identity) @@ -4003,7 +4003,7 @@ func (s *engineSuite) TestRequestCancel_RespondWorkflowTaskCompleted_Started() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 0*time.Second) addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) wt2 := addWorkflowTaskScheduledEvent(ms) @@ -4065,7 +4065,7 @@ func (s *engineSuite) TestRequestCancel_RespondWorkflowTaskCompleted_Completed() addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) _, aInfo := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 0*time.Second) wt2 := addWorkflowTaskScheduledEvent(ms) addWorkflowTaskStartedEvent(ms, wt2.ScheduledEventID, tl, identity) @@ -4132,7 +4132,7 @@ func (s *engineSuite) TestRequestCancel_RespondWorkflowTaskCompleted_NoHeartBeat addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 0*time.Second) addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) wt2 := addWorkflowTaskScheduledEvent(ms) @@ -4236,7 +4236,7 @@ func (s *engineSuite) TestRequestCancel_RespondWorkflowTaskCompleted_Success() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 1*time.Second) addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) wt2 := addWorkflowTaskScheduledEvent(ms) @@ -4340,7 +4340,7 @@ func (s *engineSuite) TestRequestCancel_RespondWorkflowTaskCompleted_SuccessWith addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) activityScheduledEvent, _ := addActivityTaskScheduledEvent(ms, workflowTaskCompletedEvent.EventId, activityID, activityType, tl, activityInput, 100*time.Second, 10*time.Second, 1*time.Second, 1*time.Second) addActivityTaskStartedEvent(ms, activityScheduledEvent.EventId, identity) wt2 := addWorkflowTaskScheduledEvent(ms) @@ -4591,7 +4591,7 @@ func (s *engineSuite) TestUserTimer_RespondWorkflowTaskCompleted() { addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) addTimerStartedEvent(ms, workflowTaskCompletedEvent.EventId, timerID, 10*time.Second) wt2 := addWorkflowTaskScheduledEvent(ms) addWorkflowTaskStartedEvent(ms, wt2.ScheduledEventID, tl, identity) @@ -4715,7 +4715,7 @@ func (s *engineSuite) TestCancelTimer_RespondWorkflowTaskCompleted_TimerFired() addWorkflowExecutionStartedEvent(ms, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 100*time.Second, 100*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) workflowTaskStartedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, tl, identity) - workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) + workflowTaskCompletedEvent := addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, workflowTaskStartedEvent.EventId, identity) addTimerStartedEvent(ms, workflowTaskCompletedEvent.EventId, timerID, 10*time.Second) wt2 := addWorkflowTaskScheduledEvent(ms) addWorkflowTaskStartedEvent(ms, wt2.ScheduledEventID, tl, identity) @@ -5305,8 +5305,12 @@ func addWorkflowTaskStartedEventWithRequestID(ms workflow.MutableState, schedule return event } -func addWorkflowTaskCompletedEvent(ms workflow.MutableState, scheduledEventID, startedEventID int64, identity string) *historypb.HistoryEvent { - event, _ := ms.AddWorkflowTaskCompletedEvent(scheduledEventID, startedEventID, &workflowservice.RespondWorkflowTaskCompletedRequest{ +func addWorkflowTaskCompletedEvent(s *suite.Suite, ms workflow.MutableState, scheduledEventID, startedEventID int64, identity string) *historypb.HistoryEvent { + workflowTask, _ := ms.GetWorkflowTaskInfo(scheduledEventID) + s.NotNil(workflowTask) + s.Equal(startedEventID, workflowTask.StartedEventID) + + event, _ := ms.AddWorkflowTaskCompletedEvent(workflowTask, &workflowservice.RespondWorkflowTaskCompletedRequest{ Identity: identity, }, configs.DefaultHistoryMaxAutoResetPoints) diff --git a/service/history/ndc/branch_manager_test.go b/service/history/ndc/branch_manager_test.go index 6b72cf8da91..136e68fa2a8 100644 --- a/service/history/ndc/branch_manager_test.go +++ b/service/history/ndc/branch_manager_test.go @@ -244,8 +244,7 @@ func (s *branchMgrSuite) TestFlushBufferedEvents() { VersionHistories: versionHistories, }).AnyTimes() s.mockMutableState.EXPECT().AddWorkflowTaskFailedEvent( - workflowTask.ScheduledEventID, - workflowTask.StartedEventID, + workflowTask, enumspb.WORKFLOW_TASK_FAILED_CAUSE_FAILOVER_CLOSE_COMMAND, nil, consts.IdentityHistoryService, diff --git a/service/history/ndc/workflow.go b/service/history/ndc/workflow.go index b5c83676cf6..37399b71e1c 100644 --- a/service/history/ndc/workflow.go +++ b/service/history/ndc/workflow.go @@ -238,8 +238,7 @@ func (r *WorkflowImpl) failWorkflowTask( } if _, err := r.mutableState.AddWorkflowTaskFailedEvent( - workflowTask.ScheduledEventID, - workflowTask.StartedEventID, + workflowTask, enumspb.WORKFLOW_TASK_FAILED_CAUSE_FAILOVER_CLOSE_COMMAND, nil, consts.IdentityHistoryService, diff --git a/service/history/ndc/workflow_resetter.go b/service/history/ndc/workflow_resetter.go index c4ea5f1df66..0a9631490c3 100644 --- a/service/history/ndc/workflow_resetter.go +++ b/service/history/ndc/workflow_resetter.go @@ -501,8 +501,7 @@ func (r *workflowResetterImpl) failWorkflowTask( } _, err = resetMutableState.AddWorkflowTaskFailedEvent( - workflowTask.ScheduledEventID, - workflowTask.StartedEventID, + workflowTask, enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW, failure.NewResetWorkflowFailure(resetReason, nil), consts.IdentityHistoryService, diff --git a/service/history/ndc/workflow_resetter_test.go b/service/history/ndc/workflow_resetter_test.go index 2db4cfd5cda..e87c455fd8e 100644 --- a/service/history/ndc/workflow_resetter_test.go +++ b/service/history/ndc/workflow_resetter_test.go @@ -392,8 +392,7 @@ func (s *workflowResetterSuite) TestFailWorkflowTask_WorkflowTaskScheduled() { consts.IdentityHistoryService, ).Return(&historypb.HistoryEvent{}, workflowTaskStart, nil) mutableState.EXPECT().AddWorkflowTaskFailedEvent( - workflowTaskStart.ScheduledEventID, - workflowTaskStart.StartedEventID, + workflowTaskStart, enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW, failure.NewResetWorkflowFailure(resetReason, nil), consts.IdentityHistoryService, @@ -433,8 +432,7 @@ func (s *workflowResetterSuite) TestFailWorkflowTask_WorkflowTaskStarted() { } mutableState.EXPECT().GetPendingWorkflowTask().Return(workflowTask, true).AnyTimes() mutableState.EXPECT().AddWorkflowTaskFailedEvent( - workflowTask.ScheduledEventID, - workflowTask.StartedEventID, + workflowTask, enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW, failure.NewResetWorkflowFailure(resetReason, nil), consts.IdentityHistoryService, @@ -534,8 +532,7 @@ func (s *workflowResetterSuite) TestTerminateWorkflow() { mutableState.EXPECT().GetNextEventID().Return(nextEventID).AnyTimes() mutableState.EXPECT().GetInFlightWorkflowTask().Return(workflowTask, true) mutableState.EXPECT().AddWorkflowTaskFailedEvent( - workflowTask.ScheduledEventID, - workflowTask.StartedEventID, + workflowTask, enumspb.WORKFLOW_TASK_FAILED_CAUSE_FORCE_CLOSE_COMMAND, nil, consts.IdentityHistoryService, diff --git a/service/history/ndc/workflow_test.go b/service/history/ndc/workflow_test.go index d7f4bff1562..5dc794a526c 100644 --- a/service/history/ndc/workflow_test.go +++ b/service/history/ndc/workflow_test.go @@ -286,8 +286,7 @@ func (s *workflowSuite) TestSuppressWorkflowBy_Terminate() { } s.mockMutableState.EXPECT().GetInFlightWorkflowTask().Return(inFlightWorkflowTask, true) s.mockMutableState.EXPECT().AddWorkflowTaskFailedEvent( - inFlightWorkflowTask.ScheduledEventID, - inFlightWorkflowTask.StartedEventID, + inFlightWorkflowTask, enumspb.WORKFLOW_TASK_FAILED_CAUSE_FAILOVER_CLOSE_COMMAND, nil, consts.IdentityHistoryService, diff --git a/service/history/timerQueueActiveTaskExecutor.go b/service/history/timerQueueActiveTaskExecutor.go index e6648979c5f..eee952b713a 100644 --- a/service/history/timerQueueActiveTaskExecutor.go +++ b/service/history/timerQueueActiveTaskExecutor.go @@ -332,8 +332,7 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowTaskTimeoutTask( enumspb.TIMEOUT_TYPE_START_TO_CLOSE, ) if _, err := mutableState.AddWorkflowTaskTimedOutEvent( - workflowTask.ScheduledEventID, - workflowTask.StartedEventID, + workflowTask, ); err != nil { return err } diff --git a/service/history/timerQueueActiveTaskExecutor_test.go b/service/history/timerQueueActiveTaskExecutor_test.go index 3dd3f8f24ed..ba62fa68c6f 100644 --- a/service/history/timerQueueActiveTaskExecutor_test.go +++ b/service/history/timerQueueActiveTaskExecutor_test.go @@ -232,7 +232,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessUserTimerTimeout_Fire() { wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") timerID := "timer" timerTimeout := 2 * time.Second @@ -302,7 +302,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessUserTimerTimeout_Noop() { wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") timerID := "timer" timerTimeout := 2 * time.Second @@ -366,7 +366,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_NoRetryPo wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskqueue := "taskqueue" activityID := "activity" @@ -445,7 +445,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_NoRetryPo wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") identity := "identity" taskqueue := "taskqueue" @@ -526,7 +526,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_RetryPoli wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") identity := "identity" taskqueue := "taskqueue" @@ -619,7 +619,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_RetryPoli wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskqueue := "taskqueue" activityID := "activity" @@ -705,7 +705,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_RetryPoli wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") identity := "identity" taskqueue := "taskqueue" @@ -794,7 +794,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_Heartbeat wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") identity := "identity" taskqueue := "taskqueue" @@ -1033,7 +1033,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowBackoffTimer_Noop() { wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") timerTask := &tasks.WorkflowBackoffTimerTask{ WorkflowKey: definition.NewWorkflowKey( @@ -1084,7 +1084,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestActivityRetryTimer_Fire() { wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskqueue := "taskqueue" activityID := "activity" @@ -1173,7 +1173,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestActivityRetryTimer_Noop() { wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") identity := "identity" taskqueue := "taskqueue" @@ -1250,7 +1250,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowTimeout_Fire() { wt := addWorkflowTaskScheduledEvent(mutableState) startEvent := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = startEvent.GetEventId() - completionEvent := addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + completionEvent := addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") timerTask := &tasks.WorkflowTimeoutTask{ WorkflowKey: definition.NewWorkflowKey( @@ -1310,7 +1310,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowTimeout_Retry() { wt := addWorkflowTaskScheduledEvent(mutableState) startEvent := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = startEvent.GetEventId() - completionEvent := addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + completionEvent := addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") timerTask := &tasks.WorkflowTimeoutTask{ WorkflowKey: definition.NewWorkflowKey( @@ -1367,7 +1367,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowTimeout_Cron() { wt := addWorkflowTaskScheduledEvent(mutableState) startEvent := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = startEvent.GetEventId() - completionEvent := addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + completionEvent := addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") timerTask := &tasks.WorkflowTimeoutTask{ WorkflowKey: definition.NewWorkflowKey( @@ -1423,7 +1423,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowTimeout_WorkflowExpired( wt := addWorkflowTaskScheduledEvent(mutableState) startEvent := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = startEvent.GetEventId() - completionEvent := addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + completionEvent := addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") timerTask := &tasks.WorkflowTimeoutTask{ WorkflowKey: definition.NewWorkflowKey( diff --git a/service/history/timerQueueStandbyTaskExecutor_test.go b/service/history/timerQueueStandbyTaskExecutor_test.go index 13518860409..5f59f4a175a 100644 --- a/service/history/timerQueueStandbyTaskExecutor_test.go +++ b/service/history/timerQueueStandbyTaskExecutor_test.go @@ -236,7 +236,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessUserTimerTimeout_Pending wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") timerID := "timer" timerTimeout := 2 * time.Second @@ -327,7 +327,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessUserTimerTimeout_Success wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") timerID := "timer" timerTimeout := 2 * time.Second @@ -391,7 +391,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessUserTimerTimeout_Multipl wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") timerID1 := "timer-1" timerTimeout1 := 2 * time.Second @@ -459,7 +459,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Pending( wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskqueue := "taskqueue" activityID := "activity" @@ -551,7 +551,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Success( wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") identity := "identity" taskqueue := "taskqueue" @@ -622,7 +622,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Heartbea wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") identity := "identity" taskqueue := "taskqueue" @@ -693,7 +693,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Multiple wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") identity := "identity" taskqueue := "taskqueue" @@ -921,7 +921,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowTaskTimeout_Succ wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") // Flush buffered events so real IDs get assigned mutableState.FlushBufferedEvents() @@ -1096,7 +1096,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowTimeout_Pending( wt := addWorkflowTaskScheduledEvent(mutableState) startEvent := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = startEvent.GetEventId() - completionEvent := addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + completionEvent := addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") // Flush buffered events so real IDs get assigned mutableState.FlushBufferedEvents() nextEventID := completionEvent.GetEventId() @@ -1173,7 +1173,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowTimeout_Success( wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") event = addCompleteWorkflowEvent(mutableState, event.GetEventId(), nil) // Flush buffered events so real IDs get assigned mutableState.FlushBufferedEvents() @@ -1267,7 +1267,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityRetryTimer_Noop( wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") identity := "identity" taskqueue := "taskqueue" @@ -1365,7 +1365,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityRetryTimer_Activ wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") identity := "identity" taskqueue := "taskqueue" @@ -1434,7 +1434,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityRetryTimer_Pendi wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskqueue := "taskqueue" activityID := "activity" diff --git a/service/history/transferQueueActiveTaskExecutor_test.go b/service/history/transferQueueActiveTaskExecutor_test.go index 108fefdf321..2f1e3071418 100644 --- a/service/history/transferQueueActiveTaskExecutor_test.go +++ b/service/history/transferQueueActiveTaskExecutor_test.go @@ -275,7 +275,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessActivityTask_Success() wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) activityID := "activity-1" @@ -330,7 +330,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessActivityTask_Duplicati wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) activityID := "activity-1" @@ -444,7 +444,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessWorkflowTask_NonFirstW wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") s.NotNil(event) // make another round of workflow task @@ -501,7 +501,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessWorkflowTask_Sticky_No wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") s.NotNil(event) // set the sticky taskqueue attr executionInfo := mutableState.GetExecutionInfo() @@ -565,7 +565,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessWorkflowTask_WorkflowT wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") s.NotNil(event) // set the sticky taskqueue attr executionInfo := mutableState.GetExecutionInfo() @@ -625,7 +625,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessWorkflowTask_Duplicati wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") transferTask := &tasks.WorkflowTask{ WorkflowKey: definition.NewWorkflowKey( @@ -692,7 +692,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_HasPare wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event = addCompleteWorkflowEvent(mutableState, event.GetEventId(), nil) @@ -763,12 +763,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_CanSkip wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent( - mutableState, - wt.ScheduledEventID, - wt.StartedEventID, - "some random identity", - ) + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event = addCompleteWorkflowEvent(mutableState, event.GetEventId(), nil) @@ -842,7 +837,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event = addCompleteWorkflowEvent(mutableState, event.GetEventId(), nil) @@ -905,7 +900,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen parentClosePolicy2 := enumspb.PARENT_CLOSE_POLICY_TERMINATE parentClosePolicy3 := enumspb.PARENT_CLOSE_POLICY_REQUEST_CANCEL - event, _ = mutableState.AddWorkflowTaskCompletedEvent(wt.ScheduledEventID, wt.StartedEventID, &workflowservice.RespondWorkflowTaskCompletedRequest{ + event, _ = mutableState.AddWorkflowTaskCompletedEvent(wt, &workflowservice.RespondWorkflowTaskCompletedRequest{ Identity: "some random identity", Commands: []*commandpb.Command{ { @@ -1070,7 +1065,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen }) } - event, _ = mutableState.AddWorkflowTaskCompletedEvent(wt.ScheduledEventID, wt.StartedEventID, &workflowservice.RespondWorkflowTaskCompletedRequest{ + event, _ = mutableState.AddWorkflowTaskCompletedEvent(wt, &workflowservice.RespondWorkflowTaskCompletedRequest{ Identity: "some random identity", Commands: commands, }, configs.DefaultHistoryMaxAutoResetPoints) @@ -1164,7 +1159,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen }) } - event, _ = mutableState.AddWorkflowTaskCompletedEvent(wt.ScheduledEventID, wt.StartedEventID, &workflowservice.RespondWorkflowTaskCompletedRequest{ + event, _ = mutableState.AddWorkflowTaskCompletedEvent(wt, &workflowservice.RespondWorkflowTaskCompletedRequest{ Identity: "some random identity", Commands: commands, }, configs.DefaultHistoryMaxAutoResetPoints) @@ -1236,7 +1231,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_NoParen event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event, _ = mutableState.AddWorkflowTaskCompletedEvent(wt.ScheduledEventID, wt.StartedEventID, &workflowservice.RespondWorkflowTaskCompletedRequest{ + event, _ = mutableState.AddWorkflowTaskCompletedEvent(wt, &workflowservice.RespondWorkflowTaskCompletedRequest{ Identity: "some random identity", Commands: []*commandpb.Command{ { @@ -1361,7 +1356,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_DeleteA wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event = addCompleteWorkflowEvent(mutableState, event.GetEventId(), nil) @@ -1426,7 +1421,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCancelExecution_Succes wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event, rci := addRequestCancelInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), tests.TargetNamespace, tests.TargetNamespaceID, targetExecution.GetWorkflowId(), targetExecution.GetRunId()) @@ -1489,7 +1484,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCancelExecution_Failur wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event, rci := addRequestCancelInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), tests.TargetNamespace, tests.TargetNamespaceID, targetExecution.GetWorkflowId(), targetExecution.GetRunId()) @@ -1552,7 +1547,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCancelExecution_Failur wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event, _ = addRequestCancelInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), tests.TargetNamespace, tests.TargetNamespaceID, targetExecution.GetWorkflowId(), targetExecution.GetRunId()) @@ -1613,7 +1608,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCancelExecution_Duplic wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event, _ = addRequestCancelInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), tests.TargetNamespace, tests.TargetNamespaceID, targetExecution.GetWorkflowId(), targetExecution.GetRunId()) @@ -1682,7 +1677,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessSignalExecution_Succes wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event, si := addRequestSignalInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), @@ -1762,7 +1757,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessSignalExecution_Failur wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event, si := addRequestSignalInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), @@ -1833,7 +1828,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessSignalExecution_Failur wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event, _ = addRequestSignalInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), @@ -1902,7 +1897,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessSignalExecution_Duplic wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event, _ = addRequestSignalInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), @@ -1967,7 +1962,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Su wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) @@ -2057,7 +2052,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Fa wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) @@ -2139,7 +2134,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Fa wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) @@ -2214,7 +2209,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Su wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) @@ -2303,7 +2298,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessStartChildExecution_Du wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) @@ -2387,7 +2382,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessorStartChildExecution_ wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) @@ -2426,7 +2421,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessorStartChildExecution_ wt = addWorkflowTaskScheduledEvent(mutableState) event = addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, "some random identity") wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") event = addCompleteWorkflowEvent(mutableState, event.EventId, nil) // Flush buffered events so real IDs get assigned mutableState.FlushBufferedEvents() diff --git a/service/history/transferQueueStandbyTaskExecutor_test.go b/service/history/transferQueueStandbyTaskExecutor_test.go index 4e253628bc1..a18fbc96484 100644 --- a/service/history/transferQueueStandbyTaskExecutor_test.go +++ b/service/history/transferQueueStandbyTaskExecutor_test.go @@ -251,7 +251,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessActivityTask_Pending( wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) activityID := "activity-1" @@ -340,7 +340,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessActivityTask_Success( wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) activityID := "activity-1" @@ -531,7 +531,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessWorkflowTask_Success_ wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) wt = addWorkflowTaskScheduledEvent(mutableState) @@ -606,7 +606,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessCloseExecution() { wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event = addCompleteWorkflowEvent(mutableState, event.GetEventId(), nil) @@ -706,12 +706,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessCloseExecution_CanSki wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent( - mutableState, - wt.ScheduledEventID, - wt.StartedEventID, - "some random identity", - ) + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event = addCompleteWorkflowEvent(mutableState, event.GetEventId(), nil) @@ -792,7 +787,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessCancelExecution_Pendi wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event, _ = addRequestCancelInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), tests.TargetNamespace, tests.TargetNamespaceID, targetExecution.GetWorkflowId(), targetExecution.GetRunId()) @@ -881,7 +876,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessCancelExecution_Succe wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event, _ = addRequestCancelInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), tests.TargetNamespace, tests.TargetNamespaceID, targetExecution.GetWorkflowId(), targetExecution.GetRunId()) @@ -947,7 +942,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessSignalExecution_Pendi wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event, _ = addRequestSignalInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), @@ -1037,7 +1032,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessSignalExecution_Succe wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event, _ = addRequestSignalInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), @@ -1102,7 +1097,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessStartChildExecution_P wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event, _ = addStartChildWorkflowExecutionInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), @@ -1225,7 +1220,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessStartChildExecution_S wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event, childInfo := addStartChildWorkflowExecutionInitiatedEvent(mutableState, event.GetEventId(), uuid.New(), diff --git a/service/history/visibilityQueueTaskExecutor_test.go b/service/history/visibilityQueueTaskExecutor_test.go index db166f84722..e1f6c4044d0 100644 --- a/service/history/visibilityQueueTaskExecutor_test.go +++ b/service/history/visibilityQueueTaskExecutor_test.go @@ -220,7 +220,7 @@ func (s *visibilityQueueTaskExecutorSuite) TestProcessCloseExecution() { wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event = addCompleteWorkflowEvent(mutableState, event.GetEventId(), nil) @@ -289,7 +289,7 @@ func (s *visibilityQueueTaskExecutorSuite) TestProcessCloseExecutionWithWorkflow wt := addWorkflowTaskScheduledEvent(mutableState) event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) wt.StartedEventID = event.GetEventId() - event = addWorkflowTaskCompletedEvent(mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") taskID := int64(59) event = addCompleteWorkflowEvent(mutableState, event.GetEventId(), nil) diff --git a/service/history/workflow/mutable_state.go b/service/history/workflow/mutable_state.go index e051f578fff..ce37620d7be 100644 --- a/service/history/workflow/mutable_state.go +++ b/service/history/workflow/mutable_state.go @@ -107,14 +107,14 @@ type ( AddChildWorkflowExecutionTimedOutEvent(int64, *commonpb.WorkflowExecution, *historypb.WorkflowExecutionTimedOutEventAttributes) (*historypb.HistoryEvent, error) AddCompletedWorkflowEvent(int64, *commandpb.CompleteWorkflowExecutionCommandAttributes, string) (*historypb.HistoryEvent, error) AddContinueAsNewEvent(context.Context, int64, int64, namespace.Name, *commandpb.ContinueAsNewWorkflowExecutionCommandAttributes) (*historypb.HistoryEvent, MutableState, error) - AddWorkflowTaskCompletedEvent(int64, int64, *workflowservice.RespondWorkflowTaskCompletedRequest, int) (*historypb.HistoryEvent, error) - AddWorkflowTaskFailedEvent(scheduledEventID int64, startedEventID int64, cause enumspb.WorkflowTaskFailedCause, failure *failurepb.Failure, identity, binChecksum, baseRunID, newRunID string, forkEventVersion int64) (*historypb.HistoryEvent, error) + AddWorkflowTaskCompletedEvent(*WorkflowTaskInfo, *workflowservice.RespondWorkflowTaskCompletedRequest, int) (*historypb.HistoryEvent, error) + AddWorkflowTaskFailedEvent(workflowTask *WorkflowTaskInfo, cause enumspb.WorkflowTaskFailedCause, failure *failurepb.Failure, identity, binChecksum, baseRunID, newRunID string, forkEventVersion int64) (*historypb.HistoryEvent, error) AddWorkflowTaskScheduleToStartTimeoutEvent(int64) (*historypb.HistoryEvent, error) AddFirstWorkflowTaskScheduled(event *historypb.HistoryEvent, bypassTaskGeneration bool) (int64, error) AddWorkflowTaskScheduledEvent(bypassTaskGeneration bool, workflowTaskType enumsspb.WorkflowTaskType) (*WorkflowTaskInfo, error) AddWorkflowTaskScheduledEventAsHeartbeat(bypassTaskGeneration bool, originalScheduledTimestamp *time.Time, workflowTaskType enumsspb.WorkflowTaskType) (*WorkflowTaskInfo, error) AddWorkflowTaskStartedEvent(int64, string, *taskqueuepb.TaskQueue, string) (*historypb.HistoryEvent, *WorkflowTaskInfo, error) - AddWorkflowTaskTimedOutEvent(int64, int64) (*historypb.HistoryEvent, error) + AddWorkflowTaskTimedOutEvent(workflowTask *WorkflowTaskInfo) (*historypb.HistoryEvent, error) AddExternalWorkflowExecutionCancelRequested(int64, namespace.Name, namespace.ID, string, string) (*historypb.HistoryEvent, error) AddExternalWorkflowExecutionSignaled(int64, namespace.Name, namespace.ID, string, string, string) (*historypb.HistoryEvent, error) AddFailWorkflowEvent(int64, enumspb.RetryState, *commandpb.FailWorkflowExecutionCommandAttributes, string) (*historypb.HistoryEvent, error) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 90762b5ced0..505b6a80d58 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -1891,8 +1891,7 @@ func (ms *MutableStateImpl) CheckResettable() error { } func (ms *MutableStateImpl) AddWorkflowTaskCompletedEvent( - scheduledEventID int64, - startedEventID int64, + workflowTask *WorkflowTaskInfo, request *workflowservice.RespondWorkflowTaskCompletedRequest, maxResetPoints int, ) (*historypb.HistoryEvent, error) { @@ -1900,7 +1899,7 @@ func (ms *MutableStateImpl) AddWorkflowTaskCompletedEvent( if err := ms.checkMutability(opTag); err != nil { return nil, err } - return ms.workflowTaskManager.AddWorkflowTaskCompletedEvent(scheduledEventID, startedEventID, request, maxResetPoints) + return ms.workflowTaskManager.AddWorkflowTaskCompletedEvent(workflowTask, request, maxResetPoints) } func (ms *MutableStateImpl) ReplicateWorkflowTaskCompletedEvent( @@ -1910,14 +1909,13 @@ func (ms *MutableStateImpl) ReplicateWorkflowTaskCompletedEvent( } func (ms *MutableStateImpl) AddWorkflowTaskTimedOutEvent( - scheduledEventID int64, - startedEventID int64, + workflowTask *WorkflowTaskInfo, ) (*historypb.HistoryEvent, error) { opTag := tag.WorkflowActionWorkflowTaskTimedOut if err := ms.checkMutability(opTag); err != nil { return nil, err } - return ms.workflowTaskManager.AddWorkflowTaskTimedOutEvent(scheduledEventID, startedEventID) + return ms.workflowTaskManager.AddWorkflowTaskTimedOutEvent(workflowTask) } func (ms *MutableStateImpl) ReplicateWorkflowTaskTimedOutEvent( @@ -1937,8 +1935,7 @@ func (ms *MutableStateImpl) AddWorkflowTaskScheduleToStartTimeoutEvent( } func (ms *MutableStateImpl) AddWorkflowTaskFailedEvent( - scheduledEventID int64, - startedEventID int64, + workflowTask *WorkflowTaskInfo, cause enumspb.WorkflowTaskFailedCause, failure *failurepb.Failure, identity string, @@ -1952,8 +1949,7 @@ func (ms *MutableStateImpl) AddWorkflowTaskFailedEvent( return nil, err } return ms.workflowTaskManager.AddWorkflowTaskFailedEvent( - scheduledEventID, - startedEventID, + workflowTask, cause, failure, identity, diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index da49d9191d7..4d89319c48f 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -159,10 +159,13 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskCompletionFirstBatchReplica runID, ) - newWorkflowTaskScheduleEvent, newWorkflowTaskStartedEvent := s.prepareTransientWorkflowTaskCompletionFirstBatchReplicated(version, runID) + newWorkflowTaskScheduleEvent, _ := s.prepareTransientWorkflowTaskCompletionFirstBatchReplicated(version, runID) + + newWorkflowTask, _ := s.mutableState.GetWorkflowTaskInfo(newWorkflowTaskScheduleEvent.GetEventId()) + s.NotNil(newWorkflowTask) + _, err := s.mutableState.AddWorkflowTaskTimedOutEvent( - newWorkflowTaskScheduleEvent.GetEventId(), - newWorkflowTaskStartedEvent.GetEventId(), + newWorkflowTask, ) s.NoError(err) s.Equal(0, s.mutableState.hBuilder.BufferEventSize()) @@ -179,11 +182,13 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskCompletionFirstBatchReplica runID, ) - newWorkflowTaskScheduleEvent, newWorkflowTaskStartedEvent := s.prepareTransientWorkflowTaskCompletionFirstBatchReplicated(version, runID) + newWorkflowTaskScheduleEvent, _ := s.prepareTransientWorkflowTaskCompletionFirstBatchReplicated(version, runID) + + newWorkflowTask, _ := s.mutableState.GetWorkflowTaskInfo(newWorkflowTaskScheduleEvent.GetEventId()) + s.NotNil(newWorkflowTask) _, err := s.mutableState.AddWorkflowTaskFailedEvent( - newWorkflowTaskScheduleEvent.GetEventId(), - newWorkflowTaskStartedEvent.GetEventId(), + newWorkflowTask, enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE, failure.NewServerFailure("some random workflow task failure details", false), "some random workflow task failure identity", diff --git a/service/history/workflow/mutable_state_mock.go b/service/history/workflow/mutable_state_mock.go index ed6a6ff6899..a3b79ac02e9 100644 --- a/service/history/workflow/mutable_state_mock.go +++ b/service/history/workflow/mutable_state_mock.go @@ -714,33 +714,33 @@ func (mr *MockMutableStateMockRecorder) AddWorkflowPropertiesModifiedEvent(arg0, } // AddWorkflowTaskCompletedEvent mocks base method. -func (m *MockMutableState) AddWorkflowTaskCompletedEvent(arg0, arg1 int64, arg2 *v17.RespondWorkflowTaskCompletedRequest, arg3 int) (*v13.HistoryEvent, error) { +func (m *MockMutableState) AddWorkflowTaskCompletedEvent(arg0 *WorkflowTaskInfo, arg1 *v17.RespondWorkflowTaskCompletedRequest, arg2 int) (*v13.HistoryEvent, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddWorkflowTaskCompletedEvent", arg0, arg1, arg2, arg3) + ret := m.ctrl.Call(m, "AddWorkflowTaskCompletedEvent", arg0, arg1, arg2) ret0, _ := ret[0].(*v13.HistoryEvent) ret1, _ := ret[1].(error) return ret0, ret1 } // AddWorkflowTaskCompletedEvent indicates an expected call of AddWorkflowTaskCompletedEvent. -func (mr *MockMutableStateMockRecorder) AddWorkflowTaskCompletedEvent(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +func (mr *MockMutableStateMockRecorder) AddWorkflowTaskCompletedEvent(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkflowTaskCompletedEvent", reflect.TypeOf((*MockMutableState)(nil).AddWorkflowTaskCompletedEvent), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkflowTaskCompletedEvent", reflect.TypeOf((*MockMutableState)(nil).AddWorkflowTaskCompletedEvent), arg0, arg1, arg2) } // AddWorkflowTaskFailedEvent mocks base method. -func (m *MockMutableState) AddWorkflowTaskFailedEvent(scheduledEventID, startedEventID int64, cause v11.WorkflowTaskFailedCause, failure *v12.Failure, identity, binChecksum, baseRunID, newRunID string, forkEventVersion int64) (*v13.HistoryEvent, error) { +func (m *MockMutableState) AddWorkflowTaskFailedEvent(workflowTask *WorkflowTaskInfo, cause v11.WorkflowTaskFailedCause, failure *v12.Failure, identity, binChecksum, baseRunID, newRunID string, forkEventVersion int64) (*v13.HistoryEvent, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddWorkflowTaskFailedEvent", scheduledEventID, startedEventID, cause, failure, identity, binChecksum, baseRunID, newRunID, forkEventVersion) + ret := m.ctrl.Call(m, "AddWorkflowTaskFailedEvent", workflowTask, cause, failure, identity, binChecksum, baseRunID, newRunID, forkEventVersion) ret0, _ := ret[0].(*v13.HistoryEvent) ret1, _ := ret[1].(error) return ret0, ret1 } // AddWorkflowTaskFailedEvent indicates an expected call of AddWorkflowTaskFailedEvent. -func (mr *MockMutableStateMockRecorder) AddWorkflowTaskFailedEvent(scheduledEventID, startedEventID, cause, failure, identity, binChecksum, baseRunID, newRunID, forkEventVersion interface{}) *gomock.Call { +func (mr *MockMutableStateMockRecorder) AddWorkflowTaskFailedEvent(workflowTask, cause, failure, identity, binChecksum, baseRunID, newRunID, forkEventVersion interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkflowTaskFailedEvent", reflect.TypeOf((*MockMutableState)(nil).AddWorkflowTaskFailedEvent), scheduledEventID, startedEventID, cause, failure, identity, binChecksum, baseRunID, newRunID, forkEventVersion) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkflowTaskFailedEvent", reflect.TypeOf((*MockMutableState)(nil).AddWorkflowTaskFailedEvent), workflowTask, cause, failure, identity, binChecksum, baseRunID, newRunID, forkEventVersion) } // AddWorkflowTaskScheduleToStartTimeoutEvent mocks base method. @@ -805,18 +805,18 @@ func (mr *MockMutableStateMockRecorder) AddWorkflowTaskStartedEvent(arg0, arg1, } // AddWorkflowTaskTimedOutEvent mocks base method. -func (m *MockMutableState) AddWorkflowTaskTimedOutEvent(arg0, arg1 int64) (*v13.HistoryEvent, error) { +func (m *MockMutableState) AddWorkflowTaskTimedOutEvent(workflowTask *WorkflowTaskInfo) (*v13.HistoryEvent, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddWorkflowTaskTimedOutEvent", arg0, arg1) + ret := m.ctrl.Call(m, "AddWorkflowTaskTimedOutEvent", workflowTask) ret0, _ := ret[0].(*v13.HistoryEvent) ret1, _ := ret[1].(error) return ret0, ret1 } // AddWorkflowTaskTimedOutEvent indicates an expected call of AddWorkflowTaskTimedOutEvent. -func (mr *MockMutableStateMockRecorder) AddWorkflowTaskTimedOutEvent(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockMutableStateMockRecorder) AddWorkflowTaskTimedOutEvent(workflowTask interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkflowTaskTimedOutEvent", reflect.TypeOf((*MockMutableState)(nil).AddWorkflowTaskTimedOutEvent), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkflowTaskTimedOutEvent", reflect.TypeOf((*MockMutableState)(nil).AddWorkflowTaskTimedOutEvent), workflowTask) } // CheckResettable mocks base method. diff --git a/service/history/workflow/util.go b/service/history/workflow/util.go index ae1464a42a8..a15e3e4bce5 100644 --- a/service/history/workflow/util.go +++ b/service/history/workflow/util.go @@ -48,8 +48,7 @@ func failWorkflowTask( ) error { if _, err := mutableState.AddWorkflowTaskFailedEvent( - workflowTask.ScheduledEventID, - workflowTask.StartedEventID, + workflowTask, workflowTaskFailureCause, nil, consts.IdentityHistoryService, diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index 670a5527802..f9c67056a78 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -482,23 +482,10 @@ func (m *workflowTaskStateMachine) skipWorkflowTaskCompletedEvent(workflowTaskTy return onlyUpdateRejectionMessages } func (m *workflowTaskStateMachine) AddWorkflowTaskCompletedEvent( - scheduledEventID int64, - startedEventID int64, + workflowTask *WorkflowTaskInfo, request *workflowservice.RespondWorkflowTaskCompletedRequest, maxResetPoints int, ) (*historypb.HistoryEvent, error) { - opTag := tag.WorkflowActionWorkflowTaskCompleted - workflowTask, ok := m.GetWorkflowTaskInfo(scheduledEventID) - m.setSpeculativeWorkflowTaskStartedEventID(workflowTask, startedEventID) - if !ok || workflowTask.StartedEventID != startedEventID { - m.ms.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag, - tag.WorkflowEventID(m.ms.GetNextEventID()), - tag.ErrorTypeInvalidHistoryAction, - tag.WorkflowScheduledEventID(scheduledEventID), - tag.WorkflowStartedEventID(startedEventID)) - - return nil, m.ms.createInternalServerError(opTag) - } // Capture if WorkflowTaskScheduled and WorkflowTaskStarted events were created // before calling m.beforeAddWorkflowTaskCompletedEvent() because it will delete workflow task info from mutable state. @@ -524,12 +511,12 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskCompletedEvent( timestamp.TimeValue(workflowTask.StartedTime), ) m.ms.hBuilder.FlushAndCreateNewBatch() - startedEventID = startedEvent.GetEventId() + workflowTask.StartedEventID = startedEvent.GetEventId() } // Now write the completed event event := m.ms.hBuilder.AddWorkflowTaskCompletedEvent( - scheduledEventID, - startedEventID, + workflowTask.ScheduledEventID, + workflowTask.StartedEventID, request.Identity, request.BinaryChecksum, ) @@ -542,53 +529,29 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskCompletedEvent( } func (m *workflowTaskStateMachine) AddWorkflowTaskFailedEvent( - scheduledEventID int64, - startedEventID int64, + workflowTask *WorkflowTaskInfo, cause enumspb.WorkflowTaskFailedCause, failure *failurepb.Failure, identity string, - binChecksum string, + binaryChecksum string, baseRunID string, newRunID string, forkEventVersion int64, ) (*historypb.HistoryEvent, error) { - opTag := tag.WorkflowActionWorkflowTaskFailed - attr := &historypb.WorkflowTaskFailedEventAttributes{ - ScheduledEventId: scheduledEventID, - StartedEventId: startedEventID, - Cause: cause, - Failure: failure, - Identity: identity, - BinaryChecksum: binChecksum, - BaseRunId: baseRunID, - NewRunId: newRunID, - ForkEventVersion: forkEventVersion, - } - - workflowTask, ok := m.GetWorkflowTaskInfo(scheduledEventID) - m.setSpeculativeWorkflowTaskStartedEventID(workflowTask, startedEventID) - if !ok || workflowTask.StartedEventID != startedEventID { - m.ms.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag, - tag.WorkflowEventID(m.ms.GetNextEventID()), - tag.ErrorTypeInvalidHistoryAction, - tag.WorkflowScheduledEventID(scheduledEventID), - tag.WorkflowStartedEventID(startedEventID)) - return nil, m.ms.createInternalServerError(opTag) - } var event *historypb.HistoryEvent // Only emit WorkflowTaskFailedEvent if workflow task is not transient and not speculative. if !m.ms.IsTransientWorkflowTask() && workflowTask.Type != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE { event = m.ms.hBuilder.AddWorkflowTaskFailedEvent( - attr.ScheduledEventId, - attr.StartedEventId, - attr.Cause, - attr.Failure, - attr.Identity, - attr.BaseRunId, - attr.NewRunId, - attr.ForkEventVersion, - attr.BinaryChecksum, + workflowTask.ScheduledEventID, + workflowTask.StartedEventID, + cause, + failure, + identity, + baseRunID, + newRunID, + forkEventVersion, + binaryChecksum, ) } @@ -601,31 +564,22 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskFailedEvent( cause == enumspb.WORKFLOW_TASK_FAILED_CAUSE_FAILOVER_CLOSE_COMMAND { m.ms.executionInfo.WorkflowTaskAttempt = 1 } + + // Attempt counter was incremented directly in mutable state. Current WT attempt counter needs to be updated. + workflowTask.Attempt = m.ms.GetExecutionInfo().GetWorkflowTaskAttempt() + return event, nil } func (m *workflowTaskStateMachine) AddWorkflowTaskTimedOutEvent( - scheduledEventID int64, - startedEventID int64, + workflowTask *WorkflowTaskInfo, ) (*historypb.HistoryEvent, error) { - opTag := tag.WorkflowActionWorkflowTaskTimedOut - workflowTask, ok := m.GetWorkflowTaskInfo(scheduledEventID) - m.setSpeculativeWorkflowTaskStartedEventID(workflowTask, startedEventID) - if !ok || workflowTask.StartedEventID != startedEventID { - m.ms.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag, - tag.WorkflowEventID(m.ms.GetNextEventID()), - tag.ErrorTypeInvalidHistoryAction, - tag.WorkflowScheduledEventID(scheduledEventID), - tag.WorkflowStartedEventID(startedEventID)) - return nil, m.ms.createInternalServerError(opTag) - } - var event *historypb.HistoryEvent // Avoid creating WorkflowTaskTimedOut history event when workflow task is transient. if !m.ms.IsTransientWorkflowTask() { event = m.ms.hBuilder.AddWorkflowTaskTimedOutEvent( - scheduledEventID, - startedEventID, + workflowTask.ScheduledEventID, + workflowTask.StartedEventID, enumspb.TIMEOUT_TYPE_START_TO_CLOSE, ) } @@ -789,14 +743,13 @@ func (m *workflowTaskStateMachine) tryRestoreSpeculativeWorkflowTask( func (m *workflowTaskStateMachine) setSpeculativeWorkflowTaskStartedEventID( workflowTask *WorkflowTaskInfo, - startedEventID int64, ) { // TODO (alex-update): Uncomment this code to support speculative workflow task restoration. /* // StartedEventID might be lost (cleared) for speculative workflow task due to shard reload or history service restart. if workflowTask != nil && workflowTask.Type == enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE && workflowTask.StartedEventID == common.EmptyEventID { - workflowTask.StartedEventID = startedEventID + workflowTask.StartedEventID = workflowTask.ScheduledEventID + 1 m.UpdateWorkflowTask(workflowTask) } */ diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index 4d051968ac1..8f3cd090eca 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -299,13 +299,21 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskFailed( scheduledEventID := token.GetScheduledEventId() workflowTask, isRunning := mutableState.GetWorkflowTaskInfo(scheduledEventID) + // TODO (alex-update): call mutableState.SetSpeculativeWorkflowTaskStartedEventID(mutableState) here to set StartEventID. + if !isRunning || workflowTask.Attempt != token.Attempt || workflowTask.StartedEventID == common.EmptyEventID { return nil, serviceerror.NewNotFound("Workflow task not found.") } - _, err := mutableState.AddWorkflowTaskFailedEvent(workflowTask.ScheduledEventID, workflowTask.StartedEventID, request.GetCause(), request.GetFailure(), - request.GetIdentity(), request.GetBinaryChecksum(), "", "", 0) - if err != nil { + if _, err := mutableState.AddWorkflowTaskFailedEvent( + workflowTask, + request.GetCause(), + request.GetFailure(), + request.GetIdentity(), + request.GetBinaryChecksum(), + "", + "", + 0); err != nil { return nil, err } @@ -331,21 +339,19 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( if err != nil { return nil, err } - namespaceID := namespaceEntry.ID() request := req.CompleteRequest token, err0 := handler.tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { return nil, consts.ErrDeserializingToken } - scheduledEventID := token.GetScheduledEventId() workflowContext, err := handler.workflowConsistencyChecker.GetWorkflowContext( ctx, token.Clock, func(mutableState workflow.MutableState) bool { - _, ok := mutableState.GetWorkflowTaskInfo(scheduledEventID) - if !ok && scheduledEventID >= mutableState.GetNextEventID() { + _, ok := mutableState.GetWorkflowTaskInfo(token.GetScheduledEventId()) + if !ok && token.GetScheduledEventId() >= mutableState.GetNextEventID() { handler.metricsHandler.Counter(metrics.StaleMutableStateCounter.GetMetricName()).Record( 1, metrics.OperationTag(metrics.HistoryRespondWorkflowTaskCompletedScope)) @@ -354,7 +360,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( return true }, definition.NewWorkflowKey( - namespaceID.String(), + namespaceEntry.ID().String(), token.WorkflowId, token.RunId, ), @@ -366,7 +372,8 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( weContext := workflowContext.GetContext() ms := workflowContext.GetMutableState() - currentWorkflowTask, currentWorkflowTaskRunning := ms.GetWorkflowTaskInfo(scheduledEventID) + currentWorkflowTask, currentWorkflowTaskRunning := ms.GetWorkflowTaskInfo(token.GetScheduledEventId()) + // TODO (alex-update): call mutableState.SetSpeculativeWorkflowTaskStartedEventID(mutableState) here to set StartEventID. executionInfo := ms.GetExecutionInfo() executionStats, err := weContext.LoadExecutionStats(ctx) @@ -379,7 +386,6 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( return nil, serviceerror.NewNotFound("Workflow task not found.") } - startedEventID := currentWorkflowTask.StartedEventID maxResetPoints := handler.config.MaxAutoResetPoints(namespaceEntry.Name().String()) if ms.GetExecutionInfo().AutoResetPoints != nil && maxResetPoints == len(ms.GetExecutionInfo().AutoResetPoints.Points) { handler.metricsHandler.Counter(metrics.AutoResetPointsLimitExceededCounter.GetMetricName()).Record( @@ -404,19 +410,19 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( metrics.NamespaceTag(namespace.String()), ) scope.Counter(metrics.WorkflowTaskHeartbeatTimeoutCounter.GetMetricName()).Record(1) - completedEvent, err = ms.AddWorkflowTaskTimedOutEvent(currentWorkflowTask.ScheduledEventID, currentWorkflowTask.StartedEventID) + completedEvent, err = ms.AddWorkflowTaskTimedOutEvent(currentWorkflowTask) if err != nil { return nil, err } ms.ClearStickyness() } else { - completedEvent, err = ms.AddWorkflowTaskCompletedEvent(scheduledEventID, startedEventID, request, maxResetPoints) + completedEvent, err = ms.AddWorkflowTaskCompletedEvent(currentWorkflowTask, request, maxResetPoints) if err != nil { return nil, err } } } else { - completedEvent, err = ms.AddWorkflowTaskCompletedEvent(scheduledEventID, startedEventID, request, maxResetPoints) + completedEvent, err = ms.AddWorkflowTaskCompletedEvent(currentWorkflowTask, request, maxResetPoints) if err != nil { return nil, err } @@ -522,13 +528,13 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( tag.Value(wtFailedCause.Message()), tag.WorkflowID(token.GetWorkflowId()), tag.WorkflowRunID(token.GetRunId()), - tag.WorkflowNamespaceID(namespaceID.String())) + tag.WorkflowNamespaceID(namespaceEntry.ID().String())) if currentWorkflowTask.Attempt > 1 && wtFailedCause.failedCause != enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND { // drop this workflow task if it keeps failing. This will cause the workflow task to timeout and get retried after timeout. return nil, serviceerror.NewInvalidArgument(wtFailedCause.Message()) } var nextEventBatchId int64 - ms, nextEventBatchId, err = failWorkflowTask(ctx, weContext, scheduledEventID, startedEventID, wtFailedCause, request) + ms, nextEventBatchId, err = failWorkflowTask(ctx, weContext, currentWorkflowTask, wtFailedCause, request) if err != nil { return nil, err } @@ -897,8 +903,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleBufferedQueries(ms workfl func failWorkflowTask( ctx context.Context, wfContext workflow.Context, - scheduledEventID int64, - startedEventID int64, + workflowTask *workflow.WorkflowTaskInfo, wtFailedCause *workflowTaskFailedCause, request *workflowservice.RespondWorkflowTaskCompletedRequest, ) (workflow.MutableState, int64, error) { @@ -913,8 +918,7 @@ func failWorkflowTask( } nextEventBatchId := mutableState.GetNextEventID() if _, err = mutableState.AddWorkflowTaskFailedEvent( - scheduledEventID, - startedEventID, + workflowTask, wtFailedCause.failedCause, failure.NewServerFailure(wtFailedCause.Message(), true), request.GetIdentity(), diff --git a/service/history/workflowTaskHandlerCallbacks_test.go b/service/history/workflowTaskHandlerCallbacks_test.go index e1cd06df77f..a32f114cfca 100644 --- a/service/history/workflowTaskHandlerCallbacks_test.go +++ b/service/history/workflowTaskHandlerCallbacks_test.go @@ -250,7 +250,7 @@ func (s *WorkflowTaskHandlerCallbackSuite) TestVerifyFirstWorkflowTaskScheduled_ wt := addWorkflowTaskScheduledEvent(ms) workflowTasksStartEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, "testTaskQueue", uuid.New()) wt.StartedEventID = workflowTasksStartEvent.GetEventId() - addWorkflowTaskCompletedEvent(ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, wt.StartedEventID, "some random identity") wfMs := workflow.TestCloneToProto(ms) gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: wfMs}