From 44a75fbbb5714c1714ab888eb60ecb40a928bb6f Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Tue, 6 Dec 2022 11:06:20 -0800 Subject: [PATCH 1/6] Clean namespace handover --- common/metrics/metric_defs.go | 1 + common/resource/fx.go | 10 ++- common/rpc/interceptor/namespace_validator.go | 4 +- .../interceptor/namespace_validator_test.go | 2 +- service/history/api/retry_util.go | 4 + service/history/consts/const.go | 3 + service/history/historyEngine.go | 3 +- service/history/nDCTaskUtil.go | 13 +++ service/history/queues/executable.go | 23 ++++-- service/history/shard/context.go | 2 +- service/history/shard/context_impl.go | 79 +++++++++++++++++-- service/history/shard/context_mock.go | 8 +- service/history/shard/context_testutil.go | 3 +- .../history/timerQueueActiveTaskExecutor.go | 17 +++- .../transferQueueActiveTaskExecutor.go | 16 +++- .../history/visibilityQueueTaskExecutor.go | 19 ++++- service/worker/migration/activities.go | 2 +- 17 files changed, 180 insertions(+), 29 deletions(-) diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index dbae56d9e55..e0705180b86 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1418,6 +1418,7 @@ var ( TaskWorkflowBusyCounter = NewCounterDef("task_errors_workflow_busy") TaskNotActiveCounter = NewCounterDef("task_errors_not_active_counter") TaskLimitExceededCounter = NewCounterDef("task_errors_limit_exceeded_counter") + TaskNamespaceHandoverCounter = NewCounterDef("task_errors_namespace_handover") TaskScheduleToStartLatency = NewTimerDef("task_schedule_to_start_latency") TransferTaskMissingEventCounter = NewCounterDef("transfer_task_missing_event_counter") TaskBatchCompleteCounter = NewCounterDef("task_batch_complete_counter") diff --git a/common/resource/fx.go b/common/resource/fx.go index 1c091a8f89e..e05c04fb9a5 100644 --- a/common/resource/fx.go +++ b/common/resource/fx.go @@ -64,6 +64,7 @@ import ( "go.temporal.io/server/common/ringpop" "go.temporal.io/server/common/rpc" "go.temporal.io/server/common/rpc/encryption" + "go.temporal.io/server/common/rpc/interceptor" "go.temporal.io/server/common/sdk" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/common/telemetry" @@ -349,7 +350,14 @@ func HistoryClientProvider(clientBean client.Bean) historyservice.HistoryService historyClient := history.NewRetryableClient( historyRawClient, common.CreateHistoryClientRetryPolicy(), - common.IsServiceClientTransientError, + func(err error) bool { + if err.Error() == interceptor.ErrNamespaceHandover.Error() { + // prevent retrying namespace handover unavailable error + // in when calling history service + return false + } + return common.IsServiceClientTransientError(err) + }, ) return historyClient } diff --git a/common/rpc/interceptor/namespace_validator.go b/common/rpc/interceptor/namespace_validator.go index f10d3aed75b..102f64eec1d 100644 --- a/common/rpc/interceptor/namespace_validator.go +++ b/common/rpc/interceptor/namespace_validator.go @@ -51,7 +51,7 @@ type ( var ( ErrNamespaceNotSet = serviceerror.NewInvalidArgument("Namespace not set on request.") errNamespaceTooLong = serviceerror.NewInvalidArgument("Namespace length exceeds limit.") - errNamespaceHandover = serviceerror.NewUnavailable(fmt.Sprintf("Namespace replication in %s state.", enumspb.REPLICATION_STATE_HANDOVER.String())) + ErrNamespaceHandover = serviceerror.NewUnavailable(fmt.Sprintf("Namespace replication in %s state.", enumspb.REPLICATION_STATE_HANDOVER.String())) errTaskTokenNotSet = 
serviceerror.NewInvalidArgument("Task token not set on request.") errTaskTokenNamespaceMismatch = serviceerror.NewInvalidArgument("Operation requested with a token from a different namespace.") @@ -266,5 +266,5 @@ func (ni *NamespaceValidatorInterceptor) checkReplicationState(namespaceEntry *n return nil } - return errNamespaceHandover + return ErrNamespaceHandover } diff --git a/common/rpc/interceptor/namespace_validator_test.go b/common/rpc/interceptor/namespace_validator_test.go index 7c38ca050c3..95122d35935 100644 --- a/common/rpc/interceptor/namespace_validator_test.go +++ b/common/rpc/interceptor/namespace_validator_test.go @@ -200,7 +200,7 @@ func (s *namespaceValidatorSuite) Test_StateValidationIntercept_StatusFromNamesp { state: enumspb.NAMESPACE_STATE_REGISTERED, replicationState: enumspb.REPLICATION_STATE_HANDOVER, - expectedErr: errNamespaceHandover, + expectedErr: ErrNamespaceHandover, method: "/temporal/StartWorkflowExecution", req: &workflowservice.StartWorkflowExecutionRequest{Namespace: "test-namespace"}, }, diff --git a/service/history/api/retry_util.go b/service/history/api/retry_util.go index 90d3dfdc0f8..97c3daa9b09 100644 --- a/service/history/api/retry_util.go +++ b/service/history/api/retry_util.go @@ -35,6 +35,10 @@ import ( ) func IsRetryableError(err error) bool { + if err == consts.ErrNamespaceHandover { + return false + } + return err == consts.ErrStaleState || err == consts.ErrLocateCurrentWorkflowExecution || err == consts.ErrBufferedQueryCleared || diff --git a/service/history/consts/const.go b/service/history/consts/const.go index b10e8406faf..7d6e141baed 100644 --- a/service/history/consts/const.go +++ b/service/history/consts/const.go @@ -31,6 +31,7 @@ import ( "go.temporal.io/api/serviceerror" "go.temporal.io/server/common" + "go.temporal.io/server/common/rpc/interceptor" ) const ( @@ -90,6 +91,8 @@ var ( ErrWorkflowNotReady = serviceerror.NewWorkflowNotReady("Workflow state is not ready to handle the request.") // ErrWorkflowTaskNotScheduled is error indicating workflow task is not scheduled yet. ErrWorkflowTaskNotScheduled = serviceerror.NewWorkflowNotReady("Workflow task is not scheduled yet.") + // ErrNamespaceHandover is error in dicating namespace is in handover state and cannot process request. 
+ ErrNamespaceHandover = interceptor.ErrNamespaceHandover // FailedWorkflowStatuses is a set of failed workflow close states, used for start workflow policy // for start workflow execution API diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 562ef8109c0..20f0d0175df 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -369,8 +369,7 @@ func (e *historyEngineImpl) registerNamespaceFailoverCallback() { } if e.shard.GetClusterMetadata().IsGlobalNamespaceEnabled() { - maxTaskID, _ := e.replicationAckMgr.GetMaxTaskInfo() - e.shard.UpdateHandoverNamespaces(nextNamespaces, maxTaskID) + e.shard.UpdateHandoverNamespaces(nextNamespaces) } newNotificationVersion := nextNamespaces[len(nextNamespaces)-1].NotificationVersion() + 1 diff --git a/service/history/nDCTaskUtil.go b/service/history/nDCTaskUtil.go index d240744d1e0..e6c0ac6db83 100644 --- a/service/history/nDCTaskUtil.go +++ b/service/history/nDCTaskUtil.go @@ -27,6 +27,7 @@ package history import ( "context" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" enumsspb "go.temporal.io/server/api/enums/v1" @@ -210,3 +211,15 @@ func getNamespaceTagByID( return metrics.NamespaceTag(namespaceName.String()) } + +func getNamespaceTagAndReplicationStateByID( + registry namespace.Registry, + namespaceID string, +) (metrics.Tag, enumspb.ReplicationState) { + namespace, err := registry.GetNamespaceByID(namespace.ID(namespaceID)) + if err != nil { + return metrics.NamespaceUnknownTag(), enumspb.REPLICATION_STATE_UNSPECIFIED + } + + return metrics.NamespaceTag(namespace.Name().String()), namespace.ReplicationState() +} diff --git a/service/history/queues/executable.go b/service/history/queues/executable.go index c69c6cb36fe..2ca3552b225 100644 --- a/service/history/queues/executable.go +++ b/service/history/queues/executable.go @@ -284,6 +284,12 @@ func (e *executableImpl) HandleErr(err error) (retErr error) { return nil } + if err.Error() == consts.ErrNamespaceHandover.Error() { + e.taggedMetricsHandler.Counter(metrics.TaskNamespaceHandoverCounter.GetMetricName()).Record(1) + err = consts.ErrNamespaceHandover + return err + } + if _, ok := err.(*serviceerror.NamespaceNotActive); ok { // TODO remove this error check special case after multi-cursor is enabled by default, // since the new task life cycle will not give up until task processed / verified @@ -325,7 +331,10 @@ func (e *executableImpl) IsRetryableError(err error) bool { // ErrTaskRetry means mutable state is not ready for standby task processing // there's no point for retrying the task immediately which will hold the worker corouinte // TODO: change ErrTaskRetry to a better name - return err != consts.ErrTaskRetry && err != consts.ErrWorkflowBusy && err != consts.ErrDependencyTaskNotCompleted + return err != consts.ErrTaskRetry && + err != consts.ErrWorkflowBusy && + err != consts.ErrDependencyTaskNotCompleted && + err != consts.ErrNamespaceHandover } func (e *executableImpl) RetryPolicy() backoff.RetryPolicy { @@ -449,7 +458,9 @@ func (e *executableImpl) shouldResubmitOnNack(attempt int, err error) bool { return false } - return err != consts.ErrTaskRetry && err != consts.ErrDependencyTaskNotCompleted + return err != consts.ErrTaskRetry && + err != consts.ErrDependencyTaskNotCompleted && + err != consts.ErrNamespaceHandover } func (e *executableImpl) rescheduleTime( @@ -459,12 +470,14 @@ func (e *executableImpl) rescheduleTime( // elapsedTime (the first parameter in ComputeNextDelay) is not relevant 
here // since reschedule policy has no expiration interval. - if err == consts.ErrTaskRetry { + if err == consts.ErrTaskRetry || err == consts.ErrNamespaceHandover { // using a different reschedule policy to slow down retry - // as the error means mutable state is not ready to handle the task, + // as the error means mutable state or namespace is not ready to handle the task, // need to wait for replication. return e.timeSource.Now().Add(taskNotReadyReschedulePolicy.ComputeNextDelay(0, attempt)) - } else if err == consts.ErrDependencyTaskNotCompleted { + } + + if err == consts.ErrDependencyTaskNotCompleted { return e.timeSource.Now().Add(dependencyTaskNotCompletedReschedulePolicy.ComputeNextDelay(0, attempt)) } diff --git a/service/history/shard/context.go b/service/history/shard/context.go index 117bbbf22bf..3cb9a214b46 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -104,7 +104,7 @@ type ( GetNamespaceNotificationVersion() int64 UpdateNamespaceNotificationVersion(namespaceNotificationVersion int64) error - UpdateHandoverNamespaces(newNamespaces []*namespace.Namespace, maxRepTaskID int64) + UpdateHandoverNamespaces(newNamespaces []*namespace.Namespace) AppendHistoryEvents(ctx context.Context, request *persistence.AppendHistoryNodesRequest, namespaceID namespace.ID, execution commonpb.WorkflowExecution) (int, error) diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index 4f8ce397358..196cc5eaed0 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -28,6 +28,7 @@ import ( "context" "errors" "fmt" + "math" "sync" "time" @@ -63,6 +64,7 @@ import ( "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/common/util" "go.temporal.io/server/service/history/configs" + "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/vclock" @@ -80,6 +82,8 @@ const ( const ( shardIOTimeout = 5 * time.Second + + pendingMaxReplicationTaskID = math.MaxInt64 ) type ( @@ -130,7 +134,7 @@ type ( // exist only in memory remoteClusterInfos map[string]*remoteClusterInfo - handoverNamespaces map[string]*namespaceHandOverInfo // keyed on namespace name + handoverNamespaces map[namespace.Name]*namespaceHandOverInfo // keyed on namespace name } remoteClusterInfo struct { @@ -602,24 +606,31 @@ func (s *ContextImpl) UpdateNamespaceNotificationVersion(namespaceNotificationVe return nil } -func (s *ContextImpl) UpdateHandoverNamespaces(namespaces []*namespace.Namespace, maxRepTaskID int64) { +func (s *ContextImpl) UpdateHandoverNamespaces(namespaces []*namespace.Namespace) { s.wLock() defer s.wUnlock() - newHandoverNamespaces := make(map[string]struct{}) + maxReplicationTaskID := s.immediateTaskExclusiveMaxReadLevel + if s.errorByState() != nil { + // if shard state is not acquired, we don't know what the max taskID is + // as there might be in-flight requests + maxReplicationTaskID = pendingMaxReplicationTaskID + } + + newHandoverNamespaces := make(map[namespace.Name]struct{}) for _, ns := range namespaces { if ns.IsGlobalNamespace() && ns.ReplicationState() == enums.REPLICATION_STATE_HANDOVER { - nsName := ns.Name().String() + nsName := ns.Name() newHandoverNamespaces[nsName] = struct{}{} if handover, ok := s.handoverNamespaces[nsName]; ok { if handover.NotificationVersion < ns.NotificationVersion() { handover.NotificationVersion = ns.NotificationVersion() -
handover.MaxReplicationTaskID = maxRepTaskID + handover.MaxReplicationTaskID = maxReplicationTaskID } } else { s.handoverNamespaces[nsName] = &namespaceHandOverInfo{ NotificationVersion: ns.NotificationVersion(), - MaxReplicationTaskID: maxRepTaskID, + MaxReplicationTaskID: maxReplicationTaskID, } } } @@ -659,6 +670,10 @@ func (s *ContextImpl) AddTasks( s.wUnlock() return err } + if err := s.errorByNamespaceStateLocked(namespaceEntry.Name()); err != nil { + s.wUnlock() + return err + } err = s.addTasksLocked(ctx, request, namespaceEntry) s.wUnlock() @@ -694,6 +709,10 @@ func (s *ContextImpl) CreateWorkflowExecution( return nil, err } + if err := s.errorByNamespaceStateLocked(namespaceEntry.Name()); err != nil { + return nil, err + } + transferExclusiveMaxReadLevel := int64(0) if err := s.allocateTaskIDAndTimestampLocked( namespaceEntry, @@ -739,6 +758,10 @@ func (s *ContextImpl) UpdateWorkflowExecution( return nil, err } + if err := s.errorByNamespaceStateLocked(namespaceEntry.Name()); err != nil { + return nil, err + } + transferExclusiveMaxReadLevel := int64(0) if err := s.allocateTaskIDAndTimestampLocked( namespaceEntry, @@ -810,6 +833,10 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution( return nil, err } + if err := s.errorByNamespaceStateLocked(namespaceEntry.Name()); err != nil { + return nil, err + } + transferExclusiveMaxReadLevel := int64(0) if request.CurrentWorkflowMutation != nil { if err := s.allocateTaskIDAndTimestampLocked( @@ -874,6 +901,10 @@ func (s *ContextImpl) SetWorkflowExecution( return nil, err } + if err := s.errorByNamespaceStateLocked(namespaceEntry.Name()); err != nil { + return nil, err + } + transferExclusiveMaxReadLevel := int64(0) if err := s.allocateTaskIDAndTimestampLocked( namespaceEntry, @@ -1170,6 +1201,15 @@ func (s *ContextImpl) errorByState() error { } } +func (s *ContextImpl) errorByNamespaceStateLocked( + namespaceName namespace.Name, +) error { + if _, ok := s.handoverNamespaces[namespaceName]; ok { + return consts.ErrNamespaceHandover + } + return nil +} + func (s *ContextImpl) generateTaskIDLocked() (int64, error) { if err := s.updateRangeIfNeededLocked(); err != nil { return -1, err @@ -1737,6 +1777,23 @@ func (s *ContextImpl) notifyQueueProcessor() { } } +func (s *ContextImpl) updateHandoverNamespacePendingTaskID() { + s.wLock() + defer s.wUnlock() + + if s.errorByState() != nil { + // if not in acquired state, this function will be called again + // later when shard is re-acquired. + return + } + + for namespaceName, handoverInfo := range s.handoverNamespaces { + if handoverInfo.MaxReplicationTaskID == pendingMaxReplicationTaskID { + s.handoverNamespaces[namespaceName].MaxReplicationTaskID = s.immediateTaskExclusiveMaxReadLevel + } + } +} + func (s *ContextImpl) loadShardMetadata(ownershipChanged *bool) error { // Only have to do this once, we can just re-acquire the rangeid lock after that s.rLock() @@ -1852,7 +1909,7 @@ func (s *ContextImpl) GetReplicationStatus(cluster []string) (map[string]*histor } for k, v := range s.handoverNamespaces { - handoverNamespaces[k] = &historyservice.HandoverNamespaceInfo{ + handoverNamespaces[k.String()] = &historyservice.HandoverNamespaceInfo{ HandoverReplicationTaskId: v.MaxReplicationTaskID, } } @@ -1928,6 +1985,10 @@ func (s *ContextImpl) acquireShard() { engine = s.createEngine() } + // NOTE: engine is created & started before setting shard state to acquired. 
+ // -> namespace handover callback is registered & called before shard is able to serve traffic + // -> information for handover namespace is recorded before shard can serve traffic + // -> upon shard reload, no history api or task can go through for ns in handover state err = s.transition(contextRequestAcquired{engine: engine}) if err != nil { @@ -1942,6 +2003,8 @@ // to trigger a load as queue max level can be updated to a newer value s.notifyQueueProcessor() + s.updateHandoverNamespacePendingTaskID() + return nil } @@ -2014,7 +2077,7 @@ func newContext( clusterMetadata: clusterMetadata, archivalMetadata: archivalMetadata, hostInfoProvider: hostInfoProvider, - handoverNamespaces: make(map[string]*namespaceHandOverInfo), + handoverNamespaces: make(map[namespace.Name]*namespaceHandOverInfo), lifecycleCtx: lifecycleCtx, lifecycleCancel: lifecycleCancel, engineFuture: future.NewFuture[Engine](), diff --git a/service/history/shard/context_mock.go b/service/history/shard/context_mock.go index 77231f335f7..a12e499abab 100644 --- a/service/history/shard/context_mock.go +++ b/service/history/shard/context_mock.go @@ -718,15 +718,15 @@ func (mr *MockContextMockRecorder) UpdateFailoverLevel(category, failoverID, lev } // UpdateHandoverNamespaces mocks base method. -func (m *MockContext) UpdateHandoverNamespaces(newNamespaces []*namespace.Namespace, maxRepTaskID int64) { +func (m *MockContext) UpdateHandoverNamespaces(newNamespaces []*namespace.Namespace) { m.ctrl.T.Helper() - m.ctrl.Call(m, "UpdateHandoverNamespaces", newNamespaces, maxRepTaskID) + m.ctrl.Call(m, "UpdateHandoverNamespaces", newNamespaces) } // UpdateHandoverNamespaces indicates an expected call of UpdateHandoverNamespaces. -func (mr *MockContextMockRecorder) UpdateHandoverNamespaces(newNamespaces, maxRepTaskID interface{}) *gomock.Call { +func (mr *MockContextMockRecorder) UpdateHandoverNamespaces(newNamespaces interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateHandoverNamespaces", reflect.TypeOf((*MockContext)(nil).UpdateHandoverNamespaces), newNamespaces, maxRepTaskID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateHandoverNamespaces", reflect.TypeOf((*MockContext)(nil).UpdateHandoverNamespaces), newNamespaces) } // UpdateNamespaceNotificationVersion mocks base method.
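The core mechanism in patch 1 is a per-namespace write fence: UpdateHandoverNamespaces records which namespaces are in handover, and every mutable-state write path on the shard (AddTasks, CreateWorkflowExecution, UpdateWorkflowExecution, ConflictResolveWorkflowExecution, SetWorkflowExecution) consults that set under the shard lock via errorByNamespaceStateLocked and fails fast with ErrNamespaceHandover instead of generating new replication tasks. A minimal sketch of the pattern, using simplified stand-in types (handoverShard and its methods are illustrative, not the real ContextImpl API):

    package shard

    import (
        "errors"
        "sync"
    )

    // errNamespaceHandover stands in for consts.ErrNamespaceHandover.
    var errNamespaceHandover = errors.New("namespace replication in Handover state")

    // handoverShard is a simplified stand-in for ContextImpl: just the shard
    // lock and the handover set, keyed on namespace name.
    type handoverShard struct {
        mu                 sync.Mutex
        handoverNamespaces map[string]struct{}
    }

    // errorByNamespaceStateLocked mirrors the check added in patch 1: the
    // caller must hold the shard lock; writes for handover namespaces fail.
    func (s *handoverShard) errorByNamespaceStateLocked(namespaceName string) error {
        if _, ok := s.handoverNamespaces[namespaceName]; ok {
            return errNamespaceHandover
        }
        return nil
    }

    // updateWorkflowExecution shows where a write path would apply the fence,
    // before any task IDs are allocated or state is persisted.
    func (s *handoverShard) updateWorkflowExecution(namespaceName string) error {
        s.mu.Lock()
        defer s.mu.Unlock()
        if err := s.errorByNamespaceStateLocked(namespaceName); err != nil {
            return err
        }
        // ... allocate task IDs and persist the mutation ...
        return nil
    }

Checking the fence before task ID allocation is what makes the recorded MaxReplicationTaskID a true upper bound: no new replication task for that namespace can be created behind it.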
diff --git a/service/history/shard/context_testutil.go b/service/history/shard/context_testutil.go index f434d287148..b7b69e4789e 100644 --- a/service/history/shard/context_testutil.go +++ b/service/history/shard/context_testutil.go @@ -36,6 +36,7 @@ import ( "go.temporal.io/server/common/future" "go.temporal.io/server/common/membership" "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/resourcetest" "go.temporal.io/server/service/history/configs" @@ -100,7 +101,7 @@ func NewTestContext( maxTaskSequenceNumber: (shardInfo.RangeId + 1) << int64(config.RangeSizeBits), scheduledTaskMaxReadLevelMap: make(map[string]time.Time), remoteClusterInfos: make(map[string]*remoteClusterInfo), - handoverNamespaces: make(map[string]*namespaceHandOverInfo), + handoverNamespaces: make(map[namespace.Name]*namespaceHandOverInfo), clusterMetadata: resourceTest.ClusterMetadata, timeSource: resourceTest.TimeSource, diff --git a/service/history/timerQueueActiveTaskExecutor.go b/service/history/timerQueueActiveTaskExecutor.go index ca491c93451..54d5aa96e23 100644 --- a/service/history/timerQueueActiveTaskExecutor.go +++ b/service/history/timerQueueActiveTaskExecutor.go @@ -48,6 +48,7 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/service/history/configs" + "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" @@ -93,12 +94,26 @@ func (t *timerQueueActiveTaskExecutor) Execute( ) ([]metrics.Tag, bool, error) { task := executable.GetTask() taskType := queues.GetActiveTimerTaskTypeTagValue(task) + namespaceTag, replicationState := getNamespaceTagAndReplicationStateByID( + t.shard.GetNamespaceRegistry(), + task.GetNamespaceID(), + ) metricsTags := []metrics.Tag{ - getNamespaceTagByID(t.shard.GetNamespaceRegistry(), task.GetNamespaceID()), + namespaceTag, metrics.TaskTypeTag(taskType), metrics.OperationTag(taskType), // for backward compatibility } + if replicationState == enumspb.REPLICATION_STATE_HANDOVER { + // TODO: exclude task types here if we believe it's safe & necessary to execute + // them during namespace handover. 
+ // TODO: move this logic to queues.Executable when metrics tag doesn't need to + // be returned from task executor + + // namespace in handover state is still active namespace + return metricsTags, true, consts.ErrNamespaceHandover + } + var err error switch task := task.(type) { case *tasks.UserTimerTask: diff --git a/service/history/transferQueueActiveTaskExecutor.go b/service/history/transferQueueActiveTaskExecutor.go index a3851a07886..643b52c22e2 100644 --- a/service/history/transferQueueActiveTaskExecutor.go +++ b/service/history/transferQueueActiveTaskExecutor.go @@ -113,12 +113,26 @@ func (t *transferQueueActiveTaskExecutor) Execute( ) ([]metrics.Tag, bool, error) { task := executable.GetTask() taskType := queues.GetActiveTransferTaskTypeTagValue(task) + namespaceTag, replicationState := getNamespaceTagAndReplicationStateByID( + t.shard.GetNamespaceRegistry(), + task.GetNamespaceID(), + ) metricsTags := []metrics.Tag{ - getNamespaceTagByID(t.shard.GetNamespaceRegistry(), task.GetNamespaceID()), + namespaceTag, metrics.TaskTypeTag(taskType), metrics.OperationTag(taskType), // for backward compatibility } + if replicationState == enumspb.REPLICATION_STATE_HANDOVER { + // TODO: exclude task types here if we believe it's safe & necessary to execute + // them during namespace handover. + // TODO: move this logic to queues.Executable when metrics tag doesn't need to + // be returned from task executor + + // namespace in handover state is still active namespace + return metricsTags, true, consts.ErrNamespaceHandover + } + var err error switch task := task.(type) { case *tasks.ActivityTask: diff --git a/service/history/visibilityQueueTaskExecutor.go b/service/history/visibilityQueueTaskExecutor.go index 8c7666c76d9..d46d855b71c 100644 --- a/service/history/visibilityQueueTaskExecutor.go +++ b/service/history/visibilityQueueTaskExecutor.go @@ -89,12 +89,29 @@ func (t *visibilityQueueTaskExecutor) Execute( ) ([]metrics.Tag, bool, error) { task := executable.GetTask() taskType := queues.GetVisibilityTaskTypeTagValue(task) + namespaceTag, replicationState := getNamespaceTagAndReplicationStateByID( + t.shard.GetNamespaceRegistry(), + task.GetNamespaceID(), + ) metricsTags := []metrics.Tag{ - getNamespaceTagByID(t.shard.GetNamespaceRegistry(), task.GetNamespaceID()), + namespaceTag, metrics.TaskTypeTag(taskType), metrics.OperationTag(taskType), // for backward compatibility } + if replicationState == enumspb.REPLICATION_STATE_HANDOVER { + // TODO: exclude task types here if we believe it's safe & necessary to execute + // them during namespace handover. 
+ // Visibility tasks should all be safe, but close execution task + // might do a setWorkflowExecution to clean up memo and search attributes, which + // will be blocked by shard context during ns handover + // TODO: move this logic to queues.Executable when metrics tag doesn't need to + // be returned from task executor + + // namespace in handover state is still active namespace + return metricsTags, true, consts.ErrNamespaceHandover + } + var err error switch task := task.(type) { case *tasks.StartExecutionVisibilityTask: diff --git a/service/worker/migration/activities.go b/service/worker/migration/activities.go index e2242aaebd3..2bdd43309eb 100644 --- a/service/worker/migration/activities.go +++ b/service/worker/migration/activities.go @@ -192,7 +192,7 @@ func (a *activities) checkHandoverOnce(ctx context.Context, waitRequest waitHand clusterInfo, hasClusterInfo := shard.RemoteClusters[waitRequest.RemoteCluster] handoverInfo, hasHandoverInfo := shard.HandoverNamespaces[waitRequest.Namespace] if hasClusterInfo && hasHandoverInfo { - if clusterInfo.AckedTaskId == shard.MaxReplicationTaskId || clusterInfo.AckedTaskId >= handoverInfo.HandoverReplicationTaskId { + if clusterInfo.AckedTaskId >= handoverInfo.HandoverReplicationTaskId { readyShardCount++ continue } From 134a8cfea17a41c0a1b62ffa8053ef9f16898eab Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Tue, 6 Dec 2022 11:49:51 -0800 Subject: [PATCH 2/6] only block active handover namespaces --- service/history/shard/context_impl.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index 196cc5eaed0..565b92ee92b 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -617,9 +617,10 @@ func (s *ContextImpl) UpdateHandoverNamespaces(namespaces []*namespace.Namespace maxReplicationTaskID = pendingMaxReplicationTaskID } + currentClustername := s.GetClusterMetadata().GetCurrentClusterName() newHandoverNamespaces := make(map[namespace.Name]struct{}) for _, ns := range namespaces { - if ns.IsGlobalNamespace() && ns.ReplicationState() == enums.REPLICATION_STATE_HANDOVER { + if ns.IsGlobalNamespace() && ns.ActiveInCluster(currentClustername) && ns.ReplicationState() == enums.REPLICATION_STATE_HANDOVER { nsName := ns.Name() newHandoverNamespaces[nsName] = struct{}{} if handover, ok := s.handoverNamespaces[nsName]; ok { From b27ae1d0173add23a546ac4e25e2ffd676ff7e86 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Tue, 6 Dec 2022 12:33:05 -0800 Subject: [PATCH 3/6] update --- service/history/shard/context_impl.go | 3 +++ service/history/timerQueueActiveTaskExecutor.go | 2 -- service/history/transferQueueActiveTaskExecutor.go | 2 -- service/history/visibilityQueueTaskExecutor.go | 2 -- 4 files changed, 3 insertions(+), 6 deletions(-) diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index 565b92ee92b..8ea6496d46b 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -620,6 +620,9 @@ func (s *ContextImpl) UpdateHandoverNamespaces(namespaces []*namespace.Namespace currentClustername := s.GetClusterMetadata().GetCurrentClusterName() newHandoverNamespaces := make(map[namespace.Name]struct{}) for _, ns := range namespaces { + // NOTE: replication state field won't be replicated and currently we only update a namespace + // to handover state from active cluster, so the second condition will always be true. 
Adding + // it here to be more safe in case above assumption no longer holds in the future. if ns.IsGlobalNamespace() && ns.ActiveInCluster(currentClustername) && ns.ReplicationState() == enums.REPLICATION_STATE_HANDOVER { nsName := ns.Name() newHandoverNamespaces[nsName] = struct{}{} diff --git a/service/history/timerQueueActiveTaskExecutor.go b/service/history/timerQueueActiveTaskExecutor.go index 54d5aa96e23..3359f186cb7 100644 --- a/service/history/timerQueueActiveTaskExecutor.go +++ b/service/history/timerQueueActiveTaskExecutor.go @@ -109,8 +109,6 @@ func (t *timerQueueActiveTaskExecutor) Execute( // them during namespace handover. // TODO: move this logic to queues.Executable when metrics tag doesn't need to // be returned from task executor - - // namespace in handover state is still active namespace return metricsTags, true, consts.ErrNamespaceHandover } diff --git a/service/history/transferQueueActiveTaskExecutor.go b/service/history/transferQueueActiveTaskExecutor.go index 643b52c22e2..50b54643275 100644 --- a/service/history/transferQueueActiveTaskExecutor.go +++ b/service/history/transferQueueActiveTaskExecutor.go @@ -128,8 +128,6 @@ func (t *transferQueueActiveTaskExecutor) Execute( // them during namespace handover. // TODO: move this logic to queues.Executable when metrics tag doesn't need to // be returned from task executor - - // namespace in handover state is still active namespace return metricsTags, true, consts.ErrNamespaceHandover } diff --git a/service/history/visibilityQueueTaskExecutor.go b/service/history/visibilityQueueTaskExecutor.go index d46d855b71c..4a545b150fc 100644 --- a/service/history/visibilityQueueTaskExecutor.go +++ b/service/history/visibilityQueueTaskExecutor.go @@ -107,8 +107,6 @@ func (t *visibilityQueueTaskExecutor) Execute( // will be blocked by shard context during ns handover // TODO: move this logic to queues.Executable when metrics tag doesn't need to // be returned from task executor - - // namespace in handover state is still active namespace return metricsTags, true, consts.ErrNamespaceHandover } From 34b81b725aa55f096226e520805b371dbcd5e07c Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Thu, 15 Dec 2022 11:20:58 -0800 Subject: [PATCH 4/6] do not retry handover error --- common/resource/fx.go | 10 +-- common/rpc/interceptor/namespace_validator.go | 14 ++-- .../interceptor/namespace_validator_test.go | 6 +- common/util.go | 9 +++ service/history/api/retry_util.go | 4 -- service/history/consts/const.go | 5 +- .../transferQueueActiveTaskExecutor_test.go | 2 +- service/matching/matchingEngine.go | 10 +++ service/matching/matchingEngine_test.go | 64 +++++++++++++++++++ 9 files changed, 96 insertions(+), 28 deletions(-) diff --git a/common/resource/fx.go b/common/resource/fx.go index e05c04fb9a5..1c091a8f89e 100644 --- a/common/resource/fx.go +++ b/common/resource/fx.go @@ -64,7 +64,6 @@ import ( "go.temporal.io/server/common/ringpop" "go.temporal.io/server/common/rpc" "go.temporal.io/server/common/rpc/encryption" - "go.temporal.io/server/common/rpc/interceptor" "go.temporal.io/server/common/sdk" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/common/telemetry" @@ -350,14 +349,7 @@ func HistoryClientProvider(clientBean client.Bean) historyservice.HistoryService historyClient := history.NewRetryableClient( historyRawClient, common.CreateHistoryClientRetryPolicy(), - func(err error) bool { - if err.Error() == interceptor.ErrNamespaceHandover.Error() { - // prevent retrying namespace handover unavailable error - // in 
when calling history service - return false - } - return common.IsServiceClientTransientError(err) - }, + common.IsServiceClientTransientError, ) return historyClient } diff --git a/common/rpc/interceptor/namespace_validator.go b/common/rpc/interceptor/namespace_validator.go index 102f64eec1d..dd30af1b70f 100644 --- a/common/rpc/interceptor/namespace_validator.go +++ b/common/rpc/interceptor/namespace_validator.go @@ -26,7 +26,6 @@ package interceptor import ( "context" - "fmt" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" @@ -49,9 +48,8 @@ type ( ) var ( - ErrNamespaceNotSet = serviceerror.NewInvalidArgument("Namespace not set on request.") + errNamespaceNotSet = serviceerror.NewInvalidArgument("Namespace not set on request.") errNamespaceTooLong = serviceerror.NewInvalidArgument("Namespace length exceeds limit.") - ErrNamespaceHandover = serviceerror.NewUnavailable(fmt.Sprintf("Namespace replication in %s state.", enumspb.REPLICATION_STATE_HANDOVER.String())) errTaskTokenNotSet = serviceerror.NewInvalidArgument("Task token not set on request.") errTaskTokenNamespaceMismatch = serviceerror.NewInvalidArgument("Operation requested with a token from a different namespace.") @@ -170,20 +168,20 @@ func (ni *NamespaceValidatorInterceptor) extractNamespaceFromRequest(req interfa // Special case for DescribeNamespace API which should read namespace directly from database. // Therefore, it must bypass namespace registry and validator. if request.GetId() == "" && namespaceName.IsEmpty() { - return nil, ErrNamespaceNotSet + return nil, errNamespaceNotSet } return nil, nil case *workflowservice.RegisterNamespaceRequest: // Special case for RegisterNamespace API. `namespaceName` is name of namespace that about to be registered. // There is no namespace entry for it, therefore, it must bypass namespace registry and validator. if namespaceName.IsEmpty() { - return nil, ErrNamespaceNotSet + return nil, errNamespaceNotSet } return nil, nil default: // All other APIs. 
if namespaceName.IsEmpty() { - return nil, ErrNamespaceNotSet + return nil, errNamespaceNotSet } return ni.namespaceRegistry.GetNamespace(namespaceName) } @@ -215,7 +213,7 @@ func (ni *NamespaceValidatorInterceptor) extractNamespaceFromTaskToken(req inter } if namespaceID.IsEmpty() { - return nil, ErrNamespaceNotSet + return nil, errNamespaceNotSet } return ni.namespaceRegistry.GetNamespaceByID(namespaceID) } @@ -266,5 +264,5 @@ func (ni *NamespaceValidatorInterceptor) checkReplicationState(namespaceEntry *n return nil } - return ErrNamespaceHandover + return common.ErrNamespaceHandover } diff --git a/common/rpc/interceptor/namespace_validator_test.go b/common/rpc/interceptor/namespace_validator_test.go index 95122d35935..6e976076702 100644 --- a/common/rpc/interceptor/namespace_validator_test.go +++ b/common/rpc/interceptor/namespace_validator_test.go @@ -200,14 +200,14 @@ func (s *namespaceValidatorSuite) Test_StateValidationIntercept_StatusFromNamesp { state: enumspb.NAMESPACE_STATE_REGISTERED, replicationState: enumspb.REPLICATION_STATE_HANDOVER, - expectedErr: ErrNamespaceHandover, + expectedErr: common.ErrNamespaceHandover, method: "/temporal/StartWorkflowExecution", req: &workflowservice.StartWorkflowExecutionRequest{Namespace: "test-namespace"}, }, // DescribeNamespace { state: enumspb.NAMESPACE_STATE_UNSPECIFIED, - expectedErr: ErrNamespaceNotSet, + expectedErr: errNamespaceNotSet, method: "/temporal/DescribeNamespace", req: &workflowservice.DescribeNamespaceRequest{}, }, @@ -232,7 +232,7 @@ func (s *namespaceValidatorSuite) Test_StateValidationIntercept_StatusFromNamesp }, { state: enumspb.NAMESPACE_STATE_UNSPECIFIED, - expectedErr: ErrNamespaceNotSet, + expectedErr: errNamespaceNotSet, method: "/temporal/RegisterNamespace", req: &workflowservice.RegisterNamespaceRequest{}, }, diff --git a/common/util.go b/common/util.go index 9d337a563a1..aa29ed7b778 100644 --- a/common/util.go +++ b/common/util.go @@ -150,6 +150,11 @@ var ( ErrContextTimeoutNotSet = serviceerror.NewInvalidArgument("Context timeout is not set.") ) +var ( + // ErrNamespaceHandover is error indicating namespace is in handover state and cannot process request. 
+ ErrNamespaceHandover = serviceerror.NewUnavailable(fmt.Sprintf("Namespace replication in %s state.", enumspb.REPLICATION_STATE_HANDOVER.String())) +) + // AwaitWaitGroup calls Wait on the given wait // Returns true if the Wait() call succeeded before the timeout // Returns false if the Wait() did not return before the timeout @@ -336,6 +341,10 @@ func IsServiceClientTransientError(err error) bool { } func IsServiceHandlerRetryableError(err error) bool { + if err.Error() == ErrNamespaceHandover.Error() { + return false + } + switch err.(type) { case *serviceerror.Internal, *serviceerror.Unavailable: diff --git a/service/history/api/retry_util.go b/service/history/api/retry_util.go index 97c3daa9b09..90d3dfdc0f8 100644 --- a/service/history/api/retry_util.go +++ b/service/history/api/retry_util.go @@ -35,10 +35,6 @@ import ( ) func IsRetryableError(err error) bool { - if err == consts.ErrNamespaceHandover { - return false - } - return err == consts.ErrStaleState || err == consts.ErrLocateCurrentWorkflowExecution || err == consts.ErrBufferedQueryCleared || diff --git a/service/history/consts/const.go b/service/history/consts/const.go index 7d6e141baed..509861f2c18 100644 --- a/service/history/consts/const.go +++ b/service/history/consts/const.go @@ -31,7 +31,6 @@ import ( "go.temporal.io/api/serviceerror" "go.temporal.io/server/common" - "go.temporal.io/server/common/rpc/interceptor" ) const ( @@ -91,8 +90,8 @@ var ( ErrWorkflowNotReady = serviceerror.NewWorkflowNotReady("Workflow state is not ready to handle the request.") // ErrWorkflowTaskNotScheduled is error indicating workflow task is not scheduled yet. ErrWorkflowTaskNotScheduled = serviceerror.NewWorkflowNotReady("Workflow task is not scheduled yet.") - // ErrNamespaceHandover is error in dicating namespace is in handover state and cannot process request. - ErrNamespaceHandover = interceptor.ErrNamespaceHandover + // ErrNamespaceHandover is error indicating namespace is in handover state and cannot process request. 
+ ErrNamespaceHandover = common.ErrNamespaceHandover // FailedWorkflowStatuses is a set of failed workflow close states, used for start workflow policy // for start workflow execution API diff --git a/service/history/transferQueueActiveTaskExecutor_test.go b/service/history/transferQueueActiveTaskExecutor_test.go index f4b92a680be..6223a99a383 100644 --- a/service/history/transferQueueActiveTaskExecutor_test.go +++ b/service/history/transferQueueActiveTaskExecutor_test.go @@ -2577,7 +2577,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestPendingCloseExecutionTasks() mockShard.EXPECT().GetClusterMetadata().Return(mockClusterMetadata).AnyTimes() mockMutableState.EXPECT().GetLastWriteVersion().Return(tests.Version, nil).AnyTimes() mockNamespaceRegistry := namespace.NewMockRegistry(ctrl) - mockNamespaceRegistry.EXPECT().GetNamespaceName(gomock.Any()).Return(namespaceEntry.Name(), nil) + mockNamespaceRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(namespaceEntry, nil) mockShard.EXPECT().GetNamespaceRegistry().Return(mockNamespaceRegistry) if c.MultiCursorQueue { var highWatermarkTaskId int64 diff --git a/service/matching/matchingEngine.go b/service/matching/matchingEngine.go index e8e42af6ce9..f3b779d8b21 100644 --- a/service/matching/matchingEngine.go +++ b/service/matching/matchingEngine.go @@ -437,6 +437,11 @@ pollLoop: task.finish(nil) default: task.finish(err) + if err.Error() == common.ErrNamespaceHandover.Error() { + // do not keep polling new tasks when namespace is in handover state + // as record start request will be rejected by history service + return nil, err + } } continue pollLoop @@ -515,6 +520,11 @@ pollLoop: task.finish(nil) default: task.finish(err) + if err.Error() == common.ErrNamespaceHandover.Error() { + // do not keep polling new tasks when namespace is in handover state + // as record start request will be rejected by history service + return nil, err + } } continue pollLoop diff --git a/service/matching/matchingEngine_test.go b/service/matching/matchingEngine_test.go index 4c073014a45..1ff0e990639 100644 --- a/service/matching/matchingEngine_test.go +++ b/service/matching/matchingEngine_test.go @@ -478,6 +478,70 @@ func (s *matchingEngineSuite) PollForTasksEmptyResultTest(callContext context.Co s.EqualValues(1, s.taskManager.getTaskQueueManager(tlID).RangeID()) } +func (s *matchingEngineSuite) TestPollWorkflowTaskQueues_NamespaceHandover() { + namespaceID := namespace.ID(uuid.New()) + taskQueue := &taskqueuepb.TaskQueue{Name: "taskQueue", Kind: enumspb.TASK_QUEUE_KIND_NORMAL} + + addRequest := matchingservice.AddWorkflowTaskRequest{ + NamespaceId: namespaceID.String(), + Execution: &commonpb.WorkflowExecution{WorkflowId: "workflowID", RunId: uuid.NewRandom().String()}, + ScheduledEventId: int64(0), + TaskQueue: taskQueue, + ScheduleToStartTimeout: timestamp.DurationFromSeconds(100), + } + + // add multiple workflow tasks, but matching should not keep polling new tasks + // upon getting namespace handover error when recording start for the first task + _, err := s.matchingEngine.AddWorkflowTask(s.handlerContext, &addRequest) + s.NoError(err) + _, err = s.matchingEngine.AddWorkflowTask(s.handlerContext, &addRequest) + s.NoError(err) + + s.mockHistoryClient.EXPECT().RecordWorkflowTaskStarted(gomock.Any(), gomock.Any(), gomock.Any()).
+ Return(nil, common.ErrNamespaceHandover).Times(1) + resp, err := s.matchingEngine.PollWorkflowTaskQueue(s.handlerContext, &matchingservice.PollWorkflowTaskQueueRequest{ + NamespaceId: namespaceID.String(), + PollRequest: &workflowservice.PollWorkflowTaskQueueRequest{ + TaskQueue: taskQueue, + Identity: "identity", + }, + }) + s.Nil(resp) + s.Equal(common.ErrNamespaceHandover.Error(), err.Error()) +} + +func (s *matchingEngineSuite) TestPollActivityTaskQueues_NamespaceHandover() { + namespaceID := namespace.ID(uuid.New()) + taskQueue := &taskqueuepb.TaskQueue{Name: "taskQueue", Kind: enumspb.TASK_QUEUE_KIND_NORMAL} + + addRequest := matchingservice.AddActivityTaskRequest{ + NamespaceId: namespaceID.String(), + Execution: &commonpb.WorkflowExecution{WorkflowId: "workflowID", RunId: uuid.NewRandom().String()}, + ScheduledEventId: int64(5), + TaskQueue: taskQueue, + ScheduleToStartTimeout: timestamp.DurationFromSeconds(100), + } + + // add multiple activity tasks, but matching should not keep polling new tasks + // upon getting namespace handover error when recording start for the first task + _, err := s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest) + s.NoError(err) + _, err = s.matchingEngine.AddActivityTask(s.handlerContext, &addRequest) + s.NoError(err) + + s.mockHistoryClient.EXPECT().RecordActivityTaskStarted(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, common.ErrNamespaceHandover).Times(1) + resp, err := s.matchingEngine.PollActivityTaskQueue(s.handlerContext, &matchingservice.PollActivityTaskQueueRequest{ + NamespaceId: namespaceID.String(), + PollRequest: &workflowservice.PollActivityTaskQueueRequest{ + TaskQueue: taskQueue, + Identity: "identity", + }, + }) + s.Nil(resp) + s.Equal(common.ErrNamespaceHandover.Error(), err.Error()) +} + func (s *matchingEngineSuite) TestAddActivityTasks() { s.AddTasksTest(enumspb.TASK_QUEUE_TYPE_ACTIVITY, false) } From d219c012a8876d0adf979e5a79602e4458eadf8c Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Thu, 15 Dec 2022 16:34:57 -0800 Subject: [PATCH 5/6] notify replication ack manager --- service/history/shard/context_impl.go | 38 ++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index bb69fe36c5a..3aa4e99444a 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -609,9 +608,8 @@ func (s *ContextImpl) UpdateNamespaceNotificationVersion(namespaceNotificationVe func (s *ContextImpl) UpdateHandoverNamespaces(namespaces []*namespace.Namespace) { s.wLock() - defer s.wUnlock() - maxReplicationTaskID := s.immediateTaskExclusiveMaxReadLevel + maxReplicationTaskID := s.immediateTaskExclusiveMaxReadLevel - 1 if s.errorByState() != nil { // if shard state is not acquired, we don't know what the max taskID is // as there might be in-flight requests maxReplicationTaskID = pendingMaxReplicationTaskID } @@ -646,6 +645,9 @@ func (s *ContextImpl) UpdateHandoverNamespaces(namespaces []*namespace.Namespace delete(s.handoverNamespaces, k) } } + s.wUnlock() + + s.notifyReplicationQueueProcessor(maxReplicationTaskID) } @@ -1786,19 +1788,47 @@ func (s *ContextImpl) notifyQueueProcessor() { } func (s *ContextImpl) updateHandoverNamespacePendingTaskID() { s.wLock() - defer s.wUnlock() if s.errorByState() != nil { // if not in acquired state, this function will be called again // later when shard is re-acquired.
+ s.wUnlock() return } + maxReplicationTaskID := s.immediateTaskExclusiveMaxReadLevel - 1 for namespaceName, handoverInfo := range s.handoverNamespaces { if handoverInfo.MaxReplicationTaskID == pendingMaxReplicationTaskID { - s.handoverNamespaces[namespaceName].MaxReplicationTaskID = s.immediateTaskExclusiveMaxReadLevel + s.handoverNamespaces[namespaceName].MaxReplicationTaskID = maxReplicationTaskID } } + s.wUnlock() + + s.notifyReplicationQueueProcessor(maxReplicationTaskID) } +func (s *ContextImpl) notifyReplicationQueueProcessor(taskID int64) { + // Replication ack level won't exceed the max taskID it received via task notification. + // Since here we want its ack level to advance to at least the input taskID, we need to + // trigger a fake notification. + + cancelledCtx, cancel := context.WithCancel(context.Background()) + cancel() + + engine, err := s.engineFuture.Get(cancelledCtx) + if err != nil { + s.contextTaggedLogger.Warn("tried to notify replication queue processor when engine is not ready") + return + } + + fakeReplicationTask := tasks.NewFakeTask(definition.WorkflowKey{}, tasks.CategoryReplication, tasks.MinimumKey.FireTime) + fakeReplicationTask.SetTaskID(taskID) + + // cluster name parameter doesn't apply to replication processor + // also this parameter will be removed now that we are using multi-cursor impl. + engine.NotifyNewTasks("", map[tasks.Category][]tasks.Task{ + tasks.CategoryReplication: {fakeReplicationTask}, + }) } From c808a2febe4c66fd904379813a2dd7f03dc06a47 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Thu, 15 Dec 2022 17:03:55 -0800 Subject: [PATCH 6/6] fix tests --- service/history/shard/controller_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/service/history/shard/controller_test.go b/service/history/shard/controller_test.go index 4e33415e369..3054469131d 100644 --- a/service/history/shard/controller_test.go +++ b/service/history/shard/controller_test.go @@ -164,7 +164,7 @@ func (s *controllerSuite) TestAcquireShardSuccess() { myShards = append(myShards, shardID) s.mockHistoryEngine.EXPECT().Start().Return() // notification step is done after engine is created, so may not be called when test finishes - s.mockHistoryEngine.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).MaxTimes(1) + s.mockHistoryEngine.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).MaxTimes(2) s.mockServiceResolver.EXPECT().Lookup(convert.Int32ToString(shardID)).Return(s.hostInfo, nil).Times(2) s.mockEngineFactory.EXPECT().CreateEngine(gomock.Any()).Return(s.mockHistoryEngine) s.mockShardManager.EXPECT().GetOrCreateShard(gomock.Any(), getOrCreateShardRequestMatcher(shardID)).Return( @@ -234,7 +234,7 @@ func (s *controllerSuite) TestAcquireShardsConcurrently() { myShards = append(myShards, shardID) s.mockHistoryEngine.EXPECT().Start().Return() // notification step is done after engine is created, so may not be called when test finishes - s.mockHistoryEngine.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).MaxTimes(1) + s.mockHistoryEngine.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).MaxTimes(2) s.mockServiceResolver.EXPECT().Lookup(convert.Int32ToString(shardID)).Return(s.hostInfo, nil).Times(2) s.mockEngineFactory.EXPECT().CreateEngine(gomock.Any()).Return(s.mockHistoryEngine) s.mockShardManager.EXPECT().GetOrCreateShard(gomock.Any(), getOrCreateShardRequestMatcher(shardID)).Return( @@ -313,7 +313,7 @@ func (s *controllerSuite)
TestAcquireShardRenewSuccess() { for shardID := int32(1); shardID <= numShards; shardID++ { s.mockHistoryEngine.EXPECT().Start().Return() // notification step is done after engine is created, so may not be called when test finishes - s.mockHistoryEngine.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).MaxTimes(1) + s.mockHistoryEngine.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).MaxTimes(2) s.mockServiceResolver.EXPECT().Lookup(convert.Int32ToString(shardID)).Return(s.hostInfo, nil).Times(2) s.mockEngineFactory.EXPECT().CreateEngine(gomock.Any()).Return(s.mockHistoryEngine) s.mockShardManager.EXPECT().GetOrCreateShard(gomock.Any(), getOrCreateShardRequestMatcher(shardID)).Return( @@ -377,7 +377,7 @@ func (s *controllerSuite) TestAcquireShardRenewLookupFailed() { for shardID := int32(1); shardID <= numShards; shardID++ { s.mockHistoryEngine.EXPECT().Start().Return() // notification step is done after engine is created, so may not be called when test finishes - s.mockHistoryEngine.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).MaxTimes(1) + s.mockHistoryEngine.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).MaxTimes(2) s.mockServiceResolver.EXPECT().Lookup(convert.Int32ToString(shardID)).Return(s.hostInfo, nil).Times(2) s.mockEngineFactory.EXPECT().CreateEngine(gomock.Any()).Return(s.mockHistoryEngine) s.mockShardManager.EXPECT().GetOrCreateShard(gomock.Any(), getOrCreateShardRequestMatcher(shardID)).Return( @@ -730,7 +730,7 @@ func (s *controllerSuite) TestShardControllerFuzz() { mockEngine := NewMockEngine(disconnectedMockController) status := new(int32) // notification step is done after engine is created, so may not be called when test finishes - mockEngine.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).MaxTimes(1) + mockEngine.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).MaxTimes(2) mockEngine.EXPECT().Start().Do(func() { if !atomic.CompareAndSwapInt32(status, common.DaemonStatusInitialized, common.DaemonStatusStarted) { return @@ -850,7 +850,7 @@ func (s *controllerSuite) setupMocksForAcquireShard( // s.mockResource.ExecutionMgr.On("Close").Return() mockEngine.EXPECT().Start().MinTimes(minTimes) // notification step is done after engine is created, so may not be called when test finishes - mockEngine.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).MaxTimes(1) + mockEngine.EXPECT().NotifyNewTasks(gomock.Any(), gomock.Any()).MaxTimes(2) s.mockServiceResolver.EXPECT().Lookup(convert.Int32ToString(shardID)).Return(s.hostInfo, nil).Times(2).MinTimes(minTimes) s.mockEngineFactory.EXPECT().CreateEngine(contextMatcher(shardID)).Return(mockEngine).MinTimes(minTimes) s.mockShardManager.EXPECT().GetOrCreateShard(gomock.Any(), getOrCreateShardRequestMatcher(shardID)).Return(
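
Taken together, the series forms a readiness handshake for the migration worker: each shard records, per handover namespace, the replication task ID the remote cluster must ack (publishing the math.MaxInt64 sentinel until the shard is acquired and the real exclusive max read level is known), and checkHandoverOnce in service/worker/migration/activities.go now counts a shard as ready only once AckedTaskId catches up. A rough sketch of that readiness predicate, with simplified stand-in types (clusterInfo, handoverInfo, and handoverReady are illustrative, not the real GetReplicationStatus response):

    package migration

    import "math"

    // pendingMaxReplicationTaskID mirrors the sentinel the shard publishes
    // while it is not yet acquired and the true max task ID is unknown.
    const pendingMaxReplicationTaskID = math.MaxInt64

    // clusterInfo and handoverInfo stand in for the fields checkHandoverOnce
    // reads from the shard's replication status.
    type clusterInfo struct{ AckedTaskID int64 }
    type handoverInfo struct{ HandoverReplicationTaskID int64 }

    // handoverReady reports whether the remote cluster has acked every
    // replication task generated before the namespace entered handover.
    // While the sentinel is published this can never be true, which is the
    // point: readiness is deferred until the shard records a real task ID.
    func handoverReady(ci *clusterInfo, hi *handoverInfo) bool {
        if ci == nil || hi == nil {
            return false
        }
        return ci.AckedTaskID >= hi.HandoverReplicationTaskID
    }

This is also why patch 1 dropped the old clusterInfo.AckedTaskId == shard.MaxReplicationTaskId escape hatch: with a per-namespace task ID recorded at handover time, the single >= comparison is sufficient.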