From dfc25ff06bf6955652c6f027a4e8718372d3ac6f Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Thu, 23 Dec 2021 11:13:47 +0800 Subject: [PATCH] br/pkg/membuf: remove global buffer pool (#29934) --- br/pkg/lightning/backend/local/local.go | 15 ++-- br/pkg/lightning/backend/local/local_test.go | 6 +- br/pkg/membuf/buffer.go | 81 +++++++++++++++----- br/pkg/membuf/buffer_test.go | 20 +++-- 4 files changed, 91 insertions(+), 31 deletions(-) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index eb7ab37802e4e..b703acec49395 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -150,9 +150,9 @@ type local struct { duplicateDetection bool duplicateDB *pebble.DB errorMgr *errormanager.ErrorManager -} -var bufferPool = membuf.NewPool(1024, manual.Allocator{}) + bufferPool *membuf.Pool +} func openDuplicateDB(storeDir string) (*pebble.DB, error) { dbPath := filepath.Join(storeDir, duplicateDBName) @@ -244,6 +244,8 @@ func NewLocalBackend( checkTiKVAvaliable: cfg.App.CheckRequirements, duplicateDB: duplicateDB, errorMgr: errorMgr, + + bufferPool: membuf.NewPool(membuf.WithAllocator(manual.Allocator{})), } local.conns = common.NewGRPCConns() if err = local.checkMultiIngestSupport(ctx); err != nil { @@ -423,6 +425,7 @@ func (local *local) Close() { engine.unlock() } local.conns.Close() + local.bufferPool.Destroy() if local.duplicateDB != nil { // Check whether there are duplicates. @@ -776,7 +779,7 @@ func (local *local) WriteToTiKV( requests = append(requests, req) } - bytesBuf := bufferPool.NewBuffer() + bytesBuf := local.bufferPool.NewBuffer() defer bytesBuf.Destroy() pairs := make([]*sst.Pair, 0, local.batchWriteKVPairs) count := 0 @@ -1664,14 +1667,14 @@ func (local *local) LocalWriter(ctx context.Context, cfg *backend.LocalWriterCon return nil, errors.Errorf("could not find engine for %s", engineUUID.String()) } engine := e.(*Engine) - return openLocalWriter(cfg, engine, local.localWriterMemCacheSize) + return openLocalWriter(cfg, engine, local.localWriterMemCacheSize, local.bufferPool.NewBuffer()) } -func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, cacheSize int64) (*Writer, error) { +func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, cacheSize int64, kvBuffer *membuf.Buffer) (*Writer, error) { w := &Writer{ engine: engine, memtableSizeLimit: cacheSize, - kvBuffer: bufferPool.NewBuffer(), + kvBuffer: kvBuffer, isKVSorted: cfg.IsKVSorted, isWriteBatchSorted: true, } diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index 747034068c463..35c13692dce3e 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -46,6 +46,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/mydump" + "github.com/pingcap/tidb/br/pkg/membuf" "github.com/pingcap/tidb/br/pkg/mock" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/restore" @@ -357,7 +358,10 @@ func testLocalWriter(c *C, needSort bool, partitialSort bool) { f.wg.Add(1) go f.ingestSSTLoop() sorted := needSort && !partitialSort - w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, 1024) + pool := membuf.NewPool() + defer pool.Destroy() + kvBuffer := pool.NewBuffer() + w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, 1024, kvBuffer) c.Assert(err, IsNil) ctx := context.Background() diff --git a/br/pkg/membuf/buffer.go b/br/pkg/membuf/buffer.go index 172d99baec9aa..49ffbae8afdf3 100644 --- a/br/pkg/membuf/buffer.go +++ b/br/pkg/membuf/buffer.go @@ -14,9 +14,11 @@ package membuf -const bigValueSize = 1 << 16 // 64K - -var allocBufLen = 1 << 20 // 1M +const ( + defaultPoolSize = 1024 + defaultBlockSize = 1 << 20 // 1M + defaultLargeAllocThreshold = 1 << 16 // 64K +) // Allocator is the abstract interface for allocating and freeing memory. type Allocator interface { @@ -38,30 +40,71 @@ func (stdAllocator) Free(_ []byte) {} // garbage collector which always release the memory so late. Use a fixed size chan to reuse // can decrease the memory usage to 1/3 compare with sync.Pool. type Pool struct { - allocator Allocator - recycleCh chan []byte + allocator Allocator + blockSize int + blockCache chan []byte + largeAllocThreshold int +} + +// Option configures a pool. +type Option func(p *Pool) + +// WithPoolSize configures how many blocks cached by this pool. +func WithPoolSize(size int) Option { + return func(p *Pool) { + p.blockCache = make(chan []byte, size) + } +} + +// WithBlockSize configures the size of each block. +func WithBlockSize(size int) Option { + return func(p *Pool) { + p.blockSize = size + } +} + +// WithAllocator specifies the allocator used by pool to allocate and free memory. +func WithAllocator(allocator Allocator) Option { + return func(p *Pool) { + p.allocator = allocator + } +} + +// WithLargeAllocThreshold configures the threshold for large allocation of a Buffer. +// If allocate size is larger than this threshold, bytes will be allocated directly +// by the make built-in function and won't be tracked by the pool. +func WithLargeAllocThreshold(threshold int) Option { + return func(p *Pool) { + p.largeAllocThreshold = threshold + } } // NewPool creates a new pool. -func NewPool(size int, allocator Allocator) *Pool { - return &Pool{ - allocator: allocator, - recycleCh: make(chan []byte, size), +func NewPool(opts ...Option) *Pool { + p := &Pool{ + allocator: stdAllocator{}, + blockSize: defaultBlockSize, + blockCache: make(chan []byte, defaultPoolSize), + largeAllocThreshold: defaultLargeAllocThreshold, + } + for _, opt := range opts { + opt(p) } + return p } func (p *Pool) acquire() []byte { select { - case b := <-p.recycleCh: + case b := <-p.blockCache: return b default: - return p.allocator.Alloc(allocBufLen) + return p.allocator.Alloc(p.blockSize) } } func (p *Pool) release(b []byte) { select { - case p.recycleCh <- b: + case p.blockCache <- b: default: p.allocator.Free(b) } @@ -72,10 +115,12 @@ func (p *Pool) NewBuffer() *Buffer { return &Buffer{pool: p, bufs: make([][]byte, 0, 128), curBufIdx: -1} } -var globalPool = NewPool(1024, stdAllocator{}) - -// NewBuffer creates a new buffer in global pool. -func NewBuffer() *Buffer { return globalPool.NewBuffer() } +func (p *Pool) Destroy() { + close(p.blockCache) + for b := range p.blockCache { + p.allocator.Free(b) + } +} // Buffer represents the reuse buffer. type Buffer struct { @@ -123,12 +168,12 @@ func (b *Buffer) Destroy() { // TotalSize represents the total memory size of this Buffer. func (b *Buffer) TotalSize() int64 { - return int64(len(b.bufs) * allocBufLen) + return int64(len(b.bufs) * b.pool.blockSize) } // AllocBytes allocates bytes with the given length. func (b *Buffer) AllocBytes(n int) []byte { - if n > bigValueSize { + if n > b.pool.largeAllocThreshold { return make([]byte, n) } if b.curIdx+n > b.curBufLen { diff --git a/br/pkg/membuf/buffer_test.go b/br/pkg/membuf/buffer_test.go index c5d095d299f9c..fa45c5c4e34b1 100644 --- a/br/pkg/membuf/buffer_test.go +++ b/br/pkg/membuf/buffer_test.go @@ -21,10 +21,6 @@ import ( "github.com/stretchr/testify/require" ) -func init() { - allocBufLen = 1024 -} - type testAllocator struct { allocs int frees int @@ -41,7 +37,13 @@ func (t *testAllocator) Free(_ []byte) { func TestBufferPool(t *testing.T) { allocator := &testAllocator{} - pool := NewPool(2, allocator) + pool := NewPool( + WithPoolSize(2), + WithAllocator(allocator), + WithBlockSize(1024), + WithLargeAllocThreshold(512), + ) + defer pool.Destroy() bytesBuf := pool.NewBuffer() bytesBuf.AllocBytes(256) @@ -53,6 +55,10 @@ func TestBufferPool(t *testing.T) { bytesBuf.AllocBytes(767) require.Equal(t, 2, allocator.allocs) + largeBytes := bytesBuf.AllocBytes(513) + require.Equal(t, 513, len(largeBytes)) + require.Equal(t, 2, allocator.allocs) + require.Equal(t, 0, allocator.frees) bytesBuf.Destroy() require.Equal(t, 0, allocator.frees) @@ -67,7 +73,9 @@ func TestBufferPool(t *testing.T) { } func TestBufferIsolation(t *testing.T) { - bytesBuf := NewBuffer() + pool := NewPool(WithBlockSize(1024)) + defer pool.Destroy() + bytesBuf := pool.NewBuffer() defer bytesBuf.Destroy() b1 := bytesBuf.AllocBytes(16)