diff --git a/pkg/internal/client/db.go b/pkg/internal/client/db.go index 94e6aaa00432..3eb1d837fc11 100644 --- a/pkg/internal/client/db.go +++ b/pkg/internal/client/db.go @@ -560,6 +560,27 @@ func (db *DB) AdminTransferLease( return getOneErr(db.Run(ctx, b), b) } +// ChangeReplicasCanMixAddAndRemoveContext convinces +// (*client.DB).AdminChangeReplicas that the caller is aware that 19.1 nodes +// don't know how to handle requests that mix additions and removals; 19.2+ +// binaries understand this due to the work done in the context of atomic +// replication changes. If 19.1 nodes received such a request they'd mistake the +// removals for additions. +// +// In effect users of the RPC need to check the cluster version which in the +// past has been a brittle pattern, so this time the DB disallows the new +// behavior unless it can determine (via the ctx) that the caller went through +// this method and is thus aware of the intricacies. +// +// See https://github.com/cockroachdb/cockroach/pull/39611. +// +// TODO(tbg): remove in 20.1. +func ChangeReplicasCanMixAddAndRemoveContext(ctx context.Context) context.Context { + return context.WithValue(ctx, adminChangeReplicasMixHint{}, adminChangeReplicasMixHint{}) +} + +type adminChangeReplicasMixHint struct{} + // AdminChangeReplicas adds or removes a set of replicas for a range. func (db *DB) AdminChangeReplicas( ctx context.Context, @@ -567,7 +588,7 @@ func (db *DB) AdminChangeReplicas( expDesc roachpb.RangeDescriptor, chgs []roachpb.ReplicationChange, ) (*roachpb.RangeDescriptor, error) { - if ctx.Value("testing") == nil { + if ctx.Value(adminChangeReplicasMixHint{}) == nil { // Disallow trying to add and remove replicas in the same set of // changes. This will only work when the node receiving the request is // running 19.2 (code, not cluster version). diff --git a/pkg/storage/client_atomic_membership_change_test.go b/pkg/storage/client_atomic_membership_change_test.go index 53bc697b2b85..32f0de196135 100644 --- a/pkg/storage/client_atomic_membership_change_test.go +++ b/pkg/storage/client_atomic_membership_change_test.go @@ -16,6 +16,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -60,9 +61,7 @@ func TestAtomicReplicationChange(t *testing.T) { runChange := func(expDesc roachpb.RangeDescriptor, chgs []roachpb.ReplicationChange) roachpb.RangeDescriptor { t.Helper() desc, err := tc.Servers[0].DB().AdminChangeReplicas( - // TODO(tbg): when 19.2 is out, remove this "feature gate" here and in - // AdminChangeReplicas. - context.WithValue(ctx, "testing", "testing"), + client.ChangeReplicasCanMixAddAndRemoveContext(ctx), k, expDesc, chgs, ) require.NoError(t, err) diff --git a/pkg/storage/client_relocate_range_test.go b/pkg/storage/client_relocate_range_test.go new file mode 100644 index 000000000000..7e92be759d0b --- /dev/null +++ b/pkg/storage/client_relocate_range_test.go @@ -0,0 +1,192 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
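The gate in AdminChangeReplicas above keys off a context value, so a caller opts in simply by wrapping its context before issuing the request. A minimal sketch of that call pattern (the helper name swapReplica and its parameters are illustrative, not part of this patch; the imports are the usual context, client, and roachpb packages used by the files below):

// swapReplica is a hypothetical helper: it adds one replica and removes
// another in a single request, something only 19.2+ binaries understand.
func swapReplica(
	ctx context.Context,
	db *client.DB,
	startKey roachpb.Key,
	curDesc roachpb.RangeDescriptor,
	addTarget, removeTarget roachpb.ReplicationTarget,
) (*roachpb.RangeDescriptor, error) {
	// Without this wrapper, AdminChangeReplicas rejects a request that mixes
	// an addition and a removal (see the adminChangeReplicasMixHint check).
	ctx = client.ChangeReplicasCanMixAddAndRemoveContext(ctx)
	chgs := append(
		roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, addTarget),
		roachpb.MakeReplicationChanges(roachpb.REMOVE_REPLICA, removeTarget)...,
	)
	return db.AdminChangeReplicas(ctx, startKey, curDesc, chgs)
}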
+ +package storage_test + +import ( + "context" + "sort" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func relocateAndCheck( + t *testing.T, + tc *testcluster.TestCluster, + startKey roachpb.RKey, + targets []roachpb.ReplicationTarget, +) { + require.NoError(t, tc.Servers[0].DB().AdminRelocateRange(context.Background(), startKey.AsRawKey(), targets)) + desc, err := tc.Servers[0].LookupRange(startKey.AsRawKey()) + require.NoError(t, err) + requireDescMembers(t, desc, targets) + requireLeaseAt(t, tc, desc, targets[0]) +} + +func requireDescMembers( + t *testing.T, desc roachpb.RangeDescriptor, targets []roachpb.ReplicationTarget, +) { + t.Helper() + targets = append([]roachpb.ReplicationTarget(nil), targets...) + sort.Slice(targets, func(i, j int) bool { return targets[i].StoreID < targets[j].StoreID }) + + have := make([]roachpb.ReplicationTarget, 0, len(targets)) + for _, rDesc := range desc.Replicas().All() { + have = append(have, roachpb.ReplicationTarget{ + NodeID: rDesc.NodeID, + StoreID: rDesc.StoreID, + }) + } + sort.Slice(have, func(i, j int) bool { return have[i].StoreID < have[j].StoreID }) + require.Equal(t, targets, have) +} + +func requireLeaseAt( + t *testing.T, + tc *testcluster.TestCluster, + desc roachpb.RangeDescriptor, + target roachpb.ReplicationTarget, +) { + t.Helper() + // NB: under stressrace the lease will sometimes be inactive by the time + // it's returned here, so don't use FindRangeLeaseHolder which fails when + // that happens. 
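+	// Instead, poll the current range lease and only succeed once the replica
+	// named in it matches the expected target.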
+ testutils.SucceedsSoon(t, func() error { + lease, _, err := tc.FindRangeLease(desc, &target) + if err != nil { + return err + } + if target != (roachpb.ReplicationTarget{ + NodeID: lease.Replica.NodeID, + StoreID: lease.Replica.StoreID, + }) { + return errors.Errorf("lease %v is not held by %+v", lease, target) + } + return nil + }) +} + +func TestAdminRelocateRange(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + + type intercept struct { + ops []roachpb.ReplicationChange + leaseTarget *roachpb.ReplicationTarget + err error + } + var intercepted []intercept + + requireNumAtomic := func(expAtomic int, expSingle int, f func()) { + t.Helper() + intercepted = nil + f() + var actAtomic, actSingle int + for _, ic := range intercepted { + if ic.err != nil { + continue + } + if len(ic.ops) == 2 && ic.ops[0].ChangeType == roachpb.ADD_REPLICA && ic.ops[1].ChangeType == roachpb.REMOVE_REPLICA { + actAtomic++ + } else { + actSingle++ + } + } + require.Equal(t, expAtomic, actAtomic, "wrong number of atomic changes: %+v", intercepted) + require.Equal(t, expSingle, actSingle, "wrong number of single changes: %+v", intercepted) + } + + knobs := base.TestingKnobs{ + Store: &storage.StoreTestingKnobs{ + BeforeRelocateOne: func(ops []roachpb.ReplicationChange, leaseTarget *roachpb.ReplicationTarget, err error) { + intercepted = append(intercepted, intercept{ + ops: ops, + leaseTarget: leaseTarget, + err: err, + }) + }, + }, + } + args := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{Knobs: knobs}, + ReplicationMode: base.ReplicationManual, + } + tc := testcluster.StartTestCluster(t, 6, args) + defer tc.Stopper().Stop(ctx) + + // s1 (LH) ---> s2 (LH) s1 s3 + // Pure upreplication. + k := keys.MustAddr(tc.ScratchRange(t)) + { + targets := tc.Targets(1, 0, 2) + // Expect two single additions, and that's it. + requireNumAtomic(0, 2, func() { + relocateAndCheck(t, tc, k, targets) + }) + } + + // s1 (LH) s2 s3 ---> s4 (LH) s5 s6. + // This is trickier because the leaseholder gets removed, and so do all + // other replicas (i.e. a simple lease transfer at the beginning won't solve + // the problem). + { + targets := tc.Targets(3, 4, 5) + // Should carry out three swaps. Note that the leaseholder gets removed + // in the process (i.e. internally the lease must've been moved around + // to achieve that). + requireNumAtomic(3, 0, func() { + relocateAndCheck(t, tc, k, targets) + }) + } + + // s4 (LH) s5 s6 ---> s5 (LH) + // Pure downreplication. + { + requireNumAtomic(0, 2, func() { + relocateAndCheck(t, tc, k, tc.Targets(4)) + }) + } + + // s5 (LH) ---> s3 (LH) + // Lateral movement while at replication factor one. In this case atomic + // replication changes cannot be used; we add-then-remove instead. + { + requireNumAtomic(0, 2, func() { + relocateAndCheck(t, tc, k, tc.Targets(2)) + }) + } + + // s3 (LH) ---> s2 (LH) s4 s1 --> s4 (LH) s2 s6 s1 --> s3 (LH) s5 + // A grab bag. 
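+	// (Notation: sN is a store, "(LH)" marks the leaseholder, and each arrow is
+	// one relocateAndCheck call; the expected breakdown into atomic swaps and
+	// single changes is spelled out inside the block below.)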
+ { + // s3 -(add)-> s3 s2 -(swap)-> s4 s2 -(add)-> s4 s2 s1 (=s2 s4 s1) + requireNumAtomic(1, 2, func() { + relocateAndCheck(t, tc, k, tc.Targets(1, 3, 0)) + }) + // s2 s4 s1 -(add)-> s2 s4 s1 s6 (=s4 s2 s6 s1) + requireNumAtomic(0, 1, func() { + relocateAndCheck(t, tc, k, tc.Targets(3, 1, 5, 0)) + }) + // s4 s2 s6 s1 -(swap)-> s3 s2 s6 s1 -(swap)-> s3 s5 s6 s1 -(del)-> s3 s5 s6 -(del)-> s3 s5 + requireNumAtomic(2, 2, func() { + relocateAndCheck(t, tc, k, tc.Targets(2, 4)) + }) + } +} diff --git a/pkg/storage/client_replica_test.go b/pkg/storage/client_replica_test.go index 545cfe8d0d1a..45bcf7f32f93 100644 --- a/pkg/storage/client_replica_test.go +++ b/pkg/storage/client_replica_test.go @@ -2705,7 +2705,7 @@ func TestChangeReplicasLeaveAtomicRacesWithMerge(t *testing.T) { ReplicationMode: base.ReplicationManual, }) // Make a magical context which will allow us to use atomic replica changes. - ctx := context.WithValue(context.Background(), "testing", "testing") + ctx := client.ChangeReplicasCanMixAddAndRemoveContext(context.Background()) defer tc.Stopper().Stop(ctx) // We want to first get into a joint consensus scenario. diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 1e29805874ef..205cb2475a87 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -15,6 +15,7 @@ import ( "context" "fmt" "math/rand" + "sort" "strings" "time" @@ -1996,6 +1997,21 @@ func updateRangeDescriptor( func (s *Store) AdminRelocateRange( ctx context.Context, rangeDesc roachpb.RangeDescriptor, targets []roachpb.ReplicationTarget, ) error { + useAtomic := s.ClusterSettings().Version.IsActive(cluster.VersionAtomicChangeReplicas) + if useAtomic { + // AdminChangeReplicas will only allow atomic replication changes when + // this magic flag is set because we changed the corresponding request + // to accommodate them; only 19.2 nodes will understand it. We're going + // to make sure below that when !useAtomic we carry out ops one by one. + // Other uses of ChangeReplicas have a Replica that they can call into + // directly, bypassing the RPC layer, which is morally possible here as + // well but seems more likely to invite undesired fallout, so we stick + // with this hack for a few weeks. + // + // TODO(tbg): remove in 20.1. + ctx = client.ChangeReplicasCanMixAddAndRemoveContext(ctx) + } + // Step 0: Remove all learners so we don't have to think about them. We could // do something smarter here and try to promote them, but it doesn't seem // worth the complexity right now. Revisit if this is an issue in practice. @@ -2009,15 +2025,151 @@ func (s *Store) AdminRelocateRange( return err } rangeDesc = *newDesc - rangeReplicas := rangeDesc.Replicas().All() - if len(rangeReplicas) != len(rangeDesc.Replicas().Voters()) { - // We just removed all the learners, so there shouldn't be anything but + + canRetry := func(err error) bool { + whitelist := []string{ + snapshotApplySemBusyMsg, + IntersectingSnapshotMsg, + } + errStr := err.Error() + for _, substr := range whitelist { + if strings.Contains(errStr, substr) { + return true + } + } + return false + } + + startKey := rangeDesc.StartKey.AsRawKey() + transferLease := func(target roachpb.ReplicationTarget) { + // TODO(tbg): we ignore errors here, but it seems that in practice these + // transfers "always work". 
Some of them are essential (we can't remove + // the leaseholder so we'll fail there later if this fails), so it + // seems like a good idea to return any errors here to the caller (or + // to retry some errors appropriately). + if err := s.DB().AdminTransferLease( + ctx, startKey, target.StoreID, + ); err != nil { + log.Warningf(ctx, "while transferring lease: %+v", err) + } + } + + // Step 2: Repeatedly add and/or remove a replica until we reach the + // desired state. In an "atomic replication changes" world, this is + // conceptually easy: change from the old set of replicas to the new + // one. But there are two reasons that complicate this: + // 1. we can't remove the leaseholder, so if we ultimately want to do that + // the lease has to be moved first. If we start out with *only* the + // leaseholder, we will have to add a replica first. + // 2. this code is rewritten late in the cycle and it is both safer and + // closer to its previous incarnation to never issue atomic changes + // other than simple swaps. + // + // The loop below repeatedly calls relocateOne, which gives us either one or + // two ops that move the range towards the desired replication state. If + // it's one op, then a single add or remove is carried out (and it's only + // done when we can't swap instead). If it's two ops, then we're swapping + // (though this code doesn't concern itself with the details); and it's + // possible that we need to transfer the lease before we carry out the ops, + // determined via the leaseTarget variable. + // + // Transient errors returned from relocateOne are retried until things work + // out. + every := log.Every(time.Minute) + for { + for re := retry.StartWithCtx(ctx, retry.Options{MaxBackoff: 5 * time.Second}); ; re.Next() { + if err := ctx.Err(); err != nil { + return err + } + + ops, leaseTarget, err := s.relocateOne(ctx, &rangeDesc, targets) + if err != nil { + return err + } + if leaseTarget != nil { + // NB: we may need to transfer even if there are no ops, to make + // sure the attempt is made to make the first target the final + // leaseholder. + transferLease(*leaseTarget) + } + if len(ops) == 0 { + // Done. + return ctx.Err() + } + if fn := s.cfg.TestingKnobs.BeforeRelocateOne; fn != nil { + fn(ops, leaseTarget, err) + } + // When a swap is in order but we're not sure that all nodes are running + // 19.2+ (in which the AdminChangeReplicas RPC was extended to support + // mixing additions and removals), don't send such requests but unroll + // the ops here, running them one by one; see for details: + _ = client.ChangeReplicasCanMixAddAndRemoveContext + + // Make sure we don't issue anything but singles and swaps before + // this migration is gone (for it doesn't support anything else). 
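+			// For example (illustrative): a swap [{ADD_REPLICA s4}, {REMOVE_REPLICA s1}]
+			// goes out as a single atomic request when useAtomic is true, and is
+			// unrolled into two consecutive single-change requests otherwise.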
+ if len(ops) > 2 { + log.Fatalf(ctx, "received more than 2 ops: %+v", ops) + } + opss := [][]roachpb.ReplicationChange{ops} + if !useAtomic && len(ops) == 2 { + opss = [][]roachpb.ReplicationChange{ops[:1], ops[1:]} + } + success := true + for _, ops := range opss { + newDesc, err := s.DB().AdminChangeReplicas( + client.ChangeReplicasCanMixAddAndRemoveContext(ctx), + startKey, + rangeDesc, + ops, + ) + if err != nil { + returnErr := errors.Wrapf(err, "while carrying out changes %v", ops) + if !canRetry(err) { + return returnErr + } + if every.ShouldLog() { + log.Info(ctx, returnErr) + } + success = false + break + } + rangeDesc = *newDesc + } + if success { + break + } + } + } + +} + +func (s *Store) relocateOne( + ctx context.Context, desc *roachpb.RangeDescriptor, targets []roachpb.ReplicationTarget, +) ([]roachpb.ReplicationChange, *roachpb.ReplicationTarget, error) { + rangeReplicas := desc.Replicas().All() + if len(rangeReplicas) != len(desc.Replicas().Voters()) { + // The caller removed all the learners, so there shouldn't be anything but // voters. - return crdberrors.AssertionFailedf( - `range %s had non-voter replicas: %v`, &rangeDesc, rangeDesc.Replicas()) + return nil, nil, crdberrors.AssertionFailedf( + `range %s had non-voter replicas: %v`, desc, desc.Replicas()) } - // Step 1: Compute which replicas are to be added and which are to be removed. + sysCfg := s.cfg.Gossip.GetSystemConfig() + if sysCfg == nil { + return nil, nil, fmt.Errorf("no system config available, unable to perform RelocateRange") + } + zone, err := sysCfg.GetZoneConfigForKey(desc.StartKey) + if err != nil { + return nil, nil, err + } + + storeList, _, _ := s.allocator.storePool.getStoreList(desc.RangeID, storeFilterNone) + storeMap := storeListToMap(storeList) + + // Compute which replica to add and/or remove, respectively. We ask the allocator + // about this because we want to respect the constraints. For example, it would be + // unfortunate if we put two replicas into the same zone despite having a locality- + // preserving option available. // // TODO(radu): we can't have multiple replicas on different stores on the // same node, and this code doesn't do anything to specifically avoid that @@ -2058,183 +2210,136 @@ func (s *Store) AdminRelocateRange( } } - canRetry := func(err error) bool { - whitelist := []string{ - snapshotApplySemBusyMsg, - IntersectingSnapshotMsg, - } - for _, substr := range whitelist { - if strings.Contains(err.Error(), substr) { - return true - } - } - return false - } - - startKey := rangeDesc.StartKey.AsRawKey() - transferLease := func() { - if err := s.DB().AdminTransferLease( - ctx, startKey, targets[0].StoreID, - ); err != nil { - log.Warningf(ctx, "while transferring lease: %+v", err) - } - } + var ops roachpb.ReplicationChanges - sysCfg := s.cfg.Gossip.GetSystemConfig() - if sysCfg == nil { - return fmt.Errorf("no system config available, unable to perform RelocateRange") - } - zone, err := sysCfg.GetZoneConfigForKey(rangeDesc.StartKey) - if err != nil { - return err - } - - storeList, _, _ := s.allocator.storePool.getStoreList(rangeDesc.RangeID, storeFilterNone) - storeMap := storeListToMap(storeList) - - // Step 2: Repeatedly add a replica then remove a replica until we reach the - // desired state. 
- every := log.Every(time.Minute) - re := retry.StartWithCtx(ctx, retry.Options{MaxBackoff: 5 * time.Second}) - for len(addTargets) > 0 || len(removeTargets) > 0 { - if err := ctx.Err(); err != nil { - return err - } - - if len(addTargets) > 0 && len(addTargets) >= len(removeTargets) { - // Each iteration, pick the most desirable replica to add. However, - // prefer the first target if it doesn't yet have a replica so that we - // can always transfer the lease to it before removing a replica below. - // This makes it easier to avoid removing a replica that's still - // leaseholder without needing to bounce the lease around a bunch. - candidateTargets := addTargets - if storeHasReplica(targets[0].StoreID, candidateTargets) { - candidateTargets = []roachpb.ReplicaDescriptor{ - {NodeID: targets[0].NodeID, StoreID: targets[0].StoreID}, - } + if len(addTargets) > 0 { + // Each iteration, pick the most desirable replica to add. However, + // prefer the first target because it's the one that should hold the + // lease in the end; it helps to add it early so that the lease doesn't + // have to move too much. + candidateTargets := addTargets + if storeHasReplica(targets[0].StoreID, candidateTargets) { + candidateTargets = []roachpb.ReplicaDescriptor{ + {NodeID: targets[0].NodeID, StoreID: targets[0].StoreID}, } + } - // The storeList's list of stores is used to constrain which stores the - // allocator considers putting a new replica on. We want it to only - // consider the stores in candidateTargets. - candidateDescs := make([]roachpb.StoreDescriptor, 0, len(candidateTargets)) - for _, candidate := range candidateTargets { - store, ok := storeMap[candidate.StoreID] - if !ok { - return fmt.Errorf("cannot up-replicate to s%d; missing gossiped StoreDescriptor", - candidate.StoreID) - } - candidateDescs = append(candidateDescs, *store) - } - storeList = makeStoreList(candidateDescs) - - targetStore, _ := s.allocator.allocateTargetFromList( - ctx, - storeList, - zone, - rangeReplicas, - s.allocator.scorerOptions()) - if targetStore == nil { - return fmt.Errorf("none of the remaining targets %v are legal additions to %v", - addTargets, rangeDesc.Replicas()) + // The storeList's list of stores is used to constrain which stores the + // allocator considers putting a new replica on. We want it to only + // consider the stores in candidateTargets. + candidateDescs := make([]roachpb.StoreDescriptor, 0, len(candidateTargets)) + for _, candidate := range candidateTargets { + store, ok := storeMap[candidate.StoreID] + if !ok { + return nil, nil, fmt.Errorf("cannot up-replicate to s%d; missing gossiped StoreDescriptor", + candidate.StoreID) } + candidateDescs = append(candidateDescs, *store) + } + storeList = makeStoreList(candidateDescs) + + targetStore, _ := s.allocator.allocateTargetFromList( + ctx, + storeList, + zone, + rangeReplicas, + s.allocator.scorerOptions()) + if targetStore == nil { + return nil, nil, fmt.Errorf("none of the remaining targets %v are legal additions to %v", + addTargets, desc.Replicas()) + } + + target := roachpb.ReplicationTarget{ + NodeID: targetStore.Node.NodeID, + StoreID: targetStore.StoreID, + } + ops = append(ops, roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, target)...) + // Pretend the voter is already there so that the removal logic below will + // take it into account when deciding which replica to remove. 
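+		// The ReplicaID and type here are placeholders: this local slice is only
+		// used to inform the allocator's removal decision below and is never
+		// written to the range descriptor.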
+ rangeReplicas = append(rangeReplicas, roachpb.ReplicaDescriptor{ + NodeID: target.NodeID, + StoreID: target.StoreID, + ReplicaID: desc.NextReplicaID, + Type: roachpb.ReplicaTypeVoterFull(), + }) + } - target := roachpb.ReplicationTarget{ - NodeID: targetStore.Node.NodeID, - StoreID: targetStore.StoreID, - } - newDesc, err := s.DB().AdminChangeReplicas( - ctx, startKey, rangeDesc, - roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, target)) - if err != nil { - returnErr := errors.Wrapf(err, "while adding target %v", target) - if !canRetry(err) { - return returnErr - } - if every.ShouldLog() { - log.Warning(ctx, returnErr) + var transferTarget *roachpb.ReplicationTarget + if len(removeTargets) > 0 { + // Pick a replica to remove. Note that rangeReplicas may already reflect + // a replica we're adding in the current round. This is the right thing + // to do. For example, consider relocating from (s1,s2,s3) to (s1,s2,s4) + // where addTargets will be (s4) and removeTargets is (s3). In this code, + // we'll want the allocator to see if s3 can be removed from + // (s1,s2,s3,s4) which is a reasonable request; that replica set is + // overreplicated. If we asked it instead to remove s3 from (s1,s2,s3) + // it may not want to do that due to constraints. + targetStore, _, err := s.allocator.RemoveTarget(ctx, zone, removeTargets, rangeReplicas) + if err != nil { + return nil, nil, errors.Wrapf(err, "unable to select removal target from %v; current replicas %v", + removeTargets, rangeReplicas) + } + removalTarget := roachpb.ReplicationTarget{ + NodeID: targetStore.NodeID, + StoreID: targetStore.StoreID, + } + // We can't remove the leaseholder, which really throws a wrench into + // atomic replication changes. If we find that we're trying to do just + // that, we need to first move the lease elsewhere. This is not possible + // if there is no other replica available at that point, i.e. if the + // existing descriptor is a single replica that's being replaced. + var b client.Batch + liReq := &roachpb.LeaseInfoRequest{} + liReq.Key = desc.StartKey.AsRawKey() + b.AddRawRequest(liReq) + if err := s.DB().Run(ctx, &b); err != nil { + return nil, nil, errors.Wrap(err, "looking up lease") + } + curLeaseholder := b.RawResponse().Responses[0].GetLeaseInfo().Lease.Replica + ok := curLeaseholder.StoreID != removalTarget.StoreID + if !ok { + // Pick a replica that we can give the lease to. We sort the first + // target to the beginning (if it's there) because that's where the + // lease needs to be in the end. We also exclude the last replica if + // it was added by the add branch above (in which case it doesn't + // exist yet). + sortedTargetReplicas := append([]roachpb.ReplicaDescriptor(nil), rangeReplicas[:len(rangeReplicas)-len(ops)]...) + sort.Slice(sortedTargetReplicas, func(i, j int) bool { + sl := sortedTargetReplicas + // targets[0] goes to the front (if it's present). + return sl[i].StoreID == targets[0].StoreID + }) + for _, rDesc := range sortedTargetReplicas { + if rDesc.StoreID != curLeaseholder.StoreID { + transferTarget = &roachpb.ReplicationTarget{ + NodeID: rDesc.NodeID, + StoreID: rDesc.StoreID, + } + ok = true + break } - re.Next() - continue - } - - // Upon success, remove the target from our to-do list and add it to our - // local copy of the range descriptor such that future allocator - // decisions take it into account. 
- addTargets = removeTargetFromSlice(addTargets, target) - rangeDesc = *newDesc - rangeReplicas = rangeDesc.Replicas().All() - if len(rangeReplicas) != len(rangeDesc.Replicas().Voters()) { - // We just removed all the learners, so there shouldn't be anything but - // voters. - return crdberrors.AssertionFailedf( - `range %s had non-voter replicas: %v`, &rangeDesc, rangeDesc.Replicas()) } } - if len(removeTargets) > 0 && len(removeTargets) > len(addTargets) { - targetStore, _, err := s.allocator.RemoveTarget(ctx, zone, removeTargets, rangeReplicas) - if err != nil { - return errors.Wrapf(err, "unable to select removal target from %v; current replicas %v", - removeTargets, rangeReplicas) - } - target := roachpb.ReplicationTarget{ - NodeID: targetStore.NodeID, - StoreID: targetStore.StoreID, - } - // Note that attempting to remove the leaseholder won't work, so transfer - // the lease first in such scenarios. The first specified target should be - // the leaseholder now, so we can always transfer the lease there. - transferLease() - newDesc, err := s.DB().AdminChangeReplicas(ctx, startKey, rangeDesc, - roachpb.MakeReplicationChanges( - roachpb.REMOVE_REPLICA, - target), - ) - if err != nil { - log.Warningf(ctx, "while removing target %v: %+v", target, err) - if !canRetry(err) { - return err - } - re.Next() - continue - } - - // Upon success, remove the target from our to-do list and from our local - // copy of the range descriptor such that future allocator decisions take - // its absence into account. - removeTargets = removeTargetFromSlice(removeTargets, target) - rangeDesc = *newDesc - rangeReplicas = rangeDesc.Replicas().All() - if len(rangeReplicas) != len(rangeDesc.Replicas().Voters()) { - // We just removed all the learners, so there shouldn't be anything but - // voters. - return crdberrors.AssertionFailedf( - `range %s had non-voter replicas: %v`, &rangeDesc, rangeDesc.Replicas()) - } + // Carry out the removal only if there was no lease problem above. If + // there was, we're not going to do a swap in this round but just do the + // addition. (Note that !ok implies that len(ops) is not empty, or we're + // trying to remove the last replica left in the descriptor which is + // illegal). + if ok { + ops = append(ops, roachpb.MakeReplicationChanges( + roachpb.REMOVE_REPLICA, + removalTarget)...) } } - // Step 3: Transfer the lease to the first listed target, as the API specifies. - transferLease() - - return ctx.Err() -} - -// Modifies the underlying storage of the slice rather than copying. -// Don't use on a shared slice where the order matters. -func removeTargetFromSlice( - targets []roachpb.ReplicaDescriptor, target roachpb.ReplicationTarget, -) []roachpb.ReplicaDescriptor { - for i, t := range targets { - if t.NodeID == target.NodeID && t.StoreID == target.StoreID { - // Swap the removed target with the last element in the slice and return - // a slice that's 1 element shorter than before. - targets[i], targets[len(targets)-1] = targets[len(targets)-1], targets[i] - return targets[:len(targets)-1] - } + if len(ops) == 0 { + // Make sure that the first target is the final leaseholder, as + // AdminRelocateRange specifies. 
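+		// The caller transfers the lease even when no ops are returned, so a
+		// relocation that only needs to move the lease still converges.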
+ transferTarget = &targets[0] } - return targets + + return ops, transferTarget, nil } func removeLearners( diff --git a/pkg/storage/testing_knobs.go b/pkg/storage/testing_knobs.go index f039a5a2911a..c55b42ba6ae3 100644 --- a/pkg/storage/testing_knobs.go +++ b/pkg/storage/testing_knobs.go @@ -221,6 +221,9 @@ type StoreTestingKnobs struct { // BeforeSnapshotSSTIngestion is run just before the SSTs are ingested when // applying a snapshot. BeforeSnapshotSSTIngestion func(IncomingSnapshot, SnapshotRequest_Type, []string) error + // BeforeRelocateOne intercepts the return values of s.relocateOne before + // they're put into effect. + BeforeRelocateOne func(_ []roachpb.ReplicationChange, leaseTarget *roachpb.ReplicationTarget, _ error) // MaxApplicationBatchSize enforces a maximum size on application batches. // This can be useful for testing conditions which require commands to be