Skip to content

Commit

Permalink
raft: let raft step return error when proposal is dropped to allow fa…
Browse files Browse the repository at this point in the history
…il-fast.
  • Loading branch information
absolute8511 committed Jan 12, 2018
1 parent 52f73c5 commit 30ced5b
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 31 deletions.
9 changes: 6 additions & 3 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ func TestNodeStepUnblock(t *testing.T) {
// TestNodePropose ensures that node.Propose sends the given proposal to the underlying raft.
func TestNodePropose(t *testing.T) {
msgs := []raftpb.Message{}
appendStep := func(r *raft, m raftpb.Message) {
appendStep := func(r *raft, m raftpb.Message) error {
msgs = append(msgs, m)
return nil
}

n := newNode()
Expand Down Expand Up @@ -147,8 +148,9 @@ func TestNodePropose(t *testing.T) {
// It also ensures that ReadState can be read out through ready chan.
func TestNodeReadIndex(t *testing.T) {
msgs := []raftpb.Message{}
appendStep := func(r *raft, m raftpb.Message) {
appendStep := func(r *raft, m raftpb.Message) error {
msgs = append(msgs, m)
return nil
}
wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}

Expand Down Expand Up @@ -284,8 +286,9 @@ func TestNodeReadIndexToOldLeader(t *testing.T) {
// to the underlying raft.
func TestNodeProposeConfig(t *testing.T) {
msgs := []raftpb.Message{}
appendStep := func(r *raft, m raftpb.Message) {
appendStep := func(r *raft, m raftpb.Message) error {
msgs = append(msgs, m)
return nil
}

n := newNode()
Expand Down
60 changes: 35 additions & 25 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ const (
campaignTransfer CampaignType = "CampaignTransfer"
)

// ErrProposalDropped is returned when the proposal is ignored by some cases,
// so that the proposer can be notified and fail fast.
var ErrProposalDropped = errors.New("raft proposal dropped")

// lockedRand is a small wrapper around rand.Rand to provide
// synchronization. Only the methods needed by the code are exposed
// (e.g. Intn).
Expand Down Expand Up @@ -872,25 +876,28 @@ func (r *raft) Step(m pb.Message) error {
}

default:
r.step(r, m)
err := r.step(r, m)
if err != nil {
return err
}
}
return nil
}

type stepFunc func(r *raft, m pb.Message)
type stepFunc func(r *raft, m pb.Message) error

func stepLeader(r *raft, m pb.Message) {
func stepLeader(r *raft, m pb.Message) error {
// These message types do not require any progress for m.From.
switch m.Type {
case pb.MsgBeat:
r.bcastHeartbeat()
return
return nil
case pb.MsgCheckQuorum:
if !r.checkQuorumActive() {
r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
r.becomeFollower(r.Term, None)
}
return
return nil
case pb.MsgProp:
if len(m.Entries) == 0 {
r.logger.Panicf("%x stepped empty MsgProp", r.id)
Expand All @@ -899,11 +906,11 @@ func stepLeader(r *raft, m pb.Message) {
// If we are not currently a member of the range (i.e. this node
// was removed from the configuration while serving as leader),
// drop any new proposals.
return
return ErrProposalDropped
}
if r.leadTransferee != None {
r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
return
return ErrProposalDropped
}

for i, e := range m.Entries {
Expand All @@ -919,12 +926,12 @@ func stepLeader(r *raft, m pb.Message) {
}
r.appendEntry(m.Entries...)
r.bcastAppend()
return
return nil
case pb.MsgReadIndex:
if r.quorum() > 1 {
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
// Reject read only request when this leader has not committed any log entry at its term.
return
return nil
}

// thinking: use an interally defined context instead of the user given context.
Expand All @@ -946,14 +953,14 @@ func stepLeader(r *raft, m pb.Message) {
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
}

return
return nil
}

// All other message types require a progress for m.From (pr).
pr := r.getProgress(m.From)
if pr == nil {
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return
return nil
}
switch m.Type {
case pb.MsgAppResp:
Expand Down Expand Up @@ -1009,12 +1016,12 @@ func stepLeader(r *raft, m pb.Message) {
}

if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return
return nil
}

ackCount := r.readOnly.recvAck(m)
if ackCount < r.quorum() {
return
return nil
}

rss := r.readOnly.advance(m)
Expand All @@ -1028,7 +1035,7 @@ func stepLeader(r *raft, m pb.Message) {
}
case pb.MsgSnapStatus:
if pr.State != ProgressStateSnapshot {
return
return nil
}
if !m.Reject {
pr.becomeProbe()
Expand All @@ -1052,22 +1059,22 @@ func stepLeader(r *raft, m pb.Message) {
case pb.MsgTransferLeader:
if pr.IsLearner {
r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
return
return nil
}
leadTransferee := m.From
lastLeadTransferee := r.leadTransferee
if lastLeadTransferee != None {
if lastLeadTransferee == leadTransferee {
r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
r.id, r.Term, leadTransferee, leadTransferee)
return
return nil
}
r.abortLeaderTransfer()
r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
}
if leadTransferee == r.id {
r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
return
return nil
}
// Transfer leadership to third party.
r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
Expand All @@ -1081,11 +1088,12 @@ func stepLeader(r *raft, m pb.Message) {
r.sendAppend(leadTransferee)
}
}
return nil
}

// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
// whether they respond to MsgVoteResp or MsgPreVoteResp.
func stepCandidate(r *raft, m pb.Message) {
func stepCandidate(r *raft, m pb.Message) error {
// Only handle vote responses corresponding to our candidacy (while in
// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
// our pre-candidate state).
Expand All @@ -1098,7 +1106,7 @@ func stepCandidate(r *raft, m pb.Message) {
switch m.Type {
case pb.MsgProp:
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return
return ErrProposalDropped
case pb.MsgApp:
r.becomeFollower(r.Term, m.From)
r.handleAppendEntries(m)
Expand All @@ -1125,17 +1133,18 @@ func stepCandidate(r *raft, m pb.Message) {
case pb.MsgTimeoutNow:
r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
}
return nil
}

func stepFollower(r *raft, m pb.Message) {
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgProp:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return
return ErrProposalDropped
} else if r.disableProposalForwarding {
r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
return
return ErrProposalDropped
}
m.To = r.lead
r.send(m)
Expand All @@ -1154,7 +1163,7 @@ func stepFollower(r *raft, m pb.Message) {
case pb.MsgTransferLeader:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
return
return nil
}
m.To = r.lead
r.send(m)
Expand All @@ -1171,17 +1180,18 @@ func stepFollower(r *raft, m pb.Message) {
case pb.MsgReadIndex:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
return
return nil
}
m.To = r.lead
r.send(m)
case pb.MsgReadIndexResp:
if len(m.Entries) != 1 {
r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
return
return nil
}
r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
}
return nil
}

func (r *raft) handleAppendEntries(m pb.Message) {
Expand Down
3 changes: 2 additions & 1 deletion raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ func testUpdateTermFromMessage(t *testing.T, state StateType) {
// Reference: section 5.1
func TestRejectStaleTermMessage(t *testing.T) {
called := false
fakeStep := func(r *raft, m pb.Message) {
fakeStep := func(r *raft, m pb.Message) error {
called = true
return nil
}
r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
r.step = fakeStep
Expand Down
7 changes: 6 additions & 1 deletion raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1231,8 +1231,9 @@ func TestPastElectionTimeout(t *testing.T) {
// actual stepX function.
func TestStepIgnoreOldTermMsg(t *testing.T) {
called := false
fakeStep := func(r *raft, m pb.Message) {
fakeStep := func(r *raft, m pb.Message) error {
called = true
return nil
}
sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
sm.step = fakeStep
Expand Down Expand Up @@ -3219,6 +3220,10 @@ func TestLeaderTransferIgnoreProposal(t *testing.T) {
}

nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
err := lead.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
if err != ErrProposalDropped {
t.Fatalf("should return drop proposal error while transferring")
}

if lead.prs[1].Match != 1 {
t.Fatalf("node 1 has match %x, want %x", lead.prs[1].Match, 1)
Expand Down
3 changes: 2 additions & 1 deletion raft/rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,9 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) {
// to the underlying raft. It also ensures that ReadState can be read out.
func TestRawNodeReadIndex(t *testing.T) {
msgs := []raftpb.Message{}
appendStep := func(r *raft, m raftpb.Message) {
appendStep := func(r *raft, m raftpb.Message) error {
msgs = append(msgs, m)
return nil
}
wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}

Expand Down

0 comments on commit 30ced5b

Please sign in to comment.