Skip to content

Commit

Permalink
Implement searchattribute.MapperProvider (#3873)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigozhou authored Jan 31, 2023
1 parent cb22aab commit 750fc54
Show file tree
Hide file tree
Showing 33 changed files with 393 additions and 272 deletions.
14 changes: 7 additions & 7 deletions common/persistence/visibility/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewManager(
esClient esclient.Client,
esProcessorConfig *elasticsearch.ProcessorConfig,
searchAttributesProvider searchattribute.Provider,
searchAttributesMapper searchattribute.Mapper,
searchAttributesMapperProvider searchattribute.MapperProvider,

standardVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn,
standardVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn,
Expand Down Expand Up @@ -81,7 +81,7 @@ func NewManager(
esClient,
esProcessorConfig,
searchAttributesProvider,
searchAttributesMapper,
searchAttributesMapperProvider,
advancedVisibilityPersistenceMaxReadQPS,
advancedVisibilityPersistenceMaxWriteQPS,
visibilityDisableOrderByClause,
Expand All @@ -97,7 +97,7 @@ func NewManager(
esClient,
esProcessorConfig,
searchAttributesProvider,
searchAttributesMapper,
searchAttributesMapperProvider,
advancedVisibilityPersistenceMaxReadQPS,
advancedVisibilityPersistenceMaxWriteQPS,
visibilityDisableOrderByClause,
Expand Down Expand Up @@ -187,7 +187,7 @@ func NewAdvancedManager(
esClient esclient.Client,
esProcessorConfig *elasticsearch.ProcessorConfig,
searchAttributesProvider searchattribute.Provider,
searchAttributesMapper searchattribute.Mapper,
searchAttributesMapperProvider searchattribute.MapperProvider,

advancedVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn,
advancedVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn,
Expand All @@ -205,7 +205,7 @@ func NewAdvancedManager(
esClient,
esProcessorConfig,
searchAttributesProvider,
searchAttributesMapper,
searchAttributesMapperProvider,
visibilityDisableOrderByClause,
metricsHandler,
logger)
Expand Down Expand Up @@ -289,7 +289,7 @@ func newAdvancedVisibilityStore(
esClient esclient.Client,
esProcessorConfig *elasticsearch.ProcessorConfig,
searchAttributesProvider searchattribute.Provider,
searchAttributesMapper searchattribute.Mapper,
searchAttributesMapperProvider searchattribute.MapperProvider,
visibilityDisableOrderByClause dynamicconfig.BoolPropertyFn,
metricsHandler metrics.Handler,
logger log.Logger,
Expand All @@ -311,7 +311,7 @@ func newAdvancedVisibilityStore(
esClient,
defaultIndexName,
searchAttributesProvider,
searchAttributesMapper,
searchAttributesMapperProvider,
esProcessor,
esProcessorAckTimeout,
visibilityDisableOrderByClause,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ import (

type (
nameInterceptor struct {
namespace namespace.Name
index string
searchAttributesTypeMap searchattribute.NameTypeMap
searchAttributesMapper searchattribute.Mapper
seenNamespaceDivision bool
namespace namespace.Name
index string
searchAttributesTypeMap searchattribute.NameTypeMap
searchAttributesMapperProvider searchattribute.MapperProvider
seenNamespaceDivision bool
}
valuesInterceptor struct{}
)
Expand All @@ -52,13 +52,13 @@ func newNameInterceptor(
namespace namespace.Name,
index string,
saTypeMap searchattribute.NameTypeMap,
searchAttributesMapper searchattribute.Mapper,
searchAttributesMapperProvider searchattribute.MapperProvider,
) *nameInterceptor {
return &nameInterceptor{
namespace: namespace,
index: index,
searchAttributesTypeMap: saTypeMap,
searchAttributesMapper: searchAttributesMapper,
namespace: namespace,
index: index,
searchAttributesTypeMap: saTypeMap,
searchAttributesMapperProvider: searchAttributesMapperProvider,
}
}

Expand All @@ -68,12 +68,17 @@ func NewValuesInterceptor() *valuesInterceptor {

func (ni *nameInterceptor) Name(name string, usage query.FieldNameUsage) (string, error) {
fieldName := name
if searchattribute.IsMappable(name) && ni.searchAttributesMapper != nil {
var err error
fieldName, err = ni.searchAttributesMapper.GetFieldName(name, ni.namespace.String())
if searchattribute.IsMappable(name) {
mapper, err := ni.searchAttributesMapperProvider.GetMapper(ni.namespace)
if err != nil {
return "", err
}
if mapper != nil {
fieldName, err = mapper.GetFieldName(name, ni.namespace.String())
if err != nil {
return "", err
}
}
}

fieldType, err := ni.searchAttributesTypeMap.GetType(fieldName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@ var defaultSorter = []elastic.Sorter{

type (
visibilityStore struct {
esClient client.Client
index string
searchAttributesProvider searchattribute.Provider
searchAttributesMapper searchattribute.Mapper
processor Processor
processorAckTimeout dynamicconfig.DurationPropertyFn
disableOrderByClause dynamicconfig.BoolPropertyFn
metricsHandler metrics.Handler
esClient client.Client
index string
searchAttributesProvider searchattribute.Provider
searchAttributesMapperProvider searchattribute.MapperProvider
processor Processor
processorAckTimeout dynamicconfig.DurationPropertyFn
disableOrderByClause dynamicconfig.BoolPropertyFn
metricsHandler metrics.Handler
}

visibilityPageToken struct {
Expand All @@ -100,22 +100,22 @@ func NewVisibilityStore(
esClient client.Client,
index string,
searchAttributesProvider searchattribute.Provider,
searchAttributesMapper searchattribute.Mapper,
searchAttributesMapperProvider searchattribute.MapperProvider,
processor Processor,
processorAckTimeout dynamicconfig.DurationPropertyFn,
disableOrderByClause dynamicconfig.BoolPropertyFn,
metricsHandler metrics.Handler,
) *visibilityStore {

return &visibilityStore{
esClient: esClient,
index: index,
searchAttributesProvider: searchAttributesProvider,
searchAttributesMapper: searchAttributesMapper,
processor: processor,
processorAckTimeout: processorAckTimeout,
disableOrderByClause: disableOrderByClause,
metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.ElasticsearchVisibility)),
esClient: esClient,
index: index,
searchAttributesProvider: searchAttributesProvider,
searchAttributesMapperProvider: searchAttributesMapperProvider,
processor: processor,
processorAckTimeout: processorAckTimeout,
disableOrderByClause: disableOrderByClause,
metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.ElasticsearchVisibility)),
}
}

Expand Down Expand Up @@ -730,7 +730,7 @@ func (s *visibilityStore) convertQuery(
if err != nil {
return nil, nil, serviceerror.NewUnavailable(fmt.Sprintf("Unable to read search attribute types: %v", err))
}
nameInterceptor := newNameInterceptor(namespace, s.index, saTypeMap, s.searchAttributesMapper)
nameInterceptor := newNameInterceptor(namespace, s.index, saTypeMap, s.searchAttributesMapperProvider)
queryConverter := newQueryConverter(nameInterceptor, NewValuesInterceptor())
requestQuery, fieldSorts, err := queryConverter.ConvertWhereOrderBy(requestQueryStr)
if err != nil {
Expand Down Expand Up @@ -991,7 +991,7 @@ func (s *visibilityStore) parseESDoc(docID string, docSource json.RawMessage, sa
s.metricsHandler.Counter(metrics.ElasticsearchDocumentParseFailuresCount.GetMetricName()).Record(1)
return nil, serviceerror.NewInternal(fmt.Sprintf("Unable to encode custom search attributes of Elasticsearch document(%s): %v", docID, err))
}
aliasedSas, err := searchattribute.AliasFields(s.searchAttributesMapper, record.SearchAttributes, namespace.String())
aliasedSas, err := searchattribute.AliasFields(s.searchAttributesMapperProvider, record.SearchAttributes, namespace.String())
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ type (
suite.Suite
// override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, not merely log an error
*require.Assertions
controller *gomock.Controller
visibilityStore *visibilityStore
mockESClient *client.MockClient
mockProcessor *MockProcessor
mockMetricsHandler *metrics.MockHandler
mockSearchAttributesMapper *searchattribute.MockMapper
controller *gomock.Controller
visibilityStore *visibilityStore
mockESClient *client.MockClient
mockProcessor *MockProcessor
mockMetricsHandler *metrics.MockHandler
mockSearchAttributesMapperProvider *searchattribute.MockMapperProvider
}
)

Expand Down Expand Up @@ -126,12 +126,12 @@ func (s *ESVisibilitySuite) SetupTest() {
s.mockMetricsHandler.EXPECT().WithTags(metrics.OperationTag(metrics.ElasticsearchVisibility)).Return(s.mockMetricsHandler).AnyTimes()
s.mockProcessor = NewMockProcessor(s.controller)
s.mockESClient = client.NewMockClient(s.controller)
s.mockSearchAttributesMapper = searchattribute.NewMockMapper(s.controller)
s.mockSearchAttributesMapperProvider = searchattribute.NewMockMapperProvider(s.controller)
s.visibilityStore = NewVisibilityStore(
s.mockESClient,
testIndex,
searchattribute.NewTestProvider(),
nil,
searchattribute.NewTestMapperProvider(nil),
s.mockProcessor,
esProcessorAckTimeout,
visibilityDisableOrderByClause,
Expand Down Expand Up @@ -649,15 +649,10 @@ func (s *ESVisibilitySuite) Test_convertQuery() {
}

func (s *ESVisibilitySuite) Test_convertQuery_Mapper() {
s.mockSearchAttributesMapper.EXPECT().GetFieldName(gomock.Any(), testNamespace.String()).DoAndReturn(
func(alias string, namespace string) (string, error) {
if strings.HasPrefix(alias, "AliasFor") {
return strings.TrimPrefix(alias, "AliasFor"), nil
}
return "", serviceerror.NewInvalidArgument("mapper error")
}).AnyTimes()
s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(testNamespace).
Return(&searchattribute.TestMapper{}, nil).AnyTimes()

s.visibilityStore.searchAttributesMapper = s.mockSearchAttributesMapper
s.visibilityStore.searchAttributesMapperProvider = s.mockSearchAttributesMapperProvider

query := `WorkflowId = 'wid'`
qry, srt, err := s.visibilityStore.convertQuery(testNamespace, testNamespaceID, query)
Expand Down Expand Up @@ -707,16 +702,14 @@ func (s *ESVisibilitySuite) Test_convertQuery_Mapper() {
s.Error(err)
s.ErrorAs(err, &invalidArgumentErr)
s.EqualError(err, "invalid query: unable to convert 'order by' column name: invalid search attribute: AliasForUnknownField")
s.visibilityStore.searchAttributesMapper = nil
s.visibilityStore.searchAttributesMapperProvider = nil
}

func (s *ESVisibilitySuite) Test_convertQuery_Mapper_Error() {
s.mockSearchAttributesMapper.EXPECT().GetFieldName(gomock.Any(), testNamespace.String()).DoAndReturn(
func(fieldName string, namespace string) (string, error) {
return "", serviceerror.NewInvalidArgument("mapper error")
}).AnyTimes()
s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(testNamespace).
Return(&searchattribute.TestMapper{}, nil).AnyTimes()

s.visibilityStore.searchAttributesMapper = s.mockSearchAttributesMapper
s.visibilityStore.searchAttributesMapperProvider = s.mockSearchAttributesMapperProvider

query := `WorkflowId = 'wid'`
qry, srt, err := s.visibilityStore.convertQuery(testNamespace, testNamespaceID, query)
Expand All @@ -743,7 +736,7 @@ func (s *ESVisibilitySuite) Test_convertQuery_Mapper_Error() {
s.ErrorAs(err, &invalidArgumentErr)
s.EqualError(err, "mapper error")

s.visibilityStore.searchAttributesMapper = nil
s.visibilityStore.searchAttributesMapperProvider = nil
}

func (s *ESVisibilitySuite) TestGetListWorkflowExecutionsResponse() {
Expand Down Expand Up @@ -966,37 +959,27 @@ func (s *ESVisibilitySuite) TestParseESDoc_SearchAttributes_WithMapper() {
"CustomIntField": [111,222],
"CustomBoolField": true,
"UnknownField": "random"}`)
s.visibilityStore.searchAttributesMapper = s.mockSearchAttributesMapper
s.visibilityStore.searchAttributesMapperProvider = s.mockSearchAttributesMapperProvider

s.mockSearchAttributesMapper.EXPECT().GetAlias(gomock.Any(), testNamespace.String()).DoAndReturn(
func(fieldName string, namespace string) (string, error) {
return "AliasOf" + fieldName, nil
}).Times(6)
s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(testNamespace).
Return(&searchattribute.TestMapper{}, nil).AnyTimes()

info, err := s.visibilityStore.parseESDoc("", docSource, searchattribute.TestNameTypeMap, testNamespace)
s.NoError(err)
s.NotNil(info)

s.Len(info.SearchAttributes.GetIndexedFields(), 7)
s.Contains(info.SearchAttributes.GetIndexedFields(), "TemporalChangeVersion")
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasOfCustomKeywordField")
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasOfCustomTextField")
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasOfCustomDatetimeField")
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasOfCustomDoubleField")
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasOfCustomBoolField")
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasOfCustomIntField")
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasForCustomKeywordField")
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasForCustomTextField")
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasForCustomDatetimeField")
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasForCustomDoubleField")
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasForCustomBoolField")
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasForCustomIntField")
s.NotContains(info.SearchAttributes.GetIndexedFields(), "UnknownField")
s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, info.Status)

s.mockSearchAttributesMapper.EXPECT().GetAlias(gomock.Any(), testNamespace.String()).DoAndReturn(
func(fieldName string, namespace string) (string, error) {
return "", serviceerror.NewUnavailable("error")
})
info, err = s.visibilityStore.parseESDoc("", docSource, searchattribute.TestNameTypeMap, testNamespace)
s.Error(err)
s.Nil(info)

s.visibilityStore.searchAttributesMapper = nil
s.visibilityStore.searchAttributesMapperProvider = nil
}

func (s *ESVisibilitySuite) TestListWorkflowExecutions() {
Expand Down
9 changes: 9 additions & 0 deletions common/resource/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ var Module = fx.Options(
fx.Provide(HostNameProvider),
fx.Provide(TimeSourceProvider),
cluster.MetadataLifetimeHooksModule,
fx.Provide(SearchAttributeMapperProviderProvider),
fx.Provide(SearchAttributeProviderProvider),
fx.Provide(SearchAttributeManagerProvider),
fx.Provide(NamespaceRegistryProvider),
Expand Down Expand Up @@ -162,6 +163,14 @@ func TimeSourceProvider() clock.TimeSource {
return clock.NewRealTimeSource()
}

func SearchAttributeMapperProviderProvider(
saMapper searchattribute.Mapper,
) searchattribute.MapperProvider {
return searchattribute.NewMapperProvider(
saMapper,
)
}

func SearchAttributeProviderProvider(
timeSource clock.TimeSource,
cmMgr persistence.ClusterMetadataManager,
Expand Down
24 changes: 12 additions & 12 deletions common/resourcetest/resourceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ import (
type (
// Test is the test implementation used for testing
Test struct {
MetricsScope tally.Scope
ClusterMetadata *cluster.MockMetadata
SearchAttributesProvider *searchattribute.MockProvider
SearchAttributesManager *searchattribute.MockManager
SearchAttributesMapper *searchattribute.MockMapper
MetricsScope tally.Scope
ClusterMetadata *cluster.MockMetadata
SearchAttributesProvider *searchattribute.MockProvider
SearchAttributesManager *searchattribute.MockManager
SearchAttributesMapperProvider *searchattribute.MockMapperProvider

// other common resources

Expand Down Expand Up @@ -174,11 +174,11 @@ func NewTest(
)

return &Test{
MetricsScope: scope,
ClusterMetadata: cluster.NewMockMetadata(controller),
SearchAttributesProvider: searchattribute.NewMockProvider(controller),
SearchAttributesManager: searchattribute.NewMockManager(controller),
SearchAttributesMapper: searchattribute.NewMockMapper(controller),
MetricsScope: scope,
ClusterMetadata: cluster.NewMockMetadata(controller),
SearchAttributesProvider: searchattribute.NewMockProvider(controller),
SearchAttributesManager: searchattribute.NewMockManager(controller),
SearchAttributesMapperProvider: searchattribute.NewMockMapperProvider(controller),

// other common resources

Expand Down Expand Up @@ -439,8 +439,8 @@ func (t *Test) GetSearchAttributesManager() searchattribute.Manager {
return t.SearchAttributesManager
}

func (t *Test) GetSearchAttributesMapper() searchattribute.Mapper {
return t.SearchAttributesMapper
func (t *Test) GetSearchAttributesMapperProvider() searchattribute.MapperProvider {
return t.SearchAttributesMapperProvider
}

func (t *Test) RefreshNamespaceCache() {
Expand Down
Loading

0 comments on commit 750fc54

Please sign in to comment.