Merge #41190
41190: storage: move off dead/decommissioning stores via atomic swaps r=danhhz a=tbg

This is the last known work item for #12768. Previously, the replicate
queue would use an add-and-remove strategy to get replicas off
decommissioning or dead stores. Now it uses swaps whenever it can,
subject to the usual restrictions around single-replica groups and
cluster versions.
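For context, a hedged sketch of what a swap means at the KV client level: a single AdminChangeReplicas call carrying both the addition and the removal, so the range moves through a joint configuration instead of briefly holding an extra (or one too few) voter. MakeReplicationChanges and AdminChangeReplicas are the existing client primitives; the helper name, key, and targets below are illustrative placeholders, not code from this commit.

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/internal/client"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// swapReplica issues one atomic replication change that adds a voter on
// `add` and removes the voter on `remove`. Because both changes travel in
// a single AdminChangeReplicas call, the range transitions through a joint
// configuration rather than an over- or under-replicated intermediate state.
func swapReplica(
	ctx context.Context,
	db *client.DB,
	key roachpb.Key,
	desc roachpb.RangeDescriptor,
	add, remove roachpb.ReplicationTarget,
) error {
	chgs := append(
		roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, add),
		roachpb.MakeReplicationChanges(roachpb.REMOVE_REPLICA, remove)...,
	)
	_, err := db.AdminChangeReplicas(ctx, key, desc, chgs)
	return err
}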

Release justification: feature completes atomic replication changes.

Release note: None

Co-authored-by: Tobias Schottdorf <tobias.schottdorf@gmail.com>
craig[bot] and tbg committed Oct 1, 2019
2 parents f43f284 + 6df6142 commit 4280403
Showing 2 changed files with 247 additions and 84 deletions.
pkg/storage/client_raft_test.go: 124 additions & 65 deletions
@@ -32,13 +32,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
@@ -1595,24 +1597,6 @@ func TestStoreRangeUpReplicate(t *testing.T) {
}
}

// getRangeMetadata retrieves the current range descriptor for the target
// range.
func getRangeMetadata(
key roachpb.RKey, mtc *multiTestContext, t *testing.T,
) roachpb.RangeDescriptor {
// Calls to RangeLookup typically use inconsistent reads, but we
// want to do a consistent read here. This is important when we are
// considering one of the metadata ranges: we must not do an
// inconsistent lookup in our own copy of the range.
sender := mtc.dbs[0].NonTransactionalSender()
rs, _, err := client.RangeLookup(context.TODO(), sender, key.AsRawKey(),
roachpb.CONSISTENT, 0 /* prefetchNum */, false /* reverse */)
if err != nil {
t.Fatalf("error getting range metadata: %+v", err)
}
return rs[0]
}

// TestUnreplicateFirstRange verifies that multiTestContext still functions in
// the case where the first range (which contains range metadata) is
// unreplicated from the first store. This situation can arise occasionally in
@@ -2928,64 +2912,139 @@ func TestReplicaGCRace(t *testing.T) {
}
}

// TestStoreRangeMoveDecommissioning verifies that if a store is set to
// decommission, the ReplicateQueue will notice and move any replicas on it.
func TestStoreRangeMoveDecommissioning(t *testing.T) {
func requireOnlyAtomicChanges(
t *testing.T, db *sqlutils.SQLRunner, rangeID roachpb.RangeID, repFactor int, start time.Time,
) {
// From all events pertaining to the given rangeID and post-dating the start
// time, keep only those infos that indicate a (full and incoming) voter count
// in excess of the replication factor. Any rows returned carry the full info
// JSON strings in them.
const q = `
SELECT
"uniqueID",
count(t) AS repfactor,
string_agg(info, e'\\n') AS infos
FROM
[
SELECT
"uniqueID",
replicas->'node_id' AS n,
COALESCE(replicas->'type', '0') AS t,
info
FROM
system.rangelog,
ROWS FROM (
jsonb_array_elements(
info::JSONB->'UpdatedDesc'->'internal_replicas'
)
)
AS replicas
WHERE
info::JSONB->'UpdatedDesc'->'range_id' = $1::JSONB AND timestamp >= $2
ORDER BY
"timestamp" ASC
]
WHERE
t IN ('0', '2')
GROUP BY
"uniqueID"
HAVING
count(t) > $3;
`
matrix := db.QueryStr(t, q, rangeID, start, repFactor)
if len(matrix) > 0 {
t.Fatalf("more than %d voting replicas: %s", repFactor, sqlutils.MatrixToStr(matrix))
}
}

func TestDecommission(t *testing.T) {
defer leaktest.AfterTest(t)()

if util.RaceEnabled {
// Six nodes is too much to reliably run under testrace with our aggressive
// Five nodes is too much to reliably run under testrace with our aggressive
// liveness timings.
t.Skip("skipping under testrace: #39807 and #37811")
}

sc := storage.TestStoreConfig(nil)
sc.TestingKnobs.DisableReplicaRebalancing = true
mtc := &multiTestContext{storeConfig: &sc}
defer mtc.Stop()
mtc.Start(t, 6)
mtc.initGossipNetwork()

// Replicate the range to 2 more stores. Note that there are 4 stores in the
// cluster leaving an extra store available as a replication target once the
// replica on the dead node is removed.
replica := mtc.stores[0].LookupReplica(roachpb.RKeyMin)
mtc.replicateRange(replica.RangeID, 1, 2)

origReplicas := getRangeMetadata(roachpb.RKeyMin, mtc, t).InternalReplicas

ctx := context.Background()
decommingNodeIdx := 2
decommingNodeID := mtc.idents[decommingNodeIdx].NodeID
if _, err := mtc.nodeLivenesses[decommingNodeIdx].
SetDecommissioning(ctx, decommingNodeID, true); err != nil {
t.Fatal(err)
}
tc := testcluster.StartTestCluster(t, 5, base.TestClusterArgs{
ReplicationMode: base.ReplicationAuto,
})
defer tc.Stopper().Stop(ctx)

testutils.SucceedsSoon(t, func() error {
// Force the repair queues on all stores to run.
for _, s := range mtc.stores {
if err := s.ForceReplicationScanAndProcess(); err != nil {
t.Fatal(err)
k := tc.ScratchRange(t)
cc, err := tc.Server(0).RPCContext().GRPCDialNode(tc.Server(0).RPCAddr(), 1, rpc.DefaultClass).Connect(ctx)
require.NoError(t, err)
admin := serverpb.NewAdminClient(cc)
// Decommission the first node, which holds most of the leases.
_, err = admin.Decommission(
ctx, &serverpb.DecommissionRequest{Decommissioning: true},
)
require.NoError(t, err)

requireNoReplicas := func(storeID roachpb.StoreID, repFactor int) {
testutils.SucceedsSoon(t, func() error {
desc := tc.LookupRangeOrFatal(t, k)
for _, rDesc := range desc.Replicas().Voters() {
store, err := tc.Servers[int(rDesc.NodeID-1)].Stores().GetStore(rDesc.StoreID)
require.NoError(t, err)
if err := store.ForceReplicationScanAndProcess(); err != nil {
return err
}
}
}
// Wait for a replacement replica for the decommissioning node to be added
// and the replica on the decommissioning node to be removed.
curReplicas := getRangeMetadata(roachpb.RKeyMin, mtc, t).InternalReplicas
if len(curReplicas) != 5 {
return errors.Errorf("expected 5 replicas, got %v", curReplicas)
}
if reflect.DeepEqual(origReplicas, curReplicas) {
return errors.Errorf("expected replica to be moved, but found them to be same as original %v", origReplicas)
}
for _, r := range curReplicas {
if r.NodeID == decommingNodeID {
return errors.Errorf("expected replica to be moved off node %d, but got replicas %v",
decommingNodeID, curReplicas)
if sl := desc.Replicas().Filter(func(rDesc roachpb.ReplicaDescriptor) bool {
return rDesc.StoreID == storeID
}); len(sl) > 0 {
return errors.Errorf("still a replica on s%d: %s", storeID, &desc)
}
}
return nil
})
if len(desc.Replicas().Voters()) != repFactor {
return errors.Errorf("expected %d replicas: %s", repFactor, &desc)
}
return nil
})
}

const triplicated = 3

requireNoReplicas(1, triplicated)

runner := sqlutils.MakeSQLRunner(tc.ServerConn(0))
ts := timeutil.Now()

_, err = admin.Decommission(
ctx, &serverpb.DecommissionRequest{NodeIDs: []roachpb.NodeID{2}, Decommissioning: true},
)
require.NoError(t, err)

// Both s1 and s2 are out, so neither ought to have replicas.
requireNoReplicas(1, triplicated)
requireNoReplicas(2, triplicated)

// Going from three replicas to three replicas should have used atomic swaps
// only. We didn't verify this before the first decommissioning op because
// lots of ranges were over-replicated due to ranges recently having split
// off from the five-fold replicated system ranges.
requireOnlyAtomicChanges(t, runner, tc.LookupRangeOrFatal(t, k).RangeID, triplicated, ts)

sqlutils.SetZoneConfig(t, runner, "RANGE default", "num_replicas: 1")

const single = 1

// The range should drop down to one replica on a non-decommissioning store.
requireNoReplicas(1, single)
requireNoReplicas(2, single)

// Decommission two more nodes. Only n5 is left; getting the replicas there
// can't use atomic replica swaps because the leaseholder can't be removed.
_, err = admin.Decommission(
ctx, &serverpb.DecommissionRequest{NodeIDs: []roachpb.NodeID{3, 4}, Decommissioning: true},
)
require.NoError(t, err)

requireNoReplicas(1, single)
requireNoReplicas(2, single)
requireNoReplicas(3, single)
requireNoReplicas(4, single)
}
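To make the leaseholder restriction in the test above concrete: a swap that would remove the current leaseholder is rejected, so moving a single-replica range off its leaseholder takes three steps rather than one atomic change. The sketch below is an illustration only, not code from this diff; the helper name, key, and targets are placeholders, while AdminChangeReplicas and AdminTransferLease are the existing client primitives.

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/internal/client"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// moveSingleReplica relocates a one-replica range whose sole replica holds
// the lease. An atomic swap cannot do this, since it would remove the
// leaseholder; instead: add the new replica, transfer the lease, then
// remove the old replica.
func moveSingleReplica(
	ctx context.Context,
	db *client.DB,
	key roachpb.Key,
	desc roachpb.RangeDescriptor,
	from, to roachpb.ReplicationTarget,
) error {
	// Up-replicate to the destination first; the range briefly has two voters.
	newDesc, err := db.AdminChangeReplicas(
		ctx, key, desc, roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, to),
	)
	if err != nil {
		return err
	}
	// Hand the lease to the new replica; only a non-leaseholder is removable.
	if err := db.AdminTransferLease(ctx, key, to.StoreID); err != nil {
		return err
	}
	// The old replica no longer holds the lease and can now be removed.
	_, err = db.AdminChangeReplicas(
		ctx, key, *newDesc, roachpb.MakeReplicationChanges(roachpb.REMOVE_REPLICA, from),
	)
	return err
}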

// TestReplicateRogueRemovedNode ensures that a rogue removed node
(diff for the second changed file not shown)
