Skip to content

Commit

Permalink
kvserver: Allow rebalances between stores on the same nodes.
Browse files Browse the repository at this point in the history
Closes cockroachdb#6782

This change modifies the replica_queue to allow rebalances between multiple stores within a single node.
This is possible because we previously introduced atomic rebalances in cockroachdb#12768.

The first step was to remove the constraints in the allocator that prevented same-node rebalances and to update the validation in the replica_queue to accept these rebalance proposals. There is one caveat: with 1x replication an atomic rebalance is not possible, so under this condition we now support adding multiple replicas of the range to the same node.

With the constraints removed, there would be nothing in the allocator to prevent it from placing multiple replicas of a range on the same node across multiple stores. This is not desired because in a simple 3x replication scenario a single node crash may result in a whole range becoming unavailable. The allocator uses locality tags to model failure domains, but a node was not considered to be a locality. It is thus natural to extend the failure domain definition to the node and model it as a locality tier. Now stores on the same node are factored into the diversity_score and repel each other, just like nodes in the same datacenter do in a multi-region setup.

Release note (performance improvement): This change removes the last roadblock to running CockroachDB with multiple stores (i.e. disks) per node. The allocation algorithm now supports intra-node rebalances, which means CRDB can fully utilize the additional stores on the same node.
  • Loading branch information
lunevalex committed Aug 19, 2020
1 parent 7b1abbf commit c6a87a7
Show file tree
Hide file tree
Showing 13 changed files with 531 additions and 184 deletions.
13 changes: 6 additions & 7 deletions pkg/kv/kvserver/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,8 @@ func (a *Allocator) allocateTargetFromList(
analyzedConstraints := constraint.AnalyzeConstraints(
ctx, a.storePool.getStoreDescriptor, candidateReplicas, zone)
candidates := allocateCandidates(
sl, analyzedConstraints, candidateReplicas, a.storePool.getLocalities(candidateReplicas),
options,
sl, analyzedConstraints, candidateReplicas,
a.storePool.getLocalitiesByStore(candidateReplicas), options,
)
log.VEventf(ctx, 3, "allocate candidates: %s", candidates)
if target := candidates.selectGood(a.randGen); target != nil {
Expand Down Expand Up @@ -568,7 +568,7 @@ func (a Allocator) RemoveTarget(
rankedCandidates := removeCandidates(
sl,
analyzedConstraints,
a.storePool.getLocalities(existingReplicas),
a.storePool.getLocalitiesByStore(existingReplicas),
options,
)
log.VEventf(ctx, 3, "remove candidates: %s", rankedCandidates)
Expand Down Expand Up @@ -663,8 +663,7 @@ func (a Allocator) RebalanceTarget(
sl,
analyzedConstraints,
existingReplicas,
a.storePool.getLocalities(existingReplicas),
a.storePool.getNodeLocalityString,
a.storePool.getLocalitiesByStore(existingReplicas),
options,
)

Expand Down Expand Up @@ -742,7 +741,7 @@ func (a Allocator) RebalanceTarget(
}
detailsBytes, err := json.Marshal(dDetails)
if err != nil {
log.Warningf(ctx, "failed to marshal details for choosing rebalance target: %+v", err)
log.VEventf(ctx, 2, "failed to marshal details for choosing rebalance target: %+v", err)
}

addTarget := roachpb.ReplicationTarget{
Expand Down Expand Up @@ -1005,7 +1004,7 @@ func (a Allocator) shouldTransferLeaseUsingStats(
if stats == nil || !enableLoadBasedLeaseRebalancing.Get(&a.storePool.st.SV) {
return decideWithoutStats, roachpb.ReplicaDescriptor{}
}
replicaLocalities := a.storePool.getLocalities(existing)
replicaLocalities := a.storePool.getLocalitiesByNode(existing)
for _, locality := range replicaLocalities {
if len(locality.Tiers) == 0 {
return decideWithoutStats, roachpb.ReplicaDescriptor{}
Expand Down
132 changes: 57 additions & 75 deletions pkg/kv/kvserver/allocator_scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func allocateCandidates(
sl StoreList,
constraints constraint.AnalyzedConstraints,
existing []roachpb.ReplicaDescriptor,
existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
options scorerOptions,
) candidateList {
var candidates candidateList
Expand All @@ -425,7 +425,7 @@ func allocateCandidates(
if !maxCapacityCheck(s) {
continue
}
diversityScore := diversityAllocateScore(s, existingNodeLocalities)
diversityScore := diversityAllocateScore(s, existingStoreLocalities)
balanceScore := balanceScore(sl, s.Capacity, options)
var convergesScore int
if options.qpsRebalanceThreshold > 0 {
Expand Down Expand Up @@ -463,7 +463,7 @@ func allocateCandidates(
func removeCandidates(
sl StoreList,
constraints constraint.AnalyzedConstraints,
existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
options scorerOptions,
) candidateList {
var candidates candidateList
Expand All @@ -478,7 +478,7 @@ func removeCandidates(
})
continue
}
diversityScore := diversityRemovalScore(s.Node.NodeID, existingNodeLocalities)
diversityScore := diversityRemovalScore(s.StoreID, existingStoreLocalities)
balanceScore := balanceScore(sl, s.Capacity, options)
var convergesScore int
if !rebalanceFromConvergesOnMean(sl, s.Capacity) {
Expand Down Expand Up @@ -522,18 +522,13 @@ func rebalanceCandidates(
allStores StoreList,
constraints constraint.AnalyzedConstraints,
existingReplicas []roachpb.ReplicaDescriptor,
existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
localityLookupFn func(roachpb.NodeID) string,
existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
options scorerOptions,
) []rebalanceOptions {
// 1. Determine whether existing replicas are valid and/or necessary.
type existingStore struct {
cand candidate
localityStr string
}
existingStores := make(map[roachpb.StoreID]existingStore)
existingStores := make(map[roachpb.StoreID]candidate)
var needRebalanceFrom bool
curDiversityScore := rangeDiversityScore(existingNodeLocalities)
curDiversityScore := rangeDiversityScore(existingStoreLocalities)
for _, store := range allStores.stores {
for _, repl := range existingReplicas {
if store.StoreID != repl.StoreID {
Expand All @@ -544,7 +539,7 @@ func rebalanceCandidates(
if !valid {
if !needRebalanceFrom {
log.VEventf(ctx, 2, "s%d: should-rebalance(invalid): locality:%q",
store.StoreID, store.Node.Locality)
store.StoreID, store.Locality())
}
needRebalanceFrom = true
}
Expand All @@ -555,15 +550,12 @@ func rebalanceCandidates(
}
needRebalanceFrom = true
}
existingStores[store.StoreID] = existingStore{
cand: candidate{
store: store,
valid: valid,
necessary: necessary,
fullDisk: fullDisk,
diversityScore: curDiversityScore,
},
localityStr: localityLookupFn(store.Node.NodeID),
existingStores[store.StoreID] = candidate{
store: store,
valid: valid,
necessary: necessary,
fullDisk: fullDisk,
diversityScore: curDiversityScore,
}
}
}
Expand Down Expand Up @@ -599,8 +591,8 @@ func rebalanceCandidates(
// include Node/Store Attributes because they affect constraints.
var matchedOtherExisting bool
for i, stores := range comparableStores {
if sameLocalityAndAttrs(stores.existing[0], existing.cand.store) {
comparableStores[i].existing = append(comparableStores[i].existing, existing.cand.store)
if sameLocalityAndAttrs(stores.existing[0], existing.store) {
comparableStores[i].existing = append(comparableStores[i].existing, existing.store)
matchedOtherExisting = true
break
}
Expand All @@ -610,37 +602,27 @@ func rebalanceCandidates(
}
var comparableCands candidateList
for _, store := range allStores.stores {
// Nodes that already have a replica on one of their stores aren't valid
// rebalance targets. We do include stores that currently have a replica
// because we want them to be considered as valid stores in the
// ConvergesOnMean calculations below. This is subtle but important.
if nodeHasReplica(store.Node.NodeID, existingReplicas) &&
!storeHasReplica(store.StoreID, existingReplicas) {
log.VEventf(ctx, 2, "nodeHasReplica(n%d, %v)=true",
store.Node.NodeID, existingReplicas)
continue
}
constraintsOK, necessary := rebalanceFromConstraintsCheck(
store, existing.cand.store.StoreID, constraints)
store, existing.store.StoreID, constraints)
maxCapacityOK := maxCapacityCheck(store)
diversityScore := diversityRebalanceFromScore(
store, existing.cand.store.Node.NodeID, existingNodeLocalities)
store, existing.store.StoreID, existingStoreLocalities)
cand := candidate{
store: store,
valid: constraintsOK,
necessary: necessary,
fullDisk: !maxCapacityOK,
diversityScore: diversityScore,
}
if !cand.less(existing.cand) {
if !cand.less(existing) {
comparableCands = append(comparableCands, cand)
if !needRebalanceFrom && !needRebalanceTo && existing.cand.less(cand) {
if !needRebalanceFrom && !needRebalanceTo && existing.less(cand) {
needRebalanceTo = true
log.VEventf(ctx, 2,
"s%d: should-rebalance(necessary/diversity=s%d): oldNecessary:%t, newNecessary:%t, "+
"oldDiversity:%f, newDiversity:%f, locality:%q",
existing.cand.store.StoreID, store.StoreID, existing.cand.necessary, cand.necessary,
existing.cand.diversityScore, cand.diversityScore, store.Node.Locality)
existing.store.StoreID, store.StoreID, existing.necessary, cand.necessary,
existing.diversityScore, cand.diversityScore, store.Locality())
}
}
}
Expand All @@ -655,7 +637,7 @@ func rebalanceCandidates(
bestStores[i] = bestCands[i].store
}
comparableStores = append(comparableStores, comparableStoreList{
existing: []roachpb.StoreDescriptor{existing.cand.store},
existing: []roachpb.StoreDescriptor{existing.store},
sl: makeStoreList(bestStores),
candidates: bestCands,
})
Expand All @@ -673,15 +655,15 @@ func rebalanceCandidates(
outer:
for _, comparable := range comparableStores {
for _, existingCand := range comparable.existing {
if existing.cand.store.StoreID == existingCand.StoreID {
if existing.store.StoreID == existingCand.StoreID {
sl = comparable.sl
break outer
}
}
}
// TODO(a-robinson): Some moderate refactoring could extract this logic out
// into the loop below, avoiding duplicate balanceScore calculations.
if shouldRebalance(ctx, existing.cand.store, sl, options) {
if shouldRebalance(ctx, existing.store, sl, options) {
shouldRebalanceCheck = true
break
}
Expand All @@ -705,24 +687,24 @@ func rebalanceCandidates(
existingDesc, existingStores)
continue
}
if !existing.cand.valid {
existing.cand.details = "constraint check fail"
existingCandidates = append(existingCandidates, existing.cand)
if !existing.valid {
existing.details = "constraint check fail"
existingCandidates = append(existingCandidates, existing)
continue
}
balanceScore := balanceScore(comparable.sl, existing.cand.store.Capacity, options)
balanceScore := balanceScore(comparable.sl, existing.store.Capacity, options)
var convergesScore int
if !rebalanceFromConvergesOnMean(comparable.sl, existing.cand.store.Capacity) {
if !rebalanceFromConvergesOnMean(comparable.sl, existing.store.Capacity) {
// Similarly to in removeCandidates, any replica whose removal
// would not converge the range stats to their means is given a
// constraint score boost of 1 to make it less attractive for
// removal.
convergesScore = 1
}
existing.cand.convergesScore = convergesScore
existing.cand.balanceScore = balanceScore
existing.cand.rangeCount = int(existing.cand.store.Capacity.RangeCount)
existingCandidates = append(existingCandidates, existing.cand)
existing.convergesScore = convergesScore
existing.balanceScore = balanceScore
existing.rangeCount = int(existing.store.Capacity.RangeCount)
existingCandidates = append(existingCandidates, existing)
}

for _, cand := range comparable.candidates {
Expand Down Expand Up @@ -898,7 +880,7 @@ func storeHasReplica(storeID roachpb.StoreID, existing []roachpb.ReplicaDescript
}

func sameLocalityAndAttrs(s1, s2 roachpb.StoreDescriptor) bool {
if !s1.Node.Locality.Equals(s2.Node.Locality) {
if !s1.Locality().Equals(s2.Locality()) {
return false
}
if !s1.Node.Attrs.Equals(s2.Node.Attrs) {
Expand Down Expand Up @@ -1058,14 +1040,14 @@ func constraintsCheck(
// given range is. A higher score means the range is more diverse.
// All below diversity-scoring methods should in theory be implemented by
// calling into this one, but they aren't to avoid allocations.
func rangeDiversityScore(existingNodeLocalities map[roachpb.NodeID]roachpb.Locality) float64 {
func rangeDiversityScore(existingStoreLocalities map[roachpb.StoreID]roachpb.Locality) float64 {
var sumScore float64
var numSamples int
for n1, l1 := range existingNodeLocalities {
for n2, l2 := range existingNodeLocalities {
for s1, l1 := range existingStoreLocalities {
for s2, l2 := range existingStoreLocalities {
// Only compare pairs of replicas where s2 > s1 to avoid computing the
// diversity score between each pair of localities twice.
if n2 <= n1 {
if s2 <= s1 {
continue
}
sumScore += l1.DiversityScore(l2)
Expand All @@ -1082,16 +1064,16 @@ func rangeDiversityScore(existingNodeLocalities map[roachpb.NodeID]roachpb.Local
// desirable it would be to add a replica to store. A higher score means the
// store is a better fit.
func diversityAllocateScore(
store roachpb.StoreDescriptor, existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
store roachpb.StoreDescriptor, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
) float64 {
var sumScore float64
var numSamples int
// We don't need to calculate the overall diversityScore for the range, just
// how well the new store would fit, because for any store that we might
// consider adding the pairwise average diversity of the existing replicas
// is the same.
for _, locality := range existingNodeLocalities {
newScore := store.Node.Locality.DiversityScore(locality)
for _, locality := range existingStoreLocalities {
newScore := store.Locality().DiversityScore(locality)
sumScore += newScore
numSamples++
}
Expand All @@ -1106,15 +1088,15 @@ func diversityAllocateScore(
// it would be to remove a node's replica of a range. A higher score indicates
// that the node is a better fit (i.e. keeping it around is good for diversity).
func diversityRemovalScore(
nodeID roachpb.NodeID, existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
storeID roachpb.StoreID, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
) float64 {
var sumScore float64
var numSamples int
locality := existingNodeLocalities[nodeID]
locality := existingStoreLocalities[storeID]
// We don't need to calculate the overall diversityScore for the range, because the original overall diversityScore
// of this range is always the same.
for otherNodeID, otherLocality := range existingNodeLocalities {
if otherNodeID == nodeID {
for otherStoreID, otherLocality := range existingStoreLocalities {
if otherStoreID == storeID {
continue
}
newScore := locality.DiversityScore(otherLocality)
Expand All @@ -1134,16 +1116,16 @@ func diversityRemovalScore(
// higher score indicates that the provided store is a better fit for the
// range.
func diversityRebalanceScore(
store roachpb.StoreDescriptor, existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
store roachpb.StoreDescriptor, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
) float64 {
if len(existingNodeLocalities) == 0 {
if len(existingStoreLocalities) == 0 {
return roachpb.MaxDiversityScore
}
var maxScore float64
// For every existing node, calculate what the diversity score would be if we
// remove that node's replica to replace it with one on the provided store.
for removedNodeID := range existingNodeLocalities {
score := diversityRebalanceFromScore(store, removedNodeID, existingNodeLocalities)
for removedStoreID := range existingStoreLocalities {
score := diversityRebalanceFromScore(store, removedStoreID, existingStoreLocalities)
if score > maxScore {
maxScore = score
}
Expand All @@ -1159,24 +1141,24 @@ func diversityRebalanceScore(
// range.
func diversityRebalanceFromScore(
store roachpb.StoreDescriptor,
fromNodeID roachpb.NodeID,
existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
fromStoreID roachpb.StoreID,
existingNodeLocalities map[roachpb.StoreID]roachpb.Locality,
) float64 {
// Compute the pairwise diversity score of all replicas that will exist
// after adding store and removing fromStoreID.
var sumScore float64
var numSamples int
for nodeID, locality := range existingNodeLocalities {
if nodeID == fromNodeID {
for storeID, locality := range existingNodeLocalities {
if storeID == fromStoreID {
continue
}
newScore := store.Node.Locality.DiversityScore(locality)
newScore := store.Locality().DiversityScore(locality)
sumScore += newScore
numSamples++
for otherNodeID, otherLocality := range existingNodeLocalities {
for otherStoreID, otherLocality := range existingNodeLocalities {
// Only compare pairs of replicas where otherStoreID > storeID to avoid
// computing the diversity score between each pair of localities twice.
if otherNodeID <= nodeID || otherNodeID == fromNodeID {
if otherStoreID <= storeID || otherStoreID == fromStoreID {
continue
}
newScore := locality.DiversityScore(otherLocality)
Expand Down
Loading

0 comments on commit c6a87a7

Please sign in to comment.