Skip to content

Commit

Permalink
Fix fail of speculative workflow task
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Feb 3, 2023
1 parent 32a4686 commit 1ad9ac4
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 29 deletions.
5 changes: 3 additions & 2 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1017,8 +1017,9 @@ func (s *engine2Suite) TestRespondWorkflowTaskCompleted_StartChildWorkflow_Excee
})

s.Error(err)
s.Assert().Equal([]string{"the number of pending child workflow executions, 5, has reached the per-workflow" +
" limit of 5"}, s.errorMessages)
s.IsType(&serviceerror.InvalidArgument{}, err)
s.Len(s.errorMessages, 1)
s.Equal("the number of pending child workflow executions, 5, has reached the per-workflow limit of 5", s.errorMessages[0])
}

func (s *engine2Suite) TestStartWorkflowExecution_BrandNew() {
Expand Down
27 changes: 24 additions & 3 deletions service/history/workflow/workflow_task_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,9 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskCompletedEvent(
workflowTask.Attempt,
timestamp.TimeValue(workflowTask.ScheduledTime).UTC(),
)
workflowTask.ScheduledEventID = scheduledEvent.GetEventId()
startedEvent := m.ms.hBuilder.AddWorkflowTaskStartedEvent(
scheduledEvent.GetEventId(),
workflowTask.ScheduledEventID,
workflowTask.RequestID,
request.GetIdentity(),
timestamp.TimeValue(workflowTask.StartedTime),
Expand Down Expand Up @@ -539,9 +540,29 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskFailedEvent(
forkEventVersion int64,
) (*historypb.HistoryEvent, error) {

if workflowTask.Type == enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE {
// Create corresponding WorkflowTaskScheduled and WorkflowTaskStarted events for speculative WT.
scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent(
m.ms.TaskQueue(),
workflowTask.WorkflowTaskTimeout,
workflowTask.Attempt,
timestamp.TimeValue(workflowTask.ScheduledTime).UTC(),
)
workflowTask.ScheduledEventID = scheduledEvent.GetEventId()
startedEvent := m.ms.hBuilder.AddWorkflowTaskStartedEvent(
workflowTask.ScheduledEventID,
workflowTask.RequestID,
identity,
timestamp.TimeValue(workflowTask.StartedTime),
)
// TODO (alex-update): Do we need to call next line here same as in AddWorkflowTaskCompletedEvent?
m.ms.hBuilder.FlushAndCreateNewBatch()
workflowTask.StartedEventID = startedEvent.GetEventId()
}

var event *historypb.HistoryEvent
// Only emit WorkflowTaskFailedEvent if workflow task is not transient and not speculative.
if !m.ms.IsTransientWorkflowTask() && workflowTask.Type != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE {
// Only emit WorkflowTaskFailedEvent if workflow task is not transient.
if !m.ms.IsTransientWorkflowTask() {
event = m.ms.hBuilder.AddWorkflowTaskFailedEvent(
workflowTask.ScheduledEventID,
workflowTask.StartedEventID,
Expand Down
237 changes: 213 additions & 24 deletions tests/update_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package tests

import (
"errors"
"strconv"
"testing"
"time"
Expand All @@ -40,6 +41,7 @@ import (
updatepb "go.temporal.io/api/update/v1"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/server/common/debug"
"go.temporal.io/server/common/payloads"
"go.temporal.io/server/common/primitives/timestamp"
)
Expand Down Expand Up @@ -455,12 +457,11 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_Reject() {
taskQueue := &taskqueuepb.TaskQueue{Name: tq}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
WorkflowTaskTimeout: timestamp.DurationPtr(10 * time.Minute),
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
}

startResp, err := s.engine.StartWorkflowExecution(NewContext(), request)
Expand Down Expand Up @@ -608,12 +609,11 @@ func (s *integrationSuite) TestUpdateWorkflow_NewWorkflowTask_Reject() {
taskQueue := &taskqueuepb.TaskQueue{Name: tq}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
WorkflowTaskTimeout: timestamp.DurationPtr(10 * time.Minute),
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
}

startResp, err := s.engine.StartWorkflowExecution(NewContext(), request)
Expand Down Expand Up @@ -781,12 +781,11 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_1stAccept_2ndAcc
taskQueue := &taskqueuepb.TaskQueue{Name: tq}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
WorkflowTaskTimeout: timestamp.DurationPtr(10 * time.Minute),
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
}

startResp, err := s.engine.StartWorkflowExecution(NewContext(), request)
Expand Down Expand Up @@ -1072,12 +1071,11 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_1stAccept_2ndRej
taskQueue := &taskqueuepb.TaskQueue{Name: tq}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
WorkflowTaskTimeout: timestamp.DurationPtr(10 * time.Minute),
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
}

startResp, err := s.engine.StartWorkflowExecution(NewContext(), request)
Expand Down Expand Up @@ -1329,3 +1327,194 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_1stAccept_2ndRej
16 WorkflowTaskCompleted
17 WorkflowExecutionCompleted`, events)
}

func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_BadAcceptMessage() {
id := "integration-update-workflow-test-7"
wt := "integration-update-workflow-test-7-type"
tq := "integration-update-workflow-test-7-task-queue"

workflowType := &commonpb.WorkflowType{Name: wt}
taskQueue := &taskqueuepb.TaskQueue{Name: tq}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
// Some short but reasonable timeout because there is a wait for it in the test.
WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second * debug.TimeoutMultiplier),
}

startResp, err := s.engine.StartWorkflowExecution(NewContext(), request)
s.NoError(err)

we := &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: startResp.GetRunId(),
}

wtHandlerCalls := 0
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType, previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
wtHandlerCalls++
switch wtHandlerCalls {
case 1:
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: strconv.Itoa(1),
ActivityType: &commonpb.ActivityType{Name: "activity_type_1"},
TaskQueue: &taskqueuepb.TaskQueue{Name: tq},
ScheduleToCloseTimeout: timestamp.DurationPtr(10 * time.Hour),
}},
}}, nil
case 2:
s.EqualHistory(`
1 WorkflowExecutionStarted
2 WorkflowTaskScheduled
3 WorkflowTaskStarted
4 WorkflowTaskCompleted
5 ActivityTaskScheduled
6 WorkflowTaskScheduled
7 WorkflowTaskStarted`, history)
return nil, nil
case 3:
s.Fail("should not be called because messageHandler returns error")
return nil, nil
case 4:
s.EqualHistory(`
1 WorkflowExecutionStarted
2 WorkflowTaskScheduled
3 WorkflowTaskStarted
4 WorkflowTaskCompleted
5 ActivityTaskScheduled
6 WorkflowTaskScheduled
7 WorkflowTaskStarted
8 WorkflowTaskFailed
9 WorkflowTaskScheduled
10 WorkflowTaskStarted`, history)
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("done"),
}},
}}, nil
default:
s.Failf("wtHandler called too many times", "wtHandler shouldn't be called %d times", wtHandlerCalls)
return nil, nil
}
}

msgHandlerCalls := 0
msgHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*protocolpb.Message, error) {
msgHandlerCalls++
switch msgHandlerCalls {
case 1:
return nil, nil
case 2:
updRequestMsg := task.Messages[0]
s.EqualValues(6, updRequestMsg.GetEventId())

// Emulate bug in worker/SDK update handler code. Return malformed acceptance response.
return []*protocolpb.Message{
{
Id: uuid.New(),
ProtocolInstanceId: "some-random-wrong-id",
SequencingId: nil,
Body: marshalAny(s, &updatepb.Acceptance{
AcceptedRequestMessageId: updRequestMsg.GetId(),
AcceptedRequestSequencingEventId: updRequestMsg.GetEventId(),
AcceptedRequest: nil, // must not be nil.
}),
},
}, nil
case 3:
// 2nd attempt doesn't have any updates attached to it.
s.Empty(task.Messages)
wtHandlerCalls++ // because it won't be called for case 3 but counter should be in sync.
// Fail WT one more time. Although 2nd attempt is normal WT, it is also transient and shouldn't appear in the history.
// Returning error will cause the poller to fail WT.
return nil, errors.New("malformed request")
case 4:
return nil, nil
default:
s.Failf("msgHandler called too many times", "msgHandler shouldn't be called %d times", msgHandlerCalls)
return nil, nil
}
}

poller := &TaskPoller{
Engine: s.engine,
Namespace: s.namespace,
TaskQueue: taskQueue,
WorkflowTaskHandler: wtHandler,
MessageHandler: msgHandler,
Logger: s.Logger,
T: s.T(),
}

// Start activity using existing workflow task.
_, err = poller.PollAndProcessWorkflowTask(true, false)
s.NoError(err)

type UpdateResult struct {
Response *workflowservice.UpdateWorkflowExecutionResponse
Err error
}
updateResultCh := make(chan UpdateResult)
updateWorkflowFn := func() {
updateResponse, err1 := s.engine.UpdateWorkflowExecution(NewContext(), &workflowservice.UpdateWorkflowExecutionRequest{
Namespace: s.namespace,
WorkflowExecution: we,
Request: &updatepb.Request{
Meta: &updatepb.Meta{UpdateId: uuid.New()},
Input: &updatepb.Input{
Name: "update_handler",
Args: payloads.EncodeString("update args"),
},
},
})
s.NoError(err1)
updateResultCh <- UpdateResult{Response: updateResponse, Err: err1}
}
go updateWorkflowFn()
time.Sleep(500 * time.Millisecond) // This is to make sure that update gets to the server.

// Try to accept update in workflow: get malformed response.
_, err = poller.PollAndProcessWorkflowTask(false, false)
s.Error(err)
s.Equal(err.Error(), "BadUpdateWorkflowExecutionMessage: accepted_request is not set on update.Acceptance message body.")
// New normal (but transient) WT will be created but not returned.

updateResult := <-updateResultCh
s.NoError(updateResult.Err)
// TODO (alex-update): this is wrong. Caller shouldn't get this error if WT failed.
s.Equal("update cleared, please retry", updateResult.Response.GetOutcome().GetFailure().GetMessage())

// Try to accept update in workflow 2nd time: get error. Poller will fail WT.
_, err = poller.PollAndProcessWorkflowTask(false, false)
// The error is from RespondWorkflowTaskFailed, which should go w/o error.
s.NoError(err)

// Complete workflow.
_, err = poller.PollAndProcessWorkflowTask(false, false)
s.NoError(err)

s.Equal(4, wtHandlerCalls)
s.Equal(4, msgHandlerCalls)

events := s.getHistory(s.namespace, we)
s.EqualHistoryEvents(`
1 WorkflowExecutionStarted
2 WorkflowTaskScheduled
3 WorkflowTaskStarted
4 WorkflowTaskCompleted
5 ActivityTaskScheduled
6 WorkflowTaskScheduled
7 WorkflowTaskStarted
8 WorkflowTaskFailed
9 WorkflowTaskScheduled
10 WorkflowTaskStarted
11 WorkflowTaskCompleted
12 WorkflowExecutionCompleted`, events)
}

0 comments on commit 1ad9ac4

Please sign in to comment.