Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
add NewPdController
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv committed Sep 25, 2020
1 parent 38e0f7a commit 4c1b788
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 68 deletions.
60 changes: 9 additions & 51 deletions pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"fmt"
"net/http"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -121,50 +120,13 @@ func NewMgr(
storeBehavior StoreBehavior,
checkRequirements bool,
) (*Mgr, error) {
addrs := strings.Split(pdAddrs, ",")

failure := errors.Errorf("pd address (%s) has wrong format", pdAddrs)
cli := &http.Client{Timeout: 30 * time.Second}
if tlsConf != nil {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = tlsConf
cli.Transport = transport
}

processedAddrs := make([]string, 0, len(addrs))
for _, addr := range addrs {
if addr != "" && !strings.HasPrefix("http", addr) {
if tlsConf != nil {
addr = "https://" + addr
} else {
addr = "http://" + addr
}
}
processedAddrs = append(processedAddrs, addr)
_, failure = pdRequest(ctx, addr, clusterVersionPrefix, cli, http.MethodGet, nil)
if failure == nil {
break
}
}
if failure != nil {
return nil, errors.Annotatef(failure, "pd address (%s) not available, please check network", pdAddrs)
}

maxCallMsgSize := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)),
}
pdClient, err := pd.NewClientWithContext(
ctx, addrs, securityOption,
pd.WithGRPCDialOptions(maxCallMsgSize...),
pd.WithCustomTimeoutOption(10*time.Second),
)
controller, err := NewPdController(ctx, pdAddrs, tlsConf, securityOption)
if err != nil {
log.Error("fail to create pd client", zap.Error(err))
log.Error("fail to create pd controller", zap.Error(err))
return nil, err
}
if checkRequirements {
err = utils.CheckClusterVersion(ctx, pdClient)
err = utils.CheckClusterVersion(ctx, controller.pdClient)
if err != nil {
errMsg := "running BR in incompatible version of cluster, " +
"if you believe it's OK, use --check-requirements=false to skip."
Expand All @@ -174,7 +136,7 @@ func NewMgr(
log.Info("new mgr", zap.String("pdAddrs", pdAddrs))

// Check live tikv.
stores, err := GetAllTiKVStores(ctx, pdClient, storeBehavior)
stores, err := GetAllTiKVStores(ctx, controller.pdClient, storeBehavior)
if err != nil {
log.Error("fail to get store", zap.Error(err))
return nil, err
Expand All @@ -199,16 +161,12 @@ func NewMgr(
}

mgr := &Mgr{
PdController: &PdController{
pdClient: pdClient,
},
storage: storage,
dom: dom,
tlsConf: tlsConf,
ownsStorage: g.OwnsStorage(),
PdController: controller,
storage: storage,
dom: dom,
tlsConf: tlsConf,
ownsStorage: g.OwnsStorage(),
}
mgr.PdController.addrs = processedAddrs
mgr.PdController.cli = cli
mgr.grpcClis.clis = make(map[uint64]*grpc.ClientConn)
return mgr, nil
}
Expand Down
76 changes: 69 additions & 7 deletions pkg/conn/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@ package conn
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math"
"net/http"
"net/url"
"strings"
"time"

"github.com/pingcap/log"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/pingcap/errors"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -49,13 +56,6 @@ var (
}
)

// PdController manage get/update config from pd.
type PdController struct {
addrs []string
cli *http.Client
pdClient pd.Client
}

type pdHTTPRequest func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error)

func pdRequest(
Expand Down Expand Up @@ -88,6 +88,68 @@ func pdRequest(
return r, nil
}

// PdController manage get/update config from pd.
type PdController struct {
addrs []string
cli *http.Client
pdClient pd.Client
}

func NewPdController(
ctx context.Context,
pdAddrs string,
tlsConf *tls.Config,
securityOption pd.SecurityOption,
) (*PdController, error) {
cli := &http.Client{Timeout: 30 * time.Second}
if tlsConf != nil {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = tlsConf
cli.Transport = transport
}

addrs := strings.Split(pdAddrs, ",")
processedAddrs := make([]string, 0, len(addrs))
var failure error
for _, addr := range addrs {
if addr != "" && !strings.HasPrefix("http", addr) {
if tlsConf != nil {
addr = "https://" + addr
} else {
addr = "http://" + addr
}
}
processedAddrs = append(processedAddrs, addr)
_, failure = pdRequest(ctx, addr, clusterVersionPrefix, cli, http.MethodGet, nil)
if failure == nil {
break
}
}
if failure != nil {
return nil, errors.Annotatef(failure, "pd address (%s) not available, please check network", pdAddrs)
}

maxCallMsgSize := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)),
}
pdClient, err := pd.NewClientWithContext(
ctx, addrs, securityOption,
pd.WithGRPCDialOptions(maxCallMsgSize...),
pd.WithCustomTimeoutOption(10*time.Second),
)
if err != nil {
log.Error("fail to create pd client", zap.Error(err))
return nil, err
}

return &PdController{
addrs: addrs,
cli: cli,
pdClient: pdClient,
}, nil
}

// RemoveScheduler remove pd scheduler.
func (p *PdController) RemoveScheduler(ctx context.Context, scheduler string) error {
return p.removeSchedulerWith(ctx, scheduler, pdRequest)
Expand Down
22 changes: 12 additions & 10 deletions pkg/conn/pd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,46 @@ package conn
import (
"context"
"errors"
"fmt"
"io"
"net/http"

. "github.com/pingcap/check"
)

type testPdMgrSuite struct {
type testPDControllerSuite struct {
}

var _ = Suite(&testPdMgrSuite{})
var _ = Suite(&testPDControllerSuite{})

func (s *testPdMgrSuite) TestScheduler(c *C) {
func (s *testPDControllerSuite) TestScheduler(c *C) {
ctx := context.Background()

scheduler := "balance-leader-scheduler"
mock := func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) {
return nil, errors.New("failed")
}
pdMgr := &PdController{}
err := pdMgr.removeSchedulerWith(ctx, scheduler, mock)
pdController := &PdController{addrs: []string{"", ""}}
err := pdController.removeSchedulerWith(ctx, scheduler, mock)
fmt.Printf("err: %v\n", err)
c.Assert(err, ErrorMatches, "failed")

err = pdMgr.addSchedulerWith(ctx, scheduler, mock)
err = pdController.addSchedulerWith(ctx, scheduler, mock)
c.Assert(err, ErrorMatches, "failed")

_, err = pdMgr.listSchedulersWith(ctx, mock)
_, err = pdController.listSchedulersWith(ctx, mock)
c.Assert(err, ErrorMatches, "failed")

mock = func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) {
return []byte(`["` + scheduler + `"]`), nil
}
err = pdMgr.removeSchedulerWith(ctx, scheduler, mock)
err = pdController.removeSchedulerWith(ctx, scheduler, mock)
c.Assert(err, IsNil)

err = pdMgr.addSchedulerWith(ctx, scheduler, mock)
err = pdController.addSchedulerWith(ctx, scheduler, mock)
c.Assert(err, IsNil)

schedulers, err := pdMgr.listSchedulersWith(ctx, mock)
schedulers, err := pdController.listSchedulersWith(ctx, mock)
c.Assert(err, IsNil)
c.Assert(schedulers, HasLen, 1)
c.Assert(schedulers[0], Equals, scheduler)
Expand Down

0 comments on commit 4c1b788

Please sign in to comment.