mvcc: fully concurrent read #10523

Merged: 5 commits, Jun 14, 2019
Changes from all commits
46 changes: 32 additions & 14 deletions integration/metrics_test.go
@@ -16,6 +16,7 @@ package integration

import (
"context"
"fmt"
"net/http"
"strconv"
"testing"
@@ -103,22 +104,39 @@ func testMetricDbSizeDefrag(t *testing.T, name string) {
t.Fatal(kerr)
}

validateAfterCompactionInUse := func() error {
// Put to move PendingPages to FreePages
if _, err = kvc.Put(context.TODO(), putreq); err != nil {
t.Fatal(err)
}
time.Sleep(500 * time.Millisecond)

afterCompactionInUse, err := clus.Members[0].Metric("etcd_mvcc_db_total_size_in_use_in_bytes")
if err != nil {
t.Fatal(err)
}
aciu, err := strconv.Atoi(afterCompactionInUse)
if err != nil {
t.Fatal(err)
}
if biu <= aciu {
return fmt.Errorf("expected less than %d, got %d after compaction", biu, aciu)
}
return nil
}

// The backend rolls back its read transaction asynchronously (see PR #10523),
// which can make a single check flaky. Retry up to 3 times.
maxRetry, retry := 3, 0
for {
err := validateAfterCompactionInUse()
if err == nil {
break
}
retry++
if retry >= maxRetry {
t.Fatalf(err.Error())
}
}

// defrag should give freed space back to fs
34 changes: 33 additions & 1 deletion mvcc/backend/backend.go
@@ -49,8 +49,11 @@ var (
)

type Backend interface {
// ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
ReadTx() ReadTx
BatchTx() BatchTx
// ConcurrentReadTx returns a non-blocking read transaction.
ConcurrentReadTx() ReadTx

Snapshot() Snapshot
Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
@@ -63,6 +66,8 @@ type Backend interface {
// Since the backend can manage free space in a non-byte unit such as
// number of pages, the returned value can be not exactly accurate in bytes.
SizeInUse() int64
// OpenReadTxN returns the number of currently open read transactions in the backend.
OpenReadTxN() int64
Defrag() error
ForceCommit()
Close() error
@@ -87,6 +92,8 @@ type backend struct {
sizeInUse int64
// commits counts number of commits since start
commits int64
// openReadTxN is the number of currently open read transactions in the backend
openReadTxN int64

mu sync.RWMutex
db *bolt.DB
@@ -166,6 +173,7 @@ func newBackend(bcfg BackendConfig) *backend {
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
},
buckets: make(map[string]*bolt.Bucket),
txWg: new(sync.WaitGroup),
},

stopc: make(chan struct{}),
@@ -187,6 +195,24 @@ func (b *backend) BatchTx() BatchTx {

func (b *backend) ReadTx() ReadTx { return b.readTx }

// ConcurrentReadTx creates and returns a new ReadTx, which:
// A) creates and keeps a copy of backend.readTx.txReadBuffer,
// B) references the boltdb read Tx (and its bucket cache) of current batch interval.
func (b *backend) ConcurrentReadTx() ReadTx {
b.readTx.RLock()
defer b.readTx.RUnlock()
// prevent boltdb read Tx from being rolled back until store read Tx is done. Needs to be called when holding readTx.RLock().
b.readTx.txWg.Add(1)
// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.
return &concurrentReadTx{
buf: b.readTx.buf.unsafeCopy(),
[inline review thread]

Contributor: I remember the design doc mentioned that this is a lazy copy. The first read won't need to do the copy?

Contributor (author): Making a copy at the beginning is just a natural way of preserving the correct view [1] throughout the life cycle of a read transaction. I think lazy copy could be a nice follow-up improvement, especially if copying the buffer is time- or memory-consuming. So far, in my benchmark tests, this does not look like an issue.

[1] By correct view, I mean the read buffer's state at the beginning of the read transaction, combined with the view of the boltdb Tx opened at the beginning of the current batch interval.

Contributor: OK. Can you add a TODO or something, in case we want to do this optimization in the future?

Contributor (author): Sounds good. I will add a TODO.

[end of review thread]
tx: b.readTx.tx,
txMu: &b.readTx.txMu,
buckets: b.readTx.buckets,
txWg: b.readTx.txWg,
}
}

// ForceCommit forces the current batching tx to commit.
func (b *backend) ForceCommit() {
b.batchTx.Commit()
@@ -491,8 +517,10 @@ func (b *backend) begin(write bool) *bolt.Tx {

size := tx.Size()
db := tx.DB()
stats := db.Stats()
atomic.StoreInt64(&b.size, size)
atomic.StoreInt64(&b.sizeInUse, size-(int64(stats.FreePageN)*int64(db.Info().PageSize)))
atomic.StoreInt64(&b.openReadTxN, int64(stats.OpenTxN))

return tx
}
@@ -509,6 +537,10 @@ func (b *backend) unsafeBegin(write bool) *bolt.Tx {
return tx
}

func (b *backend) OpenReadTxN() int64 {
return atomic.LoadInt64(&b.openReadTxN)
}

// NewTmpBackend creates a backend implementation for testing.
func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
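For orientation, here is a minimal sketch (not part of this diff) of the new read path: a reader on a concurrent read transaction running alongside a writer on the batch transaction. It assumes it lives in package backend, like backend_test.go, reuses that file's cleanup helper plus the fmt and time imports, and uses only methods shown in this PR; the function name is illustrative.

func ExampleConcurrentReadDoesNotBlockWrites() {
	b, tmpPath := NewTmpBackend(time.Hour, 10000)
	defer cleanup(b, tmpPath)

	// Seed a bucket through the batch (write) transaction.
	wtx := b.BatchTx()
	wtx.Lock()
	wtx.UnsafeCreateBucket([]byte("key"))
	wtx.UnsafePut([]byte("key"), []byte("a"), []byte("1"))
	wtx.Unlock()

	done := make(chan struct{})
	go func() {
		defer close(done)
		// ConcurrentReadTx copies the read buffer up front and pins the current
		// boltdb read Tx via txWg, so it never holds the batch-tx lock.
		rtx := b.ConcurrentReadTx()
		rtx.RLock() // no-op, kept only to satisfy the ReadTx interface
		keys, _ := rtx.UnsafeRange([]byte("key"), []byte("a"), nil, 0)
		rtx.RUnlock() // txWg.Done(): the pinned boltdb Tx may now be rolled back
		fmt.Println(len(keys))
	}()

	// The writer can keep committing while the read above is in flight.
	wtx.Lock()
	wtx.UnsafePut([]byte("key"), []byte("b"), []byte("2"))
	wtx.Unlock()

	<-done
	// Output: 1
}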
29 changes: 29 additions & 0 deletions mvcc/backend/backend_test.go
@@ -250,6 +250,35 @@ func TestBackendWriteback(t *testing.T) {
}
}

// TestConcurrentReadTx ensures that current read transaction can see all prior writes stored in read buffer
func TestConcurrentReadTx(t *testing.T) {
b, tmpPath := NewTmpBackend(time.Hour, 10000)
defer cleanup(b, tmpPath)

wtx1 := b.BatchTx()
wtx1.Lock()
wtx1.UnsafeCreateBucket([]byte("key"))
wtx1.UnsafePut([]byte("key"), []byte("abc"), []byte("ABC"))
wtx1.UnsafePut([]byte("key"), []byte("overwrite"), []byte("1"))
wtx1.Unlock()

wtx2 := b.BatchTx()
wtx2.Lock()
wtx2.UnsafePut([]byte("key"), []byte("def"), []byte("DEF"))
wtx2.UnsafePut([]byte("key"), []byte("overwrite"), []byte("2"))
wtx2.Unlock()

rtx := b.ConcurrentReadTx()
rtx.RLock() // no-op
k, v := rtx.UnsafeRange([]byte("key"), []byte("abc"), []byte("\xff"), 0)
rtx.RUnlock()
wKey := [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")}
wVal := [][]byte{[]byte("ABC"), []byte("DEF"), []byte("2")}
if !reflect.DeepEqual(wKey, k) || !reflect.DeepEqual(wVal, v) {
t.Errorf("want k=%+v, v=%+v; got k=%+v, v=%+v", wKey, wVal, k, v)
}
}

// TestBackendWritebackForEach checks that partially written / buffered
// data is visited in the same order as fully committed data.
func TestBackendWritebackForEach(t *testing.T) {
17 changes: 11 additions & 6 deletions mvcc/backend/batch_tx.go
@@ -306,13 +306,18 @@ func (t *batchTxBuffered) commit(stop bool) {

func (t *batchTxBuffered) unsafeCommit(stop bool) {
if t.backend.readTx.tx != nil {
// wait for all store read transactions using the current boltdb tx to finish,
// then close the boltdb tx
go func(tx *bolt.Tx, wg *sync.WaitGroup) {
wg.Wait()
if err := tx.Rollback(); err != nil {
if t.backend.lg != nil {
t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
} else {
plog.Fatalf("cannot rollback tx (%s)", err)
}
}
}(t.backend.readTx.tx, t.backend.readTx.txWg)
t.backend.readTx.reset()
}

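The three pieces introduced by this PR — txWg.Add(1) in backend.ConcurrentReadTx, txWg.Done() in concurrentReadTx.RUnlock, and wg.Wait() in the rollback goroutine above — form one pattern: the shared boltdb read Tx is rolled back only after every in-flight reader that pinned it has finished. The self-contained sketch below shows that pattern in isolation; all names here are hypothetical, not etcd code.

package main

import (
	"fmt"
	"sync"
	"time"
)

// resource stands in for the boltdb read Tx that concurrent readers pin.
type resource struct{}

func (r *resource) close() { fmt.Println("old resource closed") }

// guard mirrors the readTx fields involved: a lock, the shared resource, and
// a WaitGroup counting in-flight readers of that resource.
type guard struct {
	mu  sync.RWMutex
	res *resource
	wg  *sync.WaitGroup
}

// acquire pins the current resource, like ConcurrentReadTx calling txWg.Add(1)
// under readTx.RLock(); the returned func plays the role of RUnlock.
func (g *guard) acquire() (release func()) {
	g.mu.RLock()
	defer g.mu.RUnlock()
	res, wg := g.res, g.wg
	wg.Add(1)
	return func() {
		_ = res   // a real reader would use the pinned resource here
		wg.Done() // like concurrentReadTx.RUnlock
	}
}

// swap installs a fresh resource and closes the old one asynchronously, like
// batchTxBuffered.unsafeCommit rolling back the old boltdb Tx in a goroutine.
func (g *guard) swap() {
	g.mu.Lock()
	old, oldWg := g.res, g.wg
	g.res, g.wg = &resource{}, new(sync.WaitGroup)
	g.mu.Unlock()

	go func() {
		oldWg.Wait() // wait for every reader that pinned the old resource
		old.close()
	}()
}

func main() {
	g := &guard{res: &resource{}, wg: new(sync.WaitGroup)}

	release := g.acquire() // a reader is in flight
	g.swap()               // the writer rolls over; the old resource stays open
	time.Sleep(10 * time.Millisecond)

	release() // last reader done; only now is the old resource closed
	time.Sleep(10 * time.Millisecond)
}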
106 changes: 96 additions & 10 deletions mvcc/backend/read_tx.go
@@ -42,10 +42,13 @@ type readTx struct {
mu sync.RWMutex
buf txReadBuffer

// TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle.
// txMu protects accesses to buckets and tx on Range requests.
txMu sync.RWMutex
tx *bolt.Tx
buckets map[string]*bolt.Bucket
// txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
txWg *sync.WaitGroup
}

func (rt *readTx) Lock() { rt.mu.Lock() }
@@ -71,23 +74,23 @@ func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]

// find/cache bucket
bn := string(bucketName)
rt.txMu.RLock()
bucket, ok := rt.buckets[bn]
rt.txMu.RUnlock()
if !ok {
rt.txMu.Lock()
bucket = rt.tx.Bucket(bucketName)
rt.buckets[bn] = bucket
rt.txMu.Unlock()
}

// ignore missing bucket since may have been created in this batch
if bucket == nil {
return keys, vals
}
rt.txMu.Lock()
c := bucket.Cursor()
rt.txMu.Unlock()

k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
return append(k2, keys...), append(v2, vals...)
@@ -108,9 +111,9 @@ func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) err
if err := rt.buf.ForEach(bucketName, getDups); err != nil {
return err
}
rt.txMu.Lock()
err := unsafeForEach(rt.tx, bucketName, visitNoDup)
rt.txMu.Unlock()
if err != nil {
return err
}
@@ -121,4 +124,87 @@ func (rt *readTx) reset() {
rt.buf.reset()
rt.buckets = make(map[string]*bolt.Bucket)
rt.tx = nil
rt.txWg = new(sync.WaitGroup)
}

// TODO: create a base type for readTx and concurrentReadTx to avoid duplicated function implementation?
type concurrentReadTx struct {
buf txReadBuffer
txMu *sync.RWMutex
tx *bolt.Tx
buckets map[string]*bolt.Bucket
txWg *sync.WaitGroup
}

func (rt *concurrentReadTx) Lock() {}
func (rt *concurrentReadTx) Unlock() {}

// RLock is no-op. concurrentReadTx does not need to be locked after it is created.
func (rt *concurrentReadTx) RLock() {}

// RUnlock signals the end of concurrentReadTx.
func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }

func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
dups := make(map[string]struct{})
getDups := func(k, v []byte) error {
dups[string(k)] = struct{}{}
return nil
}
visitNoDup := func(k, v []byte) error {
if _, ok := dups[string(k)]; ok {
return nil
}
return visitor(k, v)
}
if err := rt.buf.ForEach(bucketName, getDups); err != nil {
return err
}
rt.txMu.Lock()
err := unsafeForEach(rt.tx, bucketName, visitNoDup)
rt.txMu.Unlock()
if err != nil {
return err
}
return rt.buf.ForEach(bucketName, visitor)
}

func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if endKey == nil {
// forbid duplicates for single keys
limit = 1
}
if limit <= 0 {
limit = math.MaxInt64
}
if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
panic("do not use unsafeRange on non-keys bucket")
}
keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
if int64(len(keys)) == limit {
return keys, vals
}

// find/cache bucket
bn := string(bucketName)
rt.txMu.RLock()
bucket, ok := rt.buckets[bn]
rt.txMu.RUnlock()
if !ok {
rt.txMu.Lock()
bucket = rt.tx.Bucket(bucketName)
rt.buckets[bn] = bucket
rt.txMu.Unlock()
}

// ignore missing bucket since may have been created in this batch
if bucket == nil {
return keys, vals
}
rt.txMu.Lock()
c := bucket.Cursor()
rt.txMu.Unlock()

k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
return append(k2, keys...), append(v2, vals...)
}
22 changes: 22 additions & 0 deletions mvcc/backend/tx_buffer.go
@@ -88,6 +88,19 @@ func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) er
return nil
}

// unsafeCopy returns a copy of txReadBuffer, caller should acquire backend.readTx.RLock()
func (txr *txReadBuffer) unsafeCopy() txReadBuffer {
txrCopy := txReadBuffer{
txBuffer: txBuffer{
buckets: make(map[string]*bucketBuffer, len(txr.txBuffer.buckets)),
},
}
for bucketName, bucket := range txr.txBuffer.buckets {
txrCopy.txBuffer.buckets[bucketName] = bucket.Copy()
}
return txrCopy
}

type kv struct {
key []byte
val []byte
@@ -179,3 +192,12 @@ func (bb *bucketBuffer) Less(i, j int) bool {
return bytes.Compare(bb.buf[i].key, bb.buf[j].key) < 0
}
func (bb *bucketBuffer) Swap(i, j int) { bb.buf[i], bb.buf[j] = bb.buf[j], bb.buf[i] }

func (bb *bucketBuffer) Copy() *bucketBuffer {
bbCopy := bucketBuffer{
buf: make([]kv, len(bb.buf)),
used: bb.used,
}
copy(bbCopy.buf, bb.buf)
return &bbCopy
}
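Note that Copy duplicates the []kv slice itself, while the key and value byte slices inside each kv still point at the same backing arrays as the original buffer, which is what keeps the copy cheap; this assumes, as the buffer code suggests, that entries are never mutated in place. A tiny stand-alone illustration of that Go behavior (hypothetical kv type here, not etcd code):

package main

import "fmt"

// kv mirrors the buffer entry shape above; this snippet is illustrative only.
type kv struct {
	key []byte
	val []byte
}

func main() {
	orig := []kv{{key: []byte("k1"), val: []byte("v1")}}

	// The same kind of copy bucketBuffer.Copy performs: a new slice, headers copied.
	dup := make([]kv, len(orig))
	copy(dup, orig)

	// The slices themselves are independent...
	dup = append(dup, kv{key: []byte("k2"), val: []byte("v2")})
	fmt.Println(len(orig), len(dup)) // 1 2

	// ...but key/val still point at the same backing arrays as the original.
	fmt.Println(&orig[0].key[0] == &dup[0].key[0]) // true
}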
3 changes: 3 additions & 0 deletions mvcc/kvstore.go
@@ -354,6 +354,9 @@ func (s *store) restore() error {
reportDbTotalSizeInUseInBytesMu.Lock()
reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
reportDbTotalSizeInUseInBytesMu.Unlock()
reportDbOpenReadTxNMu.Lock()
reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) }
reportDbOpenReadTxNMu.Unlock()

min, max := newRevBytes(), newRevBytes()
revToBytes(revision{main: 1}, min)
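The gauge behind these report functions lives in mvcc/metrics.go, which is not part of this diff. A plausible wiring is sketched below, assuming it mirrors the existing etcd_mvcc_db_total_size_in_use_in_bytes gauge; the metric name and variable layout are guesses, not the committed code.

package mvcc

import (
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	reportDbOpenReadTxNMu sync.RWMutex
	reportDbOpenReadTxN   = func() float64 { return 0 }

	// dbOpenReadTxN exports backend.OpenReadTxN() through the swappable
	// reportDbOpenReadTxN function set in store.restore() above.
	dbOpenReadTxN = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Namespace: "etcd",
		Subsystem: "mvcc",
		Name:      "db_open_read_transactions", // name is a guess
		Help:      "The number of currently open read transactions in the backend.",
	}, func() float64 {
		reportDbOpenReadTxNMu.RLock()
		defer reportDbOpenReadTxNMu.RUnlock()
		return reportDbOpenReadTxN()
	})
)

func init() {
	prometheus.MustRegister(dbOpenReadTxN)
}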