Skip to content

Commit

Permalink
Workflow task state machine: minor renames (#3735)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin authored Dec 20, 2022
1 parent 12e4162 commit c6a89c0
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 61 deletions.
2 changes: 1 addition & 1 deletion service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ type Config struct {
// Workflow task settings
// DefaultWorkflowTaskTimeout the default workflow task timeout
DefaultWorkflowTaskTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
// WorkflowTaskHeartbeatTimeout is to timeout behavior of: RespondWorkflowTaskComplete with ForceCreateNewWorkflowTask == true without any workflow tasks
// WorkflowTaskHeartbeatTimeout is to timeout behavior of: RespondWorkflowTaskComplete with ForceCreateNewWorkflowTask == true without any commands
// So that workflow task will be scheduled to another worker(by clear stickyness)
WorkflowTaskHeartbeatTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
WorkflowTaskCriticalAttempts dynamicconfig.IntPropertyFn
Expand Down
8 changes: 4 additions & 4 deletions service/history/workflow/history_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,9 @@ func (b *HistoryBuilder) AddWorkflowTaskScheduledEvent(
taskQueue *taskqueuepb.TaskQueue,
startToCloseTimeout *time.Duration,
attempt int32,
now time.Time,
scheduleTime time.Time,
) *historypb.HistoryEvent {
event := b.createNewHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED, now)
event := b.createNewHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED, scheduleTime)
event.Attributes = &historypb.HistoryEvent_WorkflowTaskScheduledEventAttributes{
WorkflowTaskScheduledEventAttributes: &historypb.WorkflowTaskScheduledEventAttributes{
TaskQueue: taskQueue,
Expand All @@ -216,9 +216,9 @@ func (b *HistoryBuilder) AddWorkflowTaskStartedEvent(
scheduledEventID int64,
requestID string,
identity string,
now time.Time,
startTime time.Time,
) *historypb.HistoryEvent {
event := b.createNewHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, now)
event := b.createNewHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, startTime)
event.Attributes = &historypb.HistoryEvent_WorkflowTaskStartedEventAttributes{
WorkflowTaskStartedEventAttributes: &historypb.WorkflowTaskStartedEventAttributes{
ScheduledEventId: scheduledEventID,
Expand Down
3 changes: 2 additions & 1 deletion service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ type (
CheckResettable() error
CloneToProto() *persistencespb.WorkflowMutableState
RetryActivity(ai *persistencespb.ActivityInfo, failure *failurepb.Failure) (enumspb.RetryState, error)
CreateTransientWorkflowTask(workflowTask *WorkflowTaskInfo, identity string) *historyspb.TransientWorkflowTaskInfo
GetTransientWorkflowTaskInfo(workflowTask *WorkflowTaskInfo, identity string) *historyspb.TransientWorkflowTaskInfo
DeleteWorkflowTask()
DeleteSignalRequested(requestID string)
FlushBufferedEvents()
Expand Down Expand Up @@ -193,6 +193,7 @@ type (
IsCurrentWorkflowGuaranteed() bool
IsSignalRequested(requestID string) bool
IsStickyTaskQueueEnabled() bool
TaskQueue() *taskqueuepb.TaskQueue
IsWorkflowExecutionRunning() bool
IsResourceDuplicated(resourceDedupKey definition.DeduplicationID) bool
IsWorkflowPendingOnWorkflowTaskBackoff() bool
Expand Down
21 changes: 17 additions & 4 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,19 @@ func (ms *MutableStateImpl) IsStickyTaskQueueEnabled() bool {
return ms.executionInfo.StickyTaskQueue != ""
}

func (e *MutableStateImpl) TaskQueue() *taskqueuepb.TaskQueue {
if e.IsStickyTaskQueueEnabled() {
return &taskqueuepb.TaskQueue{
Name: e.executionInfo.StickyTaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_STICKY,
}
}
return &taskqueuepb.TaskQueue{
Name: e.executionInfo.TaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
}
}

func (ms *MutableStateImpl) GetWorkflowType() *commonpb.WorkflowType {
wType := &commonpb.WorkflowType{}
wType.Name = ms.executionInfo.WorkflowTypeName
Expand Down Expand Up @@ -1304,7 +1317,7 @@ func (ms *MutableStateImpl) ClearTransientWorkflowTask() error {
return serviceerror.NewInternal("cannot clear transient workflow task when there are buffered events")
}
// no buffered event
resetWorkflowTaskInfo := &WorkflowTaskInfo{
emptyWorkflowTaskInfo := &WorkflowTaskInfo{
Version: common.EmptyVersion,
ScheduledEventID: common.EmptyEventID,
StartedEventID: common.EmptyEventID,
Expand All @@ -1317,7 +1330,7 @@ func (ms *MutableStateImpl) ClearTransientWorkflowTask() error {
TaskQueue: nil,
OriginalScheduledTime: timestamp.UnixOrZeroTimePtr(0),
}
ms.workflowTaskManager.UpdateWorkflowTask(resetWorkflowTaskInfo)
ms.workflowTaskManager.UpdateWorkflowTask(emptyWorkflowTaskInfo)
return nil
}

Expand Down Expand Up @@ -1774,11 +1787,11 @@ func (ms *MutableStateImpl) ReplicateWorkflowTaskStartedEvent(
return ms.workflowTaskManager.ReplicateWorkflowTaskStartedEvent(workflowTask, version, scheduledEventID, startedEventID, requestID, timestamp)
}

func (ms *MutableStateImpl) CreateTransientWorkflowTask(
func (ms *MutableStateImpl) GetTransientWorkflowTaskInfo(
workflowTask *WorkflowTaskInfo,
identity string,
) *historyspb.TransientWorkflowTaskInfo {
return ms.workflowTaskManager.CreateTransientWorkflowTaskEvents(workflowTask, identity)
return ms.workflowTaskManager.GetTransientWorkflowTaskInfo(workflowTask, identity)
}

// add BinaryCheckSum for the first workflowTaskCompletedID for auto-reset
Expand Down
42 changes: 28 additions & 14 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 18 additions & 35 deletions service/history/workflow/workflow_task_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,16 +250,6 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat(
return nil, m.ms.createInternalServerError(opTag)
}

// Task queue and workflow task timeout should already be set from workflow execution started event
taskQueue := &taskqueuepb.TaskQueue{}
if m.ms.IsStickyTaskQueueEnabled() {
taskQueue.Name = m.ms.executionInfo.StickyTaskQueue
taskQueue.Kind = enumspb.TASK_QUEUE_KIND_STICKY
} else {
taskQueue.Name = m.ms.executionInfo.TaskQueue
taskQueue.Kind = enumspb.TASK_QUEUE_KIND_NORMAL
}

// Flush any buffered events before creating the workflow task, otherwise it will result in invalid IDs for transient
// workflow task and will cause in timeout processing to not work for transient workflow tasks
if m.ms.HasBufferedEvents() {
Expand All @@ -280,21 +270,27 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat(
}
}

var newWorkflowTaskEvent *historypb.HistoryEvent
scheduledEventID := m.ms.GetNextEventID() // we will generate the schedule event later for repeatedly failing workflow tasks
// Avoid creating new history events when workflow tasks are continuously failing
scheduledTime := m.ms.timeSource.Now().UTC()
scheduleTime := m.ms.timeSource.Now().UTC()
attempt := m.ms.executionInfo.WorkflowTaskAttempt
// TaskQueue should already be set from workflow execution started event.
taskQueue := m.ms.TaskQueue()
// DefaultWorkflowTaskTimeout should already be set from workflow execution started event.
startToCloseTimeout := m.getStartToCloseTimeout(m.ms.executionInfo.DefaultWorkflowTaskTimeout, attempt)

var scheduledEvent *historypb.HistoryEvent
var scheduledEventID int64

if attempt == 1 {
newWorkflowTaskEvent = m.ms.hBuilder.AddWorkflowTaskScheduledEvent(
scheduledEvent = m.ms.hBuilder.AddWorkflowTaskScheduledEvent(
taskQueue,
startToCloseTimeout,
attempt,
m.ms.timeSource.Now(),
scheduleTime,
)
scheduledEventID = newWorkflowTaskEvent.GetEventId()
scheduledTime = timestamp.TimeValue(newWorkflowTaskEvent.GetEventTime())
scheduledEventID = scheduledEvent.GetEventId()
} else {
// WorkflowTaskScheduledEvent will be created later.
scheduledEventID = m.ms.GetNextEventID()
}

workflowTask, err := m.ReplicateWorkflowTaskScheduledEvent(
Expand All @@ -303,7 +299,7 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat(
taskQueue,
startToCloseTimeout,
attempt,
&scheduledTime,
&scheduleTime,
originalScheduledTimestamp,
)
if err != nil {
Expand Down Expand Up @@ -446,12 +442,8 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskCompletedEvent(
m.beforeAddWorkflowTaskCompletedEvent()
if workflowTask.Attempt > 1 {
// Create corresponding WorkflowTaskSchedule and WorkflowTaskStarted events for workflow tasks we have been retrying
taskQueue := &taskqueuepb.TaskQueue{
Name: m.ms.executionInfo.TaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
}
scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent(
taskQueue,
m.ms.TaskQueue(),
workflowTask.WorkflowTaskTimeout,
workflowTask.Attempt,
timestamp.TimeValue(workflowTask.ScheduledTime).UTC(),
Expand Down Expand Up @@ -684,7 +676,7 @@ func (m *workflowTaskStateMachine) GetWorkflowTaskInfo(
return nil, false
}

func (m *workflowTaskStateMachine) CreateTransientWorkflowTaskEvents(
func (m *workflowTaskStateMachine) GetTransientWorkflowTaskInfo(
workflowTask *WorkflowTaskInfo,
identity string,
) *historyspb.TransientWorkflowTaskInfo {
Expand Down Expand Up @@ -735,15 +727,6 @@ func (m *workflowTaskStateMachine) CreateTransientWorkflowTaskEvents(
}

func (m *workflowTaskStateMachine) getWorkflowTaskInfo() *WorkflowTaskInfo {
taskQueue := &taskqueuepb.TaskQueue{}
if m.ms.IsStickyTaskQueueEnabled() {
taskQueue.Name = m.ms.executionInfo.StickyTaskQueue
taskQueue.Kind = enumspb.TASK_QUEUE_KIND_STICKY
} else {
taskQueue.Name = m.ms.executionInfo.TaskQueue
taskQueue.Kind = enumspb.TASK_QUEUE_KIND_NORMAL
}

return &WorkflowTaskInfo{
Version: m.ms.executionInfo.WorkflowTaskVersion,
ScheduledEventID: m.ms.executionInfo.WorkflowTaskScheduledEventId,
Expand All @@ -753,7 +736,7 @@ func (m *workflowTaskStateMachine) getWorkflowTaskInfo() *WorkflowTaskInfo {
Attempt: m.ms.executionInfo.WorkflowTaskAttempt,
StartedTime: m.ms.executionInfo.WorkflowTaskStartedTime,
ScheduledTime: m.ms.executionInfo.WorkflowTaskScheduledTime,
TaskQueue: taskQueue,
TaskQueue: m.ms.TaskQueue(),
OriginalScheduledTime: m.ms.executionInfo.WorkflowTaskOriginalScheduledTime,
}
}
Expand Down
3 changes: 2 additions & 1 deletion service/history/workflowTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,15 @@ func newWorkflowTaskHandler(
config *configs.Config,
shard shard.Context,
searchAttributesMapper searchattribute.Mapper,
hasBufferedEvents bool,
) *workflowTaskHandlerImpl {

return &workflowTaskHandlerImpl{
identity: identity,
workflowTaskCompletedID: workflowTaskCompletedID,

// internal state
hasBufferedEvents: mutableState.HasBufferedEvents(),
hasBufferedEvents: hasBufferedEvents,
workflowTaskFailedCause: nil,
activityNotStartedCancelled: false,
newMutableState: nil,
Expand Down
3 changes: 2 additions & 1 deletion service/history/workflowTaskHandlerCallbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
handler.config,
handler.shard,
handler.searchAttributesMapper,
hasUnhandledEvents,
)

if responseMutations, err = workflowTaskHandler.handleCommands(
Expand Down Expand Up @@ -726,7 +727,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) createRecordWorkflowTaskStarted
response.ScheduledTime = workflowTask.ScheduledTime
response.StartedTime = workflowTask.StartedTime

response.TransientWorkflowTask = ms.CreateTransientWorkflowTask(workflowTask, identity)
response.TransientWorkflowTask = ms.GetTransientWorkflowTaskInfo(workflowTask, identity)

currentBranchToken, err := ms.GetCurrentBranchToken()
if err != nil {
Expand Down

0 comments on commit c6a89c0

Please sign in to comment.