Skip to content

Commit

Permalink
validate: manual reset pd config back (pingcap#530)
Browse files Browse the repository at this point in the history
* validate: manual reset pd config back

* add integration test for new command

* fix test

* address comment

* address commment

Co-authored-by: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
  • Loading branch information
3pointer and ti-srebot committed Oct 16, 2020
1 parent 59368ac commit eb477c0
Show file tree
Hide file tree
Showing 9 changed files with 390 additions and 9 deletions.
43 changes: 43 additions & 0 deletions cmd/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"strings"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/tikv/pd/pkg/mock/mockid"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/restore"
"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/task"
Expand All @@ -45,6 +47,7 @@ func NewValidateCommand() *cobra.Command {
meta.AddCommand(newBackupMetaCommand())
meta.AddCommand(decodeBackupMetaCommand())
meta.AddCommand(encodeBackupMetaCommand())
meta.AddCommand(setPDConfigCommand())
meta.Hidden = true

return meta
Expand Down Expand Up @@ -323,3 +326,43 @@ func encodeBackupMetaCommand() *cobra.Command {
}
return encodeBackupMetaCmd
}

func setPDConfigCommand() *cobra.Command {
pdConfigCmd := &cobra.Command{
Use: "reset-pd-config-as-default",
Short: "reset pd scheduler and config adjusted by BR to default value",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(GetDefaultContext())
defer cancel()

var cfg task.Config
if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
return err
}

mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, cfg.CheckRequirements)
if err != nil {
return err
}
defer mgr.Close()

for scheduler := range conn.Schedulers {
if strings.HasPrefix(scheduler, "balance") {
err := mgr.AddScheduler(ctx, scheduler)
if err != nil {
return err
}
log.Info("add pd schedulers succeed",
zap.String("schedulers", scheduler))
}
}

if err := mgr.UpdatePDScheduleConfig(ctx, conn.DefaultPDCfg); err != nil {
return errors.Annotate(err, "fail to update PD merge config")
}
log.Info("add pd configs succeed", zap.Any("config", conn.DefaultPDCfg))
return nil
},
}
return pdConfigCmd
}
179 changes: 179 additions & 0 deletions pkg/conn/scheduler_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package conn

import (
"context"
"math"

"github.com/pingcap/errors"

"github.com/pingcap/br/pkg/utils"
)

// clusterConfig represents a set of scheduler whose config have been modified
// along with their original config.
type clusterConfig struct {
// Enable PD schedulers before restore
scheduler []string
// Original scheudle configuration
scheduleCfg map[string]interface{}
}

var (
// Schedulers represent region/leader schedulers which can impact on performance.
Schedulers = map[string]struct{}{
"balance-leader-scheduler": {},
"balance-hot-region-scheduler": {},
"balance-region-scheduler": {},

"shuffle-leader-scheduler": {},
"shuffle-region-scheduler": {},
"shuffle-hot-region-scheduler": {},
}

pdRegionMergeCfg = []string{
"max-merge-region-keys",
"max-merge-region-size",
}
pdScheduleLimitCfg = []string{
"leader-schedule-limit",
"region-schedule-limit",
"max-snapshot-count",
}

// DefaultPDCfg find by https://github.com/tikv/pd/blob/master/conf/config.toml.
DefaultPDCfg = map[string]interface{}{
"max-merge-region-keys": 200000,
"max-merge-region-size": 20,
"leader-schedule-limit": 4,
"region-schedule-limit": 2048,
"max-snapshot-count": 3,
}
)

func addPDLeaderScheduler(ctx context.Context, mgr *Mgr, removedSchedulers []string) error {
for _, scheduler := range removedSchedulers {
err := mgr.AddScheduler(ctx, scheduler)
if err != nil {
return err
}
}
return nil
}

func restoreSchedulers(ctx context.Context, mgr *Mgr, clusterCfg clusterConfig) error {
if err := addPDLeaderScheduler(ctx, mgr, clusterCfg.scheduler); err != nil {
return errors.Annotate(err, "fail to add PD schedulers")
}
mergeCfg := make(map[string]interface{})
for _, cfgKey := range pdRegionMergeCfg {
value := clusterCfg.scheduleCfg[cfgKey]
if value == nil {
// Ignore non-exist config.
continue
}
mergeCfg[cfgKey] = value
}
if err := mgr.UpdatePDScheduleConfig(ctx, mergeCfg); err != nil {
return errors.Annotate(err, "fail to update PD merge config")
}

scheduleLimitCfg := make(map[string]interface{})
for _, cfgKey := range pdScheduleLimitCfg {
value := clusterCfg.scheduleCfg[cfgKey]
if value == nil {
// Ignore non-exist config.
continue
}
scheduleLimitCfg[cfgKey] = value
}
if err := mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil {
return errors.Annotate(err, "fail to update PD schedule config")
}
return nil
}

func (mgr *Mgr) makeUndoFunctionByConfig(config clusterConfig) utils.UndoFunc {
restore := func(ctx context.Context) error {
return restoreSchedulers(ctx, mgr, config)
}
return restore
}

// RemoveSchedulers removes the schedulers that may slow down BR speed.
func (mgr *Mgr) RemoveSchedulers(ctx context.Context) (undo utils.UndoFunc, err error) {
undo = utils.Nop

// Remove default PD scheduler that may affect restore process.
existSchedulers, err := mgr.ListSchedulers(ctx)
if err != nil {
return
}
needRemoveSchedulers := make([]string, 0, len(existSchedulers))
for _, s := range existSchedulers {
if _, ok := Schedulers[s]; ok {
needRemoveSchedulers = append(needRemoveSchedulers, s)
}
}
scheduler, err := removePDLeaderScheduler(ctx, mgr, needRemoveSchedulers)
if err != nil {
return
}

undo = mgr.makeUndoFunctionByConfig(clusterConfig{scheduler: scheduler})

stores, err := mgr.GetPDClient().GetAllStores(ctx)
if err != nil {
return
}
scheduleCfg, err := mgr.GetPDScheduleConfig(ctx)
if err != nil {
return
}

undo = mgr.makeUndoFunctionByConfig(clusterConfig{scheduler: scheduler, scheduleCfg: scheduleCfg})

disableMergeCfg := make(map[string]interface{})
for _, cfgKey := range pdRegionMergeCfg {
value := scheduleCfg[cfgKey]
if value == nil {
// Ignore non-exist config.
continue
}
// Disable region merge by setting config to 0.
disableMergeCfg[cfgKey] = 0
}
err = mgr.UpdatePDScheduleConfig(ctx, disableMergeCfg)
if err != nil {
return
}

scheduleLimitCfg := make(map[string]interface{})
for _, cfgKey := range pdScheduleLimitCfg {
value := scheduleCfg[cfgKey]
if value == nil {
// Ignore non-exist config.
continue
}

// Speed update PD scheduler by enlarging scheduling limits.
// Multiply limits by store count but no more than 40.
// Larger limit may make cluster unstable.
limit := int(value.(float64))
scheduleLimitCfg[cfgKey] = math.Min(40, float64(limit*len(stores)))
}
return undo, mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg)
}

func removePDLeaderScheduler(ctx context.Context, mgr *Mgr, existSchedulers []string) ([]string, error) {
removedSchedulers := make([]string, 0, len(existSchedulers))
for _, scheduler := range existSchedulers {
err := mgr.RemoveScheduler(ctx, scheduler)
if err != nil {
return nil, err
}
removedSchedulers = append(removedSchedulers, scheduler)
}
return removedSchedulers, nil
}
2 changes: 1 addition & 1 deletion pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
if err != nil {
return err
}
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
if err != nil {
return err
}
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
return cfg.TLS.ParseFromFlags(flags)
}

// newMgr creates a new mgr at the given PD address.
func newMgr(ctx context.Context,
// NewMgr creates a new mgr at the given PD address.
func NewMgr(ctx context.Context,
g glue.Glue, pds []string,
tlsConfig TLSConfig,
checkRequirements bool) (*conn.Mgr, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
ctx, cancel := context.WithCancel(c)
defer cancel()

mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down Expand Up @@ -369,7 +369,7 @@ func RunRestoreTiflashReplica(c context.Context, g glue.Glue, cmdName string, cf
ctx, cancel := context.WithCancel(c)
defer cancel()

mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit eb477c0

Please sign in to comment.