diff --git a/internal/mvcc/backend/backend.go b/internal/mvcc/backend/backend.go
index cde61cd1aa4f..03e105b52318 100644
--- a/internal/mvcc/backend/backend.go
+++ b/internal/mvcc/backend/backend.go
@@ -271,7 +271,9 @@ func (b *backend) run() {
 			b.batchTx.CommitAndStop()
 			return
 		}
-		b.batchTx.Commit()
+		if b.batchTx.pending != 0 {
+			b.batchTx.Commit()
+		}
 		t.Reset(b.batchInterval)
 	}
 }
@@ -293,9 +295,6 @@ func (b *backend) Defrag() error {
 		return err
 	}
 
-	// commit to update metadata like db.size
-	b.batchTx.Commit()
-
 	return nil
 }
@@ -310,11 +309,7 @@ func (b *backend) defrag() error {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 
-	// block concurrent read requests while resetting tx
-	b.readTx.mu.Lock()
-	defer b.readTx.mu.Unlock()
-
-	b.batchTx.unsafeCommit(true)
+	b.batchTx.commit(true)
 	b.batchTx.tx = nil
 
 	tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions)
diff --git a/internal/mvcc/backend/batch_tx.go b/internal/mvcc/backend/batch_tx.go
index e7307bdca131..b1669301259c 100644
--- a/internal/mvcc/backend/batch_tx.go
+++ b/internal/mvcc/backend/batch_tx.go
@@ -98,6 +98,7 @@ func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte
 		isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
 		limit = 1
 	}
+
 	for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
 		vs = append(vs, cv)
 		keys = append(keys, ck)
@@ -158,17 +159,6 @@ func (t *batchTx) commit(stop bool) {
 	// commit the last tx
 	if t.tx != nil {
 		if t.pending == 0 && !stop {
-			t.backend.mu.RLock()
-			defer t.backend.mu.RUnlock()
-
-			// t.tx.DB()==nil if 'CommitAndStop' calls 'batchTx.commit(true)',
-			// which initializes *bolt.Tx.db and *bolt.Tx.meta as nil; panics t.tx.Size().
-			// Server must make sure 'batchTx.commit(false)' does not follow
-			// 'batchTx.commit(true)' (e.g. stopping backend, and inflight Hash call).
-			size := t.tx.Size()
-			db := t.tx.DB()
-			atomic.StoreInt64(&t.backend.size, size)
-			atomic.StoreInt64(&t.backend.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
 			return
 		}