diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index 9a270eac7190..a76fb9d7454d 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -274,12 +274,13 @@ func (rc *raftNode) startRaft() { rpeers[i] = raft.Peer{ID: uint64(i + 1)} } c := &raft.Config{ - ID: uint64(rc.id), - ElectionTick: 10, - HeartbeatTick: 1, - Storage: rc.raftStorage, - MaxSizePerMsg: 1024 * 1024, - MaxInflightMsgs: 256, + ID: uint64(rc.id), + ElectionTick: 10, + HeartbeatTick: 1, + Storage: rc.raftStorage, + MaxSizePerMsg: 1024 * 1024, + MaxInflightMsgs: 256, + MaxUncommittedEntriesSize: 1 << 30, } if oldwal { diff --git a/raft/node.go b/raft/node.go index 9e7c209ce66a..f67628fd361d 100644 --- a/raft/node.go +++ b/raft/node.go @@ -401,6 +401,7 @@ func (n *node) run(r *raft) { r.msgs = nil r.readStates = nil + r.reduceUncommittedSize(rd.CommittedEntries) advancec = n.advancec case <-advancec: if applyingToI != 0 { diff --git a/raft/node_test.go b/raft/node_test.go index b067aaad4d12..a729068bfc21 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -997,3 +997,57 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) { ) } } + +// TestNodeBoundedLogGrowthWithPartition tests a scenario where a leader is +// partitioned from a quorum of nodes. It verifies that the leader's log is +// protected from unbounded growth even as new entries continue to be proposed. +// This protection is provided by the MaxUncommittedEntriesSize configuration. 
+func TestNodeBoundedLogGrowthWithPartition(t *testing.T) { + const maxEntries = 16 + data := []byte("testdata") + testEntry := raftpb.Entry{Data: data} + maxEntrySize := uint64(maxEntries * testEntry.Size()) + + s := NewMemoryStorage() + cfg := newTestConfig(1, []uint64{1}, 10, 1, s) + cfg.MaxUncommittedEntriesSize = maxEntrySize + r := newRaft(cfg) + n := newNode() + go n.run(r) + defer n.Stop() + n.Campaign(context.TODO()) + + rd := readyWithTimeout(&n) + if len(rd.CommittedEntries) != 1 { + t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries)) + } + s.Append(rd.Entries) + n.Advance() + + // Simulate a network partition while we make our proposals by never + // committing anything. These proposals should not cause the leader's + // log to grow indefinitely. + for i := 0; i < 1024; i++ { + n.Propose(context.TODO(), data) + } + + // Check the size of leader's uncommitted log tail. It should not exceed the + // MaxUncommittedEntriesSize limit. + checkUncommitted := func(exp uint64) { + t.Helper() + if a := r.uncommittedSize; exp != a { + t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a) + } + } + checkUncommitted(maxEntrySize) + + // Recover from the partition. The uncommitted tail of the Raft log should + // disappear as entries are committed. + rd = readyWithTimeout(&n) + if len(rd.CommittedEntries) != maxEntries { + t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries)) + } + s.Append(rd.Entries) + n.Advance() + checkUncommitted(0) +} diff --git a/raft/raft.go b/raft/raft.go index 81bad3bec73b..7fc4335630d4 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -154,6 +154,11 @@ type Config struct { // replication. Note: math.MaxUint64 for unlimited, 0 for at most one entry per // message. MaxSizePerMsg uint64 + // MaxUncommittedEntriesSize limits the aggregate size of the uncommitted + // entries that may be appended to a leader's log. 
Once this limit is + // exceeded, proposals will begin to return ErrProposalDropped errors. + // Note: 0 for no limit. + MaxUncommittedEntriesSize uint64 // MaxInflightMsgs limits the max number of in-flight append messages during // optimistic replication phase. The application transportation layer usually // has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid @@ -215,6 +220,10 @@ func (c *Config) validate() error { return errors.New("storage cannot be nil") } + if c.MaxUncommittedEntriesSize == 0 { + c.MaxUncommittedEntriesSize = noLimit + } + if c.MaxInflightMsgs <= 0 { return errors.New("max inflight messages must be greater than 0") } @@ -241,11 +250,12 @@ type raft struct { // the log raftLog *raftLog - maxInflight int - maxMsgSize uint64 - prs map[uint64]*Progress - learnerPrs map[uint64]*Progress - matchBuf uint64Slice + maxMsgSize uint64 + maxUncommittedSize uint64 + maxInflight int + prs map[uint64]*Progress + learnerPrs map[uint64]*Progress + matchBuf uint64Slice state StateType @@ -268,6 +278,10 @@ type raft struct { // be proposed if the leader's applied index is greater than this // value. pendingConfIndex uint64 + // an estimate of the size of the uncommitted tail of the Raft log. Used to + // prevent unbounded log growth. Only maintained by the leader. Reset on + // term changes. 
+ uncommittedSize uint64 readOnly *readOnly @@ -326,6 +340,7 @@ func newRaft(c *Config) *raft { raftLog: raftlog, maxMsgSize: c.MaxSizePerMsg, maxInflight: c.MaxInflightMsgs, + maxUncommittedSize: c.MaxUncommittedEntriesSize, prs: make(map[uint64]*Progress), learnerPrs: make(map[uint64]*Progress), electionTimeout: c.ElectionTick, @@ -616,6 +631,7 @@ func (r *raft) reset(term uint64) { }) r.pendingConfIndex = 0 + r.uncommittedSize = 0 r.readOnly = newReadOnly(r.readOnly.option) } @@ -954,6 +970,10 @@ func stepLeader(r *raft, m pb.Message) error { r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee) return ErrProposalDropped } + if !r.increaseUncommittedSize(m.Entries) { + r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id) + return ErrProposalDropped + } for i, e := range m.Entries { if e.Type == pb.EntryConfChange { @@ -1462,6 +1482,49 @@ func (r *raft) abortLeaderTransfer() { r.leadTransferee = None } +// increaseUncommittedSize computes the size of the proposed entries and +// determines whether they would push leader over its maxUncommittedSize limit. +// If the new entries would exceed the limit, the method returns false. If not, +// the increase in uncommitted entry size is recorded and the method returns +// true. +func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool { + var s uint64 + for _, e := range ents { + s += uint64(e.Size()) + } + + if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize { + // If the uncommitted tail of the Raft log is empty, allow any size + // proposal. Otherwise, limit the size of the uncommitted tail of the + // log and drop any proposal that would push the size over the limit. + return false + } + r.uncommittedSize += s + return true +} + +// reduceUncommittedSize accounts for the newly committed entries by decreasing +// the uncommitted entry size estimate. 
+func (r *raft) reduceUncommittedSize(ents []pb.Entry) { + if r.uncommittedSize == 0 { + // Fast-path for followers, who do not track or enforce the limit. + return + } + + var s uint64 + for _, e := range ents { + s += uint64(e.Size()) + } + if s > r.uncommittedSize { + // uncommittedSize may underestimate the size of the uncommitted Raft + // log tail but will never overestimate it. Saturate at 0 instead of + // allowing overflow. + r.uncommittedSize = 0 + } else { + r.uncommittedSize -= s + } +} + func numOfPendingConf(ents []pb.Entry) int { n := 0 for i := range ents { diff --git a/raft/raft_test.go b/raft/raft_test.go index 8619692f302d..cac4bb6c2caa 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -362,6 +362,71 @@ func TestProgressFlowControl(t *testing.T) { } } +func TestUncommittedEntryLimit(t *testing.T) { + const maxEntries = 16 + testEntry := pb.Entry{Data: []byte("testdata")} + maxEntrySize := maxEntries * testEntry.Size() + + cfg := newTestConfig(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage()) + cfg.MaxUncommittedEntriesSize = uint64(maxEntrySize) + r := newRaft(cfg) + r.becomeCandidate() + r.becomeLeader() + + // Set the two followers to the replicate state. Commit to tail of log. + const numFollowers = 2 + r.prs[2].becomeReplicate() + r.prs[3].becomeReplicate() + r.uncommittedSize = 0 + + // Send proposals to r1. The first maxEntries entries should be appended to the log. + propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{testEntry}} + propEnts := make([]pb.Entry, maxEntries) + for i := 0; i < maxEntries; i++ { + if err := r.Step(propMsg); err != nil { + t.Fatalf("proposal resulted in error: %v", err) + } + propEnts[i] = testEntry + } + + // Send one more proposal to r1. It should be rejected. + if err := r.Step(propMsg); err != ErrProposalDropped { + t.Fatalf("proposal not dropped: %v", err) + } + + // Read messages and reduce the uncommitted size as if we had committed + // these entries. 
+ ms := r.readMessages() + if e := maxEntries * numFollowers; len(ms) != e { + t.Fatalf("expected %d messages, got %d", e, len(ms)) + } + r.reduceUncommittedSize(propEnts) + + // Send a single large proposal to r1. Should be accepted even though it + // pushes us above the limit because the uncommitted tail was empty before it. + propEnts = make([]pb.Entry, 2*maxEntries) + for i := range propEnts { + propEnts[i] = testEntry + } + propMsgLarge := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: propEnts} + if err := r.Step(propMsgLarge); err != nil { + t.Fatalf("proposal resulted in error: %v", err) + } + + // Send one more proposal to r1. It should be rejected, again. + if err := r.Step(propMsg); err != ErrProposalDropped { + t.Fatalf("proposal not dropped: %v", err) + } + + // Read messages and reduce the uncommitted size as if we had committed + // these entries. + ms = r.readMessages() + if e := 1 * numFollowers; len(ms) != e { + t.Fatalf("expected %d messages, got %d", e, len(ms)) + } + r.reduceUncommittedSize(propEnts) +} + func TestLeaderElection(t *testing.T) { testLeaderElection(t, false) } diff --git a/raft/rafttest/node.go b/raft/rafttest/node.go index 57ff9b262bc4..add21da0b560 100644 --- a/raft/rafttest/node.go +++ b/raft/rafttest/node.go @@ -41,12 +41,13 @@ type node struct { func startNode(id uint64, peers []raft.Peer, iface iface) *node { st := raft.NewMemoryStorage() c := &raft.Config{ - ID: id, - ElectionTick: 10, - HeartbeatTick: 1, - Storage: st, - MaxSizePerMsg: 1024 * 1024, - MaxInflightMsgs: 256, + ID: id, + ElectionTick: 10, + HeartbeatTick: 1, + Storage: st, + MaxSizePerMsg: 1024 * 1024, + MaxInflightMsgs: 256, + MaxUncommittedEntriesSize: 1 << 30, } rn := raft.StartNode(c, peers) n := &node{ @@ -125,12 +126,13 @@ func (n *node) restart() { // wait for the shutdown <-n.stopc c := &raft.Config{ - ID: n.id, - ElectionTick: 10, - HeartbeatTick: 1, - Storage: n.storage, - MaxSizePerMsg: 1024 * 1024, - MaxInflightMsgs: 256, + ID: n.id, + 
ElectionTick: 10, + HeartbeatTick: 1, + Storage: n.storage, + MaxSizePerMsg: 1024 * 1024, + MaxInflightMsgs: 256, + MaxUncommittedEntriesSize: 1 << 30, } n.Node = raft.RestartNode(c) n.start() diff --git a/raft/rawnode.go b/raft/rawnode.go index 5f8a116dd63b..4a4ec2e94635 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -198,6 +198,7 @@ func (rn *RawNode) Step(m pb.Message) error { func (rn *RawNode) Ready() Ready { rd := rn.newReady() rn.raft.msgs = nil + rn.raft.reduceUncommittedSize(rd.CommittedEntries) return rd } diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index f941cbf7b60b..3e56733aa425 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -484,3 +484,64 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) { }) } } + +// TestRawNodeBoundedLogGrowthWithPartition tests a scenario where a leader is +// partitioned from a quorum of nodes. It verifies that the leader's log is +// protected from unbounded growth even as new entries continue to be proposed. +// This protection is provided by the MaxUncommittedEntriesSize configuration. +func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) { + const maxEntries = 16 + data := []byte("testdata") + testEntry := raftpb.Entry{Data: data} + maxEntrySize := uint64(maxEntries * testEntry.Size()) + + s := NewMemoryStorage() + cfg := newTestConfig(1, []uint64{1}, 10, 1, s) + cfg.MaxUncommittedEntriesSize = maxEntrySize + rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}}) + if err != nil { + t.Fatal(err) + } + rd := rawNode.Ready() + s.Append(rd.Entries) + rawNode.Advance(rd) + + // Become the leader. + rawNode.Campaign() + for { + rd = rawNode.Ready() + s.Append(rd.Entries) + if rd.SoftState.Lead == rawNode.raft.id { + rawNode.Advance(rd) + break + } + rawNode.Advance(rd) + } + + // Simulate a network partition while we make our proposals by never + // committing anything. These proposals should not cause the leader's + // log to grow indefinitely. 
+ for i := 0; i < 1024; i++ { + rawNode.Propose(data) + } + + // Check the size of leader's uncommitted log tail. It should not exceed the + // MaxUncommittedEntriesSize limit. + checkUncommitted := func(exp uint64) { + t.Helper() + if a := rawNode.raft.uncommittedSize; exp != a { + t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a) + } + } + checkUncommitted(maxEntrySize) + + // Recover from the partition. The uncommitted tail of the Raft log should + // disappear as entries are committed. + rd = rawNode.Ready() + if len(rd.CommittedEntries) != maxEntries { + t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries)) + } + s.Append(rd.Entries) + rawNode.Advance(rd) + checkUncommitted(0) +}