Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
.*: support TLS (#569)
Browse files Browse the repository at this point in the history
  • Loading branch information
WangXiangUSTC authored Jul 23, 2020
1 parent 63dfa75 commit 7ec79ae
Show file tree
Hide file tree
Showing 59 changed files with 846 additions and 80 deletions.
6 changes: 6 additions & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ ErrElectionGetLeaderIDFail,[code=11108:class=functional:scope=internal:level=med
ErrBinlogInvalidFilenameWithUUIDSuffix,[code=11109:class=functional:scope=internal:level=high], "Message: invalid binlog filename with uuid suffix %s"
ErrDecodeEtcdKeyFail,[code=11110:class=functional:scope=internal:level=medium], "Message: fail to decode etcd key: %s"
ErrShardDDLOptimismTrySyncFail,[code=11111:class=functional:scope=internal:level=medium], "Message: fail to try sync the optimistic shard ddl lock %s: %s, Workaround: Please use show-ddl-locks command for more details."
ErrConnInvalidTLSConfig,[code=11112:class=functional:scope=internal:level=medium], "Message: invalid TLS config, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config."
ErrConnRegistryTLSConfig,[code=11113:class=functional:scope=internal:level=medium], "Message: fail to registry TLS config"
ErrConfigCheckItemNotSupport,[code=20001:class=config:scope=internal:level=medium], "Message: checking item %s is not supported\n%s, Workaround: Please check `ignore-checking-items` config in task configuration file, which can be set including `all`/`dump_privilege`/`replication_privilege`/`version`/`binlog_enable`/`binlog_format`/`binlog_row_image`/`table_schema`/`schema_of_shard_tables`/`auto_increment_ID`."
ErrConfigTomlTransform,[code=20002:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct TOML format."
ErrConfigTaskYamlTransform,[code=20003:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct YAML format."
Expand Down Expand Up @@ -342,6 +344,8 @@ ErrMasterPessimistNotStarted,[code=38046:class=dm-master:scope=internal:level=me
ErrMasterOptimistNotStarted,[code=38047:class=dm-master:scope=internal:level=medium], "Message: the shardddl optimist has not started"
ErrMasterMasterNameNotExist,[code=38048:class=dm-master:scope=internal:level=low], "Message: dm-master with name %s not exists, Workaround: Please use list-member command to see masters."
ErrMasterInvalidOfflineType,[code=38049:class=dm-master:scope=internal:level=low], "Message: offline member type %s is invalid, Workaround: Please use master/worker."
ErrMasterAdvertisePeerURLsNotValid,[code=38050:class=dm-master:scope=internal:level=high], "Message: advertise peer urls %s not valid, Workaround: Please check the `advertise-peer-urls` config in master configuration file."
ErrMasterTLSConfigNotValid,[code=38051:class=dm-master:scope=internal:level=high], "Message: TLS config not valid, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in master configuration file."
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium], "Message: parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium], "Message: '%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium], "Message: toml decode file, Workaround: Please check the configuration file has correct TOML format."
Expand Down Expand Up @@ -417,6 +421,7 @@ ErrWorkerSourceNotMatch,[code=40072:class=dm-worker:scope=internal:level=high],
ErrWorkerFailToGetSubtaskConfigFromEtcd,[code=40073:class=dm-worker:scope=internal:level=medium], "Message: there is no relative subtask config for task %s in etcd"
ErrWorkerFailToGetSourceConfigFromEtcd,[code=40074:class=dm-worker:scope=internal:level=medium], "Message: there is no relative source config for source %s in etcd"
ErrWorkerDDLLockOpNotFound,[code=40075:class=dm-worker:scope=internal:level=high], "Message: missing shard DDL lock operation for shard DDL info (%s)"
ErrWorkerTLSConfigNotValid,[code=40076:class=dm-worker:scope=internal:level=high], "Message: TLS config not valid, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in worker configuration file."
ErrTracerParseFlagSet,[code=42001:class=dm-tracer:scope=internal:level=medium], "Message: parse dm-tracer config flag set"
ErrTracerConfigTomlTransform,[code=42002:class=dm-tracer:scope=internal:level=medium], "Message: config toml transform, Workaround: Please check the configuration file has correct TOML format."
ErrTracerConfigInvalidFlag,[code=42003:class=dm-tracer:scope=internal:level=medium], "Message: '%s' is an invalid flag"
Expand Down Expand Up @@ -457,4 +462,5 @@ ErrSchedulerSubTaskOpTaskNotExist,[code=46016:class=dm-master:scope=internal:lev
ErrSchedulerSubTaskOpSourceNotExist,[code=46017:class=dm-master:scope=internal:level=medium], "Message: sources %v need to be operate not exist"
ErrSchedulerTaskNotExist,[code=46018:class=scheduler:scope=internal:level=medium], "Message: task with name %s not exist, Workaround: Please use `query-status` command to see tasks."
ErrCtlGRPCCreateConn,[code=48001:class=dmctl:scope=internal:level=high], "Message: can not create grpc connection, Workaround: Please check your network connection."
ErrCtlInvalidTLSCfg,[code=48002:class=dmctl:scope=internal:level=medium], "Message: invalid TLS config, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in command line."
ErrNotSet,[code=50000:class=not-set:scope=not-set:level=high]
36 changes: 36 additions & 0 deletions dm/config/security.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import "fmt"

// Security config
type Security struct {
SSLCA string `toml:"ssl-ca" json:"ssl-ca" yaml:"ssl-ca"`
SSLCert string `toml:"ssl-cert" json:"ssl-cert" yaml:"ssl-cert"`
SSLKey string `toml:"ssl-key" json:"ssl-key" yaml:"ssl-key"`
CertAllowedCN strArray `toml:"cert-allowed-cn" json:"cert-allowed-cn" yaml:"cert-allowed-cn"`
}

// used for parse string slice in flag
type strArray []string

func (i *strArray) String() string {
return fmt.Sprint([]string(*i))
}

func (i *strArray) Set(value string) error {
*i = append(*i, value)
return nil
}
3 changes: 3 additions & 0 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ type DBConfig struct {
MaxAllowedPacket *int `toml:"max-allowed-packet" json:"max-allowed-packet" yaml:"max-allowed-packet"`
Session map[string]string `toml:"session" json:"session" yaml:"session"`

// security config
Security *Security `toml:"security" json:"security" yaml:"security"`

RawDBCfg *RawDBConfig `toml:"-" json:"-" yaml:"-"`
}

Expand Down
8 changes: 7 additions & 1 deletion dm/ctl/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net"
"time"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/utils"

"github.com/BurntSushi/toml"
Expand Down Expand Up @@ -54,6 +55,9 @@ func NewConfig() *Config {
fs.StringVar(&cfg.MasterAddr, "master-addr", "", "master API server addr")
fs.StringVar(&cfg.RPCTimeoutStr, "rpc-timeout", defaultRPCTimeout, fmt.Sprintf("rpc timeout, default is %s", defaultRPCTimeout))
fs.StringVar(&cfg.encrypt, EncryptCmdName, "", "encrypt plaintext to ciphertext")
fs.StringVar(&cfg.SSLCA, "ssl-ca", "", "path of file that contains list of trusted SSL CAs for connection")
fs.StringVar(&cfg.SSLCert, "ssl-cert", "", "path of file that contains X509 certificate in PEM format for connection")
fs.StringVar(&cfg.SSLKey, "ssl-key", "", "path of file that contains X509 key in PEM format for connection")
fs.StringVar(&cfg.decrypt, DecryptCmdName, "", "decrypt ciphertext to plaintext")

return cfg
Expand All @@ -72,7 +76,9 @@ type Config struct {

printVersion bool
encrypt string // string need to be encrypted
decrypt string // string need to be decrypted

config.Security
decrypt string // string need to be decrypted
}

func (c *Config) String() string {
Expand Down
13 changes: 10 additions & 3 deletions dm/ctl/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"
"time"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
parserpkg "github.com/pingcap/dm/pkg/parser"
"github.com/pingcap/dm/pkg/terror"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
toolutils "github.com/pingcap/tidb-tools/pkg/utils"
"github.com/spf13/cobra"
"google.golang.org/grpc"
)
Expand All @@ -41,12 +43,17 @@ var (
// InitUtils inits necessary dmctl utils
func InitUtils(cfg *Config) error {
globalConfig = cfg
return errors.Trace(InitClient(cfg.MasterAddr))
return errors.Trace(InitClient(cfg.MasterAddr, cfg.Security))
}

// InitClient initializes dm-master client
func InitClient(addr string) error {
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(3*time.Second), grpc.WithBlock(), grpc.WithTimeout(3*time.Second))
func InitClient(addr string, securityCfg config.Security) error {
tls, err := toolutils.NewTLS(securityCfg.SSLCA, securityCfg.SSLCert, securityCfg.SSLKey, "", securityCfg.CertAllowedCN)
if err != nil {
return terror.ErrCtlInvalidTLSCfg.Delegate(err)
}

conn, err := grpc.Dial(addr, tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second), grpc.WithBlock(), grpc.WithTimeout(3*time.Second))
if err != nil {
return terror.ErrCtlGRPCCreateConn.AnnotateDelegate(err, "can't connect to %s", addr)
}
Expand Down
35 changes: 34 additions & 1 deletion dm/master/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
"net/url"
"os"
"strings"
"sync/atomic"
"time"

"github.com/BurntSushi/toml"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
Expand Down Expand Up @@ -75,6 +77,11 @@ func NewConfig() *Config {
fs.StringVar(&cfg.AdvertisePeerUrls, "advertise-peer-urls", "", `advertise URLs for peer traffic (default "${peer-urls}")`)
fs.StringVar(&cfg.Join, "join", "", `join to an existing cluster (usage: cluster's "${master-addr}" list, e.g. "127.0.0.1:8261,127.0.0.1:18261"`)

fs.StringVar(&cfg.SSLCA, "ssl-ca", "", "path of file that contains list of trusted SSL CAs for connection")
fs.StringVar(&cfg.SSLCert, "ssl-cert", "", "path of file that contains X509 certificate in PEM format for connection")
fs.StringVar(&cfg.SSLKey, "ssl-key", "", "path of file that contains X509 key in PEM format for connection")
fs.Var(&cfg.CertAllowedCN, "cert-allowed-cn", "the trusted common name that allowed to visit")

return cfg
}

Expand Down Expand Up @@ -129,6 +136,9 @@ type Config struct {
Join string `toml:"join" json:"join"` // cluster's client address (endpoints), not peer address
Debug bool `toml:"debug" json:"debug"` // only use for test

// tls config
config.Security

printVersion bool
printSampleConfig bool
}
Expand Down Expand Up @@ -358,6 +368,25 @@ func (c *Config) genEmbedEtcdConfig(cfg *embed.Config) (*embed.Config, error) {
return nil, terror.ErrMasterGenEmbedEtcdConfigFail.AnnotateDelegate(err, "fail to validate embed etcd config")
}

// security config
if len(c.SSLCA) != 0 {
cfg.ClientTLSInfo.TrustedCAFile = c.SSLCA
cfg.ClientTLSInfo.CertFile = c.SSLCert
cfg.ClientTLSInfo.KeyFile = c.SSLKey

cfg.PeerTLSInfo.TrustedCAFile = c.SSLCA
cfg.PeerTLSInfo.CertFile = c.SSLCert
cfg.PeerTLSInfo.KeyFile = c.SSLKey

// NOTE: etcd only support one allowed CN
if len(c.CertAllowedCN) > 0 {
cfg.ClientTLSInfo.AllowedCN = c.CertAllowedCN[0]
cfg.PeerTLSInfo.AllowedCN = c.CertAllowedCN[0]
cfg.PeerTLSInfo.ClientCertAuth = len(c.SSLCA) != 0
cfg.ClientTLSInfo.ClientCertAuth = len(c.SSLCA) != 0
}
}

return cfg, nil
}

Expand All @@ -378,7 +407,11 @@ func parseURLs(s string) ([]url.URL, error) {
// `127.0.0.1:8261`: first path segment in URL cannot contain colon
if err != nil && (strings.Contains(err.Error(), "missing protocol scheme") ||
strings.Contains(err.Error(), "first path segment in URL cannot contain colon")) {
u, err = url.Parse("http://" + item)
prefix := "http://"
if atomic.LoadInt32(&useTLS) == 1 {
prefix = "https://"
}
u, err = url.Parse(prefix + item)
}
if err != nil {
return nil, terror.ErrMasterParseURLFail.Delegate(err, item)
Expand Down
11 changes: 11 additions & 0 deletions dm/master/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,13 +300,24 @@ func (t *testConfigSuite) TestParseURLs(c *check.C) {
str: "http://127.0.0.1:8291",
urls: []url.URL{{Scheme: "http", Host: "127.0.0.1:8291"}},
},
{
str: "https://127.0.0.1:8291",
urls: []url.URL{{Scheme: "https", Host: "127.0.0.1:8291"}},
},
{
str: "http://127.0.0.1:8291,http://127.0.0.1:18291",
urls: []url.URL{
{Scheme: "http", Host: "127.0.0.1:8291"},
{Scheme: "http", Host: "127.0.0.1:18291"},
},
},
{
str: "https://127.0.0.1:8291,https://127.0.0.1:18291",
urls: []url.URL{
{Scheme: "https", Host: "127.0.0.1:8291"},
{Scheme: "https", Host: "127.0.0.1:18291"},
},
},
{
str: "127.0.0.1:8291", // no scheme
urls: []url.URL{{Scheme: "http", Host: "127.0.0.1:8291"}},
Expand Down
9 changes: 8 additions & 1 deletion dm/master/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/failpoint"

toolutils "github.com/pingcap/tidb-tools/pkg/utils"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -89,7 +90,13 @@ func (s *Server) electionNotify(ctx context.Context) {
func (s *Server) createLeaderClient(leaderAddr string) {
s.closeLeaderClient()

conn, err := grpc.Dial(leaderAddr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(3*time.Second))
tls, err := toolutils.NewTLS(s.cfg.SSLCA, s.cfg.SSLCert, s.cfg.SSLKey, s.cfg.AdvertiseAddr, s.cfg.CertAllowedCN)
if err != nil {
log.L().Error("can't create grpc connection with leader, can't forward request to leader", zap.String("leader", leaderAddr), zap.Error(err))
return
}

conn, err := grpc.Dial(leaderAddr, tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second))
if err != nil {
log.L().Error("can't create grpc connection with leader, can't forward request to leader", zap.String("leader", leaderAddr), zap.Error(err))
return
Expand Down
2 changes: 1 addition & 1 deletion dm/master/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (t *testMaster) TestFailToStartLeader(c *check.C) {
c.Assert(s2.Start(ctx), check.IsNil)
defer s2.Close()

client, err := etcdutil.CreateClient(strings.Split(cfg1.AdvertisePeerUrls, ","))
client, err := etcdutil.CreateClient(strings.Split(cfg1.AdvertisePeerUrls, ","), nil)
c.Assert(err, check.IsNil)
defer client.Close()

Expand Down
8 changes: 7 additions & 1 deletion dm/master/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"
"time"

toolutils "github.com/pingcap/tidb-tools/pkg/utils"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -124,8 +125,13 @@ func prepareJoinEtcd(cfg *Config) error {
return nil
}

tlsCfg, err := toolutils.ToTLSConfig(cfg.SSLCA, cfg.SSLCert, cfg.SSLKey)
if err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "generate tls config")
}

// if without previous data, we need a client to contact with the existing cluster.
client, err := etcdutil.CreateClient(strings.Split(cfg.Join, ","))
client, err := etcdutil.CreateClient(strings.Split(cfg.Join, ","), tlsCfg)
if err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("create etcd client for %s", cfg.Join))
}
Expand Down
18 changes: 15 additions & 3 deletions dm/master/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"net/http"
"net/http/pprof"

"github.com/gogo/gateway"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"

Expand Down Expand Up @@ -45,16 +46,27 @@ func getStatusHandle() http.Handler {
}

// getHTTPAPIHandler returns a HTTP handler to handle DM-master APIs.
func getHTTPAPIHandler(ctx context.Context, addr string) (http.Handler, error) {
func getHTTPAPIHandler(ctx context.Context, addr string, securityOpt grpc.DialOption) (http.Handler, error) {
// dial the real API server in non-blocking mode, it may not started yet.
opts := []grpc.DialOption{grpc.WithInsecure()}
opts := []grpc.DialOption{securityOpt}
// NOTE: should we need to replace `host` in `addr` to `127.0.0.1`?
conn, err := grpc.DialContext(ctx, addr, opts...)
if err != nil {
return nil, terror.ErrMasterHandleHTTPApis.Delegate(err)
}

gwmux := runtime.NewServeMux()
jsonpb := &gateway.JSONPb{
EmitDefaults: true,
Indent: " ",
OrigName: true,
}

gwmux := runtime.NewServeMux(
runtime.WithMarshalerOption(runtime.MIMEWildcard, jsonpb),
// This is necessary to get error details properly
// marshalled in unary requests.
runtime.WithProtoErrorHandler(runtime.DefaultHTTPProtoErrorHandler),
)
err = pb.RegisterMasterHandler(ctx, gwmux, conn)
if err != nil {
return nil, terror.ErrMasterHandleHTTPApis.Delegate(err)
Expand Down
7 changes: 5 additions & 2 deletions dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,12 @@ type Scheduler struct {
// delete:
// - remove/stop subtask by user request (calling `RemoveSubTasks`).
expectSubTaskStages map[string]map[string]ha.Stage

securityCfg config.Security
}

// NewScheduler creates a new scheduler instance.
func NewScheduler(pLogger *log.Logger) *Scheduler {
func NewScheduler(pLogger *log.Logger, securityCfg config.Security) *Scheduler {
return &Scheduler{
logger: pLogger.WithFields(zap.String("component", "scheduler")),
sourceCfgs: make(map[string]config.SourceConfig),
Expand All @@ -152,6 +154,7 @@ func NewScheduler(pLogger *log.Logger) *Scheduler {
unbounds: make(map[string]struct{}),
expectRelayStages: make(map[string]ha.Stage),
expectSubTaskStages: make(map[string]map[string]ha.Stage),
securityCfg: securityCfg,
}
}

Expand Down Expand Up @@ -1368,7 +1371,7 @@ func (s *Scheduler) boundSourceToWorker(source string, w *Worker) error {
// this func is used when adding a new worker.
// NOTE: trigger scheduler when the worker become online, not when added.
func (s *Scheduler) recordWorker(info ha.WorkerInfo) (*Worker, error) {
w, err := NewWorker(info)
w, err := NewWorker(info, s.securityCfg)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 7ec79ae

Please sign in to comment.