Skip to content

Commit

Permalink
Suport dual visibility in visibility persistence checks (#3968)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin authored Feb 17, 2023
1 parent 93fb0ec commit 06188f3
Show file tree
Hide file tree
Showing 18 changed files with 141 additions and 62 deletions.
14 changes: 12 additions & 2 deletions common/persistence/visibility/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,18 @@ func DefaultAdvancedVisibilityWritingMode(advancedVisibilityConfigExist bool) st
return AdvancedVisibilityWritingModeOff
}

func AllowListForValidation(pluginName string) bool {
switch pluginName {
func AllowListForValidation(storeNames []string) bool {
if len(storeNames) == 0 {
return false
}

if len(storeNames) > 1 {
// If more than one store is configured then it means that dual visibility is enabled.
// Dual visibility is used for migration to advanced, don't allow list of values because it will be removed soon.
return false
}

switch storeNames[0] {
case mysql.PluginNameV8, postgresql.PluginNameV12, sqlite.PluginName:
// Advanced visibility with SQL DB don't support list of values
return false
Expand Down
4 changes: 3 additions & 1 deletion common/persistence/visibility/manager/visibility_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ type (
// VisibilityManager is used to manage the visibility store
VisibilityManager interface {
persistence.Closeable
GetName() string
GetReadStoreName(nsName namespace.Name) string
GetStoreNames() []string
HasStoreName(stName string) bool
GetIndexName() string

// Write APIs.
Expand Down
41 changes: 35 additions & 6 deletions common/persistence/visibility/manager/visibility_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 16 additions & 3 deletions common/persistence/visibility/visibility_manager_dual.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ package visibility

import (
"context"
"strings"

"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/visibility/manager"
)

Expand Down Expand Up @@ -60,8 +60,21 @@ func (v *visibilityManagerDual) Close() {
v.secondaryVisibilityManager.Close()
}

func (v *visibilityManagerDual) GetName() string {
return strings.Join([]string{v.visibilityManager.GetName(), v.secondaryVisibilityManager.GetName()}, ",")
func (v *visibilityManagerDual) GetReadStoreName(nsName namespace.Name) string {
return v.managerSelector.readManager(nsName).GetReadStoreName(nsName)
}

func (v *visibilityManagerDual) GetStoreNames() []string {
return append(v.visibilityManager.GetStoreNames(), v.secondaryVisibilityManager.GetStoreNames()...)
}

func (v *visibilityManagerDual) HasStoreName(stName string) bool {
for _, sn := range v.GetStoreNames() {
if sn == stName {
return true
}
}
return false
}

func (v *visibilityManagerDual) GetIndexName() string {
Expand Down
11 changes: 10 additions & 1 deletion common/persistence/visibility/visibility_manager_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
workflowpb "go.temporal.io/api/workflow/v1"

"go.temporal.io/server/common/log"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/persistence/visibility/store"
)
Expand Down Expand Up @@ -72,10 +73,18 @@ func (p *visibilityManagerImpl) Close() {
p.store.Close()
}

func (p *visibilityManagerImpl) GetName() string {
func (p *visibilityManagerImpl) GetReadStoreName(_ namespace.Name) string {
return p.store.GetName()
}

func (p *visibilityManagerImpl) GetStoreNames() []string {
return []string{p.store.GetName()}
}

func (p *visibilityManagerImpl) HasStoreName(stName string) bool {
return p.store.GetName() == stName
}

func (p *visibilityManagerImpl) GetIndexName() string {
return p.store.GetIndexName()
}
Expand Down
13 changes: 11 additions & 2 deletions common/persistence/visibility/visibility_manager_rate_limited.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"context"

"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/quotas"
Expand Down Expand Up @@ -63,8 +64,16 @@ func (m *visibilityManagerRateLimited) Close() {
m.delegate.Close()
}

func (m *visibilityManagerRateLimited) GetName() string {
return m.delegate.GetName()
func (m *visibilityManagerRateLimited) GetReadStoreName(nsName namespace.Name) string {
return m.delegate.GetReadStoreName(nsName)
}

func (m *visibilityManagerRateLimited) GetStoreNames() []string {
return m.delegate.GetStoreNames()
}

func (m *visibilityManagerRateLimited) HasStoreName(stName string) bool {
return m.delegate.HasStoreName(stName)
}

func (m *visibilityManagerRateLimited) GetIndexName() string {
Expand Down
13 changes: 11 additions & 2 deletions common/persistence/visibility/visiblity_manager_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/visibility/manager"
)
Expand Down Expand Up @@ -65,8 +66,16 @@ func (m *visibilityManagerMetrics) Close() {
m.delegate.Close()
}

func (m *visibilityManagerMetrics) GetName() string {
return m.delegate.GetName()
func (m *visibilityManagerMetrics) GetReadStoreName(nsName namespace.Name) string {
return m.delegate.GetReadStoreName(nsName)
}

func (m *visibilityManagerMetrics) GetStoreNames() []string {
return m.delegate.GetStoreNames()
}

func (m *visibilityManagerMetrics) HasStoreName(stName string) bool {
return m.delegate.HasStoreName(stName)
}

func (m *visibilityManagerMetrics) GetIndexName() string {
Expand Down
10 changes: 5 additions & 5 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (adh *AdminHandler) AddSearchAttributes(
// register the search attributes in the cluster metadata if ES is up or if
// `skip-schema-update` is set. This is for backward compatibility using
// standard visibility.
if adh.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" {
if adh.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
err = adh.addSearchAttributesElasticsearch(ctx, request, indexName)
} else {
err = adh.addSearchAttributesSQL(ctx, request, currentSearchAttributes)
Expand Down Expand Up @@ -414,7 +414,7 @@ func (adh *AdminHandler) RemoveSearchAttributes(
// register the search attributes in the cluster metadata if ES is up or if
// `skip-schema-update` is set. This is for backward compatibility using
// standard visibility.
if adh.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" {
if adh.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
err = adh.removeSearchAttributesElasticsearch(ctx, request, indexName)
} else {
err = adh.removeSearchAttributesSQL(ctx, request)
Expand Down Expand Up @@ -520,7 +520,7 @@ func (adh *AdminHandler) GetSearchAttributes(
// register the search attributes in the cluster metadata if ES is up or if
// `skip-schema-update` is set. This is for backward compatibility using
// standard visibility.
if adh.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" {
if adh.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
return adh.getSearchAttributesElasticsearch(ctx, indexName, searchAttributes)
}
return adh.getSearchAttributesSQL(request, searchAttributes)
Expand Down Expand Up @@ -1038,7 +1038,7 @@ func (adh *AdminHandler) DescribeCluster(
ClusterName: metadata.ClusterName,
HistoryShardCount: metadata.HistoryShardCount,
PersistenceStore: adh.persistenceExecutionManager.GetName(),
VisibilityStore: adh.visibilityMgr.GetName(),
VisibilityStore: strings.Join(adh.visibilityMgr.GetStoreNames(), ","),
VersionInfo: metadata.VersionInfo,
FailoverVersionIncrement: metadata.FailoverVersionIncrement,
InitialFailoverVersion: metadata.InitialFailoverVersion,
Expand Down Expand Up @@ -1628,7 +1628,7 @@ func (adh *AdminHandler) DeleteWorkflowExecution(
var warnings []string
var branchTokens [][]byte
var startTime, closeTime *time.Time
cassVisBackend := strings.Contains(adh.visibilityMgr.GetName(), cassandra.CassandraPersistenceName)
cassVisBackend := adh.visibilityMgr.HasStoreName(cassandra.CassandraPersistenceName)

resp, err := adh.persistenceExecutionManager.GetWorkflowExecution(ctx, &persistence.GetWorkflowExecutionRequest{
ShardID: shardID,
Expand Down
21 changes: 11 additions & 10 deletions service/frontend/adminHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"time"

"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/persistence/visibility/store/standard/cassandra"
"go.temporal.io/server/common/resourcetest"

"google.golang.org/grpc/health"
Expand Down Expand Up @@ -553,7 +554,7 @@ func (s *adminHandlerSuite) Test_AddSearchAttributes() {

mockSdkClient := mocksdk.NewMockClient(s.controller)
s.mockResource.SDKClientFactory.EXPECT().GetSystemClient().Return(mockSdkClient).AnyTimes()
s.mockVisibilityMgr.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes()
s.mockVisibilityMgr.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true).AnyTimes()

// Start workflow failed.
mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), "temporal-sys-add-search-attributes-workflow", gomock.Any()).Return(nil, errors.New("start failed"))
Expand Down Expand Up @@ -606,7 +607,7 @@ func (s *adminHandlerSuite) Test_GetSearchAttributes_EmptyIndexName() {
s.mockNamespaceCache.EXPECT().GetNamespace(s.namespace).Return(s.namespaceEntry, nil).AnyTimes()

// Elasticsearch is not configured
s.mockVisibilityMgr.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes()
s.mockVisibilityMgr.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true).AnyTimes()
s.mockVisibilityMgr.EXPECT().GetIndexName().Return("").AnyTimes()
mockSdkClient.EXPECT().DescribeWorkflowExecution(gomock.Any(), "temporal-sys-add-search-attributes-workflow", "").Return(
&workflowservice.DescribeWorkflowExecutionResponse{}, nil)
Expand All @@ -626,7 +627,7 @@ func (s *adminHandlerSuite) Test_GetSearchAttributes_NonEmptyIndexName() {
s.mockResource.SDKClientFactory.EXPECT().GetSystemClient().Return(mockSdkClient).AnyTimes()

// Configure Elasticsearch: add advanced visibility store config with index name.
s.mockVisibilityMgr.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes()
s.mockVisibilityMgr.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true).AnyTimes()
s.mockVisibilityMgr.EXPECT().GetIndexName().Return("random-index-name").AnyTimes()

mockSdkClient.EXPECT().DescribeWorkflowExecution(gomock.Any(), "temporal-sys-add-search-attributes-workflow", "").Return(
Expand Down Expand Up @@ -685,7 +686,7 @@ func (s *adminHandlerSuite) Test_RemoveSearchAttributes_EmptyIndexName() {
}

// Elasticsearch is not configured
s.mockVisibilityMgr.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes()
s.mockVisibilityMgr.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true).AnyTimes()
s.mockVisibilityMgr.EXPECT().GetIndexName().Return("").AnyTimes()
s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes()
testCases2 := []test{
Expand Down Expand Up @@ -748,7 +749,7 @@ func (s *adminHandlerSuite) Test_RemoveSearchAttributes_NonEmptyIndexName() {
}

// Configure Elasticsearch: add advanced visibility store config with index name.
s.mockVisibilityMgr.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes()
s.mockVisibilityMgr.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true).AnyTimes()
s.mockVisibilityMgr.EXPECT().GetIndexName().Return("random-index-name").AnyTimes()
s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes()
for _, testCase := range testCases {
Expand Down Expand Up @@ -1162,7 +1163,7 @@ func (s *adminHandlerSuite) Test_DescribeCluster_CurrentCluster_Success() {
s.mockResource.WorkerServiceResolver.EXPECT().Members().Return([]*membership.HostInfo{})
s.mockResource.WorkerServiceResolver.EXPECT().MemberCount().Return(0)
s.mockResource.ExecutionMgr.EXPECT().GetName().Return("")
s.mockVisibilityMgr.EXPECT().GetName().Return("")
s.mockVisibilityMgr.EXPECT().GetStoreNames().Return([]string{elasticsearch.PersistenceName})
s.mockClusterMetadataManager.EXPECT().GetClusterMetadata(gomock.Any(), &persistence.GetClusterMetadataRequest{ClusterName: clusterName}).Return(
&persistence.GetClusterMetadataResponse{
ClusterMetadata: persistencespb.ClusterMetadata{
Expand Down Expand Up @@ -1201,7 +1202,7 @@ func (s *adminHandlerSuite) Test_DescribeCluster_NonCurrentCluster_Success() {
s.mockResource.WorkerServiceResolver.EXPECT().Members().Return([]*membership.HostInfo{})
s.mockResource.WorkerServiceResolver.EXPECT().MemberCount().Return(0)
s.mockResource.ExecutionMgr.EXPECT().GetName().Return("")
s.mockVisibilityMgr.EXPECT().GetName().Return("")
s.mockVisibilityMgr.EXPECT().GetStoreNames().Return([]string{elasticsearch.PersistenceName})
s.mockClusterMetadataManager.EXPECT().GetClusterMetadata(gomock.Any(), &persistence.GetClusterMetadataRequest{ClusterName: clusterName}).Return(
&persistence.GetClusterMetadataResponse{
ClusterMetadata: persistencespb.ClusterMetadata{
Expand Down Expand Up @@ -1257,7 +1258,7 @@ func (s *adminHandlerSuite) TestDeleteWorkflowExecution_DeleteCurrentExecution()
}

s.mockNamespaceCache.EXPECT().GetNamespaceID(s.namespace).Return(s.namespaceID, nil).AnyTimes()
s.mockVisibilityMgr.EXPECT().GetName().Return("elasticsearch").AnyTimes()
s.mockVisibilityMgr.EXPECT().HasStoreName(cassandra.CassandraPersistenceName).Return(false)

s.mockExecutionMgr.EXPECT().GetCurrentExecution(gomock.Any(), gomock.Any()).Return(nil, errors.New("some random error"))
resp, err := s.handler.DeleteWorkflowExecution(context.Background(), request)
Expand Down Expand Up @@ -1332,7 +1333,7 @@ func (s *adminHandlerSuite) TestDeleteWorkflowExecution_LoadMutableStateFailed()
}

s.mockNamespaceCache.EXPECT().GetNamespaceID(s.namespace).Return(s.namespaceID, nil).AnyTimes()
s.mockVisibilityMgr.EXPECT().GetName().Return("elasticsearch").AnyTimes()
s.mockVisibilityMgr.EXPECT().HasStoreName(cassandra.CassandraPersistenceName).Return(false)

s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, errors.New("some random error"))
s.mockHistoryClient.EXPECT().DeleteWorkflowVisibilityRecord(gomock.Any(), gomock.Any()).Return(&historyservice.DeleteWorkflowVisibilityRecordResponse{}, nil)
Expand All @@ -1355,7 +1356,7 @@ func (s *adminHandlerSuite) TestDeleteWorkflowExecution_CassandraVisibilityBacke
}

s.mockNamespaceCache.EXPECT().GetNamespaceID(s.namespace).Return(s.namespaceID, nil).AnyTimes()
s.mockVisibilityMgr.EXPECT().GetName().Return("elasticsearch,cassandra").AnyTimes()
s.mockVisibilityMgr.EXPECT().HasStoreName(cassandra.CassandraPersistenceName).Return(true).AnyTimes()

// test delete open records
branchToken := []byte("branchToken")
Expand Down
6 changes: 3 additions & 3 deletions service/frontend/operator_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (h *OperatorHandlerImpl) AddSearchAttributes(
// register the search attributes in the cluster metadata if ES is up or if
// `skip-schema-update` is set. This is for backward compatibility using
// standard visibility.
if h.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" {
if h.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
err = h.addSearchAttributesElasticsearch(ctx, request, indexName, currentSearchAttributes)
if err != nil {
if _, isWorkflowErr := err.(*serviceerror.SystemWorkflow); isWorkflowErr {
Expand Down Expand Up @@ -352,7 +352,7 @@ func (h *OperatorHandlerImpl) RemoveSearchAttributes(
// register the search attributes in the cluster metadata if ES is up or if
// `skip-schema-update` is set. This is for backward compatibility using
// standard visibility.
if h.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" {
if h.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
err = h.removeSearchAttributesElasticsearch(ctx, request, indexName)
} else {
err = h.removeSearchAttributesSQL(ctx, request)
Expand Down Expand Up @@ -457,7 +457,7 @@ func (h *OperatorHandlerImpl) ListSearchAttributes(
// register the search attributes in the cluster metadata if ES is up or if
// `skip-schema-update` is set. This is for backward compatibility using
// standard visibility.
if h.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" {
if h.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
return h.listSearchAttributesElasticsearch(ctx, indexName, searchAttributes)
}
return h.listSearchAttributesSQL(request, searchAttributes)
Expand Down
Loading

0 comments on commit 06188f3

Please sign in to comment.