Skip to content

Commit

Permalink
config: introduce a config client to support load configs from PD onl…
Browse files Browse the repository at this point in the history
…ine (#14303)
  • Loading branch information
qw4990 authored Feb 10, 2020
1 parent da2ed2b commit ad1d07f
Show file tree
Hide file tree
Showing 9 changed files with 424 additions and 44 deletions.
37 changes: 37 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net/url"
"os"
"reflect"
"strings"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
tracing "github.com/uber/jaeger-client-go/config"
"go.uber.org/atomic"
"go.uber.org/zap"
)

// Config number limitations
Expand Down Expand Up @@ -157,6 +159,16 @@ func (b *nullableBool) UnmarshalText(text []byte) error {
return nil
}

func (b nullableBool) MarshalText() ([]byte, error) {
if !b.IsValid {
return []byte(""), nil
}
if b.IsTrue {
return []byte("true"), nil
}
return []byte("false"), nil
}

func (b *nullableBool) UnmarshalJSON(data []byte) error {
var err error
var v interface{}
Expand Down Expand Up @@ -864,3 +876,28 @@ const (
OOMActionCancel = "cancel"
OOMActionLog = "log"
)

// ParsePath parses this path.
func ParsePath(path string) (etcdAddrs []string, disableGC bool, err error) {
var u *url.URL
u, err = url.Parse(path)
if err != nil {
err = errors.Trace(err)
return
}
if strings.ToLower(u.Scheme) != "tikv" {
err = errors.Errorf("Uri scheme expected [tikv] but found [%s]", u.Scheme)
logutil.BgLogger().Error("parsePath error", zap.Error(err))
return
}
switch strings.ToLower(u.Query().Get("disableGC")) {
case "true":
disableGC = true
case "false", "":
default:
err = errors.New("disableGC flag should be true/false")
return
}
etcdAddrs = strings.Split(u.Host, ",")
return
}
223 changes: 223 additions & 0 deletions config/config_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
"bytes"
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/configpb"
"github.com/pingcap/pd/client"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

// ConfHandler is used to load and update config online.
// See https://github.com/pingcap/tidb/pull/13660 for more details.
type ConfHandler interface {
Start()
Close()
GetConfig() *Config // read only
}

// ConfReloadFunc is used to reload the config to make it work.
type ConfReloadFunc func(oldConf, newConf *Config)

// NewConfHandler creates a new ConfHandler according to the local config.
func NewConfHandler(localConf *Config, reloadFunc ConfReloadFunc) (ConfHandler, error) {
switch defaultConf.Store {
case "tikv":
return newPDConfHandler(localConf, reloadFunc, nil)
default:
return &constantConfHandler{localConf}, nil
}
}

// constantConfHandler is used in local or debug environment.
// It always returns the constant config initialized at the beginning.
type constantConfHandler struct {
conf *Config
}

func (cch *constantConfHandler) Start() {}

func (cch *constantConfHandler) Close() {}

func (cch *constantConfHandler) GetConfig() *Config { return cch.conf }

const (
pdConfHandlerRefreshInterval = 30 * time.Second
tidbComponentName = "tidb"
)

type pdConfHandler struct {
id string // ip:port
version *configpb.Version
curConf atomic.Value
interval time.Duration
wg sync.WaitGroup
exit chan struct{}
pdConfCli pd.ConfigClient
reloadFunc func(oldConf, newConf *Config)
}

func newPDConfHandler(localConf *Config, reloadFunc ConfReloadFunc,
newPDCliFunc func([]string, pd.SecurityOption) (pd.ConfigClient, error), // for test
) (*pdConfHandler, error) {
addresses, _, err := ParsePath(localConf.Path)
if err != nil {
return nil, err
}
host := localConf.Host
if localConf.AdvertiseAddress != "" {
host = localConf.AdvertiseAddress
}
id := fmt.Sprintf("%v:%v", host, localConf.Port)
security := localConf.Security
if newPDCliFunc == nil {
newPDCliFunc = pd.NewConfigClient
}
pdCli, err := newPDCliFunc(addresses, pd.SecurityOption{
CAPath: security.ClusterSSLCA,
CertPath: security.ClusterSSLCert,
KeyPath: security.ClusterSSLKey,
})
if err != nil {
return nil, err
}
confContent, err := encodeConfig(localConf)
if err != nil {
return nil, err
}

// register to PD and get the new default config.
// see https://github.com/pingcap/tidb/pull/13660 for more details.
// suppose port and security config items cannot be change online.
status, version, conf, err := pdCli.Create(context.Background(), new(configpb.Version), tidbComponentName, id, confContent)
if err != nil {
logutil.Logger(context.Background()).Warn("register the config to PD error, local config will be used", zap.Error(err))
} else if status.Code != configpb.StatusCode_OK && status.Code != configpb.StatusCode_WRONG_VERSION {
logutil.Logger(context.Background()).Warn("invalid status when registering the config to PD", zap.String("code", status.Code.String()), zap.String("errmsg", status.Message))
conf = ""
}

tmpConf := *localConf // use the local config if the remote config is invalid
newConf := &tmpConf
if conf != "" {
newConf, err = decodeConfig(conf)
if err != nil {
logutil.Logger(context.Background()).Warn("decode remote config error", zap.Error(err))
newConf = &tmpConf
} else if err := newConf.Valid(); err != nil {
logutil.Logger(context.Background()).Warn("invalid remote config", zap.Error(err))
newConf = &tmpConf
}
}

ch := &pdConfHandler{
id: id,
version: version,
interval: pdConfHandlerRefreshInterval,
exit: make(chan struct{}),
pdConfCli: pdCli,
reloadFunc: reloadFunc,
}
ch.curConf.Store(newConf)
return ch, nil
}

func (ch *pdConfHandler) Start() {
ch.wg.Add(1)
go ch.run()
}

func (ch *pdConfHandler) Close() {
close(ch.exit)
ch.wg.Wait()
ch.pdConfCli.Close()
}

func (ch *pdConfHandler) GetConfig() *Config {
return ch.curConf.Load().(*Config)
}

func (ch *pdConfHandler) run() {
defer func() {
if r := recover(); r != nil {
logutil.Logger(context.Background()).Error("panic in the recoverable goroutine",
zap.Reflect("r", r),
zap.Stack("stack trace"))
}
ch.wg.Done()
}()

for {
select {
case <-time.After(ch.interval):
// fetch new config from PD
status, version, newConfContent, err := ch.pdConfCli.Get(context.Background(), ch.version, tidbComponentName, ch.id)
if err != nil {
logutil.Logger(context.Background()).Error("PDConfHandler fetch new config error", zap.Error(err))
continue
}
if status.Code == configpb.StatusCode_OK { // StatusCode_OK represents the request is successful and there is no change.
continue
}
if status.Code != configpb.StatusCode_WRONG_VERSION {
// StatusCode_WRONG_VERSION represents the request is successful and the config has been updated.
logutil.Logger(context.Background()).Error("PDConfHandler fetch new config PD error",
zap.Int("code", int(status.Code)), zap.String("message", status.Message))
continue
}
newConf, err := decodeConfig(newConfContent)
if err != nil {
logutil.Logger(context.Background()).Error("PDConfHandler decode config error", zap.Error(err))
continue
}
if err := newConf.Valid(); err != nil {
logutil.Logger(context.Background()).Error("PDConfHandler invalid config", zap.Error(err))
continue
}

ch.reloadFunc(ch.curConf.Load().(*Config), newConf)
ch.curConf.Store(newConf)
logutil.Logger(context.Background()).Info("PDConfHandler update config successfully",
zap.String("fromVersion", ch.version.String()), zap.String("toVersion", version.String()))
ch.version = version
case <-ch.exit:
return
}
}
}

func encodeConfig(conf *Config) (string, error) {
confBuf := bytes.NewBuffer(nil)
te := toml.NewEncoder(confBuf)
if err := te.Encode(conf); err != nil {
return "", errors.New("encode config error=" + err.Error())
}
return confBuf.String(), nil
}

func decodeConfig(content string) (*Config, error) {
c := new(Config)
_, err := toml.Decode(content, c)
return c, err
}
Loading

0 comments on commit ad1d07f

Please sign in to comment.