Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loader(dm): register tls config when clean checkpoint #3522

Merged
merged 31 commits into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ceff2c5
dm/lightning: register tls config when clean checkpoint
Ehco1996 Nov 18, 2021
38032dd
fix lint
Ehco1996 Nov 18, 2021
3b2d9cc
fix again
Ehco1996 Nov 19, 2021
929a355
fix logger config
Ehco1996 Nov 19, 2021
0a41374
revert core change
Ehco1996 Nov 19, 2021
02039f7
Merge branch 'master' into lightning-tls
Ehco1996 Nov 19, 2021
2a732cc
Merge branch 'master' into lightning-tls
Ehco1996 Nov 22, 2021
89940a8
Merge branch 'master' into lightning-tls
Ehco1996 Nov 23, 2021
16df138
Merge branch 'master' into lightning-tls
Ehco1996 Dec 6, 2021
557a06a
Merge branch 'master' into lightning-tls
Ehco1996 Dec 7, 2021
8007383
enable test
Ehco1996 Dec 7, 2021
c787aca
fix ctl
Ehco1996 Dec 7, 2021
0685924
update dep
Ehco1996 Dec 8, 2021
8093c6e
add multi tls test
Ehco1996 Dec 8, 2021
c584bac
fix test
Ehco1996 Dec 8, 2021
8ee46cd
fix for syncdiff
Ehco1996 Dec 8, 2021
e7ff6d7
fix sync diff config
Ehco1996 Dec 8, 2021
f904eff
Merge branch 'master' into lightning-tls
Ehco1996 Dec 8, 2021
6bdda82
fix
Ehco1996 Dec 8, 2021
f63c58a
fix comment
Ehco1996 Dec 8, 2021
045d82c
add more print to debug
Ehco1996 Dec 8, 2021
b49cc99
fix test
Ehco1996 Dec 8, 2021
2b04bb6
clean up test dir
Ehco1996 Dec 8, 2021
aead0da
Merge branch 'master' into lightning-tls
Ehco1996 Dec 8, 2021
388bfce
Merge branch 'master' into lightning-tls
Ehco1996 Dec 8, 2021
e2b938b
open spell checker for bash and address comment
Ehco1996 Dec 9, 2021
d134f15
revert logging postion for debug
Ehco1996 Dec 9, 2021
4566dde
fix sh
Ehco1996 Dec 9, 2021
fa145d0
remove uselses lock
Ehco1996 Dec 14, 2021
be7c29b
Merge branch 'master' into lightning-tls
ti-chi-bot Dec 14, 2021
3966613
Merge branch 'master' into lightning-tls
ti-chi-bot Dec 14, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 40 additions & 48 deletions dm/loader/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@ import (
"strings"
"sync"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb/br/pkg/lightning"
"github.com/pingcap/tidb/br/pkg/lightning/common"
lcfg "github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/parser/mysql"
"go.etcd.io/etcd/clientv3"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand All @@ -38,6 +35,7 @@ import (
"github.com/pingcap/ticdc/dm/pkg/conn"
tcontext "github.com/pingcap/ticdc/dm/pkg/context"
"github.com/pingcap/ticdc/dm/pkg/log"
"github.com/pingcap/ticdc/dm/pkg/terror"
"github.com/pingcap/ticdc/dm/pkg/utils"
)

Expand All @@ -51,36 +49,38 @@ const (
type LightningLoader struct {
sync.RWMutex

cfg *config.SubTaskConfig
cli *clientv3.Client
checkPoint CheckPoint
checkPointList *LightningCheckpointList
workerName string
logger log.Logger
core *lightning.Lightning
toDB *conn.BaseDB
toDBConns []*DBConn
lightningConfig *lcfg.GlobalConfig
timeZone string
timeZone string
lightningGlobalConfig *lcfg.GlobalConfig
cfg *config.SubTaskConfig

checkPoint CheckPoint
checkPointList *LightningCheckpointList

logger log.Logger
cli *clientv3.Client
core *lightning.Lightning
cancel context.CancelFunc // for per task context, which maybe different from lightning context

toDBConns []*DBConn
toDB *conn.BaseDB

workerName string
finish atomic.Bool
closed atomic.Bool
metaBinlog atomic.String
metaBinlogGTID atomic.String
cancel context.CancelFunc // for per task context, which maybe different from lightning context
}

// NewLightning creates a new Loader importing data with lightning.
func NewLightning(cfg *config.SubTaskConfig, cli *clientv3.Client, workerName string) *LightningLoader {
lightningCfg := makeGlobalConfig(cfg)
core := lightning.New(lightningCfg)
loader := &LightningLoader{
cfg: cfg,
cli: cli,
core: core,
lightningConfig: lightningCfg,
logger: log.With(zap.String("task", cfg.Name), zap.String("unit", "lightning-load")),
workerName: workerName,
cfg: cfg,
cli: cli,
workerName: workerName,
lightningGlobalConfig: lightningCfg,
core: lightning.New(lightningCfg),
logger: log.With(zap.String("task", cfg.Name), zap.String("unit", "lightning-load")),
}
return loader
}
Expand All @@ -91,19 +91,23 @@ func makeGlobalConfig(cfg *config.SubTaskConfig) *lcfg.GlobalConfig {
lightningCfg.Security.CAPath = cfg.To.Security.SSLCA
lightningCfg.Security.CertPath = cfg.To.Security.SSLCert
lightningCfg.Security.KeyPath = cfg.To.Security.SSLKey
// use task name as tls config name to prevent multiple subtasks from conflicting with each other
lightningCfg.Security.TLSConfigName = cfg.Name
}
lightningCfg.TiDB.Host = cfg.To.Host
lightningCfg.TiDB.Psw = cfg.To.Password
lightningCfg.TiDB.User = cfg.To.User
lightningCfg.TiDB.Port = cfg.To.Port
lightningCfg.TiDB.StatusPort = cfg.TiDB.StatusPort
lightningCfg.TiDB.PdAddr = cfg.TiDB.PdAddr
lightningCfg.TiDB.LogLevel = cfg.LogLevel
lightningCfg.TikvImporter.Backend = cfg.TiDB.Backend
lightningCfg.PostRestore.Checksum = lcfg.OpLevelOff
if cfg.TiDB.Backend == lcfg.BackendLocal {
lightningCfg.TikvImporter.SortedKVDir = cfg.Dir
}
lightningCfg.Mydumper.SourceDir = cfg.Dir
lightningCfg.App.Config.File = "" // make lightning not init logger, see more in https://github.com/pingcap/tidb/pull/29291
return lightningCfg
}

Expand Down Expand Up @@ -156,8 +160,8 @@ func (l *LightningLoader) Init(ctx context.Context) (err error) {
}

func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) error {
l.Lock()
taskCtx, cancel := context.WithCancel(ctx)
l.Lock()
l.cancel = cancel
l.Unlock()
err := l.core.RunOnce(taskCtx, cfg, nil)
Expand All @@ -172,7 +176,6 @@ func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) er
}
}
})
l.logger.Info("end runLightning")
return err
}

Expand Down Expand Up @@ -204,39 +207,30 @@ func (l *LightningLoader) restore(ctx context.Context) error {
}
}
cfg := lcfg.NewConfig()
if err = cfg.LoadFromGlobal(l.lightningConfig); err != nil {
if err = cfg.LoadFromGlobal(l.lightningGlobalConfig); err != nil {
return err
}
cfg.Routes = l.cfg.RouteRules
cfg.Checkpoint.Driver = lcfg.CheckpointDriverMySQL
cfg.Checkpoint.Schema = config.TiDBLightningCheckpointPrefix + dbutil.TableName(l.workerName, l.cfg.Name)
cfg.Checkpoint.KeepAfterSuccess = lcfg.CheckpointOrigin
param := common.MySQLConnectParam{
Host: cfg.TiDB.Host,
Port: cfg.TiDB.Port,
User: cfg.TiDB.User,
Password: cfg.TiDB.Psw,
SQLMode: mysql.DefaultSQLMode,
MaxAllowedPacket: 64 * units.MiB,
TLS: cfg.TiDB.TLS,
}
cfg.Checkpoint.DSN = param.ToDSN()
cfg.TiDB.Vars = make(map[string]string)
if l.cfg.To.Session != nil {
for k, v := range l.cfg.To.Session {
cfg.TiDB.Vars[k] = v
}
}

cfg.TiDB.StrSQLMode = l.cfg.LoaderConfig.SQLMode
cfg.TiDB.Vars = map[string]string{
"time_zone": l.timeZone,
}
if err = cfg.Adjust(ctx); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lightning will auto adjust config in hehe, don't need to adjust again

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hehe 😄

return err
}
cfg.TiDB.Vars = map[string]string{"time_zone": l.timeZone}
err = l.runLightning(ctx, cfg)
if err == nil {
// lightning will auto deregister tls when task done, so we need to register it again for removing checkpoint
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might let lightning not deregister the certificate. If there're more than one subtasks, the certificate may still be deregister between line 245 and line 249

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now lightning allow register tls config with different name, and it test is passed, ptal 🧡

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CheckpointRemove should be postponed to cleanmeta. I will add this logic in a following PR, so LGTM.

if l.cfg.To.Security != nil {
if registerErr := cfg.Security.RegisterMySQL(); registerErr != nil {
return terror.ErrConnRegistryTLSConfig.Delegate(registerErr)
}
defer cfg.Security.DeregisterMySQL()
}
err = lightning.CheckpointRemove(ctx, cfg, "all")
}
if err == nil {
Expand Down Expand Up @@ -300,10 +294,7 @@ func (l *LightningLoader) Process(ctx context.Context, pr chan pb.ProcessResult)
default:
}
l.logger.Info("lightning load end", zap.Bool("IsCanceled", isCanceled))
pr <- pb.ProcessResult{
IsCanceled: isCanceled,
Errors: errs,
}
pr <- pb.ProcessResult{IsCanceled: isCanceled, Errors: errs}
}

func (l *LightningLoader) isClosed() bool {
Expand Down Expand Up @@ -345,7 +336,7 @@ func (l *LightningLoader) Resume(ctx context.Context, pr chan pb.ProcessResult)
l.logger.Warn("try to resume, but already closed")
return
}
l.core = lightning.New(l.lightningConfig)
l.core = lightning.New(l.lightningGlobalConfig)
// continue the processing
l.Process(ctx, pr)
}
Expand All @@ -355,7 +346,8 @@ func (l *LightningLoader) Resume(ctx context.Context, pr chan pb.ProcessResult)
// now no config diff implemented, so simply re-init use new config
// no binlog filter for loader need to update.
func (l *LightningLoader) Update(ctx context.Context, cfg *config.SubTaskConfig) error {
// update l.cfg
l.Lock()
defer l.Unlock()
l.cfg.BAList = cfg.BAList
l.cfg.RouteRules = cfg.RouteRules
l.cfg.ColumnMappingRules = cfg.ColumnMappingRules
Expand Down
25 changes: 25 additions & 0 deletions dm/tests/tls/conf/diff_config-2.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
check-struct-only = false
check-thread-count = 4
export-fix-sql = true

[routes.rule1]
schema-pattern = "tls"
target-schema = "tls2"

[task]
output-dir = "/tmp/ticdc_dm_test/output"
source-instances = ["mysql1"]
target-check-tables = ["tls2.t"]
target-instance = "tidb0"

[data-sources.mysql1]
host = "127.0.0.1"
password = "123456"
port = 3306
route-rules = ["rule1"]
user = "root"

[data-sources.tidb0]
host = "127.0.0.1"
port = 4400
user = "root"
52 changes: 52 additions & 0 deletions dm/tests/tls/conf/dm-task-2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
---
name: test2
task-mode: all
is-sharding: false
meta-schema: "dm_meta"

target-database:
host: "127.0.0.1"
port: 4400
user: "root"
password: ""
security:
ssl-ca: "dir-placeholer/task-ca.pem"
ssl-cert: "dir-placeholer/dm.pem"
ssl-key: "dir-placeholer/dm.key"

mysql-instances:
- source-id: "mysql-replica-01"
black-white-list: "instance"
route-rules: ["route-rule-1"]
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

black-white-list:
instance:
do-dbs: ["tls"]

routes:
route-rule-1:
schema-pattern: "tls"
target-schema: "tls2"

mydumpers:
global:
threads: 4
chunk-filesize: 0
skip-tz-utc: true
extra-args: "--statement-size=100"

loaders:
global:
pool-size: 16
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100

tidb:
backend: "tidb"
4 changes: 2 additions & 2 deletions dm/tests/tls/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ syncers:
worker-count: 16
batch: 100

#tidb:
# backend: "tidb"
tidb:
backend: "tidb"
Loading