Skip to content

Commit

Permalink
Add new WAL implementation using coreos/etcd/wal
Browse files Browse the repository at this point in the history
  • Loading branch information
nvanbenschoten committed Apr 26, 2019
1 parent 64c7ac0 commit dad1e25
Show file tree
Hide file tree
Showing 14 changed files with 254 additions and 59 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ raft-toy

# Test data
pebble-data
etcd-wal
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
github.com/spf13/pflag v1.0.3
go.etcd.io/etcd v0.0.0-20190417191421-cd7ffbe2270d
go.uber.org/zap v1.9.1
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
google.golang.org/grpc v1.14.0
Expand Down
11 changes: 11 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/coreos/etcd v3.3.12+incompatible h1:5k8nkcBSvltjOO5RLflnXevOJXndlKIMbvVnMTX+cUU=
github.com/coreos/etcd v3.3.12+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 h1:u9SHYsPQNyt5tgDm3YN7+9dYrpK96E5wFilTFWIDZOM=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf h1:CAKfRE2YtTUIjjh1bkBtyYFaUT/WmOqsJjgtihT0vMI=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
Expand Down Expand Up @@ -42,6 +45,7 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/matttproud/golang_protobuf_extensions v1.0.0 h1:YNOwxxSJzSUARoD9KRZLzM9Y858MNGCOACTvCW9TSAc=
github.com/matttproud/golang_protobuf_extensions v1.0.0/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand All @@ -54,9 +58,13 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.8.0 h1:1921Yw9Gc3iSc4VQh3PIoOqgPCZS7G/4xQNVUp8Mda8=
github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_model v0.0.0-20170216185247-6f3806018612 h1:13pIdM2tpaDi4OVe24fgoIS7ZTqMt0QI+bwQsX5hq+g=
github.com/prometheus/client_model v0.0.0-20170216185247-6f3806018612/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1 h1:osmNoEW2SCW3L7EX0km2LYM8HKpNWRiouxjE3XHkyGc=
github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20180612222113-7d6f385de8be h1:MoyXp/VjXUwM0GyDcdwT7Ubea2gxOSHpPaFo3qV+Y2A=
github.com/prometheus/procfs v0.0.0-20180612222113-7d6f385de8be/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
Expand All @@ -78,8 +86,11 @@ go.etcd.io/etcd v0.0.0-20190417191421-cd7ffbe2270d h1:e9wo83KcuYgsnllPD+jumnGcj/
go.etcd.io/etcd v0.0.0-20190417191421-cd7ffbe2270d/go.mod h1:KSGwdbiFchh5KIC9My2+ZVl5/3ANcwohw50dpPwa2cw=
go.etcd.io/etcd v3.3.12+incompatible h1:xR2YQOYo5JV5BMrUj9i1kcf2rEbpCQKHH2sKTtpAHiQ=
go.etcd.io/etcd v3.3.12+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180608092829-8ac0e0d97ce4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
Expand Down
9 changes: 6 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/nvanbenschoten/raft-toy/proposal"
"github.com/nvanbenschoten/raft-toy/storage"
"github.com/nvanbenschoten/raft-toy/storage/engine"
"github.com/nvanbenschoten/raft-toy/storage/wal"
"github.com/nvanbenschoten/raft-toy/transport"
)

Expand All @@ -30,12 +31,13 @@ func newPeer(epoch int32) *peer.Peer {
// WAL.
// w := wal.NewMem()
// w := engine.NewPebble(*dataDir, false).(wal.Wal)
w := wal.NewEtcdWal(*dataDir)
// Engine.
// e := engine.NewMem()
// e := engine.NewPebble(*dataDir, true)
e := engine.NewPebble(*dataDir, false)
// Combined.
// s := storage.CombineWalAndEngine(w, e)
s := engine.NewPebble(*dataDir, false).(storage.Storage)
s := storage.CombineWalAndEngine(w, e)
// s := engine.NewPebble(*dataDir, false).(storage.Storage)

// Transport.
t := transport.NewGRPC()
Expand Down Expand Up @@ -122,4 +124,5 @@ func becomeLeader(p *peer.Peer) {
}
time.Sleep(1 * time.Millisecond)
}
p.WaitForAllCaughtUp()
}
24 changes: 23 additions & 1 deletion peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ func (p *Peer) Stop() {
p.wg.Wait()
p.pt.FinishAll()
p.pl.Stop()
p.s.Close()
p.s.CloseEngine()
p.s.CloseWal()
}

func (p *Peer) stopped() bool {
Expand Down Expand Up @@ -228,3 +229,24 @@ func (p *Peer) bumpEpoch(epoch int32) {
p.n = n
p.pl.Resume(epoch, n)
}

// WaitForAllCaughtUp waits for all peers to catch up to the same log index.
func (p *Peer) WaitForAllCaughtUp() {
for {
p.mu.Lock()
var match uint64
caughtUp := true
p.n.WithProgress(func(id uint64, _ raft.ProgressType, pr raft.Progress) {
if match == 0 {
match = pr.Match
} else {
caughtUp = caughtUp && match == pr.Match
}
})
p.mu.Unlock()
if caughtUp {
return
}
time.Sleep(10 * time.Millisecond)
}
}
2 changes: 1 addition & 1 deletion peer/prop_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/nvanbenschoten/raft-toy/proposal"
)

const propBufCap = 64
const propBufCap = 256

type propBuf struct {
mu sync.RWMutex
Expand Down
4 changes: 3 additions & 1 deletion pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ func saveToDisk(s storage.Storage, ents []raftpb.Entry, st raftpb.HardState, syn
s.Append(ents)
}
if !raft.IsEmptyHardState(st) {
s.SetHardState(st)
// This isn't exactly correct, but it's close enough.
syncHS := sync && len(ents) == 0
s.SetHardState(st, syncHS)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import "go.etcd.io/etcd/raft/raftpb"

// Engine represents a storage engine that Raft entries are applied to.
type Engine interface {
SetHardState(raftpb.HardState)
SetHardState(raftpb.HardState, bool)
ApplyEntry(raftpb.Entry)
Clear()
Close()
CloseEngine()
}

// BatchingEngine represents a storage engine that can apply a batch of
Expand Down
4 changes: 2 additions & 2 deletions storage/engine/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func NewMem() Engine {
}
}

func (m *mem) SetHardState(st raftpb.HardState) {
func (m *mem) SetHardState(st raftpb.HardState, _ bool) {
if err := m.m.SetHardState(st); err != nil {
log.Fatal(err)
}
Expand All @@ -37,6 +37,6 @@ func (m *mem) Clear() {
m.kv = make(map[string][]byte)
}

func (m *mem) Close() {
func (m *mem) CloseEngine() {
// No-op.
}
76 changes: 27 additions & 49 deletions storage/engine/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ package engine
import (
"bytes"
"log"
"math"
"math/rand"
"os"
"path/filepath"
"strconv"

"github.com/nvanbenschoten/raft-toy/proposal"
"github.com/nvanbenschoten/raft-toy/util/raftentry"
"github.com/nvanbenschoten/raft-toy/storage/wal"
pdb "github.com/petermattis/pebble"
"github.com/petermattis/pebble/db"
"go.etcd.io/etcd/raft"
Expand All @@ -33,7 +32,7 @@ type pebble struct {
db *pdb.DB
opts *db.Options
dir string
c logCache
c wal.LogCache
}

// NewPebble creates an LSM-based storage engine using Pebble.
Expand All @@ -53,20 +52,20 @@ func NewPebble(root string, disableWAL bool) Engine {
db: db,
opts: opts,
dir: dir,
c: makeLogCache(),
c: wal.MakeLogCache(true),
}
}

func randDir(root string) string {
return filepath.Join(root, dirPrefix, strconv.FormatUint(rand.Uint64(), 10))
}

func (p *pebble) SetHardState(st raftpb.HardState) {
func (p *pebble) SetHardState(st raftpb.HardState, sync bool) {
buf, err := st.Marshal()
if err != nil {
log.Fatal(err)
}
if err := p.db.Set(hardStateKey, buf, db.Sync); err != nil {
if err := p.db.Set(hardStateKey, buf, optsForSync(sync)); err != nil {
log.Fatal(err)
}
}
Expand Down Expand Up @@ -101,15 +100,15 @@ func (p *pebble) ApplyEntries(ents []raftpb.Entry) {
}

func (p *pebble) Clear() {
p.Close()
p.CloseWal()
db, err := pdb.Open(p.dir, p.opts)
if err != nil {
log.Fatal(err)
}
p.db = db
}

func (p *pebble) Close() {
func (p *pebble) CloseEngine() {
if err := p.db.Close(); err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -163,31 +162,6 @@ func decodeRaftLogKey(k []byte) uint64 {
return v
}

const cacheSizeTarget = 2048

// logCache remembers a few facts about the Raft log.
type logCache struct {
lastIndex uint64
lastTerm uint64
truncIndex uint64
ec *raftentry.Cache
}

func makeLogCache() logCache {
return logCache{lastIndex: 0, ec: raftentry.NewCache(4 << 20)}
}

func (c *logCache) updateForEnts(ents []raftpb.Entry) {
last := ents[len(ents)-1]
c.lastIndex = last.Index
c.lastTerm = last.Term
c.ec.Add(0, ents, true)
if last.Index >= c.truncIndex+2*cacheSizeTarget {
c.truncIndex = last.Index - cacheSizeTarget
c.ec.Clear(0, c.truncIndex)
}
}

func (p *pebble) Append(ents []raftpb.Entry) {
if len(ents) == 0 {
return
Expand All @@ -198,7 +172,7 @@ func (p *pebble) Append(ents []raftpb.Entry) {
if err := b.Commit(db.Sync); err != nil {
log.Fatal(err)
}
p.c.updateForEnts(ents)
p.c.UpdateOnAppend(ents)
}

func appendEntsToBatch(b *pdb.Batch, ents []raftpb.Entry) {
Expand Down Expand Up @@ -230,7 +204,7 @@ func (p *pebble) Entries(lo, hi uint64) []raftpb.Entry {
n = 100
}
ents := make([]raftpb.Entry, 0, n)
ents, _, hitIndex, _ := p.c.ec.Scan(ents, 0, lo, hi, math.MaxUint64)
ents, hitIndex := p.c.Entries(ents, lo, hi)
if uint64(len(ents)) == hi-lo {
return ents
}
Expand All @@ -253,11 +227,8 @@ func (p *pebble) Entries(lo, hi uint64) []raftpb.Entry {
}

func (p *pebble) Term(i uint64) uint64 {
if p.c.lastIndex == i && p.c.lastTerm != 0 {
return p.c.lastTerm
}
if e, ok := p.c.ec.Get(0, i); ok {
return e.Term
if t, ok := p.c.Term(i); ok {
return t
}

var kArr [maxLogKeyLen]byte
Expand All @@ -279,16 +250,20 @@ func (p *pebble) Term(i uint64) uint64 {
}

func (p *pebble) LastIndex() uint64 {
return p.c.lastIndex
return p.c.LastIndex()
}

func (p *pebble) FirstIndex() uint64 {
return 1
return p.c.FirstIndex()
}

func (p *pebble) Truncate() {
p.Clear()
p.c = makeLogCache()
p.c.Reset()
}

func (p *pebble) CloseWal() {
p.CloseEngine()
}

//////////////////////////////////////////////////////
Expand All @@ -304,7 +279,7 @@ func (p *pebble) AppendAndSetHardState(ents []raftpb.Entry, st raftpb.HardState,
defer b.Close()
if len(ents) > 0 {
appendEntsToBatch(b, ents)
p.c.updateForEnts(ents)
p.c.UpdateOnAppend(ents)
}
if !raft.IsEmptyHardState(st) {
buf, err := st.Marshal()
Expand All @@ -315,11 +290,14 @@ func (p *pebble) AppendAndSetHardState(ents []raftpb.Entry, st raftpb.HardState,
log.Fatal(err)
}
}
opts := db.NoSync
if sync {
opts = db.Sync
}
if err := b.Commit(opts); err != nil {
if err := b.Commit(optsForSync(sync)); err != nil {
log.Fatal(err)
}
}

func optsForSync(sync bool) *db.WriteOptions {
if sync {
return db.Sync
}
return db.NoSync
}
Loading

0 comments on commit dad1e25

Please sign in to comment.