Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

.*: support TLS #569

Merged
merged 56 commits into from
Jul 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
72ba1e8
add security config
WangXiangUSTC Mar 15, 2020
87d8a77
update mod
WangXiangUSTC Mar 15, 2020
14cb434
support tls in worker
WangXiangUSTC Mar 15, 2020
b32e4ac
update tools
WangXiangUSTC Mar 16, 2020
c1f1a9f
master support tls
WangXiangUSTC Mar 16, 2020
054ab17
dmctl support tls
WangXiangUSTC Mar 16, 2020
e681c98
add todo
WangXiangUSTC Mar 16, 2020
cd271fd
support tls for connection to database
WangXiangUSTC Mar 17, 2020
de430c3
minor update for database tls
WangXiangUSTC Mar 19, 2020
e8bed44
update tls test
WangXiangUSTC Mar 19, 2020
5cecdfc
use https
WangXiangUSTC Mar 22, 2020
218ee3f
refine code and add check scripts
WangXiangUSTC Mar 22, 2020
ba490ec
remove useless code
WangXiangUSTC Mar 22, 2020
f565236
minor update
WangXiangUSTC Mar 22, 2020
b89233c
minor fix
WangXiangUSTC Mar 22, 2020
b25e3f3
print tidb log
WangXiangUSTC Mar 23, 2020
2b9532f
update tidb config
WangXiangUSTC Mar 23, 2020
629ab7a
minor fix and update ut for config adjust
WangXiangUSTC Mar 23, 2020
d5054f9
update tidb-tools
WangXiangUSTC Mar 23, 2020
c69430a
merge master and resolve conflicts
WangXiangUSTC May 6, 2020
0d3d442
minor fix
WangXiangUSTC May 7, 2020
7bb9152
minor update
WangXiangUSTC May 7, 2020
2095335
Merge branch 'master' into xiang/tls
WangXiangUSTC May 7, 2020
05df068
Merge branch 'master' into xiang/tls
WangXiangUSTC May 8, 2020
3e3e7c1
merge master and resolve conflicts
WangXiangUSTC Jul 14, 2020
c56bc5c
Merge branch 'xiang/tls' of https://github.com/pingcap/dm into xiang/tls
WangXiangUSTC Jul 14, 2020
4c8c3d2
update task fiel
WangXiangUSTC Jul 14, 2020
7ef2b5a
minor fix
WangXiangUSTC Jul 14, 2020
ffc54ea
fix use http api
WangXiangUSTC Jul 15, 2020
77a9e95
add http api test
WangXiangUSTC Jul 15, 2020
0f36bb0
minor fix && address comment
WangXiangUSTC Jul 16, 2020
56aa0aa
update error
WangXiangUSTC Jul 16, 2020
628e719
fix test
WangXiangUSTC Jul 16, 2020
0f97c30
Apply suggestions from code review
WangXiangUSTC Jul 16, 2020
3cea158
use terror
WangXiangUSTC Jul 16, 2020
db51e01
Merge branch 'xiang/tls' of https://github.com/pingcap/dm into xiang/tls
WangXiangUSTC Jul 16, 2020
210f306
update terror
WangXiangUSTC Jul 16, 2020
23e1a5e
print log
WangXiangUSTC Jul 16, 2020
86ef642
merge mnaster
WangXiangUSTC Jul 16, 2020
fbd42d0
comment script
WangXiangUSTC Jul 16, 2020
23d020f
minor fix on test
WangXiangUSTC Jul 17, 2020
e862c9f
merge master
WangXiangUSTC Jul 17, 2020
bc5315f
merge master
WangXiangUSTC Jul 18, 2020
3627475
Merge branch 'master' into xiang/tls
GMHDBJD Jul 20, 2020
30f5fa1
Merge branch 'master' into xiang/tls
GMHDBJD Jul 20, 2020
5cf5d6c
address comemnt
WangXiangUSTC Jul 22, 2020
4ed6db9
add flag for tls
WangXiangUSTC Jul 22, 2020
3e656ee
update String for strArray
WangXiangUSTC Jul 22, 2020
d8a0fe0
remove useless config for embed etcd, and can works for allowd cn
WangXiangUSTC Jul 22, 2020
126769b
revert unexpect update && update test
WangXiangUSTC Jul 22, 2020
809fe07
remove https prefix check
WangXiangUSTC Jul 22, 2020
3a18348
update RegisterTLSConfig for mysql
WangXiangUSTC Jul 22, 2020
d4cf9d2
add func do in db's close
WangXiangUSTC Jul 23, 2020
66b8e9d
revert change
WangXiangUSTC Jul 23, 2020
677c679
Merge branch 'master' into xiang/tls
csuzhangxc Jul 23, 2020
32f0b57
address comment
WangXiangUSTC Jul 23, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ ErrElectionGetLeaderIDFail,[code=11108:class=functional:scope=internal:level=med
ErrBinlogInvalidFilenameWithUUIDSuffix,[code=11109:class=functional:scope=internal:level=high], "Message: invalid binlog filename with uuid suffix %s"
ErrDecodeEtcdKeyFail,[code=11110:class=functional:scope=internal:level=medium], "Message: fail to decode etcd key: %s"
ErrShardDDLOptimismTrySyncFail,[code=11111:class=functional:scope=internal:level=medium], "Message: fail to try sync the optimistic shard ddl lock %s: %s, Workaround: Please use show-ddl-locks command for more details."
ErrConnInvalidTLSConfig,[code=11112:class=functional:scope=internal:level=medium], "Message: invalid TLS config, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config."
ErrConnRegistryTLSConfig,[code=11113:class=functional:scope=internal:level=medium], "Message: fail to registry TLS config"
ErrConfigCheckItemNotSupport,[code=20001:class=config:scope=internal:level=medium], "Message: checking item %s is not supported\n%s, Workaround: Please check `ignore-checking-items` config in task configuration file, which can be set including `all`/`dump_privilege`/`replication_privilege`/`version`/`binlog_enable`/`binlog_format`/`binlog_row_image`/`table_schema`/`schema_of_shard_tables`/`auto_increment_ID`."
ErrConfigTomlTransform,[code=20002:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct TOML format."
ErrConfigTaskYamlTransform,[code=20003:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct YAML format."
Expand Down Expand Up @@ -342,6 +344,8 @@ ErrMasterPessimistNotStarted,[code=38046:class=dm-master:scope=internal:level=me
ErrMasterOptimistNotStarted,[code=38047:class=dm-master:scope=internal:level=medium], "Message: the shardddl optimist has not started"
ErrMasterMasterNameNotExist,[code=38048:class=dm-master:scope=internal:level=low], "Message: dm-master with name %s not exists, Workaround: Please use list-member command to see masters."
ErrMasterInvalidOfflineType,[code=38049:class=dm-master:scope=internal:level=low], "Message: offline member type %s is invalid, Workaround: Please use master/worker."
ErrMasterAdvertisePeerURLsNotValid,[code=38050:class=dm-master:scope=internal:level=high], "Message: advertise peer urls %s not valid, Workaround: Please check the `advertise-peer-urls` config in master configuration file."
ErrMasterTLSConfigNotValid,[code=38051:class=dm-master:scope=internal:level=high], "Message: TLS config not valid, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in master configuration file."
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium], "Message: parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium], "Message: '%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium], "Message: toml decode file, Workaround: Please check the configuration file has correct TOML format."
Expand Down Expand Up @@ -417,6 +421,7 @@ ErrWorkerSourceNotMatch,[code=40072:class=dm-worker:scope=internal:level=high],
ErrWorkerFailToGetSubtaskConfigFromEtcd,[code=40073:class=dm-worker:scope=internal:level=medium], "Message: there is no relative subtask config for task %s in etcd"
ErrWorkerFailToGetSourceConfigFromEtcd,[code=40074:class=dm-worker:scope=internal:level=medium], "Message: there is no relative source config for source %s in etcd"
ErrWorkerDDLLockOpNotFound,[code=40075:class=dm-worker:scope=internal:level=high], "Message: missing shard DDL lock operation for shard DDL info (%s)"
ErrWorkerTLSConfigNotValid,[code=40076:class=dm-worker:scope=internal:level=high], "Message: TLS config not valid, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in worker configuration file."
ErrTracerParseFlagSet,[code=42001:class=dm-tracer:scope=internal:level=medium], "Message: parse dm-tracer config flag set"
ErrTracerConfigTomlTransform,[code=42002:class=dm-tracer:scope=internal:level=medium], "Message: config toml transform, Workaround: Please check the configuration file has correct TOML format."
ErrTracerConfigInvalidFlag,[code=42003:class=dm-tracer:scope=internal:level=medium], "Message: '%s' is an invalid flag"
Expand Down Expand Up @@ -457,4 +462,5 @@ ErrSchedulerSubTaskOpTaskNotExist,[code=46016:class=dm-master:scope=internal:lev
ErrSchedulerSubTaskOpSourceNotExist,[code=46017:class=dm-master:scope=internal:level=medium], "Message: sources %v need to be operate not exist"
ErrSchedulerTaskNotExist,[code=46018:class=scheduler:scope=internal:level=medium], "Message: task with name %s not exist, Workaround: Please use `query-status` command to see tasks."
ErrCtlGRPCCreateConn,[code=48001:class=dmctl:scope=internal:level=high], "Message: can not create grpc connection, Workaround: Please check your network connection."
ErrCtlInvalidTLSCfg,[code=48002:class=dmctl:scope=internal:level=medium], "Message: invalid TLS config, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in command line."
ErrNotSet,[code=50000:class=not-set:scope=not-set:level=high]
36 changes: 36 additions & 0 deletions dm/config/security.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package config
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved

import "fmt"

// Security config
type Security struct {
SSLCA string `toml:"ssl-ca" json:"ssl-ca" yaml:"ssl-ca"`
SSLCert string `toml:"ssl-cert" json:"ssl-cert" yaml:"ssl-cert"`
SSLKey string `toml:"ssl-key" json:"ssl-key" yaml:"ssl-key"`
CertAllowedCN strArray `toml:"cert-allowed-cn" json:"cert-allowed-cn" yaml:"cert-allowed-cn"`
}

// used for parse string slice in flag
type strArray []string

func (i *strArray) String() string {
return fmt.Sprint([]string(*i))
}

func (i *strArray) Set(value string) error {
*i = append(*i, value)
return nil
}
3 changes: 3 additions & 0 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ type DBConfig struct {
MaxAllowedPacket *int `toml:"max-allowed-packet" json:"max-allowed-packet" yaml:"max-allowed-packet"`
Session map[string]string `toml:"session" json:"session" yaml:"session"`

// security config
Security *Security `toml:"security" json:"security" yaml:"security"`

RawDBCfg *RawDBConfig `toml:"-" json:"-" yaml:"-"`
}

Expand Down
8 changes: 7 additions & 1 deletion dm/ctl/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net"
"time"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/utils"

"github.com/BurntSushi/toml"
Expand Down Expand Up @@ -54,6 +55,9 @@ func NewConfig() *Config {
fs.StringVar(&cfg.MasterAddr, "master-addr", "", "master API server addr")
fs.StringVar(&cfg.RPCTimeoutStr, "rpc-timeout", defaultRPCTimeout, fmt.Sprintf("rpc timeout, default is %s", defaultRPCTimeout))
fs.StringVar(&cfg.encrypt, EncryptCmdName, "", "encrypt plaintext to ciphertext")
fs.StringVar(&cfg.SSLCA, "ssl-ca", "", "path of file that contains list of trusted SSL CAs for connection")
fs.StringVar(&cfg.SSLCert, "ssl-cert", "", "path of file that contains X509 certificate in PEM format for connection")
fs.StringVar(&cfg.SSLKey, "ssl-key", "", "path of file that contains X509 key in PEM format for connection")
fs.StringVar(&cfg.decrypt, DecryptCmdName, "", "decrypt ciphertext to plaintext")

return cfg
Expand All @@ -72,7 +76,9 @@ type Config struct {

printVersion bool
encrypt string // string need to be encrypted
decrypt string // string need to be decrypted

config.Security
decrypt string // string need to be decrypted
}

func (c *Config) String() string {
Expand Down
13 changes: 10 additions & 3 deletions dm/ctl/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"
"time"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
parserpkg "github.com/pingcap/dm/pkg/parser"
"github.com/pingcap/dm/pkg/terror"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
toolutils "github.com/pingcap/tidb-tools/pkg/utils"
"github.com/spf13/cobra"
"google.golang.org/grpc"
)
Expand All @@ -41,12 +43,17 @@ var (
// InitUtils inits necessary dmctl utils
func InitUtils(cfg *Config) error {
globalConfig = cfg
return errors.Trace(InitClient(cfg.MasterAddr))
return errors.Trace(InitClient(cfg.MasterAddr, cfg.Security))
}

// InitClient initializes dm-master client
func InitClient(addr string) error {
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(3*time.Second), grpc.WithBlock(), grpc.WithTimeout(3*time.Second))
func InitClient(addr string, securityCfg config.Security) error {
tls, err := toolutils.NewTLS(securityCfg.SSLCA, securityCfg.SSLCert, securityCfg.SSLKey, "", securityCfg.CertAllowedCN)
if err != nil {
return terror.ErrCtlInvalidTLSCfg.Delegate(err)
}

conn, err := grpc.Dial(addr, tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second), grpc.WithBlock(), grpc.WithTimeout(3*time.Second))
if err != nil {
return terror.ErrCtlGRPCCreateConn.AnnotateDelegate(err, "can't connect to %s", addr)
}
Expand Down
35 changes: 34 additions & 1 deletion dm/master/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
"net/url"
"os"
"strings"
"sync/atomic"
"time"

"github.com/BurntSushi/toml"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
Expand Down Expand Up @@ -75,6 +77,11 @@ func NewConfig() *Config {
fs.StringVar(&cfg.AdvertisePeerUrls, "advertise-peer-urls", "", `advertise URLs for peer traffic (default "${peer-urls}")`)
fs.StringVar(&cfg.Join, "join", "", `join to an existing cluster (usage: cluster's "${master-addr}" list, e.g. "127.0.0.1:8261,127.0.0.1:18261"`)

fs.StringVar(&cfg.SSLCA, "ssl-ca", "", "path of file that contains list of trusted SSL CAs for connection")
fs.StringVar(&cfg.SSLCert, "ssl-cert", "", "path of file that contains X509 certificate in PEM format for connection")
fs.StringVar(&cfg.SSLKey, "ssl-key", "", "path of file that contains X509 key in PEM format for connection")
fs.Var(&cfg.CertAllowedCN, "cert-allowed-cn", "the trusted common name that allowed to visit")

return cfg
}

Expand Down Expand Up @@ -129,6 +136,9 @@ type Config struct {
Join string `toml:"join" json:"join"` // cluster's client address (endpoints), not peer address
Debug bool `toml:"debug" json:"debug"` // only use for test

// tls config
config.Security
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved

printVersion bool
printSampleConfig bool
}
Expand Down Expand Up @@ -358,6 +368,25 @@ func (c *Config) genEmbedEtcdConfig(cfg *embed.Config) (*embed.Config, error) {
return nil, terror.ErrMasterGenEmbedEtcdConfigFail.AnnotateDelegate(err, "fail to validate embed etcd config")
}

// security config
if len(c.SSLCA) != 0 {
cfg.ClientTLSInfo.TrustedCAFile = c.SSLCA
cfg.ClientTLSInfo.CertFile = c.SSLCert
cfg.ClientTLSInfo.KeyFile = c.SSLKey

cfg.PeerTLSInfo.TrustedCAFile = c.SSLCA
cfg.PeerTLSInfo.CertFile = c.SSLCert
cfg.PeerTLSInfo.KeyFile = c.SSLKey

// NOTE: etcd only support one allowed CN
if len(c.CertAllowedCN) > 0 {
cfg.ClientTLSInfo.AllowedCN = c.CertAllowedCN[0]
cfg.PeerTLSInfo.AllowedCN = c.CertAllowedCN[0]
cfg.PeerTLSInfo.ClientCertAuth = len(c.SSLCA) != 0
cfg.ClientTLSInfo.ClientCertAuth = len(c.SSLCA) != 0
}
}

return cfg, nil
}

Expand All @@ -378,7 +407,11 @@ func parseURLs(s string) ([]url.URL, error) {
// `127.0.0.1:8261`: first path segment in URL cannot contain colon
if err != nil && (strings.Contains(err.Error(), "missing protocol scheme") ||
strings.Contains(err.Error(), "first path segment in URL cannot contain colon")) {
u, err = url.Parse("http://" + item)
prefix := "http://"
if atomic.LoadInt32(&useTLS) == 1 {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
prefix = "https://"
}
u, err = url.Parse(prefix + item)
}
if err != nil {
return nil, terror.ErrMasterParseURLFail.Delegate(err, item)
Expand Down
11 changes: 11 additions & 0 deletions dm/master/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,13 +300,24 @@ func (t *testConfigSuite) TestParseURLs(c *check.C) {
str: "http://127.0.0.1:8291",
urls: []url.URL{{Scheme: "http", Host: "127.0.0.1:8291"}},
},
{
str: "https://127.0.0.1:8291",
urls: []url.URL{{Scheme: "https", Host: "127.0.0.1:8291"}},
},
{
str: "http://127.0.0.1:8291,http://127.0.0.1:18291",
urls: []url.URL{
{Scheme: "http", Host: "127.0.0.1:8291"},
{Scheme: "http", Host: "127.0.0.1:18291"},
},
},
{
str: "https://127.0.0.1:8291,https://127.0.0.1:18291",
urls: []url.URL{
{Scheme: "https", Host: "127.0.0.1:8291"},
{Scheme: "https", Host: "127.0.0.1:18291"},
},
},
{
str: "127.0.0.1:8291", // no scheme
urls: []url.URL{{Scheme: "http", Host: "127.0.0.1:8291"}},
Expand Down
9 changes: 8 additions & 1 deletion dm/master/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/failpoint"

toolutils "github.com/pingcap/tidb-tools/pkg/utils"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -89,7 +90,13 @@ func (s *Server) electionNotify(ctx context.Context) {
func (s *Server) createLeaderClient(leaderAddr string) {
s.closeLeaderClient()

conn, err := grpc.Dial(leaderAddr, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(3*time.Second))
tls, err := toolutils.NewTLS(s.cfg.SSLCA, s.cfg.SSLCert, s.cfg.SSLKey, s.cfg.AdvertiseAddr, s.cfg.CertAllowedCN)
if err != nil {
log.L().Error("can't create grpc connection with leader, can't forward request to leader", zap.String("leader", leaderAddr), zap.Error(err))
return
}

conn, err := grpc.Dial(leaderAddr, tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second))
if err != nil {
log.L().Error("can't create grpc connection with leader, can't forward request to leader", zap.String("leader", leaderAddr), zap.Error(err))
return
Expand Down
2 changes: 1 addition & 1 deletion dm/master/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (t *testMaster) TestFailToStartLeader(c *check.C) {
c.Assert(s2.Start(ctx), check.IsNil)
defer s2.Close()

client, err := etcdutil.CreateClient(strings.Split(cfg1.AdvertisePeerUrls, ","))
client, err := etcdutil.CreateClient(strings.Split(cfg1.AdvertisePeerUrls, ","), nil)
c.Assert(err, check.IsNil)
defer client.Close()

Expand Down
8 changes: 7 additions & 1 deletion dm/master/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"
"time"

toolutils "github.com/pingcap/tidb-tools/pkg/utils"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -124,8 +125,13 @@ func prepareJoinEtcd(cfg *Config) error {
return nil
}

tlsCfg, err := toolutils.ToTLSConfig(cfg.SSLCA, cfg.SSLCert, cfg.SSLKey)
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, "generate tls config")
}

// if without previous data, we need a client to contact with the existing cluster.
client, err := etcdutil.CreateClient(strings.Split(cfg.Join, ","))
client, err := etcdutil.CreateClient(strings.Split(cfg.Join, ","), tlsCfg)
if err != nil {
return terror.ErrMasterJoinEmbedEtcdFail.Delegate(err, fmt.Sprintf("create etcd client for %s", cfg.Join))
}
Expand Down
18 changes: 15 additions & 3 deletions dm/master/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"net/http"
"net/http/pprof"

"github.com/gogo/gateway"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"

Expand Down Expand Up @@ -45,16 +46,27 @@ func getStatusHandle() http.Handler {
}

// getHTTPAPIHandler returns a HTTP handler to handle DM-master APIs.
func getHTTPAPIHandler(ctx context.Context, addr string) (http.Handler, error) {
func getHTTPAPIHandler(ctx context.Context, addr string, securityOpt grpc.DialOption) (http.Handler, error) {
// dial the real API server in non-blocking mode, it may not started yet.
opts := []grpc.DialOption{grpc.WithInsecure()}
opts := []grpc.DialOption{securityOpt}
// NOTE: should we need to replace `host` in `addr` to `127.0.0.1`?
conn, err := grpc.DialContext(ctx, addr, opts...)
if err != nil {
return nil, terror.ErrMasterHandleHTTPApis.Delegate(err)
}

gwmux := runtime.NewServeMux()
jsonpb := &gateway.JSONPb{
EmitDefaults: true,
Indent: " ",
OrigName: true,
}
Comment on lines +58 to +62
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will change the output of HTTP API, the old will output all the result in one line, after this pr will output the same with dmctl. so some integration test need to update

Copy link
Member

@csuzhangxc csuzhangxc Jul 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this what you want to do or you have to do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not necessary, I just found these code in another project, and this will make the output of HTTP response more clear, so add here


gwmux := runtime.NewServeMux(
runtime.WithMarshalerOption(runtime.MIMEWildcard, jsonpb),
// This is necessary to get error details properly
// marshalled in unary requests.
runtime.WithProtoErrorHandler(runtime.DefaultHTTPProtoErrorHandler),
)
err = pb.RegisterMasterHandler(ctx, gwmux, conn)
if err != nil {
return nil, terror.ErrMasterHandleHTTPApis.Delegate(err)
Expand Down
7 changes: 5 additions & 2 deletions dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,12 @@ type Scheduler struct {
// delete:
// - remove/stop subtask by user request (calling `RemoveSubTasks`).
expectSubTaskStages map[string]map[string]ha.Stage

securityCfg config.Security
}

// NewScheduler creates a new scheduler instance.
func NewScheduler(pLogger *log.Logger) *Scheduler {
func NewScheduler(pLogger *log.Logger, securityCfg config.Security) *Scheduler {
return &Scheduler{
logger: pLogger.WithFields(zap.String("component", "scheduler")),
sourceCfgs: make(map[string]config.SourceConfig),
Expand All @@ -152,6 +154,7 @@ func NewScheduler(pLogger *log.Logger) *Scheduler {
unbounds: make(map[string]struct{}),
expectRelayStages: make(map[string]ha.Stage),
expectSubTaskStages: make(map[string]map[string]ha.Stage),
securityCfg: securityCfg,
}
}

Expand Down Expand Up @@ -1368,7 +1371,7 @@ func (s *Scheduler) boundSourceToWorker(source string, w *Worker) error {
// this func is used when adding a new worker.
// NOTE: trigger scheduler when the worker become online, not when added.
func (s *Scheduler) recordWorker(info ha.WorkerInfo) (*Worker, error) {
w, err := NewWorker(info)
w, err := NewWorker(info, s.securityCfg)
if err != nil {
return nil, err
}
Expand Down
Loading