diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go
index 61997004e87c..171362323ad8 100644
--- a/pkg/storage/client_raft_test.go
+++ b/pkg/storage/client_raft_test.go
@@ -42,6 +42,7 @@ import (
   "github.com/cockroachdb/cockroach/pkg/storage/storagebase"
   "github.com/cockroachdb/cockroach/pkg/testutils"
   "github.com/cockroachdb/cockroach/pkg/util/hlc"
+  "github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
   "github.com/cockroachdb/cockroach/pkg/util/leaktest"
   "github.com/cockroachdb/cockroach/pkg/util/log"
   "github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -1126,6 +1127,144 @@ func TestRefreshPendingCommands(t *testing.T) {
   }
 }
 
+// Test that when a Raft group is not able to establish a quorum, its Raft log
+// does not grow without bound. It tests two different scenarios where this used
+// to be possible (see #27772):
+// 1. The leader proposes a command and cannot establish a quorum. The leader
+// continually re-proposes the command.
+// 2. The follower proposes a command and forwards it to the leader, who cannot
+// establish a quorum. The follower continually re-proposes and forwards the
+// command to the leader.
+func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
+  defer leaktest.AfterTest(t)()
+
+  sc := storage.TestStoreConfig(nil)
+  // Drop the raft tick interval so the Raft group is ticked more frequently.
+  sc.RaftTickInterval = 10 * time.Millisecond
+  // Don't time out the raft leader. We don't want leadership moving.
+  sc.RaftElectionTimeoutTicks = 1000000
+  // Disable leader transfers during leaseholder changes so that we
+  // can easily create leader-not-leaseholder scenarios.
+  sc.TestingKnobs.DisableLeaderFollowsLeaseholder = true
+  // Refresh pending commands on every Raft group tick instead of
+  // every RaftElectionTimeoutTicks.
+  sc.TestingKnobs.RefreshReasonTicksPeriod = 1
+  // Disable periodic gossip tasks which can move the range 1 lease
+  // unexpectedly.
+  sc.TestingKnobs.DisablePeriodicGossips = true
+  mtc := &multiTestContext{storeConfig: &sc}
+  defer mtc.Stop()
+  mtc.Start(t, 5)
+
+  const rangeID = roachpb.RangeID(1)
+  mtc.replicateRange(rangeID, 1, 2, 3, 4)
+
+  // Raft leadership is kept on node 0.
+  leaderRepl, err := mtc.Store(0).GetReplica(rangeID)
+  if err != nil {
+    t.Fatal(err)
+  }
+
+  // Put some data in the range so we'll have something to test for.
+  incArgs := incrementArgs([]byte("a"), 5)
+  if _, err := client.SendWrapped(context.Background(), mtc.stores[0].TestSender(), incArgs); err != nil {
+    t.Fatal(err)
+  }
+
+  // Wait for all nodes to catch up.
+  mtc.waitForValues(roachpb.Key("a"), []int64{5, 5, 5, 5, 5})
+
+  // Test proposing on the leader and proposing on a follower. Neither should
+  // result in unbounded raft log growth.
+  testutils.RunTrueAndFalse(t, "proposeOnFollower", func(t *testing.T, proposeOnFollower bool) {
+    // Restart any nodes that are down.
+    for _, s := range []int{2, 3, 4} {
+      if mtc.Store(s) == nil {
+        mtc.restartStore(s)
+      }
+    }
+
+    // Determine which node to propose on. Transfer the lease to that node.
+    var propIdx, otherIdx int
+    if !proposeOnFollower {
+      propIdx, otherIdx = 0, 1
+    } else {
+      propIdx, otherIdx = 1, 0
+    }
+    propNode := mtc.stores[propIdx].TestSender()
+    mtc.transferLease(context.TODO(), rangeID, otherIdx, propIdx)
+    testutils.SucceedsSoon(t, func() error {
+      // Lease transfers may not be immediately observed by the new
+      // leaseholder. Wait until the new leaseholder is aware.
+      repl, err := mtc.Store(propIdx).GetReplica(rangeID)
+      if err != nil {
+        t.Fatal(err)
+      }
+      repDesc, err := repl.GetReplicaDescriptor()
+      if err != nil {
+        t.Fatal(err)
+      }
+      if lease, _ := repl.GetLease(); lease.Replica != repDesc {
+        return errors.Errorf("lease not transferred yet; found %v", lease)
+      }
+      return nil
+    })
+
+    // Stop enough nodes to prevent a quorum.
+    for _, s := range []int{2, 3, 4} {
+      mtc.stopStore(s)
+    }
+
+    // Determine the current raft log size.
+    initLogSize := leaderRepl.GetRaftLogSize()
+
+    // While a majority of nodes are down, write some data.
+    putRes := make(chan *roachpb.Error)
+    go func() {
+      putArgs := putArgs([]byte("b"), make([]byte, 8<<10 /* 8 KB */))
+      _, err := client.SendWrapped(context.Background(), propNode, putArgs)
+      putRes <- err
+    }()
+
+    // Wait for a bit and watch for Raft log growth.
+    wait := time.After(500 * time.Millisecond)
+    ticker := time.Tick(50 * time.Millisecond)
+  Loop:
+    for {
+      select {
+      case <-wait:
+        break Loop
+      case <-ticker:
+        // Verify that the leader is node 0.
+        status := leaderRepl.RaftStatus()
+        if status == nil || status.RaftState != raft.StateLeader {
+          t.Fatalf("raft leader should be node 0, but got status %+v", status)
+        }
+
+        // Check the raft log size.
+        const logSizeLimit = 64 << 10 // 64 KB
+        curLogSize := leaderRepl.GetRaftLogSize()
+        logSize := curLogSize - initLogSize
+        logSizeStr := humanizeutil.IBytes(logSize)
+        if logSize > logSizeLimit {
+          t.Fatalf("raft log size grew to %s", logSizeStr)
+        }
+        t.Logf("raft log size grew to %s", logSizeStr)
+      case err := <-putRes:
+        t.Fatalf("write finished with quorum unavailable; err=%v", err)
+      }
+    }
+
+    // Start enough nodes to establish a quorum.
+    mtc.restartStore(2)
+
+    // The write should now succeed.
+    if err := <-putRes; err != nil {
+      t.Fatal(err)
+    }
+  })
+}
+
 // TestStoreRangeUpReplicate verifies that the replication queue will notice
 // under-replicated ranges and replicate them. Also tests that preemptive
 // snapshots which contain sideloaded proposals don't panic the receiving end.
diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 3c5f1bb7ca9d..40908178f4ed 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -373,16 +373,20 @@ type Replica struct {
   minLeaseProposedTS hlc.Timestamp
   // Max bytes before split.
   maxBytes int64
-  // proposals stores the Raft in-flight commands which
-  // originated at this Replica, i.e. all commands for which
-  // propose has been called, but which have not yet
-  // applied.
+  // localProposals stores the Raft in-flight commands which originated at
+  // this Replica, i.e. all commands for which propose has been called,
+  // but which have not yet applied.
   //
   // The *ProposalData in the map are "owned" by it. Elements from the
   // map must only be referenced while Replica.mu is held, except if the
   // element is removed from the map first. The notable exception is the
   // contained RaftCommand, which we treat as immutable.
-  proposals map[storagebase.CmdIDKey]*ProposalData
+  localProposals map[storagebase.CmdIDKey]*ProposalData
+  // remoteProposals is maintained by Raft leaders and stores in-flight
+  // commands that were forwarded to the leader during its current term.
+  // The set allows leaders to detect duplicate forwarded commands and
+  // avoid re-proposing the same forwarded command multiple times.
+  remoteProposals map[storagebase.CmdIDKey]struct{}
   internalRaftGroup *raft.RawNode
   // The ID of the replica within the Raft group. May be 0 if the replica has
   // been created from a preemptive snapshot (i.e. before being added to the
@@ -691,7 +695,7 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked(
   r.cmdQMu.queues[spanset.SpanLocal] = NewCommandQueue(false /* optimizeOverlap */)
   r.cmdQMu.Unlock()
 
-  r.mu.proposals = map[storagebase.CmdIDKey]*ProposalData{}
+  r.mu.localProposals = map[storagebase.CmdIDKey]*ProposalData{}
   r.mu.checksums = map[uuid.UUID]ReplicaChecksum{}
   // Clear the internal raft group in case we're being reset. Since we're
   // reloading the raft state below, it isn't safe to use the existing raft
@@ -847,10 +851,11 @@ func (r *Replica) cancelPendingCommandsLocked() {
     Err:           roachpb.NewError(roachpb.NewAmbiguousResultError("removing replica")),
     ProposalRetry: proposalRangeNoLongerExists,
   }
-  for _, p := range r.mu.proposals {
+  for _, p := range r.mu.localProposals {
     r.cleanupFailedProposalLocked(p)
     p.finishApplication(pr)
   }
+  r.mu.remoteProposals = nil
 }
 
 // cleanupFailedProposalLocked cleans up after a proposal that has failed. It
@@ -858,7 +863,7 @@ func (r *Replica) cancelPendingCommandsLocked() {
 func (r *Replica) cleanupFailedProposalLocked(p *ProposalData) {
   // Clear the proposal from the proposals map. May be a no-op if the
   // proposal has not yet been inserted into the map.
-  delete(r.mu.proposals, p.idKey)
+  delete(r.mu.localProposals, p.idKey)
   // Release associated quota pool resources if we have been tracking
   // this command.
   //
@@ -1860,7 +1865,7 @@ func (r *Replica) State() storagebase.RangeInfo {
   var ri storagebase.RangeInfo
   ri.ReplicaState = *(protoutil.Clone(&r.mu.state)).(*storagebase.ReplicaState)
   ri.LastIndex = r.mu.lastIndex
-  ri.NumPending = uint64(len(r.mu.proposals))
+  ri.NumPending = uint64(len(r.mu.localProposals))
   ri.RaftLogSize = r.mu.raftLogSize
   ri.NumDropped = uint64(r.mu.droppedMessages)
   if r.mu.proposalQuota != nil {
@@ -3338,11 +3343,11 @@ func (r *Replica) insertProposalLocked(
       proposal.idKey, proposal.command.MaxLeaseIndex)
   }
 
-  if _, ok := r.mu.proposals[proposal.idKey]; ok {
+  if _, ok := r.mu.localProposals[proposal.idKey]; ok {
     ctx := r.AnnotateCtx(context.TODO())
     log.Fatalf(ctx, "pending command already exists for %s", proposal.idKey)
   }
-  r.mu.proposals[proposal.idKey] = proposal
+  r.mu.localProposals[proposal.idKey] = proposal
 }
 
 func makeIDKey() storagebase.CmdIDKey {
@@ -3550,7 +3555,7 @@ func (r *Replica) propose(
   // range.
   tryAbandon := func() bool {
     r.mu.Lock()
-    p, ok := r.mu.proposals[idKey]
+    p, ok := r.mu.localProposals[idKey]
     if ok {
       // TODO(radu): Should this context be created via tracer.ForkCtxSpan?
       // We'd need to make sure the span is finished eventually.
@@ -3677,9 +3682,9 @@ func (r *Replica) quiesce() bool {
 
 func (r *Replica) quiesceLocked() bool {
   ctx := r.AnnotateCtx(context.TODO())
-  if len(r.mu.proposals) != 0 {
+  if len(r.mu.localProposals) != 0 {
     if log.V(3) {
-      log.Infof(ctx, "not quiescing: %d pending commands", len(r.mu.proposals))
+      log.Infof(ctx, "not quiescing: %d pending commands", len(r.mu.localProposals))
     }
     return false
   }
@@ -3747,6 +3752,39 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error {
     // other replica is not quiesced, so we don't need to wake the leader.
     r.unquiesceLocked()
     r.refreshLastUpdateTimeForReplicaLocked(req.FromReplica.ReplicaID)
+    if req.Message.Type == raftpb.MsgProp {
+      // A proposal was forwarded to this replica.
+      if r.mu.replicaID == r.mu.leaderID {
+        // This replica is the leader. Record that the proposal
+        // was seen and drop the proposal if it was already seen.
+        // This prevents duplicate forwarded proposals from each
+        // being appended to a leader's raft log.
+        allSeen := true
+        for _, e := range req.Message.Entries {
+          switch e.Type {
+          case raftpb.EntryNormal:
+            cmdID, _ := DecodeRaftCommand(e.Data)
+            if r.mu.remoteProposals == nil {
+              r.mu.remoteProposals = map[storagebase.CmdIDKey]struct{}{}
+            }
+            if _, ok := r.mu.remoteProposals[cmdID]; !ok {
+              r.mu.remoteProposals[cmdID] = struct{}{}
+              allSeen = false
+            }
+          case raftpb.EntryConfChange:
+            // We could peek into the EntryConfChange to find the
+            // command ID, but we don't expect follower-initiated
+            // conf changes.
+            allSeen = false
+          default:
+            log.Fatalf(context.TODO(), "unexpected Raft entry: %v", e)
+          }
+        }
+        if allSeen {
+          return false /* unquiesceAndWakeLeader */, nil
+        }
+      }
+    }
     err := raftGroup.Step(req.Message)
     if err == raft.ErrProposalDropped {
       // A proposal was forwarded to this replica but we couldn't propose it.
@@ -3855,6 +3893,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
     if !r.store.TestingKnobs().DisableRefreshReasonNewLeader {
       refreshReason = reasonNewLeader
     }
+    // Clear the remote proposal set. Would have been nil already if not
+    // previously the leader.
+    r.mu.remoteProposals = nil
     leaderID = roachpb.ReplicaID(rd.SoftState.Lead)
   }
 
@@ -4212,17 +4253,28 @@ func (r *Replica) tick(livenessMap map[roachpb.NodeID]bool) (bool, error) {
   r.mu.ticks++
   r.mu.internalRaftGroup.Tick()
+
+  refreshAtDelta := r.store.cfg.RaftElectionTimeoutTicks
+  if knob := r.store.TestingKnobs().RefreshReasonTicksPeriod; knob > 0 {
+    refreshAtDelta = knob
+  }
   if !r.store.TestingKnobs().DisableRefreshReasonTicks &&
-    r.mu.ticks%r.store.cfg.RaftElectionTimeoutTicks == 0 {
+    r.mu.replicaID != r.mu.leaderID &&
+    r.mu.ticks%refreshAtDelta == 0 {
     // RaftElectionTimeoutTicks is a reasonable approximation of how long we
     // should wait before deciding that our previous proposal didn't go
     // through. Note that the combination of the above condition and passing
     // RaftElectionTimeoutTicks to refreshProposalsLocked means that commands
     // will be refreshed when they have been pending for 1 to 2 election
     // cycles.
-    r.refreshProposalsLocked(
-      r.store.cfg.RaftElectionTimeoutTicks, reasonTicks,
-    )
+    //
+    // However, we don't refresh proposals if we are the leader because
+    // doing so would be useless. The commands tracked by a leader replica
+    // were either all proposed when the replica was a leader or were
+    // re-proposed when the replica became a leader. Either way, they are
+    // guaranteed to be in the leader's Raft log so re-proposing won't do
+    // anything.
+    r.refreshProposalsLocked(refreshAtDelta, reasonTicks)
   }
   return true, nil
 }
@@ -4286,7 +4338,7 @@ func (r *Replica) tick(livenessMap map[roachpb.NodeID]bool) (bool, error) {
 // correctness issues.
 func (r *Replica) maybeQuiesceLocked(livenessMap map[roachpb.NodeID]bool) bool {
   ctx := r.AnnotateCtx(context.TODO())
-  status, ok := shouldReplicaQuiesce(ctx, r, r.store.Clock().Now(), len(r.mu.proposals), livenessMap)
+  status, ok := shouldReplicaQuiesce(ctx, r, r.store.Clock().Now(), len(r.mu.localProposals), livenessMap)
   if !ok {
     return false
   }
@@ -4314,6 +4366,9 @@ func (r *Replica) maybeTransferRaftLeader(
   if !r.isLeaseValidRLocked(l, now) {
     return
   }
+  if r.store.TestingKnobs().DisableLeaderFollowsLeaseholder {
+    return
+  }
   if pr, ok := status.Progress[uint64(l.Replica.ReplicaID)]; ok && pr.Match >= status.Commit {
     log.VEventf(ctx, 1, "transferring raft leadership to replica ID %v", l.Replica.ReplicaID)
     r.store.metrics.RangeRaftLeaderTransfers.Inc(1)
@@ -4544,7 +4599,7 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR
   numShouldRetry := 0
   var reproposals pendingCmdSlice
-  for _, p := range r.mu.proposals {
+  for _, p := range r.mu.localProposals {
     if p.command.MaxLeaseIndex == 0 {
       // Commands without a MaxLeaseIndex cannot be reproposed, as they might
       // apply twice. We also don't want to ask the proposer to retry these
@@ -5014,16 +5069,19 @@ func (r *Replica) processRaftCommand(
   }
 
   r.mu.Lock()
-  proposal, proposedLocally := r.mu.proposals[idKey]
+  proposal, proposedLocally := r.mu.localProposals[idKey]
   // TODO(tschottdorf): consider the Trace situation here.
   if proposedLocally {
     // We initiated this command, so use the caller-supplied context.
     ctx = proposal.ctx
     proposal.ctx = nil // avoid confusion
-    delete(r.mu.proposals, idKey)
+    delete(r.mu.localProposals, idKey)
   }
+  // Delete the entry for a forwarded proposal from the remote proposals set.
+  delete(r.mu.remoteProposals, idKey)
+
   leaseIndex, proposalRetry, forcedErr := r.checkForcedErrLocked(ctx, idKey, raftCmd, proposal, proposedLocally)
   r.mu.Unlock()
diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go
index 9cd31af75205..5290b9a9f4a2 100644
--- a/pkg/storage/replica_proposal.go
+++ b/pkg/storage/replica_proposal.go
@@ -310,7 +310,8 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease) {
     r.txnWaitQueue.Clear(true /* disable */)
   }
 
-  if !iAmTheLeaseHolder && r.IsLeaseValid(newLease, r.store.Clock().Now()) {
+  if !iAmTheLeaseHolder && r.IsLeaseValid(newLease, r.store.Clock().Now()) &&
+    !r.store.TestingKnobs().DisableLeaderFollowsLeaseholder {
     // If this replica is the raft leader but it is not the new lease holder,
     // then try to transfer the raft leadership to match the lease. We like it
     // when leases and raft leadership are collocated because that facilitates
diff --git a/pkg/storage/replica_sideload.go b/pkg/storage/replica_sideload.go
index 12962034a976..571cb3116335 100644
--- a/pkg/storage/replica_sideload.go
+++ b/pkg/storage/replica_sideload.go
@@ -70,7 +70,7 @@ func (r *Replica) maybeSideloadEntriesRaftMuLocked(
   maybeRaftCommand := func(cmdID storagebase.CmdIDKey) (storagebase.RaftCommand, bool) {
     r.mu.Lock()
     defer r.mu.Unlock()
-    cmd, ok := r.mu.proposals[cmdID]
+    cmd, ok := r.mu.localProposals[cmdID]
     if ok {
       return *cmd.command, true
     }
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index 88d57dbe9e06..9b25207305af 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -7455,7 +7455,7 @@ func TestReplicaTryAbandon(t *testing.T) {
   func() {
     tc.repl.mu.Lock()
     defer tc.repl.mu.Unlock()
-    if len(tc.repl.mu.proposals) == 0 {
+    if len(tc.repl.mu.localProposals) == 0 {
       t.Fatal("expected non-empty proposals map")
     }
   }()
@@ -8010,7 +8010,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
     }
 
     tc.repl.mu.Lock()
-    for _, p := range tc.repl.mu.proposals {
+    for _, p := range tc.repl.mu.localProposals {
       if v := p.ctx.Value(magicKey{}); v != nil {
         origIndexes = append(origIndexes, int(p.command.MaxLeaseIndex))
       }
@@ -8042,13 +8042,13 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
   }
 
   tc.repl.mu.Lock()
   defer tc.repl.mu.Unlock()
-  nonePending := len(tc.repl.mu.proposals) == 0
+  nonePending := len(tc.repl.mu.localProposals) == 0
   c := int(tc.repl.mu.lastAssignedLeaseIndex) - int(tc.repl.mu.state.LeaseAppliedIndex)
   if nonePending && c > 0 {
     t.Errorf("no pending cmds, but have required index offset %d", c)
   }
   if !nonePending {
-    t.Fatalf("still pending commands: %+v", tc.repl.mu.proposals)
+    t.Fatalf("still pending commands: %+v", tc.repl.mu.localProposals)
   }
 }
 
@@ -8141,8 +8141,14 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
     t.Fatal(err)
   }
 
-  electionTicks := tc.store.cfg.RaftElectionTimeoutTicks
+  // Only followers refresh pending commands during tick events. Change the
+  // replica that the range thinks is the leader so that the replica thinks
+  // it's a follower.
+  r.mu.Lock()
+  r.mu.leaderID = 2
+  r.mu.Unlock()
+
+  electionTicks := tc.store.cfg.RaftElectionTimeoutTicks
   {
     // The verifications of the reproposal counts below rely on r.mu.ticks
     // starting with a value of 0 (modulo electionTicks). Move the replica into
@@ -8200,7 +8206,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
     }
     // Build the map of expected reproposals at this stage.
     m := map[storagebase.CmdIDKey]int{}
-    for id, p := range r.mu.proposals {
+    for id, p := range r.mu.localProposals {
       m[id] = p.proposedAtTicks
     }
     r.mu.Unlock()
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 34d384a68190..0af044087c0b 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -759,6 +759,9 @@ type StoreTestingKnobs struct {
   DisableScanner bool
   // DisablePeriodicGossips disables periodic gossiping.
   DisablePeriodicGossips bool
+  // DisableLeaderFollowsLeaseholder disables attempts to transfer raft
+  // leadership when it diverges from the range's leaseholder.
+  DisableLeaderFollowsLeaseholder bool
   // DisableRefreshReasonNewLeader disables refreshing pending commands when a new
   // leader is discovered.
   DisableRefreshReasonNewLeader bool
@@ -768,6 +771,10 @@ type StoreTestingKnobs struct {
   // DisableRefreshReasonTicks disables refreshing pending commands
   // periodically.
   DisableRefreshReasonTicks bool
+  // RefreshReasonTicksPeriod overrides the default period over which
+  // pending commands are refreshed. The period is specified as a multiple
+  // of Raft group ticks.
+  RefreshReasonTicksPeriod int
   // DisableProcessRaft disables the process raft loop.
   DisableProcessRaft bool
   // DisableLastProcessedCheck disables checking on replica queue last processed times.