Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
post-restore: add optional level for post-restore operations (#421)
Browse files Browse the repository at this point in the history
* add optional level for opst-restore operations

* trim leading and suffix '"

* use UnmarshalTOML to unmarshal post restore op level

* resolve comments and fix unit test
  • Loading branch information
glorv authored Oct 19, 2020
1 parent da84e5d commit 60c1c27
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 26 deletions.
72 changes: 66 additions & 6 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,72 @@ type Lightning struct {
CheckRequirements bool `toml:"check-requirements" json:"check-requirements"`
}

type PostOpLevel int

const (
OpLevelOff PostOpLevel = iota
OpLevelOptional
OpLevelRequired
)

func (t *PostOpLevel) UnmarshalTOML(v interface{}) error {
switch val := v.(type) {
case bool:
if val {
*t = OpLevelRequired
} else {
*t = OpLevelOff
}
case string:
return t.FromStringValue(val)
default:
return errors.Errorf("invalid op level '%v', please choose valid option between ['off', 'optional', 'required']", v)
}
return nil
}

// parser command line parameter
func (t *PostOpLevel) FromStringValue(s string) error {
switch strings.ToLower(s) {
case "off", "false":
*t = OpLevelOff
case "required", "true":
*t = OpLevelRequired
case "optional":
*t = OpLevelOptional
default:
return errors.Errorf("invalid op level '%s', please choose valid option between ['off', 'optional', 'required']", s)
}
return nil
}

func (t *PostOpLevel) MarshalJSON() ([]byte, error) {
return []byte(`"` + t.String() + `"`), nil
}

func (t *PostOpLevel) UnmarshalJSON(data []byte) error {
return t.FromStringValue(strings.Trim(string(data), `"`))
}

func (t PostOpLevel) String() string {
switch t {
case OpLevelOff:
return "off"
case OpLevelOptional:
return "optional"
case OpLevelRequired:
return "required"
default:
panic(fmt.Sprintf("invalid post process type '%d'", t))
}
}

// PostRestore has some options which will be executed after kv restored.
type PostRestore struct {
Level1Compact bool `toml:"level-1-compact" json:"level-1-compact"`
Compact bool `toml:"compact" json:"compact"`
Checksum bool `toml:"checksum" json:"checksum"`
Analyze bool `toml:"analyze" json:"analyze"`
Level1Compact bool `toml:"level-1-compact" json:"level-1-compact"`
Compact bool `toml:"compact" json:"compact"`
Checksum PostOpLevel `toml:"checksum" json:"checksum"`
Analyze PostOpLevel `toml:"analyze" json:"analyze"`
}

type CSVConfig struct {
Expand Down Expand Up @@ -286,8 +346,8 @@ func NewConfig() *Config {
RegionSplitSize: SplitRegionSize,
},
PostRestore: PostRestore{
Checksum: true,
Analyze: true,
Checksum: OpLevelRequired,
Analyze: OpLevelOptional,
},
}
}
Expand Down
47 changes: 43 additions & 4 deletions lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,14 +484,14 @@ func (s *configTestSuite) TestLoadConfig(c *C) {
c.Assert(cfg.TiDB.PdAddr, Equals, "172.16.30.11:2379,172.16.30.12:2379")
c.Assert(cfg.Mydumper.SourceDir, Equals, path)
c.Assert(cfg.TikvImporter.Addr, Equals, "172.16.30.11:23008")
c.Assert(cfg.PostRestore.Checksum, IsFalse)
c.Assert(cfg.PostRestore.Analyze, IsTrue)
c.Assert(cfg.PostRestore.Checksum, Equals, config.OpLevelOff)
c.Assert(cfg.PostRestore.Analyze, Equals, config.OpLevelOptional)

taskCfg := config.NewConfig()
err = taskCfg.LoadFromGlobal(cfg)
c.Assert(err, IsNil)
c.Assert(taskCfg.PostRestore.Checksum, IsFalse)
c.Assert(taskCfg.PostRestore.Analyze, IsTrue)
c.Assert(taskCfg.PostRestore.Checksum, Equals, config.OpLevelOff)
c.Assert(taskCfg.PostRestore.Analyze, Equals, config.OpLevelOptional)

taskCfg.Checkpoint.DSN = ""
taskCfg.Checkpoint.Driver = config.CheckpointDriverMySQL
Expand Down Expand Up @@ -543,3 +543,42 @@ func (s *configTestSuite) TestLoadFromInvalidConfig(c *C) {
})
c.Assert(err, ErrorMatches, "Near line 1.*")
}

func (s *configTestSuite) TestTomlPostRestore(c *C) {
cfg := &config.Config{}
err := cfg.LoadFromTOML([]byte(`
[post-restore]
checksum = "req"
`))
c.Assert(err, ErrorMatches, regexp.QuoteMeta("invalid op level 'req', please choose valid option between ['off', 'optional', 'required']"))

err = cfg.LoadFromTOML([]byte(`
[post-restore]
analyze = 123
`))
c.Assert(err, ErrorMatches, regexp.QuoteMeta("invalid op level '123', please choose valid option between ['off', 'optional', 'required']"))

kvMap := map[string]config.PostOpLevel{
`"off"`: config.OpLevelOff,
`"required"`: config.OpLevelRequired,
`"optional"`: config.OpLevelOptional,
"true": config.OpLevelRequired,
"false": config.OpLevelOff,
}

for k, v := range kvMap {
cfg := &config.Config{}
confStr := fmt.Sprintf("[post-restore]\r\nchecksum= %s\r\n", k)
err := cfg.LoadFromTOML([]byte(confStr))
c.Assert(err, IsNil)
c.Assert(cfg.PostRestore.Checksum, Equals, v)
}

for k, v := range kvMap {
cfg := &config.Config{}
confStr := fmt.Sprintf("[post-restore]\r\nanalyze= %s\r\n", k)
err := cfg.LoadFromTOML([]byte(confStr))
c.Assert(err, IsNil)
c.Assert(cfg.PostRestore.Analyze, Equals, v)
}
}
20 changes: 10 additions & 10 deletions lightning/config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ type GlobalCheckpoint struct {
}

type GlobalPostRestore struct {
Checksum bool `toml:"checksum" json:"checksum"`
Analyze bool `toml:"analyze" json:"analyze"`
Checksum PostOpLevel `toml:"checksum" json:"checksum"`
Analyze PostOpLevel `toml:"analyze" json:"analyze"`
}

func NewGlobalConfig() *GlobalConfig {
Expand All @@ -105,8 +105,8 @@ func NewGlobalConfig() *GlobalConfig {
Backend: "importer",
},
PostRestore: GlobalPostRestore{
Checksum: true,
Analyze: true,
Checksum: OpLevelRequired,
Analyze: OpLevelOptional,
},
}
}
Expand Down Expand Up @@ -156,8 +156,8 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon
sortedKVDir := fs.String("sorted-kv-dir", "", "path for KV pairs when local backend enabled")
enableCheckpoint := fs.Bool("enable-checkpoint", true, "whether to enable checkpoints")
noSchema := fs.Bool("no-schema", false, "ignore schema files, get schema directly from TiDB instead")
checksum := fs.Bool("checksum", true, "compare checksum after importing")
analyze := fs.Bool("analyze", true, "analyze table after importing")
checksum := flagext.ChoiceVar(fs, "checksum", "", "compare checksum after importing.", "", "required", "optional", "off", "true", "false")
analyze := flagext.ChoiceVar(fs, "analyze", "", "analyze table after importing", "", "required", "optional", "off", "true", "false")
checkRequirements := fs.Bool("check-requirements", true, "check cluster version before starting")
tlsCAPath := fs.String("ca", "", "CA certificate path for TLS connection")
tlsCertPath := fs.String("cert", "", "certificate path for TLS connection")
Expand Down Expand Up @@ -246,11 +246,11 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon
if *noSchema {
cfg.Mydumper.NoSchema = true
}
if !*checksum {
cfg.PostRestore.Checksum = false
if *checksum != "" {
_ = cfg.PostRestore.Checksum.FromStringValue(*checksum)
}
if !*analyze {
cfg.PostRestore.Analyze = false
if *analyze != "" {
_ = cfg.PostRestore.Analyze.FromStringValue(*analyze)
}
if cfg.App.StatusAddr == "" && cfg.App.PProfPort != 0 {
cfg.App.StatusAddr = fmt.Sprintf(":%d", cfg.App.PProfPort)
Expand Down
18 changes: 16 additions & 2 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1200,11 +1200,18 @@ func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, c

t.logger.Info("local checksum", zap.Object("checksum", &localChecksum))
if cp.Status < CheckpointStatusChecksummed {
if !rc.cfg.PostRestore.Checksum {
if rc.cfg.PostRestore.Checksum == config.OpLevelOff {
t.logger.Info("skip checksum")
rc.saveStatusCheckpoint(t.tableName, WholeTableEngineID, nil, CheckpointStatusChecksumSkipped)
} else {
err := t.compareChecksum(ctx, rc.tidbMgr.db, localChecksum)
// witch post restore level 'optional', we will skip checksum error
if rc.cfg.PostRestore.Checksum == config.OpLevelOptional {
if err != nil {
t.logger.Warn("compare checksum failed, will skip this error and go on", log.ShortError(err))
err = nil
}
}
rc.saveStatusCheckpoint(t.tableName, WholeTableEngineID, err, CheckpointStatusChecksummed)
if err != nil {
return errors.Trace(err)
Expand All @@ -1214,11 +1221,18 @@ func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, c

// 5. do table analyze
if cp.Status < CheckpointStatusAnalyzed {
if !rc.cfg.PostRestore.Analyze {
if rc.cfg.PostRestore.Analyze == config.OpLevelOff {
t.logger.Info("skip analyze")
rc.saveStatusCheckpoint(t.tableName, WholeTableEngineID, nil, CheckpointStatusAnalyzeSkipped)
} else {
err := t.analyzeTable(ctx, rc.tidbMgr.db)
// witch post restore level 'optional', we will skip analyze error
if rc.cfg.PostRestore.Analyze == config.OpLevelOptional {
if err != nil {
t.logger.Warn("analyze table failed, will skip this error and go on", log.ShortError(err))
err = nil
}
}
rc.saveStatusCheckpoint(t.tableName, WholeTableEngineID, err, CheckpointStatusAnalyzed)
if err != nil {
return errors.Trace(err)
Expand Down
15 changes: 11 additions & 4 deletions tidb-lightning.toml
Original file line number Diff line number Diff line change
Expand Up @@ -228,16 +228,23 @@ checksum-table-concurrency = 16
# post-restore provide some options which will be executed after all kv data has been imported into the tikv cluster.
# the execution order are(if set true): checksum -> analyze
[post-restore]
# if set true, checksum will do ADMIN CHECKSUM TABLE <table> for each table.
checksum = true
# config whether to do `ADMIN CHECKSUM TABLE <table>` after restore finished for each table.
# valid options:
# - "off". do not do checksum.
# - "optional". do execute admin checksum, but will ignore any error if checksum fails.
# - "required". default option. do execute admin checksum, if checksum fails, lightning will exit with failure.
# NOTE: for backward compatibility, bool values `true` and `false` is also allowed for this field. `true` is
# equivalent to "required" and `false` is equivalent to "off".
checksum = "required"
# if set true, analyze will do `ANALYZE TABLE <table>` for each table.
# the config options is the same as 'post-restore.checksum'.
analyze = "optional"
# if set to true, compact will do level 1 compaction to tikv data.
# if this setting is missing, the default value is false.
level-1-compact = false
# if set true, compact will do full compaction to tikv data.
# if this setting is missing, the default value is false.
compact = false
# if set true, analyze will do ANALYZE TABLE <table> for each table.
analyze = true

# cron performs some periodic actions in background
[cron]
Expand Down

0 comments on commit 60c1c27

Please sign in to comment.