Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix fail of speculative workflow task #3898

Merged
merged 1 commit into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1017,8 +1017,9 @@ func (s *engine2Suite) TestRespondWorkflowTaskCompleted_StartChildWorkflow_Excee
})

s.Error(err)
s.Assert().Equal([]string{"the number of pending child workflow executions, 5, has reached the per-workflow" +
" limit of 5"}, s.errorMessages)
s.IsType(&serviceerror.InvalidArgument{}, err)
s.Len(s.errorMessages, 1)
s.Equal("the number of pending child workflow executions, 5, has reached the per-workflow limit of 5", s.errorMessages[0])
}

func (s *engine2Suite) TestStartWorkflowExecution_BrandNew() {
Expand Down
27 changes: 24 additions & 3 deletions service/history/workflow/workflow_task_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,9 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskCompletedEvent(
workflowTask.Attempt,
timestamp.TimeValue(workflowTask.ScheduledTime).UTC(),
)
workflowTask.ScheduledEventID = scheduledEvent.GetEventId()
startedEvent := m.ms.hBuilder.AddWorkflowTaskStartedEvent(
scheduledEvent.GetEventId(),
workflowTask.ScheduledEventID,
workflowTask.RequestID,
request.GetIdentity(),
timestamp.TimeValue(workflowTask.StartedTime),
Expand Down Expand Up @@ -539,9 +540,29 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskFailedEvent(
forkEventVersion int64,
) (*historypb.HistoryEvent, error) {

if workflowTask.Type == enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE {
// Create corresponding WorkflowTaskScheduled and WorkflowTaskStarted events for speculative WT.
scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent(
m.ms.TaskQueue(),
workflowTask.WorkflowTaskTimeout,
workflowTask.Attempt,
timestamp.TimeValue(workflowTask.ScheduledTime).UTC(),
)
workflowTask.ScheduledEventID = scheduledEvent.GetEventId()
startedEvent := m.ms.hBuilder.AddWorkflowTaskStartedEvent(
workflowTask.ScheduledEventID,
workflowTask.RequestID,
identity,
timestamp.TimeValue(workflowTask.StartedTime),
)
// TODO (alex-update): Do we need to call next line here same as in AddWorkflowTaskCompletedEvent?
m.ms.hBuilder.FlushAndCreateNewBatch()
workflowTask.StartedEventID = startedEvent.GetEventId()
}

var event *historypb.HistoryEvent
// Only emit WorkflowTaskFailedEvent if workflow task is not transient and not speculative.
if !m.ms.IsTransientWorkflowTask() && workflowTask.Type != enumsspb.WORKFLOW_TASK_TYPE_SPECULATIVE {
// Only emit WorkflowTaskFailedEvent if workflow task is not transient.
if !m.ms.IsTransientWorkflowTask() {
event = m.ms.hBuilder.AddWorkflowTaskFailedEvent(
workflowTask.ScheduledEventID,
workflowTask.StartedEventID,
Expand Down
237 changes: 213 additions & 24 deletions tests/update_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package tests

import (
"errors"
"strconv"
"testing"
"time"
Expand All @@ -40,6 +41,7 @@ import (
updatepb "go.temporal.io/api/update/v1"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/server/common/debug"
"go.temporal.io/server/common/payloads"
"go.temporal.io/server/common/primitives/timestamp"
)
Expand Down Expand Up @@ -455,12 +457,11 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_Reject() {
taskQueue := &taskqueuepb.TaskQueue{Name: tq}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
WorkflowTaskTimeout: timestamp.DurationPtr(10 * time.Minute),
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
}

startResp, err := s.engine.StartWorkflowExecution(NewContext(), request)
Expand Down Expand Up @@ -608,12 +609,11 @@ func (s *integrationSuite) TestUpdateWorkflow_NewWorkflowTask_Reject() {
taskQueue := &taskqueuepb.TaskQueue{Name: tq}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
WorkflowTaskTimeout: timestamp.DurationPtr(10 * time.Minute),
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
}

startResp, err := s.engine.StartWorkflowExecution(NewContext(), request)
Expand Down Expand Up @@ -781,12 +781,11 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_1stAccept_2ndAcc
taskQueue := &taskqueuepb.TaskQueue{Name: tq}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
WorkflowTaskTimeout: timestamp.DurationPtr(10 * time.Minute),
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
}

startResp, err := s.engine.StartWorkflowExecution(NewContext(), request)
Expand Down Expand Up @@ -1072,12 +1071,11 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_1stAccept_2ndRej
taskQueue := &taskqueuepb.TaskQueue{Name: tq}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
WorkflowTaskTimeout: timestamp.DurationPtr(10 * time.Minute),
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
}

startResp, err := s.engine.StartWorkflowExecution(NewContext(), request)
Expand Down Expand Up @@ -1329,3 +1327,194 @@ func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_1stAccept_2ndRej
16 WorkflowTaskCompleted
17 WorkflowExecutionCompleted`, events)
}

func (s *integrationSuite) TestUpdateWorkflow_FirstWorkflowTask_BadAcceptMessage() {
id := "integration-update-workflow-test-7"
wt := "integration-update-workflow-test-7-type"
tq := "integration-update-workflow-test-7-task-queue"

workflowType := &commonpb.WorkflowType{Name: wt}
taskQueue := &taskqueuepb.TaskQueue{Name: tq}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
// Some short but reasonable timeout because there is a wait for it in the test.
WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second * debug.TimeoutMultiplier),
}

startResp, err := s.engine.StartWorkflowExecution(NewContext(), request)
s.NoError(err)

we := &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: startResp.GetRunId(),
}

wtHandlerCalls := 0
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType, previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
wtHandlerCalls++
switch wtHandlerCalls {
case 1:
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: strconv.Itoa(1),
ActivityType: &commonpb.ActivityType{Name: "activity_type_1"},
TaskQueue: &taskqueuepb.TaskQueue{Name: tq},
ScheduleToCloseTimeout: timestamp.DurationPtr(10 * time.Hour),
}},
}}, nil
case 2:
s.EqualHistory(`
1 WorkflowExecutionStarted
2 WorkflowTaskScheduled
3 WorkflowTaskStarted
4 WorkflowTaskCompleted
5 ActivityTaskScheduled
6 WorkflowTaskScheduled
7 WorkflowTaskStarted`, history)
return nil, nil
case 3:
s.Fail("should not be called because messageHandler returns error")
return nil, nil
case 4:
s.EqualHistory(`
1 WorkflowExecutionStarted
2 WorkflowTaskScheduled
3 WorkflowTaskStarted
4 WorkflowTaskCompleted
5 ActivityTaskScheduled
6 WorkflowTaskScheduled
7 WorkflowTaskStarted
8 WorkflowTaskFailed
9 WorkflowTaskScheduled
10 WorkflowTaskStarted`, history)
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("done"),
}},
}}, nil
default:
s.Failf("wtHandler called too many times", "wtHandler shouldn't be called %d times", wtHandlerCalls)
return nil, nil
}
}

msgHandlerCalls := 0
msgHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*protocolpb.Message, error) {
msgHandlerCalls++
switch msgHandlerCalls {
case 1:
return nil, nil
case 2:
updRequestMsg := task.Messages[0]
s.EqualValues(6, updRequestMsg.GetEventId())

// Emulate bug in worker/SDK update handler code. Return malformed acceptance response.
return []*protocolpb.Message{
{
Id: uuid.New(),
ProtocolInstanceId: "some-random-wrong-id",
SequencingId: nil,
Body: marshalAny(s, &updatepb.Acceptance{
AcceptedRequestMessageId: updRequestMsg.GetId(),
AcceptedRequestSequencingEventId: updRequestMsg.GetEventId(),
AcceptedRequest: nil, // must not be nil.
}),
},
}, nil
case 3:
// 2nd attempt doesn't have any updates attached to it.
s.Empty(task.Messages)
wtHandlerCalls++ // because it won't be called for case 3 but counter should be in sync.
// Fail WT one more time. Although 2nd attempt is normal WT, it is also transient and shouldn't appear in the history.
// Returning error will cause the poller to fail WT.
return nil, errors.New("malformed request")
case 4:
return nil, nil
default:
s.Failf("msgHandler called too many times", "msgHandler shouldn't be called %d times", msgHandlerCalls)
return nil, nil
}
}

poller := &TaskPoller{
Engine: s.engine,
Namespace: s.namespace,
TaskQueue: taskQueue,
WorkflowTaskHandler: wtHandler,
MessageHandler: msgHandler,
Logger: s.Logger,
T: s.T(),
}

// Start activity using existing workflow task.
_, err = poller.PollAndProcessWorkflowTask(true, false)
s.NoError(err)

type UpdateResult struct {
Response *workflowservice.UpdateWorkflowExecutionResponse
Err error
}
updateResultCh := make(chan UpdateResult)
updateWorkflowFn := func() {
updateResponse, err1 := s.engine.UpdateWorkflowExecution(NewContext(), &workflowservice.UpdateWorkflowExecutionRequest{
Namespace: s.namespace,
WorkflowExecution: we,
Request: &updatepb.Request{
Meta: &updatepb.Meta{UpdateId: uuid.New()},
Input: &updatepb.Input{
Name: "update_handler",
Args: payloads.EncodeString("update args"),
},
},
})
s.NoError(err1)
updateResultCh <- UpdateResult{Response: updateResponse, Err: err1}
}
go updateWorkflowFn()
time.Sleep(500 * time.Millisecond) // This is to make sure that update gets to the server.

// Try to accept update in workflow: get malformed response.
_, err = poller.PollAndProcessWorkflowTask(false, false)
s.Error(err)
s.Equal(err.Error(), "BadUpdateWorkflowExecutionMessage: accepted_request is not set on update.Acceptance message body.")
// New normal (but transient) WT will be created but not returned.

updateResult := <-updateResultCh
s.NoError(updateResult.Err)
// TODO (alex-update): this is wrong. Caller shouldn't get this error if WT failed.
s.Equal("update cleared, please retry", updateResult.Response.GetOutcome().GetFailure().GetMessage())

// Try to accept update in workflow 2nd time: get error. Poller will fail WT.
_, err = poller.PollAndProcessWorkflowTask(false, false)
// The error is from RespondWorkflowTaskFailed, which should go w/o error.
s.NoError(err)

// Complete workflow.
_, err = poller.PollAndProcessWorkflowTask(false, false)
s.NoError(err)

s.Equal(4, wtHandlerCalls)
s.Equal(4, msgHandlerCalls)

events := s.getHistory(s.namespace, we)
s.EqualHistoryEvents(`
1 WorkflowExecutionStarted
2 WorkflowTaskScheduled
3 WorkflowTaskStarted
4 WorkflowTaskCompleted
5 ActivityTaskScheduled
6 WorkflowTaskScheduled
7 WorkflowTaskStarted
8 WorkflowTaskFailed
9 WorkflowTaskScheduled
10 WorkflowTaskStarted
11 WorkflowTaskCompleted
12 WorkflowExecutionCompleted`, events)
}