Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eager workflow dispatch #3835

Merged
merged 27 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ea6b281
Eager workflow dispatch
bergundy Jan 23, 2023
5f23b7c
Revert returning workflowTaskInfo from NewWorkflowWithSignal
bergundy Jan 25, 2023
bb221c7
Fix lint issues
bergundy Jan 25, 2023
c3a4cd9
Minor restructuring
bergundy Jan 25, 2023
e8b8fba
Support eager start with TERMINATE_IF_RUNNING
bergundy Jan 25, 2023
fa4dccf
Properly release workflow context
bergundy Jan 25, 2023
4e49875
Add documentation and restructure for better readability
bergundy Jan 25, 2023
38f628e
Merge remote-tracking branch 'origin/master' into eager-workflow-disp…
bergundy Jan 25, 2023
fc50446
Fix lint
bergundy Jan 25, 2023
15158f9
Run go-generate
bergundy Jan 25, 2023
cc64258
Fix nolint directive
bergundy Jan 25, 2023
06aebdc
More restructuring, get rid of cyclo complexity
bergundy Jan 26, 2023
f32e716
Fix task inflight condition
bergundy Jan 26, 2023
bb00ebe
Merge branch 'master' into eager-workflow-dispatch
bergundy Jan 26, 2023
6394cbb
Merge remote-tracking branch 'origin/master' into eager-workflow-disp…
bergundy Jan 26, 2023
8dba7ff
Merge branch 'master' into eager-workflow-dispatch
bergundy Jan 27, 2023
593a669
Address review comments
bergundy Jan 28, 2023
a01484b
Merge remote-tracking branch 'origin/master' into eager-workflow-disp…
bergundy Jan 28, 2023
7dae88e
Merge remote-tracking branch 'origin/master' into eager-workflow-disp…
bergundy Feb 1, 2023
9a3a16e
Address review comments
bergundy Feb 1, 2023
41296fc
Fix missing reason tag value
bergundy Feb 1, 2023
65f9726
Merge remote-tracking branch 'origin/master' into eager-workflow-disp…
bergundy Feb 1, 2023
efece49
Add missing nil check
bergundy Feb 1, 2023
f23a706
Merge remote-tracking branch 'origin/master' into eager-workflow-disp…
bergundy Feb 1, 2023
ab77d10
Merge remote-tracking branch 'origin/master' into eager-workflow-disp…
bergundy Feb 2, 2023
e0a69c2
Address review comments
bergundy Feb 2, 2023
efd06d7
Merge remote-tracking branch 'origin/master' into eager-workflow-disp…
bergundy Feb 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
805 changes: 437 additions & 368 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,11 @@ const (
EnableParentClosePolicyWorker = "system.enableParentClosePolicyWorker"
// EnableStickyQuery indicates if sticky query should be enabled per namespace
EnableStickyQuery = "system.enableStickyQuery"
// EnableActivityEagerExecution indicates if acitivty eager execution is enabled per namespace
// EnableActivityEagerExecution indicates if activity eager execution is enabled per namespace
EnableActivityEagerExecution = "system.enableActivityEagerExecution"
// EnableEagerWorkflowStart toggles "eager workflow start" - returning the first workflow task inline in the
// response to a StartWorkflowExecution request and skipping the trip through matching.
EnableEagerWorkflowStart = "system.enableEagerWorkflowStart"
// NamespaceCacheRefreshInterval is the key for namespace cache refresh interval dynamic config
NamespaceCacheRefreshInterval = "system.namespaceCacheRefreshInterval"

Expand Down
9 changes: 8 additions & 1 deletion common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1464,7 +1464,14 @@ var (
MessageTypeCompleteWorkflowExecutionUpdateCounter = NewCounterDef("complete_workflow_update_message")
MessageTypeRejectWorkflowExecutionUpdateCounter = NewCounterDef("reject_workflow_update_message")

ActivityEagerExecutionCounter = NewCounterDef("activity_eager_execution")
ActivityEagerExecutionCounter = NewCounterDef("activity_eager_execution")
// WorkflowEagerExecutionCounter is emitted any time eager workflow start is requested.
WorkflowEagerExecutionCounter = NewCounterDef("workflow_eager_execution")
// WorkflowEagerExecutionDeniedCounter is emitted any time eager workflow start is requested and the serer fell back
// to standard dispatch.
// Timeouts and failures are not counted in this metric.
// This metric has a "reason" tag attached to it to understand why eager start was denied.
WorkflowEagerExecutionDeniedCounter = NewCounterDef("workflow_eager_execution_denied")
EmptyCompletionCommandsCounter = NewCounterDef("empty_completion_commands")
MultipleCompletionCommandsCounter = NewCounterDef("multiple_completion_commands")
FailedWorkflowTasksCounter = NewCounterDef("failed_workflow_tasks")
Expand Down
12 changes: 12 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const (
commandType = "commandType"
serviceName = "service_name"
actionType = "action_type"
// Generic reason tag can be used anywhere a reason is needed.
reason = "reason"

namespaceAllValue = "all"
unknownValue = "_unknown_"
Expand Down Expand Up @@ -278,3 +280,13 @@ func StringTag(key string, value string) Tag {
func CacheTypeTag(value string) Tag {
return &tagImpl{key: CacheTypeTagName, value: value}
}

// ReasonString is just a string but the special type is defined here to remind callers of ReasonTag to limit the
// cardinality of possible reasons.
type ReasonString string

// ReasonTag is a generic tag can be used anywhere a reason is needed.
// Make sure that the value is of limited cardinality.
func ReasonTag(value ReasonString) Tag {
return &tagImpl{key: reason, value: string(value)}
}
2 changes: 2 additions & 0 deletions config/dynamicconfig/development-cass.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@
# constraints: {}
#system.enableParentClosePolicyWorker:
# - value: true
system.enableEagerWorkflowStart:
- value: true
2 changes: 2 additions & 0 deletions config/dynamicconfig/development-sql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
# constraints: {}
#system.enableParentClosePolicyWorker:
# - value: true
system.enableEagerWorkflowStart:
- value: true
yycptt marked this conversation as resolved.
Show resolved Hide resolved
limit.maxIDLength:
- value: 255
constraints: {}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ message StartWorkflowExecutionRequest {
message StartWorkflowExecutionResponse {
string run_id = 1;
temporal.server.api.clock.v1.VectorClock clock = 2;
// Set if request_eager_execution is set on the start request
temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse eager_workflow_task = 3;
}

message GetMutableStateRequest {
Expand Down
3 changes: 2 additions & 1 deletion service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func (wh *WorkflowHandler) StartWorkflowExecution(ctx context.Context, request *
if err != nil {
return nil, err
}
return &workflowservice.StartWorkflowExecutionResponse{RunId: resp.GetRunId()}, nil
return &workflowservice.StartWorkflowExecutionResponse{RunId: resp.GetRunId(), EagerWorkflowTask: resp.GetEagerWorkflowTask()}, nil
}

// GetWorkflowExecutionHistory returns the history of specified workflow execution. It fails with 'EntityNotExistError' if specified workflow
Expand Down Expand Up @@ -2866,6 +2866,7 @@ func (wh *WorkflowHandler) GetSystemInfo(ctx context.Context, request *workflows
SupportsSchedules: true,
EncodedFailureAttributes: true,
UpsertMemo: true,
EagerWorkflowStart: true,
},
}, nil
}
Expand Down
35 changes: 23 additions & 12 deletions service/history/api/create_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,32 @@ func NewWorkflowWithSignal(
return nil, err
}
}

requestEagerExecution := startRequest.StartRequest.GetRequestEagerExecution()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the first usage of startRequest.StartRequest that I see in this function, so I'm worried about potential NPEs. How do we know this is always non-nil?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't, we count on this request to be validated before this method is called.

// Generate first workflow task event if not child WF and no first workflow task backoff
if err := GenerateFirstWorkflowTask(
scheduledEventID, err := GenerateFirstWorkflowTask(
yycptt marked this conversation as resolved.
Show resolved Hide resolved
newMutableState,
startRequest.ParentExecutionInfo,
startEvent,
); err != nil {
requestEagerExecution,
)
if err != nil {
return nil, err
}

// If first workflow task should back off (e.g. cron or workflow retry) a workflow task will not be scheduled.
if requestEagerExecution && newMutableState.HasPendingWorkflowTask() {
_, _, err = newMutableState.AddWorkflowTaskStartedEvent(
scheduledEventID,
startRequest.StartRequest.RequestId,
startRequest.StartRequest.TaskQueue,
startRequest.StartRequest.Identity,
)
if err != nil {
// Unable to add WorkflowTaskStarted event to history
return nil, err
}
}

newWorkflowContext := workflow.NewContext(
shard,
definition.NewWorkflowKey(
Expand Down Expand Up @@ -146,17 +162,13 @@ func GenerateFirstWorkflowTask(
mutableState workflow.MutableState,
parentInfo *workflowspb.ParentExecutionInfo,
startEvent *historypb.HistoryEvent,
) error {

bypassTaskGeneration bool,
) (int64, error) {
if parentInfo == nil {
// WorkflowTask is only created when it is not a Child Workflow and no backoff is needed
if err := mutableState.AddFirstWorkflowTaskScheduled(
startEvent,
); err != nil {
return err
}
return mutableState.AddFirstWorkflowTaskScheduled(startEvent, bypassTaskGeneration)
bergundy marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
return 0, nil
}

func NewWorkflowVersionCheck(
Expand Down Expand Up @@ -272,7 +284,6 @@ func ValidateStartWorkflowExecutionRequest(
if err := common.ValidateRetryPolicy(request.RetryPolicy); err != nil {
return err
}

if err := ValidateStart(
ctx,
shard,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func startAndSignalWorkflow(
) (string, error) {
workflowID := signalWithStartRequest.GetWorkflowId()
runID := uuid.New().String()
// TODO(bergundy): Support eager workflow task
newWorkflowContext, err := api.NewWorkflowWithSignal(
ctx,
shard,
Expand Down
Loading