From 262c51f7f2643ec08f2a476b0005a30e2c21c2f5 Mon Sep 17 00:00:00 2001 From: Ng Wei Han <47109095+weiihann@users.noreply.github.com> Date: Thu, 3 Oct 2024 17:47:46 +0800 Subject: [PATCH] Refactor transaction into batch and snapshot (#2182) --- db/db.go | 3 - db/pebble/batch.go | 105 +++++++++++++++++++++++++++++ db/pebble/common.go | 34 ++++++++++ db/pebble/db.go | 16 ++--- db/pebble/db_test.go | 2 +- db/pebble/snapshot.go | 75 +++++++++++++++++++++ db/pebble/transaction.go | 138 --------------------------------------- 7 files changed, 223 insertions(+), 150 deletions(-) create mode 100644 db/pebble/batch.go create mode 100644 db/pebble/common.go create mode 100644 db/pebble/snapshot.go delete mode 100644 db/pebble/transaction.go diff --git a/db/db.go b/db/db.go index 3fc16526d1..6475dc5e19 100644 --- a/db/db.go +++ b/db/db.go @@ -77,9 +77,6 @@ type Transaction interface { // Get fetches the value for the given key, should return ErrKeyNotFound if key is not present // Caller should not assume that the slice would stay valid after the call to cb Get(key []byte, cb func([]byte) error) error - - // Impl returns the underlying transaction object - Impl() any } // View : see db.DB.View diff --git a/db/pebble/batch.go b/db/pebble/batch.go new file mode 100644 index 0000000000..60747d6a86 --- /dev/null +++ b/db/pebble/batch.go @@ -0,0 +1,105 @@ +package pebble + +import ( + "errors" + "sync" + "time" + + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/utils" + "github.com/cockroachdb/pebble" +) + +var _ db.Transaction = (*batch)(nil) + +type batch struct { + batch *pebble.Batch + lock *sync.Mutex + listener db.EventListener +} + +func NewBatch(dbBatch *pebble.Batch, lock *sync.Mutex, listener db.EventListener) *batch { + return &batch{ + batch: dbBatch, + lock: lock, + listener: listener, + } +} + +// Discard : see db.Transaction.Discard +func (b *batch) Discard() error { + if b.batch == nil { + return nil + } + + err := b.batch.Close() + b.batch = nil + b.lock.Unlock() + b.lock = nil + + return err +} + +// Commit : see db.Transaction.Commit +func (b *batch) Commit() error { + if b.batch == nil { + return ErrDiscardedTransaction + } + + start := time.Now() + defer func() { b.listener.OnCommit(time.Since(start)) }() + return utils.RunAndWrapOnError(b.Discard, b.batch.Commit(pebble.Sync)) +} + +// Set : see db.Transaction.Set +func (b *batch) Set(key, val []byte) error { + start := time.Now() + if len(key) == 0 { + return errors.New("empty key") + } + + if b.batch == nil { + return ErrDiscardedTransaction + } + + defer func() { b.listener.OnIO(true, time.Since(start)) }() + + return b.batch.Set(key, val, pebble.Sync) +} + +// Delete : see db.Transaction.Delete +func (b *batch) Delete(key []byte) error { + if b.batch == nil { + return ErrDiscardedTransaction + } + + start := time.Now() + defer func() { b.listener.OnIO(true, time.Since(start)) }() + + return b.batch.Delete(key, pebble.Sync) +} + +// Get : see db.Transaction.Get +func (b *batch) Get(key []byte, cb func([]byte) error) error { + if b.batch == nil { + return ErrDiscardedTransaction + } + return get(b.batch, key, cb, b.listener) +} + +// NewIterator : see db.Transaction.NewIterator +func (b *batch) NewIterator() (db.Iterator, error) { + var iter *pebble.Iterator + var err error + + if b.batch == nil { + return nil, ErrDiscardedTransaction + } + + iter, err = b.batch.NewIter(nil) + if err != nil { + return nil, err + } + + return &iterator{iter: iter}, nil +} diff --git a/db/pebble/common.go b/db/pebble/common.go new file mode 100644 index 0000000000..a67c905acd --- /dev/null +++ b/db/pebble/common.go @@ -0,0 +1,34 @@ +package pebble + +import ( + "errors" + "io" + "time" + + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/utils" + "github.com/cockroachdb/pebble" +) + +type getter interface { + Get([]byte) ([]byte, io.Closer, error) +} + +func get(g getter, key []byte, cb func([]byte) error, listener db.EventListener) error { + start := time.Now() + var val []byte + var closer io.Closer + + val, closer, err := g.Get(key) + + // We need it evaluated immediately so the duration doesn't include the runtime of the user callback that we call below. + defer listener.OnIO(false, time.Since(start)) //nolint:govet + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return db.ErrKeyNotFound + } + return err + } + + return utils.RunAndWrapOnError(closer.Close, cb(val)) +} diff --git a/db/pebble/db.go b/db/pebble/db.go index 6592a602f0..5974edf720 100644 --- a/db/pebble/db.go +++ b/db/pebble/db.go @@ -2,6 +2,7 @@ package pebble import ( "context" + "errors" "fmt" "sync" "testing" @@ -18,6 +19,10 @@ const ( minCacheSizeMB = 8 ) +var ( + ErrDiscardedTransaction = errors.New("discarded transaction") + ErrReadOnlyTransaction = errors.New("read-only transaction") +) var _ db.DB = (*DB)(nil) type DB struct { @@ -83,19 +88,14 @@ func (d *DB) WithListener(listener db.EventListener) db.DB { } // NewTransaction : see db.DB.NewTransaction +// Batch is used for read-write operations, while snapshot is used for read-only operations func (d *DB) NewTransaction(update bool) (db.Transaction, error) { - txn := &Transaction{ - listener: d.listener, - } if update { d.wMutex.Lock() - txn.lock = d.wMutex - txn.batch = d.pebble.NewIndexedBatch() - } else { - txn.snapshot = d.pebble.NewSnapshot() + return NewBatch(d.pebble.NewIndexedBatch(), d.wMutex, d.listener), nil } - return txn, nil + return NewSnapshot(d.pebble.NewSnapshot(), d.listener), nil } // Close : see io.Closer.Close diff --git a/db/pebble/db_test.go b/db/pebble/db_test.go index c239764cee..896d5fac5c 100644 --- a/db/pebble/db_test.go +++ b/db/pebble/db_test.go @@ -408,7 +408,7 @@ func TestPanic(t *testing.T) { require.ErrorIs(t, testDB.View(func(txn db.Transaction) error { return txn.Get([]byte{0}, func(b []byte) error { return nil }) }), db.ErrKeyNotFound) - require.EqualError(t, panicingTxn.Get([]byte{0}, func(b []byte) error { return nil }), "discarded txn") + require.EqualError(t, panicingTxn.Get([]byte{0}, func(b []byte) error { return nil }), "discarded transaction") }() require.NoError(t, testDB.Update(func(txn db.Transaction) error { diff --git a/db/pebble/snapshot.go b/db/pebble/snapshot.go new file mode 100644 index 0000000000..b8ce545e3e --- /dev/null +++ b/db/pebble/snapshot.go @@ -0,0 +1,75 @@ +package pebble + +import ( + "github.com/NethermindEth/juno/db" + "github.com/cockroachdb/pebble" +) + +var _ db.Transaction = (*snapshot)(nil) + +type snapshot struct { + snapshot *pebble.Snapshot + listener db.EventListener +} + +func NewSnapshot(dbSnapshot *pebble.Snapshot, listener db.EventListener) *snapshot { + return &snapshot{ + snapshot: dbSnapshot, + listener: listener, + } +} + +// Discard : see db.Transaction.Discard +func (s *snapshot) Discard() error { + if s.snapshot == nil { + return nil + } + + if err := s.snapshot.Close(); err != nil { + return err + } + + s.snapshot = nil + + return nil +} + +// Commit : see db.Transaction.Commit +func (s *snapshot) Commit() error { + return ErrReadOnlyTransaction +} + +// Set : see db.Transaction.Set +func (s *snapshot) Set(key, val []byte) error { + return ErrReadOnlyTransaction +} + +// Delete : see db.Transaction.Delete +func (s *snapshot) Delete(key []byte) error { + return ErrReadOnlyTransaction +} + +// Get : see db.Transaction.Get +func (s *snapshot) Get(key []byte, cb func([]byte) error) error { + if s.snapshot == nil { + return ErrDiscardedTransaction + } + return get(s.snapshot, key, cb, s.listener) +} + +// NewIterator : see db.Transaction.NewIterator +func (s *snapshot) NewIterator() (db.Iterator, error) { + var iter *pebble.Iterator + var err error + + if s.snapshot == nil { + return nil, ErrDiscardedTransaction + } + + iter, err = s.snapshot.NewIter(nil) + if err != nil { + return nil, err + } + + return &iterator{iter: iter}, nil +} diff --git a/db/pebble/transaction.go b/db/pebble/transaction.go deleted file mode 100644 index e1b74e089b..0000000000 --- a/db/pebble/transaction.go +++ /dev/null @@ -1,138 +0,0 @@ -package pebble - -import ( - "errors" - "io" - "sync" - "time" - - "github.com/NethermindEth/juno/db" - "github.com/NethermindEth/juno/utils" - "github.com/cockroachdb/pebble" -) - -var ErrDiscardedTransaction = errors.New("discarded txn") - -var _ db.Transaction = (*Transaction)(nil) - -type Transaction struct { - batch *pebble.Batch - snapshot *pebble.Snapshot - lock *sync.Mutex - listener db.EventListener -} - -// Discard : see db.Transaction.Discard -func (t *Transaction) Discard() error { - if t.batch != nil { - if err := t.batch.Close(); err != nil { - return err - } - t.batch = nil - } - if t.snapshot != nil { - if err := t.snapshot.Close(); err != nil { - return err - } - t.snapshot = nil - } - - if t.lock != nil { - t.lock.Unlock() - t.lock = nil - } - return nil -} - -// Commit : see db.Transaction.Commit -func (t *Transaction) Commit() error { - start := time.Now() - defer func() { t.listener.OnCommit(time.Since(start)) }() - if t.batch != nil { - return utils.RunAndWrapOnError(t.Discard, t.batch.Commit(pebble.Sync)) - } - return utils.RunAndWrapOnError(t.Discard, ErrDiscardedTransaction) -} - -// Set : see db.Transaction.Set -func (t *Transaction) Set(key, val []byte) error { - start := time.Now() - if t.batch == nil { - return errors.New("read only transaction") - } - if len(key) == 0 { - return errors.New("empty key") - } - - defer func() { t.listener.OnIO(true, time.Since(start)) }() - return t.batch.Set(key, val, pebble.Sync) -} - -// Delete : see db.Transaction.Delete -func (t *Transaction) Delete(key []byte) error { - start := time.Now() - if t.batch == nil { - return errors.New("read only transaction") - } - - defer func() { t.listener.OnIO(true, time.Since(start)) }() - return t.batch.Delete(key, pebble.Sync) -} - -// Get : see db.Transaction.Get -func (t *Transaction) Get(key []byte, cb func([]byte) error) error { - start := time.Now() - var val []byte - var closer io.Closer - - var err error - if t.batch != nil { - val, closer, err = t.batch.Get(key) - } else if t.snapshot != nil { - val, closer, err = t.snapshot.Get(key) - } else { - return ErrDiscardedTransaction - } - - // We need it evaluated immediately so the duration doesn't include the runtime of the user callback that we call below. - defer t.listener.OnIO(false, time.Since(start)) //nolint:govet - if err != nil { - if errors.Is(err, pebble.ErrNotFound) { - return db.ErrKeyNotFound - } - - return err - } - return utils.RunAndWrapOnError(closer.Close, cb(val)) -} - -// Impl : see db.Transaction.Impl -func (t *Transaction) Impl() any { - if t.batch != nil { - return t.batch - } - - if t.snapshot != nil { - return t.snapshot - } - return nil -} - -// NewIterator : see db.Transaction.NewIterator -func (t *Transaction) NewIterator() (db.Iterator, error) { - var iter *pebble.Iterator - var err error - if t.batch != nil { - iter, err = t.batch.NewIter(nil) - } else if t.snapshot != nil { - iter, err = t.snapshot.NewIter(nil) - } else { - return nil, ErrDiscardedTransaction - } - - if err != nil { - return nil, err - } - - return &iterator{iter: iter}, nil -}