Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

scheduler: use pause instead of remove schedulers #551

Merged
merged 12 commits into from
Oct 15, 2020
14 changes: 1 addition & 13 deletions cmd/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"encoding/json"
"path"
"reflect"
"strings"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -347,7 +346,7 @@ func encodeBackupMetaCommand() *cobra.Command {
func setPDConfigCommand() *cobra.Command {
pdConfigCmd := &cobra.Command{
Use: "reset-pd-config-as-default",
Short: "reset pd scheduler and config adjusted by BR to default value",
Short: "reset pd config adjusted by BR to default value",
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(GetDefaultContext())
defer cancel()
Expand All @@ -363,17 +362,6 @@ func setPDConfigCommand() *cobra.Command {
}
defer mgr.Close()

for scheduler := range pdutil.Schedulers {
if strings.HasPrefix(scheduler, "balance") {
err := mgr.AddScheduler(ctx, scheduler)
if err != nil {
return err
}
log.Info("add pd schedulers succeed",
zap.String("schedulers", scheduler))
}
}

if err := mgr.UpdatePDScheduleConfig(ctx, pdutil.DefaultPDCfg); err != nil {
return errors.Annotate(err, "fail to update PD merge config")
}
Expand Down
105 changes: 83 additions & 22 deletions pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
schedulerPrefix = "pd/api/v1/schedulers"
maxMsgSize = int(128 * utils.MB) // pd.ScanRegion may return a large response
scheduleConfigPrefix = "pd/api/v1/config/schedule"
pauseTimeout = 5 * time.Minute
)

// clusterConfig represents a set of scheduler whose config have been modified
Expand All @@ -45,6 +46,10 @@ type clusterConfig struct {
scheduleCfg map[string]interface{}
}

// pauseSchedulerBody is the JSON request body posted to a scheduler's
// endpoint, both to pause the scheduler and to stop pausing it.
type pauseSchedulerBody struct {
// Delay is serialized under the "delay" key; 0 means stop pause.
// NOTE(review): presumably expressed in seconds — confirm against the PD API.
Delay int64 `json:"delay"`
}

var (
// Schedulers represent region/leader schedulers which can impact on performance.
Schedulers = map[string]struct{}{
Expand Down Expand Up @@ -115,6 +120,9 @@ type PdController struct {
addrs []string
cli *http.Client
pdClient pd.Client

// control the pause schedulers goroutine
schedulerPauseCh map[string]chan struct{}
}

// NewPdController creates a new PdController.
Expand Down Expand Up @@ -167,9 +175,10 @@ func NewPdController(
}

return &PdController{
addrs: processedAddrs,
cli: cli,
pdClient: pdClient,
addrs: processedAddrs,
cli: cli,
pdClient: pdClient,
schedulerPauseCh: make(map[string]chan struct{}),
}, nil
}

Expand Down Expand Up @@ -242,32 +251,81 @@ func (p *PdController) getRegionCountWith(
return 0, err
}

// RemoveScheduler remove pd scheduler.
func (p *PdController) RemoveScheduler(ctx context.Context, scheduler string) error {
return p.removeSchedulerWith(ctx, scheduler, pdRequest)
// PauseScheduler pauses the given pd scheduler temporarily.
func (p *PdController) PauseScheduler(ctx context.Context, scheduler string) error {
kennytm marked this conversation as resolved.
Show resolved Hide resolved
return p.pauseSchedulerWith(ctx, scheduler, pdRequest)
}

func (p *PdController) removeSchedulerWith(ctx context.Context, scheduler string, delete pdHTTPRequest) (err error) {
// pauseSchedulerWith pauses the named PD scheduler and keeps it paused.
//
// It first posts a pause request, trying each address in p.addrs until one
// succeeds. If every address fails, the last error is returned and nothing is
// registered. On success it stores a stop channel in p.schedulerPauseCh under
// the scheduler name and starts a goroutine that re-sends the pause request
// every pauseTimeout/3, so the pause is renewed before it expires. The
// goroutine exits when ctx is done or when the stop channel receives a value
// or is closed.
//
// The post argument performs the HTTP request; it is a parameter so tests can
// substitute a mock.
func (p *PdController) pauseSchedulerWith(ctx context.Context, scheduler string, post pdHTTPRequest) (err error) {
	// The pause is meant to last 300 seconds. Converting the time.Duration
	// directly would send its nanosecond count, so divide by time.Second.
	body, err := json.Marshal(pauseSchedulerBody{Delay: int64(pauseTimeout / time.Second)})
	if err != nil {
		return err
	}
	prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)

	// pauseOnce tries each PD address in turn and stops at the first success.
	// It returns the last error only when every address failed.
	pauseOnce := func() error {
		var lastErr error
		for _, addr := range p.addrs {
			_, lastErr = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body))
			if lastErr == nil {
				return nil
			}
		}
		return lastErr
	}

	// The first pause must succeed, so its error is reported to the caller.
	// Failures of the periodic renewals below are only logged.
	if err = pauseOnce(); err != nil {
		return err
	}
	log.Info("pause scheduler at beginning", zap.String("name", scheduler))

	// Register the stop channel before starting the goroutine, and let the
	// goroutine use this local copy: reading the map on every select would
	// race with later PauseScheduler calls that write to it.
	stopCh := make(chan struct{})
	p.schedulerPauseCh[scheduler] = stopCh

	go func() {
		tick := time.NewTicker(pauseTimeout / 3)
		defer tick.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-tick.C:
				// Use a goroutine-local error; the enclosing function has
				// already returned its named result.
				if tickErr := pauseOnce(); tickErr != nil {
					log.Warn("pause scheduler failed, ignore it and wait next time pause", zap.Error(tickErr))
				} else {
					log.Info("pause scheduler", zap.String("name", scheduler))
				}
			case <-stopCh:
				log.Info("exit pause scheduler successful", zap.String("name", scheduler))
				return
			}
		}
	}()
	return nil
}

// AddScheduler add pd scheduler.
func (p *PdController) AddScheduler(ctx context.Context, scheduler string) error {
return p.addSchedulerWith(ctx, scheduler, pdRequest)
// StopPauseScheduler stops pausing the given pd scheduler so that it can run again.
kennytm marked this conversation as resolved.
Show resolved Hide resolved
func (p *PdController) StopPauseScheduler(ctx context.Context, scheduler string) error {
return p.stopPauseSchedulerWith(ctx, scheduler, pdRequest)
}

func (p *PdController) addSchedulerWith(ctx context.Context, scheduler string, post pdHTTPRequest) (err error) {
func (p *PdController) stopPauseSchedulerWith(ctx context.Context, scheduler string, post pdHTTPRequest) (err error) {
log.Info("stop pause scheduler", zap.String("scheduler", scheduler))
p.schedulerPauseCh[scheduler] <- struct{}{}

// 0 means stop pause.
body, err := json.Marshal(pauseSchedulerBody{Delay: 0})
if err != nil {
return err
}
prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)

for _, addr := range p.addrs {
body := bytes.NewBuffer([]byte(`{"name":"` + scheduler + `"}`))
_, err = post(ctx, addr, schedulerPrefix, p.cli, http.MethodPost, body)
_, err = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body))
if err != nil {
continue
}
Expand Down Expand Up @@ -342,7 +400,7 @@ func (p *PdController) UpdatePDScheduleConfig(

func addPDLeaderScheduler(ctx context.Context, pd *PdController, removedSchedulers []string) error {
for _, scheduler := range removedSchedulers {
err := pd.AddScheduler(ctx, scheduler)
err := pd.StopPauseScheduler(ctx, scheduler)
if err != nil {
return err
}
Expand Down Expand Up @@ -404,12 +462,11 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun
needRemoveSchedulers = append(needRemoveSchedulers, s)
}
}
scheduler, err := removePDLeaderScheduler(ctx, p, needRemoveSchedulers)
removedSchedulers, err := removePDLeaderScheduler(ctx, p, needRemoveSchedulers)
if err != nil {
return
}

undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: scheduler})
undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers})

stores, err := p.pdClient.GetAllStores(ctx)
if err != nil {
Expand All @@ -420,7 +477,7 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun
return
}

undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: scheduler, scheduleCfg: scheduleCfg})
undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: removedSchedulers, scheduleCfg: scheduleCfg})

disableMergeCfg := make(map[string]interface{})
for _, cfgKey := range pdRegionMergeCfg {
Expand Down Expand Up @@ -457,12 +514,16 @@ func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFun
// Close closes the connection to pd and then closes every channel registered
// in schedulerPauseCh, which makes any pause goroutine still waiting on one of
// those channels return.
func (p *PdController) Close() {
	p.pdClient.Close()

	for name := range p.schedulerPauseCh {
		close(p.schedulerPauseCh[name])
	}
}

func removePDLeaderScheduler(ctx context.Context, pd *PdController, existSchedulers []string) ([]string, error) {
removedSchedulers := make([]string, 0, len(existSchedulers))
for _, scheduler := range existSchedulers {
err := pd.RemoveScheduler(ctx, scheduler)
err := pd.PauseScheduler(ctx, scheduler)
if err != nil {
return nil, err
}
Expand Down
18 changes: 13 additions & 5 deletions pkg/pdutil/pd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,16 @@ func (s *testPDControllerSuite) TestScheduler(c *C) {
mock := func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) {
return nil, errors.New("failed")
}
pdController := &PdController{addrs: []string{"", ""}}
err := pdController.removeSchedulerWith(ctx, scheduler, mock)
schedulerPauseCh := make(map[string]chan struct{})
pdController := &PdController{addrs: []string{"", ""}, schedulerPauseCh: schedulerPauseCh}
schedulerPauseCh[scheduler] = make(chan struct{})
err := pdController.pauseSchedulerWith(ctx, scheduler, mock)
c.Assert(err, ErrorMatches, "failed")

err = pdController.addSchedulerWith(ctx, scheduler, mock)
go func() {
<-schedulerPauseCh[scheduler]
}()
err = pdController.stopPauseSchedulerWith(ctx, scheduler, mock)
c.Assert(err, ErrorMatches, "failed")

_, err = pdController.listSchedulersWith(ctx, mock)
Expand All @@ -49,10 +54,13 @@ func (s *testPDControllerSuite) TestScheduler(c *C) {
mock = func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) {
return []byte(`["` + scheduler + `"]`), nil
}
err = pdController.removeSchedulerWith(ctx, scheduler, mock)
err = pdController.pauseSchedulerWith(ctx, scheduler, mock)
c.Assert(err, IsNil)

err = pdController.addSchedulerWith(ctx, scheduler, mock)
go func() {
<-schedulerPauseCh[scheduler]
}()
err = pdController.stopPauseSchedulerWith(ctx, scheduler, mock)
c.Assert(err, IsNil)

schedulers, err := pdController.listSchedulersWith(ctx, mock)
Expand Down
36 changes: 26 additions & 10 deletions tests/br_other/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ sleep 1
curl "http://localhost:$PPROF_PORT/debug/pprof/trace?seconds=1" 2>&1 > /dev/null
echo "pprof started..."

curl http://$PD_ADDR/pd/api/v1/config/schedule | grep '"disable": true'
curl http://$PD_ADDR/pd/api/v1/config/schedule | grep '"disable": false'

backup_fail=0
echo "another backup start expect to fail due to last backup add a lockfile"
Expand All @@ -88,6 +88,13 @@ if [ "$backup_fail" -ne "1" ];then
exit 1
fi

# check whether any scheduler is still not paused.
pause_schedulers=$(curl http://$PD_ADDR/pd/api/v1/schedulers?status="paused" | grep "scheduler" | wc -l)
if [ "$pause_schedulers" -ne "3" ];then
echo "TEST: [$TEST_NAME] failed because paused scheduler are not enough"
exit 1
fi

if ps -p $_pid > /dev/null
then
echo "$_pid is running"
Expand All @@ -101,29 +108,38 @@ fi
# make sure we won't get stuck in a no-scheduler state, even if we send a SIGTERM to it.
# give enough time to BR so it can gracefully stop.
sleep 5
if curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '[."schedulers-v2"][0][0]' | grep -q '"disable": false'
if curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '[."schedulers-v2"][0][0]' | grep -q '"disable": true'
then
echo "TEST: [$TEST_NAME] failed because scheduler has not been removed"
echo "TEST: [$TEST_NAME] failed because scheduler has been removed"
exit 1
fi

pd_settings=5
# we need reset pd scheduler/config to default
# until pd has the solution to temporary set these scheduler/configs.
run_br validate reset-pd-config-as-default

# max-merge-region-size set to default 20
curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-size"' | grep "20" || ((pd_settings--))
# check whether the schedulers are still paused.
pause_schedulers=$(curl http://$PD_ADDR/pd/api/v1/schedulers?status="paused" | grep "scheduler" | wc -l)
if [ "$pause_schedulers" -ne "3" ];then
echo "TEST: [$TEST_NAME] failed because paused scheduler has changed"
exit 1
fi

# max-merge-region-keys set to default 200000
curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-keys"' | grep "200000" || ((pd_settings--))
# balance-region scheduler enabled
curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."schedulers-v2"[] | {disable: .disable, type: ."type" | select (.=="balance-region")}' | grep '"disable": false' || ((pd_settings--))
# balance-leader scheduler enabled
curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."schedulers-v2"[] | {disable: .disable, type: ."type" | select (.=="balance-leader")}' | grep '"disable": false' || ((pd_settings--))
# hot region scheduler enabled
curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."schedulers-v2"[] | {disable: .disable, type: ."type" | select (.=="hot-region")}' | grep '"disable": false' || ((pd_settings--))

# we need reset pd config to default
# until pd has the solution to temporary set these scheduler/configs.
run_br validate reset-pd-config-as-default --pd $PD_ADDR

# max-merge-region-size set to default 20
curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-size"' | grep "20" || ((pd_settings--))

# max-merge-region-keys set to default 200000
curl http://$PD_ADDR/pd/api/v1/config/schedule | jq '."max-merge-region-keys"' | grep "200000" || ((pd_settings--))

if [ "$pd_settings" -ne "5" ];then
echo "TEST: [$TEST_NAME] test validate reset pd config failed!"
exit 1
Expand Down