From 52ab61ff1f505a85ba2003154fb394dfcb1d7a58 Mon Sep 17 00:00:00 2001 From: yux0 Date: Mon, 9 Jan 2023 15:51:03 -0800 Subject: [PATCH 1/6] Allow start many replication poller in one shard --- service/history/replication/poller_manager.go | 6 +++ service/history/replication/task_processor.go | 38 +++++++------- .../replication/task_processor_manager.go | 50 ++++++++++++------- .../replication/task_processor_test.go | 1 + 4 files changed, 59 insertions(+), 36 deletions(-) diff --git a/service/history/replication/poller_manager.go b/service/history/replication/poller_manager.go index 0b01e6c6621..7423114b63c 100644 --- a/service/history/replication/poller_manager.go +++ b/service/history/replication/poller_manager.go @@ -31,12 +31,18 @@ import ( ) type ( + pollerManager interface { + getPollingShardIDs(remoteClusterName string) []int32 + } + pollerManagerImpl struct { currentShardId int32 clusterMetadata cluster.Metadata } ) +var _ pollerManager = (*pollerManagerImpl)(nil) + func newPollerManager( currentShardId int32, clusterMetadata cluster.Metadata, diff --git a/service/history/replication/task_processor.go b/service/history/replication/task_processor.go index f848c6e960d..728534b6481 100644 --- a/service/history/replication/task_processor.go +++ b/service/history/replication/task_processor.go @@ -74,9 +74,10 @@ type ( // taskProcessorImpl is responsible for processing replication tasks for a shard. taskProcessorImpl struct { - currentCluster string + status int32 + sourceCluster string - status int32 + pollingShardID int32 shard shard.Context historyEngine shard.Engine historySerializer serialization.Serializer @@ -109,6 +110,7 @@ type ( // NewTaskProcessor creates a new replication task processor. 
func NewTaskProcessor( + pollingShardID int32, shard shard.Context, historyEngine shard.Engine, config *configs.Config, @@ -117,24 +119,23 @@ func NewTaskProcessor( replicationTaskExecutor TaskExecutor, eventSerializer serialization.Serializer, ) TaskProcessor { - shardID := shard.GetShardID() - taskRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait(shardID)). - WithBackoffCoefficient(config.ReplicationTaskProcessorErrorRetryBackoffCoefficient(shardID)). - WithMaximumInterval(config.ReplicationTaskProcessorErrorRetryMaxInterval(shardID)). - WithMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts(shardID)). - WithExpirationInterval(config.ReplicationTaskProcessorErrorRetryExpiration(shardID)) + taskRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait(pollingShardID)). + WithBackoffCoefficient(config.ReplicationTaskProcessorErrorRetryBackoffCoefficient(pollingShardID)). + WithMaximumInterval(config.ReplicationTaskProcessorErrorRetryMaxInterval(pollingShardID)). + WithMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts(pollingShardID)). + WithExpirationInterval(config.ReplicationTaskProcessorErrorRetryExpiration(pollingShardID)) // TODO: define separate set of configs for dlq retry - dlqRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait(shardID)). - WithBackoffCoefficient(config.ReplicationTaskProcessorErrorRetryBackoffCoefficient(shardID)). - WithMaximumInterval(config.ReplicationTaskProcessorErrorRetryMaxInterval(shardID)). - WithMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts(shardID)). - WithExpirationInterval(config.ReplicationTaskProcessorErrorRetryExpiration(shardID)) + dlqRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait(pollingShardID)). 
+ WithBackoffCoefficient(config.ReplicationTaskProcessorErrorRetryBackoffCoefficient(pollingShardID)). + WithMaximumInterval(config.ReplicationTaskProcessorErrorRetryMaxInterval(pollingShardID)). + WithMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts(pollingShardID)). + WithExpirationInterval(config.ReplicationTaskProcessorErrorRetryExpiration(pollingShardID)) return &taskProcessorImpl{ - currentCluster: shard.GetClusterMetadata().GetCurrentClusterName(), - sourceCluster: replicationTaskFetcher.getSourceCluster(), status: common.DaemonStatusInitialized, + pollingShardID: pollingShardID, + sourceCluster: replicationTaskFetcher.getSourceCluster(), shard: shard, historyEngine: historyEngine, historySerializer: eventSerializer, @@ -370,6 +371,7 @@ func (p *taskProcessorImpl) convertTaskToDLQTask( switch replicationTask.TaskType { case enumsspb.REPLICATION_TASK_TYPE_SYNC_ACTIVITY_TASK: taskAttributes := replicationTask.GetSyncActivityTaskAttributes() + // TODO: GetShardID will break GetDLQReplicationMessages we need to handle DLQ for cross shard replication. return &persistence.PutReplicationTaskToDLQRequest{ ShardID: p.shard.GetShardID(), SourceClusterName: p.sourceCluster, @@ -401,6 +403,7 @@ func (p *taskProcessorImpl) convertTaskToDLQTask( // NOTE: last event vs next event, next event ID is exclusive nextEventID := lastEvent.GetEventId() + 1 + // TODO: GetShardID will break GetDLQReplicationMessages we need to handle DLQ for cross shard replication. return &persistence.PutReplicationTaskToDLQRequest{ ShardID: p.shard.GetShardID(), SourceClusterName: p.sourceCluster, @@ -429,6 +432,7 @@ func (p *taskProcessorImpl) convertTaskToDLQTask( return nil, err } + // TODO: GetShardID will break GetDLQReplicationMessages we need to handle DLQ for cross shard replication. 
return &persistence.PutReplicationTaskToDLQRequest{ ShardID: p.shard.GetShardID(), SourceClusterName: p.sourceCluster, @@ -451,7 +455,7 @@ func (p *taskProcessorImpl) paginationFn(_ []byte) ([]interface{}, []byte, error respChan := make(chan *replicationspb.ReplicationMessages, 1) p.requestChan <- &replicationTaskRequest{ token: &replicationspb.ReplicationToken{ - ShardId: p.shard.GetShardID(), + ShardId: p.pollingShardID, LastProcessedMessageId: p.maxRxProcessedTaskID, LastProcessedVisibilityTime: &p.maxRxProcessedTimestamp, LastRetrievedMessageId: p.maxRxReceivedTaskID, @@ -486,7 +490,7 @@ func (p *taskProcessorImpl) paginationFn(_ []byte) ([]interface{}, []byte, error if resp.GetHasMore() { p.rxTaskBackoff = time.Duration(0) } else { - p.rxTaskBackoff = p.config.ReplicationTaskProcessorNoTaskRetryWait(p.shard.GetShardID()) + p.rxTaskBackoff = p.config.ReplicationTaskProcessorNoTaskRetryWait(p.pollingShardID) } return tasks, nil, nil diff --git a/service/history/replication/task_processor_manager.go b/service/history/replication/task_processor_manager.go index e3b520199ce..2d9b2344c00 100644 --- a/service/history/replication/task_processor_manager.go +++ b/service/history/replication/task_processor_manager.go @@ -26,6 +26,7 @@ package replication import ( "context" + "fmt" "sync" "sync/atomic" "time" @@ -49,6 +50,10 @@ import ( wcache "go.temporal.io/server/service/history/workflow/cache" ) +const ( + clusterCallbackKey = "%s-%d" // format: {cluster name}-{polling shard ID} +) + type ( // taskProcessorManagerImpl is to manage replication task processors taskProcessorManagerImpl struct { @@ -62,6 +67,7 @@ type ( workflowCache wcache.Cache resender xdc.NDCHistoryResender taskExecutorProvider TaskExecutorProvider + taskPollerManager pollerManager metricsHandler metrics.Handler logger log.Logger @@ -110,6 +116,7 @@ func NewTaskProcessorManager( metricsHandler: shard.GetMetricsHandler(), taskProcessors: make(map[string]TaskProcessor), taskExecutorProvider: taskExecutorProvider, + taskPollerManager: 
newPollerManager(shard.GetShardID(), shard.GetClusterMetadata()), minTxAckedTaskID: persistence.EmptyQueueMessageID, shutdownChan: make(chan struct{}), } @@ -167,7 +174,7 @@ func (r *taskProcessorManagerImpl) handleClusterMetadataUpdate( if clusterName == currentClusterName { continue } - // The metadata triggers a update when the following fields update: 1. Enabled 2. Initial Failover Version 3. Cluster address + // The metadata triggers an update when the following fields update: 1. Enabled 2. Initial Failover Version 3. Cluster address // The callback covers three cases: // Case 1: Remove a cluster Case 2: Add a new cluster Case 3: Refresh cluster metadata. @@ -180,24 +187,29 @@ func (r *taskProcessorManagerImpl) handleClusterMetadataUpdate( if clusterInfo := newClusterMetadata[clusterName]; clusterInfo != nil && clusterInfo.Enabled { // Case 2 and Case 3 fetcher := r.replicationTaskFetcherFactory.GetOrCreateFetcher(clusterName) - replicationTaskProcessor := NewTaskProcessor( - r.shard, - r.engine, - r.config, - r.shard.GetMetricsHandler(), - fetcher, - r.taskExecutorProvider(TaskExecutorParams{ - RemoteCluster: clusterName, - Shard: r.shard, - HistoryResender: r.resender, - HistoryEngine: r.engine, - DeleteManager: r.deleteMgr, - WorkflowCache: r.workflowCache, - }), - r.eventSerializer, - ) - replicationTaskProcessor.Start() - r.taskProcessors[clusterName] = replicationTaskProcessor + pollingShardIds := r.taskPollerManager.getPollingShardIDs(clusterName) + for _, pollingShardId := range pollingShardIds { + replicationTaskProcessor := NewTaskProcessor( + pollingShardId, + r.shard, + r.engine, + r.config, + r.shard.GetMetricsHandler(), + fetcher, + r.taskExecutorProvider(TaskExecutorParams{ + RemoteCluster: clusterName, + Shard: r.shard, + HistoryResender: r.resender, + HistoryEngine: r.engine, + DeleteManager: r.deleteMgr, + WorkflowCache: r.workflowCache, + }), + r.eventSerializer, + ) + replicationTaskProcessor.Start() + key := fmt.Sprintf(clusterCallbackKey, 
clusterName, pollingShardIds) + r.taskProcessors[key] = replicationTaskProcessor + } } } } diff --git a/service/history/replication/task_processor_test.go b/service/history/replication/task_processor_test.go index c24a84f3b0e..847317a9b30 100644 --- a/service/history/replication/task_processor_test.go +++ b/service/history/replication/task_processor_test.go @@ -148,6 +148,7 @@ func (s *taskProcessorSuite) SetupTest() { metricsClient := metrics.NoopMetricsHandler s.replicationTaskProcessor = NewTaskProcessor( + s.shardID, s.mockShard, s.mockEngine, s.config, From d5ce7d280c049649d7eb6a4b6aaff4fa9d716623 Mon Sep 17 00:00:00 2001 From: yux0 Date: Tue, 10 Jan 2023 13:06:35 -0800 Subject: [PATCH 2/6] handle test --- service/history/replication/poller_manager.go | 3 +- .../replication/task_processor_manager.go | 32 +++++++++---------- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/service/history/replication/poller_manager.go b/service/history/replication/poller_manager.go index 7423114b63c..ce6213f3ffb 100644 --- a/service/history/replication/poller_manager.go +++ b/service/history/replication/poller_manager.go @@ -70,7 +70,8 @@ func (p pollerManagerImpl) getPollingShardIDs(remoteClusterName string) []int32 func generatePollingShardIDs(localShardId int32, localShardCount int32, remoteShardCount int32) []int32 { var pollingShards []int32 if remoteShardCount <= localShardCount { - if localShardId <= remoteShardCount { + if localShardId <= remoteShardCount || remoteShardCount == 0 { + // TODO: remove remoteShardCount == 0. This is due to current NDC/XDC functional test setup. 
pollingShards = append(pollingShards, localShardId) } return pollingShards diff --git a/service/history/replication/task_processor_manager.go b/service/history/replication/task_processor_manager.go index 2d9b2344c00..b30578d1623 100644 --- a/service/history/replication/task_processor_manager.go +++ b/service/history/replication/task_processor_manager.go @@ -174,21 +174,20 @@ func (r *taskProcessorManagerImpl) handleClusterMetadataUpdate( if clusterName == currentClusterName { continue } - // The metadata triggers an update when the following fields update: 1. Enabled 2. Initial Failover Version 3. Cluster address - // The callback covers three cases: - // Case 1: Remove a cluster Case 2: Add a new cluster Case 3: Refresh cluster metadata. - - if processor, ok := r.taskProcessors[clusterName]; ok { - // Case 1 and Case 3 - processor.Stop() - delete(r.taskProcessors, clusterName) - } - - if clusterInfo := newClusterMetadata[clusterName]; clusterInfo != nil && clusterInfo.Enabled { - // Case 2 and Case 3 - fetcher := r.replicationTaskFetcherFactory.GetOrCreateFetcher(clusterName) - pollingShardIds := r.taskPollerManager.getPollingShardIDs(clusterName) - for _, pollingShardId := range pollingShardIds { + pollingShardIds := r.taskPollerManager.getPollingShardIDs(clusterName) + for _, pollingShardId := range pollingShardIds { + perShardTaskProcessorKey := fmt.Sprintf(clusterCallbackKey, clusterName, pollingShardId) + // The metadata triggers an update when the following fields update: 1. Enabled 2. Initial Failover Version 3. Cluster address + // The callback covers three cases: + // Case 1: Remove a cluster Case 2: Add a new cluster Case 3: Refresh cluster metadata. 
+ if processor, ok := r.taskProcessors[perShardTaskProcessorKey]; ok { + // Case 1 and Case 3 + processor.Stop() + delete(r.taskProcessors, perShardTaskProcessorKey) + } + if clusterInfo := newClusterMetadata[clusterName]; clusterInfo != nil && clusterInfo.Enabled { + // Case 2 and Case 3 + fetcher := r.replicationTaskFetcherFactory.GetOrCreateFetcher(clusterName) replicationTaskProcessor := NewTaskProcessor( pollingShardId, r.shard, @@ -207,8 +206,7 @@ func (r *taskProcessorManagerImpl) handleClusterMetadataUpdate( r.eventSerializer, ) replicationTaskProcessor.Start() - key := fmt.Sprintf(clusterCallbackKey, clusterName, pollingShardIds) - r.taskProcessors[key] = replicationTaskProcessor + r.taskProcessors[perShardTaskProcessorKey] = replicationTaskProcessor } } } From 811faa28aa7eaa510eef1fd228457bfbf35edeba Mon Sep 17 00:00:00 2001 From: yux0 Date: Wed, 11 Jan 2023 22:12:35 -0800 Subject: [PATCH 3/6] Rename method --- service/history/replication/poller_manager.go | 12 ++++++------ service/history/replication/poller_manager_test.go | 2 +- .../history/replication/task_processor_manager.go | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/service/history/replication/poller_manager.go b/service/history/replication/poller_manager.go index ce6213f3ffb..876a71679f1 100644 --- a/service/history/replication/poller_manager.go +++ b/service/history/replication/poller_manager.go @@ -32,7 +32,7 @@ import ( type ( pollerManager interface { - getPollingShardIDs(remoteClusterName string) []int32 + getSourceClusterShardIDs(sourceClusterName string) []int32 } pollerManagerImpl struct { @@ -53,21 +53,21 @@ func newPollerManager( } } -func (p pollerManagerImpl) getPollingShardIDs(remoteClusterName string) []int32 { +func (p pollerManagerImpl) getSourceClusterShardIDs(sourceClusterName string) []int32 { currentCluster := p.clusterMetadata.GetCurrentClusterName() allClusters := p.clusterMetadata.GetAllClusterInfo() currentClusterInfo, ok := 
allClusters[currentCluster] if !ok { panic("Cannot get current cluster info from cluster metadata cache") } - remoteClusterInfo, ok := allClusters[remoteClusterName] + remoteClusterInfo, ok := allClusters[sourceClusterName] if !ok { - panic(fmt.Sprintf("Cannot get remote cluster %s info from cluster metadata cache", remoteClusterName)) + panic(fmt.Sprintf("Cannot get source cluster %s info from cluster metadata cache", sourceClusterName)) } - return generatePollingShardIDs(p.currentShardId, currentClusterInfo.ShardCount, remoteClusterInfo.ShardCount) + return generateShardIDs(p.currentShardId, currentClusterInfo.ShardCount, remoteClusterInfo.ShardCount) } -func generatePollingShardIDs(localShardId int32, localShardCount int32, remoteShardCount int32) []int32 { +func generateShardIDs(localShardId int32, localShardCount int32, remoteShardCount int32) []int32 { var pollingShards []int32 if remoteShardCount <= localShardCount { if localShardId <= remoteShardCount || remoteShardCount == 0 { diff --git a/service/history/replication/poller_manager_test.go b/service/history/replication/poller_manager_test.go index 3b2be030b9d..abe69d9f79f 100644 --- a/service/history/replication/poller_manager_test.go +++ b/service/history/replication/poller_manager_test.go @@ -90,7 +90,7 @@ func TestGetPollingShardIds(t *testing.T) { t.Errorf("The code did not panic") } }() - shardIDs := generatePollingShardIDs(tt.shardID, tt.localShardCount, tt.remoteShardCount) + shardIDs := generateShardIDs(tt.shardID, tt.localShardCount, tt.remoteShardCount) assert.Equal(t, tt.expectedShardIDs, shardIDs) }) } diff --git a/service/history/replication/task_processor_manager.go b/service/history/replication/task_processor_manager.go index b30578d1623..56945637dfc 100644 --- a/service/history/replication/task_processor_manager.go +++ b/service/history/replication/task_processor_manager.go @@ -174,7 +174,7 @@ func (r *taskProcessorManagerImpl) handleClusterMetadataUpdate( if clusterName == 
currentClusterName { continue } - pollingShardIds := r.taskPollerManager.getPollingShardIDs(clusterName) + pollingShardIds := r.taskPollerManager.getSourceClusterShardIDs(clusterName) for _, pollingShardId := range pollingShardIds { perShardTaskProcessorKey := fmt.Sprintf(clusterCallbackKey, clusterName, pollingShardId) // The metadata triggers an update when the following fields update: 1. Enabled 2. Initial Failover Version 3. Cluster address From da5bb3a434a95fb0cc6e6db70560a24a27ebb5e8 Mon Sep 17 00:00:00 2001 From: yux0 Date: Thu, 12 Jan 2023 20:20:48 -0800 Subject: [PATCH 4/6] revert change to use the local shard id for config --- service/history/replication/task_processor.go | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/service/history/replication/task_processor.go b/service/history/replication/task_processor.go index 728534b6481..1ac17c7dedf 100644 --- a/service/history/replication/task_processor.go +++ b/service/history/replication/task_processor.go @@ -119,18 +119,19 @@ func NewTaskProcessor( replicationTaskExecutor TaskExecutor, eventSerializer serialization.Serializer, ) TaskProcessor { - taskRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait(pollingShardID)). - WithBackoffCoefficient(config.ReplicationTaskProcessorErrorRetryBackoffCoefficient(pollingShardID)). - WithMaximumInterval(config.ReplicationTaskProcessorErrorRetryMaxInterval(pollingShardID)). - WithMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts(pollingShardID)). - WithExpirationInterval(config.ReplicationTaskProcessorErrorRetryExpiration(pollingShardID)) + shardID := shard.GetShardID() + taskRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait(shardID)). + WithBackoffCoefficient(config.ReplicationTaskProcessorErrorRetryBackoffCoefficient(shardID)). + WithMaximumInterval(config.ReplicationTaskProcessorErrorRetryMaxInterval(shardID)). 
+ WithMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts(shardID)). + WithExpirationInterval(config.ReplicationTaskProcessorErrorRetryExpiration(shardID)) // TODO: define separate set of configs for dlq retry - dlqRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait(pollingShardID)). - WithBackoffCoefficient(config.ReplicationTaskProcessorErrorRetryBackoffCoefficient(pollingShardID)). - WithMaximumInterval(config.ReplicationTaskProcessorErrorRetryMaxInterval(pollingShardID)). - WithMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts(pollingShardID)). - WithExpirationInterval(config.ReplicationTaskProcessorErrorRetryExpiration(pollingShardID)) + dlqRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait(shardID)). + WithBackoffCoefficient(config.ReplicationTaskProcessorErrorRetryBackoffCoefficient(shardID)). + WithMaximumInterval(config.ReplicationTaskProcessorErrorRetryMaxInterval(shardID)). + WithMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts(shardID)). 
+ WithExpirationInterval(config.ReplicationTaskProcessorErrorRetryExpiration(shardID)) return &taskProcessorImpl{ status: common.DaemonStatusInitialized, From ce0687172b29813e8b3b6ab599fb87751e06fa32 Mon Sep 17 00:00:00 2001 From: yux0 Date: Thu, 12 Jan 2023 22:20:47 -0800 Subject: [PATCH 5/6] fix functional test --- service/history/replication/poller_manager.go | 3 +-- tests/test_cluster.go | 4 ++++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/service/history/replication/poller_manager.go b/service/history/replication/poller_manager.go index 876a71679f1..7e392077d2f 100644 --- a/service/history/replication/poller_manager.go +++ b/service/history/replication/poller_manager.go @@ -70,8 +70,7 @@ func (p pollerManagerImpl) getSourceClusterShardIDs(sourceClusterName string) [] func generateShardIDs(localShardId int32, localShardCount int32, remoteShardCount int32) []int32 { var pollingShards []int32 if remoteShardCount <= localShardCount { - if localShardId <= remoteShardCount || remoteShardCount == 0 { - // TODO: remove remoteShardCount == 0. This is due to current NDC/XDC functional test setup. 
+ if localShardId <= remoteShardCount { pollingShards = append(pollingShards, localShardId) } return pollingShards diff --git a/tests/test_cluster.go b/tests/test_cluster.go index 5622d41adc7..de4c6c40e6e 100644 --- a/tests/test_cluster.go +++ b/tests/test_cluster.go @@ -169,7 +169,10 @@ func NewCluster(options *TestClusterConfig, logger log.Logger) (*TestCluster, er } } + clusterInfoMap := make(map[string]cluster.ClusterInformation) for clusterName, clusterInfo := range clusterMetadataConfig.ClusterInformation { + clusterInfo.ShardCount = options.HistoryConfig.NumHistoryShards + clusterInfoMap[clusterName] = clusterInfo _, err := testBase.ClusterMetadataManager.SaveClusterMetadata(context.Background(), &persistence.SaveClusterMetadataRequest{ ClusterMetadata: persistencespb.ClusterMetadata{ HistoryShardCount: options.HistoryConfig.NumHistoryShards, @@ -185,6 +188,7 @@ func NewCluster(options *TestClusterConfig, logger log.Logger) (*TestCluster, er return nil, err } } + clusterMetadataConfig.ClusterInformation = clusterInfoMap // This will save custom test search attributes to cluster metadata. // Actual Elasticsearch fields are created from index template (testdata/es_v7_index_template.json). 
From 48d1d0ce634f7b9c12fcf8099e1278d24a598ea6 Mon Sep 17 00:00:00 2001 From: yux0 Date: Fri, 13 Jan 2023 09:26:21 -0800 Subject: [PATCH 6/6] rename variables --- service/history/replication/task_processor.go | 10 +++++----- service/history/replication/task_processor_manager.go | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/service/history/replication/task_processor.go b/service/history/replication/task_processor.go index 2b466f97c8c..7910f52dd02 100644 --- a/service/history/replication/task_processor.go +++ b/service/history/replication/task_processor.go @@ -77,7 +77,7 @@ type ( status int32 sourceCluster string - pollingShardID int32 + sourceShardID int32 shard shard.Context historyEngine shard.Engine historySerializer serialization.Serializer @@ -110,7 +110,7 @@ type ( // NewTaskProcessor creates a new replication task processor. func NewTaskProcessor( - pollingShardID int32, + sourceShardID int32, shard shard.Context, historyEngine shard.Engine, config *configs.Config, @@ -135,7 +135,7 @@ func NewTaskProcessor( return &taskProcessorImpl{ status: common.DaemonStatusInitialized, - pollingShardID: pollingShardID, + sourceShardID: sourceShardID, sourceCluster: replicationTaskFetcher.getSourceCluster(), shard: shard, historyEngine: historyEngine, @@ -469,7 +469,7 @@ func (p *taskProcessorImpl) paginationFn(_ []byte) ([]interface{}, []byte, error respChan := make(chan *replicationspb.ReplicationMessages, 1) p.requestChan <- &replicationTaskRequest{ token: &replicationspb.ReplicationToken{ - ShardId: p.pollingShardID, + ShardId: p.sourceShardID, LastProcessedMessageId: p.maxRxProcessedTaskID, LastProcessedVisibilityTime: &p.maxRxProcessedTimestamp, LastRetrievedMessageId: p.maxRxReceivedTaskID, @@ -504,7 +504,7 @@ func (p *taskProcessorImpl) paginationFn(_ []byte) ([]interface{}, []byte, error if resp.GetHasMore() { p.rxTaskBackoff = time.Duration(0) } else { - p.rxTaskBackoff = 
p.config.ReplicationTaskProcessorNoTaskRetryWait(p.pollingShardID) + p.rxTaskBackoff = p.config.ReplicationTaskProcessorNoTaskRetryWait(p.sourceShardID) } return tasks, nil, nil diff --git a/service/history/replication/task_processor_manager.go b/service/history/replication/task_processor_manager.go index 92b50785fd7..a19ac75289d 100644 --- a/service/history/replication/task_processor_manager.go +++ b/service/history/replication/task_processor_manager.go @@ -174,9 +174,9 @@ func (r *taskProcessorManagerImpl) handleClusterMetadataUpdate( if clusterName == currentClusterName { continue } - pollingShardIds := r.taskPollerManager.getSourceClusterShardIDs(clusterName) - for _, pollingShardId := range pollingShardIds { - perShardTaskProcessorKey := fmt.Sprintf(clusterCallbackKey, clusterName, pollingShardId) + sourceShardIds := r.taskPollerManager.getSourceClusterShardIDs(clusterName) + for _, sourceShardId := range sourceShardIds { + perShardTaskProcessorKey := fmt.Sprintf(clusterCallbackKey, clusterName, sourceShardId) // The metadata triggers an update when the following fields update: 1. Enabled 2. Initial Failover Version 3. Cluster address // The callback covers three cases: // Case 1: Remove a cluster Case 2: Add a new cluster Case 3: Refresh cluster metadata. @@ -189,7 +189,7 @@ func (r *taskProcessorManagerImpl) handleClusterMetadataUpdate( // Case 2 and Case 3 fetcher := r.replicationTaskFetcherFactory.GetOrCreateFetcher(clusterName) replicationTaskProcessor := NewTaskProcessor( - pollingShardId, + sourceShardId, r.shard, r.engine, r.config,