diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index b3702f8f0a5..24d395eea96 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1562,7 +1562,6 @@ var ( ShardControllerLockLatency = NewTimerDef("shard_controller_lock_latency") ShardLockLatency = NewTimerDef("shard_lock_latency") NamespaceRegistryLockLatency = NewTimerDef("namespace_registry_lock_latency") - NamespaceRegistryCallbackLockLatency = NewTimerDef("namespace_registry_callback_lock_latency") ClosedWorkflowBufferEventCount = NewCounterDef("closed_workflow_buffer_event_counter") // Matching diff --git a/common/namespace/namespace.go b/common/namespace/namespace.go index ec711634e6e..b8db6328780 100644 --- a/common/namespace/namespace.go +++ b/common/namespace/namespace.go @@ -241,21 +241,6 @@ func (ns *Namespace) GetCustomData(key string) string { return ns.info.Data[key] } -// Len return length -func (t Namespaces) Len() int { - return len(t) -} - -// Swap implements sort.Interface. -func (t Namespaces) Swap(i, j int) { - t[i], t[j] = t[j], t[i] -} - -// Less implements sort.Interface -func (t Namespaces) Less(i, j int) bool { - return t[i].notificationVersion < t[j].notificationVersion -} - // Retention returns retention duration for this namespace. 
func (ns *Namespace) Retention() time.Duration { if ns.config.Retention == nil { diff --git a/common/namespace/registry.go b/common/namespace/registry.go index 651a6acd0c7..7ebd4b5462a 100644 --- a/common/namespace/registry.go +++ b/common/namespace/registry.go @@ -28,13 +28,11 @@ package namespace import ( "context" - "sort" "sync" "sync/atomic" "time" "go.temporal.io/api/serviceerror" - "golang.org/x/exp/maps" "go.temporal.io/server/common" "go.temporal.io/server/common/cache" @@ -131,8 +129,6 @@ type ( Registry interface { common.Daemon common.Pingable - RegisterNamespaceChangeCallback(listenerID any, initialNotificationVersion int64, prepareCallback PrepareCallbackFn, callback CallbackFn) - UnregisterNamespaceChangeCallback(listenerID any) GetNamespace(name Name) (*Namespace, error) GetNamespaceByID(id ID) (*Namespace, error) GetNamespaceID(name Name) (ID, error) @@ -140,9 +136,9 @@ type ( GetCacheSize() (sizeOfCacheByName int64, sizeOfCacheByID int64) // Refresh forces an immediate refresh of the namespace cache and blocks until it's complete. Refresh() - // Registers callback for namespace state changes. This is regrettably - // different from the above RegisterNamespaceChangeCallback because we - // need different semantics. + // Registers callback for namespace state changes. + // StateChangeCallbackFn will be invoked for a new/deleted namespace or namespace that has + // State, ReplicationState, ActiveCluster, or isGlobalNamespace config changed. RegisterStateChangeCallback(key any, cb StateChangeCallbackFn) UnregisterStateChangeCallback(key any) } @@ -156,24 +152,14 @@ type ( clock Clock metricsHandler metrics.Handler logger log.Logger - lastRefreshTime atomic.Value refreshInterval dynamicconfig.DurationPropertyFn // cacheLock protects cachNameToID, cacheByID and stateChangeCallbacks. // If the exclusive side is to be held at the same time as the // callbackLock (below), this lock MUST be acquired *first*. 
- cacheLock sync.RWMutex - cacheNameToID cache.Cache - cacheByID cache.Cache - - // callbackLock protects prepareCallbacks and callbacks. Do not call - // cacheLock.Lock() (the other lock in this struct, above) while holding - // this lock or you risk a deadlock. - callbackLock sync.Mutex - prepareCallbacks map[any]PrepareCallbackFn - callbacks map[any]CallbackFn - - // State-change callbacks. Protected by cacheLock + cacheLock sync.RWMutex + cacheNameToID cache.Cache + cacheByID cache.Cache stateChangeCallbacks map[any]StateChangeCallbackFn } ) @@ -196,12 +182,9 @@ func NewRegistry( logger: logger, cacheNameToID: cache.New(cacheMaxSize, &cacheOpts), cacheByID: cache.New(cacheMaxSize, &cacheOpts), - prepareCallbacks: make(map[any]PrepareCallbackFn), - callbacks: make(map[any]CallbackFn), refreshInterval: refreshInterval, stateChangeCallbacks: make(map[any]StateChangeCallbackFn), } - reg.lastRefreshTime.Store(time.Time{}) return reg } @@ -257,18 +240,6 @@ func (r *registry) GetPingChecks() []common.PingCheck { }, MetricsName: metrics.NamespaceRegistryLockLatency.GetMetricName(), }, - { - Name: "namespace registry callback lock", - // we don't do any persistence ops, this shouldn't be blocked - Timeout: 10 * time.Second, - Ping: func() []common.Pingable { - r.callbackLock.Lock() - //lint:ignore SA2001 just checking if we can acquire the lock - r.callbackLock.Unlock() - return nil - }, - MetricsName: metrics.NamespaceRegistryCallbackLockLatency.GetMetricName(), - }, } } @@ -292,53 +263,6 @@ func (r *registry) getAllNamespaceLocked() map[ID]*Namespace { return result } -// RegisterNamespaceChangeCallback set a namespace change callback WARN: -// callback functions MUST NOT call back into this registry instance, either to -// unregister themselves or to look up Namespaces. 
-func (r *registry) RegisterNamespaceChangeCallback( - listenerID any, - initialNotificationVersion int64, - prepareCallback PrepareCallbackFn, - callback CallbackFn, -) { - - r.callbackLock.Lock() - r.prepareCallbacks[listenerID] = prepareCallback - r.callbacks[listenerID] = callback - r.callbackLock.Unlock() - - // this section is trying to make the shard catch up with namespace changes - namespaces := Namespaces(maps.Values(r.getAllNamespace())) - // we mush notify the change in a ordered fashion - // since history shard have to update the shard info - // with namespace change version. - sort.Sort(namespaces) - - var oldEntries []*Namespace - var newEntries []*Namespace - for _, namespace := range namespaces { - if namespace.notificationVersion >= initialNotificationVersion { - oldEntries = append(oldEntries, nil) - newEntries = append(newEntries, namespace) - } - } - if len(oldEntries) > 0 { - prepareCallback() - callback(oldEntries, newEntries) - } -} - -// UnregisterNamespaceChangeCallback delete a namespace failover callback -func (r *registry) UnregisterNamespaceChangeCallback( - listenerID any, -) { - r.callbackLock.Lock() - defer r.callbackLock.Unlock() - - delete(r.prepareCallbacks, listenerID) - delete(r.callbacks, listenerID) -} - func (r *registry) RegisterStateChangeCallback(key any, cb StateChangeCallbackFn) { r.cacheLock.Lock() r.stateChangeCallbacks[key] = cb @@ -444,21 +368,13 @@ func (r *registry) refreshLoop(ctx context.Context) error { } } if replyCh != nil { - replyCh <- struct{}{} + replyCh <- struct{}{} // TODO: close replyCh? 
} } } } func (r *registry) refreshNamespaces(ctx context.Context) error { - // first load the metadata record, then load namespaces - // this can guarantee that namespaces in the cache are not updated more than metadata record - metadata, err := r.persistence.GetMetadata(ctx) - if err != nil { - return err - } - namespaceNotificationVersion := metadata.NotificationVersion - request := &persistence.ListNamespacesRequest{ PageSize: CacheRefreshPageSize, IncludeDeleted: true, @@ -481,10 +397,6 @@ func (r *registry) refreshNamespaces(ctx context.Context) error { request.NextPageToken = response.NextPageToken } - // Sort namespaces by notification version because changes must be applied in this order - // because history shard has to update the shard info with namespace change version. - sort.Sort(namespacesDb) - // Make a copy of the existing namespace cache (excluding deleted), so we can calculate diff and do "compare and swap". newCacheNameToID := cache.New(cacheMaxSize, &cacheOpts) newCacheByID := cache.New(cacheMaxSize, &cacheOpts) @@ -498,49 +410,29 @@ func (r *registry) refreshNamespaces(ctx context.Context) error { newCacheByID.Put(ID(namespace.info.Id), namespace) } - var oldEntries []*Namespace - var newEntries []*Namespace var stateChanged []*Namespace -UpdateLoop: for _, namespace := range namespacesDb { - if namespace.notificationVersion >= namespaceNotificationVersion { - // this guarantee that namespace change events before the - // namespaceNotificationVersion is loaded into the cache. 
- - // the namespace change events after the namespaceNotificationVersion - // will be loaded into cache in the next refresh - break UpdateLoop - } - oldNS, oldNSAnyVersion := r.updateIDToNamespaceCache(newCacheByID, namespace.ID(), namespace) + oldNS := r.updateIDToNamespaceCache(newCacheByID, namespace.ID(), namespace) newCacheNameToID.Put(namespace.Name(), namespace.ID()) - if oldNS != nil { - oldEntries = append(oldEntries, oldNS) - newEntries = append(newEntries, namespace) - } - // this test should include anything that might affect whether a namespace is active on // this cluster. - if oldNSAnyVersion == nil || - oldNSAnyVersion.State() != namespace.State() || - oldNSAnyVersion.IsGlobalNamespace() != namespace.IsGlobalNamespace() || - oldNSAnyVersion.ActiveClusterName() != namespace.ActiveClusterName() { + if oldNS == nil || + oldNS.State() != namespace.State() || + oldNS.IsGlobalNamespace() != namespace.IsGlobalNamespace() || + oldNS.ActiveClusterName() != namespace.ActiveClusterName() || + oldNS.ReplicationState() != namespace.ReplicationState() { stateChanged = append(stateChanged, namespace) } } var stateChangeCallbacks []StateChangeCallbackFn - // NOTE: READ REF BEFORE MODIFICATION - // ref: historyEngine.go registerNamespaceFailoverCallback function - r.publishCacheUpdate(func() (Namespaces, Namespaces) { - r.cacheLock.Lock() - defer r.cacheLock.Unlock() - r.cacheByID = newCacheByID - r.cacheNameToID = newCacheNameToID - stateChangeCallbacks = mapAnyValues(r.stateChangeCallbacks) - return oldEntries, newEntries - }) + r.cacheLock.Lock() + r.cacheByID = newCacheByID + r.cacheNameToID = newCacheNameToID + stateChangeCallbacks = mapAnyValues(r.stateChangeCallbacks) + r.cacheLock.Unlock() // call state change callbacks for _, cb := range stateChangeCallbacks { @@ -559,15 +451,12 @@ func (r *registry) updateIDToNamespaceCache( cacheByID cache.Cache, id ID, newNS *Namespace, -) (*Namespace, *Namespace) { +) (oldNS *Namespace) { oldCacheRec := 
cacheByID.Put(id, newNS) if oldNS, ok := oldCacheRec.(*Namespace); ok { - if newNS.notificationVersion > oldNS.notificationVersion && r.globalNamespacesEnabled { - return oldNS, oldNS - } - return nil, oldNS + return oldNS } - return nil, nil + return nil } // getNamespace retrieves the information from the cache if it exists @@ -594,53 +483,6 @@ func (r *registry) getNamespaceByIDLocked(id ID) (*Namespace, error) { return nil, serviceerror.NewNamespaceNotFound(id.String()) } -func (r *registry) publishCacheUpdate( - updateCache func() (Namespaces, Namespaces), -) { - now := r.clock.Now() - - prepareCallbacks, callbacks := r.getNamespaceChangeCallbacks() - - r.triggerNamespaceChangePrepareCallback(prepareCallbacks) - oldEntries, newEntries := updateCache() - r.triggerNamespaceChangeCallback(callbacks, oldEntries, newEntries) - r.lastRefreshTime.Store(now) -} - -func (r *registry) getNamespaceChangeCallbacks() ([]PrepareCallbackFn, []CallbackFn) { - r.callbackLock.Lock() - defer r.callbackLock.Unlock() - return mapAnyValues(r.prepareCallbacks), mapAnyValues(r.callbacks) -} - -func (r *registry) triggerNamespaceChangePrepareCallback( - prepareCallbacks []PrepareCallbackFn, -) { - startTime := time.Now().UTC() - defer func() { - r.metricsHandler.Timer(metrics.NamespaceCachePrepareCallbacksLatency.GetMetricName()).Record(time.Since(startTime)) - }() - - for _, prepareCallback := range prepareCallbacks { - prepareCallback() - } -} - -func (r *registry) triggerNamespaceChangeCallback( - callbacks []CallbackFn, - oldNamespaces []*Namespace, - newNamespaces []*Namespace, -) { - startTime := time.Now().UTC() - defer func() { - r.metricsHandler.Timer(metrics.NamespaceCacheCallbacksLatency.GetMetricName()).Record(time.Since(startTime)) - }() - - for _, callback := range callbacks { - callback(oldNamespaces, newNamespaces) - } -} - // This is https://pkg.go.dev/golang.org/x/exp/maps#Values except that it works // for map[any]T (see https://github.com/golang/go/issues/51257 and 
many more) func mapAnyValues[T any](m map[any]T) []T { diff --git a/common/namespace/registry_mock.go b/common/namespace/registry_mock.go index 26142aa9b17..dc0356fa01b 100644 --- a/common/namespace/registry_mock.go +++ b/common/namespace/registry_mock.go @@ -267,18 +267,6 @@ func (mr *MockRegistryMockRecorder) Refresh() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Refresh", reflect.TypeOf((*MockRegistry)(nil).Refresh)) } -// RegisterNamespaceChangeCallback mocks base method. -func (m *MockRegistry) RegisterNamespaceChangeCallback(listenerID any, initialNotificationVersion int64, prepareCallback PrepareCallbackFn, callback CallbackFn) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "RegisterNamespaceChangeCallback", listenerID, initialNotificationVersion, prepareCallback, callback) -} - -// RegisterNamespaceChangeCallback indicates an expected call of RegisterNamespaceChangeCallback. -func (mr *MockRegistryMockRecorder) RegisterNamespaceChangeCallback(listenerID, initialNotificationVersion, prepareCallback, callback interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterNamespaceChangeCallback", reflect.TypeOf((*MockRegistry)(nil).RegisterNamespaceChangeCallback), listenerID, initialNotificationVersion, prepareCallback, callback) -} - // RegisterStateChangeCallback mocks base method. func (m *MockRegistry) RegisterStateChangeCallback(key any, cb StateChangeCallbackFn) { m.ctrl.T.Helper() @@ -315,18 +303,6 @@ func (mr *MockRegistryMockRecorder) Stop() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockRegistry)(nil).Stop)) } -// UnregisterNamespaceChangeCallback mocks base method. 
-func (m *MockRegistry) UnregisterNamespaceChangeCallback(listenerID any) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "UnregisterNamespaceChangeCallback", listenerID) -} - -// UnregisterNamespaceChangeCallback indicates an expected call of UnregisterNamespaceChangeCallback. -func (mr *MockRegistryMockRecorder) UnregisterNamespaceChangeCallback(listenerID interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnregisterNamespaceChangeCallback", reflect.TypeOf((*MockRegistry)(nil).UnregisterNamespaceChangeCallback), listenerID) -} - // UnregisterStateChangeCallback mocks base method. func (m *MockRegistry) UnregisterStateChangeCallback(key any) { m.ctrl.T.Helper() diff --git a/common/namespace/registry_test.go b/common/namespace/registry_test.go index 2a02d0d1faa..8e195f3bc08 100644 --- a/common/namespace/registry_test.go +++ b/common/namespace/registry_test.go @@ -166,10 +166,6 @@ func (s *registrySuite) TestListNamespace() { pageToken := []byte("some random page token") - s.regPersistence.EXPECT().GetMetadata(gomock.Any()).Return( - &persistence.GetMetadataResponse{ - NotificationVersion: namespaceNotificationVersion, - }, nil) s.regPersistence.EXPECT().ListNamespaces(gomock.Any(), &persistence.ListNamespacesRequest{ PageSize: namespace.CacheRefreshPageSize, IncludeDeleted: true, @@ -209,7 +205,7 @@ func (s *registrySuite) TestListNamespace() { s.Equal(entry2, entryByID2) } -func (s *registrySuite) TestRegisterCallback_CatchUp() { +func (s *registrySuite) TestRegisterStateChangeCallback_CatchUp() { namespaceNotificationVersion := int64(0) namespaceRecord1 := &persistence.GetNamespaceResponse{ Namespace: &persistencespb.NamespaceDetail{ @@ -265,10 +261,6 @@ func (s *registrySuite) TestRegisterCallback_CatchUp() { entry2 := namespace.FromPersistentState(namespaceRecord2) namespaceNotificationVersion++ - s.regPersistence.EXPECT().GetMetadata(gomock.Any()).Return( - &persistence.GetMetadataResponse{ - 
NotificationVersion: namespaceNotificationVersion, - }, nil) s.regPersistence.EXPECT().ListNamespaces(gomock.Any(), &persistence.ListNamespacesRequest{ PageSize: namespace.CacheRefreshPageSize, IncludeDeleted: true, @@ -284,27 +276,19 @@ func (s *registrySuite) TestRegisterCallback_CatchUp() { s.registry.Start() defer s.registry.Stop() - prepareCallbackInvoked := false var entriesNotification []*namespace.Namespace - // we are not testing catching up, so make this really large - currentNamespaceNotificationVersion := int64(0) - s.registry.RegisterNamespaceChangeCallback( + s.registry.RegisterStateChangeCallback( "0", - currentNamespaceNotificationVersion, - func() { - prepareCallbackInvoked = true - }, - func(prevNamespaces []*namespace.Namespace, nextNamespaces []*namespace.Namespace) { - s.Equal(len(prevNamespaces), len(nextNamespaces)) - for index := range prevNamespaces { - s.Nil(prevNamespaces[index]) - } - entriesNotification = nextNamespaces + func(ns *namespace.Namespace, deletedFromDb bool) { + s.False(deletedFromDb) + entriesNotification = append(entriesNotification, ns) }, ) - // the order matters here, should be ordered by notification version - s.True(prepareCallbackInvoked) + s.Len(entriesNotification, 2) + if entriesNotification[0].NotificationVersion() > entriesNotification[1].NotificationVersion() { + entriesNotification[0], entriesNotification[1] = entriesNotification[1], entriesNotification[0] + } s.Equal([]*namespace.Namespace{entry1, entry2}, entriesNotification) } @@ -364,10 +348,6 @@ func (s *registrySuite) TestUpdateCache_TriggerCallBack() { entry2Old := namespace.FromPersistentState(namespaceRecord2Old) namespaceNotificationVersion++ - s.regPersistence.EXPECT().GetMetadata(gomock.Any()).Return( - &persistence.GetMetadataResponse{ - NotificationVersion: namespaceNotificationVersion, - }, nil) s.regPersistence.EXPECT().ListNamespaces(gomock.Any(), &persistence.ListNamespacesRequest{ PageSize: namespace.CacheRefreshPageSize, IncludeDeleted: 
true, @@ -421,33 +401,28 @@ func (s *registrySuite) TestUpdateCache_TriggerCallBack() { }, NotificationVersion: namespaceNotificationVersion, } - entry1New := namespace.FromPersistentState(namespaceRecord1New) namespaceNotificationVersion++ - prepareCallbackInvoked := false - var entriesOld []*namespace.Namespace - var entriesNew []*namespace.Namespace - // we are not testing catching up, so make this really large - currentNamespaceNotificationVersion := int64(9999999) - s.registry.RegisterNamespaceChangeCallback( + var entries []*namespace.Namespace + + wg := &sync.WaitGroup{} + wg.Add(2) + s.registry.RegisterStateChangeCallback( "0", - currentNamespaceNotificationVersion, - func() { - prepareCallbackInvoked = true - }, - func(prevNamespaces []*namespace.Namespace, nextNamespaces []*namespace.Namespace) { - entriesOld = prevNamespaces - entriesNew = nextNamespaces + func(ns *namespace.Namespace, deletedFromDb bool) { + defer wg.Done() + s.False(deletedFromDb) + entries = append(entries, ns) }, ) - s.False(prepareCallbackInvoked) - s.Empty(entriesOld) - s.Empty(entriesNew) - - s.regPersistence.EXPECT().GetMetadata(gomock.Any()).Return( - &persistence.GetMetadataResponse{ - NotificationVersion: namespaceNotificationVersion, - }, nil) + wg.Wait() + + s.Len(entries, 2) + if entries[0].NotificationVersion() > entries[1].NotificationVersion() { + entries[0], entries[1] = entries[1], entries[0] + } + s.Equal([]*namespace.Namespace{entry1Old, entry2Old}, entries) + s.regPersistence.EXPECT().ListNamespaces(gomock.Any(), &persistence.ListNamespacesRequest{ PageSize: namespace.CacheRefreshPageSize, IncludeDeleted: true, @@ -459,23 +434,18 @@ func (s *registrySuite) TestUpdateCache_TriggerCallBack() { NextPageToken: nil, }, nil) + entries = []*namespace.Namespace{} + + wg.Add(1) s.registry.Refresh() + wg.Wait() - // the order matters here: the record 2 got updated first, thus with a lower notification version - // the record 1 got updated later, thus a higher notification 
version. - // making sure notifying from lower to higher version helps the shard to keep track the - // namespace change events - s.True(prepareCallbackInvoked) - s.Equal([]*namespace.Namespace{entry2Old, entry1Old}, entriesOld) - s.Equal([]*namespace.Namespace{entry2New, entry1New}, entriesNew) + // entry1 only has description update, so won't trigger the state change callback + s.Len(entries, 1) + s.Equal([]*namespace.Namespace{entry2New}, entries) } func (s *registrySuite) TestGetTriggerListAndUpdateCache_ConcurrentAccess() { - namespaceNotificationVersion := int64(999999) // make this notification version really large for test - s.regPersistence.EXPECT().GetMetadata(gomock.Any()).Return( - &persistence.GetMetadataResponse{ - NotificationVersion: namespaceNotificationVersion, - }, nil) id := namespace.NewID() namespaceRecordOld := &persistence.GetNamespaceResponse{ Namespace: &persistencespb.NamespaceDetail{ @@ -595,10 +565,6 @@ func (s *registrySuite) TestRemoveDeletedNamespace() { } namespaceNotificationVersion++ - s.regPersistence.EXPECT().GetMetadata(gomock.Any()).Return( - &persistence.GetMetadataResponse{ - NotificationVersion: namespaceNotificationVersion, - }, nil) s.regPersistence.EXPECT().ListNamespaces(gomock.Any(), &persistence.ListNamespacesRequest{ PageSize: namespace.CacheRefreshPageSize, IncludeDeleted: true, @@ -614,10 +580,6 @@ func (s *registrySuite) TestRemoveDeletedNamespace() { s.registry.Start() defer s.registry.Stop() - s.regPersistence.EXPECT().GetMetadata(gomock.Any()).Return( - &persistence.GetMetadataResponse{ - NotificationVersion: namespaceNotificationVersion, - }, nil) s.regPersistence.EXPECT().ListNamespaces(gomock.Any(), &persistence.ListNamespacesRequest{ PageSize: namespace.CacheRefreshPageSize, IncludeDeleted: true, @@ -642,7 +604,7 @@ func (s *registrySuite) TestRemoveDeletedNamespace() { s.ErrorAs(err, &notFound) } -func TestCacheByName(t *testing.T) { +func (s *registrySuite) TestCacheByName() { nsrec := 
persistence.GetNamespaceResponse{ Namespace: &persistencespb.NamespaceDetail{ Info: &persistencespb.NamespaceInfo{ @@ -653,17 +615,14 @@ func TestCacheByName(t *testing.T) { ReplicationConfig: &persistencespb.NamespaceReplicationConfig{}, }, } - regPersist := persistence.NewMockMetadataManager(gomock.NewController(t)) - regPersist.EXPECT().GetMetadata(gomock.Any()).Return( - &persistence.GetMetadataResponse{NotificationVersion: nsrec.NotificationVersion + 1}, nil) - regPersist.EXPECT().ListNamespaces(gomock.Any(), gomock.Any()).Return(&persistence.ListNamespacesResponse{ + + s.regPersistence.EXPECT().ListNamespaces(gomock.Any(), gomock.Any()).Return(&persistence.ListNamespacesResponse{ Namespaces: []*persistence.GetNamespaceResponse{&nsrec}, }, nil) - reg := namespace.NewRegistry( - regPersist, false, dynamicconfig.GetDurationPropertyFn(time.Second), metrics.NoopMetricsHandler, log.NewNoopLogger()) - reg.Start() - defer reg.Stop() - ns, err := reg.GetNamespace(namespace.Name("foo")) - require.NoError(t, err) - require.Equal(t, namespace.Name("foo"), ns.Name()) + + s.registry.Start() + defer s.registry.Stop() + ns, err := s.registry.GetNamespace(namespace.Name("foo")) + s.NoError(err) + s.Equal(namespace.Name("foo"), ns.Name()) } diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 8269f0a3e57..203610e3a97 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -268,20 +268,12 @@ func (e *historyEngineImpl) Start() { e.logger.Info("", tag.LifeCycleStarting) defer e.logger.Info("", tag.LifeCycleStarted) + e.registerNamespaceStateChangeCallback() + for _, queueProcessor := range e.queueProcessors { queueProcessor.Start() } e.replicationProcessorMgr.Start() - - // failover callback will try to create a failover queue processor to scan all inflight tasks - // if domain needs to be failovered. However, in the multicursor queue logic, the scan range - // can't be retrieved before the processor is started. 
If failover callback is registered - // before queue processor is started, it may result in a deadline as to create the failover queue, - // queue processor need to be started. - // - // Ideally, when both timer and transfer queues enabled single cursor mode, we don't have to register - // the callback. However, currently namespace migration is relying on the callback to UpdateHandoverNamespaces - e.registerNamespaceFailoverCallback() } // Stop the service. @@ -302,107 +294,32 @@ func (e *historyEngineImpl) Stop() { } e.replicationProcessorMgr.Stop() // unset the failover callback - e.shard.GetNamespaceRegistry().UnregisterNamespaceChangeCallback(e) -} - -func (e *historyEngineImpl) registerNamespaceFailoverCallback() { - - // NOTE: READ BEFORE MODIFICATION - // - // Tasks, e.g. transfer tasks and timer tasks, are created when holding the shard lock - // meaning tasks -> release of shard lock - // - // Namespace change notification follows the following steps, order matters - // 1. lock all task processing. - // 2. namespace changes visible to everyone (Note: lock of task processing prevents task processing logic seeing the namespace changes). - // 3. failover min and max task levels are calculated, then update to shard. - // 4. failover start & task processing unlock & shard namespace version notification update. (order does not matter for this discussion) - // - // The above guarantees that task created during the failover will be processed. - // If the task is created after namespace change: - // then active processor will handle it. (simple case) - // If the task is created before namespace change: - // task -> release of shard lock - // failover min / max task levels calculated & updated to shard (using shard lock) -> failover start - // above 2 guarantees that failover start is after persistence of the task. 
- - failoverPredicate := func(shardNotificationVersion int64, nextNamespace *namespace.Namespace, action func()) { - namespaceFailoverNotificationVersion := nextNamespace.FailoverNotificationVersion() - namespaceActiveCluster := nextNamespace.ActiveClusterName() - - // +1 in the following check as the version in shard is max notification version +1. - // Need to run action() when namespaceFailoverNotificationVersion+1 == shardNotificationVersion - // as we don't know if the failover queue execution for that notification version is - // completed or not. - // - // NOTE: theoretically we need to get rid of the check on shardNotificationVersion, as - // we have no idea if the failover queue for any notification version below that is completed - // or not. However, removing that will cause more load upon shard reload. - // So here assume failover queue processor for notification version < X-1 is completed if - // shard notification version is X. - - if nextNamespace.IsGlobalNamespace() && - nextNamespace.ReplicationPolicy() == namespace.ReplicationPolicyMultiCluster && - namespaceFailoverNotificationVersion+1 >= shardNotificationVersion && - namespaceActiveCluster == e.currentClusterName { - action() - } - } + e.shard.GetNamespaceRegistry().UnregisterStateChangeCallback(e) +} - // first set the failover callback - e.shard.GetNamespaceRegistry().RegisterNamespaceChangeCallback( - e, - 0, /* always want callback so UpdateHandoverNamespaces() can be called after shard reload */ - func() {}, - func(prevNamespaces []*namespace.Namespace, nextNamespaces []*namespace.Namespace) { - if len(nextNamespaces) == 0 { - return - } +func (e *historyEngineImpl) registerNamespaceStateChangeCallback() { - if e.shard.GetClusterMetadata().IsGlobalNamespaceEnabled() { - e.shard.UpdateHandoverNamespaces(nextNamespaces) - } + e.shard.GetNamespaceRegistry().RegisterStateChangeCallback(e, func(ns *namespace.Namespace, deletedFromDb bool) { + if 
e.shard.GetClusterMetadata().IsGlobalNamespaceEnabled() { + e.shard.UpdateHandoverNamespaces(ns, deletedFromDb) + } - newNotificationVersion := nextNamespaces[len(nextNamespaces)-1].NotificationVersion() + 1 - shardNotificationVersion := e.shard.GetNamespaceNotificationVersion() - - // 1. We can't return when newNotificationVersion == shardNotificationVersion - // since we don't know if the previous failover queue processing has finished or not - // 2. We can return when newNotificationVersion < shardNotificationVersion. But the check - // is basically the same as the check in failover predicate. Because - // failoverNotificationVersion + 1 <= NotificationVersion + 1 = newNotificationVersion, - // there's no notification version can make - // newNotificationVersion < shardNotificationVersion and - // failoverNotificationVersion + 1 >= shardNotificationVersion are true at the same time - // Meaning if the check decides to return, no namespace will pass the failover predicate. - - failoverNamespaceIDs := map[string]struct{}{} - for _, nextNamespace := range nextNamespaces { - failoverPredicate(shardNotificationVersion, nextNamespace, func() { - failoverNamespaceIDs[nextNamespace.ID().String()] = struct{}{} - }) - } + if deletedFromDb { + return + } + + if ns.IsGlobalNamespace() && + ns.ReplicationPolicy() == namespace.ReplicationPolicyMultiCluster && + ns.ActiveClusterName() == e.currentClusterName { - if len(failoverNamespaceIDs) > 0 { - e.logger.Info("Namespace Failover Start.", tag.WorkflowNamespaceIDs(failoverNamespaceIDs)) - - for _, queueProcessor := range e.queueProcessors { - queueProcessor.FailoverNamespace(failoverNamespaceIDs) - } - - // the fake tasks will not be actually used, we just need to make sure - // its length > 0 and has correct timestamp, to trigger a db scan - now := e.shard.GetTimeSource().Now() - fakeTasks := make(map[tasks.Category][]tasks.Task) - for category := range e.queueProcessors { - fakeTasks[category] = 
[]tasks.Task{tasks.NewFakeTask(definition.WorkflowKey{}, category, now)} - } - e.NotifyNewTasks(fakeTasks) + for _, queueProcessor := range e.queueProcessors { + queueProcessor.FailoverNamespace(ns.ID().String()) } + } - _ = e.shard.UpdateNamespaceNotificationVersion(newNotificationVersion) - }, - ) + // for backward compatibility + _ = e.shard.UpdateNamespaceNotificationVersion(ns.NotificationVersion() + 1) + }) } // StartWorkflowExecution starts a workflow execution diff --git a/service/history/queues/queue.go b/service/history/queues/queue.go index f2a7643378a..2ef75bbe066 100644 --- a/service/history/queues/queue.go +++ b/service/history/queues/queue.go @@ -36,6 +36,6 @@ type ( common.Daemon Category() tasks.Category NotifyNewTasks(tasks []tasks.Task) - FailoverNamespace(namespaceIDs map[string]struct{}) + FailoverNamespace(namespaceID string) } ) diff --git a/service/history/queues/queue_base.go b/service/history/queues/queue_base.go index 9a1abbcb37d..859e8c40064 100644 --- a/service/history/queues/queue_base.go +++ b/service/history/queues/queue_base.go @@ -275,9 +275,9 @@ func (p *queueBase) Category() tasks.Category { } func (p *queueBase) FailoverNamespace( - namespaceIDs map[string]struct{}, + namespaceID string, ) { - p.rescheduler.Reschedule(namespaceIDs) + p.rescheduler.Reschedule(namespaceID) } func (p *queueBase) processNewRange() error { diff --git a/service/history/queues/queue_mock.go b/service/history/queues/queue_mock.go index 54415e828fc..e4ed19c3fec 100644 --- a/service/history/queues/queue_mock.go +++ b/service/history/queues/queue_mock.go @@ -73,15 +73,15 @@ func (mr *MockQueueMockRecorder) Category() *gomock.Call { } // FailoverNamespace mocks base method. 
-func (m *MockQueue) FailoverNamespace(namespaceIDs map[string]struct{}) { +func (m *MockQueue) FailoverNamespace(namespaceID string) { m.ctrl.T.Helper() - m.ctrl.Call(m, "FailoverNamespace", namespaceIDs) + m.ctrl.Call(m, "FailoverNamespace", namespaceID) } // FailoverNamespace indicates an expected call of FailoverNamespace. -func (mr *MockQueueMockRecorder) FailoverNamespace(namespaceIDs interface{}) *gomock.Call { +func (mr *MockQueueMockRecorder) FailoverNamespace(namespaceID interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FailoverNamespace", reflect.TypeOf((*MockQueue)(nil).FailoverNamespace), namespaceIDs) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FailoverNamespace", reflect.TypeOf((*MockQueue)(nil).FailoverNamespace), namespaceID) } // NotifyNewTasks mocks base method. diff --git a/service/history/queues/rescheduler.go b/service/history/queues/rescheduler.go index dec68123daf..de6af4dd38d 100644 --- a/service/history/queues/rescheduler.go +++ b/service/history/queues/rescheduler.go @@ -61,7 +61,7 @@ type ( // Reschedule triggers an immediate reschedule for provided namespace // ignoring executable's reschedule time. // Used by namespace failover logic - Reschedule(namespaceIDs map[string]struct{}) + Reschedule(namespaceID string) // Len returns the total number of task executables waiting to be rescheduled. 
Len() int @@ -159,7 +159,7 @@ func (r *reschedulerImpl) Add( } func (r *reschedulerImpl) Reschedule( - namespaceIDs map[string]struct{}, + namespaceID string, ) { r.Lock() defer r.Unlock() @@ -167,7 +167,7 @@ func (r *reschedulerImpl) Reschedule( now := r.timeSource.Now() updatedRescheduleTime := false for key, pq := range r.pqMap { - if _, ok := namespaceIDs[key.NamespaceID]; !ok { + if key.NamespaceID != namespaceID { continue } diff --git a/service/history/queues/rescheduler_mock.go b/service/history/queues/rescheduler_mock.go index edfdfac4c0d..05699dc1d57 100644 --- a/service/history/queues/rescheduler_mock.go +++ b/service/history/queues/rescheduler_mock.go @@ -85,15 +85,15 @@ func (mr *MockReschedulerMockRecorder) Len() *gomock.Call { } // Reschedule mocks base method. -func (m *MockRescheduler) Reschedule(namespaceIDs map[string]struct{}) { +func (m *MockRescheduler) Reschedule(namespaceID string) { m.ctrl.T.Helper() - m.ctrl.Call(m, "Reschedule", namespaceIDs) + m.ctrl.Call(m, "Reschedule", namespaceID) } // Reschedule indicates an expected call of Reschedule. -func (mr *MockReschedulerMockRecorder) Reschedule(namespaceIDs interface{}) *gomock.Call { +func (mr *MockReschedulerMockRecorder) Reschedule(namespaceID interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reschedule", reflect.TypeOf((*MockRescheduler)(nil).Reschedule), namespaceIDs) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reschedule", reflect.TypeOf((*MockRescheduler)(nil).Reschedule), namespaceID) } // Start mocks base method. 
diff --git a/service/history/queues/rescheduler_test.go b/service/history/queues/rescheduler_test.go index 97f72ce1c7c..b45d5922c5b 100644 --- a/service/history/queues/rescheduler_test.go +++ b/service/history/queues/rescheduler_test.go @@ -247,9 +247,7 @@ func (s *rescheudulerSuite) TestImmdiateReschedule() { return true }).Times(numTask) - s.rescheduler.Reschedule(map[string]struct{}{ - namespaceID: {}, - }) + s.rescheduler.Reschedule(namespaceID) taskWG.Wait() s.Equal(0, s.rescheduler.Len()) } diff --git a/service/history/queues/scheduler.go b/service/history/queues/scheduler.go index 7f91f14edc9..dfb1edc86e0 100644 --- a/service/history/queues/scheduler.go +++ b/service/history/queues/scheduler.go @@ -124,7 +124,7 @@ func NewNamespacePriorityScheduler( namespaceWeights = options.StandbyNamespaceWeights } } else { - // if namespace not found, treat is as active namespace and + // if namespace not found, treat it as active namespace and // use default active namespace weight logger.Warn("Unable to find namespace, using active namespace task channel weight", tag.WorkflowNamespaceID(key.NamespaceID), @@ -241,36 +241,19 @@ func NewPriorityScheduler( func (s *schedulerImpl) Start() { if s.channelWeightUpdateCh != nil { - s.namespaceRegistry.RegisterNamespaceChangeCallback( - s, - 0, - func() {}, // no-op - func(oldNamespaces, newNamespaces []*namespace.Namespace) { - namespaceFailover := false - for idx := range oldNamespaces { - if oldNamespaces[idx].FailoverVersion() != newNamespaces[idx].FailoverVersion() { - namespaceFailover = true - break - } - } - - if !namespaceFailover { - return - } - - select { - case s.channelWeightUpdateCh <- struct{}{}: - default: - } - }, - ) + s.namespaceRegistry.RegisterStateChangeCallback(s, func(ns *namespace.Namespace, deletedFromDb bool) { + select { + case s.channelWeightUpdateCh <- struct{}{}: + default: + } + }) } s.Scheduler.Start() } func (s *schedulerImpl) Stop() { if s.channelWeightUpdateCh != nil { - 
s.namespaceRegistry.UnregisterNamespaceChangeCallback(s) + s.namespaceRegistry.UnregisterStateChangeCallback(s) // note we can't close the channelWeightUpdateCh here // as callback may still be triggered even after unregister returns diff --git a/service/history/shard/context.go b/service/history/shard/context.go index 81d4a999b44..4dc1ddb52f3 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -102,9 +102,10 @@ type ( GetReplicationStatus(cluster []string) (map[string]*historyservice.ShardReplicationStatusPerCluster, map[string]*historyservice.HandoverNamespaceInfo, error) - GetNamespaceNotificationVersion() int64 + // TODO: deprecate UpdateNamespaceNotificationVersion in v1.21 and remove + // NamespaceNotificationVersion from shardInfo proto blob UpdateNamespaceNotificationVersion(namespaceNotificationVersion int64) error - UpdateHandoverNamespaces(newNamespaces []*namespace.Namespace) + UpdateHandoverNamespaces(ns *namespace.Namespace, deletedFromDb bool) AppendHistoryEvents(ctx context.Context, request *persistence.AppendHistoryNodesRequest, namespaceID namespace.ID, execution commonpb.WorkflowExecution) (int, error) diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index 53f2fa7225f..c2a3fe058b8 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -588,13 +588,6 @@ func (s *ContextImpl) GetAllFailoverLevels(category tasks.Category) map[string]p return ret } -func (s *ContextImpl) GetNamespaceNotificationVersion() int64 { - s.rLock() - defer s.rUnlock() - - return s.shardInfo.NamespaceNotificationVersion -} - func (s *ContextImpl) UpdateNamespaceNotificationVersion(namespaceNotificationVersion int64) error { s.wLock() defer s.wUnlock() @@ -608,8 +601,15 @@ func (s *ContextImpl) UpdateNamespaceNotificationVersion(namespaceNotificationVe return nil } -func (s *ContextImpl) UpdateHandoverNamespaces(namespaces []*namespace.Namespace) { +func (s 
*ContextImpl) UpdateHandoverNamespaces(ns *namespace.Namespace, deletedFromDb bool) { + nsName := ns.Name() + s.wLock() + if deletedFromDb { + delete(s.handoverNamespaces, ns.Name()) + s.wUnlock() + return + } maxReplicationTaskID := s.immediateTaskExclusiveMaxReadLevel - 1 if s.errorByState() != nil { @@ -618,36 +618,27 @@ func (s *ContextImpl) UpdateHandoverNamespaces(namespaces []*namespace.Namespace maxReplicationTaskID = pendingMaxReplicationTaskID } - currentClustername := s.GetClusterMetadata().GetCurrentClusterName() - newHandoverNamespaces := make(map[namespace.Name]struct{}) - for _, ns := range namespaces { - // NOTE: replication state field won't be replicated and currently we only update a namespace - // to handover state from active cluster, so the second condition will always be true. Adding - // it here to be more safe in case above assumption no longer holds in the future. - if ns.IsGlobalNamespace() && ns.ActiveInCluster(currentClustername) && ns.ReplicationState() == enums.REPLICATION_STATE_HANDOVER { - nsName := ns.Name() - newHandoverNamespaces[nsName] = struct{}{} - if handover, ok := s.handoverNamespaces[nsName]; ok { - if handover.NotificationVersion < ns.NotificationVersion() { - handover.NotificationVersion = ns.NotificationVersion() - handover.MaxReplicationTaskID = maxReplicationTaskID - } - } else { - s.handoverNamespaces[nsName] = &namespaceHandOverInfo{ - NotificationVersion: ns.NotificationVersion(), - MaxReplicationTaskID: maxReplicationTaskID, - } + // NOTE: replication state field won't be replicated and currently we only update a namespace + // to handover state from active cluster, so the second condition will always be true. Adding + // it here to be more safe in case above assumption no longer holds in the future. 
+ if ns.IsGlobalNamespace() && + ns.ActiveInCluster(s.GetClusterMetadata().GetCurrentClusterName()) && + ns.ReplicationState() == enums.REPLICATION_STATE_HANDOVER { + + if handover, ok := s.handoverNamespaces[nsName]; ok { + if handover.NotificationVersion < ns.NotificationVersion() { + handover.NotificationVersion = ns.NotificationVersion() + handover.MaxReplicationTaskID = maxReplicationTaskID + } + } else { + s.handoverNamespaces[nsName] = &namespaceHandOverInfo{ + NotificationVersion: ns.NotificationVersion(), + MaxReplicationTaskID: maxReplicationTaskID, } } } - // delete old handover ns - for k := range s.handoverNamespaces { - if _, ok := newHandoverNamespaces[k]; !ok { - delete(s.handoverNamespaces, k) - } - } - s.wUnlock() + s.wUnlock() s.notifyReplicationQueueProcessor(maxReplicationTaskID) } diff --git a/service/history/shard/context_mock.go b/service/history/shard/context_mock.go index 26a07f6eb18..bafe7dc4bce 100644 --- a/service/history/shard/context_mock.go +++ b/service/history/shard/context_mock.go @@ -434,20 +434,6 @@ func (mr *MockContextMockRecorder) GetMetricsHandler() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsHandler", reflect.TypeOf((*MockContext)(nil).GetMetricsHandler)) } -// GetNamespaceNotificationVersion mocks base method. -func (m *MockContext) GetNamespaceNotificationVersion() int64 { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetNamespaceNotificationVersion") - ret0, _ := ret[0].(int64) - return ret0 -} - -// GetNamespaceNotificationVersion indicates an expected call of GetNamespaceNotificationVersion. -func (mr *MockContextMockRecorder) GetNamespaceNotificationVersion() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNamespaceNotificationVersion", reflect.TypeOf((*MockContext)(nil).GetNamespaceNotificationVersion)) -} - // GetNamespaceRegistry mocks base method. 
func (m *MockContext) GetNamespaceRegistry() namespace.Registry { m.ctrl.T.Helper() @@ -718,15 +704,15 @@ func (mr *MockContextMockRecorder) UpdateFailoverLevel(category, failoverID, lev } // UpdateHandoverNamespaces mocks base method. -func (m *MockContext) UpdateHandoverNamespaces(newNamespaces []*namespace.Namespace) { +func (m *MockContext) UpdateHandoverNamespaces(ns *namespace.Namespace, deletedFromDb bool) { m.ctrl.T.Helper() - m.ctrl.Call(m, "UpdateHandoverNamespaces", newNamespaces) + m.ctrl.Call(m, "UpdateHandoverNamespaces", ns, deletedFromDb) } // UpdateHandoverNamespaces indicates an expected call of UpdateHandoverNamespaces. -func (mr *MockContextMockRecorder) UpdateHandoverNamespaces(newNamespaces interface{}) *gomock.Call { +func (mr *MockContextMockRecorder) UpdateHandoverNamespaces(ns, deletedFromDb interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateHandoverNamespaces", reflect.TypeOf((*MockContext)(nil).UpdateHandoverNamespaces), newNamespaces) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateHandoverNamespaces", reflect.TypeOf((*MockContext)(nil).UpdateHandoverNamespaces), ns, deletedFromDb) } // UpdateNamespaceNotificationVersion mocks base method.