From c6a89c095bdbfa0ddd26d4c52f1bebfa36af3319 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Tue, 20 Dec 2022 13:54:54 -0800 Subject: [PATCH] Workflow task state machine: minor renames (#3735) --- service/history/configs/config.go | 2 +- service/history/workflow/history_builder.go | 8 +-- service/history/workflow/mutable_state.go | 3 +- .../history/workflow/mutable_state_impl.go | 21 ++++++-- .../history/workflow/mutable_state_mock.go | 42 ++++++++++----- .../workflow/workflow_task_state_machine.go | 53 +++++++------------ service/history/workflowTaskHandler.go | 3 +- .../history/workflowTaskHandlerCallbacks.go | 3 +- 8 files changed, 74 insertions(+), 61 deletions(-) diff --git a/service/history/configs/config.go b/service/history/configs/config.go index 47834f4e455..cc52e5cf7e9 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -209,7 +209,7 @@ type Config struct { // Workflow task settings // DefaultWorkflowTaskTimeout the default workflow task timeout DefaultWorkflowTaskTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter - // WorkflowTaskHeartbeatTimeout is to timeout behavior of: RespondWorkflowTaskComplete with ForceCreateNewWorkflowTask == true without any workflow tasks + // WorkflowTaskHeartbeatTimeout is to timeout behavior of: RespondWorkflowTaskComplete with ForceCreateNewWorkflowTask == true without any commands // So that workflow task will be scheduled to another worker(by clear stickyness) WorkflowTaskHeartbeatTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter WorkflowTaskCriticalAttempts dynamicconfig.IntPropertyFn diff --git a/service/history/workflow/history_builder.go b/service/history/workflow/history_builder.go index a32003557e6..8817a0eb9ef 100644 --- a/service/history/workflow/history_builder.go +++ b/service/history/workflow/history_builder.go @@ -198,9 +198,9 @@ func (b *HistoryBuilder) AddWorkflowTaskScheduledEvent( taskQueue *taskqueuepb.TaskQueue, startToCloseTimeout *time.Duration, attempt int32, - now time.Time, + scheduleTime time.Time, ) *historypb.HistoryEvent { - event := b.createNewHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED, now) + event := b.createNewHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED, scheduleTime) event.Attributes = &historypb.HistoryEvent_WorkflowTaskScheduledEventAttributes{ WorkflowTaskScheduledEventAttributes: &historypb.WorkflowTaskScheduledEventAttributes{ TaskQueue: taskQueue, @@ -216,9 +216,9 @@ func (b *HistoryBuilder) AddWorkflowTaskStartedEvent( scheduledEventID int64, requestID string, identity string, - now time.Time, + startTime time.Time, ) *historypb.HistoryEvent { - event := b.createNewHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, now) + event := b.createNewHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, startTime) event.Attributes = &historypb.HistoryEvent_WorkflowTaskStartedEventAttributes{ WorkflowTaskStartedEventAttributes: &historypb.WorkflowTaskStartedEventAttributes{ ScheduledEventId: scheduledEventID, diff --git a/service/history/workflow/mutable_state.go b/service/history/workflow/mutable_state.go index 9cfb97e6143..6b5a9419209 100644 --- a/service/history/workflow/mutable_state.go +++ b/service/history/workflow/mutable_state.go @@ -138,7 +138,7 @@ type ( CheckResettable() error CloneToProto() *persistencespb.WorkflowMutableState RetryActivity(ai *persistencespb.ActivityInfo, failure *failurepb.Failure) (enumspb.RetryState, error) - CreateTransientWorkflowTask(workflowTask *WorkflowTaskInfo, identity string) *historyspb.TransientWorkflowTaskInfo + GetTransientWorkflowTaskInfo(workflowTask *WorkflowTaskInfo, identity string) *historyspb.TransientWorkflowTaskInfo DeleteWorkflowTask() DeleteSignalRequested(requestID string) FlushBufferedEvents() @@ -193,6 +193,7 @@ type ( IsCurrentWorkflowGuaranteed() bool IsSignalRequested(requestID string) bool IsStickyTaskQueueEnabled() bool + TaskQueue() *taskqueuepb.TaskQueue IsWorkflowExecutionRunning() bool IsResourceDuplicated(resourceDedupKey definition.DeduplicationID) bool IsWorkflowPendingOnWorkflowTaskBackoff() bool diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 2239b83fd98..689299df739 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -607,6 +607,19 @@ func (ms *MutableStateImpl) IsStickyTaskQueueEnabled() bool { return ms.executionInfo.StickyTaskQueue != "" } +func (e *MutableStateImpl) TaskQueue() *taskqueuepb.TaskQueue { + if e.IsStickyTaskQueueEnabled() { + return &taskqueuepb.TaskQueue{ + Name: e.executionInfo.StickyTaskQueue, + Kind: enumspb.TASK_QUEUE_KIND_STICKY, + } + } + return &taskqueuepb.TaskQueue{ + Name: e.executionInfo.TaskQueue, + Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + } +} + func (ms *MutableStateImpl) GetWorkflowType() *commonpb.WorkflowType { wType := &commonpb.WorkflowType{} wType.Name = ms.executionInfo.WorkflowTypeName @@ -1304,7 +1317,7 @@ func (ms *MutableStateImpl) ClearTransientWorkflowTask() error { return serviceerror.NewInternal("cannot clear transient workflow task when there are buffered events") } // no buffered event - resetWorkflowTaskInfo := &WorkflowTaskInfo{ + emptyWorkflowTaskInfo := &WorkflowTaskInfo{ Version: common.EmptyVersion, ScheduledEventID: common.EmptyEventID, StartedEventID: common.EmptyEventID, @@ -1317,7 +1330,7 @@ func (ms *MutableStateImpl) ClearTransientWorkflowTask() error { TaskQueue: nil, OriginalScheduledTime: timestamp.UnixOrZeroTimePtr(0), } - ms.workflowTaskManager.UpdateWorkflowTask(resetWorkflowTaskInfo) + ms.workflowTaskManager.UpdateWorkflowTask(emptyWorkflowTaskInfo) return nil } @@ -1774,11 +1787,11 @@ func (ms *MutableStateImpl) ReplicateWorkflowTaskStartedEvent( return ms.workflowTaskManager.ReplicateWorkflowTaskStartedEvent(workflowTask, version, scheduledEventID, startedEventID, requestID, timestamp) } -func (ms *MutableStateImpl) CreateTransientWorkflowTask( +func (ms *MutableStateImpl) GetTransientWorkflowTaskInfo( workflowTask *WorkflowTaskInfo, identity string, ) *historyspb.TransientWorkflowTaskInfo { - return ms.workflowTaskManager.CreateTransientWorkflowTaskEvents(workflowTask, identity) + return ms.workflowTaskManager.GetTransientWorkflowTaskInfo(workflowTask, identity) } // add BinaryCheckSum for the first workflowTaskCompletedID for auto-reset diff --git a/service/history/workflow/mutable_state_mock.go b/service/history/workflow/mutable_state_mock.go index 31c108e29a8..f66071a19eb 100644 --- a/service/history/workflow/mutable_state_mock.go +++ b/service/history/workflow/mutable_state_mock.go @@ -886,20 +886,6 @@ func (mr *MockMutableStateMockRecorder) ContinueAsNewMinBackoff(backoffDuration return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ContinueAsNewMinBackoff", reflect.TypeOf((*MockMutableState)(nil).ContinueAsNewMinBackoff), backoffDuration) } -// CreateTransientWorkflowTask mocks base method. -func (m *MockMutableState) CreateTransientWorkflowTask(workflowTask *WorkflowTaskInfo, identity string) *v19.TransientWorkflowTaskInfo { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateTransientWorkflowTask", workflowTask, identity) - ret0, _ := ret[0].(*v19.TransientWorkflowTaskInfo) - return ret0 -} - -// CreateTransientWorkflowTask indicates an expected call of CreateTransientWorkflowTask. -func (mr *MockMutableStateMockRecorder) CreateTransientWorkflowTask(workflowTask, identity interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateTransientWorkflowTask", reflect.TypeOf((*MockMutableState)(nil).CreateTransientWorkflowTask), workflowTask, identity) -} - // DeleteSignalRequested mocks base method. func (m *MockMutableState) DeleteSignalRequested(requestID string) { m.ctrl.T.Helper() @@ -1434,6 +1420,20 @@ func (mr *MockMutableStateMockRecorder) GetStartVersion() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStartVersion", reflect.TypeOf((*MockMutableState)(nil).GetStartVersion)) } +// GetTransientWorkflowTaskInfo mocks base method. +func (m *MockMutableState) GetTransientWorkflowTaskInfo(workflowTask *WorkflowTaskInfo, identity string) *v19.TransientWorkflowTaskInfo { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTransientWorkflowTaskInfo", workflowTask, identity) + ret0, _ := ret[0].(*v19.TransientWorkflowTaskInfo) + return ret0 +} + +// GetTransientWorkflowTaskInfo indicates an expected call of GetTransientWorkflowTaskInfo. +func (mr *MockMutableStateMockRecorder) GetTransientWorkflowTaskInfo(workflowTask, identity interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTransientWorkflowTaskInfo", reflect.TypeOf((*MockMutableState)(nil).GetTransientWorkflowTaskInfo), workflowTask, identity) +} + // GetUpdateCondition mocks base method. func (m *MockMutableState) GetUpdateCondition() (int64, int64) { m.ctrl.T.Helper() @@ -2422,6 +2422,20 @@ func (mr *MockMutableStateMockRecorder) StartTransaction(entry interface{}) *gom return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartTransaction", reflect.TypeOf((*MockMutableState)(nil).StartTransaction), entry) } +// TaskQueue mocks base method. +func (m *MockMutableState) TaskQueue() *v14.TaskQueue { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TaskQueue") + ret0, _ := ret[0].(*v14.TaskQueue) + return ret0 +} + +// TaskQueue indicates an expected call of TaskQueue. +func (mr *MockMutableStateMockRecorder) TaskQueue() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TaskQueue", reflect.TypeOf((*MockMutableState)(nil).TaskQueue)) +} + // UpdateActivity mocks base method. func (m *MockMutableState) UpdateActivity(arg0 *v111.ActivityInfo) error { m.ctrl.T.Helper() diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go index 692cc57474e..7e71e94a5ed 100644 --- a/service/history/workflow/workflow_task_state_machine.go +++ b/service/history/workflow/workflow_task_state_machine.go @@ -250,16 +250,6 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( return nil, m.ms.createInternalServerError(opTag) } - // Task queue and workflow task timeout should already be set from workflow execution started event - taskQueue := &taskqueuepb.TaskQueue{} - if m.ms.IsStickyTaskQueueEnabled() { - taskQueue.Name = m.ms.executionInfo.StickyTaskQueue - taskQueue.Kind = enumspb.TASK_QUEUE_KIND_STICKY - } else { - taskQueue.Name = m.ms.executionInfo.TaskQueue - taskQueue.Kind = enumspb.TASK_QUEUE_KIND_NORMAL - } - // Flush any buffered events before creating the workflow task, otherwise it will result in invalid IDs for transient // workflow task and will cause in timeout processing to not work for transient workflow tasks if m.ms.HasBufferedEvents() { @@ -280,21 +270,27 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( } } - var newWorkflowTaskEvent *historypb.HistoryEvent - scheduledEventID := m.ms.GetNextEventID() // we will generate the schedule event later for repeatedly failing workflow tasks - // Avoid creating new history events when workflow tasks are continuously failing - scheduledTime := m.ms.timeSource.Now().UTC() + scheduleTime := m.ms.timeSource.Now().UTC() attempt := m.ms.executionInfo.WorkflowTaskAttempt + // TaskQueue should already be set from workflow execution started event. + taskQueue := m.ms.TaskQueue() + // DefaultWorkflowTaskTimeout should already be set from workflow execution started event. startToCloseTimeout := m.getStartToCloseTimeout(m.ms.executionInfo.DefaultWorkflowTaskTimeout, attempt) + + var scheduledEvent *historypb.HistoryEvent + var scheduledEventID int64 + if attempt == 1 { - newWorkflowTaskEvent = m.ms.hBuilder.AddWorkflowTaskScheduledEvent( + scheduledEvent = m.ms.hBuilder.AddWorkflowTaskScheduledEvent( taskQueue, startToCloseTimeout, attempt, - m.ms.timeSource.Now(), + scheduleTime, ) - scheduledEventID = newWorkflowTaskEvent.GetEventId() - scheduledTime = timestamp.TimeValue(newWorkflowTaskEvent.GetEventTime()) + scheduledEventID = scheduledEvent.GetEventId() + } else { + // WorkflowTaskScheduledEvent will be created later. + scheduledEventID = m.ms.GetNextEventID() } workflowTask, err := m.ReplicateWorkflowTaskScheduledEvent( @@ -303,7 +299,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat( taskQueue, startToCloseTimeout, attempt, - &scheduledTime, + &scheduleTime, originalScheduledTimestamp, ) if err != nil { @@ -446,12 +442,8 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskCompletedEvent( m.beforeAddWorkflowTaskCompletedEvent() if workflowTask.Attempt > 1 { // Create corresponding WorkflowTaskSchedule and WorkflowTaskStarted events for workflow tasks we have been retrying - taskQueue := &taskqueuepb.TaskQueue{ - Name: m.ms.executionInfo.TaskQueue, - Kind: enumspb.TASK_QUEUE_KIND_NORMAL, - } scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent( - taskQueue, + m.ms.TaskQueue(), workflowTask.WorkflowTaskTimeout, workflowTask.Attempt, timestamp.TimeValue(workflowTask.ScheduledTime).UTC(), @@ -684,7 +676,7 @@ func (m *workflowTaskStateMachine) GetWorkflowTaskInfo( return nil, false } -func (m *workflowTaskStateMachine) CreateTransientWorkflowTaskEvents( +func (m *workflowTaskStateMachine) GetTransientWorkflowTaskInfo( workflowTask *WorkflowTaskInfo, identity string, ) *historyspb.TransientWorkflowTaskInfo { @@ -735,15 +727,6 @@ func (m *workflowTaskStateMachine) CreateTransientWorkflowTaskEvents( } func (m *workflowTaskStateMachine) getWorkflowTaskInfo() *WorkflowTaskInfo { - taskQueue := &taskqueuepb.TaskQueue{} - if m.ms.IsStickyTaskQueueEnabled() { - taskQueue.Name = m.ms.executionInfo.StickyTaskQueue - taskQueue.Kind = enumspb.TASK_QUEUE_KIND_STICKY - } else { - taskQueue.Name = m.ms.executionInfo.TaskQueue - taskQueue.Kind = enumspb.TASK_QUEUE_KIND_NORMAL - } - return &WorkflowTaskInfo{ Version: m.ms.executionInfo.WorkflowTaskVersion, ScheduledEventID: m.ms.executionInfo.WorkflowTaskScheduledEventId, @@ -753,7 +736,7 @@ func (m *workflowTaskStateMachine) getWorkflowTaskInfo() *WorkflowTaskInfo { Attempt: m.ms.executionInfo.WorkflowTaskAttempt, StartedTime: m.ms.executionInfo.WorkflowTaskStartedTime, ScheduledTime: m.ms.executionInfo.WorkflowTaskScheduledTime, - TaskQueue: taskQueue, + TaskQueue: m.ms.TaskQueue(), OriginalScheduledTime: m.ms.executionInfo.WorkflowTaskOriginalScheduledTime, } } diff --git a/service/history/workflowTaskHandler.go b/service/history/workflowTaskHandler.go index 5b104a86f14..602a2871313 100644 --- a/service/history/workflowTaskHandler.go +++ b/service/history/workflowTaskHandler.go @@ -117,6 +117,7 @@ func newWorkflowTaskHandler( config *configs.Config, shard shard.Context, searchAttributesMapper searchattribute.Mapper, + hasBufferedEvents bool, ) *workflowTaskHandlerImpl { return &workflowTaskHandlerImpl{ @@ -124,7 +125,7 @@ func newWorkflowTaskHandler( workflowTaskCompletedID: workflowTaskCompletedID, // internal state - hasBufferedEvents: mutableState.HasBufferedEvents(), + hasBufferedEvents: hasBufferedEvents, workflowTaskFailedCause: nil, activityNotStartedCancelled: false, newMutableState: nil, diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index d3d205a2db4..37859fa8e1b 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -481,6 +481,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( handler.config, handler.shard, handler.searchAttributesMapper, + hasUnhandledEvents, ) if responseMutations, err = workflowTaskHandler.handleCommands( @@ -726,7 +727,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) createRecordWorkflowTaskStarted response.ScheduledTime = workflowTask.ScheduledTime response.StartedTime = workflowTask.StartedTime - response.TransientWorkflowTask = ms.CreateTransientWorkflowTask(workflowTask, identity) + response.TransientWorkflowTask = ms.GetTransientWorkflowTaskInfo(workflowTask, identity) currentBranchToken, err := ms.GetCurrentBranchToken() if err != nil {