Skip to content

Commit

Permalink
mvcc: address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jingyih committed Jun 14, 2019
1 parent 2a9320e commit 55066eb
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 20 deletions.
8 changes: 5 additions & 3 deletions mvcc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var (
)

type Backend interface {
// ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
ReadTx() ReadTx
BatchTx() BatchTx
// ConcurrentReadTx returns a non-blocking read transaction.
Expand Down Expand Up @@ -200,7 +201,7 @@ func (b *backend) ReadTx() ReadTx { return b.readTx }
func (b *backend) ConcurrentReadTx() ReadTx {
b.readTx.RLock()
defer b.readTx.RUnlock()
// prevent boltdb read Tx from been rolled back until store read Tx is done.
// prevent boltdb read Tx from been rolled back until store read Tx is done. Needs to be called when holding readTx.RLock().
b.readTx.txWg.Add(1)
// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.
return &concurrentReadTx{
Expand Down Expand Up @@ -516,9 +517,10 @@ func (b *backend) begin(write bool) *bolt.Tx {

size := tx.Size()
db := tx.DB()
stats := db.Stats()
atomic.StoreInt64(&b.size, size)
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
atomic.StoreInt64(&b.openReadTxN, int64(db.Stats().OpenTxN))
atomic.StoreInt64(&b.sizeInUse, size-(int64(stats.FreePageN)*int64(db.Info().PageSize)))
atomic.StoreInt64(&b.openReadTxN, int64(stats.OpenTxN))

return tx
}
Expand Down
22 changes: 10 additions & 12 deletions mvcc/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,16 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) {
if t.backend.readTx.tx != nil {
// wait all store read transactions using the current boltdb tx to finish,
// then close the boltdb tx
go waitAndRollback(t.backend.readTx.tx, t.backend.readTx.txWg, t.backend.lg)
go func(tx *bolt.Tx, wg *sync.WaitGroup) {
wg.Wait()
if err := tx.Rollback(); err != nil {
if t.backend.lg != nil {
t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
} else {
plog.Fatalf("cannot rollback tx (%s)", err)
}
}
}(t.backend.readTx.tx, t.backend.readTx.txWg)
t.backend.readTx.reset()
}

Expand All @@ -319,17 +328,6 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) {
}
}

func waitAndRollback(tx *bolt.Tx, wg *sync.WaitGroup, lg *zap.Logger) {
wg.Wait()
if err := tx.Rollback(); err != nil {
if lg != nil {
lg.Fatal("failed to rollback tx", zap.Error(err))
} else {
plog.Fatalf("cannot rollback tx (%s)", err)
}
}
}

func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) {
t.batchTx.UnsafePut(bucketName, key, value)
t.buf.put(bucketName, key, value)
Expand Down
12 changes: 8 additions & 4 deletions mvcc/backend/read_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,17 @@ type concurrentReadTx struct {
buf txReadBuffer
txMu *sync.RWMutex
tx *bolt.Tx
buckets map[string]*bolt.Bucket // note: A map value is a pointer
buckets map[string]*bolt.Bucket
txWg *sync.WaitGroup
}

func (rt *concurrentReadTx) Lock() {}
func (rt *concurrentReadTx) Unlock() {}
func (rt *concurrentReadTx) RLock() {}
func (rt *concurrentReadTx) Lock() {}
func (rt *concurrentReadTx) Unlock() {}

// RLock is no-op. concurrentReadTx does not need to be locked after it is created.
func (rt *concurrentReadTx) RLock() {}

// RUnlock signals the end of concurrentReadTx.
func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }

func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
Expand Down
3 changes: 2 additions & 1 deletion mvcc/kvstore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (s *store) Read() TxnRead {
// backend holds b.readTx.RLock() only when creating the concurrentReadTx. After
// ConcurrentReadTx is created, it will not block write transaction.
tx := s.b.ConcurrentReadTx()
tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
Expand All @@ -48,7 +49,7 @@ func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult,
}

func (tr *storeTxnRead) End() {
tr.tx.RUnlock()
tr.tx.RUnlock() // RUnlock signals the end of concurrentReadTx.
tr.s.mu.RUnlock()
}

Expand Down

0 comments on commit 55066eb

Please sign in to comment.