From 9c82e8c72b96eec1e7667a0e139a07b944c33b75 Mon Sep 17 00:00:00 2001 From: Wilson Wang Date: Mon, 24 May 2021 15:15:23 -0700 Subject: [PATCH] server: set multiple concurrentReadTx instances share one txReadBuffer. --- server/mvcc/backend/backend.go | 80 +++++++++++++++++++++++++++++++- server/mvcc/backend/tx_buffer.go | 13 +++++- 2 files changed, 89 insertions(+), 4 deletions(-) diff --git a/server/mvcc/backend/backend.go b/server/mvcc/backend/backend.go index 2acf3e46a3b..b7207c1717a 100644 --- a/server/mvcc/backend/backend.go +++ b/server/mvcc/backend/backend.go @@ -79,6 +79,12 @@ type Snapshot interface { Close() error } +type txReadBufferCache struct { + mu sync.Mutex + buf *txReadBuffer + bufVersion uint64 +} + type backend struct { // size and commits are used with atomic operations so they must be // 64-bit aligned, otherwise 32-bit tests will crash @@ -102,6 +108,11 @@ type backend struct { batchTx *batchTxBuffered readTx *readTx + // txReadBufferCache mirrors "txReadBuffer" within "readTx" -- readTx.baseReadTx.buf. + // When creating "concurrentReadTx": + // - if the cache is up-to-date, "readTx.baseReadTx.buf" copy can be skipped + // - if the cache is empty or outdated, "readTx.baseReadTx.buf" copy is required + txReadBufferCache txReadBufferCache stopc chan struct{} donec chan struct{} @@ -183,19 +194,26 @@ func newBackend(bcfg BackendConfig) *backend { readTx: &readTx{ baseReadTx: baseReadTx{ buf: txReadBuffer{ - txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)}, + txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)}, + bufVersion: 0, }, buckets: make(map[BucketID]*bolt.Bucket), txWg: new(sync.WaitGroup), txMu: new(sync.RWMutex), }, }, + txReadBufferCache: txReadBufferCache{ + mu: sync.Mutex{}, + bufVersion: 0, + buf: nil, + }, stopc: make(chan struct{}), donec: make(chan struct{}), lg: bcfg.Logger, } + b.batchTx = newBatchTxBuffered(b) // We set it after newBatchTxBuffered to skip the 'empty' commit. 
b.hooks = bcfg.Hooks @@ -221,10 +239,68 @@ func (b *backend) ConcurrentReadTx() ReadTx { defer b.readTx.RUnlock() // prevent boltdb read Tx from been rolled back until store read Tx is done. Needs to be called when holding readTx.RLock(). b.readTx.txWg.Add(1) + // TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval. + + // inspect/update cache recency iff there's no ongoing update to the cache + // this falls through if there's no cache update + + // by this line, "ConcurrentReadTx" code path is already protected against concurrent "writeback" operations + // which requires write lock to update "readTx.baseReadTx.buf". + // Which means setting "buf *txReadBuffer" with "readTx.buf.unsafeCopy()" is guaranteed to be up-to-date, + // whereas "txReadBufferCache.buf" may be stale from concurrent "writeback" operations. + // We only update "txReadBufferCache.buf" if we know "buf *txReadBuffer" is up-to-date. + // The update to "txReadBufferCache.buf" will benefit the following "ConcurrentReadTx" creation + // by avoiding copying "readTx.baseReadTx.buf". + b.txReadBufferCache.mu.Lock() + + curCache := b.txReadBufferCache.buf + curCacheVer := b.txReadBufferCache.bufVersion + curBufVer := b.readTx.buf.bufVersion + + isEmptyCache := curCache == nil + isStaleCache := curCacheVer != curBufVer + + var buf *txReadBuffer + switch { + case isEmptyCache: + // perform safe copy of buffer while holding "b.txReadBufferCache.mu.Lock" + // this is only supposed to run once so there won't be much overhead + curBuf := b.readTx.buf.unsafeCopy() + buf = &curBuf + case isStaleCache: + // to maximize the concurrency, try unsafe copy of buffer + // release the lock while copying buffer -- cache may become stale again and + // get overwritten by someone else. 
+ // therefore, we need to check the readTx buffer version again + b.txReadBufferCache.mu.Unlock() + curBuf := b.readTx.buf.unsafeCopy() + b.txReadBufferCache.mu.Lock() + buf = &curBuf + default: + // neither empty nor stale cache, just use the current buffer + buf = curCache + } + // txReadBufferCache.bufVersion can be modified while we are doing an unsafeCopy() + // as a result, curCacheVer could be no longer the same as + // txReadBufferCache.bufVersion + // if !isEmptyCache && curCacheVer != b.txReadBufferCache.bufVersion + // then the cache became stale while copying "readTx.baseReadTx.buf". + // It is safe to not update "txReadBufferCache.buf", because the next + // "ConcurrentReadTx" creation will trigger a new "readTx.baseReadTx.buf" copy + // and "buf" is still used for the current "concurrentReadTx.baseReadTx.buf". + if isEmptyCache || curCacheVer == b.txReadBufferCache.bufVersion { + // continue if the cache is never set or no one has modified the cache + b.txReadBufferCache.buf = buf + b.txReadBufferCache.bufVersion = curBufVer + } + + b.txReadBufferCache.mu.Unlock() + + // concurrentReadTx is not supposed to write to its txReadBuffer return &concurrentReadTx{ baseReadTx: baseReadTx{ - buf: b.readTx.buf.unsafeCopy(), + buf: *buf, txMu: b.readTx.txMu, tx: b.readTx.tx, buckets: b.readTx.buckets, diff --git a/server/mvcc/backend/tx_buffer.go b/server/mvcc/backend/tx_buffer.go index cdb5180123a..779255b7320 100644 --- a/server/mvcc/backend/tx_buffer.go +++ b/server/mvcc/backend/tx_buffer.go @@ -19,6 +19,8 @@ import ( "sort" ) +const bucketBufferInitialSize = 512 + // txBuffer handles functionality shared between txWriteBuffer and txReadBuffer. type txBuffer struct { buckets map[BucketID]*bucketBuffer @@ -88,10 +90,16 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { rb.merge(wb) } txw.reset() + // increase the buffer version + txr.bufVersion++ } // txReadBuffer accesses buffered updates. 
-type txReadBuffer struct{ txBuffer } +type txReadBuffer struct { + txBuffer + // bufVersion is used to check if the buffer is modified recently + bufVersion uint64 +} func (txr *txReadBuffer) Range(bucket Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) { if b := txr.buckets[bucket.ID()]; b != nil { @@ -113,6 +121,7 @@ func (txr *txReadBuffer) unsafeCopy() txReadBuffer { txBuffer: txBuffer{ buckets: make(map[BucketID]*bucketBuffer, len(txr.txBuffer.buckets)), }, + bufVersion: 0, } for bucketName, bucket := range txr.txBuffer.buckets { txrCopy.txBuffer.buckets[bucketName] = bucket.Copy() @@ -133,7 +142,7 @@ type bucketBuffer struct { } func newBucketBuffer() *bucketBuffer { - return &bucketBuffer{buf: make([]kv, 512), used: 0} + return &bucketBuffer{buf: make([]kv, bucketBufferInitialSize), used: 0} } func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {