From a30148cd62b331085887300e2ad2ac5f158a83fe Mon Sep 17 00:00:00 2001
From: tiancaiamao
Date: Wed, 29 May 2019 13:35:01 +0800
Subject: [PATCH] store/tikv: recycle idle connection in tikv client (#10616)

If a TiKV addr has been idle for a while, recycle its connection.
---
 store/tikv/client.go      | 73 +++++++++++++++++++++++++++++++++------
 store/tikv/client_test.go |  2 ++
 2 files changed, 65 insertions(+), 10 deletions(-)

diff --git a/store/tikv/client.go b/store/tikv/client.go
index ebc5ef266fa2c..09a34912463bf 100644
--- a/store/tikv/client.go
+++ b/store/tikv/client.go
@@ -83,6 +83,11 @@ type connArray struct {
 	batchCommandsCh        chan *batchCommandsEntry
 	batchCommandsClients   []*batchCommandsClient
 	tikvTransportLayerLoad uint64
+
+	// Notify rpcClient to check the idle flag
+	idleNotify *uint32
+	idle       bool
+	idleDetect *time.Timer
 }
 
 type batchCommandsClient struct {
@@ -132,7 +137,6 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
 		// When `conn.Close()` is called, `client.Recv()` will return an error.
 		resp, err := c.client.Recv()
 		if err != nil {
-			now := time.Now()
 			for { // try to re-create the streaming in the loop.
 				if c.isStopped() {
 					return
@@ -198,8 +202,9 @@
 	}
 }
 
-func newConnArray(maxSize uint, addr string, security config.Security) (*connArray, error) {
+func newConnArray(maxSize uint, addr string, security config.Security, idleNotify *uint32) (*connArray, error) {
 	cfg := config.GetGlobalConfig()
+
 	a := &connArray{
 		index:                  0,
 		v:                      make([]*grpc.ClientConn, maxSize),
@@ -208,6 +213,9 @@
 		batchCommandsCh:        make(chan *batchCommandsEntry, cfg.TiKVClient.MaxBatchSize),
 		batchCommandsClients:   make([]*batchCommandsClient, 0, maxSize),
 		tikvTransportLayerLoad: 0,
+
+		idleNotify: idleNotify,
+		idleDetect: time.NewTimer(idleTimeout),
 	}
 	if err := a.Init(addr, security); err != nil {
 		return nil, err
@@ -328,15 +336,29 @@ type batchCommandsEntry struct {
 	err error
 }
 
+const idleTimeout = 3 * time.Minute
+
 // fetchAllPendingRequests fetches all pending requests from the channel.
-func fetchAllPendingRequests(
-	ch chan *batchCommandsEntry,
+func (a *connArray) fetchAllPendingRequests(
 	maxBatchSize int,
 	entries *[]*batchCommandsEntry,
 	requests *[]*tikvpb.BatchCommandsRequest_Request,
 ) {
 	// Block on the first element.
-	headEntry := <-ch
+	var headEntry *batchCommandsEntry
+	select {
+	case headEntry = <-a.batchCommandsCh:
+		if !a.idleDetect.Stop() {
+			<-a.idleDetect.C
+		}
+		a.idleDetect.Reset(idleTimeout)
+	case <-a.idleDetect.C:
+		a.idleDetect.Reset(idleTimeout)
+		a.idle = true
+		atomic.CompareAndSwapUint32(a.idleNotify, 0, 1)
+		// This connArray is to be recycled
+		return
+	}
 	if headEntry == nil {
 		return
 	}
@@ -346,7 +368,7 @@ func (a *connArray) fetchAllPendingRequests(
 	// This loop is for trying best to collect more requests.
 	for len(*entries) < maxBatchSize {
 		select {
-		case entry := <-ch:
+		case entry := <-a.batchCommandsCh:
 			if entry == nil {
 				return
 			}
@@ -431,7 +453,7 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) {
 		requestIDs = requestIDs[:0]
 
 		metrics.TiKVPendingBatchRequests.Set(float64(len(a.batchCommandsCh)))
-		fetchAllPendingRequests(a.batchCommandsCh, int(cfg.MaxBatchSize), &entries, &requests)
+		a.fetchAllPendingRequests(int(cfg.MaxBatchSize), &entries, &requests)
 
 		if len(entries) < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 {
 			tikvTransportLayerLoad := atomic.LoadUint64(batchCommandsClient.tikvTransportLayerLoad)
@@ -488,13 +510,15 @@
 // TODO: Add flow control between RPC clients in TiDB ond RPC servers in TiKV.
 // Since we use shared client connection to communicate to the same TiKV, it's possible
 // that there are too many concurrent requests which overload the service of TiKV.
-// TODO: Implement background cleanup. It adds a background goroutine to periodically check
-// whether there is any connection is idle and then close and remove these idle connections.
 type rpcClient struct {
 	sync.RWMutex
 	isClosed bool
 	conns    map[string]*connArray
 	security config.Security
+
+	// Implement background cleanup.
+	// Periodically check whether there is any connection that is idle and then close and remove these idle connections.
+	idleNotify uint32
 }
 
 func newRPCClient(security config.Security) *rpcClient {
@@ -529,7 +553,7 @@ func (c *rpcClient) createConnArray(addr string) (*connArray, error) {
 	if !ok {
 		var err error
 		connCount := config.GetGlobalConfig().TiKVClient.GrpcConnectionCount
-		array, err = newConnArray(connCount, addr, c.security)
+		array, err = newConnArray(connCount, addr, c.security, &c.idleNotify)
 		if err != nil {
 			return nil, err
 		}
@@ -594,6 +618,10 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
 		metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeID).Observe(time.Since(start).Seconds())
 	}()
 
+	if atomic.CompareAndSwapUint32(&c.idleNotify, 1, 0) {
+		c.recycleIdleConnArray()
+	}
+
 	connArray, err := c.getConnArray(addr)
 	if err != nil {
 		return nil, errors.Trace(err)
@@ -654,3 +682,28 @@
 	c.closeConns()
 	return nil
 }
+
+func (c *rpcClient) recycleIdleConnArray() {
+	var addrs []string
+	c.RLock()
+	for _, conn := range c.conns {
+		if conn.idle {
+			addrs = append(addrs, conn.target)
+		}
+	}
+	c.RUnlock()
+
+	for _, addr := range addrs {
+		c.Lock()
+		conn, ok := c.conns[addr]
+		if ok {
+			delete(c.conns, addr)
+			logutil.Logger(context.Background()).Info("recycle idle connection",
+				zap.String("target", addr))
+		}
+		c.Unlock()
+		if conn != nil {
+			conn.Close()
+		}
+	}
+}
diff --git a/store/tikv/client_test.go b/store/tikv/client_test.go
index 5063acb77eeee..b673e9b674358 100644
--- a/store/tikv/client_test.go
+++ b/store/tikv/client_test.go
@@ -45,6 +45,8 @@ func (s *testClientSuite) TestConn(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(conn2.Get(), Not(Equals), conn1.Get())
 
+	client.recycleIdleConnArray()
+
 	client.Close()
 	conn3, err := client.getConnArray(addr)
 	c.Assert(err, NotNil)
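
Note for reviewers: the heart of this change is the timer handling in fetchAllPendingRequests -- the Stop/drain/Reset sequence around idleDetect, plus the lock-free CompareAndSwapUint32 handshake between connArray (which raises the idle flag) and rpcClient.SendRequest (which consumes it). The sketch below isolates that pattern in a self-contained, runnable form. It is an illustration under assumptions, not code from this patch: worker, workCh, and fetchOne are invented names for the example, and idleTimeout is shortened from the patch's 3 minutes so the program finishes quickly.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Shortened for the example; the patch uses 3 * time.Minute.
const idleTimeout = 100 * time.Millisecond

// worker stands in for connArray's batching goroutine.
type worker struct {
	workCh     chan int
	idle       bool
	idleNotify *uint32     // shared with the owner, like connArray.idleNotify
	idleDetect *time.Timer // fires when no work arrives within idleTimeout
}

// fetchOne mirrors the select in fetchAllPendingRequests: block on the
// work channel, but give up and flag ourselves idle if the timer fires.
func (w *worker) fetchOne() (int, bool) {
	select {
	case v := <-w.workCh:
		// Stop returns false if the timer already fired; its channel must
		// then be drained before Reset, or a stale tick would survive and
		// mark a busy worker idle on the next select.
		if !w.idleDetect.Stop() {
			<-w.idleDetect.C
		}
		w.idleDetect.Reset(idleTimeout)
		return v, true
	case <-w.idleDetect.C:
		w.idleDetect.Reset(idleTimeout)
		w.idle = true
		// Tell the owner, without taking any lock, that some worker is idle.
		atomic.CompareAndSwapUint32(w.idleNotify, 0, 1)
		return 0, false
	}
}

func main() {
	var idleNotify uint32
	w := &worker{
		workCh:     make(chan int, 1),
		idleNotify: &idleNotify,
		idleDetect: time.NewTimer(idleTimeout),
	}

	w.workCh <- 42
	if v, ok := w.fetchOne(); ok {
		fmt.Println("got work:", v) // got work: 42
	}

	// No more work: the next fetch times out and raises the flag.
	if _, ok := w.fetchOne(); !ok {
		fmt.Println("worker went idle")
	}

	// The owner checks the flag on its hot path, the way
	// rpcClient.SendRequest does, and only then pays for cleanup.
	if atomic.CompareAndSwapUint32(&idleNotify, 1, 0) {
		fmt.Println("recycling idle workers, w.idle =", w.idle)
	}
}

The CAS handshake is why SendRequest stays cheap: when nothing is idle the flag is 0 and the compare-and-swap fails in a single atomic operation, so the common path never takes the rpcClient mutex; only the rare request that observes the flag set pays for recycleIdleConnArray.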