Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eager workflow dispatch #3835

Merged
merged 27 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ea6b281
Eager workflow dispatch
bergundy Jan 23, 2023
5f23b7c
Revert returning workflowTaskInfo from NewWorkflowWithSignal
bergundy Jan 25, 2023
bb221c7
Fix lint issues
bergundy Jan 25, 2023
c3a4cd9
Minor restructuring
bergundy Jan 25, 2023
e8b8fba
Support eager start with TERMINATE_IF_RUNNING
bergundy Jan 25, 2023
fa4dccf
Properly release workflow context
bergundy Jan 25, 2023
4e49875
Add documentation and restructure for better readability
bergundy Jan 25, 2023
38f628e
Merge remote-tracking branch 'origin/master' into eager-workflow-disp…
bergundy Jan 25, 2023
fc50446
Fix lint
bergundy Jan 25, 2023
15158f9
Run go-generate
bergundy Jan 25, 2023
cc64258
Fix nolint directive
bergundy Jan 25, 2023
06aebdc
More restructuring, get rid of cyclo complexity
bergundy Jan 26, 2023
f32e716
Fix task inflight condition
bergundy Jan 26, 2023
bb00ebe
Merge branch 'master' into eager-workflow-dispatch
bergundy Jan 26, 2023
6394cbb
Merge remote-tracking branch 'origin/master' into eager-workflow-disp…
bergundy Jan 26, 2023
8dba7ff
Merge branch 'master' into eager-workflow-dispatch
bergundy Jan 27, 2023
593a669
Address review comments
bergundy Jan 28, 2023
a01484b
Merge remote-tracking branch 'origin/master' into eager-workflow-disp…
bergundy Jan 28, 2023
7dae88e
Merge remote-tracking branch 'origin/master' into eager-workflow-disp…
bergundy Feb 1, 2023
9a3a16e
Address review comments
bergundy Feb 1, 2023
41296fc
Fix missing reason tag value
bergundy Feb 1, 2023
65f9726
Merge remote-tracking branch 'origin/master' into eager-workflow-disp…
bergundy Feb 1, 2023
efece49
Add missing nil check
bergundy Feb 1, 2023
f23a706
Merge remote-tracking branch 'origin/master' into eager-workflow-disp…
bergundy Feb 1, 2023
ab77d10
Merge remote-tracking branch 'origin/master' into eager-workflow-disp…
bergundy Feb 2, 2023
e0a69c2
Address review comments
bergundy Feb 2, 2023
efd06d7
Merge remote-tracking branch 'origin/master' into eager-workflow-disp…
bergundy Feb 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
803 changes: 436 additions & 367 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1463,6 +1463,7 @@ var (
CommandTypeModifyWorkflowPropertiesCounter = NewCounterDef("modify_workflow_properties_command")
CommandTypeChildWorkflowCounter = NewCounterDef("child_workflow_command")
ActivityEagerExecutionCounter = NewCounterDef("activity_eager_execution")
WorkflowEagerExecutionCounter = NewCounterDef("workflow_eager_execution")
EmptyCompletionCommandsCounter = NewCounterDef("empty_completion_commands")
MultipleCompletionCommandsCounter = NewCounterDef("multiple_completion_commands")
FailedWorkflowTasksCounter = NewCounterDef("failed_workflow_tasks")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ message StartWorkflowExecutionRequest {
message StartWorkflowExecutionResponse {
string run_id = 1;
temporal.server.api.clock.v1.VectorClock clock = 2;
// Set if request_eager_execution is set on the start request
temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse eager_workflow_task = 3;
}

message GetMutableStateRequest {
Expand Down
3 changes: 2 additions & 1 deletion service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func (wh *WorkflowHandler) StartWorkflowExecution(ctx context.Context, request *
if err != nil {
return nil, err
}
return &workflowservice.StartWorkflowExecutionResponse{RunId: resp.GetRunId()}, nil
return &workflowservice.StartWorkflowExecutionResponse{RunId: resp.GetRunId(), EagerWorkflowTask: resp.GetEagerWorkflowTask()}, nil
}

// GetWorkflowExecutionHistory returns the history of specified workflow execution. It fails with 'EntityNotExistError' if specified workflow
Expand Down Expand Up @@ -2864,6 +2864,7 @@ func (wh *WorkflowHandler) GetSystemInfo(ctx context.Context, request *workflows
SupportsSchedules: true,
EncodedFailureAttributes: true,
UpsertMemo: true,
EagerWorkflowStart: true,
},
}, nil
}
Expand Down
34 changes: 22 additions & 12 deletions service/history/api/create_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,31 @@ func NewWorkflowWithSignal(
return nil, err
}
}

requestEagerExecution := startRequest.StartRequest.GetRequestEagerExecution()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the first usage of startRequest.StartRequest that I see in this function, so I'm worried about potential NPEs. How do we know this is always non-nil?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't, we count on this request to be validated before this method is called.

// Generate first workflow task event if not child WF and no first workflow task backoff
if err := GenerateFirstWorkflowTask(
scheduledEventID, err := GenerateFirstWorkflowTask(
yycptt marked this conversation as resolved.
Show resolved Hide resolved
newMutableState,
startRequest.ParentExecutionInfo,
startEvent,
); err != nil {
requestEagerExecution,
)
if err != nil {
return nil, err
}

if requestEagerExecution {
yycptt marked this conversation as resolved.
Show resolved Hide resolved
_, _, err = newMutableState.AddWorkflowTaskStartedEvent(
scheduledEventID,
startRequest.StartRequest.RequestId,
startRequest.StartRequest.TaskQueue,
startRequest.StartRequest.Identity,
)
if err != nil {
// Unable to add WorkflowTaskStarted event to history
return nil, err
}
}

newWorkflowContext := workflow.NewContext(
shard,
definition.NewWorkflowKey(
Expand Down Expand Up @@ -146,17 +161,13 @@ func GenerateFirstWorkflowTask(
mutableState workflow.MutableState,
parentInfo *workflowspb.ParentExecutionInfo,
startEvent *historypb.HistoryEvent,
) error {

bypassTaskGeneration bool,
) (int64, error) {
if parentInfo == nil {
// WorkflowTask is only created when it is not a Child Workflow and no backoff is needed
if err := mutableState.AddFirstWorkflowTaskScheduled(
startEvent,
); err != nil {
return err
}
return mutableState.AddFirstWorkflowTaskScheduled(startEvent, bypassTaskGeneration)
bergundy marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
return 0, nil
}

func NewWorkflowVersionCheck(
Expand Down Expand Up @@ -272,7 +283,6 @@ func ValidateStartWorkflowExecutionRequest(
if err := common.ValidateRetryPolicy(request.RetryPolicy); err != nil {
return err
}

if err := ValidateStart(
ctx,
shard,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func startAndSignalWorkflow(
) (string, error) {
workflowID := signalWithStartRequest.GetWorkflowId()
runID := uuid.New().String()
// TODO(bergundy): Support eager workflow task
newWorkflowContext, err := api.NewWorkflowWithSignal(
ctx,
shard,
Expand Down
Loading