Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config: introduce a config client to support load configs from PD online #14303

Merged
merged 19 commits into from
Feb 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net/url"
"os"
"reflect"
"strings"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
tracing "github.com/uber/jaeger-client-go/config"
"go.uber.org/atomic"
"go.uber.org/zap"
)

// Config number limitations
Expand Down Expand Up @@ -157,6 +159,16 @@ func (b *nullableBool) UnmarshalText(text []byte) error {
return nil
}

func (b nullableBool) MarshalText() ([]byte, error) {
if !b.IsValid {
return []byte(""), nil
}
if b.IsTrue {
return []byte("true"), nil
}
return []byte("false"), nil
}

func (b *nullableBool) UnmarshalJSON(data []byte) error {
var err error
var v interface{}
Expand Down Expand Up @@ -864,3 +876,28 @@ const (
OOMActionCancel = "cancel"
OOMActionLog = "log"
)

// ParsePath parses this path.
func ParsePath(path string) (etcdAddrs []string, disableGC bool, err error) {
var u *url.URL
u, err = url.Parse(path)
if err != nil {
err = errors.Trace(err)
return
}
if strings.ToLower(u.Scheme) != "tikv" {
err = errors.Errorf("Uri scheme expected [tikv] but found [%s]", u.Scheme)
logutil.BgLogger().Error("parsePath error", zap.Error(err))
return
}
switch strings.ToLower(u.Query().Get("disableGC")) {
case "true":
disableGC = true
case "false", "":
default:
err = errors.New("disableGC flag should be true/false")
return
}
etcdAddrs = strings.Split(u.Host, ",")
return
}
223 changes: 223 additions & 0 deletions config/config_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
"bytes"
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/configpb"
"github.com/pingcap/pd/client"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

// ConfHandler is used to load and update config online.
// See https://github.com/pingcap/tidb/pull/13660 for more details.
type ConfHandler interface {
Start()
Close()
GetConfig() *Config // read only
}

// ConfReloadFunc is used to reload the config to make it work.
type ConfReloadFunc func(oldConf, newConf *Config)

// NewConfHandler creates a new ConfHandler according to the local config.
func NewConfHandler(localConf *Config, reloadFunc ConfReloadFunc) (ConfHandler, error) {
switch defaultConf.Store {
case "tikv":
return newPDConfHandler(localConf, reloadFunc, nil)
default:
return &constantConfHandler{localConf}, nil
}
}

// constantConfHandler is used in local or debug environment.
// It always returns the constant config initialized at the beginning.
type constantConfHandler struct {
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
conf *Config
}

func (cch *constantConfHandler) Start() {}

func (cch *constantConfHandler) Close() {}

func (cch *constantConfHandler) GetConfig() *Config { return cch.conf }

const (
pdConfHandlerRefreshInterval = 30 * time.Second
tidbComponentName = "tidb"
)

type pdConfHandler struct {
id string // ip:port
version *configpb.Version
curConf atomic.Value
interval time.Duration
wg sync.WaitGroup
exit chan struct{}
pdConfCli pd.ConfigClient
reloadFunc func(oldConf, newConf *Config)
}

func newPDConfHandler(localConf *Config, reloadFunc ConfReloadFunc,
newPDCliFunc func([]string, pd.SecurityOption) (pd.ConfigClient, error), // for test
) (*pdConfHandler, error) {
addresses, _, err := ParsePath(localConf.Path)
if err != nil {
return nil, err
}
host := localConf.Host
if localConf.AdvertiseAddress != "" {
host = localConf.AdvertiseAddress
}
id := fmt.Sprintf("%v:%v", host, localConf.Port)
security := localConf.Security
if newPDCliFunc == nil {
newPDCliFunc = pd.NewConfigClient
}
pdCli, err := newPDCliFunc(addresses, pd.SecurityOption{
CAPath: security.ClusterSSLCA,
CertPath: security.ClusterSSLCert,
KeyPath: security.ClusterSSLKey,
})
if err != nil {
return nil, err
}
confContent, err := encodeConfig(localConf)
if err != nil {
return nil, err
}

// register to PD and get the new default config.
// see https://github.com/pingcap/tidb/pull/13660 for more details.
// suppose port and security config items cannot be change online.
status, version, conf, err := pdCli.Create(context.Background(), new(configpb.Version), tidbComponentName, id, confContent)
if err != nil {
logutil.Logger(context.Background()).Warn("register the config to PD error, local config will be used", zap.Error(err))
} else if status.Code != configpb.StatusCode_OK && status.Code != configpb.StatusCode_WRONG_VERSION {
logutil.Logger(context.Background()).Warn("invalid status when registering the config to PD", zap.String("code", status.Code.String()), zap.String("errmsg", status.Message))
conf = ""
}

tmpConf := *localConf // use the local config if the remote config is invalid
newConf := &tmpConf
if conf != "" {
newConf, err = decodeConfig(conf)
if err != nil {
logutil.Logger(context.Background()).Warn("decode remote config error", zap.Error(err))
newConf = &tmpConf
} else if err := newConf.Valid(); err != nil {
logutil.Logger(context.Background()).Warn("invalid remote config", zap.Error(err))
newConf = &tmpConf
}
}

ch := &pdConfHandler{
id: id,
version: version,
interval: pdConfHandlerRefreshInterval,
exit: make(chan struct{}),
pdConfCli: pdCli,
reloadFunc: reloadFunc,
}
ch.curConf.Store(newConf)
return ch, nil
}

func (ch *pdConfHandler) Start() {
ch.wg.Add(1)
go ch.run()
}

func (ch *pdConfHandler) Close() {
close(ch.exit)
ch.wg.Wait()
ch.pdConfCli.Close()
}

func (ch *pdConfHandler) GetConfig() *Config {
return ch.curConf.Load().(*Config)
}

func (ch *pdConfHandler) run() {
defer func() {
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
if r := recover(); r != nil {
logutil.Logger(context.Background()).Error("panic in the recoverable goroutine",
zap.Reflect("r", r),
zap.Stack("stack trace"))
}
ch.wg.Done()
}()

for {
select {
case <-time.After(ch.interval):
// fetch new config from PD
status, version, newConfContent, err := ch.pdConfCli.Get(context.Background(), ch.version, tidbComponentName, ch.id)
if err != nil {
logutil.Logger(context.Background()).Error("PDConfHandler fetch new config error", zap.Error(err))
continue
}
if status.Code == configpb.StatusCode_OK { // StatusCode_OK represents the request is successful and there is no change.
continue
}
if status.Code != configpb.StatusCode_WRONG_VERSION {
// StatusCode_WRONG_VERSION represents the request is successful and the config has been updated.
logutil.Logger(context.Background()).Error("PDConfHandler fetch new config PD error",
zap.Int("code", int(status.Code)), zap.String("message", status.Message))
continue
}
newConf, err := decodeConfig(newConfContent)
if err != nil {
logutil.Logger(context.Background()).Error("PDConfHandler decode config error", zap.Error(err))
continue
}
if err := newConf.Valid(); err != nil {
logutil.Logger(context.Background()).Error("PDConfHandler invalid config", zap.Error(err))
continue
}

ch.reloadFunc(ch.curConf.Load().(*Config), newConf)
ch.curConf.Store(newConf)
logutil.Logger(context.Background()).Info("PDConfHandler update config successfully",
zap.String("fromVersion", ch.version.String()), zap.String("toVersion", version.String()))
ch.version = version
case <-ch.exit:
return
}
}
}

func encodeConfig(conf *Config) (string, error) {
confBuf := bytes.NewBuffer(nil)
te := toml.NewEncoder(confBuf)
if err := te.Encode(conf); err != nil {
return "", errors.New("encode config error=" + err.Error())
}
return confBuf.String(), nil
}

func decodeConfig(content string) (*Config, error) {
c := new(Config)
_, err := toml.Decode(content, c)
return c, err
}
Loading