diff --git a/cmd/debug.go b/cmd/debug.go
index 65ad9ea6d..226a53bed 100644
--- a/cmd/debug.go
+++ b/cmd/debug.go
@@ -22,7 +22,6 @@ import (
 	"go.uber.org/zap"

 	berrors "github.com/pingcap/br/pkg/errors"
-	"github.com/pingcap/br/pkg/pdutil"
 	"github.com/pingcap/br/pkg/restore"
 	"github.com/pingcap/br/pkg/rtree"
 	"github.com/pingcap/br/pkg/task"
@@ -367,10 +366,10 @@ func setPDConfigCommand() *cobra.Command {
 			}
 			defer mgr.Close()

-			if err := mgr.UpdatePDScheduleConfig(ctx, pdutil.DefaultPDCfg); err != nil {
+			if err := mgr.UpdatePDScheduleConfig(ctx); err != nil {
 				return errors.Annotate(err, "fail to update PD merge config")
 			}
-			log.Info("add pd configs succeed", zap.Any("config", pdutil.DefaultPDCfg))
+			log.Info("add pd configs succeed")
 			return nil
 		},
 	}
diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go
index 6a9d133e6..3ffa6fe4c 100644
--- a/pkg/pdutil/pd.go
+++ b/pkg/pdutil/pd.go
@@ -16,9 +16,11 @@ import (
 	"strings"
 	"time"

+	"github.com/pingcap/failpoint"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"

+	"github.com/coreos/go-semver/semver"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/util/codec"
@@ -37,6 +39,25 @@ const (
 	pauseTimeout = 5 * time.Minute
 )

+type pauseConfigExpectation uint8
+
+const (
+	// pauseConfigSetZero sets the config to 0.
+	pauseConfigSetZero pauseConfigExpectation = iota
+	// pauseConfigMulStoresCount multiplies the existing value by the
+	// number of stores. The result is capped at 40, as a larger value
+	// may make the cluster unstable.
+	pauseConfigMulStoresCount
+	// pauseConfigSetFalse sets the config to "false".
+	pauseConfigSetFalse
+)
+
+var (
+	// since v4.0.8, configs can be paused via a TTL,
+	// see https://github.com/tikv/pd/pull/3088
+	pauseConfigVersion = semver.Version{Major: 4, Minor: 0, Patch: 8}
+)
+
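How the three expectations above translate into the values actually written while scheduling is paused (a minimal runnable sketch, not part of the patch; reduceConfig is an illustrative name, and the mapping mirrors the switch added to RemoveSchedulers further down):

package main

import (
	"fmt"
	"math"
)

type pauseConfigExpectation uint8

const (
	pauseConfigSetZero pauseConfigExpectation = iota
	pauseConfigMulStoresCount
	pauseConfigSetFalse
)

// reduceConfig returns the temporary value BR would write for one config
// key, given its current value and the number of TiKV stores.
func reduceConfig(kind pauseConfigExpectation, current float64, stores int) interface{} {
	switch kind {
	case pauseConfigSetZero:
		// e.g. max-merge-region-keys: disable region merging entirely.
		return 0
	case pauseConfigSetFalse:
		// PD expects the string "false" here, not a boolean.
		return "false"
	case pauseConfigMulStoresCount:
		// Enlarge the limit, but cap it at 40 to keep the cluster stable.
		return math.Min(40, current*float64(stores))
	}
	return current
}

func main() {
	// leader-schedule-limit=4 on a 5-store cluster becomes 20.
	fmt.Println(reduceConfig(pauseConfigMulStoresCount, 4, 5))
}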
 // clusterConfig represents a set of scheduler whose config have been modified
 // along with their original config.
 type clusterConfig struct {
@@ -61,19 +82,21 @@
 		"shuffle-region-scheduler":     {},
 		"shuffle-hot-region-scheduler": {},
 	}
-	// TODO remove this, see https://github.com/pingcap/br/pull/555#discussion_r509855972
-	pdRegionMergeCfg = []string{
-		"max-merge-region-keys",
-		"max-merge-region-size",
-	}
-	pdScheduleLimitCfg = []string{
-		"leader-schedule-limit",
-		"region-schedule-limit",
-		"max-snapshot-count",
-	}
-
-	// DefaultPDCfg find by https://github.com/tikv/pd/blob/master/conf/config.toml.
-	DefaultPDCfg = map[string]interface{}{
+	expectPDCfg = map[string]pauseConfigExpectation{
+		"max-merge-region-keys": pauseConfigSetZero,
+		"max-merge-region-size": pauseConfigSetZero,
+		// TODO: "leader-schedule-limit" and "region-schedule-limit" don't support TTL for now,
+		// but we still need to set these configs for compatibility with older versions.
+		// Remove this once https://github.com/tikv/pd/pull/3131 is merged;
+		// see https://github.com/pingcap/br/pull/592#discussion_r522684325 for details.
+		"leader-schedule-limit":       pauseConfigMulStoresCount,
+		"region-schedule-limit":       pauseConfigMulStoresCount,
+		"max-snapshot-count":          pauseConfigMulStoresCount,
+		"enable-location-replacement": pauseConfigSetFalse,
+	}
+
+	// defaultPDCfg is taken from https://github.com/tikv/pd/blob/master/conf/config.toml.
+	defaultPDCfg = map[string]interface{}{
 		"max-merge-region-keys": 200000,
 		"max-merge-region-size": 20,
 		"leader-schedule-limit": 4,
@@ -122,6 +145,7 @@ type PdController struct {
 	addrs    []string
 	cli      *http.Client
 	pdClient pd.Client
+	version  *semver.Version

 	// control the pause schedulers goroutine
 	schedulerPauseCh chan struct{}
@@ -144,6 +168,7 @@ func NewPdController(
 	addrs := strings.Split(pdAddrs, ",")
 	processedAddrs := make([]string, 0, len(addrs))
 	var failure error
+	var versionBytes []byte
 	for _, addr := range addrs {
 		if addr != "" && !strings.HasPrefix("http", addr) {
 			if tlsConf != nil {
@@ -153,15 +178,27 @@ func NewPdController(
 			}
 		}
 		processedAddrs = append(processedAddrs, addr)
-		_, failure = pdRequest(ctx, addr, clusterVersionPrefix, cli, http.MethodGet, nil)
+		versionBytes, failure = pdRequest(ctx, addr, clusterVersionPrefix, cli, http.MethodGet, nil)
 		if failure == nil {
 			break
 		}
 	}
 	if failure != nil {
-		return nil, errors.Annotatef(failure, "pd address (%s) not available, please check network", pdAddrs)
+		return nil, errors.Annotatef(berrors.ErrPDUpdateFailed, "pd address (%s) not available, please check network", pdAddrs)
 	}
+	version, err := semver.NewVersion(string(versionBytes))
+	if err != nil {
+		log.Warn("fall back to v0.0.0 version",
+			zap.ByteString("version", versionBytes), zap.Error(err))
+		version = &semver.Version{Major: 0, Minor: 0, Patch: 0}
+	}
+	failpoint.Inject("PDEnabledPauseConfig", func(val failpoint.Value) {
+		if val.(bool) {
+			// pretend in tests that pause configs are supported
+			version = &semver.Version{Major: 5, Minor: 0, Patch: 0}
+		}
+	})
 	maxCallMsgSize := []grpc.DialOption{
 		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
 		grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)),
@@ -180,12 +217,17 @@ func NewPdController(
 		addrs:    processedAddrs,
 		cli:      cli,
 		pdClient: pdClient,
+		version:  version,
 		// We should make a buffered channel here otherwise when context canceled,
 		// gracefully shutdown will stick at resuming schedulers.
 		schedulerPauseCh: make(chan struct{}, 1),
 	}, nil
 }

+func (p *PdController) isPauseConfigEnabled() bool {
+	return p.version.Compare(pauseConfigVersion) >= 0
+}
+
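The version fallback above in isolation (a sketch, not part of the patch; parseClusterVersion is an illustrative helper and the on-the-wire version format is simplified):

package main

import (
	"fmt"

	"github.com/coreos/go-semver/semver"
)

var pauseConfigVersion = semver.Version{Major: 4, Minor: 0, Patch: 8}

// parseClusterVersion mirrors the fallback in NewPdController: an
// unparsable version degrades to v0.0.0, which simply disables the
// TTL-based pause path instead of failing controller construction.
func parseClusterVersion(versionBytes []byte) *semver.Version {
	version, err := semver.NewVersion(string(versionBytes))
	if err != nil {
		return &semver.Version{Major: 0, Minor: 0, Patch: 0}
	}
	return version
}

func main() {
	v := parseClusterVersion([]byte("4.0.8"))
	fmt.Println(v.Compare(pauseConfigVersion) >= 0)          // true: pause configs usable
	fmt.Println(parseClusterVersion([]byte("not-a-version"))) // 0.0.0: legacy path
}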
 // SetHTTP set pd addrs and cli for test.
 func (p *PdController) SetHTTP(addrs []string, cli *http.Client) {
 	p.addrs = addrs
@@ -255,21 +297,14 @@ func (p *PdController) getRegionCountWith(
 	return 0, err
 }

-// PauseSchedulers remove pd scheduler temporarily.
-func (p *PdController) PauseSchedulers(ctx context.Context, schedulers []string) ([]string, error) {
-	return p.pauseSchedulersWith(ctx, schedulers, pdRequest)
-}
-
-func (p *PdController) pauseSchedulersWith(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) {
-	removedSchedulers := make([]string, 0, len(schedulers))
+func (p *PdController) doPauseSchedulers(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) {
 	// pause this scheduler with 300 seconds
 	body, err := json.Marshal(pauseSchedulerBody{Delay: int64(pauseTimeout)})
 	if err != nil {
 		return nil, err
 	}
-
-	// first pause this scheduler, if the first time failed. we should return the error
-	// so put first time out of for loop. and in for loop we could ignore other failed pause.
+	// record the schedulers that have been paused successfully
+	removedSchedulers := make([]string, 0, len(schedulers))
 	for _, scheduler := range schedulers {
 		prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)
 		for _, addr := range p.addrs {
@@ -280,12 +315,34 @@ func (p *PdController) pauseSchedulersWith(ctx context.Context, schedulers []str
 			}
 		}
 		if err != nil {
-			log.Error("failed to pause scheduler at beginning",
-				zap.Strings("name", schedulers), zap.Error(err))
-			return nil, err
+			return removedSchedulers, err
 		}
 	}
+	return removedSchedulers, nil
+}
+
+func (p *PdController) pauseSchedulersAndConfigWith(
+	ctx context.Context, schedulers []string,
+	schedulerCfg map[string]interface{}, post pdHTTPRequest,
+) ([]string, error) {
+	// Pause the schedulers first; if this initial attempt fails, return the error.
+	// Later attempts run in the ticker loop below, where failures can be ignored.
+	removedSchedulers, err := p.doPauseSchedulers(ctx, schedulers, post)
+	if err != nil {
+		log.Error("failed to pause scheduler at beginning",
+			zap.Strings("name", schedulers), zap.Error(err))
+		return nil, err
+	}
 	log.Info("pause scheduler successful at beginning", zap.Strings("name", schedulers))
+	if schedulerCfg != nil {
+		err = p.doPauseConfigs(ctx, schedulerCfg, post)
+		if err != nil {
+			log.Error("failed to pause config at beginning",
+				zap.Any("cfg", schedulerCfg), zap.Error(err))
+			return nil, err
+		}
+		log.Info("pause configs successful at beginning", zap.Any("cfg", schedulerCfg))
+	}

 	go func() {
 		tick := time.NewTicker(pauseTimeout / 3)
@@ -296,22 +353,20 @@ func (p *PdController) pauseSchedulersWith(ctx context.Context, schedulers []str
 			case <-ctx.Done():
 				return
 			case <-tick.C:
-				for _, scheduler := range schedulers {
-					prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)
-					for _, addr := range p.addrs {
-						_, err = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body))
-						if err == nil {
-							break
-						}
-					}
-					if err == nil {
-						log.Info("pause scheduler", zap.String("name", scheduler))
-					} else {
-						log.Warn("pause scheduler failed, ignore it and wait next time pause", zap.Error(err))
+				_, err := p.doPauseSchedulers(ctx, schedulers, post)
+				if err != nil {
+					log.Warn("pause scheduler failed, ignore it and wait next time pause", zap.Error(err))
+				}
+				if schedulerCfg != nil {
+					err = p.doPauseConfigs(ctx, schedulerCfg, post)
+					if err != nil {
+						log.Warn("pause configs failed, ignore it and wait next time pause", zap.Error(err))
 					}
 				}
+				log.Info("pause scheduler(configs)", zap.Strings("name", removedSchedulers),
+					zap.Any("cfg", schedulerCfg))
 			case <-p.schedulerPauseCh:
-				log.Info("exit pause scheduler successful")
+				log.Info("exit pause scheduler and configs successful")
 				return
 			}
 		}
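The goroutine above is a keep-alive loop: a pause only lasts pauseTimeout on the PD side, so BR refreshes it every pauseTimeout/3 until told to stop. Stripped to its skeleton (a sketch, not part of the patch; pauseOnce stands in for the scheduler and config requests):

package main

import (
	"context"
	"log"
	"time"
)

const pauseTimeout = 5 * time.Minute

// keepPaused re-sends a pause request on every tick so the pause never
// expires while work is still in progress.
func keepPaused(ctx context.Context, done <-chan struct{}, pauseOnce func() error) {
	tick := time.NewTicker(pauseTimeout / 3)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			// A failed refresh is non-fatal: the previous pause is
			// still in effect, and the next tick retries.
			if err := pauseOnce(); err != nil {
				log.Printf("pause failed, retrying on next tick: %v", err)
			}
		case <-done:
			return
		}
	}
}

func main() {
	done := make(chan struct{})
	go keepPaused(context.Background(), done, func() error { return nil })
	close(done) // signals the goroutine to stop, like schedulerPauseCh above
}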
@@ -399,15 +454,23 @@ func (p *PdController) GetPDScheduleConfig(
 }

 // UpdatePDScheduleConfig updates PD schedule config value associated with the key.
-func (p *PdController) UpdatePDScheduleConfig(
-	ctx context.Context, cfg map[string]interface{},
+func (p *PdController) UpdatePDScheduleConfig(ctx context.Context) error {
+	log.Info("update pd with default config", zap.Any("cfg", defaultPDCfg))
+	return p.doUpdatePDScheduleConfig(ctx, defaultPDCfg, pdRequest)
+}
+
+func (p *PdController) doUpdatePDScheduleConfig(
+	ctx context.Context, cfg map[string]interface{}, post pdHTTPRequest, prefixs ...string,
 ) error {
+	prefix := scheduleConfigPrefix
+	if len(prefixs) != 0 {
+		prefix = prefixs[0]
+	}
 	for _, addr := range p.addrs {
 		reqData, err := json.Marshal(cfg)
 		if err != nil {
 			return err
 		}
-		_, e := pdRequest(ctx, addr, scheduleConfigPrefix,
+		_, e := post(ctx, addr, prefix,
 			p.cli, http.MethodPost, bytes.NewBuffer(reqData))
 		if e == nil {
 			return nil
@@ -417,13 +480,19 @@ func (p *PdController) UpdatePDScheduleConfig(
 	return errors.Annotate(berrors.ErrPDUpdateFailed, "failed to update PD schedule config")
 }

+func (p *PdController) doPauseConfigs(ctx context.Context, cfg map[string]interface{}, post pdHTTPRequest) error {
+	// pause these configs for 300 seconds by attaching a TTL to the update
+	prefix := fmt.Sprintf("%s?ttlSecond=%.0f", scheduleConfigPrefix, pauseTimeout.Seconds())
+	return p.doUpdatePDScheduleConfig(ctx, cfg, post, prefix)
+}
+
 func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg clusterConfig) error {
 	if err := pd.ResumeSchedulers(ctx, clusterCfg.scheduler); err != nil {
 		return errors.Annotate(err, "fail to add PD schedulers")
 	}
 	log.Info("restoring config", zap.Any("config", clusterCfg.scheduleCfg))
 	mergeCfg := make(map[string]interface{})
-	for _, cfgKey := range pdRegionMergeCfg {
+	for cfgKey := range expectPDCfg {
 		value := clusterCfg.scheduleCfg[cfgKey]
 		if value == nil {
 			// Ignore non-exist config.
@@ -431,27 +500,15 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg cluster
 		}
 		mergeCfg[cfgKey] = value
 	}
-	if err := pd.UpdatePDScheduleConfig(ctx, mergeCfg); err != nil {
-		return errors.Annotate(err, "fail to update PD merge config")
-	}
-	scheduleLimitCfg := make(map[string]interface{})
-	for _, cfgKey := range pdScheduleLimitCfg {
-		value := clusterCfg.scheduleCfg[cfgKey]
-		if value == nil {
-			// Ignore non-exist config.
-			continue
-		}
-		scheduleLimitCfg[cfgKey] = value
-	}
-	if err := pd.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil {
-		return errors.Annotate(err, "fail to update PD schedule config")
+	prefix := make([]string, 0, 1)
+	if pd.isPauseConfigEnabled() {
+		// set the config's TTL to zero, invalidating the temporary config immediately
+		prefix = append(prefix, fmt.Sprintf("%s?ttlSecond=%d", scheduleConfigPrefix, 0))
 	}
-	if locationPlacement, ok := clusterCfg.scheduleCfg["enable-location-replacement"]; ok {
-		log.Debug("restoring config enable-location-replacement", zap.Any("enable-location-placement", locationPlacement))
-		if err := pd.UpdatePDScheduleConfig(ctx, map[string]interface{}{"enable-location-replacement": locationPlacement}); err != nil {
-			return err
-		}
+	// reset configs to their previously saved values
+	if err := pd.doUpdatePDScheduleConfig(ctx, mergeCfg, pdRequest, prefix...); err != nil {
+		return errors.Annotate(err, "fail to update PD merge config")
 	}
 	return nil
 }
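The two TTL endpoints in play differ only in the query parameter (behavior per https://github.com/tikv/pd/pull/3088). A sketch, with the scheduleConfigPrefix value assumed from elsewhere in pkg/pdutil:

package main

import (
	"fmt"
	"time"
)

// Assumed values; only the ttlSecond query parameter matters here.
const (
	scheduleConfigPrefix = "pd/api/v1/config/schedule"
	pauseTimeout         = 5 * time.Minute
)

func main() {
	// While BR runs: the override expires on its own after 300 seconds,
	// even if BR crashes before it can undo anything.
	fmt.Printf("%s?ttlSecond=%.0f\n", scheduleConfigPrefix, pauseTimeout.Seconds())
	// On undo: posting the saved values with ttlSecond=0 drops the
	// temporary override immediately instead of waiting for expiry.
	fmt.Printf("%s?ttlSecond=%d\n", scheduleConfigPrefix, 0)
}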
func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFunc, err error) {
 	undo = utils.Nop
-
-	// Remove default PD scheduler that may affect restore process.
-	existSchedulers, err := p.ListSchedulers(ctx)
-	if err != nil {
-		return
-	}
-	needRemoveSchedulers := make([]string, 0, len(existSchedulers))
-	for _, s := range existSchedulers {
-		if _, ok := Schedulers[s]; ok {
-			needRemoveSchedulers = append(needRemoveSchedulers, s)
-		}
-	}
-	removedSchedulers, err := p.PauseSchedulers(ctx, needRemoveSchedulers)
-	if err != nil {
-		return
-	}
-	undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers})
-
 	stores, err := p.pdClient.GetAllStores(ctx)
 	if err != nil {
 		return
@@ -492,43 +531,55 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun
 	if err != nil {
 		return
 	}
-
-	undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers, scheduleCfg: scheduleCfg})
-	log.Debug("saved PD config", zap.Any("config", scheduleCfg))
-
-	disableMergeCfg := make(map[string]interface{})
-	for _, cfgKey := range pdRegionMergeCfg {
-		value := scheduleCfg[cfgKey]
-		if value == nil {
+	disablePDCfg := make(map[string]interface{})
+	for cfgKey, cfgVal := range expectPDCfg {
+		value, ok := scheduleCfg[cfgKey]
+		if !ok {
 			// Ignore non-exist config.
 			continue
 		}
-		// Disable region merge by setting config to 0.
-		disableMergeCfg[cfgKey] = 0
+		switch cfgVal {
+		case pauseConfigSetZero:
+			disablePDCfg[cfgKey] = 0
+		case pauseConfigSetFalse:
+			// this has to be a string rather than a boolean, otherwise
+			// pd fails to unmarshal the config value.
+			disablePDCfg[cfgKey] = "false"
+		case pauseConfigMulStoresCount:
+			limit := int(value.(float64))
+			disablePDCfg[cfgKey] = math.Min(40, float64(limit*len(stores)))
+		}
 	}
-	err = p.UpdatePDScheduleConfig(ctx, disableMergeCfg)
+	undo = p.makeUndoFunctionByConfig(clusterConfig{scheduleCfg: scheduleCfg})
+	log.Debug("saved PD config", zap.Any("config", scheduleCfg))
+
+	// Remove default PD scheduler that may affect restore process.
+	existSchedulers, err := p.ListSchedulers(ctx)
 	if err != nil {
 		return
 	}
-
-	scheduleLimitCfg := make(map[string]interface{})
-	for _, cfgKey := range pdScheduleLimitCfg {
-		value := scheduleCfg[cfgKey]
-		if value == nil {
-			// Ignore non-exist config.
-			continue
+	needRemoveSchedulers := make([]string, 0, len(existSchedulers))
+	for _, s := range existSchedulers {
+		if _, ok := Schedulers[s]; ok {
+			needRemoveSchedulers = append(needRemoveSchedulers, s)
 		}
-
-		// Speed update PD scheduler by enlarging scheduling limits.
-		// Multiply limits by store count but no more than 40.
-		// Larger limit may make cluster unstable.
-		limit := int(value.(float64))
-		scheduleLimitCfg[cfgKey] = math.Min(40, float64(limit*len(stores)))
 	}
-	if err := p.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil {
-		return undo, err
+
+	var removedSchedulers []string
+	if p.isPauseConfigEnabled() {
+		// since v4.0.8 these configs can be set with a TTL
+		removedSchedulers, err = p.pauseSchedulersAndConfigWith(ctx, needRemoveSchedulers, disablePDCfg, pdRequest)
+	} else {
+		// adapt to earlier versions of the pd cluster (before v4.0.8),
+		// which don't support temporary config settings.
+		err = p.doUpdatePDScheduleConfig(ctx, disablePDCfg, pdRequest)
+		if err != nil {
+			return
+		}
+		removedSchedulers, err = p.pauseSchedulersAndConfigWith(ctx, needRemoveSchedulers, nil, pdRequest)
 	}
-	return undo, p.UpdatePDScheduleConfig(ctx, map[string]interface{}{"enable-location-replacement": "false"})
+	undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers, scheduleCfg: scheduleCfg})
+	return undo, err
 }

 // Close close the connection to pd.
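The intended caller-side pattern for RemoveSchedulers (a sketch, not BR's actual restore code; the wrapper name withSchedulersPaused is illustrative, and utils.UndoFunc is assumed to be func(context.Context) error):

// Package example sketches how a caller uses the returned undo function.
package example

import (
	"context"

	"github.com/pingcap/br/pkg/pdutil"
)

func withSchedulersPaused(ctx context.Context, mgr *pdutil.PdController, work func() error) error {
	undo, err := mgr.RemoveSchedulers(ctx)
	if err != nil {
		return err
	}
	// Resume schedulers and configs even if work fails: undo is immediate,
	// while waiting for the TTL to expire could take up to 5 minutes.
	defer func() { _ = undo(ctx) }()
	return work()
}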
diff --git a/pkg/pdutil/pd_test.go b/pkg/pdutil/pd_test.go
index bbfb59e9c..b3cfbc6cb 100644
--- a/pkg/pdutil/pd_test.go
+++ b/pkg/pdutil/pd_test.go
@@ -38,7 +38,8 @@ func (s *testPDControllerSuite) TestScheduler(c *C) {
 	}
 	schedulerPauseCh := make(chan struct{})
 	pdController := &PdController{addrs: []string{"", ""}, schedulerPauseCh: schedulerPauseCh}
-	_, err := pdController.pauseSchedulersWith(ctx, []string{scheduler}, mock)
+
+	_, err := pdController.pauseSchedulersAndConfigWith(ctx, []string{scheduler}, nil, mock)
 	c.Assert(err, ErrorMatches, "failed")

 	go func() {
@@ -47,13 +48,25 @@ func (s *testPDControllerSuite) TestScheduler(c *C) {
 	err = pdController.resumeSchedulerWith(ctx, []string{scheduler}, mock)
 	c.Assert(err, IsNil)

+	cfg := map[string]interface{}{
+		"max-merge-region-keys":       0,
+		"max-snapshot":                1,
+		"enable-location-replacement": false,
+	}
+	_, err = pdController.pauseSchedulersAndConfigWith(ctx, []string{}, cfg, mock)
+	c.Assert(err, ErrorMatches, "failed to update PD.*")
+
+	go func() {
+		<-schedulerPauseCh
+	}()
+
 	_, err = pdController.listSchedulersWith(ctx, mock)
 	c.Assert(err, ErrorMatches, "failed")

 	mock = func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) {
 		return []byte(`["` + scheduler + `"]`), nil
 	}
-	_, err = pdController.pauseSchedulersWith(ctx, []string{scheduler}, mock)
+
+	_, err = pdController.pauseSchedulersAndConfigWith(ctx, []string{scheduler}, cfg, mock)
 	c.Assert(err, IsNil)

 	go func() {
diff --git a/tests/br_full_ddl/run.sh b/tests/br_full_ddl/run.sh
index 1225b35ea..6983d4619 100755
--- a/tests/br_full_ddl/run.sh
+++ b/tests/br_full_ddl/run.sh
@@ -39,7 +39,6 @@ echo "backup start..."
 # Do not log to terminal
 unset BR_LOG_TO_TERM
 run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 --log-file $LOG || cat $LOG
-BR_LOG_TO_TERM=1

 checksum_count=$(cat $LOG | grep "checksum success" | wc -l | xargs)

@@ -53,7 +52,17 @@ run_sql "DROP DATABASE $DB;"

 # restore full
 echo "restore start..."
-run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR
+export GO_FAILPOINTS="github.com/pingcap/br/pkg/pdutil/PDEnabledPauseConfig=return(true)"
+run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --log-file $LOG
+export GO_FAILPOINTS=""
+
+pause_count=$(cat $LOG | grep "pause configs successful" | wc -l | xargs)
+if [ "${pause_count}" != "1" ];then
+    echo "TEST: [$TEST_NAME] fail on pause config"
+    exit 1
+fi
+
+BR_LOG_TO_TERM=1
 row_count_new=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}')