Skip to content

Commit

Permalink
Fix bug where we return nil when there's an error in the archival queue
Browse files (browse the repository at this point in the history)
  • Loading branch information
MichaelSnowden committed Dec 19, 2022
1 parent edd3f0a commit aa4788b
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 49 deletions.
16 changes: 9 additions & 7 deletions service/history/archival_queue_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"context"
"errors"
"fmt"
"time"

enumspb "go.temporal.io/api/enums/v1"

Expand Down Expand Up @@ -115,7 +116,7 @@ func (e *archivalQueueTaskExecutor) processArchiveExecutionTask(ctx context.Cont
if err != nil {
return err
}
return e.addDeletionTask(ctx, logger, task)
return e.addDeletionTask(ctx, logger, task, request.CloseTime)
}

// getArchiveTaskRequest returns an archival request for the given archive execution task.
Expand Down Expand Up @@ -230,7 +231,12 @@ func (e *archivalQueueTaskExecutor) getArchiveTaskRequest(
}

// addDeletionTask adds a task to delete workflow history events from primary storage.
func (e *archivalQueueTaskExecutor) addDeletionTask(ctx context.Context, logger log.Logger, task *tasks.ArchiveExecutionTask) error {
func (e *archivalQueueTaskExecutor) addDeletionTask(
ctx context.Context,
logger log.Logger,
task *tasks.ArchiveExecutionTask,
closeTime *time.Time,
) error {
mutableState, err := e.loadAndVersionCheckMutableState(ctx, logger, task)
if err != nil {
return err
Expand All @@ -244,13 +250,9 @@ func (e *archivalQueueTaskExecutor) addDeletionTask(ctx context.Context, logger
mutableState,
e.shardContext.GetConfig(),
)
closeTime, err := mutableState.GetWorkflowCloseTime(ctx)
if err != nil {
return err
}
err = taskGenerator.GenerateDeleteHistoryEventTask(*closeTime, true)
if err != nil {
return nil
return err
}
err = e.shardContext.AddTasks(ctx, &persistence.AddHistoryTasksRequest{
ShardID: e.shardContext.GetShardID(),
Expand Down
233 changes: 191 additions & 42 deletions service/history/archival_queue_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/stretchr/testify/require"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
workflowpb "go.temporal.io/api/workflow/v1"

"go.temporal.io/server/api/persistence/v1"
carchiver "go.temporal.io/server/common/archiver"
Expand Down Expand Up @@ -130,17 +131,28 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
{
Name: "namespace not found",
Configure: func(p *params) {
p.Retention = nil
p.GetNamespaceByIDError = &serviceerror.NamespaceNotFound{}
// namespace not found means we should use default retention
p.ExpectedDeleteTime = p.CloseTime.Add(24 * time.Hour)
},
},
{
Name: "get namespace internal error",
Configure: func(p *params) {
p.GetNamespaceByIDError = serviceerror.NewInternal("get namespace error")
p.ExpectAddTask = false
p.ExpectedErrorSubstrings = []string{
"get namespace error",
}
},
},
{
Name: "wrong task type",
Configure: func(p *params) {
version := p.Task.GetVersion()
p.Task = &tasks.DeleteExecutionTask{
WorkflowKey: p.WorkflowKey,
Version: p.Version,
Version: version,
}
p.ExpectArchive = false
p.ExpectAddTask = false
Expand Down Expand Up @@ -182,8 +194,97 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
{
Name: "archiver error",
Configure: func(p *params) {
p.ArchiveError = errors.New("archive error")
p.ExpectedErrorSubstrings = []string{"archive error"}
p.ArchiveError = errors.New("archiver error")
p.ExpectedErrorSubstrings = []string{"archiver error"}
p.ExpectAddTask = false
},
},
{
Name: "get workflow close time error",
Configure: func(p *params) {
p.GetWorkflowCloseTimeError = errors.New("get workflow close time error")
p.ExpectedErrorSubstrings = []string{"get workflow close time error"}
p.ExpectArchive = false
p.ExpectAddTask = false
},
},
{
Name: "get current branch token error",
Configure: func(p *params) {
p.GetCurrentBranchTokenError = errors.New("get current branch token error")
p.ExpectedErrorSubstrings = []string{"get current branch token error"}
p.ExpectArchive = false
p.ExpectAddTask = false
},
},
{
Name: "load mutable state error",
Configure: func(p *params) {
p.LoadMutableStateError = errors.New("load mutable state error")
p.ExpectedErrorSubstrings = []string{"load mutable state error"}
p.ExpectArchive = false
p.ExpectAddTask = false
},
},
{
Name: "get or create workflow execution error",
Configure: func(p *params) {
p.GetOrCreateWorkflowExecutionError = errors.New("get or create workflow execution error")
p.ExpectedErrorSubstrings = []string{"get or create workflow execution error"}
p.ExpectArchive = false
p.ExpectAddTask = false
},
},
{
Name: "get last write version error before archiving",
Configure: func(p *params) {
p.GetLastWriteVersionBeforeArchivalError = errors.New("get last write version error")
p.ExpectedErrorSubstrings = []string{"get last write version error"}
p.ExpectArchive = false
p.ExpectAddTask = false
},
},
{
Name: "get last write version error after archiving",
Configure: func(p *params) {
p.GetLastWriteVersionAfterArchivalError = errors.New("get last write version error")
p.ExpectedErrorSubstrings = []string{"get last write version error"}
p.ExpectArchive = true
p.ExpectAddTask = false
},
},
{
Name: "mutable state version does not match task version",
Configure: func(p *params) {
p.LastWriteVersionBeforeArchival = 1
p.Task.(*tasks.ArchiveExecutionTask).Version = 2
p.ExpectedErrorSubstrings = []string{"version mismatch"}
p.ExpectArchive = false
p.ExpectAddTask = false
},
},
{
Name: "last write version changed during archival",
Configure: func(p *params) {
p.LastWriteVersionAfterArchival = p.LastWriteVersionBeforeArchival + 1
p.ExpectedErrorSubstrings = []string{"version mismatch"}
p.ExpectArchive = true
p.ExpectAddTask = false
},
},
{
Name: "close visibility task complete",
Configure: func(p *params) {
p.CloseVisibilityTaskCompleted = true
},
},
{
Name: "get workflow execution from visibility error",
Configure: func(p *params) {
p.CloseVisibilityTaskCompleted = true
p.GetWorkflowExecutionError = errors.New("get workflow execution error")
p.ExpectedErrorSubstrings = []string{"get workflow execution error"}
p.ExpectArchive = false
p.ExpectAddTask = false
},
},
Expand All @@ -209,10 +310,11 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
// delete time = close time + retention
// delete time = 2 minutes + 1 hour = 1 hour 2 minutes
p.ExpectedDeleteTime = time.Unix(0, 0).Add(time.Minute * 2).Add(time.Hour)
p.Version = 52
p.LastWriteVersionBeforeArchival = 1
p.LastWriteVersionAfterArchival = 1
p.Task = &tasks.ArchiveExecutionTask{
WorkflowKey: p.WorkflowKey,
Version: p.Version,
Version: 1,
}
p.HistoryURI = "test://history/archival"
p.VisibilityURI = "test://visibility/archival"
Expand Down Expand Up @@ -268,7 +370,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
cluster.TestCurrentClusterName,
},
},
52,
123,
)
namespaceRegistry.EXPECT().GetNamespaceName(namespaceEntry.ID()).
Return(namespaceEntry.Name(), nil).AnyTimes()
Expand All @@ -278,19 +380,36 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
if p.MutableStateExists {
mutableState := workflow.NewMockMutableState(p.Controller)
mutableState.EXPECT().IsWorkflowExecutionRunning().Return(p.IsWorkflowExecutionRunning).AnyTimes()
mutableState.EXPECT().GetCurrentVersion().Return(p.Version).AnyTimes()
mutableState.EXPECT().GetCurrentVersion().Return(p.LastWriteVersionBeforeArchival).AnyTimes()
mutableState.EXPECT().GetWorkflowKey().Return(p.WorkflowKey).AnyTimes()
workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(mutableState, nil).AnyTimes()
mutableState.EXPECT().GetCurrentBranchToken().Return(branchToken, nil).AnyTimes()
workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(
mutableState,
p.LoadMutableStateError,
).AnyTimes()
mutableState.EXPECT().GetCurrentBranchToken().Return(
branchToken,
p.GetCurrentBranchTokenError,
).AnyTimes()
mutableState.EXPECT().GetNamespaceEntry().Return(namespaceEntry).AnyTimes()
mutableState.EXPECT().GetNextEventID().Return(int64(100)).AnyTimes()
mutableState.EXPECT().GetLastWriteVersion().Return(int64(52), nil).AnyTimes()
mutableState.EXPECT().GetWorkflowCloseTime(gomock.Any()).Return(&p.CloseTime, nil).AnyTimes()
mutableState.EXPECT().GetLastWriteVersion().Return(
p.LastWriteVersionBeforeArchival,
p.GetLastWriteVersionBeforeArchivalError,
).MaxTimes(1)
mutableState.EXPECT().GetLastWriteVersion().Return(
p.LastWriteVersionAfterArchival,
p.GetLastWriteVersionAfterArchivalError,
).MaxTimes(1)
mutableState.EXPECT().GetWorkflowCloseTime(gomock.Any()).Return(
&p.CloseTime,
p.GetWorkflowCloseTimeError,
).AnyTimes()
executionInfo := &persistence.WorkflowExecutionInfo{
NamespaceId: tests.NamespaceID.String(),
StartTime: &p.StartTime,
ExecutionTime: &p.ExecutionTime,
CloseTime: &p.CloseTime,
NamespaceId: tests.NamespaceID.String(),
StartTime: &p.StartTime,
ExecutionTime: &p.ExecutionTime,
CloseTime: &p.CloseTime,
CloseVisibilityTaskCompleted: p.CloseVisibilityTaskCompleted,
}
mutableState.EXPECT().GetExecutionInfo().Return(executionInfo).AnyTimes()
executionState := &persistence.WorkflowExecutionState{
Expand All @@ -304,7 +423,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
task := ts[0]
assert.Equal(t, p.WorkflowKey, task.WorkflowKey)
assert.Zero(t, task.TaskID)
assert.Equal(t, p.Version, task.Version)
assert.Equal(t, p.LastWriteVersionBeforeArchival, task.Version)
assert.Equal(t, branchToken, task.BranchToken)
assert.True(t, task.WorkflowDataAlreadyArchived)
assert.Equal(t, p.ExpectedDeleteTime, task.VisibilityTimestamp)
Expand All @@ -324,10 +443,21 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
})
}
} else {
workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(nil, nil).AnyTimes()
workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(
nil,
p.LoadMutableStateError,
).AnyTimes()
}
workflowCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(workflowContext, cache.ReleaseCacheFunc(func(err error) {}), nil).AnyTimes()
workflowCache.EXPECT().GetOrCreateWorkflowExecution(
gomock.Any(),
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(
workflowContext,
cache.ReleaseCacheFunc(func(err error) {}),
p.GetOrCreateWorkflowExecutionError,
).AnyTimes()

archivalMetadata := carchiver.NewMockArchivalMetadata(p.Controller)
historyConfig := carchiver.NewMockArchivalConfig(p.Controller)
Expand All @@ -352,6 +482,15 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
}

visibilityManager := manager.NewMockVisibilityManager(p.Controller)
if p.CloseVisibilityTaskCompleted {
visibilityManager.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(
&manager.GetWorkflowExecutionResponse{Execution: &workflowpb.WorkflowExecutionInfo{
Memo: nil,
SearchAttributes: nil,
}},
p.GetWorkflowExecutionError,
)
}

executor := NewArchivalQueueTaskExecutor(
a,
Expand Down Expand Up @@ -399,28 +538,38 @@ type testCase struct {

// params represents the parameters for a test within TestArchivalQueueTaskExecutor
type params struct {
Controller *gomock.Controller
IsWorkflowExecutionRunning bool
Retention *time.Duration
Task tasks.Task
ExpectedDeleteTime time.Time
ExpectedErrorSubstrings []string
ExpectArchive bool
ExpectAddTask bool
ExpectedTargets []archival.Target
HistoryConfig archivalConfig
VisibilityConfig archivalConfig
WorkflowKey definition.WorkflowKey
StartTime time.Time
ExecutionTime time.Time
CloseTime time.Time
Version int64
GetNamespaceByIDError error
HistoryURI string
VisibilityURI string
MetricsHandler *metrics.MockHandler
MutableStateExists bool
ArchiveError error
Controller *gomock.Controller
IsWorkflowExecutionRunning bool
Retention *time.Duration
Task tasks.Task
ExpectedDeleteTime time.Time
ExpectedErrorSubstrings []string
ExpectArchive bool
ExpectAddTask bool
ExpectedTargets []archival.Target
HistoryConfig archivalConfig
VisibilityConfig archivalConfig
WorkflowKey definition.WorkflowKey
StartTime time.Time
ExecutionTime time.Time
CloseTime time.Time
GetNamespaceByIDError error
HistoryURI string
VisibilityURI string
MetricsHandler *metrics.MockHandler
MutableStateExists bool
ArchiveError error
GetWorkflowCloseTimeError error
GetCurrentBranchTokenError error
CloseVisibilityTaskCompleted bool
ExpectGetWorkflowExecution bool
GetWorkflowExecutionError error
LoadMutableStateError error
GetOrCreateWorkflowExecutionError error
LastWriteVersionBeforeArchival int64
GetLastWriteVersionBeforeArchivalError error
LastWriteVersionAfterArchival int64
GetLastWriteVersionAfterArchivalError error
}

// archivalConfig represents the user configuration of archival for the cluster and namespace
Expand Down

0 comments on commit aa4788b

Please sign in to comment.