storage: move off dead/decommissioning stores via atomic swaps
This is the last known work item for cockroachdb#12768. Previously, the replicate
queue would use an add-and-remove strategy to get replicas off
decommissioning or dead stores. Now it uses swaps whenever it can,
subject to the usual restrictions around single-replica groups and
cluster versions.

Release justification: feature completes atomic replication changes.

Release note: None
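
For a sense of what a swap looks like at the API level, here is a minimal sketch. swapChanges is a hypothetical helper (not an identifier introduced by this commit); it only mirrors the two MakeReplicationChanges calls that addOrReplace makes in the diff below. The point is that both changes are handed to a single ChangeReplicas call rather than issued as separate add and remove operations.

// Minimal sketch under the assumption that only the roachpb package is needed:
//
//   import "github.com/cockroachdb/cockroach/pkg/roachpb"
//
// swapChanges bundles one addition and one removal into a single set of
// replication changes. Submitting both in one ChangeReplicas call is what
// makes the swap atomic (via a joint configuration), instead of an add that
// leaves the range over-replicated until a later, separate remove.
func swapChanges(add, remove roachpb.ReplicationTarget) []roachpb.ReplicationChange {
	// One ADD_REPLICA change for the store gaining a replica...
	chgs := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, add)
	// ...plus one REMOVE_REPLICA change for the store being vacated.
	chgs = append(chgs, roachpb.MakeReplicationChanges(roachpb.REMOVE_REPLICA, remove)...)
	return chgs
}
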
tbg committed Sep 30, 2019
1 parent 3711de6 commit d3de4dc
Showing 2 changed files with 242 additions and 17 deletions.
130 changes: 130 additions & 0 deletions pkg/storage/client_raft_test.go
@@ -32,13 +32,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
@@ -2928,6 +2930,134 @@ func TestReplicaGCRace(t *testing.T) {
}
}

func requireOnlyAtomicChanges(
t *testing.T, db *sqlutils.SQLRunner, rangeID roachpb.RangeID, repFactor int, start time.Time,
) {
// From all events pertaining to the given rangeID and post-dating the start time,
// select those whose voter count exceeds the replication factor, counting a
// replica as a voter if its type is VOTER_FULL ('0') or VOTER_INCOMING ('2').
// Any rows returned have the full info JSON strings in them.
const q = `
SELECT
"uniqueID",
count(t) AS repfactor,
string_agg(info, e'\\n') AS infos
FROM
[
SELECT
"uniqueID",
replicas->'node_id' AS n,
COALESCE(replicas->'type', '0') AS t,
info
FROM
system.rangelog,
ROWS FROM (
jsonb_array_elements(
info::JSONB->'UpdatedDesc'->'internal_replicas'
)
)
AS replicas
WHERE
info::JSONB->'UpdatedDesc'->'range_id' = $1::JSONB AND timestamp >= $2
ORDER BY
"timestamp" ASC
]
WHERE
t IN ('0', '2')
GROUP BY
"uniqueID"
HAVING
count(t) > $3;
`
matrix := db.QueryStr(t, q, rangeID, start, repFactor)
if len(matrix) > 0 {
t.Fatalf("more than %d voting replicas: %s", repFactor, sqlutils.MatrixToStr(matrix))
}
}

func TestDecommission(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
tc := testcluster.StartTestCluster(t, 5, base.TestClusterArgs{
ReplicationMode: base.ReplicationAuto,
})
defer tc.Stopper().Stop(ctx)

k := tc.ScratchRange(t)
cc, err := tc.Server(0).RPCContext().GRPCDialNode(tc.Server(0).RPCAddr(), 1, rpc.DefaultClass).Connect(ctx)
require.NoError(t, err)
admin := serverpb.NewAdminClient(cc)
// Decommission the first node, which holds most of the leases.
_, err = admin.Decommission(
ctx, &serverpb.DecommissionRequest{Decommissioning: true},
)
require.NoError(t, err)

requireNoReplicas := func(storeID roachpb.StoreID, repFactor int) {
testutils.SucceedsSoon(t, func() error {
desc := tc.LookupRangeOrFatal(t, k)
for _, rDesc := range desc.Replicas().Voters() {
store, err := tc.Servers[int(rDesc.NodeID-1)].Stores().GetStore(rDesc.StoreID)
require.NoError(t, err)
if err := store.ForceReplicationScanAndProcess(); err != nil {
return err
}
}
if sl := desc.Replicas().Filter(func(rDesc roachpb.ReplicaDescriptor) bool {
return rDesc.StoreID == storeID
}); len(sl) > 0 {
return errors.Errorf("still a replica on s%d: %s", storeID, &desc)
}
if len(desc.Replicas().Voters()) != repFactor {
return errors.Errorf("expected %d replicas: %s", repFactor, &desc)
}
return nil
})
}

const triplicated = 3

requireNoReplicas(1, triplicated)

runner := sqlutils.MakeSQLRunner(tc.ServerConn(0))
ts := timeutil.Now()

_, err = admin.Decommission(
ctx, &serverpb.DecommissionRequest{NodeIDs: []roachpb.NodeID{2}, Decommissioning: true},
)
require.NoError(t, err)

// Both s1 and s2 are out, so neither ought to have replicas.
requireNoReplicas(1, triplicated)
requireNoReplicas(2, triplicated)

// Going from three replicas to three replicas should have used atomic swaps
// only. We didn't verify this before the first decommissioning op because
// lots of ranges were over-replicated due to ranges recently having split
// off from the five-fold replicated system ranges.
requireOnlyAtomicChanges(t, runner, tc.LookupRangeOrFatal(t, k).RangeID, triplicated, ts)

sqlutils.SetZoneConfig(t, runner, "RANGE default", "num_replicas: 1")

const single = 1

// The range should drop down to one replica, and that replica must not be on a decommissioning store.
requireNoReplicas(1, single)
requireNoReplicas(2, single)

// Decommission two more nodes. Only n5 is left; moving the replica there
// can't use an atomic swap because the leaseholder can't be removed.
_, err = admin.Decommission(
ctx, &serverpb.DecommissionRequest{NodeIDs: []roachpb.NodeID{3, 4}, Decommissioning: true},
)
require.NoError(t, err)

requireNoReplicas(1, single)
requireNoReplicas(2, single)
requireNoReplicas(3, single)
requireNoReplicas(4, single)
}

// TestStoreRangeMoveDecommissioning verifies that if a store is set to
// decommission, the ReplicateQueue will notice and move any replicas on it.
func TestStoreRangeMoveDecommissioning(t *testing.T) {
129 changes: 112 additions & 17 deletions pkg/storage/replicate_queue.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -335,20 +336,46 @@ func (rq *replicateQueue) processOneChange(
case AllocatorAdd:
// only include live replicas, since dead replicas should soon be removed
existingReplicas := liveVoterReplicas
return rq.add(ctx, repl, existingReplicas, dryRun)
return rq.addOrReplace(ctx, repl, existingReplicas, -1 /* removeIdx */, dryRun)
case AllocatorRemove:
return rq.remove(ctx, repl, voterReplicas, dryRun)
case AllocatorReplaceDead, AllocatorReplaceDecommissioning:
existingReplicas := liveVoterReplicas
// WIP(tbg): pass a slice of replicas that can be replaced in.
// In ReplaceDead, it's the dead replicas, in ReplaceDecommissioning
// it's the decommissioning ones.
// Rename rq.add to rq.addOrReplace, and let it actually replace a replica
// atomically when there's suitable candidate in the slice.
return rq.add(ctx, repl, existingReplicas, dryRun)
case AllocatorReplaceDead:
if len(deadVoterReplicas) == 0 {
// Nothing to do.
return false, nil
}
removeIdx := -1 // guaranteed to be changed below
for i, rDesc := range voterReplicas {
if rDesc.StoreID == deadVoterReplicas[0].StoreID {
removeIdx = i
break
}
}
return rq.addOrReplace(ctx, repl, voterReplicas, removeIdx, dryRun)
case AllocatorReplaceDecommissioning:
decommissioningReplicas := rq.allocator.storePool.decommissioningReplicas(
desc.RangeID, voterReplicas)
if len(decommissioningReplicas) == 0 {
// Nothing to do.
return false, nil
}
removeIdx := -1 // guaranteed to be changed below
for i, rDesc := range voterReplicas {
if rDesc.StoreID == decommissioningReplicas[0].StoreID {
removeIdx = i
break
}
}
return rq.addOrReplace(ctx, repl, voterReplicas, removeIdx, dryRun)
case AllocatorRemoveDecommissioning:
// NB: this path will only be hit when the range is over-replicated and
// has decommissioning replicas; in the common case we'll hit
// AllocatorReplaceDecommissioning above.
return rq.removeDecommissioning(ctx, repl, dryRun)
case AllocatorRemoveDead:
// NB: this path will only be hit when the range is over-replicated and
// has dead replicas; in the common case we'll hit AllocatorReplaceDead
// above.
return rq.removeDead(ctx, repl, deadVoterReplicas, dryRun)
case AllocatorRemoveLearner:
return rq.removeLearner(ctx, repl, dryRun)
@@ -359,31 +386,81 @@ func (rq *replicateQueue) processOneChange(
// Requeue because either we failed to transition out of a joint state
// (bad) or we did and there might be more to do for that range.
return true, err
default:
log.Fatalf(ctx, "unknown allocator action %v", action)
}
return true, nil
}

func (rq *replicateQueue) add(
ctx context.Context, repl *Replica, existingReplicas []roachpb.ReplicaDescriptor, dryRun bool,
// addOrReplace adds or replaces a replica. If removeIdx is -1, an addition is
// carried out. Otherwise, removeIdx must be a valid index into existingReplicas
// and specifies which replica to replace with a new one.
//
// The method preferably issues an atomic replica swap, but may not be able to
// do this in all cases, such as when atomic replication changes are not
// available, or when the range consists of a single replica. As a fall back,
// only the addition is carried out; the removal is then a follow-up step for
// the next scanner cycle.
func (rq *replicateQueue) addOrReplace(
ctx context.Context,
repl *Replica,
existingReplicas []roachpb.ReplicaDescriptor,
removeIdx int, // -1 for no removal
dryRun bool,
) (requeue bool, _ error) {
if len(existingReplicas) == 1 {
// If only one replica remains, that replica is the leaseholder and
// we won't be able to swap it out. Ignore the removal and simply add
// a replica.
removeIdx = -1
}
if !rq.store.cfg.Settings.Version.IsActive(cluster.VersionAtomicChangeReplicas) {
// If we can't swap yet, don't.
removeIdx = -1
}

var remainingReplicas []roachpb.ReplicaDescriptor
if removeIdx >= 0 {
remainingReplicas = append(existingReplicas[:removeIdx:removeIdx], existingReplicas[removeIdx+1:]...)
// See about transferring the lease away if we're about to remove the
// leaseholder.
done, err := rq.maybeTransferLeaseAway(ctx, repl, existingReplicas[removeIdx].StoreID, dryRun)
if err != nil {
return false, err
}
if done {
// Lease was transferred away. Next leaseholder is going to take over.
return false, nil
}
} else {
remainingReplicas = existingReplicas
}

desc, zone := repl.DescAndZone()
// Allocate a target assuming that the replica we're replacing (if any) is
// already gone. The allocator should not try to re-add this replica since
// there is a reason we're removing it (i.e. dead or decommissioning). If we
// left the replica in the slice, the allocator would not be guaranteed to
// pick a target that fills the gap the removed replica leaves once it's gone.
newStore, details, err := rq.allocator.AllocateTarget(
ctx,
zone,
desc.RangeID,
existingReplicas,
remainingReplicas,
)
if err != nil {
return false, err
}
if removeIdx >= 0 && newStore.StoreID == existingReplicas[removeIdx].StoreID {
return false, errors.Errorf("allocator suggested to replace replica on s%d with itself", newStore.StoreID)
}
newReplica := roachpb.ReplicationTarget{
NodeID: newStore.Node.NodeID,
StoreID: newStore.StoreID,
}

clusterNodes := rq.allocator.storePool.ClusterNodeCount()
need := GetNeededReplicas(*zone.NumReplicas, clusterNodes)
willHave := len(existingReplicas) + 1

// Only up-replicate if there are suitable allocation targets such that,
// either the replication goal is met, or it is possible to get to the next
Expand All @@ -395,7 +472,10 @@ func (rq *replicateQueue) add(
// NB: If willHave > need, then always allow up-replicating as that
// will be the case when up-replicating a range with a decommissioning
// replica.
if willHave < need && willHave%2 == 0 {
//
// We skip this check if we're swapping a replica, since that does not
// change the quorum size.
if willHave := len(existingReplicas) + 1; removeIdx < 0 && willHave < need && willHave%2 == 0 {
// This means we are going to up-replicate to an even replica state.
// Check if it is possible to go to an odd replica state beyond it.
oldPlusNewReplicas := append([]roachpb.ReplicaDescriptor(nil), existingReplicas...)
@@ -417,12 +497,26 @@
}
}
rq.metrics.AddReplicaCount.Inc(1)
log.VEventf(ctx, 1, "adding replica %+v due to under-replication: %s",
newReplica, rangeRaftProgress(repl.RaftStatus(), existingReplicas))
ops := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, newReplica)
if removeIdx < 0 {
log.VEventf(ctx, 1, "adding replica %+v: %s",
newReplica, rangeRaftProgress(repl.RaftStatus(), existingReplicas))
} else {
rq.metrics.RemoveReplicaCount.Inc(1)
removeReplica := existingReplicas[removeIdx]
log.VEventf(ctx, 1, "replacing replica %s with %+v: %s",
removeReplica, newReplica, rangeRaftProgress(repl.RaftStatus(), existingReplicas))
ops = append(ops,
roachpb.MakeReplicationChanges(roachpb.REMOVE_REPLICA, roachpb.ReplicationTarget{
StoreID: removeReplica.StoreID,
NodeID: removeReplica.NodeID,
})...)
}

if err := rq.changeReplicas(
ctx,
repl,
roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, newReplica),
ops,
desc,
SnapshotRequest_RECOVERY,
storagepb.ReasonRangeUnderReplicated,
@@ -431,6 +525,7 @@
); err != nil {
return false, err
}
// Always requeue to see if more work needs to be done.
return true, nil
}
