br: leader selection with tikv store balance score during the ebs data restore #39402

Merged · 19 commits · Nov 29, 2022
Changes from 9 commits
5 changes: 3 additions & 2 deletions br/pkg/errors/errors.go
@@ -83,8 +83,9 @@ var (
ErrStorageInvalidPermission = errors.Normalize("external storage permission", errors.RFCCodeText("BR:ExternalStorage:ErrStorageInvalidPermission"))

// Snapshot restore
-	ErrRestoreTotalKVMismatch = errors.Normalize("restore total tikvs mismatch", errors.RFCCodeText("BR:EBS:ErrRestoreTotalKVMismatch"))
-	ErrRestoreInvalidPeer     = errors.Normalize("restore met a invalid peer", errors.RFCCodeText("BR:EBS:ErrRestoreInvalidPeer"))
+	ErrRestoreTotalKVMismatch   = errors.Normalize("restore total tikvs mismatch", errors.RFCCodeText("BR:EBS:ErrRestoreTotalKVMismatch"))
+	ErrRestoreInvalidPeer       = errors.Normalize("restore met a invalid peer", errors.RFCCodeText("BR:EBS:ErrRestoreInvalidPeer"))
+	ErrRestoreRegionWithoutPeer = errors.Normalize("restore met a region without any peer", errors.RFCCodeText("BR:EBS:ErrRestoreRegionWithoutPeer"))

// Errors reported from TiKV.
ErrKVStorage = errors.Normalize("tikv storage occur I/O error", errors.RFCCodeText("BR:KV:ErrKVStorage"))
23 changes: 14 additions & 9 deletions br/pkg/restore/data.go
@@ -372,6 +372,7 @@ type RecoverRegion struct {
// 2. build a leader list for all region during the tikv startup
// 3. get max allocate id
func (recovery *Recovery) MakeRecoveryPlan() error {
+	storeBalanceScore := make(map[uint64]int, len(recovery.allStores))
// Group region peer info by region id. find the max allocateId
// region [id] [peer[0-n]]
var regions = make(map[uint64][]*RecoverRegion, 0)
@@ -410,16 +411,20 @@ func (recovery *Recovery) MakeRecoveryPlan() error {
}
} else {
// Generate normal commands.
log.Debug("detected valid peer", zap.Uint64("region id", regionId))
for i, peer := range peers {
log.Debug("make plan", zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId))
plan := &recovpb.RecoverRegionRequest{RegionId: peer.RegionId, AsLeader: i == 0}
// sorted by log term -> last index -> commit index in a region
if plan.AsLeader {
log.Debug("as leader peer", zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId))
recovery.RecoveryPlan[peer.StoreId] = append(recovery.RecoveryPlan[peer.StoreId], plan)
}
log.Debug("detected valid region", zap.Uint64("region id", regionId))
// calc the leader candidates
leaderCandidates, err := LeaderCandidates(peers)
if err != nil {
log.Warn("region without peer", zap.Uint64("region id", regionId))
return errors.Trace(err)
}

// select the leader base on tikv storeBalanceScore
leader := SelectRegionLeader(storeBalanceScore, leaderCandidates)
log.Debug("as leader peer", zap.Uint64("store id", leader.StoreId), zap.Uint64("region id", leader.RegionId))
plan := &recovpb.RecoverRegionRequest{RegionId: leader.RegionId, AsLeader: true}
recovery.RecoveryPlan[leader.StoreId] = append(recovery.RecoveryPlan[leader.StoreId], plan)
storeBalanceScore[leader.StoreId] += 1
}
}
return nil
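To make the balancing concrete: each appointed leader bumps its store's score, so successive regions drift toward the stores currently leading the fewest regions. A minimal, self-contained simulation of that feedback loop (hypothetical simplified types, not the BR API):

```go
package main

import "fmt"

// peerStub is a stripped-down stand-in for a candidate peer of one region;
// hypothetical type, not restore.RecoverRegion.
type peerStub struct {
	regionID uint64
	storeID  uint64
}

func main() {
	// Three regions, each with equally-qualified candidates on stores 1, 2 and 3.
	regions := [][]peerStub{
		{{100, 1}, {100, 2}, {100, 3}},
		{{101, 1}, {101, 2}, {101, 3}},
		{{102, 1}, {102, 2}, {102, 3}},
	}
	storeBalanceScore := make(map[uint64]int) // leaders assigned per store so far
	plan := make(map[uint64][]uint64)         // store id -> regions it will lead

	for _, candidates := range regions {
		// pick the candidate on the lowest-scored store, as SelectRegionLeader does
		leader := candidates[0]
		for _, p := range candidates[1:] {
			if storeBalanceScore[p.storeID] < storeBalanceScore[leader.storeID] {
				leader = p
			}
		}
		plan[leader.storeID] = append(plan[leader.storeID], leader.regionID)
		storeBalanceScore[leader.storeID] += 1 // the feedback that spreads leaders out
	}
	fmt.Println(storeBalanceScore) // map[1:1 2:1 3:1]: one leader per store
	fmt.Println(plan)              // map[1:[100] 2:[101] 3:[102]]
}
```

Without the score increment, every region would elect its first candidate and pile all leaders onto one store, which is exactly what the pre-PR `AsLeader: i == 0` logic did.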
40 changes: 40 additions & 0 deletions br/pkg/restore/util.go
@@ -750,3 +750,43 @@ func CheckConsistencyAndValidPeer(regionInfos []*RecoverRegionInfo) (map[uint64]
}
return validPeers, nil
}

// in cloud, iops and bandwidth limitations make raft writes slow, so the raft states (log term, last index, commit index) are usually the same among a region's peers
// LeaderCandidates selects all peers that can be chosen as leader during the restore
func LeaderCandidates(peers []*RecoverRegion) ([]*RecoverRegion, error) {
if len(peers) == 0 {
return nil, errors.Annotatef(berrors.ErrRestoreRegionWithoutPeer,
"invalid region range")
}
candidates := make([]*RecoverRegion, 0, len(peers))
// by default, peers[0] is assigned as the leader, since peers are already sorted by the leader selection rule
leader := peers[0]
candidates = append(candidates, leader)
for _, peer := range peers[1:] {
// a qualified candidate satisfies leader.logterm == candidate.logterm && leader.lastindex == candidate.lastindex && leader.commitindex == candidate.commitindex
if peer.LastLogTerm == leader.LastLogTerm && peer.LastIndex == leader.LastIndex && peer.CommitIndex == leader.CommitIndex {
log.Debug("leader candidate", zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId), zap.Uint64("peer id", peer.PeerId))
candidates = append(candidates, peer)
}
}
return candidates, nil
}

// SelectRegionLeader selects the leader of a region from its candidates by store balance score,
// e.g. for region A with candidate leaders x, y, z:
//   peer x on store 1 with storeBalanceScore 3
//   peer y on store 3 with storeBalanceScore 2
//   peer z on store 4 with storeBalanceScore 1
// result: peer z is selected as the leader on store 4
func SelectRegionLeader(storeBalanceScore map[uint64]int, peers []*RecoverRegion) *RecoverRegion {
// by default, peers[0] is assigned as the leader
leader := peers[0]
minLeaderStore := storeBalanceScore[leader.StoreId]
for _, peer := range peers {
log.Debug("leader candidate", zap.Int("score", storeBalanceScore[peer.StoreId]), zap.Int("min-score", minLeaderStore), zap.Uint64("store id", peer.StoreId), zap.Uint64("region id", peer.RegionId), zap.Uint64("peer id", peer.PeerId))
if storeBalanceScore[peer.StoreId] < minLeaderStore {
minLeaderStore = storeBalanceScore[peer.StoreId]
leader = peer
}
}
return leader
}
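The two helpers compose: LeaderCandidates narrows the peers to those tied on raft state, and SelectRegionLeader breaks the tie by store load. A sketch reproducing the x/y/z example from the comment above (hypothetical simplified types, not the exported BR API):

```go
package main

import "fmt"

// candidate mirrors the fields of restore.RecoverRegion that matter for
// leader selection; a hypothetical simplified type, not the BR struct.
type candidate struct {
	storeID     uint64
	lastLogTerm uint64
	lastIndex   uint64
	commitIndex uint64
}

func main() {
	// Region A's peers: raft states tie, as expected after an EBS restore.
	x := candidate{storeID: 1, lastLogTerm: 2, lastIndex: 1, commitIndex: 0}
	y := candidate{storeID: 3, lastLogTerm: 2, lastIndex: 1, commitIndex: 0}
	z := candidate{storeID: 4, lastLogTerm: 2, lastIndex: 1, commitIndex: 0}
	peers := []candidate{x, y, z}

	// LeaderCandidates' tie check: keep every peer matching peers[0]'s raft state.
	cands := []candidate{peers[0]}
	for _, p := range peers[1:] {
		if p.lastLogTerm == peers[0].lastLogTerm &&
			p.lastIndex == peers[0].lastIndex &&
			p.commitIndex == peers[0].commitIndex {
			cands = append(cands, p)
		}
	}

	// SelectRegionLeader's rule: the candidate on the lowest-scored store wins.
	score := map[uint64]int{1: 3, 3: 2, 4: 1}
	leader := cands[0]
	for _, p := range cands[1:] {
		if score[p.storeID] < score[leader.storeID] {
			leader = p
		}
	}
	fmt.Println("leader on store", leader.storeID) // leader on store 4 (peer z)
}
```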
49 changes: 49 additions & 0 deletions br/pkg/restore/util_test.go
@@ -460,3 +460,52 @@ func TestCheckConsistencyAndValidPeer(t *testing.T) {
require.Error(t, err)
require.Regexp(t, ".*invalid restore range.*", err.Error())
}

func TestLeaderCandidates(t *testing.T) {
// key space is continuous
validPeer1 := newPeerMeta(9, 11, 2, []byte(""), []byte("bb"), 2, 1, 0, 0, false)
validPeer2 := newPeerMeta(19, 22, 3, []byte("bb"), []byte("cc"), 2, 1, 0, 1, false)
validPeer3 := newPeerMeta(29, 30, 1, []byte("cc"), []byte(""), 2, 1, 0, 2, false)

peers := []*restore.RecoverRegion{
validPeer1,
validPeer2,
validPeer3,
}

candidates, err := restore.LeaderCandidates(peers)
require.NoError(t, err)
require.Equal(t, 3, len(candidates))
}

func TestSelectRegionLeader(t *testing.T) {
validPeer1 := newPeerMeta(9, 11, 2, []byte(""), []byte("bb"), 2, 1, 0, 0, false)
validPeer2 := newPeerMeta(19, 22, 3, []byte("bb"), []byte("cc"), 2, 1, 0, 1, false)
validPeer3 := newPeerMeta(29, 30, 1, []byte("cc"), []byte(""), 2, 1, 0, 2, false)

peers := []*restore.RecoverRegion{
validPeer1,
validPeer2,
validPeer3,
}
// init store balance scores, all 0
storeBalanceScore := make(map[uint64]int, len(peers))
leader := restore.SelectRegionLeader(storeBalanceScore, peers)
require.Equal(t, validPeer1, leader)

// change the store balance scores
storeBalanceScore[2] = 3
storeBalanceScore[3] = 2
storeBalanceScore[1] = 1
leader = restore.SelectRegionLeader(storeBalanceScore, peers)
require.Equal(t, validPeer3, leader)

// one peer
peer := []*restore.RecoverRegion{
validPeer3,
}
// init store balance scores, all 0
storeScore := make(map[uint64]int, len(peer))
leader = restore.SelectRegionLeader(storeScore, peer)
require.Equal(t, validPeer3, leader)
}
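One behavior the tests above do not exercise is the score feedback: MakeRecoveryPlan bumps storeBalanceScore after every pick, so consecutive regions land on different stores. A possible follow-up test in the same style (assuming, as the calls above suggest, that newPeerMeta's third argument is the store id):

```go
func TestSelectRegionLeaderBalances(t *testing.T) {
	peer1 := newPeerMeta(9, 11, 2, []byte(""), []byte("bb"), 2, 1, 0, 0, false)
	peer2 := newPeerMeta(19, 22, 3, []byte("bb"), []byte("cc"), 2, 1, 0, 1, false)
	peers := []*restore.RecoverRegion{peer1, peer2}

	score := make(map[uint64]int)
	// first pick: all scores are 0, so peers[0] (store 2) wins
	first := restore.SelectRegionLeader(score, peers)
	require.Equal(t, peer1, first)
	score[first.StoreId]++

	// second pick: store 2 now carries a leader, so store 3 wins
	second := restore.SelectRegionLeader(score, peers)
	require.Equal(t, peer2, second)
}
```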