From d485c505172d0688aefc9519c9249d41fa0ba19b Mon Sep 17 00:00:00 2001 From: luancheng Date: Fri, 6 Nov 2020 19:37:37 +0800 Subject: [PATCH 01/23] pdutil: use temp pause config --- pkg/pdutil/pd.go | 122 +++++++++++++++++++++-------------------------- 1 file changed, 54 insertions(+), 68 deletions(-) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 6a9d133e6..2f2e98e9a 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/util/codec" pd "github.com/tikv/pd/client" + "github.com/coreos/go-semver/semver" berrors "github.com/pingcap/br/pkg/errors" "github.com/pingcap/br/pkg/utils" @@ -37,6 +38,12 @@ const ( pauseTimeout = 5 * time.Minute ) +var( + // in this version we can use pause configs + // see https://github.com/tikv/pd/pull/3088 + pauseConfigVersion = semver.New("4.0.8") +) + // clusterConfig represents a set of scheduler whose config have been modified // along with their original config. type clusterConfig struct { @@ -62,14 +69,12 @@ var ( "shuffle-hot-region-scheduler": {}, } // TODO remove this, see https://github.com/pingcap/br/pull/555#discussion_r509855972 - pdRegionMergeCfg = []string{ - "max-merge-region-keys", - "max-merge-region-size", - } - pdScheduleLimitCfg = []string{ - "leader-schedule-limit", - "region-schedule-limit", - "max-snapshot-count", + expectPDCfg = map[string]int { + "max-merge-region-keys": 0, + "max-merge-region-size": 0, + "leader-schedule-limit": 1, + "region-schedule-limit": 1, + "max-snapshot-count": 1, } // DefaultPDCfg find by https://github.com/tikv/pd/blob/master/conf/config.toml. @@ -122,6 +127,7 @@ type PdController struct { addrs []string cli *http.Client pdClient pd.Client + version *semver.Version // control the pause schedulers goroutine schedulerPauseCh chan struct{} @@ -144,6 +150,7 @@ func NewPdController( addrs := strings.Split(pdAddrs, ",") processedAddrs := make([]string, 0, len(addrs)) var failure error + var versionBytes []byte for _, addr := range addrs { if addr != "" && !strings.HasPrefix("http", addr) { if tlsConf != nil { @@ -153,7 +160,7 @@ func NewPdController( } } processedAddrs = append(processedAddrs, addr) - _, failure = pdRequest(ctx, addr, clusterVersionPrefix, cli, http.MethodGet, nil) + versionBytes, failure = pdRequest(ctx, addr, clusterVersionPrefix, cli, http.MethodGet, nil) if failure == nil { break } @@ -161,6 +168,10 @@ func NewPdController( if failure != nil { return nil, errors.Annotatef(failure, "pd address (%s) not available, please check network", pdAddrs) } + version, err := semver.NewVersion(string(versionBytes)) + if err != nil { + return nil, errors.Annotatef(err, "transform pd version failed", string(versionBytes)) + } maxCallMsgSize := []grpc.DialOption{ grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)), @@ -180,12 +191,17 @@ func NewPdController( addrs: processedAddrs, cli: cli, pdClient: pdClient, + version: version, // We should make a buffered channel here otherwise when context canceled, // gracefully shutdown will stick at resuming schedulers. schedulerPauseCh: make(chan struct{}, 1), }, nil } +func (p *PdController) isPauseConfigEnabled() bool { + return p.version.Compare(*pauseConfigVersion) >= 0 +} + // SetHTTP set pd addrs and cli for test. func (p *PdController) SetHTTP(addrs []string, cli *http.Client) { p.addrs = addrs @@ -260,16 +276,14 @@ func (p *PdController) PauseSchedulers(ctx context.Context, schedulers []string) return p.pauseSchedulersWith(ctx, schedulers, pdRequest) } -func (p *PdController) pauseSchedulersWith(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) { - removedSchedulers := make([]string, 0, len(schedulers)) +func (p *PdController) doPauseSchedulers(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) { // pause this scheduler with 300 seconds body, err := json.Marshal(pauseSchedulerBody{Delay: int64(pauseTimeout)}) if err != nil { return nil, err } - - // first pause this scheduler, if the first time failed. we should return the error - // so put first time out of for loop. and in for loop we could ignore other failed pause. + // PauseSchedulers remove pd scheduler temporarily. + removedSchedulers := make([]string, 0, len(schedulers)) for _, scheduler := range schedulers { prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler) for _, addr := range p.addrs { @@ -280,11 +294,21 @@ func (p *PdController) pauseSchedulersWith(ctx context.Context, schedulers []str } } if err != nil { - log.Error("failed to pause scheduler at beginning", - zap.Strings("name", schedulers), zap.Error(err)) - return nil, err + return removedSchedulers, err } } + return removedSchedulers, nil +} + +func (p *PdController) pauseSchedulersWith(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) { + // first pause this scheduler, if the first time failed. we should return the error + // so put first time out of for loop. and in for loop we could ignore other failed pause. + removedSchedulers, err := p.doPauseSchedulers(ctx, schedulers, post) + if err != nil { + log.Error("failed to pause scheduler at beginning", + zap.Strings("name", schedulers), zap.Error(err)) + return nil, err + } log.Info("pause scheduler successful at beginning", zap.Strings("name", schedulers)) go func() { @@ -296,19 +320,11 @@ func (p *PdController) pauseSchedulersWith(ctx context.Context, schedulers []str case <-ctx.Done(): return case <-tick.C: - for _, scheduler := range schedulers { - prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler) - for _, addr := range p.addrs { - _, err = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body)) - if err == nil { - break - } - } - if err == nil { - log.Info("pause scheduler", zap.String("name", scheduler)) - } else { - log.Warn("pause scheduler failed, ignore it and wait next time pause", zap.Error(err)) - } + _, err := p.doPauseSchedulers(ctx, schedulers, post) + if err == nil { + log.Info("pause scheduler", zap.Strings("name", removedSchedulers)) + } else { + log.Warn("pause scheduler failed, ignore it and wait next time pause", zap.Error(err)) } case <-p.schedulerPauseCh: log.Info("exit pause scheduler successful") @@ -423,7 +439,7 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg cluster } log.Info("restoring config", zap.Any("config", clusterCfg.scheduleCfg)) mergeCfg := make(map[string]interface{}) - for _, cfgKey := range pdRegionMergeCfg { + for cfgKey := range expectPDCfg { value := clusterCfg.scheduleCfg[cfgKey] if value == nil { // Ignore non-exist config. @@ -435,18 +451,6 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg cluster return errors.Annotate(err, "fail to update PD merge config") } - scheduleLimitCfg := make(map[string]interface{}) - for _, cfgKey := range pdScheduleLimitCfg { - value := clusterCfg.scheduleCfg[cfgKey] - if value == nil { - // Ignore non-exist config. - continue - } - scheduleLimitCfg[cfgKey] = value - } - if err := pd.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil { - return errors.Annotate(err, "fail to update PD schedule config") - } if locationPlacement, ok := clusterCfg.scheduleCfg["enable-location-replacement"]; ok { log.Debug("restoring config enable-location-replacement", zap.Any("enable-location-placement", locationPlacement)) if err := pd.UpdatePDScheduleConfig(ctx, map[string]interface{}{"enable-location-replacement": locationPlacement}); err != nil { @@ -496,38 +500,20 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers, scheduleCfg: scheduleCfg}) log.Debug("saved PD config", zap.Any("config", scheduleCfg)) - disableMergeCfg := make(map[string]interface{}) - for _, cfgKey := range pdRegionMergeCfg { - value := scheduleCfg[cfgKey] - if value == nil { + disablePDCfg := make(map[string]interface{}) + for cfgKey, cfgVal := range expectPDCfg { + value, ok := scheduleCfg[cfgKey] + if !ok { // Ignore non-exist config. continue } - // Disable region merge by setting config to 0. - disableMergeCfg[cfgKey] = 0 + limit := int(value.(float64)) + disablePDCfg[cfgKey] = int(math.Min(40, float64(limit*len(stores)))) * cfgVal } - err = p.UpdatePDScheduleConfig(ctx, disableMergeCfg) + err = p.UpdatePDScheduleConfig(ctx, disablePDCfg) if err != nil { return } - - scheduleLimitCfg := make(map[string]interface{}) - for _, cfgKey := range pdScheduleLimitCfg { - value := scheduleCfg[cfgKey] - if value == nil { - // Ignore non-exist config. - continue - } - - // Speed update PD scheduler by enlarging scheduling limits. - // Multiply limits by store count but no more than 40. - // Larger limit may make cluster unstable. - limit := int(value.(float64)) - scheduleLimitCfg[cfgKey] = math.Min(40, float64(limit*len(stores))) - } - if err := p.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil { - return undo, err - } return undo, p.UpdatePDScheduleConfig(ctx, map[string]interface{}{"enable-location-replacement": "false"}) } From 6f141e6aaca7d4cf53ab9da4b0bcaced126a979c Mon Sep 17 00:00:00 2001 From: luancheng Date: Tue, 10 Nov 2020 15:31:33 +0800 Subject: [PATCH 02/23] refactor pdutil --- pkg/pdutil/pd.go | 142 ++++++++++++++++++++++++++---------------- pkg/pdutil/pd_test.go | 4 +- 2 files changed, 89 insertions(+), 57 deletions(-) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 2f2e98e9a..e43c8a3d6 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -19,11 +19,11 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" + "github.com/coreos/go-semver/semver" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/util/codec" pd "github.com/tikv/pd/client" - "github.com/coreos/go-semver/semver" berrors "github.com/pingcap/br/pkg/errors" "github.com/pingcap/br/pkg/utils" @@ -38,10 +38,10 @@ const ( pauseTimeout = 5 * time.Minute ) -var( +var ( // in this version we can use pause configs // see https://github.com/tikv/pd/pull/3088 - pauseConfigVersion = semver.New("4.0.8") + pauseConfigVersion = semver.New("4.0.8") ) // clusterConfig represents a set of scheduler whose config have been modified @@ -68,13 +68,14 @@ var ( "shuffle-region-scheduler": {}, "shuffle-hot-region-scheduler": {}, } - // TODO remove this, see https://github.com/pingcap/br/pull/555#discussion_r509855972 - expectPDCfg = map[string]int { + expectPDCfg = map[string]interface{}{ "max-merge-region-keys": 0, "max-merge-region-size": 0, - "leader-schedule-limit": 1, - "region-schedule-limit": 1, - "max-snapshot-count": 1, + // TODO remove this schedule-limits, see https://github.com/pingcap/br/pull/555#discussion_r509855972 + "leader-schedule-limit": 1, + "region-schedule-limit": 1, + "max-snapshot-count": 1, + "enable-location-replacement": false, } // DefaultPDCfg find by https://github.com/tikv/pd/blob/master/conf/config.toml. @@ -166,13 +167,10 @@ func NewPdController( } } if failure != nil { - return nil, errors.Annotatef(failure, "pd address (%s) not available, please check network", pdAddrs) - } - version, err := semver.NewVersion(string(versionBytes)) - if err != nil { - return nil, errors.Annotatef(err, "transform pd version failed", string(versionBytes)) + return nil, errors.Annotatef(berrors.ErrPDUpdateFailed, "pd address (%s) not available, please check network", pdAddrs) } + version := semver.New(string(versionBytes)) maxCallMsgSize := []grpc.DialOption{ grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)), grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)), @@ -191,7 +189,7 @@ func NewPdController( addrs: processedAddrs, cli: cli, pdClient: pdClient, - version: version, + version: version, // We should make a buffered channel here otherwise when context canceled, // gracefully shutdown will stick at resuming schedulers. schedulerPauseCh: make(chan struct{}, 1), @@ -273,7 +271,7 @@ func (p *PdController) getRegionCountWith( // PauseSchedulers remove pd scheduler temporarily. func (p *PdController) PauseSchedulers(ctx context.Context, schedulers []string) ([]string, error) { - return p.pauseSchedulersWith(ctx, schedulers, pdRequest) + return p.pauseSchedulersAndConfigWith(ctx, schedulers, nil, pdRequest) } func (p *PdController) doPauseSchedulers(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) { @@ -300,7 +298,10 @@ func (p *PdController) doPauseSchedulers(ctx context.Context, schedulers []strin return removedSchedulers, nil } -func (p *PdController) pauseSchedulersWith(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) { +func (p *PdController) pauseSchedulersAndConfigWith( + ctx context.Context, schedulers []string, + schedulerCfg map[string]interface{}, post pdHTTPRequest, +) ([]string, error) { // first pause this scheduler, if the first time failed. we should return the error // so put first time out of for loop. and in for loop we could ignore other failed pause. removedSchedulers, err := p.doPauseSchedulers(ctx, schedulers, post) @@ -310,6 +311,15 @@ func (p *PdController) pauseSchedulersWith(ctx context.Context, schedulers []str return nil, err } log.Info("pause scheduler successful at beginning", zap.Strings("name", schedulers)) + if schedulerCfg != nil { + err = p.doPauseConfigs(ctx, schedulerCfg) + if err != nil { + log.Error("failed to pause config at beginning", + zap.Any("cfg", schedulerCfg), zap.Error(err)) + return nil, err + } + log.Info("pause configs successful at beginning", zap.Any("cfg", schedulerCfg)) + } go func() { tick := time.NewTicker(pauseTimeout / 3) @@ -321,13 +331,18 @@ func (p *PdController) pauseSchedulersWith(ctx context.Context, schedulers []str return case <-tick.C: _, err := p.doPauseSchedulers(ctx, schedulers, post) - if err == nil { - log.Info("pause scheduler", zap.Strings("name", removedSchedulers)) - } else { + if err != nil { log.Warn("pause scheduler failed, ignore it and wait next time pause", zap.Error(err)) } + if schedulerCfg != nil { + err = p.doPauseConfigs(ctx, schedulerCfg) + if err != nil { + log.Warn("pause configs failed, ignore it and wait next time pause", zap.Error(err)) + } + } + log.Info("pause scheduler(configs)", zap.Strings("name", removedSchedulers)) case <-p.schedulerPauseCh: - log.Info("exit pause scheduler successful") + log.Info("exit pause scheduler and configs successful") return } } @@ -416,14 +431,18 @@ func (p *PdController) GetPDScheduleConfig( // UpdatePDScheduleConfig updates PD schedule config value associated with the key. func (p *PdController) UpdatePDScheduleConfig( - ctx context.Context, cfg map[string]interface{}, + ctx context.Context, cfg map[string]interface{}, prefixs ...string, ) error { + prefix := scheduleConfigPrefix + if len(prefixs) == 0 { + prefix = prefixs[0] + } for _, addr := range p.addrs { reqData, err := json.Marshal(cfg) if err != nil { return err } - _, e := pdRequest(ctx, addr, scheduleConfigPrefix, + _, e := pdRequest(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(reqData)) if e == nil { return nil @@ -433,6 +452,18 @@ func (p *PdController) UpdatePDScheduleConfig( return errors.Annotate(berrors.ErrPDUpdateFailed, "failed to update PD schedule config") } +func (p *PdController) pauseSchedulersAndConfig( + ctx context.Context, schedulers []string, cfg map[string]interface{}, +) ([]string, error) { + return p.pauseSchedulersAndConfigWith(ctx, schedulers, cfg, pdRequest) +} + +func (p *PdController) doPauseConfigs(ctx context.Context, cfg map[string]interface{}) error { + // pause this scheduler with 300 seconds + prefix := fmt.Sprintf("%s?ttlSecond=%d", schedulerPrefix, 300) + return p.UpdatePDScheduleConfig(ctx, cfg, prefix) +} + func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg clusterConfig) error { if err := pd.ResumeSchedulers(ctx, clusterCfg.scheduler); err != nil { return errors.Annotate(err, "fail to add PD schedulers") @@ -450,13 +481,6 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg cluster if err := pd.UpdatePDScheduleConfig(ctx, mergeCfg); err != nil { return errors.Annotate(err, "fail to update PD merge config") } - - if locationPlacement, ok := clusterCfg.scheduleCfg["enable-location-replacement"]; ok { - log.Debug("restoring config enable-location-replacement", zap.Any("enable-location-placement", locationPlacement)) - if err := pd.UpdatePDScheduleConfig(ctx, map[string]interface{}{"enable-location-replacement": locationPlacement}); err != nil { - return err - } - } return nil } @@ -470,24 +494,6 @@ func (p *PdController) makeUndoFunctionByConfig(config clusterConfig) utils.Undo // RemoveSchedulers removes the schedulers that may slow down BR speed. func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFunc, err error) { undo = utils.Nop - - // Remove default PD scheduler that may affect restore process. - existSchedulers, err := p.ListSchedulers(ctx) - if err != nil { - return - } - needRemoveSchedulers := make([]string, 0, len(existSchedulers)) - for _, s := range existSchedulers { - if _, ok := Schedulers[s]; ok { - needRemoveSchedulers = append(needRemoveSchedulers, s) - } - } - removedSchedulers, err := p.PauseSchedulers(ctx, needRemoveSchedulers) - if err != nil { - return - } - undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers}) - stores, err := p.pdClient.GetAllStores(ctx) if err != nil { return @@ -496,10 +502,6 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun if err != nil { return } - - undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers, scheduleCfg: scheduleCfg}) - log.Debug("saved PD config", zap.Any("config", scheduleCfg)) - disablePDCfg := make(map[string]interface{}) for cfgKey, cfgVal := range expectPDCfg { value, ok := scheduleCfg[cfgKey] @@ -507,14 +509,44 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun // Ignore non-exist config. continue } - limit := int(value.(float64)) - disablePDCfg[cfgKey] = int(math.Min(40, float64(limit*len(stores)))) * cfgVal + switch cfgVal.(type) { + case bool: + disablePDCfg[cfgKey] = false + case int: + limit := int(value.(float64)) + disablePDCfg[cfgKey] = int(math.Min(40, float64(limit*len(stores)))) * cfgVal.(int) + } } - err = p.UpdatePDScheduleConfig(ctx, disablePDCfg) + undo = p.makeUndoFunctionByConfig(clusterConfig{scheduleCfg: scheduleCfg}) + log.Debug("saved PD config", zap.Any("config", scheduleCfg)) + + // Remove default PD scheduler that may affect restore process. + existSchedulers, err := p.ListSchedulers(ctx) if err != nil { return } - return undo, p.UpdatePDScheduleConfig(ctx, map[string]interface{}{"enable-location-replacement": "false"}) + needRemoveSchedulers := make([]string, 0, len(existSchedulers)) + for _, s := range existSchedulers { + if _, ok := Schedulers[s]; ok { + needRemoveSchedulers = append(needRemoveSchedulers, s) + } + } + + var removedSchedulers []string + if p.isPauseConfigEnabled() { + // after 4.0.8 we can set these config with TTL + removedSchedulers, err = p.pauseSchedulersAndConfig(ctx, needRemoveSchedulers, disablePDCfg) + } else { + // adapt to earlier version (before 4.0.8) of pd cluster + // which doesn't have temporary config setting. + err = p.UpdatePDScheduleConfig(ctx, disablePDCfg) + if err != nil { + return + } + removedSchedulers, err = p.PauseSchedulers(ctx, needRemoveSchedulers) + } + undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers, scheduleCfg: scheduleCfg}) + return undo, err } // Close close the connection to pd. diff --git a/pkg/pdutil/pd_test.go b/pkg/pdutil/pd_test.go index bbfb59e9c..b46589cac 100644 --- a/pkg/pdutil/pd_test.go +++ b/pkg/pdutil/pd_test.go @@ -38,7 +38,7 @@ func (s *testPDControllerSuite) TestScheduler(c *C) { } schedulerPauseCh := make(chan struct{}) pdController := &PdController{addrs: []string{"", ""}, schedulerPauseCh: schedulerPauseCh} - _, err := pdController.pauseSchedulersWith(ctx, []string{scheduler}, mock) + _, err := pdController.pauseSchedulersAndConfigWith(ctx, []string{scheduler}, mock) c.Assert(err, ErrorMatches, "failed") go func() { @@ -53,7 +53,7 @@ func (s *testPDControllerSuite) TestScheduler(c *C) { mock = func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) { return []byte(`["` + scheduler + `"]`), nil } - _, err = pdController.pauseSchedulersWith(ctx, []string{scheduler}, mock) + _, err = pdController.pauseSchedulersAndConfigWith(ctx, []string{scheduler}, mock) c.Assert(err, IsNil) go func() { From 7cf0c64bfabcc16800305514d039fc09ee5dcaaf Mon Sep 17 00:00:00 2001 From: luancheng Date: Tue, 10 Nov 2020 17:39:25 +0800 Subject: [PATCH 03/23] fix ci --- pkg/pdutil/pd.go | 4 ++-- pkg/pdutil/pd_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index e43c8a3d6..1d6aafb8a 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -509,12 +509,12 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun // Ignore non-exist config. continue } - switch cfgVal.(type) { + switch v := cfgVal.(type) { case bool: disablePDCfg[cfgKey] = false case int: limit := int(value.(float64)) - disablePDCfg[cfgKey] = int(math.Min(40, float64(limit*len(stores)))) * cfgVal.(int) + disablePDCfg[cfgKey] = int(math.Min(40, float64(limit*len(stores)))) * v } } undo = p.makeUndoFunctionByConfig(clusterConfig{scheduleCfg: scheduleCfg}) diff --git a/pkg/pdutil/pd_test.go b/pkg/pdutil/pd_test.go index b46589cac..987e5260d 100644 --- a/pkg/pdutil/pd_test.go +++ b/pkg/pdutil/pd_test.go @@ -38,7 +38,7 @@ func (s *testPDControllerSuite) TestScheduler(c *C) { } schedulerPauseCh := make(chan struct{}) pdController := &PdController{addrs: []string{"", ""}, schedulerPauseCh: schedulerPauseCh} - _, err := pdController.pauseSchedulersAndConfigWith(ctx, []string{scheduler}, mock) + _, err := pdController.pauseSchedulersAndConfigWith(ctx, []string{scheduler}, nil, mock) c.Assert(err, ErrorMatches, "failed") go func() { @@ -53,7 +53,7 @@ func (s *testPDControllerSuite) TestScheduler(c *C) { mock = func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) { return []byte(`["` + scheduler + `"]`), nil } - _, err = pdController.pauseSchedulersAndConfigWith(ctx, []string{scheduler}, mock) + _, err = pdController.pauseSchedulersAndConfigWith(ctx, []string{scheduler}, nil, mock) c.Assert(err, IsNil) go func() { From 24f2d24f6bc73dca5baec7ea81d2891783e6fc60 Mon Sep 17 00:00:00 2001 From: luancheng Date: Tue, 10 Nov 2020 19:32:58 +0800 Subject: [PATCH 04/23] add pd version fail back --- pkg/pdutil/pd.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 1d6aafb8a..2f76c913f 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -39,9 +39,9 @@ const ( ) var ( - // in this version we can use pause configs + // in v4.0.8 version we can use pause configs // see https://github.com/tikv/pd/pull/3088 - pauseConfigVersion = semver.New("4.0.8") + pauseConfigVersion = semver.New("4.0.7") ) // clusterConfig represents a set of scheduler whose config have been modified @@ -170,7 +170,12 @@ func NewPdController( return nil, errors.Annotatef(berrors.ErrPDUpdateFailed, "pd address (%s) not available, please check network", pdAddrs) } - version := semver.New(string(versionBytes)) + version, err := semver.NewVersion(string(versionBytes)) + if err != nil { + log.Warn("fail back to old version", + zap.Binary("version", versionBytes), zap.Error(err)) + version = pauseConfigVersion + } maxCallMsgSize := []grpc.DialOption{ grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)), grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)), @@ -197,7 +202,7 @@ func NewPdController( } func (p *PdController) isPauseConfigEnabled() bool { - return p.version.Compare(*pauseConfigVersion) >= 0 + return p.version.Compare(*pauseConfigVersion) > 0 } // SetHTTP set pd addrs and cli for test. From 99692eb4cbcd243d3faad8669790dcdbbe59e8c5 Mon Sep 17 00:00:00 2001 From: luancheng Date: Tue, 10 Nov 2020 20:18:39 +0800 Subject: [PATCH 05/23] fix ci --- pkg/pdutil/pd.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 2f76c913f..3402e3c09 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -439,7 +439,7 @@ func (p *PdController) UpdatePDScheduleConfig( ctx context.Context, cfg map[string]interface{}, prefixs ...string, ) error { prefix := scheduleConfigPrefix - if len(prefixs) == 0 { + if len(prefixs) != 0 { prefix = prefixs[0] } for _, addr := range p.addrs { @@ -559,3 +559,4 @@ func (p *PdController) Close() { p.pdClient.Close() close(p.schedulerPauseCh) } + From 682fd5ff4e359d2c5b5d3ad8cd4ded6012f7d669 Mon Sep 17 00:00:00 2001 From: luancheng Date: Tue, 10 Nov 2020 20:21:00 +0800 Subject: [PATCH 06/23] fix ci --- pkg/pdutil/pd.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 3402e3c09..e5021cf6f 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -559,4 +559,3 @@ func (p *PdController) Close() { p.pdClient.Close() close(p.schedulerPauseCh) } - From 64d6d4ca44c3f85b4d0a60dc7110c1029553214e Mon Sep 17 00:00:00 2001 From: luancheng Date: Wed, 11 Nov 2020 12:09:59 +0800 Subject: [PATCH 07/23] fix unmarshal bool --- pkg/pdutil/pd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index e5021cf6f..410a8b531 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -516,7 +516,7 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun } switch v := cfgVal.(type) { case bool: - disablePDCfg[cfgKey] = false + disablePDCfg[cfgKey] = "false" case int: limit := int(value.(float64)) disablePDCfg[cfgKey] = int(math.Min(40, float64(limit*len(stores)))) * v From 30e0887d7de238f8951704d872d926cbb9f783fd Mon Sep 17 00:00:00 2001 From: luancheng Date: Wed, 11 Nov 2020 12:24:15 +0800 Subject: [PATCH 08/23] address comment --- pkg/pdutil/pd.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 410a8b531..479a5e759 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -41,7 +41,7 @@ const ( var ( // in v4.0.8 version we can use pause configs // see https://github.com/tikv/pd/pull/3088 - pauseConfigVersion = semver.New("4.0.7") + pauseConfigVersion = semver.New("4.0.8") ) // clusterConfig represents a set of scheduler whose config have been modified @@ -172,9 +172,9 @@ func NewPdController( version, err := semver.NewVersion(string(versionBytes)) if err != nil { - log.Warn("fail back to old version", + log.Warn("fail back to v0.0.0 version", zap.Binary("version", versionBytes), zap.Error(err)) - version = pauseConfigVersion + version = semver.New("v0.0.0") } maxCallMsgSize := []grpc.DialOption{ grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)), @@ -202,7 +202,7 @@ func NewPdController( } func (p *PdController) isPauseConfigEnabled() bool { - return p.version.Compare(*pauseConfigVersion) > 0 + return p.version.Compare(*pauseConfigVersion) >= 0 } // SetHTTP set pd addrs and cli for test. From 571db35bf3cdc492898fdc077ae9bed7c353d05f Mon Sep 17 00:00:00 2001 From: luancheng Date: Wed, 11 Nov 2020 12:30:11 +0800 Subject: [PATCH 09/23] fi --- pkg/pdutil/pd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 479a5e759..a59487fb3 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -174,7 +174,7 @@ func NewPdController( if err != nil { log.Warn("fail back to v0.0.0 version", zap.Binary("version", versionBytes), zap.Error(err)) - version = semver.New("v0.0.0") + version = semver.New("0.0.0") } maxCallMsgSize := []grpc.DialOption{ grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)), From 002f54cacbacef9e485c03d79992d438250d254e Mon Sep 17 00:00:00 2001 From: luancheng Date: Wed, 11 Nov 2020 17:14:33 +0800 Subject: [PATCH 10/23] add test case --- cmd/debug.go | 2 +- pkg/pdutil/pd.go | 36 ++++++++++++++++++------------------ pkg/pdutil/pd_test.go | 15 ++++++++++++++- 3 files changed, 33 insertions(+), 20 deletions(-) diff --git a/cmd/debug.go b/cmd/debug.go index 35f6e502d..b1b4d4c5a 100644 --- a/cmd/debug.go +++ b/cmd/debug.go @@ -362,7 +362,7 @@ func setPDConfigCommand() *cobra.Command { } defer mgr.Close() - if err := mgr.UpdatePDScheduleConfig(ctx, pdutil.DefaultPDCfg); err != nil { + if err := mgr.UpdatePDScheduleConfig(ctx, pdutil.DefaultPDCfg, pdutil.PDRequest); err != nil { return errors.Annotate(err, "fail to update PD merge config") } log.Info("add pd configs succeed", zap.Any("config", pdutil.DefaultPDCfg)) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index a59487fb3..3b358cf70 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -92,8 +92,8 @@ var ( // pdHTTPRequest defines the interface to send a request to pd and return the result in bytes. type pdHTTPRequest func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) -// pdRequest is a func to send a HTTP to pd and return the result bytes. -func pdRequest( +// PDRequest is a func to send a HTTP to pd and return the result bytes. +func PDRequest( ctx context.Context, addr string, prefix string, cli *http.Client, method string, body io.Reader) ([]byte, error) { @@ -161,7 +161,7 @@ func NewPdController( } } processedAddrs = append(processedAddrs, addr) - versionBytes, failure = pdRequest(ctx, addr, clusterVersionPrefix, cli, http.MethodGet, nil) + versionBytes, failure = PDRequest(ctx, addr, clusterVersionPrefix, cli, http.MethodGet, nil) if failure == nil { break } @@ -223,7 +223,7 @@ func (p *PdController) GetPDClient() pd.Client { // GetClusterVersion returns the current cluster version. func (p *PdController) GetClusterVersion(ctx context.Context) (string, error) { - return p.getClusterVersionWith(ctx, pdRequest) + return p.getClusterVersionWith(ctx, PDRequest) } func (p *PdController) getClusterVersionWith(ctx context.Context, get pdHTTPRequest) (string, error) { @@ -242,7 +242,7 @@ func (p *PdController) getClusterVersionWith(ctx context.Context, get pdHTTPRequ // GetRegionCount returns the region count in the specified range. func (p *PdController) GetRegionCount(ctx context.Context, startKey, endKey []byte) (int, error) { - return p.getRegionCountWith(ctx, pdRequest, startKey, endKey) + return p.getRegionCountWith(ctx, PDRequest, startKey, endKey) } func (p *PdController) getRegionCountWith( @@ -276,7 +276,7 @@ func (p *PdController) getRegionCountWith( // PauseSchedulers remove pd scheduler temporarily. func (p *PdController) PauseSchedulers(ctx context.Context, schedulers []string) ([]string, error) { - return p.pauseSchedulersAndConfigWith(ctx, schedulers, nil, pdRequest) + return p.pauseSchedulersAndConfigWith(ctx, schedulers, nil, PDRequest) } func (p *PdController) doPauseSchedulers(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) { @@ -317,7 +317,7 @@ func (p *PdController) pauseSchedulersAndConfigWith( } log.Info("pause scheduler successful at beginning", zap.Strings("name", schedulers)) if schedulerCfg != nil { - err = p.doPauseConfigs(ctx, schedulerCfg) + err = p.doPauseConfigs(ctx, schedulerCfg, post) if err != nil { log.Error("failed to pause config at beginning", zap.Any("cfg", schedulerCfg), zap.Error(err)) @@ -340,7 +340,7 @@ func (p *PdController) pauseSchedulersAndConfigWith( log.Warn("pause scheduler failed, ignore it and wait next time pause", zap.Error(err)) } if schedulerCfg != nil { - err = p.doPauseConfigs(ctx, schedulerCfg) + err = p.doPauseConfigs(ctx, schedulerCfg, post) if err != nil { log.Warn("pause configs failed, ignore it and wait next time pause", zap.Error(err)) } @@ -357,7 +357,7 @@ func (p *PdController) pauseSchedulersAndConfigWith( // ResumeSchedulers resume pd scheduler. func (p *PdController) ResumeSchedulers(ctx context.Context, schedulers []string) error { - return p.resumeSchedulerWith(ctx, schedulers, pdRequest) + return p.resumeSchedulerWith(ctx, schedulers, PDRequest) } func (p *PdController) resumeSchedulerWith(ctx context.Context, schedulers []string, post pdHTTPRequest) (err error) { @@ -390,7 +390,7 @@ func (p *PdController) resumeSchedulerWith(ctx context.Context, schedulers []str // ListSchedulers list all pd scheduler. func (p *PdController) ListSchedulers(ctx context.Context) ([]string, error) { - return p.listSchedulersWith(ctx, pdRequest) + return p.listSchedulersWith(ctx, PDRequest) } func (p *PdController) listSchedulersWith(ctx context.Context, get pdHTTPRequest) ([]string, error) { @@ -418,7 +418,7 @@ func (p *PdController) GetPDScheduleConfig( ) (map[string]interface{}, error) { var err error for _, addr := range p.addrs { - v, e := pdRequest( + v, e := PDRequest( ctx, addr, scheduleConfigPrefix, p.cli, http.MethodGet, nil) if e != nil { err = e @@ -436,7 +436,7 @@ func (p *PdController) GetPDScheduleConfig( // UpdatePDScheduleConfig updates PD schedule config value associated with the key. func (p *PdController) UpdatePDScheduleConfig( - ctx context.Context, cfg map[string]interface{}, prefixs ...string, + ctx context.Context, cfg map[string]interface{}, post pdHTTPRequest, prefixs ...string, ) error { prefix := scheduleConfigPrefix if len(prefixs) != 0 { @@ -447,7 +447,7 @@ func (p *PdController) UpdatePDScheduleConfig( if err != nil { return err } - _, e := pdRequest(ctx, addr, prefix, + _, e := post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(reqData)) if e == nil { return nil @@ -460,13 +460,13 @@ func (p *PdController) UpdatePDScheduleConfig( func (p *PdController) pauseSchedulersAndConfig( ctx context.Context, schedulers []string, cfg map[string]interface{}, ) ([]string, error) { - return p.pauseSchedulersAndConfigWith(ctx, schedulers, cfg, pdRequest) + return p.pauseSchedulersAndConfigWith(ctx, schedulers, cfg, PDRequest) } -func (p *PdController) doPauseConfigs(ctx context.Context, cfg map[string]interface{}) error { +func (p *PdController) doPauseConfigs(ctx context.Context, cfg map[string]interface{}, post pdHTTPRequest) error { // pause this scheduler with 300 seconds prefix := fmt.Sprintf("%s?ttlSecond=%d", schedulerPrefix, 300) - return p.UpdatePDScheduleConfig(ctx, cfg, prefix) + return p.UpdatePDScheduleConfig(ctx, cfg, post, prefix) } func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg clusterConfig) error { @@ -483,7 +483,7 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg cluster } mergeCfg[cfgKey] = value } - if err := pd.UpdatePDScheduleConfig(ctx, mergeCfg); err != nil { + if err := pd.UpdatePDScheduleConfig(ctx, mergeCfg, PDRequest); err != nil { return errors.Annotate(err, "fail to update PD merge config") } return nil @@ -544,7 +544,7 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun } else { // adapt to earlier version (before 4.0.8) of pd cluster // which doesn't have temporary config setting. - err = p.UpdatePDScheduleConfig(ctx, disablePDCfg) + err = p.UpdatePDScheduleConfig(ctx, disablePDCfg, PDRequest) if err != nil { return } diff --git a/pkg/pdutil/pd_test.go b/pkg/pdutil/pd_test.go index 987e5260d..b3cfbc6cb 100644 --- a/pkg/pdutil/pd_test.go +++ b/pkg/pdutil/pd_test.go @@ -38,6 +38,7 @@ func (s *testPDControllerSuite) TestScheduler(c *C) { } schedulerPauseCh := make(chan struct{}) pdController := &PdController{addrs: []string{"", ""}, schedulerPauseCh: schedulerPauseCh} + _, err := pdController.pauseSchedulersAndConfigWith(ctx, []string{scheduler}, nil, mock) c.Assert(err, ErrorMatches, "failed") @@ -47,13 +48,25 @@ func (s *testPDControllerSuite) TestScheduler(c *C) { err = pdController.resumeSchedulerWith(ctx, []string{scheduler}, mock) c.Assert(err, IsNil) + cfg := map[string]interface{}{ + "max-merge-region-keys": 0, + "max-snapshot": 1, + "enable-location-replacement": false, + } + _, err = pdController.pauseSchedulersAndConfigWith(ctx, []string{}, cfg, mock) + c.Assert(err, ErrorMatches, "failed to update PD.*") + go func() { + <-schedulerPauseCh + }() + _, err = pdController.listSchedulersWith(ctx, mock) c.Assert(err, ErrorMatches, "failed") mock = func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) { return []byte(`["` + scheduler + `"]`), nil } - _, err = pdController.pauseSchedulersAndConfigWith(ctx, []string{scheduler}, nil, mock) + + _, err = pdController.pauseSchedulersAndConfigWith(ctx, []string{scheduler}, cfg, mock) c.Assert(err, IsNil) go func() { From 5c644336c2b36c187eeddc87a978fcc058fdeb65 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Fri, 13 Nov 2020 14:01:37 +0800 Subject: [PATCH 11/23] Apply suggestions from code review Co-authored-by: kennytm --- pkg/pdutil/pd.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 3b358cf70..8a2b32683 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -41,7 +41,7 @@ const ( var ( // in v4.0.8 version we can use pause configs // see https://github.com/tikv/pd/pull/3088 - pauseConfigVersion = semver.New("4.0.8") + pauseConfigVersion = semver.Version{Major: 4, Minor: 0, Patch: 8} ) // clusterConfig represents a set of scheduler whose config have been modified @@ -173,8 +173,8 @@ func NewPdController( version, err := semver.NewVersion(string(versionBytes)) if err != nil { log.Warn("fail back to v0.0.0 version", - zap.Binary("version", versionBytes), zap.Error(err)) - version = semver.New("0.0.0") + zap.ByteString("version", versionBytes), zap.Error(err)) + version = &semver.Version{Major: 0, Minor: 0, Patch: 0} } maxCallMsgSize := []grpc.DialOption{ grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)), From 0364fc54ce38184d722cf00de26d57fdfb20f077 Mon Sep 17 00:00:00 2001 From: luancheng Date: Fri, 13 Nov 2020 14:05:21 +0800 Subject: [PATCH 12/23] address comment --- pkg/pdutil/pd.go | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 8a2b32683..4b2fb1352 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -274,11 +274,6 @@ func (p *PdController) getRegionCountWith( return 0, err } -// PauseSchedulers remove pd scheduler temporarily. -func (p *PdController) PauseSchedulers(ctx context.Context, schedulers []string) ([]string, error) { - return p.pauseSchedulersAndConfigWith(ctx, schedulers, nil, PDRequest) -} - func (p *PdController) doPauseSchedulers(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) { // pause this scheduler with 300 seconds body, err := json.Marshal(pauseSchedulerBody{Delay: int64(pauseTimeout)}) @@ -457,12 +452,6 @@ func (p *PdController) UpdatePDScheduleConfig( return errors.Annotate(berrors.ErrPDUpdateFailed, "failed to update PD schedule config") } -func (p *PdController) pauseSchedulersAndConfig( - ctx context.Context, schedulers []string, cfg map[string]interface{}, -) ([]string, error) { - return p.pauseSchedulersAndConfigWith(ctx, schedulers, cfg, PDRequest) -} - func (p *PdController) doPauseConfigs(ctx context.Context, cfg map[string]interface{}, post pdHTTPRequest) error { // pause this scheduler with 300 seconds prefix := fmt.Sprintf("%s?ttlSecond=%d", schedulerPrefix, 300) @@ -540,7 +529,7 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun var removedSchedulers []string if p.isPauseConfigEnabled() { // after 4.0.8 we can set these config with TTL - removedSchedulers, err = p.pauseSchedulersAndConfig(ctx, needRemoveSchedulers, disablePDCfg) + removedSchedulers, err = p.pauseSchedulersAndConfigWith(ctx, needRemoveSchedulers, disablePDCfg, PDRequest) } else { // adapt to earlier version (before 4.0.8) of pd cluster // which doesn't have temporary config setting. @@ -548,7 +537,7 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun if err != nil { return } - removedSchedulers, err = p.PauseSchedulers(ctx, needRemoveSchedulers) + removedSchedulers, err = p.pauseSchedulersAndConfigWith(ctx, needRemoveSchedulers, nil, PDRequest) } undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers, scheduleCfg: scheduleCfg}) return undo, err From 65febef42769550a159bb0cd8c689c4208482f44 Mon Sep 17 00:00:00 2001 From: luancheng Date: Fri, 13 Nov 2020 14:09:51 +0800 Subject: [PATCH 13/23] fix ci --- pkg/pdutil/pd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 4b2fb1352..e0def532c 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -202,7 +202,7 @@ func NewPdController( } func (p *PdController) isPauseConfigEnabled() bool { - return p.version.Compare(*pauseConfigVersion) >= 0 + return p.version.Compare(pauseConfigVersion) >= 0 } // SetHTTP set pd addrs and cli for test. From e198c7ef1c317203149997832c6b985b8326e4db Mon Sep 17 00:00:00 2001 From: luancheng Date: Fri, 13 Nov 2020 14:41:52 +0800 Subject: [PATCH 14/23] update comment --- pkg/pdutil/pd.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index e0def532c..0e03c2889 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -71,7 +71,9 @@ var ( expectPDCfg = map[string]interface{}{ "max-merge-region-keys": 0, "max-merge-region-size": 0, - // TODO remove this schedule-limits, see https://github.com/pingcap/br/pull/555#discussion_r509855972 + // TODO "leader-schedule-limit" and "region-schedule-limit" don't support ttl for now, + // but we still need set these config for compatible with old version. + // we need wait for https://github.com/tikv/pd/pull/3131 merged. "leader-schedule-limit": 1, "region-schedule-limit": 1, "max-snapshot-count": 1, From 355cdac2bd9566b23ca816935e74596e81a6a3d2 Mon Sep 17 00:00:00 2001 From: luancheng Date: Fri, 13 Nov 2020 16:40:18 +0800 Subject: [PATCH 15/23] update comment --- pkg/pdutil/pd.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 0e03c2889..25b99573c 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -74,6 +74,7 @@ var ( // TODO "leader-schedule-limit" and "region-schedule-limit" don't support ttl for now, // but we still need set these config for compatible with old version. // we need wait for https://github.com/tikv/pd/pull/3131 merged. + // see details https://github.com/pingcap/br/pull/592#discussion_r522684325 "leader-schedule-limit": 1, "region-schedule-limit": 1, "max-snapshot-count": 1, From c8a650d3b54b8c7509569aea6eed2db7ad4e5e67 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Fri, 13 Nov 2020 17:52:24 +0800 Subject: [PATCH 16/23] Update pkg/pdutil/pd.go Co-authored-by: kennytm --- pkg/pdutil/pd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 25b99573c..9fc1b84d7 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -457,7 +457,7 @@ func (p *PdController) UpdatePDScheduleConfig( func (p *PdController) doPauseConfigs(ctx context.Context, cfg map[string]interface{}, post pdHTTPRequest) error { // pause this scheduler with 300 seconds - prefix := fmt.Sprintf("%s?ttlSecond=%d", schedulerPrefix, 300) + prefix := fmt.Sprintf("%s?ttlSecond=%.0f", schedulerPrefix, pauseTimeout.Seconds()) return p.UpdatePDScheduleConfig(ctx, cfg, post, prefix) } From 7b3f8cb6625e13e776f05536adec73108be641c2 Mon Sep 17 00:00:00 2001 From: luancheng Date: Mon, 16 Nov 2020 11:07:13 +0800 Subject: [PATCH 17/23] address comment --- cmd/debug.go | 5 ++--- pkg/pdutil/pd.go | 36 ++++++++++++++++++++---------------- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/cmd/debug.go b/cmd/debug.go index 1a3f694d6..226a53bed 100644 --- a/cmd/debug.go +++ b/cmd/debug.go @@ -22,7 +22,6 @@ import ( "go.uber.org/zap" berrors "github.com/pingcap/br/pkg/errors" - "github.com/pingcap/br/pkg/pdutil" "github.com/pingcap/br/pkg/restore" "github.com/pingcap/br/pkg/rtree" "github.com/pingcap/br/pkg/task" @@ -367,10 +366,10 @@ func setPDConfigCommand() *cobra.Command { } defer mgr.Close() - if err := mgr.UpdatePDScheduleConfig(ctx, pdutil.DefaultPDCfg, pdutil.PDRequest); err != nil { + if err := mgr.UpdatePDScheduleConfig(ctx); err != nil { return errors.Annotate(err, "fail to update PD merge config") } - log.Info("add pd configs succeed", zap.Any("config", pdutil.DefaultPDCfg)) + log.Info("add pd configs succeed") return nil }, } diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 9fc1b84d7..a0952284f 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -81,8 +81,8 @@ var ( "enable-location-replacement": false, } - // DefaultPDCfg find by https://github.com/tikv/pd/blob/master/conf/config.toml. - DefaultPDCfg = map[string]interface{}{ + // defaultPDCfg find by https://github.com/tikv/pd/blob/master/conf/config.toml. + defaultPDCfg = map[string]interface{}{ "max-merge-region-keys": 200000, "max-merge-region-size": 20, "leader-schedule-limit": 4, @@ -95,8 +95,8 @@ var ( // pdHTTPRequest defines the interface to send a request to pd and return the result in bytes. type pdHTTPRequest func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) -// PDRequest is a func to send a HTTP to pd and return the result bytes. -func PDRequest( +// pdRequest is a func to send a HTTP to pd and return the result bytes. +func pdRequest( ctx context.Context, addr string, prefix string, cli *http.Client, method string, body io.Reader) ([]byte, error) { @@ -164,7 +164,7 @@ func NewPdController( } } processedAddrs = append(processedAddrs, addr) - versionBytes, failure = PDRequest(ctx, addr, clusterVersionPrefix, cli, http.MethodGet, nil) + versionBytes, failure = pdRequest(ctx, addr, clusterVersionPrefix, cli, http.MethodGet, nil) if failure == nil { break } @@ -226,7 +226,7 @@ func (p *PdController) GetPDClient() pd.Client { // GetClusterVersion returns the current cluster version. func (p *PdController) GetClusterVersion(ctx context.Context) (string, error) { - return p.getClusterVersionWith(ctx, PDRequest) + return p.getClusterVersionWith(ctx, pdRequest) } func (p *PdController) getClusterVersionWith(ctx context.Context, get pdHTTPRequest) (string, error) { @@ -245,7 +245,7 @@ func (p *PdController) getClusterVersionWith(ctx context.Context, get pdHTTPRequ // GetRegionCount returns the region count in the specified range. func (p *PdController) GetRegionCount(ctx context.Context, startKey, endKey []byte) (int, error) { - return p.getRegionCountWith(ctx, PDRequest, startKey, endKey) + return p.getRegionCountWith(ctx, pdRequest, startKey, endKey) } func (p *PdController) getRegionCountWith( @@ -355,7 +355,7 @@ func (p *PdController) pauseSchedulersAndConfigWith( // ResumeSchedulers resume pd scheduler. func (p *PdController) ResumeSchedulers(ctx context.Context, schedulers []string) error { - return p.resumeSchedulerWith(ctx, schedulers, PDRequest) + return p.resumeSchedulerWith(ctx, schedulers, pdRequest) } func (p *PdController) resumeSchedulerWith(ctx context.Context, schedulers []string, post pdHTTPRequest) (err error) { @@ -388,7 +388,7 @@ func (p *PdController) resumeSchedulerWith(ctx context.Context, schedulers []str // ListSchedulers list all pd scheduler. func (p *PdController) ListSchedulers(ctx context.Context) ([]string, error) { - return p.listSchedulersWith(ctx, PDRequest) + return p.listSchedulersWith(ctx, pdRequest) } func (p *PdController) listSchedulersWith(ctx context.Context, get pdHTTPRequest) ([]string, error) { @@ -416,7 +416,7 @@ func (p *PdController) GetPDScheduleConfig( ) (map[string]interface{}, error) { var err error for _, addr := range p.addrs { - v, e := PDRequest( + v, e := pdRequest( ctx, addr, scheduleConfigPrefix, p.cli, http.MethodGet, nil) if e != nil { err = e @@ -433,7 +433,11 @@ func (p *PdController) GetPDScheduleConfig( } // UpdatePDScheduleConfig updates PD schedule config value associated with the key. -func (p *PdController) UpdatePDScheduleConfig( +func (p *PdController) UpdatePDScheduleConfig(ctx context.Context) error { + log.Info("update pd with default config", zap.Any("cfg", defaultPDCfg)) + return p.doUpdatePDScheduleConfig(ctx, defaultPDCfg, pdRequest) +} +func (p *PdController) doUpdatePDScheduleConfig( ctx context.Context, cfg map[string]interface{}, post pdHTTPRequest, prefixs ...string, ) error { prefix := scheduleConfigPrefix @@ -458,7 +462,7 @@ func (p *PdController) UpdatePDScheduleConfig( func (p *PdController) doPauseConfigs(ctx context.Context, cfg map[string]interface{}, post pdHTTPRequest) error { // pause this scheduler with 300 seconds prefix := fmt.Sprintf("%s?ttlSecond=%.0f", schedulerPrefix, pauseTimeout.Seconds()) - return p.UpdatePDScheduleConfig(ctx, cfg, post, prefix) + return p.doUpdatePDScheduleConfig(ctx, cfg, post, prefix) } func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg clusterConfig) error { @@ -475,7 +479,7 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg cluster } mergeCfg[cfgKey] = value } - if err := pd.UpdatePDScheduleConfig(ctx, mergeCfg, PDRequest); err != nil { + if err := pd.doUpdatePDScheduleConfig(ctx, mergeCfg, pdRequest); err != nil { return errors.Annotate(err, "fail to update PD merge config") } return nil @@ -532,15 +536,15 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun var removedSchedulers []string if p.isPauseConfigEnabled() { // after 4.0.8 we can set these config with TTL - removedSchedulers, err = p.pauseSchedulersAndConfigWith(ctx, needRemoveSchedulers, disablePDCfg, PDRequest) + removedSchedulers, err = p.pauseSchedulersAndConfigWith(ctx, needRemoveSchedulers, disablePDCfg, pdRequest) } else { // adapt to earlier version (before 4.0.8) of pd cluster // which doesn't have temporary config setting. - err = p.UpdatePDScheduleConfig(ctx, disablePDCfg, PDRequest) + err = p.doUpdatePDScheduleConfig(ctx, disablePDCfg, pdRequest) if err != nil { return } - removedSchedulers, err = p.pauseSchedulersAndConfigWith(ctx, needRemoveSchedulers, nil, PDRequest) + removedSchedulers, err = p.pauseSchedulersAndConfigWith(ctx, needRemoveSchedulers, nil, pdRequest) } undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers, scheduleCfg: scheduleCfg}) return undo, err From a7f0367394f59954ccdaf6f81b2a3a6d6b71ea35 Mon Sep 17 00:00:00 2001 From: luancheng Date: Mon, 16 Nov 2020 14:03:42 +0800 Subject: [PATCH 18/23] change convoluted logic --- pkg/pdutil/pd.go | 39 ++++++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index a0952284f..b7d885bcb 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -38,6 +38,19 @@ const ( pauseTimeout = 5 * time.Minute ) +type pauseConfigExpectation uint8 + +const ( + // pauseConfigSetZero sets the config to 0. + pauseConfigSetZero pauseConfigExpectation = iota + // pauseConfigSetMulStoresCount multiplies the existing value by + // number of stores. The value is limited to 40, as larger value + // may make the cluster unstable. + pauseConfigMulStoresCount + // pauseConfigSetFalse sets the config to "false". + pauseConfigSetFalse +) + var ( // in v4.0.8 version we can use pause configs // see https://github.com/tikv/pd/pull/3088 @@ -68,17 +81,17 @@ var ( "shuffle-region-scheduler": {}, "shuffle-hot-region-scheduler": {}, } - expectPDCfg = map[string]interface{}{ - "max-merge-region-keys": 0, - "max-merge-region-size": 0, + expectPDCfg = map[string]pauseConfigExpectation{ + "max-merge-region-keys": pauseConfigSetZero, + "max-merge-region-size": pauseConfigSetZero, // TODO "leader-schedule-limit" and "region-schedule-limit" don't support ttl for now, // but we still need set these config for compatible with old version. // we need wait for https://github.com/tikv/pd/pull/3131 merged. // see details https://github.com/pingcap/br/pull/592#discussion_r522684325 - "leader-schedule-limit": 1, - "region-schedule-limit": 1, - "max-snapshot-count": 1, - "enable-location-replacement": false, + "leader-schedule-limit": pauseConfigMulStoresCount, + "region-schedule-limit": pauseConfigMulStoresCount, + "max-snapshot-count": pauseConfigMulStoresCount, + "enable-location-replacement": pauseConfigSetFalse, } // defaultPDCfg find by https://github.com/tikv/pd/blob/master/conf/config.toml. @@ -510,12 +523,16 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun // Ignore non-exist config. continue } - switch v := cfgVal.(type) { - case bool: + switch cfgVal { + case pauseConfigSetZero: + disablePDCfg[cfgKey] = 0 + case pauseConfigSetFalse: + // this has to be a string instead of a boolean otherwise + // pd will return unmarshal string failure. disablePDCfg[cfgKey] = "false" - case int: + case pauseConfigMulStoresCount: limit := int(value.(float64)) - disablePDCfg[cfgKey] = int(math.Min(40, float64(limit*len(stores)))) * v + disablePDCfg[cfgKey] = math.Min(40, float64(limit*len(stores))) } } undo = p.makeUndoFunctionByConfig(clusterConfig{scheduleCfg: scheduleCfg}) From a649e7f06d035fcd9feacd7555d557c853c3c2c9 Mon Sep 17 00:00:00 2001 From: luancheng Date: Mon, 16 Nov 2020 14:25:35 +0800 Subject: [PATCH 19/23] resume config by set ttlSecond to 0 --- pkg/pdutil/pd.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index b7d885bcb..8d0b4dda2 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -492,7 +492,10 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg cluster } mergeCfg[cfgKey] = value } - if err := pd.doUpdatePDScheduleConfig(ctx, mergeCfg, pdRequest); err != nil { + + // set config's ttl to zero, make temporary config invalid immediately. + prefix := fmt.Sprintf("%s?ttlSecond=%d", schedulerPrefix, 0) + if err := pd.doUpdatePDScheduleConfig(ctx, mergeCfg, pdRequest, prefix); err != nil { return errors.Annotate(err, "fail to update PD merge config") } return nil From 2d126599aaf0cd53c074eecc70ccf4c667687759 Mon Sep 17 00:00:00 2001 From: luancheng Date: Mon, 16 Nov 2020 14:59:15 +0800 Subject: [PATCH 20/23] test pause config --- pkg/pdutil/pd.go | 12 ++++++++++-- tests/br_db/run.sh | 2 ++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 8d0b4dda2..88b00e1b3 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -16,6 +16,7 @@ import ( "strings" "time" + "github.com/pingcap/failpoint" "go.uber.org/zap" "google.golang.org/grpc" @@ -192,6 +193,12 @@ func NewPdController( zap.ByteString("version", versionBytes), zap.Error(err)) version = &semver.Version{Major: 0, Minor: 0, Patch: 0} } + failpoint.Inject("PDEnabledPauseConfig", func(val failpoint.Value) { + if val.(bool) { + // test pause config is enable + version = &semver.Version{Major: 5, Minor: 0, Patch: 0} + } + }) maxCallMsgSize := []grpc.DialOption{ grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)), grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)), @@ -356,7 +363,8 @@ func (p *PdController) pauseSchedulersAndConfigWith( log.Warn("pause configs failed, ignore it and wait next time pause", zap.Error(err)) } } - log.Info("pause scheduler(configs)", zap.Strings("name", removedSchedulers)) + log.Info("pause scheduler(configs)", zap.Strings("name", removedSchedulers), + zap.Any("cfg", schedulerCfg)) case <-p.schedulerPauseCh: log.Info("exit pause scheduler and configs successful") return @@ -474,7 +482,7 @@ func (p *PdController) doUpdatePDScheduleConfig( func (p *PdController) doPauseConfigs(ctx context.Context, cfg map[string]interface{}, post pdHTTPRequest) error { // pause this scheduler with 300 seconds - prefix := fmt.Sprintf("%s?ttlSecond=%.0f", schedulerPrefix, pauseTimeout.Seconds()) + prefix := fmt.Sprintf("%s?ttlSecond=%.0f", scheduleConfigPrefix, pauseTimeout.Seconds()) return p.doUpdatePDScheduleConfig(ctx, cfg, post, prefix) } diff --git a/tests/br_db/run.sh b/tests/br_db/run.sh index de515bd06..cf2312b36 100755 --- a/tests/br_db/run.sh +++ b/tests/br_db/run.sh @@ -43,7 +43,9 @@ run_sql "DROP DATABASE $DB;" # restore db echo "restore start..." +export GO_FAILPOINTS="github.com/pingcap/br/pkg/pdutil/pd/PDEnabledPauseConfig=return(true)" run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR +export GO_FAILPOINTS="" table_count=$(run_sql "use $DB; show tables;" | grep "Tables_in" | wc -l) if [ "$table_count" -ne "2" ];then From f58654945fbe75680d1da8e0057a93a5482bb5fc Mon Sep 17 00:00:00 2001 From: luancheng Date: Mon, 16 Nov 2020 15:12:46 +0800 Subject: [PATCH 21/23] fix failpoint --- tests/br_db/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/br_db/run.sh b/tests/br_db/run.sh index cf2312b36..cd6cb9072 100755 --- a/tests/br_db/run.sh +++ b/tests/br_db/run.sh @@ -43,7 +43,7 @@ run_sql "DROP DATABASE $DB;" # restore db echo "restore start..." -export GO_FAILPOINTS="github.com/pingcap/br/pkg/pdutil/pd/PDEnabledPauseConfig=return(true)" +export GO_FAILPOINTS="github.com/pingcap/br/pkg/pdutil/PDEnabledPauseConfig=return(true)" run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR export GO_FAILPOINTS="" From 7d5cd0b09ecfb0ff1eb76c1775fe3b81f0c60a37 Mon Sep 17 00:00:00 2001 From: luancheng Date: Mon, 16 Nov 2020 15:18:17 +0800 Subject: [PATCH 22/23] update test --- tests/br_db/run.sh | 2 -- tests/br_full_ddl/run.sh | 13 +++++++++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/tests/br_db/run.sh b/tests/br_db/run.sh index cd6cb9072..de515bd06 100755 --- a/tests/br_db/run.sh +++ b/tests/br_db/run.sh @@ -43,9 +43,7 @@ run_sql "DROP DATABASE $DB;" # restore db echo "restore start..." -export GO_FAILPOINTS="github.com/pingcap/br/pkg/pdutil/PDEnabledPauseConfig=return(true)" run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR -export GO_FAILPOINTS="" table_count=$(run_sql "use $DB; show tables;" | grep "Tables_in" | wc -l) if [ "$table_count" -ne "2" ];then diff --git a/tests/br_full_ddl/run.sh b/tests/br_full_ddl/run.sh index 1225b35ea..6983d4619 100755 --- a/tests/br_full_ddl/run.sh +++ b/tests/br_full_ddl/run.sh @@ -39,7 +39,6 @@ echo "backup start..." # Do not log to terminal unset BR_LOG_TO_TERM run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 --log-file $LOG || cat $LOG -BR_LOG_TO_TERM=1 checksum_count=$(cat $LOG | grep "checksum success" | wc -l | xargs) @@ -53,7 +52,17 @@ run_sql "DROP DATABASE $DB;" # restore full echo "restore start..." -run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR +export GO_FAILPOINTS="github.com/pingcap/br/pkg/pdutil/PDEnabledPauseConfig=return(true)" +run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --log-file $LOG +export GO_FAILPOINTS="" + +pause_count=$(cat $LOG | grep "pause configs successful"| wc -l | xargs) +if [ "${pause_count}" != "1" ];then + echo "TEST: [$TEST_NAME] fail on pause config" + exit 1 +fi + +BR_LOG_TO_TERM=1 row_count_new=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') From 9d38da616b1c521d573f5c12e9be4d6a4733018a Mon Sep 17 00:00:00 2001 From: luancheng Date: Mon, 16 Nov 2020 16:21:00 +0800 Subject: [PATCH 23/23] fix test --- pkg/pdutil/pd.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 88b00e1b3..3ffa6fe4c 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -501,9 +501,13 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg cluster mergeCfg[cfgKey] = value } - // set config's ttl to zero, make temporary config invalid immediately. - prefix := fmt.Sprintf("%s?ttlSecond=%d", schedulerPrefix, 0) - if err := pd.doUpdatePDScheduleConfig(ctx, mergeCfg, pdRequest, prefix); err != nil { + prefix := make([]string, 0, 1) + if pd.isPauseConfigEnabled() { + // set config's ttl to zero, make temporary config invalid immediately. + prefix = append(prefix, fmt.Sprintf("%s?ttlSecond=%d", schedulerPrefix, 0)) + } + // reset config with previous value. + if err := pd.doUpdatePDScheduleConfig(ctx, mergeCfg, pdRequest, prefix...); err != nil { return errors.Annotate(err, "fail to update PD merge config") } return nil