From d9791918ecc4b88fdc0f8e665cf3199c38cda12c Mon Sep 17 00:00:00 2001 From: Michael Snowden Date: Fri, 20 Jan 2023 12:39:26 -0800 Subject: [PATCH] Produce archival tasks conditionally --- common/archiver/archivalMetadata.go | 13 +- common/archiver/metadata_mock.go | 109 ++++++++++++++ common/archiver/metadata_mock_test.go | 96 +++++++++++++ common/resourcetest/resourceTest.go | 4 +- service/frontend/workflow_handler_test.go | 2 +- .../history/archival_queue_task_executor.go | 1 + .../transferQueueActiveTaskExecutor_test.go | 7 +- .../transferQueueStandbyTaskExecutor_test.go | 7 +- service/history/workflow/task_generator.go | 20 ++- .../workflow/task_generator_provider.go | 1 + .../history/workflow/task_generator_test.go | 135 ++++++++++++++---- 11 files changed, 361 insertions(+), 34 deletions(-) create mode 100644 common/archiver/metadata_mock.go create mode 100644 common/archiver/metadata_mock_test.go diff --git a/common/archiver/archivalMetadata.go b/common/archiver/archivalMetadata.go index 0e160b78cf0..6e23928f7fc 100644 --- a/common/archiver/archivalMetadata.go +++ b/common/archiver/archivalMetadata.go @@ -146,7 +146,7 @@ func NewArchivalConfig( } } -// NewDisabledArchvialConfig returns a disabled ArchivalConfig +// NewDisabledArchvialConfig returns an ArchivalConfig where archival is disabled for both the cluster and the namespace func NewDisabledArchvialConfig() ArchivalConfig { return &archivalConfig{ staticClusterState: ArchivalDisabled, @@ -157,6 +157,17 @@ func NewDisabledArchvialConfig() ArchivalConfig { } } +// NewEnabledArchivalConfig returns an ArchivalConfig where archival is enabled for both the cluster and the namespace +func NewEnabledArchivalConfig() ArchivalConfig { + return &archivalConfig{ + staticClusterState: ArchivalEnabled, + dynamicClusterState: dynamicconfig.GetStringPropertyFn("enabled"), + enableRead: dynamicconfig.GetBoolPropertyFn(true), + namespaceDefaultState: enumspb.ARCHIVAL_STATE_ENABLED, + namespaceDefaultURI: "some-uri", + } +} + // ClusterConfiguredForArchival returns true if cluster is configured to handle archival, false otherwise func (a *archivalConfig) ClusterConfiguredForArchival() bool { return a.GetClusterState() == ArchivalEnabled diff --git a/common/archiver/metadata_mock.go b/common/archiver/metadata_mock.go new file mode 100644 index 00000000000..f1f5dcdb6ca --- /dev/null +++ b/common/archiver/metadata_mock.go @@ -0,0 +1,109 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package archiver + +import ( + "github.com/golang/mock/gomock" +) + +// MetadataMock is an implementation of ArchivalMetadata that can be used for testing. +// It can be used as a mock, but it also provides default values, which is something that can't be done with +// *MockArchivalMetadata. This cuts down on the amount of boilerplate code needed to write tests. +type MetadataMock interface { + ArchivalMetadata + // EXPECT returns a MetadataMockRecorder which can be used to set expectations on the mock. + EXPECT() MetadataMockRecorder + // SetHistoryEnabledByDefault sets the default history archival config to be enabled. + SetHistoryEnabledByDefault() + // SetVisibilityEnabledByDefault sets the default visibility archival config to be enabled. + SetVisibilityEnabledByDefault() +} + +// NewMetadataMock returns a new MetadataMock which uses the provided controller to create a MockArchivalMetadata +// instance. +func NewMetadataMock(controller *gomock.Controller) MetadataMock { + m := &metadataMock{ + MockArchivalMetadata: NewMockArchivalMetadata(controller), + defaultHistoryConfig: NewDisabledArchvialConfig(), + defaultVisibilityConfig: NewDisabledArchvialConfig(), + } + return m +} + +// MetadataMockRecorder is a wrapper around a ArchivalMetadata mock recorder. +// It is used to determine whether any calls to EXPECT().GetHistoryConfig() or EXPECT().GetVisibilityConfig() were made. +// A call to EXPECT().GetSomeConfig() causes that default config to no longer be used. +type MetadataMockRecorder interface { + GetHistoryConfig() *gomock.Call + GetVisibilityConfig() *gomock.Call +} + +type metadataMock struct { + *MockArchivalMetadata + defaultHistoryConfig ArchivalConfig + defaultVisibilityConfig ArchivalConfig + historyOverwritten bool + visibilityOverwritten bool +} + +func (m *metadataMock) SetHistoryEnabledByDefault() { + m.defaultHistoryConfig = NewEnabledArchivalConfig() +} + +func (m *metadataMock) SetVisibilityEnabledByDefault() { + m.defaultVisibilityConfig = NewEnabledArchivalConfig() +} + +func (m *metadataMock) GetHistoryConfig() ArchivalConfig { + if !m.historyOverwritten { + return m.defaultHistoryConfig + } + return m.MockArchivalMetadata.GetHistoryConfig() +} + +func (m *metadataMock) GetVisibilityConfig() ArchivalConfig { + if !m.visibilityOverwritten { + return m.defaultVisibilityConfig + } + return m.MockArchivalMetadata.GetVisibilityConfig() +} + +func (m *metadataMock) EXPECT() MetadataMockRecorder { + return metadataMockRecorder{m} +} + +type metadataMockRecorder struct { + *metadataMock +} + +func (r metadataMockRecorder) GetHistoryConfig() *gomock.Call { + r.metadataMock.historyOverwritten = true + return r.MockArchivalMetadata.EXPECT().GetHistoryConfig() +} + +func (r metadataMockRecorder) GetVisibilityConfig() *gomock.Call { + r.metadataMock.visibilityOverwritten = true + return r.MockArchivalMetadata.EXPECT().GetVisibilityConfig() +} diff --git a/common/archiver/metadata_mock_test.go b/common/archiver/metadata_mock_test.go new file mode 100644 index 00000000000..e2958a80c68 --- /dev/null +++ b/common/archiver/metadata_mock_test.go @@ -0,0 +1,96 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package archiver + +import ( + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +func TestMetadataMock(t *testing.T) { + t.Run("GetHistoryConfig", func(t *testing.T) { + metadata := NewMetadataMock(gomock.NewController(t)) + config := metadata.GetHistoryConfig() + + assert.False(t, config.ClusterConfiguredForArchival()) + }) + t.Run("GetVisibilityConfig", func(t *testing.T) { + metadata := NewMetadataMock(gomock.NewController(t)) + config := metadata.GetVisibilityConfig() + + assert.False(t, config.ClusterConfiguredForArchival()) + }) + t.Run("GetHistoryConfig_SetHistoryEnabledByDefault", func(t *testing.T) { + metadata := NewMetadataMock(gomock.NewController(t)) + metadata.SetHistoryEnabledByDefault() + config := metadata.GetHistoryConfig() + + assert.True(t, config.ClusterConfiguredForArchival()) + + metadata.EXPECT().GetHistoryConfig().Return(NewDisabledArchvialConfig()) + config = metadata.GetHistoryConfig() + + assert.False(t, config.ClusterConfiguredForArchival()) + }) + t.Run("GetVisibilityConfig_SetVisibilityEnabledByDefault", func(t *testing.T) { + metadata := NewMetadataMock(gomock.NewController(t)) + metadata.SetVisibilityEnabledByDefault() + config := metadata.GetVisibilityConfig() + + assert.True(t, config.ClusterConfiguredForArchival()) + + metadata.EXPECT().GetVisibilityConfig().Return(NewDisabledArchvialConfig()) + config = metadata.GetVisibilityConfig() + + assert.False(t, config.ClusterConfiguredForArchival()) + }) + t.Run("EXPECT_GetHistoryConfig", func(t *testing.T) { + metadata := NewMetadataMock(gomock.NewController(t)) + metadata.EXPECT().GetHistoryConfig().Return(NewEnabledArchivalConfig()) + config := metadata.GetHistoryConfig() + + assert.True(t, config.ClusterConfiguredForArchival()) + + metadata.EXPECT().GetHistoryConfig().Return(NewDisabledArchvialConfig()) + config = metadata.GetHistoryConfig() + + assert.False(t, config.ClusterConfiguredForArchival()) + }) + + t.Run("EXPECT_GetVisibilityConfig", func(t *testing.T) { + metadata := NewMetadataMock(gomock.NewController(t)) + metadata.EXPECT().GetVisibilityConfig().Return(NewEnabledArchivalConfig()) + config := metadata.GetVisibilityConfig() + + assert.True(t, config.ClusterConfiguredForArchival()) + + metadata.EXPECT().GetVisibilityConfig().Return(NewDisabledArchvialConfig()) + config = metadata.GetVisibilityConfig() + + assert.False(t, config.ClusterConfiguredForArchival()) + }) +} diff --git a/common/resourcetest/resourceTest.go b/common/resourcetest/resourceTest.go index 93eae2f6784..2083edf94f8 100644 --- a/common/resourcetest/resourceTest.go +++ b/common/resourcetest/resourceTest.go @@ -74,7 +74,7 @@ type ( TimeSource clock.TimeSource PayloadSerializer serialization.Serializer MetricsHandler metrics.Handler - ArchivalMetadata *archiver.MockArchivalMetadata + ArchivalMetadata archiver.MetadataMock ArchiverProvider *provider.MockArchiverProvider // membership infos @@ -184,7 +184,7 @@ func NewTest( TimeSource: clock.NewRealTimeSource(), PayloadSerializer: serialization.NewSerializer(), MetricsHandler: metricsHandler, - ArchivalMetadata: archiver.NewMockArchivalMetadata(controller), + ArchivalMetadata: archiver.NewMetadataMock(controller), ArchiverProvider: provider.NewMockArchiverProvider(controller), // membership infos diff --git a/service/frontend/workflow_handler_test.go b/service/frontend/workflow_handler_test.go index 71ff798ef7d..18e8bde4b30 100644 --- a/service/frontend/workflow_handler_test.go +++ b/service/frontend/workflow_handler_test.go @@ -106,7 +106,7 @@ type ( mockMetadataMgr *persistence.MockMetadataManager mockExecutionManager *persistence.MockExecutionManager mockVisibilityMgr *manager.MockVisibilityManager - mockArchivalMetadata *archiver.MockArchivalMetadata + mockArchivalMetadata archiver.MetadataMock mockArchiverProvider *provider.MockArchiverProvider mockHistoryArchiver *archiver.MockHistoryArchiver mockVisibilityArchiver *archiver.MockVisibilityArchiver diff --git a/service/history/archival_queue_task_executor.go b/service/history/archival_queue_task_executor.go index a4d7ba93bc6..4857da98fce 100644 --- a/service/history/archival_queue_task_executor.go +++ b/service/history/archival_queue_task_executor.go @@ -246,6 +246,7 @@ func (e *archivalQueueTaskExecutor) addDeletionTask( e.shardContext.GetNamespaceRegistry(), mutableState, e.shardContext.GetConfig(), + e.shardContext.GetArchivalMetadata(), ) err = taskGenerator.GenerateDeleteHistoryEventTask(*closeTime, true) if err != nil { diff --git a/service/history/transferQueueActiveTaskExecutor_test.go b/service/history/transferQueueActiveTaskExecutor_test.go index 799069d005e..25f9d5b2b1f 100644 --- a/service/history/transferQueueActiveTaskExecutor_test.go +++ b/service/history/transferQueueActiveTaskExecutor_test.go @@ -51,7 +51,6 @@ import ( "go.temporal.io/server/api/matchingservicemock/v1" persistencespb "go.temporal.io/server/api/persistence/v1" workflowspb "go.temporal.io/server/api/workflow/v1" - "go.temporal.io/server/common" "go.temporal.io/server/common/archiver" "go.temporal.io/server/common/archiver/provider" @@ -102,7 +101,7 @@ type ( mockExecutionMgr *persistence.MockExecutionManager mockArchivalClient *warchiver.MockClient - mockArchivalMetadata *archiver.MockArchivalMetadata + mockArchivalMetadata archiver.MetadataMock mockArchiverProvider *provider.MockArchiverProvider mockParentClosePolicyClient *parentclosepolicy.MockClient @@ -207,6 +206,8 @@ func (s *transferQueueActiveTaskExecutorSuite) SetupTest() { s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes() s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes() s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.version).Return(s.mockClusterMetadata.GetCurrentClusterName()).AnyTimes() + s.mockArchivalMetadata.SetHistoryEnabledByDefault() + s.mockArchivalMetadata.SetVisibilityEnabledByDefault() s.workflowCache = wcache.NewCache(s.mockShard) s.logger = s.mockShard.GetLogger() @@ -800,7 +801,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_CanSkip dc.GetBoolPropertyFn(true), "disabled", "random URI", - )) + )).AnyTimes() s.mockArchivalClient.EXPECT().Archive(gomock.Any(), gomock.Any()).Return(nil, nil) s.mockSearchAttributesProvider.EXPECT().GetSearchAttributes(gomock.Any(), false) } diff --git a/service/history/transferQueueStandbyTaskExecutor_test.go b/service/history/transferQueueStandbyTaskExecutor_test.go index 4ba67883c10..26254d63cd0 100644 --- a/service/history/transferQueueStandbyTaskExecutor_test.go +++ b/service/history/transferQueueStandbyTaskExecutor_test.go @@ -95,7 +95,7 @@ type ( mockExecutionMgr *persistence.MockExecutionManager mockArchivalClient *warchiver.MockClient - mockArchivalMetadata *archiver.MockArchivalMetadata + mockArchivalMetadata archiver.MetadataMock mockArchiverProvider *provider.MockArchiverProvider workflowCache wcache.Cache @@ -194,6 +194,9 @@ func (s *transferQueueStandbyTaskExecutorSuite) SetupTest() { s.workflowCache = wcache.NewCache(s.mockShard) s.logger = s.mockShard.GetLogger() + s.mockArchivalMetadata.SetHistoryEnabledByDefault() + s.mockArchivalMetadata.SetVisibilityEnabledByDefault() + h := &historyEngineImpl{ currentClusterName: s.mockShard.Resource.GetClusterMetadata().GetCurrentClusterName(), shard: s.mockShard, @@ -745,7 +748,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessCloseExecution_CanSki "disabled", "random URI", ), - ) + ).AnyTimes() s.mockArchivalClient.EXPECT().Archive(gomock.Any(), gomock.Any()).Return(nil, nil) s.mockSearchAttributesProvider.EXPECT().GetSearchAttributes(gomock.Any(), false) } diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index e4421e8eeb0..5f23601c2b9 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -35,6 +35,7 @@ import ( "go.temporal.io/api/serviceerror" enumsspb "go.temporal.io/server/api/enums/v1" + "go.temporal.io/server/common/archiver" "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence/versionhistory" @@ -108,6 +109,7 @@ type ( namespaceRegistry namespace.Registry mutableState MutableState config *configs.Config + archivalMetadata archiver.ArchivalMetadata } ) @@ -119,11 +121,13 @@ func NewTaskGenerator( namespaceRegistry namespace.Registry, mutableState MutableState, config *configs.Config, + archivalMetadata archiver.ArchivalMetadata, ) *TaskGeneratorImpl { return &TaskGeneratorImpl{ namespaceRegistry: namespaceRegistry, mutableState: mutableState, config: config, + archivalMetadata: archivalMetadata, } } @@ -188,7 +192,7 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( Version: currentVersion, }, ) - if r.config.DurableArchivalEnabled() { + if r.archivalQueueEnabled() { retention, err := r.getRetention() if err != nil { return err @@ -636,3 +640,17 @@ func (r *TaskGeneratorImpl) getTargetNamespaceID( return namespace.ID(r.mutableState.GetExecutionInfo().NamespaceId), nil } + +// archivalQueueEnabled returns true if archival is enabled for either history or visibility, and the archival queue +// itself is also enabled. +// For both history and visibility, we check that archival is enabled for both the cluster and the namespace. +func (r *TaskGeneratorImpl) archivalQueueEnabled() bool { + if !r.config.DurableArchivalEnabled() { + return false + } + namespaceEntry := r.mutableState.GetNamespaceEntry() + return r.archivalMetadata.GetHistoryConfig().ClusterConfiguredForArchival() && + namespaceEntry.HistoryArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED || + r.archivalMetadata.GetVisibilityConfig().ClusterConfiguredForArchival() && + namespaceEntry.VisibilityArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED +} diff --git a/service/history/workflow/task_generator_provider.go b/service/history/workflow/task_generator_provider.go index 2d3beb7a97f..51b1684d84e 100644 --- a/service/history/workflow/task_generator_provider.go +++ b/service/history/workflow/task_generator_provider.go @@ -56,5 +56,6 @@ func (p *taskGeneratorProviderImpl) NewTaskGenerator( shard.GetNamespaceRegistry(), mutableState, shard.GetConfig(), + shard.GetArchivalMetadata(), ) } diff --git a/service/history/workflow/task_generator_test.go b/service/history/workflow/task_generator_test.go index 9f81ba8cd24..7ba0acdf118 100644 --- a/service/history/workflow/task_generator_test.go +++ b/service/history/workflow/task_generator_test.go @@ -55,9 +55,12 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/archiver" + "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/definition" "go.temporal.io/server/common/log" "go.temporal.io/server/common/namespace" @@ -73,12 +76,16 @@ type testConfig struct { } type testParams struct { - DurableArchivalEnabled bool - DeleteAfterClose bool - CloseEventTime time.Time - Retention time.Duration - Logger *log.MockLogger - ArchivalProcessorArchiveDelay time.Duration + DurableArchivalEnabled bool + DeleteAfterClose bool + CloseEventTime time.Time + Retention time.Duration + Logger *log.MockLogger + ArchivalProcessorArchiveDelay time.Duration + HistoryArchivalEnabledInCluster bool + HistoryArchivalEnabledInNamespace bool + VisibilityArchivalEnabledForCluster bool + VisibilityArchivalEnabledInNamespace bool ExpectCloseExecutionVisibilityTask bool ExpectArchiveExecutionTask bool @@ -104,15 +111,6 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) { p.ExpectArchiveExecutionTask = true }, }, - { - Name: "delete after close", - ConfigFn: func(p *testParams) { - p.DurableArchivalEnabled = true - - p.ExpectCloseExecutionVisibilityTask = true - p.ExpectArchiveExecutionTask = true - }, - }, { Name: "delete after close ignores durable execution flag", ConfigFn: func(p *testParams) { @@ -159,19 +157,68 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) { p.ExpectArchiveExecutionTask = true }, }, + { + Name: "history archival disabled", + ConfigFn: func(p *testParams) { + p.DurableArchivalEnabled = true + p.HistoryArchivalEnabledInCluster = false + p.HistoryArchivalEnabledInNamespace = false + + p.ExpectCloseExecutionVisibilityTask = true + p.ExpectArchiveExecutionTask = true + }, + }, + { + Name: "visibility archival disabled", + ConfigFn: func(p *testParams) { + p.DurableArchivalEnabled = true + p.VisibilityArchivalEnabledForCluster = false + p.VisibilityArchivalEnabledInNamespace = false + + p.ExpectCloseExecutionVisibilityTask = true + p.ExpectArchiveExecutionTask = true + }, + }, + { + Name: "archival disabled in cluster", + ConfigFn: func(p *testParams) { + p.DurableArchivalEnabled = true + p.HistoryArchivalEnabledInCluster = false + p.VisibilityArchivalEnabledForCluster = false + + p.ExpectCloseExecutionVisibilityTask = true + p.ExpectDeleteHistoryEventTask = true + p.ExpectArchiveExecutionTask = false + }, + }, + { + Name: "archival disabled in namespace", + ConfigFn: func(p *testParams) { + p.DurableArchivalEnabled = true + p.HistoryArchivalEnabledInNamespace = false + p.VisibilityArchivalEnabledInNamespace = false + + p.ExpectCloseExecutionVisibilityTask = true + p.ExpectDeleteHistoryEventTask = true + p.ExpectArchiveExecutionTask = false + }, + }, } { c := c t.Run(c.Name, func(t *testing.T) { - // t.Parallel() now := time.Unix(0, 0).UTC() ctrl := gomock.NewController(t) mockLogger := log.NewMockLogger(ctrl) p := testParams{ - DurableArchivalEnabled: false, - DeleteAfterClose: false, - CloseEventTime: now, - Retention: time.Hour * 24 * 7, - Logger: mockLogger, + DurableArchivalEnabled: false, + DeleteAfterClose: false, + CloseEventTime: now, + Retention: time.Hour * 24 * 7, + Logger: mockLogger, + HistoryArchivalEnabledInCluster: true, + HistoryArchivalEnabledInNamespace: true, + VisibilityArchivalEnabledForCluster: true, + VisibilityArchivalEnabledInNamespace: true, ExpectCloseExecutionVisibilityTask: false, ExpectArchiveExecutionTask: false, @@ -180,11 +227,39 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) { } c.ConfigFn(&p) namespaceRegistry := namespace.NewMockRegistry(ctrl) - namespaceEntry := tests.GlobalNamespaceEntry.Clone(namespace.WithRetention(&p.Retention)) + + namespaceConfig := &persistence.NamespaceConfig{ + Retention: &p.Retention, + HistoryArchivalUri: "test:///history/archival/", + VisibilityArchivalUri: "test:///visibility/archival", + } + if p.HistoryArchivalEnabledInNamespace { + namespaceConfig.HistoryArchivalState = enums.ARCHIVAL_STATE_ENABLED + } else { + namespaceConfig.HistoryArchivalState = enums.ARCHIVAL_STATE_DISABLED + } + if p.VisibilityArchivalEnabledInNamespace { + namespaceConfig.VisibilityArchivalState = enums.ARCHIVAL_STATE_ENABLED + } else { + namespaceConfig.VisibilityArchivalState = enums.ARCHIVAL_STATE_DISABLED + } + namespaceEntry := namespace.NewGlobalNamespaceForTest( + &persistence.NamespaceInfo{Id: tests.NamespaceID.String(), Name: tests.Namespace.String()}, + namespaceConfig, + &persistence.NamespaceReplicationConfig{ + ActiveClusterName: cluster.TestCurrentClusterName, + Clusters: []string{ + cluster.TestCurrentClusterName, + cluster.TestAlternativeClusterName, + }, + }, + tests.Version, + ) namespaceRegistry.EXPECT().GetNamespaceID(gomock.Any()).Return(namespaceEntry.ID(), nil).AnyTimes() namespaceRegistry.EXPECT().GetNamespaceByID(namespaceEntry.ID()).Return(namespaceEntry, nil).AnyTimes() mutableState := NewMockMutableState(ctrl) + mutableState.EXPECT().GetNamespaceEntry().Return(namespaceEntry).AnyTimes() mutableState.EXPECT().GetCurrentVersion().Return(int64(0)).AnyTimes() mutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{ NamespaceId: namespaceEntry.ID().String(), @@ -211,7 +286,19 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) { allTasks = append(allTasks, ts...) }).AnyTimes() - taskGenerator := NewTaskGenerator(namespaceRegistry, mutableState, cfg) + archivalMetadata := archiver.NewMockArchivalMetadata(ctrl) + archivalMetadata.EXPECT().GetHistoryConfig().DoAndReturn(func() archiver.ArchivalConfig { + cfg := archiver.NewMockArchivalConfig(ctrl) + cfg.EXPECT().ClusterConfiguredForArchival().Return(p.HistoryArchivalEnabledInCluster).AnyTimes() + return cfg + }).AnyTimes() + archivalMetadata.EXPECT().GetVisibilityConfig().DoAndReturn(func() archiver.ArchivalConfig { + cfg := archiver.NewMockArchivalConfig(ctrl) + cfg.EXPECT().ClusterConfiguredForArchival().Return(p.VisibilityArchivalEnabledForCluster).AnyTimes() + return cfg + }).AnyTimes() + + taskGenerator := NewTaskGenerator(namespaceRegistry, mutableState, cfg, archivalMetadata) err := taskGenerator.GenerateWorkflowCloseTasks(&historypb.HistoryEvent{ Attributes: &historypb.HistoryEvent_WorkflowExecutionCompletedEventAttributes{ WorkflowExecutionCompletedEventAttributes: &historypb.WorkflowExecutionCompletedEventAttributes{}, @@ -242,7 +329,7 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) { assert.Equal(t, p.DeleteAfterClose, closeExecutionTask.DeleteAfterClose) assert.Equal( t, - p.DurableArchivalEnabled && !p.DeleteAfterClose, + p.ExpectArchiveExecutionTask, closeExecutionTask.CanSkipVisibilityArchival, )