From 8e8538bbaca510e7414ff0c968a1efc3a959efa1 Mon Sep 17 00:00:00 2001 From: Xiang Date: Wed, 7 Feb 2018 13:20:58 -0800 Subject: [PATCH] mvcc: allow large concurrent reads under light write workload --- internal/mvcc/backend/backend.go | 6 +----- internal/mvcc/backend/batch_tx.go | 17 +++++++++++------ internal/mvcc/kvstore_test.go | 2 ++ 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/internal/mvcc/backend/backend.go b/internal/mvcc/backend/backend.go index cde61cd1aa4f..74caa6b5a0b7 100644 --- a/internal/mvcc/backend/backend.go +++ b/internal/mvcc/backend/backend.go @@ -310,11 +310,7 @@ func (b *backend) defrag() error { b.mu.Lock() defer b.mu.Unlock() - // block concurrent read requests while resetting tx - b.readTx.mu.Lock() - defer b.readTx.mu.Unlock() - - b.batchTx.unsafeCommit(true) + b.batchTx.commit(true) b.batchTx.tx = nil tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions) diff --git a/internal/mvcc/backend/batch_tx.go b/internal/mvcc/backend/batch_tx.go index e7307bdca131..5a2442c6daca 100644 --- a/internal/mvcc/backend/batch_tx.go +++ b/internal/mvcc/backend/batch_tx.go @@ -98,6 +98,7 @@ func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte isMatch = func(b []byte) bool { return bytes.Equal(b, key) } limit = 1 } + for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() { vs = append(vs, cv) keys = append(keys, ck) @@ -154,7 +155,7 @@ func (t *batchTx) Unlock() { t.Mutex.Unlock() } -func (t *batchTx) commit(stop bool) { +func (t *batchTx) commit(stop bool) bool { // commit the last tx if t.tx != nil { if t.pending == 0 && !stop { @@ -169,7 +170,7 @@ func (t *batchTx) commit(stop bool) { db := t.tx.DB() atomic.StoreInt64(&t.backend.size, size) atomic.StoreInt64(&t.backend.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize))) - return + return false } start := time.Now() @@ -187,6 +188,7 @@ func (t *batchTx) commit(stop bool) { if !stop { t.tx = t.backend.begin(true) } + return true } type batchTxBuffered struct { @@ -231,13 +233,18 @@ func (t *batchTxBuffered) CommitAndStop() { } func (t *batchTxBuffered) commit(stop bool) { + flushed := t.batchTx.commit(stop) + if !flushed { + return + } + // all read txs must be closed to acquire boltdb commit rwlock t.backend.readTx.mu.Lock() defer t.backend.readTx.mu.Unlock() - t.unsafeCommit(stop) + t.unsafeReadTxCommit(stop) } -func (t *batchTxBuffered) unsafeCommit(stop bool) { +func (t *batchTxBuffered) unsafeReadTxCommit(stop bool) { if t.backend.readTx.tx != nil { if err := t.backend.readTx.tx.Rollback(); err != nil { plog.Fatalf("cannot rollback tx (%s)", err) @@ -245,8 +252,6 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) { t.backend.readTx.reset() } - t.batchTx.commit(stop) - if !stop { t.backend.readTx.tx = t.backend.begin(false) } diff --git a/internal/mvcc/kvstore_test.go b/internal/mvcc/kvstore_test.go index a526b603af75..1d5a61bcfaf2 100644 --- a/internal/mvcc/kvstore_test.go +++ b/internal/mvcc/kvstore_test.go @@ -638,6 +638,8 @@ func TestTxnBlockBackendForceCommit(t *testing.T) { s := NewStore(b, &lease.FakeLessor{}, nil) defer os.Remove(tmpPath) + // Put a key into the store so that force commit can take effect. + s.Put([]byte("foo"), []byte("bar"), lease.NoLease) txn := s.Read() done := make(chan struct{})