Skip to content

Commit

Permalink
Produce archival tasks conditionally (#3823)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden authored Jan 24, 2023
1 parent b9bba94 commit 554b32d
Show file tree
Hide file tree
Showing 11 changed files with 361 additions and 34 deletions.
13 changes: 12 additions & 1 deletion common/archiver/archivalMetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func NewArchivalConfig(
}
}

// NewDisabledArchvialConfig returns a disabled ArchivalConfig
// NewDisabledArchvialConfig returns an ArchivalConfig where archival is disabled for both the cluster and the namespace
func NewDisabledArchvialConfig() ArchivalConfig {
return &archivalConfig{
staticClusterState: ArchivalDisabled,
Expand All @@ -157,6 +157,17 @@ func NewDisabledArchvialConfig() ArchivalConfig {
}
}

// NewEnabledArchivalConfig returns an ArchivalConfig where archival is enabled for both the cluster and the namespace
func NewEnabledArchivalConfig() ArchivalConfig {
return &archivalConfig{
staticClusterState: ArchivalEnabled,
dynamicClusterState: dynamicconfig.GetStringPropertyFn("enabled"),
enableRead: dynamicconfig.GetBoolPropertyFn(true),
namespaceDefaultState: enumspb.ARCHIVAL_STATE_ENABLED,
namespaceDefaultURI: "some-uri",
}
}

// ClusterConfiguredForArchival returns true if cluster is configured to handle archival, false otherwise
func (a *archivalConfig) ClusterConfiguredForArchival() bool {
return a.GetClusterState() == ArchivalEnabled
Expand Down
109 changes: 109 additions & 0 deletions common/archiver/metadata_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package archiver

import (
"github.com/golang/mock/gomock"
)

// MetadataMock is an implementation of ArchivalMetadata that can be used for testing.
// It can be used as a mock, but it also provides default values, which is something that can't be done with
// *MockArchivalMetadata. This cuts down on the amount of boilerplate code needed to write tests.
type MetadataMock interface {
ArchivalMetadata
// EXPECT returns a MetadataMockRecorder which can be used to set expectations on the mock.
EXPECT() MetadataMockRecorder
// SetHistoryEnabledByDefault sets the default history archival config to be enabled.
SetHistoryEnabledByDefault()
// SetVisibilityEnabledByDefault sets the default visibility archival config to be enabled.
SetVisibilityEnabledByDefault()
}

// NewMetadataMock returns a new MetadataMock which uses the provided controller to create a MockArchivalMetadata
// instance.
func NewMetadataMock(controller *gomock.Controller) MetadataMock {
m := &metadataMock{
MockArchivalMetadata: NewMockArchivalMetadata(controller),
defaultHistoryConfig: NewDisabledArchvialConfig(),
defaultVisibilityConfig: NewDisabledArchvialConfig(),
}
return m
}

// MetadataMockRecorder is a wrapper around a ArchivalMetadata mock recorder.
// It is used to determine whether any calls to EXPECT().GetHistoryConfig() or EXPECT().GetVisibilityConfig() were made.
// A call to EXPECT().GetSomeConfig() causes that default config to no longer be used.
type MetadataMockRecorder interface {
GetHistoryConfig() *gomock.Call
GetVisibilityConfig() *gomock.Call
}

type metadataMock struct {
*MockArchivalMetadata
defaultHistoryConfig ArchivalConfig
defaultVisibilityConfig ArchivalConfig
historyOverwritten bool
visibilityOverwritten bool
}

func (m *metadataMock) SetHistoryEnabledByDefault() {
m.defaultHistoryConfig = NewEnabledArchivalConfig()
}

func (m *metadataMock) SetVisibilityEnabledByDefault() {
m.defaultVisibilityConfig = NewEnabledArchivalConfig()
}

func (m *metadataMock) GetHistoryConfig() ArchivalConfig {
if !m.historyOverwritten {
return m.defaultHistoryConfig
}
return m.MockArchivalMetadata.GetHistoryConfig()
}

func (m *metadataMock) GetVisibilityConfig() ArchivalConfig {
if !m.visibilityOverwritten {
return m.defaultVisibilityConfig
}
return m.MockArchivalMetadata.GetVisibilityConfig()
}

func (m *metadataMock) EXPECT() MetadataMockRecorder {
return metadataMockRecorder{m}
}

type metadataMockRecorder struct {
*metadataMock
}

func (r metadataMockRecorder) GetHistoryConfig() *gomock.Call {
r.metadataMock.historyOverwritten = true
return r.MockArchivalMetadata.EXPECT().GetHistoryConfig()
}

func (r metadataMockRecorder) GetVisibilityConfig() *gomock.Call {
r.metadataMock.visibilityOverwritten = true
return r.MockArchivalMetadata.EXPECT().GetVisibilityConfig()
}
96 changes: 96 additions & 0 deletions common/archiver/metadata_mock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package archiver

import (
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
)

func TestMetadataMock(t *testing.T) {
t.Run("GetHistoryConfig", func(t *testing.T) {
metadata := NewMetadataMock(gomock.NewController(t))
config := metadata.GetHistoryConfig()

assert.False(t, config.ClusterConfiguredForArchival())
})
t.Run("GetVisibilityConfig", func(t *testing.T) {
metadata := NewMetadataMock(gomock.NewController(t))
config := metadata.GetVisibilityConfig()

assert.False(t, config.ClusterConfiguredForArchival())
})
t.Run("GetHistoryConfig_SetHistoryEnabledByDefault", func(t *testing.T) {
metadata := NewMetadataMock(gomock.NewController(t))
metadata.SetHistoryEnabledByDefault()
config := metadata.GetHistoryConfig()

assert.True(t, config.ClusterConfiguredForArchival())

metadata.EXPECT().GetHistoryConfig().Return(NewDisabledArchvialConfig())
config = metadata.GetHistoryConfig()

assert.False(t, config.ClusterConfiguredForArchival())
})
t.Run("GetVisibilityConfig_SetVisibilityEnabledByDefault", func(t *testing.T) {
metadata := NewMetadataMock(gomock.NewController(t))
metadata.SetVisibilityEnabledByDefault()
config := metadata.GetVisibilityConfig()

assert.True(t, config.ClusterConfiguredForArchival())

metadata.EXPECT().GetVisibilityConfig().Return(NewDisabledArchvialConfig())
config = metadata.GetVisibilityConfig()

assert.False(t, config.ClusterConfiguredForArchival())
})
t.Run("EXPECT_GetHistoryConfig", func(t *testing.T) {
metadata := NewMetadataMock(gomock.NewController(t))
metadata.EXPECT().GetHistoryConfig().Return(NewEnabledArchivalConfig())
config := metadata.GetHistoryConfig()

assert.True(t, config.ClusterConfiguredForArchival())

metadata.EXPECT().GetHistoryConfig().Return(NewDisabledArchvialConfig())
config = metadata.GetHistoryConfig()

assert.False(t, config.ClusterConfiguredForArchival())
})

t.Run("EXPECT_GetVisibilityConfig", func(t *testing.T) {
metadata := NewMetadataMock(gomock.NewController(t))
metadata.EXPECT().GetVisibilityConfig().Return(NewEnabledArchivalConfig())
config := metadata.GetVisibilityConfig()

assert.True(t, config.ClusterConfiguredForArchival())

metadata.EXPECT().GetVisibilityConfig().Return(NewDisabledArchvialConfig())
config = metadata.GetVisibilityConfig()

assert.False(t, config.ClusterConfiguredForArchival())
})
}
4 changes: 2 additions & 2 deletions common/resourcetest/resourceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type (
TimeSource clock.TimeSource
PayloadSerializer serialization.Serializer
MetricsHandler metrics.Handler
ArchivalMetadata *archiver.MockArchivalMetadata
ArchivalMetadata archiver.MetadataMock
ArchiverProvider *provider.MockArchiverProvider

// membership infos
Expand Down Expand Up @@ -184,7 +184,7 @@ func NewTest(
TimeSource: clock.NewRealTimeSource(),
PayloadSerializer: serialization.NewSerializer(),
MetricsHandler: metricsHandler,
ArchivalMetadata: archiver.NewMockArchivalMetadata(controller),
ArchivalMetadata: archiver.NewMetadataMock(controller),
ArchiverProvider: provider.NewMockArchiverProvider(controller),

// membership infos
Expand Down
2 changes: 1 addition & 1 deletion service/frontend/workflow_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ type (
mockMetadataMgr *persistence.MockMetadataManager
mockExecutionManager *persistence.MockExecutionManager
mockVisibilityMgr *manager.MockVisibilityManager
mockArchivalMetadata *archiver.MockArchivalMetadata
mockArchivalMetadata archiver.MetadataMock
mockArchiverProvider *provider.MockArchiverProvider
mockHistoryArchiver *archiver.MockHistoryArchiver
mockVisibilityArchiver *archiver.MockVisibilityArchiver
Expand Down
1 change: 1 addition & 0 deletions service/history/archival_queue_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ func (e *archivalQueueTaskExecutor) addDeletionTask(
e.shardContext.GetNamespaceRegistry(),
mutableState,
e.shardContext.GetConfig(),
e.shardContext.GetArchivalMetadata(),
)
err = taskGenerator.GenerateDeleteHistoryEventTask(*closeTime, true)
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions service/history/transferQueueActiveTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"go.temporal.io/server/api/matchingservicemock/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
workflowspb "go.temporal.io/server/api/workflow/v1"

"go.temporal.io/server/common"
"go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/archiver/provider"
Expand Down Expand Up @@ -102,7 +101,7 @@ type (

mockExecutionMgr *persistence.MockExecutionManager
mockArchivalClient *warchiver.MockClient
mockArchivalMetadata *archiver.MockArchivalMetadata
mockArchivalMetadata archiver.MetadataMock
mockArchiverProvider *provider.MockArchiverProvider
mockParentClosePolicyClient *parentclosepolicy.MockClient

Expand Down Expand Up @@ -207,6 +206,8 @@ func (s *transferQueueActiveTaskExecutorSuite) SetupTest() {
s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes()
s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.version).Return(s.mockClusterMetadata.GetCurrentClusterName()).AnyTimes()
s.mockArchivalMetadata.SetHistoryEnabledByDefault()
s.mockArchivalMetadata.SetVisibilityEnabledByDefault()

s.workflowCache = wcache.NewCache(s.mockShard)
s.logger = s.mockShard.GetLogger()
Expand Down Expand Up @@ -800,7 +801,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_CanSkip
dc.GetBoolPropertyFn(true),
"disabled",
"random URI",
))
)).AnyTimes()
s.mockArchivalClient.EXPECT().Archive(gomock.Any(), gomock.Any()).Return(nil, nil)
s.mockSearchAttributesProvider.EXPECT().GetSearchAttributes(gomock.Any(), false)
}
Expand Down
7 changes: 5 additions & 2 deletions service/history/transferQueueStandbyTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type (

mockExecutionMgr *persistence.MockExecutionManager
mockArchivalClient *warchiver.MockClient
mockArchivalMetadata *archiver.MockArchivalMetadata
mockArchivalMetadata archiver.MetadataMock
mockArchiverProvider *provider.MockArchiverProvider

workflowCache wcache.Cache
Expand Down Expand Up @@ -194,6 +194,9 @@ func (s *transferQueueStandbyTaskExecutorSuite) SetupTest() {
s.workflowCache = wcache.NewCache(s.mockShard)
s.logger = s.mockShard.GetLogger()

s.mockArchivalMetadata.SetHistoryEnabledByDefault()
s.mockArchivalMetadata.SetVisibilityEnabledByDefault()

h := &historyEngineImpl{
currentClusterName: s.mockShard.Resource.GetClusterMetadata().GetCurrentClusterName(),
shard: s.mockShard,
Expand Down Expand Up @@ -745,7 +748,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessCloseExecution_CanSki
"disabled",
"random URI",
),
)
).AnyTimes()
s.mockArchivalClient.EXPECT().Archive(gomock.Any(), gomock.Any()).Return(nil, nil)
s.mockSearchAttributesProvider.EXPECT().GetSearchAttributes(gomock.Any(), false)
}
Expand Down
20 changes: 19 additions & 1 deletion service/history/workflow/task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.temporal.io/api/serviceerror"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/versionhistory"
Expand Down Expand Up @@ -108,6 +109,7 @@ type (
namespaceRegistry namespace.Registry
mutableState MutableState
config *configs.Config
archivalMetadata archiver.ArchivalMetadata
}
)

Expand All @@ -119,11 +121,13 @@ func NewTaskGenerator(
namespaceRegistry namespace.Registry,
mutableState MutableState,
config *configs.Config,
archivalMetadata archiver.ArchivalMetadata,
) *TaskGeneratorImpl {
return &TaskGeneratorImpl{
namespaceRegistry: namespaceRegistry,
mutableState: mutableState,
config: config,
archivalMetadata: archivalMetadata,
}
}

Expand Down Expand Up @@ -188,7 +192,7 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
Version: currentVersion,
},
)
if r.config.DurableArchivalEnabled() {
if r.archivalQueueEnabled() {
retention, err := r.getRetention()
if err != nil {
return err
Expand Down Expand Up @@ -636,3 +640,17 @@ func (r *TaskGeneratorImpl) getTargetNamespaceID(

return namespace.ID(r.mutableState.GetExecutionInfo().NamespaceId), nil
}

// archivalQueueEnabled returns true if archival is enabled for either history or visibility, and the archival queue
// itself is also enabled.
// For both history and visibility, we check that archival is enabled for both the cluster and the namespace.
func (r *TaskGeneratorImpl) archivalQueueEnabled() bool {
if !r.config.DurableArchivalEnabled() {
return false
}
namespaceEntry := r.mutableState.GetNamespaceEntry()
return r.archivalMetadata.GetHistoryConfig().ClusterConfiguredForArchival() &&
namespaceEntry.HistoryArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED ||
r.archivalMetadata.GetVisibilityConfig().ClusterConfiguredForArchival() &&
namespaceEntry.VisibilityArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED
}
1 change: 1 addition & 0 deletions service/history/workflow/task_generator_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,6 @@ func (p *taskGeneratorProviderImpl) NewTaskGenerator(
shard.GetNamespaceRegistry(),
mutableState,
shard.GetConfig(),
shard.GetArchivalMetadata(),
)
}
Loading

0 comments on commit 554b32d

Please sign in to comment.