Skip to content

Commit

Permalink
lightning: always get latest PD leader when access PD after initializ…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Oct 17, 2023
1 parent acdac74 commit 49e9daf
Show file tree
Hide file tree
Showing 20 changed files with 185 additions and 47 deletions.
3 changes: 3 additions & 0 deletions br/pkg/lightning/importer/check_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ func TestCheckCSVHeader(t *testing.T) {
dbMetas,
preInfoGetter,
nil,
nil,
)
preInfoGetter.dbInfosCache = rc.dbInfos
err = rc.checkCSVHeader(ctx)
Expand Down Expand Up @@ -465,6 +466,7 @@ func TestCheckTableEmpty(t *testing.T) {
dbMetas,
preInfoGetter,
nil,
nil,
)

rc := &Controller{
Expand Down Expand Up @@ -622,6 +624,7 @@ func TestLocalResource(t *testing.T) {
nil,
preInfoGetter,
nil,
nil,
)
rc := &Controller{
cfg: cfg,
Expand Down
17 changes: 12 additions & 5 deletions br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func NewImportControllerWithPauser(
}

preCheckBuilder := NewPrecheckItemBuilder(
cfg, p.DBMetas, preInfoGetter, cpdb,
cfg, p.DBMetas, preInfoGetter, cpdb, pdCli,
)

rc := &Controller{
Expand Down Expand Up @@ -488,6 +488,8 @@ func (rc *Controller) Close() {

// Run starts the restore task.
func (rc *Controller) Run(ctx context.Context) error {
failpoint.Inject("beforeRun", func() {})

opts := []func(context.Context) error{
rc.setGlobalVariables,
rc.restoreSchema,
Expand Down Expand Up @@ -1395,7 +1397,7 @@ const (

func (rc *Controller) keepPauseGCForDupeRes(ctx context.Context) (<-chan struct{}, error) {
tlsOpt := rc.tls.ToPDSecurityOption()
pdCli, err := pd.NewClientWithContext(ctx, []string{rc.cfg.TiDB.PdAddr}, tlsOpt)
pdCli, err := pd.NewClientWithContext(ctx, []string{rc.pdCli.GetLeaderAddr()}, tlsOpt)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -1556,8 +1558,13 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
}

// Disable GC because TiDB enables GC already.

currentLeaderAddr := rc.pdCli.GetLeaderAddr()
// remove URL scheme
currentLeaderAddr = strings.TrimPrefix(currentLeaderAddr, "http://")
currentLeaderAddr = strings.TrimPrefix(currentLeaderAddr, "https://")
kvStore, err = driver.TiKVDriver{}.OpenWithOptions(
fmt.Sprintf("tikv://%s?disableGC=true&keyspaceName=%s", rc.cfg.TiDB.PdAddr, rc.keyspaceName),
fmt.Sprintf("tikv://%s?disableGC=true&keyspaceName=%s", currentLeaderAddr, rc.keyspaceName),
driver.WithSecurity(rc.tls.ToTiKVSecurityConfig()),
)
if err != nil {
Expand Down Expand Up @@ -1769,7 +1776,7 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
}

func (rc *Controller) registerTaskToPD(ctx context.Context) (undo func(), _ error) {
etcdCli, err := dialEtcdWithCfg(ctx, rc.cfg)
etcdCli, err := dialEtcdWithCfg(ctx, rc.cfg, rc.pdCli.GetLeaderAddr())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -2181,7 +2188,7 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
rc.status.TotalFileSize.Store(estimatedSizeResult.SizeWithoutIndex)
}
if isLocalBackend(rc.cfg) {
pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr,
pdController, err := pdutil.NewPdController(ctx, rc.pdCli.GetLeaderAddr(),
rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption())
if err != nil {
return common.NormalizeOrWrapErr(common.ErrCreatePDClient, err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestPreCheckFailed(t *testing.T) {
dbMetas: make([]*mydump.MDDatabaseMeta, 0),
}
cpdb := panicCheckpointDB{}
theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb)
theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb, nil)
ctl := &Controller{
cfg: cfg,
saveCpCh: make(chan saveCp),
Expand Down
37 changes: 26 additions & 11 deletions br/pkg/lightning/importer/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,21 @@ func WithPrecheckKey(ctx context.Context, key precheckContextKey, val any) conte

// PrecheckItemBuilder is used to build precheck items
type PrecheckItemBuilder struct {
cfg *config.Config
dbMetas []*mydump.MDDatabaseMeta
preInfoGetter PreImportInfoGetter
checkpointsDB checkpoints.DB
cfg *config.Config
dbMetas []*mydump.MDDatabaseMeta
preInfoGetter PreImportInfoGetter
checkpointsDB checkpoints.DB
pdLeaderAddrGetter func() string
}

// NewPrecheckItemBuilderFromConfig creates a new PrecheckItemBuilder from config
// pdCli **must not** be nil for local backend
func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, pdCli pd.Client, opts ...ropts.PrecheckItemBuilderOption) (*PrecheckItemBuilder, error) {
func NewPrecheckItemBuilderFromConfig(
ctx context.Context,
cfg *config.Config,
pdCli pd.Client,
opts ...ropts.PrecheckItemBuilderOption,
) (*PrecheckItemBuilder, error) {
var gerr error
builderCfg := new(ropts.PrecheckItemBuilderConfig)
for _, o := range opts {
Expand Down Expand Up @@ -71,7 +77,7 @@ func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, p
if err != nil {
return nil, errors.Trace(err)
}
return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb), gerr
return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb, pdCli), gerr
}

// NewPrecheckItemBuilder creates a new PrecheckItemBuilder
Expand All @@ -80,12 +86,21 @@ func NewPrecheckItemBuilder(
dbMetas []*mydump.MDDatabaseMeta,
preInfoGetter PreImportInfoGetter,
checkpointsDB checkpoints.DB,
pdCli pd.Client,
) *PrecheckItemBuilder {
leaderAddrGetter := func() string {
return cfg.TiDB.PdAddr
}
// in tests we may not have a pdCli
if pdCli != nil {
leaderAddrGetter = pdCli.GetLeaderAddr
}
return &PrecheckItemBuilder{
cfg: cfg,
dbMetas: dbMetas,
preInfoGetter: preInfoGetter,
checkpointsDB: checkpointsDB,
cfg: cfg,
dbMetas: dbMetas,
preInfoGetter: preInfoGetter,
checkpointsDB: checkpointsDB,
pdLeaderAddrGetter: leaderAddrGetter,
}
}

Expand Down Expand Up @@ -117,7 +132,7 @@ func (b *PrecheckItemBuilder) BuildPrecheckItem(checkID precheck.CheckItemID) (p
case precheck.CheckLocalTempKVDir:
return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter, b.dbMetas), nil
case precheck.CheckTargetUsingCDCPITR:
return NewCDCPITRCheckItem(b.cfg), nil
return NewCDCPITRCheckItem(b.cfg, b.pdLeaderAddrGetter), nil
default:
return nil, errors.Errorf("unsupported check item: %v", checkID)
}
Expand Down
22 changes: 14 additions & 8 deletions br/pkg/lightning/importer/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,17 +746,19 @@ func (ci *checkpointCheckItem) checkpointIsValid(ctx context.Context, tableInfo
// CDCPITRCheckItem check downstream has enabled CDC or PiTR. It's exposed to let
// caller override the Instruction message.
type CDCPITRCheckItem struct {
cfg *config.Config
Instruction string
cfg *config.Config
Instruction string
leaderAddrGetter func() string
// used in test
etcdCli *clientv3.Client
}

// NewCDCPITRCheckItem creates a checker to check downstream has enabled CDC or PiTR.
func NewCDCPITRCheckItem(cfg *config.Config) precheck.Checker {
func NewCDCPITRCheckItem(cfg *config.Config, leaderAddrGetter func() string) precheck.Checker {
return &CDCPITRCheckItem{
cfg: cfg,
Instruction: "local backend is not compatible with them. Please switch to tidb backend then try again.",
cfg: cfg,
Instruction: "local backend is not compatible with them. Please switch to tidb backend then try again.",
leaderAddrGetter: leaderAddrGetter,
}
}

Expand All @@ -765,7 +767,11 @@ func (*CDCPITRCheckItem) GetCheckItemID() precheck.CheckItemID {
return precheck.CheckTargetUsingCDCPITR
}

func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client, error) {
func dialEtcdWithCfg(
ctx context.Context,
cfg *config.Config,
leaderAddr string,
) (*clientv3.Client, error) {
cfg2, err := cfg.ToTLS()
if err != nil {
return nil, err
Expand All @@ -774,7 +780,7 @@ func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client,

return clientv3.New(clientv3.Config{
TLS: tlsConfig,
Endpoints: []string{cfg.TiDB.PdAddr},
Endpoints: []string{leaderAddr},
AutoSyncInterval: 30 * time.Second,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
Expand All @@ -801,7 +807,7 @@ func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*precheck.CheckResult, e

if ci.etcdCli == nil {
var err error
ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg)
ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg, ci.leaderAddrGetter())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/precheck_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() {
Backend: config.BackendLocal,
},
}
ci := NewCDCPITRCheckItem(cfg)
ci := NewCDCPITRCheckItem(cfg, nil)
checker := ci.(*CDCPITRCheckItem)
checker.etcdCli = testEtcdCluster.RandClient()
result, err := ci.Check(ctx)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/precheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestPrecheckBuilderBasic(t *testing.T) {

preInfoGetter, err := NewPreImportInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil)
require.NoError(t, err)
theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil)
theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil, nil)
for _, checkItemID := range []precheck.CheckItemID{
precheck.CheckLargeDataFile,
precheck.CheckSourcePermission,
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/importer/table_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1193,7 +1193,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource() {
targetInfoGetter: targetInfoGetter,
srcStorage: mockStore,
}
theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil)
theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil, nil)
rc := &Controller{
cfg: cfg,
tls: tls,
Expand Down Expand Up @@ -1352,7 +1352,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() {
targetInfoGetter: targetInfoGetter,
dbMetas: dbMetas,
}
theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB())
theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB(), nil)
rc := &Controller{
cfg: cfg,
tls: tls,
Expand Down Expand Up @@ -1448,7 +1448,7 @@ func (s *tableRestoreSuite) TestCheckHasLargeCSV() {
for _, ca := range cases {
template := NewSimpleTemplate()
cfg := &config.Config{Mydumper: config.MydumperRuntime{StrictFormat: ca.strictFormat}}
theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil)
theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil, nil)
rc := &Controller{
cfg: cfg,
checkTemplate: template,
Expand Down
Loading

0 comments on commit 49e9daf

Please sign in to comment.