diff --git a/server/config/config.go b/server/config/config.go index 4a70123ee51..95b27a4a406 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -129,6 +129,8 @@ type Config struct { logger *zap.Logger logProps *log.ZapProperties + + EnableConfigManager bool } // NewConfig creates a new config. @@ -162,6 +164,8 @@ func NewConfig() *Config { fs.StringVar(&cfg.Security.KeyPath, "key", "", "Path of file that contains X509 key in PEM format") fs.BoolVar(&cfg.ForceNewCluster, "force-new-cluster", false, "Force to create a new one-member cluster") + fs.BoolVar(&cfg.EnableConfigManager, "enable-config-manager", false, "Enable configuration manager") + return cfg } diff --git a/server/config_manager/config_manager.go b/server/config_manager/config_manager.go index bb8f8d1a259..1971cd78ac7 100644 --- a/server/config_manager/config_manager.go +++ b/server/config_manager/config_manager.go @@ -23,7 +23,9 @@ import ( "github.com/BurntSushi/toml" "github.com/pingcap/kvproto/pkg/configpb" + "github.com/pingcap/pd/server/cluster" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/member" "github.com/pkg/errors" ) @@ -43,9 +45,19 @@ var ( errNotSupported = "not supported" ) +// Server is the interface for configuration manager. +type Server interface { + IsClosed() bool + ClusterID() uint64 + GetRaftCluster() *cluster.RaftCluster + GetStorage() *core.Storage + GetMember() *member.Member +} + // ConfigManager is used to manage all components' config. type ConfigManager struct { sync.RWMutex + svr Server // component -> GlobalConfig GlobalCfgs map[string]*GlobalConfig // component -> componentID -> LocalConfig @@ -53,8 +65,9 @@ type ConfigManager struct { } // NewConfigManager creates a new ConfigManager. -func NewConfigManager() *ConfigManager { +func NewConfigManager(svr Server) *ConfigManager { return &ConfigManager{ + svr: svr, GlobalCfgs: make(map[string]*GlobalConfig), LocalCfgs: make(map[string]map[string]*LocalConfig), } @@ -85,8 +98,8 @@ func (c *ConfigManager) getComponent(id string) string { return "" } -// Get returns config and the latest version. -func (c *ConfigManager) Get(version *configpb.Version, component, componentID string) (*configpb.Version, string, *configpb.Status) { +// GetConfig returns config and the latest version. +func (c *ConfigManager) GetConfig(version *configpb.Version, component, componentID string) (*configpb.Version, string, *configpb.Status) { c.RLock() defer c.RUnlock() var config string @@ -120,8 +133,8 @@ func (c *ConfigManager) Get(version *configpb.Version, component, componentID st return c.getLatestVersion(component, componentID), config, status } -// Create is used for registering a component to PD. -func (c *ConfigManager) Create(version *configpb.Version, component, componentID, cfg string) (*configpb.Version, string, *configpb.Status) { +// CreateConfig is used for registering a component to PD. +func (c *ConfigManager) CreateConfig(version *configpb.Version, component, componentID, cfg string) (*configpb.Version, string, *configpb.Status) { c.Lock() defer c.Unlock() var status *configpb.Status @@ -188,8 +201,8 @@ func (c *ConfigManager) getComponentCfg(component, componentID string) (string, return encodeConfigs(config) } -// Update is used to update a config with a given config type. -func (c *ConfigManager) Update(kind *configpb.ConfigKind, version *configpb.Version, entries []*configpb.ConfigEntry) (*configpb.Version, *configpb.Status) { +// UpdateConfig is used to update a config with a given config type. +func (c *ConfigManager) UpdateConfig(kind *configpb.ConfigKind, version *configpb.Version, entries []*configpb.ConfigEntry) (*configpb.Version, *configpb.Status) { c.Lock() defer c.Unlock() @@ -318,8 +331,8 @@ func (c *ConfigManager) updateLocal(componentID string, version *configpb.Versio return c.LocalCfgs[component][componentID].getVersion(), &configpb.Status{Code: configpb.StatusCode_OK} } -// Delete removes a component from the config manager. -func (c *ConfigManager) Delete(kind *configpb.ConfigKind, version *configpb.Version) *configpb.Status { +// DeleteConfig removes a component from the config manager. +func (c *ConfigManager) DeleteConfig(kind *configpb.ConfigKind, version *configpb.Version) *configpb.Status { c.Lock() defer c.Unlock() diff --git a/server/config_manager/config_manager_test.go b/server/config_manager/config_manager_test.go index 05cbcf3678f..3f76c305163 100644 --- a/server/config_manager/config_manager_test.go +++ b/server/config_manager/config_manager_test.go @@ -198,7 +198,7 @@ compression-per-level = [ [rocksdb.defaultcf.titan] discardable-ratio = 0.00156 ` - cfg := NewConfigManager() + cfg := NewConfigManager(nil) lc, err := NewLocalConfig(cfgData, &configpb.Version{Global: 0, Local: 1}) c.Assert(err, IsNil) gc := NewGlobalConfig( @@ -214,7 +214,7 @@ discardable-ratio = 0.00156 err = cfg.Persist(storage) c.Assert(err, IsNil) - cfg1 := NewConfigManager() + cfg1 := NewConfigManager(nil) err = cfg1.Reload(storage) c.Assert(err, IsNil) c.Assert(cfg1.LocalCfgs["tikv"]["tikv1"], DeepEquals, lc) @@ -266,7 +266,7 @@ compression-per-level = [ [rocksdb.defaultcf.titan] discardable-ratio = 0.00156 ` - cfg := NewConfigManager() + cfg := NewConfigManager(nil) lc, err := NewLocalConfig(cfgData, &configpb.Version{Global: 0, Local: 0}) c.Assert(err, IsNil) entry := []*configpb.ConfigEntry{{ @@ -315,31 +315,31 @@ func (s *testComponentsConfigSuite) TestCreate(c *C) { cfgData := ` log-level = "debug" ` - cfg := NewConfigManager() - v, config, status := cfg.Create(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) + cfg := NewConfigManager(nil) + v, config, status := cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) expect := `log-level = "debug" ` c.Assert(config, Equals, expect) - v, config, status = cfg.Create(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) + v, config, status = cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) c.Assert(config, Equals, expect) - v, status = cfg.Update( + v, status = cfg.UpdateConfig( &configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}}, &configpb.Version{Global: 0, Local: 0}, []*configpb.ConfigEntry{{Name: "log-level", Value: "info"}}, ) c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - v, config, status = cfg.Create(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) + v, config, status = cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) expect1 := `log-level = "info" ` c.Assert(config, Equals, expect1) - v, config, status = cfg.Create(&configpb.Version{Global: 10, Local: 10}, "tikv", "tikv1", cfgData) + v, config, status = cfg.CreateConfig(&configpb.Version{Global: 10, Local: 10}, "tikv", "tikv1", cfgData) c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) c.Assert(config, Equals, expect1) @@ -349,8 +349,8 @@ func (s *testComponentsConfigSuite) TestUpdate(c *C) { cfgData := ` log-level = "debug" ` - cfg := NewConfigManager() - v, config, status := cfg.Create(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) + cfg := NewConfigManager(nil) + v, config, status := cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) expect := `log-level = "debug" @@ -358,7 +358,7 @@ log-level = "debug" expect1 := `log-level = "info" ` c.Assert(config, Equals, expect) - v, status = cfg.Update( + v, status = cfg.UpdateConfig( &configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}}, &configpb.Version{Global: 0, Local: 0}, []*configpb.ConfigEntry{{Name: "log-level", Value: "info"}}, @@ -370,14 +370,14 @@ log-level = "debug" c.Assert(result, Equals, expect1) // stale update request - v, status = cfg.Update( + v, status = cfg.UpdateConfig( &configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}}, &configpb.Version{Global: 0, Local: 0}, []*configpb.ConfigEntry{{Name: "log-level", Value: "info"}}, ) c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) - v, status = cfg.Update( + v, status = cfg.UpdateConfig( &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "tikv"}}}, &configpb.Version{Global: 10, Local: 0}, []*configpb.ConfigEntry{{Name: "log-level", Value: "debug"}}, @@ -385,7 +385,7 @@ log-level = "debug" c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) - v, status = cfg.Update( + v, status = cfg.UpdateConfig( &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "tikv"}}}, &configpb.Version{Global: 0, Local: 0}, []*configpb.ConfigEntry{{Name: "log-level", Value: "debug"}}, @@ -396,7 +396,7 @@ log-level = "debug" c.Assert(err, IsNil) c.Assert(result, Equals, expect) - v, status = cfg.Update( + v, status = cfg.UpdateConfig( &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "tikv"}}}, &configpb.Version{Global: 1, Local: 0}, []*configpb.ConfigEntry{{Name: "log-level", Value: "info"}}, @@ -406,7 +406,7 @@ log-level = "debug" result, err = cfg.getComponentCfg("tikv", "tikv1") c.Assert(err, IsNil) c.Assert(result, Equals, expect1) - v, status = cfg.Update( + v, status = cfg.UpdateConfig( &configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}}, &configpb.Version{Global: 2, Local: 0}, []*configpb.ConfigEntry{{Name: "log-level", Value: "debug"}}, @@ -418,7 +418,7 @@ log-level = "debug" c.Assert(result, Equals, expect) // stale update request - v, status = cfg.Update( + v, status = cfg.UpdateConfig( &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "tikv"}}}, &configpb.Version{Global: 0, Local: 0}, []*configpb.ConfigEntry{{Name: "log-level", Value: "info"}}, @@ -427,7 +427,7 @@ log-level = "debug" c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) // nil case - v, status = cfg.Update(nil, nil, nil) + v, status = cfg.UpdateConfig(nil, nil, nil) c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_UNKNOWN) } @@ -436,31 +436,31 @@ func (s *testComponentsConfigSuite) TestGet(c *C) { cfgData := ` log-level = "debug" ` - cfg := NewConfigManager() - v, config, status := cfg.Create(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) + cfg := NewConfigManager(nil) + v, config, status := cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) expect := `log-level = "debug" ` c.Assert(config, Equals, expect) - v, status = cfg.Update( + v, status = cfg.UpdateConfig( &configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}}, &configpb.Version{Global: 0, Local: 0}, []*configpb.ConfigEntry{{Name: "log-level", Value: "info"}}, ) c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - v, config, status = cfg.Get(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1") + v, config, status = cfg.GetConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1") c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) expect1 := `log-level = "info" ` c.Assert(config, Equals, expect1) - v, config, status = cfg.Get(&configpb.Version{Global: 10, Local: 0}, "tikv", "tikv1") + v, config, status = cfg.GetConfig(&configpb.Version{Global: 10, Local: 0}, "tikv", "tikv1") c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) c.Assert(config, Equals, expect1) - v, config, status = cfg.Get(&configpb.Version{Global: 10, Local: 1}, "tikv", "tikv1") + v, config, status = cfg.GetConfig(&configpb.Version{Global: 10, Local: 1}, "tikv", "tikv1") c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) c.Assert(config, Equals, expect1) @@ -470,54 +470,54 @@ func (s *testComponentsConfigSuite) TestDeleteLocal(c *C) { cfgData := ` log-level = "debug" ` - cfg := NewConfigManager() + cfg := NewConfigManager(nil) - v, config, status := cfg.Create(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) + v, config, status := cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData) c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) expect := `log-level = "debug" ` c.Assert(config, Equals, expect) - v, status = cfg.Update( + v, status = cfg.UpdateConfig( &configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}}, &configpb.Version{Global: 0, Local: 0}, []*configpb.ConfigEntry{{Name: "log-level", Value: "info"}}, ) c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - v, config, status = cfg.Get(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1") + v, config, status = cfg.GetConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1") c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) expect1 := `log-level = "info" ` c.Assert(config, Equals, expect1) - status = cfg.Delete( + status = cfg.DeleteConfig( &configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}}, &configpb.Version{Global: 0, Local: 0}, ) c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) - v, config, status = cfg.Get(&configpb.Version{Global: 0, Local: 1}, "tikv", "tikv1") + v, config, status = cfg.GetConfig(&configpb.Version{Global: 0, Local: 1}, "tikv", "tikv1") c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) c.Assert(config, Equals, expect1) - status = cfg.Delete( + status = cfg.DeleteConfig( &configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}}, &configpb.Version{Global: 1, Local: 1}, ) c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION) - v, config, status = cfg.Get(&configpb.Version{Global: 0, Local: 1}, "tikv", "tikv1") + v, config, status = cfg.GetConfig(&configpb.Version{Global: 0, Local: 1}, "tikv", "tikv1") c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) c.Assert(config, Equals, expect1) - status = cfg.Delete( + status = cfg.DeleteConfig( &configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}}, &configpb.Version{Global: 0, Local: 1}, ) c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK) - v, config, status = cfg.Get(&configpb.Version{Global: 0, Local: 1}, "tikv", "tikv1") + v, config, status = cfg.GetConfig(&configpb.Version{Global: 0, Local: 1}, "tikv", "tikv1") c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0}) c.Assert(status.GetCode(), Equals, configpb.StatusCode_COMPONENT_ID_NOT_FOUND) c.Assert(config, Equals, "") diff --git a/server/config_manager/grpc_service.go b/server/config_manager/grpc_service.go new file mode 100644 index 00000000000..9e9bb85f5ef --- /dev/null +++ b/server/config_manager/grpc_service.go @@ -0,0 +1,110 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package configmanager + +import ( + "context" + + "github.com/pingcap/kvproto/pkg/configpb" + "github.com/pkg/errors" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var notLeaderError = status.Errorf(codes.Unavailable, "not leader") + +// Create implements gRPC PDServer. +func (c *ConfigManager) Create(ctx context.Context, request *configpb.CreateRequest) (*configpb.CreateResponse, error) { + if err := c.validateComponentRequest(request.GetHeader()); err != nil { + return nil, err + } + + version, config, status := c.CreateConfig(request.GetVersion(), request.GetComponent(), request.GetComponentId(), request.GetConfig()) + if status.GetCode() == configpb.StatusCode_OK { + c.Persist(c.svr.GetStorage()) + } + + return &configpb.CreateResponse{ + Header: c.componentHeader(), + Status: status, + Version: version, + Config: config, + }, nil +} + +// Get implements gRPC PDServer. +func (c *ConfigManager) Get(ctx context.Context, request *configpb.GetRequest) (*configpb.GetResponse, error) { + if err := c.validateComponentRequest(request.GetHeader()); err != nil { + return nil, err + } + + version, config, status := c.GetConfig(request.GetVersion(), request.GetComponent(), request.GetComponentId()) + + return &configpb.GetResponse{ + Header: c.componentHeader(), + Status: status, + Version: version, + Config: config, + }, nil +} + +// Update implements gRPC PDServer. +func (c *ConfigManager) Update(ctx context.Context, request *configpb.UpdateRequest) (*configpb.UpdateResponse, error) { + if err := c.validateComponentRequest(request.GetHeader()); err != nil { + return nil, err + } + + version, status := c.UpdateConfig(request.GetKind(), request.GetVersion(), request.GetEntries()) + if status.GetCode() == configpb.StatusCode_OK { + c.Persist(c.svr.GetStorage()) + } + + return &configpb.UpdateResponse{ + Header: c.componentHeader(), + Status: status, + Version: version, + }, nil +} + +// Delete implements gRPC PDServer. +func (c *ConfigManager) Delete(ctx context.Context, request *configpb.DeleteRequest) (*configpb.DeleteResponse, error) { + if err := c.validateComponentRequest(request.GetHeader()); err != nil { + return nil, err + } + + status := c.DeleteConfig(request.GetKind(), request.GetVersion()) + if status.GetCode() == configpb.StatusCode_OK { + c.Persist(c.svr.GetStorage()) + } + + return &configpb.DeleteResponse{ + Header: c.componentHeader(), + Status: status, + }, nil +} + +func (c *ConfigManager) componentHeader() *configpb.Header { + return &configpb.Header{ClusterId: c.svr.ClusterID()} +} + +func (c *ConfigManager) validateComponentRequest(header *configpb.Header) error { + if c.svr.IsClosed() || !c.svr.GetMember().IsLeader() { + return errors.WithStack(notLeaderError) + } + clusterID := c.svr.ClusterID() + if header.GetClusterId() != clusterID { + return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", clusterID, header.GetClusterId()) + } + return nil +} diff --git a/server/server.go b/server/server.go index 1cef6d3f404..279b9ce6cfb 100755 --- a/server/server.go +++ b/server/server.go @@ -30,6 +30,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/gorilla/mux" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/configpb" "github.com/pingcap/kvproto/pkg/diagnosticspb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" @@ -40,6 +41,7 @@ import ( "github.com/pingcap/pd/pkg/ui" "github.com/pingcap/pd/server/cluster" "github.com/pingcap/pd/server/config" + configmanager "github.com/pingcap/pd/server/config_manager" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/id" "github.com/pingcap/pd/server/kv" @@ -116,6 +118,9 @@ type Server struct { // Zap logger lg *zap.Logger logProps *log.ZapProperties + + // components' configuration management + cfgManager *configmanager.ConfigManager } // HandlerBuilder builds a server HTTP handler. @@ -185,6 +190,8 @@ func CreateServer(ctx context.Context, cfg *config.Config, apiBuilders ...Handle ctx: ctx, DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename), } + + s.cfgManager = configmanager.NewConfigManager(s) s.handler = newHandler(s) // Adjust etcd config. @@ -206,6 +213,10 @@ func CreateServer(ctx context.Context, cfg *config.Config, apiBuilders ...Handle etcdCfg.ServiceRegister = func(gs *grpc.Server) { pdpb.RegisterPDServer(gs, s) diagnosticspb.RegisterDiagnosticsServer(gs, s) + + if cfg.EnableConfigManager { + configpb.RegisterConfigServer(gs, s.cfgManager) + } } s.etcdCfg = etcdCfg if EnableZap { @@ -974,5 +985,13 @@ func (s *Server) reloadConfigFromKV() error { s.storage.SwitchToDefaultStorage() log.Info("server disable region storage") } + + // The request only valid when there is a leader. + // And before the a PD becomes a leader it will firstly reload the config. + if s.cfg.EnableConfigManager { + err = s.cfgManager.Reload(s.storage) + return err + } + return nil }