From dc257ce87a8d9e95cd1235009db525bbd51488b0 Mon Sep 17 00:00:00 2001 From: Rodrigo Zhou Date: Fri, 27 Jan 2023 00:44:39 -0800 Subject: [PATCH] Add GetIndexName method to visibility manager (#3820) --- common/persistence/sql/common.go | 4 + .../persistence/sql/sqlplugin/interfaces.go | 1 + common/persistence/sql/sqlplugin/mysql/db.go | 5 ++ .../sql/sqlplugin/postgresql/db.go | 5 ++ common/persistence/sql/sqlplugin/sqlite/db.go | 5 ++ .../visibility/manager/visibility_manager.go | 1 + .../manager/visibility_manager_mock.go | 14 ++++ .../store/elasticsearch/visibility_store.go | 4 + .../standard/cassandra/visibility_store.go | 6 ++ .../store/standard/sql/visibility_store.go | 4 + .../store/standard/visibility_store.go | 4 + .../visibility/store/visibility_store.go | 1 + .../visibility/store/visibility_store_mock.go | 14 ++++ .../visibility/visibility_manager_dual.go | 4 + .../visibility/visibility_manager_impl.go | 4 + .../visibility_manager_rate_limited.go | 4 + .../visibility/visiblity_manager_metrics.go | 4 + common/resourcetest/resourceTest.go | 8 ++ common/searchattribute/manager.go | 16 +++- service/frontend/adminHandler.go | 9 +- service/frontend/adminHandler_test.go | 66 ++++++++------- service/frontend/dcRedirectionHandler_test.go | 10 ++- service/frontend/dcRedirectionPolicy_test.go | 2 +- service/frontend/fx.go | 8 +- service/frontend/operator_handler.go | 13 +-- service/frontend/operator_handler_test.go | 84 +++++++++++-------- service/frontend/service.go | 4 +- service/frontend/workflow_handler.go | 14 ++-- service/frontend/workflow_handler_test.go | 12 +-- service/history/fx.go | 15 +++- tests/test_cluster.go | 16 +++- 31 files changed, 253 insertions(+), 108 deletions(-) diff --git a/common/persistence/sql/common.go b/common/persistence/sql/common.go index c24a943cb94..597b815eabb 100644 --- a/common/persistence/sql/common.go +++ b/common/persistence/sql/common.go @@ -57,6 +57,10 @@ func (m *SqlStore) GetName() string { return m.Db.PluginName() } +func (m *SqlStore) GetDbName() string { + return m.Db.DbName() +} + func (m *SqlStore) Close() { if m.Db != nil { err := m.Db.Close() diff --git a/common/persistence/sql/sqlplugin/interfaces.go b/common/persistence/sql/sqlplugin/interfaces.go index 14f202ca1a6..a450bff837f 100644 --- a/common/persistence/sql/sqlplugin/interfaces.go +++ b/common/persistence/sql/sqlplugin/interfaces.go @@ -110,6 +110,7 @@ type ( BeginTx(ctx context.Context) (Tx, error) PluginName() string + DbName() string IsDupEntryError(err error) bool Close() error } diff --git a/common/persistence/sql/sqlplugin/mysql/db.go b/common/persistence/sql/sqlplugin/mysql/db.go index 7236844bce2..115721c7076 100644 --- a/common/persistence/sql/sqlplugin/mysql/db.go +++ b/common/persistence/sql/sqlplugin/mysql/db.go @@ -111,6 +111,11 @@ func (mdb *db) PluginName() string { return PluginName } +// DbName returns the name of the database +func (mdb *db) DbName() string { + return mdb.dbName +} + // ExpectedVersion returns expected version. func (mdb *db) ExpectedVersion() string { switch mdb.dbKind { diff --git a/common/persistence/sql/sqlplugin/postgresql/db.go b/common/persistence/sql/sqlplugin/postgresql/db.go index b32fd8756ad..c3de74b00f8 100644 --- a/common/persistence/sql/sqlplugin/postgresql/db.go +++ b/common/persistence/sql/sqlplugin/postgresql/db.go @@ -110,6 +110,11 @@ func (pdb *db) PluginName() string { return PluginName } +// DbName returns the name of the database +func (pdb *db) DbName() string { + return pdb.dbName +} + // ExpectedVersion returns expected version. func (pdb *db) ExpectedVersion() string { switch pdb.dbKind { diff --git a/common/persistence/sql/sqlplugin/sqlite/db.go b/common/persistence/sql/sqlplugin/sqlite/db.go index ab4046f8a63..f3ebe8ab42e 100644 --- a/common/persistence/sql/sqlplugin/sqlite/db.go +++ b/common/persistence/sql/sqlplugin/sqlite/db.go @@ -122,6 +122,11 @@ func (mdb *db) PluginName() string { return PluginName } +// DbName returns the name of the database +func (mdb *db) DbName() string { + return mdb.dbName +} + // ExpectedVersion returns expected version. func (mdb *db) ExpectedVersion() string { switch mdb.dbKind { diff --git a/common/persistence/visibility/manager/visibility_manager.go b/common/persistence/visibility/manager/visibility_manager.go index 2c6fe5795b7..6dc6664e95c 100644 --- a/common/persistence/visibility/manager/visibility_manager.go +++ b/common/persistence/visibility/manager/visibility_manager.go @@ -44,6 +44,7 @@ type ( VisibilityManager interface { persistence.Closeable GetName() string + GetIndexName() string // Write APIs. RecordWorkflowExecutionStarted(ctx context.Context, request *RecordWorkflowExecutionStartedRequest) error diff --git a/common/persistence/visibility/manager/visibility_manager_mock.go b/common/persistence/visibility/manager/visibility_manager_mock.go index 1c99edff4b7..a28cffaec01 100644 --- a/common/persistence/visibility/manager/visibility_manager_mock.go +++ b/common/persistence/visibility/manager/visibility_manager_mock.go @@ -99,6 +99,20 @@ func (mr *MockVisibilityManagerMockRecorder) DeleteWorkflowExecution(ctx, reques return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteWorkflowExecution", reflect.TypeOf((*MockVisibilityManager)(nil).DeleteWorkflowExecution), ctx, request) } +// GetIndexName mocks base method. +func (m *MockVisibilityManager) GetIndexName() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetIndexName") + ret0, _ := ret[0].(string) + return ret0 +} + +// GetIndexName indicates an expected call of GetIndexName. +func (mr *MockVisibilityManagerMockRecorder) GetIndexName() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetIndexName", reflect.TypeOf((*MockVisibilityManager)(nil).GetIndexName)) +} + // GetName mocks base method. func (m *MockVisibilityManager) GetName() string { m.ctrl.T.Helper() diff --git a/common/persistence/visibility/store/elasticsearch/visibility_store.go b/common/persistence/visibility/store/elasticsearch/visibility_store.go index 3efa3c5a83c..7e12a89216b 100644 --- a/common/persistence/visibility/store/elasticsearch/visibility_store.go +++ b/common/persistence/visibility/store/elasticsearch/visibility_store.go @@ -130,6 +130,10 @@ func (s *visibilityStore) GetName() string { return PersistenceName } +func (s *visibilityStore) GetIndexName() string { + return s.index +} + func (s *visibilityStore) RecordWorkflowExecutionStarted( ctx context.Context, request *store.InternalRecordWorkflowExecutionStartedRequest, diff --git a/common/persistence/visibility/store/standard/cassandra/visibility_store.go b/common/persistence/visibility/store/standard/cassandra/visibility_store.go index 8165553074d..2a06f235f1e 100644 --- a/common/persistence/visibility/store/standard/cassandra/visibility_store.go +++ b/common/persistence/visibility/store/standard/cassandra/visibility_store.go @@ -140,6 +140,7 @@ type ( visibilityStore struct { session gocql.Session lowConslevel gocql.Consistency + keyspace string } ) @@ -158,6 +159,7 @@ func NewVisibilityStore( return &visibilityStore{ session: session, lowConslevel: gocql.One, + keyspace: cfg.Keyspace, }, nil } @@ -165,6 +167,10 @@ func (v *visibilityStore) GetName() string { return CassandraPersistenceName } +func (v *visibilityStore) GetIndexName() string { + return v.keyspace +} + // Close releases the resources held by this object func (v *visibilityStore) Close() { v.session.Close() diff --git a/common/persistence/visibility/store/standard/sql/visibility_store.go b/common/persistence/visibility/store/standard/sql/visibility_store.go index c20c9182c53..ee6d17b3998 100644 --- a/common/persistence/visibility/store/standard/sql/visibility_store.go +++ b/common/persistence/visibility/store/standard/sql/visibility_store.go @@ -80,6 +80,10 @@ func (s *visibilityStore) GetName() string { return s.sqlStore.GetName() } +func (s *visibilityStore) GetIndexName() string { + return s.sqlStore.GetDbName() +} + func (s *visibilityStore) RecordWorkflowExecutionStarted( ctx context.Context, request *store.InternalRecordWorkflowExecutionStartedRequest, diff --git a/common/persistence/visibility/store/standard/visibility_store.go b/common/persistence/visibility/store/standard/visibility_store.go index 03a4eaea6c8..c75b193673e 100644 --- a/common/persistence/visibility/store/standard/visibility_store.go +++ b/common/persistence/visibility/store/standard/visibility_store.go @@ -73,6 +73,10 @@ func (s *standardStore) GetName() string { return s.store.GetName() } +func (s *standardStore) GetIndexName() string { + return s.store.GetIndexName() +} + func (s *standardStore) RecordWorkflowExecutionStarted( ctx context.Context, request *store.InternalRecordWorkflowExecutionStartedRequest, diff --git a/common/persistence/visibility/store/visibility_store.go b/common/persistence/visibility/store/visibility_store.go index 5a1df29b3a5..b2ae07171cf 100644 --- a/common/persistence/visibility/store/visibility_store.go +++ b/common/persistence/visibility/store/visibility_store.go @@ -43,6 +43,7 @@ type ( VisibilityStore interface { persistence.Closeable GetName() string + GetIndexName() string // Write APIs. RecordWorkflowExecutionStarted(ctx context.Context, request *InternalRecordWorkflowExecutionStartedRequest) error diff --git a/common/persistence/visibility/store/visibility_store_mock.go b/common/persistence/visibility/store/visibility_store_mock.go index af305329410..013a9b3e77e 100644 --- a/common/persistence/visibility/store/visibility_store_mock.go +++ b/common/persistence/visibility/store/visibility_store_mock.go @@ -100,6 +100,20 @@ func (mr *MockVisibilityStoreMockRecorder) DeleteWorkflowExecution(ctx, request return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteWorkflowExecution", reflect.TypeOf((*MockVisibilityStore)(nil).DeleteWorkflowExecution), ctx, request) } +// GetIndexName mocks base method. +func (m *MockVisibilityStore) GetIndexName() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetIndexName") + ret0, _ := ret[0].(string) + return ret0 +} + +// GetIndexName indicates an expected call of GetIndexName. +func (mr *MockVisibilityStoreMockRecorder) GetIndexName() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetIndexName", reflect.TypeOf((*MockVisibilityStore)(nil).GetIndexName)) +} + // GetName mocks base method. func (m *MockVisibilityStore) GetName() string { m.ctrl.T.Helper() diff --git a/common/persistence/visibility/visibility_manager_dual.go b/common/persistence/visibility/visibility_manager_dual.go index 9e799ddab8a..7475357e95e 100644 --- a/common/persistence/visibility/visibility_manager_dual.go +++ b/common/persistence/visibility/visibility_manager_dual.go @@ -64,6 +64,10 @@ func (v *visibilityManagerDual) GetName() string { return strings.Join([]string{v.visibilityManager.GetName(), v.secondaryVisibilityManager.GetName()}, ",") } +func (v *visibilityManagerDual) GetIndexName() string { + return v.visibilityManager.GetIndexName() +} + func (v *visibilityManagerDual) RecordWorkflowExecutionStarted( ctx context.Context, request *manager.RecordWorkflowExecutionStartedRequest, diff --git a/common/persistence/visibility/visibility_manager_impl.go b/common/persistence/visibility/visibility_manager_impl.go index c1427bbce1c..f5c189f5cf1 100644 --- a/common/persistence/visibility/visibility_manager_impl.go +++ b/common/persistence/visibility/visibility_manager_impl.go @@ -76,6 +76,10 @@ func (p *visibilityManagerImpl) GetName() string { return p.store.GetName() } +func (p *visibilityManagerImpl) GetIndexName() string { + return p.store.GetIndexName() +} + func (p *visibilityManagerImpl) RecordWorkflowExecutionStarted( ctx context.Context, request *manager.RecordWorkflowExecutionStartedRequest, diff --git a/common/persistence/visibility/visibility_manager_rate_limited.go b/common/persistence/visibility/visibility_manager_rate_limited.go index eb61c626616..e7aa59d13bc 100644 --- a/common/persistence/visibility/visibility_manager_rate_limited.go +++ b/common/persistence/visibility/visibility_manager_rate_limited.go @@ -67,6 +67,10 @@ func (m *visibilityManagerRateLimited) GetName() string { return m.delegate.GetName() } +func (m *visibilityManagerRateLimited) GetIndexName() string { + return m.delegate.GetIndexName() +} + // Below are write APIs. func (m *visibilityManagerRateLimited) RecordWorkflowExecutionStarted( diff --git a/common/persistence/visibility/visiblity_manager_metrics.go b/common/persistence/visibility/visiblity_manager_metrics.go index 50fafcad09f..b8545d6d98f 100644 --- a/common/persistence/visibility/visiblity_manager_metrics.go +++ b/common/persistence/visibility/visiblity_manager_metrics.go @@ -69,6 +69,10 @@ func (m *visibilityManagerMetrics) GetName() string { return m.delegate.GetName() } +func (m *visibilityManagerMetrics) GetIndexName() string { + return m.delegate.GetIndexName() +} + func (m *visibilityManagerMetrics) RecordWorkflowExecutionStarted( ctx context.Context, request *manager.RecordWorkflowExecutionStartedRequest, diff --git a/common/resourcetest/resourceTest.go b/common/resourcetest/resourceTest.go index 2083edf94f8..6f165a95a21 100644 --- a/common/resourcetest/resourceTest.go +++ b/common/resourcetest/resourceTest.go @@ -51,6 +51,7 @@ import ( "go.temporal.io/server/common/persistence" persistenceClient "go.temporal.io/server/common/persistence/client" "go.temporal.io/server/common/persistence/serialization" + "go.temporal.io/server/common/persistence/visibility/manager" esclient "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client" "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/sdk" @@ -96,6 +97,7 @@ type ( ClientBean *client.MockBean ClientFactory *client.MockFactory ESClient *esclient.MockClient + VisibilityManager *manager.MockVisibilityManager // persistence clients @@ -206,6 +208,7 @@ func NewTest( ClientBean: clientBean, ClientFactory: clientFactory, ESClient: esclient.NewMockClient(controller), + VisibilityManager: manager.NewMockVisibilityManager(controller), // persistence clients @@ -373,6 +376,11 @@ func (t *Test) GetClientFactory() client.Factory { return t.ClientFactory } +// GetVisibilityManager for testing +func (t *Test) GetVisibilityManager() manager.VisibilityManager { + return t.VisibilityManager +} + // persistence clients // GetMetadataManager for testing diff --git a/common/searchattribute/manager.go b/common/searchattribute/manager.go index 271a783b4cb..1fd5f272bd3 100644 --- a/common/searchattribute/manager.go +++ b/common/searchattribute/manager.go @@ -87,7 +87,7 @@ func NewManager( } // GetSearchAttributes returns all search attributes (including system and build-in) for specified index. -// indexName can be an empty string when Elasticsearch is not configured. +// indexName can be an empty string for backward compatibility. func (m *managerImpl) GetSearchAttributes( indexName string, forceRefreshCache bool, @@ -111,11 +111,19 @@ func (m *managerImpl) GetSearchAttributes( } indexSearchAttributes, ok := saCache.searchAttributes[indexName] - if !ok { - return NameTypeMap{}, nil + if ok { + return indexSearchAttributes, nil } - return indexSearchAttributes, nil + if indexName != "" { + // Try to look for the empty string indexName for backward compatibility: up to v1.19, + // empty string was used when Elasticsearch was not configured. + indexSearchAttributes, ok = saCache.searchAttributes[""] + if ok { + return indexSearchAttributes, nil + } + } + return NameTypeMap{}, nil } func (m *managerImpl) needRefreshCache(saCache cache, forceRefreshCache bool, now time.Time) bool { diff --git a/service/frontend/adminHandler.go b/service/frontend/adminHandler.go index 63a48d324e9..c2ea21d17a0 100644 --- a/service/frontend/adminHandler.go +++ b/service/frontend/adminHandler.go @@ -97,7 +97,6 @@ type ( logger log.Logger numberOfHistoryShards int32 - ESConfig *esclient.Config ESClient esclient.Client config *Config namespaceHandler namespace.Handler @@ -127,7 +126,6 @@ type ( Config *Config NamespaceReplicationQueue persistence.NamespaceReplicationQueue ReplicatorNamespaceReplicationQueue persistence.NamespaceReplicationQueue - EsConfig *esclient.Config EsClient esclient.Client VisibilityMrg manager.VisibilityManager Logger log.Logger @@ -192,7 +190,6 @@ func NewAdminHandler( ), eventSerializer: args.EventSerializer, visibilityMgr: args.VisibilityMrg, - ESConfig: args.EsConfig, ESClient: args.EsClient, persistenceExecutionManager: args.PersistenceExecutionManager, namespaceReplicationQueue: args.NamespaceReplicationQueue, @@ -257,7 +254,7 @@ func (adh *AdminHandler) AddSearchAttributes(ctx context.Context, request *admin indexName := request.GetIndexName() if indexName == "" { - indexName = adh.ESConfig.GetVisibilityIndex() + indexName = adh.visibilityMgr.GetIndexName() } currentSearchAttributes, err := adh.saProvider.GetSearchAttributes(indexName, true) @@ -322,7 +319,7 @@ func (adh *AdminHandler) RemoveSearchAttributes(ctx context.Context, request *ad indexName := request.GetIndexName() if indexName == "" { - indexName = adh.ESConfig.GetVisibilityIndex() + indexName = adh.visibilityMgr.GetIndexName() } currentSearchAttributes, err := adh.saProvider.GetSearchAttributes(indexName, true) @@ -359,7 +356,7 @@ func (adh *AdminHandler) GetSearchAttributes(ctx context.Context, request *admin indexName := request.GetIndexName() if indexName == "" { - indexName = adh.ESConfig.GetVisibilityIndex() + indexName = adh.visibilityMgr.GetIndexName() } resp, err := adh.getSearchAttributes(ctx, indexName, "") diff --git a/service/frontend/adminHandler_test.go b/service/frontend/adminHandler_test.go index 19ab1314033..f774fc098be 100644 --- a/service/frontend/adminHandler_test.go +++ b/service/frontend/adminHandler_test.go @@ -67,7 +67,6 @@ import ( "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/persistence/versionhistory" "go.temporal.io/server/common/persistence/visibility/manager" - "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client" "go.temporal.io/server/common/searchattribute" ) @@ -127,7 +126,7 @@ func (s *adminHandlerSuite) SetupTest() { s.mockClientFactory = s.mockResource.ClientFactory s.mockAdminClient = adminservicemock.NewMockAdminServiceClient(s.controller) s.mockMetadata = s.mockResource.ClusterMetadata - s.mockVisibilityMgr = manager.NewMockVisibilityManager(s.controller) + s.mockVisibilityMgr = s.mockResource.VisibilityManager s.mockProducer = persistence.NewMockNamespaceReplicationQueue(s.controller) persistenceConfig := &config.Persistence{ @@ -142,10 +141,9 @@ func (s *adminHandlerSuite) SetupTest() { cfg, s.mockResource.GetNamespaceReplicationQueue(), s.mockProducer, - nil, s.mockResource.ESClient, - s.mockVisibilityMgr, - s.mockResource.Logger, + s.mockResource.GetVisibilityManager(), + s.mockResource.GetLogger(), s.mockResource.GetExecutionManager(), s.mockResource.GetTaskManager(), s.mockResource.GetClusterMetadataManager(), @@ -491,6 +489,7 @@ func (s *adminHandlerSuite) Test_AddSearchAttributes() { } // Elasticsearch is not configured + s.mockVisibilityMgr.EXPECT().GetIndexName().Return("").AnyTimes() s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() testCases3 := []test{ { @@ -521,12 +520,7 @@ func (s *adminHandlerSuite) Test_AddSearchAttributes() { } // Configure Elasticsearch: add advanced visibility store config with index name. - handler.ESConfig = &client.Config{ - Indices: map[string]string{ - "visibility": "random-index-name", - }, - } - + s.mockVisibilityMgr.EXPECT().GetIndexName().Return("random-index-name").AnyTimes() s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() testCases2 := []test{ { @@ -596,7 +590,7 @@ func (s *adminHandlerSuite) Test_AddSearchAttributes() { s.NotNil(resp) } -func (s *adminHandlerSuite) Test_GetSearchAttributes() { +func (s *adminHandlerSuite) Test_GetSearchAttributes_EmptyIndexName() { handler := s.handler ctx := context.Background() @@ -609,6 +603,7 @@ func (s *adminHandlerSuite) Test_GetSearchAttributes() { s.mockResource.SDKClientFactory.EXPECT().GetSystemClient().Return(mockSdkClient).AnyTimes() // Elasticsearch is not configured + s.mockVisibilityMgr.EXPECT().GetIndexName().Return("").AnyTimes() mockSdkClient.EXPECT().DescribeWorkflowExecution(gomock.Any(), "temporal-sys-add-search-attributes-workflow", "").Return( &workflowservice.DescribeWorkflowExecutionResponse{}, nil) s.mockResource.ESClient.EXPECT().GetMapping(gomock.Any(), "").Return(map[string]string{"col": "type"}, nil) @@ -617,19 +612,23 @@ func (s *adminHandlerSuite) Test_GetSearchAttributes() { resp, err = handler.GetSearchAttributes(ctx, &adminservice.GetSearchAttributesRequest{}) s.NoError(err) s.NotNil(resp) +} + +func (s *adminHandlerSuite) Test_GetSearchAttributes_NonEmptyIndexName() { + handler := s.handler + ctx := context.Background() + + mockSdkClient := mocksdk.NewMockClient(s.controller) + s.mockResource.SDKClientFactory.EXPECT().GetSystemClient().Return(mockSdkClient).AnyTimes() // Configure Elasticsearch: add advanced visibility store config with index name. - handler.ESConfig = &client.Config{ - Indices: map[string]string{ - "visibility": "random-index-name", - }, - } + s.mockVisibilityMgr.EXPECT().GetIndexName().Return("random-index-name").AnyTimes() mockSdkClient.EXPECT().DescribeWorkflowExecution(gomock.Any(), "temporal-sys-add-search-attributes-workflow", "").Return( &workflowservice.DescribeWorkflowExecutionResponse{}, nil) s.mockResource.ESClient.EXPECT().GetMapping(gomock.Any(), "random-index-name").Return(map[string]string{"col": "type"}, nil) s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() - resp, err = handler.GetSearchAttributes(ctx, &adminservice.GetSearchAttributesRequest{}) + resp, err := handler.GetSearchAttributes(ctx, &adminservice.GetSearchAttributesRequest{}) s.NoError(err) s.NotNil(resp) @@ -650,7 +649,7 @@ func (s *adminHandlerSuite) Test_GetSearchAttributes() { s.Nil(resp) } -func (s *adminHandlerSuite) Test_RemoveSearchAttributes() { +func (s *adminHandlerSuite) Test_RemoveSearchAttributes_EmptyIndexName() { handler := s.handler ctx := context.Background() @@ -681,8 +680,9 @@ func (s *adminHandlerSuite) Test_RemoveSearchAttributes() { } // Elasticsearch is not configured + s.mockVisibilityMgr.EXPECT().GetIndexName().Return("").AnyTimes() s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() - testCases3 := []test{ + testCases2 := []test{ { Name: "reserved search attribute (empty index)", Request: &adminservice.RemoveSearchAttributesRequest{ @@ -702,23 +702,25 @@ func (s *adminHandlerSuite) Test_RemoveSearchAttributes() { Expected: &serviceerror.InvalidArgument{Message: "Search attribute ProductId doesn't exist."}, }, } - for _, testCase := range testCases3 { + for _, testCase := range testCases2 { s.T().Run(testCase.Name, func(t *testing.T) { resp, err := handler.RemoveSearchAttributes(ctx, testCase.Request) s.Equal(testCase.Expected, err) s.Nil(resp) }) } +} - // Configure Elasticsearch: add advanced visibility store config with index name. - handler.ESConfig = &client.Config{ - Indices: map[string]string{ - "visibility": "random-index-name", - }, - } +func (s *adminHandlerSuite) Test_RemoveSearchAttributes_NonEmptyIndexName() { + handler := s.handler + ctx := context.Background() - s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() - testCases2 := []test{ + type test struct { + Name string + Request *adminservice.RemoveSearchAttributesRequest + Expected error + } + testCases := []test{ { Name: "reserved search attribute (ES configured)", Request: &adminservice.RemoveSearchAttributesRequest{ @@ -738,7 +740,11 @@ func (s *adminHandlerSuite) Test_RemoveSearchAttributes() { Expected: &serviceerror.InvalidArgument{Message: "Search attribute ProductId doesn't exist."}, }, } - for _, testCase := range testCases2 { + + // Configure Elasticsearch: add advanced visibility store config with index name. + s.mockVisibilityMgr.EXPECT().GetIndexName().Return("random-index-name").AnyTimes() + s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() + for _, testCase := range testCases { s.T().Run(testCase.Name, func(t *testing.T) { resp, err := handler.RemoveSearchAttributes(ctx, testCase.Request) s.Equal(testCase.Expected, err) diff --git a/service/frontend/dcRedirectionHandler_test.go b/service/frontend/dcRedirectionHandler_test.go index 01452d69418..3d05c449175 100644 --- a/service/frontend/dcRedirectionHandler_test.go +++ b/service/frontend/dcRedirectionHandler_test.go @@ -43,6 +43,7 @@ import ( "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/resourcetest" ) @@ -56,6 +57,7 @@ type ( mockFrontendHandler *workflowservicemock.MockWorkflowServiceServer mockRemoteFrontendClient *workflowservicemock.MockWorkflowServiceClient mockClusterMetadata *cluster.MockMetadata + mockVisibilityMgr *manager.MockVisibilityManager mockDCRedirectionPolicy *MockDCRedirectionPolicy @@ -102,17 +104,19 @@ func (s *dcRedirectionHandlerSuite) SetupTest() { s.mockResource = resourcetest.NewTest(s.controller, metrics.Frontend) s.mockClusterMetadata = s.mockResource.ClusterMetadata s.mockRemoteFrontendClient = s.mockResource.RemoteFrontendClient + s.mockVisibilityMgr = s.mockResource.VisibilityManager s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(s.currentClusterName).AnyTimes() s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes() - s.config = NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewNoopClient(), s.mockResource.GetLogger()), 0, "", false) + s.mockVisibilityMgr.EXPECT().GetIndexName().Return("").AnyTimes() + s.config = NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewNoopClient(), s.mockResource.GetLogger()), 0, false) frontendHandlerGRPC := NewWorkflowHandler( s.config, nil, - nil, - s.mockResource.Logger, + s.mockResource.GetVisibilityManager(), + s.mockResource.GetLogger(), s.mockResource.GetThrottledLogger(), s.mockResource.GetExecutionManager(), s.mockResource.GetClusterMetadataManager(), diff --git a/service/frontend/dcRedirectionPolicy_test.go b/service/frontend/dcRedirectionPolicy_test.go index 43eb04c5fee..5a91f20ae45 100644 --- a/service/frontend/dcRedirectionPolicy_test.go +++ b/service/frontend/dcRedirectionPolicy_test.go @@ -138,7 +138,7 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) SetupTest() { logger := log.NewTestLogger() - s.mockConfig = NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewNoopClient(), logger), 0, "", false) + s.mockConfig = NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewNoopClient(), logger), 0, false) s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes() s.policy = NewSelectedAPIsForwardingPolicy( s.currentClusterName, diff --git a/service/frontend/fx.go b/service/frontend/fx.go index 4cade1c10a7..c1c880f0a07 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -213,12 +213,10 @@ func GrpcServerOptionsProvider( func ConfigProvider( dc *dynamicconfig.Collection, persistenceConfig config.Persistence, - esConfig *esclient.Config, ) *Config { return NewConfig( dc, persistenceConfig.NumHistoryShards, - esConfig.GetVisibilityIndex(), persistenceConfig.AdvancedVisibilityConfigExist(), ) } @@ -418,7 +416,6 @@ func AdminHandlerProvider( persistenceConfig *config.Persistence, config *Config, replicatorNamespaceReplicationQueue FEReplicatorNamespaceReplicationQueue, - esConfig *esclient.Config, esClient esclient.Client, visibilityMrg manager.VisibilityManager, logger log.SnTaggedLogger, @@ -448,7 +445,6 @@ func AdminHandlerProvider( config, namespaceReplicationQueue, replicatorNamespaceReplicationQueue, - esConfig, esClient, visibilityMrg, logger, @@ -477,11 +473,11 @@ func AdminHandlerProvider( func OperatorHandlerProvider( config *Config, - esConfig *esclient.Config, esClient esclient.Client, logger log.SnTaggedLogger, sdkClientFactory sdk.ClientFactory, metricsHandler metrics.Handler, + visibilityMgr manager.VisibilityManager, saProvider searchattribute.Provider, saManager searchattribute.Manager, healthServer *health.Server, @@ -493,11 +489,11 @@ func OperatorHandlerProvider( ) *OperatorHandlerImpl { args := NewOperatorHandlerImplArgs{ config, - esConfig, esClient, logger, sdkClientFactory, metricsHandler, + visibilityMgr, saProvider, saManager, healthServer, diff --git a/service/frontend/operator_handler.go b/service/frontend/operator_handler.go index 058229cabbd..1f7017bdb48 100644 --- a/service/frontend/operator_handler.go +++ b/service/frontend/operator_handler.go @@ -51,6 +51,7 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/persistence/visibility/manager" esclient "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client" "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/sdk" @@ -70,10 +71,10 @@ type ( logger log.Logger config *Config - esConfig *esclient.Config esClient esclient.Client sdkClientFactory sdk.ClientFactory metricsHandler metrics.Handler + visibilityMgr manager.VisibilityManager saProvider searchattribute.Provider saManager searchattribute.Manager healthServer *health.Server @@ -86,11 +87,11 @@ type ( NewOperatorHandlerImplArgs struct { config *Config - EsConfig *esclient.Config EsClient esclient.Client Logger log.Logger sdkClientFactory sdk.ClientFactory MetricsHandler metrics.Handler + VisibilityMgr manager.VisibilityManager SaProvider searchattribute.Provider SaManager searchattribute.Manager healthServer *health.Server @@ -111,10 +112,10 @@ func NewOperatorHandlerImpl( logger: args.Logger, status: common.DaemonStatusInitialized, config: args.config, - esConfig: args.EsConfig, esClient: args.EsClient, sdkClientFactory: args.sdkClientFactory, metricsHandler: args.MetricsHandler, + visibilityMgr: args.VisibilityMgr, saProvider: args.SaProvider, saManager: args.SaManager, healthServer: args.healthServer, @@ -165,7 +166,7 @@ func (h *OperatorHandlerImpl) AddSearchAttributes(ctx context.Context, request * return nil, errSearchAttributesNotSet } - indexName := h.esConfig.GetVisibilityIndex() + indexName := h.visibilityMgr.GetIndexName() currentSearchAttributes, err := h.saProvider.GetSearchAttributes(indexName, true) if err != nil { @@ -229,7 +230,7 @@ func (h *OperatorHandlerImpl) RemoveSearchAttributes(ctx context.Context, reques return nil, errSearchAttributesNotSet } - indexName := h.esConfig.GetVisibilityIndex() + indexName := h.visibilityMgr.GetIndexName() currentSearchAttributes, err := h.saProvider.GetSearchAttributes(indexName, true) if err != nil { @@ -263,7 +264,7 @@ func (h *OperatorHandlerImpl) ListSearchAttributes(ctx context.Context, request return nil, errRequestNotSet } - indexName := h.esConfig.GetVisibilityIndex() + indexName := h.visibilityMgr.GetIndexName() var lastErr error var esMapping map[string]string = nil diff --git a/service/frontend/operator_handler_test.go b/service/frontend/operator_handler_test.go index 3a9a814bcb7..04f26fc70ea 100644 --- a/service/frontend/operator_handler_test.go +++ b/service/frontend/operator_handler_test.go @@ -46,7 +46,6 @@ import ( "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/persistence" - "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client" "go.temporal.io/server/common/resourcetest" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/common/testing/mocksdk" @@ -79,11 +78,11 @@ func (s *operatorHandlerSuite) SetupTest() { args := NewOperatorHandlerImplArgs{ &Config{NumHistoryShards: 4}, - nil, s.mockResource.ESClient, s.mockResource.Logger, s.mockResource.GetSDKClientFactory(), s.mockResource.GetMetricsHandler(), + s.mockResource.GetVisibilityManager(), s.mockResource.GetSearchAttributesProvider(), s.mockResource.GetSearchAttributesManager(), health.NewServer(), @@ -102,7 +101,7 @@ func (s *operatorHandlerSuite) TearDownTest() { s.handler.Stop() } -func (s *operatorHandlerSuite) Test_AddSearchAttributes() { +func (s *operatorHandlerSuite) Test_AddSearchAttributes_EmptyIndexName() { handler := s.handler ctx := context.Background() @@ -124,6 +123,8 @@ func (s *operatorHandlerSuite) Test_AddSearchAttributes() { Expected: &serviceerror.InvalidArgument{Message: "SearchAttributes are not set on request."}, }, } + + s.mockResource.VisibilityManager.EXPECT().GetIndexName().Return("").AnyTimes() for _, testCase := range testCases1 { s.T().Run(testCase.Name, func(t *testing.T) { resp, err := handler.AddSearchAttributes(ctx, testCase.Request) @@ -134,7 +135,7 @@ func (s *operatorHandlerSuite) Test_AddSearchAttributes() { // Elasticsearch is not configured s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() - testCases3 := []test{ + testCases2 := []test{ { Name: "reserved key (empty index)", Request: &operatorservice.AddSearchAttributesRequest{ @@ -154,23 +155,25 @@ func (s *operatorHandlerSuite) Test_AddSearchAttributes() { Expected: &serviceerror.AlreadyExists{Message: "Search attribute CustomTextField already exists."}, }, } - for _, testCase := range testCases3 { + for _, testCase := range testCases2 { s.T().Run(testCase.Name, func(t *testing.T) { resp, err := handler.AddSearchAttributes(ctx, testCase.Request) s.Equal(testCase.Expected, err) s.Nil(resp) }) } +} - // Configure Elasticsearch: add advanced visibility store config with index name. - handler.esConfig = &client.Config{ - Indices: map[string]string{ - "visibility": "random-index-name", - }, - } +func (s *operatorHandlerSuite) Test_AddSearchAttributes_NonEmptyIndexName() { + handler := s.handler + ctx := context.Background() - s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() - testCases2 := []test{ + type test struct { + Name string + Request *operatorservice.AddSearchAttributesRequest + Expected error + } + testCases := []test{ { Name: "reserved key (ES configured)", Request: &operatorservice.AddSearchAttributesRequest{ @@ -190,7 +193,11 @@ func (s *operatorHandlerSuite) Test_AddSearchAttributes() { Expected: &serviceerror.AlreadyExists{Message: "Search attribute CustomTextField already exists."}, }, } - for _, testCase := range testCases2 { + + // Configure Elasticsearch: add advanced visibility store config with index name. + s.mockResource.VisibilityManager.EXPECT().GetIndexName().Return("random-index-name").AnyTimes() + s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() + for _, testCase := range testCases { s.T().Run(testCase.Name, func(t *testing.T) { resp, err := handler.AddSearchAttributes(ctx, testCase.Request) s.Equal(testCase.Expected, err) @@ -241,10 +248,11 @@ func (s *operatorHandlerSuite) Test_AddSearchAttributes() { s.NotNil(resp) } -func (s *operatorHandlerSuite) Test_ListSearchAttributes() { +func (s *operatorHandlerSuite) Test_ListSearchAttributes_EmptyIndexName() { handler := s.handler ctx := context.Background() + s.mockResource.VisibilityManager.EXPECT().GetIndexName().Return("").AnyTimes() resp, err := handler.ListSearchAttributes(ctx, nil) s.Error(err) s.Equal(&serviceerror.InvalidArgument{Message: "Request is nil."}, err) @@ -257,17 +265,17 @@ func (s *operatorHandlerSuite) Test_ListSearchAttributes() { resp, err = handler.ListSearchAttributes(ctx, &operatorservice.ListSearchAttributesRequest{}) s.NoError(err) s.NotNil(resp) +} - // Configure Elasticsearch: add advanced visibility store config with index name. - handler.esConfig = &client.Config{ - Indices: map[string]string{ - "visibility": "random-index-name", - }, - } +func (s *operatorHandlerSuite) Test_ListSearchAttributes_NonEmptyIndexName() { + handler := s.handler + ctx := context.Background() + // Configure Elasticsearch: add advanced visibility store config with index name. + s.mockResource.VisibilityManager.EXPECT().GetIndexName().Return("random-index-name").AnyTimes() s.mockResource.ESClient.EXPECT().GetMapping(gomock.Any(), "random-index-name").Return(map[string]string{"col": "type"}, nil) s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil) - resp, err = handler.ListSearchAttributes(ctx, &operatorservice.ListSearchAttributesRequest{}) + resp, err := handler.ListSearchAttributes(ctx, &operatorservice.ListSearchAttributesRequest{}) s.NoError(err) s.NotNil(resp) @@ -278,7 +286,7 @@ func (s *operatorHandlerSuite) Test_ListSearchAttributes() { s.Nil(resp) } -func (s *operatorHandlerSuite) Test_RemoveSearchAttributes() { +func (s *operatorHandlerSuite) Test_RemoveSearchAttributes_EmptyIndexName() { handler := s.handler ctx := context.Background() @@ -300,6 +308,8 @@ func (s *operatorHandlerSuite) Test_RemoveSearchAttributes() { Expected: &serviceerror.InvalidArgument{Message: "SearchAttributes are not set on request."}, }, } + + s.mockResource.VisibilityManager.EXPECT().GetIndexName().Return("").AnyTimes() for _, testCase := range testCases1 { s.T().Run(testCase.Name, func(t *testing.T) { resp, err := handler.RemoveSearchAttributes(ctx, testCase.Request) @@ -310,7 +320,7 @@ func (s *operatorHandlerSuite) Test_RemoveSearchAttributes() { // Elasticsearch is not configured s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() - testCases3 := []test{ + testCases2 := []test{ { Name: "reserved search attribute (empty index)", Request: &operatorservice.RemoveSearchAttributesRequest{ @@ -330,23 +340,25 @@ func (s *operatorHandlerSuite) Test_RemoveSearchAttributes() { Expected: &serviceerror.NotFound{Message: "Search attribute ProductId doesn't exist."}, }, } - for _, testCase := range testCases3 { + for _, testCase := range testCases2 { s.T().Run(testCase.Name, func(t *testing.T) { resp, err := handler.RemoveSearchAttributes(ctx, testCase.Request) s.Equal(testCase.Expected, err) s.Nil(resp) }) } +} - // Configure Elasticsearch: add advanced visibility store config with index name. - handler.esConfig = &client.Config{ - Indices: map[string]string{ - "visibility": "random-index-name", - }, - } +func (s *operatorHandlerSuite) Test_RemoveSearchAttributes_NonEmptyIndexName() { + handler := s.handler + ctx := context.Background() - s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() - testCases2 := []test{ + type test struct { + Name string + Request *operatorservice.RemoveSearchAttributesRequest + Expected error + } + testCases := []test{ { Name: "reserved search attribute (ES configured)", Request: &operatorservice.RemoveSearchAttributesRequest{ @@ -366,7 +378,11 @@ func (s *operatorHandlerSuite) Test_RemoveSearchAttributes() { Expected: &serviceerror.NotFound{Message: "Search attribute ProductId doesn't exist."}, }, } - for _, testCase := range testCases2 { + + // Configure Elasticsearch: add advanced visibility store config with index name. + s.mockResource.VisibilityManager.EXPECT().GetIndexName().Return("random-index-name").AnyTimes() + s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() + for _, testCase := range testCases { s.T().Run(testCase.Name, func(t *testing.T) { resp, err := handler.RemoveSearchAttributes(ctx, testCase.Request) s.Equal(testCase.Expected, err) diff --git a/service/frontend/service.go b/service/frontend/service.go index d13251a599e..753123e7e7f 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -55,7 +55,6 @@ import ( // Config represents configuration for frontend service type Config struct { NumHistoryShards int32 - ESIndexName string PersistenceMaxQPS dynamicconfig.IntPropertyFn PersistenceGlobalMaxQPS dynamicconfig.IntPropertyFn PersistenceNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter @@ -169,10 +168,9 @@ type Config struct { } // NewConfig returns new service config with default values -func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int32, esIndexName string, enableReadFromES bool) *Config { +func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int32, enableReadFromES bool) *Config { return &Config{ NumHistoryShards: numHistoryShards, - ESIndexName: esIndexName, PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.FrontendPersistenceMaxQPS, 2000), PersistenceGlobalMaxQPS: dc.GetIntProperty(dynamicconfig.FrontendPersistenceGlobalMaxQPS, 0), PersistenceNamespaceMaxQPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendPersistenceNamespaceMaxQPS, 0), diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index bb49d38bf25..1da918d3311 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -182,7 +182,7 @@ func NewWorkflowHandler( config.SearchAttributesNumberOfKeysLimit, config.SearchAttributesSizeOfValueLimit, config.SearchAttributesTotalSizeLimit, - config.ESIndexName, + visibilityMrg.GetIndexName(), ), archivalMetadata: archivalMetadata, healthServer: healthServer, @@ -2439,7 +2439,7 @@ func (wh *WorkflowHandler) ListArchivedWorkflowExecutions(ctx context.Context, r Query: request.GetQuery(), } - searchAttributes, err := wh.saProvider.GetSearchAttributes(wh.config.ESIndexName, false) + searchAttributes, err := wh.saProvider.GetSearchAttributes(wh.visibilityMrg.GetIndexName(), false) if err != nil { return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err)) } @@ -2553,7 +2553,7 @@ func (wh *WorkflowHandler) GetSearchAttributes(ctx context.Context, _ *workflows return nil, errShuttingDown } - searchAttributes, err := wh.saProvider.GetSearchAttributes(wh.config.ESIndexName, false) + searchAttributes, err := wh.saProvider.GetSearchAttributes(wh.visibilityMrg.GetIndexName(), false) if err != nil { return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err)) } @@ -2760,7 +2760,7 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution(ctx context.Context, reques } if response.GetWorkflowExecutionInfo().GetSearchAttributes() != nil { - saTypeMap, err := wh.saProvider.GetSearchAttributes(wh.config.ESIndexName, false) + saTypeMap, err := wh.saProvider.GetSearchAttributes(wh.visibilityMrg.GetIndexName(), false) if err != nil { return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err)) } @@ -3107,7 +3107,7 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl // map search attributes if sas := executionInfo.GetSearchAttributes(); sas != nil { - saTypeMap, err := wh.saProvider.GetSearchAttributes(wh.config.ESIndexName, false) + saTypeMap, err := wh.saProvider.GetSearchAttributes(wh.visibilityMrg.GetIndexName(), false) if err != nil { return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err)) } @@ -3151,7 +3151,7 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl // map action search attributes if sa := queryResponse.Schedule.Action.GetStartWorkflow().SearchAttributes; sa != nil { - saTypeMap, err := wh.saProvider.GetSearchAttributes(wh.config.ESIndexName, false) + saTypeMap, err := wh.saProvider.GetSearchAttributes(wh.visibilityMrg.GetIndexName(), false) if err != nil { return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err)) } @@ -4241,7 +4241,7 @@ func (wh *WorkflowHandler) getHistoryReverse( } func (wh *WorkflowHandler) processOutgoingSearchAttributes(events []*historypb.HistoryEvent, namespace namespace.Name) error { - saTypeMap, err := wh.saProvider.GetSearchAttributes(wh.config.ESIndexName, false) + saTypeMap, err := wh.saProvider.GetSearchAttributes(wh.visibilityMrg.GetIndexName(), false) if err != nil { return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err)) } diff --git a/service/frontend/workflow_handler_test.go b/service/frontend/workflow_handler_test.go index 18e8bde4b30..487a254b654 100644 --- a/service/frontend/workflow_handler_test.go +++ b/service/frontend/workflow_handler_test.go @@ -81,6 +81,7 @@ import ( const ( numHistoryShards = 10 + esIndexName = "" testWorkflowID = "test-workflow-id" testRunID = "test-run-id" @@ -146,7 +147,7 @@ func (s *workflowHandlerSuite) SetupTest() { s.mockSearchAttributesMapper = s.mockResource.SearchAttributesMapper s.mockMetadataMgr = s.mockResource.MetadataMgr s.mockExecutionManager = s.mockResource.ExecutionMgr - s.mockVisibilityMgr = manager.NewMockVisibilityManager(s.controller) + s.mockVisibilityMgr = s.mockResource.VisibilityManager s.mockArchivalMetadata = s.mockResource.ArchivalMetadata s.mockArchiverProvider = s.mockResource.ArchiverProvider s.mockMatchingClient = s.mockResource.MatchingClient @@ -166,11 +167,12 @@ func (s *workflowHandlerSuite) TearDownTest() { } func (s *workflowHandlerSuite) getWorkflowHandler(config *Config) *WorkflowHandler { + s.mockVisibilityMgr.EXPECT().GetIndexName().Return(esIndexName).AnyTimes() return NewWorkflowHandler( config, s.mockProducer, - s.mockVisibilityMgr, - s.mockResource.Logger, + s.mockResource.GetVisibilityManager(), + s.mockResource.GetLogger(), s.mockResource.GetThrottledLogger(), s.mockResource.GetExecutionManager(), s.mockResource.GetClusterMetadataManager(), @@ -262,7 +264,7 @@ func (s *workflowHandlerSuite) TestTransientTaskInjection() { // Needed to execute test but not relevant s.mockSearchAttributesProvider.EXPECT(). - GetSearchAttributes(cfg.ESIndexName, false). + GetSearchAttributes(esIndexName, false). Return(searchattribute.NameTypeMap{}, nil). AnyTimes() @@ -2541,7 +2543,7 @@ func (s *workflowHandlerSuite) TestListBatchOperations_InvalidRerquest() { } func (s *workflowHandlerSuite) newConfig() *Config { - return NewConfig(dc.NewCollection(dc.NewNoopClient(), s.mockResource.GetLogger()), numHistoryShards, "", false) + return NewConfig(dc.NewCollection(dc.NewNoopClient(), s.mockResource.GetLogger()), numHistoryShards, false) } func updateRequest( diff --git a/service/history/fx.go b/service/history/fx.go index 5718d1c3beb..31cbf35e89e 100644 --- a/service/history/fx.go +++ b/service/history/fx.go @@ -163,10 +163,23 @@ func ConfigProvider( persistenceConfig config.Persistence, esConfig *esclient.Config, ) *configs.Config { + indexName := "" + if persistenceConfig.StandardVisibilityConfigExist() { + storeConfig := persistenceConfig.DataStores[persistenceConfig.VisibilityStore] + switch { + case storeConfig.Cassandra != nil: + indexName = storeConfig.Cassandra.Keyspace + case storeConfig.SQL != nil: + indexName = storeConfig.SQL.DatabaseName + } + } else if persistenceConfig.AdvancedVisibilityConfigExist() { + indexName = esConfig.GetVisibilityIndex() + } return configs.NewConfig(dc, persistenceConfig.NumHistoryShards, persistenceConfig.AdvancedVisibilityConfigExist(), - esConfig.GetVisibilityIndex()) + indexName, + ) } func ThrottledLoggerRpsFnProvider(serviceConfig *configs.Config) resource.ThrottledLoggerRpsFn { diff --git a/tests/test_cluster.go b/tests/test_cluster.go index de4c6c40e6e..cff632f2323 100644 --- a/tests/test_cluster.go +++ b/tests/test_cluster.go @@ -159,14 +159,26 @@ func NewCluster(options *TestClusterConfig, logger log.Logger) (*TestCluster, er pConfig.NumHistoryShards = options.HistoryConfig.NumHistoryShards var ( - esClient esclient.Client + indexName string + esClient esclient.Client ) if options.ESConfig != nil { + // Disable standard to elasticsearch dual visibility + pConfig.VisibilityStore = "" + indexName = options.ESConfig.GetVisibilityIndex() var err error esClient, err = esclient.NewClient(options.ESConfig, nil, logger) if err != nil { return nil, err } + } else { + storeConfig := pConfig.DataStores[pConfig.VisibilityStore] + switch { + case storeConfig.Cassandra != nil: + indexName = storeConfig.Cassandra.Keyspace + case storeConfig.SQL != nil: + indexName = storeConfig.SQL.DatabaseName + } } clusterInfoMap := make(map[string]cluster.ClusterInformation) @@ -194,7 +206,7 @@ func NewCluster(options *TestClusterConfig, logger log.Logger) (*TestCluster, er // Actual Elasticsearch fields are created from index template (testdata/es_v7_index_template.json). err := testBase.SearchAttributesManager.SaveSearchAttributes( context.Background(), - options.ESConfig.GetVisibilityIndex(), + indexName, searchattribute.TestNameTypeMap.Custom(), ) if err != nil {