Skip to content

Commit

Permalink
Eager workflow dispatch (#3835)
Browse files Browse the repository at this point in the history
* Eager workflow dispatch

* Revert returning workflowTaskInfo from NewWorkflowWithSignal

* Fix lint issues

* Minor restructuring

* Support eager start with TERMINATE_IF_RUNNING

* Properly release workflow context

* Add documentation and restructure for better readability

* Fix lint

* Run go-generate

* Fix nolint directive

* More restructuring, get rid of cyclo complexity

* Fix task inflight condition

* Address review comments

* Address review comments

* Fix missing reason tag value

* Add missing nil check

* Address review comments
  • Loading branch information
bergundy authored Feb 2, 2023
1 parent dbeab31 commit 6ef7749
Show file tree
Hide file tree
Showing 23 changed files with 1,075 additions and 523 deletions.
805 changes: 437 additions & 368 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,11 @@ const (
EnableParentClosePolicyWorker = "system.enableParentClosePolicyWorker"
// EnableStickyQuery indicates if sticky query should be enabled per namespace
EnableStickyQuery = "system.enableStickyQuery"
// EnableActivityEagerExecution indicates if acitivty eager execution is enabled per namespace
// EnableActivityEagerExecution indicates if activity eager execution is enabled per namespace
EnableActivityEagerExecution = "system.enableActivityEagerExecution"
// EnableEagerWorkflowStart toggles "eager workflow start" - returning the first workflow task inline in the
// response to a StartWorkflowExecution request and skipping the trip through matching.
EnableEagerWorkflowStart = "system.enableEagerWorkflowStart"
// NamespaceCacheRefreshInterval is the key for namespace cache refresh interval dynamic config
NamespaceCacheRefreshInterval = "system.namespaceCacheRefreshInterval"

Expand Down
9 changes: 8 additions & 1 deletion common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1464,7 +1464,14 @@ var (
MessageTypeCompleteWorkflowExecutionUpdateCounter = NewCounterDef("complete_workflow_update_message")
MessageTypeRejectWorkflowExecutionUpdateCounter = NewCounterDef("reject_workflow_update_message")

ActivityEagerExecutionCounter = NewCounterDef("activity_eager_execution")
ActivityEagerExecutionCounter = NewCounterDef("activity_eager_execution")
// WorkflowEagerExecutionCounter is emitted any time eager workflow start is requested.
WorkflowEagerExecutionCounter = NewCounterDef("workflow_eager_execution")
// WorkflowEagerExecutionDeniedCounter is emitted any time eager workflow start is requested and the serer fell back
// to standard dispatch.
// Timeouts and failures are not counted in this metric.
// This metric has a "reason" tag attached to it to understand why eager start was denied.
WorkflowEagerExecutionDeniedCounter = NewCounterDef("workflow_eager_execution_denied")
EmptyCompletionCommandsCounter = NewCounterDef("empty_completion_commands")
MultipleCompletionCommandsCounter = NewCounterDef("multiple_completion_commands")
FailedWorkflowTasksCounter = NewCounterDef("failed_workflow_tasks")
Expand Down
12 changes: 12 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const (
commandType = "commandType"
serviceName = "service_name"
actionType = "action_type"
// Generic reason tag can be used anywhere a reason is needed.
reason = "reason"

namespaceAllValue = "all"
unknownValue = "_unknown_"
Expand Down Expand Up @@ -278,3 +280,13 @@ func StringTag(key string, value string) Tag {
func CacheTypeTag(value string) Tag {
return &tagImpl{key: CacheTypeTagName, value: value}
}

// ReasonString is just a string but the special type is defined here to remind callers of ReasonTag to limit the
// cardinality of possible reasons.
type ReasonString string

// ReasonTag is a generic tag can be used anywhere a reason is needed.
// Make sure that the value is of limited cardinality.
func ReasonTag(value ReasonString) Tag {
return &tagImpl{key: reason, value: string(value)}
}
2 changes: 2 additions & 0 deletions config/dynamicconfig/development-cass.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@
# constraints: {}
#system.enableParentClosePolicyWorker:
# - value: true
system.enableEagerWorkflowStart:
- value: true
2 changes: 2 additions & 0 deletions config/dynamicconfig/development-sql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
# constraints: {}
#system.enableParentClosePolicyWorker:
# - value: true
system.enableEagerWorkflowStart:
- value: true
limit.maxIDLength:
- value: 255
constraints: {}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ message StartWorkflowExecutionRequest {
message StartWorkflowExecutionResponse {
string run_id = 1;
temporal.server.api.clock.v1.VectorClock clock = 2;
// Set if request_eager_execution is set on the start request
temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse eager_workflow_task = 3;
}

message GetMutableStateRequest {
Expand Down
3 changes: 2 additions & 1 deletion service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func (wh *WorkflowHandler) StartWorkflowExecution(ctx context.Context, request *
if err != nil {
return nil, err
}
return &workflowservice.StartWorkflowExecutionResponse{RunId: resp.GetRunId()}, nil
return &workflowservice.StartWorkflowExecutionResponse{RunId: resp.GetRunId(), EagerWorkflowTask: resp.GetEagerWorkflowTask()}, nil
}

// GetWorkflowExecutionHistory returns the history of specified workflow execution. It fails with 'EntityNotExistError' if specified workflow
Expand Down Expand Up @@ -2866,6 +2866,7 @@ func (wh *WorkflowHandler) GetSystemInfo(ctx context.Context, request *workflows
SupportsSchedules: true,
EncodedFailureAttributes: true,
UpsertMemo: true,
EagerWorkflowStart: true,
},
}, nil
}
Expand Down
35 changes: 23 additions & 12 deletions service/history/api/create_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,32 @@ func NewWorkflowWithSignal(
return nil, err
}
}

requestEagerExecution := startRequest.StartRequest.GetRequestEagerExecution()
// Generate first workflow task event if not child WF and no first workflow task backoff
if err := GenerateFirstWorkflowTask(
scheduledEventID, err := GenerateFirstWorkflowTask(
newMutableState,
startRequest.ParentExecutionInfo,
startEvent,
); err != nil {
requestEagerExecution,
)
if err != nil {
return nil, err
}

// If first workflow task should back off (e.g. cron or workflow retry) a workflow task will not be scheduled.
if requestEagerExecution && newMutableState.HasPendingWorkflowTask() {
_, _, err = newMutableState.AddWorkflowTaskStartedEvent(
scheduledEventID,
startRequest.StartRequest.RequestId,
startRequest.StartRequest.TaskQueue,
startRequest.StartRequest.Identity,
)
if err != nil {
// Unable to add WorkflowTaskStarted event to history
return nil, err
}
}

newWorkflowContext := workflow.NewContext(
shard,
definition.NewWorkflowKey(
Expand Down Expand Up @@ -146,17 +162,13 @@ func GenerateFirstWorkflowTask(
mutableState workflow.MutableState,
parentInfo *workflowspb.ParentExecutionInfo,
startEvent *historypb.HistoryEvent,
) error {

bypassTaskGeneration bool,
) (int64, error) {
if parentInfo == nil {
// WorkflowTask is only created when it is not a Child Workflow and no backoff is needed
if err := mutableState.AddFirstWorkflowTaskScheduled(
startEvent,
); err != nil {
return err
}
return mutableState.AddFirstWorkflowTaskScheduled(startEvent, bypassTaskGeneration)
}
return nil
return 0, nil
}

func NewWorkflowVersionCheck(
Expand Down Expand Up @@ -272,7 +284,6 @@ func ValidateStartWorkflowExecutionRequest(
if err := common.ValidateRetryPolicy(request.RetryPolicy); err != nil {
return err
}

if err := ValidateStart(
ctx,
shard,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func startAndSignalWorkflow(
) (string, error) {
workflowID := signalWithStartRequest.GetWorkflowId()
runID := uuid.New().String()
// TODO(bergundy): Support eager workflow task
newWorkflowContext, err := api.NewWorkflowWithSignal(
ctx,
shard,
Expand Down
Loading

0 comments on commit 6ef7749

Please sign in to comment.