Skip to content

Commit

Permalink
Capture panic in replication task processing (#3799)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jan 12, 2023
1 parent 6319a28 commit 5b182a7
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 26 deletions.
2 changes: 1 addition & 1 deletion service/history/replication/dlq_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (r *dlqHandlerImpl) MergeMessages(
}

for _, task := range replicationTasks {
if _, err := taskExecutor.Execute(
if err := taskExecutor.Execute(
ctx,
task,
true,
Expand Down
2 changes: 1 addition & 1 deletion service/history/replication/dlq_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (s *dlqHandlerSuite) TestMergeMessages() {
Return(&adminservice.GetDLQReplicationMessagesResponse{
ReplicationTasks: []*replicationspb.ReplicationTask{remoteTask},
}, nil)
s.taskExecutor.EXPECT().Execute(gomock.Any(), remoteTask, true).Return("", nil)
s.taskExecutor.EXPECT().Execute(gomock.Any(), remoteTask, true).Return(nil)
s.executionManager.EXPECT().RangeDeleteReplicationTaskFromDLQ(gomock.Any(), &persistence.RangeDeleteReplicationTaskFromDLQRequest{
RangeCompleteHistoryTasksRequest: persistence.RangeCompleteHistoryTasksRequest{
ShardID: s.mockShard.GetShardID(),
Expand Down
13 changes: 3 additions & 10 deletions service/history/replication/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import (

type (
TaskExecutor interface {
Execute(ctx context.Context, replicationTask *replicationspb.ReplicationTask, forceApply bool) (string, error)
Execute(ctx context.Context, replicationTask *replicationspb.ReplicationTask, forceApply bool) error
}

TaskExecutorParams struct {
Expand Down Expand Up @@ -107,32 +107,25 @@ func (e *taskExecutorImpl) Execute(
ctx context.Context,
replicationTask *replicationspb.ReplicationTask,
forceApply bool,
) (string, error) {
) error {
var err error
var operation string
switch replicationTask.GetTaskType() {
case enumsspb.REPLICATION_TASK_TYPE_SYNC_SHARD_STATUS_TASK:
// Shard status will be sent as part of the Replication message without kafka
operation = metrics.SyncShardTaskScope
case enumsspb.REPLICATION_TASK_TYPE_SYNC_ACTIVITY_TASK:
operation = metrics.SyncActivityTaskScope
err = e.handleActivityTask(ctx, replicationTask, forceApply)
case enumsspb.REPLICATION_TASK_TYPE_HISTORY_METADATA_TASK:
// Without kafka we should not have size limits so we don't necessarily need this in the new replication scheme.
operation = metrics.HistoryMetadataReplicationTaskScope
case enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK:
operation = metrics.HistoryReplicationTaskScope
err = e.handleHistoryReplicationTask(ctx, replicationTask, forceApply)
case enumsspb.REPLICATION_TASK_TYPE_SYNC_WORKFLOW_STATE_TASK:
operation = metrics.SyncWorkflowStateTaskScope
err = e.handleSyncWorkflowStateTask(ctx, replicationTask, forceApply)
default:
e.logger.Error("Unknown task type.")
operation = metrics.ReplicatorScope
err = ErrUnknownReplicationTask
}

return operation, err
return err
}

func (e *taskExecutorImpl) handleActivityTask(
Expand Down
7 changes: 3 additions & 4 deletions service/history/replication/task_executor_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions service/history/replication/task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask() {
}

s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(nil)
_, err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
s.NoError(err)
}

Expand Down Expand Up @@ -297,7 +297,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask_Rese
int64(456),
)
s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(nil)
_, err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
s.NoError(err)
}

Expand Down Expand Up @@ -330,7 +330,7 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask() {
}

s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil)
_, err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
s.NoError(err)
}

Expand Down Expand Up @@ -385,7 +385,7 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask_Resend() {
int64(456),
)
s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil)
_, err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
s.NoError(err)
}

Expand All @@ -406,6 +406,6 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncWorkflowStateTask() {

s.mockEngine.EXPECT().ReplicateWorkflowState(gomock.Any(), gomock.Any()).Return(nil)

_, err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
s.NoError(err)
}
38 changes: 35 additions & 3 deletions service/history/replication/task_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,14 +324,27 @@ func (p *taskProcessorImpl) handleSyncShardStatus(
func (p *taskProcessorImpl) handleReplicationTask(
ctx context.Context,
replicationTask *replicationspb.ReplicationTask,
) error {
) (retErr error) {
_ = p.rateLimiter.Wait(ctx)

operationTagValue := p.getOperationTagValue(replicationTask)

operation := func() error {
operation, err := p.replicationTaskExecutor.Execute(ctx, replicationTask, false)
p.emitTaskMetrics(operation, err)
err := p.replicationTaskExecutor.Execute(ctx, replicationTask, false)
p.emitTaskMetrics(operationTagValue, err)
return err
}

var panicErr error
defer func() {
if panicErr != nil {
retErr = panicErr
p.emitTaskMetrics(operationTagValue, panicErr)
}
}()

defer log.CapturePanic(p.logger, &panicErr)

return backoff.ThrottleRetry(operation, p.taskRetryPolicy, p.isRetryableError)
}

Expand Down Expand Up @@ -526,6 +539,25 @@ func (p *taskProcessorImpl) emitTaskMetrics(operation string, err error) {
metricsScope.Counter(metrics.ReplicationTasksFailed.GetMetricName()).Record(1)
}

// getOperationTagValue returns the metrics scope constant that corresponds to
// the task type of replicationTask. Any task type without a dedicated case
// yields metrics.ReplicatorScope.
func (p *taskProcessorImpl) getOperationTagValue(
	replicationTask *replicationspb.ReplicationTask,
) string {
	// Start from the fallback and override it only for the known task types.
	tagValue := metrics.ReplicatorScope

	switch taskType := replicationTask.GetTaskType(); taskType {
	case enumsspb.REPLICATION_TASK_TYPE_SYNC_SHARD_STATUS_TASK:
		tagValue = metrics.SyncShardTaskScope
	case enumsspb.REPLICATION_TASK_TYPE_SYNC_ACTIVITY_TASK:
		tagValue = metrics.SyncActivityTaskScope
	case enumsspb.REPLICATION_TASK_TYPE_HISTORY_METADATA_TASK:
		tagValue = metrics.HistoryMetadataReplicationTaskScope
	case enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK:
		tagValue = metrics.HistoryReplicationTaskScope
	case enumsspb.REPLICATION_TASK_TYPE_SYNC_WORKFLOW_STATE_TASK:
		tagValue = metrics.SyncWorkflowStateTaskScope
	}

	return tagValue
}

// isStopped reports whether the processor's status field, read atomically,
// equals common.DaemonStatusStopped.
func (p *taskProcessorImpl) isStopped() bool {
	currentStatus := atomic.LoadInt32(&p.status)
	return currentStatus == common.DaemonStatusStopped
}
Expand Down
16 changes: 14 additions & 2 deletions service/history/replication/task_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (s *taskProcessorSuite) TestHandleReplicationTask_SyncActivity() {
VisibilityTime: &now,
}

s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), task, false).Return("", nil)
s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), task, false).Return(nil)
err := s.replicationTaskProcessor.handleReplicationTask(context.Background(), task)
s.NoError(err)
}
Expand Down Expand Up @@ -243,11 +243,23 @@ func (s *taskProcessorSuite) TestHandleReplicationTask_History() {
VisibilityTime: &now,
}

s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), task, false).Return("", nil)
s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), task, false).Return(nil)
err = s.replicationTaskProcessor.handleReplicationTask(context.Background(), task)
s.NoError(err)
}

// TestHandleReplicationTask_Panic checks that handleReplicationTask returns an
// error when the mocked task executor panics while executing the task.
func (s *taskProcessorSuite) TestHandleReplicationTask_Panic() {
	emptyTask := &replicationspb.ReplicationTask{}

	// Stub for Execute that panics instead of returning.
	panickingExecute := func(context.Context, *replicationspb.ReplicationTask, bool) error {
		panic("test replication task panic")
	}
	s.mockReplicationTaskExecutor.EXPECT().
		Execute(gomock.Any(), emptyTask, false).
		DoAndReturn(panickingExecute)

	s.Error(s.replicationTaskProcessor.handleReplicationTask(context.Background(), emptyTask))
}

func (s *taskProcessorSuite) TestHandleReplicationDLQTask_SyncActivity() {
namespaceID := uuid.NewRandom().String()
workflowID := uuid.New()
Expand Down

0 comments on commit 5b182a7

Please sign in to comment.