diff --git a/raft/node.go b/raft/node.go index 745553c4fbc4..4a3b2f1dfd3e 100644 --- a/raft/node.go +++ b/raft/node.go @@ -208,7 +208,19 @@ func StartNode(c *Config, peers []Peer) Node { if err != nil { panic("unexpected marshal error") } - e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d} + // TODO(tbg): this should append the ConfChange for the own node first + // and also call applyConfChange below for that node first. Otherwise + // we have a Raft group (for a little while) that doesn't have itself + // in its config, which is bad. + // This whole way of setting things up is rickety. The app should just + // populate the initial ConfState appropriately and then all of this + // goes away. + e := pb.Entry{ + Type: pb.EntryConfChange, + Term: 1, + Index: r.raftLog.lastIndex() + 1, + Data: d, + } r.raftLog.append(e) } // Mark these initial entries as committed. @@ -225,7 +237,7 @@ func StartNode(c *Config, peers []Peer) Node { // We do not set raftLog.applied so the application will be able // to observe all conf changes via Ready.CommittedEntries. for _, peer := range peers { - r.addNode(peer.ID) + r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}) } n := newNode() @@ -357,35 +369,16 @@ func (n *node) run(r *raft) { r.Step(m) } case cc := <-n.confc: - if cc.NodeID == None { - select { - case n.confstatec <- pb.ConfState{ - Nodes: r.prs.VoterNodes(), - Learners: r.prs.LearnerNodes()}: - case <-n.done: - } - break - } - switch cc.Type { - case pb.ConfChangeAddNode: - r.addNode(cc.NodeID) - case pb.ConfChangeAddLearnerNode: - r.addLearner(cc.NodeID) - case pb.ConfChangeRemoveNode: + cs := r.applyConfChange(cc) + if _, ok := r.prs.Progress[r.id]; !ok { // block incoming proposal when local node is // removed if cc.NodeID == r.id { propc = nil } - r.removeNode(cc.NodeID) - case pb.ConfChangeUpdateNode: - default: - panic("unexpected conf type") } select { - case n.confstatec <- pb.ConfState{ - Nodes: r.prs.VoterNodes(), - Learners: r.prs.LearnerNodes()}: + case n.confstatec <- cs: case <-n.done: } case <-n.tickc: diff --git a/raft/raft.go b/raft/raft.go index 3cdc1f0ac6c2..89de7fb7df92 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -1327,6 +1327,40 @@ func (r *raft) restore(s pb.Snapshot) bool { if s.Metadata.Index <= r.raftLog.committed { return false } + if r.state != StateFollower { + // This is defense-in-depth: if the leader somehow ended up applying a + // snapshot, it could move into a new term without moving into a + // follower state. This should never fire, but if it did, we'd have + // prevented damage by returning early, so log only a loud warning. + r.logger.Warningf("%x attempted to restore snapshot as leader; should never happen", r.id) + return false + } + + // More defense-in-depth: throw away snapshot if recipient is not in the + // config. + found := false + cs := s.Metadata.ConfState + for _, set := range [][]uint64{ + cs.Nodes, + cs.Learners, + } { + for _, id := range set { + if id == r.id { + found = true + break + } + } + } + if !found { + r.logger.Warningf( + "%x attempted to restore snapshot but it is not in the ConfState %v; should never happen", + r.id, cs, + ) + return false + } + + // Now go ahead and actually restore. + if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) { r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]", r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) @@ -1344,26 +1378,23 @@ func (r *raft) restore(s pb.Snapshot) bool { } } - r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]", - r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) - r.raftLog.restore(s) - r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight) - r.restoreNode(s.Metadata.ConfState.Nodes, false) - r.restoreNode(s.Metadata.ConfState.Learners, true) - return true -} -func (r *raft) restoreNode(nodes []uint64, isLearner bool) { - for _, n := range nodes { - match, next := uint64(0), r.raftLog.lastIndex()+1 - if n == r.id { - match = next - 1 - r.isLearner = isLearner - } - r.prs.InitProgress(n, match, next, isLearner) - r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs.Progress[n]) + // Reset the configuration and add the (potentially updated) peers in anew. + r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight) + for _, id := range s.Metadata.ConfState.Nodes { + r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode}) + } + for _, id := range s.Metadata.ConfState.Learners { + r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode}) } + + pr := r.prs.Progress[r.id] + pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded + + r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]", + r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) + return true } // promotable indicates whether state machine can be promoted to leader, @@ -1373,68 +1404,97 @@ func (r *raft) promotable() bool { return pr != nil && !pr.IsLearner } -func (r *raft) addNode(id uint64) { - r.addNodeOrLearnerNode(id, false) -} +func (r *raft) applyConfChange(cc pb.ConfChange) pb.ConfState { + addNodeOrLearnerNode := func(id uint64, isLearner bool) { + // NB: this method is intentionally hidden from view. All mutations of + // the conf state must call applyConfChange directly. + pr := r.prs.Progress[id] + if pr == nil { + r.prs.InitProgress(id, 0, r.raftLog.lastIndex()+1, isLearner) + } else { + if isLearner && !pr.IsLearner { + // Can only change Learner to Voter. + // + // TODO(tbg): why? + r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id) + return + } -func (r *raft) addLearner(id uint64) { - r.addNodeOrLearnerNode(id, true) -} + if isLearner == pr.IsLearner { + // Ignore any redundant addNode calls (which can happen because the + // initial bootstrapping entries are applied twice). + return + } -func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { - pr := r.prs.Progress[id] - if pr == nil { - r.prs.InitProgress(id, 0, r.raftLog.lastIndex()+1, isLearner) - } else { - if isLearner && !pr.IsLearner { - // Can only change Learner to Voter. - r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id) - return + // Change Learner to Voter, use origin Learner progress. + r.prs.RemoveAny(id) + r.prs.InitProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */) + pr.IsLearner = false + *r.prs.Progress[id] = *pr } - if isLearner == pr.IsLearner { - // Ignore any redundant addNode calls (which can happen because the - // initial bootstrapping entries are applied twice). - return + // When a node is first added, we should mark it as recently active. + // Otherwise, CheckQuorum may cause us to step down if it is invoked + // before the added node has had a chance to communicate with us. + r.prs.Progress[id].RecentActive = true + } + + var removed int + if cc.NodeID != None { + switch cc.Type { + case pb.ConfChangeAddNode: + addNodeOrLearnerNode(cc.NodeID, false /* isLearner */) + case pb.ConfChangeAddLearnerNode: + addNodeOrLearnerNode(cc.NodeID, true /* isLearner */) + case pb.ConfChangeRemoveNode: + removed++ + r.prs.RemoveAny(cc.NodeID) + case pb.ConfChangeUpdateNode: + default: + panic("unexpected conf type") } - - // Change Learner to Voter, use origin Learner progress. - r.prs.RemoveAny(id) - r.prs.InitProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */) - pr.IsLearner = false - *r.prs.Progress[id] = *pr } - if r.id == id { - r.isLearner = isLearner - } + // Now that the configuration is updated, handle any side effects. - // When a node is first added, we should mark it as recently active. - // Otherwise, CheckQuorum may cause us to step down if it is invoked - // before the added node has a chance to communicate with us. - r.prs.Progress[id].RecentActive = true -} + cs := pb.ConfState{Nodes: r.prs.VoterNodes(), Learners: r.prs.LearnerNodes()} + pr, ok := r.prs.Progress[r.id] -func (r *raft) removeNode(id uint64) { - r.prs.RemoveAny(id) + // Update whether the node itself is a learner, resetting to false when the + // node is removed. + r.isLearner = ok && pr.IsLearner - // Do not try to commit or abort transferring if the cluster is now empty. - if len(r.prs.Voters[0]) == 0 && len(r.prs.Learners) == 0 { - return + if (!ok || r.isLearner) && r.state == StateLeader { + // This node is leader and was removed or demoted. We prevent demotions + // at the time writing but hypothetically we handle them the same way as + // removing the leader: stepping down into the next Term. + // + // TODO(tbg): step down (for sanity) and ask follower with largest Match + // to TimeoutNow (to avoid interruption). This might still drop some + // proposals but it's better than nothing. + // + // TODO(tbg): test this branch. It is untested at the time of writing. + return cs } - // TODO(tbg): won't bad (or at least unfortunate) things happen if the - // leader just removed itself? - - // The quorum size is now smaller, so see if any pending entries can - // be committed. - if r.maybeCommit() { - r.bcastAppend() + // The remaining steps only make sense if this node is the leader and there + // are other nodes. + if r.state != StateLeader || len(cs.Nodes) == 0 { + return cs } - // If the removed node is the leadTransferee, then abort the leadership transferring. - if r.state == StateLeader && r.leadTransferee == id { + if removed > 0 { + // The quorum size may have been reduced (but not to zero), so see if + // any pending entries can be committed. + if r.maybeCommit() { + r.bcastAppend() + } + } + // If the the leadTransferee was removed, abort the leadership transfer. + if _, tOK := r.prs.Progress[r.leadTransferee]; !tOK && r.leadTransferee != 0 { r.abortLeaderTransfer() } + + return cs } func (r *raft) loadState(state pb.HardState) { diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go index de72c4c4d332..187aef697fa9 100644 --- a/raft/raft_snap_test.go +++ b/raft/raft_snap_test.go @@ -118,30 +118,38 @@ func TestSnapshotSucceed(t *testing.T) { // in the past left the follower in probing status until the next log entry was // committed. func TestSnapshotSucceedViaAppResp(t *testing.T) { - snap := pb.Snapshot{ - Metadata: pb.SnapshotMetadata{ - Index: 11, // magic number - Term: 11, // magic number - ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}}, - }, - } - s1 := NewMemoryStorage() - n1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s1) - - // Become follower because otherwise the way this test sets things up the - // leadership term will be 1 (which is stale). We want it to match the snap- - // shot term in this test. - n1.becomeFollower(snap.Metadata.Term-1, 2) + // Create a single-node leader. + n1 := newTestRaft(1, []uint64{1}, 10, 1, s1) n1.becomeCandidate() n1.becomeLeader() + // We need to add a second empty entry so that we can truncate the first + // one away. + n1.Step(pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{}}}) - // Apply a snapshot on the leader. This is a workaround against the fact that - // the leader will always append an empty entry, but that empty entry works - // against what we're trying to assert in this test, namely that a snapshot - // at the latest committed index leaves the follower in probing state. - // With the snapshot, the empty entry is fully committed. - n1.restore(snap) + rd := newReady(n1, &SoftState{}, pb.HardState{}) + s1.Append(rd.Entries) + s1.SetHardState(rd.HardState) + + if exp, ci := s1.lastIndex(), n1.raftLog.committed; ci != exp { + t.Fatalf("unexpected committed index %d, wanted %d: %+v", ci, exp, s1) + } + + // Force a log truncation. + if err := s1.Compact(1); err != nil { + t.Fatal(err) + } + + // Add a follower to the group. Do this in a clandestine way for simplicity. + // Also set up a snapshot that will be sent to the follower. + n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) + s1.snapshot = pb.Snapshot{ + Metadata: pb.SnapshotMetadata{ + ConfState: pb.ConfState{Nodes: []uint64{1, 2}}, + Index: s1.lastIndex(), + Term: s1.ents[len(s1.ents)-1].Term, + }, + } noMessage := pb.MessageType(-1) mustSend := func(from, to *raft, typ pb.MessageType) pb.Message { @@ -151,6 +159,9 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) { continue } t.Log(DescribeMessage(msg, func([]byte) string { return "" })) + if len(msg.Entries) > 0 { + t.Log(DescribeEntries(msg.Entries, func(b []byte) string { return string(b) })) + } if err := to.Step(msg); err != nil { t.Fatalf("%v: %s", msg, err) } @@ -169,7 +180,7 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) { // Create the follower that will receive the snapshot. s2 := NewMemoryStorage() - n2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, s2) + n2 := newTestRaft(2, []uint64{1, 2}, 10, 1, s2) // Let the leader probe the follower. if !n1.maybeSendAppend(2, true /* sendIfEmpty */) { @@ -186,9 +197,9 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) { t.Fatalf("expected a rejection with zero hint, got reject=%t hint=%d", msg.Reject, msg.RejectHint) } - expIdx := snap.Metadata.Index - // Leader sends snapshot due to RejectHint of zero (the storage we use here - // has index zero compacted). + const expIdx = 2 + // Leader sends snapshot due to RejectHint of zero (we set up the raft log + // to start at index 2). if msg := mustSend(n1, n2, pb.MsgSnap); msg.Snapshot.Metadata.Index != expIdx { t.Fatalf("expected snapshot at index %d, got %d", expIdx, msg.Snapshot.Metadata.Index) } diff --git a/raft/raft_test.go b/raft/raft_test.go index 805e6071f0e1..8fcc2f5c76da 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -356,8 +356,8 @@ func TestLearnerPromotion(t *testing.T) { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - n1.addNode(2) - n2.addNode(2) + n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) + n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) if n2.isLearner { t.Error("peer 2 is learner, want not") } @@ -3076,7 +3076,7 @@ func TestNewLeaderPendingConfig(t *testing.T) { // TestAddNode tests that addNode could update nodes correctly. func TestAddNode(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - r.addNode(2) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) nodes := r.prs.VoterNodes() wnodes := []uint64{1, 2} if !reflect.DeepEqual(nodes, wnodes) { @@ -3087,7 +3087,7 @@ func TestAddNode(t *testing.T) { // TestAddLearner tests that addLearner could update nodes correctly. func TestAddLearner(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - r.addLearner(2) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}) nodes := r.prs.LearnerNodes() wnodes := []uint64{2} if !reflect.DeepEqual(nodes, wnodes) { @@ -3111,7 +3111,7 @@ func TestAddNodeCheckQuorum(t *testing.T) { r.tick() } - r.addNode(2) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}) // This tick will reach electionTimeout, which triggers a quorum check. r.tick() @@ -3136,14 +3136,14 @@ func TestAddNodeCheckQuorum(t *testing.T) { // and removed list correctly. func TestRemoveNode(t *testing.T) { r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) - r.removeNode(2) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}) w := []uint64{1} if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } // remove all nodes from cluster - r.removeNode(1) + r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}) w = []uint64{} if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) @@ -3154,7 +3154,7 @@ func TestRemoveNode(t *testing.T) { // and removed list correctly. func TestRemoveLearner(t *testing.T) { r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) - r.removeNode(2) + r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}) w := []uint64{1} if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) @@ -3166,7 +3166,7 @@ func TestRemoveLearner(t *testing.T) { } // remove all nodes from cluster - r.removeNode(1) + r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}) if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } @@ -3300,7 +3300,7 @@ func TestCommitAfterRemoveNode(t *testing.T) { // Apply the config change. This reduces quorum requirements so the // pending command can now commit. - r.removeNode(2) + r.applyConfChange(cc) ents = nextEnts(r, s) if len(ents) != 1 || ents[0].Type != pb.EntryNormal || string(ents[0].Data) != "hello" { @@ -3549,7 +3549,7 @@ func TestLeaderTransferRemoveNode(t *testing.T) { t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) } - lead.removeNode(3) + lead.applyConfChange(pb.ConfChange{NodeID: 3, Type: pb.ConfChangeRemoveNode}) checkLeaderTransferState(t, lead, StateLeader, 1) } @@ -3875,9 +3875,9 @@ func TestPreVoteWithCheckQuorum(t *testing.T) { // a MsgHup or MsgTimeoutNow. func TestLearnerCampaign(t *testing.T) { n1 := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - n1.addLearner(2) + n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}) n2 := newTestRaft(2, []uint64{1}, 10, 1, NewMemoryStorage()) - n2.addLearner(2) + n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}) nt := newNetwork(n1, n2) nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) diff --git a/raft/rawnode.go b/raft/rawnode.go index d7e54eeea3e1..77183b793cee 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -101,7 +101,7 @@ func NewRawNode(config *Config, peers []Peer) (*RawNode, error) { r.raftLog.append(ents...) r.raftLog.committed = uint64(len(ents)) for _, peer := range peers { - r.addNode(peer.ID) + r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}) } } @@ -166,21 +166,8 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error { // ApplyConfChange applies a config change to the local node. func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { - if cc.NodeID == None { - return &pb.ConfState{Nodes: rn.raft.prs.VoterNodes(), Learners: rn.raft.prs.LearnerNodes()} - } - switch cc.Type { - case pb.ConfChangeAddNode: - rn.raft.addNode(cc.NodeID) - case pb.ConfChangeAddLearnerNode: - rn.raft.addLearner(cc.NodeID) - case pb.ConfChangeRemoveNode: - rn.raft.removeNode(cc.NodeID) - case pb.ConfChangeUpdateNode: - default: - panic("unexpected conf type") - } - return &pb.ConfState{Nodes: rn.raft.prs.VoterNodes(), Learners: rn.raft.prs.LearnerNodes()} + cs := rn.raft.applyConfChange(cc) + return &cs } // Step advances the state machine using the given message.