Clean namespace handover #3692

Merged
merged 8 commits into from
Dec 21, 2022
Changes from 3 commits
1 change: 1 addition & 0 deletions common/metrics/metric_defs.go
@@ -1418,6 +1418,7 @@ var (
TaskWorkflowBusyCounter = NewCounterDef("task_errors_workflow_busy")
TaskNotActiveCounter = NewCounterDef("task_errors_not_active_counter")
TaskLimitExceededCounter = NewCounterDef("task_errors_limit_exceeded_counter")
TaskNamespaceHandoverCounter = NewCounterDef("task_errors_namespace_handover")
TaskScheduleToStartLatency = NewTimerDef("task_schedule_to_start_latency")
TransferTaskMissingEventCounter = NewCounterDef("transfer_task_missing_event_counter")
TaskBatchCompleteCounter = NewCounterDef("task_batch_complete_counter")
10 changes: 9 additions & 1 deletion common/resource/fx.go
@@ -64,6 +64,7 @@ import (
"go.temporal.io/server/common/ringpop"
"go.temporal.io/server/common/rpc"
"go.temporal.io/server/common/rpc/encryption"
"go.temporal.io/server/common/rpc/interceptor"
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/telemetry"
@@ -349,7 +350,14 @@ func HistoryClientProvider(clientBean client.Bean) historyservice.HistoryService
historyClient := history.NewRetryableClient(
historyRawClient,
common.CreateHistoryClientRetryPolicy(),
common.IsServiceClientTransientError,
func(err error) bool {
if err.Error() == interceptor.ErrNamespaceHandover.Error() {
// prevent retrying namespace handover unavailable error
// when calling the history service
return false
}
return common.IsServiceClientTransientError(err)
},
)
return historyClient
}
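A minimal, self-contained sketch of the retry-predicate pattern used in HistoryClientProvider above; the names errNamespaceHandover, isTransient, and isRetryable are illustrative stand-ins, not the server's actual helpers. The point it illustrates: the handover error is matched by message rather than by identity, presumably because the caller may see an equivalent error reconstructed by the RPC layer, and retrying it is pointless until the handover completes.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for interceptor.ErrNamespaceHandover (message text is illustrative).
// After a gRPC round trip the caller receives an equivalent error value, not
// this exact sentinel, which is why the predicate compares messages.
var errNamespaceHandover = errors.New("Namespace replication in Handover state.")

// Stand-in for common.IsServiceClientTransientError; assumed behavior for this
// sketch: treat every non-nil error as transient.
func isTransient(err error) bool { return err != nil }

// isRetryable mirrors the closure passed to history.NewRetryableClient above.
func isRetryable(err error) bool {
	if err.Error() == errNamespaceHandover.Error() {
		// do not retry while the namespace is handing over; surface it to the caller
		return false
	}
	return isTransient(err)
}

func main() {
	// Simulates the handover error as re-created on the client side of an RPC.
	remoteErr := errors.New(errNamespaceHandover.Error())
	fmt.Println(isRetryable(remoteErr))                       // false
	fmt.Println(isRetryable(errors.New("deadline exceeded"))) // true under the assumed isTransient
}
```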
4 changes: 2 additions & 2 deletions common/rpc/interceptor/namespace_validator.go
@@ -51,7 +51,7 @@ type (
var (
ErrNamespaceNotSet = serviceerror.NewInvalidArgument("Namespace not set on request.")
errNamespaceTooLong = serviceerror.NewInvalidArgument("Namespace length exceeds limit.")
errNamespaceHandover = serviceerror.NewUnavailable(fmt.Sprintf("Namespace replication in %s state.", enumspb.REPLICATION_STATE_HANDOVER.String()))
ErrNamespaceHandover = serviceerror.NewUnavailable(fmt.Sprintf("Namespace replication in %s state.", enumspb.REPLICATION_STATE_HANDOVER.String()))
errTaskTokenNotSet = serviceerror.NewInvalidArgument("Task token not set on request.")
errTaskTokenNamespaceMismatch = serviceerror.NewInvalidArgument("Operation requested with a token from a different namespace.")

@@ -266,5 +266,5 @@ func (ni *NamespaceValidatorInterceptor) checkReplicationState(namespaceEntry *n
return nil
}

return errNamespaceHandover
return ErrNamespaceHandover
}
2 changes: 1 addition & 1 deletion common/rpc/interceptor/namespace_validator_test.go
@@ -200,7 +200,7 @@ func (s *namespaceValidatorSuite) Test_StateValidationIntercept_StatusFromNamesp
{
state: enumspb.NAMESPACE_STATE_REGISTERED,
replicationState: enumspb.REPLICATION_STATE_HANDOVER,
expectedErr: errNamespaceHandover,
expectedErr: ErrNamespaceHandover,
method: "/temporal/StartWorkflowExecution",
req: &workflowservice.StartWorkflowExecutionRequest{Namespace: "test-namespace"},
},
4 changes: 4 additions & 0 deletions service/history/api/retry_util.go
@@ -35,6 +35,10 @@ import (
)

func IsRetryableError(err error) bool {
if err == consts.ErrNamespaceHandover {
return false
}

return err == consts.ErrStaleState ||
err == consts.ErrLocateCurrentWorkflowExecution ||
err == consts.ErrBufferedQueryCleared ||
3 changes: 3 additions & 0 deletions service/history/consts/const.go
@@ -31,6 +31,7 @@ import (
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common"
"go.temporal.io/server/common/rpc/interceptor"
)

const (
@@ -90,6 +91,8 @@ var (
ErrWorkflowNotReady = serviceerror.NewWorkflowNotReady("Workflow state is not ready to handle the request.")
// ErrWorkflowTaskNotScheduled is error indicating workflow task is not scheduled yet.
ErrWorkflowTaskNotScheduled = serviceerror.NewWorkflowNotReady("Workflow task is not scheduled yet.")
// ErrNamespaceHandover is error indicating namespace is in handover state and cannot process request.
ErrNamespaceHandover = interceptor.ErrNamespaceHandover

// FailedWorkflowStatuses is a set of failed workflow close states, used for start workflow policy
// for start workflow execution API
3 changes: 1 addition & 2 deletions service/history/historyEngine.go
@@ -369,8 +369,7 @@ func (e *historyEngineImpl) registerNamespaceFailoverCallback() {
}

if e.shard.GetClusterMetadata().IsGlobalNamespaceEnabled() {
maxTaskID, _ := e.replicationAckMgr.GetMaxTaskInfo()
e.shard.UpdateHandoverNamespaces(nextNamespaces, maxTaskID)
e.shard.UpdateHandoverNamespaces(nextNamespaces)
}

newNotificationVersion := nextNamespaces[len(nextNamespaces)-1].NotificationVersion() + 1
13 changes: 13 additions & 0 deletions service/history/nDCTaskUtil.go
@@ -27,6 +27,7 @@ package history
import (
"context"

enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

enumsspb "go.temporal.io/server/api/enums/v1"
@@ -210,3 +211,15 @@ func getNamespaceTagByID(

return metrics.NamespaceTag(namespaceName.String())
}

func getNamespaceTagAndReplicationStateByID(
registry namespace.Registry,
namespaceID string,
) (metrics.Tag, enumspb.ReplicationState) {
namespace, err := registry.GetNamespaceByID(namespace.ID(namespaceID))
if err != nil {
return metrics.NamespaceUnknownTag(), enumspb.REPLICATION_STATE_UNSPECIFIED
}

return metrics.NamespaceTag(namespace.Name().String()), namespace.ReplicationState()
}
23 changes: 18 additions & 5 deletions service/history/queues/executable.go
@@ -284,6 +284,12 @@ func (e *executableImpl) HandleErr(err error) (retErr error) {
return nil
}

if err.Error() == consts.ErrNamespaceHandover.Error() {
e.taggedMetricsHandler.Counter(metrics.TaskNamespaceHandoverCounter.GetMetricName()).Record(1)
err = consts.ErrNamespaceHandover
return err
}

if _, ok := err.(*serviceerror.NamespaceNotActive); ok {
// TODO remove this error check special case after multi-cursor is enabled by default,
// since the new task life cycle will not give up until task processed / verified
@@ -325,7 +331,10 @@ func (e *executableImpl) IsRetryableError(err error) bool {
// ErrTaskRetry means mutable state is not ready for standby task processing
// there's no point in retrying the task immediately, which would hold the worker coroutine
// TODO: change ErrTaskRetry to a better name
return err != consts.ErrTaskRetry && err != consts.ErrWorkflowBusy && err != consts.ErrDependencyTaskNotCompleted
return err != consts.ErrTaskRetry &&
err != consts.ErrWorkflowBusy &&
err != consts.ErrDependencyTaskNotCompleted &&
err != consts.ErrNamespaceHandover
}

func (e *executableImpl) RetryPolicy() backoff.RetryPolicy {
@@ -449,7 +458,9 @@ func (e *executableImpl) shouldResubmitOnNack(attempt int, err error) bool {
return false
}

return err != consts.ErrTaskRetry && err != consts.ErrDependencyTaskNotCompleted
return err != consts.ErrTaskRetry &&
err != consts.ErrDependencyTaskNotCompleted &&
err != consts.ErrNamespaceHandover
}

func (e *executableImpl) rescheduleTime(
@@ -459,12 +470,14 @@ func (e *executableImpl) rescheduleTime(
// elapsedTime (the first parameter in ComputeNextDelay) is not relevant here
// since reschedule policy has no expiration interval.

if err == consts.ErrTaskRetry {
if err == consts.ErrTaskRetry || err == consts.ErrNamespaceHandover {
// using a different reschedule policy to slow down retry
// as the error means mutable state is not ready to handle the task,
// as the error means mutable state or namespace is not ready to handle the task,
// need to wait for replication.
return e.timeSource.Now().Add(taskNotReadyReschedulePolicy.ComputeNextDelay(0, attempt))
} else if err == consts.ErrDependencyTaskNotCompleted {
}

if err == consts.ErrDependencyTaskNotCompleted {
return e.timeSource.Now().Add(dependencyTaskNotCompletedReschedulePolicy.ComputeNextDelay(0, attempt))
}

2 changes: 1 addition & 1 deletion service/history/shard/context.go
@@ -104,7 +104,7 @@ type (

GetNamespaceNotificationVersion() int64
UpdateNamespaceNotificationVersion(namespaceNotificationVersion int64) error
UpdateHandoverNamespaces(newNamespaces []*namespace.Namespace, maxRepTaskID int64)
UpdateHandoverNamespaces(newNamespaces []*namespace.Namespace)

AppendHistoryEvents(ctx context.Context, request *persistence.AppendHistoryNodesRequest, namespaceID namespace.ID, execution commonpb.WorkflowExecution) (int, error)

85 changes: 76 additions & 9 deletions service/history/shard/context_impl.go
@@ -28,6 +28,7 @@ import (
"context"
"errors"
"fmt"
"math"
"sync"
"time"

@@ -63,6 +64,7 @@ import (
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/util"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/vclock"
@@ -80,6 +82,8 @@ const (

const (
shardIOTimeout = 5 * time.Second

pendingMaxReplicationTaskID = math.MaxInt64
)

type (
@@ -130,7 +134,7 @@ type (

// exist only in memory
remoteClusterInfos map[string]*remoteClusterInfo
handoverNamespaces map[string]*namespaceHandOverInfo // keyed on namespace name
handoverNamespaces map[namespace.Name]*namespaceHandOverInfo // keyed on namespace name
}

remoteClusterInfo struct {
@@ -602,24 +606,35 @@ func (s *ContextImpl) UpdateNamespaceNotificationVersion(namespaceNotificationVe
return nil
}

func (s *ContextImpl) UpdateHandoverNamespaces(namespaces []*namespace.Namespace, maxRepTaskID int64) {
func (s *ContextImpl) UpdateHandoverNamespaces(namespaces []*namespace.Namespace) {
s.wLock()
defer s.wUnlock()

newHandoverNamespaces := make(map[string]struct{})
maxReplicationTaskID := s.immediateTaskExclusiveMaxReadLevel
if s.errorByState() != nil {
// if shard state is not acquired, we don't know what the max taskID is
// as there might be in-flight requests
maxReplicationTaskID = pendingMaxReplicationTaskID
}

currentClustername := s.GetClusterMetadata().GetCurrentClusterName()
newHandoverNamespaces := make(map[namespace.Name]struct{})
for _, ns := range namespaces {
if ns.IsGlobalNamespace() && ns.ReplicationState() == enums.REPLICATION_STATE_HANDOVER {
nsName := ns.Name().String()
// NOTE: replication state field won't be replicated and currently we only update a namespace
// to handover state from the active cluster, so the second condition will always be true. Adding
// it here to be safe in case the above assumption no longer holds in the future.
if ns.IsGlobalNamespace() && ns.ActiveInCluster(currentClustername) && ns.ReplicationState() == enums.REPLICATION_STATE_HANDOVER {
nsName := ns.Name()
newHandoverNamespaces[nsName] = struct{}{}
if handover, ok := s.handoverNamespaces[nsName]; ok {
if handover.NotificationVersion < ns.NotificationVersion() {
handover.NotificationVersion = ns.NotificationVersion()
handover.MaxReplicationTaskID = maxRepTaskID
handover.MaxReplicationTaskID = maxReplicationTaskID
}
} else {
s.handoverNamespaces[nsName] = &namespaceHandOverInfo{
NotificationVersion: ns.NotificationVersion(),
MaxReplicationTaskID: maxRepTaskID,
MaxReplicationTaskID: maxReplicationTaskID,
}
}
}
@@ -659,6 +674,10 @@ func (s *ContextImpl) AddTasks(
s.wUnlock()
return err
}
if err := s.errorByNamespaceStateLocked(namespaceEntry.Name()); err != nil {
s.wUnlock()
return err
}
err = s.addTasksLocked(ctx, request, namespaceEntry)
s.wUnlock()

@@ -694,6 +713,10 @@ func (s *ContextImpl) CreateWorkflowExecution(
return nil, err
}

if err := s.errorByNamespaceStateLocked(namespaceEntry.Name()); err != nil {
return nil, err
}

transferExclusiveMaxReadLevel := int64(0)
if err := s.allocateTaskIDAndTimestampLocked(
namespaceEntry,
@@ -739,6 +762,10 @@ func (s *ContextImpl) UpdateWorkflowExecution(
return nil, err
}

if err := s.errorByNamespaceStateLocked(namespaceEntry.Name()); err != nil {
return nil, err
}

transferExclusiveMaxReadLevel := int64(0)
if err := s.allocateTaskIDAndTimestampLocked(
namespaceEntry,
@@ -810,6 +837,10 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution(
return nil, err
}

if err := s.errorByNamespaceStateLocked(namespaceEntry.Name()); err != nil {
return nil, err
}

transferExclusiveMaxReadLevel := int64(0)
if request.CurrentWorkflowMutation != nil {
if err := s.allocateTaskIDAndTimestampLocked(
@@ -874,6 +905,10 @@ func (s *ContextImpl) SetWorkflowExecution(
return nil, err
}

if err := s.errorByNamespaceStateLocked(namespaceEntry.Name()); err != nil {
return nil, err
}

transferExclusiveMaxReadLevel := int64(0)
if err := s.allocateTaskIDAndTimestampLocked(
namespaceEntry,
@@ -1170,6 +1205,15 @@ func (s *ContextImpl) errorByState() error {
}
}

func (s *ContextImpl) errorByNamespaceStateLocked(
namespaceName namespace.Name,
) error {
if _, ok := s.handoverNamespaces[namespaceName]; ok {
return consts.ErrNamespaceHandover
}
return nil
}

func (s *ContextImpl) generateTaskIDLocked() (int64, error) {
if err := s.updateRangeIfNeededLocked(); err != nil {
return -1, err
@@ -1737,6 +1781,23 @@ func (s *ContextImpl) notifyQueueProcessor() {
}
}

func (s *ContextImpl) updateHandoverNamespacePendingTaskID() {
s.wLock()
defer s.wUnlock()

if s.errorByState() != nil {
// if not in acquired state, this function will be called again
// later when shard is re-acquired.
return
}

for namespaceName, handoverInfo := range s.handoverNamespaces {
if handoverInfo.MaxReplicationTaskID == pendingMaxReplicationTaskID {
s.handoverNamespaces[namespaceName].MaxReplicationTaskID = s.immediateTaskExclusiveMaxReadLevel
}
}
}

func (s *ContextImpl) loadShardMetadata(ownershipChanged *bool) error {
// Only have to do this once, we can just re-acquire the rangeid lock after that
s.rLock()
@@ -1852,7 +1913,7 @@ func (s *ContextImpl) GetReplicationStatus(cluster []string) (map[string]*histor
}

for k, v := range s.handoverNamespaces {
handoverNamespaces[k] = &historyservice.HandoverNamespaceInfo{
handoverNamespaces[k.String()] = &historyservice.HandoverNamespaceInfo{
HandoverReplicationTaskId: v.MaxReplicationTaskID,
}
}
@@ -1928,6 +1989,10 @@ func (s *ContextImpl) acquireShard() {
engine = s.createEngine()
}

// NOTE: engine is created & started before setting shard state to acquired.
// -> namespace handover callback is registered & called before shard is able to serve traffic
// -> information for handover namespace is recorded before shard can serve traffic
// -> upon shard reload, no history API or task can go through for ns in handover state
err = s.transition(contextRequestAcquired{engine: engine})

if err != nil {
Expand All @@ -1942,6 +2007,8 @@ func (s *ContextImpl) acquireShard() {
// to trigger a load as queue max level can be updated to a newer value
s.notifyQueueProcessor()

s.updateHandoverNamespacePendingTaskID()

return nil
}

@@ -2014,7 +2081,7 @@ func newContext(
clusterMetadata: clusterMetadata,
archivalMetadata: archivalMetadata,
hostInfoProvider: hostInfoProvider,
handoverNamespaces: make(map[string]*namespaceHandOverInfo),
handoverNamespaces: make(map[namespace.Name]*namespaceHandOverInfo),
lifecycleCtx: lifecycleCtx,
lifecycleCancel: lifecycleCancel,
engineFuture: future.NewFuture[Engine](),
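A compact sketch of the pending-task-ID bookkeeping introduced in context_impl.go above; the shard struct, field names, and map layout below are simplified illustrations, not the server's actual types. While the shard is not yet acquired, a namespace entering handover records math.MaxInt64 as a placeholder, and once the shard is acquired the placeholder is swapped for the real exclusive max read level.

```go
package main

import (
	"fmt"
	"math"
)

// Mirrors the pendingMaxReplicationTaskID sentinel added in context_impl.go.
const pendingMaxReplicationTaskID = int64(math.MaxInt64)

type handoverInfo struct {
	notificationVersion  int64
	maxReplicationTaskID int64
}

type shard struct {
	acquired     bool
	maxReadLevel int64 // stands in for immediateTaskExclusiveMaxReadLevel
	handover     map[string]*handoverInfo
}

// updateHandoverNamespaces records a namespace entering handover. If the shard
// is not acquired yet, in-flight requests may still allocate task IDs, so the
// real max is unknown and the placeholder is stored instead.
func (s *shard) updateHandoverNamespaces(name string, notificationVersion int64) {
	maxTaskID := s.maxReadLevel
	if !s.acquired {
		maxTaskID = pendingMaxReplicationTaskID
	}
	s.handover[name] = &handoverInfo{
		notificationVersion:  notificationVersion,
		maxReplicationTaskID: maxTaskID,
	}
}

// patchPendingTaskIDs is the analogue of updateHandoverNamespacePendingTaskID:
// once the shard is acquired, replace any placeholder with the current read level.
func (s *shard) patchPendingTaskIDs() {
	if !s.acquired {
		return // will be called again when the shard is re-acquired
	}
	for _, info := range s.handover {
		if info.maxReplicationTaskID == pendingMaxReplicationTaskID {
			info.maxReplicationTaskID = s.maxReadLevel
		}
	}
}

func main() {
	s := &shard{handover: map[string]*handoverInfo{}}
	s.updateHandoverNamespaces("ns-in-handover", 42) // shard not yet acquired -> placeholder
	s.acquired, s.maxReadLevel = true, 1024
	s.patchPendingTaskIDs()
	fmt.Println(s.handover["ns-in-handover"].maxReplicationTaskID) // 1024
}
```

The same swap is what updateHandoverNamespacePendingTaskID performs in the diff, invoked at the end of acquireShard once the shard reaches the acquired state.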