Skip to content

Commit

Permalink
raft: provide protection against unbounded Raft log growth
Browse files Browse the repository at this point in the history
The suggested pattern for Raft proposals is that they be retried
periodically until they succeed. This turns out to be an issue
when a leader cannot commit entries because the leader will continue
to append re-proposed entries to its log without committing anything.
This can result in the uncommitted tail of a leader's log growing
without bound until it is able to commit entries.

This change add a safeguard to protect against this case where a
leader's log can grow without bound during loss of quorum scenarios.
It does so by introducing a new, optional `MaxUncommittedEntries`
configuration. This config limits the max number of uncommitted
entries that may be appended to a leader's log. Once this limit
is exceeded, proposals will begin to return ErrProposalDropped
errors.

See cockroachdb/cockroach#27772
  • Loading branch information
nvanbenschoten committed Oct 9, 2018
1 parent b046a37 commit 58b7dc6
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 25 deletions.
13 changes: 7 additions & 6 deletions contrib/raftexample/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,13 @@ func (rc *raftNode) startRaft() {
rpeers[i] = raft.Peer{ID: uint64(i + 1)}
}
c := &raft.Config{
ID: uint64(rc.id),
ElectionTick: 10,
HeartbeatTick: 1,
Storage: rc.raftStorage,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
ID: uint64(rc.id),
ElectionTick: 10,
HeartbeatTick: 1,
Storage: rc.raftStorage,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
MaxUncommittedEntries: 1024,
}

if oldwal {
Expand Down
70 changes: 70 additions & 0 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,3 +997,73 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) {
)
}
}

// TestBoundedLogGrowthWithPartition tests a scenario where a leader is
// partitioned from a quorum of nodes. It verifies that the leader's log is
// protected from unbounded growth even as new entries continue to be proposed.
// This protection is provided by the MaxUncommittedEntries configuration.
func TestBoundedLogGrowthWithPartition(t *testing.T) {
peers := []uint64{1, 2, 3, 4, 5}
cfg1 := newTestConfig(1, peers, 10, 1, NewMemoryStorage())
cfg1.MaxUncommittedEntries = 16
r1 := newRaft(cfg1)
r2 := newTestRaft(2, peers, 10, 1, NewMemoryStorage())
r3 := newTestRaft(3, peers, 10, 1, NewMemoryStorage())
r4 := newTestRaft(4, peers, 10, 1, NewMemoryStorage())
r5 := newTestRaft(5, peers, 10, 1, NewMemoryStorage())
nt := newNetwork(r1, r2, r3, r4, r5)

// Elect r1 as leader and commit to the tail of the log.
nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})

// Partition the network while we make our proposals. These proposals should
// not cause the leader's log to grow indefinitely.
nt.isolate(1)
testEntry := []raftpb.Entry{{Data: []byte("testdata")}}
testEntryMsg := raftpb.Message{From: 1, To: 1, Type: raftpb.MsgProp, Entries: testEntry}
for i := 0; i < 1024; i++ {
nt.send(testEntryMsg)
}

// Check the size of leader's uncommitted log tail. It should not exceed the
// MaxUncommittedEntries limit.
checkUncommitted := func(exp int) {
t.Helper()
l := int(r1.raftLog.lastIndex() - r1.raftLog.committed)
if exp != l {
t.Fatalf("expected %d uncommitted entries, found %d", exp, l)
}
}
checkUncommitted(cfg1.MaxUncommittedEntries)

// Recover from the partition and tick the clock to wake everything back up
// and send the messages. The uncommitted tail of the Raft log should
// disappear as entries are committed.
nt.recover()
nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat})
checkUncommitted(0)

// Create a second partition. This time, the leader is partitioned with a
// single follower. They still can't establish a quorum.
nt.isolate(1)
nt.isolate(2)
nt.drop(1, 2, 0.0)
nt.drop(2, 1, 0.0)

// The follower now forwards proposals to the leader. Same deal.
testEntryMsg = raftpb.Message{From: 2, To: 2, Type: raftpb.MsgProp, Entries: testEntry}
for i := 0; i < 1024; i++ {
nt.send(testEntryMsg)
}

// Check the size of leader's uncommitted log tail. It should not exceed the
// MaxUncommittedEntries limit.
checkUncommitted(cfg1.MaxUncommittedEntries)

// Recover from the partition and tick the clock to wake everything back up
// and send the messages. The uncommitted tail of the Raft log should
// disappear as entries are committed.
nt.recover()
nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat})
checkUncommitted(0)
}
31 changes: 26 additions & 5 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ type Config struct {
// overflowing that sending buffer. TODO (xiangli): feedback to application to
// limit the proposal rate?
MaxInflightMsgs int
// MaxUncommittedEntries limits the max number of uncommitted entries that
// may be appended to a leader's log. Once this limit is exceeded, proposals
// will begin to return ErrProposalDropped errors. Note: 0 for no limit.
MaxUncommittedEntries int

// CheckQuorum specifies if the leader should check quorum activity. Leader
// steps down when quorum is not active for an electionTimeout.
Expand Down Expand Up @@ -219,6 +223,10 @@ func (c *Config) validate() error {
return errors.New("max inflight messages must be greater than 0")
}

if c.MaxUncommittedEntries < 0 {
return errors.New("max uncommitted entries cannot be less than 0")
}

if c.Logger == nil {
c.Logger = raftLogger
}
Expand All @@ -241,11 +249,12 @@ type raft struct {
// the log
raftLog *raftLog

maxInflight int
maxMsgSize uint64
prs map[uint64]*Progress
learnerPrs map[uint64]*Progress
matchBuf uint64Slice
maxMsgSize uint64
maxInflight int
maxUncommitted int
prs map[uint64]*Progress
learnerPrs map[uint64]*Progress
matchBuf uint64Slice

state StateType

Expand Down Expand Up @@ -326,6 +335,7 @@ func newRaft(c *Config) *raft {
raftLog: raftlog,
maxMsgSize: c.MaxSizePerMsg,
maxInflight: c.MaxInflightMsgs,
maxUncommitted: c.MaxUncommittedEntries,
prs: make(map[uint64]*Progress),
learnerPrs: make(map[uint64]*Progress),
electionTimeout: c.ElectionTick,
Expand Down Expand Up @@ -954,6 +964,17 @@ func stepLeader(r *raft, m pb.Message) error {
r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
return ErrProposalDropped
}
if r.maxUncommitted > 0 {
// We're limiting the size of the uncommitted tail of the Raft log.
// If the uncommitted tail is equal to or above the limit, begin
// dropping proposals. If we're below the limit but the proposal
// would push us above it because it has multiple entries, we still
// allow it. This avoids cases where large proposals are starved by
// small proposals.
if int(r.raftLog.lastIndex()-r.raftLog.committed) >= r.maxUncommitted {
return ErrProposalDropped
}
}

for i, e := range m.Entries {
if e.Type == pb.EntryConfChange {
Expand Down
62 changes: 60 additions & 2 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,64 @@ func TestProgressFlowControl(t *testing.T) {
}
}

func TestUncommittedEntryLimit(t *testing.T) {
cfg := newTestConfig(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage())
cfg.MaxUncommittedEntries = 5
r := newRaft(cfg)
r.becomeCandidate()
r.becomeLeader()

// Set the two followers to the replicate state. Commit to tail of log.
const followers = 2
r.prs[2].becomeReplicate()
r.prs[3].becomeReplicate()
r.raftLog.commitTo(r.raftLog.lastIndex())

// Send proposals to r1. The first 5 entries should be appended to the log.
blob := []byte(strings.Repeat("a", 10))
propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}}
for i := 0; i < cfg.MaxUncommittedEntries; i++ {
if err := r.Step(propMsg); err != nil {
t.Fatalf("proposal resulted in error: %v", err)
}
}

// Send one more proposal to r1. It should be rejected.
if err := r.Step(propMsg); err != ErrProposalDropped {
t.Fatalf("proposal not dropped: %v", err)
}

// Read messages and commit to last index.
ms := r.readMessages()
if e := cfg.MaxUncommittedEntries * followers; len(ms) != e {
t.Fatalf("expected %d messages, got %d", e, len(ms))
}
r.raftLog.commitTo(r.raftLog.lastIndex())

// Send a single large proposal to r1. Should be accepted even though it
// pushes us above the limit because we were beneath it before the proposal.
propEnts := make([]pb.Entry, 2*cfg.MaxUncommittedEntries)
for i := range propEnts {
propEnts[i] = pb.Entry{Data: blob}
}
propMsgLarge := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: propEnts}
if err := r.Step(propMsgLarge); err != nil {
t.Fatalf("proposal resulted in error: %v", err)
}

// Send one more proposal to r1. It should be rejected, again.
if err := r.Step(propMsg); err != ErrProposalDropped {
t.Fatalf("proposal not dropped: %v", err)
}

// Read messages and commit to last index.
ms = r.readMessages()
if e := 1 * followers; len(ms) != e {
t.Fatalf("expected %d messages, got %d", e, len(ms))
}
r.raftLog.commitTo(r.raftLog.lastIndex())
}

func TestLeaderElection(t *testing.T) {
testLeaderElection(t, false)
}
Expand Down Expand Up @@ -4138,8 +4196,8 @@ func (nw *network) drop(from, to uint64, perc float64) {
}

func (nw *network) cut(one, other uint64) {
nw.drop(one, other, 2.0) // always drop
nw.drop(other, one, 2.0) // always drop
nw.drop(one, other, 1.0) // always drop
nw.drop(other, one, 1.0) // always drop
}

func (nw *network) isolate(id uint64) {
Expand Down
26 changes: 14 additions & 12 deletions raft/rafttest/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ type node struct {
func startNode(id uint64, peers []raft.Peer, iface iface) *node {
st := raft.NewMemoryStorage()
c := &raft.Config{
ID: id,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: st,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
ID: id,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: st,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
MaxUncommittedEntries: 1024,
}
rn := raft.StartNode(c, peers)
n := &node{
Expand Down Expand Up @@ -125,12 +126,13 @@ func (n *node) restart() {
// wait for the shutdown
<-n.stopc
c := &raft.Config{
ID: n.id,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: n.storage,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
ID: n.id,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: n.storage,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
MaxUncommittedEntries: 1024,
}
n.Node = raft.RestartNode(c)
n.start()
Expand Down

0 comments on commit 58b7dc6

Please sign in to comment.