diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go index 02222a1a0d0a..812eb9860dc0 100644 --- a/pkg/storage/client_raft_test.go +++ b/pkg/storage/client_raft_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/engine" @@ -39,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" @@ -2928,6 +2930,134 @@ func TestReplicaGCRace(t *testing.T) { } } +func requireOnlyAtomicChanges( + t *testing.T, db *sqlutils.SQLRunner, rangeID roachpb.RangeID, repFactor int, start time.Time, +) { + // From all events pertaining to the given rangeID and post-dating the start time, + // filter out those infos which indicate a (full and incoming) voter count in + // excess of the replication factor. Any rows returned have the full info JSON + // strings in them. + const q = ` +SELECT + "uniqueID", + count(t) AS repfactor, + string_agg(info, e'\\n') AS infos +FROM + [ + SELECT + "uniqueID", + replicas->'node_id' AS n, + COALESCE(replicas->'type', '0') AS t, + info + FROM + system.rangelog, + ROWS FROM ( + jsonb_array_elements( + info::JSONB->'UpdatedDesc'->'internal_replicas' + ) + ) + AS replicas + WHERE + info::JSONB->'UpdatedDesc'->'range_id' = $1::JSONB AND timestamp >= $2 + ORDER BY + "timestamp" ASC + ] +WHERE + t IN ('0', '2') +GROUP BY + "uniqueID" +HAVING + count(t) > $3; +` + matrix := db.QueryStr(t, q, rangeID, start, repFactor) + if len(matrix) > 0 { + t.Fatalf("more than %d voting replicas: %s", repFactor, sqlutils.MatrixToStr(matrix)) + } +} + +func TestDecommission(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 5, base.TestClusterArgs{ + ReplicationMode: base.ReplicationAuto, + }) + defer tc.Stopper().Stop(ctx) + + k := tc.ScratchRange(t) + cc, err := tc.Server(0).RPCContext().GRPCDialNode(tc.Server(0).RPCAddr(), 1, rpc.DefaultClass).Connect(ctx) + require.NoError(t, err) + admin := serverpb.NewAdminClient(cc) + // Decommission the first node, which holds most of the leases. + _, err = admin.Decommission( + ctx, &serverpb.DecommissionRequest{Decommissioning: true}, + ) + require.NoError(t, err) + + requireNoReplicas := func(storeID roachpb.StoreID, repFactor int) { + testutils.SucceedsSoon(t, func() error { + desc := tc.LookupRangeOrFatal(t, k) + for _, rDesc := range desc.Replicas().Voters() { + store, err := tc.Servers[int(rDesc.NodeID-1)].Stores().GetStore(rDesc.StoreID) + require.NoError(t, err) + if err := store.ForceReplicationScanAndProcess(); err != nil { + return err + } + } + if sl := desc.Replicas().Filter(func(rDesc roachpb.ReplicaDescriptor) bool { + return rDesc.StoreID == storeID + }); len(sl) > 0 { + return errors.Errorf("still a replica on s%d: %s", storeID, &desc) + } + if len(desc.Replicas().Voters()) != repFactor { + return errors.Errorf("expected %d replicas: %s", repFactor, &desc) + } + return nil + }) + } + + const triplicated = 3 + + requireNoReplicas(1, triplicated) + + runner := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + ts := timeutil.Now() + + _, err = admin.Decommission( + ctx, &serverpb.DecommissionRequest{NodeIDs: []roachpb.NodeID{2}, Decommissioning: true}, + ) + require.NoError(t, err) + + // Both s1 and s2 are out, so neither ought to have replicas. + requireNoReplicas(1, triplicated) + requireNoReplicas(2, triplicated) + + // Going from three replicas to three replicas should have used atomic swaps + // only. We didn't verify this before the first decommissioning op because + // lots of ranges were over-replicated due to ranges recently having split + // off from the five-fold replicated system ranges. + requireOnlyAtomicChanges(t, runner, tc.LookupRangeOrFatal(t, k).RangeID, triplicated, ts) + + sqlutils.SetZoneConfig(t, runner, "RANGE default", "num_replicas: 1") + + const single = 1 + + // The range should drop down to one replica, neither of which is on a decommissioning store. + requireNoReplicas(1, single) + requireNoReplicas(2, single) + + // Decommission two more nodes. Only n5 is left; getting the replicas there + // can't use atomic replica swaps because the leaseholder can't be removed. + _, err = admin.Decommission( + ctx, &serverpb.DecommissionRequest{NodeIDs: []roachpb.NodeID{3, 4}, Decommissioning: true}, + ) + require.NoError(t, err) + + requireNoReplicas(1, single) + requireNoReplicas(2, single) + requireNoReplicas(3, single) + requireNoReplicas(4, single) +} + // TestStoreRangeMoveDecommissioning verifies that if a store is set to // decommission, the ReplicateQueue will notice and move any replicas on it. func TestStoreRangeMoveDecommissioning(t *testing.T) { diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go index dcb0571d0ae5..a139e78178cb 100644 --- a/pkg/storage/replicate_queue.go +++ b/pkg/storage/replicate_queue.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -335,20 +336,46 @@ func (rq *replicateQueue) processOneChange( case AllocatorAdd: // only include live replicas, since dead replicas should soon be removed existingReplicas := liveVoterReplicas - return rq.add(ctx, repl, existingReplicas, dryRun) + return rq.addOrReplace(ctx, repl, existingReplicas, -1 /* removeIdx */, dryRun) case AllocatorRemove: return rq.remove(ctx, repl, voterReplicas, dryRun) - case AllocatorReplaceDead, AllocatorReplaceDecommissioning: - existingReplicas := liveVoterReplicas - // WIP(tbg): pass a slice of replicas that can be replaced in. - // In ReplaceDead, it's the dead replicas, in ReplaceDecommissioning - // it's the decommissioning ones. - // Rename rq.add to rq.addOrReplace, and let it actually replace a replica - // atomically when there's suitable candidate in the slice. - return rq.add(ctx, repl, existingReplicas, dryRun) + case AllocatorReplaceDead: + if len(deadVoterReplicas) == 0 { + // Nothing to do. + return false, nil + } + removeIdx := -1 // guaranteed to be changed below + for i, rDesc := range voterReplicas { + if rDesc.StoreID == deadVoterReplicas[0].StoreID { + removeIdx = i + break + } + } + return rq.addOrReplace(ctx, repl, voterReplicas, removeIdx, dryRun) + case AllocatorReplaceDecommissioning: + decommissioningReplicas := rq.allocator.storePool.decommissioningReplicas( + desc.RangeID, voterReplicas) + if len(decommissioningReplicas) == 0 { + // Nothing to do. + return false, nil + } + removeIdx := -1 // guaranteed to be changed below + for i, rDesc := range voterReplicas { + if rDesc.StoreID == decommissioningReplicas[0].StoreID { + removeIdx = i + break + } + } + return rq.addOrReplace(ctx, repl, voterReplicas, removeIdx, dryRun) case AllocatorRemoveDecommissioning: + // NB: this path will only be hit when the range is over-replicated and + // has decommissioning replicas; in the common case we'll hit + // AllocatorReplaceDecommissioning above. return rq.removeDecommissioning(ctx, repl, dryRun) case AllocatorRemoveDead: + // NB: this path will only be hit when the range is over-replicated and + // has dead replicas; in the common case we'll hit AllocatorReplaceDead + // above. return rq.removeDead(ctx, repl, deadVoterReplicas, dryRun) case AllocatorRemoveLearner: return rq.removeLearner(ctx, repl, dryRun) @@ -359,23 +386,74 @@ func (rq *replicateQueue) processOneChange( // Requeue because either we failed to transition out of a joint state // (bad) or we did and there might be more to do for that range. return true, err + default: + log.Fatalf(ctx, "unknown allocator action %v", action) } return true, nil } -func (rq *replicateQueue) add( - ctx context.Context, repl *Replica, existingReplicas []roachpb.ReplicaDescriptor, dryRun bool, +// addOrReplace adds or replaces a replica. If removeIdx is -1, an addition is +// carried out. Otherwise, removeIdx must be a valid index into existingReplicas +// and specifies which replica to replace with a new one. +// +// The method preferably issues an atomic replica swap, but may not be able to +// do this in all cases, such as when atomic replication changes are not +// available, or when the range consists of a single replica. As a fall back, +// only the addition is carried out; the removal is then a follow-up step for +// the next scanner cycle. +func (rq *replicateQueue) addOrReplace( + ctx context.Context, + repl *Replica, + existingReplicas []roachpb.ReplicaDescriptor, + removeIdx int, // -1 for no removal + dryRun bool, ) (requeue bool, _ error) { + if len(existingReplicas) == 1 { + // If only one replica remains, that replica is the leaseholder and + // we won't be able to swap it out. Ignore the removal and simply add + // a replica. + removeIdx = -1 + } + if !rq.store.cfg.Settings.Version.IsActive(cluster.VersionAtomicChangeReplicas) { + // If we can't swap yet, don't. + removeIdx = -1 + } + + var remainingReplicas []roachpb.ReplicaDescriptor + if removeIdx >= 0 { + remainingReplicas = append(existingReplicas[:removeIdx:removeIdx], existingReplicas[removeIdx+1:]...) + // See about transferring the lease away if we're about to remove the + // leaseholder. + done, err := rq.maybeTransferLeaseAway(ctx, repl, existingReplicas[removeIdx].StoreID, dryRun) + if err != nil { + return false, err + } + if done { + // Lease was transferred away. Next leaseholder is going to take over. + return false, nil + } + } else { + remainingReplicas = existingReplicas + } + desc, zone := repl.DescAndZone() + // Allocate a target assuming that the replica we're replacing (if any) is + // already gone. The allocator should not try to re-add this replica since + // there is a reason we're removing it (i.e. dead or decommissioning). If we + // left the replica in the slice, the allocator would not be guaranteed to + // pick a replica that fills the gap removeRepl leaves once it's gone. newStore, details, err := rq.allocator.AllocateTarget( ctx, zone, desc.RangeID, - existingReplicas, + remainingReplicas, ) if err != nil { return false, err } + if removeIdx >= 0 && newStore.StoreID == existingReplicas[removeIdx].StoreID { + return false, errors.Errorf("allocator suggested to replace replica on s%d with itself", newStore.StoreID) + } newReplica := roachpb.ReplicationTarget{ NodeID: newStore.Node.NodeID, StoreID: newStore.StoreID, @@ -383,7 +461,6 @@ func (rq *replicateQueue) add( clusterNodes := rq.allocator.storePool.ClusterNodeCount() need := GetNeededReplicas(*zone.NumReplicas, clusterNodes) - willHave := len(existingReplicas) + 1 // Only up-replicate if there are suitable allocation targets such that, // either the replication goal is met, or it is possible to get to the next @@ -395,7 +472,10 @@ func (rq *replicateQueue) add( // NB: If willHave > need, then always allow up-replicating as that // will be the case when up-replicating a range with a decommissioning // replica. - if willHave < need && willHave%2 == 0 { + // + // We skip this check if we're swapping a replica, since that does not + // change the quorum size. + if willHave := len(existingReplicas) + 1; removeIdx < 0 && willHave < need && willHave%2 == 0 { // This means we are going to up-replicate to an even replica state. // Check if it is possible to go to an odd replica state beyond it. oldPlusNewReplicas := append([]roachpb.ReplicaDescriptor(nil), existingReplicas...) @@ -417,12 +497,26 @@ func (rq *replicateQueue) add( } } rq.metrics.AddReplicaCount.Inc(1) - log.VEventf(ctx, 1, "adding replica %+v due to under-replication: %s", - newReplica, rangeRaftProgress(repl.RaftStatus(), existingReplicas)) + ops := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, newReplica) + if removeIdx < 0 { + log.VEventf(ctx, 1, "adding replica %+v: %s", + newReplica, rangeRaftProgress(repl.RaftStatus(), existingReplicas)) + } else { + rq.metrics.RemoveReplicaCount.Inc(1) + removeReplica := existingReplicas[removeIdx] + log.VEventf(ctx, 1, "replacing replica %s with %+v: %s", + removeReplica, newReplica, rangeRaftProgress(repl.RaftStatus(), existingReplicas)) + ops = append(ops, + roachpb.MakeReplicationChanges(roachpb.REMOVE_REPLICA, roachpb.ReplicationTarget{ + StoreID: removeReplica.StoreID, + NodeID: removeReplica.NodeID, + })...) + } + if err := rq.changeReplicas( ctx, repl, - roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, newReplica), + ops, desc, SnapshotRequest_RECOVERY, storagepb.ReasonRangeUnderReplicated, @@ -431,6 +525,7 @@ func (rq *replicateQueue) add( ); err != nil { return false, err } + // Always requeue to see if more work needs to be done. return true, nil }