Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

post-restore: add optional level for post-restore operations #421

Merged
merged 4 commits into from
Oct 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 66 additions & 6 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,72 @@ type Lightning struct {
CheckRequirements bool `toml:"check-requirements" json:"check-requirements"`
}

type PostOpLevel int

const (
OpLevelOff PostOpLevel = iota
OpLevelOptional
OpLevelRequired
)

func (t *PostOpLevel) UnmarshalTOML(v interface{}) error {
switch val := v.(type) {
case bool:
if val {
*t = OpLevelRequired
} else {
*t = OpLevelOff
}
case string:
return t.FromStringValue(val)
default:
return errors.Errorf("invalid op level '%v', please choose valid option between ['off', 'optional', 'required']", v)
}
return nil
}

// parser command line parameter
func (t *PostOpLevel) FromStringValue(s string) error {
switch strings.ToLower(s) {
case "off", "false":
*t = OpLevelOff
case "required", "true":
*t = OpLevelRequired
case "optional":
*t = OpLevelOptional
default:
return errors.Errorf("invalid op level '%s', please choose valid option between ['off', 'optional', 'required']", s)
}
return nil
}

func (t *PostOpLevel) MarshalJSON() ([]byte, error) {
return []byte(`"` + t.String() + `"`), nil
}

func (t *PostOpLevel) UnmarshalJSON(data []byte) error {
return t.FromStringValue(strings.Trim(string(data), `"`))
}

func (t PostOpLevel) String() string {
switch t {
case OpLevelOff:
return "off"
case OpLevelOptional:
return "optional"
case OpLevelRequired:
return "required"
default:
panic(fmt.Sprintf("invalid post process type '%d'", t))
}
}

// PostRestore has some options which will be executed after kv restored.
type PostRestore struct {
Level1Compact bool `toml:"level-1-compact" json:"level-1-compact"`
Compact bool `toml:"compact" json:"compact"`
Checksum bool `toml:"checksum" json:"checksum"`
Analyze bool `toml:"analyze" json:"analyze"`
Level1Compact bool `toml:"level-1-compact" json:"level-1-compact"`
Compact bool `toml:"compact" json:"compact"`
Checksum PostOpLevel `toml:"checksum" json:"checksum"`
Analyze PostOpLevel `toml:"analyze" json:"analyze"`
}

type CSVConfig struct {
Expand Down Expand Up @@ -286,8 +346,8 @@ func NewConfig() *Config {
RegionSplitSize: SplitRegionSize,
},
PostRestore: PostRestore{
Checksum: true,
Analyze: true,
Checksum: OpLevelRequired,
Analyze: OpLevelOptional,
},
}
}
Expand Down
47 changes: 43 additions & 4 deletions lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,14 +484,14 @@ func (s *configTestSuite) TestLoadConfig(c *C) {
c.Assert(cfg.TiDB.PdAddr, Equals, "172.16.30.11:2379,172.16.30.12:2379")
c.Assert(cfg.Mydumper.SourceDir, Equals, path)
c.Assert(cfg.TikvImporter.Addr, Equals, "172.16.30.11:23008")
c.Assert(cfg.PostRestore.Checksum, IsFalse)
c.Assert(cfg.PostRestore.Analyze, IsTrue)
c.Assert(cfg.PostRestore.Checksum, Equals, config.OpLevelOff)
c.Assert(cfg.PostRestore.Analyze, Equals, config.OpLevelOptional)

taskCfg := config.NewConfig()
err = taskCfg.LoadFromGlobal(cfg)
c.Assert(err, IsNil)
c.Assert(taskCfg.PostRestore.Checksum, IsFalse)
c.Assert(taskCfg.PostRestore.Analyze, IsTrue)
c.Assert(taskCfg.PostRestore.Checksum, Equals, config.OpLevelOff)
c.Assert(taskCfg.PostRestore.Analyze, Equals, config.OpLevelOptional)

taskCfg.Checkpoint.DSN = ""
taskCfg.Checkpoint.Driver = config.CheckpointDriverMySQL
Expand Down Expand Up @@ -543,3 +543,42 @@ func (s *configTestSuite) TestLoadFromInvalidConfig(c *C) {
})
c.Assert(err, ErrorMatches, "Near line 1.*")
}

func (s *configTestSuite) TestTomlPostRestore(c *C) {
cfg := &config.Config{}
err := cfg.LoadFromTOML([]byte(`
[post-restore]
checksum = "req"
`))
c.Assert(err, ErrorMatches, regexp.QuoteMeta("invalid op level 'req', please choose valid option between ['off', 'optional', 'required']"))

err = cfg.LoadFromTOML([]byte(`
[post-restore]
analyze = 123
`))
c.Assert(err, ErrorMatches, regexp.QuoteMeta("invalid op level '123', please choose valid option between ['off', 'optional', 'required']"))

kvMap := map[string]config.PostOpLevel{
`"off"`: config.OpLevelOff,
`"required"`: config.OpLevelRequired,
`"optional"`: config.OpLevelOptional,
"true": config.OpLevelRequired,
"false": config.OpLevelOff,
}

for k, v := range kvMap {
cfg := &config.Config{}
confStr := fmt.Sprintf("[post-restore]\r\nchecksum= %s\r\n", k)
err := cfg.LoadFromTOML([]byte(confStr))
c.Assert(err, IsNil)
c.Assert(cfg.PostRestore.Checksum, Equals, v)
}

for k, v := range kvMap {
cfg := &config.Config{}
confStr := fmt.Sprintf("[post-restore]\r\nanalyze= %s\r\n", k)
err := cfg.LoadFromTOML([]byte(confStr))
c.Assert(err, IsNil)
c.Assert(cfg.PostRestore.Analyze, Equals, v)
}
}
20 changes: 10 additions & 10 deletions lightning/config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ type GlobalCheckpoint struct {
}

type GlobalPostRestore struct {
Checksum bool `toml:"checksum" json:"checksum"`
Analyze bool `toml:"analyze" json:"analyze"`
Checksum PostOpLevel `toml:"checksum" json:"checksum"`
Analyze PostOpLevel `toml:"analyze" json:"analyze"`
}

func NewGlobalConfig() *GlobalConfig {
Expand All @@ -105,8 +105,8 @@ func NewGlobalConfig() *GlobalConfig {
Backend: "importer",
},
PostRestore: GlobalPostRestore{
Checksum: true,
Analyze: true,
Checksum: OpLevelRequired,
Analyze: OpLevelOptional,
},
}
}
Expand Down Expand Up @@ -156,8 +156,8 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon
sortedKVDir := fs.String("sorted-kv-dir", "", "path for KV pairs when local backend enabled")
enableCheckpoint := fs.Bool("enable-checkpoint", true, "whether to enable checkpoints")
noSchema := fs.Bool("no-schema", false, "ignore schema files, get schema directly from TiDB instead")
checksum := fs.Bool("checksum", true, "compare checksum after importing")
analyze := fs.Bool("analyze", true, "analyze table after importing")
checksum := flagext.ChoiceVar(fs, "checksum", "", "compare checksum after importing.", "", "required", "optional", "off", "true", "false")
analyze := flagext.ChoiceVar(fs, "analyze", "", "analyze table after importing", "", "required", "optional", "off", "true", "false")
checkRequirements := fs.Bool("check-requirements", true, "check cluster version before starting")
tlsCAPath := fs.String("ca", "", "CA certificate path for TLS connection")
tlsCertPath := fs.String("cert", "", "certificate path for TLS connection")
Expand Down Expand Up @@ -246,11 +246,11 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon
if *noSchema {
cfg.Mydumper.NoSchema = true
}
if !*checksum {
cfg.PostRestore.Checksum = false
if *checksum != "" {
_ = cfg.PostRestore.Checksum.FromStringValue(*checksum)
}
if !*analyze {
cfg.PostRestore.Analyze = false
if *analyze != "" {
_ = cfg.PostRestore.Analyze.FromStringValue(*analyze)
}
if cfg.App.StatusAddr == "" && cfg.App.PProfPort != 0 {
cfg.App.StatusAddr = fmt.Sprintf(":%d", cfg.App.PProfPort)
Expand Down
18 changes: 16 additions & 2 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1200,11 +1200,18 @@ func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, c

t.logger.Info("local checksum", zap.Object("checksum", &localChecksum))
if cp.Status < CheckpointStatusChecksummed {
if !rc.cfg.PostRestore.Checksum {
if rc.cfg.PostRestore.Checksum == config.OpLevelOff {
t.logger.Info("skip checksum")
rc.saveStatusCheckpoint(t.tableName, WholeTableEngineID, nil, CheckpointStatusChecksumSkipped)
} else {
err := t.compareChecksum(ctx, rc.tidbMgr.db, localChecksum)
// witch post restore level 'optional', we will skip checksum error
if rc.cfg.PostRestore.Checksum == config.OpLevelOptional {
if err != nil {
t.logger.Warn("compare checksum failed, will skip this error and go on", log.ShortError(err))
err = nil
}
}
rc.saveStatusCheckpoint(t.tableName, WholeTableEngineID, err, CheckpointStatusChecksummed)
if err != nil {
return errors.Trace(err)
Expand All @@ -1214,11 +1221,18 @@ func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, c

// 5. do table analyze
if cp.Status < CheckpointStatusAnalyzed {
if !rc.cfg.PostRestore.Analyze {
if rc.cfg.PostRestore.Analyze == config.OpLevelOff {
t.logger.Info("skip analyze")
rc.saveStatusCheckpoint(t.tableName, WholeTableEngineID, nil, CheckpointStatusAnalyzeSkipped)
} else {
err := t.analyzeTable(ctx, rc.tidbMgr.db)
// witch post restore level 'optional', we will skip analyze error
if rc.cfg.PostRestore.Analyze == config.OpLevelOptional {
if err != nil {
t.logger.Warn("analyze table failed, will skip this error and go on", log.ShortError(err))
err = nil
}
}
rc.saveStatusCheckpoint(t.tableName, WholeTableEngineID, err, CheckpointStatusAnalyzed)
if err != nil {
return errors.Trace(err)
Expand Down
15 changes: 11 additions & 4 deletions tidb-lightning.toml
Original file line number Diff line number Diff line change
Expand Up @@ -228,16 +228,23 @@ checksum-table-concurrency = 16
# post-restore provide some options which will be executed after all kv data has been imported into the tikv cluster.
# the execution order are(if set true): checksum -> analyze
[post-restore]
# if set true, checksum will do ADMIN CHECKSUM TABLE <table> for each table.
checksum = true
# config whether to do `ADMIN CHECKSUM TABLE <table>` after restore finished for each table.
# valid options:
# - "off". do not do checksum.
# - "optional". do execute admin checksum, but will ignore any error if checksum fails.
# - "required". default option. do execute admin checksum, if checksum fails, lightning will exit with failure.
# NOTE: for backward compatibility, bool values `true` and `false` is also allowed for this field. `true` is
# equivalent to "required" and `false` is equivalent to "off".
checksum = "required"
# if set true, analyze will do `ANALYZE TABLE <table>` for each table.
# the config options is the same as 'post-restore.checksum'.
analyze = "optional"
# if set to true, compact will do level 1 compaction to tikv data.
# if this setting is missing, the default value is false.
level-1-compact = false
# if set true, compact will do full compaction to tikv data.
# if this setting is missing, the default value is false.
compact = false
# if set true, analyze will do ANALYZE TABLE <table> for each table.
analyze = true

# cron performs some periodic actions in background
[cron]
Expand Down