Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug where we return nil when there's an error in the archival queue #3723

Merged
merged 1 commit into from
Dec 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions service/history/archival_queue_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"context"
"errors"
"fmt"
"time"

enumspb "go.temporal.io/api/enums/v1"

Expand Down Expand Up @@ -115,7 +116,7 @@ func (e *archivalQueueTaskExecutor) processArchiveExecutionTask(ctx context.Cont
if err != nil {
return err
}
return e.addDeletionTask(ctx, logger, task)
return e.addDeletionTask(ctx, logger, task, request.CloseTime)
}

// getArchiveTaskRequest returns an archival request for the given archive execution task.
Expand Down Expand Up @@ -230,7 +231,12 @@ func (e *archivalQueueTaskExecutor) getArchiveTaskRequest(
}

// addDeletionTask adds a task to delete workflow history events from primary storage.
func (e *archivalQueueTaskExecutor) addDeletionTask(ctx context.Context, logger log.Logger, task *tasks.ArchiveExecutionTask) error {
func (e *archivalQueueTaskExecutor) addDeletionTask(
ctx context.Context,
logger log.Logger,
task *tasks.ArchiveExecutionTask,
closeTime *time.Time,
) error {
mutableState, err := e.loadAndVersionCheckMutableState(ctx, logger, task)
if err != nil {
return err
Expand All @@ -244,13 +250,9 @@ func (e *archivalQueueTaskExecutor) addDeletionTask(ctx context.Context, logger
mutableState,
e.shardContext.GetConfig(),
)
closeTime, err := mutableState.GetWorkflowCloseTime(ctx)
if err != nil {
return err
}
err = taskGenerator.GenerateDeleteHistoryEventTask(*closeTime, true)
if err != nil {
return nil
return err
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the bug we're fixing. The rest of this is just adding additional test coverage.

}
err = e.shardContext.AddTasks(ctx, &persistence.AddHistoryTasksRequest{
ShardID: e.shardContext.GetShardID(),
Expand Down
233 changes: 191 additions & 42 deletions service/history/archival_queue_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/stretchr/testify/require"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
workflowpb "go.temporal.io/api/workflow/v1"

"go.temporal.io/server/api/persistence/v1"
carchiver "go.temporal.io/server/common/archiver"
Expand Down Expand Up @@ -130,17 +131,28 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
{
Name: "namespace not found",
Configure: func(p *params) {
p.Retention = nil
p.GetNamespaceByIDError = &serviceerror.NamespaceNotFound{}
// namespace not found means we should use default retention
p.ExpectedDeleteTime = p.CloseTime.Add(24 * time.Hour)
},
},
{
Name: "get namespace internal error",
Configure: func(p *params) {
p.GetNamespaceByIDError = serviceerror.NewInternal("get namespace error")
p.ExpectAddTask = false
p.ExpectedErrorSubstrings = []string{
"get namespace error",
}
},
},
{
Name: "wrong task type",
Configure: func(p *params) {
version := p.Task.GetVersion()
p.Task = &tasks.DeleteExecutionTask{
WorkflowKey: p.WorkflowKey,
Version: p.Version,
Version: version,
}
p.ExpectArchive = false
p.ExpectAddTask = false
Expand Down Expand Up @@ -182,8 +194,97 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
{
Name: "archiver error",
Configure: func(p *params) {
p.ArchiveError = errors.New("archive error")
p.ExpectedErrorSubstrings = []string{"archive error"}
p.ArchiveError = errors.New("archiver error")
p.ExpectedErrorSubstrings = []string{"archiver error"}
p.ExpectAddTask = false
},
},
{
Name: "get workflow close time error",
Configure: func(p *params) {
p.GetWorkflowCloseTimeError = errors.New("get workflow close time error")
p.ExpectedErrorSubstrings = []string{"get workflow close time error"}
p.ExpectArchive = false
p.ExpectAddTask = false
},
},
{
Name: "get current branch token error",
Configure: func(p *params) {
p.GetCurrentBranchTokenError = errors.New("get current branch token error")
p.ExpectedErrorSubstrings = []string{"get current branch token error"}
p.ExpectArchive = false
p.ExpectAddTask = false
},
},
{
Name: "load mutable state error",
Configure: func(p *params) {
p.LoadMutableStateError = errors.New("load mutable state error")
p.ExpectedErrorSubstrings = []string{"load mutable state error"}
p.ExpectArchive = false
p.ExpectAddTask = false
},
},
{
Name: "get or create workflow execution error",
Configure: func(p *params) {
p.GetOrCreateWorkflowExecutionError = errors.New("get or create workflow execution error")
p.ExpectedErrorSubstrings = []string{"get or create workflow execution error"}
p.ExpectArchive = false
p.ExpectAddTask = false
},
},
{
Name: "get last write version error before archiving",
Configure: func(p *params) {
p.GetLastWriteVersionBeforeArchivalError = errors.New("get last write version error")
p.ExpectedErrorSubstrings = []string{"get last write version error"}
p.ExpectArchive = false
p.ExpectAddTask = false
},
},
{
Name: "get last write version error after archiving",
Configure: func(p *params) {
p.GetLastWriteVersionAfterArchivalError = errors.New("get last write version error")
p.ExpectedErrorSubstrings = []string{"get last write version error"}
p.ExpectArchive = true
p.ExpectAddTask = false
},
},
{
Name: "mutable state version does not match task version",
Configure: func(p *params) {
p.LastWriteVersionBeforeArchival = 1
p.Task.(*tasks.ArchiveExecutionTask).Version = 2
p.ExpectedErrorSubstrings = []string{"version mismatch"}
p.ExpectArchive = false
p.ExpectAddTask = false
},
},
{
Name: "last write version changed during archival",
Configure: func(p *params) {
p.LastWriteVersionAfterArchival = p.LastWriteVersionBeforeArchival + 1
p.ExpectedErrorSubstrings = []string{"version mismatch"}
p.ExpectArchive = true
p.ExpectAddTask = false
},
},
{
Name: "close visibility task complete",
Configure: func(p *params) {
p.CloseVisibilityTaskCompleted = true
},
},
{
Name: "get workflow execution from visibility error",
Configure: func(p *params) {
p.CloseVisibilityTaskCompleted = true
p.GetWorkflowExecutionError = errors.New("get workflow execution error")
p.ExpectedErrorSubstrings = []string{"get workflow execution error"}
p.ExpectArchive = false
p.ExpectAddTask = false
},
},
Expand All @@ -209,10 +310,11 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
// delete time = close time + retention
// delete time = 2 minutes + 1 hour = 1 hour 2 minutes
p.ExpectedDeleteTime = time.Unix(0, 0).Add(time.Minute * 2).Add(time.Hour)
p.Version = 52
p.LastWriteVersionBeforeArchival = 1
p.LastWriteVersionAfterArchival = 1
p.Task = &tasks.ArchiveExecutionTask{
WorkflowKey: p.WorkflowKey,
Version: p.Version,
Version: 1,
}
p.HistoryURI = "test://history/archival"
p.VisibilityURI = "test://visibility/archival"
Expand Down Expand Up @@ -268,7 +370,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
cluster.TestCurrentClusterName,
},
},
52,
123,
)
namespaceRegistry.EXPECT().GetNamespaceName(namespaceEntry.ID()).
Return(namespaceEntry.Name(), nil).AnyTimes()
Expand All @@ -278,19 +380,36 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
if p.MutableStateExists {
mutableState := workflow.NewMockMutableState(p.Controller)
mutableState.EXPECT().IsWorkflowExecutionRunning().Return(p.IsWorkflowExecutionRunning).AnyTimes()
mutableState.EXPECT().GetCurrentVersion().Return(p.Version).AnyTimes()
mutableState.EXPECT().GetCurrentVersion().Return(p.LastWriteVersionBeforeArchival).AnyTimes()
mutableState.EXPECT().GetWorkflowKey().Return(p.WorkflowKey).AnyTimes()
workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(mutableState, nil).AnyTimes()
mutableState.EXPECT().GetCurrentBranchToken().Return(branchToken, nil).AnyTimes()
workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(
mutableState,
p.LoadMutableStateError,
).AnyTimes()
mutableState.EXPECT().GetCurrentBranchToken().Return(
branchToken,
p.GetCurrentBranchTokenError,
).AnyTimes()
mutableState.EXPECT().GetNamespaceEntry().Return(namespaceEntry).AnyTimes()
mutableState.EXPECT().GetNextEventID().Return(int64(100)).AnyTimes()
mutableState.EXPECT().GetLastWriteVersion().Return(int64(52), nil).AnyTimes()
mutableState.EXPECT().GetWorkflowCloseTime(gomock.Any()).Return(&p.CloseTime, nil).AnyTimes()
mutableState.EXPECT().GetLastWriteVersion().Return(
p.LastWriteVersionBeforeArchival,
p.GetLastWriteVersionBeforeArchivalError,
).MaxTimes(1)
mutableState.EXPECT().GetLastWriteVersion().Return(
p.LastWriteVersionAfterArchival,
p.GetLastWriteVersionAfterArchivalError,
).MaxTimes(1)
mutableState.EXPECT().GetWorkflowCloseTime(gomock.Any()).Return(
&p.CloseTime,
p.GetWorkflowCloseTimeError,
).AnyTimes()
executionInfo := &persistence.WorkflowExecutionInfo{
NamespaceId: tests.NamespaceID.String(),
StartTime: &p.StartTime,
ExecutionTime: &p.ExecutionTime,
CloseTime: &p.CloseTime,
NamespaceId: tests.NamespaceID.String(),
StartTime: &p.StartTime,
ExecutionTime: &p.ExecutionTime,
CloseTime: &p.CloseTime,
CloseVisibilityTaskCompleted: p.CloseVisibilityTaskCompleted,
}
mutableState.EXPECT().GetExecutionInfo().Return(executionInfo).AnyTimes()
executionState := &persistence.WorkflowExecutionState{
Expand All @@ -304,7 +423,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
task := ts[0]
assert.Equal(t, p.WorkflowKey, task.WorkflowKey)
assert.Zero(t, task.TaskID)
assert.Equal(t, p.Version, task.Version)
assert.Equal(t, p.LastWriteVersionBeforeArchival, task.Version)
assert.Equal(t, branchToken, task.BranchToken)
assert.True(t, task.WorkflowDataAlreadyArchived)
assert.Equal(t, p.ExpectedDeleteTime, task.VisibilityTimestamp)
Expand All @@ -324,10 +443,21 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
})
}
} else {
workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(nil, nil).AnyTimes()
workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(
nil,
p.LoadMutableStateError,
).AnyTimes()
}
workflowCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(workflowContext, cache.ReleaseCacheFunc(func(err error) {}), nil).AnyTimes()
workflowCache.EXPECT().GetOrCreateWorkflowExecution(
gomock.Any(),
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(
workflowContext,
cache.ReleaseCacheFunc(func(err error) {}),
p.GetOrCreateWorkflowExecutionError,
).AnyTimes()

archivalMetadata := carchiver.NewMockArchivalMetadata(p.Controller)
historyConfig := carchiver.NewMockArchivalConfig(p.Controller)
Expand All @@ -352,6 +482,15 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
}

visibilityManager := manager.NewMockVisibilityManager(p.Controller)
if p.CloseVisibilityTaskCompleted {
visibilityManager.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(
&manager.GetWorkflowExecutionResponse{Execution: &workflowpb.WorkflowExecutionInfo{
Memo: nil,
SearchAttributes: nil,
}},
p.GetWorkflowExecutionError,
)
}

executor := NewArchivalQueueTaskExecutor(
a,
Expand Down Expand Up @@ -399,28 +538,38 @@ type testCase struct {

// params represents the parameters for a test within TestArchivalQueueTaskExecutor
type params struct {
Controller *gomock.Controller
IsWorkflowExecutionRunning bool
Retention *time.Duration
Task tasks.Task
ExpectedDeleteTime time.Time
ExpectedErrorSubstrings []string
ExpectArchive bool
ExpectAddTask bool
ExpectedTargets []archival.Target
HistoryConfig archivalConfig
VisibilityConfig archivalConfig
WorkflowKey definition.WorkflowKey
StartTime time.Time
ExecutionTime time.Time
CloseTime time.Time
Version int64
GetNamespaceByIDError error
HistoryURI string
VisibilityURI string
MetricsHandler *metrics.MockHandler
MutableStateExists bool
ArchiveError error
Controller *gomock.Controller
IsWorkflowExecutionRunning bool
Retention *time.Duration
Task tasks.Task
ExpectedDeleteTime time.Time
ExpectedErrorSubstrings []string
ExpectArchive bool
ExpectAddTask bool
ExpectedTargets []archival.Target
HistoryConfig archivalConfig
VisibilityConfig archivalConfig
WorkflowKey definition.WorkflowKey
StartTime time.Time
ExecutionTime time.Time
CloseTime time.Time
GetNamespaceByIDError error
HistoryURI string
VisibilityURI string
MetricsHandler *metrics.MockHandler
MutableStateExists bool
ArchiveError error
GetWorkflowCloseTimeError error
GetCurrentBranchTokenError error
CloseVisibilityTaskCompleted bool
ExpectGetWorkflowExecution bool
GetWorkflowExecutionError error
LoadMutableStateError error
GetOrCreateWorkflowExecutionError error
LastWriteVersionBeforeArchival int64
GetLastWriteVersionBeforeArchivalError error
LastWriteVersionAfterArchival int64
GetLastWriteVersionAfterArchivalError error
}

// archivalConfig represents the user configuration of archival for the cluster and namespace
Expand Down