diff --git a/cmd/validate.go b/cmd/validate.go index 86ebcc9e0..1ede767f7 100644 --- a/cmd/validate.go +++ b/cmd/validate.go @@ -21,7 +21,7 @@ import ( "github.com/tikv/pd/pkg/mock/mockid" "go.uber.org/zap" - "github.com/pingcap/br/pkg/conn" + "github.com/pingcap/br/pkg/pdutil" "github.com/pingcap/br/pkg/restore" "github.com/pingcap/br/pkg/rtree" "github.com/pingcap/br/pkg/task" @@ -346,7 +346,7 @@ func setPDConfigCommand() *cobra.Command { } defer mgr.Close() - for scheduler := range conn.Schedulers { + for scheduler := range pdutil.Schedulers { if strings.HasPrefix(scheduler, "balance") { err := mgr.AddScheduler(ctx, scheduler) if err != nil { @@ -357,10 +357,10 @@ func setPDConfigCommand() *cobra.Command { } } - if err := mgr.UpdatePDScheduleConfig(ctx, conn.DefaultPDCfg); err != nil { + if err := mgr.UpdatePDScheduleConfig(ctx, pdutil.DefaultPDCfg); err != nil { return errors.Annotate(err, "fail to update PD merge config") } - log.Info("add pd configs succeed", zap.Any("config", conn.DefaultPDCfg)) + log.Info("add pd configs succeed", zap.Any("config", pdutil.DefaultPDCfg)) return nil }, } diff --git a/pkg/backup/client_test.go b/pkg/backup/client_test.go index e52421eec..6a6ba0196 100644 --- a/pkg/backup/client_test.go +++ b/pkg/backup/client_test.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/br/pkg/backup" "github.com/pingcap/br/pkg/conn" + "github.com/pingcap/br/pkg/pdutil" ) type testBackup struct { @@ -39,9 +40,9 @@ func (r *testBackup) SetUpSuite(c *C) { mvccStore := mocktikv.MustNewMVCCStore() r.mockPDClient = mocktikv.NewPDClient(mocktikv.NewCluster(mvccStore)) r.ctx, r.cancel = context.WithCancel(context.Background()) - mockMgr := &conn.Mgr{} + mockMgr := &conn.Mgr{PdController: &pdutil.PdController{}} mockMgr.SetPDClient(r.mockPDClient) - mockMgr.SetPDHTTP([]string{"test"}, nil) + mockMgr.SetHTTP([]string{"test"}, nil) var err error r.backupClient, err = backup.NewBackupClient(r.ctx, mockMgr) c.Assert(err, IsNil) diff --git a/pkg/conn/conn.go 
b/pkg/conn/conn.go index ee8aef11e..d5e6519e2 100644 --- a/pkg/conn/conn.go +++ b/pkg/conn/conn.go @@ -3,16 +3,8 @@ package conn import ( - "bytes" "context" "crypto/tls" - "encoding/json" - "fmt" - "io" - "io/ioutil" - "net/http" - "net/url" - "strings" "sync" "sync/atomic" "time" @@ -23,7 +15,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/store/tikv" - "github.com/pingcap/tidb/util/codec" pd "github.com/tikv/pd/client" "go.uber.org/zap" "google.golang.org/grpc" @@ -32,27 +23,19 @@ import ( "google.golang.org/grpc/keepalive" "github.com/pingcap/br/pkg/glue" + "github.com/pingcap/br/pkg/pdutil" "github.com/pingcap/br/pkg/utils" ) const ( - dialTimeout = 30 * time.Second - clusterVersionPrefix = "pd/api/v1/config/cluster-version" - regionCountPrefix = "pd/api/v1/stats/region" - schdulerPrefix = "pd/api/v1/schedulers" - maxMsgSize = int(128 * utils.MB) // pd.ScanRegion may return a large response - scheduleConfigPrefix = "pd/api/v1/config/schedule" + dialTimeout = 30 * time.Second resetRetryTimes = 3 ) // Mgr manages connections to a TiDB cluster. 
type Mgr struct { - pdClient pd.Client - pdHTTP struct { - addrs []string - cli *http.Client - } + *pdutil.PdController tlsConf *tls.Config dom *domain.Domain storage tikv.Storage @@ -63,38 +46,6 @@ type Mgr struct { ownsStorage bool } -type pdHTTPRequest func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) - -func pdRequest( - ctx context.Context, - addr string, prefix string, - cli *http.Client, method string, body io.Reader) ([]byte, error) { - u, err := url.Parse(addr) - if err != nil { - return nil, errors.Trace(err) - } - url := fmt.Sprintf("%s/%s", u, prefix) - req, err := http.NewRequestWithContext(ctx, method, url, body) - if err != nil { - return nil, errors.Trace(err) - } - resp, err := cli.Do(req) - if err != nil { - return nil, errors.Trace(err) - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - res, _ := ioutil.ReadAll(resp.Body) - return nil, errors.Errorf("[%d] %s %s", resp.StatusCode, res, url) - } - - r, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, errors.Trace(err) - } - return r, nil -} - // StoreBehavior is the action to do in GetAllTiKVStores when a non-TiKV // store (e.g. TiFlash store) is found. 
type StoreBehavior uint8 @@ -160,50 +111,13 @@ func NewMgr( storeBehavior StoreBehavior, checkRequirements bool, ) (*Mgr, error) { - addrs := strings.Split(pdAddrs, ",") - - failure := errors.Errorf("pd address (%s) has wrong format", pdAddrs) - cli := &http.Client{Timeout: 30 * time.Second} - if tlsConf != nil { - transport := http.DefaultTransport.(*http.Transport).Clone() - transport.TLSClientConfig = tlsConf - cli.Transport = transport - } - - processedAddrs := make([]string, 0, len(addrs)) - for _, addr := range addrs { - if addr != "" && !strings.HasPrefix("http", addr) { - if tlsConf != nil { - addr = "https://" + addr - } else { - addr = "http://" + addr - } - } - processedAddrs = append(processedAddrs, addr) - _, failure = pdRequest(ctx, addr, clusterVersionPrefix, cli, http.MethodGet, nil) - if failure == nil { - break - } - } - if failure != nil { - return nil, errors.Annotatef(failure, "pd address (%s) not available, please check network", pdAddrs) - } - - maxCallMsgSize := []grpc.DialOption{ - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)), - grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)), - } - pdClient, err := pd.NewClientWithContext( - ctx, addrs, securityOption, - pd.WithGRPCDialOptions(maxCallMsgSize...), - pd.WithCustomTimeoutOption(10*time.Second), - ) + controller, err := pdutil.NewPdController(ctx, pdAddrs, tlsConf, securityOption) if err != nil { - log.Error("fail to create pd client", zap.Error(err)) + log.Error("fail to create pd controller", zap.Error(err)) return nil, err } if checkRequirements { - err = utils.CheckClusterVersion(ctx, pdClient) + err = utils.CheckClusterVersion(ctx, controller.GetPDClient()) if err != nil { errMsg := "running BR in incompatible version of cluster, " + "if you believe it's OK, use --check-requirements=false to skip." @@ -213,7 +127,7 @@ func NewMgr( log.Info("new mgr", zap.String("pdAddrs", pdAddrs)) // Check live tikv. 
- stores, err := GetAllTiKVStores(ctx, pdClient, storeBehavior) + stores, err := GetAllTiKVStores(ctx, controller.GetPDClient(), storeBehavior) if err != nil { log.Error("fail to get store", zap.Error(err)) return nil, err @@ -238,84 +152,18 @@ func NewMgr( } mgr := &Mgr{ - pdClient: pdClient, - storage: storage, - dom: dom, - tlsConf: tlsConf, - ownsStorage: g.OwnsStorage(), + PdController: controller, + storage: storage, + dom: dom, + tlsConf: tlsConf, + ownsStorage: g.OwnsStorage(), } - mgr.pdHTTP.addrs = processedAddrs - mgr.pdHTTP.cli = cli mgr.grpcClis.clis = make(map[uint64]*grpc.ClientConn) return mgr, nil } -// SetPDHTTP set pd addrs and cli for test. -func (mgr *Mgr) SetPDHTTP(addrs []string, cli *http.Client) { - mgr.pdHTTP.addrs = addrs - mgr.pdHTTP.cli = cli -} - -// SetPDClient set pd addrs and cli for test. -func (mgr *Mgr) SetPDClient(pdClient pd.Client) { - mgr.pdClient = pdClient -} - -// GetClusterVersion returns the current cluster version. -func (mgr *Mgr) GetClusterVersion(ctx context.Context) (string, error) { - return mgr.getClusterVersionWith(ctx, pdRequest) -} - -func (mgr *Mgr) getClusterVersionWith(ctx context.Context, get pdHTTPRequest) (string, error) { - var err error - for _, addr := range mgr.pdHTTP.addrs { - v, e := get(ctx, addr, clusterVersionPrefix, mgr.pdHTTP.cli, http.MethodGet, nil) - if e != nil { - err = e - continue - } - return string(v), nil - } - - return "", err -} - -// GetRegionCount returns the region count in the specified range. -func (mgr *Mgr) GetRegionCount(ctx context.Context, startKey, endKey []byte) (int, error) { - return mgr.getRegionCountWith(ctx, pdRequest, startKey, endKey) -} - -func (mgr *Mgr) getRegionCountWith( - ctx context.Context, get pdHTTPRequest, startKey, endKey []byte, -) (int, error) { - // TiKV reports region start/end keys to PD in memcomparable-format. 
- var start, end string - start = url.QueryEscape(string(codec.EncodeBytes(nil, startKey))) - if len(endKey) != 0 { // Empty end key means the max. - end = url.QueryEscape(string(codec.EncodeBytes(nil, endKey))) - } - var err error - for _, addr := range mgr.pdHTTP.addrs { - query := fmt.Sprintf( - "%s?start_key=%s&end_key=%s", - regionCountPrefix, start, end) - v, e := get(ctx, addr, query, mgr.pdHTTP.cli, http.MethodGet, nil) - if e != nil { - err = e - continue - } - regionsMap := make(map[string]interface{}) - err = json.Unmarshal(v, ®ionsMap) - if err != nil { - return 0, err - } - return int(regionsMap["count"].(float64)), nil - } - return 0, err -} - func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) { - store, err := mgr.pdClient.GetStore(ctx, storeID) + store, err := mgr.GetPDClient().GetStore(ctx, storeID) if err != nil { return nil, errors.Trace(err) } @@ -404,11 +252,6 @@ func (mgr *Mgr) ResetBackupClient(ctx context.Context, storeID uint64) (backup.B return backup.NewBackupClient(conn), nil } -// GetPDClient returns a pd client. -func (mgr *Mgr) GetPDClient() pd.Client { - return mgr.pdClient -} - // GetTiKV returns a tikv storage. func (mgr *Mgr) GetTiKV() tikv.Storage { return mgr.storage @@ -429,104 +272,6 @@ func (mgr *Mgr) GetDomain() *domain.Domain { return mgr.dom } -// RemoveScheduler remove pd scheduler. -func (mgr *Mgr) RemoveScheduler(ctx context.Context, scheduler string) error { - return mgr.removeSchedulerWith(ctx, scheduler, pdRequest) -} - -func (mgr *Mgr) removeSchedulerWith(ctx context.Context, scheduler string, delete pdHTTPRequest) (err error) { - for _, addr := range mgr.pdHTTP.addrs { - prefix := fmt.Sprintf("%s/%s", schdulerPrefix, scheduler) - _, err = delete(ctx, addr, prefix, mgr.pdHTTP.cli, http.MethodDelete, nil) - if err != nil { - continue - } - return nil - } - return err -} - -// AddScheduler add pd scheduler. 
-func (mgr *Mgr) AddScheduler(ctx context.Context, scheduler string) error { - return mgr.addSchedulerWith(ctx, scheduler, pdRequest) -} - -func (mgr *Mgr) addSchedulerWith(ctx context.Context, scheduler string, post pdHTTPRequest) (err error) { - for _, addr := range mgr.pdHTTP.addrs { - body := bytes.NewBuffer([]byte(`{"name":"` + scheduler + `"}`)) - _, err = post(ctx, addr, schdulerPrefix, mgr.pdHTTP.cli, http.MethodPost, body) - if err != nil { - continue - } - return nil - } - return err -} - -// ListSchedulers list all pd scheduler. -func (mgr *Mgr) ListSchedulers(ctx context.Context) ([]string, error) { - return mgr.listSchedulersWith(ctx, pdRequest) -} - -func (mgr *Mgr) listSchedulersWith(ctx context.Context, get pdHTTPRequest) ([]string, error) { - var err error - for _, addr := range mgr.pdHTTP.addrs { - v, e := get(ctx, addr, schdulerPrefix, mgr.pdHTTP.cli, http.MethodGet, nil) - if e != nil { - err = e - continue - } - d := make([]string, 0) - err = json.Unmarshal(v, &d) - if err != nil { - return nil, err - } - return d, nil - } - return nil, err -} - -// GetPDScheduleConfig returns PD schedule config value associated with the key. -// It returns nil if there is no such config item. -func (mgr *Mgr) GetPDScheduleConfig( - ctx context.Context, -) (map[string]interface{}, error) { - var err error - for _, addr := range mgr.pdHTTP.addrs { - v, e := pdRequest( - ctx, addr, scheduleConfigPrefix, mgr.pdHTTP.cli, http.MethodGet, nil) - if e != nil { - err = e - continue - } - cfg := make(map[string]interface{}) - err = json.Unmarshal(v, &cfg) - if err != nil { - return nil, err - } - return cfg, nil - } - return nil, err -} - -// UpdatePDScheduleConfig updates PD schedule config value associated with the key. 
-func (mgr *Mgr) UpdatePDScheduleConfig( - ctx context.Context, cfg map[string]interface{}, -) error { - for _, addr := range mgr.pdHTTP.addrs { - reqData, err := json.Marshal(cfg) - if err != nil { - return err - } - _, e := pdRequest(ctx, addr, scheduleConfigPrefix, - mgr.pdHTTP.cli, http.MethodPost, bytes.NewBuffer(reqData)) - if e == nil { - return nil - } - } - return errors.New("update PD schedule config failed") -} - // Close closes all client in Mgr. func (mgr *Mgr) Close() { mgr.grpcClis.mu.Lock() @@ -549,5 +294,5 @@ func (mgr *Mgr) Close() { mgr.storage.Close() } - mgr.pdClient.Close() + mgr.PdController.Close() } diff --git a/pkg/conn/conn_test.go b/pkg/conn/conn_test.go index 63ff50385..89b235731 100644 --- a/pkg/conn/conn_test.go +++ b/pkg/conn/conn_test.go @@ -4,21 +4,14 @@ package conn import ( "context" - "encoding/hex" - "encoding/json" - "fmt" - "io" - "net/http" - "net/url" "testing" + "github.com/pingcap/br/pkg/pdutil" + . "github.com/pingcap/check" - "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/util/codec" pd "github.com/tikv/pd/client" "github.com/tikv/pd/server/core" - "github.com/tikv/pd/server/statistics" ) func TestT(t *testing.T) { @@ -37,7 +30,7 @@ type testClientSuite struct { func (s *testClientSuite) SetUpSuite(c *C) { s.ctx, s.cancel = context.WithCancel(context.Background()) - s.mgr = &Mgr{} + s.mgr = &Mgr{PdController: &pdutil.PdController{}} s.regions = core.NewRegionsInfo() } @@ -45,112 +38,6 @@ func (s *testClientSuite) TearDownSuite(c *C) { s.cancel() } -func (s *testClientSuite) TestGetClusterVersion(c *C) { - s.mgr.pdHTTP.addrs = []string{"", ""} // two endpoints - counter := 0 - mock := func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) { - counter++ - if counter <= 1 { - return nil, errors.New("mock error") - } - return []byte(`test`), nil - } - - ctx := context.Background() - respString, err := s.mgr.getClusterVersionWith(ctx, mock) - 
c.Assert(err, IsNil) - c.Assert(respString, Equals, "test") - - mock = func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) { - return nil, errors.New("mock error") - } - _, err = s.mgr.getClusterVersionWith(ctx, mock) - c.Assert(err, NotNil) -} - -func (s *testClientSuite) TestScheduler(c *C) { - ctx := context.Background() - - scheduler := "balance-leader-scheduler" - mock := func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) { - return nil, errors.New("failed") - } - err := s.mgr.removeSchedulerWith(ctx, scheduler, mock) - c.Assert(err, ErrorMatches, "failed") - - err = s.mgr.addSchedulerWith(ctx, scheduler, mock) - c.Assert(err, ErrorMatches, "failed") - - _, err = s.mgr.listSchedulersWith(ctx, mock) - c.Assert(err, ErrorMatches, "failed") - - mock = func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) { - return []byte(`["` + scheduler + `"]`), nil - } - err = s.mgr.removeSchedulerWith(ctx, scheduler, mock) - c.Assert(err, IsNil) - - err = s.mgr.addSchedulerWith(ctx, scheduler, mock) - c.Assert(err, IsNil) - - schedulers, err := s.mgr.listSchedulersWith(ctx, mock) - c.Assert(err, IsNil) - c.Assert(schedulers, HasLen, 1) - c.Assert(schedulers[0], Equals, scheduler) -} - -func (s *testClientSuite) TestRegionCount(c *C) { - s.regions.SetRegion(core.NewRegionInfo(&metapb.Region{ - Id: 1, - StartKey: codec.EncodeBytes(nil, []byte{1, 1}), - EndKey: codec.EncodeBytes(nil, []byte{1, 3}), - RegionEpoch: &metapb.RegionEpoch{}, - }, nil)) - s.regions.SetRegion(core.NewRegionInfo(&metapb.Region{ - Id: 2, - StartKey: codec.EncodeBytes(nil, []byte{1, 3}), - EndKey: codec.EncodeBytes(nil, []byte{1, 5}), - RegionEpoch: &metapb.RegionEpoch{}, - }, nil)) - s.regions.SetRegion(core.NewRegionInfo(&metapb.Region{ - Id: 3, - StartKey: codec.EncodeBytes(nil, []byte{2, 3}), - EndKey: codec.EncodeBytes(nil, []byte{3, 4}), - RegionEpoch: &metapb.RegionEpoch{}, - }, nil)) - 
c.Assert(s.regions.Length(), Equals, 3) - - mock := func( - _ context.Context, addr string, prefix string, _ *http.Client, _ string, _ io.Reader, - ) ([]byte, error) { - query := fmt.Sprintf("%s/%s", addr, prefix) - u, e := url.Parse(query) - c.Assert(e, IsNil, Commentf("%s", query)) - start := u.Query().Get("start_key") - end := u.Query().Get("end_key") - c.Log(hex.EncodeToString([]byte(start))) - c.Log(hex.EncodeToString([]byte(end))) - regions := s.regions.ScanRange([]byte(start), []byte(end), 0) - stats := statistics.RegionStats{Count: len(regions)} - ret, err := json.Marshal(stats) - c.Assert(err, IsNil) - return ret, nil - } - s.mgr.pdHTTP.addrs = []string{"http://mock"} - ctx := context.Background() - resp, err := s.mgr.getRegionCountWith(ctx, mock, []byte{}, []byte{}) - c.Assert(err, IsNil) - c.Assert(resp, Equals, 3) - - resp, err = s.mgr.getRegionCountWith(ctx, mock, []byte{0}, []byte{0xff}) - c.Assert(err, IsNil) - c.Assert(resp, Equals, 3) - - resp, err = s.mgr.getRegionCountWith(ctx, mock, []byte{1, 2}, []byte{1, 4}) - c.Assert(err, IsNil) - c.Assert(resp, Equals, 2) -} - type fakePDClient struct { pd.Client stores []*metapb.Store diff --git a/pkg/conn/scheduler_utils.go b/pkg/conn/scheduler_utils.go deleted file mode 100644 index b31f3415b..000000000 --- a/pkg/conn/scheduler_utils.go +++ /dev/null @@ -1,179 +0,0 @@ -// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. - -package conn - -import ( - "context" - "math" - - "github.com/pingcap/errors" - - "github.com/pingcap/br/pkg/utils" -) - -// clusterConfig represents a set of scheduler whose config have been modified -// along with their original config. -type clusterConfig struct { - // Enable PD schedulers before restore - scheduler []string - // Original scheudle configuration - scheduleCfg map[string]interface{} -} - -var ( - // Schedulers represent region/leader schedulers which can impact on performance. 
- Schedulers = map[string]struct{}{ - "balance-leader-scheduler": {}, - "balance-hot-region-scheduler": {}, - "balance-region-scheduler": {}, - - "shuffle-leader-scheduler": {}, - "shuffle-region-scheduler": {}, - "shuffle-hot-region-scheduler": {}, - } - - pdRegionMergeCfg = []string{ - "max-merge-region-keys", - "max-merge-region-size", - } - pdScheduleLimitCfg = []string{ - "leader-schedule-limit", - "region-schedule-limit", - "max-snapshot-count", - } - - // DefaultPDCfg find by https://github.com/tikv/pd/blob/master/conf/config.toml. - DefaultPDCfg = map[string]interface{}{ - "max-merge-region-keys": 200000, - "max-merge-region-size": 20, - "leader-schedule-limit": 4, - "region-schedule-limit": 2048, - "max-snapshot-count": 3, - } -) - -func addPDLeaderScheduler(ctx context.Context, mgr *Mgr, removedSchedulers []string) error { - for _, scheduler := range removedSchedulers { - err := mgr.AddScheduler(ctx, scheduler) - if err != nil { - return err - } - } - return nil -} - -func restoreSchedulers(ctx context.Context, mgr *Mgr, clusterCfg clusterConfig) error { - if err := addPDLeaderScheduler(ctx, mgr, clusterCfg.scheduler); err != nil { - return errors.Annotate(err, "fail to add PD schedulers") - } - mergeCfg := make(map[string]interface{}) - for _, cfgKey := range pdRegionMergeCfg { - value := clusterCfg.scheduleCfg[cfgKey] - if value == nil { - // Ignore non-exist config. - continue - } - mergeCfg[cfgKey] = value - } - if err := mgr.UpdatePDScheduleConfig(ctx, mergeCfg); err != nil { - return errors.Annotate(err, "fail to update PD merge config") - } - - scheduleLimitCfg := make(map[string]interface{}) - for _, cfgKey := range pdScheduleLimitCfg { - value := clusterCfg.scheduleCfg[cfgKey] - if value == nil { - // Ignore non-exist config. 
- continue - } - scheduleLimitCfg[cfgKey] = value - } - if err := mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil { - return errors.Annotate(err, "fail to update PD schedule config") - } - return nil -} - -func (mgr *Mgr) makeUndoFunctionByConfig(config clusterConfig) utils.UndoFunc { - restore := func(ctx context.Context) error { - return restoreSchedulers(ctx, mgr, config) - } - return restore -} - -// RemoveSchedulers removes the schedulers that may slow down BR speed. -func (mgr *Mgr) RemoveSchedulers(ctx context.Context) (undo utils.UndoFunc, err error) { - undo = utils.Nop - - // Remove default PD scheduler that may affect restore process. - existSchedulers, err := mgr.ListSchedulers(ctx) - if err != nil { - return - } - needRemoveSchedulers := make([]string, 0, len(existSchedulers)) - for _, s := range existSchedulers { - if _, ok := Schedulers[s]; ok { - needRemoveSchedulers = append(needRemoveSchedulers, s) - } - } - scheduler, err := removePDLeaderScheduler(ctx, mgr, needRemoveSchedulers) - if err != nil { - return - } - - undo = mgr.makeUndoFunctionByConfig(clusterConfig{scheduler: scheduler}) - - stores, err := mgr.GetPDClient().GetAllStores(ctx) - if err != nil { - return - } - scheduleCfg, err := mgr.GetPDScheduleConfig(ctx) - if err != nil { - return - } - - undo = mgr.makeUndoFunctionByConfig(clusterConfig{scheduler: scheduler, scheduleCfg: scheduleCfg}) - - disableMergeCfg := make(map[string]interface{}) - for _, cfgKey := range pdRegionMergeCfg { - value := scheduleCfg[cfgKey] - if value == nil { - // Ignore non-exist config. - continue - } - // Disable region merge by setting config to 0. - disableMergeCfg[cfgKey] = 0 - } - err = mgr.UpdatePDScheduleConfig(ctx, disableMergeCfg) - if err != nil { - return - } - - scheduleLimitCfg := make(map[string]interface{}) - for _, cfgKey := range pdScheduleLimitCfg { - value := scheduleCfg[cfgKey] - if value == nil { - // Ignore non-exist config. 
- continue - } - - // Speed update PD scheduler by enlarging scheduling limits. - // Multiply limits by store count but no more than 40. - // Larger limit may make cluster unstable. - limit := int(value.(float64)) - scheduleLimitCfg[cfgKey] = math.Min(40, float64(limit*len(stores))) - } - return undo, mgr.UpdatePDScheduleConfig(ctx, scheduleLimitCfg) -} - -func removePDLeaderScheduler(ctx context.Context, mgr *Mgr, existSchedulers []string) ([]string, error) { - removedSchedulers := make([]string, 0, len(existSchedulers)) - for _, scheduler := range existSchedulers { - err := mgr.RemoveScheduler(ctx, scheduler) - if err != nil { - return nil, err - } - removedSchedulers = append(removedSchedulers, scheduler) - } - return removedSchedulers, nil -} diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go new file mode 100644 index 000000000..c1f4b99cc --- /dev/null +++ b/pkg/pdutil/pd.go @@ -0,0 +1,471 @@ +// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. + +package pdutil + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "math" + "net/http" + "net/url" + "strings" + "time" + + "go.uber.org/zap" + "google.golang.org/grpc" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tidb/util/codec" + pd "github.com/tikv/pd/client" + + "github.com/pingcap/br/pkg/utils" +) + +const ( + clusterVersionPrefix = "pd/api/v1/config/cluster-version" + regionCountPrefix = "pd/api/v1/stats/region" + schedulerPrefix = "pd/api/v1/schedulers" + maxMsgSize = int(128 * utils.MB) // pd.ScanRegion may return a large response + scheduleConfigPrefix = "pd/api/v1/config/schedule" +) + +// clusterConfig represents a set of scheduler whose config have been modified +// along with their original config. 
+type clusterConfig struct { + // Enable PD schedulers before restore + scheduler []string + // Original schedule configuration + scheduleCfg map[string]interface{} +} + +var ( + // Schedulers represent region/leader schedulers which can impact performance. + Schedulers = map[string]struct{}{ + "balance-leader-scheduler": {}, + "balance-hot-region-scheduler": {}, + "balance-region-scheduler": {}, + + "shuffle-leader-scheduler": {}, + "shuffle-region-scheduler": {}, + "shuffle-hot-region-scheduler": {}, + } + pdRegionMergeCfg = []string{ + "max-merge-region-keys", + "max-merge-region-size", + } + pdScheduleLimitCfg = []string{ + "leader-schedule-limit", + "region-schedule-limit", + "max-snapshot-count", + } + + // DefaultPDCfg is the default PD configuration, found at https://github.com/tikv/pd/blob/master/conf/config.toml. + DefaultPDCfg = map[string]interface{}{ + "max-merge-region-keys": 200000, + "max-merge-region-size": 20, + "leader-schedule-limit": 4, + "region-schedule-limit": 2048, + "max-snapshot-count": 3, + } +) + +// pdHTTPRequest defines the interface to send a request to pd and return the result in bytes. +type pdHTTPRequest func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) + +// pdRequest is a func to send an HTTP request to pd and return the result bytes. 
+func pdRequest( + ctx context.Context, + addr string, prefix string, + cli *http.Client, method string, body io.Reader) ([]byte, error) { + u, err := url.Parse(addr) + if err != nil { + return nil, errors.Trace(err) + } + reqURL := fmt.Sprintf("%s/%s", u, prefix) + req, err := http.NewRequestWithContext(ctx, method, reqURL, body) + if err != nil { + return nil, errors.Trace(err) + } + resp, err := cli.Do(req) + if err != nil { + return nil, errors.Trace(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + res, _ := ioutil.ReadAll(resp.Body) + return nil, errors.Errorf("[%d] %s %s", resp.StatusCode, res, reqURL) + } + + r, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, errors.Trace(err) + } + return r, nil +} + +// PdController manage get/update config from pd. +type PdController struct { + addrs []string + cli *http.Client + pdClient pd.Client +} + +// NewPdController creates a new PdController. +func NewPdController( + ctx context.Context, + pdAddrs string, + tlsConf *tls.Config, + securityOption pd.SecurityOption, +) (*PdController, error) { + cli := &http.Client{Timeout: 30 * time.Second} + if tlsConf != nil { + transport := http.DefaultTransport.(*http.Transport).Clone() + transport.TLSClientConfig = tlsConf + cli.Transport = transport + } + + addrs := strings.Split(pdAddrs, ",") + processedAddrs := make([]string, 0, len(addrs)) + var failure error + for _, addr := range addrs { + if addr != "" && !strings.HasPrefix("http", addr) { + if tlsConf != nil { + addr = "https://" + addr + } else { + addr = "http://" + addr + } + } + processedAddrs = append(processedAddrs, addr) + _, failure = pdRequest(ctx, addr, clusterVersionPrefix, cli, http.MethodGet, nil) + if failure == nil { + break + } + } + if failure != nil { + return nil, errors.Annotatef(failure, "pd address (%s) not available, please check network", pdAddrs) + } + + maxCallMsgSize := []grpc.DialOption{ + 
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)), + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)), + } + pdClient, err := pd.NewClientWithContext( + ctx, addrs, securityOption, + pd.WithGRPCDialOptions(maxCallMsgSize...), + pd.WithCustomTimeoutOption(10*time.Second), + ) + if err != nil { + log.Error("fail to create pd client", zap.Error(err)) + return nil, err + } + + return &PdController{ + addrs: processedAddrs, + cli: cli, + pdClient: pdClient, + }, nil +} + +// SetHTTP sets pd addrs and cli for test. +func (p *PdController) SetHTTP(addrs []string, cli *http.Client) { + p.addrs = addrs + p.cli = cli +} + +// SetPDClient sets the pd client for test. +func (p *PdController) SetPDClient(pdClient pd.Client) { + p.pdClient = pdClient +} + +// GetPDClient returns the pd client. +func (p *PdController) GetPDClient() pd.Client { + return p.pdClient +} + +// GetClusterVersion returns the current cluster version. +func (p *PdController) GetClusterVersion(ctx context.Context) (string, error) { + return p.getClusterVersionWith(ctx, pdRequest) +} + +func (p *PdController) getClusterVersionWith(ctx context.Context, get pdHTTPRequest) (string, error) { + var err error + for _, addr := range p.addrs { + v, e := get(ctx, addr, clusterVersionPrefix, p.cli, http.MethodGet, nil) + if e != nil { + err = e + continue + } + return string(v), nil + } + + return "", err +} + +// GetRegionCount returns the region count in the specified range. +func (p *PdController) GetRegionCount(ctx context.Context, startKey, endKey []byte) (int, error) { + return p.getRegionCountWith(ctx, pdRequest, startKey, endKey) +} + +func (p *PdController) getRegionCountWith( + ctx context.Context, get pdHTTPRequest, startKey, endKey []byte, +) (int, error) { + // TiKV reports region start/end keys to PD in memcomparable-format. 
+ var start, end string + start = url.QueryEscape(string(codec.EncodeBytes(nil, startKey))) + if len(endKey) != 0 { // Empty end key means the max. + end = url.QueryEscape(string(codec.EncodeBytes(nil, endKey))) + } + var err error + for _, addr := range p.addrs { + query := fmt.Sprintf( + "%s?start_key=%s&end_key=%s", + regionCountPrefix, start, end) + v, e := get(ctx, addr, query, p.cli, http.MethodGet, nil) + if e != nil { + err = e + continue + } + regionsMap := make(map[string]interface{}) + err = json.Unmarshal(v, &regionsMap) + if err != nil { + return 0, err + } + return int(regionsMap["count"].(float64)), nil + } + return 0, err +} + +// RemoveScheduler removes a pd scheduler. +func (p *PdController) RemoveScheduler(ctx context.Context, scheduler string) error { + return p.removeSchedulerWith(ctx, scheduler, pdRequest) +} + +func (p *PdController) removeSchedulerWith(ctx context.Context, scheduler string, delete pdHTTPRequest) (err error) { + for _, addr := range p.addrs { + prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler) + _, err = delete(ctx, addr, prefix, p.cli, http.MethodDelete, nil) + if err != nil { + continue + } + return nil + } + return err +} + +// AddScheduler adds a pd scheduler. +func (p *PdController) AddScheduler(ctx context.Context, scheduler string) error { + return p.addSchedulerWith(ctx, scheduler, pdRequest) +} + +func (p *PdController) addSchedulerWith(ctx context.Context, scheduler string, post pdHTTPRequest) (err error) { + for _, addr := range p.addrs { + body := bytes.NewBuffer([]byte(`{"name":"` + scheduler + `"}`)) + _, err = post(ctx, addr, schedulerPrefix, p.cli, http.MethodPost, body) + if err != nil { + continue + } + return nil + } + return err +} + +// ListSchedulers lists all pd schedulers. 
+func (p *PdController) ListSchedulers(ctx context.Context) ([]string, error) { + return p.listSchedulersWith(ctx, pdRequest) +} + +func (p *PdController) listSchedulersWith(ctx context.Context, get pdHTTPRequest) ([]string, error) { + var err error + for _, addr := range p.addrs { + v, e := get(ctx, addr, schedulerPrefix, p.cli, http.MethodGet, nil) + if e != nil { + err = e + continue + } + d := make([]string, 0) + err = json.Unmarshal(v, &d) + if err != nil { + return nil, err + } + return d, nil + } + return nil, err +} + +// GetPDScheduleConfig returns PD schedule config value associated with the key. +// It returns nil if there is no such config item. +func (p *PdController) GetPDScheduleConfig( + ctx context.Context, +) (map[string]interface{}, error) { + var err error + for _, addr := range p.addrs { + v, e := pdRequest( + ctx, addr, scheduleConfigPrefix, p.cli, http.MethodGet, nil) + if e != nil { + err = e + continue + } + cfg := make(map[string]interface{}) + err = json.Unmarshal(v, &cfg) + if err != nil { + return nil, err + } + return cfg, nil + } + return nil, err +} + +// UpdatePDScheduleConfig updates PD schedule config value associated with the key. 
+func (p *PdController) UpdatePDScheduleConfig( + ctx context.Context, cfg map[string]interface{}, +) error { + for _, addr := range p.addrs { + reqData, err := json.Marshal(cfg) + if err != nil { + return err + } + _, e := pdRequest(ctx, addr, scheduleConfigPrefix, + p.cli, http.MethodPost, bytes.NewBuffer(reqData)) + if e == nil { + return nil + } + } + return errors.New("update PD schedule config failed") +} + +func addPDLeaderScheduler(ctx context.Context, pd *PdController, removedSchedulers []string) error { + for _, scheduler := range removedSchedulers { + err := pd.AddScheduler(ctx, scheduler) + if err != nil { + return err + } + } + return nil +} + +func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg clusterConfig) error { + if err := addPDLeaderScheduler(ctx, pd, clusterCfg.scheduler); err != nil { + return errors.Annotate(err, "fail to add PD schedulers") + } + mergeCfg := make(map[string]interface{}) + for _, cfgKey := range pdRegionMergeCfg { + value := clusterCfg.scheduleCfg[cfgKey] + if value == nil { + // Ignore non-exist config. + continue + } + mergeCfg[cfgKey] = value + } + if err := pd.UpdatePDScheduleConfig(ctx, mergeCfg); err != nil { + return errors.Annotate(err, "fail to update PD merge config") + } + + scheduleLimitCfg := make(map[string]interface{}) + for _, cfgKey := range pdScheduleLimitCfg { + value := clusterCfg.scheduleCfg[cfgKey] + if value == nil { + // Ignore non-exist config. + continue + } + scheduleLimitCfg[cfgKey] = value + } + if err := pd.UpdatePDScheduleConfig(ctx, scheduleLimitCfg); err != nil { + return errors.Annotate(err, "fail to update PD schedule config") + } + return nil +} + +func (p *PdController) makeUndoFunctionByConfig(config clusterConfig) utils.UndoFunc { + restore := func(ctx context.Context) error { + return restoreSchedulers(ctx, p, config) + } + return restore +} + +// RemoveSchedulers removes the schedulers that may slow down BR speed. 
+func (p *PdController) RemoveSchedulers(ctx context.Context) (undo utils.UndoFunc, err error) {
+	undo = utils.Nop
+
+	// Remove default PD schedulers that may interfere with the restore process.
+	existSchedulers, err := p.ListSchedulers(ctx)
+	if err != nil {
+		return
+	}
+	needRemoveSchedulers := make([]string, 0, len(existSchedulers))
+	for _, s := range existSchedulers {
+		if _, ok := Schedulers[s]; ok {
+			needRemoveSchedulers = append(needRemoveSchedulers, s)
+		}
+	}
+	scheduler, err := removePDLeaderScheduler(ctx, p, needRemoveSchedulers)
+	if err != nil {
+		return
+	}
+
+	// Build the undo function as soon as schedulers are removed, so they get
+	// restored even if one of the later steps fails.
+	undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: scheduler})
+
+	stores, err := p.pdClient.GetAllStores(ctx)
+	if err != nil {
+		return
+	}
+	scheduleCfg, err := p.GetPDScheduleConfig(ctx)
+	if err != nil {
+		return
+	}
+
+	// Rebuild the undo function now that the original schedule config is known,
+	// so the config modifications below are rolled back as well.
+	undo = p.makeUndoFunctionByConfig(clusterConfig{scheduler: scheduler, scheduleCfg: scheduleCfg})
+
+	disableMergeCfg := make(map[string]interface{})
+	for _, cfgKey := range pdRegionMergeCfg {
+		value := scheduleCfg[cfgKey]
+		if value == nil {
+			// Ignore non-exist config.
+			continue
+		}
+		// Disable region merge by setting config to 0.
+		disableMergeCfg[cfgKey] = 0
+	}
+	err = p.UpdatePDScheduleConfig(ctx, disableMergeCfg)
+	if err != nil {
+		return
+	}
+
+	scheduleLimitCfg := make(map[string]interface{})
+	for _, cfgKey := range pdScheduleLimitCfg {
+		value := scheduleCfg[cfgKey]
+		if value == nil {
+			// Ignore non-exist config.
+			continue
+		}
+
+		// Speed up PD scheduling by enlarging scheduling limits.
+		// Multiply limits by store count but no more than 40.
+		// Larger limit may make cluster unstable.
+		// JSON numbers decode into float64, hence the type assertion.
+		limit := int(value.(float64))
+		scheduleLimitCfg[cfgKey] = math.Min(40, float64(limit*len(stores)))
+	}
+	return undo, p.UpdatePDScheduleConfig(ctx, scheduleLimitCfg)
+}
+
+// Close closes the connection to PD.
+func (p *PdController) Close() {
+	p.pdClient.Close()
+}
+
+// removePDLeaderScheduler removes each scheduler in existSchedulers from PD
+// and returns the names that were actually removed so they can be restored
+// later. It stops at the first failure.
+func removePDLeaderScheduler(ctx context.Context, pd *PdController, existSchedulers []string) ([]string, error) {
+	removedSchedulers := make([]string, 0, len(existSchedulers))
+	for _, scheduler := range existSchedulers {
+		err := pd.RemoveScheduler(ctx, scheduler)
+		if err != nil {
+			return nil, err
+		}
+		removedSchedulers = append(removedSchedulers, scheduler)
+	}
+	return removedSchedulers, nil
+}
diff --git a/pkg/pdutil/pd_test.go b/pkg/pdutil/pd_test.go
new file mode 100644
index 000000000..be442bbbc
--- /dev/null
+++ b/pkg/pdutil/pd_test.go
@@ -0,0 +1,139 @@
+// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
+
+package pdutil
+
+import (
+	"context"
+	"encoding/hex"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"testing"
+
+	. "github.com/pingcap/check"
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/tidb/util/codec"
+	"github.com/tikv/pd/server/core"
+	"github.com/tikv/pd/server/statistics"
+)
+
+func TestT(t *testing.T) {
+	TestingT(t)
+}
+
+type testPDControllerSuite struct {
+}
+
+var _ = Suite(&testPDControllerSuite{})
+
+func (s *testPDControllerSuite) TestScheduler(c *C) {
+	ctx := context.Background()
+
+	scheduler := "balance-leader-scheduler"
+	mock := func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) {
+		return nil, errors.New("failed")
+	}
+	pdController := &PdController{addrs: []string{"", ""}}
+	err := pdController.removeSchedulerWith(ctx, scheduler, mock)
+	c.Assert(err, ErrorMatches, "failed")
+
+	err = pdController.addSchedulerWith(ctx, scheduler, mock)
+	c.Assert(err, ErrorMatches, "failed")
+
+	_, err = pdController.listSchedulersWith(ctx, mock)
+	c.Assert(err, ErrorMatches, "failed")
+
+	mock = func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) {
+		return []byte(`["` + scheduler + `"]`), nil
+	}
+	err = pdController.removeSchedulerWith(ctx, scheduler, mock)
+	c.Assert(err, IsNil)
+
+	err = pdController.addSchedulerWith(ctx, scheduler, mock)
+	c.Assert(err, IsNil)
+
+	schedulers, err := pdController.listSchedulersWith(ctx, mock)
+	c.Assert(err, IsNil)
+	c.Assert(schedulers, HasLen, 1)
+	c.Assert(schedulers[0], Equals, scheduler)
+}
+
+func (s *testPDControllerSuite) TestGetClusterVersion(c *C) {
+	pdController := &PdController{addrs: []string{"", ""}} // two endpoints: the first fails, the second succeeds
+	counter := 0
+	mock := func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) {
+		counter++
+		if counter <= 1 {
+			return nil, errors.New("mock error")
+		}
+		return []byte(`test`), nil
+	}
+
+	ctx := context.Background()
+	respString, err := pdController.getClusterVersionWith(ctx, mock)
+	c.Assert(err, IsNil)
+	c.Assert(respString, Equals, "test")
+
+	mock = func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) {
+		return nil, errors.New("mock error")
+	}
+	_, err = pdController.getClusterVersionWith(ctx, mock)
+	c.Assert(err, NotNil)
+}
+
+func (s *testPDControllerSuite) TestRegionCount(c *C) {
+	regions := core.NewRegionsInfo()
+	regions.SetRegion(core.NewRegionInfo(&metapb.Region{
+		Id:          1,
+		StartKey:    codec.EncodeBytes(nil, []byte{1, 1}),
+		EndKey:      codec.EncodeBytes(nil, []byte{1, 3}),
+		RegionEpoch: &metapb.RegionEpoch{},
+	}, nil))
+	regions.SetRegion(core.NewRegionInfo(&metapb.Region{
+		Id:          2,
+		StartKey:    codec.EncodeBytes(nil, []byte{1, 3}),
+		EndKey:      codec.EncodeBytes(nil, []byte{1, 5}),
+		RegionEpoch: &metapb.RegionEpoch{},
+	}, nil))
+	regions.SetRegion(core.NewRegionInfo(&metapb.Region{
+		Id:          3,
+		StartKey:    codec.EncodeBytes(nil, []byte{2, 3}),
+		EndKey:      codec.EncodeBytes(nil, []byte{3, 4}),
+		RegionEpoch: &metapb.RegionEpoch{},
+	}, nil))
+	c.Assert(regions.Length(), Equals, 3)
+
+	mock := func(
+		_ context.Context, addr string, prefix string, _ *http.Client, _ string, _ io.Reader,
+	) ([]byte, error) {
+		query := fmt.Sprintf("%s/%s", addr, prefix)
+		u, e := url.Parse(query)
+		c.Assert(e, IsNil, Commentf("%s", query))
+		start := u.Query().Get("start_key")
+		end := u.Query().Get("end_key")
+		c.Log(hex.EncodeToString([]byte(start)))
+		c.Log(hex.EncodeToString([]byte(end)))
+		scanRegions := regions.ScanRange([]byte(start), []byte(end), 0)
+		stats := statistics.RegionStats{Count: len(scanRegions)}
+		ret, err := json.Marshal(stats)
+		c.Assert(err, IsNil)
+		return ret, nil
+	}
+
+	pdController := &PdController{addrs: []string{"http://mock"}}
+	ctx := context.Background()
+	resp, err := pdController.getRegionCountWith(ctx, mock, []byte{}, []byte{})
+	c.Assert(err, IsNil)
+	c.Assert(resp, Equals, 3)
+
+	resp, err = pdController.getRegionCountWith(ctx, mock, []byte{0}, []byte{0xff})
+	c.Assert(err, IsNil)
+	c.Assert(resp, Equals, 3)
+
+	resp, err = pdController.getRegionCountWith(ctx, mock, []byte{1, 2}, []byte{1, 4})
+	c.Assert(err, IsNil)
+	c.Assert(resp, Equals, 2)
+}