diff --git a/service/history/replication/dlq_handler.go b/service/history/replication/dlq_handler.go index cb76cea0916..3b008c713c5 100644 --- a/service/history/replication/dlq_handler.go +++ b/service/history/replication/dlq_handler.go @@ -209,7 +209,7 @@ func (r *dlqHandlerImpl) MergeMessages( } for _, task := range replicationTasks { - if _, err := taskExecutor.Execute( + if err := taskExecutor.Execute( ctx, task, true, diff --git a/service/history/replication/dlq_handler_test.go b/service/history/replication/dlq_handler_test.go index 6674dc3acab..5c21e1939ed 100644 --- a/service/history/replication/dlq_handler_test.go +++ b/service/history/replication/dlq_handler_test.go @@ -292,7 +292,7 @@ func (s *dlqHandlerSuite) TestMergeMessages() { Return(&adminservice.GetDLQReplicationMessagesResponse{ ReplicationTasks: []*replicationspb.ReplicationTask{remoteTask}, }, nil) - s.taskExecutor.EXPECT().Execute(gomock.Any(), remoteTask, true).Return("", nil) + s.taskExecutor.EXPECT().Execute(gomock.Any(), remoteTask, true).Return(nil) s.executionManager.EXPECT().RangeDeleteReplicationTaskFromDLQ(gomock.Any(), &persistence.RangeDeleteReplicationTaskFromDLQRequest{ RangeCompleteHistoryTasksRequest: persistence.RangeCompleteHistoryTasksRequest{ ShardID: s.mockShard.GetShardID(), diff --git a/service/history/replication/task_executor.go b/service/history/replication/task_executor.go index bc326e68807..4fe6d463737 100644 --- a/service/history/replication/task_executor.go +++ b/service/history/replication/task_executor.go @@ -51,7 +51,7 @@ import ( type ( TaskExecutor interface { - Execute(ctx context.Context, replicationTask *replicationspb.ReplicationTask, forceApply bool) (string, error) + Execute(ctx context.Context, replicationTask *replicationspb.ReplicationTask, forceApply bool) error } TaskExecutorParams struct { @@ -107,32 +107,25 @@ func (e *taskExecutorImpl) Execute( ctx context.Context, replicationTask *replicationspb.ReplicationTask, forceApply bool, -) (string, error) { +) error { var err error - var operation string switch replicationTask.GetTaskType() { case enumsspb.REPLICATION_TASK_TYPE_SYNC_SHARD_STATUS_TASK: // Shard status will be sent as part of the Replication message without kafka - operation = metrics.SyncShardTaskScope case enumsspb.REPLICATION_TASK_TYPE_SYNC_ACTIVITY_TASK: - operation = metrics.SyncActivityTaskScope err = e.handleActivityTask(ctx, replicationTask, forceApply) case enumsspb.REPLICATION_TASK_TYPE_HISTORY_METADATA_TASK: // Without kafka we should not have size limits so we don't necessary need this in the new replication scheme. - operation = metrics.HistoryMetadataReplicationTaskScope case enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK: - operation = metrics.HistoryReplicationTaskScope err = e.handleHistoryReplicationTask(ctx, replicationTask, forceApply) case enumsspb.REPLICATION_TASK_TYPE_SYNC_WORKFLOW_STATE_TASK: - operation = metrics.SyncWorkflowStateTaskScope err = e.handleSyncWorkflowStateTask(ctx, replicationTask, forceApply) default: e.logger.Error("Unknown task type.") - operation = metrics.ReplicatorScope err = ErrUnknownReplicationTask } - return operation, err + return err } func (e *taskExecutorImpl) handleActivityTask( diff --git a/service/history/replication/task_executor_mock.go b/service/history/replication/task_executor_mock.go index 34782b70958..66c0910ebd7 100644 --- a/service/history/replication/task_executor_mock.go +++ b/service/history/replication/task_executor_mock.go @@ -60,12 +60,11 @@ func (m *MockTaskExecutor) EXPECT() *MockTaskExecutorMockRecorder { } // Execute mocks base method. -func (m *MockTaskExecutor) Execute(ctx context.Context, replicationTask *repication.ReplicationTask, forceApply bool) (string, error) { +func (m *MockTaskExecutor) Execute(ctx context.Context, replicationTask *repication.ReplicationTask, forceApply bool) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Execute", ctx, replicationTask, forceApply) - ret0, _ := ret[0].(string) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret0, _ := ret[0].(error) + return ret0 } // Execute indicates an expected call of Execute. diff --git a/service/history/replication/task_executor_test.go b/service/history/replication/task_executor_test.go index d298eb0853c..63fd8ba1b9a 100644 --- a/service/history/replication/task_executor_test.go +++ b/service/history/replication/task_executor_test.go @@ -232,7 +232,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask() { } s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(nil) - _, err := s.replicationTaskExecutor.Execute(context.Background(), task, true) + err := s.replicationTaskExecutor.Execute(context.Background(), task, true) s.NoError(err) } @@ -297,7 +297,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask_Rese int64(456), ) s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(nil) - _, err := s.replicationTaskExecutor.Execute(context.Background(), task, true) + err := s.replicationTaskExecutor.Execute(context.Background(), task, true) s.NoError(err) } @@ -330,7 +330,7 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask() { } s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil) - _, err := s.replicationTaskExecutor.Execute(context.Background(), task, true) + err := s.replicationTaskExecutor.Execute(context.Background(), task, true) s.NoError(err) } @@ -385,7 +385,7 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask_Resend() { int64(456), ) s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil) - _, err := s.replicationTaskExecutor.Execute(context.Background(), task, true) + err := s.replicationTaskExecutor.Execute(context.Background(), task, true) s.NoError(err) } @@ -406,6 +406,6 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncWorkflowStateTask() { s.mockEngine.EXPECT().ReplicateWorkflowState(gomock.Any(), gomock.Any()).Return(nil) - _, err := s.replicationTaskExecutor.Execute(context.Background(), task, true) + err := s.replicationTaskExecutor.Execute(context.Background(), task, true) s.NoError(err) } diff --git a/service/history/replication/task_processor.go b/service/history/replication/task_processor.go index f848c6e960d..9c2701f7ca4 100644 --- a/service/history/replication/task_processor.go +++ b/service/history/replication/task_processor.go @@ -324,14 +324,27 @@ func (p *taskProcessorImpl) handleSyncShardStatus( func (p *taskProcessorImpl) handleReplicationTask( ctx context.Context, replicationTask *replicationspb.ReplicationTask, -) error { +) (retErr error) { _ = p.rateLimiter.Wait(ctx) + operationTagValue := p.getOperationTagValue(replicationTask) + operation := func() error { - operation, err := p.replicationTaskExecutor.Execute(ctx, replicationTask, false) - p.emitTaskMetrics(operation, err) + err := p.replicationTaskExecutor.Execute(ctx, replicationTask, false) + p.emitTaskMetrics(operationTagValue, err) return err } + + var panicErr error + defer func() { + if panicErr != nil { + retErr = panicErr + p.emitTaskMetrics(operationTagValue, panicErr) + } + }() + + defer log.CapturePanic(p.logger, &panicErr) + return backoff.ThrottleRetry(operation, p.taskRetryPolicy, p.isRetryableError) } @@ -526,6 +539,25 @@ func (p *taskProcessorImpl) emitTaskMetrics(operation string, err error) { metricsScope.Counter(metrics.ReplicationTasksFailed.GetMetricName()).Record(1) } +func (p *taskProcessorImpl) getOperationTagValue( + replicationTask *replicationspb.ReplicationTask, +) string { + switch replicationTask.GetTaskType() { + case enumsspb.REPLICATION_TASK_TYPE_SYNC_SHARD_STATUS_TASK: + return metrics.SyncShardTaskScope + case enumsspb.REPLICATION_TASK_TYPE_SYNC_ACTIVITY_TASK: + return metrics.SyncActivityTaskScope + case enumsspb.REPLICATION_TASK_TYPE_HISTORY_METADATA_TASK: + return metrics.HistoryMetadataReplicationTaskScope + case enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK: + return metrics.HistoryReplicationTaskScope + case enumsspb.REPLICATION_TASK_TYPE_SYNC_WORKFLOW_STATE_TASK: + return metrics.SyncWorkflowStateTaskScope + default: + return metrics.ReplicatorScope + } +} + func (p *taskProcessorImpl) isStopped() bool { return atomic.LoadInt32(&p.status) == common.DaemonStatusStopped } diff --git a/service/history/replication/task_processor_test.go b/service/history/replication/task_processor_test.go index c24a84f3b0e..0298e428939 100644 --- a/service/history/replication/task_processor_test.go +++ b/service/history/replication/task_processor_test.go @@ -204,7 +204,7 @@ func (s *taskProcessorSuite) TestHandleReplicationTask_SyncActivity() { VisibilityTime: &now, } - s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), task, false).Return("", nil) + s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), task, false).Return(nil) err := s.replicationTaskProcessor.handleReplicationTask(context.Background(), task) s.NoError(err) } @@ -243,11 +243,23 @@ func (s *taskProcessorSuite) TestHandleReplicationTask_History() { VisibilityTime: &now, } - s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), task, false).Return("", nil) + s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), task, false).Return(nil) err = s.replicationTaskProcessor.handleReplicationTask(context.Background(), task) s.NoError(err) } +func (s *taskProcessorSuite) TestHandleReplicationTask_Panic() { + task := &replicationspb.ReplicationTask{} + + s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), task, false).DoAndReturn( + func(_ context.Context, _ *replicationspb.ReplicationTask, _ bool) error { + panic("test replication task panic") + }, + ) + err := s.replicationTaskProcessor.handleReplicationTask(context.Background(), task) + s.Error(err) +} + func (s *taskProcessorSuite) TestHandleReplicationDLQTask_SyncActivity() { namespaceID := uuid.NewRandom().String() workflowID := uuid.New()