From 097a860689560c939a4f6ddbf61037a90457706b Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 2 Nov 2020 16:29:01 +0800 Subject: [PATCH 01/20] fix unknown dc Signed-off-by: Song Gao --- go.mod | 1 + go.sum | 4 ++ server/grpc_service.go | 25 ++++++++ server/tso/allocator_manager.go | 103 ++++++++++++++++++++++++++++++++ server/tso/global_allocator.go | 46 ++++---------- 5 files changed, 143 insertions(+), 36 deletions(-) diff --git a/go.mod b/go.mod index a7b66e5d005..91ee467fbbf 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ require ( ) replace ( + github.com/pingcap/kvproto => github.com/Yisaer/kvproto v0.0.0-20201102071311-c664be078e8e github.com/sirupsen/logrus => github.com/sirupsen/logrus v1.2.0 go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.5 ) diff --git a/go.sum b/go.sum index 506387789b9..32efa6f67e8 100644 --- a/go.sum +++ b/go.sum @@ -53,6 +53,10 @@ github.com/VividCortex/mysqlerr v0.0.0-20200629151747-c28746d985dd h1:59Whn6shj5 github.com/VividCortex/mysqlerr v0.0.0-20200629151747-c28746d985dd/go.mod h1:f3HiCrHjHBdcm6E83vGaXh1KomZMA2P6aeo3hKx/wg0= github.com/VoltDB/voltdb-client-go v1.0.1/go.mod h1:FSuyUyPbMimaMJ9DItBzhVzjFj5S4XHcEISlKEnFZ5Q= github.com/XiaoMi/pegasus-go-client v0.0.0-20181029071519-9400942c5d1c/go.mod h1:KcL6D/4RZ8RAYzQ5gKI0odcdWUmCVlbQTOlWrhP71CY= +github.com/Yisaer/kvproto v0.0.0-20201102060001-4f78b5b9dc8c h1:UfLV1/V7HmuT4mIriXOY+iZlCZpqZ6RlAPwhJxBEk5U= +github.com/Yisaer/kvproto v0.0.0-20201102060001-4f78b5b9dc8c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/Yisaer/kvproto v0.0.0-20201102071311-c664be078e8e h1:0uuKHxg8bYBDBxyvcK/eszjzW/flSJXSvmTRbOrtpZs= +github.com/Yisaer/kvproto v0.0.0-20201102071311-c664be078e8e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/aerospike/aerospike-client-go v1.35.2/go.mod h1:zj8LBEnWBDOVEIJt8LvaRvDG5ARAoa5dBeHaB472NRc= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= diff --git a/server/grpc_service.go b/server/grpc_service.go index 16c2f42451c..8c5d19bd6e5 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -32,6 +32,7 @@ import ( "github.com/tikv/pd/server/cluster" "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" + "github.com/tikv/pd/server/tso" "github.com/tikv/pd/server/versioninfo" "go.uber.org/zap" "google.golang.org/grpc/codes" @@ -990,6 +991,30 @@ func (s *Server) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest) }, nil } +func (s *Server) GetDCLocations(ctx context.Context, request *pdpb.GetDCLocationsRequest) (*pdpb.GetDCLocationsResponse, error) { + if err := s.validateInternalRequest(request.GetHeader()); err != nil { + return nil, err + } + if !s.member.IsLeader() { + return nil, fmt.Errorf("receving pd member[%v] is not pd leader", s.member.ID()) + } + allocator, err := s.GetTSOAllocatorManager().GetAllocator(config.GlobalDCLocation) + if err != nil { + return nil, fmt.Errorf("failed to get tso allocator[%v]", config.GlobalDCLocation) + } + globalAllocator, ok := allocator.(*tso.GlobalTSOAllocator) + if !ok { + return nil, fmt.Errorf("tso allocator[%v] is not global tso allocator", config.GlobalDCLocation) + } + if !globalAllocator.IsInitialize() { + return nil, fmt.Errorf("global tso alloactor is not initialized") + } + return &pdpb.GetDCLocationsResponse{ + Header: s.header(), + DcLocations: globalAllocator.GetDcLocations(), + }, nil +} + // validateInternalRequest checks if server is closed, which is used to validate // the gRPC communication between PD servers internally. func (s *Server) validateInternalRequest(header *pdpb.RequestHeader) error { diff --git a/server/tso/allocator_manager.go b/server/tso/allocator_manager.go index 99d7dac5e90..c8fd0ff54db 100644 --- a/server/tso/allocator_manager.go +++ b/server/tso/allocator_manager.go @@ -34,6 +34,7 @@ import ( "github.com/tikv/pd/server/member" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" + "google.golang.org/grpc" ) const ( @@ -85,6 +86,11 @@ type AllocatorManager struct { updatePhysicalInterval time.Duration maxResetTSGap func() time.Duration securityConfig *grpcutil.TLSConfig + // for gRPC use + localAllocatorConn struct { + sync.RWMutex + clientConns map[string]*grpc.ClientConn + } } // NewAllocatorManager creates a new TSO Allocator Manager. @@ -105,6 +111,7 @@ func NewAllocatorManager( securityConfig: sc, } allocatorManager.mu.allocatorGroups = make(map[string]*allocatorGroup) + allocatorManager.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn) return allocatorManager } @@ -221,6 +228,22 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator * default: } + ok, err := am.isLeaderAwareOfDCLocation(ctx, allocator.dcLocation) + if err != nil { + log.Error("get dc-locations from pd leader failed", + zap.String("dc-location", allocator.dcLocation), + errs.ZapError(err)) + time.Sleep(200 * time.Millisecond) + continue + } + if !ok { + log.Error("pd leader is not aware of dc-location during allocatorLeaderLoop, wait next round", + zap.String("dc-location", allocator.dcLocation), + zap.String("wait-duration", checkStep.String())) + time.Sleep(checkStep) + continue + } + allocatorLeader, rev, checkAgain := allocator.CheckAllocatorLeader() if checkAgain { continue @@ -656,3 +679,83 @@ func (am *AllocatorManager) GetLocalAllocatorLeaders() (map[string]*pdpb.Member, } return localAllocatorLeaderMember, nil } + +func (am *AllocatorManager) GetOrCreateGRPCConn(ctx context.Context, addr string) (*grpc.ClientConn, error) { + conn, ok := am.getGRPCConn(addr) + if ok { + return conn, nil + } + tlsCfg, err := am.securityConfig.ToTLSConfig() + if err != nil { + return nil, err + } + ctxWithTimeout, cancel := context.WithTimeout(ctx, dialTimeout) + defer cancel() + cc, err := grpcutil.GetClientConn(ctxWithTimeout, addr, tlsCfg) + if err != nil { + return nil, err + } + am.setGRPCConn(cc, addr) + return cc, nil +} + +func (am *AllocatorManager) isLeaderAwareOfDCLocation(ctx context.Context, dcLocation string) (bool, error) { + dcLocations, err := am.getLeaderDCLocations(ctx) + if err != nil { + return false, err + } + for _, dc := range dcLocations { + if dcLocation == dc { + return true, nil + } + } + return false, nil +} + +func (am *AllocatorManager) getLeaderDCLocations(ctx context.Context) ([]string, error) { + dcLocations := make([]string, 0) + if am.member.IsLeader() { + for dcLocation := range am.GetClusterDCLocations() { + dcLocations = append(dcLocations, dcLocation) + } + return dcLocations, nil + } + + leaderAddrs := am.member.GetLeader().ClientUrls + if len(leaderAddrs) < 1 { + return nil, fmt.Errorf("failed to get leader client url") + } + conn, err := am.GetOrCreateGRPCConn(ctx, leaderAddrs[0]) + if err != nil { + return nil, err + } + getCtx, cancel := context.WithTimeout(ctx, rpcTimeout) + defer cancel() + resp, err := pdpb.NewPDClient(conn).GetDCLocations(getCtx, &pdpb.GetDCLocationsRequest{ + Header: &pdpb.RequestHeader{ + SenderId: am.member.ID(), + }, + }) + if err != nil { + return nil, err + } + return resp.GetDcLocations(), nil +} + +func (am *AllocatorManager) getGRPCConn(addr string) (*grpc.ClientConn, bool) { + am.localAllocatorConn.RLock() + defer am.localAllocatorConn.RUnlock() + conn, ok := am.localAllocatorConn.clientConns[addr] + return conn, ok +} + +func (am *AllocatorManager) setGRPCConn(newConn *grpc.ClientConn, addr string) { + am.localAllocatorConn.Lock() + defer am.localAllocatorConn.Unlock() + if _, ok := am.localAllocatorConn.clientConns[addr]; ok { + newConn.Close() + log.Debug("use old connection", zap.String("target", newConn.Target()), zap.String("state", newConn.GetState().String())) + return + } + am.localAllocatorConn.clientConns[addr] = newConn +} diff --git a/server/tso/global_allocator.go b/server/tso/global_allocator.go index 36004fc3944..6ad12b88004 100644 --- a/server/tso/global_allocator.go +++ b/server/tso/global_allocator.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/grpcutil" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/tsoutil" "github.com/tikv/pd/pkg/typeutil" @@ -59,11 +58,6 @@ type GlobalTSOAllocator struct { timestampOracle *timestampOracle // for global TSO synchronization allocatorManager *AllocatorManager - // for gRPC use - localAllocatorConn struct { - sync.RWMutex - clientConns map[string]*grpc.ClientConn - } } // NewGlobalTSOAllocator creates a new global TSO allocator. @@ -86,7 +80,6 @@ func NewGlobalTSOAllocator( }, allocatorManager: am, } - gta.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn) return gta } @@ -178,7 +171,7 @@ func (gta *GlobalTSOAllocator) syncMaxTS(ctx context.Context, dcLocationMap map[ var errList []error wg := sync.WaitGroup{} for _, leaderURL := range leaderURLs { - leaderConn, err := gta.getOrCreateGRPCConn(ctx, leaderURL) + leaderConn, err := gta.allocatorManager.GetOrCreateGRPCConn(ctx, leaderURL) if err != nil { return err } @@ -255,34 +248,6 @@ func (gta *GlobalTSOAllocator) checkSyncedDCs(dcLocationMap map[string][]uint64, return len(unsyncedDCs) == 0 } -func (gta *GlobalTSOAllocator) getOrCreateGRPCConn(ctx context.Context, addr string) (*grpc.ClientConn, error) { - gta.localAllocatorConn.RLock() - conn, ok := gta.localAllocatorConn.clientConns[addr] - gta.localAllocatorConn.RUnlock() - if ok { - return conn, nil - } - tlsCfg, err := gta.allocatorManager.securityConfig.ToTLSConfig() - if err != nil { - return nil, err - } - ctxWithTimeout, cancel := context.WithTimeout(ctx, dialTimeout) - defer cancel() - cc, err := grpcutil.GetClientConn(ctxWithTimeout, addr, tlsCfg) - if err != nil { - return nil, err - } - gta.localAllocatorConn.Lock() - defer gta.localAllocatorConn.Unlock() - if old, ok := gta.localAllocatorConn.clientConns[addr]; ok { - cc.Close() - log.Debug("use old connection", zap.String("target", cc.Target()), zap.String("state", cc.GetState().String())) - return old, nil - } - gta.localAllocatorConn.clientConns[addr] = cc - return cc, nil -} - func (gta *GlobalTSOAllocator) getCurrentTSO() (pdpb.Timestamp, error) { currentPhysical, currentLogical := gta.timestampOracle.getTSO() if currentPhysical == typeutil.ZeroTime { @@ -295,3 +260,12 @@ func (gta *GlobalTSOAllocator) getCurrentTSO() (pdpb.Timestamp, error) { func (gta *GlobalTSOAllocator) Reset() { gta.timestampOracle.ResetTimestamp() } + +func (gta *GlobalTSOAllocator) GetDcLocations() []string { + dcLocationsMap := gta.allocatorManager.GetClusterDCLocations() + dcLocations := make([]string, len(dcLocationsMap)) + for dc := range dcLocationsMap { + dcLocations = append(dcLocations, dc) + } + return dcLocations +} From af4b7bc13bf7ec95b3b2f31413876605b6b586a2 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 2 Nov 2020 16:34:44 +0800 Subject: [PATCH 02/20] fix lint Signed-off-by: Song Gao --- server/grpc_service.go | 2 ++ server/tso/allocator_manager.go | 4 ++-- server/tso/global_allocator.go | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/server/grpc_service.go b/server/grpc_service.go index 8c5d19bd6e5..6b0972d0ab7 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -991,6 +991,8 @@ func (s *Server) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest) }, nil } +// GetDCLocations will return the dcLocations which hold by the Global TSO Allocator. +// If the receiving PD Member is not PD Leader, GetDCLocations will return error. func (s *Server) GetDCLocations(ctx context.Context, request *pdpb.GetDCLocationsRequest) (*pdpb.GetDCLocationsResponse, error) { if err := s.validateInternalRequest(request.GetHeader()); err != nil { return nil, err diff --git a/server/tso/allocator_manager.go b/server/tso/allocator_manager.go index c8fd0ff54db..cfb0f437766 100644 --- a/server/tso/allocator_manager.go +++ b/server/tso/allocator_manager.go @@ -680,7 +680,7 @@ func (am *AllocatorManager) GetLocalAllocatorLeaders() (map[string]*pdpb.Member, return localAllocatorLeaderMember, nil } -func (am *AllocatorManager) GetOrCreateGRPCConn(ctx context.Context, addr string) (*grpc.ClientConn, error) { +func (am *AllocatorManager) getOrCreateGRPCConn(ctx context.Context, addr string) (*grpc.ClientConn, error) { conn, ok := am.getGRPCConn(addr) if ok { return conn, nil @@ -725,7 +725,7 @@ func (am *AllocatorManager) getLeaderDCLocations(ctx context.Context) ([]string, if len(leaderAddrs) < 1 { return nil, fmt.Errorf("failed to get leader client url") } - conn, err := am.GetOrCreateGRPCConn(ctx, leaderAddrs[0]) + conn, err := am.getOrCreateGRPCConn(ctx, leaderAddrs[0]) if err != nil { return nil, err } diff --git a/server/tso/global_allocator.go b/server/tso/global_allocator.go index 6ad12b88004..939b5514d15 100644 --- a/server/tso/global_allocator.go +++ b/server/tso/global_allocator.go @@ -171,7 +171,7 @@ func (gta *GlobalTSOAllocator) syncMaxTS(ctx context.Context, dcLocationMap map[ var errList []error wg := sync.WaitGroup{} for _, leaderURL := range leaderURLs { - leaderConn, err := gta.allocatorManager.GetOrCreateGRPCConn(ctx, leaderURL) + leaderConn, err := gta.allocatorManager.getOrCreateGRPCConn(ctx, leaderURL) if err != nil { return err } @@ -261,6 +261,7 @@ func (gta *GlobalTSOAllocator) Reset() { gta.timestampOracle.ResetTimestamp() } +// GetDcLocations return all the dcLocations the GlobalTSOAllocator will check func (gta *GlobalTSOAllocator) GetDcLocations() []string { dcLocationsMap := gta.allocatorManager.GetClusterDCLocations() dcLocations := make([]string, len(dcLocationsMap)) From dffebbb2b8a58c16fc2176ff5727c9f7be7a1a62 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 2 Nov 2020 16:46:53 +0800 Subject: [PATCH 03/20] fix lint Signed-off-by: Song Gao --- server/grpc_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/grpc_service.go b/server/grpc_service.go index 6b0972d0ab7..2c8445a8198 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -998,7 +998,7 @@ func (s *Server) GetDCLocations(ctx context.Context, request *pdpb.GetDCLocation return nil, err } if !s.member.IsLeader() { - return nil, fmt.Errorf("receving pd member[%v] is not pd leader", s.member.ID()) + return nil, fmt.Errorf("receiving pd member[%v] is not pd leader", s.member.ID()) } allocator, err := s.GetTSOAllocatorManager().GetAllocator(config.GlobalDCLocation) if err != nil { From 32327c11f49cae6aee5fde6d3748f3716a6b33d4 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 2 Nov 2020 17:06:31 +0800 Subject: [PATCH 04/20] fix tidy Signed-off-by: Song Gao --- go.sum | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/go.sum b/go.sum index 32efa6f67e8..5e0eaea8e2f 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,6 @@ github.com/VividCortex/mysqlerr v0.0.0-20200629151747-c28746d985dd h1:59Whn6shj5 github.com/VividCortex/mysqlerr v0.0.0-20200629151747-c28746d985dd/go.mod h1:f3HiCrHjHBdcm6E83vGaXh1KomZMA2P6aeo3hKx/wg0= github.com/VoltDB/voltdb-client-go v1.0.1/go.mod h1:FSuyUyPbMimaMJ9DItBzhVzjFj5S4XHcEISlKEnFZ5Q= github.com/XiaoMi/pegasus-go-client v0.0.0-20181029071519-9400942c5d1c/go.mod h1:KcL6D/4RZ8RAYzQ5gKI0odcdWUmCVlbQTOlWrhP71CY= -github.com/Yisaer/kvproto v0.0.0-20201102060001-4f78b5b9dc8c h1:UfLV1/V7HmuT4mIriXOY+iZlCZpqZ6RlAPwhJxBEk5U= -github.com/Yisaer/kvproto v0.0.0-20201102060001-4f78b5b9dc8c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/Yisaer/kvproto v0.0.0-20201102071311-c664be078e8e h1:0uuKHxg8bYBDBxyvcK/eszjzW/flSJXSvmTRbOrtpZs= github.com/Yisaer/kvproto v0.0.0-20201102071311-c664be078e8e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/aerospike/aerospike-client-go v1.35.2/go.mod h1:zj8LBEnWBDOVEIJt8LvaRvDG5ARAoa5dBeHaB472NRc= @@ -313,7 +311,6 @@ github.com/gocql/gocql v0.0.0-20181124151448-70385f88b28b/go.mod h1:4Fw1eo5iaEhD github.com/gocql/gocql v0.0.0-20200103014340-68f928edb90a/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY= github.com/godror/godror v0.10.3/go.mod h1:9MVLtu25FBJBMHkPs0m3Ngf/VmwGcLpM2HS8PlNGw9U= github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= -github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= @@ -337,7 +334,6 @@ github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4er github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= -github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -632,16 +628,6 @@ github.com/pingcap/go-ycsb v0.0.0-20200226103513-00ca633a87d8/go.mod h1:B9UJ3Lbp github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20190305055742-ab7debc182d9/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/kvproto v0.0.0-20190506024016-26344dff8f48/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20200214064158-62d31900d88e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20200417092353-efbe03bcffbd/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20200420075417-e0c6e8842f22/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20200810113304-6157337686b1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20201027123903-c4791e779a8c h1:O3tnG2+7F0yvTli099KS6yMAv6YtHx5EFW9tn46MpY8= -github.com/pingcap/kvproto v0.0.0-20201027123903-c4791e779a8c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad h1:SveG82rmu/GFxYanffxsSF503SiQV+2JLnWEiGiF+Tc= @@ -1124,7 +1110,6 @@ google.golang.org/appengine v1.6.2/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww google.golang.org/appengine v1.6.5 h1:tycE03LOZYQNhDpS27tcQdAzLCVMaj7QT2SXxebnpCM= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= @@ -1143,7 +1128,6 @@ google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1/go.mod h1:n3cpQtvx google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20200108215221-bd8f9a0ef82f h1:2wh8dWY8959cBGQvk1RD+/eQBgRYYDaZ+hT0/zsARoA= google.golang.org/genproto v0.0.0-20200108215221-bd8f9a0ef82f/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= -google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= From dea3d642de856d27ae971d418f1bab4463db5c86 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 2 Nov 2020 17:42:23 +0800 Subject: [PATCH 05/20] fix sender header Signed-off-by: Song Gao --- server/tso/allocator_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/tso/allocator_manager.go b/server/tso/allocator_manager.go index cfb0f437766..b98b8d6ddfa 100644 --- a/server/tso/allocator_manager.go +++ b/server/tso/allocator_manager.go @@ -733,7 +733,7 @@ func (am *AllocatorManager) getLeaderDCLocations(ctx context.Context) ([]string, defer cancel() resp, err := pdpb.NewPDClient(conn).GetDCLocations(getCtx, &pdpb.GetDCLocationsRequest{ Header: &pdpb.RequestHeader{ - SenderId: am.member.ID(), + SenderId: am.member.GetLeader().GetMemberId(), }, }) if err != nil { From 5efd8969ab1eafd7ceea43170e22b4e8b45b3998 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 2 Nov 2020 18:32:27 +0800 Subject: [PATCH 06/20] fix nil Signed-off-by: Song Gao --- server/tso/allocator_manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/tso/allocator_manager.go b/server/tso/allocator_manager.go index b98b8d6ddfa..0b4de928d99 100644 --- a/server/tso/allocator_manager.go +++ b/server/tso/allocator_manager.go @@ -721,8 +721,8 @@ func (am *AllocatorManager) getLeaderDCLocations(ctx context.Context) ([]string, return dcLocations, nil } - leaderAddrs := am.member.GetLeader().ClientUrls - if len(leaderAddrs) < 1 { + leaderAddrs := am.member.GetLeader().GetClientUrls() + if leaderAddrs == nil || len(leaderAddrs) < 1 { return nil, fmt.Errorf("failed to get leader client url") } conn, err := am.getOrCreateGRPCConn(ctx, leaderAddrs[0]) From bf4804a404a13c51d5da7062e7c4cb133735a790 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 2 Nov 2020 19:50:51 +0800 Subject: [PATCH 07/20] address the comment Signed-off-by: Song Gao --- server/tso/allocator_manager.go | 1 - server/tso/global_allocator.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/server/tso/allocator_manager.go b/server/tso/allocator_manager.go index 0b4de928d99..4b7a8ed44e6 100644 --- a/server/tso/allocator_manager.go +++ b/server/tso/allocator_manager.go @@ -755,7 +755,6 @@ func (am *AllocatorManager) setGRPCConn(newConn *grpc.ClientConn, addr string) { if _, ok := am.localAllocatorConn.clientConns[addr]; ok { newConn.Close() log.Debug("use old connection", zap.String("target", newConn.Target()), zap.String("state", newConn.GetState().String())) - return } am.localAllocatorConn.clientConns[addr] = newConn } diff --git a/server/tso/global_allocator.go b/server/tso/global_allocator.go index 939b5514d15..2a2dad139b3 100644 --- a/server/tso/global_allocator.go +++ b/server/tso/global_allocator.go @@ -264,7 +264,7 @@ func (gta *GlobalTSOAllocator) Reset() { // GetDcLocations return all the dcLocations the GlobalTSOAllocator will check func (gta *GlobalTSOAllocator) GetDcLocations() []string { dcLocationsMap := gta.allocatorManager.GetClusterDCLocations() - dcLocations := make([]string, len(dcLocationsMap)) + dcLocations := make([]string, 0, len(dcLocationsMap)) for dc := range dcLocationsMap { dcLocations = append(dcLocations, dc) } From 572aeedf85311e0ac8ae4f7c980a27f8ad66ccbf Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 2 Nov 2020 19:56:23 +0800 Subject: [PATCH 08/20] fix error Signed-off-by: Song Gao --- server/tso/allocator_manager.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/tso/allocator_manager.go b/server/tso/allocator_manager.go index 4b7a8ed44e6..88479559753 100644 --- a/server/tso/allocator_manager.go +++ b/server/tso/allocator_manager.go @@ -696,7 +696,8 @@ func (am *AllocatorManager) getOrCreateGRPCConn(ctx context.Context, addr string return nil, err } am.setGRPCConn(cc, addr) - return cc, nil + conn, _ = am.getGRPCConn(addr) + return conn, nil } func (am *AllocatorManager) isLeaderAwareOfDCLocation(ctx context.Context, dcLocation string) (bool, error) { @@ -755,6 +756,7 @@ func (am *AllocatorManager) setGRPCConn(newConn *grpc.ClientConn, addr string) { if _, ok := am.localAllocatorConn.clientConns[addr]; ok { newConn.Close() log.Debug("use old connection", zap.String("target", newConn.Target()), zap.String("state", newConn.GetState().String())) + return } am.localAllocatorConn.clientConns[addr] = newConn } From 6cef1ab0053ee5bd8d400d10e2c2fc91adfef232 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Tue, 3 Nov 2020 15:05:01 +0800 Subject: [PATCH 09/20] add unit test Signed-off-by: Song Gao --- tests/server/tso/local_tso_test.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/server/tso/local_tso_test.go b/tests/server/tso/local_tso_test.go index 9a153e44fc8..470ff9d2c5f 100644 --- a/tests/server/tso/local_tso_test.go +++ b/tests/server/tso/local_tso_test.go @@ -15,6 +15,7 @@ package tso_test import ( "context" + "sort" "sync" "time" @@ -75,6 +76,13 @@ func (s *testLocalTSOSuite) TestLocalTSO(c *C) { cluster.WaitLeader() leaderServer := cluster.GetServer(cluster.GetLeader()) + s.testGetDcLocations(c, dcClientMap[leaderServer.GetConfig().LocalTSO.DCLocation], + &pdpb.GetDCLocationsRequest{ + Header: &pdpb.RequestHeader{ + SenderId: leaderServer.GetServer().GetMember().ID(), + }, + }, + []string{"dc-1", "dc-2", "dc-3"}) var wg sync.WaitGroup for i := 0; i < 10; i++ { @@ -125,3 +133,13 @@ func (s *testLocalTSOSuite) testGetLocalTimestamp(c *C, pdCli pdpb.PDClient, req c.Assert(res.GetLogical(), Greater, int64(0)) return res } + +func (s *testLocalTSOSuite) testGetDcLocations(c *C, pdCli pdpb.PDClient, req *pdpb.GetDCLocationsRequest, dcLocations []string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + resp, err := pdCli.GetDCLocations(ctx, req) + c.Assert(err, IsNil) + sort.Strings(dcLocations) + sort.Strings(resp.DcLocations) + c.Assert(resp.DcLocations, DeepEquals, dcLocations) +} From 4d6817714f003f5f2535fe0e4807786086811272 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Wed, 4 Nov 2020 12:35:13 +0800 Subject: [PATCH 10/20] update kvproto Signed-off-by: Song Gao --- go.mod | 3 +-- go.sum | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 91ee467fbbf..8fb53f73436 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20200917111840-a15ef68f753d github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce - github.com/pingcap/kvproto v0.0.0-20201027123903-c4791e779a8c + github.com/pingcap/kvproto v0.0.0-20201104042953-62eb316d5182 github.com/pingcap/log v0.0.0-20200511115504-543df19646ad github.com/pingcap/sysutil v0.0.0-20201021075216-f93ced2829e2 github.com/pingcap/tiup v1.2.2 @@ -52,7 +52,6 @@ require ( ) replace ( - github.com/pingcap/kvproto => github.com/Yisaer/kvproto v0.0.0-20201102071311-c664be078e8e github.com/sirupsen/logrus => github.com/sirupsen/logrus v1.2.0 go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.5 ) diff --git a/go.sum b/go.sum index 5e0eaea8e2f..32706f914e2 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,6 @@ github.com/VividCortex/mysqlerr v0.0.0-20200629151747-c28746d985dd h1:59Whn6shj5 github.com/VividCortex/mysqlerr v0.0.0-20200629151747-c28746d985dd/go.mod h1:f3HiCrHjHBdcm6E83vGaXh1KomZMA2P6aeo3hKx/wg0= github.com/VoltDB/voltdb-client-go v1.0.1/go.mod h1:FSuyUyPbMimaMJ9DItBzhVzjFj5S4XHcEISlKEnFZ5Q= github.com/XiaoMi/pegasus-go-client v0.0.0-20181029071519-9400942c5d1c/go.mod h1:KcL6D/4RZ8RAYzQ5gKI0odcdWUmCVlbQTOlWrhP71CY= -github.com/Yisaer/kvproto v0.0.0-20201102071311-c664be078e8e h1:0uuKHxg8bYBDBxyvcK/eszjzW/flSJXSvmTRbOrtpZs= -github.com/Yisaer/kvproto v0.0.0-20201102071311-c664be078e8e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/aerospike/aerospike-client-go v1.35.2/go.mod h1:zj8LBEnWBDOVEIJt8LvaRvDG5ARAoa5dBeHaB472NRc= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= @@ -311,6 +309,7 @@ github.com/gocql/gocql v0.0.0-20181124151448-70385f88b28b/go.mod h1:4Fw1eo5iaEhD github.com/gocql/gocql v0.0.0-20200103014340-68f928edb90a/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY= github.com/godror/godror v0.10.3/go.mod h1:9MVLtu25FBJBMHkPs0m3Ngf/VmwGcLpM2HS8PlNGw9U= github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= @@ -334,6 +333,7 @@ github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4er github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= +github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -628,6 +628,16 @@ github.com/pingcap/go-ycsb v0.0.0-20200226103513-00ca633a87d8/go.mod h1:B9UJ3Lbp github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= +github.com/pingcap/kvproto v0.0.0-20190305055742-ab7debc182d9/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto v0.0.0-20190506024016-26344dff8f48/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= +github.com/pingcap/kvproto v0.0.0-20200214064158-62d31900d88e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200417092353-efbe03bcffbd/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200420075417-e0c6e8842f22/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200810113304-6157337686b1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20201104042953-62eb316d5182 h1:werchMhPPSWn+MCbBGQdh9VpW1CJEZQ3+lrQO8zSb+4= +github.com/pingcap/kvproto v0.0.0-20201104042953-62eb316d5182/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad h1:SveG82rmu/GFxYanffxsSF503SiQV+2JLnWEiGiF+Tc= @@ -1110,6 +1120,7 @@ google.golang.org/appengine v1.6.2/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww google.golang.org/appengine v1.6.5 h1:tycE03LOZYQNhDpS27tcQdAzLCVMaj7QT2SXxebnpCM= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= @@ -1128,6 +1139,7 @@ google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1/go.mod h1:n3cpQtvx google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20200108215221-bd8f9a0ef82f h1:2wh8dWY8959cBGQvk1RD+/eQBgRYYDaZ+hT0/zsARoA= google.golang.org/genproto v0.0.0-20200108215221-bd8f9a0ef82f/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= +google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= From c6364279d0e1b8f0b04f911d3deb6785f641cbe5 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 2 Nov 2020 16:29:01 +0800 Subject: [PATCH 11/20] fix unknown dc Signed-off-by: Song Gao --- go.mod | 1 + go.sum | 4 ++ server/grpc_service.go | 25 ++++++++ server/tso/allocator_manager.go | 103 ++++++++++++++++++++++++++++++++ server/tso/global_allocator.go | 46 ++++---------- 5 files changed, 143 insertions(+), 36 deletions(-) diff --git a/go.mod b/go.mod index a7b66e5d005..91ee467fbbf 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ require ( ) replace ( + github.com/pingcap/kvproto => github.com/Yisaer/kvproto v0.0.0-20201102071311-c664be078e8e github.com/sirupsen/logrus => github.com/sirupsen/logrus v1.2.0 go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.5 ) diff --git a/go.sum b/go.sum index 506387789b9..32efa6f67e8 100644 --- a/go.sum +++ b/go.sum @@ -53,6 +53,10 @@ github.com/VividCortex/mysqlerr v0.0.0-20200629151747-c28746d985dd h1:59Whn6shj5 github.com/VividCortex/mysqlerr v0.0.0-20200629151747-c28746d985dd/go.mod h1:f3HiCrHjHBdcm6E83vGaXh1KomZMA2P6aeo3hKx/wg0= github.com/VoltDB/voltdb-client-go v1.0.1/go.mod h1:FSuyUyPbMimaMJ9DItBzhVzjFj5S4XHcEISlKEnFZ5Q= github.com/XiaoMi/pegasus-go-client v0.0.0-20181029071519-9400942c5d1c/go.mod h1:KcL6D/4RZ8RAYzQ5gKI0odcdWUmCVlbQTOlWrhP71CY= +github.com/Yisaer/kvproto v0.0.0-20201102060001-4f78b5b9dc8c h1:UfLV1/V7HmuT4mIriXOY+iZlCZpqZ6RlAPwhJxBEk5U= +github.com/Yisaer/kvproto v0.0.0-20201102060001-4f78b5b9dc8c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/Yisaer/kvproto v0.0.0-20201102071311-c664be078e8e h1:0uuKHxg8bYBDBxyvcK/eszjzW/flSJXSvmTRbOrtpZs= +github.com/Yisaer/kvproto v0.0.0-20201102071311-c664be078e8e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/aerospike/aerospike-client-go v1.35.2/go.mod h1:zj8LBEnWBDOVEIJt8LvaRvDG5ARAoa5dBeHaB472NRc= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= diff --git a/server/grpc_service.go b/server/grpc_service.go index c81fa7d5216..c35d4dcee46 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -33,6 +33,7 @@ import ( "github.com/tikv/pd/server/cluster" "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" + "github.com/tikv/pd/server/tso" "github.com/tikv/pd/server/versioninfo" "go.uber.org/zap" "google.golang.org/grpc/codes" @@ -994,6 +995,30 @@ func (s *Server) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest) }, nil } +func (s *Server) GetDCLocations(ctx context.Context, request *pdpb.GetDCLocationsRequest) (*pdpb.GetDCLocationsResponse, error) { + if err := s.validateInternalRequest(request.GetHeader()); err != nil { + return nil, err + } + if !s.member.IsLeader() { + return nil, fmt.Errorf("receving pd member[%v] is not pd leader", s.member.ID()) + } + allocator, err := s.GetTSOAllocatorManager().GetAllocator(config.GlobalDCLocation) + if err != nil { + return nil, fmt.Errorf("failed to get tso allocator[%v]", config.GlobalDCLocation) + } + globalAllocator, ok := allocator.(*tso.GlobalTSOAllocator) + if !ok { + return nil, fmt.Errorf("tso allocator[%v] is not global tso allocator", config.GlobalDCLocation) + } + if !globalAllocator.IsInitialize() { + return nil, fmt.Errorf("global tso alloactor is not initialized") + } + return &pdpb.GetDCLocationsResponse{ + Header: s.header(), + DcLocations: globalAllocator.GetDcLocations(), + }, nil +} + // validateInternalRequest checks if server is closed, which is used to validate // the gRPC communication between PD servers internally. func (s *Server) validateInternalRequest(header *pdpb.RequestHeader) error { diff --git a/server/tso/allocator_manager.go b/server/tso/allocator_manager.go index 99d7dac5e90..c8fd0ff54db 100644 --- a/server/tso/allocator_manager.go +++ b/server/tso/allocator_manager.go @@ -34,6 +34,7 @@ import ( "github.com/tikv/pd/server/member" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" + "google.golang.org/grpc" ) const ( @@ -85,6 +86,11 @@ type AllocatorManager struct { updatePhysicalInterval time.Duration maxResetTSGap func() time.Duration securityConfig *grpcutil.TLSConfig + // for gRPC use + localAllocatorConn struct { + sync.RWMutex + clientConns map[string]*grpc.ClientConn + } } // NewAllocatorManager creates a new TSO Allocator Manager. @@ -105,6 +111,7 @@ func NewAllocatorManager( securityConfig: sc, } allocatorManager.mu.allocatorGroups = make(map[string]*allocatorGroup) + allocatorManager.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn) return allocatorManager } @@ -221,6 +228,22 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator * default: } + ok, err := am.isLeaderAwareOfDCLocation(ctx, allocator.dcLocation) + if err != nil { + log.Error("get dc-locations from pd leader failed", + zap.String("dc-location", allocator.dcLocation), + errs.ZapError(err)) + time.Sleep(200 * time.Millisecond) + continue + } + if !ok { + log.Error("pd leader is not aware of dc-location during allocatorLeaderLoop, wait next round", + zap.String("dc-location", allocator.dcLocation), + zap.String("wait-duration", checkStep.String())) + time.Sleep(checkStep) + continue + } + allocatorLeader, rev, checkAgain := allocator.CheckAllocatorLeader() if checkAgain { continue @@ -656,3 +679,83 @@ func (am *AllocatorManager) GetLocalAllocatorLeaders() (map[string]*pdpb.Member, } return localAllocatorLeaderMember, nil } + +func (am *AllocatorManager) GetOrCreateGRPCConn(ctx context.Context, addr string) (*grpc.ClientConn, error) { + conn, ok := am.getGRPCConn(addr) + if ok { + return conn, nil + } + tlsCfg, err := am.securityConfig.ToTLSConfig() + if err != nil { + return nil, err + } + ctxWithTimeout, cancel := context.WithTimeout(ctx, dialTimeout) + defer cancel() + cc, err := grpcutil.GetClientConn(ctxWithTimeout, addr, tlsCfg) + if err != nil { + return nil, err + } + am.setGRPCConn(cc, addr) + return cc, nil +} + +func (am *AllocatorManager) isLeaderAwareOfDCLocation(ctx context.Context, dcLocation string) (bool, error) { + dcLocations, err := am.getLeaderDCLocations(ctx) + if err != nil { + return false, err + } + for _, dc := range dcLocations { + if dcLocation == dc { + return true, nil + } + } + return false, nil +} + +func (am *AllocatorManager) getLeaderDCLocations(ctx context.Context) ([]string, error) { + dcLocations := make([]string, 0) + if am.member.IsLeader() { + for dcLocation := range am.GetClusterDCLocations() { + dcLocations = append(dcLocations, dcLocation) + } + return dcLocations, nil + } + + leaderAddrs := am.member.GetLeader().ClientUrls + if len(leaderAddrs) < 1 { + return nil, fmt.Errorf("failed to get leader client url") + } + conn, err := am.GetOrCreateGRPCConn(ctx, leaderAddrs[0]) + if err != nil { + return nil, err + } + getCtx, cancel := context.WithTimeout(ctx, rpcTimeout) + defer cancel() + resp, err := pdpb.NewPDClient(conn).GetDCLocations(getCtx, &pdpb.GetDCLocationsRequest{ + Header: &pdpb.RequestHeader{ + SenderId: am.member.ID(), + }, + }) + if err != nil { + return nil, err + } + return resp.GetDcLocations(), nil +} + +func (am *AllocatorManager) getGRPCConn(addr string) (*grpc.ClientConn, bool) { + am.localAllocatorConn.RLock() + defer am.localAllocatorConn.RUnlock() + conn, ok := am.localAllocatorConn.clientConns[addr] + return conn, ok +} + +func (am *AllocatorManager) setGRPCConn(newConn *grpc.ClientConn, addr string) { + am.localAllocatorConn.Lock() + defer am.localAllocatorConn.Unlock() + if _, ok := am.localAllocatorConn.clientConns[addr]; ok { + newConn.Close() + log.Debug("use old connection", zap.String("target", newConn.Target()), zap.String("state", newConn.GetState().String())) + return + } + am.localAllocatorConn.clientConns[addr] = newConn +} diff --git a/server/tso/global_allocator.go b/server/tso/global_allocator.go index 36004fc3944..6ad12b88004 100644 --- a/server/tso/global_allocator.go +++ b/server/tso/global_allocator.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/grpcutil" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/tsoutil" "github.com/tikv/pd/pkg/typeutil" @@ -59,11 +58,6 @@ type GlobalTSOAllocator struct { timestampOracle *timestampOracle // for global TSO synchronization allocatorManager *AllocatorManager - // for gRPC use - localAllocatorConn struct { - sync.RWMutex - clientConns map[string]*grpc.ClientConn - } } // NewGlobalTSOAllocator creates a new global TSO allocator. @@ -86,7 +80,6 @@ func NewGlobalTSOAllocator( }, allocatorManager: am, } - gta.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn) return gta } @@ -178,7 +171,7 @@ func (gta *GlobalTSOAllocator) syncMaxTS(ctx context.Context, dcLocationMap map[ var errList []error wg := sync.WaitGroup{} for _, leaderURL := range leaderURLs { - leaderConn, err := gta.getOrCreateGRPCConn(ctx, leaderURL) + leaderConn, err := gta.allocatorManager.GetOrCreateGRPCConn(ctx, leaderURL) if err != nil { return err } @@ -255,34 +248,6 @@ func (gta *GlobalTSOAllocator) checkSyncedDCs(dcLocationMap map[string][]uint64, return len(unsyncedDCs) == 0 } -func (gta *GlobalTSOAllocator) getOrCreateGRPCConn(ctx context.Context, addr string) (*grpc.ClientConn, error) { - gta.localAllocatorConn.RLock() - conn, ok := gta.localAllocatorConn.clientConns[addr] - gta.localAllocatorConn.RUnlock() - if ok { - return conn, nil - } - tlsCfg, err := gta.allocatorManager.securityConfig.ToTLSConfig() - if err != nil { - return nil, err - } - ctxWithTimeout, cancel := context.WithTimeout(ctx, dialTimeout) - defer cancel() - cc, err := grpcutil.GetClientConn(ctxWithTimeout, addr, tlsCfg) - if err != nil { - return nil, err - } - gta.localAllocatorConn.Lock() - defer gta.localAllocatorConn.Unlock() - if old, ok := gta.localAllocatorConn.clientConns[addr]; ok { - cc.Close() - log.Debug("use old connection", zap.String("target", cc.Target()), zap.String("state", cc.GetState().String())) - return old, nil - } - gta.localAllocatorConn.clientConns[addr] = cc - return cc, nil -} - func (gta *GlobalTSOAllocator) getCurrentTSO() (pdpb.Timestamp, error) { currentPhysical, currentLogical := gta.timestampOracle.getTSO() if currentPhysical == typeutil.ZeroTime { @@ -295,3 +260,12 @@ func (gta *GlobalTSOAllocator) getCurrentTSO() (pdpb.Timestamp, error) { func (gta *GlobalTSOAllocator) Reset() { gta.timestampOracle.ResetTimestamp() } + +func (gta *GlobalTSOAllocator) GetDcLocations() []string { + dcLocationsMap := gta.allocatorManager.GetClusterDCLocations() + dcLocations := make([]string, len(dcLocationsMap)) + for dc := range dcLocationsMap { + dcLocations = append(dcLocations, dc) + } + return dcLocations +} From 8923d67d39c9b02dcc3d9a4fdb77ccef5af3c4b6 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 2 Nov 2020 16:34:44 +0800 Subject: [PATCH 12/20] fix lint Signed-off-by: Song Gao --- server/grpc_service.go | 2 ++ server/tso/allocator_manager.go | 4 ++-- server/tso/global_allocator.go | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/server/grpc_service.go b/server/grpc_service.go index c35d4dcee46..df5ea245728 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -995,6 +995,8 @@ func (s *Server) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest) }, nil } +// GetDCLocations will return the dcLocations which hold by the Global TSO Allocator. +// If the receiving PD Member is not PD Leader, GetDCLocations will return error. func (s *Server) GetDCLocations(ctx context.Context, request *pdpb.GetDCLocationsRequest) (*pdpb.GetDCLocationsResponse, error) { if err := s.validateInternalRequest(request.GetHeader()); err != nil { return nil, err diff --git a/server/tso/allocator_manager.go b/server/tso/allocator_manager.go index c8fd0ff54db..cfb0f437766 100644 --- a/server/tso/allocator_manager.go +++ b/server/tso/allocator_manager.go @@ -680,7 +680,7 @@ func (am *AllocatorManager) GetLocalAllocatorLeaders() (map[string]*pdpb.Member, return localAllocatorLeaderMember, nil } -func (am *AllocatorManager) GetOrCreateGRPCConn(ctx context.Context, addr string) (*grpc.ClientConn, error) { +func (am *AllocatorManager) getOrCreateGRPCConn(ctx context.Context, addr string) (*grpc.ClientConn, error) { conn, ok := am.getGRPCConn(addr) if ok { return conn, nil @@ -725,7 +725,7 @@ func (am *AllocatorManager) getLeaderDCLocations(ctx context.Context) ([]string, if len(leaderAddrs) < 1 { return nil, fmt.Errorf("failed to get leader client url") } - conn, err := am.GetOrCreateGRPCConn(ctx, leaderAddrs[0]) + conn, err := am.getOrCreateGRPCConn(ctx, leaderAddrs[0]) if err != nil { return nil, err } diff --git a/server/tso/global_allocator.go b/server/tso/global_allocator.go index 6ad12b88004..939b5514d15 100644 --- a/server/tso/global_allocator.go +++ b/server/tso/global_allocator.go @@ -171,7 +171,7 @@ func (gta *GlobalTSOAllocator) syncMaxTS(ctx context.Context, dcLocationMap map[ var errList []error wg := sync.WaitGroup{} for _, leaderURL := range leaderURLs { - leaderConn, err := gta.allocatorManager.GetOrCreateGRPCConn(ctx, leaderURL) + leaderConn, err := gta.allocatorManager.getOrCreateGRPCConn(ctx, leaderURL) if err != nil { return err } @@ -261,6 +261,7 @@ func (gta *GlobalTSOAllocator) Reset() { gta.timestampOracle.ResetTimestamp() } +// GetDcLocations return all the dcLocations the GlobalTSOAllocator will check func (gta *GlobalTSOAllocator) GetDcLocations() []string { dcLocationsMap := gta.allocatorManager.GetClusterDCLocations() dcLocations := make([]string, len(dcLocationsMap)) From 343498ba9dc874d059436e899cc95d0cba574890 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 2 Nov 2020 16:46:53 +0800 Subject: [PATCH 13/20] fix lint Signed-off-by: Song Gao --- server/grpc_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/grpc_service.go b/server/grpc_service.go index df5ea245728..84b6ad36e1d 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1002,7 +1002,7 @@ func (s *Server) GetDCLocations(ctx context.Context, request *pdpb.GetDCLocation return nil, err } if !s.member.IsLeader() { - return nil, fmt.Errorf("receving pd member[%v] is not pd leader", s.member.ID()) + return nil, fmt.Errorf("receiving pd member[%v] is not pd leader", s.member.ID()) } allocator, err := s.GetTSOAllocatorManager().GetAllocator(config.GlobalDCLocation) if err != nil { From f65d35b60d32923c1bb93df85b7883c7a6dd0df7 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 2 Nov 2020 17:06:31 +0800 Subject: [PATCH 14/20] fix tidy Signed-off-by: Song Gao --- go.sum | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/go.sum b/go.sum index 32efa6f67e8..5e0eaea8e2f 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,6 @@ github.com/VividCortex/mysqlerr v0.0.0-20200629151747-c28746d985dd h1:59Whn6shj5 github.com/VividCortex/mysqlerr v0.0.0-20200629151747-c28746d985dd/go.mod h1:f3HiCrHjHBdcm6E83vGaXh1KomZMA2P6aeo3hKx/wg0= github.com/VoltDB/voltdb-client-go v1.0.1/go.mod h1:FSuyUyPbMimaMJ9DItBzhVzjFj5S4XHcEISlKEnFZ5Q= github.com/XiaoMi/pegasus-go-client v0.0.0-20181029071519-9400942c5d1c/go.mod h1:KcL6D/4RZ8RAYzQ5gKI0odcdWUmCVlbQTOlWrhP71CY= -github.com/Yisaer/kvproto v0.0.0-20201102060001-4f78b5b9dc8c h1:UfLV1/V7HmuT4mIriXOY+iZlCZpqZ6RlAPwhJxBEk5U= -github.com/Yisaer/kvproto v0.0.0-20201102060001-4f78b5b9dc8c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/Yisaer/kvproto v0.0.0-20201102071311-c664be078e8e h1:0uuKHxg8bYBDBxyvcK/eszjzW/flSJXSvmTRbOrtpZs= github.com/Yisaer/kvproto v0.0.0-20201102071311-c664be078e8e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/aerospike/aerospike-client-go v1.35.2/go.mod h1:zj8LBEnWBDOVEIJt8LvaRvDG5ARAoa5dBeHaB472NRc= @@ -313,7 +311,6 @@ github.com/gocql/gocql v0.0.0-20181124151448-70385f88b28b/go.mod h1:4Fw1eo5iaEhD github.com/gocql/gocql v0.0.0-20200103014340-68f928edb90a/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY= github.com/godror/godror v0.10.3/go.mod h1:9MVLtu25FBJBMHkPs0m3Ngf/VmwGcLpM2HS8PlNGw9U= github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= -github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= @@ -337,7 +334,6 @@ github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4er github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= -github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -632,16 +628,6 @@ github.com/pingcap/go-ycsb v0.0.0-20200226103513-00ca633a87d8/go.mod h1:B9UJ3Lbp github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20190305055742-ab7debc182d9/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/kvproto v0.0.0-20190506024016-26344dff8f48/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20200214064158-62d31900d88e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20200417092353-efbe03bcffbd/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20200420075417-e0c6e8842f22/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20200810113304-6157337686b1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20201027123903-c4791e779a8c h1:O3tnG2+7F0yvTli099KS6yMAv6YtHx5EFW9tn46MpY8= -github.com/pingcap/kvproto v0.0.0-20201027123903-c4791e779a8c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad h1:SveG82rmu/GFxYanffxsSF503SiQV+2JLnWEiGiF+Tc= @@ -1124,7 +1110,6 @@ google.golang.org/appengine v1.6.2/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww google.golang.org/appengine v1.6.5 h1:tycE03LOZYQNhDpS27tcQdAzLCVMaj7QT2SXxebnpCM= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= @@ -1143,7 +1128,6 @@ google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1/go.mod h1:n3cpQtvx google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20200108215221-bd8f9a0ef82f h1:2wh8dWY8959cBGQvk1RD+/eQBgRYYDaZ+hT0/zsARoA= google.golang.org/genproto v0.0.0-20200108215221-bd8f9a0ef82f/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= -google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= From 5a2aa23a67fe8d016d276e3bf79977adb9e3410b Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 2 Nov 2020 17:42:23 +0800 Subject: [PATCH 15/20] fix sender header Signed-off-by: Song Gao --- server/tso/allocator_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/tso/allocator_manager.go b/server/tso/allocator_manager.go index cfb0f437766..b98b8d6ddfa 100644 --- a/server/tso/allocator_manager.go +++ b/server/tso/allocator_manager.go @@ -733,7 +733,7 @@ func (am *AllocatorManager) getLeaderDCLocations(ctx context.Context) ([]string, defer cancel() resp, err := pdpb.NewPDClient(conn).GetDCLocations(getCtx, &pdpb.GetDCLocationsRequest{ Header: &pdpb.RequestHeader{ - SenderId: am.member.ID(), + SenderId: am.member.GetLeader().GetMemberId(), }, }) if err != nil { From 906dd8a76b6c8ae7a3afd4098961e4c18a61f904 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 2 Nov 2020 18:32:27 +0800 Subject: [PATCH 16/20] fix nil Signed-off-by: Song Gao --- server/tso/allocator_manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/tso/allocator_manager.go b/server/tso/allocator_manager.go index b98b8d6ddfa..0b4de928d99 100644 --- a/server/tso/allocator_manager.go +++ b/server/tso/allocator_manager.go @@ -721,8 +721,8 @@ func (am *AllocatorManager) getLeaderDCLocations(ctx context.Context) ([]string, return dcLocations, nil } - leaderAddrs := am.member.GetLeader().ClientUrls - if len(leaderAddrs) < 1 { + leaderAddrs := am.member.GetLeader().GetClientUrls() + if leaderAddrs == nil || len(leaderAddrs) < 1 { return nil, fmt.Errorf("failed to get leader client url") } conn, err := am.getOrCreateGRPCConn(ctx, leaderAddrs[0]) From b511ceb90e871dde1419fb0904b244f6d3f7b072 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 2 Nov 2020 19:50:51 +0800 Subject: [PATCH 17/20] address the comment Signed-off-by: Song Gao --- server/tso/allocator_manager.go | 1 - server/tso/global_allocator.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/server/tso/allocator_manager.go b/server/tso/allocator_manager.go index 0b4de928d99..4b7a8ed44e6 100644 --- a/server/tso/allocator_manager.go +++ b/server/tso/allocator_manager.go @@ -755,7 +755,6 @@ func (am *AllocatorManager) setGRPCConn(newConn *grpc.ClientConn, addr string) { if _, ok := am.localAllocatorConn.clientConns[addr]; ok { newConn.Close() log.Debug("use old connection", zap.String("target", newConn.Target()), zap.String("state", newConn.GetState().String())) - return } am.localAllocatorConn.clientConns[addr] = newConn } diff --git a/server/tso/global_allocator.go b/server/tso/global_allocator.go index 939b5514d15..2a2dad139b3 100644 --- a/server/tso/global_allocator.go +++ b/server/tso/global_allocator.go @@ -264,7 +264,7 @@ func (gta *GlobalTSOAllocator) Reset() { // GetDcLocations return all the dcLocations the GlobalTSOAllocator will check func (gta *GlobalTSOAllocator) GetDcLocations() []string { dcLocationsMap := gta.allocatorManager.GetClusterDCLocations() - dcLocations := make([]string, len(dcLocationsMap)) + dcLocations := make([]string, 0, len(dcLocationsMap)) for dc := range dcLocationsMap { dcLocations = append(dcLocations, dc) } From 4ee50a3e26baffd6cd4139a04d579bd3430abf13 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 2 Nov 2020 19:56:23 +0800 Subject: [PATCH 18/20] fix error Signed-off-by: Song Gao --- server/tso/allocator_manager.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/tso/allocator_manager.go b/server/tso/allocator_manager.go index 4b7a8ed44e6..88479559753 100644 --- a/server/tso/allocator_manager.go +++ b/server/tso/allocator_manager.go @@ -696,7 +696,8 @@ func (am *AllocatorManager) getOrCreateGRPCConn(ctx context.Context, addr string return nil, err } am.setGRPCConn(cc, addr) - return cc, nil + conn, _ = am.getGRPCConn(addr) + return conn, nil } func (am *AllocatorManager) isLeaderAwareOfDCLocation(ctx context.Context, dcLocation string) (bool, error) { @@ -755,6 +756,7 @@ func (am *AllocatorManager) setGRPCConn(newConn *grpc.ClientConn, addr string) { if _, ok := am.localAllocatorConn.clientConns[addr]; ok { newConn.Close() log.Debug("use old connection", zap.String("target", newConn.Target()), zap.String("state", newConn.GetState().String())) + return } am.localAllocatorConn.clientConns[addr] = newConn } From 8c188f37bb4c286efa236c56f7c739fb2309cd93 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Tue, 3 Nov 2020 15:05:01 +0800 Subject: [PATCH 19/20] add unit test Signed-off-by: Song Gao --- tests/server/tso/local_tso_test.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/server/tso/local_tso_test.go b/tests/server/tso/local_tso_test.go index 9a153e44fc8..470ff9d2c5f 100644 --- a/tests/server/tso/local_tso_test.go +++ b/tests/server/tso/local_tso_test.go @@ -15,6 +15,7 @@ package tso_test import ( "context" + "sort" "sync" "time" @@ -75,6 +76,13 @@ func (s *testLocalTSOSuite) TestLocalTSO(c *C) { cluster.WaitLeader() leaderServer := cluster.GetServer(cluster.GetLeader()) + s.testGetDcLocations(c, dcClientMap[leaderServer.GetConfig().LocalTSO.DCLocation], + &pdpb.GetDCLocationsRequest{ + Header: &pdpb.RequestHeader{ + SenderId: leaderServer.GetServer().GetMember().ID(), + }, + }, + []string{"dc-1", "dc-2", "dc-3"}) var wg sync.WaitGroup for i := 0; i < 10; i++ { @@ -125,3 +133,13 @@ func (s *testLocalTSOSuite) testGetLocalTimestamp(c *C, pdCli pdpb.PDClient, req c.Assert(res.GetLogical(), Greater, int64(0)) return res } + +func (s *testLocalTSOSuite) testGetDcLocations(c *C, pdCli pdpb.PDClient, req *pdpb.GetDCLocationsRequest, dcLocations []string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + resp, err := pdCli.GetDCLocations(ctx, req) + c.Assert(err, IsNil) + sort.Strings(dcLocations) + sort.Strings(resp.DcLocations) + c.Assert(resp.DcLocations, DeepEquals, dcLocations) +} From 868570286454ebcc8b161783be94c48abe79b1c0 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Wed, 4 Nov 2020 12:35:13 +0800 Subject: [PATCH 20/20] update kvproto Signed-off-by: Song Gao --- go.mod | 3 +-- go.sum | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 91ee467fbbf..8fb53f73436 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20200917111840-a15ef68f753d github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce - github.com/pingcap/kvproto v0.0.0-20201027123903-c4791e779a8c + github.com/pingcap/kvproto v0.0.0-20201104042953-62eb316d5182 github.com/pingcap/log v0.0.0-20200511115504-543df19646ad github.com/pingcap/sysutil v0.0.0-20201021075216-f93ced2829e2 github.com/pingcap/tiup v1.2.2 @@ -52,7 +52,6 @@ require ( ) replace ( - github.com/pingcap/kvproto => github.com/Yisaer/kvproto v0.0.0-20201102071311-c664be078e8e github.com/sirupsen/logrus => github.com/sirupsen/logrus v1.2.0 go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.5 ) diff --git a/go.sum b/go.sum index 5e0eaea8e2f..32706f914e2 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,6 @@ github.com/VividCortex/mysqlerr v0.0.0-20200629151747-c28746d985dd h1:59Whn6shj5 github.com/VividCortex/mysqlerr v0.0.0-20200629151747-c28746d985dd/go.mod h1:f3HiCrHjHBdcm6E83vGaXh1KomZMA2P6aeo3hKx/wg0= github.com/VoltDB/voltdb-client-go v1.0.1/go.mod h1:FSuyUyPbMimaMJ9DItBzhVzjFj5S4XHcEISlKEnFZ5Q= github.com/XiaoMi/pegasus-go-client v0.0.0-20181029071519-9400942c5d1c/go.mod h1:KcL6D/4RZ8RAYzQ5gKI0odcdWUmCVlbQTOlWrhP71CY= -github.com/Yisaer/kvproto v0.0.0-20201102071311-c664be078e8e h1:0uuKHxg8bYBDBxyvcK/eszjzW/flSJXSvmTRbOrtpZs= -github.com/Yisaer/kvproto v0.0.0-20201102071311-c664be078e8e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/aerospike/aerospike-client-go v1.35.2/go.mod h1:zj8LBEnWBDOVEIJt8LvaRvDG5ARAoa5dBeHaB472NRc= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= @@ -311,6 +309,7 @@ github.com/gocql/gocql v0.0.0-20181124151448-70385f88b28b/go.mod h1:4Fw1eo5iaEhD github.com/gocql/gocql v0.0.0-20200103014340-68f928edb90a/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY= github.com/godror/godror v0.10.3/go.mod h1:9MVLtu25FBJBMHkPs0m3Ngf/VmwGcLpM2HS8PlNGw9U= github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= @@ -334,6 +333,7 @@ github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4er github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= +github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -628,6 +628,16 @@ github.com/pingcap/go-ycsb v0.0.0-20200226103513-00ca633a87d8/go.mod h1:B9UJ3Lbp github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= +github.com/pingcap/kvproto v0.0.0-20190305055742-ab7debc182d9/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto v0.0.0-20190506024016-26344dff8f48/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= +github.com/pingcap/kvproto v0.0.0-20200214064158-62d31900d88e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200417092353-efbe03bcffbd/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200420075417-e0c6e8842f22/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200810113304-6157337686b1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20201104042953-62eb316d5182 h1:werchMhPPSWn+MCbBGQdh9VpW1CJEZQ3+lrQO8zSb+4= +github.com/pingcap/kvproto v0.0.0-20201104042953-62eb316d5182/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad h1:SveG82rmu/GFxYanffxsSF503SiQV+2JLnWEiGiF+Tc= @@ -1110,6 +1120,7 @@ google.golang.org/appengine v1.6.2/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww google.golang.org/appengine v1.6.5 h1:tycE03LOZYQNhDpS27tcQdAzLCVMaj7QT2SXxebnpCM= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= @@ -1128,6 +1139,7 @@ google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1/go.mod h1:n3cpQtvx google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20200108215221-bd8f9a0ef82f h1:2wh8dWY8959cBGQvk1RD+/eQBgRYYDaZ+hT0/zsARoA= google.golang.org/genproto v0.0.0-20200108215221-bd8f9a0ef82f/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= +google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=