Skip to content

Commit

Permalink
raft: clean up and improve progress-related comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tbg committed Jun 8, 2019
1 parent 5dae63a commit 84f7042
Show file tree
Hide file tree
Showing 11 changed files with 146 additions and 101 deletions.
2 changes: 1 addition & 1 deletion raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func (n *node) run(r *raft) {
}
case m := <-n.recvc:
// filter out response message from unknown From.
if pr := r.prs.GetProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
r.Step(m)
}
case cc := <-n.confc:
Expand Down
36 changes: 19 additions & 17 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func (r *raft) sendAppend(to uint64) {
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
pr := r.prs.GetProgress(to)
pr := r.prs.Progress[to]
if pr.IsPaused() {
return false
}
Expand Down Expand Up @@ -494,7 +494,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
pr.OptimisticUpdate(last)
pr.Inflights.Add(last)
case tracker.StateProbe:
pr.Pause()
pr.RecentlyContacted = true
default:
r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
}
Expand All @@ -512,7 +512,7 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
commit := min(r.prs.GetProgress(to).Match, r.raftLog.committed)
commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
m := pb.Message{
To: to,
Type: pb.MsgHeartbeat,
Expand Down Expand Up @@ -610,7 +610,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
}
// use latest "last" index after truncate/append
li = r.raftLog.append(es...)
r.prs.GetProgress(r.id).MaybeUpdate(li)
r.prs.Progress[r.id].MaybeUpdate(li)
// Regardless of maybeCommit's return, our caller will call bcastAppend.
r.maybeCommit()
return true
Expand Down Expand Up @@ -704,7 +704,7 @@ func (r *raft) becomeLeader() {
// (perhaps after having received a snapshot as a result). The leader is
// trivially in this state. Note that r.reset() has initialized this
// progress with the last index already.
r.prs.GetProgress(r.id).BecomeReplicate()
r.prs.Progress[r.id].BecomeReplicate()

// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
Expand Down Expand Up @@ -933,7 +933,7 @@ func stepLeader(r *raft, m pb.Message) error {
//
// TODO(tbg): I added a TODO in removeNode, it doesn't seem that the
// leader steps down when removing itself. I might be missing something.
if pr := r.prs.GetProgress(r.id); pr != nil {
if pr := r.prs.Progress[r.id]; pr != nil {
pr.RecentActive = true
}
if !r.prs.QuorumActive() {
Expand All @@ -952,7 +952,7 @@ func stepLeader(r *raft, m pb.Message) error {
if len(m.Entries) == 0 {
r.logger.Panicf("%x stepped empty MsgProp", r.id)
}
if r.prs.GetProgress(r.id) == nil {
if r.prs.Progress[r.id] == nil {
// If we are not currently a member of the range (i.e. this node
// was removed from the configuration while serving as leader),
// drop any new proposals.
Expand Down Expand Up @@ -1019,7 +1019,7 @@ func stepLeader(r *raft, m pb.Message) error {
}

// All other message types require a progress for m.From (pr).
pr := r.prs.GetProgress(m.From)
pr := r.prs.Progress[m.From]
if pr == nil {
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return nil
Expand Down Expand Up @@ -1054,7 +1054,7 @@ func stepLeader(r *raft, m pb.Message) error {
pr.BecomeProbe()
pr.BecomeReplicate()
case pr.State == tracker.StateReplicate:
pr.Inflights.FreeTo(m.Index)
pr.Inflights.FreeLE(m.Index)
}

if r.maybeCommit() {
Expand All @@ -1081,7 +1081,7 @@ func stepLeader(r *raft, m pb.Message) error {
}
case pb.MsgHeartbeatResp:
pr.RecentActive = true
pr.Resume()
pr.RecentlyContacted = false

// free one slot for the full inflights window to allow progress.
if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
Expand Down Expand Up @@ -1116,14 +1116,16 @@ func stepLeader(r *raft, m pb.Message) error {
pr.BecomeProbe()
r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
} else {
pr.SnapshotFailure()
// NB: the order here matters or we'll be probing erroneously from
// the snapshot index, but the snapshot never applied.
pr.PendingSnapshot = 0
pr.BecomeProbe()
r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
}
// If the snapshot finished, wait for the MsgAppResp from the remote node before
// sending out the next MsgApp.
// If the snapshot failed, wait for a heartbeat interval before the next try.
pr.Pause()
pr.RecentlyContacted = true
case pb.MsgUnreachable:
// During optimistic replication, if the remote becomes unreachable,
// there is a high probability that a MsgApp is lost.
Expand Down Expand Up @@ -1345,14 +1347,14 @@ func (r *raft) restoreNode(nodes []uint64, isLearner bool) {
r.isLearner = isLearner
}
r.prs.InitProgress(n, match, next, isLearner)
r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs.GetProgress(n))
r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs.Progress[n])
}
}

// promotable indicates whether state machine can be promoted to leader,
// which is true when its own id is in progress list.
func (r *raft) promotable() bool {
pr := r.prs.GetProgress(r.id)
pr := r.prs.Progress[r.id]
return pr != nil && !pr.IsLearner
}

Expand All @@ -1365,7 +1367,7 @@ func (r *raft) addLearner(id uint64) {
}

func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
pr := r.prs.GetProgress(id)
pr := r.prs.Progress[id]
if pr == nil {
r.prs.InitProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
} else {
Expand All @@ -1385,7 +1387,7 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
r.prs.RemoveAny(id)
r.prs.InitProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */)
pr.IsLearner = false
*r.prs.GetProgress(id) = *pr
*r.prs.Progress[id] = *pr
}

if r.id == id {
Expand All @@ -1395,7 +1397,7 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
// When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked
// before the added node has a chance to communicate with us.
r.prs.GetProgress(id).RecentActive = true
r.prs.Progress[id].RecentActive = true
}

func (r *raft) removeNode(id uint64) {
Expand Down
8 changes: 4 additions & 4 deletions raft/raft_snap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func TestSnapshotFailure(t *testing.T) {
if sm.prs.Progress[2].Next != 1 {
t.Fatalf("Next = %d, want 1", sm.prs.Progress[2].Next)
}
if !sm.prs.Progress[2].Paused {
t.Errorf("Paused = %v, want true", sm.prs.Progress[2].Paused)
if !sm.prs.Progress[2].RecentlyContacted {
t.Errorf("RecentlyContacted = %v, want true", sm.prs.Progress[2].RecentlyContacted)
}
}

Expand All @@ -107,8 +107,8 @@ func TestSnapshotSucceed(t *testing.T) {
if sm.prs.Progress[2].Next != 12 {
t.Fatalf("Next = %d, want 12", sm.prs.Progress[2].Next)
}
if !sm.prs.Progress[2].Paused {
t.Errorf("Paused = %v, want true", sm.prs.Progress[2].Paused)
if !sm.prs.Progress[2].RecentlyContacted {
t.Errorf("RecentlyContacted = %v, want true", sm.prs.Progress[2].RecentlyContacted)
}
}

Expand Down
24 changes: 12 additions & 12 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,17 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()

r.prs.Progress[2].Paused = true
r.prs.Progress[2].RecentlyContacted = true

r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
if !r.prs.Progress[2].Paused {
t.Errorf("paused = %v, want true", r.prs.Progress[2].Paused)
if !r.prs.Progress[2].RecentlyContacted {
t.Errorf("paused = %v, want true", r.prs.Progress[2].RecentlyContacted)
}

r.prs.Progress[2].BecomeReplicate()
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
if r.prs.Progress[2].Paused {
t.Errorf("paused = %v, want false", r.prs.Progress[2].Paused)
if r.prs.Progress[2].RecentlyContacted {
t.Errorf("paused = %v, want false", r.prs.Progress[2].RecentlyContacted)
}
}

Expand Down Expand Up @@ -678,7 +678,7 @@ func TestLearnerLogReplication(t *testing.T) {
t.Errorf("peer 2 wants committed to %d, but still %d", n1.raftLog.committed, n2.raftLog.committed)
}

match := n1.prs.GetProgress(2).Match
match := n1.prs.Progress[2].Match
if match != n2.raftLog.committed {
t.Errorf("progress 2 of leader 1 wants match %d, but got %d", n2.raftLog.committed, match)
}
Expand Down Expand Up @@ -2600,8 +2600,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
}
}

if !r.prs.Progress[2].Paused {
t.Errorf("paused = %v, want true", r.prs.Progress[2].Paused)
if !r.prs.Progress[2].RecentlyContacted {
t.Errorf("paused = %v, want true", r.prs.Progress[2].RecentlyContacted)
}
for j := 0; j < 10; j++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
Expand All @@ -2615,8 +2615,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
for j := 0; j < r.heartbeatTimeout; j++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
}
if !r.prs.Progress[2].Paused {
t.Errorf("paused = %v, want true", r.prs.Progress[2].Paused)
if !r.prs.Progress[2].RecentlyContacted {
t.Errorf("paused = %v, want true", r.prs.Progress[2].RecentlyContacted)
}

// consume the heartbeat
Expand All @@ -2638,8 +2638,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
if msg[0].Index != 0 {
t.Errorf("index = %d, want %d", msg[0].Index, 0)
}
if !r.prs.Progress[2].Paused {
t.Errorf("paused = %v, want true", r.prs.Progress[2].Paused)
if !r.prs.Progress[2].RecentlyContacted {
t.Errorf("paused = %v, want true", r.prs.Progress[2].RecentlyContacted)
}
}

Expand Down
2 changes: 1 addition & 1 deletion raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (rn *RawNode) Step(m pb.Message) error {
if IsLocalMsg(m.Type) {
return ErrStepLocalMsg
}
if pr := rn.raft.prs.GetProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
if pr := rn.raft.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
return rn.raft.Step(m)
}
return ErrStepPeerNotFound
Expand Down
33 changes: 22 additions & 11 deletions raft/tracker/inflights.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@

package tracker

// Inflights limits the number of MsgApp (represented by the largest index
// contained within) sent to followers but not yet acknowledged by them. Callers
// use Full() to check whether more messages can be sent, call Add() whenever
// they are sending a new append, and release "quota" via FreeLE() whenever an
// ack is received.
type Inflights struct {
// the starting index in the buffer
start int
Expand All @@ -28,15 +33,17 @@ type Inflights struct {
buffer []uint64
}

func (ins *Inflights) Count() int { return ins.count }

// NewInflights sets up an Inflights that allows up to 'size' inflight messages.
// Only the size limit is recorded here; the backing buffer is left unallocated.
func NewInflights(size int) *Inflights {
return &Inflights{
size: size,
}
}

// Add adds an inflight into inflights
// Add notifies the Inflights that a new message with the given index is being
// dispatched. Full() must be called prior to Add() to verify that there is room
// for one more message, and consecutive calls to Add() must provide a
// monotonic sequence of indexes.
func (in *Inflights) Add(inflight uint64) {
if in.Full() {
panic("cannot add into a Full inflights")
Expand All @@ -47,7 +54,7 @@ func (in *Inflights) Add(inflight uint64) {
next -= size
}
if next >= len(in.buffer) {
in.growBuf()
in.grow()
}
in.buffer[next] = inflight
in.count++
Expand All @@ -56,7 +63,7 @@ func (in *Inflights) Add(inflight uint64) {
// grow the inflight buffer by doubling up to inflights.size. We grow on demand
// instead of preallocating to inflights.size to handle systems which have
// thousands of Raft groups per process.
func (in *Inflights) growBuf() {
func (in *Inflights) grow() {
newSize := len(in.buffer) * 2
if newSize == 0 {
newSize = 1
Expand All @@ -68,8 +75,8 @@ func (in *Inflights) growBuf() {
in.buffer = newBuffer
}

// FreeTo frees the inflights smaller or equal to the given `to` flight.
func (in *Inflights) FreeTo(to uint64) {
// FreeLE frees the inflights smaller or equal to the given `to` flight.
func (in *Inflights) FreeLE(to uint64) {
if in.count == 0 || to < in.buffer[in.start] {
// out of the left side of the window
return
Expand Down Expand Up @@ -98,15 +105,19 @@ func (in *Inflights) FreeTo(to uint64) {
}
}

// FreeFirstOne releases the first inflight.
func (in *Inflights) FreeFirstOne() { in.FreeTo(in.buffer[in.start]) }
// FreeFirstOne releases the first inflight. This is a no-op if nothing is
// inflight.
func (in *Inflights) FreeFirstOne() {
	// Guard before indexing: the buffer may still be unallocated (nil) when
	// nothing has ever been added, in which case in.buffer[in.start] would
	// panic with an index-out-of-range error instead of being a no-op.
	if in.count == 0 {
		return
	}
	in.FreeLE(in.buffer[in.start])
}

// Full returns true if the inflights is Full.
// Full returns true if no more messages can be sent at the moment, i.e. when
// the number of inflight messages has reached the configured size.
func (in *Inflights) Full() bool {
return in.count == in.size
}

// resets frees all inflights.
// Count returns the number of inflight messages, i.e. the current value of the
// internal counter that Add() increments and reset() zeroes.
func (in *Inflights) Count() int { return in.count }

// reset frees all inflights.
func (in *Inflights) reset() {
in.count = 0
in.start = 0
Expand Down
8 changes: 4 additions & 4 deletions raft/tracker/inflights_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestInflightFreeTo(t *testing.T) {
in.Add(uint64(i))
}

in.FreeTo(4)
in.FreeLE(4)

wantIn := &Inflights{
start: 5,
Expand All @@ -119,7 +119,7 @@ func TestInflightFreeTo(t *testing.T) {
t.Fatalf("in = %+v, want %+v", in, wantIn)
}

in.FreeTo(8)
in.FreeLE(8)

wantIn2 := &Inflights{
start: 9,
Expand All @@ -138,7 +138,7 @@ func TestInflightFreeTo(t *testing.T) {
in.Add(uint64(i))
}

in.FreeTo(12)
in.FreeLE(12)

wantIn3 := &Inflights{
start: 3,
Expand All @@ -152,7 +152,7 @@ func TestInflightFreeTo(t *testing.T) {
t.Fatalf("in = %+v, want %+v", in, wantIn3)
}

in.FreeTo(14)
in.FreeLE(14)

wantIn4 := &Inflights{
start: 0,
Expand Down
Loading

0 comments on commit 84f7042

Please sign in to comment.