From 03b116f631cc41b8a26b02b542a7401eecb6ff62 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Thu, 19 Jul 2018 20:34:16 -0400
Subject: [PATCH 1/2] storage: prevent unbounded raft log growth without
 quorum

Fixes #27772.

This change adds safeguards to prevent cases where a raft log would grow
without bound during loss-of-quorum scenarios. It also adds a new test that
demonstrates that the raft log does not grow without bound in these cases.

There are two cases that need to be handled to prevent the unbounded raft
log growth observed in #27772.

1. When the leader proposes a command and cannot establish a quorum. In
   this case, we know the leader has the entry in its log, so there's no
   need to refresh it with `reasonTicks`. To avoid this, we no longer
   perform `reasonTicks` refreshes on the leader.
2. When a follower proposes a command that is forwarded to the leader, who
   cannot establish a quorum. In this case, the follower can't be sure
   (currently) that the leader got the proposal, so it needs to refresh
   using `reasonTicks`. However, the leader now detects duplicate forwarded
   proposals and avoids appending redundant entries to its log. It does so
   by maintaining a set of in-flight forwarded proposals that it has
   received during its term as leader. This set is reset after every
   leadership change.

Both of these cases are tested by the new
TestLogGrowthWhenRefreshingPendingCommands. Without both of the safeguards
introduced in this commit, the test fails.

Release note (bug fix): Prevent unbounded growth of the raft log caused by
a loss of quorum.
---
 pkg/storage/client_raft_test.go | 139 ++++++++++++++++++++++++++++++++
 pkg/storage/replica.go          |  69 ++++++++++++++--
 pkg/storage/replica_proposal.go |   3 +-
 pkg/storage/replica_test.go     |   8 +-
 pkg/storage/store.go            |   7 ++
 5 files changed, 219 insertions(+), 7 deletions(-)

diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go
index 61997004e87c..171362323ad8 100644
--- a/pkg/storage/client_raft_test.go
+++ b/pkg/storage/client_raft_test.go
@@ -42,6 +42,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -1126,6 +1127,144 @@ func TestRefreshPendingCommands(t *testing.T) {
 	}
 }
 
+// Test that when a Raft group is not able to establish a quorum, its Raft log
+// does not grow without bound. It tests two different scenarios where this
+// used to be possible (see #27772):
+// 1. The leader proposes a command and cannot establish a quorum. The leader
+//    continually re-proposes the command.
+// 2. The follower proposes a command and forwards it to the leader, who
+//    cannot establish a quorum. The follower continually re-proposes and
+//    forwards the command to the leader.
+func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	sc := storage.TestStoreConfig(nil)
+	// Drop the raft tick interval so the Raft group is ticked more.
+	sc.RaftTickInterval = 10 * time.Millisecond
+	// Don't time out the raft leader. We don't want leadership moving.
+	sc.RaftElectionTimeoutTicks = 1000000
+	// Disable leader transfers during leaseholder changes so that we
+	// can easily create leader-not-leaseholder scenarios.
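+	// (In the proposeOnFollower case below, this leaves the leaseholder as a
+	// follower that must forward its proposals to the leader.)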
+	sc.TestingKnobs.DisableLeaderFollowsLeaseholder = true
+	// Refresh pending commands on every Raft group tick instead of
+	// every RaftElectionTimeoutTicks.
+	sc.TestingKnobs.RefreshReasonTicksPeriod = 1
+	// Disable periodic gossip tasks which can move the range 1 lease
+	// unexpectedly.
+	sc.TestingKnobs.DisablePeriodicGossips = true
+	mtc := &multiTestContext{storeConfig: &sc}
+	defer mtc.Stop()
+	mtc.Start(t, 5)
+
+	const rangeID = roachpb.RangeID(1)
+	mtc.replicateRange(rangeID, 1, 2, 3, 4)
+
+	// Raft leadership is kept on node 0.
+	leaderRepl, err := mtc.Store(0).GetReplica(rangeID)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Put some data in the range so we'll have something to test for.
+	incArgs := incrementArgs([]byte("a"), 5)
+	if _, err := client.SendWrapped(context.Background(), mtc.stores[0].TestSender(), incArgs); err != nil {
+		t.Fatal(err)
+	}
+
+	// Wait for all nodes to catch up.
+	mtc.waitForValues(roachpb.Key("a"), []int64{5, 5, 5, 5, 5})
+
+	// Test proposing on leader and proposing on follower. Neither should result
+	// in unbounded raft log growth.
+	testutils.RunTrueAndFalse(t, "proposeOnFollower", func(t *testing.T, proposeOnFollower bool) {
+		// Restart any nodes that are down.
+		for _, s := range []int{2, 3, 4} {
+			if mtc.Store(s) == nil {
+				mtc.restartStore(s)
+			}
+		}
+
+		// Determine which node to propose on. Transfer lease to that node.
+		var propIdx, otherIdx int
+		if !proposeOnFollower {
+			propIdx, otherIdx = 0, 1
+		} else {
+			propIdx, otherIdx = 1, 0
+		}
+		propNode := mtc.stores[propIdx].TestSender()
+		mtc.transferLease(context.TODO(), rangeID, otherIdx, propIdx)
+		testutils.SucceedsSoon(t, func() error {
+			// Lease transfers may not be immediately observed by the new
+			// leaseholder. Wait until the new leaseholder is aware.
+			repl, err := mtc.Store(propIdx).GetReplica(rangeID)
+			if err != nil {
+				t.Fatal(err)
+			}
+			repDesc, err := repl.GetReplicaDescriptor()
+			if err != nil {
+				t.Fatal(err)
+			}
+			if lease, _ := repl.GetLease(); lease.Replica != repDesc {
+				return errors.Errorf("lease not transferred yet; found %v", lease)
+			}
+			return nil
+		})
+
+		// Stop enough nodes to prevent a quorum.
+		for _, s := range []int{2, 3, 4} {
+			mtc.stopStore(s)
+		}
+
+		// Determine the current raft log size.
+		initLogSize := leaderRepl.GetRaftLogSize()
+
+		// While a majority of nodes are down, write some data.
+		putRes := make(chan *roachpb.Error)
+		go func() {
+			putArgs := putArgs([]byte("b"), make([]byte, 8<<10 /* 8 KB */))
+			_, err := client.SendWrapped(context.Background(), propNode, putArgs)
+			putRes <- err
+		}()
+
+		// Wait for a bit and watch for Raft log growth.
+		wait := time.After(500 * time.Millisecond)
+		ticker := time.Tick(50 * time.Millisecond)
+	Loop:
+		for {
+			select {
+			case <-wait:
+				break Loop
+			case <-ticker:
+				// Verify that the leader is node 0.
+				status := leaderRepl.RaftStatus()
+				if status == nil || status.RaftState != raft.StateLeader {
+					t.Fatalf("raft leader should be node 0, but got status %+v", status)
+				}
+
+				// Check the raft log size.
+				const logSizeLimit = 64 << 10 // 64 KB
+				curlogSize := leaderRepl.GetRaftLogSize()
+				logSize := curlogSize - initLogSize
+				logSizeStr := humanizeutil.IBytes(logSize)
+				if logSize > logSizeLimit {
+					t.Fatalf("raft log size grew to %s", logSizeStr)
+				}
+				t.Logf("raft log size grew to %s", logSizeStr)
+			case err := <-putRes:
+				t.Fatalf("write finished with quorum unavailable; err=%v", err)
+			}
+		}
+
+		// Start enough nodes to establish a quorum.
+		mtc.restartStore(2)
+
+		// The write should now succeed.
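+		// (Stores 0 and 1 are still up, so restarting store 2 restores three
+		// live replicas out of five, which is a quorum.)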
+		if err := <-putRes; err != nil {
+			t.Fatal(err)
+		}
+	})
+}
+
 // TestStoreRangeUpReplicate verifies that the replication queue will notice
 // under-replicated ranges and replicate them. Also tests that preemptive
 // snapshots which contain sideloaded proposals don't panic the receiving end.
diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 3c5f1bb7ca9d..6e2474d2e0c1 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -382,7 +382,12 @@ type Replica struct {
 		// map must only be referenced while Replica.mu is held, except if the
 		// element is removed from the map first. The notable exception is the
 		// contained RaftCommand, which we treat as immutable.
-		proposals         map[storagebase.CmdIDKey]*ProposalData
+		proposals map[storagebase.CmdIDKey]*ProposalData
+		// remoteProposals is maintained by Raft leaders and stores in-flight
+		// commands that were forwarded to the leader during its current term.
+		// The set allows leaders to detect duplicate forwarded commands and
+		// avoid re-proposing the same forwarded command multiple times.
+		remoteProposals   map[storagebase.CmdIDKey]struct{}
 		internalRaftGroup *raft.RawNode
 		// The ID of the replica within the Raft group. May be 0 if the replica has
 		// been created from a preemptive snapshot (i.e. before being added to the
@@ -851,6 +856,7 @@ func (r *Replica) cancelPendingCommandsLocked() {
 		r.cleanupFailedProposalLocked(p)
 		p.finishApplication(pr)
 	}
+	r.mu.remoteProposals = nil
 }
 
 // cleanupFailedProposalLocked cleans up after a proposal that has failed. It
@@ -3747,6 +3753,39 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error {
 		// other replica is not quiesced, so we don't need to wake the leader.
 		r.unquiesceLocked()
 		r.refreshLastUpdateTimeForReplicaLocked(req.FromReplica.ReplicaID)
+		if req.Message.Type == raftpb.MsgProp {
+			// A proposal was forwarded to this replica.
+			if r.mu.replicaID == r.mu.leaderID {
+				// This replica is the leader. Record that the proposal
+				// was seen and drop the proposal if it was already seen.
+				// This prevents duplicate forwarded proposals from each
+				// being appended to a leader's raft log.
+				allSeen := true
+				for _, e := range req.Message.Entries {
+					switch e.Type {
+					case raftpb.EntryNormal:
+						cmdID, _ := DecodeRaftCommand(e.Data)
+						if r.mu.remoteProposals == nil {
+							r.mu.remoteProposals = map[storagebase.CmdIDKey]struct{}{}
+						}
+						if _, ok := r.mu.remoteProposals[cmdID]; !ok {
+							r.mu.remoteProposals[cmdID] = struct{}{}
+							allSeen = false
+						}
+					case raftpb.EntryConfChange:
+						// We could peek into the EntryConfChange to find the
+						// command ID, but we don't expect follower-initiated
+						// conf changes.
+						allSeen = false
+					default:
+						log.Fatalf(context.TODO(), "unexpected Raft entry: %v", e)
+					}
+				}
+				if allSeen {
+					return false /* unquiesceAndWakeLeader */, nil
+				}
+			}
+		}
 		err := raftGroup.Step(req.Message)
 		if err == raft.ErrProposalDropped {
 			// A proposal was forwarded to this replica but we couldn't propose it.
@@ -3855,6 +3894,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 			if !r.store.TestingKnobs().DisableRefreshReasonNewLeader {
 				refreshReason = reasonNewLeader
 			}
+			// Clear the remote proposal set. Would have been nil already if not
+			// previously the leader.
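+			// The set is scoped to this replica's term as leader, so it must
+			// be rebuilt from scratch under the new leader.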
+			r.mu.remoteProposals = nil
 			leaderID = roachpb.ReplicaID(rd.SoftState.Lead)
 		}
 
@@ -4212,17 +4254,28 @@ func (r *Replica) tick(livenessMap map[roachpb.NodeID]bool) (bool, error) {
 
 	r.mu.ticks++
 	r.mu.internalRaftGroup.Tick()
+
+	refreshAtDelta := r.store.cfg.RaftElectionTimeoutTicks
+	if knob := r.store.TestingKnobs().RefreshReasonTicksPeriod; knob > 0 {
+		refreshAtDelta = knob
+	}
 	if !r.store.TestingKnobs().DisableRefreshReasonTicks &&
-		r.mu.ticks%r.store.cfg.RaftElectionTimeoutTicks == 0 {
+		r.mu.replicaID != r.mu.leaderID &&
+		r.mu.ticks%refreshAtDelta == 0 {
 		// RaftElectionTimeoutTicks is a reasonable approximation of how long we
 		// should wait before deciding that our previous proposal didn't go
 		// through. Note that the combination of the above condition and passing
 		// RaftElectionTimeoutTicks to refreshProposalsLocked means that commands
 		// will be refreshed when they have been pending for 1 to 2 election
 		// cycles.
-		r.refreshProposalsLocked(
-			r.store.cfg.RaftElectionTimeoutTicks, reasonTicks,
-		)
+		//
+		// However, we don't refresh proposals if we are the leader because
+		// doing so would be useless. The commands tracked by a leader replica
+		// were either all proposed when the replica was a leader or were
+		// re-proposed when the replica became a leader. Either way, they are
+		// guaranteed to be in the leader's Raft log so re-proposing won't do
+		// anything.
+		r.refreshProposalsLocked(refreshAtDelta, reasonTicks)
 	}
 	return true, nil
 }
@@ -4314,6 +4367,9 @@ func (r *Replica) maybeTransferRaftLeader(
 	if !r.isLeaseValidRLocked(l, now) {
 		return
 	}
+	if r.store.TestingKnobs().DisableLeaderFollowsLeaseholder {
+		return
+	}
 	if pr, ok := status.Progress[uint64(l.Replica.ReplicaID)]; ok && pr.Match >= status.Commit {
 		log.VEventf(ctx, 1, "transferring raft leadership to replica ID %v", l.Replica.ReplicaID)
 		r.store.metrics.RangeRaftLeaderTransfers.Inc(1)
@@ -5024,6 +5080,9 @@ func (r *Replica) processRaftCommand(
 		delete(r.mu.proposals, idKey)
 	}
 
+	// Delete the entry from the forwarded proposal set.
+	delete(r.mu.remoteProposals, idKey)
+
 	leaseIndex, proposalRetry, forcedErr := r.checkForcedErrLocked(ctx, idKey, raftCmd, proposal, proposedLocally)
 	r.mu.Unlock()
 
diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go
index 9cd31af75205..5290b9a9f4a2 100644
--- a/pkg/storage/replica_proposal.go
+++ b/pkg/storage/replica_proposal.go
@@ -310,7 +310,8 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease) {
 		r.txnWaitQueue.Clear(true /* disable */)
 	}
 
-	if !iAmTheLeaseHolder && r.IsLeaseValid(newLease, r.store.Clock().Now()) {
+	if !iAmTheLeaseHolder && r.IsLeaseValid(newLease, r.store.Clock().Now()) &&
+		!r.store.TestingKnobs().DisableLeaderFollowsLeaseholder {
 		// If this replica is the raft leader but it is not the new lease holder,
 		// then try to transfer the raft leadership to match the lease. We like it
 		// when leases and raft leadership are collocated because that facilitates
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index 88d57dbe9e06..f02895fdf23b 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -8141,8 +8141,14 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	electionTicks := tc.store.cfg.RaftElectionTimeoutTicks
+	// Only followers refresh pending commands during tick events. Change the
+	// replica that the range thinks is the leader so that the replica thinks
+	// it's a follower.
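+	// (Any leader ID other than this replica's own ID would work here; the
+	// tick-driven refresh only checks that the replica is not the leader.)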
+	r.mu.Lock()
+	r.mu.leaderID = 2
+	r.mu.Unlock()
+
+	electionTicks := tc.store.cfg.RaftElectionTimeoutTicks
 	{
 		// The verifications of the reproposal counts below rely on r.mu.ticks
 		// starting with a value of 0 (modulo electionTicks). Move the replica into
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 34d384a68190..0af044087c0b 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -759,6 +759,9 @@ type StoreTestingKnobs struct {
 	DisableScanner bool
 	// DisablePeriodicGossips disables periodic gossiping.
 	DisablePeriodicGossips bool
+	// DisableLeaderFollowsLeaseholder disables attempts to transfer raft
+	// leadership when it diverges from the range's leaseholder.
+	DisableLeaderFollowsLeaseholder bool
 	// DisableRefreshReasonNewLeader disables refreshing pending commands when a
 	// new leader is discovered.
 	DisableRefreshReasonNewLeader bool
@@ -768,6 +771,10 @@
 	// DisableRefreshReasonTicks disables refreshing pending commands
 	// periodically.
 	DisableRefreshReasonTicks bool
+	// RefreshReasonTicksPeriod overrides the default period over which
+	// pending commands are refreshed. The period is specified as a multiple
+	// of Raft group ticks.
+	RefreshReasonTicksPeriod int
 	// DisableProcessRaft disables the process raft loop.
 	DisableProcessRaft bool
 	// DisableLastProcessedCheck disables checking on replica queue last processed times.

From 192a828813ab28c8065c92deeab177daaaf0f680 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Mon, 23 Jul 2018 09:35:23 -0400
Subject: [PATCH 2/2] storage: s/replica.mu.proposals/replica.mu.localProposals/

Release note: None
---
 pkg/storage/replica.go          | 35 +++++++++++++++++------------------
 pkg/storage/replica_sideload.go |  2 +-
 pkg/storage/replica_test.go     | 10 +++++-----
 3 files changed, 23 insertions(+), 24 deletions(-)

diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 6e2474d2e0c1..40908178f4ed 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -373,16 +373,15 @@ type Replica struct {
 		minLeaseProposedTS hlc.Timestamp
 		// Max bytes before split.
 		maxBytes int64
-		// proposals stores the Raft in-flight commands which
-		// originated at this Replica, i.e. all commands for which
-		// propose has been called, but which have not yet
-		// applied.
+		// localProposals stores the Raft in-flight commands which originated at
+		// this Replica, i.e. all commands for which propose has been called,
+		// but which have not yet applied.
 		//
 		// The *ProposalData in the map are "owned" by it. Elements from the
 		// map must only be referenced while Replica.mu is held, except if the
 		// element is removed from the map first. The notable exception is the
 		// contained RaftCommand, which we treat as immutable.
-		proposals map[storagebase.CmdIDKey]*ProposalData
+		localProposals map[storagebase.CmdIDKey]*ProposalData
 		// remoteProposals is maintained by Raft leaders and stores in-flight
 		// commands that were forwarded to the leader during its current term.
 		// The set allows leaders to detect duplicate forwarded commands and
@@ -696,7 +695,7 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked(
 	r.cmdQMu.queues[spanset.SpanLocal] = NewCommandQueue(false /* optimizeOverlap */)
 	r.cmdQMu.Unlock()
 
-	r.mu.proposals = map[storagebase.CmdIDKey]*ProposalData{}
+	r.mu.localProposals = map[storagebase.CmdIDKey]*ProposalData{}
 	r.mu.checksums = map[uuid.UUID]ReplicaChecksum{}
 	// Clear the internal raft group in case we're being reset. Since we're
 	// reloading the raft state below, it isn't safe to use the existing raft
 	// group.
@@ -852,7 +851,7 @@ func (r *Replica) cancelPendingCommandsLocked() {
 		Err:           roachpb.NewError(roachpb.NewAmbiguousResultError("removing replica")),
 		ProposalRetry: proposalRangeNoLongerExists,
 	}
-	for _, p := range r.mu.proposals {
+	for _, p := range r.mu.localProposals {
 		r.cleanupFailedProposalLocked(p)
 		p.finishApplication(pr)
 	}
@@ -864,7 +863,7 @@ func (r *Replica) cleanupFailedProposalLocked(p *ProposalData) {
 	// Clear the proposal from the proposals map. May be a no-op if the
 	// proposal has not yet been inserted into the map.
-	delete(r.mu.proposals, p.idKey)
+	delete(r.mu.localProposals, p.idKey)
 	// Release associated quota pool resources if we have been tracking
 	// this command.
 	//
@@ -1866,7 +1865,7 @@ func (r *Replica) State() storagebase.RangeInfo {
 	var ri storagebase.RangeInfo
 	ri.ReplicaState = *(protoutil.Clone(&r.mu.state)).(*storagebase.ReplicaState)
 	ri.LastIndex = r.mu.lastIndex
-	ri.NumPending = uint64(len(r.mu.proposals))
+	ri.NumPending = uint64(len(r.mu.localProposals))
 	ri.RaftLogSize = r.mu.raftLogSize
 	ri.NumDropped = uint64(r.mu.droppedMessages)
 	if r.mu.proposalQuota != nil {
@@ -3344,11 +3343,11 @@ func (r *Replica) insertProposalLocked(
 			proposal.idKey, proposal.command.MaxLeaseIndex)
 	}
 
-	if _, ok := r.mu.proposals[proposal.idKey]; ok {
+	if _, ok := r.mu.localProposals[proposal.idKey]; ok {
 		ctx := r.AnnotateCtx(context.TODO())
 		log.Fatalf(ctx, "pending command already exists for %s", proposal.idKey)
 	}
-	r.mu.proposals[proposal.idKey] = proposal
+	r.mu.localProposals[proposal.idKey] = proposal
 }
 
 func makeIDKey() storagebase.CmdIDKey {
@@ -3556,7 +3555,7 @@ func (r *Replica) propose(
 	// range.
 	tryAbandon := func() bool {
 		r.mu.Lock()
-		p, ok := r.mu.proposals[idKey]
+		p, ok := r.mu.localProposals[idKey]
 		if ok {
 			// TODO(radu): Should this context be created via tracer.ForkCtxSpan?
 			// We'd need to make sure the span is finished eventually.
@@ -3683,9 +3682,9 @@ func (r *Replica) quiesce() bool {
 
 func (r *Replica) quiesceLocked() bool {
 	ctx := r.AnnotateCtx(context.TODO())
-	if len(r.mu.proposals) != 0 {
+	if len(r.mu.localProposals) != 0 {
 		if log.V(3) {
-			log.Infof(ctx, "not quiescing: %d pending commands", len(r.mu.proposals))
+			log.Infof(ctx, "not quiescing: %d pending commands", len(r.mu.localProposals))
 		}
 		return false
 	}
@@ -4339,7 +4338,7 @@ func (r *Replica) tick(livenessMap map[roachpb.NodeID]bool) (bool, error) {
 // correctness issues.
 func (r *Replica) maybeQuiesceLocked(livenessMap map[roachpb.NodeID]bool) bool {
 	ctx := r.AnnotateCtx(context.TODO())
-	status, ok := shouldReplicaQuiesce(ctx, r, r.store.Clock().Now(), len(r.mu.proposals), livenessMap)
+	status, ok := shouldReplicaQuiesce(ctx, r, r.store.Clock().Now(), len(r.mu.localProposals), livenessMap)
 	if !ok {
 		return false
 	}
@@ -4600,7 +4599,7 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR
 	numShouldRetry := 0
 	var reproposals pendingCmdSlice
-	for _, p := range r.mu.proposals {
+	for _, p := range r.mu.localProposals {
 		if p.command.MaxLeaseIndex == 0 {
 			// Commands without a MaxLeaseIndex cannot be reproposed, as they might
 			// apply twice. We also don't want to ask the proposer to retry these
@@ -5070,14 +5069,14 @@ func (r *Replica) processRaftCommand(
 	}
 
 	r.mu.Lock()
-	proposal, proposedLocally := r.mu.proposals[idKey]
+	proposal, proposedLocally := r.mu.localProposals[idKey]
 	// TODO(tschottdorf): consider the Trace situation here.
 	if proposedLocally {
 		// We initiated this command, so use the caller-supplied context.
 		ctx = proposal.ctx
 		proposal.ctx = nil // avoid confusion
-		delete(r.mu.proposals, idKey)
+		delete(r.mu.localProposals, idKey)
 	}
 
 	// Delete the entry from the forwarded proposal set.
 	delete(r.mu.remoteProposals, idKey)
diff --git a/pkg/storage/replica_sideload.go b/pkg/storage/replica_sideload.go
index 12962034a976..571cb3116335 100644
--- a/pkg/storage/replica_sideload.go
+++ b/pkg/storage/replica_sideload.go
@@ -70,7 +70,7 @@ func (r *Replica) maybeSideloadEntriesRaftMuLocked(
 	maybeRaftCommand := func(cmdID storagebase.CmdIDKey) (storagebase.RaftCommand, bool) {
 		r.mu.Lock()
 		defer r.mu.Unlock()
-		cmd, ok := r.mu.proposals[cmdID]
+		cmd, ok := r.mu.localProposals[cmdID]
 		if ok {
 			return *cmd.command, true
 		}
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index f02895fdf23b..9b25207305af 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -7455,7 +7455,7 @@ func TestReplicaTryAbandon(t *testing.T) {
 	func() {
 		tc.repl.mu.Lock()
 		defer tc.repl.mu.Unlock()
-		if len(tc.repl.mu.proposals) == 0 {
+		if len(tc.repl.mu.localProposals) == 0 {
 			t.Fatal("expected non-empty proposals map")
 		}
 	}()
@@ -8010,7 +8010,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
 	}
 
 	tc.repl.mu.Lock()
-	for _, p := range tc.repl.mu.proposals {
+	for _, p := range tc.repl.mu.localProposals {
 		if v := p.ctx.Value(magicKey{}); v != nil {
 			origIndexes = append(origIndexes, int(p.command.MaxLeaseIndex))
 		}
@@ -8042,13 +8042,13 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
 	tc.repl.mu.Lock()
 	defer tc.repl.mu.Unlock()
-	nonePending := len(tc.repl.mu.proposals) == 0
+	nonePending := len(tc.repl.mu.localProposals) == 0
 	c := int(tc.repl.mu.lastAssignedLeaseIndex) - int(tc.repl.mu.state.LeaseAppliedIndex)
 	if nonePending && c > 0 {
 		t.Errorf("no pending cmds, but have required index offset %d", c)
 	}
 	if !nonePending {
-		t.Fatalf("still pending commands: %+v", tc.repl.mu.proposals)
+		t.Fatalf("still pending commands: %+v", tc.repl.mu.localProposals)
 	}
 }
 
@@ -8206,7 +8206,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
 		}
 		// Build the map of expected reproposals at this stage.
 		m := map[storagebase.CmdIDKey]int{}
-		for id, p := range r.mu.proposals {
+		for id, p := range r.mu.localProposals {
 			m[id] = p.proposedAtTicks
 		}
 		r.mu.Unlock()