Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: fix auto-transitioning out of joint config #11046

Merged
merged 3 commits into from
Feb 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ type Node interface {
Advance()
// ApplyConfChange applies a config change (previously passed to
// ProposeConfChange) to the node. This must be called whenever a config
// change is observed in Ready.CommittedEntries.
// change is observed in Ready.CommittedEntries, except when the app decides
// to reject the configuration change (i.e. treats it as a noop instead), in
// which case it must not be called.
//
// Returns an opaque non-nil ConfState protobuf which must be recorded in
// snapshots.
Expand Down
42 changes: 24 additions & 18 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,35 +554,34 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
}

func (r *raft) advance(rd Ready) {
r.reduceUncommittedSize(rd.CommittedEntries)

// If entries were applied (or a snapshot), update our cursor for
// the next Ready. Note that if the current HardState contains a
// new Commit index, this does not mean that we're also applying
// all of the new entries due to commit pagination by size.
if index := rd.appliedCursor(); index > 0 {
r.raftLog.appliedTo(index)
if r.prs.Config.AutoLeave && index >= r.pendingConfIndex && r.state == StateLeader {
if newApplied := rd.appliedCursor(); newApplied > 0 {
oldApplied := r.raftLog.applied
r.raftLog.appliedTo(newApplied)

if r.prs.Config.AutoLeave && oldApplied < r.pendingConfIndex && newApplied >= r.pendingConfIndex && r.state == StateLeader {
// If the current (and most recent, at least for this leader's term)
// configuration should be auto-left, initiate that now.
ccdata, err := (&pb.ConfChangeV2{}).Marshal()
if err != nil {
panic(err)
}
// configuration should be auto-left, initiate that now. We use a
// nil Data which unmarshals into an empty ConfChangeV2 and has the
// benefit that appendEntry can never refuse it based on its size
// (which registers as zero).
ent := pb.Entry{
Type: pb.EntryConfChangeV2,
Data: ccdata,
Data: nil,
}
// There's no way in which this proposal should be able to be rejected.
if !r.appendEntry(ent) {
// If we could not append the entry, bump the pending conf index
// so that we'll try again later.
//
// TODO(tbg): test this case.
r.pendingConfIndex = r.raftLog.lastIndex()
} else {
r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config)
panic("refused un-refusable auto-leaving ConfChangeV2")
}
r.pendingConfIndex = r.raftLog.lastIndex()
r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config)
}
}
r.reduceUncommittedSize(rd.CommittedEntries)

if len(rd.Entries) > 0 {
e := rd.Entries[len(rd.Entries)-1]
Expand Down Expand Up @@ -1607,16 +1606,23 @@ func (r *raft) abortLeaderTransfer() {
// If the new entries would exceed the limit, the method returns false. If not,
// the increase in uncommitted entry size is recorded and the method returns
// true.
//
// Empty payloads are never refused. This is used both for appending an empty
// entry at a new leader's term, as well as leaving a joint configuration.
func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
var s uint64
for _, e := range ents {
s += uint64(PayloadSize(e))
}

if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
if r.uncommittedSize > 0 && s > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
// If the uncommitted tail of the Raft log is empty, allow any size
// proposal. Otherwise, limit the size of the uncommitted tail of the
// log and drop any proposal that would push the size over the limit.
// Note the added requirement s>0 which is used to make sure that
// appending single empty entries to the log always succeeds, used both
// for replicating a new leader's initial empty entry, and for
// auto-leaving joint configurations.
return false
}
r.uncommittedSize += s
Expand Down
15 changes: 14 additions & 1 deletion raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ func TestUncommittedEntryLimit(t *testing.T) {
testEntry := pb.Entry{Data: []byte("testdata")}
maxEntrySize := maxEntries * PayloadSize(testEntry)

if n := PayloadSize(pb.Entry{Data: nil}); n != 0 {
t.Fatal("entry with no Data must have zero payload size")
}

cfg := newTestConfig(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage())
cfg.MaxUncommittedEntriesSize = uint64(maxEntrySize)
cfg.MaxInflightMsgs = 2 * 1024 // avoid interference
Expand Down Expand Up @@ -244,10 +248,19 @@ func TestUncommittedEntryLimit(t *testing.T) {
t.Fatalf("proposal not dropped: %v", err)
}

// But we can always append an entry with no Data. This is used both for the
// leader's first empty entry and for auto-transitioning out of joint config
// states.
if err := r.Step(
pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}},
); err != nil {
t.Fatal(err)
}

// Read messages and reduce the uncommitted size as if we had committed
// these entries.
ms = r.readMessages()
if e := 1 * numFollowers; len(ms) != e {
if e := 2 * numFollowers; len(ms) != e {
t.Fatalf("expected %d messages, got %d", e, len(ms))
}
r.reduceUncommittedSize(propEnts)
Expand Down
10 changes: 5 additions & 5 deletions raft/raftpb/confchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (c ConfChangeV2) AsV1() (ConfChange, bool) { return ConfChange{}, false }
// than one change or if the use of Joint Consensus was requested explicitly.
// The first bool can only be true if second one is, and indicates whether the
// Joint State will be left automatically.
func (c *ConfChangeV2) EnterJoint() (autoLeave bool, ok bool) {
func (c ConfChangeV2) EnterJoint() (autoLeave bool, ok bool) {
// NB: in theory, more config changes could qualify for the "simple"
// protocol but it depends on the config on top of which the changes apply.
// For example, adding two learners is not OK if both nodes are part of the
Expand All @@ -100,10 +100,10 @@ func (c *ConfChangeV2) EnterJoint() (autoLeave bool, ok bool) {
// LeaveJoint is true if the configuration change leaves a joint configuration.
// This is the case if the ConfChangeV2 is zero, with the possible exception of
// the Context field.
func (c *ConfChangeV2) LeaveJoint() bool {
cpy := *c
cpy.Context = nil
return proto.Equal(&cpy, &ConfChangeV2{})
func (c ConfChangeV2) LeaveJoint() bool {
// NB: c is already a copy.
c.Context = nil
return proto.Equal(&c, &ConfChangeV2{})
}

// ConfChangesFromString parses a space-delimited sequence of operations into a
Expand Down
2 changes: 1 addition & 1 deletion raft/rafttest/interaction_env_handler_deliver_msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (env *InteractionEnv) DeliverMsgs(rs ...Recipient) int {
}
toIdx := int(msg.To - 1)
if err := env.Nodes[toIdx].Step(msg); err != nil {
env.Output.WriteString(err.Error())
fmt.Fprintln(env.Output, err)
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChangeI) error {
return rn.raft.Step(m)
}

// ApplyConfChange applies a config change to the local node.
// ApplyConfChange applies a config change to the local node. The app must call
// this when it applies a configuration change, except when it decides to reject
// the configuration change, in which case it must not be called.
func (rn *RawNode) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState {
cs := rn.raft.applyConfChange(cc.AsV2())
return &cs
Expand Down
27 changes: 20 additions & 7 deletions raft/rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,12 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
},
// Ditto implicit.
{
pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
{NodeID: 2, Type: pb.ConfChangeAddNode},
{NodeID: 1, Type: pb.ConfChangeAddLearnerNode},
{NodeID: 3, Type: pb.ConfChangeAddLearnerNode},
},
pb.ConfChangeV2{
Changes: []pb.ConfChangeSingle{
{NodeID: 2, Type: pb.ConfChangeAddNode},
{NodeID: 1, Type: pb.ConfChangeAddLearnerNode},
{NodeID: 3, Type: pb.ConfChangeAddLearnerNode},
},
Transition: pb.ConfChangeTransitionJointImplicit,
},
pb.ConfState{
Expand Down Expand Up @@ -282,7 +283,9 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
}

// Check that the last index is exactly the conf change we put in,
// down to the bits.
// down to the bits. Note that this comes from the Storage, which
// will not reflect any unstable entries that we'll only be presented
// with in the next Ready.
lastIndex, err = s.LastIndex()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -313,7 +316,17 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs)
}

if exp, act := lastIndex, rawNode.raft.pendingConfIndex; exp != act {
var maybePlusOne uint64
if autoLeave, ok := tc.cc.AsV2().EnterJoint(); ok && autoLeave {
// If this is an auto-leaving joint conf change, it will have
// appended the entry that auto-leaves, so add one to the last
// index that forms the basis of our expectations on
// pendingConfIndex. (Recall that lastIndex was taken from stable
// storage, but this auto-leaving entry isn't on stable storage
// yet).
maybePlusOne = 1
}
if exp, act := lastIndex+maybePlusOne, rawNode.raft.pendingConfIndex; exp != act {
t.Fatalf("pendingConfIndex: expected %d, got %d", exp, act)
}

Expand Down
Loading