Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Jan 2, 2020
1 parent 62dc5ea commit 32bf2cc
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 53 deletions.
54 changes: 34 additions & 20 deletions client/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"google.golang.org/grpc"
)

// BaseClient is a basic client for all other complex client.
type BaseClient struct {
// baseClient is a basic client for all other complex client.
type baseClient struct {
urls []string
clusterID uint64
connMu struct {
Expand Down Expand Up @@ -57,19 +57,19 @@ type SecurityOption struct {
}

// ClientOption configures client.
type ClientOption func(c *BaseClient)
type ClientOption func(c *baseClient)

// WithGRPCDialOptions configures the client with gRPC dial options.
func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOption {
return func(c *BaseClient) {
return func(c *baseClient) {
c.gRPCDialOptions = append(c.gRPCDialOptions, opts...)
}
}

// NewBaseClient returns a new BaseClient.
func NewBaseClient(ctx context.Context, urls []string, security SecurityOption, opts ...ClientOption) *BaseClient {
// newBaseClient returns a new baseClient.
func newBaseClient(ctx context.Context, urls []string, security SecurityOption, opts ...ClientOption) (*baseClient, error) {
ctx1, cancel := context.WithCancel(ctx)
c := &BaseClient{
c := &baseClient{
urls: urls,
checkLeaderCh: make(chan struct{}, 1),
ctx: ctx1,
Expand All @@ -80,10 +80,24 @@ func NewBaseClient(ctx context.Context, urls []string, security SecurityOption,
for _, opt := range opts {
opt(c)
}
return c

if err := c.initRetry(c.initClusterID); err != nil {
c.cancel()
return nil, err
}
if err := c.initRetry(c.updateLeader); err != nil {
c.cancel()
return nil, err
}
log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID))

c.wg.Add(1)
go c.leaderLoop()

return c, nil
}

func (c *BaseClient) initRetry(f func() error) error {
func (c *baseClient) initRetry(f func() error) error {
var err error
for i := 0; i < maxInitClusterRetries; i++ {
if err = f(); err == nil {
Expand All @@ -98,7 +112,7 @@ func (c *BaseClient) initRetry(f func() error) error {
return errors.WithStack(err)
}

func (c *BaseClient) leaderLoop() {
func (c *baseClient) leaderLoop() {
defer c.wg.Done()

ctx, cancel := context.WithCancel(c.ctx)
Expand All @@ -119,33 +133,33 @@ func (c *BaseClient) leaderLoop() {
}

// ScheduleCheckLeader is used to check leader.
func (c *BaseClient) ScheduleCheckLeader() {
func (c *baseClient) ScheduleCheckLeader() {
select {
case c.checkLeaderCh <- struct{}{}:
default:
}
}

// GetClusterID returns the ClusterID.
func (c *BaseClient) GetClusterID(context.Context) uint64 {
func (c *baseClient) GetClusterID(context.Context) uint64 {
return c.clusterID
}

// GetLeaderAddr returns the leader address.
// For testing use.
func (c *BaseClient) GetLeaderAddr() string {
func (c *baseClient) GetLeaderAddr() string {
c.connMu.RLock()
defer c.connMu.RUnlock()
return c.connMu.leader
}

// GetURLs returns the URLs.
// For testing use. It should only be called when the client is closed.
func (c *BaseClient) GetURLs() []string {
func (c *baseClient) GetURLs() []string {
return c.urls
}

func (c *BaseClient) initClusterID() error {
func (c *baseClient) initClusterID() error {
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
for _, u := range c.urls {
Expand All @@ -162,7 +176,7 @@ func (c *BaseClient) initClusterID() error {
return errors.WithStack(errFailInitClusterID)
}

func (c *BaseClient) updateLeader() error {
func (c *baseClient) updateLeader() error {
for _, u := range c.urls {
ctx, cancel := context.WithTimeout(c.ctx, updateLeaderTimeout)
members, err := c.getMembers(ctx, u)
Expand All @@ -184,7 +198,7 @@ func (c *BaseClient) updateLeader() error {
return errors.Errorf("failed to get leader from %v", c.urls)
}

func (c *BaseClient) getMembers(ctx context.Context, url string) (*pdpb.GetMembersResponse, error) {
func (c *baseClient) getMembers(ctx context.Context, url string) (*pdpb.GetMembersResponse, error) {
cc, err := c.getOrCreateGRPCConn(url)
if err != nil {
return nil, err
Expand All @@ -197,7 +211,7 @@ func (c *BaseClient) getMembers(ctx context.Context, url string) (*pdpb.GetMembe
return members, nil
}

func (c *BaseClient) updateURLs(members []*pdpb.Member) {
func (c *baseClient) updateURLs(members []*pdpb.Member) {
urls := make([]string, 0, len(members))
for _, m := range members {
urls = append(urls, m.GetClientUrls()...)
Expand All @@ -213,7 +227,7 @@ func (c *BaseClient) updateURLs(members []*pdpb.Member) {
c.urls = urls
}

func (c *BaseClient) switchLeader(addrs []string) error {
func (c *baseClient) switchLeader(addrs []string) error {
// FIXME: How to safely compare leader urls? For now, only allows one client url.
addr := addrs[0]

Expand All @@ -236,7 +250,7 @@ func (c *BaseClient) switchLeader(addrs []string) error {
return nil
}

func (c *BaseClient) getOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) {
func (c *baseClient) getOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) {
c.connMu.RLock()
conn, ok := c.connMu.clientConns[addr]
c.connMu.RUnlock()
Expand Down
22 changes: 7 additions & 15 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ var (
)

type client struct {
*BaseClient
*baseClient
tsoRequests chan *tsoRequest

lastPhysical int64
Expand All @@ -130,27 +130,19 @@ func NewClient(pdAddrs []string, security SecurityOption, opts ...ClientOption)
// NewClientWithContext creates a PD client with context.
func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
log.Info("[pd] create pd client with endpoints", zap.Strings("pd-address", pdAddrs))
base := NewBaseClient(ctx, addrsToUrls(pdAddrs), security, opts...)
base, err := newBaseClient(ctx, addrsToUrls(pdAddrs), security, opts...)
if err != nil {
return nil, err
}
c := &client{
BaseClient: base,
baseClient: base,
tsoRequests: make(chan *tsoRequest, maxMergeTSORequests),
tsDeadlineCh: make(chan deadline, 1),
}

if err := c.initRetry(c.initClusterID); err != nil {
c.cancel()
return nil, err
}
if err := c.initRetry(c.updateLeader); err != nil {
c.cancel()
return nil, err
}
log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID))

c.wg.Add(3)
c.wg.Add(2)
go c.tsLoop()
go c.tsCancelLoop()
go c.leaderLoop()

return c, nil
}
Expand Down
2 changes: 1 addition & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func (s *testClientSuite) TestUpdateURLs(c *C) {
}
return
}
cli := &BaseClient{}
cli := &baseClient{}
cli.updateURLs(members[1:])
c.Assert(cli.urls, DeepEquals, getURLs([]*pdpb.Member{members[1], members[3], members[2]}))
cli.updateURLs(members[1:])
Expand Down
20 changes: 4 additions & 16 deletions client/config_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/pingcap/log"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
)

// ConfigClient is a client to manage the configuration.
Expand All @@ -38,7 +37,7 @@ type ConfigClient interface {
}

type configClient struct {
*BaseClient
*baseClient
}

// NewConfigClient creates a PD configuration client.
Expand All @@ -49,22 +48,11 @@ func NewConfigClient(pdAddrs []string, security SecurityOption) (ConfigClient, e
// NewConfigClientWithContext creates a PD configuration client with the context.
func NewConfigClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption) (ConfigClient, error) {
log.Info("[pd] create pd configuration client with endpoints", zap.Strings("pd-address", pdAddrs))
base := NewBaseClient(ctx, addrsToUrls(pdAddrs), security)
c := &configClient{base}
c.connMu.clientConns = make(map[string]*grpc.ClientConn)

if err := c.initRetry(c.initClusterID); err != nil {
return nil, err
}
if err := c.initRetry(c.updateLeader); err != nil {
base, err := newBaseClient(ctx, addrsToUrls(pdAddrs), security)
if err != nil {
return nil, err
}
log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID))

c.wg.Add(1)
go c.leaderLoop()

return c, nil
return &configClient{base}, nil
}

func (c *configClient) Close() {
Expand Down
1 change: 0 additions & 1 deletion server/config_manager/config_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,6 @@ func (c *ConfigManager) updateLocal(componentID string, version *configpb.Versio
}
if wrongEntry, err := mergeAndUpdateConfig(localCfg, updateEntries); err != nil {
c.deleteEntry(component, wrongEntry)

return localLatestVersion, &configpb.Status{Code: configpb.StatusCode_UNKNOWN, Message: err.Error()}
}
localCfg.Version = &configpb.Version{Global: version.GetGlobal(), Local: version.GetLocal() + 1}
Expand Down

0 comments on commit 32bf2cc

Please sign in to comment.