Skip to content

Commit

Permalink
Fix backward compatibility with std visibility when doing search attr…
Browse files Browse the repository at this point in the history
…ibute operations (#3907)
  • Loading branch information
rodrigozhou authored Feb 6, 2023
1 parent 157cd11 commit 9972e77
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ type (
visibilityStore struct {
session gocql.Session
lowConslevel gocql.Consistency
keyspace string
}
)

Expand All @@ -159,7 +158,6 @@ func NewVisibilityStore(
return &visibilityStore{
session: session,
lowConslevel: gocql.One,
keyspace: cfg.Keyspace,
}, nil
}

Expand All @@ -168,7 +166,9 @@ func (v *visibilityStore) GetName() string {
}

func (v *visibilityStore) GetIndexName() string {
return v.keyspace
// GetIndexName is used to get cluster metadata, which in verstions < v1.20
// were stored in an empty string key.
return ""
}

// Close releases the resources held by this object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ func (s *visibilityStore) GetName() string {
}

func (s *visibilityStore) GetIndexName() string {
return s.sqlStore.GetDbName()
// GetIndexName is used to get cluster metadata, which in verstions < v1.20
// were stored in an empty string key.
return ""
}

func (s *visibilityStore) RecordWorkflowExecutionStarted(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ func (s *standardStore) GetName() string {
}

func (s *standardStore) GetIndexName() string {
return s.store.GetIndexName()
// GetIndexName is used to get cluster metadata, which in verstions < v1.20
// were stored in an empty string key.
return ""
}

func (s *standardStore) RecordWorkflowExecutionStarted(
Expand Down
6 changes: 5 additions & 1 deletion common/searchattribute/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ func (m *managerImpl) GetSearchAttributes(
if indexName != "" {
indexSearchAttributes, ok = saCache.searchAttributes[""]
if ok {
maps.Copy(result.customSearchAttributes, indexSearchAttributes.customSearchAttributes)
if result.customSearchAttributes == nil {
result.customSearchAttributes = maps.Clone(indexSearchAttributes.customSearchAttributes)
} else {
maps.Copy(result.customSearchAttributes, indexSearchAttributes.customSearchAttributes)
}
}
}
return result, nil
Expand Down
21 changes: 18 additions & 3 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,12 @@ func (adh *AdminHandler) AddSearchAttributes(
}
}

if adh.visibilityMgr.GetName() == elasticsearch.PersistenceName {
// TODO (rodrigozhou): Remove condition `indexName == ""`.
// If indexName == "", then calling addSearchAttributesElasticsearch will
// register the search attributes in the cluster metadata if ES is up or if
// `skip-schema-update` is set. This is for backward compatibility using
// standard visibility.
if adh.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" {
err = adh.addSearchAttributesElasticsearch(ctx, request, indexName)
} else {
err = adh.addSearchAttributesSQL(ctx, request, currentSearchAttributes)
Expand Down Expand Up @@ -404,7 +409,12 @@ func (adh *AdminHandler) RemoveSearchAttributes(
}

var err error
if adh.visibilityMgr.GetName() == elasticsearch.PersistenceName {
// TODO (rodrigozhou): Remove condition `indexName == ""`.
// If indexName == "", then calling addSearchAttributesElasticsearch will
// register the search attributes in the cluster metadata if ES is up or if
// `skip-schema-update` is set. This is for backward compatibility using
// standard visibility.
if adh.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" {
err = adh.removeSearchAttributesElasticsearch(ctx, request, indexName)
} else {
err = adh.removeSearchAttributesSQL(ctx, request)
Expand Down Expand Up @@ -505,7 +515,12 @@ func (adh *AdminHandler) GetSearchAttributes(
return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err))
}

if adh.visibilityMgr.GetName() == elasticsearch.PersistenceName {
// TODO (rodrigozhou): Remove condition `indexName == ""`.
// If indexName == "", then calling addSearchAttributesElasticsearch will
// register the search attributes in the cluster metadata if ES is up or if
// `skip-schema-update` is set. This is for backward compatibility using
// standard visibility.
if adh.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" {
return adh.getSearchAttributesElasticsearch(ctx, indexName, searchAttributes)
}
return adh.getSearchAttributesSQL(request, searchAttributes)
Expand Down
21 changes: 18 additions & 3 deletions service/frontend/operator_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,12 @@ func (h *OperatorHandlerImpl) AddSearchAttributes(
}
}

if h.visibilityMgr.GetName() == elasticsearch.PersistenceName {
// TODO (rodrigozhou): Remove condition `indexName == ""`.
// If indexName == "", then calling addSearchAttributesElasticsearch will
// register the search attributes in the cluster metadata if ES is up or if
// `skip-schema-update` is set. This is for backward compatibility using
// standard visibility.
if h.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" {
err = h.addSearchAttributesElasticsearch(ctx, request, indexName)
} else {
err = h.addSearchAttributesSQL(ctx, request, currentSearchAttributes)
Expand Down Expand Up @@ -325,7 +330,12 @@ func (h *OperatorHandlerImpl) RemoveSearchAttributes(

var err error
indexName := h.visibilityMgr.GetIndexName()
if h.visibilityMgr.GetName() == elasticsearch.PersistenceName {
// TODO (rodrigozhou): Remove condition `indexName == ""`.
// If indexName == "", then calling addSearchAttributesElasticsearch will
// register the search attributes in the cluster metadata if ES is up or if
// `skip-schema-update` is set. This is for backward compatibility using
// standard visibility.
if h.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" {
err = h.removeSearchAttributesElasticsearch(ctx, request, indexName)
} else {
err = h.removeSearchAttributesSQL(ctx, request)
Expand Down Expand Up @@ -425,7 +435,12 @@ func (h *OperatorHandlerImpl) ListSearchAttributes(
)
}

if h.visibilityMgr.GetName() == elasticsearch.PersistenceName {
// TODO (rodrigozhou): Remove condition `indexName == ""`.
// If indexName == "", then calling addSearchAttributesElasticsearch will
// register the search attributes in the cluster metadata if ES is up or if
// `skip-schema-update` is set. This is for backward compatibility using
// standard visibility.
if h.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" {
return h.listSearchAttributesElasticsearch(ctx, indexName, searchAttributes)
}
return h.listSearchAttributesSQL(request, searchAttributes)
Expand Down
13 changes: 8 additions & 5 deletions service/history/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ import (
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
persistenceClient "go.temporal.io/server/common/persistence/client"
"go.temporal.io/server/common/persistence/sql/sqlplugin/mysql"
"go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql"
"go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite"
"go.temporal.io/server/common/persistence/visibility"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/persistence/visibility/store/elasticsearch"
Expand Down Expand Up @@ -165,11 +168,11 @@ func ConfigProvider(
indexName := ""
if persistenceConfig.StandardVisibilityConfigExist() {
storeConfig := persistenceConfig.DataStores[persistenceConfig.VisibilityStore]
switch {
case storeConfig.Cassandra != nil:
indexName = storeConfig.Cassandra.Keyspace
case storeConfig.SQL != nil:
indexName = storeConfig.SQL.DatabaseName
if storeConfig.SQL != nil {
switch storeConfig.SQL.PluginName {
case mysql.PluginNameV8, postgresql.PluginNameV12, sqlite.PluginName:
indexName = storeConfig.SQL.DatabaseName
}
}
} else if persistenceConfig.AdvancedVisibilityConfigExist() {
indexName = esConfig.GetVisibilityIndex()
Expand Down
10 changes: 5 additions & 5 deletions tests/test_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,11 @@ func NewCluster(options *TestClusterConfig, logger log.Logger) (*TestCluster, er
}
} else {
storeConfig := pConfig.DataStores[pConfig.VisibilityStore]
switch {
case storeConfig.Cassandra != nil:
indexName = storeConfig.Cassandra.Keyspace
case storeConfig.SQL != nil:
indexName = storeConfig.SQL.DatabaseName
if storeConfig.SQL != nil {
switch storeConfig.SQL.PluginName {
case mysql.PluginNameV8, postgresql.PluginNameV12, sqlite.PluginName:
indexName = storeConfig.SQL.DatabaseName
}
}
}

Expand Down

0 comments on commit 9972e77

Please sign in to comment.