diff --git a/.gitignore b/.gitignore index 4d967a5..9adb184 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ raft-toy # Test data pebble-data +etcd-wal diff --git a/go.mod b/go.mod index 94cb0ce..9c018f8 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a github.com/spf13/pflag v1.0.3 go.etcd.io/etcd v0.0.0-20190417191421-cd7ffbe2270d + go.uber.org/zap v1.9.1 golang.org/x/net v0.0.0-20180906233101-161cd47e91fd golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f google.golang.org/grpc v1.14.0 diff --git a/go.sum b/go.sum index 01143bb..f170525 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,13 @@ +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/coreos/etcd v3.3.12+incompatible h1:5k8nkcBSvltjOO5RLflnXevOJXndlKIMbvVnMTX+cUU= github.com/coreos/etcd v3.3.12+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 h1:u9SHYsPQNyt5tgDm3YN7+9dYrpK96E5wFilTFWIDZOM= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf h1:CAKfRE2YtTUIjjh1bkBtyYFaUT/WmOqsJjgtihT0vMI= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= @@ -42,6 +45,7 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/matttproud/golang_protobuf_extensions v1.0.0 h1:YNOwxxSJzSUARoD9KRZLzM9Y858MNGCOACTvCW9TSAc= github.com/matttproud/golang_protobuf_extensions v1.0.0/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -54,9 +58,13 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.8.0 h1:1921Yw9Gc3iSc4VQh3PIoOqgPCZS7G/4xQNVUp8Mda8= github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_model v0.0.0-20170216185247-6f3806018612 h1:13pIdM2tpaDi4OVe24fgoIS7ZTqMt0QI+bwQsX5hq+g= github.com/prometheus/client_model v0.0.0-20170216185247-6f3806018612/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1 h1:osmNoEW2SCW3L7EX0km2LYM8HKpNWRiouxjE3XHkyGc= github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/procfs v0.0.0-20180612222113-7d6f385de8be h1:MoyXp/VjXUwM0GyDcdwT7Ubea2gxOSHpPaFo3qV+Y2A= github.com/prometheus/procfs v0.0.0-20180612222113-7d6f385de8be/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= @@ -78,8 +86,11 @@ go.etcd.io/etcd v0.0.0-20190417191421-cd7ffbe2270d h1:e9wo83KcuYgsnllPD+jumnGcj/ go.etcd.io/etcd v0.0.0-20190417191421-cd7ffbe2270d/go.mod h1:KSGwdbiFchh5KIC9My2+ZVl5/3ANcwohw50dpPwa2cw= go.etcd.io/etcd v3.3.12+incompatible h1:xR2YQOYo5JV5BMrUj9i1kcf2rEbpCQKHH2sKTtpAHiQ= go.etcd.io/etcd v3.3.12+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= +go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180608092829-8ac0e0d97ce4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA= diff --git a/main.go b/main.go index e41d935..5df47e4 100644 --- a/main.go +++ b/main.go @@ -14,6 +14,7 @@ import ( "github.com/nvanbenschoten/raft-toy/proposal" "github.com/nvanbenschoten/raft-toy/storage" "github.com/nvanbenschoten/raft-toy/storage/engine" + "github.com/nvanbenschoten/raft-toy/storage/wal" "github.com/nvanbenschoten/raft-toy/transport" ) @@ -30,12 +31,13 @@ func newPeer(epoch int32) *peer.Peer { // WAL. // w := wal.NewMem() // w := engine.NewPebble(*dataDir, false).(wal.Wal) + w := wal.NewEtcdWal(*dataDir) // Engine. // e := engine.NewMem() - // e := engine.NewPebble(*dataDir, true) + e := engine.NewPebble(*dataDir, false) // Combined. - // s := storage.CombineWalAndEngine(w, e) - s := engine.NewPebble(*dataDir, false).(storage.Storage) + s := storage.CombineWalAndEngine(w, e) + // s := engine.NewPebble(*dataDir, false).(storage.Storage) // Transport. t := transport.NewGRPC() @@ -122,4 +124,5 @@ func becomeLeader(p *peer.Peer) { } time.Sleep(1 * time.Millisecond) } + p.WaitForAllCaughtUp() } diff --git a/peer/peer.go b/peer/peer.go index 4fb5167..3adca65 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -145,7 +145,8 @@ func (p *Peer) Stop() { p.wg.Wait() p.pt.FinishAll() p.pl.Stop() - p.s.Close() + p.s.CloseEngine() + p.s.CloseWal() } func (p *Peer) stopped() bool { @@ -228,3 +229,24 @@ func (p *Peer) bumpEpoch(epoch int32) { p.n = n p.pl.Resume(epoch, n) } + +// WaitForAllCaughtUp waits for all peers to catch up to the same log index. +func (p *Peer) WaitForAllCaughtUp() { + for { + p.mu.Lock() + var match uint64 + caughtUp := true + p.n.WithProgress(func(id uint64, _ raft.ProgressType, pr raft.Progress) { + if match == 0 { + match = pr.Match + } else { + caughtUp = caughtUp && match == pr.Match + } + }) + p.mu.Unlock() + if caughtUp { + return + } + time.Sleep(10 * time.Millisecond) + } +} diff --git a/peer/prop_buf.go b/peer/prop_buf.go index 8b3020f..0e8068f 100644 --- a/peer/prop_buf.go +++ b/peer/prop_buf.go @@ -7,7 +7,7 @@ import ( "github.com/nvanbenschoten/raft-toy/proposal" ) -const propBufCap = 64 +const propBufCap = 256 type propBuf struct { mu sync.RWMutex diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 9fe59b7..3589ee4 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -48,7 +48,9 @@ func saveToDisk(s storage.Storage, ents []raftpb.Entry, st raftpb.HardState, syn s.Append(ents) } if !raft.IsEmptyHardState(st) { - s.SetHardState(st) + // This isn't exactly correct, but it's close enough. + syncHS := sync && len(ents) == 0 + s.SetHardState(st, syncHS) } } } diff --git a/storage/engine/engine.go b/storage/engine/engine.go index b56e2cb..f69f7ae 100644 --- a/storage/engine/engine.go +++ b/storage/engine/engine.go @@ -4,10 +4,10 @@ import "go.etcd.io/etcd/raft/raftpb" // Engine represents a storage engine that Raft entries are applied to. type Engine interface { - SetHardState(raftpb.HardState) + SetHardState(raftpb.HardState, bool) ApplyEntry(raftpb.Entry) Clear() - Close() + CloseEngine() } // BatchingEngine represents a storage engine that can apply a batch of diff --git a/storage/engine/mem.go b/storage/engine/mem.go index 2c23879..88cfb04 100644 --- a/storage/engine/mem.go +++ b/storage/engine/mem.go @@ -21,7 +21,7 @@ func NewMem() Engine { } } -func (m *mem) SetHardState(st raftpb.HardState) { +func (m *mem) SetHardState(st raftpb.HardState, _ bool) { if err := m.m.SetHardState(st); err != nil { log.Fatal(err) } @@ -37,6 +37,6 @@ func (m *mem) Clear() { m.kv = make(map[string][]byte) } -func (m *mem) Close() { +func (m *mem) CloseEngine() { // No-op. } diff --git a/storage/engine/pebble.go b/storage/engine/pebble.go index 8d312bd..2951631 100644 --- a/storage/engine/pebble.go +++ b/storage/engine/pebble.go @@ -3,14 +3,13 @@ package engine import ( "bytes" "log" - "math" "math/rand" "os" "path/filepath" "strconv" "github.com/nvanbenschoten/raft-toy/proposal" - "github.com/nvanbenschoten/raft-toy/util/raftentry" + "github.com/nvanbenschoten/raft-toy/storage/wal" pdb "github.com/petermattis/pebble" "github.com/petermattis/pebble/db" "go.etcd.io/etcd/raft" @@ -33,7 +32,7 @@ type pebble struct { db *pdb.DB opts *db.Options dir string - c logCache + c wal.LogCache } // NewPebble creates an LSM-based storage engine using Pebble. @@ -53,7 +52,7 @@ func NewPebble(root string, disableWAL bool) Engine { db: db, opts: opts, dir: dir, - c: makeLogCache(), + c: wal.MakeLogCache(true), } } @@ -61,12 +60,12 @@ func randDir(root string) string { return filepath.Join(root, dirPrefix, strconv.FormatUint(rand.Uint64(), 10)) } -func (p *pebble) SetHardState(st raftpb.HardState) { +func (p *pebble) SetHardState(st raftpb.HardState, sync bool) { buf, err := st.Marshal() if err != nil { log.Fatal(err) } - if err := p.db.Set(hardStateKey, buf, db.Sync); err != nil { + if err := p.db.Set(hardStateKey, buf, optsForSync(sync)); err != nil { log.Fatal(err) } } @@ -101,7 +100,7 @@ func (p *pebble) ApplyEntries(ents []raftpb.Entry) { } func (p *pebble) Clear() { - p.Close() + p.CloseWal() db, err := pdb.Open(p.dir, p.opts) if err != nil { log.Fatal(err) @@ -109,7 +108,7 @@ func (p *pebble) Clear() { p.db = db } -func (p *pebble) Close() { +func (p *pebble) CloseEngine() { if err := p.db.Close(); err != nil { log.Fatal(err) } @@ -163,31 +162,6 @@ func decodeRaftLogKey(k []byte) uint64 { return v } -const cacheSizeTarget = 2048 - -// logCache remembers a few facts about the Raft log. -type logCache struct { - lastIndex uint64 - lastTerm uint64 - truncIndex uint64 - ec *raftentry.Cache -} - -func makeLogCache() logCache { - return logCache{lastIndex: 0, ec: raftentry.NewCache(4 << 20)} -} - -func (c *logCache) updateForEnts(ents []raftpb.Entry) { - last := ents[len(ents)-1] - c.lastIndex = last.Index - c.lastTerm = last.Term - c.ec.Add(0, ents, true) - if last.Index >= c.truncIndex+2*cacheSizeTarget { - c.truncIndex = last.Index - cacheSizeTarget - c.ec.Clear(0, c.truncIndex) - } -} - func (p *pebble) Append(ents []raftpb.Entry) { if len(ents) == 0 { return @@ -198,7 +172,7 @@ func (p *pebble) Append(ents []raftpb.Entry) { if err := b.Commit(db.Sync); err != nil { log.Fatal(err) } - p.c.updateForEnts(ents) + p.c.UpdateOnAppend(ents) } func appendEntsToBatch(b *pdb.Batch, ents []raftpb.Entry) { @@ -230,7 +204,7 @@ func (p *pebble) Entries(lo, hi uint64) []raftpb.Entry { n = 100 } ents := make([]raftpb.Entry, 0, n) - ents, _, hitIndex, _ := p.c.ec.Scan(ents, 0, lo, hi, math.MaxUint64) + ents, hitIndex := p.c.Entries(ents, lo, hi) if uint64(len(ents)) == hi-lo { return ents } @@ -253,11 +227,8 @@ func (p *pebble) Entries(lo, hi uint64) []raftpb.Entry { } func (p *pebble) Term(i uint64) uint64 { - if p.c.lastIndex == i && p.c.lastTerm != 0 { - return p.c.lastTerm - } - if e, ok := p.c.ec.Get(0, i); ok { - return e.Term + if t, ok := p.c.Term(i); ok { + return t } var kArr [maxLogKeyLen]byte @@ -279,16 +250,20 @@ func (p *pebble) Term(i uint64) uint64 { } func (p *pebble) LastIndex() uint64 { - return p.c.lastIndex + return p.c.LastIndex() } func (p *pebble) FirstIndex() uint64 { - return 1 + return p.c.FirstIndex() } func (p *pebble) Truncate() { p.Clear() - p.c = makeLogCache() + p.c.Reset() +} + +func (p *pebble) CloseWal() { + p.CloseEngine() } ////////////////////////////////////////////////////// @@ -304,7 +279,7 @@ func (p *pebble) AppendAndSetHardState(ents []raftpb.Entry, st raftpb.HardState, defer b.Close() if len(ents) > 0 { appendEntsToBatch(b, ents) - p.c.updateForEnts(ents) + p.c.UpdateOnAppend(ents) } if !raft.IsEmptyHardState(st) { buf, err := st.Marshal() @@ -315,11 +290,14 @@ func (p *pebble) AppendAndSetHardState(ents []raftpb.Entry, st raftpb.HardState, log.Fatal(err) } } - opts := db.NoSync - if sync { - opts = db.Sync - } - if err := b.Commit(opts); err != nil { + if err := b.Commit(optsForSync(sync)); err != nil { log.Fatal(err) } } + +func optsForSync(sync bool) *db.WriteOptions { + if sync { + return db.Sync + } + return db.NoSync +} diff --git a/storage/wal/cache.go b/storage/wal/cache.go new file mode 100644 index 0000000..2ab07bf --- /dev/null +++ b/storage/wal/cache.go @@ -0,0 +1,77 @@ +package wal + +import ( + "math" + + "github.com/nvanbenschoten/raft-toy/util/raftentry" + "go.etcd.io/etcd/raft/raftpb" +) + +const cacheByteLimit = 512 << 20 // 512 MB +const cacheSizeTarget = 2048 + +// LogCache caches state about the Raft log in-memory to avoid on-disk lookups. +type LogCache struct { + lastIndex uint64 + lastTerm uint64 + truncIndex uint64 + trunc bool + ec *raftentry.Cache +} + +// MakeLogCache creates a new Raft log cache. Trunc indicates whether old entries +// in the cache should be periodically evicted. +func MakeLogCache(trunc bool) LogCache { + return LogCache{trunc: trunc, ec: raftentry.NewCache(cacheByteLimit)} +} + +// UpdateOnAppend updates the cache based on the newly-appended entries. +func (c *LogCache) UpdateOnAppend(ents []raftpb.Entry) { + last := ents[len(ents)-1] + c.lastIndex = last.Index + c.lastTerm = last.Term + c.ec.Add(0, ents, true) + c.maybeTruncate() +} + +func (c *LogCache) maybeTruncate() { + if c.trunc && c.lastIndex >= c.truncIndex+2*cacheSizeTarget { + c.truncIndex = c.lastIndex - cacheSizeTarget + c.ec.Clear(0, c.truncIndex) + } +} + +// Entries returns entries between lo and hi. +func (c *LogCache) Entries(ents []raftpb.Entry, lo, hi uint64) ([]raftpb.Entry, uint64) { + ents, _, hitIndex, _ := c.ec.Scan(ents, 0, lo, hi, math.MaxUint64) + return ents, hitIndex +} + +// Term attempts to determine the term for the provided log index. +func (c *LogCache) Term(i uint64) (uint64, bool) { + if c.lastIndex == i && c.lastTerm != 0 { + return c.lastTerm, true + } + if e, ok := c.ec.Get(0, i); ok { + return e.Term, true + } + return 0, false +} + +// LastIndex returns the index of the last entry in the log. +func (c *LogCache) LastIndex() uint64 { + return c.lastIndex +} + +// FirstIndex returns the index of the first entry in the log. +func (c *LogCache) FirstIndex() uint64 { + return 1 +} + +// Reset resets the cache. +func (c *LogCache) Reset() { + c.lastIndex = 0 + c.lastTerm = 0 + c.truncIndex = 0 + c.ec.Drop(0) +} diff --git a/storage/wal/etcd.go b/storage/wal/etcd.go new file mode 100644 index 0000000..520b929 --- /dev/null +++ b/storage/wal/etcd.go @@ -0,0 +1,95 @@ +package wal + +import ( + "log" + "math/rand" + "os" + "path/filepath" + "strconv" + + "go.etcd.io/etcd/raft/raftpb" + etcdwal "go.etcd.io/etcd/wal" + "go.uber.org/zap" +) + +const dirPrefix = "etcd-wal" + +type etcdWal struct { + w *etcdwal.WAL + dir string + c LogCache +} + +// NewEtcdWal creates a new write-ahead log using the etcd/wal package. +func NewEtcdWal(root string) Wal { + dir := randDir(root) + w, err := etcdwal.Create(zap.NewNop(), dir, nil) + if err != nil { + log.Fatal(err) + } + return &etcdWal{ + w: w, + dir: dir, + // We can't allow the cache to perform compactions + // of old indices because we can't read from the WAL + // file directly. + c: MakeLogCache(false), + } +} + +func randDir(root string) string { + return filepath.Join(root, dirPrefix, strconv.FormatUint(rand.Uint64(), 10)) +} + +func (w *etcdWal) Append(ents []raftpb.Entry) { + if err := w.w.Save(raftpb.HardState{}, ents); err != nil { + log.Fatal(err) + } + w.c.UpdateOnAppend(ents) +} + +func (w *etcdWal) Entries(lo, hi uint64) []raftpb.Entry { + n := hi - lo + if n > 100 { + n = 100 + } + ents := make([]raftpb.Entry, 0, n) + ents, _ = w.c.Entries(ents, lo, hi) + if uint64(len(ents)) != hi-lo { + log.Fatalf("missing entries in entry cache [%d,%d)", lo, hi) + } + return ents +} + +func (w *etcdWal) Term(i uint64) uint64 { + if t, ok := w.c.Term(i); ok { + return t + } + return 0 +} + +func (w *etcdWal) LastIndex() uint64 { + return w.c.LastIndex() +} + +func (w *etcdWal) FirstIndex() uint64 { + return w.c.FirstIndex() +} + +func (w *etcdWal) Truncate() { + w.CloseWal() + var err error + if w.w, err = etcdwal.Create(zap.NewNop(), w.dir, nil); err != nil { + log.Fatal(err) + } + w.c.Reset() +} + +func (w *etcdWal) CloseWal() { + if err := w.w.Close(); err != nil { + log.Fatal(err) + } + if err := os.RemoveAll(w.dir); err != nil { + log.Fatal(err) + } +} diff --git a/storage/wal/mem.go b/storage/wal/mem.go index efa5391..fbc5db2 100644 --- a/storage/wal/mem.go +++ b/storage/wal/mem.go @@ -59,3 +59,7 @@ func (m *mem) FirstIndex() uint64 { func (m *mem) Truncate() { m.m = raft.NewMemoryStorage() } + +func (m *mem) CloseWal() { + // No-op. +} diff --git a/storage/wal/wal.go b/storage/wal/wal.go index a7da7e4..2982fe4 100644 --- a/storage/wal/wal.go +++ b/storage/wal/wal.go @@ -10,4 +10,5 @@ type Wal interface { Term(i uint64) uint64 LastIndex() uint64 FirstIndex() uint64 + CloseWal() }