From 4abf1bc4ee06eb7820465c4b33610d8a9436b8bc Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 19 Dec 2019 21:35:53 +0800 Subject: [PATCH] add config client Signed-off-by: Ryan Leung --- client/config_client.go | 382 ++++++++++++++++++++++++ client/metrics.go | 33 ++ server/config_manager/config_manager.go | 27 +- tests/client/client_test.go | 16 +- tests/client/config_client_test.go | 291 ++++++++++++++++++ 5 files changed, 736 insertions(+), 13 deletions(-) create mode 100644 client/config_client.go create mode 100644 tests/client/config_client_test.go diff --git a/client/config_client.go b/client/config_client.go new file mode 100644 index 000000000000..12c3024eaa7f --- /dev/null +++ b/client/config_client.go @@ -0,0 +1,382 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pd + +import ( + "context" + "sync" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/pingcap/kvproto/pkg/configpb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/pingcap/pd/pkg/grpcutil" + "github.com/pkg/errors" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +// ConfigClient is a client to manage the configuration. +// It should not be used after calling Close(). +type ConfigClient interface { + GetClusterID(ctx context.Context) uint64 + Create(ctx context.Context, v *configpb.Version, component, componentID, config string) (*configpb.Status, *configpb.Version, string, error) + Get(ctx context.Context, v *configpb.Version, component, componentID string) (*configpb.Status, *configpb.Version, string, error) + Update(ctx context.Context, v *configpb.Version, kind *configpb.ConfigKind, entries []*configpb.ConfigEntry) (*configpb.Status, *configpb.Version, error) + Delete(ctx context.Context, v *configpb.Version, kind *configpb.ConfigKind) (*configpb.Status, error) + // Close closes the client. + Close() +} + +type configClient struct { + urls []string + clusterID uint64 + connMu struct { + sync.RWMutex + clientConns map[string]*grpc.ClientConn + leader string + } + + checkLeaderCh chan struct{} + + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + + security SecurityOption +} + +// NewConfigClient creates a PD configuration client. +func NewConfigClient(pdAddrs []string, security SecurityOption) (ConfigClient, error) { + return NewConfigClientWithContext(context.Background(), pdAddrs, security) +} + +// NewConfigClientWithContext creates a PD configuration client with the context. 
+func NewConfigClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption) (ConfigClient, error) { + log.Info("[pd] create pd configuration client with endpoints", zap.Strings("pd-address", pdAddrs)) + ctx1, cancel := context.WithCancel(ctx) + c := &configClient{ + urls: addrsToUrls(pdAddrs), + checkLeaderCh: make(chan struct{}, 1), + ctx: ctx1, + cancel: cancel, + security: security, + } + c.connMu.clientConns = make(map[string]*grpc.ClientConn) + + if err := c.initRetry(c.initClusterID); err != nil { + return nil, err + } + if err := c.initRetry(c.updateLeader); err != nil { + return nil, err + } + log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID)) + + c.wg.Add(1) + go c.leaderLoop() + + return c, nil +} + +func (c *configClient) updateURLs(members []*pdpb.Member) { + urls := make([]string, 0, len(members)) + for _, m := range members { + urls = append(urls, m.GetClientUrls()...) + } + c.urls = urls +} + +func (c *configClient) initRetry(f func() error) error { + var err error + for i := 0; i < maxInitClusterRetries; i++ { + if err = f(); err == nil { + return nil + } + time.Sleep(time.Second) + } + return errors.WithStack(err) +} + +func (c *configClient) initClusterID() error { + ctx, cancel := context.WithCancel(c.ctx) + defer cancel() + for _, u := range c.urls { + timeoutCtx, timeoutCancel := context.WithTimeout(ctx, pdTimeout) + members, err := c.getMembers(timeoutCtx, u) + timeoutCancel() + if err != nil || members.GetHeader() == nil { + log.Warn("[pd] failed to get cluster id", zap.String("url", u), zap.Error(err)) + continue + } + c.clusterID = members.GetHeader().GetClusterId() + return nil + } + return errors.WithStack(errFailInitClusterID) +} + +func (c *configClient) updateLeader() error { + for _, u := range c.urls { + ctx, cancel := context.WithTimeout(c.ctx, updateLeaderTimeout) + members, err := c.getMembers(ctx, u) + cancel() + if err != nil || members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 { + select { + case <-c.ctx.Done(): + return errors.WithStack(err) + default: + continue + } + } + c.updateURLs(members.GetMembers()) + return c.switchLeader(members.GetLeader().GetClientUrls()) + } + return errors.Errorf("failed to get leader from %v", c.urls) +} + +func (c *configClient) getMembers(ctx context.Context, url string) (*pdpb.GetMembersResponse, error) { + cc, err := c.getOrCreateGRPCConn(url) + if err != nil { + return nil, err + } + members, err := pdpb.NewPDClient(cc).GetMembers(ctx, &pdpb.GetMembersRequest{}) + if err != nil { + return nil, errors.WithStack(err) + } + return members, nil +} + +func (c *configClient) switchLeader(addrs []string) error { + // FIXME: How to safely compare leader urls? For now, only allows one client url. 
+ addr := addrs[0] + + c.connMu.RLock() + oldLeader := c.connMu.leader + c.connMu.RUnlock() + + if addr == oldLeader { + return nil + } + + log.Info("[pd] switch leader", zap.String("new-leader", addr), zap.String("old-leader", oldLeader)) + if _, err := c.getOrCreateGRPCConn(addr); err != nil { + return err + } + + c.connMu.Lock() + defer c.connMu.Unlock() + c.connMu.leader = addr + return nil +} + +func (c *configClient) getOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) { + c.connMu.RLock() + conn, ok := c.connMu.clientConns[addr] + c.connMu.RUnlock() + if ok { + return conn, nil + } + + cc, err := grpcutil.GetClientConn(addr, c.security.CAPath, c.security.CertPath, c.security.KeyPath) + if err != nil { + return nil, errors.WithStack(err) + } + c.connMu.Lock() + defer c.connMu.Unlock() + if old, ok := c.connMu.clientConns[addr]; ok { + cc.Close() + return old, nil + } + + c.connMu.clientConns[addr] = cc + return cc, nil +} + +func (c *configClient) leaderLoop() { + defer c.wg.Done() + + ctx, cancel := context.WithCancel(c.ctx) + defer cancel() + + for { + select { + case <-c.checkLeaderCh: + case <-time.After(time.Minute): + case <-ctx.Done(): + return + } + + if err := c.updateLeader(); err != nil { + log.Error("[pd] failed updateLeader", zap.Error(err)) + } + } +} + +func (c *configClient) Close() { + c.cancel() + c.wg.Wait() + + c.connMu.Lock() + defer c.connMu.Unlock() + for _, cc := range c.connMu.clientConns { + if err := cc.Close(); err != nil { + log.Error("[pd] failed close grpc clientConn", zap.Error(err)) + } + } +} + +// leaderClient gets the client of current PD leader. +func (c *configClient) leaderClient() configpb.ConfigClient { + c.connMu.RLock() + defer c.connMu.RUnlock() + + return configpb.NewConfigClient(c.connMu.clientConns[c.connMu.leader]) +} + +func (c *configClient) ScheduleCheckLeader() { + select { + case c.checkLeaderCh <- struct{}{}: + default: + } +} + +func (c *configClient) GetClusterID(context.Context) uint64 { + return c.clusterID +} + +// For testing use. +func (c *configClient) GetLeaderAddr() string { + c.connMu.RLock() + defer c.connMu.RUnlock() + return c.connMu.leader +} + +// For testing use. It should only be called when the client is closed. 
+func (c *configClient) GetURLs() []string {
+	return c.urls
+}
+
+func (c *configClient) Create(ctx context.Context, v *configpb.Version, component, componentID, config string) (*configpb.Status, *configpb.Version, string, error) {
+	if span := opentracing.SpanFromContext(ctx); span != nil {
+		span = opentracing.StartSpan("configclient.Create", opentracing.ChildOf(span.Context()))
+		defer span.Finish()
+	}
+
+	start := time.Now()
+	defer func() { configCmdDurationCreate.Observe(time.Since(start).Seconds()) }()
+
+	ctx, cancel := context.WithTimeout(ctx, pdTimeout)
+	resp, err := c.leaderClient().Create(ctx, &configpb.CreateRequest{
+		Header:      c.requestHeader(),
+		Version:     v,
+		Component:   component,
+		ComponentId: componentID,
+		Config:      config,
+	})
+	cancel()
+
+	if err != nil {
+		configCmdFailDurationCreate.Observe(time.Since(start).Seconds())
+		c.ScheduleCheckLeader()
+		return nil, nil, "", errors.WithStack(err)
+	}
+
+	return resp.GetStatus(), resp.GetVersion(), resp.GetConfig(), nil
+}
+
+func (c *configClient) Get(ctx context.Context, v *configpb.Version, component, componentID string) (*configpb.Status, *configpb.Version, string, error) {
+	if span := opentracing.SpanFromContext(ctx); span != nil {
+		span = opentracing.StartSpan("configclient.Get", opentracing.ChildOf(span.Context()))
+		defer span.Finish()
+	}
+
+	start := time.Now()
+	defer func() { configCmdDurationGet.Observe(time.Since(start).Seconds()) }()
+
+	ctx, cancel := context.WithTimeout(ctx, pdTimeout)
+	resp, err := c.leaderClient().Get(ctx, &configpb.GetRequest{
+		Header:      c.requestHeader(),
+		Version:     v,
+		Component:   component,
+		ComponentId: componentID,
+	})
+	cancel()
+
+	if err != nil {
+		configCmdFailDurationGet.Observe(time.Since(start).Seconds())
+		c.ScheduleCheckLeader()
+		return nil, nil, "", errors.WithStack(err)
+	}
+
+	return resp.GetStatus(), resp.GetVersion(), resp.GetConfig(), nil
+}
+
+func (c *configClient) Update(ctx context.Context, v *configpb.Version, kind *configpb.ConfigKind, entries []*configpb.ConfigEntry) (*configpb.Status, *configpb.Version, error) {
+	if span := opentracing.SpanFromContext(ctx); span != nil {
+		span = opentracing.StartSpan("configclient.Update", opentracing.ChildOf(span.Context()))
+		defer span.Finish()
+	}
+
+	start := time.Now()
+	defer func() { configCmdDurationUpdate.Observe(time.Since(start).Seconds()) }()
+
+	ctx, cancel := context.WithTimeout(ctx, pdTimeout)
+	resp, err := c.leaderClient().Update(ctx, &configpb.UpdateRequest{
+		Header:  c.requestHeader(),
+		Version: v,
+		Kind:    kind,
+		Entries: entries,
+	})
+	cancel()
+
+	if err != nil {
+		configCmdFailDurationUpdate.Observe(time.Since(start).Seconds())
+		c.ScheduleCheckLeader()
+		return nil, nil, errors.WithStack(err)
+	}
+
+	return resp.GetStatus(), resp.GetVersion(), nil
+}
+
+func (c *configClient) Delete(ctx context.Context, v *configpb.Version, kind *configpb.ConfigKind) (*configpb.Status, error) {
+	if span := opentracing.SpanFromContext(ctx); span != nil {
+		span = opentracing.StartSpan("configclient.Delete", opentracing.ChildOf(span.Context()))
+		defer span.Finish()
+	}
+
+	start := time.Now()
+	defer func() { configCmdDurationDelete.Observe(time.Since(start).Seconds()) }()
+
+	ctx, cancel := context.WithTimeout(ctx, pdTimeout)
+	resp, err := c.leaderClient().Delete(ctx, &configpb.DeleteRequest{
+		Header:  c.requestHeader(),
+		Version: v,
+		Kind:    kind,
+	})
+	cancel()
+
+	if err != nil {
+		configCmdFailDurationDelete.Observe(time.Since(start).Seconds())
+		c.ScheduleCheckLeader()
+		return nil, 
errors.WithStack(err) + } + + return resp.GetStatus(), nil +} + +func (c *configClient) requestHeader() *configpb.Header { + return &configpb.Header{ + ClusterId: c.clusterID, + } +} diff --git a/client/metrics.go b/client/metrics.go index 5c5edb7e01ea..dec6c0c6a228 100644 --- a/client/metrics.go +++ b/client/metrics.go @@ -51,6 +51,24 @@ var ( Help: "Bucketed histogram of the batch size of handled requests.", Buckets: prometheus.ExponentialBuckets(1, 2, 13), }) + + configCmdDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "config_client", + Subsystem: "cmd", + Name: "handle_cmds_duration_seconds", + Help: "Bucketed histogram of processing time (s) of handled success cmds.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13), + }, []string{"type"}) + + configCmdFailedDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "config_client", + Subsystem: "cmd", + Name: "handle_failed_cmds_duration_seconds", + Help: "Bucketed histogram of processing time (s) of failed handled cmds.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13), + }, []string{"type"}) ) var ( @@ -77,6 +95,17 @@ var ( cmdFailedDurationGetAllStores = cmdFailedDuration.WithLabelValues("get_all_stores") cmdFailedDurationUpdateGCSafePoint = cmdFailedDuration.WithLabelValues("update_gc_safe_point") requestDurationTSO = requestDuration.WithLabelValues("tso") + + // config + configCmdDurationCreate = configCmdDuration.WithLabelValues("create") + configCmdDurationGet = configCmdDuration.WithLabelValues("get") + configCmdDurationUpdate = configCmdDuration.WithLabelValues("update") + configCmdDurationDelete = configCmdDuration.WithLabelValues("delete") + + configCmdFailDurationCreate = configCmdFailedDuration.WithLabelValues("create") + configCmdFailDurationGet = configCmdFailedDuration.WithLabelValues("get") + configCmdFailDurationUpdate = configCmdFailedDuration.WithLabelValues("update") + configCmdFailDurationDelete = configCmdFailedDuration.WithLabelValues("delete") ) func init() { @@ -84,4 +113,8 @@ func init() { prometheus.MustRegister(cmdFailedDuration) prometheus.MustRegister(requestDuration) prometheus.MustRegister(tsoBatchSize) + + // config + prometheus.MustRegister(configCmdDuration) + prometheus.MustRegister(configCmdFailedDuration) } diff --git a/server/config_manager/config_manager.go b/server/config_manager/config_manager.go index 1971cd78ac77..39b80e4386c5 100644 --- a/server/config_manager/config_manager.go +++ b/server/config_manager/config_manager.go @@ -234,7 +234,8 @@ func (c *ConfigManager) ApplyGlobalConifg(globalCfg *GlobalConfig, component str // update all local config // merge the global config with each local config and update it for _, LocalCfg := range c.LocalCfgs[component] { - if err := mergeAndUpdateConfig(LocalCfg, updateEntries); err != nil { + if wrongEntry, err := mergeAndUpdateConfig(LocalCfg, updateEntries); err != nil { + c.deleteEntry(component, wrongEntry) return err } LocalCfg.Version = &configpb.Version{Global: newGlobalVersion, Local: 0} @@ -275,7 +276,7 @@ func (c *ConfigManager) updateGlobal(component string, version *configpb.Version return &configpb.Version{Global: c.GlobalCfgs[component].getVersion(), Local: 0}, &configpb.Status{Code: configpb.StatusCode_OK} } -func mergeAndUpdateConfig(localCfg *LocalConfig, updateEntries map[string]*EntryValue) error { +func mergeAndUpdateConfig(localCfg *LocalConfig, updateEntries map[string]*EntryValue) (string, error) { config := localCfg.getConfigs() newUpdateEntries := 
make(map[string]*EntryValue) for k, v := range updateEntries { @@ -297,10 +298,10 @@ func mergeAndUpdateConfig(localCfg *LocalConfig, updateEntries map[string]*Entry for k, v := range newUpdateEntries { configName := strings.Split(k, ".") if err := update(config, configName, v.Value); err != nil { - return err + return k, err } } - return nil + return "", nil } func (c *ConfigManager) updateLocal(componentID string, version *configpb.Version, entries []*configpb.ConfigEntry) (*configpb.Version, *configpb.Status) { @@ -323,7 +324,11 @@ func (c *ConfigManager) updateLocal(componentID string, version *configpb.Versio for _, entry := range entries { localCfg.updateEntry(entry, version) } - mergeAndUpdateConfig(localCfg, updateEntries) + if wrongEntry, err := mergeAndUpdateConfig(localCfg, updateEntries); err != nil { + c.deleteEntry(component, wrongEntry) + + return localLatestVersion, &configpb.Status{Code: configpb.StatusCode_UNKNOWN, Message: err.Error()} + } localCfg.Version = &configpb.Version{Global: version.GetGlobal(), Local: version.GetLocal() + 1} } else { return version, &configpb.Status{Code: configpb.StatusCode_COMPONENT_ID_NOT_FOUND} @@ -331,6 +336,15 @@ func (c *ConfigManager) updateLocal(componentID string, version *configpb.Versio return c.LocalCfgs[component][componentID].getVersion(), &configpb.Status{Code: configpb.StatusCode_OK} } +func (c *ConfigManager) deleteEntry(component, e string) { + if globalCfg, ok := c.GlobalCfgs[component]; ok { + delete(globalCfg.UpdateEntries, e) + } + for _, localCfg := range c.LocalCfgs[component] { + delete(localCfg.UpdateEntries, e) + } +} + // DeleteConfig removes a component from the config manager. func (c *ConfigManager) DeleteConfig(kind *configpb.ConfigKind, version *configpb.Version) *configpb.Status { c.Lock() @@ -486,6 +500,9 @@ func update(config map[string]interface{}, configName []string, value string) er } t := reflect.TypeOf(res[configName[0]]) + if t == nil { + return errors.Errorf("config item %s is not existed", configName[0]) + } // TODO: support more types var v interface{} var err error diff --git a/tests/client/client_test.go b/tests/client/client_test.go index 824bc80608c9..3784dabc86ac 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -34,19 +34,19 @@ func Test(t *testing.T) { TestingT(t) } -var _ = Suite(&serverTestSuite{}) +var _ = Suite(&clientTestSuite{}) -type serverTestSuite struct { +type clientTestSuite struct { ctx context.Context cancel context.CancelFunc } -func (s *serverTestSuite) SetUpSuite(c *C) { +func (s *clientTestSuite) SetUpSuite(c *C) { s.ctx, s.cancel = context.WithCancel(context.Background()) server.EnableZap = true } -func (s *serverTestSuite) TearDownSuite(c *C) { +func (s *clientTestSuite) TearDownSuite(c *C) { s.cancel() } @@ -56,7 +56,7 @@ type client interface { GetURLs() []string } -func (s *serverTestSuite) TestClientLeaderChange(c *C) { +func (s *clientTestSuite) TestClientLeaderChange(c *C) { cluster, err := tests.NewTestCluster(3) c.Assert(err, IsNil) defer cluster.Destroy() @@ -110,7 +110,7 @@ func (s *serverTestSuite) TestClientLeaderChange(c *C) { c.Assert(urls, DeepEquals, endpoints) } -func (s *serverTestSuite) TestLeaderTransfer(c *C) { +func (s *clientTestSuite) TestLeaderTransfer(c *C) { cluster, err := tests.NewTestCluster(2) c.Assert(err, IsNil) defer cluster.Destroy() @@ -176,13 +176,13 @@ func (s *serverTestSuite) TestLeaderTransfer(c *C) { wg.Wait() } -func (s *serverTestSuite) waitLeader(c *C, cli client, leader string) { +func (s 
*clientTestSuite) waitLeader(c *C, cli client, leader string) { testutil.WaitUntil(c, func(c *C) bool { cli.ScheduleCheckLeader() return cli.GetLeaderAddr() == leader }) } -func (s *serverTestSuite) makeTS(physical, logical int64) uint64 { +func (s *clientTestSuite) makeTS(physical, logical int64) uint64 { return uint64(physical<<18 + logical) } diff --git a/tests/client/config_client_test.go b/tests/client/config_client_test.go new file mode 100644 index 000000000000..537380fffedd --- /dev/null +++ b/tests/client/config_client_test.go @@ -0,0 +1,291 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package client_test + +import ( + "context" + "path/filepath" + "sort" + "strconv" + "strings" + "time" + + . "github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/configpb" + pd "github.com/pingcap/pd/client" + "github.com/pingcap/pd/pkg/testutil" + "github.com/pingcap/pd/server" + "github.com/pingcap/pd/server/config" + "github.com/pingcap/pd/tests" + "go.etcd.io/etcd/clientv3" +) + +var _ = Suite(&configClientTestSuite{}) + +type configClientTestSuite struct { + ctx context.Context + cancel context.CancelFunc +} + +func (s *configClientTestSuite) SetUpSuite(c *C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) + server.EnableZap = true +} + +func (s *configClientTestSuite) TearDownSuite(c *C) { + s.cancel() +} + +func (s *configClientTestSuite) TestUpdateWrongEntry(c *C) { + cluster, err := tests.NewTestCluster(1, func(cfg *config.Config) { cfg.EnableConfigManager = true }) + c.Assert(err, IsNil) + defer cluster.Destroy() + + err = cluster.RunInitialServers(s.ctx) + c.Assert(err, IsNil) + cluster.WaitLeader() + leaderServer := cluster.GetServer(cluster.GetLeader()) + c.Assert(leaderServer.BootstrapCluster(), IsNil) + + var endpoints []string + for _, s := range cluster.GetServers() { + endpoints = append(endpoints, s.GetConfig().AdvertiseClientUrls) + } + cli, err := pd.NewConfigClient(endpoints, pd.SecurityOption{}) + c.Assert(err, IsNil) + + cfgData := `[aaa] + xxx-yyy-zzz = 1 + [aaa.bbb] + xxx-yyy = "1KB" + xxx-zzz = false + yyy-zzz = ["aa", "bb"] + [aaa.bbb.ccc] + yyy-xxx = 0.00005 +` + + // create config + status, version, config, err := cli.Create(s.ctx, &configpb.Version{Global: 0, Local: 0}, "component", "component1", cfgData) + c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) + c.Assert(config, Equals, cfgData) + c.Assert(version, DeepEquals, &configpb.Version{Global: 0, Local: 0}) + c.Assert(err, IsNil) + + // update wrong config + status, version, err = cli.Update(s.ctx, + &configpb.Version{Global: 0, Local: 0}, + &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "component"}}}, + []*configpb.ConfigEntry{{Name: "aaa.xxx-xxx", Value: "2"}}, + ) + c.Assert(status.GetCode(), Equals, configpb.StatusCode_UNKNOWN) + c.Assert(strings.Contains(status.GetMessage(), "is not existed"), IsTrue) + c.Assert(version, DeepEquals, &configpb.Version{Global: 0, Local: 0}) + c.Assert(err, IsNil) + + // update right config + status, version, err = 
cli.Update(s.ctx, + &configpb.Version{Global: 0, Local: 0}, + &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "component"}}}, + []*configpb.ConfigEntry{{Name: "aaa.xxx-yyy-zzz", Value: "2"}}, + ) + c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) + c.Assert(version, DeepEquals, &configpb.Version{Global: 1, Local: 0}) + c.Assert(err, IsNil) +} + +func (s *configClientTestSuite) TestClientLeaderChange(c *C) { + cluster, err := tests.NewTestCluster(3, func(cfg *config.Config) { cfg.EnableConfigManager = true }) + c.Assert(err, IsNil) + defer cluster.Destroy() + + err = cluster.RunInitialServers(s.ctx) + c.Assert(err, IsNil) + cluster.WaitLeader() + leaderServer := cluster.GetServer(cluster.GetLeader()) + c.Assert(leaderServer.BootstrapCluster(), IsNil) + + var endpoints []string + for _, s := range cluster.GetServers() { + endpoints = append(endpoints, s.GetConfig().AdvertiseClientUrls) + } + cli, err := pd.NewConfigClient(endpoints, pd.SecurityOption{}) + c.Assert(err, IsNil) + + cfgData := `[aaa] + xxx-yyy-zzz = 1 + [aaa.bbb] + xxx-yyy = "1KB" + xxx-zzz = false + yyy-zzz = ["aa", "bb"] + [aaa.bbb.ccc] + yyy-xxx = 0.00005 +` + + // create config + status, version, config, err := cli.Create(s.ctx, &configpb.Version{Global: 0, Local: 0}, "component", "component1", cfgData) + c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) + c.Assert(config, Equals, cfgData) + c.Assert(version, DeepEquals, &configpb.Version{Global: 0, Local: 0}) + c.Assert(err, IsNil) + + // get config + status, version, config, err = cli.Get(s.ctx, &configpb.Version{Global: 0, Local: 0}, "component", "component1") + c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) + c.Assert(config, Equals, cfgData) + c.Assert(version, DeepEquals, &configpb.Version{Global: 0, Local: 0}) + c.Assert(err, IsNil) + + // update config + status, version, err = cli.Update(s.ctx, + &configpb.Version{Global: 0, Local: 0}, + &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "component"}}}, + []*configpb.ConfigEntry{{Name: "aaa.xxx-yyy-zzz", Value: "2"}}, + ) + c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) + c.Assert(version, DeepEquals, &configpb.Version{Global: 1, Local: 0}) + c.Assert(err, IsNil) + cfgData1 := `[aaa] + xxx-yyy-zzz = 2 + [aaa.bbb] + xxx-yyy = "1KB" + xxx-zzz = false + yyy-zzz = ["aa", "bb"] + [aaa.bbb.ccc] + yyy-xxx = 0.00005 +` + // get config + status, version, config, err = cli.Get(s.ctx, &configpb.Version{Global: 1, Local: 0}, "component", "component1") + c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) + c.Assert(config, Equals, cfgData1) + c.Assert(version, DeepEquals, &configpb.Version{Global: 1, Local: 0}) + c.Assert(err, IsNil) + + leader := cluster.GetLeader() + s.waitLeader(c, cli.(client), cluster.GetServer(leader).GetConfig().ClientUrls) + + err = cluster.GetServer(leader).Stop() + c.Assert(err, IsNil) + leader = cluster.WaitLeader() + c.Assert(leader, Not(Equals), "") + s.waitLeader(c, cli.(client), cluster.GetServer(leader).GetConfig().ClientUrls) + + // get config + status, version, config, err = cli.Get(s.ctx, &configpb.Version{Global: 1, Local: 0}, "component", "component1") + c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) + c.Assert(config, Equals, cfgData1) + c.Assert(version, DeepEquals, &configpb.Version{Global: 1, Local: 0}) + c.Assert(err, IsNil) + + // Check URL list. 
+ cli.Close() + urls := cli.(client).GetURLs() + sort.Strings(urls) + sort.Strings(endpoints) + c.Assert(urls, DeepEquals, endpoints) +} + +func (s *configClientTestSuite) TestLeaderTransfer(c *C) { + cluster, err := tests.NewTestCluster(2, func(cfg *config.Config) { cfg.EnableConfigManager = true }) + c.Assert(err, IsNil) + defer cluster.Destroy() + + err = cluster.RunInitialServers(s.ctx) + c.Assert(err, IsNil) + cluster.WaitLeader() + leaderServer := cluster.GetServer(cluster.GetLeader()) + c.Assert(leaderServer.BootstrapCluster(), IsNil) + + var endpoints []string + for _, s := range cluster.GetServers() { + endpoints = append(endpoints, s.GetConfig().AdvertiseClientUrls) + } + cli, err := pd.NewConfigClient(endpoints, pd.SecurityOption{}) + c.Assert(err, IsNil) + cfgData := `[aaa] + xxx-yyy-zzz = 1 + [aaa.bbb] + xxx-yyy = "1KB" + xxx-zzz = false + yyy-zzz = ["aa", "bb"] + [aaa.bbb.ccc] + yyy-xxx = 0.00005 +` + // create config + status, version, config, err := cli.Create(s.ctx, &configpb.Version{Global: 0, Local: 0}, "component", "component1", cfgData) + c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) + c.Assert(config, Equals, cfgData) + c.Assert(version, DeepEquals, &configpb.Version{Global: 0, Local: 0}) + c.Assert(err, IsNil) + + // get config + status, version, config, err = cli.Get(s.ctx, &configpb.Version{Global: 0, Local: 0}, "component", "component1") + c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) + c.Assert(config, Equals, cfgData) + c.Assert(version, DeepEquals, &configpb.Version{Global: 0, Local: 0}) + c.Assert(err, IsNil) + + // update config + status, version, err = cli.Update(s.ctx, + &configpb.Version{Global: 0, Local: 0}, + &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "component"}}}, + []*configpb.ConfigEntry{{Name: "aaa.bbb.xxx-yyy", Value: "2KB"}}, + ) + c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) + c.Assert(version, DeepEquals, &configpb.Version{Global: 1, Local: 0}) + c.Assert(err, IsNil) + cfgData1 := `[aaa] + xxx-yyy-zzz = 1 + [aaa.bbb] + xxx-yyy = "2KB" + xxx-zzz = false + yyy-zzz = ["aa", "bb"] + [aaa.bbb.ccc] + yyy-xxx = 0.00005 +` + // get config + status, version, config, err = cli.Get(s.ctx, &configpb.Version{Global: 1, Local: 0}, "component", "component1") + c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) + c.Assert(config, Equals, cfgData1) + c.Assert(version, DeepEquals, &configpb.Version{Global: 1, Local: 0}) + c.Assert(err, IsNil) + + // Transfer leader. + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: time.Second, + }) + c.Assert(err, IsNil) + leaderPath := filepath.Join("/pd", strconv.FormatUint(cli.GetClusterID(context.Background()), 10), "leader") + for i := 0; i < 10; i++ { + cluster.WaitLeader() + _, err = etcdCli.Delete(context.TODO(), leaderPath) + c.Assert(err, IsNil) + // Sleep to make sure all servers are notified and starts campaign. + time.Sleep(time.Second) + } + + // get config + status, version, config, err = cli.Get(s.ctx, &configpb.Version{Global: 1, Local: 0}, "component", "component1") + c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) + c.Assert(config, Equals, cfgData1) + c.Assert(version, DeepEquals, &configpb.Version{Global: 1, Local: 0}) + c.Assert(err, IsNil) +} + +func (s *configClientTestSuite) waitLeader(c *C, cli client, leader string) { + testutil.WaitUntil(c, func(c *C) bool { + cli.ScheduleCheckLeader() + return cli.GetLeaderAddr() == leader + }) +}
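
Reviewer note, not part of the patch: below is a minimal usage sketch of the ConfigClient API added in client/config_client.go, wired together the same way the tests above use it. The endpoint, component names, and config payload are placeholders, and error handling is reduced to log.Fatal for brevity.

package main

import (
	"context"
	"log"

	"github.com/pingcap/kvproto/pkg/configpb"
	pd "github.com/pingcap/pd/client"
)

func main() {
	// Placeholder endpoint; point this at a running PD cluster.
	cli, err := pd.NewConfigClient([]string{"http://127.0.0.1:2379"}, pd.SecurityOption{})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx := context.Background()

	// Register component1's config under the initial version (0, 0).
	status, ver, cfg, err := cli.Create(ctx, &configpb.Version{Global: 0, Local: 0},
		"component", "component1", "[aaa]\nxxx-yyy-zzz = 1\n")
	if err != nil || status.GetCode() != configpb.StatusCode_OK {
		log.Fatalf("create failed: status=%v err=%v", status, err)
	}

	// Update a single entry globally; on success the global version advances.
	kind := &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "component"}}}
	status, ver, err = cli.Update(ctx, ver, kind,
		[]*configpb.ConfigEntry{{Name: "aaa.xxx-yyy-zzz", Value: "2"}})
	if err != nil || status.GetCode() != configpb.StatusCode_OK {
		log.Fatalf("update failed: status=%v err=%v", status, err)
	}

	// Read the merged config back at the new version.
	status, ver, cfg, err = cli.Get(ctx, ver, "component", "component1")
	if err != nil || status.GetCode() != configpb.StatusCode_OK {
		log.Fatalf("get failed: status=%v err=%v", status, err)
	}
	log.Printf("version=%v config:\n%s", ver, cfg)
}

Note that each client method already calls ScheduleCheckLeader on failure, so a retry after a PD leader change is routed to the new leader without extra handling by the caller.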