br/pkg/membuf: remove global buffer pool (#29934)
sleepymole authored Dec 23, 2021
1 parent ec55c38 commit dfc25ff
Showing 4 changed files with 91 additions and 31 deletions.
15 changes: 9 additions & 6 deletions br/pkg/lightning/backend/local/local.go
@@ -150,9 +150,9 @@ type local struct {
     duplicateDetection bool
     duplicateDB *pebble.DB
     errorMgr *errormanager.ErrorManager
-}
 
-var bufferPool = membuf.NewPool(1024, manual.Allocator{})
+    bufferPool *membuf.Pool
+}
 
 func openDuplicateDB(storeDir string) (*pebble.DB, error) {
     dbPath := filepath.Join(storeDir, duplicateDBName)
@@ -244,6 +244,8 @@ func NewLocalBackend(
         checkTiKVAvaliable: cfg.App.CheckRequirements,
         duplicateDB: duplicateDB,
         errorMgr: errorMgr,
+
+        bufferPool: membuf.NewPool(membuf.WithAllocator(manual.Allocator{})),
     }
     local.conns = common.NewGRPCConns()
     if err = local.checkMultiIngestSupport(ctx); err != nil {
@@ -423,6 +425,7 @@ func (local *local) Close() {
         engine.unlock()
     }
     local.conns.Close()
+    local.bufferPool.Destroy()
 
     if local.duplicateDB != nil {
         // Check whether there are duplicates.
@@ -776,7 +779,7 @@ func (local *local) WriteToTiKV(
         requests = append(requests, req)
     }
 
-    bytesBuf := bufferPool.NewBuffer()
+    bytesBuf := local.bufferPool.NewBuffer()
     defer bytesBuf.Destroy()
     pairs := make([]*sst.Pair, 0, local.batchWriteKVPairs)
     count := 0
@@ -1664,14 +1667,14 @@ func (local *local) LocalWriter(ctx context.Context, cfg *backend.LocalWriterCon
         return nil, errors.Errorf("could not find engine for %s", engineUUID.String())
     }
     engine := e.(*Engine)
-    return openLocalWriter(cfg, engine, local.localWriterMemCacheSize)
+    return openLocalWriter(cfg, engine, local.localWriterMemCacheSize, local.bufferPool.NewBuffer())
 }
 
-func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, cacheSize int64) (*Writer, error) {
+func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, cacheSize int64, kvBuffer *membuf.Buffer) (*Writer, error) {
     w := &Writer{
         engine: engine,
         memtableSizeLimit: cacheSize,
-        kvBuffer: bufferPool.NewBuffer(),
+        kvBuffer: kvBuffer,
         isKVSorted: cfg.IsKVSorted,
         isWriteBatchSorted: true,
     }
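
Taken together, the local.go changes replace the package-level bufferPool with a per-backend *membuf.Pool: NewLocalBackend creates it, WriteToTiKV and LocalWriter draw buffers from it, and Close releases it. A minimal sketch of that ownership pattern, using only the membuf API shown in this commit (backendLike and its methods are hypothetical illustrations, not code from the repository):

package main

import (
	"fmt"

	"github.com/pingcap/tidb/br/pkg/membuf"
)

// backendLike is a hypothetical stand-in for the `local` backend: it owns a
// pool, hands buffers to individual writes, and destroys the pool on Close.
type backendLike struct {
	bufferPool *membuf.Pool
}

func newBackendLike() *backendLike {
	// One pool per backend instance instead of a shared package-level pool.
	return &backendLike{bufferPool: membuf.NewPool()}
}

func (b *backendLike) writeBatch(rows [][]byte) int64 {
	buf := b.bufferPool.NewBuffer()
	defer buf.Destroy() // blocks go back to the pool once this batch is done

	for _, row := range rows {
		dst := buf.AllocBytes(len(row)) // carved out of the pool's blocks
		copy(dst, row)
	}
	return buf.TotalSize()
}

func (b *backendLike) Close() {
	b.bufferPool.Destroy() // frees every block cached by the pool
}

func main() {
	b := newBackendLike()
	defer b.Close()
	fmt.Println(b.writeBatch([][]byte{[]byte("k1v1"), []byte("k2v2")}))
}
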
6 changes: 5 additions & 1 deletion br/pkg/lightning/backend/local/local_test.go
@@ -46,6 +46,7 @@ import (
     "github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
     "github.com/pingcap/tidb/br/pkg/lightning/common"
     "github.com/pingcap/tidb/br/pkg/lightning/mydump"
+    "github.com/pingcap/tidb/br/pkg/membuf"
     "github.com/pingcap/tidb/br/pkg/mock"
     "github.com/pingcap/tidb/br/pkg/pdutil"
     "github.com/pingcap/tidb/br/pkg/restore"
@@ -357,7 +358,10 @@ func testLocalWriter(c *C, needSort bool, partitialSort bool) {
     f.wg.Add(1)
     go f.ingestSSTLoop()
     sorted := needSort && !partitialSort
-    w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, 1024)
+    pool := membuf.NewPool()
+    defer pool.Destroy()
+    kvBuffer := pool.NewBuffer()
+    w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, 1024, kvBuffer)
     c.Assert(err, IsNil)
 
     ctx := context.Background()
81 changes: 63 additions & 18 deletions br/pkg/membuf/buffer.go
@@ -14,9 +14,11 @@
 
 package membuf
 
-const bigValueSize = 1 << 16 // 64K
-
-var allocBufLen = 1 << 20 // 1M
+const (
+    defaultPoolSize            = 1024
+    defaultBlockSize           = 1 << 20 // 1M
+    defaultLargeAllocThreshold = 1 << 16 // 64K
+)
 
 // Allocator is the abstract interface for allocating and freeing memory.
 type Allocator interface {
@@ -38,30 +40,71 @@ func (stdAllocator) Free(_ []byte) {}
 // garbage collector which always release the memory so late. Use a fixed size chan to reuse
 // can decrease the memory usage to 1/3 compare with sync.Pool.
 type Pool struct {
-    allocator Allocator
-    recycleCh chan []byte
+    allocator           Allocator
+    blockSize           int
+    blockCache          chan []byte
+    largeAllocThreshold int
 }
 
+// Option configures a pool.
+type Option func(p *Pool)
+
+// WithPoolSize configures how many blocks cached by this pool.
+func WithPoolSize(size int) Option {
+    return func(p *Pool) {
+        p.blockCache = make(chan []byte, size)
+    }
+}
+
+// WithBlockSize configures the size of each block.
+func WithBlockSize(size int) Option {
+    return func(p *Pool) {
+        p.blockSize = size
+    }
+}
+
+// WithAllocator specifies the allocator used by pool to allocate and free memory.
+func WithAllocator(allocator Allocator) Option {
+    return func(p *Pool) {
+        p.allocator = allocator
+    }
+}
+
+// WithLargeAllocThreshold configures the threshold for large allocation of a Buffer.
+// If allocate size is larger than this threshold, bytes will be allocated directly
+// by the make built-in function and won't be tracked by the pool.
+func WithLargeAllocThreshold(threshold int) Option {
+    return func(p *Pool) {
+        p.largeAllocThreshold = threshold
+    }
+}
+
 // NewPool creates a new pool.
-func NewPool(size int, allocator Allocator) *Pool {
-    return &Pool{
-        allocator: allocator,
-        recycleCh: make(chan []byte, size),
+func NewPool(opts ...Option) *Pool {
+    p := &Pool{
+        allocator:           stdAllocator{},
+        blockSize:           defaultBlockSize,
+        blockCache:          make(chan []byte, defaultPoolSize),
+        largeAllocThreshold: defaultLargeAllocThreshold,
     }
+    for _, opt := range opts {
+        opt(p)
+    }
+    return p
 }
 
 func (p *Pool) acquire() []byte {
     select {
-    case b := <-p.recycleCh:
+    case b := <-p.blockCache:
         return b
     default:
-        return p.allocator.Alloc(allocBufLen)
+        return p.allocator.Alloc(p.blockSize)
     }
 }
 
 func (p *Pool) release(b []byte) {
     select {
-    case p.recycleCh <- b:
+    case p.blockCache <- b:
     default:
         p.allocator.Free(b)
     }
@@ -72,10 +115,12 @@ func (p *Pool) NewBuffer() *Buffer {
     return &Buffer{pool: p, bufs: make([][]byte, 0, 128), curBufIdx: -1}
 }
 
-var globalPool = NewPool(1024, stdAllocator{})
-
-// NewBuffer creates a new buffer in global pool.
-func NewBuffer() *Buffer { return globalPool.NewBuffer() }
+func (p *Pool) Destroy() {
+    close(p.blockCache)
+    for b := range p.blockCache {
+        p.allocator.Free(b)
+    }
+}
 
 // Buffer represents the reuse buffer.
 type Buffer struct {
@@ -123,12 +168,12 @@ func (b *Buffer) Destroy() {
 
 // TotalSize represents the total memory size of this Buffer.
 func (b *Buffer) TotalSize() int64 {
-    return int64(len(b.bufs) * allocBufLen)
+    return int64(len(b.bufs) * b.pool.blockSize)
 }
 
 // AllocBytes allocates bytes with the given length.
 func (b *Buffer) AllocBytes(n int) []byte {
-    if n > bigValueSize {
+    if n > b.pool.largeAllocThreshold {
         return make([]byte, n)
     }
     if b.curIdx+n > b.curBufLen {
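
With the global pool and the package-level NewBuffer gone, callers configure their own pool through the functional options added above and tear it down with Destroy. A short usage sketch of the new API (the option values are arbitrary, chosen only for illustration):

package main

import (
	"fmt"

	"github.com/pingcap/tidb/br/pkg/membuf"
)

func main() {
	pool := membuf.NewPool(
		membuf.WithPoolSize(16),               // cache at most 16 recycled blocks
		membuf.WithBlockSize(1<<20),           // allocate 1 MiB blocks
		membuf.WithLargeAllocThreshold(1<<16), // >64 KiB requests bypass the pool
	)
	defer pool.Destroy() // hands every cached block back to the allocator

	buf := pool.NewBuffer()
	defer buf.Destroy() // returns this buffer's blocks to the pool's cache

	small := buf.AllocBytes(256)     // served from the current 1 MiB block
	large := buf.AllocBytes(1 << 17) // above the threshold: plain make, not pooled

	// TotalSize counts only pooled blocks, so the large allocation is excluded.
	fmt.Println(len(small), len(large), buf.TotalSize())
}
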
20 changes: 14 additions & 6 deletions br/pkg/membuf/buffer_test.go
@@ -21,10 +21,6 @@ import (
     "github.com/stretchr/testify/require"
 )
 
-func init() {
-    allocBufLen = 1024
-}
-
 type testAllocator struct {
     allocs int
     frees  int
@@ -41,7 +37,13 @@ func (t *testAllocator) Free(_ []byte) {
 
 func TestBufferPool(t *testing.T) {
     allocator := &testAllocator{}
-    pool := NewPool(2, allocator)
+    pool := NewPool(
+        WithPoolSize(2),
+        WithAllocator(allocator),
+        WithBlockSize(1024),
+        WithLargeAllocThreshold(512),
+    )
+    defer pool.Destroy()
 
     bytesBuf := pool.NewBuffer()
     bytesBuf.AllocBytes(256)
@@ -53,6 +55,10 @@ func TestBufferPool(t *testing.T) {
     bytesBuf.AllocBytes(767)
     require.Equal(t, 2, allocator.allocs)
 
+    largeBytes := bytesBuf.AllocBytes(513)
+    require.Equal(t, 513, len(largeBytes))
+    require.Equal(t, 2, allocator.allocs)
+
     require.Equal(t, 0, allocator.frees)
     bytesBuf.Destroy()
     require.Equal(t, 0, allocator.frees)
@@ -67,7 +73,9 @@
 }
 
 func TestBufferIsolation(t *testing.T) {
-    bytesBuf := NewBuffer()
+    pool := NewPool(WithBlockSize(1024))
+    defer pool.Destroy()
+    bytesBuf := pool.NewBuffer()
     defer bytesBuf.Destroy()
 
     b1 := bytesBuf.AllocBytes(16)
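
Because the allocator is now injected per pool via WithAllocator, callers can count or redirect block allocations the way testAllocator does above. A small sketch of such a counting allocator (hypothetical, mirroring the test; the Alloc/Free signatures follow how the pool calls its allocator in buffer.go):

package main

import (
	"fmt"

	"github.com/pingcap/tidb/br/pkg/membuf"
)

// countingAllocator mirrors the test's testAllocator: it counts how many
// blocks the pool requests and how many it eventually frees.
type countingAllocator struct {
	allocs int
	frees  int
}

func (a *countingAllocator) Alloc(n int) []byte {
	a.allocs++
	return make([]byte, n)
}

func (a *countingAllocator) Free(_ []byte) {
	a.frees++
}

func main() {
	alloc := &countingAllocator{}
	pool := membuf.NewPool(
		membuf.WithAllocator(alloc),
		membuf.WithPoolSize(2),     // keep at most two recycled blocks
		membuf.WithBlockSize(1024), // small blocks, as in TestBufferPool
	)

	buf := pool.NewBuffer()
	buf.AllocBytes(256)  // first block allocated
	buf.AllocBytes(1024) // doesn't fit in the remainder, second block
	buf.Destroy()        // both blocks return to the pool cache; nothing freed yet

	pool.Destroy()                         // drains the cache, freeing each cached block
	fmt.Println(alloc.allocs, alloc.frees) // 2 2
}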
