From 750fc54c3af2688312e68124c479463efd1c7375 Mon Sep 17 00:00:00 2001 From: Rodrigo Zhou Date: Tue, 31 Jan 2023 14:53:01 -0800 Subject: [PATCH] Implement searchattribute.MapperProvider (#3873) --- common/persistence/visibility/factory.go | 14 +-- .../store/elasticsearch/query_interceptors.go | 31 +++--- .../store/elasticsearch/visibility_store.go | 38 +++---- .../visibility_store_read_test.go | 69 +++++------- common/resource/fx.go | 9 ++ common/resourcetest/resourceTest.go | 24 ++-- common/searchattribute/mapper.go | 61 ++++++++++- common/searchattribute/mapper_mock.go | 39 +++++++ common/searchattribute/mapper_test.go | 103 ++++++------------ common/searchattribute/test_provider.go | 52 +++++++++ common/searchattribute/validator.go | 19 +++- common/searchattribute/validator_test.go | 14 +-- service/frontend/dcRedirectionHandler_test.go | 2 +- service/frontend/fx.go | 8 +- service/frontend/workflow_handler.go | 26 ++--- service/frontend/workflow_handler_test.go | 30 ++--- service/history/commandChecker_test.go | 2 +- service/history/fx.go | 5 +- service/history/handler.go | 2 - service/history/historyEngine.go | 2 +- service/history/historyEngine2_test.go | 12 +- service/history/historyEngine_test.go | 7 ++ service/history/shard/context.go | 2 +- service/history/shard/context_impl.go | 10 +- service/history/shard/context_mock.go | 14 +-- service/history/shard/context_testutil.go | 2 +- service/history/shard/controller_impl.go | 4 +- service/history/shard/controller_test.go | 2 +- service/history/shard/fx.go | 4 +- service/history/workflowTaskHandler.go | 20 ++-- .../history/workflowTaskHandlerCallbacks.go | 32 +++--- .../workflowTaskHandlerCallbacks_test.go | 2 +- service/worker/fx.go | 4 +- 33 files changed, 393 insertions(+), 272 deletions(-) diff --git a/common/persistence/visibility/factory.go b/common/persistence/visibility/factory.go index 9e5b84b125c..69da46227f4 100644 --- a/common/persistence/visibility/factory.go +++ b/common/persistence/visibility/factory.go @@ -49,7 +49,7 @@ func NewManager( esClient esclient.Client, esProcessorConfig *elasticsearch.ProcessorConfig, searchAttributesProvider searchattribute.Provider, - searchAttributesMapper searchattribute.Mapper, + searchAttributesMapperProvider searchattribute.MapperProvider, standardVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn, standardVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn, @@ -81,7 +81,7 @@ func NewManager( esClient, esProcessorConfig, searchAttributesProvider, - searchAttributesMapper, + searchAttributesMapperProvider, advancedVisibilityPersistenceMaxReadQPS, advancedVisibilityPersistenceMaxWriteQPS, visibilityDisableOrderByClause, @@ -97,7 +97,7 @@ func NewManager( esClient, esProcessorConfig, searchAttributesProvider, - searchAttributesMapper, + searchAttributesMapperProvider, advancedVisibilityPersistenceMaxReadQPS, advancedVisibilityPersistenceMaxWriteQPS, visibilityDisableOrderByClause, @@ -187,7 +187,7 @@ func NewAdvancedManager( esClient esclient.Client, esProcessorConfig *elasticsearch.ProcessorConfig, searchAttributesProvider searchattribute.Provider, - searchAttributesMapper searchattribute.Mapper, + searchAttributesMapperProvider searchattribute.MapperProvider, advancedVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn, advancedVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn, @@ -205,7 +205,7 @@ func NewAdvancedManager( esClient, esProcessorConfig, searchAttributesProvider, - searchAttributesMapper, + searchAttributesMapperProvider, visibilityDisableOrderByClause, metricsHandler, logger) @@ -289,7 +289,7 @@ func newAdvancedVisibilityStore( esClient esclient.Client, esProcessorConfig *elasticsearch.ProcessorConfig, searchAttributesProvider searchattribute.Provider, - searchAttributesMapper searchattribute.Mapper, + searchAttributesMapperProvider searchattribute.MapperProvider, visibilityDisableOrderByClause dynamicconfig.BoolPropertyFn, metricsHandler metrics.Handler, logger log.Logger, @@ -311,7 +311,7 @@ func newAdvancedVisibilityStore( esClient, defaultIndexName, searchAttributesProvider, - searchAttributesMapper, + searchAttributesMapperProvider, esProcessor, esProcessorAckTimeout, visibilityDisableOrderByClause, diff --git a/common/persistence/visibility/store/elasticsearch/query_interceptors.go b/common/persistence/visibility/store/elasticsearch/query_interceptors.go index ea6a6ce66f8..465f2c7507f 100644 --- a/common/persistence/visibility/store/elasticsearch/query_interceptors.go +++ b/common/persistence/visibility/store/elasticsearch/query_interceptors.go @@ -39,11 +39,11 @@ import ( type ( nameInterceptor struct { - namespace namespace.Name - index string - searchAttributesTypeMap searchattribute.NameTypeMap - searchAttributesMapper searchattribute.Mapper - seenNamespaceDivision bool + namespace namespace.Name + index string + searchAttributesTypeMap searchattribute.NameTypeMap + searchAttributesMapperProvider searchattribute.MapperProvider + seenNamespaceDivision bool } valuesInterceptor struct{} ) @@ -52,13 +52,13 @@ func newNameInterceptor( namespace namespace.Name, index string, saTypeMap searchattribute.NameTypeMap, - searchAttributesMapper searchattribute.Mapper, + searchAttributesMapperProvider searchattribute.MapperProvider, ) *nameInterceptor { return &nameInterceptor{ - namespace: namespace, - index: index, - searchAttributesTypeMap: saTypeMap, - searchAttributesMapper: searchAttributesMapper, + namespace: namespace, + index: index, + searchAttributesTypeMap: saTypeMap, + searchAttributesMapperProvider: searchAttributesMapperProvider, } } @@ -68,12 +68,17 @@ func NewValuesInterceptor() *valuesInterceptor { func (ni *nameInterceptor) Name(name string, usage query.FieldNameUsage) (string, error) { fieldName := name - if searchattribute.IsMappable(name) && ni.searchAttributesMapper != nil { - var err error - fieldName, err = ni.searchAttributesMapper.GetFieldName(name, ni.namespace.String()) + if searchattribute.IsMappable(name) { + mapper, err := ni.searchAttributesMapperProvider.GetMapper(ni.namespace) if err != nil { return "", err } + if mapper != nil { + fieldName, err = mapper.GetFieldName(name, ni.namespace.String()) + if err != nil { + return "", err + } + } } fieldType, err := ni.searchAttributesTypeMap.GetType(fieldName) diff --git a/common/persistence/visibility/store/elasticsearch/visibility_store.go b/common/persistence/visibility/store/elasticsearch/visibility_store.go index 7b90308578a..43b836b9e6e 100644 --- a/common/persistence/visibility/store/elasticsearch/visibility_store.go +++ b/common/persistence/visibility/store/elasticsearch/visibility_store.go @@ -68,14 +68,14 @@ var defaultSorter = []elastic.Sorter{ type ( visibilityStore struct { - esClient client.Client - index string - searchAttributesProvider searchattribute.Provider - searchAttributesMapper searchattribute.Mapper - processor Processor - processorAckTimeout dynamicconfig.DurationPropertyFn - disableOrderByClause dynamicconfig.BoolPropertyFn - metricsHandler metrics.Handler + esClient client.Client + index string + searchAttributesProvider searchattribute.Provider + searchAttributesMapperProvider searchattribute.MapperProvider + processor Processor + processorAckTimeout dynamicconfig.DurationPropertyFn + disableOrderByClause dynamicconfig.BoolPropertyFn + metricsHandler metrics.Handler } visibilityPageToken struct { @@ -100,7 +100,7 @@ func NewVisibilityStore( esClient client.Client, index string, searchAttributesProvider searchattribute.Provider, - searchAttributesMapper searchattribute.Mapper, + searchAttributesMapperProvider searchattribute.MapperProvider, processor Processor, processorAckTimeout dynamicconfig.DurationPropertyFn, disableOrderByClause dynamicconfig.BoolPropertyFn, @@ -108,14 +108,14 @@ func NewVisibilityStore( ) *visibilityStore { return &visibilityStore{ - esClient: esClient, - index: index, - searchAttributesProvider: searchAttributesProvider, - searchAttributesMapper: searchAttributesMapper, - processor: processor, - processorAckTimeout: processorAckTimeout, - disableOrderByClause: disableOrderByClause, - metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.ElasticsearchVisibility)), + esClient: esClient, + index: index, + searchAttributesProvider: searchAttributesProvider, + searchAttributesMapperProvider: searchAttributesMapperProvider, + processor: processor, + processorAckTimeout: processorAckTimeout, + disableOrderByClause: disableOrderByClause, + metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.ElasticsearchVisibility)), } } @@ -730,7 +730,7 @@ func (s *visibilityStore) convertQuery( if err != nil { return nil, nil, serviceerror.NewUnavailable(fmt.Sprintf("Unable to read search attribute types: %v", err)) } - nameInterceptor := newNameInterceptor(namespace, s.index, saTypeMap, s.searchAttributesMapper) + nameInterceptor := newNameInterceptor(namespace, s.index, saTypeMap, s.searchAttributesMapperProvider) queryConverter := newQueryConverter(nameInterceptor, NewValuesInterceptor()) requestQuery, fieldSorts, err := queryConverter.ConvertWhereOrderBy(requestQueryStr) if err != nil { @@ -991,7 +991,7 @@ func (s *visibilityStore) parseESDoc(docID string, docSource json.RawMessage, sa s.metricsHandler.Counter(metrics.ElasticsearchDocumentParseFailuresCount.GetMetricName()).Record(1) return nil, serviceerror.NewInternal(fmt.Sprintf("Unable to encode custom search attributes of Elasticsearch document(%s): %v", docID, err)) } - aliasedSas, err := searchattribute.AliasFields(s.searchAttributesMapper, record.SearchAttributes, namespace.String()) + aliasedSas, err := searchattribute.AliasFields(s.searchAttributesMapperProvider, record.SearchAttributes, namespace.String()) if err != nil { return nil, err } diff --git a/common/persistence/visibility/store/elasticsearch/visibility_store_read_test.go b/common/persistence/visibility/store/elasticsearch/visibility_store_read_test.go index 05cfbb925a4..1fb8c1bdd9e 100644 --- a/common/persistence/visibility/store/elasticsearch/visibility_store_read_test.go +++ b/common/persistence/visibility/store/elasticsearch/visibility_store_read_test.go @@ -57,12 +57,12 @@ type ( suite.Suite // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, not merely log an error *require.Assertions - controller *gomock.Controller - visibilityStore *visibilityStore - mockESClient *client.MockClient - mockProcessor *MockProcessor - mockMetricsHandler *metrics.MockHandler - mockSearchAttributesMapper *searchattribute.MockMapper + controller *gomock.Controller + visibilityStore *visibilityStore + mockESClient *client.MockClient + mockProcessor *MockProcessor + mockMetricsHandler *metrics.MockHandler + mockSearchAttributesMapperProvider *searchattribute.MockMapperProvider } ) @@ -126,12 +126,12 @@ func (s *ESVisibilitySuite) SetupTest() { s.mockMetricsHandler.EXPECT().WithTags(metrics.OperationTag(metrics.ElasticsearchVisibility)).Return(s.mockMetricsHandler).AnyTimes() s.mockProcessor = NewMockProcessor(s.controller) s.mockESClient = client.NewMockClient(s.controller) - s.mockSearchAttributesMapper = searchattribute.NewMockMapper(s.controller) + s.mockSearchAttributesMapperProvider = searchattribute.NewMockMapperProvider(s.controller) s.visibilityStore = NewVisibilityStore( s.mockESClient, testIndex, searchattribute.NewTestProvider(), - nil, + searchattribute.NewTestMapperProvider(nil), s.mockProcessor, esProcessorAckTimeout, visibilityDisableOrderByClause, @@ -649,15 +649,10 @@ func (s *ESVisibilitySuite) Test_convertQuery() { } func (s *ESVisibilitySuite) Test_convertQuery_Mapper() { - s.mockSearchAttributesMapper.EXPECT().GetFieldName(gomock.Any(), testNamespace.String()).DoAndReturn( - func(alias string, namespace string) (string, error) { - if strings.HasPrefix(alias, "AliasFor") { - return strings.TrimPrefix(alias, "AliasFor"), nil - } - return "", serviceerror.NewInvalidArgument("mapper error") - }).AnyTimes() + s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(testNamespace). + Return(&searchattribute.TestMapper{}, nil).AnyTimes() - s.visibilityStore.searchAttributesMapper = s.mockSearchAttributesMapper + s.visibilityStore.searchAttributesMapperProvider = s.mockSearchAttributesMapperProvider query := `WorkflowId = 'wid'` qry, srt, err := s.visibilityStore.convertQuery(testNamespace, testNamespaceID, query) @@ -707,16 +702,14 @@ func (s *ESVisibilitySuite) Test_convertQuery_Mapper() { s.Error(err) s.ErrorAs(err, &invalidArgumentErr) s.EqualError(err, "invalid query: unable to convert 'order by' column name: invalid search attribute: AliasForUnknownField") - s.visibilityStore.searchAttributesMapper = nil + s.visibilityStore.searchAttributesMapperProvider = nil } func (s *ESVisibilitySuite) Test_convertQuery_Mapper_Error() { - s.mockSearchAttributesMapper.EXPECT().GetFieldName(gomock.Any(), testNamespace.String()).DoAndReturn( - func(fieldName string, namespace string) (string, error) { - return "", serviceerror.NewInvalidArgument("mapper error") - }).AnyTimes() + s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(testNamespace). + Return(&searchattribute.TestMapper{}, nil).AnyTimes() - s.visibilityStore.searchAttributesMapper = s.mockSearchAttributesMapper + s.visibilityStore.searchAttributesMapperProvider = s.mockSearchAttributesMapperProvider query := `WorkflowId = 'wid'` qry, srt, err := s.visibilityStore.convertQuery(testNamespace, testNamespaceID, query) @@ -743,7 +736,7 @@ func (s *ESVisibilitySuite) Test_convertQuery_Mapper_Error() { s.ErrorAs(err, &invalidArgumentErr) s.EqualError(err, "mapper error") - s.visibilityStore.searchAttributesMapper = nil + s.visibilityStore.searchAttributesMapperProvider = nil } func (s *ESVisibilitySuite) TestGetListWorkflowExecutionsResponse() { @@ -966,12 +959,10 @@ func (s *ESVisibilitySuite) TestParseESDoc_SearchAttributes_WithMapper() { "CustomIntField": [111,222], "CustomBoolField": true, "UnknownField": "random"}`) - s.visibilityStore.searchAttributesMapper = s.mockSearchAttributesMapper + s.visibilityStore.searchAttributesMapperProvider = s.mockSearchAttributesMapperProvider - s.mockSearchAttributesMapper.EXPECT().GetAlias(gomock.Any(), testNamespace.String()).DoAndReturn( - func(fieldName string, namespace string) (string, error) { - return "AliasOf" + fieldName, nil - }).Times(6) + s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(testNamespace). + Return(&searchattribute.TestMapper{}, nil).AnyTimes() info, err := s.visibilityStore.parseESDoc("", docSource, searchattribute.TestNameTypeMap, testNamespace) s.NoError(err) @@ -979,24 +970,16 @@ func (s *ESVisibilitySuite) TestParseESDoc_SearchAttributes_WithMapper() { s.Len(info.SearchAttributes.GetIndexedFields(), 7) s.Contains(info.SearchAttributes.GetIndexedFields(), "TemporalChangeVersion") - s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasOfCustomKeywordField") - s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasOfCustomTextField") - s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasOfCustomDatetimeField") - s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasOfCustomDoubleField") - s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasOfCustomBoolField") - s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasOfCustomIntField") + s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasForCustomKeywordField") + s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasForCustomTextField") + s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasForCustomDatetimeField") + s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasForCustomDoubleField") + s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasForCustomBoolField") + s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasForCustomIntField") s.NotContains(info.SearchAttributes.GetIndexedFields(), "UnknownField") s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, info.Status) - s.mockSearchAttributesMapper.EXPECT().GetAlias(gomock.Any(), testNamespace.String()).DoAndReturn( - func(fieldName string, namespace string) (string, error) { - return "", serviceerror.NewUnavailable("error") - }) - info, err = s.visibilityStore.parseESDoc("", docSource, searchattribute.TestNameTypeMap, testNamespace) - s.Error(err) - s.Nil(info) - - s.visibilityStore.searchAttributesMapper = nil + s.visibilityStore.searchAttributesMapperProvider = nil } func (s *ESVisibilitySuite) TestListWorkflowExecutions() { diff --git a/common/resource/fx.go b/common/resource/fx.go index ea4672495d7..9c547ed8638 100644 --- a/common/resource/fx.go +++ b/common/resource/fx.go @@ -97,6 +97,7 @@ var Module = fx.Options( fx.Provide(HostNameProvider), fx.Provide(TimeSourceProvider), cluster.MetadataLifetimeHooksModule, + fx.Provide(SearchAttributeMapperProviderProvider), fx.Provide(SearchAttributeProviderProvider), fx.Provide(SearchAttributeManagerProvider), fx.Provide(NamespaceRegistryProvider), @@ -162,6 +163,14 @@ func TimeSourceProvider() clock.TimeSource { return clock.NewRealTimeSource() } +func SearchAttributeMapperProviderProvider( + saMapper searchattribute.Mapper, +) searchattribute.MapperProvider { + return searchattribute.NewMapperProvider( + saMapper, + ) +} + func SearchAttributeProviderProvider( timeSource clock.TimeSource, cmMgr persistence.ClusterMetadataManager, diff --git a/common/resourcetest/resourceTest.go b/common/resourcetest/resourceTest.go index 6f165a95a21..488e82fcec8 100644 --- a/common/resourcetest/resourceTest.go +++ b/common/resourcetest/resourceTest.go @@ -63,11 +63,11 @@ import ( type ( // Test is the test implementation used for testing Test struct { - MetricsScope tally.Scope - ClusterMetadata *cluster.MockMetadata - SearchAttributesProvider *searchattribute.MockProvider - SearchAttributesManager *searchattribute.MockManager - SearchAttributesMapper *searchattribute.MockMapper + MetricsScope tally.Scope + ClusterMetadata *cluster.MockMetadata + SearchAttributesProvider *searchattribute.MockProvider + SearchAttributesManager *searchattribute.MockManager + SearchAttributesMapperProvider *searchattribute.MockMapperProvider // other common resources @@ -174,11 +174,11 @@ func NewTest( ) return &Test{ - MetricsScope: scope, - ClusterMetadata: cluster.NewMockMetadata(controller), - SearchAttributesProvider: searchattribute.NewMockProvider(controller), - SearchAttributesManager: searchattribute.NewMockManager(controller), - SearchAttributesMapper: searchattribute.NewMockMapper(controller), + MetricsScope: scope, + ClusterMetadata: cluster.NewMockMetadata(controller), + SearchAttributesProvider: searchattribute.NewMockProvider(controller), + SearchAttributesManager: searchattribute.NewMockManager(controller), + SearchAttributesMapperProvider: searchattribute.NewMockMapperProvider(controller), // other common resources @@ -439,8 +439,8 @@ func (t *Test) GetSearchAttributesManager() searchattribute.Manager { return t.SearchAttributesManager } -func (t *Test) GetSearchAttributesMapper() searchattribute.Mapper { - return t.SearchAttributesMapper +func (t *Test) GetSearchAttributesMapperProvider() searchattribute.MapperProvider { + return t.SearchAttributesMapperProvider } func (t *Test) RefreshNamespaceCache() { diff --git a/common/searchattribute/mapper.go b/common/searchattribute/mapper.go index d77058d0357..2666462c031 100644 --- a/common/searchattribute/mapper.go +++ b/common/searchattribute/mapper.go @@ -29,6 +29,7 @@ package searchattribute import ( commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" + "go.temporal.io/server/common/namespace" ) type ( @@ -39,11 +40,54 @@ type ( GetAlias(fieldName string, namespace string) (string, error) GetFieldName(alias string, namespace string) (string, error) } + + noopMapper struct{} + + MapperProvider interface { + GetMapper(nsName namespace.Name) (Mapper, error) + } + + mapperProviderImpl struct { + customMapper Mapper + } ) +var _ Mapper = (*noopMapper)(nil) +var _ Mapper = (*namespace.CustomSearchAttributesMapper)(nil) +var _ MapperProvider = (*mapperProviderImpl)(nil) + +func (m *noopMapper) GetAlias(fieldName string, _ string) (string, error) { + return fieldName, nil +} + +func (m *noopMapper) GetFieldName(alias string, _ string) (string, error) { + return alias, nil +} + +func NewMapperProvider( + customMapper Mapper, +) MapperProvider { + return &mapperProviderImpl{ + customMapper: customMapper, + } +} + +func (m *mapperProviderImpl) GetMapper(_ namespace.Name) (Mapper, error) { + return m.customMapper, nil +} + // AliasFields returns SearchAttributes struct where each search attribute name is replaced with alias. // If no replacement where made, it returns nil which means that original SearchAttributes struct should be used. -func AliasFields(mapper Mapper, searchAttributes *commonpb.SearchAttributes, namespace string) (*commonpb.SearchAttributes, error) { +func AliasFields( + mapperProvider MapperProvider, + searchAttributes *commonpb.SearchAttributes, + namespaceName string, +) (*commonpb.SearchAttributes, error) { + mapper, err := mapperProvider.GetMapper(namespace.Name(namespaceName)) + if err != nil { + return nil, err + } + if len(searchAttributes.GetIndexedFields()) == 0 || mapper == nil { return nil, nil } @@ -56,7 +100,7 @@ func AliasFields(mapper Mapper, searchAttributes *commonpb.SearchAttributes, nam continue } - aliasName, err := mapper.GetAlias(saName, namespace) + aliasName, err := mapper.GetAlias(saName, namespaceName) if err != nil { if _, isInvalidArgument := err.(*serviceerror.InvalidArgument); isInvalidArgument { // Silently ignore serviceerror.InvalidArgument because it indicates unmapped field (alias was deleted, for example). @@ -81,7 +125,16 @@ func AliasFields(mapper Mapper, searchAttributes *commonpb.SearchAttributes, nam // UnaliasFields returns SearchAttributes struct where each search attribute alias is replaced with field name. // If no replacement where made, it returns nil which means that original SearchAttributes struct should be used. -func UnaliasFields(mapper Mapper, searchAttributes *commonpb.SearchAttributes, namespace string) (*commonpb.SearchAttributes, error) { +func UnaliasFields( + mapperProvider MapperProvider, + searchAttributes *commonpb.SearchAttributes, + namespaceName string, +) (*commonpb.SearchAttributes, error) { + mapper, err := mapperProvider.GetMapper(namespace.Name(namespaceName)) + if err != nil { + return nil, err + } + if len(searchAttributes.GetIndexedFields()) == 0 || mapper == nil { return nil, nil } @@ -94,7 +147,7 @@ func UnaliasFields(mapper Mapper, searchAttributes *commonpb.SearchAttributes, n continue } - fieldName, err := mapper.GetFieldName(saName, namespace) + fieldName, err := mapper.GetFieldName(saName, namespaceName) if err != nil { return nil, err } diff --git a/common/searchattribute/mapper_mock.go b/common/searchattribute/mapper_mock.go index 55274cdf664..3a2eb99c074 100644 --- a/common/searchattribute/mapper_mock.go +++ b/common/searchattribute/mapper_mock.go @@ -32,6 +32,7 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" + namespace "go.temporal.io/server/common/namespace" ) // MockMapper is a mock of Mapper interface. @@ -86,3 +87,41 @@ func (mr *MockMapperMockRecorder) GetFieldName(alias, namespace interface{}) *go mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFieldName", reflect.TypeOf((*MockMapper)(nil).GetFieldName), alias, namespace) } + +// MockMapperProvider is a mock of MapperProvider interface. +type MockMapperProvider struct { + ctrl *gomock.Controller + recorder *MockMapperProviderMockRecorder +} + +// MockMapperProviderMockRecorder is the mock recorder for MockMapperProvider. +type MockMapperProviderMockRecorder struct { + mock *MockMapperProvider +} + +// NewMockMapperProvider creates a new mock instance. +func NewMockMapperProvider(ctrl *gomock.Controller) *MockMapperProvider { + mock := &MockMapperProvider{ctrl: ctrl} + mock.recorder = &MockMapperProviderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockMapperProvider) EXPECT() *MockMapperProviderMockRecorder { + return m.recorder +} + +// GetMapper mocks base method. +func (m *MockMapperProvider) GetMapper(nsName namespace.Name) (Mapper, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMapper", nsName) + ret0, _ := ret[0].(Mapper) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetMapper indicates an expected call of GetMapper. +func (mr *MockMapperProviderMockRecorder) GetMapper(nsName interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMapper", reflect.TypeOf((*MockMapperProvider)(nil).GetMapper), nsName) +} diff --git a/common/searchattribute/mapper_test.go b/common/searchattribute/mapper_test.go index 519e9e11206..c873ec84c7a 100644 --- a/common/searchattribute/mapper_test.go +++ b/common/searchattribute/mapper_test.go @@ -25,7 +25,6 @@ package searchattribute import ( - "strings" "testing" "github.com/stretchr/testify/assert" @@ -33,90 +32,52 @@ import ( "go.temporal.io/api/serviceerror" ) -type ( - TestMapper struct { - } -) - -func (t *TestMapper) GetAlias(fieldName string, namespace string) (string, error) { - if fieldName == "wrong_field" { - // This error must be always ignored. - return "", serviceerror.NewInvalidArgument("unmapped field") - } - if namespace == "error-namespace" { - return "", serviceerror.NewInternal("mapper error") - } else if namespace == "test-namespace" { - if fieldName == "pass-through" { - return fieldName, nil - } - - return "alias_of_" + fieldName, nil - } - - // This error must be always ignored. - return "", serviceerror.NewInvalidArgument("unknown namespace") -} - -func (t *TestMapper) GetFieldName(alias string, namespace string) (string, error) { - if alias == "wrong_alias" { - // This error must be always ignored. - return "", serviceerror.NewInvalidArgument("unmapped alias") - } - if namespace == "error-namespace" { - return "", serviceerror.NewInternal("mapper error") - } else if namespace == "test-namespace" { - if alias == "pass-through" { - return alias, nil - } - return strings.TrimPrefix(alias, "alias_of_"), nil - } - return "", serviceerror.NewInvalidArgument("unknown namespace") -} - func Test_AliasFields(t *testing.T) { + mapperProvider := NewTestMapperProvider(&TestMapper{}) + sa := &commonpb.SearchAttributes{ IndexedFields: map[string]*commonpb.Payload{ - "field1": {Data: []byte("data1")}, + "Field1": {Data: []byte("data1")}, "wrong_field": {Data: []byte("data23")}, // Wrong unknown name must be ignored. }, } - _, err := AliasFields(&TestMapper{}, sa, "error-namespace") + _, err := AliasFields(mapperProvider, sa, "error-namespace") assert.Error(t, err) var internalErr *serviceerror.Internal assert.ErrorAs(t, err, &internalErr) sa = &commonpb.SearchAttributes{ IndexedFields: map[string]*commonpb.Payload{ - "field1": {Data: []byte("data1")}, + "Field1": {Data: []byte("data1")}, "wrong_field": {Data: []byte("data23")}, // Wrong unknown name must be ignored. }, } - sa, err = AliasFields(&TestMapper{}, sa, "unknown-namespace") + sa, err = AliasFields(mapperProvider, sa, "unknown-namespace") assert.NoError(t, err) assert.Nil(t, sa) sa = &commonpb.SearchAttributes{ IndexedFields: map[string]*commonpb.Payload{ - "field1": {Data: []byte("data1")}, - "field2": {Data: []byte("data2")}, + "Field1": {Data: []byte("data1")}, + "Field2": {Data: []byte("data2")}, "wrong_field": {Data: []byte("data23")}, // Wrong unknown name must be ignored. }, } - sa, err = AliasFields(&TestMapper{}, sa, "test-namespace") + sa, err = AliasFields(mapperProvider, sa, "test-namespace") assert.NoError(t, err) assert.NotNil(t, sa) assert.Len(t, sa.GetIndexedFields(), 2) - assert.EqualValues(t, "data1", sa.GetIndexedFields()["alias_of_field1"].GetData()) - assert.EqualValues(t, "data2", sa.GetIndexedFields()["alias_of_field2"].GetData()) + assert.EqualValues(t, "data1", sa.GetIndexedFields()["AliasForField1"].GetData()) + assert.EqualValues(t, "data2", sa.GetIndexedFields()["AliasForField2"].GetData()) // Empty search attributes are not validated with mapper. sa = &commonpb.SearchAttributes{ IndexedFields: nil, } - sa, err = AliasFields(&TestMapper{}, sa, "error-namespace") + sa, err = AliasFields(mapperProvider, sa, "error-namespace") assert.NoError(t, err) assert.Nil(t, sa) - sa, err = AliasFields(&TestMapper{}, sa, "unknown-namespace") + sa, err = AliasFields(mapperProvider, sa, "unknown-namespace") assert.NoError(t, err) assert.Nil(t, sa) @@ -126,54 +87,56 @@ func Test_AliasFields(t *testing.T) { "pass-through": {Data: []byte("data1")}, }, } - sa, err = AliasFields(&TestMapper{}, sa, "test-namespace") + sa, err = AliasFields(mapperProvider, sa, "test-namespace") assert.NoError(t, err) assert.Nil(t, sa) } func Test_UnaliasFields(t *testing.T) { + mapperProvider := NewTestMapperProvider(&TestMapper{}) + sa := &commonpb.SearchAttributes{ IndexedFields: map[string]*commonpb.Payload{ - "alias_of_field1": {Data: []byte("data1")}, + "AliasForField1": {Data: []byte("data1")}, }, } - _, err := UnaliasFields(&TestMapper{}, sa, "error-namespace") + _, err := UnaliasFields(mapperProvider, sa, "error-namespace") assert.Error(t, err) var internalErr *serviceerror.Internal assert.ErrorAs(t, err, &internalErr) sa = &commonpb.SearchAttributes{ IndexedFields: map[string]*commonpb.Payload{ - "alias_of_field1": {Data: []byte("data1")}, - "alias_of_field2": {Data: []byte("data2")}, + "AliasForField1": {Data: []byte("data1")}, + "AliasForField2": {Data: []byte("data2")}, }, } - _, err = UnaliasFields(&TestMapper{}, sa, "unknown-namespace") + _, err = UnaliasFields(mapperProvider, sa, "unknown-namespace") assert.Error(t, err) var invalidArgumentErr *serviceerror.InvalidArgument assert.ErrorAs(t, err, &invalidArgumentErr) sa = &commonpb.SearchAttributes{ IndexedFields: map[string]*commonpb.Payload{ - "alias_of_field1": {Data: []byte("data1")}, - "alias_of_field2": {Data: []byte("data2")}, + "AliasForField1": {Data: []byte("data1")}, + "AliasForField2": {Data: []byte("data2")}, }, } - sa, err = UnaliasFields(&TestMapper{}, sa, "test-namespace") + sa, err = UnaliasFields(mapperProvider, sa, "test-namespace") assert.NoError(t, err) assert.NotNil(t, sa) assert.Len(t, sa.GetIndexedFields(), 2) - assert.EqualValues(t, "data1", sa.GetIndexedFields()["field1"].GetData()) - assert.EqualValues(t, "data2", sa.GetIndexedFields()["field2"].GetData()) + assert.EqualValues(t, "data1", sa.GetIndexedFields()["Field1"].GetData()) + assert.EqualValues(t, "data2", sa.GetIndexedFields()["Field2"].GetData()) sa = &commonpb.SearchAttributes{ IndexedFields: map[string]*commonpb.Payload{ - "alias_of_field1": {Data: []byte("data1")}, - "alias_of_field2": {Data: []byte("data2")}, - "wrong_alias": {Data: []byte("data3")}, + "AliasForField1": {Data: []byte("data1")}, + "AliasForField2": {Data: []byte("data2")}, + "wrong_alias": {Data: []byte("data3")}, }, } - _, err = UnaliasFields(&TestMapper{}, sa, "test-namespace") + _, err = UnaliasFields(mapperProvider, sa, "test-namespace") assert.Error(t, err) assert.ErrorAs(t, err, &invalidArgumentErr) @@ -181,10 +144,10 @@ func Test_UnaliasFields(t *testing.T) { sa = &commonpb.SearchAttributes{ IndexedFields: nil, } - sa, err = UnaliasFields(&TestMapper{}, sa, "error-namespace") + sa, err = UnaliasFields(mapperProvider, sa, "error-namespace") assert.NoError(t, err) assert.Nil(t, sa) - sa, err = UnaliasFields(&TestMapper{}, sa, "unknown-namespace") + sa, err = UnaliasFields(mapperProvider, sa, "unknown-namespace") assert.NoError(t, err) assert.Nil(t, sa) @@ -194,7 +157,7 @@ func Test_UnaliasFields(t *testing.T) { "pass-through": {Data: []byte("data1")}, }, } - sa, err = UnaliasFields(&TestMapper{}, sa, "test-namespace") + sa, err = UnaliasFields(mapperProvider, sa, "test-namespace") assert.NoError(t, err) assert.Nil(t, sa) } diff --git a/common/searchattribute/test_provider.go b/common/searchattribute/test_provider.go index fd99c131f3f..b4815d17805 100644 --- a/common/searchattribute/test_provider.go +++ b/common/searchattribute/test_provider.go @@ -25,13 +25,23 @@ package searchattribute import ( + "strings" + enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/api/serviceerror" ) type ( TestProvider struct{} + + TestMapper struct { + Namespace string + } ) +var _ Provider = (*TestProvider)(nil) +var _ Mapper = (*TestMapper)(nil) + var ( TestNameTypeMap = NameTypeMap{ customSearchAttributes: map[string]enumspb.IndexedValueType{ @@ -52,3 +62,45 @@ func NewTestProvider() *TestProvider { func (s *TestProvider) GetSearchAttributes(_ string, _ bool) (NameTypeMap, error) { return TestNameTypeMap, nil } + +func (t *TestMapper) GetAlias(fieldName string, namespace string) (string, error) { + if fieldName == "wrong_field" { + // This error must be always ignored. + return "", serviceerror.NewInvalidArgument("unmapped field") + } + if namespace == "error-namespace" { + return "", serviceerror.NewInternal("mapper error") + } else if namespace == "test-namespace" || namespace == t.Namespace { + if fieldName == "pass-through" { + return fieldName, nil + } + + return "AliasFor" + fieldName, nil + } + + // This error must be always ignored. + return "", serviceerror.NewInvalidArgument("unknown namespace") +} + +func (t *TestMapper) GetFieldName(alias string, namespace string) (string, error) { + if alias == "wrong_alias" { + // This error must be always ignored. + return "", serviceerror.NewInvalidArgument("unmapped alias") + } + if namespace == "error-namespace" { + return "", serviceerror.NewInternal("mapper error") + } else if namespace == "test-namespace" || namespace == t.Namespace { + if alias == "pass-through" { + return alias, nil + } + if strings.HasPrefix(alias, "AliasFor") { + return strings.TrimPrefix(alias, "AliasFor"), nil + } + return "", serviceerror.NewInvalidArgument("mapper error") + } + return "", serviceerror.NewInvalidArgument("unknown namespace") +} + +func NewTestMapperProvider(customMapper Mapper) MapperProvider { + return NewMapperProvider(customMapper) +} diff --git a/common/searchattribute/validator.go b/common/searchattribute/validator.go index cc03143508a..89141a39405 100644 --- a/common/searchattribute/validator.go +++ b/common/searchattribute/validator.go @@ -32,6 +32,7 @@ import ( "go.temporal.io/api/serviceerror" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/payload" ) @@ -39,7 +40,7 @@ type ( // Validator is used to validate search attributes Validator struct { searchAttributesProvider Provider - searchAttributesMapper Mapper + searchAttributesMapperProvider MapperProvider searchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithNamespaceFilter searchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithNamespaceFilter searchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter @@ -50,7 +51,7 @@ type ( // NewValidator create Validator func NewValidator( searchAttributesProvider Provider, - searchAttributesMapper Mapper, + searchAttributesMapperProvider MapperProvider, searchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithNamespaceFilter, searchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithNamespaceFilter, searchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter, @@ -58,7 +59,7 @@ func NewValidator( ) *Validator { return &Validator{ searchAttributesProvider: searchAttributesProvider, - searchAttributesMapper: searchAttributesMapper, + searchAttributesMapperProvider: searchAttributesMapperProvider, searchAttributesNumberOfKeysLimit: searchAttributesNumberOfKeysLimit, searchAttributesSizeOfValueLimit: searchAttributesSizeOfValueLimit, searchAttributesTotalSizeLimit: searchAttributesTotalSizeLimit, @@ -181,9 +182,15 @@ func (v *Validator) validationError(msg string, saFieldName string, namespace st return serviceerror.NewInvalidArgument(fmt.Sprintf(msg, saAlias)) } -func (v *Validator) getAlias(saFieldName string, namespace string) (string, error) { - if IsMappable(saFieldName) && v.searchAttributesMapper != nil { - return v.searchAttributesMapper.GetAlias(saFieldName, namespace) +func (v *Validator) getAlias(saFieldName string, namespaceName string) (string, error) { + if IsMappable(saFieldName) { + mapper, err := v.searchAttributesMapperProvider.GetMapper(namespace.Name(namespaceName)) + if err != nil { + return "", err + } + if mapper != nil { + return mapper.GetAlias(saFieldName, namespaceName) + } } return saFieldName, nil } diff --git a/common/searchattribute/validator_test.go b/common/searchattribute/validator_test.go index 388996fd7d9..9df4d1b82ee 100644 --- a/common/searchattribute/validator_test.go +++ b/common/searchattribute/validator_test.go @@ -50,7 +50,7 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidate() { saValidator := NewValidator( NewTestProvider(), - nil, + NewTestMapperProvider(nil), dynamicconfig.GetIntPropertyFilteredByNamespace(numOfKeysLimit), dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfValueLimit), dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfTotalLimit), @@ -126,7 +126,7 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidate_Mapper() { saValidator := NewValidator( NewTestProvider(), - &TestMapper{}, + NewTestMapperProvider(&TestMapper{}), dynamicconfig.GetIntPropertyFilteredByNamespace(numOfKeysLimit), dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfValueLimit), dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfTotalLimit), @@ -165,7 +165,7 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidate_Mapper() { attr.IndexedFields = fields err = saValidator.Validate(attr, namespace) s.Error(err) - s.Equal("search attribute alias_of_InvalidKey is not defined", err.Error()) + s.Equal("search attribute AliasForInvalidKey is not defined", err.Error()) err = saValidator.Validate(attr, "error-namespace") s.Error(err) @@ -178,7 +178,7 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidate_Mapper() { attr.IndexedFields = fields err = saValidator.Validate(attr, namespace) s.Error(err) - s.Equal("invalid value for search attribute alias_of_CustomBoolField of type Bool: 123", err.Error()) + s.Equal("invalid value for search attribute AliasForCustomBoolField of type Bool: 123", err.Error()) } func (s *searchAttributesValidatorSuite) TestSearchAttributesValidateSize() { @@ -188,7 +188,7 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidateSize() { saValidator := NewValidator( NewTestProvider(), - nil, + NewTestMapperProvider(nil), dynamicconfig.GetIntPropertyFilteredByNamespace(numOfKeysLimit), dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfValueLimit), dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfTotalLimit), @@ -226,7 +226,7 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidateSize_Mapper saValidator := NewValidator( NewTestProvider(), - &TestMapper{}, + NewTestMapperProvider(&TestMapper{}), dynamicconfig.GetIntPropertyFilteredByNamespace(numOfKeysLimit), dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfValueLimit), dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfTotalLimit), @@ -245,7 +245,7 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidateSize_Mapper attr.IndexedFields = fields err := saValidator.ValidateSize(attr, namespace) s.Error(err) - s.Equal("search attribute alias_of_CustomKeywordField value size 8 exceeds size limit 5", err.Error()) + s.Equal("search attribute AliasForCustomKeywordField value size 8 exceeds size limit 5", err.Error()) fields = map[string]*commonpb.Payload{ "CustomKeywordField": payload.EncodeString("123"), diff --git a/service/frontend/dcRedirectionHandler_test.go b/service/frontend/dcRedirectionHandler_test.go index 3d05c449175..cda07eb9470 100644 --- a/service/frontend/dcRedirectionHandler_test.go +++ b/service/frontend/dcRedirectionHandler_test.go @@ -126,7 +126,7 @@ func (s *dcRedirectionHandlerSuite) SetupTest() { s.mockResource.GetArchiverProvider(), s.mockResource.GetPayloadSerializer(), s.mockResource.GetNamespaceRegistry(), - s.mockResource.GetSearchAttributesMapper(), + s.mockResource.GetSearchAttributesMapperProvider(), s.mockResource.GetSearchAttributesProvider(), s.mockResource.GetClusterMetadata(), s.mockResource.GetArchivalMetadata(), diff --git a/service/frontend/fx.go b/service/frontend/fx.go index c1c880f0a07..f6251c0f5bf 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -368,7 +368,7 @@ func VisibilityManagerProvider( esConfig *esclient.Config, esClient esclient.Client, persistenceServiceResolver resolver.ServiceResolver, - searchAttributesMapper searchattribute.Mapper, + searchAttributesMapperProvider searchattribute.MapperProvider, saProvider searchattribute.Provider, ) (manager.VisibilityManager, error) { return visibility.NewManager( @@ -379,7 +379,7 @@ func VisibilityManagerProvider( esClient, nil, // frontend visibility never write saProvider, - searchAttributesMapper, + searchAttributesMapperProvider, serviceConfig.StandardVisibilityPersistenceMaxReadQPS, serviceConfig.StandardVisibilityPersistenceMaxWriteQPS, serviceConfig.AdvancedVisibilityPersistenceMaxReadQPS, @@ -525,7 +525,7 @@ func HandlerProvider( payloadSerializer serialization.Serializer, timeSource clock.TimeSource, namespaceRegistry namespace.Registry, - saMapper searchattribute.Mapper, + saMapperProvider searchattribute.MapperProvider, saProvider searchattribute.Provider, clusterMetadata cluster.Metadata, archivalMetadata archiver.ArchivalMetadata, @@ -545,7 +545,7 @@ func HandlerProvider( archiverProvider, payloadSerializer, namespaceRegistry, - saMapper, + saMapperProvider, saProvider, clusterMetadata, archivalMetadata, diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index dba9f97d136..b147ca0e943 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -115,7 +115,7 @@ type ( archiverProvider provider.ArchiverProvider payloadSerializer serialization.Serializer namespaceRegistry namespace.Registry - saMapper searchattribute.Mapper + saMapperProvider searchattribute.MapperProvider saProvider searchattribute.Provider saValidator *searchattribute.Validator archivalMetadata archiver.ArchivalMetadata @@ -139,7 +139,7 @@ func NewWorkflowHandler( archiverProvider provider.ArchiverProvider, payloadSerializer serialization.Serializer, namespaceRegistry namespace.Registry, - saMapper searchattribute.Mapper, + saMapperProvider searchattribute.MapperProvider, saProvider searchattribute.Provider, clusterMetadata cluster.Metadata, archivalMetadata archiver.ArchivalMetadata, @@ -175,10 +175,10 @@ func NewWorkflowHandler( payloadSerializer: payloadSerializer, namespaceRegistry: namespaceRegistry, saProvider: saProvider, - saMapper: saMapper, + saMapperProvider: saMapperProvider, saValidator: searchattribute.NewValidator( saProvider, - saMapper, + saMapperProvider, config.SearchAttributesNumberOfKeysLimit, config.SearchAttributesSizeOfValueLimit, config.SearchAttributesTotalSizeLimit, @@ -2765,7 +2765,7 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution(ctx context.Context, reques return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err)) } searchattribute.ApplyTypeMap(response.GetWorkflowExecutionInfo().GetSearchAttributes(), saTypeMap) - aliasedSas, err := searchattribute.AliasFields(wh.saMapper, response.GetWorkflowExecutionInfo().GetSearchAttributes(), request.GetNamespace()) + aliasedSas, err := searchattribute.AliasFields(wh.saMapperProvider, response.GetWorkflowExecutionInfo().GetSearchAttributes(), request.GetNamespace()) if err != nil { return nil, err } @@ -3112,7 +3112,7 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err)) } searchattribute.ApplyTypeMap(sas, saTypeMap) - aliasedSas, err := searchattribute.AliasFields(wh.saMapper, sas, request.GetNamespace()) + aliasedSas, err := searchattribute.AliasFields(wh.saMapperProvider, sas, request.GetNamespace()) if err != nil { return nil, err } @@ -3156,7 +3156,7 @@ func (wh *WorkflowHandler) DescribeSchedule(ctx context.Context, request *workfl return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err)) } searchattribute.ApplyTypeMap(sa, saTypeMap) - aliasedSas, err := searchattribute.AliasFields(wh.saMapper, sa, request.Namespace) + aliasedSas, err := searchattribute.AliasFields(wh.saMapperProvider, sa, request.Namespace) if err != nil { return err } @@ -4259,7 +4259,7 @@ func (wh *WorkflowHandler) processOutgoingSearchAttributes(events []*historypb.H } if searchAttributes != nil { searchattribute.ApplyTypeMap(searchAttributes, saTypeMap) - aliasedSas, err := searchattribute.AliasFields(wh.saMapper, searchAttributes, namespace.String()) + aliasedSas, err := searchattribute.AliasFields(wh.saMapperProvider, searchAttributes, namespace.String()) if err != nil { return err } @@ -4877,7 +4877,7 @@ func getBatchOperationState(workflowState enumspb.WorkflowExecutionStatus) enums } func (wh *WorkflowHandler) unaliasStartWorkflowExecutionRequestSearchAttributes(request *workflowservice.StartWorkflowExecutionRequest, namespaceName namespace.Name) (*workflowservice.StartWorkflowExecutionRequest, error) { - unaliasedSas, err := searchattribute.UnaliasFields(wh.saMapper, request.GetSearchAttributes(), namespaceName.String()) + unaliasedSas, err := searchattribute.UnaliasFields(wh.saMapperProvider, request.GetSearchAttributes(), namespaceName.String()) if err != nil { return nil, err } @@ -4892,7 +4892,7 @@ func (wh *WorkflowHandler) unaliasStartWorkflowExecutionRequestSearchAttributes( } func (wh *WorkflowHandler) unaliasSignalWithStartWorkflowExecutionRequestSearchAttributes(request *workflowservice.SignalWithStartWorkflowExecutionRequest, namespaceName namespace.Name) (*workflowservice.SignalWithStartWorkflowExecutionRequest, error) { - unaliasedSas, err := searchattribute.UnaliasFields(wh.saMapper, request.GetSearchAttributes(), namespaceName.String()) + unaliasedSas, err := searchattribute.UnaliasFields(wh.saMapperProvider, request.GetSearchAttributes(), namespaceName.String()) if err != nil { return nil, err } @@ -4907,13 +4907,13 @@ func (wh *WorkflowHandler) unaliasSignalWithStartWorkflowExecutionRequestSearchA } func (wh *WorkflowHandler) unaliasCreateScheduleRequestSearchAttributes(request *workflowservice.CreateScheduleRequest, namespaceName namespace.Name) (*workflowservice.CreateScheduleRequest, error) { - unaliasedSas, err := searchattribute.UnaliasFields(wh.saMapper, request.GetSearchAttributes(), namespaceName.String()) + unaliasedSas, err := searchattribute.UnaliasFields(wh.saMapperProvider, request.GetSearchAttributes(), namespaceName.String()) if err != nil { return nil, err } startWorkflow := request.GetSchedule().GetAction().GetStartWorkflow() - unaliasedStartWorkflowSas, err := searchattribute.UnaliasFields(wh.saMapper, startWorkflow.GetSearchAttributes(), namespaceName.String()) + unaliasedStartWorkflowSas, err := searchattribute.UnaliasFields(wh.saMapperProvider, startWorkflow.GetSearchAttributes(), namespaceName.String()) if err != nil { return nil, err } @@ -4949,7 +4949,7 @@ func (wh *WorkflowHandler) unaliasUpdateScheduleRequestStartWorkflowSearchAttrib return request, nil } - unaliasedSas, err := searchattribute.UnaliasFields(wh.saMapper, startWorkflow.GetSearchAttributes(), namespaceName.String()) + unaliasedSas, err := searchattribute.UnaliasFields(wh.saMapperProvider, startWorkflow.GetSearchAttributes(), namespaceName.String()) if err != nil { return nil, err } diff --git a/service/frontend/workflow_handler_test.go b/service/frontend/workflow_handler_test.go index 7cf36193ab3..28484efb39f 100644 --- a/service/frontend/workflow_handler_test.go +++ b/service/frontend/workflow_handler_test.go @@ -94,14 +94,14 @@ type ( suite.Suite *require.Assertions - controller *gomock.Controller - mockResource *resourcetest.Test - mockNamespaceCache *namespace.MockRegistry - mockHistoryClient *historyservicemock.MockHistoryServiceClient - mockClusterMetadata *cluster.MockMetadata - mockSearchAttributesProvider *searchattribute.MockProvider - mockSearchAttributesMapper *searchattribute.MockMapper - mockMatchingClient *matchingservicemock.MockMatchingServiceClient + controller *gomock.Controller + mockResource *resourcetest.Test + mockNamespaceCache *namespace.MockRegistry + mockHistoryClient *historyservicemock.MockHistoryServiceClient + mockClusterMetadata *cluster.MockMetadata + mockSearchAttributesProvider *searchattribute.MockProvider + mockSearchAttributesMapperProvider *searchattribute.MockMapperProvider + mockMatchingClient *matchingservicemock.MockMatchingServiceClient mockProducer *persistence.MockNamespaceReplicationQueue mockMetadataMgr *persistence.MockMetadataManager @@ -144,7 +144,7 @@ func (s *workflowHandlerSuite) SetupTest() { s.mockHistoryClient = s.mockResource.HistoryClient s.mockClusterMetadata = s.mockResource.ClusterMetadata s.mockSearchAttributesProvider = s.mockResource.SearchAttributesProvider - s.mockSearchAttributesMapper = s.mockResource.SearchAttributesMapper + s.mockSearchAttributesMapperProvider = s.mockResource.SearchAttributesMapperProvider s.mockMetadataMgr = s.mockResource.MetadataMgr s.mockExecutionManager = s.mockResource.ExecutionMgr s.mockVisibilityMgr = s.mockResource.VisibilityManager @@ -182,7 +182,7 @@ func (s *workflowHandlerSuite) getWorkflowHandler(config *Config) *WorkflowHandl s.mockResource.GetArchiverProvider(), s.mockResource.GetPayloadSerializer(), s.mockResource.GetNamespaceRegistry(), - s.mockResource.GetSearchAttributesMapper(), + s.mockResource.GetSearchAttributesMapperProvider(), s.mockResource.GetSearchAttributesProvider(), s.mockResource.GetClusterMetadata(), s.mockResource.GetArchivalMetadata(), @@ -401,6 +401,7 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_NamespaceNotSet wh := s.getWorkflowHandler(config) s.mockNamespaceCache.EXPECT().GetNamespaceID(namespace.EmptyName).Return(namespace.EmptyID, serviceerror.NewNamespaceNotFound("missing-namespace")).AnyTimes() + s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(namespace.EmptyName).Return(nil, nil) startWorkflowExecutionRequest := &workflowservice.StartWorkflowExecutionRequest{ // Namespace: "forget to specify", @@ -1363,7 +1364,7 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Success_GetFirstPage() { func (s *workflowHandlerSuite) TestGetHistory() { namespaceID := namespace.ID(uuid.New()) - namespace := namespace.Name("namespace") + namespaceName := namespace.Name("test-namespace") firstEventID := int64(100) nextEventID := int64(102) branchToken := []byte{1} @@ -1406,7 +1407,8 @@ func (s *workflowHandlerSuite) TestGetHistory() { }, nil) s.mockSearchAttributesProvider.EXPECT().GetSearchAttributes(gomock.Any(), false).Return(searchattribute.TestNameTypeMap, nil) - s.mockSearchAttributesMapper.EXPECT().GetAlias("CustomKeywordField", namespace.String()).Return("AliasOfCustomKeyword", nil) + s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(namespaceName). + Return(&searchattribute.TestMapper{}, nil).AnyTimes() wh := s.getWorkflowHandler(s.newConfig()) @@ -1414,7 +1416,7 @@ func (s *workflowHandlerSuite) TestGetHistory() { context.Background(), metrics.NoopMetricsHandler, namespaceID, - namespace, + namespaceName, we, firstEventID, nextEventID, @@ -1427,7 +1429,7 @@ func (s *workflowHandlerSuite) TestGetHistory() { s.NotNil(history) s.Equal([]byte{}, token) - s.EqualValues("Keyword", history.Events[1].GetWorkflowExecutionStartedEventAttributes().GetSearchAttributes().GetIndexedFields()["AliasOfCustomKeyword"].GetMetadata()["type"]) + s.EqualValues("Keyword", history.Events[1].GetWorkflowExecutionStartedEventAttributes().GetSearchAttributes().GetIndexedFields()["AliasForCustomKeywordField"].GetMetadata()["type"]) s.EqualValues(`"random-data"`, history.Events[1].GetWorkflowExecutionStartedEventAttributes().GetSearchAttributes().GetIndexedFields()["TemporalChangeVersion"].GetData()) } diff --git a/service/history/commandChecker_test.go b/service/history/commandChecker_test.go index 6782edd1136..289f827cec2 100644 --- a/service/history/commandChecker_test.go +++ b/service/history/commandChecker_test.go @@ -126,7 +126,7 @@ func (s *commandAttrValidatorSuite) SetupTest() { config, searchattribute.NewValidator( searchattribute.NewTestProvider(), - nil, + searchattribute.NewTestMapperProvider(nil), config.SearchAttributesNumberOfKeysLimit, config.SearchAttributesSizeOfValueLimit, config.SearchAttributesTotalSizeLimit, diff --git a/service/history/fx.go b/service/history/fx.go index 31cbf35e89e..7bb72df3f8b 100644 --- a/service/history/fx.go +++ b/service/history/fx.go @@ -135,7 +135,6 @@ func HandlerProvider(args NewHandlerArgs) *Handler { timeSource: args.TimeSource, namespaceRegistry: args.NamespaceRegistry, saProvider: args.SaProvider, - saMapper: args.SaMapper, clusterMetadata: args.ClusterMetadata, archivalMetadata: args.ArchivalMetadata, hostInfoProvider: args.HostInfoProvider, @@ -247,7 +246,7 @@ func VisibilityManagerProvider( esConfig *esclient.Config, esClient esclient.Client, persistenceServiceResolver resolver.ServiceResolver, - searchAttributesMapper searchattribute.Mapper, + searchAttributesMapperProvider searchattribute.MapperProvider, saProvider searchattribute.Provider, ) (manager.VisibilityManager, error) { return visibility.NewManager( @@ -258,7 +257,7 @@ func VisibilityManagerProvider( esClient, esProcessorConfig, saProvider, - searchAttributesMapper, + searchAttributesMapperProvider, serviceConfig.StandardVisibilityPersistenceMaxReadQPS, serviceConfig.StandardVisibilityPersistenceMaxWriteQPS, serviceConfig.AdvancedVisibilityPersistenceMaxReadQPS, diff --git a/service/history/handler.go b/service/history/handler.go index 74f2912fc31..56e7f03ce1d 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -91,7 +91,6 @@ type ( timeSource clock.TimeSource namespaceRegistry namespace.Registry saProvider searchattribute.Provider - saMapper searchattribute.Mapper clusterMetadata cluster.Metadata archivalMetadata archiver.ArchivalMetadata hostInfoProvider membership.HostInfoProvider @@ -114,7 +113,6 @@ type ( TimeSource clock.TimeSource NamespaceRegistry namespace.Registry SaProvider searchattribute.Provider - SaMapper searchattribute.Mapper ClusterMetadata cluster.Metadata ArchivalMetadata archiver.ArchivalMetadata HostInfoProvider membership.HostInfoProvider diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index e4b467da4e5..7b7e32243be 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -225,7 +225,7 @@ func NewEngineWithShardContext( historyEngImpl.searchAttributesValidator = searchattribute.NewValidator( shard.GetSearchAttributesProvider(), - shard.GetSearchAttributesMapper(), + shard.GetSearchAttributesMapperProvider(), config.SearchAttributesNumberOfKeysLimit, config.SearchAttributesSizeOfValueLimit, config.SearchAttributesTotalSizeLimit, diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index b84db2a3822..21e3f9f8c4b 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/historyEngine2_test.go @@ -189,7 +189,7 @@ func (s *engine2Suite) SetupTest() { }, searchAttributesValidator: searchattribute.NewValidator( searchattribute.NewTestProvider(), - s.mockShard.Resource.SearchAttributesMapper, + s.mockShard.Resource.SearchAttributesMapperProvider, s.config.SearchAttributesNumberOfKeysLimit, s.config.SearchAttributesSizeOfValueLimit, s.config.SearchAttributesTotalSizeLimit, @@ -917,9 +917,9 @@ func (s *engine2Suite) TestRespondWorkflowTaskCompleted_StartChildWithSearchAttr return tests.UpdateWorkflowExecutionResponse, nil }) - s.mockShard.Resource.SearchAttributesMapper.EXPECT(). - GetFieldName("AliasForCustomTextField", tests.Namespace.String()).Return("CustomTextField", nil). - Times(1) // one for mapper + s.mockShard.Resource.SearchAttributesMapperProvider.EXPECT(). + GetMapper(tests.Namespace). + Return(&searchattribute.TestMapper{Namespace: tests.Namespace.String()}, nil) _, err := s.historyEngine.RespondWorkflowTaskCompleted(metrics.AddMetricsContext(context.Background()), &historyservice.RespondWorkflowTaskCompletedRequest{ NamespaceId: tests.NamespaceID.String(), @@ -998,6 +998,10 @@ func (s *engine2Suite) TestRespondWorkflowTaskCompleted_StartChildWorkflow_Excee taskTokenBytes, _ := taskToken.Marshal() response := &persistence.GetWorkflowExecutionResponse{State: workflow.TestCloneToProto(ms)} s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(response, nil).AnyTimes() + s.mockShard.Resource.SearchAttributesMapperProvider.EXPECT(). + GetMapper(tests.Namespace). + Return(&searchattribute.TestMapper{Namespace: tests.Namespace.String()}, nil). + AnyTimes() s.historyEngine.shard.GetConfig().NumPendingChildExecutionsLimit = func(namespace string) int { return 5 diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index 3fe2ba79bed..c13b857e318 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -68,6 +68,7 @@ import ( "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/versionhistory" "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" @@ -2269,6 +2270,9 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedStartChildWorkflowWithAban s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil) s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil) + s.mockShard.Resource.SearchAttributesMapperProvider.EXPECT(). + GetMapper(tests.Namespace). + Return(&searchattribute.TestMapper{Namespace: tests.Namespace.String()}, nil) _, err := s.mockHistoryEngine.RespondWorkflowTaskCompleted(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{ NamespaceId: tests.NamespaceID.String(), @@ -2333,6 +2337,9 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedStartChildWorkflowWithTerm s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil) s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil) + s.mockShard.Resource.SearchAttributesMapperProvider.EXPECT(). + GetMapper(tests.Namespace). + Return(&searchattribute.TestMapper{Namespace: tests.Namespace.String()}, nil) _, err := s.mockHistoryEngine.RespondWorkflowTaskCompleted(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{ NamespaceId: tests.NamespaceID.String(), diff --git a/service/history/shard/context.go b/service/history/shard/context.go index 0df1b67d970..5b570b15a09 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -119,7 +119,7 @@ type ( GetPayloadSerializer() serialization.Serializer GetSearchAttributesProvider() searchattribute.Provider - GetSearchAttributesMapper() searchattribute.Mapper + GetSearchAttributesMapperProvider() searchattribute.MapperProvider GetArchivalMetadata() archiver.ArchivalMetadata Unload() diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index 1383c36374a..9c60dceea80 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -111,7 +111,7 @@ type ( timeSource cclock.TimeSource namespaceRegistry namespace.Registry saProvider searchattribute.Provider - saMapper searchattribute.Mapper + saMapperProvider searchattribute.MapperProvider clusterMetadata cluster.Metadata archivalMetadata archiver.ArchivalMetadata hostInfoProvider membership.HostInfoProvider @@ -1979,7 +1979,7 @@ func newContext( timeSource cclock.TimeSource, namespaceRegistry namespace.Registry, saProvider searchattribute.Provider, - saMapper searchattribute.Mapper, + saMapperProvider searchattribute.MapperProvider, clusterMetadata cluster.Metadata, archivalMetadata archiver.ArchivalMetadata, hostInfoProvider membership.HostInfoProvider, @@ -2006,7 +2006,7 @@ func newContext( timeSource: timeSource, namespaceRegistry: namespaceRegistry, saProvider: saProvider, - saMapper: saMapper, + saMapperProvider: saMapperProvider, clusterMetadata: clusterMetadata, archivalMetadata: archivalMetadata, hostInfoProvider: hostInfoProvider, @@ -2080,8 +2080,8 @@ func (s *ContextImpl) GetSearchAttributesProvider() searchattribute.Provider { return s.saProvider } -func (s *ContextImpl) GetSearchAttributesMapper() searchattribute.Mapper { - return s.saMapper +func (s *ContextImpl) GetSearchAttributesMapperProvider() searchattribute.MapperProvider { + return s.saMapperProvider } func (s *ContextImpl) GetClusterMetadata() cluster.Metadata { diff --git a/service/history/shard/context_mock.go b/service/history/shard/context_mock.go index 5819bbc72f4..413cdcb70fc 100644 --- a/service/history/shard/context_mock.go +++ b/service/history/shard/context_mock.go @@ -522,18 +522,18 @@ func (mr *MockContextMockRecorder) GetReplicatorDLQAckLevel(sourceCluster interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetReplicatorDLQAckLevel", reflect.TypeOf((*MockContext)(nil).GetReplicatorDLQAckLevel), sourceCluster) } -// GetSearchAttributesMapper mocks base method. -func (m *MockContext) GetSearchAttributesMapper() searchattribute.Mapper { +// GetSearchAttributesMapperProvider mocks base method. +func (m *MockContext) GetSearchAttributesMapperProvider() searchattribute.MapperProvider { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetSearchAttributesMapper") - ret0, _ := ret[0].(searchattribute.Mapper) + ret := m.ctrl.Call(m, "GetSearchAttributesMapperProvider") + ret0, _ := ret[0].(searchattribute.MapperProvider) return ret0 } -// GetSearchAttributesMapper indicates an expected call of GetSearchAttributesMapper. -func (mr *MockContextMockRecorder) GetSearchAttributesMapper() *gomock.Call { +// GetSearchAttributesMapperProvider indicates an expected call of GetSearchAttributesMapperProvider. +func (mr *MockContextMockRecorder) GetSearchAttributesMapperProvider() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSearchAttributesMapper", reflect.TypeOf((*MockContext)(nil).GetSearchAttributesMapper)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSearchAttributesMapperProvider", reflect.TypeOf((*MockContext)(nil).GetSearchAttributesMapperProvider)) } // GetSearchAttributesProvider mocks base method. diff --git a/service/history/shard/context_testutil.go b/service/history/shard/context_testutil.go index fe6ea6f3fe7..48e215094ad 100644 --- a/service/history/shard/context_testutil.go +++ b/service/history/shard/context_testutil.go @@ -107,7 +107,7 @@ func NewTestContext( persistenceShardManager: resourceTest.GetShardManager(), clientBean: resourceTest.GetClientBean(), saProvider: resourceTest.GetSearchAttributesProvider(), - saMapper: resourceTest.GetSearchAttributesMapper(), + saMapperProvider: resourceTest.GetSearchAttributesMapperProvider(), historyClient: resourceTest.GetHistoryClient(), archivalMetadata: resourceTest.GetArchivalMetadata(), hostInfoProvider: hostInfoProvider, diff --git a/service/history/shard/controller_impl.go b/service/history/shard/controller_impl.go index cfffdbed4f6..c06c46a6603 100644 --- a/service/history/shard/controller_impl.go +++ b/service/history/shard/controller_impl.go @@ -89,7 +89,7 @@ type ( timeSource clock.TimeSource namespaceRegistry namespace.Registry saProvider searchattribute.Provider - saMapper searchattribute.Mapper + saMapperProvider searchattribute.MapperProvider clusterMetadata cluster.Metadata archivalMetadata archiver.ArchivalMetadata hostInfoProvider membership.HostInfoProvider @@ -305,7 +305,7 @@ func (c *ControllerImpl) getOrCreateShardContext(shardID int32) (*ContextImpl, e c.timeSource, c.namespaceRegistry, c.saProvider, - c.saMapper, + c.saMapperProvider, c.clusterMetadata, c.archivalMetadata, c.hostInfoProvider, diff --git a/service/history/shard/controller_test.go b/service/history/shard/controller_test.go index e140fcbfdec..b9d340d1943 100644 --- a/service/history/shard/controller_test.go +++ b/service/history/shard/controller_test.go @@ -101,7 +101,7 @@ func NewTestController( timeSource: resource.GetTimeSource(), namespaceRegistry: resource.GetNamespaceRegistry(), saProvider: resource.GetSearchAttributesProvider(), - saMapper: resource.GetSearchAttributesMapper(), + saMapperProvider: resource.GetSearchAttributesMapperProvider(), clusterMetadata: resource.GetClusterMetadata(), archivalMetadata: resource.GetArchivalMetadata(), hostInfoProvider: hostInfoProvider, diff --git a/service/history/shard/fx.go b/service/history/shard/fx.go index dd89a4b32d9..6e95a018939 100644 --- a/service/history/shard/fx.go +++ b/service/history/shard/fx.go @@ -67,7 +67,7 @@ func ControllerProvider( timeSource clock.TimeSource, namespaceRegistry namespace.Registry, saProvider searchattribute.Provider, - saMapper searchattribute.Mapper, + saMapperProvider searchattribute.MapperProvider, clusterMetadata cluster.Metadata, archivalMetadata archiver.ArchivalMetadata, hostInfoProvider membership.HostInfoProvider, @@ -94,7 +94,7 @@ func ControllerProvider( timeSource: timeSource, namespaceRegistry: namespaceRegistry, saProvider: saProvider, - saMapper: saMapper, + saMapperProvider: saMapperProvider, clusterMetadata: clusterMetadata, archivalMetadata: archivalMetadata, hostInfoProvider: hostInfoProvider, diff --git a/service/history/workflowTaskHandler.go b/service/history/workflowTaskHandler.go index 2ced19fdae8..1a979d1e482 100644 --- a/service/history/workflowTaskHandler.go +++ b/service/history/workflowTaskHandler.go @@ -74,9 +74,9 @@ type ( initiatedChildExecutionsInBatch map[string]struct{} // Set of initiated child executions in the workflow task // validation - attrValidator *commandAttrValidator - sizeLimitChecker *workflowSizeChecker - searchAttributesMapper searchattribute.Mapper + attrValidator *commandAttrValidator + sizeLimitChecker *workflowSizeChecker + searchAttributesMapperProvider searchattribute.MapperProvider logger log.Logger namespaceRegistry namespace.Registry @@ -117,7 +117,7 @@ func newWorkflowTaskHandler( metricsHandler metrics.Handler, config *configs.Config, shard shard.Context, - searchAttributesMapper searchattribute.Mapper, + searchAttributesMapperProvider searchattribute.MapperProvider, hasBufferedEvents bool, ) *workflowTaskHandlerImpl { @@ -135,9 +135,9 @@ func newWorkflowTaskHandler( initiatedChildExecutionsInBatch: make(map[string]struct{}), // validation - attrValidator: attrValidator, - sizeLimitChecker: sizeLimitChecker, - searchAttributesMapper: searchAttributesMapper, + attrValidator: attrValidator, + sizeLimitChecker: sizeLimitChecker, + searchAttributesMapperProvider: searchAttributesMapperProvider, logger: logger, namespaceRegistry: namespaceRegistry, @@ -771,7 +771,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandContinueAsNewWorkflow( namespaceName := handler.mutableState.GetNamespaceEntry().Name() unaliasedSas, err := searchattribute.UnaliasFields( - handler.searchAttributesMapper, + handler.searchAttributesMapperProvider, attr.GetSearchAttributes(), namespaceName.String(), ) @@ -882,7 +882,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandStartChildWorkflow( } unaliasedSas, err := searchattribute.UnaliasFields( - handler.searchAttributesMapper, + handler.searchAttributesMapperProvider, attr.GetSearchAttributes(), targetNamespace.String(), ) @@ -1026,7 +1026,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandUpsertWorkflowSearchAttribu namespace := namespaceEntry.Name() unaliasedSas, err := searchattribute.UnaliasFields( - handler.searchAttributesMapper, + handler.searchAttributesMapperProvider, attr.GetSearchAttributes(), namespace.String(), ) diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index 34f3e91a235..5c9a57b81fc 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -73,19 +73,19 @@ type ( } workflowTaskHandlerCallbacksImpl struct { - currentClusterName string - config *configs.Config - shard shard.Context - workflowConsistencyChecker api.WorkflowConsistencyChecker - timeSource clock.TimeSource - namespaceRegistry namespace.Registry - tokenSerializer common.TaskTokenSerializer - metricsHandler metrics.Handler - logger log.Logger - throttledLogger log.Logger - commandAttrValidator *commandAttrValidator - searchAttributesMapper searchattribute.Mapper - searchAttributesValidator *searchattribute.Validator + currentClusterName string + config *configs.Config + shard shard.Context + workflowConsistencyChecker api.WorkflowConsistencyChecker + timeSource clock.TimeSource + namespaceRegistry namespace.Registry + tokenSerializer common.TaskTokenSerializer + metricsHandler metrics.Handler + logger log.Logger + throttledLogger log.Logger + commandAttrValidator *commandAttrValidator + searchAttributesMapperProvider searchattribute.MapperProvider + searchAttributesValidator *searchattribute.Validator } ) @@ -106,8 +106,8 @@ func newWorkflowTaskHandlerCallback(historyEngine *historyEngineImpl) *workflowT historyEngine.config, historyEngine.searchAttributesValidator, ), - searchAttributesMapper: historyEngine.shard.GetSearchAttributesMapper(), - searchAttributesValidator: historyEngine.searchAttributesValidator, + searchAttributesMapperProvider: historyEngine.shard.GetSearchAttributesMapperProvider(), + searchAttributesValidator: historyEngine.searchAttributesValidator, } } @@ -479,7 +479,7 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( handler.metricsHandler, handler.config, handler.shard, - handler.searchAttributesMapper, + handler.searchAttributesMapperProvider, hasBufferedEvents, ) diff --git a/service/history/workflowTaskHandlerCallbacks_test.go b/service/history/workflowTaskHandlerCallbacks_test.go index 97ebafc0fc2..e1cd06df77f 100644 --- a/service/history/workflowTaskHandlerCallbacks_test.go +++ b/service/history/workflowTaskHandlerCallbacks_test.go @@ -120,7 +120,7 @@ func (s *WorkflowTaskHandlerCallbackSuite) SetupTest() { eventNotifier: events.NewNotifier(clock.NewRealTimeSource(), metrics.NoopMetricsHandler, func(namespace.ID, string) int32 { return 1 }), searchAttributesValidator: searchattribute.NewValidator( searchattribute.NewTestProvider(), - mockShard.Resource.SearchAttributesMapper, + mockShard.Resource.SearchAttributesMapperProvider, config.SearchAttributesNumberOfKeysLimit, config.SearchAttributesSizeOfValueLimit, config.SearchAttributesTotalSizeLimit, diff --git a/service/worker/fx.go b/service/worker/fx.go index a446f5446f2..6a7f22367b5 100644 --- a/service/worker/fx.go +++ b/service/worker/fx.go @@ -100,7 +100,7 @@ func VisibilityManagerProvider( esConfig *esclient.Config, esClient esclient.Client, persistenceServiceResolver resolver.ServiceResolver, - searchAttributesMapper searchattribute.Mapper, + searchAttributesMapperProvider searchattribute.MapperProvider, saProvider searchattribute.Provider, ) (manager.VisibilityManager, error) { return visibility.NewManager( @@ -111,7 +111,7 @@ func VisibilityManagerProvider( esClient, nil, // worker visibility never write saProvider, - searchAttributesMapper, + searchAttributesMapperProvider, serviceConfig.StandardVisibilityPersistenceMaxReadQPS, serviceConfig.StandardVisibilityPersistenceMaxWriteQPS, serviceConfig.AdvancedVisibilityPersistenceMaxReadQPS,