Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config: introduce a config client to support load configs from PD online #14303

Merged
merged 19 commits into from
Feb 10, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net/url"
"os"
"reflect"
"strings"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
tracing "github.com/uber/jaeger-client-go/config"
"go.uber.org/atomic"
"go.uber.org/zap"
)

// Config number limitations
Expand Down Expand Up @@ -153,6 +155,16 @@ func (b *nullableBool) UnmarshalText(text []byte) error {
return nil
}

func (b nullableBool) MarshalText() ([]byte, error) {
if !b.IsValid {
return []byte(""), nil
}
if b.IsTrue {
return []byte("true"), nil
}
return []byte("false"), nil
}

func (b *nullableBool) UnmarshalJSON(data []byte) error {
var err error
var v interface{}
Expand Down Expand Up @@ -843,3 +855,28 @@ const (
OOMActionCancel = "cancel"
OOMActionLog = "log"
)

// ParsePath parses this path.
func ParsePath(path string) (etcdAddrs []string, disableGC bool, err error) {
var u *url.URL
u, err = url.Parse(path)
if err != nil {
err = errors.Trace(err)
return
}
if strings.ToLower(u.Scheme) != "tikv" {
err = errors.Errorf("Uri scheme expected[tikv] but found [%s]", u.Scheme)
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
logutil.BgLogger().Error("parsePath error", zap.Error(err))
return
}
switch strings.ToLower(u.Query().Get("disableGC")) {
case "true":
disableGC = true
case "false", "":
default:
err = errors.New("disableGC flag should be true/false")
return
}
etcdAddrs = strings.Split(u.Host, ",")
return
}
214 changes: 214 additions & 0 deletions config/config_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// Copyright 2019 PingCAP, Inc.
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
"bytes"
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/configpb"
"github.com/pingcap/pd/client"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

// ConfHandler is used to load and update config online.
// See https://github.com/pingcap/tidb/pull/13660 for more details.
type ConfHandler interface {
Start()
Close()
GetConfig() *Config // read only
}

// ConfReloadFunc is used to reload the config to make it work.
type ConfReloadFunc func(oldConf, newConf *Config)

// NewConfHandler creates a new ConfHandler according to the local config.
func NewConfHandler(localConf *Config, reloadFunc ConfReloadFunc) (ConfHandler, error) {
switch defaultConf.Store {
case "tikv":
return newPDConfHandler(localConf, reloadFunc, nil)
default:
return &constantConfHandler{localConf}, nil
}
}

type constantConfHandler struct {
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
conf *Config
}

func (cch *constantConfHandler) Start() {}

func (cch *constantConfHandler) Close() {}

func (cch *constantConfHandler) GetConfig() *Config { return cch.conf }

const (
pdConfHandlerRefreshInterval = 30 * time.Second
tidbComponentName = "tidb"
)

type pdConfHandler struct {
id string // ip:port
version *configpb.Version
curConf atomic.Value
interval time.Duration
wg sync.WaitGroup
exit chan struct{}
pdConfCli pd.ConfigClient
reloadFunc func(oldConf, newConf *Config)
}

func newPDConfHandler(localConf *Config, reloadFunc ConfReloadFunc,
newPDCliFunc func([]string, pd.SecurityOption) (pd.ConfigClient, error), // for test
) (*pdConfHandler, error) {
addresses, _, err := ParsePath(localConf.Path)
if err != nil {
return nil, err
}
host := localConf.Host
if localConf.AdvertiseAddress != "" {
host = localConf.AdvertiseAddress
}
id := fmt.Sprintf("%v:%v", host, localConf.Port)
security := localConf.Security
if newPDCliFunc == nil {
newPDCliFunc = pd.NewConfigClient
}
pdCli, err := newPDCliFunc(addresses, pd.SecurityOption{
CAPath: security.ClusterSSLCA,
CertPath: security.ClusterSSLCert,
KeyPath: security.ClusterSSLKey,
})
if err != nil {
return nil, errors.Trace(err)
}
confContent, err := encodeConfig(localConf)
if err != nil {
return nil, err
}

// register to PD and get the new default config.
// see https://github.com/pingcap/tidb/pull/13660 for more details.
// suppose port and security config items cannot be change online.
status, version, conf, err := pdCli.Create(context.Background(), new(configpb.Version), tidbComponentName, id, confContent)
if err != nil {
return nil, errors.Trace(err)
}
if status.Code != configpb.StatusCode_OK {
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.New(fmt.Sprintf("fail to register config to PD, errmsg=%v", status.Message))
}

newConf, err := decodeConfig(conf)
if err != nil {
return nil, err
}
if err := newConf.Valid(); err != nil {
return nil, errors.Trace(err)
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
}

ch := &pdConfHandler{
id: id,
version: version,
interval: pdConfHandlerRefreshInterval,
exit: make(chan struct{}),
pdConfCli: pdCli,
reloadFunc: reloadFunc,
}
ch.curConf.Store(newConf)
return ch, nil
}

func (ch *pdConfHandler) Start() {
ch.wg.Add(1)
go ch.run()
}

func (ch *pdConfHandler) Close() {
close(ch.exit)
ch.wg.Wait()
ch.pdConfCli.Close()
}

func (ch *pdConfHandler) GetConfig() *Config {
return ch.curConf.Load().(*Config)
}

func (ch *pdConfHandler) run() {
defer func() {
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
if r := recover(); r != nil {
logutil.Logger(context.Background()).Error("[PDConfHandler] panic: " + fmt.Sprintf("%v", r))
}
ch.wg.Done()
}()
for {
select {
case <-time.After(ch.interval):
// fetch new config from PD
status, version, newConfContent, err := ch.pdConfCli.Get(context.Background(), ch.version, tidbComponentName, ch.id)
if err != nil {
logutil.Logger(context.Background()).Error("[PDConfHandler] fetch new config error", zap.Error(err))
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
continue
}
if status.Code == configpb.StatusCode_NOT_CHANGE {
continue
}
if status.Code != configpb.StatusCode_OK {
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
logutil.Logger(context.Background()).Error("[PDConfHandler] fetch new config PD error",
zap.Int("code", int(status.Code)), zap.String("message", status.Message))
continue
}
newConf, err := decodeConfig(newConfContent)
if err != nil {
logutil.Logger(context.Background()).Error("[PDConfHandler] decode config error", zap.Error(err))
continue
}
if err := newConf.Valid(); err != nil {
logutil.Logger(context.Background()).Error("[PDConfHandler] invalid config", zap.Error(err))
continue
}

if ch.reloadFunc != nil {
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
ch.reloadFunc(ch.curConf.Load().(*Config), newConf)
}
ch.curConf.Store(newConf)
logutil.Logger(context.Background()).Info("[PDConfHandler] update config successfully",
zap.String("fromVersion", ch.version.String()), zap.String("toVersion", version.String()))
ch.version = version
case <-ch.exit:
return
}
}
}

func encodeConfig(conf *Config) (string, error) {
confBuf := bytes.NewBuffer(nil)
te := toml.NewEncoder(confBuf)
if err := te.Encode(conf); err != nil {
return "", errors.New("encode config error=" + err.Error())
}
return confBuf.String(), nil
}

func decodeConfig(content string) (*Config, error) {
c := new(Config)
_, err := toml.Decode(content, c)
return c, errors.Trace(err)
}
Loading