This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

pdutil: use temp pause config #592

Merged
25 commits merged on Nov 16, 2020
Changes from 4 commits
223 changes: 123 additions & 100 deletions pkg/pdutil/pd.go
@@ -19,6 +19,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/util/codec"
@@ -37,6 +38,12 @@ const (
pauseTimeout = 5 * time.Minute
)

var (
// since v4.0.8 we can use pause configs
// see https://github.com/tikv/pd/pull/3088
pauseConfigVersion = semver.New("4.0.7")
)

// clusterConfig represents a set of schedulers whose configs have been modified,
// along with their original configs.
type clusterConfig struct {
@@ -61,15 +68,14 @@ var (
"shuffle-region-scheduler": {},
"shuffle-hot-region-scheduler": {},
}
// TODO remove this, see https://github.com/pingcap/br/pull/555#discussion_r509855972
pdRegionMergeCfg = []string{
"max-merge-region-keys",
"max-merge-region-size",
}
pdScheduleLimitCfg = []string{
"leader-schedule-limit",
"region-schedule-limit",
"max-snapshot-count",
expectPDCfg = map[string]interface{}{
"max-merge-region-keys": 0,
"max-merge-region-size": 0,
// TODO remove these schedule-limits, see https://github.com/pingcap/br/pull/555#discussion_r509855972
Collaborator
as the rationale for keeping the two -limits in #555 was:

I think we could leave a TODO here and try to merge this firstly... I'm afraid that if any error occurs during editing, we cannot merge this in the current unstable CI environment...

do we still need to keep them here?

Collaborator Author
There is some misunderstanding.

  1. leader-schedule-limit and region-schedule-limit are still needed, in both previous and current versions, to reduce scheduling. I'll update the comment later.
  2. Before "Persist temporary setting to etcd" (tikv/pd#3131), leader-schedule-limit and region-schedule-limit don't support a TTL in seconds. I think we should wait for PD to merge 3131 and then test.

I want to make sure this PR stays compatible with older BR versions and makes it easy to support setting PD config with a TTL. I'll file another PR to update the config once 3131 is merged.
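
For reference, a minimal sketch of what a TTL-scoped config update against PD's HTTP API could look like; the /pd/api/v1/config/schedule path and the ttlSecond query parameter are assumptions modelled on this diff and tikv/pd#3131, not a confirmed PD API:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// pauseConfigWithTTL posts schedule-config overrides that PD should expire on
// its own after ttlSeconds (endpoint and parameter name are assumptions).
func pauseConfigWithTTL(pdAddr string, cfg map[string]interface{}, ttlSeconds int) error {
	body, err := json.Marshal(cfg)
	if err != nil {
		return err
	}
	url := fmt.Sprintf("%s/pd/api/v1/config/schedule?ttlSecond=%d", pdAddr, ttlSeconds)
	resp, err := http.Post(url, "application/json", bytes.NewBuffer(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status %s", resp.Status)
	}
	return nil
}

func main() {
	// Example: disable region merge and location replacement for 5 minutes,
	// relying on PD to restore the values once the TTL expires.
	_ = pauseConfigWithTTL("http://127.0.0.1:2379", map[string]interface{}{
		"max-merge-region-keys":       0,
		"max-merge-region-size":       0,
		"enable-location-replacement": false,
	}, 300)
}

If the cluster is older than 4.0.8, BR instead rewrites the config and restores the saved values itself, which is why this PR keeps both paths.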

"leader-schedule-limit": 1,
"region-schedule-limit": 1,
"max-snapshot-count": 1,
"enable-location-replacement": false,
}

// DefaultPDCfg find by https://github.com/tikv/pd/blob/master/conf/config.toml.
@@ -122,6 +128,7 @@ type PdController struct {
addrs []string
cli *http.Client
pdClient pd.Client
version *semver.Version

// control the pause schedulers goroutine
schedulerPauseCh chan struct{}
@@ -144,6 +151,7 @@ func NewPdController(
addrs := strings.Split(pdAddrs, ",")
processedAddrs := make([]string, 0, len(addrs))
var failure error
var versionBytes []byte
for _, addr := range addrs {
if addr != "" && !strings.HasPrefix("http", addr) {
if tlsConf != nil {
@@ -153,15 +161,21 @@ }
}
}
processedAddrs = append(processedAddrs, addr)
_, failure = pdRequest(ctx, addr, clusterVersionPrefix, cli, http.MethodGet, nil)
versionBytes, failure = pdRequest(ctx, addr, clusterVersionPrefix, cli, http.MethodGet, nil)
if failure == nil {
break
}
}
if failure != nil {
return nil, errors.Annotatef(failure, "pd address (%s) not available, please check network", pdAddrs)
return nil, errors.Annotatef(berrors.ErrPDUpdateFailed, "pd address (%s) not available, please check network", pdAddrs)
}

version, err := semver.NewVersion(string(versionBytes))
if err != nil {
log.Warn("fail back to old version",
zap.Binary("version", versionBytes), zap.Error(err))
version = pauseConfigVersion
}
maxCallMsgSize := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)),
@@ -180,12 +194,17 @@ func NewPdController(
addrs: processedAddrs,
cli: cli,
pdClient: pdClient,
version: version,
// We should make a buffered channel here, otherwise when the context is canceled,
// graceful shutdown will get stuck resuming schedulers.
schedulerPauseCh: make(chan struct{}, 1),
}, nil
}

func (p *PdController) isPauseConfigEnabled() bool {
return p.version.Compare(*pauseConfigVersion) > 0
}

// SetHTTP set pd addrs and cli for test.
func (p *PdController) SetHTTP(addrs []string, cli *http.Client) {
p.addrs = addrs
@@ -257,19 +276,17 @@ func (p *PdController) getRegionCountWith(

// PauseSchedulers removes PD schedulers temporarily.
func (p *PdController) PauseSchedulers(ctx context.Context, schedulers []string) ([]string, error) {
return p.pauseSchedulersWith(ctx, schedulers, pdRequest)
return p.pauseSchedulersAndConfigWith(ctx, schedulers, nil, pdRequest)
}

func (p *PdController) pauseSchedulersWith(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) {
removedSchedulers := make([]string, 0, len(schedulers))
func (p *PdController) doPauseSchedulers(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) {
// pause this scheduler for 300 seconds
body, err := json.Marshal(pauseSchedulerBody{Delay: int64(pauseTimeout)})
if err != nil {
return nil, err
}

// first pause this scheduler, if the first time failed. we should return the error
// so put first time out of for loop. and in for loop we could ignore other failed pause.
// PauseSchedulers remove pd scheduler temporarily.
removedSchedulers := make([]string, 0, len(schedulers))
for _, scheduler := range schedulers {
prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)
for _, addr := range p.addrs {
@@ -280,12 +297,34 @@ func (p *PdController) pauseSchedulersWith(ctx context.Context, schedulers []str
}
}
if err != nil {
log.Error("failed to pause scheduler at beginning",
zap.Strings("name", schedulers), zap.Error(err))
return nil, err
return removedSchedulers, err
}
}
return removedSchedulers, nil
}

func (p *PdController) pauseSchedulersAndConfigWith(
ctx context.Context, schedulers []string,
schedulerCfg map[string]interface{}, post pdHTTPRequest,
) ([]string, error) {
// Pause the schedulers first; if this initial attempt fails, return the error.
// Later attempts inside the ticker loop can ignore individual pause failures.
removedSchedulers, err := p.doPauseSchedulers(ctx, schedulers, post)
if err != nil {
log.Error("failed to pause scheduler at beginning",
zap.Strings("name", schedulers), zap.Error(err))
return nil, err
}
log.Info("pause scheduler successful at beginning", zap.Strings("name", schedulers))
if schedulerCfg != nil {
err = p.doPauseConfigs(ctx, schedulerCfg)
if err != nil {
log.Error("failed to pause config at beginning",
zap.Any("cfg", schedulerCfg), zap.Error(err))
return nil, err
}
log.Info("pause configs successful at beginning", zap.Any("cfg", schedulerCfg))
}

go func() {
tick := time.NewTicker(pauseTimeout / 3)
@@ -296,22 +335,19 @@ func (p *PdController) pauseSchedulersWith(ctx context.Context, schedulers []str
case <-ctx.Done():
return
case <-tick.C:
for _, scheduler := range schedulers {
prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)
for _, addr := range p.addrs {
_, err = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body))
if err == nil {
break
}
}
if err == nil {
log.Info("pause scheduler", zap.String("name", scheduler))
} else {
log.Warn("pause scheduler failed, ignore it and wait next time pause", zap.Error(err))
_, err := p.doPauseSchedulers(ctx, schedulers, post)
if err != nil {
log.Warn("pause scheduler failed, ignore it and wait next time pause", zap.Error(err))
}
if schedulerCfg != nil {
err = p.doPauseConfigs(ctx, schedulerCfg)
if err != nil {
log.Warn("pause configs failed, ignore it and wait next time pause", zap.Error(err))
}
}
log.Info("pause scheduler(configs)", zap.Strings("name", removedSchedulers))
case <-p.schedulerPauseCh:
log.Info("exit pause scheduler successful")
log.Info("exit pause scheduler and configs successful")
return
}
}
@@ -400,14 +436,18 @@ func (p *PdController) GetPDScheduleConfig(

// UpdatePDScheduleConfig updates PD schedule config value associated with the key.
func (p *PdController) UpdatePDScheduleConfig(
ctx context.Context, cfg map[string]interface{},
ctx context.Context, cfg map[string]interface{}, prefixs ...string,
) error {
prefix := scheduleConfigPrefix
if len(prefixs) != 0 {
prefix = prefixs[0]
}
for _, addr := range p.addrs {
reqData, err := json.Marshal(cfg)
if err != nil {
return err
}
_, e := pdRequest(ctx, addr, scheduleConfigPrefix,
_, e := pdRequest(ctx, addr, prefix,
p.cli, http.MethodPost, bytes.NewBuffer(reqData))
if e == nil {
return nil
@@ -417,13 +457,25 @@ return errors.Annotate(berrors.ErrPDUpdateFailed, "failed to update PD schedule config")
return errors.Annotate(berrors.ErrPDUpdateFailed, "failed to update PD schedule config")
}

func (p *PdController) pauseSchedulersAndConfig(
ctx context.Context, schedulers []string, cfg map[string]interface{},
) ([]string, error) {
return p.pauseSchedulersAndConfigWith(ctx, schedulers, cfg, pdRequest)
}

func (p *PdController) doPauseConfigs(ctx context.Context, cfg map[string]interface{}) error {
// pause these configs for 300 seconds
prefix := fmt.Sprintf("%s?ttlSecond=%d", scheduleConfigPrefix, 300)
return p.UpdatePDScheduleConfig(ctx, cfg, prefix)
}

func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg clusterConfig) error {
if err := pd.ResumeSchedulers(ctx, clusterCfg.scheduler); err != nil {
return errors.Annotate(err, "fail to add PD schedulers")
}
log.Info("restoring config", zap.Any("config", clusterCfg.scheduleCfg))
mergeCfg := make(map[string]interface{})
for _, cfgKey := range pdRegionMergeCfg {
for cfgKey := range expectPDCfg {
value := clusterCfg.scheduleCfg[cfgKey]
if value == nil {
// Ignore non-exist config.
@@ -434,25 +486,6 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg cluster
if err := pd.UpdatePDScheduleConfig(ctx, mergeCfg); err != nil {
return errors.Annotate(err, "fail to update PD merge config")
}

scheduleLimitCfg := make(map[string]interface{})
for _, cfgKey := range pdScheduleLimitCfg {
value := clusterCfg.scheduleCfg[cfgKey]
if value == nil {
// Ignore non-exist config.
continue
}
scheduleLimitCfg[cfgKey] = value
}
if err := pd.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil {
return errors.Annotate(err, "fail to update PD schedule config")
}
if locationPlacement, ok := clusterCfg.scheduleCfg["enable-location-replacement"]; ok {
log.Debug("restoring config enable-location-replacement", zap.Any("enable-location-placement", locationPlacement))
if err := pd.UpdatePDScheduleConfig(ctx, map[string]interface{}{"enable-location-replacement": locationPlacement}); err != nil {
return err
}
}
return nil
}

@@ -466,24 +499,6 @@ func (p *PdController) makeUndoFunctionByConfig(config clusterConfig) utils.Undo
// RemoveSchedulers removes the schedulers that may slow down BR speed.
func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFunc, err error) {
undo = utils.Nop

// Remove default PD scheduler that may affect restore process.
existSchedulers, err := p.ListSchedulers(ctx)
if err != nil {
return
}
needRemoveSchedulers := make([]string, 0, len(existSchedulers))
for _, s := range existSchedulers {
if _, ok := Schedulers[s]; ok {
needRemoveSchedulers = append(needRemoveSchedulers, s)
}
}
removedSchedulers, err := p.PauseSchedulers(ctx, needRemoveSchedulers)
if err != nil {
return
}
undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers})

stores, err := p.pdClient.GetAllStores(ctx)
if err != nil {
return
@@ -492,43 +507,51 @@ if err != nil {
if err != nil {
return
}

undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers, scheduleCfg: scheduleCfg})
log.Debug("saved PD config", zap.Any("config", scheduleCfg))

disableMergeCfg := make(map[string]interface{})
for _, cfgKey := range pdRegionMergeCfg {
value := scheduleCfg[cfgKey]
if value == nil {
disablePDCfg := make(map[string]interface{})
for cfgKey, cfgVal := range expectPDCfg {
value, ok := scheduleCfg[cfgKey]
if !ok {
// Ignore non-exist config.
continue
}
// Disable region merge by setting config to 0.
disableMergeCfg[cfgKey] = 0
switch v := cfgVal.(type) {
case bool:
disablePDCfg[cfgKey] = false
case int:
limit := int(value.(float64))
disablePDCfg[cfgKey] = int(math.Min(40, float64(limit*len(stores)))) * v
}
}
err = p.UpdatePDScheduleConfig(ctx, disableMergeCfg)
undo = p.makeUndoFunctionByConfig(clusterConfig{scheduleCfg: scheduleCfg})
log.Debug("saved PD config", zap.Any("config", scheduleCfg))

// Remove default PD scheduler that may affect restore process.
existSchedulers, err := p.ListSchedulers(ctx)
if err != nil {
return
}

scheduleLimitCfg := make(map[string]interface{})
for _, cfgKey := range pdScheduleLimitCfg {
value := scheduleCfg[cfgKey]
if value == nil {
// Ignore non-exist config.
continue
needRemoveSchedulers := make([]string, 0, len(existSchedulers))
for _, s := range existSchedulers {
if _, ok := Schedulers[s]; ok {
needRemoveSchedulers = append(needRemoveSchedulers, s)
}

// Speed update PD scheduler by enlarging scheduling limits.
// Multiply limits by store count but no more than 40.
// Larger limit may make cluster unstable.
limit := int(value.(float64))
scheduleLimitCfg[cfgKey] = math.Min(40, float64(limit*len(stores)))
}
if err := p.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil {
return undo, err

var removedSchedulers []string
if p.isPauseConfigEnabled() {
// since 4.0.8 we can set these configs with TTL
removedSchedulers, err = p.pauseSchedulersAndConfig(ctx, needRemoveSchedulers, disablePDCfg)
} else {
// adapt to earlier PD versions (before 4.0.8),
// which don't have temporary config settings.
err = p.UpdatePDScheduleConfig(ctx, disablePDCfg)
if err != nil {
return
}
removedSchedulers, err = p.PauseSchedulers(ctx, needRemoveSchedulers)
}
return undo, p.UpdatePDScheduleConfig(ctx, map[string]interface{}{"enable-location-replacement": "false"})
undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers, scheduleCfg: scheduleCfg})
return undo, err
}

// Close close the connection to pd.
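As a side note on the version gate above, here is a small, self-contained sketch (not the PR's code) of how the 4.0.7 threshold behind pauseConfigVersion and isPauseConfigEnabled plays out for a few cluster versions:

package main

import (
	"fmt"

	"github.com/coreos/go-semver/semver"
)

// Mirrors the threshold in this PR: only cluster versions strictly newer than
// 4.0.7 (i.e. 4.0.8 and later) take the TTL-based pause-config path.
var pauseConfigVersion = semver.New("4.0.7")

func isPauseConfigEnabled(clusterVersion string) bool {
	v, err := semver.NewVersion(clusterVersion)
	if err != nil {
		// Unparsable version: behave like the PR's fallback and use the legacy path.
		return false
	}
	return v.Compare(*pauseConfigVersion) > 0
}

func main() {
	for _, ver := range []string{"4.0.7", "4.0.8", "5.0.0-rc"} {
		fmt.Printf("%-8s TTL pause config: %v\n", ver, isPauseConfigEnabled(ver))
	}
}

When parsing fails, the PR pins the version to pauseConfigVersion, so the gate stays closed and older clusters keep the plain update-then-restore behaviour.
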
4 changes: 2 additions & 2 deletions pkg/pdutil/pd_test.go
@@ -38,7 +38,7 @@ func (s *testPDControllerSuite) TestScheduler(c *C) {
}
schedulerPauseCh := make(chan struct{})
pdController := &PdController{addrs: []string{"", ""}, schedulerPauseCh: schedulerPauseCh}
_, err := pdController.pauseSchedulersWith(ctx, []string{scheduler}, mock)
_, err := pdController.pauseSchedulersAndConfigWith(ctx, []string{scheduler}, nil, mock)
c.Assert(err, ErrorMatches, "failed")

go func() {
@@ -53,7 +53,7 @@ func (s *testPDControllerSuite) TestScheduler(c *C) {
mock = func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) {
return []byte(`["` + scheduler + `"]`), nil
}
_, err = pdController.pauseSchedulersWith(ctx, []string{scheduler}, mock)
_, err = pdController.pauseSchedulersAndConfigWith(ctx, []string{scheduler}, nil, mock)
c.Assert(err, IsNil)

go func() {