Skip to content

Commit

Permalink
add captureConfigChange
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Dec 11, 2019
1 parent e38eeb1 commit 55a6b1b
Show file tree
Hide file tree
Showing 12 changed files with 714 additions and 527 deletions.
438 changes: 2 additions & 436 deletions client/client_test.go

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions client/config_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,17 @@ type configClient struct {

// NewConfigClient creates a PD configuration client.
func NewConfigClient(pdAddrs []string, security SecurityOption) (ConfigClient, error) {
return NewConfigClientWithContext(context.Background(), pdAddrs, security)
}

// NewConfigClientWithContext creates a PD configuration client with the context.
func NewConfigClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption) (ConfigClient, error) {
log.Info("[pd] create pd configuration client with endpoints", zap.Strings("pd-address", pdAddrs))
ctx, cancel := context.WithCancel(context.Background())
ctx1, cancel := context.WithCancel(ctx)
c := &configClient{
urls: addrsToUrls(pdAddrs),
checkLeaderCh: make(chan struct{}, 1),
ctx: ctx,
ctx: ctx1,
cancel: cancel,
security: security,
}
Expand Down Expand Up @@ -276,6 +281,7 @@ func (c *configClient) Create(ctx context.Context, v *configpb.Version, componen
Version: v,
Component: component,
ComponentId: componentID,
Config: config,
})
cancel()

Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/kvproto v0.0.0-20191030021250-51b332bcb20b
github.com/pingcap/kvproto v0.0.0-20191211032946-5dbce7e7b868
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v0.8.0
Expand All @@ -54,5 +54,3 @@ require (
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)

replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20191209064212-97062f464176
7 changes: 3 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ github.com/pingcap/errors v0.11.0 h1:DCJQB8jrHbQ1VVlMFIrbj2ApScNNotVmkSNplu2yUt4
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c h1:hvQd3aOLKLF7xvRV6DzvPkKY4QXzfVbjU1BhW0d9yL8=
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/kvproto v0.0.0-20191211032946-5dbce7e7b868 h1:4FIcMXOHW2CwuTHJFuU/nLQ9KH2YYIoizuWfh8cQKk8=
github.com/pingcap/kvproto v0.0.0-20191211032946-5dbce7e7b868/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand All @@ -127,10 +129,7 @@ github.com/prometheus/client_model v0.0.0-20171117100541-99fa1f4be8e5/go.mod h1:
github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1 h1:osmNoEW2SCW3L7EX0km2LYM8HKpNWRiouxjE3XHkyGc=
github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20180612222113-7d6f385de8be h1:MoyXp/VjXUwM0GyDcdwT7Ubea2gxOSHpPaFo3qV+Y2A=
github.com/prometheus/procfs v0.0.0-20180612222113-7d6f385de8be/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6b
github.com/rleungx/kvproto v0.0.0-20191209064212-97062f464176 h1:3TXH0ganqVyPGcntrFTytaXgDuqNWP+AwjPpTVV594Y=
github.com/rleungx/kvproto v0.0.0-20191209064212-97062f464176/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/prometheus/procfs v0.0.0-20180612222113-7d6f385de8be/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sirupsen/logrus v1.0.5 h1:8c8b5uO0zS4X6RPl/sd1ENwSkIc0/H2PaHxE3udaE8I=
github.com/sirupsen/logrus v1.0.5/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
Expand Down
5 changes: 5 additions & 0 deletions pkg/typeutil/duration.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,8 @@ func (d *Duration) UnmarshalText(text []byte) error {
d.Duration, err = time.ParseDuration(string(text))
return errors.WithStack(err)
}

// MarshalText parses the duration string into the TOML.
func (d Duration) MarshalText() (text []byte, err error) {
return []byte(fmt.Sprintf("%v", d.Duration)), nil
}
160 changes: 153 additions & 7 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,24 @@
package cluster

import (
"bytes"
"context"
"fmt"
"net/http"
"reflect"
"sync"
"time"

"github.com/BurntSushi/toml"
"github.com/coreos/go-semver/semver"
"github.com/gogo/protobuf/proto"
"github.com/pingcap/errcode"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/configpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/pd/pkg/etcdutil"
"github.com/pingcap/pd/pkg/logutil"
"github.com/pingcap/pd/pkg/typeutil"
Expand All @@ -47,8 +52,10 @@ import (
var backgroundJobInterval = time.Minute

const (
clientTimeout = 3 * time.Second
defaultChangedRegionsLimit = 10000
clientTimeout = 3 * time.Second
defaultChangedRegionsLimit = 10000
captureConfigChangeInterval = 10 * time.Second
component = "pd"
)

// ErrRegionIsStale is error info for region is stale.
Expand All @@ -61,8 +68,19 @@ type Server interface {
GetAllocator() *id.AllocatorImpl
GetScheduleOption() *config.ScheduleOption
GetStorage() *core.Storage
GetEndpoints() []string
GetHBStreams() opt.HeartbeatStreams
GetRaftCluster() *RaftCluster
GetSecurityConfig() *config.SecurityConfig
GetConfig() *config.Config
GetConfigVersion() *configpb.Version
GetLeader() *pdpb.Member
GetMemberInfo() *pdpb.Member

SetScheduleConfig(cfg config.ScheduleConfig) error
SetReplicationConfig(cfg config.ReplicationConfig) error
SetPDServerConfig(cfg config.PDServerConfig) error
SetConfigVersion(version *configpb.Version)
}

// RaftCluster is used for cluster config management.
Expand All @@ -76,6 +94,8 @@ type RaftCluster struct {
sync.RWMutex
ctx context.Context

server Server

running bool

clusterID uint64
Expand Down Expand Up @@ -104,6 +124,9 @@ type RaftCluster struct {

ruleManager *placement.RuleManager
client *clientv3.Client

// config client
configClient pd.ConfigClient
}

// Status saves some state information.
Expand All @@ -113,13 +136,14 @@ type Status struct {
}

// NewRaftCluster create a new cluster.
func NewRaftCluster(ctx context.Context, root string, clusterID uint64, regionSyncer *syncer.RegionSyncer, client *clientv3.Client) *RaftCluster {
func NewRaftCluster(ctx context.Context, root string, clusterID uint64, regionSyncer *syncer.RegionSyncer, s Server, client *clientv3.Client) *RaftCluster {
return &RaftCluster{
ctx: ctx,
running: false,
clusterID: clusterID,
clusterRoot: root,
regionSyncer: regionSyncer,
server: s,
client: client,
}
}
Expand Down Expand Up @@ -186,7 +210,7 @@ func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.ScheduleOption, s
}

// Start starts a cluster.
func (c *RaftCluster) Start(s Server) error {
func (c *RaftCluster) Start() error {
c.Lock()
defer c.Unlock()

Expand All @@ -195,7 +219,7 @@ func (c *RaftCluster) Start(s Server) error {
return nil
}

c.InitCluster(s.GetAllocator(), s.GetScheduleOption(), s.GetStorage())
c.InitCluster(c.server.GetAllocator(), c.server.GetScheduleOption(), c.server.GetStorage())
cluster, err := c.LoadClusterInfo()
if err != nil {
return err
Expand All @@ -204,17 +228,28 @@ func (c *RaftCluster) Start(s Server) error {
return nil
}

c.coordinator = newCoordinator(c.ctx, cluster, s.GetHBStreams())
c.coordinator = newCoordinator(c.ctx, cluster, c.server.GetHBStreams())
c.regionStats = statistics.NewRegionStatistics(c.opt)

securityConfig := c.server.GetSecurityConfig()
c.configClient, err = pd.NewConfigClientWithContext(c.ctx, c.server.GetEndpoints(), pd.SecurityOption{
CAPath: securityConfig.CAPath,
CertPath: securityConfig.CertPath,
KeyPath: securityConfig.KeyPath,
})
if err != nil {
return err
}
c.quit = make(chan struct{})

c.wg.Add(3)
c.wg.Add(4)
go c.runCoordinator()
failpoint.Inject("highFrequencyClusterJobs", func() {
backgroundJobInterval = 100 * time.Microsecond
})
go c.runBackgroundJobs(backgroundJobInterval)
go c.syncRegions()
go c.captureConfigChange(captureConfigChangeInterval)
c.running = true

return nil
Expand Down Expand Up @@ -277,6 +312,58 @@ func (c *RaftCluster) runBackgroundJobs(interval time.Duration) {
}
}

func (c *RaftCluster) captureConfigChange(interval time.Duration) {
defer logutil.LogPanic()
defer c.wg.Done()

ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
leader := c.server.GetLeader()
if leader != nil {
break
}
}
ctx, cancel := context.WithCancel(c.ctx)

name := c.server.GetMemberInfo().GetName()
retry := true
var err error
for retry {
version := c.server.GetConfigVersion()
config := new(bytes.Buffer)
if err := toml.NewEncoder(config).Encode(*c.server.GetConfig()); err != nil {
log.Error("failed to encode config", zap.Error(err))
cancel()
return
}
retry, err = c.createComponentConfig(ctx, version, name, config.String())
if err != nil {
log.Error("failed to create config", zap.Error(err))
cancel()
return
}
}

for {
select {
case <-c.quit:
log.Info("capture config change has been stopped")
cancel()
return
case <-ticker.C:
version := c.server.GetConfigVersion()
config, err := c.getComponentConfig(ctx, version, name)
if err != nil {
log.Error("failed to get config", zap.Error(err))
}
if config != "" {
c.updateComponentConfig(config)
}
}
}
}

func (c *RaftCluster) runCoordinator() {
defer logutil.LogPanic()
defer c.wg.Done()
Expand Down Expand Up @@ -555,6 +642,65 @@ func (c *RaftCluster) updateStoreStatusLocked(id uint64) {
c.core.UpdateStoreStatus(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize)
}

func (c *RaftCluster) createComponentConfig(ctx context.Context, version *configpb.Version, componentID, config string) (bool, error) {
status, v, config, err := c.configClient.Create(ctx, version, component, componentID, config)
if err != nil {
return false, err
}
var retry bool
switch status.GetCode() {
case configpb.StatusCode_OK:
c.server.SetConfigVersion(v)
c.updateComponentConfig(config)
retry = false
case configpb.StatusCode_NOT_CHANGE:
retry = false
case configpb.StatusCode_WRONG_VERSION:
c.server.SetConfigVersion(v)
retry = true
case configpb.StatusCode_UNKNOWN:
return false, errors.Errorf("unknown error: %v", status.GetMessage())
}
return retry, nil
}

func (c *RaftCluster) getComponentConfig(ctx context.Context, version *configpb.Version, componentID string) (string, error) {
status, v, cfg, err := c.configClient.Get(ctx, version, component, componentID)
if err != nil {
return "", err
}
var config string
switch status.GetCode() {
case configpb.StatusCode_OK:
config = cfg
case configpb.StatusCode_WRONG_VERSION:
c.server.SetConfigVersion(v)
case configpb.StatusCode_UNKNOWN:
return "", err
}
return config, nil
}

func (c *RaftCluster) updateComponentConfig(cfg string) error {
old := c.server.GetConfig()
new := &config.Config{}
if _, err := toml.Decode(cfg, &new); err != nil {
return err
}
if !reflect.DeepEqual(old.Schedule, new.Schedule) {
c.server.SetScheduleConfig(new.Schedule)
}

if !reflect.DeepEqual(old.Replication, new.Replication) {
c.server.SetReplicationConfig(new.Replication)
}

if !reflect.DeepEqual(old.PDServerCfg, new.PDServerCfg) {
c.server.SetPDServerConfig(new.PDServerCfg)
}
return nil
}

func (c *RaftCluster) getClusterID() uint64 {
c.RLock()
defer c.RUnlock()
Expand Down
Loading

0 comments on commit 55a6b1b

Please sign in to comment.