diff --git a/service/history/workflowTaskHandler.go b/service/history/workflowTaskHandler.go index 659911ecb7b..b2b4d05912a 100644 --- a/service/history/workflowTaskHandler.go +++ b/service/history/workflowTaskHandler.go @@ -332,7 +332,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandScheduleActivity( return nil, handler.failWorkflow(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_ACTIVITY_ATTRIBUTES, err) } if err := handler.sizeLimitChecker.checkIfNumPendingActivitiesExceedsLimit(); err != nil { - return nil, handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_ACTIVITIES_LIMIT_EXCEEDED, err) + return nil, handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_ACTIVITIES_LIMIT_EXCEEDED, err) } enums.SetDefaultTaskQueueKind(&attr.GetTaskQueue().Kind) @@ -349,10 +349,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandScheduleActivity( eagerStartActivity, ) if err != nil { - if _, ok := err.(*serviceerror.InvalidArgument); ok { - return nil, handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_SCHEDULE_ACTIVITY_DUPLICATE_ID, err) - } - return nil, err + return nil, handler.failWorkflowTaskOnInvalidArgument(enumspb.WORKFLOW_TASK_FAILED_CAUSE_SCHEDULE_ACTIVITY_DUPLICATE_ID, err) } if !eagerStartActivity { @@ -473,10 +470,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandRequestCancelActivity( handler.identity, ) if err != nil { - if _, ok := err.(*serviceerror.InvalidArgument); ok { - return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_ACTIVITY_ATTRIBUTES, err) - } - return err + return handler.failWorkflowTaskOnInvalidArgument(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_ACTIVITY_ATTRIBUTES, err) } if ai != nil { // If ai is nil, the activity has already been canceled/completed/timedout. The cancel request @@ -518,10 +512,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandStartTimer( _, _, err := handler.mutableState.AddTimerStartedEvent(handler.workflowTaskCompletedID, attr) if err != nil { - if _, ok := err.(*serviceerror.InvalidArgument); ok { - return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_START_TIMER_DUPLICATE_ID, err) - } - return err + return handler.failWorkflowTaskOnInvalidArgument(enumspb.WORKFLOW_TASK_FAILED_CAUSE_START_TIMER_DUPLICATE_ID, err) } return nil } @@ -534,11 +525,11 @@ func (handler *workflowTaskHandlerImpl) handleCommandCompleteWorkflow( handler.metricsHandler.Counter(metrics.CommandTypeCompleteWorkflowCounter.GetMetricName()).Record(1) if handler.hasBufferedEvents { - return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil) + return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil) } if handler.hasPendingUpdates { - return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil) + return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil) } if err := handler.validateCommandAttr( @@ -596,11 +587,11 @@ func (handler *workflowTaskHandlerImpl) handleCommandFailWorkflow( handler.metricsHandler.Counter(metrics.CommandTypeFailWorkflowCounter.GetMetricName()).Record(1) if handler.hasBufferedEvents { - return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil) + return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil) } if handler.hasPendingUpdates { - return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil) + return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil) } if err := handler.validateCommandAttr( @@ -685,10 +676,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandCancelTimer( attr, handler.identity) if err != nil { - if _, ok := err.(*serviceerror.InvalidArgument); ok { - return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_CANCEL_TIMER_ATTRIBUTES, err) - } - return err + return handler.failWorkflowTaskOnInvalidArgument(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_CANCEL_TIMER_ATTRIBUTES, err) } // timer deletion is a success, we may have deleted a fired timer in @@ -707,11 +695,11 @@ func (handler *workflowTaskHandlerImpl) handleCommandCancelWorkflow( handler.metricsHandler.Counter(metrics.CommandTypeCancelWorkflowCounter.GetMetricName()).Record(1) if handler.hasBufferedEvents { - return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil) + return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil) } if handler.hasPendingUpdates { - return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil) + return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil) } if err := handler.validateCommandAttr( @@ -768,7 +756,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandRequestCancelExternalWorkfl return err } if err := handler.sizeLimitChecker.checkIfNumPendingCancelRequestsExceedsLimit(); err != nil { - return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_REQUEST_CANCEL_LIMIT_EXCEEDED, err) + return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_REQUEST_CANCEL_LIMIT_EXCEEDED, err) } cancelRequestID := uuid.New() @@ -814,11 +802,11 @@ func (handler *workflowTaskHandlerImpl) handleCommandContinueAsNewWorkflow( handler.metricsHandler.Counter(metrics.CommandTypeContinueAsNewCounter.GetMetricName()).Record(1) if handler.hasBufferedEvents { - return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil) + return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND, nil) } if handler.hasPendingUpdates { - return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil) + return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_UPDATE, nil) } namespaceName := handler.mutableState.GetNamespaceEntry().Name() @@ -991,7 +979,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandStartChildWorkflow( // child workflow limit if err := handler.sizeLimitChecker.checkIfNumChildWorkflowsExceedsLimit(); err != nil { - return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_CHILD_WORKFLOWS_LIMIT_EXCEEDED, err) + return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_CHILD_WORKFLOWS_LIMIT_EXCEEDED, err) } enabled := handler.config.EnableParentClosePolicy(parentNamespace.String()) @@ -1044,7 +1032,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandSignalExternalWorkflow( return err } if err := handler.sizeLimitChecker.checkIfNumPendingSignalsExceedsLimit(); err != nil { - return handler.failCommand(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_SIGNALS_LIMIT_EXCEEDED, err) + return handler.failWorkflowTask(enumspb.WORKFLOW_TASK_FAILED_CAUSE_PENDING_SIGNALS_LIMIT_EXCEEDED, err) } if err := handler.sizeLimitChecker.checkIfPayloadSizeExceedsLimit( @@ -1423,22 +1411,32 @@ func (handler *workflowTaskHandlerImpl) validateCommandAttr( validationFn commandAttrValidationFn, ) error { - if failedCause, err := validationFn(); err != nil { - if _, ok := err.(*serviceerror.InvalidArgument); ok { - return handler.failCommand(failedCause, err) - } + return handler.failWorkflowTaskOnInvalidArgument(validationFn()) +} + +func (handler *workflowTaskHandlerImpl) failWorkflowTaskOnInvalidArgument( + wtFailedCause enumspb.WorkflowTaskFailedCause, + err error, +) error { + + switch err.(type) { + case *serviceerror.InvalidArgument: + return handler.failWorkflowTask(wtFailedCause, err) + default: return err } - - return nil } -func (handler *workflowTaskHandlerImpl) failCommand( +func (handler *workflowTaskHandlerImpl) failWorkflowTask( failedCause enumspb.WorkflowTaskFailedCause, causeErr error, ) error { - handler.workflowTaskFailedCause = NewWorkflowTaskFailedCause(failedCause, causeErr) + + handler.workflowTaskFailedCause = newWorkflowTaskFailedCause(failedCause, causeErr, nil) handler.stopProcessing = true + // NOTE: failWorkflowTask always return nil. + // It is important to clear returned error if WT needs to be failed to properly add WTFailed event. + // Handler will rely on stopProcessing flag and workflowTaskFailedCause field. return nil } @@ -1446,26 +1444,33 @@ func (handler *workflowTaskHandlerImpl) failWorkflow( failedCause enumspb.WorkflowTaskFailedCause, causeErr error, ) error { - handler.workflowTaskFailedCause = &workflowTaskFailedCause{ - failedCause: failedCause, - causeErr: causeErr, - workflowFailure: failure.NewServerFailure(causeErr.Error(), true), - } + + handler.workflowTaskFailedCause = newWorkflowTaskFailedCause( + failedCause, + causeErr, + failure.NewServerFailure(causeErr.Error(), true), + ) handler.stopProcessing = true + // NOTE: failWorkflow always return nil. + // It is important to clear returned error if WT needs to be failed to properly add WTFailed and FailWorkflow events. + // Handler will rely on stopProcessing flag and workflowTaskFailedCause and workflowFailure fields. return nil } -func NewWorkflowTaskFailedCause(failedCause enumspb.WorkflowTaskFailedCause, causeErr error) *workflowTaskFailedCause { +func newWorkflowTaskFailedCause(failedCause enumspb.WorkflowTaskFailedCause, causeErr error, workflowFailure *failurepb.Failure) *workflowTaskFailedCause { + return &workflowTaskFailedCause{ - failedCause: failedCause, - causeErr: causeErr, + failedCause: failedCause, + causeErr: causeErr, + workflowFailure: workflowFailure, } } -func (wtfc *workflowTaskFailedCause) Message() string { - if wtfc.causeErr == nil { - return wtfc.failedCause.String() +func (c *workflowTaskFailedCause) Message() string { + + if c.causeErr == nil { + return c.failedCause.String() } - return fmt.Sprintf("%v: %v", wtfc.failedCause, wtfc.causeErr.Error()) + return fmt.Sprintf("%v: %v", c.failedCause, c.causeErr.Error()) } diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index 8f3cd090eca..204924b3a7e 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -453,7 +453,10 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( hasPendingUpdates := ms.UpdateRegistry().HasPending(request.GetMessages()) hasBufferedEvents := ms.HasBufferedEvents() if err := namespaceEntry.VerifyBinaryChecksum(request.GetBinaryChecksum()); err != nil { - wtFailedCause = NewWorkflowTaskFailedCause(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_BINARY, serviceerror.NewInvalidArgument(fmt.Sprintf("binary %v is already marked as bad deployment", request.GetBinaryChecksum()))) + wtFailedCause = newWorkflowTaskFailedCause( + enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_BINARY, + serviceerror.NewInvalidArgument(fmt.Sprintf("binary %v is already marked as bad deployment", request.GetBinaryChecksum())), + nil) } else { namespace := namespaceEntry.Name() workflowSizeChecker := newWorkflowSizeChecker(