From 2ec1a2567065294ede7c81ac44a2254b6ec23487 Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Wed, 12 Jun 2019 17:18:59 -0700 Subject: [PATCH] mvcc: address comments --- mvcc/backend/backend.go | 8 +++++--- mvcc/backend/read_tx.go | 12 ++++++++---- mvcc/kvstore_txn.go | 1 + 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go index 8d20db4b9058..bffd74950b46 100644 --- a/mvcc/backend/backend.go +++ b/mvcc/backend/backend.go @@ -49,6 +49,7 @@ var ( ) type Backend interface { + // ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523. ReadTx() ReadTx BatchTx() BatchTx // ConcurrentReadTx returns a non-blocking read transaction. @@ -200,7 +201,7 @@ func (b *backend) ReadTx() ReadTx { return b.readTx } func (b *backend) ConcurrentReadTx() ReadTx { b.readTx.RLock() defer b.readTx.RUnlock() - // prevent boltdb read Tx from been rolled back until store read Tx is done. + // prevent boltdb read Tx from being rolled back until store read Tx is done. Needs to be called when holding readTx.RLock(). b.readTx.txWg.Add(1) // TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval. 
return &concurrentReadTx{ @@ -516,9 +517,10 @@ func (b *backend) begin(write bool) *bolt.Tx { size := tx.Size() db := tx.DB() + stats := db.Stats() atomic.StoreInt64(&b.size, size) - atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize))) - atomic.StoreInt64(&b.openReadTxN, int64(db.Stats().OpenTxN)) + atomic.StoreInt64(&b.sizeInUse, size-(int64(stats.FreePageN)*int64(db.Info().PageSize))) + atomic.StoreInt64(&b.openReadTxN, int64(stats.OpenTxN)) return tx } diff --git a/mvcc/backend/read_tx.go b/mvcc/backend/read_tx.go index b0473200e208..91fe72ec5589 100644 --- a/mvcc/backend/read_tx.go +++ b/mvcc/backend/read_tx.go @@ -132,13 +132,17 @@ type concurrentReadTx struct { buf txReadBuffer txMu *sync.RWMutex tx *bolt.Tx - buckets map[string]*bolt.Bucket // note: A map value is a pointer + buckets map[string]*bolt.Bucket txWg *sync.WaitGroup } -func (rt *concurrentReadTx) Lock() {} -func (rt *concurrentReadTx) Unlock() {} -func (rt *concurrentReadTx) RLock() {} +func (rt *concurrentReadTx) Lock() {} +func (rt *concurrentReadTx) Unlock() {} + +// RLock is a no-op. concurrentReadTx does not need to be locked after it is created. +func (rt *concurrentReadTx) RLock() {} + +// RUnlock signals the end of concurrentReadTx. func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() } func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error { diff --git a/mvcc/kvstore_txn.go b/mvcc/kvstore_txn.go index 719e728fa948..abc1de1db3b5 100644 --- a/mvcc/kvstore_txn.go +++ b/mvcc/kvstore_txn.go @@ -35,6 +35,7 @@ func (s *store) Read() TxnRead { // backend holds b.readTx.RLock() only when creating the concurrentReadTx. After // ConcurrentReadTx is created, it will not block write transaction. tx := s.b.ConcurrentReadTx() + tx.RLock() firstRev, rev := s.compactMainRev, s.currentRev s.revMu.RUnlock() return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})