backend: speed up uploading by opening multiple TCP connections (#400)
* backend: use uncached gRPC channels

* backend: use a connection pool of gRPC channels

* backend: add a connection pool to the local backend

* backend: remove some unused logs

* local: make connpool private

* local: make init mutex...

* local: address comments
YuJuncen authored Oct 13, 2020
1 parent aa029d8 commit da84e5d
Showing 1 changed file with 96 additions and 20 deletions.
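
The gist of the change: instead of caching a single gRPC channel per TiKV store, the backend now keeps a small pool of channels per store and hands them out round-robin, so concurrent range uploads go over several TCP connections rather than multiplexing every stream onto one. Below is a minimal, hypothetical usage sketch of that pattern. `uploadRanges`, `writeOneRange`, and `storeAddr` are illustrative names that are not part of this commit, `grpc.WithInsecure()` stands in for the real TLS setup, and `newConnPool`, `get`, and `Close` are the helpers added in the diff that follows; the usual imports from local.go (context, sync, google.golang.org/grpc, and the kvproto ImportSST client aliased as `sst`) are assumed.

// Hypothetical sketch: upload several key ranges to one store concurrently,
// spreading the writes across up to `concurrency` TCP connections.
func uploadRanges(ctx context.Context, storeAddr string, ranges [][]byte, concurrency int) error {
	pool := newConnPool(concurrency, func(ctx context.Context) (*grpc.ClientConn, error) {
		// Every new pool slot dials its own TCP connection to the store.
		return grpc.DialContext(ctx, storeAddr, grpc.WithInsecure())
	})
	defer pool.Close()

	var wg sync.WaitGroup
	errCh := make(chan error, len(ranges))
	for _, r := range ranges {
		wg.Add(1)
		go func(r []byte) {
			defer wg.Done()
			conn, err := pool.get(ctx) // round-robin once the pool is full
			if err != nil {
				errCh <- err
				return
			}
			cli := sst.NewImportSSTClient(conn)
			errCh <- writeOneRange(ctx, cli, r) // illustrative helper
		}(r)
	}
	wg.Wait()
	close(errCh)
	for err := range errCh {
		if err != nil {
			return err
		}
	}
	return nil
}
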
lightning/backend/local.go
@@ -112,14 +112,23 @@ func (e *LocalFile) Cleanup(dataDir string) error {
return os.RemoveAll(dbPath)
}

-type grpcClis struct {
-mu sync.Mutex
-clis map[uint64]*grpc.ClientConn
+type gRPCConns struct {
+mu sync.Mutex
+conns map[uint64]*connPool
}

func (conns *gRPCConns) Close() {
conns.mu.Lock()
defer conns.mu.Unlock()

for _, cp := range conns.conns {
cp.Close()
}
}

type local struct {
engines sync.Map
-grpcClis grpcClis
+conns gRPCConns
splitCli split.SplitClient
tls *common.TLS
pdAddr string
@@ -131,6 +140,67 @@ type local struct {
ingestConcurrency *worker.Pool
batchWriteKVPairs int
checkpointEnabled bool

tcpConcurrency int
}

// connPool is a lazy pool of gRPC channels.
// When `get` is called, it lazily allocates a new connection if the pool is not full.
// If the pool is full, it returns the already-allocated channels round-robin.
type connPool struct {
mu sync.Mutex

conns []*grpc.ClientConn
name string
next int
cap int
newConn func(ctx context.Context) (*grpc.ClientConn, error)
}

func (p *connPool) takeConns() (conns []*grpc.ClientConn) {
p.mu.Lock()
defer p.mu.Unlock()
p.conns, conns = nil, p.conns
p.next = 0
return conns
}

// Close closes the conn pool.
func (p *connPool) Close() {
for _, c := range p.takeConns() {
if err := c.Close(); err != nil {
log.L().Warn("failed to close clientConn", zap.String("target", c.Target()), log.ShortError(err))
}
}
}

// get tries to get an existing connection from the pool, or makes a new one if the pool is not full.
func (p *connPool) get(ctx context.Context) (*grpc.ClientConn, error) {
p.mu.Lock()
defer p.mu.Unlock()
if len(p.conns) < p.cap {
c, err := p.newConn(ctx)
if err != nil {
return nil, err
}
p.conns = append(p.conns, c)
return c, nil
}

conn := p.conns[p.next]
p.next = (p.next + 1) % p.cap
return conn, nil
}

// newConnPool creates a new connPool with the specified connection factory and capacity.
func newConnPool(cap int, newConn func(ctx context.Context) (*grpc.ClientConn, error)) *connPool {
return &connPool{
cap: cap,
conns: make([]*grpc.ClientConn, 0, cap),
newConn: newConn,

mu: sync.Mutex{},
}
}
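
// Illustrative only, not part of this commit: with cap = 2 and any factory
// `dial` matching newConn, successive calls to get first dial lazily and then
// cycle through the dialed connections round-robin:
//
//	p := newConnPool(2, dial)
//	c0, _ := p.get(ctx) // pool not full: dials connection #0
//	c1, _ := p.get(ctx) // pool not full: dials connection #1
//	c2, _ := p.get(ctx) // pool full: returns connection #0 again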

// NewLocalBackend creates new connections to tikv.
@@ -179,15 +249,15 @@ func NewLocalBackend(

rangeConcurrency: worker.NewPool(ctx, rangeConcurrency, "range"),
ingestConcurrency: worker.NewPool(ctx, rangeConcurrency*2, "ingest"),
tcpConcurrency: rangeConcurrency,
batchWriteKVPairs: sendKVPairs,
checkpointEnabled: enableCheckpoint,
}
-local.grpcClis.clis = make(map[uint64]*grpc.ClientConn)
+local.conns.conns = make(map[uint64]*connPool)
return MakeBackend(local), nil
}

-func (local *local) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
+func (local *local) makeConn(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
store, err := local.splitCli.GetStore(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
@@ -220,18 +290,27 @@ func (local *local) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
if err != nil {
return nil, errors.WithStack(err)
}
-// Cache the conn.
-local.grpcClis.clis[storeID] = conn
return conn, nil
}

func (local *local) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
if _, ok := local.conns.conns[storeID]; !ok {
local.conns.conns[storeID] = newConnPool(local.tcpConcurrency, func(ctx context.Context) (*grpc.ClientConn, error) {
return local.makeConn(ctx, storeID)
})
}
return local.conns.conns[storeID].get(ctx)
}

// Close the importer connection.
func (local *local) Close() {
local.engines.Range(func(k, v interface{}) bool {
v.(*LocalFile).Close()
return true
})

local.conns.Close()

// if checkpoint is disabled or we finished loading all data successfully, then files in this
// dir will be useless, so we clean up this dir and all files in it.
if !local.checkpointEnabled || common.IsEmptyDir(local.localStoreDir) {
@@ -354,17 +433,12 @@ func (local *local) CloseEngine(ctx context.Context, engineUUID uuid.UUID) error
}

func (local *local) getImportClient(ctx context.Context, peer *metapb.Peer) (sst.ImportSSTClient, error) {
-local.grpcClis.mu.Lock()
-defer local.grpcClis.mu.Unlock()
-var err error
+local.conns.mu.Lock()
+defer local.conns.mu.Unlock()

-conn, ok := local.grpcClis.clis[peer.GetStoreId()]
-if !ok {
-conn, err = local.getGrpcConnLocked(ctx, peer.GetStoreId())
-if err != nil {
-log.L().Error("could not get grpc connect ", zap.Uint64("storeId", peer.GetStoreId()))
-return nil, err
-}
+conn, err := local.getGrpcConnLocked(ctx, peer.GetStoreId())
+if err != nil {
+return nil, err
}
return sst.NewImportSSTClient(conn), nil
}
@@ -378,6 +452,7 @@ func (local *local) WriteToTiKV(
region *split.RegionInfo,
start, end []byte,
) ([]*sst.SSTMeta, *Range, error) {
begin := time.Now()
var startKey, endKey []byte
if len(region.Region.StartKey) > 0 {
_, startKey, _ = codec.DecodeBytes(region.Region.StartKey, []byte{})
@@ -524,7 +599,8 @@ func (local *local) WriteToTiKV(
log.L().Debug("write to kv", zap.Reflect("region", region), zap.Uint64("leader", leaderID),
zap.Reflect("meta", meta), zap.Reflect("return metas", leaderPeerMetas),
zap.Int("kv_pairs", totalCount), zap.Int64("total_bytes", size),
zap.Int64("buf_size", bytesBuf.totalSize()))
zap.Int64("buf_size", bytesBuf.totalSize()),
zap.Stringer("takeTime", time.Since(begin)))

var remainRange *Range
if iter.Valid() && iter.Next() {
