diff --git a/raft/raft.go b/raft/raft.go index 1b363a426da..72ad4c896f4 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -663,6 +663,13 @@ func stepFollower(r *raft, m pb.Message) { } func (r *raft) handleAppendEntries(m pb.Message) { + // Ignore the out-of-date append message + // It is safe since the leader has sent the commit index to this raft, which is min(leader.Commit, pr[raft.id].Match). + // Assume leader updated pr[raft.id] to commit index at t0. All message after t0 should have a m.Index that is greater + // than commit index. If m.Index is smaller than r.Commit, then m is sent before t0. + if m.Index < r.Commit { + return + } if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) } else { diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 1a29b57ff9c..431c13ef6ab 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -605,12 +605,13 @@ func TestFollowerCheckMsgApp(t *testing.T) { index uint64 wreject bool wrejectHint uint64 + wignored bool }{ - {ents[0].Term, ents[0].Index, false, 0}, - {ents[0].Term, ents[0].Index + 1, true, 2}, - {ents[0].Term + 1, ents[0].Index, true, 2}, - {ents[1].Term, ents[1].Index, false, 0}, - {3, 3, true, 2}, + {ents[0].Term, ents[0].Index, false, 0, true}, // out-of-date message should be ignored + {ents[0].Term, ents[0].Index + 1, true, 2, false}, + {ents[1].Term + 1, ents[1].Index, true, 2, false}, + {ents[1].Term, ents[1].Index, false, 0, false}, + {3, 3, true, 2, false}, } for i, tt := range tests { storage := NewMemoryStorage() @@ -622,6 +623,13 @@ func TestFollowerCheckMsgApp(t *testing.T) { r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index}) msgs := r.readMessages() + if tt.wignored { + if len(msgs) != 0 { + t.Errorf("#%d: len(msgs) = %d, want ignore", i, len(msgs)) + } + continue + } + wmsgs := []pb.Message{ {From: 1, To: 2, Type: pb.MsgAppResp, Term: 2, Index: tt.index, Reject: tt.wreject, RejectHint: tt.wrejectHint}, } diff --git a/raft/raft_test.go b/raft/raft_test.go index 79d90d13057..cf5c563da99 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -779,6 +779,29 @@ func TestHandleMsgApp(t *testing.T) { } } +// TestHandleMsgAppIgnore ensures: If message.Index < commitIndex, ignore the message. +func TestHandleMsgAppIgnore(t *testing.T) { + tests := []pb.Message{ + {Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 0, Commit: 1}, + {Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 1, Commit: 1}, + } + + for i, tt := range tests { + storage := NewMemoryStorage() + storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}) + sm := newRaft(1, []uint64{1}, 10, 1, storage, 0) + sm.raftLog.commitTo(2) + sm.Commit = 2 + sm.becomeFollower(2, None) + + sm.handleAppendEntries(tt) + m := sm.readMessages() + if len(m) != 0 { + t.Errorf("#%d: msg = %d, want ignore", i, len(m)) + } + } +} + // TestHandleHeartbeat ensures that the follower commits to the commit in the message. func TestHandleHeartbeat(t *testing.T) { commit := uint64(2)