Skip to content

Commit

Permalink
Add lazyFollower optimization to disable follower entry application
Browse files Browse the repository at this point in the history
  • Loading branch information
nvanbenschoten committed Apr 26, 2019
1 parent dad1e25 commit b62588c
Show file tree
Hide file tree
Showing 874 changed files with 319,136 additions and 4,933 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ raft-toy

# Output of the go coverage tool, specifically when used with LiteIDE
*.out
*.prof

# Test data
pebble-data
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ github.com/gogo/protobuf v1.0.0 h1:2jyBKDKU/8v3v2xVR2PtiWQviFUyiaGk2rpfyFT8rTM=
github.com/gogo/protobuf v1.0.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
Expand All @@ -36,17 +37,22 @@ github.com/grpc-ecosystem/grpc-gateway v1.4.1/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpg
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.0.0/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/matttproud/golang_protobuf_extensions v1.0.0 h1:YNOwxxSJzSUARoD9KRZLzM9Y858MNGCOACTvCW9TSAc=
github.com/matttproud/golang_protobuf_extensions v1.0.0/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
Expand Down Expand Up @@ -84,6 +90,8 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd v0.0.0-20190417191421-cd7ffbe2270d h1:e9wo83KcuYgsnllPD+jumnGcj//lS6fRSz97LE0WPJ4=
go.etcd.io/etcd v0.0.0-20190417191421-cd7ffbe2270d/go.mod h1:KSGwdbiFchh5KIC9My2+ZVl5/3ANcwohw50dpPwa2cw=
go.etcd.io/etcd v0.0.0-20190425060523-cca0d5c1bed1 h1:xrG8gcsPMZwSN0OUTGT5T7DtmdOeTpxbic26vDtqOoc=
go.etcd.io/etcd v0.0.0-20190425060523-cca0d5c1bed1/go.mod h1:dNVoPOOuVFIPUanhzFuVeJ8eYRljmWRVTa4pS2jUNno=
go.etcd.io/etcd v3.3.12+incompatible h1:xR2YQOYo5JV5BMrUj9i1kcf2rEbpCQKHH2sKTtpAHiQ=
go.etcd.io/etcd v3.3.12+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
Expand Down
15 changes: 8 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/nvanbenschoten/raft-toy/proposal"
"github.com/nvanbenschoten/raft-toy/storage"
"github.com/nvanbenschoten/raft-toy/storage/engine"
"github.com/nvanbenschoten/raft-toy/storage/wal"
"github.com/nvanbenschoten/raft-toy/transport"
)

Expand All @@ -31,13 +30,13 @@ func newPeer(epoch int32) *peer.Peer {
// WAL.
// w := wal.NewMem()
// w := engine.NewPebble(*dataDir, false).(wal.Wal)
w := wal.NewEtcdWal(*dataDir)
// w := wal.NewEtcdWal(*dataDir)
// Engine.
// e := engine.NewMem()
e := engine.NewPebble(*dataDir, false)
// e := engine.NewPebble(*dataDir, false)
// Combined.
s := storage.CombineWalAndEngine(w, e)
// s := engine.NewPebble(*dataDir, false).(storage.Storage)
// s := storage.CombineWalAndEngine(w, e)
s := engine.NewPebble(*dataDir, false).(storage.Storage)

// Transport.
t := transport.NewGRPC()
Expand All @@ -50,9 +49,11 @@ func newPeer(epoch int32) *peer.Peer {
case "parallel-append":
pl = pipeline.NewParallelAppender()
case "async-apply":
pl = pipeline.NewAsyncApplier(false /* earlyAck */)
pl = pipeline.NewAsyncApplier(false /* earlyAck */, false /* lazyFollower */)
case "async-apply-early-ack":
pl = pipeline.NewAsyncApplier(true /* earlyAck */)
pl = pipeline.NewAsyncApplier(true /* earlyAck */, false /* lazyFollower */)
case "async-apply-early-ack-lazy-follower":
pl = pipeline.NewAsyncApplier(true /* earlyAck */, true /* lazyFollower */)
default:
log.Fatalf("unknown pipeline %q", *pipelineImpl)
}
Expand Down
26 changes: 20 additions & 6 deletions pipeline/async_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pipeline
import (
"github.com/nvanbenschoten/raft-toy/metric"
"github.com/nvanbenschoten/raft-toy/proposal"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
)

Expand All @@ -11,13 +12,21 @@ import (
// allows it to have a tighter loop around appending log entries
// and sending Raft messages.
//
// The pipeline has an option to acknowledge committed entries
// immediately after learning that they were committed instead
// of waiting until after they have also been applied.
// The earlyAck option instructs the pipeline to acknowledge
// committed entries immediately after learning that they were
// committed instead of waiting until after they have also been
// applied.
//
// The lazyFollower option instructs the pipeline to forego entry
// application of Raft followers entirely. This is not quite a
// proposed optimization, but it simulates delayed and/or heavily
// batched entry application on followers, which is.
type asyncApplier struct {
basic
earlyAck bool
toApply chan asyncEvent
earlyAck bool
lazyFollower bool
leader bool
toApply chan asyncEvent
}

type asyncEvent struct {
Expand All @@ -27,7 +36,7 @@ type asyncEvent struct {
}

// NewAsyncApplier creates a new "async applier" pipeline.
func NewAsyncApplier(earlyAck bool) Pipeline {
func NewAsyncApplier(earlyAck, lazyFollower bool) Pipeline {
return &asyncApplier{
earlyAck: earlyAck,
toApply: make(chan asyncEvent, 512),
Expand All @@ -37,6 +46,9 @@ func NewAsyncApplier(earlyAck bool) Pipeline {
func (pl *asyncApplier) RunOnce() {
defer measurePipelineLat()()
rd := pl.n.Ready()
if rd.SoftState != nil {
pl.leader = rd.SoftState.RaftState == raft.StateLeader
}
if pl.earlyAck {
pl.ackCommittedEnts(rd.CommittedEntries)
}
Expand Down Expand Up @@ -84,6 +96,8 @@ func (pl *asyncApplier) maybeApplyAsync(ents []raftpb.Entry) {
pl.toApply <- asyncEvent{sync: syncC}
<-syncC
applyToStore(pl.n, pl.s, pl.pt, pl.l, ents, !pl.earlyAck)
} else if pl.lazyFollower && !pl.leader {
// Do nothing.
} else {
// Send to async applier.
pl.toApply <- asyncEvent{ents: ents}
Expand Down
2 changes: 1 addition & 1 deletion storage/engine/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewPebble(root string, disableWAL bool) Engine {
db: db,
opts: opts,
dir: dir,
c: wal.MakeLogCache(true),
c: wal.MakeLogCache(false),
}
}

Expand Down
4 changes: 3 additions & 1 deletion transport/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ func (g *grpc) sendAsync(to uint64, m *transpb.RaftMsg) {
if !ok {
log.Fatalf("unknown peer %d", to)
}
conn, err := rpc.DialContext(g.dialCtx, url, rpc.WithInsecure(), rpc.WithBlock())
conn, err := rpc.DialContext(g.dialCtx, url,
rpc.WithInsecure(), rpc.WithBlock(), rpc.WithInitialWindowSize(1<<20),
)
if err != nil {
switch err {
case context.Canceled:
Expand Down
20 changes: 20 additions & 0 deletions vendor/github.com/beorn7/perks/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b62588c

Please sign in to comment.