From f176427791da54267e96277d7d6666c72214d9b2 Mon Sep 17 00:00:00 2001 From: jocalvert Date: Thu, 29 Mar 2018 21:03:30 +0000 Subject: [PATCH] mvcc: Clone the key index for compaction and lock on each item For compaction, clone the original Btree for traversal purposes, so as to not hold the lock for the duration of compaction. This allows read/write throughput by not blocking when the index tree is large (> 1M entries). mvcc: add comment for index compaction lock mvcc: explicitly unlock store to do index compaction synchronously mvcc: formatting index bench mvcc: add release note for index compaction changes mvcc: add license header --- CHANGELOG-3.4.md | 1 + mvcc/index.go | 44 ++++++++++++++++++---------------------- mvcc/index_bench_test.go | 42 ++++++++++++++++++++++++++++++++++++++ mvcc/kvstore.go | 9 +++++--- 4 files changed, 69 insertions(+), 27 deletions(-) create mode 100644 mvcc/index_bench_test.go diff --git a/CHANGELOG-3.4.md b/CHANGELOG-3.4.md index ebb4ef69d98..54bae2953db 100644 --- a/CHANGELOG-3.4.md +++ b/CHANGELOG-3.4.md @@ -34,6 +34,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.3.0...v3.4.0) and [ - e.g. a node is removed from cluster, or [`raftpb.MsgProp` arrives at current leader while there is an ongoing leadership transfer](https://github.com/coreos/etcd/issues/8975). - Add [`snapshot`](https://github.com/coreos/etcd/pull/9118) package for easier snapshot workflow (see [`godoc.org/github.com/etcd/snapshot`](https://godoc.org/github.com/coreos/etcd/snapshot) for more). - Improve [functional tester](https://github.com/coreos/etcd/tree/master/functional) coverage: [proxy layer to run network fault tests in CI](https://github.com/coreos/etcd/pull/9081), [TLS is enabled both for server and client](https://github.com/coreos/etcd/pull/9534), [liveness mode](https://github.com/coreos/etcd/issues/9230), [shuffle test sequence](https://github.com/coreos/etcd/issues/9381), [membership reconfiguration failure cases](https://github.com/coreos/etcd/pull/9564), [disastrous quorum loss and snapshot recover from a seed member](https://github.com/coreos/etcd/pull/9565), [embedded etcd](https://github.com/coreos/etcd/pull/9572). +- Improve [index compaction blocking](https://github.com/coreos/etcd/pull/9511) by using a copy on write clone to avoid holding the lock for the traversal of the entire index. ### Breaking Changes diff --git a/mvcc/index.go b/mvcc/index.go index 626de3825fc..16e223be225 100644 --- a/mvcc/index.go +++ b/mvcc/index.go @@ -185,27 +185,34 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision { func (ti *treeIndex) Compact(rev int64) map[revision]struct{} { available := make(map[revision]struct{}) - var emptyki []*keyIndex if ti.lg != nil { ti.lg.Info("compact tree index", zap.Int64("revision", rev)) } else { plog.Printf("store.index: compact %d", rev) } - // TODO: do not hold the lock for long time? - // This is probably OK. Compacting 10M keys takes O(10ms). ti.Lock() - defer ti.Unlock() - ti.tree.Ascend(compactIndex(rev, available, &emptyki)) - for _, ki := range emptyki { - item := ti.tree.Delete(ki) - if item == nil { - if ti.lg != nil { - ti.lg.Panic("failed to delete during compaction") - } else { - plog.Panic("store.index: unexpected delete failure during compaction") + clone := ti.tree.Clone() + ti.Unlock() + + clone.Ascend(func(item btree.Item) bool { + keyi := item.(*keyIndex) + //Lock is needed here to prevent modification to the keyIndex while + //compaction is going on or revision added to empty before deletion + ti.Lock() + keyi.compact(rev, available) + if keyi.isEmpty() { + item := ti.tree.Delete(keyi) + if item == nil { + if ti.lg != nil { + ti.lg.Panic("failed to delete during compaction") + } else { + plog.Panic("store.index: unexpected delete failure during compaction") + } } } - } + ti.Unlock() + return true + }) return available } @@ -222,17 +229,6 @@ func (ti *treeIndex) Keep(rev int64) map[revision]struct{} { return available } -func compactIndex(rev int64, available map[revision]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool { - return func(i btree.Item) bool { - keyi := i.(*keyIndex) - keyi.compact(rev, available) - if keyi.isEmpty() { - *emptyki = append(*emptyki, keyi) - } - return true - } -} - func (ti *treeIndex) Equal(bi index) bool { b := bi.(*treeIndex) diff --git a/mvcc/index_bench_test.go b/mvcc/index_bench_test.go new file mode 100644 index 00000000000..5d2e5e3f49b --- /dev/null +++ b/mvcc/index_bench_test.go @@ -0,0 +1,42 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mvcc + +import ( + "testing" + + "go.uber.org/zap" +) + +func BenchmarkIndexCompact1(b *testing.B) { benchmarkIndexCompact(b, 1) } +func BenchmarkIndexCompact100(b *testing.B) { benchmarkIndexCompact(b, 100) } +func BenchmarkIndexCompact10000(b *testing.B) { benchmarkIndexCompact(b, 10000) } +func BenchmarkIndexCompact100000(b *testing.B) { benchmarkIndexCompact(b, 100000) } +func BenchmarkIndexCompact1000000(b *testing.B) { benchmarkIndexCompact(b, 1000000) } + +func benchmarkIndexCompact(b *testing.B, size int) { + log := zap.NewNop() + kvindex := newTreeIndex(log) + + bytesN := 64 + keys := createBytesSlice(bytesN, size) + for i := 1; i < size; i++ { + kvindex.Put(keys[i], revision{main: int64(i), sub: int64(i)}) + } + b.ResetTimer() + for i := 1; i < b.N; i++ { + kvindex.Compact(int64(i)) + } +} diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index f03b6311e4f..832de9e35d5 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -217,17 +217,18 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev func (s *store) Compact(rev int64) (<-chan struct{}, error) { s.mu.Lock() - defer s.mu.Unlock() s.revMu.Lock() - defer s.revMu.Unlock() - if rev <= s.compactMainRev { ch := make(chan struct{}) f := func(ctx context.Context) { s.compactBarrier(ctx, ch) } s.fifoSched.Schedule(f) + s.mu.Unlock() + s.revMu.Unlock() return ch, ErrCompacted } if rev > s.currentRev { + s.mu.Unlock() + s.revMu.Unlock() return nil, ErrFutureRev } @@ -245,6 +246,8 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) { // ensure that desired compaction is persisted s.b.ForceCommit() + s.mu.Unlock() + s.revMu.Unlock() keep := s.kvindex.Compact(rev) ch := make(chan struct{}) var j = func(ctx context.Context) {