Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

BR support TLS #161

Merged
merged 3 commits into from
Feb 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ require (
github.com/fsouza/fake-gcs-server v1.15.0
github.com/go-sql-driver/mysql v1.4.1
github.com/gogo/protobuf v1.3.1
github.com/golang/snappy v0.0.1 // indirect
github.com/google/btree v1.0.0
github.com/google/uuid v1.1.1
github.com/mattn/go-runewidth v0.0.7 // indirect
Expand All @@ -31,6 +30,9 @@ require (
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.3
github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.opencensus.io v0.22.2 // indirect
go.uber.org/zap v1.13.0
golang.org/x/net v0.0.0-20191011234655-491137f69257 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,12 @@ github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.3 h1:OoxbjfXVZyod1fmWYhI7SEyaD8B00ynP3T+D5GiyHOY=
github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.1 h1:K0jcRCwNQM3vFGh1ppMtDh/+7ApJrjldlX8fA0jDTLQ=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/opentracing/basictracer-go v1.0.0 h1:YyUAhaEfjoWXclZVJ9sGoNct7j4TVk7lZWlQw5UXuoo=
Expand Down Expand Up @@ -388,11 +390,15 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d h1:4J9HCZVpvDmj2tiKGSTUnb3Ok/9CEQb9oqu9LHKQQpc=
github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285 h1:uSDYjYejelKyceA6DiCsngFof9jAyeaSyX9XC5a1a7Q=
github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 h1:lYIiVDtZnyTWlNwiAxLj0bbpTcx1BWCFhXjfsvmPdNc=
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/uber-go/atomic v1.3.2 h1:Azu9lPBWRNKzYXSIwRfgRuDuS0YKsK4NFhiQv98gkxo=
github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=
github.com/uber/jaeger-client-go v2.15.0+incompatible h1:NP3qsSqNxh8VYr956ur1N/1C1PjvOJnJykCzcD5QHbk=
Expand Down
42 changes: 36 additions & 6 deletions pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package conn
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
Expand All @@ -25,6 +26,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"

"github.com/pingcap/br/pkg/glue"
Expand All @@ -44,6 +46,7 @@ type Mgr struct {
addrs []string
cli *http.Client
}
tlsConf *tls.Config
dom *domain.Domain
storage tikv.Storage
grpcClis struct {
Expand All @@ -58,9 +61,6 @@ func pdRequest(
ctx context.Context,
addr string, prefix string,
cli *http.Client, method string, body io.Reader) ([]byte, error) {
if addr != "" && !strings.HasPrefix("http", addr) {
addr = "http://" + addr
}
u, err := url.Parse(addr)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -88,12 +88,33 @@ func pdRequest(
}

// NewMgr creates a new Mgr.
func NewMgr(ctx context.Context, g glue.Glue, pdAddrs string, storage tikv.Storage) (*Mgr, error) {
func NewMgr(
ctx context.Context,
g glue.Glue,
pdAddrs string,
storage tikv.Storage,
tlsConf *tls.Config,
securityOption pd.SecurityOption) (*Mgr, error) {
3pointer marked this conversation as resolved.
Show resolved Hide resolved
addrs := strings.Split(pdAddrs, ",")

failure := errors.Errorf("pd address (%s) has wrong format", pdAddrs)
cli := &http.Client{Timeout: 30 * time.Second}
if tlsConf != nil {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = tlsConf
cli.Transport = transport
}

processedAddrs := make([]string, 0, len(addrs))
for _, addr := range addrs {
if addr != "" && !strings.HasPrefix("http", addr) {
if tlsConf != nil {
addr = "https://" + addr
} else {
addr = "http://" + addr
}
}
processedAddrs = append(processedAddrs, addr)
_, failure = pdRequest(ctx, addr, clusterVersionPrefix, cli, http.MethodGet, nil)
// TODO need check cluster version >= 3.1 when br release
if failure == nil {
Expand All @@ -104,7 +125,7 @@ func NewMgr(ctx context.Context, g glue.Glue, pdAddrs string, storage tikv.Stora
return nil, errors.Annotatef(failure, "pd address (%s) not available, please check network", pdAddrs)
}

pdClient, err := pd.NewClient(addrs, pd.SecurityOption{})
pdClient, err := pd.NewClient(addrs, securityOption)
if err != nil {
log.Error("fail to create pd client", zap.Error(err))
return nil, err
Expand Down Expand Up @@ -140,8 +161,9 @@ func NewMgr(ctx context.Context, g glue.Glue, pdAddrs string, storage tikv.Stora
pdClient: pdClient,
storage: storage,
dom: dom,
tlsConf: tlsConf,
}
mgr.pdHTTP.addrs = addrs
mgr.pdHTTP.addrs = processedAddrs
mgr.pdHTTP.cli = cli
mgr.grpcClis.clis = make(map[uint64]*grpc.ClientConn)
return mgr, nil
Expand Down Expand Up @@ -217,6 +239,9 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.Cl
return nil, errors.Trace(err)
}
opt := grpc.WithInsecure()
if mgr.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(mgr.tlsConf))
}
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
keepAlive := 10
keepAliveTimeout := 3
Expand Down Expand Up @@ -269,6 +294,11 @@ func (mgr *Mgr) GetTiKV() tikv.Storage {
return mgr.storage
}

// GetTLSConfig returns the tls config
func (mgr *Mgr) GetTLSConfig() *tls.Config {
return mgr.tlsConf
}

// GetLockResolver gets the LockResolver.
func (mgr *Mgr) GetLockResolver() *tikv.LockResolver {
return mgr.storage.GetLockResolver()
Expand Down
2 changes: 2 additions & 0 deletions pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/pingcap/parser/model"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
Expand All @@ -13,6 +14,7 @@ import (
type Glue interface {
BootstrapSession(store kv.Storage) (*domain.Domain, error)
CreateSession(store kv.Storage) (Session, error)
Open(path string, option pd.SecurityOption) (kv.Storage, error)
}

// Session is an abstraction of the session.Session interface.
Expand Down
15 changes: 15 additions & 0 deletions pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"context"

"github.com/pingcap/parser/model"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/tikv"

"github.com/pingcap/br/pkg/glue"
)
Expand All @@ -35,6 +38,18 @@ func (Glue) CreateSession(store kv.Storage) (glue.Session, error) {
return &tidbSession{se: se}, nil
}

// Open implements glue.Glue
func (Glue) Open(path string, option pd.SecurityOption) (kv.Storage, error) {
if option.CAPath != "" {
conf := config.GetGlobalConfig()
conf.Security.ClusterSSLCA = option.CAPath
conf.Security.ClusterSSLCert = option.CertPath
conf.Security.ClusterSSLKey = option.KeyPath
config.StoreGlobalConfig(conf)
}
return tikv.Driver{}.Open(path)
}

// Execute implements glue.Session
func (gs *tidbSession) Execute(ctx context.Context, sql string) error {
_, err := gs.se.Execute(ctx, sql)
Expand Down
19 changes: 16 additions & 3 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package restore

import (
"context"
"crypto/tls"
"encoding/json"
"math"
"sort"
Expand All @@ -20,6 +21,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"

"github.com/pingcap/br/pkg/checksum"
Expand All @@ -41,6 +43,7 @@ type Client struct {
fileImporter FileImporter
workerPool *utils.WorkerPool
tableWorkerPool *utils.WorkerPool
tlsConf *tls.Config

databases map[string]*utils.Database
ddlJobs []*model.Job
Expand All @@ -57,6 +60,7 @@ func NewRestoreClient(
g glue.Glue,
pdClient pd.Client,
store kv.Storage,
tlsConf *tls.Config,
) (*Client, error) {
ctx, cancel := context.WithCancel(ctx)
db, err := NewDB(g, store)
Expand All @@ -71,6 +75,7 @@ func NewRestoreClient(
pdClient: pdClient,
tableWorkerPool: utils.NewWorkerPool(128, "table"),
db: db,
tlsConf: tlsConf,
}, nil
}

Expand Down Expand Up @@ -112,8 +117,8 @@ func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, backend *backup.
rc.backupMeta = backupMeta
log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs)))

metaClient := NewSplitClient(rc.pdClient)
importClient := NewImportClient(metaClient)
metaClient := NewSplitClient(rc.pdClient, rc.tlsConf)
importClient := NewImportClient(metaClient, rc.tlsConf)
rc.fileImporter = NewFileImporter(rc.ctx, metaClient, importClient, backend, rc.rateLimit)
return nil
}
Expand All @@ -128,6 +133,11 @@ func (rc *Client) EnableOnline() {
rc.isOnline = true
}

// GetTLSConfig returns the tls config
func (rc *Client) GetTLSConfig() *tls.Config {
return rc.tlsConf
}

// GetTS gets a new timestamp from PD
func (rc *Client) GetTS(ctx context.Context) (uint64, error) {
p, l, err := rc.pdClient.GetTS(ctx)
Expand All @@ -145,7 +155,7 @@ func (rc *Client) ResetTS(pdAddrs []string) error {
i := 0
return utils.WithRetry(rc.ctx, func() error {
idx := i % len(pdAddrs)
return utils.ResetTS(pdAddrs[idx], restoreTS)
return utils.ResetTS(pdAddrs[idx], restoreTS, rc.tlsConf)
}, newResetTSBackoffer())
}

Expand Down Expand Up @@ -332,6 +342,9 @@ func (rc *Client) switchTiKVMode(ctx context.Context, mode import_sstpb.SwitchMo
bfConf.MaxDelay = time.Second * 3
for _, store := range stores {
opt := grpc.WithInsecure()
if rc.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(rc.tlsConf))
}
gctx, cancel := context.WithTimeout(ctx, time.Second*5)
keepAlive := 10
keepAliveTimeout := 3
Expand Down
12 changes: 10 additions & 2 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package restore

import (
"context"
"crypto/tls"
"strings"
"sync"
"time"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/pingcap/pd/pkg/codec"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/utils"
Expand Down Expand Up @@ -47,13 +49,15 @@ type importClient struct {
mu sync.Mutex
metaClient SplitClient
clients map[uint64]import_sstpb.ImportSSTClient
tlsConf *tls.Config
}

// NewImportClient returns a new ImporterClient
func NewImportClient(metaClient SplitClient) ImporterClient {
func NewImportClient(metaClient SplitClient, tlsConf *tls.Config) ImporterClient {
return &importClient{
metaClient: metaClient,
clients: make(map[uint64]import_sstpb.ImportSSTClient),
tlsConf: tlsConf,
}
}

Expand Down Expand Up @@ -107,7 +111,11 @@ func (ic *importClient) getImportClient(
if err != nil {
return nil, err
}
conn, err := grpc.Dial(store.GetAddress(), grpc.WithInsecure())
opt := grpc.WithInsecure()
if ic.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(ic.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt)
if err != nil {
return nil, err
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/restore/split_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package restore
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
Expand All @@ -20,6 +21,7 @@ import (
pd "github.com/pingcap/pd/client"
"github.com/pingcap/pd/server/schedule/placement"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

// SplitClient is an external client used by RegionSplitter.
Expand Down Expand Up @@ -58,13 +60,15 @@ type SplitClient interface {
type pdClient struct {
mu sync.Mutex
client pd.Client
tlsConf *tls.Config
storeCache map[uint64]*metapb.Store
}

// NewSplitClient returns a client used by RegionSplitter.
func NewSplitClient(client pd.Client) SplitClient {
func NewSplitClient(client pd.Client, tlsConf *tls.Config) SplitClient {
return &pdClient{
client: client,
tlsConf: tlsConf,
storeCache: make(map[uint64]*metapb.Store),
}
}
Expand Down Expand Up @@ -199,7 +203,11 @@ func (c *pdClient) BatchSplitRegions(
if err != nil {
return nil, err
}
conn, err := grpc.Dial(store.GetAddress(), grpc.WithInsecure())
opt := grpc.WithInsecure()
if c.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func SplitRanges(
elapsed := time.Since(start)
summary.CollectDuration("split region", elapsed)
}()
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient()))
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig()))
return splitter.Split(ctx, ranges, rewriteRules, func(keys [][]byte) {
for range keys {
updateCh <- struct{}{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
if err != nil {
return err
}
mgr, err := newMgr(ctx, g, cfg.PD)
mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS)
if err != nil {
return err
}
Expand Down
Loading