Skip to content

Commit

Permalink
Deprecate old namespace change callback (#3774)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jan 7, 2023
1 parent 3882bb6 commit 04ce8e3
Show file tree
Hide file tree
Showing 16 changed files with 140 additions and 503 deletions.
1 change: 0 additions & 1 deletion common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1562,7 +1562,6 @@ var (
ShardControllerLockLatency = NewTimerDef("shard_controller_lock_latency")
ShardLockLatency = NewTimerDef("shard_lock_latency")
NamespaceRegistryLockLatency = NewTimerDef("namespace_registry_lock_latency")
NamespaceRegistryCallbackLockLatency = NewTimerDef("namespace_registry_callback_lock_latency")
ClosedWorkflowBufferEventCount = NewCounterDef("closed_workflow_buffer_event_counter")

// Matching
Expand Down
15 changes: 0 additions & 15 deletions common/namespace/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,21 +241,6 @@ func (ns *Namespace) GetCustomData(key string) string {
return ns.info.Data[key]
}

// Len return length
func (t Namespaces) Len() int {
return len(t)
}

// Swap implements sort.Interface.
func (t Namespaces) Swap(i, j int) {
t[i], t[j] = t[j], t[i]
}

// Less implements sort.Interface
func (t Namespaces) Less(i, j int) bool {
return t[i].notificationVersion < t[j].notificationVersion
}

// Retention returns retention duration for this namespace.
func (ns *Namespace) Retention() time.Duration {
if ns.config.Retention == nil {
Expand Down
200 changes: 21 additions & 179 deletions common/namespace/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,11 @@ package namespace

import (
"context"
"sort"
"sync"
"sync/atomic"
"time"

"go.temporal.io/api/serviceerror"
"golang.org/x/exp/maps"

"go.temporal.io/server/common"
"go.temporal.io/server/common/cache"
Expand Down Expand Up @@ -131,18 +129,16 @@ type (
Registry interface {
common.Daemon
common.Pingable
RegisterNamespaceChangeCallback(listenerID any, initialNotificationVersion int64, prepareCallback PrepareCallbackFn, callback CallbackFn)
UnregisterNamespaceChangeCallback(listenerID any)
GetNamespace(name Name) (*Namespace, error)
GetNamespaceByID(id ID) (*Namespace, error)
GetNamespaceID(name Name) (ID, error)
GetNamespaceName(id ID) (Name, error)
GetCacheSize() (sizeOfCacheByName int64, sizeOfCacheByID int64)
// Refresh forces an immediate refresh of the namespace cache and blocks until it's complete.
Refresh()
// Registers callback for namespace state changes. This is regrettably
// different from the above RegisterNamespaceChangeCallback because we
// need different semantics.
// Registers callback for namespace state changes.
// StateChangeCallbackFn will be invoked for a new/deleted namespace or namespace that has
// State, ReplicationState, ActiveCluster, or isGlobalNamespace config changed.
RegisterStateChangeCallback(key any, cb StateChangeCallbackFn)
UnregisterStateChangeCallback(key any)
}
Expand All @@ -156,24 +152,14 @@ type (
clock Clock
metricsHandler metrics.Handler
logger log.Logger
lastRefreshTime atomic.Value
refreshInterval dynamicconfig.DurationPropertyFn

// cacheLock protects cachNameToID, cacheByID and stateChangeCallbacks.
// If the exclusive side is to be held at the same time as the
// callbackLock (below), this lock MUST be acquired *first*.
cacheLock sync.RWMutex
cacheNameToID cache.Cache
cacheByID cache.Cache

// callbackLock protects prepareCallbacks and callbacks. Do not call
// cacheLock.Lock() (the other lock in this struct, above) while holding
// this lock or you risk a deadlock.
callbackLock sync.Mutex
prepareCallbacks map[any]PrepareCallbackFn
callbacks map[any]CallbackFn

// State-change callbacks. Protected by cacheLock
cacheLock sync.RWMutex
cacheNameToID cache.Cache
cacheByID cache.Cache
stateChangeCallbacks map[any]StateChangeCallbackFn
}
)
Expand All @@ -196,12 +182,9 @@ func NewRegistry(
logger: logger,
cacheNameToID: cache.New(cacheMaxSize, &cacheOpts),
cacheByID: cache.New(cacheMaxSize, &cacheOpts),
prepareCallbacks: make(map[any]PrepareCallbackFn),
callbacks: make(map[any]CallbackFn),
refreshInterval: refreshInterval,
stateChangeCallbacks: make(map[any]StateChangeCallbackFn),
}
reg.lastRefreshTime.Store(time.Time{})
return reg
}

Expand Down Expand Up @@ -257,18 +240,6 @@ func (r *registry) GetPingChecks() []common.PingCheck {
},
MetricsName: metrics.NamespaceRegistryLockLatency.GetMetricName(),
},
{
Name: "namespace registry callback lock",
// we don't do any persistence ops, this shouldn't be blocked
Timeout: 10 * time.Second,
Ping: func() []common.Pingable {
r.callbackLock.Lock()
//lint:ignore SA2001 just checking if we can acquire the lock
r.callbackLock.Unlock()
return nil
},
MetricsName: metrics.NamespaceRegistryCallbackLockLatency.GetMetricName(),
},
}
}

Expand All @@ -292,53 +263,6 @@ func (r *registry) getAllNamespaceLocked() map[ID]*Namespace {
return result
}

// RegisterNamespaceChangeCallback set a namespace change callback WARN:
// callback functions MUST NOT call back into this registry instance, either to
// unregister themselves or to look up Namespaces.
func (r *registry) RegisterNamespaceChangeCallback(
listenerID any,
initialNotificationVersion int64,
prepareCallback PrepareCallbackFn,
callback CallbackFn,
) {

r.callbackLock.Lock()
r.prepareCallbacks[listenerID] = prepareCallback
r.callbacks[listenerID] = callback
r.callbackLock.Unlock()

// this section is trying to make the shard catch up with namespace changes
namespaces := Namespaces(maps.Values(r.getAllNamespace()))
// we mush notify the change in a ordered fashion
// since history shard have to update the shard info
// with namespace change version.
sort.Sort(namespaces)

var oldEntries []*Namespace
var newEntries []*Namespace
for _, namespace := range namespaces {
if namespace.notificationVersion >= initialNotificationVersion {
oldEntries = append(oldEntries, nil)
newEntries = append(newEntries, namespace)
}
}
if len(oldEntries) > 0 {
prepareCallback()
callback(oldEntries, newEntries)
}
}

// UnregisterNamespaceChangeCallback delete a namespace failover callback
func (r *registry) UnregisterNamespaceChangeCallback(
listenerID any,
) {
r.callbackLock.Lock()
defer r.callbackLock.Unlock()

delete(r.prepareCallbacks, listenerID)
delete(r.callbacks, listenerID)
}

func (r *registry) RegisterStateChangeCallback(key any, cb StateChangeCallbackFn) {
r.cacheLock.Lock()
r.stateChangeCallbacks[key] = cb
Expand Down Expand Up @@ -444,21 +368,13 @@ func (r *registry) refreshLoop(ctx context.Context) error {
}
}
if replyCh != nil {
replyCh <- struct{}{}
replyCh <- struct{}{} // TODO: close replyCh?
}
}
}
}

func (r *registry) refreshNamespaces(ctx context.Context) error {
// first load the metadata record, then load namespaces
// this can guarantee that namespaces in the cache are not updated more than metadata record
metadata, err := r.persistence.GetMetadata(ctx)
if err != nil {
return err
}
namespaceNotificationVersion := metadata.NotificationVersion

request := &persistence.ListNamespacesRequest{
PageSize: CacheRefreshPageSize,
IncludeDeleted: true,
Expand All @@ -481,10 +397,6 @@ func (r *registry) refreshNamespaces(ctx context.Context) error {
request.NextPageToken = response.NextPageToken
}

// Sort namespaces by notification version because changes must be applied in this order
// because history shard has to update the shard info with namespace change version.
sort.Sort(namespacesDb)

// Make a copy of the existing namespace cache (excluding deleted), so we can calculate diff and do "compare and swap".
newCacheNameToID := cache.New(cacheMaxSize, &cacheOpts)
newCacheByID := cache.New(cacheMaxSize, &cacheOpts)
Expand All @@ -498,49 +410,29 @@ func (r *registry) refreshNamespaces(ctx context.Context) error {
newCacheByID.Put(ID(namespace.info.Id), namespace)
}

var oldEntries []*Namespace
var newEntries []*Namespace
var stateChanged []*Namespace
UpdateLoop:
for _, namespace := range namespacesDb {
if namespace.notificationVersion >= namespaceNotificationVersion {
// this guarantee that namespace change events before the
// namespaceNotificationVersion is loaded into the cache.

// the namespace change events after the namespaceNotificationVersion
// will be loaded into cache in the next refresh
break UpdateLoop
}
oldNS, oldNSAnyVersion := r.updateIDToNamespaceCache(newCacheByID, namespace.ID(), namespace)
oldNS := r.updateIDToNamespaceCache(newCacheByID, namespace.ID(), namespace)
newCacheNameToID.Put(namespace.Name(), namespace.ID())

if oldNS != nil {
oldEntries = append(oldEntries, oldNS)
newEntries = append(newEntries, namespace)
}

// this test should include anything that might affect whether a namespace is active on
// this cluster.
if oldNSAnyVersion == nil ||
oldNSAnyVersion.State() != namespace.State() ||
oldNSAnyVersion.IsGlobalNamespace() != namespace.IsGlobalNamespace() ||
oldNSAnyVersion.ActiveClusterName() != namespace.ActiveClusterName() {
if oldNS == nil ||
oldNS.State() != namespace.State() ||
oldNS.IsGlobalNamespace() != namespace.IsGlobalNamespace() ||
oldNS.ActiveClusterName() != namespace.ActiveClusterName() ||
oldNS.ReplicationState() != namespace.ReplicationState() {
stateChanged = append(stateChanged, namespace)
}
}

var stateChangeCallbacks []StateChangeCallbackFn

// NOTE: READ REF BEFORE MODIFICATION
// ref: historyEngine.go registerNamespaceFailoverCallback function
r.publishCacheUpdate(func() (Namespaces, Namespaces) {
r.cacheLock.Lock()
defer r.cacheLock.Unlock()
r.cacheByID = newCacheByID
r.cacheNameToID = newCacheNameToID
stateChangeCallbacks = mapAnyValues(r.stateChangeCallbacks)
return oldEntries, newEntries
})
r.cacheLock.Lock()
r.cacheByID = newCacheByID
r.cacheNameToID = newCacheNameToID
stateChangeCallbacks = mapAnyValues(r.stateChangeCallbacks)
r.cacheLock.Unlock()

// call state change callbacks
for _, cb := range stateChangeCallbacks {
Expand All @@ -559,15 +451,12 @@ func (r *registry) updateIDToNamespaceCache(
cacheByID cache.Cache,
id ID,
newNS *Namespace,
) (*Namespace, *Namespace) {
) (oldNS *Namespace) {
oldCacheRec := cacheByID.Put(id, newNS)
if oldNS, ok := oldCacheRec.(*Namespace); ok {
if newNS.notificationVersion > oldNS.notificationVersion && r.globalNamespacesEnabled {
return oldNS, oldNS
}
return nil, oldNS
return oldNS
}
return nil, nil
return nil
}

// getNamespace retrieves the information from the cache if it exists
Expand All @@ -594,53 +483,6 @@ func (r *registry) getNamespaceByIDLocked(id ID) (*Namespace, error) {
return nil, serviceerror.NewNamespaceNotFound(id.String())
}

func (r *registry) publishCacheUpdate(
updateCache func() (Namespaces, Namespaces),
) {
now := r.clock.Now()

prepareCallbacks, callbacks := r.getNamespaceChangeCallbacks()

r.triggerNamespaceChangePrepareCallback(prepareCallbacks)
oldEntries, newEntries := updateCache()
r.triggerNamespaceChangeCallback(callbacks, oldEntries, newEntries)
r.lastRefreshTime.Store(now)
}

func (r *registry) getNamespaceChangeCallbacks() ([]PrepareCallbackFn, []CallbackFn) {
r.callbackLock.Lock()
defer r.callbackLock.Unlock()
return mapAnyValues(r.prepareCallbacks), mapAnyValues(r.callbacks)
}

func (r *registry) triggerNamespaceChangePrepareCallback(
prepareCallbacks []PrepareCallbackFn,
) {
startTime := time.Now().UTC()
defer func() {
r.metricsHandler.Timer(metrics.NamespaceCachePrepareCallbacksLatency.GetMetricName()).Record(time.Since(startTime))
}()

for _, prepareCallback := range prepareCallbacks {
prepareCallback()
}
}

func (r *registry) triggerNamespaceChangeCallback(
callbacks []CallbackFn,
oldNamespaces []*Namespace,
newNamespaces []*Namespace,
) {
startTime := time.Now().UTC()
defer func() {
r.metricsHandler.Timer(metrics.NamespaceCacheCallbacksLatency.GetMetricName()).Record(time.Since(startTime))
}()

for _, callback := range callbacks {
callback(oldNamespaces, newNamespaces)
}
}

// This is https://pkg.go.dev/golang.org/x/exp/maps#Values except that it works
// for map[any]T (see https://github.com/golang/go/issues/51257 and many more)
func mapAnyValues[T any](m map[any]T) []T {
Expand Down
24 changes: 0 additions & 24 deletions common/namespace/registry_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 04ce8e3

Please sign in to comment.