From c6a87a77447d26b2eafa9155f1db1794b66f6aa7 Mon Sep 17 00:00:00 2001
From: Alex Lunev
Date: Fri, 17 Jul 2020 13:00:21 -0700
Subject: [PATCH] kvserver: Allow rebalances between stores on the same nodes.

Closes #6782

This change modifies the replicate queue to allow rebalances between
multiple stores within a single node. This is possible because we
previously introduced atomic rebalances in #12768.

The first step was to remove the constraints in the allocator that
prevented same-node rebalances, and to update the validation in the
replicate queue to accept these rebalance proposals. One caveat is that
an atomic rebalance is not possible with 1x replication, so under that
condition we now allow multiple replicas of the range to be added to
the same node.

With the constraints removed, nothing in the allocator would prevent it
from placing multiple replicas of a range on the same node across
multiple stores. This is undesirable, because in a simple 3x replication
scenario a single node crash could make a whole range unavailable. The
allocator uses locality tags to model failure domains, but a node was
not itself considered a locality. It is thus natural to extend the
failure-domain definition to the node and model it as a locality tier.
Stores on the same node are now factored into the diversity score and
repel each other, just as nodes in the same datacenter do in a
multi-region setup.

Release note (performance improvement): This change removes the last
roadblock to running CockroachDB with multiple stores (i.e. disks) per
node. The allocation algorithm now supports intra-node rebalances,
which means CRDB can fully utilize the additional stores on the same
node.
---
 pkg/kv/kvserver/allocator.go             |  13 +-
 pkg/kv/kvserver/allocator_scorer.go      | 132 +++++++++------------
 pkg/kv/kvserver/allocator_scorer_test.go |  79 ++++++------
 pkg/kv/kvserver/allocator_test.go        | 145 ++++++++++++++++++++---
 pkg/kv/kvserver/replica_command.go       | 101 +++++++++++-----
 pkg/kv/kvserver/replica_command_test.go  | 139 ++++++++++++++++++++++
 pkg/kv/kvserver/replica_test.go          |   6 +
 pkg/kv/kvserver/store_pool.go            |  28 ++++-
 pkg/kv/kvserver/store_pool_test.go       |  31 +++--
 pkg/kv/kvserver/store_rebalancer.go      |   4 +-
 pkg/roachpb/metadata.go                  |  18 +++
 pkg/roachpb/metadata_test.go             |  12 ++
 pkg/server/status_test.go                |   7 ++
 13 files changed, 531 insertions(+), 184 deletions(-)

diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go
index b2d38985bdb6..e5b9cf363d12 100644
--- a/pkg/kv/kvserver/allocator.go
+++ b/pkg/kv/kvserver/allocator.go
@@ -502,8 +502,8 @@ func (a *Allocator) allocateTargetFromList(
   analyzedConstraints := constraint.AnalyzeConstraints(
     ctx, a.storePool.getStoreDescriptor, candidateReplicas, zone)
   candidates := allocateCandidates(
-    sl, analyzedConstraints, candidateReplicas, a.storePool.getLocalities(candidateReplicas),
-    options,
+    sl, analyzedConstraints, candidateReplicas,
+    a.storePool.getLocalitiesByStore(candidateReplicas), options,
   )
   log.VEventf(ctx, 3, "allocate candidates: %s", candidates)
   if target := candidates.selectGood(a.randGen); target != nil {
@@ -568,7 +568,7 @@ func (a Allocator) RemoveTarget(
   rankedCandidates := removeCandidates(
     sl,
     analyzedConstraints,
-    a.storePool.getLocalities(existingReplicas),
+    a.storePool.getLocalitiesByStore(existingReplicas),
     options,
   )
   log.VEventf(ctx, 3, "remove candidates: %s", rankedCandidates)
@@ -663,8 +663,7 @@ func (a Allocator) RebalanceTarget(
     sl,
     analyzedConstraints,
     existingReplicas,
-    a.storePool.getLocalities(existingReplicas),
-    a.storePool.getNodeLocalityString,
+    a.storePool.getLocalitiesByStore(existingReplicas),
     options,
   )
 
@@ -742,7 +741,7 @@
   }
   detailsBytes, err := json.Marshal(dDetails)
   if err != nil {
-    log.Warningf(ctx, "failed to marshal details for choosing rebalance target: %+v", err)
+    log.VEventf(ctx, 2, "failed to marshal details for choosing rebalance target: %+v", err)
   }
 
   addTarget := roachpb.ReplicationTarget{
@@ -1005,7 +1004,7 @@ func (a Allocator) shouldTransferLeaseUsingStats(
   if stats == nil || !enableLoadBasedLeaseRebalancing.Get(&a.storePool.st.SV) {
     return decideWithoutStats, roachpb.ReplicaDescriptor{}
   }
-  replicaLocalities := a.storePool.getLocalities(existing)
+  replicaLocalities := a.storePool.getLocalitiesByNode(existing)
   for _, locality := range replicaLocalities {
     if len(locality.Tiers) == 0 {
       return decideWithoutStats, roachpb.ReplicaDescriptor{}
diff --git a/pkg/kv/kvserver/allocator_scorer.go b/pkg/kv/kvserver/allocator_scorer.go
index a13b65eab46c..bf882745acc1 100644
--- a/pkg/kv/kvserver/allocator_scorer.go
+++ b/pkg/kv/kvserver/allocator_scorer.go
@@ -410,7 +410,7 @@ func allocateCandidates(
   sl StoreList,
   constraints constraint.AnalyzedConstraints,
   existing []roachpb.ReplicaDescriptor,
-  existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
+  existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
   options scorerOptions,
 ) candidateList {
   var candidates candidateList
@@ -425,7 +425,7 @@ func allocateCandidates(
     if !maxCapacityCheck(s) {
       continue
     }
-    diversityScore := diversityAllocateScore(s, existingNodeLocalities)
+    diversityScore := diversityAllocateScore(s, existingStoreLocalities)
     balanceScore := balanceScore(sl, s.Capacity, options)
     var convergesScore int
     if options.qpsRebalanceThreshold > 0 {
@@ -463,7 +463,7 @@
 func removeCandidates(
   sl StoreList,
   constraints constraint.AnalyzedConstraints,
-  existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
+  existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
   options scorerOptions,
 ) candidateList {
   var candidates candidateList
@@ -478,7 +478,7 @@ func removeCandidates(
       })
       continue
     }
-    diversityScore := diversityRemovalScore(s.Node.NodeID, existingNodeLocalities)
+    diversityScore := diversityRemovalScore(s.StoreID, existingStoreLocalities)
     balanceScore := balanceScore(sl, s.Capacity, options)
     var convergesScore int
     if !rebalanceFromConvergesOnMean(sl, s.Capacity) {
@@ -522,18 +522,13 @@ func rebalanceCandidates(
   allStores StoreList,
   constraints constraint.AnalyzedConstraints,
   existingReplicas []roachpb.ReplicaDescriptor,
-  existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
-  localityLookupFn func(roachpb.NodeID) string,
+  existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
   options scorerOptions,
 ) []rebalanceOptions {
   // 1. Determine whether existing replicas are valid and/or necessary.
-  type existingStore struct {
-    cand        candidate
-    localityStr string
-  }
-  existingStores := make(map[roachpb.StoreID]existingStore)
+  existingStores := make(map[roachpb.StoreID]candidate)
   var needRebalanceFrom bool
-  curDiversityScore := rangeDiversityScore(existingNodeLocalities)
+  curDiversityScore := rangeDiversityScore(existingStoreLocalities)
   for _, store := range allStores.stores {
     for _, repl := range existingReplicas {
       if store.StoreID != repl.StoreID {
@@ -544,7 +539,7 @@ func rebalanceCandidates(
       if !valid {
         if !needRebalanceFrom {
           log.VEventf(ctx, 2, "s%d: should-rebalance(invalid): locality:%q",
-            store.StoreID, store.Node.Locality)
+            store.StoreID, store.Locality())
         }
         needRebalanceFrom = true
       }
@@ -555,15 +550,12 @@ func rebalanceCandidates(
         }
         needRebalanceFrom = true
       }
-      existingStores[store.StoreID] = existingStore{
-        cand: candidate{
-          store:          store,
-          valid:          valid,
-          necessary:      necessary,
-          fullDisk:       fullDisk,
-          diversityScore: curDiversityScore,
-        },
-        localityStr: localityLookupFn(store.Node.NodeID),
+      existingStores[store.StoreID] = candidate{
+        store:          store,
+        valid:          valid,
+        necessary:      necessary,
+        fullDisk:       fullDisk,
+        diversityScore: curDiversityScore,
       }
     }
   }
@@ -599,8 +591,8 @@ func rebalanceCandidates(
     // include Node/Store Attributes because they affect constraints.
     var matchedOtherExisting bool
     for i, stores := range comparableStores {
-      if sameLocalityAndAttrs(stores.existing[0], existing.cand.store) {
-        comparableStores[i].existing = append(comparableStores[i].existing, existing.cand.store)
+      if sameLocalityAndAttrs(stores.existing[0], existing.store) {
+        comparableStores[i].existing = append(comparableStores[i].existing, existing.store)
         matchedOtherExisting = true
         break
       }
@@ -610,21 +602,11 @@ func rebalanceCandidates(
     }
     var comparableCands candidateList
     for _, store := range allStores.stores {
-      // Nodes that already have a replica on one of their stores aren't valid
-      // rebalance targets. We do include stores that currently have a replica
-      // because we want them to be considered as valid stores in the
-      // ConvergesOnMean calculations below. This is subtle but important.
-      if nodeHasReplica(store.Node.NodeID, existingReplicas) &&
-        !storeHasReplica(store.StoreID, existingReplicas) {
-        log.VEventf(ctx, 2, "nodeHasReplica(n%d, %v)=true",
-          store.Node.NodeID, existingReplicas)
-        continue
-      }
       constraintsOK, necessary := rebalanceFromConstraintsCheck(
-        store, existing.cand.store.StoreID, constraints)
+        store, existing.store.StoreID, constraints)
       maxCapacityOK := maxCapacityCheck(store)
       diversityScore := diversityRebalanceFromScore(
-        store, existing.cand.store.Node.NodeID, existingNodeLocalities)
+        store, existing.store.StoreID, existingStoreLocalities)
       cand := candidate{
         store: store,
         valid: constraintsOK,
@@ -632,15 +614,15 @@
         fullDisk:       !maxCapacityOK,
         diversityScore: diversityScore,
       }
-      if !cand.less(existing.cand) {
+      if !cand.less(existing) {
         comparableCands = append(comparableCands, cand)
-        if !needRebalanceFrom && !needRebalanceTo && existing.cand.less(cand) {
+        if !needRebalanceFrom && !needRebalanceTo && existing.less(cand) {
           needRebalanceTo = true
           log.VEventf(ctx, 2,
             "s%d: should-rebalance(necessary/diversity=s%d): oldNecessary:%t, newNecessary:%t, "+
               "oldDiversity:%f, newDiversity:%f, locality:%q",
-            existing.cand.store.StoreID, store.StoreID, existing.cand.necessary, cand.necessary,
-            existing.cand.diversityScore, cand.diversityScore, store.Node.Locality)
+            existing.store.StoreID, store.StoreID, existing.necessary, cand.necessary,
+            existing.diversityScore, cand.diversityScore, store.Locality())
         }
       }
     }
@@ -655,7 +637,7 @@
       bestStores[i] = bestCands[i].store
     }
     comparableStores = append(comparableStores, comparableStoreList{
-      existing:   []roachpb.StoreDescriptor{existing.cand.store},
+      existing:   []roachpb.StoreDescriptor{existing.store},
       sl:         makeStoreList(bestStores),
       candidates: bestCands,
     })
@@ -673,7 +655,7 @@
   outer:
     for _, comparable := range comparableStores {
       for _, existingCand := range comparable.existing {
-        if existing.cand.store.StoreID == existingCand.StoreID {
+        if existing.store.StoreID == existingCand.StoreID {
          sl = comparable.sl
          break outer
        }
@@ -681,7 +663,7 @@
     }
     // TODO(a-robinson): Some moderate refactoring could extract this logic out
     // into the loop below, avoiding duplicate balanceScore calculations.
-    if shouldRebalance(ctx, existing.cand.store, sl, options) {
+    if shouldRebalance(ctx, existing.store, sl, options) {
       shouldRebalanceCheck = true
       break
     }
@@ -705,24 +687,24 @@
         existingDesc, existingStores)
       continue
     }
-    if !existing.cand.valid {
-      existing.cand.details = "constraint check fail"
-      existingCandidates = append(existingCandidates, existing.cand)
+    if !existing.valid {
+      existing.details = "constraint check fail"
+      existingCandidates = append(existingCandidates, existing)
       continue
     }
-    balanceScore := balanceScore(comparable.sl, existing.cand.store.Capacity, options)
+    balanceScore := balanceScore(comparable.sl, existing.store.Capacity, options)
     var convergesScore int
-    if !rebalanceFromConvergesOnMean(comparable.sl, existing.cand.store.Capacity) {
+    if !rebalanceFromConvergesOnMean(comparable.sl, existing.store.Capacity) {
       // Similarly to in removeCandidates, any replica whose removal
       // would not converge the range stats to their means is given a
       // constraint score boost of 1 to make it less attractive for
      // removal.
       convergesScore = 1
     }
-    existing.cand.convergesScore = convergesScore
-    existing.cand.balanceScore = balanceScore
-    existing.cand.rangeCount = int(existing.cand.store.Capacity.RangeCount)
-    existingCandidates = append(existingCandidates, existing.cand)
+    existing.convergesScore = convergesScore
+    existing.balanceScore = balanceScore
+    existing.rangeCount = int(existing.store.Capacity.RangeCount)
+    existingCandidates = append(existingCandidates, existing)
   }
 
   for _, cand := range comparable.candidates {
@@ -898,7 +880,7 @@ func storeHasReplica(storeID roachpb.StoreID, existing []roachpb.ReplicaDescript
 }
 
 func sameLocalityAndAttrs(s1, s2 roachpb.StoreDescriptor) bool {
-  if !s1.Node.Locality.Equals(s2.Node.Locality) {
+  if !s1.Locality().Equals(s2.Locality()) {
     return false
   }
   if !s1.Node.Attrs.Equals(s2.Node.Attrs) {
@@ -1058,14 +1040,14 @@ func constraintsCheck(
 // given range is. A higher score means the range is more diverse.
 // All below diversity-scoring methods should in theory be implemented by
 // calling into this one, but they aren't to avoid allocations.
-func rangeDiversityScore(existingNodeLocalities map[roachpb.NodeID]roachpb.Locality) float64 {
+func rangeDiversityScore(existingStoreLocalities map[roachpb.StoreID]roachpb.Locality) float64 {
   var sumScore float64
   var numSamples int
-  for n1, l1 := range existingNodeLocalities {
-    for n2, l2 := range existingNodeLocalities {
+  for s1, l1 := range existingStoreLocalities {
+    for s2, l2 := range existingStoreLocalities {
       // Only compare pairs of replicas where s2 > s1 to avoid computing the
       // diversity score between each pair of localities twice.
-      if n2 <= n1 {
+      if s2 <= s1 {
         continue
       }
       sumScore += l1.DiversityScore(l2)
@@ -1082,7 +1064,7 @@
 // desirable it would be to add a replica to store. A higher score means the
 // store is a better fit.
 func diversityAllocateScore(
-  store roachpb.StoreDescriptor, existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
+  store roachpb.StoreDescriptor, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
 ) float64 {
   var sumScore float64
   var numSamples int
@@ -1090,8 +1072,8 @@
   // how well the new store would fit, because for any store that we might
   // consider adding the pairwise average diversity of the existing replicas
   // is the same.
-  for _, locality := range existingNodeLocalities {
-    newScore := store.Node.Locality.DiversityScore(locality)
+  for _, locality := range existingStoreLocalities {
+    newScore := store.Locality().DiversityScore(locality)
     sumScore += newScore
     numSamples++
   }
@@ -1106,15 +1088,15 @@
 // it would be to remove a node's replica of a range. A higher score indicates
 // that the node is a better fit (i.e. keeping it around is good for diversity).
 func diversityRemovalScore(
-  nodeID roachpb.NodeID, existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
+  storeID roachpb.StoreID, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
 ) float64 {
   var sumScore float64
   var numSamples int
-  locality := existingNodeLocalities[nodeID]
+  locality := existingStoreLocalities[storeID]
   // We don't need to calculate the overall diversityScore for the range, because the original overall diversityScore
   // of this range is always the same.
-  for otherNodeID, otherLocality := range existingNodeLocalities {
-    if otherNodeID == nodeID {
+  for otherStoreID, otherLocality := range existingStoreLocalities {
+    if otherStoreID == storeID {
       continue
     }
     newScore := locality.DiversityScore(otherLocality)
@@ -1134,16 +1116,16 @@
 // higher score indicates that the provided store is a better fit for the
 // range.
 func diversityRebalanceScore(
-  store roachpb.StoreDescriptor, existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
+  store roachpb.StoreDescriptor, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
 ) float64 {
-  if len(existingNodeLocalities) == 0 {
+  if len(existingStoreLocalities) == 0 {
     return roachpb.MaxDiversityScore
   }
   var maxScore float64
   // For every existing node, calculate what the diversity score would be if we
   // remove that node's replica to replace it with one on the provided store.
-  for removedNodeID := range existingNodeLocalities {
-    score := diversityRebalanceFromScore(store, removedNodeID, existingNodeLocalities)
+  for removedStoreID := range existingStoreLocalities {
+    score := diversityRebalanceFromScore(store, removedStoreID, existingStoreLocalities)
     if score > maxScore {
       maxScore = score
     }
@@ -1159,24 +1141,24 @@
 // range.
 func diversityRebalanceFromScore(
   store roachpb.StoreDescriptor,
-  fromNodeID roachpb.NodeID,
-  existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
+  fromStoreID roachpb.StoreID,
+  existingNodeLocalities map[roachpb.StoreID]roachpb.Locality,
 ) float64 {
   // Compute the pairwise diversity score of all replicas that will exist
   // after adding store and removing fromNodeID.
   var sumScore float64
   var numSamples int
-  for nodeID, locality := range existingNodeLocalities {
-    if nodeID == fromNodeID {
+  for storeID, locality := range existingNodeLocalities {
+    if storeID == fromStoreID {
       continue
     }
-    newScore := store.Node.Locality.DiversityScore(locality)
+    newScore := store.Locality().DiversityScore(locality)
     sumScore += newScore
     numSamples++
-    for otherNodeID, otherLocality := range existingNodeLocalities {
+    for otherStoreID, otherLocality := range existingNodeLocalities {
       // Only compare pairs of replicas where otherNodeID > nodeID to avoid
       // computing the diversity score between each pair of localities twice.
-      if otherNodeID <= nodeID || otherNodeID == fromNodeID {
+      if otherStoreID <= storeID || otherStoreID == fromStoreID {
         continue
       }
       newScore := locality.DiversityScore(otherLocality)
diff --git a/pkg/kv/kvserver/allocator_scorer_test.go b/pkg/kv/kvserver/allocator_scorer_test.go
index a59c022f1b83..941a365d3f7b 100644
--- a/pkg/kv/kvserver/allocator_scorer_test.go
+++ b/pkg/kv/kvserver/allocator_scorer_test.go
@@ -482,7 +482,7 @@ var (
       Attrs: []string{"a"},
     },
     Node: roachpb.NodeDescriptor{
-      NodeID: roachpb.NodeID(testStoreUSa15Dupe),
+      NodeID: roachpb.NodeID(testStoreUSa15),
       Locality: roachpb.Locality{
         Tiers: testStoreTierSetup("us", "a", "1", "5"),
       },
@@ -1082,10 +1082,10 @@ func TestShouldRebalanceDiversity(t *testing.T) {
       },
     }
   }
-  localityForNodeID := func(sl StoreList, id roachpb.NodeID) roachpb.Locality {
+  localityForNodeID := func(sl StoreList, id roachpb.StoreID) roachpb.Locality {
     for _, store := range sl.stores {
-      if store.Node.NodeID == id {
-        return store.Node.Locality
+      if store.StoreID == id {
+        return store.Locality()
       }
     }
     t.Fatalf("no locality for n%d in StoreList %+v", id, sl)
@@ -1191,14 +1191,15 @@
     }
     filteredSL := tc.sl
     filteredSL.stores = append([]roachpb.StoreDescriptor(nil), filteredSL.stores...)
-    existingNodeLocalities := make(map[roachpb.NodeID]roachpb.Locality)
+    existingStoreLocalities := make(map[roachpb.StoreID]roachpb.Locality)
     var replicas []roachpb.ReplicaDescriptor
     for _, nodeID := range tc.existingNodeIDs {
+      storeID := roachpb.StoreID(nodeID)
       replicas = append(replicas, roachpb.ReplicaDescriptor{
         NodeID:  nodeID,
-        StoreID: roachpb.StoreID(nodeID),
+        StoreID: storeID,
       })
-      existingNodeLocalities[nodeID] = localityForNodeID(tc.sl, nodeID)
+      existingStoreLocalities[storeID] = localityForNodeID(tc.sl, storeID)
       // For the sake of testing, remove all other existing stores from the
       // store list to only test whether we want to remove the replica on tc.s.
       if nodeID != tc.s.Node.NodeID {
@@ -1211,11 +1212,7 @@
       filteredSL,
       constraint.AnalyzedConstraints{},
       replicas,
-      existingNodeLocalities,
-      func(nodeID roachpb.NodeID) string {
-        locality := localityForNodeID(tc.sl, nodeID)
-        return locality.String()
-      },
+      existingStoreLocalities,
       options)
     actual := len(targets) > 0
     if actual != tc.expected {
@@ -1254,17 +1251,17 @@
   for _, tc := range testCases {
     t.Run(tc.name, func(t *testing.T) {
-      existingNodeLocalities := make(map[roachpb.NodeID]roachpb.Locality)
+      existingStoreLocalities := make(map[roachpb.StoreID]roachpb.Locality)
       for _, s := range tc.stores {
-        existingNodeLocalities[testStores[s].Node.NodeID] = testStores[s].Node.Locality
+        existingStoreLocalities[testStores[s].StoreID] = testStores[s].Locality()
       }
       var scores storeScores
       for _, s := range testStores {
-        if _, ok := existingNodeLocalities[s.Node.NodeID]; ok {
+        if _, ok := existingStoreLocalities[s.StoreID]; ok {
           continue
         }
         var score storeScore
-        actualScore := diversityAllocateScore(s, existingNodeLocalities)
+        actualScore := diversityAllocateScore(s, existingStoreLocalities)
         score.storeID = s.StoreID
         score.score = actualScore
         scores = append(scores, score)
@@ -1329,17 +1326,17 @@ func TestRebalanceToDiversityScore(t *testing.T) {
   for _, tc := range testCases {
     t.Run(tc.name, func(t *testing.T) {
-      existingNodeLocalities := make(map[roachpb.NodeID]roachpb.Locality)
+      existingStoreLocalities := make(map[roachpb.StoreID]roachpb.Locality)
       for _, s := range tc.stores {
-        existingNodeLocalities[testStores[s].Node.NodeID] = testStores[s].Node.Locality
+        existingStoreLocalities[testStores[s].StoreID] = testStores[s].Locality()
       }
       var scores storeScores
       for _, s := range testStores {
-        if _, ok := existingNodeLocalities[s.Node.NodeID]; ok {
+        if _, ok := existingStoreLocalities[s.StoreID]; ok {
          continue
         }
         var score storeScore
-        actualScore := diversityRebalanceScore(s, existingNodeLocalities)
+        actualScore := diversityRebalanceScore(s, existingStoreLocalities)
         score.storeID = s.StoreID
         score.score = actualScore
         scores = append(scores, score)
@@ -1400,15 +1397,15 @@
   for _, tc := range testCases {
     t.Run(tc.name, func(t *testing.T) {
-      existingNodeLocalities := make(map[roachpb.NodeID]roachpb.Locality)
+      existingStoreLocalities := make(map[roachpb.StoreID]roachpb.Locality)
       for _, s := range tc.stores {
-        existingNodeLocalities[testStores[s].Node.NodeID] = testStores[s].Node.Locality
+        existingStoreLocalities[testStores[s].StoreID] = testStores[s].Locality()
       }
       var scores storeScores
       for _, storeID := range tc.stores {
        s := testStores[storeID]
        var score storeScore
-        actualScore := diversityRemovalScore(s.Node.NodeID, existingNodeLocalities)
+        actualScore := diversityRemovalScore(s.StoreID, existingStoreLocalities)
        score.storeID = s.StoreID
        score.score = actualScore
        scores = append(scores, score)
@@ -1436,19 +1433,19 @@
     {[]roachpb.StoreID{testStoreUSa15}, 1.0},
     {[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe}, 0.0},
     {[]roachpb.StoreID{testStoreUSa15, testStoreUSa1}, 0.25},
-    {[]roachpb.StoreID{testStoreUSa15, testStoreUSb}, 0.5},
+    {[]roachpb.StoreID{testStoreUSa15, testStoreUSb}, 2.0 / 3.0},
     {[]roachpb.StoreID{testStoreUSa15, testStoreEurope}, 1.0},
     {[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSa1}, 1.0 / 6.0},
-    {[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSb}, 1.0 / 3.0},
+    {[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSb}, 4.0 / 9.0},
     {[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreEurope}, 2.0 / 3.0},
-    {[]roachpb.StoreID{testStoreUSa15, testStoreUSa1, testStoreUSb}, 5.0 / 12.0},
+    {[]roachpb.StoreID{testStoreUSa15, testStoreUSa1, testStoreUSb}, 19.0 / 36.0},
     {[]roachpb.StoreID{testStoreUSa15, testStoreUSa1, testStoreEurope}, 3.0 / 4.0},
-    {[]roachpb.StoreID{testStoreUSa1, testStoreUSb, testStoreEurope}, 5.0 / 6.0},
-    {[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSa1, testStoreUSb}, 1.0 / 3.0},
+    {[]roachpb.StoreID{testStoreUSa1, testStoreUSb, testStoreEurope}, 8.0 / 9.0},
+    {[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSa1, testStoreUSb}, 5.0 / 12.0},
     {[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSa1, testStoreEurope}, 7.0 / 12.0},
-    {[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSb, testStoreEurope}, 2.0 / 3.0},
-    {[]roachpb.StoreID{testStoreUSa15, testStoreUSa1, testStoreUSb, testStoreEurope}, 17.0 / 24.0},
-    {[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSa1, testStoreUSb, testStoreEurope}, 3.0 / 5.0},
+    {[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSb, testStoreEurope}, 26.0 / 36.0},
+    {[]roachpb.StoreID{testStoreUSa15, testStoreUSa1, testStoreUSb, testStoreEurope}, 55.0 / 72.0},
+    {[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSa1, testStoreUSb, testStoreEurope}, 13.0 / 20.0},
   }
 
   // Ensure that rangeDiversityScore and diversityRebalanceFromScore return
   // the same scores for the same configurations. The same is not true for
   // diversityAllocateScore and diversityRemovalScore as of their initial
   // creation or else we would test them here as well.
   for _, tc := range testCases {
-    existingLocalities := make(map[roachpb.NodeID]roachpb.Locality)
+    existingLocalities := make(map[roachpb.StoreID]roachpb.Locality)
     for _, storeID := range tc.stores {
       s := testStores[storeID]
-      existingLocalities[s.Node.NodeID] = s.Node.Locality
+      existingLocalities[s.StoreID] = s.Locality()
     }
     rangeScore := rangeDiversityScore(existingLocalities)
-    if a, e := rangeScore, tc.expected; a != e {
+    if a, e := rangeScore, tc.expected; !scoresAlmostEqual(a, e) {
       t.Errorf("rangeDiversityScore(%v) got %f, want %f",
         existingLocalities, a, e)
     }
     for _, storeID := range tc.stores {
       s := testStores[storeID]
-      fromNodeID := s.Node.NodeID
-      s.Node.NodeID = 99
-      rebalanceScore := diversityRebalanceFromScore(s, fromNodeID, existingLocalities)
-      if a, e := rebalanceScore, tc.expected; a != e {
+      fromStoreID := s.StoreID
+      s.StoreID = 99
+      rebalanceScore := diversityRebalanceFromScore(s, fromStoreID, existingLocalities)
+      if a, e := rebalanceScore, tc.expected; !scoresAlmostEqual(a, e) {
         t.Errorf("diversityRebalanceFromScore(%v, %d, %v) got %f, want %f",
-          s, fromNodeID, existingLocalities, a, e)
+          s, fromStoreID, existingLocalities, a, e)
       }
-      if a, e := rebalanceScore, rangeScore; a != e {
+      if a, e := rebalanceScore, rangeScore; !scoresAlmostEqual(a, e) {
         t.Errorf("diversityRebalanceFromScore(%v, %d, %v)=%f not equal to rangeDiversityScore(%v)=%f",
-          s, fromNodeID, existingLocalities, a, existingLocalities, e)
+          s, fromStoreID, existingLocalities, a, existingLocalities, e)
       }
     }
   }
 }
diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go
index 7b5b40aa316a..c793adf8de63 100644
--- a/pkg/kv/kvserver/allocator_test.go
+++ b/pkg/kv/kvserver/allocator_test.go
@@ -550,28 +550,32 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) {
   gossiputil.NewStoreGossiper(g).GossipStores(stores, t)
 
   testCases := []struct {
-    existing     []roachpb.ReplicaDescriptor
-    expectTarget bool
+    existing              []roachpb.ReplicaDescriptor
+    expectTargetAllocate  bool
+    expectTargetRebalance bool
   }{
     {
       existing: []roachpb.ReplicaDescriptor{
         {NodeID: 1, StoreID: 1},
       },
-      expectTarget: true,
+      expectTargetAllocate:  true,
+      expectTargetRebalance: true,
     },
     {
       existing: []roachpb.ReplicaDescriptor{
         {NodeID: 1, StoreID: 2},
         {NodeID: 2, StoreID: 3},
       },
-      expectTarget: true,
+      expectTargetAllocate:  true,
+      expectTargetRebalance: true,
     },
     {
       existing: []roachpb.ReplicaDescriptor{
         {NodeID: 1, StoreID: 2},
         {NodeID: 3, StoreID: 6},
       },
-      expectTarget: true,
+      expectTargetAllocate:  true,
+      expectTargetRebalance: true,
     },
     {
       existing: []roachpb.ReplicaDescriptor{
@@ -579,7 +583,8 @@
         {NodeID: 2, StoreID: 3},
         {NodeID: 3, StoreID: 5},
       },
-      expectTarget: false,
+      expectTargetAllocate:  false,
+      expectTargetRebalance: true,
     },
     {
       existing: []roachpb.ReplicaDescriptor{
@@ -587,7 +592,8 @@
         {NodeID: 2, StoreID: 4},
         {NodeID: 3, StoreID: 6},
       },
-      expectTarget: false,
+      expectTargetAllocate:  false,
+      expectTargetRebalance: false,
     },
   }
 
@@ -598,9 +604,9 @@
       zonepb.EmptyCompleteZoneConfig(),
       tc.existing,
     )
-    if e, a := tc.expectTarget, result != nil; e != a {
+    if e, a := tc.expectTargetAllocate, result != nil; e != a {
       t.Errorf("AllocateTarget(%v) got target %v, err %v; expectTarget=%v",
-        tc.existing, result, err, tc.expectTarget)
+        tc.existing, result, err, tc.expectTargetAllocate)
     }
   }
 
@@ -614,14 +620,118 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) {
     rangeUsageInfo,
     storeFilterThrottled,
   )
-    if e, a := tc.expectTarget, ok; e != a {
+    if e, a := tc.expectTargetRebalance, ok; e != a {
     t.Errorf("RebalanceTarget(%v) got target %v, details %v; expectTarget=%v",
-        tc.existing, target, details, tc.expectTarget)
+        tc.existing, target, details, tc.expectTargetRebalance)
     }
   }
 }
 
+func TestAllocatorMultipleStoresPerNodeLopsided(t *testing.T) {
+  defer leaktest.AfterTest(t)()
+  defer log.Scope(t).Close(t)
+
+  store1 := roachpb.StoreDescriptor{
+    StoreID:  1,
+    Node:     roachpb.NodeDescriptor{NodeID: 1},
+    Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 40},
+  }
+  store2 := roachpb.StoreDescriptor{
+    StoreID:  2,
+    Node:     roachpb.NodeDescriptor{NodeID: 1},
+    Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 0},
+  }
+
+  // We start out with 40 ranges on 3 nodes and 3 stores; we then add a new
+  // store on Node 1 and try to rebalance all the ranges. What we want to see
+  // happen is an equilibrium where 20 ranges move from Store 1 to Store 2.
+  stores := []*roachpb.StoreDescriptor{
+    &store1,
+    &store2,
+    {
+      StoreID:  3,
+      Node:     roachpb.NodeDescriptor{NodeID: 2},
+      Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 40},
+    },
+    {
+      StoreID:  4,
+      Node:     roachpb.NodeDescriptor{NodeID: 3},
+      Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 40},
+    },
+  }
+
+  ranges := make([]roachpb.RangeDescriptor, 40)
+  for i := 0; i < 40; i++ {
+    ranges[i] = roachpb.RangeDescriptor{
+      InternalReplicas: []roachpb.ReplicaDescriptor{
+        {NodeID: 1, StoreID: 1},
+        {NodeID: 2, StoreID: 3},
+        {NodeID: 3, StoreID: 4},
+      },
+    }
+  }
+
+  stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */)
+  defer stopper.Stop(context.Background())
+  storeGossiper := gossiputil.NewStoreGossiper(g)
+  storeGossiper.GossipStores(stores, t)
+
+  // We run through all the ranges once to get the cluster to balance.
+  // After that we should not be seeing replicas move.
+  var rangeUsageInfo RangeUsageInfo
+  for i := 1; i < 40; i++ {
+    add, remove, _, ok := a.RebalanceTarget(
+      context.Background(),
+      zonepb.EmptyCompleteZoneConfig(),
+      nil, /* raftStatus */
+      ranges[i].InternalReplicas,
+      rangeUsageInfo,
+      storeFilterThrottled,
+    )
+    if ok {
+      // Update the descriptor.
+      newReplicas := make([]roachpb.ReplicaDescriptor, 0, len(ranges[i].InternalReplicas))
+      for _, repl := range ranges[i].InternalReplicas {
+        if remove.StoreID != repl.StoreID {
+          newReplicas = append(newReplicas, repl)
+        }
+      }
+      newReplicas = append(newReplicas, roachpb.ReplicaDescriptor{
+        StoreID: add.StoreID,
+        NodeID:  add.NodeID,
+      })
+      ranges[i].InternalReplicas = newReplicas
+
+      for _, store := range stores {
+        if store.StoreID == add.StoreID {
+          store.Capacity.RangeCount = store.Capacity.RangeCount + 1
+        } else if store.StoreID == remove.StoreID {
+          store.Capacity.RangeCount = store.Capacity.RangeCount - 1
+        }
+      }
+      storeGossiper.GossipStores(stores, t)
+    }
+  }
+
+  // Verify that the stores are reasonably balanced.
+  require.True(t, math.Abs(float64(
+    store1.Capacity.RangeCount-store2.Capacity.RangeCount)) <= minRangeRebalanceThreshold*2)
+  // We don't expect any ranges to move, since the system should have
+  // reached a stable state at this point.
+  for i := 1; i < 40; i++ {
+    _, _, _, ok := a.RebalanceTarget(
+      context.Background(),
+      zonepb.EmptyCompleteZoneConfig(),
+      nil, /* raftStatus */
+      ranges[i].InternalReplicas,
+      rangeUsageInfo,
+      storeFilterThrottled,
+    )
+    require.False(t, ok)
+  }
+}
+
 // TestAllocatorRebalance verifies that rebalance targets are chosen
 // randomly from amongst stores under the maxFractionUsedThreshold.
 func TestAllocatorRebalance(t *testing.T) {
@@ -2559,7 +2669,7 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) {
     sl,
     analyzed,
     existingRepls,
-    a.storePool.getLocalities(existingRepls),
+    a.storePool.getLocalitiesByStore(existingRepls),
     a.scorerOptions(),
   )
   best := candidates.best()
@@ -2782,7 +2892,7 @@ func TestRemoveCandidatesNumReplicasConstraints(t *testing.T) {
   candidates := removeCandidates(
     sl,
     analyzed,
-    a.storePool.getLocalities(existingRepls),
+    a.storePool.getLocalitiesByStore(existingRepls),
     a.scorerOptions(),
   )
   if !expectedStoreIDsMatch(tc.expected, candidates.worst()) {
@@ -3579,8 +3689,8 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) {
     sl,
     analyzed,
     existingRepls,
-    a.storePool.getLocalities(existingRepls),
-    a.storePool.getNodeLocalityString,
+    a.storePool.getLocalitiesByStore(existingRepls),
     a.scorerOptions(),
   )
   match := true
@@ -5477,9 +5586,9 @@ func TestAllocatorRebalanceAway(t *testing.T) {
   }
 
   existingReplicas := []roachpb.ReplicaDescriptor{
-    {StoreID: stores[0].StoreID},
-    {StoreID: stores[1].StoreID},
-    {StoreID: stores[2].StoreID},
+    {StoreID: stores[0].StoreID, NodeID: stores[0].Node.NodeID},
+    {StoreID: stores[1].StoreID, NodeID: stores[1].Node.NodeID},
+    {StoreID: stores[2].StoreID, NodeID: stores[2].Node.NodeID},
   }
   testCases := []struct {
     constraint zonepb.Constraint
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index 22d13a2ccfe1..2e2780a86162 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -1125,50 +1125,95 @@ func validateReplicationChanges(
 ) error {
   // First make sure that the changes don't self-overlap (i.e. we're not adding
   // a replica twice, or removing and immediately re-adding it).
-  byNodeID := make(map[roachpb.NodeID]roachpb.ReplicationChange, len(chgs))
+  byNodeAndStoreID := make(map[roachpb.NodeID]map[roachpb.StoreID]roachpb.ReplicationChange, len(chgs))
   for _, chg := range chgs {
-    if _, ok := byNodeID[chg.Target.NodeID]; ok {
-      return fmt.Errorf("changes %+v refer to n%d twice", chgs, chg.Target.NodeID)
+    byStoreID, ok := byNodeAndStoreID[chg.Target.NodeID]
+    if !ok {
+      byStoreID = make(map[roachpb.StoreID]roachpb.ReplicationChange)
+      byNodeAndStoreID[chg.Target.NodeID] = byStoreID
+    } else {
+      // The only operation that is allowed within a node is an Add/Remove.
+      for _, prevChg := range byStoreID {
+        if prevChg.ChangeType == chg.ChangeType {
+          return fmt.Errorf("changes %+v refer to n%d twice for change %v",
+            chgs, chg.Target.NodeID, chg.ChangeType)
+        }
+        if prevChg.ChangeType != roachpb.ADD_REPLICA {
+          return fmt.Errorf("can only add-remove a replica within a node, but got %+v", chgs)
+        }
+      }
     }
-    byNodeID[chg.Target.NodeID] = chg
+    if _, ok := byStoreID[chg.Target.StoreID]; ok {
+      return fmt.Errorf("changes %+v refer to n%d and s%d twice", chgs,
+        chg.Target.NodeID, chg.Target.StoreID)
+    }
+    byStoreID[chg.Target.StoreID] = chg
   }
 
   // Then, check that we're not adding a second replica on nodes that already
-  // have one, or "re-add" an existing replica. We delete from byNodeID so that
-  // after this loop, it contains only StoreIDs that we haven't seen in desc.
+  // have one, or "re-add" an existing replica. We delete from byNodeAndStoreID so that
+  // after this loop, it contains only Nodes that we haven't seen in desc.
   for _, rDesc := range desc.Replicas().All() {
-    chg, ok := byNodeID[rDesc.NodeID]
-    delete(byNodeID, rDesc.NodeID)
-    if !ok || chg.ChangeType != roachpb.ADD_REPLICA {
+    byStoreID, ok := byNodeAndStoreID[rDesc.NodeID]
+    if !ok {
       continue
     }
-    // We're adding a replica that's already there. This isn't allowed, even
-    // when the newly added one would be on a different store.
-    if rDesc.StoreID != chg.Target.StoreID {
-      return errors.Errorf("unable to add replica %v; node already has a replica in %s", chg.Target.StoreID, desc)
+    delete(byNodeAndStoreID, rDesc.NodeID)
+    if len(byStoreID) == 2 {
+      chg, k := byStoreID[rDesc.StoreID]
+      // We should be removing the replica from the existing store during a
+      // rebalance within the node.
+      if !k || chg.ChangeType != roachpb.REMOVE_REPLICA {
+        return errors.Errorf(
+          "Expected replica to be removed from %v during a lateral rebalance %v within the node.", rDesc, chgs)
+      }
+      continue
     }
+    chg, ok := byStoreID[rDesc.StoreID]
+    // There are two valid conditions here:
+    // (1) removal of an existing store.
+    // (2) add on the node, when we only have one replica.
+    // See https://github.com/cockroachdb/cockroach/issues/40333.
+    if ok {
+      if chg.ChangeType == roachpb.REMOVE_REPLICA {
+        continue
+      }
+      // Looks like we found a replica with the same store and node id. If the
+      // replica is already a learner, then either some previous leaseholder was
+      // trying to add it with the learner+snapshot+voter cycle and got
+      // interrupted or else we hit a race between the replicate queue and
+      // AdminChangeReplicas.
+      if rDesc.GetType() == roachpb.LEARNER {
+        return errors.Errorf(
+          "unable to add replica %v which is already present as a learner in %s", chg.Target, desc)
+      }
 
-    // Looks like we found a replica with the same store and node id. If the
-    // replica is already a learner, then either some previous leaseholder was
-    // trying to add it with the learner+snapshot+voter cycle and got
-    // interrupted or else we hit a race between the replicate queue and
-    // AdminChangeReplicas.
-    if rDesc.GetType() == roachpb.LEARNER {
-      return errors.Errorf(
-        "unable to add replica %v which is already present as a learner in %s", chg.Target, desc)
+      // Otherwise, we already had a full voter replica. Can't add another to
+      // this store.
+      return errors.Errorf("unable to add replica %v which is already present in %s", chg.Target, desc)
     }
-    // Otherwise, we already had a full voter replica. Can't add another to
-    // this store.
-    return errors.Errorf("unable to add replica %v which is already present in %s", chg.Target, desc)
+    for _, c := range byStoreID {
+      // We're adding a replica that's already there. This isn't allowed, even
+      // when the newly added one would be on a different store.
+      if c.ChangeType == roachpb.ADD_REPLICA {
+        if len(desc.Replicas().All()) > 1 {
+          return errors.Errorf("unable to add replica %v; node already has a replica in %s", c.Target.StoreID, desc)
+        }
+      } else {
+        return errors.Errorf("removing %v which is not in %s", c.Target, desc)
+      }
+    }
   }
 
   // Any removals left in the map now refer to nonexisting replicas, and we refuse them.
-  for _, chg := range byNodeID {
-    if chg.ChangeType != roachpb.REMOVE_REPLICA {
-      continue
+  for _, byStoreID := range byNodeAndStoreID {
+    for _, chg := range byStoreID {
+      if chg.ChangeType != roachpb.REMOVE_REPLICA {
+        continue
+      }
+      return errors.Errorf("removing %v which is not in %s", chg.Target, desc)
     }
-    return errors.Errorf("removing %v which is not in %s", chg.Target, desc)
   }
   return nil
 }
diff --git a/pkg/kv/kvserver/replica_command_test.go b/pkg/kv/kvserver/replica_command_test.go
index 75bb0c030787..48328d6ea408 100644
--- a/pkg/kv/kvserver/replica_command_test.go
+++ b/pkg/kv/kvserver/replica_command_test.go
@@ -91,3 +91,142 @@ func TestRangeDescriptorUpdateProtoChangedAcrossVersions(t *testing.T) {
     t.Fatal(err)
   }
 }
+
+func TestValidateReplicationChanges(t *testing.T) {
+  defer leaktest.AfterTest(t)()
+
+  learnerType := roachpb.LEARNER
+  desc := &roachpb.RangeDescriptor{
+    InternalReplicas: []roachpb.ReplicaDescriptor{
+      {NodeID: 1, StoreID: 1},
+      {NodeID: 3, StoreID: 3},
+      {NodeID: 4, StoreID: 4, Type: &learnerType},
+    },
+  }
+
+  // Test Case 1: Add a new replica to another node.
+  err := validateReplicationChanges(desc, roachpb.ReplicationChanges{
+    {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 2, StoreID: 2}},
+  })
+  require.NoError(t, err)
+
+  // Test Case 2: Remove a replica from an existing node.
+  err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+    {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+  })
+  require.NoError(t, err)
+
+  // Test Case 3: Remove a replica from the wrong node.
+  err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+    {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 2, StoreID: 2}},
+  })
+  require.Regexp(t, "removing n2,s2 which is not in", err)
+
+  // Test Case 4: Remove a replica from the wrong store.
+  err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+    {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}},
+  })
+  require.Regexp(t, "removing n1,s2 which is not in", err)
+
+  // Test Case 5: Rebalance a replica within a node.
+  err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+    {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}},
+    {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+  })
+  require.NoError(t, err)
+
+  // Test Case 6: Rebalance a replica within a node, but attempt to remove
+  // from the wrong store.
+  err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+    {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 3}},
+    {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}},
+  })
+  require.Regexp(t, "Expected replica to be removed from", err)
+
+  // Test Case 7: Add a replica to the same node and store.
+  err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+    {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+  })
+  require.Regexp(t, "unable to add replica n1,s1 which is already present", err)
+
+  // Test Case 8: Add a replica to the same node but a different store.
+  err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+    {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}},
+  })
+  require.Regexp(t, "unable to add replica 2", err)
+
+  // Test Case 9: Try to rebalance a replica on the same node, but also add an extra.
+  err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+    {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+    {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}},
+    {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 3}},
+  })
+  require.Regexp(t, "can only add-remove a replica within a node", err)
+
+  // Test Case 10: Try to add twice to the same node.
+  err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+    {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 4}},
+    {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 5}},
+  })
+  require.Regexp(t, "refer to n4 twice for change ADD_REPLICA", err)
+
+  // Test Case 11: Try to remove twice from the same node.
+  err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+    {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+    {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}},
+  })
+  require.Regexp(t, "refer to n1 twice for change REMOVE_REPLICA", err)
+
+  // Test Case 12: Try to add where there is already a learner.
+  err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+    {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 4, StoreID: 5}},
+  })
+  require.Error(t, err)
+
+  // Test Case 13: Add/Remove multiple replicas.
+  err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+    {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 2, StoreID: 2}},
+    {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 5, StoreID: 5}},
+    {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 6, StoreID: 6}},
+    {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+    {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3}},
+  })
+  require.NoError(t, err)
+
+  // Test Case 14: We are rebalancing within a node and do a remove.
+  descRebalancing := &roachpb.RangeDescriptor{
+    InternalReplicas: []roachpb.ReplicaDescriptor{
+      {NodeID: 1, StoreID: 1},
+      {NodeID: 2, StoreID: 2},
+      {NodeID: 1, StoreID: 2, Type: &learnerType},
+    },
+  }
+  err = validateReplicationChanges(descRebalancing, roachpb.ReplicationChanges{
+    {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+  })
+  require.NoError(t, err)
+
+  // Test Case 15: Do an add while rebalancing within a node.
+  err = validateReplicationChanges(descRebalancing, roachpb.ReplicationChanges{
+    {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 3, StoreID: 3}},
+  })
+  require.NoError(t, err)
+
+  // Test Case 16: Remove/Add within a node is not allowed, since we expect the
+  // order Add/Remove.
+  err = validateReplicationChanges(desc, roachpb.ReplicationChanges{
+    {ChangeType: roachpb.REMOVE_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}},
+    {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}},
+  })
+  require.Regexp(t, "can only add-remove a replica within a node, but got ", err)
+
+  // Test Case 17: We are rebalancing within a node and have only one replica.
+  descSingle := &roachpb.RangeDescriptor{
+    InternalReplicas: []roachpb.ReplicaDescriptor{
+      {NodeID: 1, StoreID: 1},
+    },
+  }
+  err = validateReplicationChanges(descSingle, roachpb.ReplicationChanges{
+    {ChangeType: roachpb.ADD_REPLICA, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 2}},
+  })
+  require.NoError(t, err)
+}
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index d4582205e695..bc8b433b0fc2 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -6432,6 +6432,12 @@ func TestChangeReplicasDuplicateError(t *testing.T) {
   defer stopper.Stop(context.Background())
   tc.Start(t, stopper)
 
+  // We now allow adding a replica to the same node, to support rebalances
+  // within the same node when replication is 1x, so add another replica to the
+  // range descriptor to avoid this case.
+  if _, err := tc.addBogusReplicaToRangeDesc(context.Background()); err != nil {
+    t.Fatalf("Unexpected error %v", err)
+  }
   chgs := roachpb.MakeReplicationChanges(roachpb.ADD_REPLICA, roachpb.ReplicationTarget{
     NodeID:  tc.store.Ident.NodeID,
     StoreID: 9999,
diff --git a/pkg/kv/kvserver/store_pool.go b/pkg/kv/kvserver/store_pool.go
index ccf3e133edc8..4a3b25b79a7e 100644
--- a/pkg/kv/kvserver/store_pool.go
+++ b/pkg/kv/kvserver/store_pool.go
@@ -757,10 +757,34 @@ func (sp *StorePool) throttle(reason throttleReason, why string, storeID roachpb
   }
 }
 
-// getLocalities returns the localities for the provided replicas.
+// getLocalitiesByStore returns the localities for the provided replicas. In
+// this case we consider the node part of the failure domain and add it to
+// the locality data.
+func (sp *StorePool) getLocalitiesByStore(
+  replicas []roachpb.ReplicaDescriptor,
+) map[roachpb.StoreID]roachpb.Locality {
+  sp.localitiesMu.RLock()
+  defer sp.localitiesMu.RUnlock()
+  localities := make(map[roachpb.StoreID]roachpb.Locality)
+  for _, replica := range replicas {
+    nodeTier := roachpb.Tier{Key: "node", Value: replica.NodeID.String()}
+    if locality, ok := sp.localitiesMu.nodeLocalities[replica.NodeID]; ok {
+      localities[replica.StoreID] = locality.locality.AddTier(nodeTier)
+    } else {
+      localities[replica.StoreID] = roachpb.Locality{
+        Tiers: []roachpb.Tier{nodeTier},
+      }
+    }
+  }
+  return localities
+}
+
+// getLocalitiesByNode returns the localities for the provided replicas. In this
+// case we only consider the locality by node, where the node itself is not
+// part of the failure domain.
 // TODO(bram): consider storing a full list of all node to node diversity
 // scores for faster lookups.
-func (sp *StorePool) getLocalities(
+func (sp *StorePool) getLocalitiesByNode(
   replicas []roachpb.ReplicaDescriptor,
 ) map[roachpb.NodeID]roachpb.Locality {
   sp.localitiesMu.RLock()
diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go
index adaf281e18c0..ca0ac4cf6258 100644
--- a/pkg/kv/kvserver/store_pool_test.go
+++ b/pkg/kv/kvserver/store_pool_test.go
@@ -39,6 +39,7 @@ import (
   "github.com/cockroachdb/cockroach/pkg/util/uuid"
   "github.com/cockroachdb/errors"
   "github.com/kr/pretty"
+  "github.com/stretchr/testify/require"
 )
 
 var uniqueStore = []*roachpb.StoreDescriptor{
@@ -837,7 +838,7 @@ func TestGetLocalities(t *testing.T) {
   createDescWithLocality := func(tierCount int) roachpb.NodeDescriptor {
     return roachpb.NodeDescriptor{
       NodeID:   roachpb.NodeID(tierCount),
-      Locality: createLocality(tierCount),
+      Locality: createLocality(tierCount - 1),
     }
   }
 
@@ -864,22 +865,30 @@ func TestGetLocalities(t *testing.T) {
   var existingReplicas []roachpb.ReplicaDescriptor
   for _, store := range stores {
-    existingReplicas = append(existingReplicas, roachpb.ReplicaDescriptor{NodeID: store.Node.NodeID})
+    existingReplicas = append(existingReplicas,
+      roachpb.ReplicaDescriptor{
+        NodeID:  store.Node.NodeID,
+        StoreID: store.StoreID,
+      },
+    )
   }
 
-  localities := sp.getLocalities(existingReplicas)
+  localitiesByStore := sp.getLocalitiesByStore(existingReplicas)
+  localitiesByNode := sp.getLocalitiesByNode(existingReplicas)
   for _, store := range stores {
+    storeID := store.StoreID
     nodeID := store.Node.NodeID
-    locality, ok := localities[nodeID]
+    localityByStore, ok := localitiesByStore[storeID]
     if !ok {
-      t.Fatalf("could not find locality for node %d", nodeID)
-    }
-    if e, a := int(nodeID), len(locality.Tiers); e != a {
-      t.Fatalf("for node %d, expected %d tiers, only got %d", nodeID, e, a)
-    }
-    if e, a := createLocality(int(nodeID)).String(), sp.getNodeLocalityString(nodeID); e != a {
-      t.Fatalf("for getNodeLocalityString(%d), expected %q, got %q", nodeID, e, a)
+      t.Fatalf("could not find locality for store %d", storeID)
     }
+    localityByNode, ok := localitiesByNode[nodeID]
+    require.Truef(t, ok, "could not find locality for node %d", nodeID)
+    require.Equal(t, int(nodeID), len(localityByStore.Tiers))
+    require.Equal(t, localityByStore.Tiers[len(localityByStore.Tiers)-1],
+      roachpb.Tier{Key: "node", Value: nodeID.String()})
+    require.Equal(t, int(nodeID)-1, len(localityByNode.Tiers))
+    require.Equal(t, createLocality(int(nodeID)-1).String(), sp.getNodeLocalityString(nodeID))
   }
 }
diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go
index 0fc25b15b30b..1219d8c1024b 100644
--- a/pkg/kv/kvserver/store_rebalancer.go
+++ b/pkg/kv/kvserver/store_rebalancer.go
@@ -509,7 +509,7 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance(
   // Check the range's existing diversity score, since we want to ensure we
   // don't hurt locality diversity just to improve QPS.
   curDiversity := rangeDiversityScore(
-    sr.rq.allocator.storePool.getLocalities(currentReplicas))
+    sr.rq.allocator.storePool.getLocalitiesByStore(currentReplicas))
 
   // Check the existing replicas, keeping around those that aren't overloaded.
   for i := range currentReplicas {
@@ -579,7 +579,7 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance(
       desc.RangeID, len(targets), desiredReplicas)
     continue
   }
-  newDiversity := rangeDiversityScore(sr.rq.allocator.storePool.getLocalities(targetReplicas))
+  newDiversity := rangeDiversityScore(sr.rq.allocator.storePool.getLocalitiesByStore(targetReplicas))
   if newDiversity < curDiversity {
     log.VEventf(ctx, 3,
       "new diversity %.2f for r%d worse than current diversity %.2f; not rebalancing",
diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go
index 638de8524814..167d0ad9e2e0 100644
--- a/pkg/roachpb/metadata.go
+++ b/pkg/roachpb/metadata.go
@@ -659,3 +659,21 @@ var DefaultLocationInformation = []struct {
     Longitude: "3.81886",
   },
 }
+
+// Locality returns the locality of the Store, which is the Locality of the node
+// plus an extra tier for the node itself.
+func (s StoreDescriptor) Locality() Locality {
+  return s.Node.Locality.AddTier(
+    Tier{Key: "node", Value: s.Node.NodeID.String()})
+}
+
+// AddTier creates a new Locality with a Tier appended to the end.
+func (l Locality) AddTier(tier Tier) Locality {
+  if len(l.Tiers) > 0 {
+    tiers := make([]Tier, len(l.Tiers), len(l.Tiers)+1)
+    copy(tiers, l.Tiers)
+    tiers = append(tiers, tier)
+    return Locality{Tiers: tiers}
+  }
+  return Locality{Tiers: []Tier{tier}}
+}
diff --git a/pkg/roachpb/metadata_test.go b/pkg/roachpb/metadata_test.go
index 76f12b182f95..dc9300fac542 100644
--- a/pkg/roachpb/metadata_test.go
+++ b/pkg/roachpb/metadata_test.go
@@ -268,3 +268,15 @@ func TestDiversityScore(t *testing.T) {
     })
   }
 }
+
+func TestAddTier(t *testing.T) {
+  l1 := Locality{}
+  l2 := Locality{
+    Tiers: []Tier{{Key: "foo", Value: "bar"}},
+  }
+  l3 := Locality{
+    Tiers: []Tier{{Key: "foo", Value: "bar"}, {Key: "bar", Value: "foo"}},
+  }
+  require.Equal(t, l2, l1.AddTier(Tier{Key: "foo", Value: "bar"}))
+  require.Equal(t, l3, l2.AddTier(Tier{Key: "bar", Value: "foo"}))
+}
diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go
index d52ab46d5f0b..fb5b1b4a3b61 100644
--- a/pkg/server/status_test.go
+++ b/pkg/server/status_test.go
@@ -329,6 +329,13 @@ func startServer(t *testing.T) *TestServer {
       base.DefaultTestStoreSpec,
       base.DefaultTestStoreSpec,
     },
+    Knobs: base.TestingKnobs{
+      Store: &kvserver.StoreTestingKnobs{
+        // Now that we allow same-node rebalances, disable them in these tests,
+        // as they don't expect replicas to move.
+        DisableReplicaRebalancing: true,
+      },
+    },
   })
   ts := tsI.(*TestServer)
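
--
Editorial aside, not part of the patch: the mechanism the commit message describes (modeling the node as an extra locality tier so that co-located stores repel each other) can be seen in miniature below. This is a self-contained Go sketch; Tier, Locality, AddTier, and diversityScore here are simplified stand-ins for roachpb.Tier, roachpb.Locality, the AddTier method added by this patch, and roachpb.Locality.DiversityScore, so the exact scoring formula should be read as an assumption rather than the authoritative implementation.

package main

import "fmt"

// Tier and Locality are simplified stand-ins for the roachpb types.
type Tier struct {
	Key, Value string
}

type Locality struct {
	Tiers []Tier
}

// AddTier returns a new Locality with tier appended, mirroring the
// copy-on-append behavior of the AddTier method added in this patch.
func (l Locality) AddTier(tier Tier) Locality {
	tiers := make([]Tier, len(l.Tiers), len(l.Tiers)+1)
	copy(tiers, l.Tiers)
	return Locality{Tiers: append(tiers, tier)}
}

// diversityScore is an assumed, simplified version of the real scoring
// logic: it returns the fraction of tiers remaining at the first point
// where the two localities diverge, so identical localities score 0
// (no diversity) and localities differing in their first tier score 1.
func diversityScore(a, b Locality) float64 {
	length := len(a.Tiers)
	if len(b.Tiers) < length {
		length = len(b.Tiers)
	}
	for i := 0; i < length; i++ {
		if a.Tiers[i].Value != b.Tiers[i].Value {
			return float64(length-i) / float64(length)
		}
	}
	return 0
}

func main() {
	zoneA := Locality{Tiers: []Tier{
		{Key: "region", Value: "us"},
		{Key: "zone", Value: "a"},
	}}

	// Two stores on the same node end up with identical localities once
	// the synthetic "node" tier is appended, so the pair contributes no
	// diversity and such placements score worst.
	s1 := zoneA.AddTier(Tier{Key: "node", Value: "1"})
	s2 := zoneA.AddTier(Tier{Key: "node", Value: "1"})
	fmt.Println(diversityScore(s1, s2)) // 0

	// A store on a different node in the same zone diverges at the last
	// of three tiers, so the pair is slightly diverse and is preferred.
	s3 := zoneA.AddTier(Tier{Key: "node", Value: "2"})
	fmt.Println(diversityScore(s1, s3)) // 0.3333...
}

Under this model the allocator's existing pairwise diversity machinery handles intra-node placement with no special cases: a candidate store on an already-occupied node scores strictly worse than one on a fresh node, which is the repelling behavior the commit message describes.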