Merge #41084
41084: storage: use atomic replication changes in RelocateRange r=danhhz a=tbg

I wrote this up rather quickly, but wanted to get this out for review sooner
rather than later.

----

Touches #12768.

Release justification: the previous code would enter vulnerable
replication configurations when it wasn't necessary, thus undermining
what we wanted to achieve in #12768.

Release note: None

Co-authored-by: Tobias Schottdorf <tobias.schottdorf@gmail.com>
craig[bot] and tbg committed Sep 27, 2019
2 parents a6c670a + d9412d3 commit 9d1eea0
Showing 6 changed files with 495 additions and 175 deletions.
23 changes: 22 additions & 1 deletion pkg/internal/client/db.go
@@ -560,14 +560,35 @@ func (db *DB) AdminTransferLease(
return getOneErr(db.Run(ctx, b), b)
}

// ChangeReplicasCanMixAddAndRemoveContext convinces
// (*client.DB).AdminChangeReplicas that the caller is aware that 19.1 nodes
// don't know how to handle requests that mix additions and removals; 19.2+
// binaries understand this due to the work done in the context of atomic
// replication changes. If 19.1 nodes received such a request they'd mistake the
// removals for additions.
//
// In effect, users of the RPC need to check the cluster version, which has
// historically been a brittle pattern, so this time the DB disallows the new
// behavior unless it can determine (via the ctx) that the caller went through
// this method and is thus aware of the intricacies.
//
// See https://github.com/cockroachdb/cockroach/pull/39611.
//
// TODO(tbg): remove in 20.1.
func ChangeReplicasCanMixAddAndRemoveContext(ctx context.Context) context.Context {
return context.WithValue(ctx, adminChangeReplicasMixHint{}, adminChangeReplicasMixHint{})
}

type adminChangeReplicasMixHint struct{}

// AdminChangeReplicas adds or removes a set of replicas for a range.
func (db *DB) AdminChangeReplicas(
ctx context.Context,
key interface{},
expDesc roachpb.RangeDescriptor,
chgs []roachpb.ReplicationChange,
) (*roachpb.RangeDescriptor, error) {
if ctx.Value("testing") == nil {
if ctx.Value(adminChangeReplicasMixHint{}) == nil {
// Disallow trying to add and remove replicas in the same set of
// changes. This will only work when the node receiving the request is
// running 19.2 (code, not cluster version).
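For illustration, a mixed addition/removal batch issued through this gate might look as follows. This is a sketch only: swapReplica and its parameters are invented for the example and are not part of this change.

// swapReplica is a hypothetical caller-side helper showing how the hint
// is threaded through the context before issuing a mixed batch.
func swapReplica(
	ctx context.Context,
	db *client.DB,
	key roachpb.Key,
	expDesc roachpb.RangeDescriptor,
	add, remove roachpb.ReplicationTarget,
) (*roachpb.RangeDescriptor, error) {
	// The hint asserts that the caller knows the receiving node runs 19.2
	// code and can execute mixed additions and removals atomically.
	ctx = client.ChangeReplicasCanMixAddAndRemoveContext(ctx)
	chgs := []roachpb.ReplicationChange{
		{ChangeType: roachpb.ADD_REPLICA, Target: add},
		{ChangeType: roachpb.REMOVE_REPLICA, Target: remove},
	}
	return db.AdminChangeReplicas(ctx, key, expDesc, chgs)
}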
5 changes: 2 additions & 3 deletions pkg/storage/client_atomic_membership_change_test.go
@@ -16,6 +16,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -60,9 +60,7 @@ func TestAtomicReplicationChange(t *testing.T) {
runChange := func(expDesc roachpb.RangeDescriptor, chgs []roachpb.ReplicationChange) roachpb.RangeDescriptor {
t.Helper()
desc, err := tc.Servers[0].DB().AdminChangeReplicas(
- // TODO(tbg): when 19.2 is out, remove this "feature gate" here and in
- // AdminChangeReplicas.
- context.WithValue(ctx, "testing", "testing"),
+ client.ChangeReplicasCanMixAddAndRemoveContext(ctx),
k, expDesc, chgs,
)
require.NoError(t, err)
192 changes: 192 additions & 0 deletions pkg/storage/client_relocate_range_test.go
@@ -0,0 +1,192 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage_test

import (
"context"
"sort"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

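// relocateAndCheck runs AdminRelocateRange with the given targets against the
// first server and verifies that the resulting range descriptor contains
// exactly those replicas and that the lease ends up at the first target.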
func relocateAndCheck(
t *testing.T,
tc *testcluster.TestCluster,
startKey roachpb.RKey,
targets []roachpb.ReplicationTarget,
) {
require.NoError(t, tc.Servers[0].DB().AdminRelocateRange(context.Background(), startKey.AsRawKey(), targets))
desc, err := tc.Servers[0].LookupRange(startKey.AsRawKey())
require.NoError(t, err)
requireDescMembers(t, desc, targets)
requireLeaseAt(t, tc, desc, targets[0])
}

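// requireDescMembers asserts that the descriptor's replica set matches the
// given targets, irrespective of ordering.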
func requireDescMembers(
t *testing.T, desc roachpb.RangeDescriptor, targets []roachpb.ReplicationTarget,
) {
t.Helper()
targets = append([]roachpb.ReplicationTarget(nil), targets...)
sort.Slice(targets, func(i, j int) bool { return targets[i].StoreID < targets[j].StoreID })

have := make([]roachpb.ReplicationTarget, 0, len(targets))
for _, rDesc := range desc.Replicas().All() {
have = append(have, roachpb.ReplicationTarget{
NodeID: rDesc.NodeID,
StoreID: rDesc.StoreID,
})
}
sort.Slice(have, func(i, j int) bool { return have[i].StoreID < have[j].StoreID })
require.Equal(t, targets, have)
}

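// requireLeaseAt retries until the range lease is held by the given target.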
func requireLeaseAt(
t *testing.T,
tc *testcluster.TestCluster,
desc roachpb.RangeDescriptor,
target roachpb.ReplicationTarget,
) {
t.Helper()
// NB: under stressrace the lease will sometimes be inactive by the time
// it's returned here, so don't use FindRangeLeaseHolder which fails when
// that happens.
testutils.SucceedsSoon(t, func() error {
lease, _, err := tc.FindRangeLease(desc, &target)
if err != nil {
return err
}
if target != (roachpb.ReplicationTarget{
NodeID: lease.Replica.NodeID,
StoreID: lease.Replica.StoreID,
}) {
return errors.Errorf("lease %v is not held by %+v", lease, target)
}
return nil
})
}

func TestAdminRelocateRange(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()

type intercept struct {
ops []roachpb.ReplicationChange
leaseTarget *roachpb.ReplicationTarget
err error
}
var intercepted []intercept

requireNumAtomic := func(expAtomic int, expSingle int, f func()) {
t.Helper()
intercepted = nil
f()
var actAtomic, actSingle int
for _, ic := range intercepted {
if ic.err != nil {
continue
}
if len(ic.ops) == 2 && ic.ops[0].ChangeType == roachpb.ADD_REPLICA && ic.ops[1].ChangeType == roachpb.REMOVE_REPLICA {
actAtomic++
} else {
actSingle++
}
}
require.Equal(t, expAtomic, actAtomic, "wrong number of atomic changes: %+v", intercepted)
require.Equal(t, expSingle, actSingle, "wrong number of single changes: %+v", intercepted)
}

knobs := base.TestingKnobs{
Store: &storage.StoreTestingKnobs{
BeforeRelocateOne: func(ops []roachpb.ReplicationChange, leaseTarget *roachpb.ReplicationTarget, err error) {
intercepted = append(intercepted, intercept{
ops: ops,
leaseTarget: leaseTarget,
err: err,
})
},
},
}
args := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: knobs},
ReplicationMode: base.ReplicationManual,
}
tc := testcluster.StartTestCluster(t, 6, args)
defer tc.Stopper().Stop(ctx)

// s1 (LH) ---> s2 (LH) s1 s3
// Pure upreplication.
k := keys.MustAddr(tc.ScratchRange(t))
{
targets := tc.Targets(1, 0, 2)
// Expect two single additions, and that's it.
requireNumAtomic(0, 2, func() {
relocateAndCheck(t, tc, k, targets)
})
}

// s1 (LH) s2 s3 ---> s4 (LH) s5 s6.
// This is trickier because the leaseholder gets removed, and so do all
// other replicas (i.e. a simple lease transfer at the beginning won't solve
// the problem).
{
targets := tc.Targets(3, 4, 5)
// Should carry out three swaps. Note that the leaseholder gets removed
// in the process (i.e. internally the lease must've been moved around
// to achieve that).
requireNumAtomic(3, 0, func() {
relocateAndCheck(t, tc, k, targets)
})
}

// s4 (LH) s5 s6 ---> s5 (LH)
// Pure downreplication.
{
requireNumAtomic(0, 2, func() {
relocateAndCheck(t, tc, k, tc.Targets(4))
})
}

// s5 (LH) ---> s3 (LH)
// Lateral movement while at replication factor one. In this case atomic
// replication changes cannot be used; we add-then-remove instead.
{
requireNumAtomic(0, 2, func() {
relocateAndCheck(t, tc, k, tc.Targets(2))
})
}

// s3 (LH) ---> s2 (LH) s4 s1 --> s4 (LH) s2 s6 s1 --> s3 (LH) s5
// A grab bag.
{
// s3 -(add)-> s3 s2 -(swap)-> s4 s2 -(add)-> s4 s2 s1 (=s2 s4 s1)
requireNumAtomic(1, 2, func() {
relocateAndCheck(t, tc, k, tc.Targets(1, 3, 0))
})
// s2 s4 s1 -(add)-> s2 s4 s1 s6 (=s4 s2 s6 s1)
requireNumAtomic(0, 1, func() {
relocateAndCheck(t, tc, k, tc.Targets(3, 1, 5, 0))
})
// s4 s2 s6 s1 -(swap)-> s3 s2 s6 s1 -(swap)-> s3 s5 s6 s1 -(del)-> s3 s5 s6 -(del)-> s3 s5
requireNumAtomic(2, 2, func() {
relocateAndCheck(t, tc, k, tc.Targets(2, 4))
})
}
}
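The atomic/single counts asserted above follow a simple step structure: pair one pending addition with one pending removal per atomic step where possible. Below is a minimal sketch of that bookkeeping, assuming the pairing rule and the replication-factor-one fallback described in the test comments; it is not the actual implementation, which lives in the RelocateRange changes not shown in this excerpt.

// sketchSteps is a toy model of how a relocation with toAdd pending
// additions and toRemove pending removals, starting from cur replicas,
// decomposes into steps: pair one addition with one removal per atomic
// swap while both remain, except at replication factor one, where a swap
// cannot be used and a plain addition comes first.
func sketchSteps(cur, toAdd, toRemove int) (atomic, single int) {
	for toAdd > 0 || toRemove > 0 {
		switch {
		case toAdd > 0 && toRemove > 0 && cur > 1:
			atomic++ // one ADD_REPLICA plus one REMOVE_REPLICA in one request
			toAdd, toRemove = toAdd-1, toRemove-1
		case toAdd > 0:
			single++ // plain addition
			toAdd, cur = toAdd-1, cur+1
		default:
			single++ // plain removal
			toRemove, cur = toRemove-1, cur-1
		}
	}
	return atomic, single
}

Running this over the scenarios above reproduces each asserted pair, e.g. sketchSteps(3, 3, 3) = (3, 0) for the full three-way swap and sketchSteps(1, 1, 1) = (0, 2) for the lateral move at replication factor one.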
2 changes: 1 addition & 1 deletion pkg/storage/client_replica_test.go
@@ -2705,7 +2705,7 @@ func TestChangeReplicasLeaveAtomicRacesWithMerge(t *testing.T) {
ReplicationMode: base.ReplicationManual,
})
// Make a magical context which will allow us to use atomic replica changes.
- ctx := context.WithValue(context.Background(), "testing", "testing")
+ ctx := client.ChangeReplicasCanMixAddAndRemoveContext(context.Background())
defer tc.Stopper().Stop(ctx)

// We want to first get into a joint consensus scenario.
