Skip to content

Commit

Permalink
mvcc: Clone the key index BTree to traverse for compaction and lock t…
Browse files Browse the repository at this point in the history
…he original tree on each KeyItem operation, so as to not hold the lock for

a long time. This allows read/write throughput by not blocking when the index tree is large (greater than 1M entries).
  • Loading branch information
braintreeps committed Apr 17, 2018
1 parent 46e19d2 commit eb74545
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 27 deletions.
43 changes: 18 additions & 25 deletions mvcc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package mvcc
import (
"sort"
"sync"

"github.com/google/btree"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -185,27 +184,32 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {

func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
available := make(map[revision]struct{})
var emptyki []*keyIndex
if ti.lg != nil {
ti.lg.Info("compact tree index", zap.Int64("revision", rev))
} else {
plog.Printf("store.index: compact %d", rev)
}
// TODO: do not hold the lock for long time?
// This is probably OK. Compacting 10M keys takes O(10ms).
ti.Lock()
defer ti.Unlock()
ti.tree.Ascend(compactIndex(rev, available, &emptyki))
for _, ki := range emptyki {
item := ti.tree.Delete(ki)
if item == nil {
if ti.lg != nil {
ti.lg.Panic("failed to delete during compaction")
} else {
plog.Panic("store.index: unexpected delete failure during compaction")
clone := ti.tree.Clone()
ti.Unlock()

clone.Ascend(func(item btree.Item) bool {
keyi := item.(*keyIndex)
ti.Lock()
keyi.compact(rev, available)
if keyi.isEmpty() {
item := ti.tree.Delete(keyi)
if item == nil {
if ti.lg != nil {
ti.lg.Panic("failed to delete during compaction")
} else {
plog.Panic("store.index: unexpected delete failure during compaction")
}
}
}
}
ti.Unlock()
return true
})
return available
}

Expand All @@ -222,17 +226,6 @@ func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
return available
}

func compactIndex(rev int64, available map[revision]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool {
return func(i btree.Item) bool {
keyi := i.(*keyIndex)
keyi.compact(rev, available)
if keyi.isEmpty() {
*emptyki = append(*emptyki, keyi)
}
return true
}
}

func (ti *treeIndex) Equal(bi index) bool {
b := bi.(*treeIndex)

Expand Down
25 changes: 25 additions & 0 deletions mvcc/index_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package mvcc

import (
"testing"
)

func BenchmarkIndexCompact1(b *testing.B) { benchmarkIndexCompact(b, 1) }
func BenchmarkIndexCompact100(b *testing.B) { benchmarkIndexCompact(b, 100) }
func BenchmarkIndexCompact10000(b *testing.B) { benchmarkIndexCompact(b, 10000) }
func BenchmarkIndexCompact100000(b *testing.B) { benchmarkIndexCompact(b, 100000) }
func BenchmarkIndexCompact1000000(b *testing.B) { benchmarkIndexCompact(b, 1000000) }

func benchmarkIndexCompact(b *testing.B, size int) {
kvindex := newTreeIndex()

bytesN := 64
keys := createBytesSlice(bytesN, size)
for i := 1; i < size; i++ {
kvindex.Put(keys[i], revision {main: int64(i), sub: int64(i)})
}
b.ResetTimer()
for i := 1; i < b.N; i++ {
kvindex.Compact(int64(i))
}
}
4 changes: 2 additions & 2 deletions mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,10 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
// ensure that desired compaction is persisted
s.b.ForceCommit()

keep := s.kvindex.Compact(rev)
ch := make(chan struct{})
var j = func(ctx context.Context) {
keep := s.kvindex.Compact(rev)
indexCompactionPauseDurations.Observe(float64(time.Since(start) / time.Millisecond))
if ctx.Err() != nil {
s.compactBarrier(ctx, ch)
return
Expand All @@ -261,7 +262,6 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {

s.fifoSched.Schedule(j)

indexCompactionPauseDurations.Observe(float64(time.Since(start) / time.Millisecond))
return ch, nil
}

Expand Down

0 comments on commit eb74545

Please sign in to comment.