diff --git a/common/persistence/visibility/defs.go b/common/persistence/visibility/defs.go index 71d839cc1cc..36f9c3dc050 100644 --- a/common/persistence/visibility/defs.go +++ b/common/persistence/visibility/defs.go @@ -47,8 +47,18 @@ func DefaultAdvancedVisibilityWritingMode(advancedVisibilityConfigExist bool) st return AdvancedVisibilityWritingModeOff } -func AllowListForValidation(pluginName string) bool { - switch pluginName { +func AllowListForValidation(storeNames []string) bool { + if len(storeNames) == 0 { + return false + } + + if len(storeNames) > 1 { + // If more than one store is configured then it means that dual visibility is enabled. + // Dual visibility is used for migration to advanced, don't allow list of values because it will be removed soon. + return false + } + + switch storeNames[0] { case mysql.PluginNameV8, postgresql.PluginNameV12, sqlite.PluginName: // Advanced visibility with SQL DB don't support list of values return false diff --git a/common/persistence/visibility/manager/visibility_manager.go b/common/persistence/visibility/manager/visibility_manager.go index 6dc6664e95c..c89332afb3f 100644 --- a/common/persistence/visibility/manager/visibility_manager.go +++ b/common/persistence/visibility/manager/visibility_manager.go @@ -43,7 +43,9 @@ type ( // VisibilityManager is used to manage the visibility store VisibilityManager interface { persistence.Closeable - GetName() string + GetReadStoreName(nsName namespace.Name) string + GetStoreNames() []string + HasStoreName(stName string) bool GetIndexName() string // Write APIs. diff --git a/common/persistence/visibility/manager/visibility_manager_mock.go b/common/persistence/visibility/manager/visibility_manager_mock.go index a28cffaec01..49e2163f696 100644 --- a/common/persistence/visibility/manager/visibility_manager_mock.go +++ b/common/persistence/visibility/manager/visibility_manager_mock.go @@ -33,6 +33,7 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" + namespace "go.temporal.io/server/common/namespace" ) // MockVisibilityManager is a mock of VisibilityManager interface. @@ -113,18 +114,32 @@ func (mr *MockVisibilityManagerMockRecorder) GetIndexName() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetIndexName", reflect.TypeOf((*MockVisibilityManager)(nil).GetIndexName)) } -// GetName mocks base method. -func (m *MockVisibilityManager) GetName() string { +// GetReadStoreName mocks base method. +func (m *MockVisibilityManager) GetReadStoreName(nsName namespace.Name) string { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetName") + ret := m.ctrl.Call(m, "GetReadStoreName", nsName) ret0, _ := ret[0].(string) return ret0 } -// GetName indicates an expected call of GetName. -func (mr *MockVisibilityManagerMockRecorder) GetName() *gomock.Call { +// GetReadStoreName indicates an expected call of GetReadStoreName. +func (mr *MockVisibilityManagerMockRecorder) GetReadStoreName(nsName interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetName", reflect.TypeOf((*MockVisibilityManager)(nil).GetName)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetReadStoreName", reflect.TypeOf((*MockVisibilityManager)(nil).GetReadStoreName), nsName) +} + +// GetStoreNames mocks base method. +func (m *MockVisibilityManager) GetStoreNames() []string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetStoreNames") + ret0, _ := ret[0].([]string) + return ret0 +} + +// GetStoreNames indicates an expected call of GetStoreNames. +func (mr *MockVisibilityManagerMockRecorder) GetStoreNames() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStoreNames", reflect.TypeOf((*MockVisibilityManager)(nil).GetStoreNames)) } // GetWorkflowExecution mocks base method. @@ -142,6 +157,20 @@ func (mr *MockVisibilityManagerMockRecorder) GetWorkflowExecution(ctx, request i return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetWorkflowExecution", reflect.TypeOf((*MockVisibilityManager)(nil).GetWorkflowExecution), ctx, request) } +// HasStoreName mocks base method. +func (m *MockVisibilityManager) HasStoreName(stName string) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HasStoreName", stName) + ret0, _ := ret[0].(bool) + return ret0 +} + +// HasStoreName indicates an expected call of HasStoreName. +func (mr *MockVisibilityManagerMockRecorder) HasStoreName(stName interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasStoreName", reflect.TypeOf((*MockVisibilityManager)(nil).HasStoreName), stName) +} + // ListClosedWorkflowExecutions mocks base method. func (m *MockVisibilityManager) ListClosedWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) { m.ctrl.T.Helper() diff --git a/common/persistence/visibility/visibility_manager_dual.go b/common/persistence/visibility/visibility_manager_dual.go index 7475357e95e..1fb218b1673 100644 --- a/common/persistence/visibility/visibility_manager_dual.go +++ b/common/persistence/visibility/visibility_manager_dual.go @@ -26,8 +26,8 @@ package visibility import ( "context" - "strings" + "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence/visibility/manager" ) @@ -60,8 +60,21 @@ func (v *visibilityManagerDual) Close() { v.secondaryVisibilityManager.Close() } -func (v *visibilityManagerDual) GetName() string { - return strings.Join([]string{v.visibilityManager.GetName(), v.secondaryVisibilityManager.GetName()}, ",") +func (v *visibilityManagerDual) GetReadStoreName(nsName namespace.Name) string { + return v.managerSelector.readManager(nsName).GetReadStoreName(nsName) +} + +func (v *visibilityManagerDual) GetStoreNames() []string { + return append(v.visibilityManager.GetStoreNames(), v.secondaryVisibilityManager.GetStoreNames()...) +} + +func (v *visibilityManagerDual) HasStoreName(stName string) bool { + for _, sn := range v.GetStoreNames() { + if sn == stName { + return true + } + } + return false } func (v *visibilityManagerDual) GetIndexName() string { diff --git a/common/persistence/visibility/visibility_manager_impl.go b/common/persistence/visibility/visibility_manager_impl.go index 4b91ff59cb3..24c745671a9 100644 --- a/common/persistence/visibility/visibility_manager_impl.go +++ b/common/persistence/visibility/visibility_manager_impl.go @@ -36,6 +36,7 @@ import ( workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/server/common/log" + "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/persistence/visibility/store" ) @@ -72,10 +73,18 @@ func (p *visibilityManagerImpl) Close() { p.store.Close() } -func (p *visibilityManagerImpl) GetName() string { +func (p *visibilityManagerImpl) GetReadStoreName(_ namespace.Name) string { return p.store.GetName() } +func (p *visibilityManagerImpl) GetStoreNames() []string { + return []string{p.store.GetName()} +} + +func (p *visibilityManagerImpl) HasStoreName(stName string) bool { + return p.store.GetName() == stName +} + func (p *visibilityManagerImpl) GetIndexName() string { return p.store.GetIndexName() } diff --git a/common/persistence/visibility/visibility_manager_rate_limited.go b/common/persistence/visibility/visibility_manager_rate_limited.go index e7aa59d13bc..62446259b36 100644 --- a/common/persistence/visibility/visibility_manager_rate_limited.go +++ b/common/persistence/visibility/visibility_manager_rate_limited.go @@ -28,6 +28,7 @@ import ( "context" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/quotas" @@ -63,8 +64,16 @@ func (m *visibilityManagerRateLimited) Close() { m.delegate.Close() } -func (m *visibilityManagerRateLimited) GetName() string { - return m.delegate.GetName() +func (m *visibilityManagerRateLimited) GetReadStoreName(nsName namespace.Name) string { + return m.delegate.GetReadStoreName(nsName) +} + +func (m *visibilityManagerRateLimited) GetStoreNames() []string { + return m.delegate.GetStoreNames() +} + +func (m *visibilityManagerRateLimited) HasStoreName(stName string) bool { + return m.delegate.HasStoreName(stName) } func (m *visibilityManagerRateLimited) GetIndexName() string { diff --git a/common/persistence/visibility/visiblity_manager_metrics.go b/common/persistence/visibility/visiblity_manager_metrics.go index b8545d6d98f..d9ce840de90 100644 --- a/common/persistence/visibility/visiblity_manager_metrics.go +++ b/common/persistence/visibility/visiblity_manager_metrics.go @@ -33,6 +33,7 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/visibility/manager" ) @@ -65,8 +66,16 @@ func (m *visibilityManagerMetrics) Close() { m.delegate.Close() } -func (m *visibilityManagerMetrics) GetName() string { - return m.delegate.GetName() +func (m *visibilityManagerMetrics) GetReadStoreName(nsName namespace.Name) string { + return m.delegate.GetReadStoreName(nsName) +} + +func (m *visibilityManagerMetrics) GetStoreNames() []string { + return m.delegate.GetStoreNames() +} + +func (m *visibilityManagerMetrics) HasStoreName(stName string) bool { + return m.delegate.HasStoreName(stName) } func (m *visibilityManagerMetrics) GetIndexName() string { diff --git a/service/frontend/adminHandler.go b/service/frontend/adminHandler.go index ec6b1a1e452..a6a35c86010 100644 --- a/service/frontend/adminHandler.go +++ b/service/frontend/adminHandler.go @@ -275,7 +275,7 @@ func (adh *AdminHandler) AddSearchAttributes( // register the search attributes in the cluster metadata if ES is up or if // `skip-schema-update` is set. This is for backward compatibility using // standard visibility. - if adh.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" { + if adh.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" { err = adh.addSearchAttributesElasticsearch(ctx, request, indexName) } else { err = adh.addSearchAttributesSQL(ctx, request, currentSearchAttributes) @@ -414,7 +414,7 @@ func (adh *AdminHandler) RemoveSearchAttributes( // register the search attributes in the cluster metadata if ES is up or if // `skip-schema-update` is set. This is for backward compatibility using // standard visibility. - if adh.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" { + if adh.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" { err = adh.removeSearchAttributesElasticsearch(ctx, request, indexName) } else { err = adh.removeSearchAttributesSQL(ctx, request) @@ -520,7 +520,7 @@ func (adh *AdminHandler) GetSearchAttributes( // register the search attributes in the cluster metadata if ES is up or if // `skip-schema-update` is set. This is for backward compatibility using // standard visibility. - if adh.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" { + if adh.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" { return adh.getSearchAttributesElasticsearch(ctx, indexName, searchAttributes) } return adh.getSearchAttributesSQL(request, searchAttributes) @@ -1038,7 +1038,7 @@ func (adh *AdminHandler) DescribeCluster( ClusterName: metadata.ClusterName, HistoryShardCount: metadata.HistoryShardCount, PersistenceStore: adh.persistenceExecutionManager.GetName(), - VisibilityStore: adh.visibilityMgr.GetName(), + VisibilityStore: strings.Join(adh.visibilityMgr.GetStoreNames(), ","), VersionInfo: metadata.VersionInfo, FailoverVersionIncrement: metadata.FailoverVersionIncrement, InitialFailoverVersion: metadata.InitialFailoverVersion, @@ -1628,7 +1628,7 @@ func (adh *AdminHandler) DeleteWorkflowExecution( var warnings []string var branchTokens [][]byte var startTime, closeTime *time.Time - cassVisBackend := strings.Contains(adh.visibilityMgr.GetName(), cassandra.CassandraPersistenceName) + cassVisBackend := adh.visibilityMgr.HasStoreName(cassandra.CassandraPersistenceName) resp, err := adh.persistenceExecutionManager.GetWorkflowExecution(ctx, &persistence.GetWorkflowExecutionRequest{ ShardID: shardID, diff --git a/service/frontend/adminHandler_test.go b/service/frontend/adminHandler_test.go index c685191532c..1a730074902 100644 --- a/service/frontend/adminHandler_test.go +++ b/service/frontend/adminHandler_test.go @@ -32,6 +32,7 @@ import ( "time" "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/persistence/visibility/store/standard/cassandra" "go.temporal.io/server/common/resourcetest" "google.golang.org/grpc/health" @@ -553,7 +554,7 @@ func (s *adminHandlerSuite) Test_AddSearchAttributes() { mockSdkClient := mocksdk.NewMockClient(s.controller) s.mockResource.SDKClientFactory.EXPECT().GetSystemClient().Return(mockSdkClient).AnyTimes() - s.mockVisibilityMgr.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes() + s.mockVisibilityMgr.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true).AnyTimes() // Start workflow failed. mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), "temporal-sys-add-search-attributes-workflow", gomock.Any()).Return(nil, errors.New("start failed")) @@ -606,7 +607,7 @@ func (s *adminHandlerSuite) Test_GetSearchAttributes_EmptyIndexName() { s.mockNamespaceCache.EXPECT().GetNamespace(s.namespace).Return(s.namespaceEntry, nil).AnyTimes() // Elasticsearch is not configured - s.mockVisibilityMgr.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes() + s.mockVisibilityMgr.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true).AnyTimes() s.mockVisibilityMgr.EXPECT().GetIndexName().Return("").AnyTimes() mockSdkClient.EXPECT().DescribeWorkflowExecution(gomock.Any(), "temporal-sys-add-search-attributes-workflow", "").Return( &workflowservice.DescribeWorkflowExecutionResponse{}, nil) @@ -626,7 +627,7 @@ func (s *adminHandlerSuite) Test_GetSearchAttributes_NonEmptyIndexName() { s.mockResource.SDKClientFactory.EXPECT().GetSystemClient().Return(mockSdkClient).AnyTimes() // Configure Elasticsearch: add advanced visibility store config with index name. - s.mockVisibilityMgr.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes() + s.mockVisibilityMgr.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true).AnyTimes() s.mockVisibilityMgr.EXPECT().GetIndexName().Return("random-index-name").AnyTimes() mockSdkClient.EXPECT().DescribeWorkflowExecution(gomock.Any(), "temporal-sys-add-search-attributes-workflow", "").Return( @@ -685,7 +686,7 @@ func (s *adminHandlerSuite) Test_RemoveSearchAttributes_EmptyIndexName() { } // Elasticsearch is not configured - s.mockVisibilityMgr.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes() + s.mockVisibilityMgr.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true).AnyTimes() s.mockVisibilityMgr.EXPECT().GetIndexName().Return("").AnyTimes() s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() testCases2 := []test{ @@ -748,7 +749,7 @@ func (s *adminHandlerSuite) Test_RemoveSearchAttributes_NonEmptyIndexName() { } // Configure Elasticsearch: add advanced visibility store config with index name. - s.mockVisibilityMgr.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes() + s.mockVisibilityMgr.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true).AnyTimes() s.mockVisibilityMgr.EXPECT().GetIndexName().Return("random-index-name").AnyTimes() s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() for _, testCase := range testCases { @@ -1162,7 +1163,7 @@ func (s *adminHandlerSuite) Test_DescribeCluster_CurrentCluster_Success() { s.mockResource.WorkerServiceResolver.EXPECT().Members().Return([]*membership.HostInfo{}) s.mockResource.WorkerServiceResolver.EXPECT().MemberCount().Return(0) s.mockResource.ExecutionMgr.EXPECT().GetName().Return("") - s.mockVisibilityMgr.EXPECT().GetName().Return("") + s.mockVisibilityMgr.EXPECT().GetStoreNames().Return([]string{elasticsearch.PersistenceName}) s.mockClusterMetadataManager.EXPECT().GetClusterMetadata(gomock.Any(), &persistence.GetClusterMetadataRequest{ClusterName: clusterName}).Return( &persistence.GetClusterMetadataResponse{ ClusterMetadata: persistencespb.ClusterMetadata{ @@ -1201,7 +1202,7 @@ func (s *adminHandlerSuite) Test_DescribeCluster_NonCurrentCluster_Success() { s.mockResource.WorkerServiceResolver.EXPECT().Members().Return([]*membership.HostInfo{}) s.mockResource.WorkerServiceResolver.EXPECT().MemberCount().Return(0) s.mockResource.ExecutionMgr.EXPECT().GetName().Return("") - s.mockVisibilityMgr.EXPECT().GetName().Return("") + s.mockVisibilityMgr.EXPECT().GetStoreNames().Return([]string{elasticsearch.PersistenceName}) s.mockClusterMetadataManager.EXPECT().GetClusterMetadata(gomock.Any(), &persistence.GetClusterMetadataRequest{ClusterName: clusterName}).Return( &persistence.GetClusterMetadataResponse{ ClusterMetadata: persistencespb.ClusterMetadata{ @@ -1257,7 +1258,7 @@ func (s *adminHandlerSuite) TestDeleteWorkflowExecution_DeleteCurrentExecution() } s.mockNamespaceCache.EXPECT().GetNamespaceID(s.namespace).Return(s.namespaceID, nil).AnyTimes() - s.mockVisibilityMgr.EXPECT().GetName().Return("elasticsearch").AnyTimes() + s.mockVisibilityMgr.EXPECT().HasStoreName(cassandra.CassandraPersistenceName).Return(false) s.mockExecutionMgr.EXPECT().GetCurrentExecution(gomock.Any(), gomock.Any()).Return(nil, errors.New("some random error")) resp, err := s.handler.DeleteWorkflowExecution(context.Background(), request) @@ -1332,7 +1333,7 @@ func (s *adminHandlerSuite) TestDeleteWorkflowExecution_LoadMutableStateFailed() } s.mockNamespaceCache.EXPECT().GetNamespaceID(s.namespace).Return(s.namespaceID, nil).AnyTimes() - s.mockVisibilityMgr.EXPECT().GetName().Return("elasticsearch").AnyTimes() + s.mockVisibilityMgr.EXPECT().HasStoreName(cassandra.CassandraPersistenceName).Return(false) s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, errors.New("some random error")) s.mockHistoryClient.EXPECT().DeleteWorkflowVisibilityRecord(gomock.Any(), gomock.Any()).Return(&historyservice.DeleteWorkflowVisibilityRecordResponse{}, nil) @@ -1355,7 +1356,7 @@ func (s *adminHandlerSuite) TestDeleteWorkflowExecution_CassandraVisibilityBacke } s.mockNamespaceCache.EXPECT().GetNamespaceID(s.namespace).Return(s.namespaceID, nil).AnyTimes() - s.mockVisibilityMgr.EXPECT().GetName().Return("elasticsearch,cassandra").AnyTimes() + s.mockVisibilityMgr.EXPECT().HasStoreName(cassandra.CassandraPersistenceName).Return(true).AnyTimes() // test delete open records branchToken := []byte("branchToken") diff --git a/service/frontend/operator_handler.go b/service/frontend/operator_handler.go index b29ab23dc69..8e532b0bcce 100644 --- a/service/frontend/operator_handler.go +++ b/service/frontend/operator_handler.go @@ -194,7 +194,7 @@ func (h *OperatorHandlerImpl) AddSearchAttributes( // register the search attributes in the cluster metadata if ES is up or if // `skip-schema-update` is set. This is for backward compatibility using // standard visibility. - if h.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" { + if h.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" { err = h.addSearchAttributesElasticsearch(ctx, request, indexName, currentSearchAttributes) if err != nil { if _, isWorkflowErr := err.(*serviceerror.SystemWorkflow); isWorkflowErr { @@ -352,7 +352,7 @@ func (h *OperatorHandlerImpl) RemoveSearchAttributes( // register the search attributes in the cluster metadata if ES is up or if // `skip-schema-update` is set. This is for backward compatibility using // standard visibility. - if h.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" { + if h.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" { err = h.removeSearchAttributesElasticsearch(ctx, request, indexName) } else { err = h.removeSearchAttributesSQL(ctx, request) @@ -457,7 +457,7 @@ func (h *OperatorHandlerImpl) ListSearchAttributes( // register the search attributes in the cluster metadata if ES is up or if // `skip-schema-update` is set. This is for backward compatibility using // standard visibility. - if h.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" { + if h.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" { return h.listSearchAttributesElasticsearch(ctx, indexName, searchAttributes) } return h.listSearchAttributesSQL(request, searchAttributes) diff --git a/service/frontend/operator_handler_test.go b/service/frontend/operator_handler_test.go index 1439fcbaded..ab2befec294 100644 --- a/service/frontend/operator_handler_test.go +++ b/service/frontend/operator_handler_test.go @@ -125,7 +125,7 @@ func (s *operatorHandlerSuite) Test_AddSearchAttributes_EmptyIndexName() { }, } - s.mockResource.VisibilityManager.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes() + s.mockResource.VisibilityManager.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true) s.mockResource.VisibilityManager.EXPECT().GetIndexName().Return("").AnyTimes() for _, testCase := range testCases1 { s.T().Run(testCase.Name, func(t *testing.T) { @@ -197,7 +197,7 @@ func (s *operatorHandlerSuite) Test_AddSearchAttributes_NonEmptyIndexName() { } // Configure Elasticsearch: add advanced visibility store config with index name. - s.mockResource.VisibilityManager.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes() + s.mockResource.VisibilityManager.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true).AnyTimes() s.mockResource.VisibilityManager.EXPECT().GetIndexName().Return("random-index-name").AnyTimes() s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() for _, testCase := range testCases { @@ -255,7 +255,7 @@ func (s *operatorHandlerSuite) Test_ListSearchAttributes_EmptyIndexName() { handler := s.handler ctx := context.Background() - s.mockResource.VisibilityManager.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes() + s.mockResource.VisibilityManager.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true) s.mockResource.VisibilityManager.EXPECT().GetIndexName().Return("").AnyTimes() resp, err := handler.ListSearchAttributes(ctx, nil) s.Error(err) @@ -276,7 +276,7 @@ func (s *operatorHandlerSuite) Test_ListSearchAttributes_NonEmptyIndexName() { ctx := context.Background() // Configure Elasticsearch: add advanced visibility store config with index name. - s.mockResource.VisibilityManager.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes() + s.mockResource.VisibilityManager.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true) s.mockResource.VisibilityManager.EXPECT().GetIndexName().Return("random-index-name").AnyTimes() s.mockResource.ESClient.EXPECT().GetMapping(gomock.Any(), "random-index-name").Return(map[string]string{"col": "type"}, nil) s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil) @@ -313,7 +313,7 @@ func (s *operatorHandlerSuite) Test_RemoveSearchAttributes_EmptyIndexName() { }, } - s.mockResource.VisibilityManager.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes() + s.mockResource.VisibilityManager.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true).AnyTimes() s.mockResource.VisibilityManager.EXPECT().GetIndexName().Return("").AnyTimes() for _, testCase := range testCases1 { s.T().Run(testCase.Name, func(t *testing.T) { @@ -385,7 +385,7 @@ func (s *operatorHandlerSuite) Test_RemoveSearchAttributes_NonEmptyIndexName() { } // Configure Elasticsearch: add advanced visibility store config with index name. - s.mockResource.VisibilityManager.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes() + s.mockResource.VisibilityManager.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true).AnyTimes() s.mockResource.VisibilityManager.EXPECT().GetIndexName().Return("random-index-name").AnyTimes() s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() for _, testCase := range testCases { diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index ab4c471810c..056dfcec389 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -188,7 +188,7 @@ func NewWorkflowHandler( config.SearchAttributesSizeOfValueLimit, config.SearchAttributesTotalSizeLimit, visibilityMrg.GetIndexName(), - visibility.AllowListForValidation(visibilityMrg.GetName()), + visibility.AllowListForValidation(visibilityMrg.GetStoreNames()), ), archivalMetadata: archivalMetadata, healthServer: healthServer, @@ -2850,7 +2850,7 @@ func (wh *WorkflowHandler) GetClusterInfo(ctx context.Context, _ *workflowservic ClusterName: metadata.ClusterName, HistoryShardCount: metadata.HistoryShardCount, PersistenceStore: wh.persistenceExecutionManager.GetName(), - VisibilityStore: wh.visibilityMrg.GetName(), + VisibilityStore: strings.Join(wh.visibilityMrg.GetStoreNames(), ","), }, nil } diff --git a/service/frontend/workflow_handler_test.go b/service/frontend/workflow_handler_test.go index 9cb021c20de..910fb271159 100644 --- a/service/frontend/workflow_handler_test.go +++ b/service/frontend/workflow_handler_test.go @@ -72,6 +72,7 @@ import ( "go.temporal.io/server/common/payloads" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/visibility/manager" + "go.temporal.io/server/common/persistence/visibility/store/elasticsearch" "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/resourcetest" @@ -161,7 +162,7 @@ func (s *workflowHandlerSuite) SetupTest() { mockMonitor := s.mockResource.MembershipMonitor mockMonitor.EXPECT().GetMemberCount(primitives.FrontendService).Return(5, nil).AnyTimes() - s.mockVisibilityMgr.EXPECT().GetName().Return("").AnyTimes() + s.mockVisibilityMgr.EXPECT().GetStoreNames().Return([]string{elasticsearch.PersistenceName}) } func (s *workflowHandlerSuite) TearDownTest() { diff --git a/service/history/handler.go b/service/history/handler.go index d74a73c3160..00bb60543a0 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -28,7 +28,6 @@ import ( "context" "fmt" "math" - "strings" "sync" "sync/atomic" @@ -1787,16 +1786,15 @@ func (h *Handler) DeleteWorkflowVisibilityRecord( namespaceID := namespace.ID(request.GetNamespaceId()) if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) + return nil, errNamespaceNotSet } if request.Execution == nil { - return nil, h.convertError(errWorkflowExecutionNotSet) + return nil, errWorkflowExecutionNotSet } - // if using cass visibility, then either start or close time should be non-nilv - cassVisBackend := strings.Contains(h.persistenceVisibilityManager.GetName(), cassandra.CassandraPersistenceName) - if cassVisBackend && request.WorkflowStartTime == nil && request.WorkflowCloseTime == nil { + // If at least one visibility store is Cassandra, then either start or close time should be non-nil. + if h.persistenceVisibilityManager.HasStoreName(cassandra.CassandraPersistenceName) && request.WorkflowStartTime == nil && request.WorkflowCloseTime == nil { return nil, &serviceerror.InvalidArgument{Message: "workflow start and close time not specified when deleting cassandra based visibility record"} } @@ -1806,8 +1804,6 @@ func (h *Handler) DeleteWorkflowVisibilityRecord( // delete) again to delete again if this happens. // For ES implementation, we used max int64 as the TaskID (version) to make sure deletion is // the last operation applied for this workflow - fmt.Println("history DeleteWorkflowVisibilityRecord ") - err := h.persistenceVisibilityManager.DeleteWorkflowExecution(ctx, &manager.VisibilityDeleteWorkflowExecutionRequest{ NamespaceID: namespaceID, WorkflowID: request.Execution.GetWorkflowId(), @@ -1817,7 +1813,7 @@ func (h *Handler) DeleteWorkflowVisibilityRecord( CloseTime: request.GetWorkflowCloseTime(), }) if err != nil { - return nil, err + return nil, h.convertError(err) } return &historyservice.DeleteWorkflowVisibilityRecordResponse{}, nil diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index c6051d42a58..09fb74a3e4a 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -231,7 +231,7 @@ func NewEngineWithShardContext( config.SearchAttributesSizeOfValueLimit, config.SearchAttributesTotalSizeLimit, config.DefaultVisibilityIndexName, - visibility.AllowListForValidation(persistenceVisibilityMgr.GetName()), + visibility.AllowListForValidation(persistenceVisibilityMgr.GetStoreNames()), ) historyEngImpl.workflowTaskHandler = newWorkflowTaskHandlerCallback(historyEngImpl) diff --git a/service/worker/deletenamespace/reclaimresources/activities.go b/service/worker/deletenamespace/reclaimresources/activities.go index 41bde0589a5..1f00c025936 100644 --- a/service/worker/deletenamespace/reclaimresources/activities.go +++ b/service/worker/deletenamespace/reclaimresources/activities.go @@ -66,8 +66,8 @@ func NewActivities( } } -func (a *Activities) IsAdvancedVisibilityActivity(_ context.Context) (bool, error) { - switch a.visibilityManager.GetName() { +func (a *Activities) IsAdvancedVisibilityActivity(_ context.Context, nsName namespace.Name) (bool, error) { + switch a.visibilityManager.GetReadStoreName(nsName) { case elasticsearch.PersistenceName, mysql.PluginNameV8, postgresql.PluginNameV12, sqlite.PluginName: return true, nil default: diff --git a/service/worker/deletenamespace/reclaimresources/workflow.go b/service/worker/deletenamespace/reclaimresources/workflow.go index 8b93cf746b3..66a16c86959 100644 --- a/service/worker/deletenamespace/reclaimresources/workflow.go +++ b/service/worker/deletenamespace/reclaimresources/workflow.go @@ -158,7 +158,7 @@ func deleteWorkflowExecutions(ctx workflow.Context, params ReclaimResourcesParam ctx1 := workflow.WithLocalActivityOptions(ctx, localActivityOptions) var isAdvancedVisibility bool - err := workflow.ExecuteLocalActivity(ctx1, a.IsAdvancedVisibilityActivity).Get(ctx, &isAdvancedVisibility) + err := workflow.ExecuteLocalActivity(ctx1, a.IsAdvancedVisibilityActivity, params.Namespace).Get(ctx, &isAdvancedVisibility) if err != nil { return result, fmt.Errorf("%w: IsAdvancedVisibilityActivity: %v", errors.ErrUnableToExecuteActivity, err) } diff --git a/service/worker/deletenamespace/reclaimresources/workflow_test.go b/service/worker/deletenamespace/reclaimresources/workflow_test.go index 85dce875521..9c8001fcf7e 100644 --- a/service/worker/deletenamespace/reclaimresources/workflow_test.go +++ b/service/worker/deletenamespace/reclaimresources/workflow_test.go @@ -68,7 +68,7 @@ func Test_ReclaimResourcesWorkflow_Success(t *testing.T) { ErrorCount: 0, }, nil).Once() - env.OnActivity(a.IsAdvancedVisibilityActivity, mock.Anything).Return(true, nil).Once() + env.OnActivity(a.IsAdvancedVisibilityActivity, mock.Anything, namespace.Name("namespace")).Return(true, nil).Once() env.OnActivity(a.CountExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace")).Return(int64(10), nil).Once() env.OnActivity(a.EnsureNoExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace"), 0).Return(nil).Once() @@ -117,7 +117,7 @@ func Test_ReclaimResourcesWorkflow_EnsureNoExecutionsActivity_Error(t *testing.T ErrorCount: 0, }, nil).Once() - env.OnActivity(a.IsAdvancedVisibilityActivity, mock.Anything).Return(true, nil).Once() + env.OnActivity(a.IsAdvancedVisibilityActivity, mock.Anything, namespace.Name("namespace")).Return(true, nil).Once() env.OnActivity(a.CountExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace")).Return(int64(10), nil).Once() env.OnActivity(a.EnsureNoExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace"), 0). Return(stderrors.New("specific_error_from_activity")). @@ -163,7 +163,7 @@ func Test_ReclaimResourcesWorkflow_EnsureNoExecutionsActivity_ExecutionsStillExi ErrorCount: 0, }, nil).Once() - env.OnActivity(a.IsAdvancedVisibilityActivity, mock.Anything).Return(true, nil).Once() + env.OnActivity(a.IsAdvancedVisibilityActivity, mock.Anything, namespace.Name("namespace")).Return(true, nil).Once() env.OnActivity(a.CountExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace")).Return(int64(10), nil).Once() env.OnActivity(a.EnsureNoExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace"), 0). Return(errors.NewExecutionsStillExistError(1)). @@ -192,7 +192,7 @@ func Test_ReclaimResourcesWorkflow_NoActivityMocks_Success(t *testing.T) { ctrl := gomock.NewController(t) visibilityManager := manager.NewMockVisibilityManager(ctrl) - visibilityManager.EXPECT().GetName().Return("elasticsearch") + visibilityManager.EXPECT().GetReadStoreName(namespace.Name("namespace")).Return("elasticsearch") // For CountExecutionsAdvVisibilityActivity. visibilityManager.EXPECT().CountWorkflowExecutions(gomock.Any(), &manager.CountWorkflowExecutionsRequest{ @@ -279,7 +279,7 @@ func Test_ReclaimResourcesWorkflow_NoActivityMocks_NoProgressMade(t *testing.T) ctrl := gomock.NewController(t) visibilityManager := manager.NewMockVisibilityManager(ctrl) - visibilityManager.EXPECT().GetName().Return("elasticsearch") + visibilityManager.EXPECT().GetReadStoreName(namespace.Name("namespace")).Return("elasticsearch") // For CountExecutionsAdvVisibilityActivity. visibilityManager.EXPECT().CountWorkflowExecutions(gomock.Any(), &manager.CountWorkflowExecutionsRequest{