Allow start many replication pollers in one shard #3790

Merged Jan 13, 2023 (7 commits; diff below shows changes from 3 commits)
19 changes: 13 additions & 6 deletions service/history/replication/poller_manager.go
@@ -31,12 +31,18 @@ import (
)

type (
pollerManager interface {
getSourceClusterShardIDs(sourceClusterName string) []int32
}

pollerManagerImpl struct {
currentShardId int32
clusterMetadata cluster.Metadata
}
)

var _ pollerManager = (*pollerManagerImpl)(nil)

func newPollerManager(
currentShardId int32,
clusterMetadata cluster.Metadata,
@@ -47,24 +53,25 @@ func newPollerManager(
}
}

func (p pollerManagerImpl) getPollingShardIDs(remoteClusterName string) []int32 {
func (p pollerManagerImpl) getSourceClusterShardIDs(sourceClusterName string) []int32 {
currentCluster := p.clusterMetadata.GetCurrentClusterName()
allClusters := p.clusterMetadata.GetAllClusterInfo()
currentClusterInfo, ok := allClusters[currentCluster]
if !ok {
panic("Cannot get current cluster info from cluster metadata cache")
}
remoteClusterInfo, ok := allClusters[remoteClusterName]
remoteClusterInfo, ok := allClusters[sourceClusterName]
if !ok {
panic(fmt.Sprintf("Cannot get remote cluster %s info from cluster metadata cache", remoteClusterName))
panic(fmt.Sprintf("Cannot get source cluster %s info from cluster metadata cache", sourceClusterName))
}
return generatePollingShardIDs(p.currentShardId, currentClusterInfo.ShardCount, remoteClusterInfo.ShardCount)
return generateShardIDs(p.currentShardId, currentClusterInfo.ShardCount, remoteClusterInfo.ShardCount)
}

func generatePollingShardIDs(localShardId int32, localShardCount int32, remoteShardCount int32) []int32 {
func generateShardIDs(localShardId int32, localShardCount int32, remoteShardCount int32) []int32 {
var pollingShards []int32
if remoteShardCount <= localShardCount {
if localShardId <= remoteShardCount {
if localShardId <= remoteShardCount || remoteShardCount == 0 {
// TODO: remove remoteShardCount == 0. This is due to current NDC/XDC functional test setup.
pollingShards = append(pollingShards, localShardId)
}
return pollingShards
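For context, not part of the diff: the hunk above truncates the rest of generateShardIDs, so the fan-out branch (a source cluster with more shards than the local one) is not visible. The sketch below is an illustrative assumption of how such a mapping can work: the visible branch handles a source cluster with fewer or equally many shards, and the assumed second branch lets a local shard poll every source shard congruent to it modulo the local shard count (the temporary remoteShardCount == 0 workaround is ignored). The names here are hypothetical; this is not the merged implementation.

package main

import "fmt"

// exampleShardIDs is an illustrative sketch with 1-based shard IDs; the mapping
// for the remoteShardCount > localShardCount case is an assumption, not the
// code merged in this PR.
func exampleShardIDs(localShardID, localShardCount, remoteShardCount int32) []int32 {
	var shards []int32
	if remoteShardCount <= localShardCount {
		// Source cluster has fewer (or equally many) shards: at most one source
		// shard maps onto this local shard.
		if localShardID <= remoteShardCount {
			shards = append(shards, localShardID)
		}
		return shards
	}
	// Source cluster has more shards: this local shard polls every source shard
	// whose ID is congruent to it modulo the local shard count.
	for id := localShardID; id <= remoteShardCount; id += localShardCount {
		shards = append(shards, id)
	}
	return shards
}

func main() {
	// Local shard 2 of 4, source cluster with 16 shards: polls 2, 6, 10, 14.
	fmt.Println(exampleShardIDs(2, 4, 16))
}

A mapping along these lines keeps every source shard assigned to exactly one local shard, which is why a single local shard may need to run several replication pollers, one per source shard.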
2 changes: 1 addition & 1 deletion service/history/replication/poller_manager_test.go
@@ -90,7 +90,7 @@ func TestGetPollingShardIds(t *testing.T) {
t.Errorf("The code did not panic")
}
}()
shardIDs := generatePollingShardIDs(tt.shardID, tt.localShardCount, tt.remoteShardCount)
shardIDs := generateShardIDs(tt.shardID, tt.localShardCount, tt.remoteShardCount)
assert.Equal(t, tt.expectedShardIDs, shardIDs)
})
}
38 changes: 21 additions & 17 deletions service/history/replication/task_processor.go
@@ -74,9 +74,10 @@ type (

// taskProcessorImpl is responsible for processing replication tasks for a shard.
taskProcessorImpl struct {
currentCluster string
status int32

sourceCluster string
status int32
pollingShardID int32
shard shard.Context
historyEngine shard.Engine
historySerializer serialization.Serializer
@@ -109,6 +110,7 @@ type (

// NewTaskProcessor creates a new replication task processor.
func NewTaskProcessor(
pollingShardID int32,
shard shard.Context,
historyEngine shard.Engine,
config *configs.Config,
@@ -117,24 +119,23 @@
replicationTaskExecutor TaskExecutor,
eventSerializer serialization.Serializer,
) TaskProcessor {
shardID := shard.GetShardID()
taskRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait(shardID)).
WithBackoffCoefficient(config.ReplicationTaskProcessorErrorRetryBackoffCoefficient(shardID)).
WithMaximumInterval(config.ReplicationTaskProcessorErrorRetryMaxInterval(shardID)).
WithMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts(shardID)).
WithExpirationInterval(config.ReplicationTaskProcessorErrorRetryExpiration(shardID))
taskRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait(pollingShardID)).
Contributor: This config cannot be based on pollingShardID, right? Or why would it be?

Contributor: If you have 3 clusters and the shard counts are all different, then what would you specify in the config?

Contributor (Author): Yes, you are right. I am going to revert this.
WithBackoffCoefficient(config.ReplicationTaskProcessorErrorRetryBackoffCoefficient(pollingShardID)).
WithMaximumInterval(config.ReplicationTaskProcessorErrorRetryMaxInterval(pollingShardID)).
WithMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts(pollingShardID)).
WithExpirationInterval(config.ReplicationTaskProcessorErrorRetryExpiration(pollingShardID))

// TODO: define separate set of configs for dlq retry
dlqRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait(shardID)).
WithBackoffCoefficient(config.ReplicationTaskProcessorErrorRetryBackoffCoefficient(shardID)).
WithMaximumInterval(config.ReplicationTaskProcessorErrorRetryMaxInterval(shardID)).
WithMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts(shardID)).
WithExpirationInterval(config.ReplicationTaskProcessorErrorRetryExpiration(shardID))
dlqRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait(pollingShardID)).
WithBackoffCoefficient(config.ReplicationTaskProcessorErrorRetryBackoffCoefficient(pollingShardID)).
WithMaximumInterval(config.ReplicationTaskProcessorErrorRetryMaxInterval(pollingShardID)).
WithMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts(pollingShardID)).
WithExpirationInterval(config.ReplicationTaskProcessorErrorRetryExpiration(pollingShardID))

return &taskProcessorImpl{
currentCluster: shard.GetClusterMetadata().GetCurrentClusterName(),
sourceCluster: replicationTaskFetcher.getSourceCluster(),
status: common.DaemonStatusInitialized,
pollingShardID: pollingShardID,
sourceCluster: replicationTaskFetcher.getSourceCluster(),
shard: shard,
historyEngine: historyEngine,
historySerializer: eventSerializer,
@@ -370,6 +371,7 @@ func (p *taskProcessorImpl) convertTaskToDLQTask(
switch replicationTask.TaskType {
case enumsspb.REPLICATION_TASK_TYPE_SYNC_ACTIVITY_TASK:
taskAttributes := replicationTask.GetSyncActivityTaskAttributes()
// TODO: GetShardID will break GetDLQReplicationMessages. We need to handle DLQ for cross-shard replication.
return &persistence.PutReplicationTaskToDLQRequest{
ShardID: p.shard.GetShardID(),
SourceClusterName: p.sourceCluster,
@@ -401,6 +403,7 @@ func (p *taskProcessorImpl) convertTaskToDLQTask(
// NOTE: last event vs next event, next event ID is exclusive
nextEventID := lastEvent.GetEventId() + 1

// TODO: GetShardID will break GetDLQReplicationMessages. We need to handle DLQ for cross-shard replication.

Contributor: Why will it break? Using the local shard ID for storing the DLQ task seems like the right behavior to me.

Contributor (Author): Yes, here it is OK. But when hydrating the replication task from the source cluster, it needs to calculate the shard ID instead of reusing this one. We will not hit this case in the short term. (See the illustrative sketch after this file's diff.)

return &persistence.PutReplicationTaskToDLQRequest{
ShardID: p.shard.GetShardID(),
SourceClusterName: p.sourceCluster,
@@ -429,6 +432,7 @@ func (p *taskProcessorImpl) convertTaskToDLQTask(
return nil, err
}

// TODO: GetShardID will break GetDLQReplicationMessages. We need to handle DLQ for cross-shard replication.
return &persistence.PutReplicationTaskToDLQRequest{
ShardID: p.shard.GetShardID(),
SourceClusterName: p.sourceCluster,
@@ -451,7 +455,7 @@ func (p *taskProcessorImpl) paginationFn(_ []byte) ([]interface{}, []byte, error
respChan := make(chan *replicationspb.ReplicationMessages, 1)
p.requestChan <- &replicationTaskRequest{
token: &replicationspb.ReplicationToken{
ShardId: p.shard.GetShardID(),
ShardId: p.pollingShardID,
LastProcessedMessageId: p.maxRxProcessedTaskID,
LastProcessedVisibilityTime: &p.maxRxProcessedTimestamp,
LastRetrievedMessageId: p.maxRxReceivedTaskID,
@@ -486,7 +490,7 @@ func (p *taskProcessorImpl) paginationFn(_ []byte) ([]interface{}, []byte, error
if resp.GetHasMore() {
p.rxTaskBackoff = time.Duration(0)
} else {
p.rxTaskBackoff = p.config.ReplicationTaskProcessorNoTaskRetryWait(p.shard.GetShardID())
p.rxTaskBackoff = p.config.ReplicationTaskProcessorNoTaskRetryWait(p.pollingShardID)
}
return tasks, nil, nil

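For context, not part of the diff: the DLQ review thread above notes that hydrating a replication task on the source cluster would require recomputing the owning shard there rather than reusing the local shard ID. The sketch below illustrates one common way to derive a workflow's owning shard, hashing the namespace ID and workflow ID modulo the shard count; the hash function and helper name are assumptions for illustration, not the server's actual routine.

package main

import (
	"fmt"
	"hash/fnv"
)

// exampleShardForWorkflow is an assumed illustration of shard ownership:
// hash(namespaceID + workflowID) mod shardCount, with 1-based shard IDs.
// It is not the helper the server uses.
func exampleShardForWorkflow(namespaceID, workflowID string, shardCount int32) int32 {
	h := fnv.New32a()
	_, _ = h.Write([]byte(namespaceID))
	_, _ = h.Write([]byte(workflowID))
	return int32(h.Sum32()%uint32(shardCount)) + 1
}

func main() {
	// The same workflow can land on different shard IDs when two clusters use
	// different shard counts, which is why the local shard ID cannot simply be
	// reused on the source cluster.
	fmt.Println(exampleShardForWorkflow("ns-id", "wf-id", 4))
	fmt.Println(exampleShardForWorkflow("ns-id", "wf-id", 16))
}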
72 changes: 41 additions & 31 deletions service/history/replication/task_processor_manager.go
@@ -26,6 +26,7 @@ package replication

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
@@ -49,6 +50,10 @@ import (
wcache "go.temporal.io/server/service/history/workflow/cache"
)

const (
clusterCallbackKey = "%s-%d" // <cluster name>-<polling shard id>
)

type (
// taskProcessorManagerImpl is to manage replication task processors
taskProcessorManagerImpl struct {
@@ -62,6 +67,7 @@ type (
workflowCache wcache.Cache
resender xdc.NDCHistoryResender
taskExecutorProvider TaskExecutorProvider
taskPollerManager pollerManager
metricsHandler metrics.Handler
logger log.Logger

@@ -110,6 +116,7 @@ func NewTaskProcessorManager(
metricsHandler: shard.GetMetricsHandler(),
taskProcessors: make(map[string]TaskProcessor),
taskExecutorProvider: taskExecutorProvider,
taskPollerManager: newPollerManager(shard.GetShardID(), shard.GetClusterMetadata()),
minTxAckedTaskID: persistence.EmptyQueueMessageID,
shutdownChan: make(chan struct{}),
}
@@ -167,37 +174,40 @@ func (r *taskProcessorManagerImpl) handleClusterMetadataUpdate(
if clusterName == currentClusterName {
continue
}
// The metadata triggers a update when the following fields update: 1. Enabled 2. Initial Failover Version 3. Cluster address
// The callback covers three cases:
// Case 1: Remove a cluster Case 2: Add a new cluster Case 3: Refresh cluster metadata.

if processor, ok := r.taskProcessors[clusterName]; ok {
// Case 1 and Case 3
processor.Stop()
delete(r.taskProcessors, clusterName)
}

if clusterInfo := newClusterMetadata[clusterName]; clusterInfo != nil && clusterInfo.Enabled {
// Case 2 and Case 3
fetcher := r.replicationTaskFetcherFactory.GetOrCreateFetcher(clusterName)
replicationTaskProcessor := NewTaskProcessor(
r.shard,
r.engine,
r.config,
r.shard.GetMetricsHandler(),
fetcher,
r.taskExecutorProvider(TaskExecutorParams{
RemoteCluster: clusterName,
Shard: r.shard,
HistoryResender: r.resender,
HistoryEngine: r.engine,
DeleteManager: r.deleteMgr,
WorkflowCache: r.workflowCache,
}),
r.eventSerializer,
)
replicationTaskProcessor.Start()
r.taskProcessors[clusterName] = replicationTaskProcessor
pollingShardIds := r.taskPollerManager.getSourceClusterShardIDs(clusterName)
for _, pollingShardId := range pollingShardIds {
perShardTaskProcessorKey := fmt.Sprintf(clusterCallbackKey, clusterName, pollingShardId)
// The metadata triggers an update when the following fields update: 1. Enabled 2. Initial Failover Version 3. Cluster address
// The callback covers three cases:
// Case 1: Remove a cluster Case 2: Add a new cluster Case 3: Refresh cluster metadata.
if processor, ok := r.taskProcessors[perShardTaskProcessorKey]; ok {
// Case 1 and Case 3
processor.Stop()
delete(r.taskProcessors, perShardTaskProcessorKey)
}
if clusterInfo := newClusterMetadata[clusterName]; clusterInfo != nil && clusterInfo.Enabled {
Contributor: What if a cluster exists in both the old and the new cluster metadata? Stop and restart? What if a cluster exists only in the new cluster metadata? No-op?

Contributor (Author): If a cluster exists in both the old and the new metadata, it means the metadata was updated, so we stop the processor and restart it to load the new metadata. If a cluster exists only in the new metadata, it means the cluster was newly added, so we just start a new processor. (A sketch of this lifecycle follows this file's diff.)

// Case 2 and Case 3
fetcher := r.replicationTaskFetcherFactory.GetOrCreateFetcher(clusterName)
replicationTaskProcessor := NewTaskProcessor(
pollingShardId,
r.shard,
r.engine,
r.config,
r.shard.GetMetricsHandler(),
fetcher,
r.taskExecutorProvider(TaskExecutorParams{
RemoteCluster: clusterName,
Shard: r.shard,
HistoryResender: r.resender,
HistoryEngine: r.engine,
DeleteManager: r.deleteMgr,
WorkflowCache: r.workflowCache,
}),
r.eventSerializer,
)
replicationTaskProcessor.Start()
r.taskProcessors[perShardTaskProcessorKey] = replicationTaskProcessor
}
}
}
}
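For context, not part of the diff: a minimal sketch of the per-(cluster, polling shard) processor lifecycle discussed in the review thread above. Keys are built with the clusterCallbackKey format, any existing processor for that key is stopped on a metadata update, and a new one is started only if the cluster is still enabled. The types and the refreshProcessors helper are simplified assumptions for illustration, not the repository's real types.

package main

import "fmt"

// Simplified stand-ins for the real processor and cluster metadata types.
type processor interface {
	Start()
	Stop()
}

type fakeProcessor struct{ key string }

func (f *fakeProcessor) Start() { fmt.Println("start", f.key) }
func (f *fakeProcessor) Stop()  { fmt.Println("stop", f.key) }

type clusterInfo struct{ Enabled bool }

const clusterCallbackKey = "%s-%d" // <cluster name>-<polling shard id>

// refreshProcessors mirrors the shape of handleClusterMetadataUpdate: stop any
// existing per-(cluster, shard) processor, then start a fresh one per polling
// shard if the cluster is still enabled in the new metadata.
func refreshProcessors(
	processors map[string]processor,
	clusterName string,
	pollingShardIDs []int32,
	newMetadata map[string]*clusterInfo,
) {
	for _, shardID := range pollingShardIDs {
		key := fmt.Sprintf(clusterCallbackKey, clusterName, shardID)
		if p, ok := processors[key]; ok {
			// Removed or refreshed cluster: drop the old processor.
			p.Stop()
			delete(processors, key)
		}
		if info := newMetadata[clusterName]; info != nil && info.Enabled {
			// Added or refreshed cluster: start a new processor for this shard.
			p := &fakeProcessor{key: key}
			p.Start()
			processors[key] = p
		}
	}
}

func main() {
	processors := map[string]processor{}
	meta := map[string]*clusterInfo{"cluster-b": {Enabled: true}}
	refreshProcessors(processors, "cluster-b", []int32{2, 6, 10}, meta)
}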
1 change: 1 addition & 0 deletions service/history/replication/task_processor_test.go
@@ -148,6 +148,7 @@ func (s *taskProcessorSuite) SetupTest() {
metricsClient := metrics.NoopMetricsHandler

s.replicationTaskProcessor = NewTaskProcessor(
s.shardID,
s.mockShard,
s.mockEngine,
s.config,