diff --git a/raft/node_test.go b/raft/node_test.go index a729068bfc2..e977da6d6e1 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -1006,7 +1006,7 @@ func TestNodeBoundedLogGrowthWithPartition(t *testing.T) { const maxEntries = 16 data := []byte("testdata") testEntry := raftpb.Entry{Data: data} - maxEntrySize := uint64(maxEntries * testEntry.Size()) + maxEntrySize := uint64(maxEntries * PayloadSize(testEntry)) s := NewMemoryStorage() cfg := newTestConfig(1, []uint64{1}, 10, 1, s) diff --git a/raft/raft.go b/raft/raft.go index bf0a8983c46..b76b9a942d8 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -635,17 +635,27 @@ func (r *raft) reset(term uint64) { r.readOnly = newReadOnly(r.readOnly.option) } -func (r *raft) appendEntry(es ...pb.Entry) { +func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { li := r.raftLog.lastIndex() for i := range es { es[i].Term = r.Term es[i].Index = li + 1 + uint64(i) } + // Track the size of this uncommitted proposal. + if !r.increaseUncommittedSize(es) { + r.logger.Debugf( + "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", + r.id, + ) + // Drop the proposal. + return false + } // use latest "last" index after truncate/append li = r.raftLog.append(es...) r.getProgress(r.id).maybeUpdate(li) // Regardless of maybeCommit's return, our caller will call bcastAppend. r.maybeCommit() + return true } // tickElection is run by followers and candidates after r.electionTimeout. @@ -739,7 +749,16 @@ func (r *raft) becomeLeader() { // could be expensive. r.pendingConfIndex = r.raftLog.lastIndex() - r.appendEntry(pb.Entry{Data: nil}) + emptyEnt := pb.Entry{Data: nil} + if !r.appendEntry(emptyEnt) { + // This won't happen because we just called reset() above. + r.logger.Panic("empty entry was dropped") + } + // As a special case, don't count the initial empty entry towards the + // uncommitted log quota. This is because we want to preserve the + // behavior of allowing one entry larger than quota if the current + // usage is zero. + r.reduceUncommittedSize([]pb.Entry{emptyEnt}) r.logger.Infof("%x became leader at term %d", r.id, r.Term) } @@ -970,10 +989,6 @@ func stepLeader(r *raft, m pb.Message) error { r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee) return ErrProposalDropped } - if !r.increaseUncommittedSize(m.Entries) { - r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id) - return ErrProposalDropped - } for i, e := range m.Entries { if e.Type == pb.EntryConfChange { @@ -986,7 +1001,10 @@ func stepLeader(r *raft, m pb.Message) error { } } } - r.appendEntry(m.Entries...) + + if !r.appendEntry(m.Entries...) { + return ErrProposalDropped + } r.bcastAppend() return nil case pb.MsgReadIndex: @@ -1490,7 +1508,7 @@ func (r *raft) abortLeaderTransfer() { func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool { var s uint64 for _, e := range ents { - s += uint64(e.Size()) + s += uint64(PayloadSize(e)) } if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize { @@ -1513,7 +1531,7 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) { var s uint64 for _, e := range ents { - s += uint64(e.Size()) + s += uint64(PayloadSize(e)) } if s > r.uncommittedSize { // uncommittedSize may underestimate the size of the uncommitted Raft diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 313f9e6e518..288cc0060c0 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -114,7 +114,7 @@ func TestLeaderBcastBeat(t *testing.T) { r.becomeCandidate() r.becomeLeader() for i := 0; i < 10; i++ { - r.appendEntry(pb.Entry{Index: uint64(i) + 1}) + mustAppendEntry(r, pb.Entry{Index: uint64(i) + 1}) } for i := 0; i < hi; i++ { diff --git a/raft/raft_test.go b/raft/raft_test.go index cac4bb6c2ca..f6f3e2a5bdd 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -37,6 +37,12 @@ func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) { return ents } +func mustAppendEntry(r *raft, ents ...pb.Entry) { + if !r.appendEntry(ents...) { + panic("entry unexpectedly dropped") + } +} + type stateMachine interface { Step(m pb.Message) error readMessages() []pb.Message @@ -363,15 +369,24 @@ func TestProgressFlowControl(t *testing.T) { } func TestUncommittedEntryLimit(t *testing.T) { - const maxEntries = 16 + // Use a relatively large number of entries here to prevent regression of a + // bug which computed the size before it was fixed. This test would fail + // with the bug, either because we'd get dropped proposals earlier than we + // expect them, or because the final tally ends up nonzero. (At the time of + // writing, the former). + const maxEntries = 1024 testEntry := pb.Entry{Data: []byte("testdata")} - maxEntrySize := maxEntries * testEntry.Size() + maxEntrySize := maxEntries * PayloadSize(testEntry) cfg := newTestConfig(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage()) cfg.MaxUncommittedEntriesSize = uint64(maxEntrySize) + cfg.MaxInflightMsgs = 2 * 1024 // avoid interference r := newRaft(cfg) r.becomeCandidate() r.becomeLeader() + if n := r.uncommittedSize; n != 0 { + t.Fatalf("expected zero uncommitted size, got %d bytes", n) + } // Set the two followers to the replicate state. Commit to tail of log. const numFollowers = 2 @@ -401,6 +416,9 @@ func TestUncommittedEntryLimit(t *testing.T) { t.Fatalf("expected %d messages, got %d", e, len(ms)) } r.reduceUncommittedSize(propEnts) + if r.uncommittedSize != 0 { + t.Fatalf("committed everything, but still tracking %d", r.uncommittedSize) + } // Send a single large proposal to r1. Should be accepted even though it // pushes us above the limit because we were beneath it before the proposal. @@ -425,6 +443,9 @@ func TestUncommittedEntryLimit(t *testing.T) { t.Fatalf("expected %d messages, got %d", e, len(ms)) } r.reduceUncommittedSize(propEnts) + if n := r.uncommittedSize; n != 0 { + t.Fatalf("expected zero uncommitted size, got %d", n) + } } func TestLeaderElection(t *testing.T) { @@ -2585,7 +2606,7 @@ func TestBcastBeat(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() for i := 0; i < 10; i++ { - sm.appendEntry(pb.Entry{Index: uint64(i) + 1}) + mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1}) } // slow follower sm.prs[2].Match, sm.prs[2].Next = 5, 6 @@ -2709,7 +2730,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { // we expect that raft will only send out one msgAPP on the first // loop. After that, the follower is paused until a heartbeat response is // received. - r.appendEntry(pb.Entry{Data: []byte("somedata")}) + mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) r.sendAppend(2) msg := r.readMessages() if len(msg) != 1 { @@ -2724,7 +2745,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { t.Errorf("paused = %v, want true", r.prs[2].Paused) } for j := 0; j < 10; j++ { - r.appendEntry(pb.Entry{Data: []byte("somedata")}) + mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) r.sendAppend(2) if l := len(r.readMessages()); l != 0 { t.Errorf("len(msg) = %d, want %d", l, 0) @@ -2771,7 +2792,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) { r.prs[2].becomeReplicate() for i := 0; i < 10; i++ { - r.appendEntry(pb.Entry{Data: []byte("somedata")}) + mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) r.sendAppend(2) msgs := r.readMessages() if len(msgs) != 1 { @@ -2788,7 +2809,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) { r.prs[2].becomeSnapshot(10) for i := 0; i < 10; i++ { - r.appendEntry(pb.Entry{Data: []byte("somedata")}) + mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) r.sendAppend(2) msgs := r.readMessages() if len(msgs) != 0 { @@ -3182,7 +3203,7 @@ func TestNewLeaderPendingConfig(t *testing.T) { for i, tt := range tests { r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) if tt.addEntry { - r.appendEntry(pb.Entry{Type: pb.EntryNormal}) + mustAppendEntry(r, pb.Entry{Type: pb.EntryNormal}) } r.becomeCandidate() r.becomeLeader() diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 3e56733aa42..6348bb7e324 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -493,7 +493,7 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) { const maxEntries = 16 data := []byte("testdata") testEntry := raftpb.Entry{Data: data} - maxEntrySize := uint64(maxEntries * testEntry.Size()) + maxEntrySize := uint64(maxEntries * PayloadSize(testEntry)) s := NewMemoryStorage() cfg := newTestConfig(1, []uint64{1}, 10, 1, s) diff --git a/raft/util.go b/raft/util.go index 1a7a1e9ac3a..79eaa0c626f 100644 --- a/raft/util.go +++ b/raft/util.go @@ -101,6 +101,12 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string { return buf.String() } +// PayloadSize is the size of the payload of this Entry. Notably, it does not +// depend on its Index or Term. +func PayloadSize(e pb.Entry) int { + return len(e.Data) +} + // DescribeEntry returns a concise human-readable description of an // Entry for debugging. func DescribeEntry(e pb.Entry, f EntryFormatter) string {