Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add config gRPC service #2033

Merged
merged 7 commits into from
Dec 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ type Config struct {

logger *zap.Logger
logProps *log.ZapProperties

EnableConfigManager bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can enable it by default. No need the config. If no client use it, the feature has no side effect.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The switch will also control the PD config management which will start a goroutine in #2049.

}

// NewConfig creates a new config.
Expand Down Expand Up @@ -162,6 +164,8 @@ func NewConfig() *Config {
fs.StringVar(&cfg.Security.KeyPath, "key", "", "Path of file that contains X509 key in PEM format")
fs.BoolVar(&cfg.ForceNewCluster, "force-new-cluster", false, "Force to create a new one-member cluster")

fs.BoolVar(&cfg.EnableConfigManager, "enable-config-manager", false, "Enable configuration manager")

return cfg
}

Expand Down
31 changes: 22 additions & 9 deletions server/config_manager/config_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (

"github.com/BurntSushi/toml"
"github.com/pingcap/kvproto/pkg/configpb"
"github.com/pingcap/pd/server/cluster"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/member"
"github.com/pkg/errors"
)

Expand All @@ -43,18 +45,29 @@ var (
errNotSupported = "not supported"
)

// Server is the interface for configuration manager.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it seem that we have another interface called Server? Maybe we should define the interface with some specifications.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and actually we indeed have some interfaces called Server in different packages. Maybe we can handle it in the future. But I don't have a good idea.

type Server interface {
IsClosed() bool
ClusterID() uint64
GetRaftCluster() *cluster.RaftCluster
GetStorage() *core.Storage
GetMember() *member.Member
}

// ConfigManager is used to manage all components' config.
type ConfigManager struct {
sync.RWMutex
svr Server
// component -> GlobalConfig
GlobalCfgs map[string]*GlobalConfig
// component -> componentID -> LocalConfig
LocalCfgs map[string]map[string]*LocalConfig
}

// NewConfigManager creates a new ConfigManager.
func NewConfigManager() *ConfigManager {
func NewConfigManager(svr Server) *ConfigManager {
return &ConfigManager{
svr: svr,
GlobalCfgs: make(map[string]*GlobalConfig),
LocalCfgs: make(map[string]map[string]*LocalConfig),
}
Expand Down Expand Up @@ -85,8 +98,8 @@ func (c *ConfigManager) getComponent(id string) string {
return ""
}

// Get returns config and the latest version.
func (c *ConfigManager) Get(version *configpb.Version, component, componentID string) (*configpb.Version, string, *configpb.Status) {
// GetConfig returns config and the latest version.
func (c *ConfigManager) GetConfig(version *configpb.Version, component, componentID string) (*configpb.Version, string, *configpb.Status) {
c.RLock()
defer c.RUnlock()
var config string
Expand Down Expand Up @@ -120,8 +133,8 @@ func (c *ConfigManager) Get(version *configpb.Version, component, componentID st
return c.getLatestVersion(component, componentID), config, status
}

// Create is used for registering a component to PD.
func (c *ConfigManager) Create(version *configpb.Version, component, componentID, cfg string) (*configpb.Version, string, *configpb.Status) {
// CreateConfig is used for registering a component to PD.
func (c *ConfigManager) CreateConfig(version *configpb.Version, component, componentID, cfg string) (*configpb.Version, string, *configpb.Status) {
c.Lock()
defer c.Unlock()
var status *configpb.Status
Expand Down Expand Up @@ -188,8 +201,8 @@ func (c *ConfigManager) getComponentCfg(component, componentID string) (string,
return encodeConfigs(config)
}

// Update is used to update a config with a given config type.
func (c *ConfigManager) Update(kind *configpb.ConfigKind, version *configpb.Version, entries []*configpb.ConfigEntry) (*configpb.Version, *configpb.Status) {
// UpdateConfig is used to update a config with a given config type.
func (c *ConfigManager) UpdateConfig(kind *configpb.ConfigKind, version *configpb.Version, entries []*configpb.ConfigEntry) (*configpb.Version, *configpb.Status) {
c.Lock()
defer c.Unlock()

Expand Down Expand Up @@ -318,8 +331,8 @@ func (c *ConfigManager) updateLocal(componentID string, version *configpb.Versio
return c.LocalCfgs[component][componentID].getVersion(), &configpb.Status{Code: configpb.StatusCode_OK}
}

// Delete removes a component from the config manager.
func (c *ConfigManager) Delete(kind *configpb.ConfigKind, version *configpb.Version) *configpb.Status {
// DeleteConfig removes a component from the config manager.
func (c *ConfigManager) DeleteConfig(kind *configpb.ConfigKind, version *configpb.Version) *configpb.Status {
c.Lock()
defer c.Unlock()

Expand Down
70 changes: 35 additions & 35 deletions server/config_manager/config_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ compression-per-level = [
[rocksdb.defaultcf.titan]
discardable-ratio = 0.00156
`
cfg := NewConfigManager()
cfg := NewConfigManager(nil)
lc, err := NewLocalConfig(cfgData, &configpb.Version{Global: 0, Local: 1})
c.Assert(err, IsNil)
gc := NewGlobalConfig(
Expand All @@ -214,7 +214,7 @@ discardable-ratio = 0.00156
err = cfg.Persist(storage)
c.Assert(err, IsNil)

cfg1 := NewConfigManager()
cfg1 := NewConfigManager(nil)
err = cfg1.Reload(storage)
c.Assert(err, IsNil)
c.Assert(cfg1.LocalCfgs["tikv"]["tikv1"], DeepEquals, lc)
Expand Down Expand Up @@ -266,7 +266,7 @@ compression-per-level = [
[rocksdb.defaultcf.titan]
discardable-ratio = 0.00156
`
cfg := NewConfigManager()
cfg := NewConfigManager(nil)
lc, err := NewLocalConfig(cfgData, &configpb.Version{Global: 0, Local: 0})
c.Assert(err, IsNil)
entry := []*configpb.ConfigEntry{{
Expand Down Expand Up @@ -315,31 +315,31 @@ func (s *testComponentsConfigSuite) TestCreate(c *C) {
cfgData := `
log-level = "debug"
`
cfg := NewConfigManager()
v, config, status := cfg.Create(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData)
cfg := NewConfigManager(nil)
v, config, status := cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData)
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK)
expect := `log-level = "debug"
`
c.Assert(config, Equals, expect)
v, config, status = cfg.Create(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData)
v, config, status = cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData)
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK)
c.Assert(config, Equals, expect)
v, status = cfg.Update(
v, status = cfg.UpdateConfig(
&configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}},
&configpb.Version{Global: 0, Local: 0},
[]*configpb.ConfigEntry{{Name: "log-level", Value: "info"}},
)
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK)
v, config, status = cfg.Create(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData)
v, config, status = cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData)
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK)
expect1 := `log-level = "info"
`
c.Assert(config, Equals, expect1)
v, config, status = cfg.Create(&configpb.Version{Global: 10, Local: 10}, "tikv", "tikv1", cfgData)
v, config, status = cfg.CreateConfig(&configpb.Version{Global: 10, Local: 10}, "tikv", "tikv1", cfgData)
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION)
c.Assert(config, Equals, expect1)
Expand All @@ -349,16 +349,16 @@ func (s *testComponentsConfigSuite) TestUpdate(c *C) {
cfgData := `
log-level = "debug"
`
cfg := NewConfigManager()
v, config, status := cfg.Create(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData)
cfg := NewConfigManager(nil)
v, config, status := cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData)
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK)
expect := `log-level = "debug"
`
expect1 := `log-level = "info"
`
c.Assert(config, Equals, expect)
v, status = cfg.Update(
v, status = cfg.UpdateConfig(
&configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}},
&configpb.Version{Global: 0, Local: 0},
[]*configpb.ConfigEntry{{Name: "log-level", Value: "info"}},
Expand All @@ -370,22 +370,22 @@ log-level = "debug"
c.Assert(result, Equals, expect1)

// stale update request
v, status = cfg.Update(
v, status = cfg.UpdateConfig(
&configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}},
&configpb.Version{Global: 0, Local: 0},
[]*configpb.ConfigEntry{{Name: "log-level", Value: "info"}},
)
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION)
v, status = cfg.Update(
v, status = cfg.UpdateConfig(
&configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "tikv"}}},
&configpb.Version{Global: 10, Local: 0},
[]*configpb.ConfigEntry{{Name: "log-level", Value: "debug"}},
)
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION)

v, status = cfg.Update(
v, status = cfg.UpdateConfig(
&configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "tikv"}}},
&configpb.Version{Global: 0, Local: 0},
[]*configpb.ConfigEntry{{Name: "log-level", Value: "debug"}},
Expand All @@ -396,7 +396,7 @@ log-level = "debug"
c.Assert(err, IsNil)
c.Assert(result, Equals, expect)

v, status = cfg.Update(
v, status = cfg.UpdateConfig(
&configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "tikv"}}},
&configpb.Version{Global: 1, Local: 0},
[]*configpb.ConfigEntry{{Name: "log-level", Value: "info"}},
Expand All @@ -406,7 +406,7 @@ log-level = "debug"
result, err = cfg.getComponentCfg("tikv", "tikv1")
c.Assert(err, IsNil)
c.Assert(result, Equals, expect1)
v, status = cfg.Update(
v, status = cfg.UpdateConfig(
&configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}},
&configpb.Version{Global: 2, Local: 0},
[]*configpb.ConfigEntry{{Name: "log-level", Value: "debug"}},
Expand All @@ -418,7 +418,7 @@ log-level = "debug"
c.Assert(result, Equals, expect)

// stale update request
v, status = cfg.Update(
v, status = cfg.UpdateConfig(
&configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: "tikv"}}},
&configpb.Version{Global: 0, Local: 0},
[]*configpb.ConfigEntry{{Name: "log-level", Value: "info"}},
Expand All @@ -427,7 +427,7 @@ log-level = "debug"
c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION)

// nil case
v, status = cfg.Update(nil, nil, nil)
v, status = cfg.UpdateConfig(nil, nil, nil)
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_UNKNOWN)
}
Expand All @@ -436,31 +436,31 @@ func (s *testComponentsConfigSuite) TestGet(c *C) {
cfgData := `
log-level = "debug"
`
cfg := NewConfigManager()
v, config, status := cfg.Create(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData)
cfg := NewConfigManager(nil)
v, config, status := cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData)
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK)
expect := `log-level = "debug"
`
c.Assert(config, Equals, expect)
v, status = cfg.Update(
v, status = cfg.UpdateConfig(
&configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}},
&configpb.Version{Global: 0, Local: 0},
[]*configpb.ConfigEntry{{Name: "log-level", Value: "info"}},
)
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK)
v, config, status = cfg.Get(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1")
v, config, status = cfg.GetConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1")
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION)
expect1 := `log-level = "info"
`
c.Assert(config, Equals, expect1)
v, config, status = cfg.Get(&configpb.Version{Global: 10, Local: 0}, "tikv", "tikv1")
v, config, status = cfg.GetConfig(&configpb.Version{Global: 10, Local: 0}, "tikv", "tikv1")
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION)
c.Assert(config, Equals, expect1)
v, config, status = cfg.Get(&configpb.Version{Global: 10, Local: 1}, "tikv", "tikv1")
v, config, status = cfg.GetConfig(&configpb.Version{Global: 10, Local: 1}, "tikv", "tikv1")
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION)
c.Assert(config, Equals, expect1)
Expand All @@ -470,54 +470,54 @@ func (s *testComponentsConfigSuite) TestDeleteLocal(c *C) {
cfgData := `
log-level = "debug"
`
cfg := NewConfigManager()
cfg := NewConfigManager(nil)

v, config, status := cfg.Create(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData)
v, config, status := cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData)
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK)
expect := `log-level = "debug"
`
c.Assert(config, Equals, expect)
v, status = cfg.Update(
v, status = cfg.UpdateConfig(
&configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}},
&configpb.Version{Global: 0, Local: 0},
[]*configpb.ConfigEntry{{Name: "log-level", Value: "info"}},
)
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK)
v, config, status = cfg.Get(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1")
v, config, status = cfg.GetConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1")
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION)
expect1 := `log-level = "info"
`
c.Assert(config, Equals, expect1)

status = cfg.Delete(
status = cfg.DeleteConfig(
&configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}},
&configpb.Version{Global: 0, Local: 0},
)
c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION)
v, config, status = cfg.Get(&configpb.Version{Global: 0, Local: 1}, "tikv", "tikv1")
v, config, status = cfg.GetConfig(&configpb.Version{Global: 0, Local: 1}, "tikv", "tikv1")
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK)
c.Assert(config, Equals, expect1)

status = cfg.Delete(
status = cfg.DeleteConfig(
&configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}},
&configpb.Version{Global: 1, Local: 1},
)
c.Assert(status.GetCode(), Equals, configpb.StatusCode_WRONG_VERSION)
v, config, status = cfg.Get(&configpb.Version{Global: 0, Local: 1}, "tikv", "tikv1")
v, config, status = cfg.GetConfig(&configpb.Version{Global: 0, Local: 1}, "tikv", "tikv1")
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 1})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK)
c.Assert(config, Equals, expect1)

status = cfg.Delete(
status = cfg.DeleteConfig(
&configpb.ConfigKind{Kind: &configpb.ConfigKind_Local{Local: &configpb.Local{ComponentId: "tikv1"}}},
&configpb.Version{Global: 0, Local: 1},
)
c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK)
v, config, status = cfg.Get(&configpb.Version{Global: 0, Local: 1}, "tikv", "tikv1")
v, config, status = cfg.GetConfig(&configpb.Version{Global: 0, Local: 1}, "tikv", "tikv1")
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_COMPONENT_ID_NOT_FOUND)
c.Assert(config, Equals, "")
Expand Down
Loading