storage: prevent unbounded raft log growth without quorum #27774

Merged Jul 23, 2018 · 2 commits · showing changes from 1 commit
139 changes: 139 additions & 0 deletions pkg/storage/client_raft_test.go
@@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -1126,6 +1127,144 @@ func TestRefreshPendingCommands(t *testing.T) {
}
}

// Test that when a Raft group is not able to establish a quorum, its Raft log
// does not grow without bound. It tests two different scenarios where this used
// to be possible (see #27772):
// 1. The leader proposes a command and cannot establish a quorum. The leader
// continually re-proposes the command.
// 2. The follower proposes a command and forwards it to the leader, who cannot
// establish a quorum. The follower continually re-proposes and forwards the
// command to the leader.
func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
defer leaktest.AfterTest(t)()

sc := storage.TestStoreConfig(nil)
// Drop the raft tick interval so the Raft group is ticked more.
sc.RaftTickInterval = 10 * time.Millisecond
// Don't time out the raft leader. We don't want leadership moving.
sc.RaftElectionTimeoutTicks = 1000000
// Disable leader transfers during leaseholder changes so that we
// can easily create leader-not-leaseholder scenarios.
sc.TestingKnobs.DisableLeaderFollowsLeaseholder = true
// Refresh pending commands on every Raft group tick instead of
// every RaftElectionTimeoutTicks.
sc.TestingKnobs.RefreshReasonTicksPeriod = 1
// Disable periodic gossip tasks which can move the range 1 lease
// unexpectedly.
sc.TestingKnobs.DisablePeriodicGossips = true
mtc := &multiTestContext{storeConfig: &sc}
defer mtc.Stop()
mtc.Start(t, 5)

const rangeID = roachpb.RangeID(1)
mtc.replicateRange(rangeID, 1, 2, 3, 4)

// Raft leadership is kept on node 0.
leaderRepl, err := mtc.Store(0).GetReplica(rangeID)
if err != nil {
t.Fatal(err)
}

// Put some data in the range so we'll have something to test for.
incArgs := incrementArgs([]byte("a"), 5)
if _, err := client.SendWrapped(context.Background(), mtc.stores[0].TestSender(), incArgs); err != nil {
t.Fatal(err)
}

// Wait for all nodes to catch up.
mtc.waitForValues(roachpb.Key("a"), []int64{5, 5, 5, 5, 5})

// Test proposing on leader and proposing on follower. Neither should result
// in unbounded raft log growth.
testutils.RunTrueAndFalse(t, "proposeOnFollower", func(t *testing.T, proposeOnFollower bool) {
// Restart any nodes that are down.
for _, s := range []int{2, 3, 4} {
if mtc.Store(s) == nil {
mtc.restartStore(s)
}
}

// Determine which node to propose on. Transfer lease to that node.
var propIdx, otherIdx int
if !proposeOnFollower {
propIdx, otherIdx = 0, 1
} else {
propIdx, otherIdx = 1, 0
}
propNode := mtc.stores[propIdx].TestSender()
mtc.transferLease(context.TODO(), rangeID, otherIdx, propIdx)
testutils.SucceedsSoon(t, func() error {
// Lease transfers may not be immediately observed by the new
// leaseholder. Wait until the new leaseholder is aware.
repl, err := mtc.Store(propIdx).GetReplica(rangeID)
if err != nil {
t.Fatal(err)
}
repDesc, err := repl.GetReplicaDescriptor()
if err != nil {
t.Fatal(err)
}
if lease, _ := repl.GetLease(); lease.Replica != repDesc {
return errors.Errorf("lease not transferred yet; found %v", lease)
}
return nil
})

// Stop enough nodes to prevent a quorum.
for _, s := range []int{2, 3, 4} {
mtc.stopStore(s)
}

// Determine the current raft log size.
initLogSize := leaderRepl.GetRaftLogSize()

// While a majority of nodes are down, write some data.
putRes := make(chan *roachpb.Error)
go func() {
putArgs := putArgs([]byte("b"), make([]byte, 8<<10 /* 8 KB */))
_, err := client.SendWrapped(context.Background(), propNode, putArgs)
putRes <- err
}()

// Wait for a bit and watch for Raft log growth.
wait := time.After(500 * time.Millisecond)
ticker := time.Tick(50 * time.Millisecond)
Loop:
for {
select {
case <-wait:
break Loop
case <-ticker:
// Verify that the leader is node 0.
status := leaderRepl.RaftStatus()
if status == nil || status.RaftState != raft.StateLeader {
t.Fatalf("raft leader should be node 0, but got status %+v", status)
}

// Check raft log size.
const logSizeLimit = 64 << 10 // 64 KB
curlogSize := leaderRepl.GetRaftLogSize()
logSize := curlogSize - initLogSize
logSizeStr := humanizeutil.IBytes(logSize)
if logSize > logSizeLimit {
t.Fatalf("raft log size grew to %s", logSizeStr)
}
t.Logf("raft log size grew to %s", logSizeStr)
case err := <-putRes:
t.Fatalf("write finished with quorum unavailable; err=%v", err)
}
}

// Start enough nodes to establish a quorum.
mtc.restartStore(2)

// The write should now succeed.
if err := <-putRes; err != nil {
t.Fatal(err)
}
})
}
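As a rough sanity check of the numbers in this test (assuming each refresh re-proposes the full entry and ignoring per-entry overhead): with a 10ms tick interval and RefreshReasonTicksPeriod = 1, the 8 KB put can be re-proposed roughly 50 times during the 500ms observation window. Without duplicate detection on the leader, each re-proposal would be appended to its Raft log, on the order of 400 KB of growth, far past the 64 KB limit the test enforces. With the fix, duplicates are dropped and the log stays near initLogSize.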

// TestStoreRangeUpReplicate verifies that the replication queue will notice
// under-replicated ranges and replicate them. Also tests that preemptive
// snapshots which contain sideloaded proposals don't panic the receiving end.
69 changes: 64 additions & 5 deletions pkg/storage/replica.go
@@ -382,7 +382,12 @@ type Replica struct {
// map must only be referenced while Replica.mu is held, except if the
// element is removed from the map first. The notable exception is the
// contained RaftCommand, which we treat as immutable.
proposals map[storagebase.CmdIDKey]*ProposalData
proposals map[storagebase.CmdIDKey]*ProposalData
// remoteProposals is maintained by Raft leaders and stores in-flight
// commands that were forwarded to the leader during its current term.
// The set allows leaders to detect duplicate forwarded commands and
// avoid re-proposing the same forwarded command multiple times.
remoteProposals map[storagebase.CmdIDKey]struct{}
internalRaftGroup *raft.RawNode
// The ID of the replica within the Raft group. May be 0 if the replica has
// been created from a preemptive snapshot (i.e. before being added to the
@@ -851,6 +856,7 @@ func (r *Replica) cancelPendingCommandsLocked() {
r.cleanupFailedProposalLocked(p)
p.finishApplication(pr)
}
r.mu.remoteProposals = nil
}

// cleanupFailedProposalLocked cleans up after a proposal that has failed. It
@@ -3747,6 +3753,39 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error {
// other replica is not quiesced, so we don't need to wake the leader.
r.unquiesceLocked()
r.refreshLastUpdateTimeForReplicaLocked(req.FromReplica.ReplicaID)
if req.Message.Type == raftpb.MsgProp {
// A proposal was forwarded to this replica.
if r.mu.replicaID == r.mu.leaderID {
// This replica is the leader. Record that the proposal
// was seen and drop the proposal if it was already seen.
// This prevents duplicate forwarded proposals from each
// being appended to a leader's raft log.
allSeen := true
for _, e := range req.Message.Entries {
switch e.Type {
case raftpb.EntryNormal:
cmdID, _ := DecodeRaftCommand(e.Data)
if r.mu.remoteProposals == nil {
r.mu.remoteProposals = map[storagebase.CmdIDKey]struct{}{}
}
if _, ok := r.mu.remoteProposals[cmdID]; !ok {
r.mu.remoteProposals[cmdID] = struct{}{}
allSeen = false
}
case raftpb.EntryConfChange:
// We could peek into the EntryConfChange to find the
// command ID, but we don't expect follower-initiated
// conf changes.
allSeen = false
default:
log.Fatalf(context.TODO(), "unexpected Raft entry: %v", e)
}
}
if allSeen {
return false /* unquiesceAndWakeLeader */, nil
}
}
}
err := raftGroup.Step(req.Message)
if err == raft.ErrProposalDropped {
// A proposal was forwarded to this replica but we couldn't propose it.
@@ -3855,6 +3894,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
if !r.store.TestingKnobs().DisableRefreshReasonNewLeader {
refreshReason = reasonNewLeader
}
// Clear the remote proposal set. It would already be nil if this
// replica was not previously the leader.
r.mu.remoteProposals = nil
leaderID = roachpb.ReplicaID(rd.SoftState.Lead)
}

@@ -4212,17 +4254,28 @@ func (r *Replica) tick(livenessMap map[roachpb.NodeID]bool) (bool, error) {

r.mu.ticks++
r.mu.internalRaftGroup.Tick()

refreshAtDelta := r.store.cfg.RaftElectionTimeoutTicks
if knob := r.store.TestingKnobs().RefreshReasonTicksPeriod; knob > 0 {
refreshAtDelta = knob
}
if !r.store.TestingKnobs().DisableRefreshReasonTicks &&
r.mu.ticks%r.store.cfg.RaftElectionTimeoutTicks == 0 {
r.mu.replicaID != r.mu.leaderID &&
r.mu.ticks%refreshAtDelta == 0 {
// RaftElectionTimeoutTicks is a reasonable approximation of how long we
// should wait before deciding that our previous proposal didn't go
// through. Note that the combination of the above condition and passing
// RaftElectionTimeoutTicks to refreshProposalsLocked means that commands
// will be refreshed when they have been pending for 1 to 2 election
// cycles.
r.refreshProposalsLocked(
r.store.cfg.RaftElectionTimeoutTicks, reasonTicks,
)
//
// However, we don't refresh proposals if we are the leader because
// doing so would be useless. The commands tracked by a leader replica
// were either all proposed when the replica was a leader or were
// re-proposed when the replica became a leader. Either way, they are
// guaranteed to be in the leader's Raft log so re-proposing won't do
// anything.
r.refreshProposalsLocked(refreshAtDelta, reasonTicks)
}
return true, nil
}
@@ -4314,6 +4367,9 @@ func (r *Replica) maybeTransferRaftLeader(
if !r.isLeaseValidRLocked(l, now) {
return
}
if r.store.TestingKnobs().DisableLeaderFollowsLeaseholder {
return
}
if pr, ok := status.Progress[uint64(l.Replica.ReplicaID)]; ok && pr.Match >= status.Commit {
log.VEventf(ctx, 1, "transferring raft leadership to replica ID %v", l.Replica.ReplicaID)
r.store.metrics.RangeRaftLeaderTransfers.Inc(1)
@@ -5024,6 +5080,9 @@ func (r *Replica) processRaftCommand(
delete(r.mu.proposals, idKey)
}

// Remove the command from the forwarded proposal set, if present.
delete(r.mu.remoteProposals, idKey)

leaseIndex, proposalRetry, forcedErr := r.checkForcedErrLocked(ctx, idKey, raftCmd, proposal, proposedLocally)

r.mu.Unlock()
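To make the leader-side bookkeeping above easier to follow, here is a minimal, self-contained Go sketch of the same idea: track forwarded command IDs in a set and drop re-proposed duplicates before they reach the log. It is illustrative only; the types and names (cmdID, leader, stepForwarded) are invented for the sketch and simplified relative to the Replica code in this diff.

// Minimal sketch (not CockroachDB code): a leader-side set of forwarded
// command IDs used to drop duplicate re-proposals. All names are invented
// for illustration.
package main

import "fmt"

type cmdID string

// entry stands in for a forwarded raft log entry carrying a command ID.
type entry struct {
	id cmdID
}

// leader tracks which forwarded commands it has already accepted.
type leader struct {
	remoteProposals map[cmdID]struct{}
	log             []entry
}

// stepForwarded appends only the entries the leader has not seen before and
// reports whether any entry in the batch was new.
func (l *leader) stepForwarded(entries []entry) bool {
	if l.remoteProposals == nil {
		l.remoteProposals = map[cmdID]struct{}{}
	}
	anyNew := false
	for _, e := range entries {
		if _, seen := l.remoteProposals[e.id]; seen {
			continue // duplicate re-proposal: drop it instead of appending again
		}
		l.remoteProposals[e.id] = struct{}{}
		l.log = append(l.log, e)
		anyNew = true
	}
	return anyNew
}

func main() {
	var l leader
	batch := []entry{{id: "cmd-1"}, {id: "cmd-2"}}

	// First time the follower forwards the batch: both entries are appended.
	fmt.Println(l.stepForwarded(batch), len(l.log)) // true 2

	// While quorum is unavailable the follower keeps re-proposing the same
	// batch; nothing new is appended, so the log cannot grow without bound.
	fmt.Println(l.stepForwarded(batch), len(l.log)) // false 2
}

In the actual change this check happens inside stepRaftGroup before calling raftGroup.Step, and the set is cleared when leadership changes or pending commands are cancelled.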
3 changes: 2 additions & 1 deletion pkg/storage/replica_proposal.go
@@ -310,7 +310,8 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease) {
r.txnWaitQueue.Clear(true /* disable */)
}

if !iAmTheLeaseHolder && r.IsLeaseValid(newLease, r.store.Clock().Now()) {
if !iAmTheLeaseHolder && r.IsLeaseValid(newLease, r.store.Clock().Now()) &&
!r.store.TestingKnobs().DisableLeaderFollowsLeaseholder {
// If this replica is the raft leader but it is not the new lease holder,
// then try to transfer the raft leadership to match the lease. We like it
// when leases and raft leadership are collocated because that facilitates
8 changes: 7 additions & 1 deletion pkg/storage/replica_test.go
@@ -8141,8 +8141,14 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
t.Fatal(err)
}

electionTicks := tc.store.cfg.RaftElectionTimeoutTicks
// Only followers refresh pending commands during tick events. Change the
// replica that the range thinks is the leader so that the replica thinks
// it's a follower.
r.mu.Lock()
r.mu.leaderID = 2
r.mu.Unlock()

electionTicks := tc.store.cfg.RaftElectionTimeoutTicks
{
// The verifications of the reproposal counts below rely on r.mu.ticks
// starting with a value of 0 (modulo electionTicks). Move the replica into
7 changes: 7 additions & 0 deletions pkg/storage/store.go
@@ -759,6 +759,9 @@ type StoreTestingKnobs struct {
DisableScanner bool
// DisablePeriodicGossips disables periodic gossiping.
DisablePeriodicGossips bool
// DisableLeaderFollowsLeaseholder disables attempts to transfer raft
// leadership when it diverges from the range's leaseholder.
DisableLeaderFollowsLeaseholder bool
// DisableRefreshReasonNewLeader disables refreshing pending commands when a new
// leader is discovered.
DisableRefreshReasonNewLeader bool
@@ -768,6 +771,10 @@
// DisableRefreshReasonTicks disables refreshing pending commands
// periodically.
DisableRefreshReasonTicks bool
// RefreshReasonTicksPeriod overrides the default period over which
// pending commands are refreshed. The period is specified as a multiple
// of Raft group ticks.
RefreshReasonTicksPeriod int
// DisableProcessRaft disables the process raft loop.
DisableProcessRaft bool
// DisableLastProcessedCheck disables checking on replica queue last processed times.