Skip to content

Commit

Permalink
Allow connecting clusters with different shard counts (#3777)
Browse files Browse the repository at this point in the history
* Allow connecting clusters with different shard counts
  • Loading branch information
yux0 authored Jan 13, 2023
1 parent 51fd1dd commit 3948854
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 34 deletions.
12 changes: 9 additions & 3 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1580,9 +1580,15 @@ func (adh *AdminHandler) validateRemoteClusterMetadata(metadata *adminservice.De
return serviceerror.NewInvalidArgument("Cannot add remote cluster due to failover version increment mismatch")
}
if metadata.GetHistoryShardCount() != adh.config.NumHistoryShards {
// cluster shard number not equal
// TODO: remove this check once we support different shard numbers
return serviceerror.NewInvalidArgument("Cannot add remote cluster due to history shard number mismatch")
remoteShardCount := metadata.GetHistoryShardCount()
large := remoteShardCount
small := adh.config.NumHistoryShards
if large < small {
small, large = large, small
}
if large%small != 0 {
return serviceerror.NewInvalidArgument("Remote cluster shard number and local cluster shard number are not multiples.")
}
}
if !metadata.IsGlobalNamespaceEnabled {
// remote cluster doesn't support global namespace
Expand Down
70 changes: 56 additions & 14 deletions service/frontend/adminHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ func (s *adminHandlerSuite) SetupTest() {
NumHistoryShards: 1,
}

cfg := &Config{}
cfg := &Config{
NumHistoryShards: 4,
}
args := NewAdminHandlerArgs{
persistenceConfig,
cfg,
Expand Down Expand Up @@ -793,7 +795,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordFound_Success()
&adminservice.DescribeClusterResponse{
ClusterId: clusterId,
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
FailoverVersionIncrement: 0,
InitialFailoverVersion: 0,
IsGlobalNamespaceEnabled: true,
Expand All @@ -805,7 +807,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordFound_Success()
s.mockClusterMetadataManager.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{
ClusterMetadata: persistencespb.ClusterMetadata{
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
ClusterId: clusterId,
ClusterAddress: rpcAddress,
FailoverVersionIncrement: 0,
Expand All @@ -832,7 +834,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordNotFound_Success
&adminservice.DescribeClusterResponse{
ClusterId: clusterId,
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
FailoverVersionIncrement: 0,
InitialFailoverVersion: 0,
IsGlobalNamespaceEnabled: true,
Expand All @@ -844,7 +846,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordNotFound_Success
s.mockClusterMetadataManager.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{
ClusterMetadata: persistencespb.ClusterMetadata{
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
ClusterId: clusterId,
ClusterAddress: rpcAddress,
FailoverVersionIncrement: 0,
Expand Down Expand Up @@ -901,7 +903,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Failov
s.IsType(&serviceerror.InvalidArgument{}, err)
}

func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ShardCountMismatch() {
func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ShardCount_Invalid() {
var rpcAddress = uuid.New()
var clusterName = uuid.New()
var clusterId = uuid.New()
Expand All @@ -914,7 +916,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ShardC
&adminservice.DescribeClusterResponse{
ClusterId: clusterId,
ClusterName: clusterName,
HistoryShardCount: 1000,
HistoryShardCount: 5,
FailoverVersionIncrement: 0,
InitialFailoverVersion: 0,
IsGlobalNamespaceEnabled: true,
Expand All @@ -924,6 +926,46 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ShardC
s.IsType(&serviceerror.InvalidArgument{}, err)
}

// Test_AddOrUpdateRemoteCluster_ShardCount_Multiple checks that
// AddOrUpdateRemoteCluster returns no error and saves the cluster metadata
// when the remote cluster reports 16 history shards. SetupTest configures the
// handler with NumHistoryShards: 4, so the counts differ but one is a whole
// multiple of the other.
func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ShardCount_Multiple() {
	const remoteShardCount = 16
	const recordVersion int64 = 5

	rpcAddress := uuid.New()
	clusterName := uuid.New()
	clusterID := uuid.New()

	// What the remote frontend reports about itself.
	describeResp := &adminservice.DescribeClusterResponse{
		ClusterId:                clusterID,
		ClusterName:              clusterName,
		HistoryShardCount:        remoteShardCount,
		FailoverVersionIncrement: 0,
		InitialFailoverVersion:   0,
		IsGlobalNamespaceEnabled: true,
	}

	// What the handler is expected to persist; Version echoes the version of
	// the record returned by GetClusterMetadata.
	wantSave := &persistence.SaveClusterMetadataRequest{
		ClusterMetadata: persistencespb.ClusterMetadata{
			ClusterName:              clusterName,
			HistoryShardCount:        remoteShardCount,
			ClusterId:                clusterID,
			ClusterAddress:           rpcAddress,
			FailoverVersionIncrement: 0,
			InitialFailoverVersion:   0,
			IsGlobalNamespaceEnabled: true,
		},
		Version: recordVersion,
	}

	s.mockMetadata.EXPECT().GetFailoverVersionIncrement().Return(int64(0))
	s.mockMetadata.EXPECT().GetAllClusterInfo().Return(make(map[string]cluster.ClusterInformation))
	s.mockClientFactory.EXPECT().
		NewRemoteAdminClientWithTimeout(rpcAddress, gomock.Any(), gomock.Any()).
		Return(s.mockAdminClient)
	s.mockAdminClient.EXPECT().
		DescribeCluster(gomock.Any(), &adminservice.DescribeClusterRequest{}).
		Return(describeResp, nil)
	s.mockClusterMetadataManager.EXPECT().
		GetClusterMetadata(gomock.Any(), &persistence.GetClusterMetadataRequest{ClusterName: clusterName}).
		Return(&persistence.GetClusterMetadataResponse{Version: recordVersion}, nil)
	s.mockClusterMetadataManager.EXPECT().
		SaveClusterMetadata(gomock.Any(), wantSave).
		Return(true, nil)

	req := &adminservice.AddOrUpdateRemoteClusterRequest{FrontendAddress: rpcAddress}
	_, err := s.handler.AddOrUpdateRemoteCluster(context.Background(), req)
	s.NoError(err)
}

func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_GlobalNamespaceDisabled() {
var rpcAddress = uuid.New()
var clusterName = uuid.New()
Expand All @@ -937,7 +979,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Global
&adminservice.DescribeClusterResponse{
ClusterId: clusterId,
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
FailoverVersionIncrement: 0,
InitialFailoverVersion: 0,
IsGlobalNamespaceEnabled: false,
Expand All @@ -963,7 +1005,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Initia
&adminservice.DescribeClusterResponse{
ClusterId: clusterId,
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
FailoverVersionIncrement: 0,
InitialFailoverVersion: 0,
IsGlobalNamespaceEnabled: true,
Expand Down Expand Up @@ -1001,7 +1043,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_GetClusterMetadata_Err
&adminservice.DescribeClusterResponse{
ClusterId: clusterId,
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
FailoverVersionIncrement: 0,
InitialFailoverVersion: 0,
IsGlobalNamespaceEnabled: true,
Expand All @@ -1028,7 +1070,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata_Er
&adminservice.DescribeClusterResponse{
ClusterId: clusterId,
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
FailoverVersionIncrement: 0,
InitialFailoverVersion: 0,
IsGlobalNamespaceEnabled: true,
Expand All @@ -1040,7 +1082,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata_Er
s.mockClusterMetadataManager.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{
ClusterMetadata: persistencespb.ClusterMetadata{
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
ClusterId: clusterId,
ClusterAddress: rpcAddress,
FailoverVersionIncrement: 0,
Expand All @@ -1067,7 +1109,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata_No
&adminservice.DescribeClusterResponse{
ClusterId: clusterId,
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
FailoverVersionIncrement: 0,
InitialFailoverVersion: 0,
IsGlobalNamespaceEnabled: true,
Expand All @@ -1079,7 +1121,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata_No
s.mockClusterMetadataManager.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{
ClusterMetadata: persistencespb.ClusterMetadata{
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
ClusterId: clusterId,
ClusterAddress: rpcAddress,
FailoverVersionIncrement: 0,
Expand Down
12 changes: 9 additions & 3 deletions service/frontend/operator_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,15 @@ func (h *OperatorHandlerImpl) validateRemoteClusterMetadata(metadata *adminservi
return serviceerror.NewInvalidArgument("Cannot add remote cluster due to failover version increment mismatch")
}
if metadata.GetHistoryShardCount() != h.config.NumHistoryShards {
// cluster shard number not equal
// TODO: remove this check once we support different shard numbers
return serviceerror.NewInvalidArgument("Cannot add remote cluster due to history shard number mismatch")
remoteShardCount := metadata.GetHistoryShardCount()
large := remoteShardCount
small := h.config.NumHistoryShards
if large < small {
small, large = large, small
}
if large%small != 0 {
return serviceerror.NewInvalidArgument("Remote cluster shard number and local cluster shard number are not multiples.")
}
}
if !metadata.IsGlobalNamespaceEnabled {
// remote cluster doesn't support global namespace
Expand Down
68 changes: 54 additions & 14 deletions service/frontend/operator_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (s *operatorHandlerSuite) SetupTest() {
s.mockResource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(uuid.New()).AnyTimes()

args := NewOperatorHandlerImplArgs{
&Config{},
&Config{NumHistoryShards: 4},
nil,
s.mockResource.ESClient,
s.mockResource.Logger,
Expand Down Expand Up @@ -506,7 +506,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordFound_Success
&adminservice.DescribeClusterResponse{
ClusterId: clusterId,
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
FailoverVersionIncrement: 0,
InitialFailoverVersion: 0,
IsGlobalNamespaceEnabled: true,
Expand All @@ -518,7 +518,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordFound_Success
s.mockResource.ClusterMetadataMgr.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{
ClusterMetadata: persistencespb.ClusterMetadata{
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
ClusterId: clusterId,
ClusterAddress: rpcAddress,
FailoverVersionIncrement: 0,
Expand All @@ -545,7 +545,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordNotFound_Succ
&adminservice.DescribeClusterResponse{
ClusterId: clusterId,
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
FailoverVersionIncrement: 0,
InitialFailoverVersion: 0,
IsGlobalNamespaceEnabled: true,
Expand All @@ -557,7 +557,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordNotFound_Succ
s.mockResource.ClusterMetadataMgr.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{
ClusterMetadata: persistencespb.ClusterMetadata{
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
ClusterId: clusterId,
ClusterAddress: rpcAddress,
FailoverVersionIncrement: 0,
Expand Down Expand Up @@ -614,7 +614,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Fai
s.IsType(&serviceerror.InvalidArgument{}, err)
}

func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ShardCountMismatch() {
func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ShardCount_Invalid() {
var rpcAddress = uuid.New()
var clusterName = uuid.New()
var clusterId = uuid.New()
Expand All @@ -627,7 +627,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Sha
&adminservice.DescribeClusterResponse{
ClusterId: clusterId,
ClusterName: clusterName,
HistoryShardCount: 1000,
HistoryShardCount: 5,
FailoverVersionIncrement: 0,
InitialFailoverVersion: 0,
IsGlobalNamespaceEnabled: true,
Expand All @@ -637,6 +637,46 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Sha
s.IsType(&serviceerror.InvalidArgument{}, err)
}

// Test_AddOrUpdateRemoteCluster_ShardCount_Multiple checks that the operator
// handler's AddOrUpdateRemoteCluster returns no error and saves the cluster
// metadata when the remote cluster reports 16 history shards. SetupTest builds
// the handler with Config{NumHistoryShards: 4}, so the counts differ but one
// is a whole multiple of the other.
func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ShardCount_Multiple() {
	const remoteShardCount = 16
	const recordVersion int64 = 5

	rpcAddress := uuid.New()
	clusterName := uuid.New()
	clusterID := uuid.New()

	// What the remote frontend reports about itself.
	describeResp := &adminservice.DescribeClusterResponse{
		ClusterId:                clusterID,
		ClusterName:              clusterName,
		HistoryShardCount:        remoteShardCount,
		FailoverVersionIncrement: 0,
		InitialFailoverVersion:   0,
		IsGlobalNamespaceEnabled: true,
	}

	// What the handler is expected to persist; Version echoes the version of
	// the record returned by GetClusterMetadata.
	wantSave := &persistence.SaveClusterMetadataRequest{
		ClusterMetadata: persistencespb.ClusterMetadata{
			ClusterName:              clusterName,
			HistoryShardCount:        remoteShardCount,
			ClusterId:                clusterID,
			ClusterAddress:           rpcAddress,
			FailoverVersionIncrement: 0,
			InitialFailoverVersion:   0,
			IsGlobalNamespaceEnabled: true,
		},
		Version: recordVersion,
	}

	res := s.mockResource
	res.ClusterMetadata.EXPECT().GetFailoverVersionIncrement().Return(int64(0))
	res.ClusterMetadata.EXPECT().GetAllClusterInfo().Return(make(map[string]cluster.ClusterInformation))
	res.ClientFactory.EXPECT().
		NewRemoteAdminClientWithTimeout(rpcAddress, gomock.Any(), gomock.Any()).
		Return(res.RemoteAdminClient)
	res.RemoteAdminClient.EXPECT().
		DescribeCluster(gomock.Any(), &adminservice.DescribeClusterRequest{}).
		Return(describeResp, nil)
	res.ClusterMetadataMgr.EXPECT().
		GetClusterMetadata(gomock.Any(), &persistence.GetClusterMetadataRequest{ClusterName: clusterName}).
		Return(&persistence.GetClusterMetadataResponse{Version: recordVersion}, nil)
	res.ClusterMetadataMgr.EXPECT().
		SaveClusterMetadata(gomock.Any(), wantSave).
		Return(true, nil)

	req := &operatorservice.AddOrUpdateRemoteClusterRequest{FrontendAddress: rpcAddress}
	_, err := s.handler.AddOrUpdateRemoteCluster(context.Background(), req)
	s.NoError(err)
}

func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_GlobalNamespaceDisabled() {
var rpcAddress = uuid.New()
var clusterName = uuid.New()
Expand All @@ -650,7 +690,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Glo
&adminservice.DescribeClusterResponse{
ClusterId: clusterId,
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
FailoverVersionIncrement: 0,
InitialFailoverVersion: 0,
IsGlobalNamespaceEnabled: false,
Expand All @@ -676,7 +716,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Ini
&adminservice.DescribeClusterResponse{
ClusterId: clusterId,
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
FailoverVersionIncrement: 0,
InitialFailoverVersion: 0,
IsGlobalNamespaceEnabled: true,
Expand Down Expand Up @@ -714,7 +754,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_GetClusterMetadata_
&adminservice.DescribeClusterResponse{
ClusterId: clusterId,
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
FailoverVersionIncrement: 0,
InitialFailoverVersion: 0,
IsGlobalNamespaceEnabled: true,
Expand All @@ -741,7 +781,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata
&adminservice.DescribeClusterResponse{
ClusterId: clusterId,
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
FailoverVersionIncrement: 0,
InitialFailoverVersion: 0,
IsGlobalNamespaceEnabled: true,
Expand All @@ -753,7 +793,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata
s.mockResource.ClusterMetadataMgr.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{
ClusterMetadata: persistencespb.ClusterMetadata{
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
ClusterId: clusterId,
ClusterAddress: rpcAddress,
FailoverVersionIncrement: 0,
Expand All @@ -780,7 +820,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata
&adminservice.DescribeClusterResponse{
ClusterId: clusterId,
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
FailoverVersionIncrement: 0,
InitialFailoverVersion: 0,
IsGlobalNamespaceEnabled: true,
Expand All @@ -792,7 +832,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata
s.mockResource.ClusterMetadataMgr.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{
ClusterMetadata: persistencespb.ClusterMetadata{
ClusterName: clusterName,
HistoryShardCount: 0,
HistoryShardCount: 4,
ClusterId: clusterId,
ClusterAddress: rpcAddress,
FailoverVersionIncrement: 0,
Expand Down

0 comments on commit 3948854

Please sign in to comment.