Skip to content

Commit

Permalink
Capture task processing panic (#3779)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jan 7, 2023
1 parent e91760f commit 3882bb6
Show file tree
Hide file tree
Showing 13 changed files with 116 additions and 15 deletions.
6 changes: 6 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,12 @@ func IsResourceExhausted(err error) bool {
return false
}

// IsInternalError reports whether err can be matched, via errors.As,
// against a *serviceerror.Internal.
func IsInternalError(err error) bool {
	target := new(*serviceerror.Internal)
	return errors.As(err, target)
}

// WorkflowIDToHistoryShard is used to map namespaceID-workflowID pair to a shardID.
func WorkflowIDToHistoryShard(
namespaceID string,
Expand Down
1 change: 1 addition & 0 deletions service/history/archival_queue_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
queues.NewNoopPriorityAssigner(),
timeSource,
namespaceRegistry,
mockMetadata,
nil,
metrics.NoopMetricsHandler,
)
Expand Down
63 changes: 55 additions & 8 deletions service/history/queues/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -110,6 +111,7 @@ type (
priorityAssigner PriorityAssigner
timeSource clock.TimeSource
namespaceRegistry namespace.Registry
clusterMetadata cluster.Metadata

readerID int32
loadTime time.Time
Expand All @@ -135,6 +137,7 @@ func NewExecutable(
priorityAssigner PriorityAssigner,
timeSource clock.TimeSource,
namespaceRegistry namespace.Registry,
clusterMetadata cluster.Metadata,
logger log.Logger,
metricsHandler metrics.Handler,
) Executable {
Expand All @@ -148,6 +151,7 @@ func NewExecutable(
priorityAssigner: priorityAssigner,
timeSource: timeSource,
namespaceRegistry: namespaceRegistry,
clusterMetadata: clusterMetadata,
readerID: readerID,
loadTime: util.MaxTime(timeSource.Now(), task.GetKey().FireTime),
logger: log.NewLazyLogger(
Expand All @@ -163,15 +167,29 @@ func NewExecutable(
return executable
}

func (e *executableImpl) Execute() error {
func (e *executableImpl) Execute() (retErr error) {
if e.State() == ctasks.TaskStateCancelled {
return nil
}

ctx := metrics.AddMetricsContext(context.Background())
namespace, _ := e.namespaceRegistry.GetNamespaceName(namespace.ID(e.GetNamespaceID()))
namespaceName, _ := e.namespaceRegistry.GetNamespaceName(namespace.ID(e.GetNamespaceID()))
ctx := headers.SetCallerInfo(
metrics.AddMetricsContext(context.Background()),
headers.NewBackgroundCallerInfo(namespaceName.String()),
)

var panicErr error
defer func() {
if panicErr != nil {
retErr = panicErr

// we need to guess the metrics tags here as we don't know which execution logic
// is actually used, which is up to the executor implementation
e.taggedMetricsHandler = e.metricsHandler.WithTags(e.estimateTaskMetricTag()...)
}
}()

ctx = headers.SetCallerInfo(ctx, headers.NewBackgroundCallerInfo(namespace.String()))
defer log.CapturePanic(e.logger, &panicErr)

startTime := e.timeSource.Now()

Expand Down Expand Up @@ -296,6 +314,12 @@ func (e *executableImpl) IsRetryableError(err error) bool {
return false
}

// Internal error is non-retryable and usually means an unexpected error has happened,
// e.g. unknown task, corrupted state, panic etc.
if common.IsInternalError(err) {
return false
}

// ErrTaskRetry means mutable state is not ready for standby task processing
// there's no point in retrying the task immediately, which will hold the worker coroutine
// TODO: change ErrTaskRetry to a better name
Expand Down Expand Up @@ -424,6 +448,10 @@ func (e *executableImpl) shouldResubmitOnNack(attempt int, err error) bool {
return false
}

if common.IsInternalError(err) {
return false
}

return err != consts.ErrTaskRetry &&
err != consts.ErrDependencyTaskNotCompleted &&
err != consts.ErrNamespaceHandover
Expand All @@ -433,13 +461,14 @@ func (e *executableImpl) rescheduleTime(
err error,
attempt int,
) time.Time {
// elapsedTime (the first parameter in ComputeNextDelay) is not relevant here
// elapsedTime, the first parameter in ComputeNextDelay is not relevant here
// since reschedule policy has no expiration interval.

if err == consts.ErrTaskRetry || err == consts.ErrNamespaceHandover {
if err == consts.ErrTaskRetry ||
err == consts.ErrNamespaceHandover ||
common.IsInternalError(err) {
// using a different reschedule policy to slow down retry
// as the error means mutable state or namespace is not ready to handle the task,
// need to wait for replication.
// as immediate retry typically won't resolve the issue.
return e.timeSource.Now().Add(taskNotReadyReschedulePolicy.ComputeNextDelay(0, attempt))
}

Expand Down Expand Up @@ -469,3 +498,21 @@ func (e *executableImpl) updatePriority() {
e.lowestPriority = e.priority
}
}

// estimateTaskMetricTag builds the namespace, task type and operation metric
// tags for this executable's task.
//
// The namespace entry is looked up by ID in the namespace registry. When the
// lookup succeeds, the namespace tag carries its name and the active/standby
// decision comes from whether the namespace is active in the current cluster.
// When the lookup fails, the unknown-namespace tag is used and the task is
// treated as active.
func (e *executableImpl) estimateTaskMetricTag() []metrics.Tag {
	nsTag := metrics.NamespaceUnknownTag()
	active := true

	if nsEntry, err := e.namespaceRegistry.GetNamespaceByID(namespace.ID(e.GetNamespaceID())); err == nil {
		nsTag = metrics.NamespaceTag(nsEntry.Name().String())
		active = nsEntry.ActiveInCluster(e.clusterMetadata.GetCurrentClusterName())
	}

	taskTypeValue := getTaskTypeTagValue(e.Task, active)
	return []metrics.Tag{
		nsTag,
		metrics.TaskTypeTag(taskTypeValue),
		metrics.OperationTag(taskTypeValue), // for backward compatibility
	}
}
17 changes: 17 additions & 0 deletions service/history/queues/executable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
Expand All @@ -57,6 +58,7 @@ type (
mockScheduler *MockScheduler
mockRescheduler *MockRescheduler
mockNamespaceRegistry *namespace.MockRegistry
mockClusterMetadata *cluster.MockMetadata

timeSource *clock.EventTimeSource
}
Expand All @@ -75,8 +77,11 @@ func (s *executableSuite) SetupTest() {
s.mockScheduler = NewMockScheduler(s.controller)
s.mockRescheduler = NewMockRescheduler(s.controller)
s.mockNamespaceRegistry = namespace.NewMockRegistry(s.controller)
s.mockClusterMetadata = cluster.NewMockMetadata(s.controller)

s.mockNamespaceRegistry.EXPECT().GetNamespaceName(gomock.Any()).Return(tests.Namespace, nil).AnyTimes()
s.mockNamespaceRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(tests.GlobalNamespaceEntry, nil).AnyTimes()
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()

s.timeSource = clock.NewEventTimeSource()
}
Expand Down Expand Up @@ -108,6 +113,17 @@ func (s *executableSuite) TestExecute_UserLatency() {
s.Equal(time.Duration(expectedUserLatency), executable.(*executableImpl).userLatency)
}

// TestExecute_CapturePanic checks that Execute returns an error when the
// underlying executor panics.
func (s *executableSuite) TestExecute_CapturePanic() {
	executable := s.newTestExecutable()

	// Stub executor behavior: always panic instead of returning.
	panickingExecute := func(_ context.Context, _ Executable) ([]metrics.Tag, bool, error) {
		panic("test panic during execution")
	}
	s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).DoAndReturn(panickingExecute)

	err := executable.Execute()
	s.Error(err)
}

func (s *executableSuite) TestHandleErr_EntityNotExists() {
executable := s.newTestExecutable()

Expand Down Expand Up @@ -219,6 +235,7 @@ func (s *executableSuite) newTestExecutable() Executable {
NewNoopPriorityAssigner(),
s.timeSource,
s.mockNamespaceRegistry,
s.mockClusterMetadata,
log.NewTestLogger(),
metrics.NoopMetricsHandler,
)
Expand Down
24 changes: 24 additions & 0 deletions service/history/queues/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,27 @@ func GetArchivalTaskTypeTagValue(
return ""
}
}

// getTaskTypeTagValue picks the task type metric tag value for task based on
// its category. Transfer and timer tasks use the active or standby variant
// depending on isActive; visibility and archival tasks ignore isActive; any
// other category falls back to the string form of the task type.
func getTaskTypeTagValue(
	task tasks.Task,
	isActive bool,
) string {
	category := task.GetCategory()

	switch {
	case category == tasks.CategoryTransfer && isActive:
		return GetActiveTransferTaskTypeTagValue(task)
	case category == tasks.CategoryTransfer:
		return GetStandbyTransferTaskTypeTagValue(task)
	case category == tasks.CategoryTimer && isActive:
		return GetActiveTimerTaskTypeTagValue(task)
	case category == tasks.CategoryTimer:
		return GetStandbyTimerTaskTypeTagValue(task)
	case category == tasks.CategoryVisibility:
		return GetVisibilityTaskTypeTagValue(task)
	case category == tasks.CategoryArchival:
		return GetArchivalTaskTypeTagValue(task)
	}

	return task.GetType().String()
}
1 change: 1 addition & 0 deletions service/history/queues/queue_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func newQueueBase(
priorityAssigner,
timeSource,
shard.GetNamespaceRegistry(),
shard.GetClusterMetadata(),
logger,
metricsHandler,
)
Expand Down
2 changes: 1 addition & 1 deletion service/history/queues/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *readerSuite) SetupTest() {
s.metricsHandler = metrics.NoopMetricsHandler

s.executableInitializer = func(readerID int32, t tasks.Task) Executable {
return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, metrics.NoopMetricsHandler)
return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, nil, metrics.NoopMetricsHandler)
}
s.monitor = newMonitor(tasks.CategoryTypeScheduled, &MonitorOptions{
PendingTasksCriticalCount: dynamicconfig.GetIntPropertyFn(1000),
Expand Down
2 changes: 1 addition & 1 deletion service/history/queues/slice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *sliceSuite) SetupTest() {
s.controller = gomock.NewController(s.T())

s.executableInitializer = func(readerID int32, t tasks.Task) Executable {
return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, metrics.NoopMetricsHandler)
return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, nil, metrics.NoopMetricsHandler)
}
s.monitor = newMonitor(tasks.CategoryTypeScheduled, &MonitorOptions{
PendingTasksCriticalCount: dynamicconfig.GetIntPropertyFn(1000),
Expand Down
3 changes: 2 additions & 1 deletion service/history/timerQueueActiveTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1479,7 +1479,8 @@ func (s *timerQueueActiveTaskExecutorSuite) newTaskExecutable(
nil,
queues.NewNoopPriorityAssigner(),
s.mockShard.GetTimeSource(),
nil,
s.mockNamespaceCache,
s.mockClusterMetadata,
nil,
metrics.NoopMetricsHandler,
)
Expand Down
3 changes: 2 additions & 1 deletion service/history/timerQueueStandbyTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1542,7 +1542,8 @@ func (s *timerQueueStandbyTaskExecutorSuite) newTaskExecutable(
nil,
queues.NewNoopPriorityAssigner(),
s.mockShard.GetTimeSource(),
nil,
s.mockNamespaceCache,
s.mockClusterMetadata,
nil,
metrics.NoopMetricsHandler,
)
Expand Down
3 changes: 2 additions & 1 deletion service/history/transferQueueActiveTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2802,7 +2802,8 @@ func (s *transferQueueActiveTaskExecutorSuite) newTaskExecutable(
nil,
queues.NewNoopPriorityAssigner(),
s.mockShard.GetTimeSource(),
nil,
s.mockNamespaceCache,
s.mockClusterMetadata,
nil,
metrics.NoopMetricsHandler,
)
Expand Down
3 changes: 2 additions & 1 deletion service/history/transferQueueStandbyTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1300,7 +1300,8 @@ func (s *transferQueueStandbyTaskExecutorSuite) newTaskExecutable(
nil,
queues.NewNoopPriorityAssigner(),
s.mockShard.GetTimeSource(),
nil,
s.mockNamespaceCache,
s.mockClusterMetadata,
nil,
metrics.NoopMetricsHandler,
)
Expand Down
3 changes: 2 additions & 1 deletion service/history/visibilityQueueTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,8 @@ func (s *visibilityQueueTaskExecutorSuite) newTaskExecutable(
nil,
queues.NewNoopPriorityAssigner(),
s.mockShard.GetTimeSource(),
nil,
s.mockShard.GetNamespaceRegistry(),
s.mockShard.GetClusterMetadata(),
nil,
metrics.NoopMetricsHandler,
)
Expand Down

0 comments on commit 3882bb6

Please sign in to comment.