Skip to content

Commit

Permalink
Merge pull request #10865 from tbg/multi-conf-change
Browse files Browse the repository at this point in the history
raft: centralize configuration change application
  • Loading branch information
tbg authored Jul 3, 2019
2 parents 1f40b66 + 6697adf commit 48f5bb6
Show file tree
Hide file tree
Showing 9 changed files with 296 additions and 148 deletions.
41 changes: 17 additions & 24 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,19 @@ func StartNode(c *Config, peers []Peer) Node {
if err != nil {
panic("unexpected marshal error")
}
e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
// TODO(tbg): this should append the ConfChange for the own node first
// and also call applyConfChange below for that node first. Otherwise
// we have a Raft group (for a little while) that doesn't have itself
// in its config, which is bad.
// This whole way of setting things up is rickety. The app should just
// populate the initial ConfState appropriately and then all of this
// goes away.
e := pb.Entry{
Type: pb.EntryConfChange,
Term: 1,
Index: r.raftLog.lastIndex() + 1,
Data: d,
}
r.raftLog.append(e)
}
// Mark these initial entries as committed.
Expand All @@ -225,7 +237,7 @@ func StartNode(c *Config, peers []Peer) Node {
// We do not set raftLog.applied so the application will be able
// to observe all conf changes via Ready.CommittedEntries.
for _, peer := range peers {
r.addNode(peer.ID)
r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
}

n := newNode()
Expand Down Expand Up @@ -357,35 +369,16 @@ func (n *node) run(r *raft) {
r.Step(m)
}
case cc := <-n.confc:
if cc.NodeID == None {
select {
case n.confstatec <- pb.ConfState{
Nodes: r.prs.VoterNodes(),
Learners: r.prs.LearnerNodes()}:
case <-n.done:
}
break
}
switch cc.Type {
case pb.ConfChangeAddNode:
r.addNode(cc.NodeID)
case pb.ConfChangeAddLearnerNode:
r.addLearner(cc.NodeID)
case pb.ConfChangeRemoveNode:
cs := r.applyConfChange(cc)
if _, ok := r.prs.Progress[r.id]; !ok {
// block incoming proposal when local node is
// removed
if cc.NodeID == r.id {
propc = nil
}
r.removeNode(cc.NodeID)
case pb.ConfChangeUpdateNode:
default:
panic("unexpected conf type")
}
select {
case n.confstatec <- pb.ConfState{
Nodes: r.prs.VoterNodes(),
Learners: r.prs.LearnerNodes()}:
case n.confstatec <- cs:
case <-n.done:
}
case <-n.tickc:
Expand Down
7 changes: 7 additions & 0 deletions raft/quorum/joint.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ package quorum
// majority configurations. Decisions require the support of both majorities.
type JointConfig [2]MajorityConfig

func (c JointConfig) String() string {
if len(c[1]) > 0 {
return c[0].String() + "&&" + c[1].String()
}
return c[0].String()
}

// IDs returns a newly initialized map representing the set of voters present
// in the joint configuration.
func (c JointConfig) IDs() map[uint64]struct{} {
Expand Down
18 changes: 18 additions & 0 deletions raft/quorum/majority.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,24 @@ import (
// MajorityConfig is a set of IDs that uses majority quorums to make decisions.
type MajorityConfig map[uint64]struct{}

func (c MajorityConfig) String() string {
sl := make([]uint64, 0, len(c))
for id := range c {
sl = append(sl, id)
}
sort.Slice(sl, func(i, j int) bool { return sl[i] < sl[j] })
var buf strings.Builder
buf.WriteByte('(')
for i := range sl {
if i > 0 {
buf.WriteByte(' ')
}
fmt.Fprint(&buf, sl[i])
}
buf.WriteByte(')')
return buf.String()
}

// Describe returns a (multi-line) representation of the commit indexes for the
// given lookuper.
func (c MajorityConfig) Describe(l AckedIndexer) string {
Expand Down
1 change: 1 addition & 0 deletions raft/quorum/quorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strconv"
)

// Index is a Raft log position.
type Index uint64

func (i Index) String() string {
Expand Down
197 changes: 132 additions & 65 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1322,11 +1322,51 @@ func (r *raft) handleSnapshot(m pb.Message) {
}

// restore recovers the state machine from a snapshot. It restores the log and the
// configuration of state machine.
// configuration of state machine. If this method returns false, the snapshot was
// ignored, either because it was obsolete or because of an error.
func (r *raft) restore(s pb.Snapshot) bool {
if s.Metadata.Index <= r.raftLog.committed {
return false
}
if r.state != StateFollower {
// This is defense-in-depth: if the leader somehow ended up applying a
// snapshot, it could move into a new term without moving into a
// follower state. This should never fire, but if it did, we'd have
// prevented damage by returning early, so log only a loud warning.
//
// At the time of writing, the instance is guaranteed to be in follower
// state when this method is called.
r.logger.Warningf("%x attempted to restore snapshot as leader; should never happen", r.id)
r.becomeFollower(r.Term+1, None)
return false
}

// More defense-in-depth: throw away snapshot if recipient is not in the
// config. This shouuldn't ever happen (at the time of writing) but lots of
// code here and there assumes that r.id is in the progress tracker.
found := false
cs := s.Metadata.ConfState
for _, set := range [][]uint64{
cs.Nodes,
cs.Learners,
} {
for _, id := range set {
if id == r.id {
found = true
break
}
}
}
if !found {
r.logger.Warningf(
"%x attempted to restore snapshot but it is not in the ConfState %v; should never happen",
r.id, cs,
)
return false
}

// Now go ahead and actually restore.

if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
Expand All @@ -1344,26 +1384,23 @@ func (r *raft) restore(s pb.Snapshot) bool {
}
}

r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)

r.raftLog.restore(s)
r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
r.restoreNode(s.Metadata.ConfState.Nodes, false)
r.restoreNode(s.Metadata.ConfState.Learners, true)
return true
}

func (r *raft) restoreNode(nodes []uint64, isLearner bool) {
for _, n := range nodes {
match, next := uint64(0), r.raftLog.lastIndex()+1
if n == r.id {
match = next - 1
r.isLearner = isLearner
}
r.prs.InitProgress(n, match, next, isLearner)
r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs.Progress[n])
// Reset the configuration and add the (potentially updated) peers in anew.
r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
for _, id := range s.Metadata.ConfState.Nodes {
r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddNode})
}
for _, id := range s.Metadata.ConfState.Learners {
r.applyConfChange(pb.ConfChange{NodeID: id, Type: pb.ConfChangeAddLearnerNode})
}

pr := r.prs.Progress[r.id]
pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded

r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
return true
}

// promotable indicates whether state machine can be promoted to leader,
Expand All @@ -1373,68 +1410,98 @@ func (r *raft) promotable() bool {
return pr != nil && !pr.IsLearner
}

func (r *raft) addNode(id uint64) {
r.addNodeOrLearnerNode(id, false)
}
func (r *raft) applyConfChange(cc pb.ConfChange) pb.ConfState {
addNodeOrLearnerNode := func(id uint64, isLearner bool) {
// NB: this method is intentionally hidden from view. All mutations of
// the conf state must call applyConfChange directly.
pr := r.prs.Progress[id]
if pr == nil {
r.prs.InitProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
} else {
if isLearner && !pr.IsLearner {
// Can only change Learner to Voter.
//
// TODO(tbg): why?
r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id)
return
}

func (r *raft) addLearner(id uint64) {
r.addNodeOrLearnerNode(id, true)
}
if isLearner == pr.IsLearner {
// Ignore any redundant addNode calls (which can happen because the
// initial bootstrapping entries are applied twice).
return
}

func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
pr := r.prs.Progress[id]
if pr == nil {
r.prs.InitProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
} else {
if isLearner && !pr.IsLearner {
// Can only change Learner to Voter.
r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id)
return
// Change Learner to Voter, use origin Learner progress.
r.prs.RemoveAny(id)
r.prs.InitProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */)
pr.IsLearner = false
*r.prs.Progress[id] = *pr
}

if isLearner == pr.IsLearner {
// Ignore any redundant addNode calls (which can happen because the
// initial bootstrapping entries are applied twice).
return
// When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked
// before the added node has had a chance to communicate with us.
r.prs.Progress[id].RecentActive = true
}

var removed int
if cc.NodeID != None {
switch cc.Type {
case pb.ConfChangeAddNode:
addNodeOrLearnerNode(cc.NodeID, false /* isLearner */)
case pb.ConfChangeAddLearnerNode:
addNodeOrLearnerNode(cc.NodeID, true /* isLearner */)
case pb.ConfChangeRemoveNode:
removed++
r.prs.RemoveAny(cc.NodeID)
case pb.ConfChangeUpdateNode:
default:
panic("unexpected conf type")
}

// Change Learner to Voter, use origin Learner progress.
r.prs.RemoveAny(id)
r.prs.InitProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */)
pr.IsLearner = false
*r.prs.Progress[id] = *pr
}

if r.id == id {
r.isLearner = isLearner
}
r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)
// Now that the configuration is updated, handle any side effects.

// When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked
// before the added node has a chance to communicate with us.
r.prs.Progress[id].RecentActive = true
}
cs := pb.ConfState{Nodes: r.prs.VoterNodes(), Learners: r.prs.LearnerNodes()}
pr, ok := r.prs.Progress[r.id]

func (r *raft) removeNode(id uint64) {
r.prs.RemoveAny(id)
// Update whether the node itself is a learner, resetting to false when the
// node is removed.
r.isLearner = ok && pr.IsLearner

// Do not try to commit or abort transferring if the cluster is now empty.
if len(r.prs.Voters[0]) == 0 && len(r.prs.Learners) == 0 {
return
if (!ok || r.isLearner) && r.state == StateLeader {
// This node is leader and was removed or demoted. We prevent demotions
// at the time writing but hypothetically we handle them the same way as
// removing the leader: stepping down into the next Term.
//
// TODO(tbg): step down (for sanity) and ask follower with largest Match
// to TimeoutNow (to avoid interruption). This might still drop some
// proposals but it's better than nothing.
//
// TODO(tbg): test this branch. It is untested at the time of writing.
return cs
}

// TODO(tbg): won't bad (or at least unfortunate) things happen if the
// leader just removed itself?

// The quorum size is now smaller, so see if any pending entries can
// be committed.
if r.maybeCommit() {
r.bcastAppend()
// The remaining steps only make sense if this node is the leader and there
// are other nodes.
if r.state != StateLeader || len(cs.Nodes) == 0 {
return cs
}
// If the removed node is the leadTransferee, then abort the leadership transferring.
if r.state == StateLeader && r.leadTransferee == id {
if removed > 0 {
// The quorum size may have been reduced (but not to zero), so see if
// any pending entries can be committed.
if r.maybeCommit() {
r.bcastAppend()
}
}
// If the the leadTransferee was removed, abort the leadership transfer.
if _, tOK := r.prs.Progress[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
r.abortLeaderTransfer()
}

return cs
}

func (r *raft) loadState(state pb.HardState) {
Expand Down
Loading

0 comments on commit 48f5bb6

Please sign in to comment.