diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 548e67b3faf..6b0a4ff5018 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -531,8 +531,6 @@ const ( ArchivalProcessorMaxPollRPS = "history.archivalProcessorMaxPollRPS" // ArchivalProcessorMaxPollHostRPS is max poll rate per second for all archivalQueueProcessor on a host ArchivalProcessorMaxPollHostRPS = "history.archivalProcessorMaxPollHostRPS" - // ArchivalTaskMaxRetryCount is max times of retry for archivalQueueProcessor - ArchivalTaskMaxRetryCount = "history.archivalTaskMaxRetryCount" // ArchivalProcessorSchedulerWorkerCount is the number of workers in the host level task scheduler for // archivalQueueProcessor ArchivalProcessorSchedulerWorkerCount = "history.archivalProcessorSchedulerWorkerCount" diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 0fb9144ea27..8557a05815a 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1369,14 +1369,18 @@ var ( VersionCheckLatency = NewTimerDef("version_check_latency") // History - CacheRequests = NewCounterDef("cache_requests") - CacheFailures = NewCounterDef("cache_errors") - CacheLatency = NewTimerDef("cache_latency") - CacheMissCounter = NewCounterDef("cache_miss") - HistoryEventNotificationQueueingLatency = NewTimerDef("history_event_notification_queueing_latency") - HistoryEventNotificationFanoutLatency = NewTimerDef("history_event_notification_fanout_latency") - HistoryEventNotificationInFlightMessageGauge = NewGaugeDef("history_event_notification_inflight_message_gauge") - HistoryEventNotificationFailDeliveryCount = NewCounterDef("history_event_notification_fail_delivery_count") + CacheRequests = NewCounterDef("cache_requests") + CacheFailures = NewCounterDef("cache_errors") + CacheLatency = NewTimerDef("cache_latency") + CacheMissCounter = NewCounterDef("cache_miss") + HistoryEventNotificationQueueingLatency = NewTimerDef("history_event_notification_queueing_latency") + HistoryEventNotificationFanoutLatency = NewTimerDef("history_event_notification_fanout_latency") + HistoryEventNotificationInFlightMessageGauge = NewGaugeDef("history_event_notification_inflight_message_gauge") + HistoryEventNotificationFailDeliveryCount = NewCounterDef("history_event_notification_fail_delivery_count") + // ArchivalTaskInvalidURI is emitted by the archival queue task executor when the history or visibility URI for an + // archival task is not a valid URI. + // We may emit this metric several times for a single task if the task is retried. + ArchivalTaskInvalidURI = NewCounterDef("archival_task_invalid_uri") ArchiverClientSendSignalCount = NewCounterDef("archiver_client_sent_signal") ArchiverClientSendSignalFailureCount = NewCounterDef("archiver_client_send_signal_error") ArchiverClientHistoryRequestCount = NewCounterDef("archiver_client_history_request") diff --git a/service/history/archival/archiver.go b/service/history/archival/archiver.go index ac61c4f8548..8d850c5d556 100644 --- a/service/history/archival/archiver.go +++ b/service/history/archival/archiver.go @@ -44,7 +44,6 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" - "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/quotas" "go.temporal.io/server/common/searchattribute" ) @@ -61,18 +60,20 @@ type ( BranchToken []byte NextEventID int64 CloseFailoverVersion int64 - HistoryURI string + // HistoryURI is the URI of the history archival backend. + HistoryURI carchiver.URI // visibility archival WorkflowTypeName string - StartTime time.Time - ExecutionTime time.Time - CloseTime time.Time + StartTime *time.Time + ExecutionTime *time.Time + CloseTime *time.Time Status enumspb.WorkflowExecutionStatus HistoryLength int64 Memo *commonpb.Memo SearchAttributes *commonpb.SearchAttributes - VisibilityURI string + // VisibilityURI is the URI of the visibility archival backend. + VisibilityURI carchiver.URI // archival targets: history and/or visibility Targets []Target @@ -148,7 +149,6 @@ func (a *archiver) Archive(ctx context.Context, request *Request) (res *Response Message: fmt.Sprintf("archival rate limited: %s", err.Error()), } } - var wg sync.WaitGroup errs := make([]error, len(request.Targets)) for i, target := range request.Targets { @@ -178,21 +178,16 @@ func (a *archiver) archiveHistory(ctx context.Context, request *Request, logger logger, tag.ArchivalRequestBranchToken(request.BranchToken), tag.ArchivalRequestCloseFailoverVersion(request.CloseFailoverVersion), - tag.ArchivalURI(request.HistoryURI), + tag.ArchivalURI(request.HistoryURI.String()), ) defer a.recordArchiveTargetResult(logger, time.Now(), TargetHistory, &err) - URI, err := carchiver.NewURI(request.HistoryURI) - if err != nil { - return err - } - - historyArchiver, err := a.archiverProvider.GetHistoryArchiver(URI.Scheme(), request.CallerService) + historyArchiver, err := a.archiverProvider.GetHistoryArchiver(request.HistoryURI.Scheme(), request.CallerService) if err != nil { return err } - return historyArchiver.Archive(ctx, URI, &carchiver.ArchiveHistoryRequest{ + return historyArchiver.Archive(ctx, request.HistoryURI, &carchiver.ArchiveHistoryRequest{ ShardID: request.ShardID, NamespaceID: request.NamespaceID, Namespace: request.Namespace, @@ -207,16 +202,11 @@ func (a *archiver) archiveHistory(ctx context.Context, request *Request, logger func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logger log.Logger) (err error) { logger = log.With( logger, - tag.ArchivalURI(request.VisibilityURI), + tag.ArchivalURI(request.VisibilityURI.String()), ) defer a.recordArchiveTargetResult(logger, time.Now(), TargetVisibility, &err) - uri, err := carchiver.NewURI(request.VisibilityURI) - if err != nil { - return err - } - - visibilityArchiver, err := a.archiverProvider.GetVisibilityArchiver(uri.Scheme(), request.CallerService) + visibilityArchiver, err := a.archiverProvider.GetVisibilityArchiver(request.VisibilityURI.Scheme(), request.CallerService) if err != nil { return err } @@ -226,20 +216,20 @@ func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logg if err != nil { return err } - return visibilityArchiver.Archive(ctx, uri, &archiverspb.VisibilityRecord{ + return visibilityArchiver.Archive(ctx, request.VisibilityURI, &archiverspb.VisibilityRecord{ NamespaceId: request.NamespaceID, Namespace: request.Namespace, WorkflowId: request.WorkflowID, RunId: request.RunID, WorkflowTypeName: request.WorkflowTypeName, - StartTime: timestamp.TimePtr(request.StartTime), - ExecutionTime: timestamp.TimePtr(request.ExecutionTime), - CloseTime: timestamp.TimePtr(request.CloseTime), + StartTime: request.StartTime, + ExecutionTime: request.ExecutionTime, + CloseTime: request.CloseTime, Status: request.Status, HistoryLength: request.HistoryLength, Memo: request.Memo, SearchAttributes: searchAttributes, - HistoryArchivalUri: request.HistoryURI, + HistoryArchivalUri: request.HistoryURI.String(), }) } diff --git a/service/history/archival/archiver_test.go b/service/history/archival/archiver_test.go index ee9f7334b40..77d70ff2afe 100644 --- a/service/history/archival/archiver_test.go +++ b/service/history/archival/archiver_test.go @@ -282,8 +282,8 @@ func TestArchiver(t *testing.T) { archiver := NewArchiver(archiverProvider, logRecorder, metricsHandler, rateLimiter) _, err = archiver.Archive(ctx, &Request{ - HistoryURI: historyURI.String(), - VisibilityURI: visibilityURI.String(), + HistoryURI: historyURI, + VisibilityURI: visibilityURI, Targets: c.Targets, }) diff --git a/service/history/archival_queue_task_executor.go b/service/history/archival_queue_task_executor.go new file mode 100644 index 00000000000..0e9a3cc320e --- /dev/null +++ b/service/history/archival_queue_task_executor.go @@ -0,0 +1,349 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "context" + "errors" + "fmt" + + enumspb "go.temporal.io/api/enums/v1" + + carchiver "go.temporal.io/server/common/archiver" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/primitives" + "go.temporal.io/server/service/history/archival" + "go.temporal.io/server/service/history/queues" + "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tasks" + "go.temporal.io/server/service/history/workflow" + "go.temporal.io/server/service/history/workflow/cache" +) + +// NewArchivalQueueTaskExecutor creates a new queue task executor for the archival queue. +// If you use this executor, you must monitor for any metrics.ArchivalTaskInvalidURI errors. +// If this metric is emitted, it means that an archival URI is invalid and the task will never succeed, which is a +// serious problem because the archival queue retries tasks forever. +func NewArchivalQueueTaskExecutor( + archiver archival.Archiver, + shardContext shard.Context, + workflowCache cache.Cache, + relocatableAttributesFetcher workflow.RelocatableAttributesFetcher, + metricsHandler metrics.MetricsHandler, + logger log.Logger, +) queues.Executor { + return &archivalQueueTaskExecutor{ + archiver: archiver, + shardContext: shardContext, + workflowCache: workflowCache, + relocatableAttributesFetcher: relocatableAttributesFetcher, + metricsHandler: metricsHandler, + logger: logger, + } +} + +// archivalQueueTaskExecutor is an implementation of queues.Executor for the archival queue. +type archivalQueueTaskExecutor struct { + archiver archival.Archiver + shardContext shard.Context + workflowCache cache.Cache + metricsHandler metrics.MetricsHandler + logger log.Logger + relocatableAttributesFetcher workflow.RelocatableAttributesFetcher +} + +// Execute executes a task from the archival queue. +func (e *archivalQueueTaskExecutor) Execute( + ctx context.Context, + executable queues.Executable, +) (tags []metrics.Tag, isActive bool, err error) { + task := executable.GetTask() + taskType := queues.GetArchivalTaskTypeTagValue(task) + tags = []metrics.Tag{ + getNamespaceTagByID(e.shardContext.GetNamespaceRegistry(), task.GetNamespaceID()), + metrics.TaskTypeTag(taskType), + } + switch task := task.(type) { + case *tasks.ArchiveExecutionTask: + err = e.processArchiveExecutionTask(ctx, task) + if err == ErrMutableStateIsNil || err == ErrWorkflowExecutionIsStillRunning { + // If either of these errors are returned, it means that we can just drop the task. + err = nil + } + default: + err = fmt.Errorf("task with invalid type sent to archival queue: %+v", task) + } + return tags, true, err +} + +// processArchiveExecutionTask processes a tasks.ArchiveExecutionTask +// First, we load the mutable state to populate an archival.Request. +// Second, we unlock the mutable state and send the archival request to the archival.Archiver. +// Finally, we lock the mutable state again, and send a deletion follow-up task to the history engine. +func (e *archivalQueueTaskExecutor) processArchiveExecutionTask(ctx context.Context, task *tasks.ArchiveExecutionTask) (err error) { + logger := log.With(e.logger, tag.Task(task)) + request, err := e.getArchiveTaskRequest(ctx, logger, task) + if err != nil { + return err + } + _, err = e.archiver.Archive(ctx, request) + if err != nil { + return err + } + return e.addDeletionTask(ctx, logger, task) +} + +// getArchiveTaskRequest returns an archival request for the given archive execution task. +func (e *archivalQueueTaskExecutor) getArchiveTaskRequest( + ctx context.Context, + logger log.Logger, + task *tasks.ArchiveExecutionTask, +) (request *archival.Request, err error) { + mutableState, err := e.loadAndVersionCheckMutableState(ctx, logger, task) + if err != nil { + return nil, err + } + defer func() { + mutableState.Release(err) + }() + + namespaceEntry := mutableState.GetNamespaceEntry() + namespaceName := namespaceEntry.Name() + nextEventID := mutableState.GetNextEventID() + executionInfo := mutableState.GetExecutionInfo() + executionState := mutableState.GetExecutionState() + + logger = log.With(logger, + tag.WorkflowNamespaceID(executionInfo.NamespaceId), + tag.WorkflowID(executionInfo.WorkflowId), + tag.WorkflowRunID(executionState.RunId), + ) + + closeTime, err := mutableState.GetWorkflowCloseTime(ctx) + if err != nil { + return nil, err + } + branchToken, err := mutableState.GetCurrentBranchToken() + if err != nil { + return nil, err + } + + var targets []archival.Target + if e.shardContext.GetArchivalMetadata().GetVisibilityConfig().ClusterConfiguredForArchival() && + namespaceEntry.VisibilityArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED { + targets = append(targets, archival.TargetVisibility) + } + if e.shardContext.GetArchivalMetadata().GetHistoryConfig().ClusterConfiguredForArchival() && + namespaceEntry.HistoryArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED { + targets = append(targets, archival.TargetHistory) + } + if len(targets) == 0 { + return nil, fmt.Errorf( + "no archival targets configured for archive execution task: %+v", + task.WorkflowKey, + ) + } + + historyURIString := namespaceEntry.HistoryArchivalState().URI + historyURI, err := carchiver.NewURI(historyURIString) + if err != nil { + e.metricsHandler.Counter(metrics.ArchivalTaskInvalidURI.GetMetricName()).Record( + 1, + metrics.NamespaceTag(namespaceName.String()), + metrics.FailureTag("invalid_history_uri"), + ) + logger.Error( + "Failed to parse history URI.", + tag.ArchivalURI(historyURIString), + tag.Error(err), + ) + return nil, fmt.Errorf("failed to parse history URI for archival task: %w", err) + } + visibilityURIString := namespaceEntry.VisibilityArchivalState().URI + visibilityURI, err := carchiver.NewURI(visibilityURIString) + if err != nil { + e.metricsHandler.Counter(metrics.ArchivalTaskInvalidURI.GetMetricName()).Record( + 1, + metrics.NamespaceTag(namespaceName.String()), + metrics.FailureTag("invalid_visibility_uri"), + ) + logger.Error( + "Failed to parse visibility URI.", + tag.ArchivalURI(visibilityURIString), + tag.Error(err), + ) + return nil, fmt.Errorf("failed to parse visibility URI for archival task: %w", err) + } + workflowAttributes, err := e.relocatableAttributesFetcher.Fetch(ctx, mutableState) + if err != nil { + return nil, err + } + + request = &archival.Request{ + ShardID: e.shardContext.GetShardID(), + NamespaceID: task.NamespaceID, + Namespace: namespaceName.String(), + WorkflowID: task.WorkflowID, + RunID: task.RunID, + BranchToken: branchToken, + NextEventID: nextEventID, + CloseFailoverVersion: mutableState.LastWriteVersion, + HistoryURI: historyURI, + VisibilityURI: visibilityURI, + WorkflowTypeName: executionInfo.GetWorkflowTypeName(), + StartTime: executionInfo.GetStartTime(), + ExecutionTime: executionInfo.GetExecutionTime(), + CloseTime: closeTime, + Status: executionState.Status, + HistoryLength: nextEventID - 1, + Memo: workflowAttributes.Memo, + SearchAttributes: workflowAttributes.SearchAttributes, + Targets: targets, + CallerService: string(primitives.HistoryService), + } + return request, nil +} + +// addDeletionTask adds a task to delete workflow history events from primary storage. +func (e *archivalQueueTaskExecutor) addDeletionTask(ctx context.Context, logger log.Logger, task *tasks.ArchiveExecutionTask) error { + mutableState, err := e.loadAndVersionCheckMutableState(ctx, logger, task) + if err != nil { + return err + } + defer func() { + mutableState.Release(err) + }() + + taskGenerator := workflow.NewTaskGenerator( + e.shardContext.GetNamespaceRegistry(), + mutableState, + e.shardContext.GetConfig(), + ) + closeTime, err := mutableState.GetWorkflowCloseTime(ctx) + if err != nil { + return err + } + err = taskGenerator.GenerateDeleteHistoryEventTask(*closeTime, true) + if err != nil { + return nil + } + err = e.shardContext.AddTasks(ctx, &persistence.AddHistoryTasksRequest{ + ShardID: e.shardContext.GetShardID(), + NamespaceID: task.GetNamespaceID(), + WorkflowID: task.WorkflowID, + RunID: task.RunID, + Tasks: mutableState.PopTasks(), + }) + return err +} + +// lockedMutableState is a wrapper around mutable state that includes the last write version of the mutable state and +// a function to release this mutable state's context when we're done with it. +type lockedMutableState struct { + // MutableState is the mutable state that is being wrapped. You may call any method on this object safely since + // the state is locked. + workflow.MutableState + // LastWriteVersion is the last write version of the mutable state. We store this here so that we don't have to + // call GetLastWriteVersion() on the mutable state object again. + LastWriteVersion int64 + // Release is a function that releases the context of the mutable state. This function should be called when + // you are done with the mutable state. + Release cache.ReleaseCacheFunc +} + +// newLockedMutableState returns a new lockedMutableState with the given mutable state, +// last write version and release function +func newLockedMutableState( + mutableState workflow.MutableState, + version int64, + releaseFunc cache.ReleaseCacheFunc, +) *lockedMutableState { + return &lockedMutableState{ + MutableState: mutableState, + LastWriteVersion: version, + Release: releaseFunc, + } +} + +var ( + // ErrMutableStateIsNil is returned when the mutable state is nil + ErrMutableStateIsNil = errors.New("mutable state is nil") + // ErrWorkflowExecutionIsStillRunning is returned when the workflow execution is still running + ErrWorkflowExecutionIsStillRunning = errors.New("workflow execution is still running") +) + +// loadAndVersionCheckMutableState loads the mutable state for the given task and performs a version check to verify +// that the mutable state's last write version is equal to the task's version. If the version check fails then +// the mutable state is released and an error is returned. This can happen if this task was already processed. +func (e *archivalQueueTaskExecutor) loadAndVersionCheckMutableState( + ctx context.Context, + logger log.Logger, + task tasks.Task, +) (lockedMutableState *lockedMutableState, err error) { + weContext, release, err := getWorkflowExecutionContextForTask(ctx, e.workflowCache, task) + if err != nil { + return nil, err + } + defer func() { + // If we return an error, the caller will not release the mutable state, so we need to do it here. + if err != nil { + release(err) + } + // If we don't return an error, the caller will release the mutable state, so we don't need to do it here. + }() + + mutableState, err := weContext.LoadMutableState(ctx) + if err != nil { + return nil, err + } + if mutableState == nil { + logger.Warn("Dropping archival task because mutable state is nil.") + return nil, ErrMutableStateIsNil + } + if mutableState.IsWorkflowExecutionRunning() { + logger.Warn("Dropping archival task because workflow is still running.") + return nil, ErrWorkflowExecutionIsStillRunning + } + lastWriteVersion, err := mutableState.GetLastWriteVersion() + if err != nil { + return nil, err + } + namespaceEntry := mutableState.GetNamespaceEntry() + err = CheckTaskVersion( + e.shardContext, + logger, + namespaceEntry, + lastWriteVersion, + task.GetVersion(), + task, + ) + if err != nil { + return nil, err + } + return newLockedMutableState(mutableState, lastWriteVersion, release), nil +} diff --git a/service/history/archival_queue_task_executor_test.go b/service/history/archival_queue_task_executor_test.go new file mode 100644 index 00000000000..9d047149e44 --- /dev/null +++ b/service/history/archival_queue_task_executor_test.go @@ -0,0 +1,430 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/api/serviceerror" + + "go.temporal.io/server/api/persistence/v1" + carchiver "go.temporal.io/server/common/archiver" + "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + cpersistence "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/persistence/visibility/manager" + "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/service/history/archival" + "go.temporal.io/server/service/history/queues" + "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tasks" + "go.temporal.io/server/service/history/tests" + "go.temporal.io/server/service/history/workflow" + "go.temporal.io/server/service/history/workflow/cache" +) + +func TestArchivalQueueTaskExecutor(t *testing.T) { + for _, c := range []testCase{ + { + Name: "success", + Configure: func(p *params) { + }, + }, + { + Name: "history archival disabled for cluster", + Configure: func(p *params) { + p.HistoryConfig.ClusterEnabled = false + p.ExpectedTargets = []archival.Target{ + archival.TargetVisibility, + } + }, + }, + { + Name: "history archival disabled for namespace", + Configure: func(p *params) { + p.HistoryConfig.NamespaceArchivalState = carchiver.ArchivalDisabled + p.ExpectedTargets = []archival.Target{ + archival.TargetVisibility, + } + }, + }, + { + Name: "visibility archival disabled for cluster", + Configure: func(p *params) { + p.VisibilityConfig.ClusterEnabled = false + p.ExpectedTargets = []archival.Target{ + archival.TargetHistory, + } + }, + }, + { + Name: "visibility archival disabled for namespace", + Configure: func(p *params) { + p.VisibilityConfig.NamespaceArchivalState = carchiver.ArchivalDisabled + p.ExpectedTargets = []archival.Target{ + archival.TargetHistory, + } + }, + }, + { + Name: "both history and visibility archival disabled", + Configure: func(p *params) { + p.VisibilityConfig.NamespaceArchivalState = carchiver.ArchivalDisabled + p.HistoryConfig.NamespaceArchivalState = carchiver.ArchivalDisabled + p.ExpectedErrorSubstrings = []string{ + "no archival targets", + } + p.ExpectArchive = false + p.ExpectAddTask = false + }, + }, + { + Name: "running execution", + Configure: func(p *params) { + p.IsWorkflowExecutionRunning = true + p.ExpectArchive = false + p.ExpectAddTask = false + }, + }, + { + Name: "nil mutable state", + Configure: func(p *params) { + p.MutableStateExists = false + p.ExpectArchive = false + p.ExpectAddTask = false + }, + }, + { + Name: "namespace not found", + Configure: func(p *params) { + p.Retention = nil + p.GetNamespaceByIDError = &serviceerror.NamespaceNotFound{} + p.ExpectedDeleteTime = p.CloseTime.Add(24 * time.Hour) + }, + }, + { + Name: "wrong task type", + Configure: func(p *params) { + p.Task = &tasks.DeleteExecutionTask{ + WorkflowKey: p.WorkflowKey, + Version: p.Version, + } + p.ExpectArchive = false + p.ExpectAddTask = false + p.ExpectedErrorSubstrings = []string{"invalid type"} + }, + }, + { + Name: "invalid history URI", + Configure: func(p *params) { + p.HistoryURI = "invalid_uri" + p.ExpectedErrorSubstrings = []string{"history URI", "parse"} + p.ExpectArchive = false + p.ExpectAddTask = false + mockCounter := metrics.NewMockCounterMetric(p.Controller) + mockCounter.EXPECT().Record( + int64(1), + metrics.NamespaceTag(tests.Namespace.String()), + metrics.FailureTag("invalid_history_uri"), + ) + p.MetricsHandler.EXPECT().Counter("archival_task_invalid_uri").Return(mockCounter) + }, + }, + { + Name: "invalid visibility URI", + Configure: func(p *params) { + p.VisibilityURI = "invalid_uri" + p.ExpectedErrorSubstrings = []string{"visibility URI", "parse"} + p.ExpectArchive = false + p.ExpectAddTask = false + mockCounter := metrics.NewMockCounterMetric(p.Controller) + mockCounter.EXPECT().Record( + int64(1), + metrics.NamespaceTag(tests.Namespace.String()), + metrics.FailureTag("invalid_visibility_uri"), + ) + p.MetricsHandler.EXPECT().Counter("archival_task_invalid_uri").Return(mockCounter) + }, + }, + { + Name: "archiver error", + Configure: func(p *params) { + p.ArchiveError = errors.New("archive error") + p.ExpectedErrorSubstrings = []string{"archive error"} + p.ExpectAddTask = false + }, + }, + } { + c := c // store c in closure to prevent loop from changing it when a parallel task is accessing it + t.Run(c.Name, func(t *testing.T) { + t.Parallel() + var p params + p.Controller = gomock.NewController(t) + p.HistoryConfig.NamespaceArchivalState = carchiver.ArchivalEnabled + p.VisibilityConfig.NamespaceArchivalState = carchiver.ArchivalEnabled + p.HistoryConfig.ClusterEnabled = true + p.VisibilityConfig.ClusterEnabled = true + p.WorkflowKey = definition.NewWorkflowKey( + tests.NamespaceID.String(), + tests.WorkflowID, + tests.RunID, + ) + p.StartTime = time.Unix(0, 0) + p.ExecutionTime = time.Unix(0, 0) + p.CloseTime = time.Unix(0, 0).Add(time.Minute * 2) + p.Retention = timestamp.DurationPtr(time.Hour) + // delete time = close time + retention + // delete time = 2 minutes + 1 hour = 1 hour 2 minutes + p.ExpectedDeleteTime = time.Unix(0, 0).Add(time.Minute * 2).Add(time.Hour) + p.Version = 52 + p.Task = &tasks.ArchiveExecutionTask{ + WorkflowKey: p.WorkflowKey, + Version: p.Version, + } + p.HistoryURI = "test://history/archival" + p.VisibilityURI = "test://visibility/archival" + p.ExpectedTargets = []archival.Target{ + archival.TargetHistory, + archival.TargetVisibility, + } + p.ExpectArchive = true + p.ExpectAddTask = true + p.MetricsHandler = metrics.NewMockMetricsHandler(p.Controller) + p.MutableStateExists = true + + c.Configure(&p) + namespaceRegistry := namespace.NewMockRegistry(p.Controller) + task := p.Task + shardContext := shard.NewMockContext(p.Controller) + workflowCache := cache.NewMockCache(p.Controller) + workflowContext := workflow.NewMockContext(p.Controller) + branchToken := []byte{42} + logger := log.NewNoopLogger() + timeSource := clock.NewRealTimeSource() + a := archival.NewMockArchiver(p.Controller) + + shardContext.EXPECT().GetNamespaceRegistry().Return(namespaceRegistry).AnyTimes() + cfg := tests.NewDynamicConfig() + cfg.RetentionTimerJitterDuration = func() time.Duration { + return 0 + } + shardContext.EXPECT().GetConfig().Return(cfg).AnyTimes() + mockMetadata := cluster.NewMockMetadata(p.Controller) + mockMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes() + shardContext.EXPECT().GetClusterMetadata().Return(mockMetadata).AnyTimes() + + shardID := int32(1) + historyArchivalState := p.HistoryConfig.NamespaceArchivalState + visibilityArchivalState := p.VisibilityConfig.NamespaceArchivalState + + namespaceEntry := namespace.NewGlobalNamespaceForTest( + &persistence.NamespaceInfo{ + Id: tests.NamespaceID.String(), + Name: tests.Namespace.String(), + }, + &persistence.NamespaceConfig{ + Retention: p.Retention, + HistoryArchivalState: enumspb.ArchivalState(historyArchivalState), + HistoryArchivalUri: p.HistoryURI, + VisibilityArchivalState: enumspb.ArchivalState(visibilityArchivalState), + VisibilityArchivalUri: p.VisibilityURI, + }, + &persistence.NamespaceReplicationConfig{ + ActiveClusterName: cluster.TestCurrentClusterName, + Clusters: []string{ + cluster.TestCurrentClusterName, + }, + }, + 52, + ) + namespaceRegistry.EXPECT().GetNamespaceName(namespaceEntry.ID()). + Return(namespaceEntry.Name(), nil).AnyTimes() + namespaceRegistry.EXPECT().GetNamespaceByID(namespaceEntry.ID()). + Return(namespaceEntry, p.GetNamespaceByIDError).AnyTimes() + + if p.MutableStateExists { + mutableState := workflow.NewMockMutableState(p.Controller) + mutableState.EXPECT().IsWorkflowExecutionRunning().Return(p.IsWorkflowExecutionRunning).AnyTimes() + mutableState.EXPECT().GetCurrentVersion().Return(p.Version).AnyTimes() + mutableState.EXPECT().GetWorkflowKey().Return(p.WorkflowKey).AnyTimes() + workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(mutableState, nil).AnyTimes() + mutableState.EXPECT().GetCurrentBranchToken().Return(branchToken, nil).AnyTimes() + mutableState.EXPECT().GetNamespaceEntry().Return(namespaceEntry).AnyTimes() + mutableState.EXPECT().GetNextEventID().Return(int64(100)).AnyTimes() + mutableState.EXPECT().GetLastWriteVersion().Return(int64(52), nil).AnyTimes() + mutableState.EXPECT().GetWorkflowCloseTime(gomock.Any()).Return(&p.CloseTime, nil).AnyTimes() + executionInfo := &persistence.WorkflowExecutionInfo{ + NamespaceId: tests.NamespaceID.String(), + StartTime: &p.StartTime, + ExecutionTime: &p.ExecutionTime, + CloseTime: &p.CloseTime, + } + mutableState.EXPECT().GetExecutionInfo().Return(executionInfo).AnyTimes() + executionState := &persistence.WorkflowExecutionState{ + State: 0, + Status: 0, + } + mutableState.EXPECT().GetExecutionState().Return(executionState).AnyTimes() + if p.ExpectAddTask { + mutableState.EXPECT().AddTasks(gomock.Any()).Do(func(ts ...*tasks.DeleteHistoryEventTask) { + require.Len(t, ts, 1) + task := ts[0] + assert.Equal(t, p.WorkflowKey, task.WorkflowKey) + assert.Zero(t, task.TaskID) + assert.Equal(t, p.Version, task.Version) + assert.Equal(t, branchToken, task.BranchToken) + assert.True(t, task.WorkflowDataAlreadyArchived) + assert.Equal(t, p.ExpectedDeleteTime, task.VisibilityTimestamp) + popTasks := map[tasks.Category][]tasks.Task{ + tasks.CategoryTimer: { + task, + }, + } + mutableState.EXPECT().PopTasks().Return(popTasks) + shardContext.EXPECT().AddTasks(gomock.Any(), &cpersistence.AddHistoryTasksRequest{ + ShardID: shardID, + NamespaceID: tests.NamespaceID.String(), + WorkflowID: task.WorkflowID, + RunID: task.RunID, + Tasks: popTasks, + }) + }) + } + } else { + workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(nil, nil).AnyTimes() + } + workflowCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(workflowContext, cache.ReleaseCacheFunc(func(err error) {}), nil).AnyTimes() + + archivalMetadata := carchiver.NewMockArchivalMetadata(p.Controller) + historyConfig := carchiver.NewMockArchivalConfig(p.Controller) + historyConfig.EXPECT().ClusterConfiguredForArchival().Return(p.HistoryConfig.ClusterEnabled).AnyTimes() + archivalMetadata.EXPECT().GetHistoryConfig().Return(historyConfig).AnyTimes() + visibilityConfig := carchiver.NewMockArchivalConfig(p.Controller) + visibilityConfig.EXPECT().ClusterConfiguredForArchival().Return(p.VisibilityConfig.ClusterEnabled).AnyTimes() + archivalMetadata.EXPECT().GetVisibilityConfig().Return(visibilityConfig).AnyTimes() + shardContext.EXPECT().GetArchivalMetadata().Return(archivalMetadata).AnyTimes() + shardContext.EXPECT().GetShardID().Return(shardID).AnyTimes() + + if p.ExpectArchive { + a.EXPECT().Archive(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, + request *archival.Request) (*archival.Response, error) { + assert.Equal(t, p.StartTime, *request.StartTime) + assert.Equal(t, p.ExecutionTime, *request.ExecutionTime) + assert.Equal(t, p.CloseTime, *request.CloseTime) + assert.ElementsMatch(t, p.ExpectedTargets, request.Targets) + + return &archival.Response{}, p.ArchiveError + }) + } + + visibilityManager := manager.NewMockVisibilityManager(p.Controller) + + executor := NewArchivalQueueTaskExecutor( + a, + shardContext, + workflowCache, + workflow.NewRelocatableAttributesFetcher(visibilityManager), + p.MetricsHandler, + logger, + ) + executable := queues.NewExecutable( + queues.DefaultReaderId, + task, + nil, + executor, + nil, + nil, + queues.NewNoopPriorityAssigner(), + timeSource, + namespaceRegistry, + nil, + metrics.NoopMetricsHandler, + nil, + nil, + ) + err := executable.Execute() + if len(p.ExpectedErrorSubstrings) > 0 { + require.Error(t, err) + for _, s := range p.ExpectedErrorSubstrings { + assert.ErrorContains(t, err, s) + } + } else { + assert.Nil(t, err) + } + }) + } +} + +// testCase represents a single test case for TestArchivalQueueTaskExecutor +type testCase struct { + // Name is the name of the test case + Name string + // Configure is a function that takes the default params and modifies them for the test case + Configure func(*params) +} + +// params represents the parameters for a test within TestArchivalQueueTaskExecutor +type params struct { + Controller *gomock.Controller + IsWorkflowExecutionRunning bool + Retention *time.Duration + Task tasks.Task + ExpectedDeleteTime time.Time + ExpectedErrorSubstrings []string + ExpectArchive bool + ExpectAddTask bool + ExpectedTargets []archival.Target + HistoryConfig archivalConfig + VisibilityConfig archivalConfig + WorkflowKey definition.WorkflowKey + StartTime time.Time + ExecutionTime time.Time + CloseTime time.Time + Version int64 + GetNamespaceByIDError error + HistoryURI string + VisibilityURI string + MetricsHandler *metrics.MockMetricsHandler + MutableStateExists bool + ArchiveError error +} + +// archivalConfig represents the user configuration of archival for the cluster and namespace +type archivalConfig struct { + ClusterEnabled bool + NamespaceArchivalState carchiver.ArchivalState +} diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index fe416f05878..39c9ef21af1 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -55,6 +55,12 @@ type ( closeEvent *historypb.HistoryEvent, deleteAfterClose bool, ) error + // GenerateDeleteHistoryEventTask adds a tasks.DeleteHistoryEventTask to the mutable state. + // This task is used to delete the history events of the workflow execution after the retention period expires. + // If workflowDataAlreadyArchived is true, then the workflow data is already archived, + // so we can delete the history immediately. Otherwise, we need to archive the history first before we can + // safely delete it. + GenerateDeleteHistoryEventTask(closeTime time.Time, workflowDataAlreadyArchived bool) error GenerateDeleteExecutionTask() (*tasks.DeleteExecutionTask, error) GenerateRecordWorkflowStartedTasks( startEvent *historypb.HistoryEvent, @@ -142,6 +148,8 @@ func (r *TaskGeneratorImpl) GenerateWorkflowStartTasks( return nil } +// archivalDelayJitterCoefficient is a variable because we need to override it to 0 in unit tests to make them +// deterministic. var archivalDelayJitterCoefficient = 1.0 func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( @@ -150,22 +158,6 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( ) error { currentVersion := r.mutableState.GetCurrentVersion() - executionInfo := r.mutableState.GetExecutionInfo() - - retention := defaultWorkflowRetention - namespaceEntry, err := r.namespaceRegistry.GetNamespaceByID(namespace.ID(executionInfo.NamespaceId)) - switch err.(type) { - case nil: - retention = namespaceEntry.Retention() - case *serviceerror.NamespaceNotFound: - // namespace is not accessible, use default value above - default: - return err - } - branchToken, err := r.mutableState.GetCurrentBranchToken() - if err != nil { - return err - } closeTasks := []tasks.Task{ &tasks.CloseExecutionTask{ @@ -200,28 +192,31 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( }, ) if r.config.DurableArchivalEnabled() { + retention, err := r.getRetention() + if err != nil { + return err + } + // We schedule the archival task for a random time in the near future to avoid sending a surge of tasks + // to the archival system at the same time delay := backoff.JitDuration(r.config.ArchivalProcessorArchiveDelay(), archivalDelayJitterCoefficient) / 2 if delay > retention { delay = retention } - + // archiveTime is the time when the archival queue recognizes the ArchiveExecutionTask as ready-to-process archiveTime := closeEvent.GetEventTime().Add(delay) - closeTasks = append(closeTasks, &tasks.ArchiveExecutionTask{ + + task := &tasks.ArchiveExecutionTask{ // TaskID is set by the shard WorkflowKey: r.mutableState.GetWorkflowKey(), VisibilityTimestamp: archiveTime, Version: currentVersion, - }) + } + closeTasks = append(closeTasks, task) } else { closeTime := timestamp.TimeValue(closeEvent.GetEventTime()) - retentionJitterDuration := backoff.JitDuration(r.config.RetentionTimerJitterDuration(), 1) / 2 - closeTasks = append(closeTasks, &tasks.DeleteHistoryEventTask{ - // TaskID is set by shard - WorkflowKey: r.mutableState.GetWorkflowKey(), - VisibilityTimestamp: closeTime.Add(retention).Add(retentionJitterDuration), - Version: currentVersion, - BranchToken: branchToken, - }) + if err := r.GenerateDeleteHistoryEventTask(closeTime, false); err != nil { + return err + } } } @@ -229,6 +224,53 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( return nil } +// getRetention returns the retention period for this task generator's workflow execution. +// The retention period represents how long the workflow data should exist in primary storage after the workflow closes. +// If the workflow namespace is not found, the default retention period is returned. +// This method returns an error when the GetNamespaceByID call fails with anything other than +// serviceerror.NamespaceNotFound. +func (r *TaskGeneratorImpl) getRetention() (time.Duration, error) { + retention := defaultWorkflowRetention + executionInfo := r.mutableState.GetExecutionInfo() + namespaceEntry, err := r.namespaceRegistry.GetNamespaceByID(namespace.ID(executionInfo.NamespaceId)) + switch err.(type) { + case nil: + retention = namespaceEntry.Retention() + case *serviceerror.NamespaceNotFound: + // namespace is not accessible, use default value above + default: + return 0, err + } + return retention, nil +} + +// GenerateDeleteHistoryEventTask adds a task to delete all history events for a workflow execution. +// This method only adds the task to the mutable state object in memory; it does not write the task to the database. +// You must call shard.Context#AddTasks to notify the history engine of this task. +func (r *TaskGeneratorImpl) GenerateDeleteHistoryEventTask(closeTime time.Time, workflowDataAlreadyArchived bool) error { + retention, err := r.getRetention() + if err != nil { + return err + } + currentVersion := r.mutableState.GetCurrentVersion() + branchToken, err := r.mutableState.GetCurrentBranchToken() + if err != nil { + return err + } + + retentionJitterDuration := backoff.JitDuration(r.config.RetentionTimerJitterDuration(), 1) / 2 + deleteTime := closeTime.Add(retention).Add(retentionJitterDuration) + r.mutableState.AddTasks(&tasks.DeleteHistoryEventTask{ + // TaskID is set by shard + WorkflowKey: r.mutableState.GetWorkflowKey(), + VisibilityTimestamp: deleteTime, + Version: currentVersion, + BranchToken: branchToken, + WorkflowDataAlreadyArchived: workflowDataAlreadyArchived, + }) + return nil +} + func (r *TaskGeneratorImpl) GenerateDeleteExecutionTask() (*tasks.DeleteExecutionTask, error) { return &tasks.DeleteExecutionTask{ // TaskID, VisibilityTimestamp is set by shard diff --git a/service/history/workflow/task_generator_mock.go b/service/history/workflow/task_generator_mock.go index 7f20a71707e..f528ee3acf8 100644 --- a/service/history/workflow/task_generator_mock.go +++ b/service/history/workflow/task_generator_mock.go @@ -30,6 +30,7 @@ package workflow import ( reflect "reflect" + time "time" gomock "github.com/golang/mock/gomock" history "go.temporal.io/api/history/v1" @@ -144,6 +145,20 @@ func (mr *MockTaskGeneratorMockRecorder) GenerateDeleteExecutionTask() *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateDeleteExecutionTask", reflect.TypeOf((*MockTaskGenerator)(nil).GenerateDeleteExecutionTask)) } +// GenerateDeleteHistoryEventTask mocks base method. +func (m *MockTaskGenerator) GenerateDeleteHistoryEventTask(closeTime time.Time, workflowDataAlreadyArchived bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GenerateDeleteHistoryEventTask", closeTime, workflowDataAlreadyArchived) + ret0, _ := ret[0].(error) + return ret0 +} + +// GenerateDeleteHistoryEventTask indicates an expected call of GenerateDeleteHistoryEventTask. +func (mr *MockTaskGeneratorMockRecorder) GenerateDeleteHistoryEventTask(closeTime, workflowDataAlreadyArchived interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateDeleteHistoryEventTask", reflect.TypeOf((*MockTaskGenerator)(nil).GenerateDeleteHistoryEventTask), closeTime, workflowDataAlreadyArchived) +} + // GenerateHistoryReplicationTasks mocks base method. func (m *MockTaskGenerator) GenerateHistoryReplicationTasks(branchToken []byte, events []*history.HistoryEvent) error { m.ctrl.T.Helper() diff --git a/service/history/workflow/task_generator_test.go b/service/history/workflow/task_generator_test.go index da1936b12e8..fa7e3dc1e8a 100644 --- a/service/history/workflow/task_generator_test.go +++ b/service/history/workflow/task_generator_test.go @@ -194,7 +194,7 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) { mutableState.EXPECT().GetWorkflowKey().Return(definition.NewWorkflowKey( namespaceEntry.ID().String(), tests.WorkflowID, tests.RunID, )).AnyTimes() - mutableState.EXPECT().GetCurrentBranchToken().Return(nil, nil) + mutableState.EXPECT().GetCurrentBranchToken().Return(nil, nil).AnyTimes() retentionTimerDelay := time.Second cfg := &configs.Config{ DurableArchivalEnabled: func() bool { @@ -211,7 +211,7 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) { var allTasks []tasks.Task mutableState.EXPECT().AddTasks(gomock.Any()).Do(func(ts ...tasks.Task) { allTasks = append(allTasks, ts...) - }) + }).AnyTimes() taskGenerator := NewTaskGenerator(namespaceRegistry, mutableState, cfg) err := taskGenerator.GenerateWorkflowCloseTasks(&historypb.HistoryEvent{