From 394885498162b61778a3d3fc9f545ea839717768 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Fri, 13 Jan 2023 10:36:27 -0800 Subject: [PATCH] Allow connect clusters with different shards (#3777) * Allow connect clusters with different shards --- service/frontend/adminHandler.go | 12 +++- service/frontend/adminHandler_test.go | 70 ++++++++++++++++++----- service/frontend/operator_handler.go | 12 +++- service/frontend/operator_handler_test.go | 68 +++++++++++++++++----- 4 files changed, 128 insertions(+), 34 deletions(-) diff --git a/service/frontend/adminHandler.go b/service/frontend/adminHandler.go index 3e8c687959a..63a48d324e9 100644 --- a/service/frontend/adminHandler.go +++ b/service/frontend/adminHandler.go @@ -1580,9 +1580,15 @@ func (adh *AdminHandler) validateRemoteClusterMetadata(metadata *adminservice.De return serviceerror.NewInvalidArgument("Cannot add remote cluster due to failover version increment mismatch") } if metadata.GetHistoryShardCount() != adh.config.NumHistoryShards { - // cluster shard number not equal - // TODO: remove this check once we support different shard numbers - return serviceerror.NewInvalidArgument("Cannot add remote cluster due to history shard number mismatch") + remoteShardCount := metadata.GetHistoryShardCount() + large := remoteShardCount + small := adh.config.NumHistoryShards + if large < small { + small, large = large, small + } + if large%small != 0 { + return serviceerror.NewInvalidArgument("Remote cluster shard number and local cluster shard number are not multiples.") + } } if !metadata.IsGlobalNamespaceEnabled { // remote cluster doesn't support global namespace diff --git a/service/frontend/adminHandler_test.go b/service/frontend/adminHandler_test.go index f006d54ff59..19ab1314033 100644 --- a/service/frontend/adminHandler_test.go +++ b/service/frontend/adminHandler_test.go @@ -134,7 +134,9 @@ func (s *adminHandlerSuite) SetupTest() { NumHistoryShards: 1, } - cfg := &Config{} + cfg := &Config{ + NumHistoryShards: 4, + } args := NewAdminHandlerArgs{ persistenceConfig, cfg, @@ -793,7 +795,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordFound_Success() &adminservice.DescribeClusterResponse{ ClusterId: clusterId, ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, FailoverVersionIncrement: 0, InitialFailoverVersion: 0, IsGlobalNamespaceEnabled: true, @@ -805,7 +807,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordFound_Success() s.mockClusterMetadataManager.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{ ClusterMetadata: persistencespb.ClusterMetadata{ ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, ClusterId: clusterId, ClusterAddress: rpcAddress, FailoverVersionIncrement: 0, @@ -832,7 +834,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordNotFound_Success &adminservice.DescribeClusterResponse{ ClusterId: clusterId, ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, FailoverVersionIncrement: 0, InitialFailoverVersion: 0, IsGlobalNamespaceEnabled: true, @@ -844,7 +846,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordNotFound_Success s.mockClusterMetadataManager.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{ ClusterMetadata: persistencespb.ClusterMetadata{ ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, ClusterId: clusterId, ClusterAddress: rpcAddress, FailoverVersionIncrement: 0, @@ -901,7 +903,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Failov s.IsType(&serviceerror.InvalidArgument{}, err) } -func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ShardCountMismatch() { +func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ShardCount_Invalid() { var rpcAddress = uuid.New() var clusterName = uuid.New() var clusterId = uuid.New() @@ -914,7 +916,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ShardC &adminservice.DescribeClusterResponse{ ClusterId: clusterId, ClusterName: clusterName, - HistoryShardCount: 1000, + HistoryShardCount: 5, FailoverVersionIncrement: 0, InitialFailoverVersion: 0, IsGlobalNamespaceEnabled: true, @@ -924,6 +926,46 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ShardC s.IsType(&serviceerror.InvalidArgument{}, err) } +func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ShardCount_Multiple() { + var rpcAddress = uuid.New() + var clusterName = uuid.New() + var clusterId = uuid.New() + var recordVersion int64 = 5 + + s.mockMetadata.EXPECT().GetFailoverVersionIncrement().Return(int64(0)) + s.mockMetadata.EXPECT().GetAllClusterInfo().Return(make(map[string]cluster.ClusterInformation)) + s.mockClientFactory.EXPECT().NewRemoteAdminClientWithTimeout(rpcAddress, gomock.Any(), gomock.Any()).Return( + s.mockAdminClient, + ) + s.mockAdminClient.EXPECT().DescribeCluster(gomock.Any(), &adminservice.DescribeClusterRequest{}).Return( + &adminservice.DescribeClusterResponse{ + ClusterId: clusterId, + ClusterName: clusterName, + HistoryShardCount: 16, + FailoverVersionIncrement: 0, + InitialFailoverVersion: 0, + IsGlobalNamespaceEnabled: true, + }, nil) + s.mockClusterMetadataManager.EXPECT().GetClusterMetadata(gomock.Any(), &persistence.GetClusterMetadataRequest{ClusterName: clusterName}).Return( + &persistence.GetClusterMetadataResponse{ + Version: recordVersion, + }, nil) + s.mockClusterMetadataManager.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{ + ClusterMetadata: persistencespb.ClusterMetadata{ + ClusterName: clusterName, + HistoryShardCount: 16, + ClusterId: clusterId, + ClusterAddress: rpcAddress, + FailoverVersionIncrement: 0, + InitialFailoverVersion: 0, + IsGlobalNamespaceEnabled: true, + }, + Version: recordVersion, + }).Return(true, nil) + _, err := s.handler.AddOrUpdateRemoteCluster(context.Background(), &adminservice.AddOrUpdateRemoteClusterRequest{FrontendAddress: rpcAddress}) + s.NoError(err) +} + func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_GlobalNamespaceDisabled() { var rpcAddress = uuid.New() var clusterName = uuid.New() @@ -937,7 +979,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Global &adminservice.DescribeClusterResponse{ ClusterId: clusterId, ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, FailoverVersionIncrement: 0, InitialFailoverVersion: 0, IsGlobalNamespaceEnabled: false, @@ -963,7 +1005,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Initia &adminservice.DescribeClusterResponse{ ClusterId: clusterId, ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, FailoverVersionIncrement: 0, InitialFailoverVersion: 0, IsGlobalNamespaceEnabled: true, @@ -1001,7 +1043,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_GetClusterMetadata_Err &adminservice.DescribeClusterResponse{ ClusterId: clusterId, ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, FailoverVersionIncrement: 0, InitialFailoverVersion: 0, IsGlobalNamespaceEnabled: true, @@ -1028,7 +1070,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata_Er &adminservice.DescribeClusterResponse{ ClusterId: clusterId, ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, FailoverVersionIncrement: 0, InitialFailoverVersion: 0, IsGlobalNamespaceEnabled: true, @@ -1040,7 +1082,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata_Er s.mockClusterMetadataManager.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{ ClusterMetadata: persistencespb.ClusterMetadata{ ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, ClusterId: clusterId, ClusterAddress: rpcAddress, FailoverVersionIncrement: 0, @@ -1067,7 +1109,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata_No &adminservice.DescribeClusterResponse{ ClusterId: clusterId, ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, FailoverVersionIncrement: 0, InitialFailoverVersion: 0, IsGlobalNamespaceEnabled: true, @@ -1079,7 +1121,7 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata_No s.mockClusterMetadataManager.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{ ClusterMetadata: persistencespb.ClusterMetadata{ ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, ClusterId: clusterId, ClusterAddress: rpcAddress, FailoverVersionIncrement: 0, diff --git a/service/frontend/operator_handler.go b/service/frontend/operator_handler.go index f02db1ecba5..058229cabbd 100644 --- a/service/frontend/operator_handler.go +++ b/service/frontend/operator_handler.go @@ -499,9 +499,15 @@ func (h *OperatorHandlerImpl) validateRemoteClusterMetadata(metadata *adminservi return serviceerror.NewInvalidArgument("Cannot add remote cluster due to failover version increment mismatch") } if metadata.GetHistoryShardCount() != h.config.NumHistoryShards { - // cluster shard number not equal - // TODO: remove this check once we support different shard numbers - return serviceerror.NewInvalidArgument("Cannot add remote cluster due to history shard number mismatch") + remoteShardCount := metadata.GetHistoryShardCount() + large := remoteShardCount + small := h.config.NumHistoryShards + if large < small { + small, large = large, small + } + if large%small != 0 { + return serviceerror.NewInvalidArgument("Remote cluster shard number and local cluster shard number are not multiples.") + } } if !metadata.IsGlobalNamespaceEnabled { // remote cluster doesn't support global namespace diff --git a/service/frontend/operator_handler_test.go b/service/frontend/operator_handler_test.go index 03f245ea671..3a9a814bcb7 100644 --- a/service/frontend/operator_handler_test.go +++ b/service/frontend/operator_handler_test.go @@ -78,7 +78,7 @@ func (s *operatorHandlerSuite) SetupTest() { s.mockResource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(uuid.New()).AnyTimes() args := NewOperatorHandlerImplArgs{ - &Config{}, + &Config{NumHistoryShards: 4}, nil, s.mockResource.ESClient, s.mockResource.Logger, @@ -506,7 +506,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordFound_Success &adminservice.DescribeClusterResponse{ ClusterId: clusterId, ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, FailoverVersionIncrement: 0, InitialFailoverVersion: 0, IsGlobalNamespaceEnabled: true, @@ -518,7 +518,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordFound_Success s.mockResource.ClusterMetadataMgr.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{ ClusterMetadata: persistencespb.ClusterMetadata{ ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, ClusterId: clusterId, ClusterAddress: rpcAddress, FailoverVersionIncrement: 0, @@ -545,7 +545,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordNotFound_Succ &adminservice.DescribeClusterResponse{ ClusterId: clusterId, ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, FailoverVersionIncrement: 0, InitialFailoverVersion: 0, IsGlobalNamespaceEnabled: true, @@ -557,7 +557,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_RecordNotFound_Succ s.mockResource.ClusterMetadataMgr.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{ ClusterMetadata: persistencespb.ClusterMetadata{ ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, ClusterId: clusterId, ClusterAddress: rpcAddress, FailoverVersionIncrement: 0, @@ -614,7 +614,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Fai s.IsType(&serviceerror.InvalidArgument{}, err) } -func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ShardCountMismatch() { +func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_ShardCount_Invalid() { var rpcAddress = uuid.New() var clusterName = uuid.New() var clusterId = uuid.New() @@ -627,7 +627,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Sha &adminservice.DescribeClusterResponse{ ClusterId: clusterId, ClusterName: clusterName, - HistoryShardCount: 1000, + HistoryShardCount: 5, FailoverVersionIncrement: 0, InitialFailoverVersion: 0, IsGlobalNamespaceEnabled: true, @@ -637,6 +637,46 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Sha s.IsType(&serviceerror.InvalidArgument{}, err) } +func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ShardCount_Multiple() { + var rpcAddress = uuid.New() + var clusterName = uuid.New() + var clusterId = uuid.New() + var recordVersion int64 = 5 + + s.mockResource.ClusterMetadata.EXPECT().GetFailoverVersionIncrement().Return(int64(0)) + s.mockResource.ClusterMetadata.EXPECT().GetAllClusterInfo().Return(make(map[string]cluster.ClusterInformation)) + s.mockResource.ClientFactory.EXPECT().NewRemoteAdminClientWithTimeout(rpcAddress, gomock.Any(), gomock.Any()).Return( + s.mockResource.RemoteAdminClient, + ) + s.mockResource.RemoteAdminClient.EXPECT().DescribeCluster(gomock.Any(), &adminservice.DescribeClusterRequest{}).Return( + &adminservice.DescribeClusterResponse{ + ClusterId: clusterId, + ClusterName: clusterName, + HistoryShardCount: 16, + FailoverVersionIncrement: 0, + InitialFailoverVersion: 0, + IsGlobalNamespaceEnabled: true, + }, nil) + s.mockResource.ClusterMetadataMgr.EXPECT().GetClusterMetadata(gomock.Any(), &persistence.GetClusterMetadataRequest{ClusterName: clusterName}).Return( + &persistence.GetClusterMetadataResponse{ + Version: recordVersion, + }, nil) + s.mockResource.ClusterMetadataMgr.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{ + ClusterMetadata: persistencespb.ClusterMetadata{ + ClusterName: clusterName, + HistoryShardCount: 16, + ClusterId: clusterId, + ClusterAddress: rpcAddress, + FailoverVersionIncrement: 0, + InitialFailoverVersion: 0, + IsGlobalNamespaceEnabled: true, + }, + Version: recordVersion, + }).Return(true, nil) + _, err := s.handler.AddOrUpdateRemoteCluster(context.Background(), &operatorservice.AddOrUpdateRemoteClusterRequest{FrontendAddress: rpcAddress}) + s.NoError(err) +} + func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_GlobalNamespaceDisabled() { var rpcAddress = uuid.New() var clusterName = uuid.New() @@ -650,7 +690,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Glo &adminservice.DescribeClusterResponse{ ClusterId: clusterId, ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, FailoverVersionIncrement: 0, InitialFailoverVersion: 0, IsGlobalNamespaceEnabled: false, @@ -676,7 +716,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_ValidationError_Ini &adminservice.DescribeClusterResponse{ ClusterId: clusterId, ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, FailoverVersionIncrement: 0, InitialFailoverVersion: 0, IsGlobalNamespaceEnabled: true, @@ -714,7 +754,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_GetClusterMetadata_ &adminservice.DescribeClusterResponse{ ClusterId: clusterId, ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, FailoverVersionIncrement: 0, InitialFailoverVersion: 0, IsGlobalNamespaceEnabled: true, @@ -741,7 +781,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata &adminservice.DescribeClusterResponse{ ClusterId: clusterId, ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, FailoverVersionIncrement: 0, InitialFailoverVersion: 0, IsGlobalNamespaceEnabled: true, @@ -753,7 +793,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata s.mockResource.ClusterMetadataMgr.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{ ClusterMetadata: persistencespb.ClusterMetadata{ ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, ClusterId: clusterId, ClusterAddress: rpcAddress, FailoverVersionIncrement: 0, @@ -780,7 +820,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata &adminservice.DescribeClusterResponse{ ClusterId: clusterId, ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, FailoverVersionIncrement: 0, InitialFailoverVersion: 0, IsGlobalNamespaceEnabled: true, @@ -792,7 +832,7 @@ func (s *operatorHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata s.mockResource.ClusterMetadataMgr.EXPECT().SaveClusterMetadata(gomock.Any(), &persistence.SaveClusterMetadataRequest{ ClusterMetadata: persistencespb.ClusterMetadata{ ClusterName: clusterName, - HistoryShardCount: 0, + HistoryShardCount: 4, ClusterId: clusterId, ClusterAddress: rpcAddress, FailoverVersionIncrement: 0,