Skip to content

Commit

Permalink
Refactor failWorkflowTask method
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Feb 8, 2023
1 parent 793dd7e commit 07fd32e
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 49 deletions.
103 changes: 55 additions & 48 deletions service/history/workflowTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandScheduleActivity(
return nil, handler.failWorkflow(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_ACTIVITY_ATTRIBUTES, err)
}
if err := handler.sizeLimitChecker.checkIfNumPendingActivitiesExceedsLimit(); err != nil {
return nil, handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_ACTIVITIES_LIMIT_EXCEEDED, err)
return nil, handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_ACTIVITIES_LIMIT_EXCEEDED, err)
}

enums.SetDefaultTaskQueueKind(&attr.GetTaskQueue().Kind)
Expand All @@ -349,10 +349,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandScheduleActivity(
eagerStartActivity,
)
if err != nil {
if _, ok := err.(*serviceerror.InvalidArgument); ok {
return nil, handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_SCHEDULE_ACTIVITY_DUPLICATE_ID, err)
}
return nil, err
return nil, handler.failWorkflowTaskOnInvalidArgument(enumspb.WORKFLOW_TASK_FAILED_CAUSE_SCHEDULE_ACTIVITY_DUPLICATE_ID, err)
}

if !eagerStartActivity {
Expand Down Expand Up @@ -473,10 +470,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandRequestCancelActivity(
handler.identity,
)
if err != nil {
if _, ok := err.(*serviceerror.InvalidArgument); ok {
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_ACTIVITY_ATTRIBUTES, err)
}
return err
return handler.failWorkflowTaskOnInvalidArgument(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_ACTIVITY_ATTRIBUTES, err)
}
if ai != nil {
// If ai is nil, the activity has already been canceled/completed/timedout. The cancel request
Expand Down Expand Up @@ -518,10 +512,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandStartTimer(

_, _, err := handler.mutableState.AddTimerStartedEvent(handler.workflowTaskCompletedID, attr)
if err != nil {
if _, ok := err.(*serviceerror.InvalidArgument); ok {
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_START_TIMER_DUPLICATE_ID, err)
}
return err
return handler.failWorkflowTaskOnInvalidArgument(enumspb.WORKFLOW_TASK_FAILED_CAUSE_START_TIMER_DUPLICATE_ID, err)
}
return nil
}
Expand All @@ -534,11 +525,11 @@ func (handler *workflowTaskHandlerImpl) handleCommandCompleteWorkflow(
handler.metricsHandler.Counter(metrics.CommandTypeCompleteWorkflowCounter.GetMetricName()).Record(1)

if handler.hasBufferedEvents {
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
}

if handler.hasPendingUpdates {
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
}

if err := handler.validateCommandAttr(
Expand Down Expand Up @@ -596,11 +587,11 @@ func (handler *workflowTaskHandlerImpl) handleCommandFailWorkflow(
handler.metricsHandler.Counter(metrics.CommandTypeFailWorkflowCounter.GetMetricName()).Record(1)

if handler.hasBufferedEvents {
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
}

if handler.hasPendingUpdates {
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
}

if err := handler.validateCommandAttr(
Expand Down Expand Up @@ -685,10 +676,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandCancelTimer(
attr,
handler.identity)
if err != nil {
if _, ok := err.(*serviceerror.InvalidArgument); ok {
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_CANCEL_TIMER_ATTRIBUTES, err)
}
return err
return handler.failWorkflowTaskOnInvalidArgument(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_CANCEL_TIMER_ATTRIBUTES, err)
}

// timer deletion is a success, we may have deleted a fired timer in
Expand All @@ -707,11 +695,11 @@ func (handler *workflowTaskHandlerImpl) handleCommandCancelWorkflow(
handler.metricsHandler.Counter(metrics.CommandTypeCancelWorkflowCounter.GetMetricName()).Record(1)

if handler.hasBufferedEvents {
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
}

if handler.hasPendingUpdates {
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
}

if err := handler.validateCommandAttr(
Expand Down Expand Up @@ -768,7 +756,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandRequestCancelExternalWorkfl
return err
}
if err := handler.sizeLimitChecker.checkIfNumPendingCancelRequestsExceedsLimit(); err != nil {
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_REQUEST_CANCEL_LIMIT_EXCEEDED, err)
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_REQUEST_CANCEL_LIMIT_EXCEEDED, err)
}

cancelRequestID := uuid.New()
Expand Down Expand Up @@ -814,11 +802,11 @@ func (handler *workflowTaskHandlerImpl) handleCommandContinueAsNewWorkflow(
handler.metricsHandler.Counter(metrics.CommandTypeContinueAsNewCounter.GetMetricName()).Record(1)

if handler.hasBufferedEvents {
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil)
}

if handler.hasPendingUpdates {
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil)
}

namespaceName := handler.mutableState.GetNamespaceEntry().Name()
Expand Down Expand Up @@ -991,7 +979,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandStartChildWorkflow(

// child workflow limit
if err := handler.sizeLimitChecker.checkIfNumChildWorkflowsExceedsLimit(); err != nil {
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_CHILD_WORKFLOWS_LIMIT_EXCEEDED, err)
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_CHILD_WORKFLOWS_LIMIT_EXCEEDED, err)
}

enabled := handler.config.EnableParentClosePolicy(parentNamespace.String())
Expand Down Expand Up @@ -1044,7 +1032,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandSignalExternalWorkflow(
return err
}
if err := handler.sizeLimitChecker.checkIfNumPendingSignalsExceedsLimit(); err != nil {
return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_SIGNALS_LIMIT_EXCEEDED, err)
return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_SIGNALS_LIMIT_EXCEEDED, err)
}

if err := handler.sizeLimitChecker.checkIfPayloadSizeExceedsLimit(
Expand Down Expand Up @@ -1423,49 +1411,68 @@ func (handler *workflowTaskHandlerImpl) validateCommandAttr(
validationFn commandAttrValidationFn,
) error {

if failedCause, err := validationFn(); err != nil {
if _, ok := err.(*serviceerror.InvalidArgument); ok {
return handler.failCommand(failedCause, err)
}
return handler.failWorkflowTaskOnInvalidArgument(validationFn())
}

func (handler *workflowTaskHandlerImpl) failWorkflowTaskOnInvalidArgument(
wtFailedCause enumspb.WorkflowTaskFailedCause,
err error,
) error {

switch err.(type) {
case *serviceerror.InvalidArgument:
return handler.failWorkflowTask(wtFailedCause, err)
default:
return err
}

return nil
}

func (handler *workflowTaskHandlerImpl) failCommand(
func (handler *workflowTaskHandlerImpl) failWorkflowTask(
failedCause enumspb.WorkflowTaskFailedCause,
causeErr error,
) error {
handler.workflowTaskFailedCause = NewWorkflowTaskFailedCause(failedCause, causeErr)

handler.workflowTaskFailedCause = newWorkflowTaskFailedCause(
failedCause,
causeErr,
nil)
handler.stopProcessing = true
// NOTE: failWorkflowTask always return nil.
// It is important to clear returned error if WT needs to be failed to properly add WTFailed event.
// Handler will rely on stopProcessing flag and workflowTaskFailedCause field.
return nil
}

func (handler *workflowTaskHandlerImpl) failWorkflow(
failedCause enumspb.WorkflowTaskFailedCause,
causeErr error,
) error {
handler.workflowTaskFailedCause = &workflowTaskFailedCause{
failedCause: failedCause,
causeErr: causeErr,
workflowFailure: failure.NewServerFailure(causeErr.Error(), true),
}

handler.workflowTaskFailedCause = newWorkflowTaskFailedCause(
failedCause,
causeErr,
failure.NewServerFailure(causeErr.Error(), true))
handler.stopProcessing = true
// NOTE: failWorkflow always return nil.
// It is important to clear returned error if WT needs to be failed to properly add WTFailed and FailWorkflow events.
// Handler will rely on stopProcessing flag and workflowTaskFailedCause field.
return nil
}

func NewWorkflowTaskFailedCause(failedCause enumspb.WorkflowTaskFailedCause, causeErr error) *workflowTaskFailedCause {
func newWorkflowTaskFailedCause(failedCause enumspb.WorkflowTaskFailedCause, causeErr error, workflowFailure *failurepb.Failure) *workflowTaskFailedCause {

return &workflowTaskFailedCause{
failedCause: failedCause,
causeErr: causeErr,
failedCause: failedCause,
causeErr: causeErr,
workflowFailure: workflowFailure,
}
}

func (wtfc *workflowTaskFailedCause) Message() string {
if wtfc.causeErr == nil {
return wtfc.failedCause.String()
func (c *workflowTaskFailedCause) Message() string {

if c.causeErr == nil {
return c.failedCause.String()
}

return fmt.Sprintf("%v: %v", wtfc.failedCause, wtfc.causeErr.Error())
return fmt.Sprintf("%v: %v", c.failedCause, c.causeErr.Error())
}
8 changes: 7 additions & 1 deletion service/history/workflowTaskHandlerCallbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,13 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
hasPendingUpdates := ms.UpdateRegistry().HasPending(request.GetMessages())
hasBufferedEvents := ms.HasBufferedEvents()
if err := namespaceEntry.VerifyBinaryChecksum(request.GetBinaryChecksum()); err != nil {
wtFailedCause = NewWorkflowTaskFailedCause(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_BINARY, serviceerror.NewInvalidArgument(fmt.Sprintf("binary %v is already marked as bad deployment", request.GetBinaryChecksum())))
wtFailedCause = newWorkflowTaskFailedCause(
enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_BINARY,
serviceerror.NewInvalidArgument(
fmt.Sprintf(
"binary %v is marked as bad deployment",
request.GetBinaryChecksum())),
nil)
} else {
namespace := namespaceEntry.Name()
workflowSizeChecker := newWorkflowSizeChecker(
Expand Down

0 comments on commit 07fd32e

Please sign in to comment.