diff --git a/cmd/backup.go b/cmd/backup.go
index 73ae6106f..39aa4fd28 100644
--- a/cmd/backup.go
+++ b/cmd/backup.go
@@ -1,176 +1,21 @@
 package cmd
 
 import (
-	"context"
-
-	"github.com/pingcap/errors"
-	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/ddl"
 	"github.com/pingcap/tidb/session"
 	"github.com/spf13/cobra"
-	"github.com/spf13/pflag"
 
-	"github.com/pingcap/br/pkg/backup"
-	"github.com/pingcap/br/pkg/storage"
 	"github.com/pingcap/br/pkg/summary"
+	"github.com/pingcap/br/pkg/task"
 	"github.com/pingcap/br/pkg/utils"
 )
 
-const (
-	flagBackupTimeago       = "timeago"
-	flagBackupRateLimit     = "ratelimit"
-	flagBackupRateLimitUnit = "ratelimit-unit"
-	flagBackupConcurrency   = "concurrency"
-	flagBackupChecksum      = "checksum"
-	flagLastBackupTS        = "lastbackupts"
-)
-
-func defineBackupFlags(flagSet *pflag.FlagSet) {
-	flagSet.StringP(
-		flagBackupTimeago, "", "",
-		"The history version of the backup task, e.g. 1m, 1h. Do not exceed GCSafePoint")
-	flagSet.Uint64P(
-		flagBackupRateLimit, "", 0, "The rate limit of the backup task, MB/s per node")
-	flagSet.Uint32P(
-		flagBackupConcurrency, "", 4, "The size of thread pool on each node that execute the backup task")
-	flagSet.BoolP(flagBackupChecksum, "", true,
-		"Run checksum after backup")
-	flagSet.Uint64P(flagLastBackupTS, "", 0, "the last time backup ts")
-	_ = flagSet.MarkHidden(flagLastBackupTS)
-
-	// Test only flag.
-	flagSet.Uint64P(
-		flagBackupRateLimitUnit, "", utils.MB, "The unit of rate limit of the backup task")
-	_ = flagSet.MarkHidden(flagBackupRateLimitUnit)
-}
-
-func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error {
-	ctx, cancel := context.WithCancel(defaultContext)
-	defer cancel()
-
-	mgr, err := GetDefaultMgr()
-	if err != nil {
-		return err
-	}
-	defer mgr.Close()
-
-	timeago, err := flagSet.GetString(flagBackupTimeago)
-	if err != nil {
-		return err
-	}
-
-	ratelimit, err := flagSet.GetUint64(flagBackupRateLimit)
-	if err != nil {
-		return err
-	}
-	ratelimitUnit, err := flagSet.GetUint64(flagBackupRateLimitUnit)
-	if err != nil {
-		return err
-	}
-	ratelimit *= ratelimitUnit
-
-	concurrency, err := flagSet.GetUint32(flagBackupConcurrency)
-	if err != nil {
-		return err
-	}
-	if concurrency == 0 {
-		err = errors.New("at least one thread required")
+func runBackupCommand(command *cobra.Command, cmdName string) error {
+	cfg := task.BackupConfig{Config: task.Config{LogProgress: HasLogFile()}}
+	if err := cfg.ParseFromFlags(command.Flags()); err != nil {
 		return err
 	}
-
-	checksum, err := flagSet.GetBool(flagBackupChecksum)
-	if err != nil {
-		return err
-	}
-
-	lastBackupTS, err := flagSet.GetUint64(flagLastBackupTS)
-	if err != nil {
-		return nil
-	}
-
-	u, err := storage.ParseBackendFromFlags(flagSet, FlagStorage)
-	if err != nil {
-		return err
-	}
-
-	client, err := backup.NewBackupClient(ctx, mgr)
-	if err != nil {
-		return nil
-	}
-
-	err = client.SetStorage(ctx, u)
-	if err != nil {
-		return err
-	}
-
-	backupTS, err := client.GetTS(ctx, timeago)
-	if err != nil {
-		return err
-	}
-
-	defer summary.Summary(cmdName)
-
-	ranges, backupSchemas, err := backup.BuildBackupRangeAndSchema(
-		mgr.GetDomain(), mgr.GetTiKV(), backupTS, db, table)
-	if err != nil {
-		return err
-	}
-
-	// The number of regions need to backup
-	approximateRegions := 0
-	for _, r := range ranges {
-		var regionCount int
-		regionCount, err = mgr.GetRegionCount(ctx, r.StartKey, r.EndKey)
-		if err != nil {
-			return err
-		}
-		approximateRegions += regionCount
-	}
-
-	summary.CollectInt("backup total regions", approximateRegions)
-	// Backup
-	// Redirect to log if there is no log file to avoid unreadable output.
-	updateCh := utils.StartProgress(
-		ctx, cmdName, int64(approximateRegions), !HasLogFile())
-	err = client.BackupRanges(
-		ctx, ranges, lastBackupTS, backupTS, ratelimit, concurrency, updateCh)
-	if err != nil {
-		return err
-	}
-	// Backup has finished
-	close(updateCh)
-
-	// Checksum
-	backupSchemasConcurrency := backup.DefaultSchemaConcurrency
-	if backupSchemas.Len() < backupSchemasConcurrency {
-		backupSchemasConcurrency = backupSchemas.Len()
-	}
-	updateCh = utils.StartProgress(
-		ctx, "Checksum", int64(backupSchemas.Len()), !HasLogFile())
-	backupSchemas.SetSkipChecksum(!checksum)
-	backupSchemas.Start(
-		ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh)
-
-	err = client.CompleteMeta(backupSchemas)
-	if err != nil {
-		return err
-	}
-
-	valid, err := client.FastChecksum()
-	if err != nil {
-		return err
-	}
-	if !valid {
-		log.Error("backup FastChecksum failed!")
-	}
-	// Checksum has finished
-	close(updateCh)
-
-	err = client.SaveBackupMeta(ctx)
-	if err != nil {
-		return err
-	}
-	return nil
+	return task.RunBackup(GetDefaultContext(), cmdName, &cfg)
 }
 
 // NewBackupCommand return a full backup subcommand.
@@ -200,7 +45,7 @@ func NewBackupCommand() *cobra.Command {
 		newTableBackupCommand(),
 	)
 
-	defineBackupFlags(command.PersistentFlags())
+	task.DefineBackupFlags(command.PersistentFlags())
 	return command
 }
 
@@ -211,7 +56,7 @@ func newFullBackupCommand() *cobra.Command {
 		Short: "backup all database",
 		RunE: func(command *cobra.Command, _ []string) error {
 			// empty db/table means full backup.
-			return runBackup(command.Flags(), "Full backup", "", "")
+			return runBackupCommand(command, "Full backup")
 		},
 	}
 	return command
@@ -223,19 +68,10 @@ func newDbBackupCommand() *cobra.Command {
 		Use:   "db",
 		Short: "backup a database",
 		RunE: func(command *cobra.Command, _ []string) error {
-			db, err := command.Flags().GetString(flagDatabase)
-			if err != nil {
-				return err
-			}
-			if len(db) == 0 {
-				return errors.Errorf("empty database name is not allowed")
-			}
-			return runBackup(command.Flags(), "Database backup", db, "")
+			return runBackupCommand(command, "Database backup")
 		},
 	}
-	command.Flags().StringP(flagDatabase, "", "", "backup a table in the specific db")
-	_ = command.MarkFlagRequired(flagDatabase)
-
+	task.DefineDatabaseFlags(command)
 	return command
 }
 
@@ -245,26 +81,9 @@ func newTableBackupCommand() *cobra.Command {
 		Use:   "table",
 		Short: "backup a table",
 		RunE: func(command *cobra.Command, _ []string) error {
-			db, err := command.Flags().GetString(flagDatabase)
-			if err != nil {
-				return err
-			}
-			if len(db) == 0 {
-				return errors.Errorf("empty database name is not allowed")
-			}
-			table, err := command.Flags().GetString(flagTable)
-			if err != nil {
-				return err
-			}
-			if len(table) == 0 {
-				return errors.Errorf("empty table name is not allowed")
-			}
-			return runBackup(command.Flags(), "Table backup", db, table)
+			return runBackupCommand(command, "Table backup")
 		},
 	}
-	command.Flags().StringP(flagDatabase, "", "", "backup a table in the specific db")
-	command.Flags().StringP(flagTable, "t", "", "backup the specific table")
-	_ = command.MarkFlagRequired(flagDatabase)
-	_ = command.MarkFlagRequired(flagTable)
+	task.DefineTableFlags(command)
 	return command
 }
diff --git a/cmd/cmd.go b/cmd/cmd.go
index 468c35232..fdadaa6f8 100644
--- a/cmd/cmd.go
+++ b/cmd/cmd.go
@@ -2,47 +2,28 @@ package cmd
 
 import (
 	"context"
-	"fmt"
 	"net/http"
 	"net/http/pprof"
 	"sync"
 	"sync/atomic"
 
-	"github.com/pingcap/errors"
"github.com/pingcap/log" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/util/logutil" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "go.uber.org/zap" - "github.com/pingcap/br/pkg/conn" - "github.com/pingcap/br/pkg/storage" + "github.com/pingcap/br/pkg/task" "github.com/pingcap/br/pkg/utils" ) var ( initOnce = sync.Once{} defaultContext context.Context - pdAddress string hasLogFile uint64 - - connOnce = sync.Once{} - defaultMgr *conn.Mgr ) const ( - // FlagPD is the name of url flag. - FlagPD = "pd" - // FlagCA is the name of CA flag. - FlagCA = "ca" - // FlagCert is the name of cert flag. - FlagCert = "cert" - // FlagKey is the name of key flag. - FlagKey = "key" - // FlagStorage is the name of storage flag. - FlagStorage = "storage" // FlagLogLevel is the name of log-level flag. FlagLogLevel = "log-level" // FlagLogFile is the name of log-file flag. @@ -52,9 +33,6 @@ const ( // FlagSlowLogFile is the name of slow-log-file flag. FlagSlowLogFile = "slow-log-file" - flagDatabase = "db" - flagTable = "table" - flagVersion = "version" flagVersionShort = "V" ) @@ -65,19 +43,13 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().BoolP(flagVersion, flagVersionShort, false, "Display version information about BR") cmd.SetVersionTemplate("{{printf \"%s\" .Version}}\n") - cmd.PersistentFlags().StringP(FlagPD, "u", "127.0.0.1:2379", "PD address") - cmd.PersistentFlags().String(FlagCA, "", "CA certificate path for TLS connection") - cmd.PersistentFlags().String(FlagCert, "", "Certificate path for TLS connection") - cmd.PersistentFlags().String(FlagKey, "", "Private key path for TLS connection") - cmd.PersistentFlags().StringP(FlagStorage, "s", "", - `specify the url where backup storage, eg, "local:///path/to/save"`) cmd.PersistentFlags().StringP(FlagLogLevel, "L", "info", "Set the log level") cmd.PersistentFlags().String(FlagLogFile, "", "Set the log file path. If not set, logs will output to stdout") cmd.PersistentFlags().String(FlagStatusAddr, "", "Set the HTTP listening address for the status report service. Set to empty string to disable") - storage.DefineFlags(cmd.PersistentFlags()) + task.DefineCommonFlags(cmd.PersistentFlags()) cmd.PersistentFlags().StringP(FlagSlowLogFile, "", "", "Set the slow log file path. If not set, discard slow logs") @@ -140,12 +112,6 @@ func Init(cmd *cobra.Command) (err error) { } } }() - // Set the PD server address. - pdAddress, e = cmd.Flags().GetString(FlagPD) - if e != nil { - err = e - return - } }) return err } @@ -155,30 +121,6 @@ func HasLogFile() bool { return atomic.LoadUint64(&hasLogFile) != uint64(0) } -// GetDefaultMgr returns the default mgr for command line usage. -func GetDefaultMgr() (*conn.Mgr, error) { - if pdAddress == "" { - return nil, errors.New("pd address can not be empty") - } - - // Lazy initialize and defaultMgr - var err error - connOnce.Do(func() { - var storage kv.Storage - storage, err = tikv.Driver{}.Open( - // Disable GC because TiDB enables GC already. - fmt.Sprintf("tikv://%s?disableGC=true", pdAddress)) - if err != nil { - return - } - defaultMgr, err = conn.NewMgr(defaultContext, pdAddress, storage.(tikv.Storage)) - }) - if err != nil { - return nil, err - } - return defaultMgr, nil -} - // SetDefaultContext sets the default context for command line usage. 
 func SetDefaultContext(ctx context.Context) {
 	defaultContext = ctx
diff --git a/cmd/restore.go b/cmd/restore.go
index 4f66e47de..904bdd6d5 100644
--- a/cmd/restore.go
+++ b/cmd/restore.go
@@ -1,33 +1,20 @@
 package cmd
 
 import (
-	"context"
-	"strings"
-
-	"github.com/gogo/protobuf/proto"
-	"github.com/pingcap/errors"
-	"github.com/pingcap/kvproto/pkg/backup"
-	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/session"
 	"github.com/spf13/cobra"
-	flag "github.com/spf13/pflag"
-	"go.uber.org/zap"
 
-	"github.com/pingcap/br/pkg/conn"
-	"github.com/pingcap/br/pkg/restore"
-	"github.com/pingcap/br/pkg/storage"
 	"github.com/pingcap/br/pkg/summary"
+	"github.com/pingcap/br/pkg/task"
 	"github.com/pingcap/br/pkg/utils"
 )
 
-var schedulers = map[string]struct{}{
-	"balance-leader-scheduler":     {},
-	"balance-hot-region-scheduler": {},
-	"balance-region-scheduler":     {},
-
-	"shuffle-leader-scheduler":     {},
-	"shuffle-region-scheduler":     {},
-	"shuffle-hot-region-scheduler": {},
+func runRestoreCommand(command *cobra.Command, cmdName string) error {
+	cfg := task.RestoreConfig{Config: task.Config{LogProgress: HasLogFile()}}
+	if err := cfg.ParseFromFlags(command.Flags()); err != nil {
+		return err
+	}
+	return task.RunRestore(GetDefaultContext(), cmdName, &cfg)
 }
 
 // NewRestoreCommand returns a restore subcommand
@@ -54,172 +41,17 @@ func NewRestoreCommand() *cobra.Command {
 		newDbRestoreCommand(),
 		newTableRestoreCommand(),
 	)
-
-	command.PersistentFlags().Uint("concurrency", 128,
-		"The size of thread pool that execute the restore task")
-	command.PersistentFlags().Uint64("ratelimit", 0,
-		"The rate limit of the restore task, MB/s per node. Set to 0 for unlimited speed.")
-	command.PersistentFlags().BoolP("checksum", "", true,
-		"Run checksum after restore")
-	command.PersistentFlags().BoolP("online", "", false,
-		"Whether online when restore")
-	// TODO remove hidden flag if it's stable
-	_ = command.PersistentFlags().MarkHidden("online")
+	task.DefineRestoreFlags(command.PersistentFlags())
 	return command
 }
 
-func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error {
-	ctx, cancel := context.WithCancel(GetDefaultContext())
-	defer cancel()
-
-	mgr, err := GetDefaultMgr()
-	if err != nil {
-		return err
-	}
-	defer mgr.Close()
-
-	client, err := restore.NewRestoreClient(
-		ctx, mgr.GetPDClient(), mgr.GetTiKV())
-	if err != nil {
-		return errors.Trace(err)
-	}
-	defer client.Close()
-	err = initRestoreClient(ctx, client, flagSet)
-	if err != nil {
-		return errors.Trace(err)
-	}
-
-	files := make([]*backup.File, 0)
-	tables := make([]*utils.Table, 0)
-
-	defer summary.Summary(cmdName)
-
-	switch {
-	case len(dbName) == 0 && len(tableName) == 0:
-		// full restore
-		for _, db := range client.GetDatabases() {
-			err = client.CreateDatabase(db.Schema)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			for _, table := range db.Tables {
-				files = append(files, table.Files...)
-			}
-			tables = append(tables, db.Tables...)
-		}
-	case len(dbName) != 0 && len(tableName) == 0:
-		// database restore
-		db := client.GetDatabase(dbName)
-		if db == nil {
-			return errors.Errorf("database %s not found in backup", dbName)
-		}
-		err = client.CreateDatabase(db.Schema)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		for _, table := range db.Tables {
-			files = append(files, table.Files...)
-		}
-		tables = db.Tables
-	case len(dbName) != 0 && len(tableName) != 0:
-		// table restore
-		db := client.GetDatabase(dbName)
-		if db == nil {
-			return errors.Errorf("database %s not found in backup", dbName)
-		}
-		err = client.CreateDatabase(db.Schema)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		table := db.GetTable(tableName)
-		files = table.Files
-		tables = append(tables, table)
-	default:
-		return errors.New("must set db when table was set")
-	}
-	var newTS uint64
-	if client.IsIncremental() {
-		newTS, err = client.GetTS(ctx)
-		if err != nil {
-			return err
-		}
-	}
-	summary.CollectInt("restore files", len(files))
-	rewriteRules, newTables, err := client.CreateTables(mgr.GetDomain(), tables, newTS)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	ranges, err := restore.ValidateFileRanges(files, rewriteRules)
-	if err != nil {
-		return err
-	}
-	summary.CollectInt("restore ranges", len(ranges))
-
-	// Redirect to log if there is no log file to avoid unreadable output.
-	updateCh := utils.StartProgress(
-		ctx,
-		cmdName,
-		// Split/Scatter + Download/Ingest
-		int64(len(ranges)+len(files)),
-		!HasLogFile())
-
-	err = restore.SplitRanges(ctx, client, ranges, rewriteRules, updateCh)
-	if err != nil {
-		log.Error("split regions failed", zap.Error(err))
-		return errors.Trace(err)
-	}
-
-	if !client.IsIncremental() {
-		var pdAddr string
-		pdAddr, err = flagSet.GetString(FlagPD)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		pdAddrs := strings.Split(pdAddr, ",")
-		err = client.ResetTS(pdAddrs)
-		if err != nil {
-			log.Error("reset pd TS failed", zap.Error(err))
-			return errors.Trace(err)
-		}
-	}
-
-	removedSchedulers, err := RestorePrepareWork(ctx, client, mgr)
-	if err != nil {
-		return errors.Trace(err)
-	}
-
-	err = client.RestoreAll(rewriteRules, updateCh)
-	if err != nil {
-		return errors.Trace(err)
-	}
-
-	err = RestorePostWork(ctx, client, mgr, removedSchedulers)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	// Restore has finished.
-	close(updateCh)
-
-	// Checksum
-	updateCh = utils.StartProgress(
-		ctx, "Checksum", int64(len(newTables)), !HasLogFile())
-	err = client.ValidateChecksum(
-		ctx, mgr.GetTiKV().GetClient(), tables, newTables, updateCh)
-	if err != nil {
-		return err
-	}
-	close(updateCh)
-
-	return nil
-}
-
 func newFullRestoreCommand() *cobra.Command {
 	command := &cobra.Command{
 		Use:   "full",
 		Short: "restore all tables",
 		RunE: func(cmd *cobra.Command, _ []string) error {
-			return runRestore(cmd.Flags(), "Full Restore", "", "")
+			return runRestoreCommand(cmd, "Full restore")
 		},
 	}
 	return command
@@ -230,18 +62,10 @@ func newDbRestoreCommand() *cobra.Command {
 		Use:   "db",
 		Short: "restore tables in a database",
 		RunE: func(cmd *cobra.Command, _ []string) error {
-			db, err := cmd.Flags().GetString(flagDatabase)
-			if err != nil {
-				return err
-			}
-			if len(db) == 0 {
-				return errors.New("empty database name is not allowed")
-			}
-			return runRestore(cmd.Flags(), "Database Restore", db, "")
+			return runRestoreCommand(cmd, "Database restore")
 		},
 	}
-	command.Flags().String(flagDatabase, "", "database name")
-	_ = command.MarkFlagRequired(flagDatabase)
+	task.DefineDatabaseFlags(command)
 	return command
 }
 
@@ -250,129 +74,9 @@ func newTableRestoreCommand() *cobra.Command {
 		Use:   "table",
 		Short: "restore a table",
 		RunE: func(cmd *cobra.Command, _ []string) error {
-			db, err := cmd.Flags().GetString(flagDatabase)
-			if err != nil {
-				return err
-			}
-			if len(db) == 0 {
-				return errors.New("empty database name is not allowed")
-			}
-			table, err := cmd.Flags().GetString(flagTable)
-			if err != nil {
-				return err
-			}
-			if len(table) == 0 {
-				return errors.New("empty table name is not allowed")
-			}
-			return runRestore(cmd.Flags(), "Table Restore", db, table)
+			return runRestoreCommand(cmd, "Table restore")
 		},
 	}
-
-	command.Flags().String(flagDatabase, "", "database name")
-	command.Flags().String(flagTable, "", "table name")
-
-	_ = command.MarkFlagRequired(flagDatabase)
-	_ = command.MarkFlagRequired(flagTable)
+	task.DefineTableFlags(command)
 	return command
 }
-
-func initRestoreClient(ctx context.Context, client *restore.Client, flagSet *flag.FlagSet) error {
-	u, err := storage.ParseBackendFromFlags(flagSet, FlagStorage)
-	if err != nil {
-		return err
-	}
-	rateLimit, err := flagSet.GetUint64("ratelimit")
-	if err != nil {
-		return err
-	}
-	client.SetRateLimit(rateLimit * utils.MB)
-	s, err := storage.Create(ctx, u)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	metaData, err := s.Read(ctx, utils.MetaFile)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	backupMeta := &backup.BackupMeta{}
-	err = proto.Unmarshal(metaData, backupMeta)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	err = client.InitBackupMeta(backupMeta, u)
-	if err != nil {
-		return errors.Trace(err)
-	}
-
-	concurrency, err := flagSet.GetUint("concurrency")
-	if err != nil {
-		return err
-	}
-	client.SetConcurrency(concurrency)
-
-	isOnline, err := flagSet.GetBool("online")
-	if err != nil {
-		return err
-	}
-	if isOnline {
-		client.EnableOnline()
-	}
-
-	return nil
-}
-
-// RestorePrepareWork execute some prepare work before restore
-func RestorePrepareWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) ([]string, error) {
-	if client.IsOnline() {
-		return nil, nil
-	}
-	err := client.SwitchToImportMode(ctx)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	existSchedulers, err := mgr.ListSchedulers(ctx)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	needRemoveSchedulers := make([]string, 0, len(existSchedulers))
-	for _, s := range existSchedulers {
-		if _, ok := schedulers[s]; ok {
-			needRemoveSchedulers = append(needRemoveSchedulers, s)
-		}
-	}
-	return removePDLeaderScheduler(ctx, mgr, needRemoveSchedulers)
-}
-
-func removePDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, existSchedulers []string) ([]string, error) {
-	removedSchedulers := make([]string, 0, len(existSchedulers))
-	for _, scheduler := range existSchedulers {
-		err := mgr.RemoveScheduler(ctx, scheduler)
-		if err != nil {
-			return nil, err
-		}
-		removedSchedulers = append(removedSchedulers, scheduler)
-	}
-	return removedSchedulers, nil
-}
-
-// RestorePostWork execute some post work after restore
-func RestorePostWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr, removedSchedulers []string) error {
-	if client.IsOnline() {
-		return nil
-	}
-	err := client.SwitchToNormalMode(ctx)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	return addPDLeaderScheduler(ctx, mgr, removedSchedulers)
-}
-
-func addPDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, removedSchedulers []string) error {
-	for _, scheduler := range removedSchedulers {
-		err := mgr.AddScheduler(ctx, scheduler)
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
diff --git a/cmd/validate.go b/cmd/validate.go
index 8ba72b372..559cb9983 100644
--- a/cmd/validate.go
+++ b/cmd/validate.go
@@ -19,7 +19,7 @@ import (
 	"go.uber.org/zap"
 
 	"github.com/pingcap/br/pkg/restore"
-	"github.com/pingcap/br/pkg/storage"
+	"github.com/pingcap/br/pkg/task"
 	"github.com/pingcap/br/pkg/utils"
 )
 
@@ -54,24 +54,14 @@ func newCheckSumCommand() *cobra.Command {
 			ctx, cancel := context.WithCancel(GetDefaultContext())
 			defer cancel()
 
-			u, err := storage.ParseBackendFromFlags(cmd.Flags(), FlagStorage)
-			if err != nil {
+			var cfg task.Config
+			if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
 				return err
 			}
-			s, err := storage.Create(ctx, u)
-			if err != nil {
-				return errors.Trace(err)
-			}
-
-			metaData, err := s.Read(ctx, utils.MetaFile)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			backupMeta := &backup.BackupMeta{}
-			err = proto.Unmarshal(metaData, backupMeta)
+			_, s, backupMeta, err := task.ReadBackupMeta(ctx, &cfg)
 			if err != nil {
-				return errors.Trace(err)
+				return err
 			}
 
 			dbs, err := utils.LoadBackupTables(backupMeta)
@@ -152,24 +142,14 @@ func newBackupMetaCommand() *cobra.Command {
 			if err != nil {
 				return err
 			}
-			u, err := storage.ParseBackendFromFlags(cmd.Flags(), FlagStorage)
-			if err != nil {
-				return err
-			}
-			s, err := storage.Create(ctx, u)
-			if err != nil {
-				log.Error("create storage failed", zap.Error(err))
-				return errors.Trace(err)
-			}
-			data, err := s.Read(ctx, utils.MetaFile)
-			if err != nil {
-				log.Error("load backupmeta failed", zap.Error(err))
+
+			var cfg task.Config
+			if err = cfg.ParseFromFlags(cmd.Flags()); err != nil {
 				return err
 			}
-			backupMeta := &backup.BackupMeta{}
-			err = proto.Unmarshal(data, backupMeta)
+			_, _, backupMeta, err := task.ReadBackupMeta(ctx, &cfg)
 			if err != nil {
-				log.Error("parse backupmeta failed", zap.Error(err))
+				log.Error("read backupmeta failed", zap.Error(err))
 				return err
 			}
 			dbs, err := utils.LoadBackupTables(backupMeta)
@@ -241,8 +221,7 @@ func newBackupMetaCommand() *cobra.Command {
 			return nil
 		},
 	}
-	command.Flags().String("path", "", "the path of backupmeta")
-	command.Flags().Uint64P("offset", "", 0, "the offset of table id alloctor")
+	command.Flags().Uint64("offset", 0, "the offset of table id allocator")
 	command.Hidden = true
 	return command
 }
@@ -254,24 +233,16 @@ func decodeBackupMetaCommand() *cobra.Command {
 		RunE: func(cmd *cobra.Command, args []string) error {
 			ctx, cancel := context.WithCancel(GetDefaultContext())
 			defer cancel()
-			u, err := storage.ParseBackendFromFlags(cmd.Flags(), FlagStorage)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			s, err := storage.Create(ctx, u)
-			if err != nil {
-				return errors.Trace(err)
+
+			var cfg task.Config
+			if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
+				return err
 			}
-			metaData, err := s.Read(ctx, utils.MetaFile)
+			_, s, backupMeta, err := task.ReadBackupMeta(ctx, &cfg)
 			if err != nil {
-				return errors.Trace(err)
+				return err
 			}
-			backupMeta := &backup.BackupMeta{}
-			err = proto.Unmarshal(metaData, backupMeta)
-			if err != nil {
-				return errors.Trace(err)
-			}
+
 			backupMetaJSON, err := json.Marshal(backupMeta)
 			if err != nil {
 				return errors.Trace(err)
@@ -309,14 +280,16 @@ func encodeBackupMetaCommand() *cobra.Command {
 		RunE: func(cmd *cobra.Command, args []string) error {
 			ctx, cancel := context.WithCancel(GetDefaultContext())
 			defer cancel()
-			u, err := storage.ParseBackendFromFlags(cmd.Flags(), FlagStorage)
-			if err != nil {
-				return errors.Trace(err)
+
+			var cfg task.Config
+			if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
+				return err
 			}
-			s, err := storage.Create(ctx, u)
+			_, s, err := task.GetStorage(ctx, &cfg)
 			if err != nil {
-				return errors.Trace(err)
+				return err
 			}
+
 			metaData, err := s.Read(ctx, utils.MetaJSONFile)
 			if err != nil {
 				return errors.Trace(err)
diff --git a/go.mod b/go.mod
index 9951c2922..ea325da92 100644
--- a/go.mod
+++ b/go.mod
@@ -24,6 +24,7 @@ require (
 	github.com/pingcap/parser v0.0.0-20191210060830-bdf23a7ade01
 	github.com/pingcap/pd v1.1.0-beta.0.20191212045800-234784c7a9c5
 	github.com/pingcap/tidb v1.1.0-beta.0.20191213040028-9009da737834
+	github.com/pingcap/tidb-tools v4.0.0-beta+incompatible
 	github.com/pingcap/tipb v0.0.0-20191209145133-44f75c9bef33
 	github.com/prometheus/client_golang v1.0.0
 	github.com/sirupsen/logrus v1.4.2
diff --git a/go.sum b/go.sum
index 085e00355..4abf151be 100644
--- a/go.sum
+++ b/go.sum
@@ -48,10 +48,12 @@ github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T
 github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
 github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
 github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
+github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY=
 github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
 github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
 github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
 github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
+github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142 h1:3jFq2xL4ZajGK4aZY8jz+DAF0FHjI51BXjjSwCzS1Dk=
 github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
 github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f h1:JOrtw2xFKzlg+cbHpyrpLDmnN1HqhBfnX7WDiW7eG2c=
 github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
@@ -62,6 +64,7 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
 github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
 github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 h1:iwZdTE0PVqJCos1vaoKsclOGD3ADKpshg3SRtYBbwso=
 github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM=
+github.com/cznic/sortutil v0.0.0-20150617083342-4c7342852e65 h1:hxuZop6tSoOi0sxFzoGGYdRqNrPubyaIf9KoBG9tPiE=
 github.com/cznic/sortutil v0.0.0-20150617083342-4c7342852e65/go.mod h1:q2w6Bg5jeox1B+QkJ6Wp/+Vn0G/bo3f1uY7Fn3vivIQ=
 github.com/cznic/sortutil v0.0.0-20181122101858-f5f958428db8 h1:LpMLYGyy67BoAFGda1NeOBQwqlv7nUXpm+rIVHGxZZ4=
 github.com/cznic/sortutil v0.0.0-20181122101858-f5f958428db8/go.mod h1:q2w6Bg5jeox1B+QkJ6Wp/+Vn0G/bo3f1uY7Fn3vivIQ=
@@ -122,6 +125,7 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
 github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
 github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
 github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
@@ -169,7 +173,9 @@ github.com/grpc-ecosystem/grpc-gateway v1.4.1/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpg
 github.com/grpc-ecosystem/grpc-gateway v1.9.5 h1:UImYN5qQ8tuGpGE16ZmjvcTtTw24zw1QAp/SlnNrZhI=
 github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
 github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
+github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
 github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
+github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
 github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
 github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
@@ -191,6 +197,7 @@ github.com/juju/ratelimit v1.0.1/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSg
 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
 github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
 github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
+github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5 h1:2U0HzY8BJ8hVwDKIzp7y4voR9CX/nvcfymLmg2UiOio=
 github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
@@ -219,7 +226,9 @@ github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vq
 github.com/matttproud/golang_protobuf_extensions v1.0.0/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
 github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
 github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
 github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
+github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
 github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
 github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
@@ -237,9 +246,11 @@ github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3
 github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8=
 github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q=
 github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
+github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw=
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/ginkgo v1.10.3 h1:OoxbjfXVZyod1fmWYhI7SEyaD8B00ynP3T+D5GiyHOY=
 github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
 github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
 github.com/onsi/gomega v1.7.1 h1:K0jcRCwNQM3vFGh1ppMtDh/+7ApJrjldlX8fA0jDTLQ=
 github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
@@ -248,6 +259,7 @@ github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKw
 github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
 github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
 github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
+github.com/pelletier/go-toml v1.3.0 h1:e5+lF2E4Y2WCIxBefVowBuB0iHrUH4HZ8q+6mGF7fJc=
 github.com/pelletier/go-toml v1.3.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo=
 github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 h1:USx2/E1bX46VG32FIw034Au6seQ2fY9NEILmNh/UlQg=
 github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ=
@@ -285,6 +297,8 @@ github.com/pingcap/tidb v1.1.0-beta.0.20191213040028-9009da737834 h1:eNf7bDY39mo
 github.com/pingcap/tidb v1.1.0-beta.0.20191213040028-9009da737834/go.mod h1:VWx47QOXISBHHtZeWrDQlBOdbvth9TE9gei6QpoqJ4g=
 github.com/pingcap/tidb-tools v3.0.6-0.20191106033616-90632dda3863+incompatible h1:H1jg0aDWz2SLRh3hNBo2HFtnuHtudIUvBumU7syRkic=
 github.com/pingcap/tidb-tools v3.0.6-0.20191106033616-90632dda3863+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
+github.com/pingcap/tidb-tools v4.0.0-beta+incompatible h1:+XJdcVLCM8GDgXiMS6lFV59N3XPVOqtNHeWNLVrb2pg=
+github.com/pingcap/tidb-tools v4.0.0-beta+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
 github.com/pingcap/tipb v0.0.0-20191209145133-44f75c9bef33 h1:cTSaVv1hue17BCPqt+sURADTFSMpSD26ZuvKRyYIjJs=
 github.com/pingcap/tipb v0.0.0-20191209145133-44f75c9bef33/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -331,11 +345,14 @@ github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
 github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
+github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
 github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
+github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
 github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
 github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
 github.com/spf13/cobra v0.0.5 h1:f0B+LkLX6DtmRH1isoNA9VTtNUK9K8xYd28JNNfOv/s=
 github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
+github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
 github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
 github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
 github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
@@ -374,6 +391,7 @@ github.com/urfave/negroni v0.3.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKn
 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
 github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
+github.com/yookoala/realpath v1.0.0 h1:7OA9pj4FZd+oZDsyvXWQvjn5oBdcHRTV44PpdMSuImQ=
 github.com/yookoala/realpath v1.0.0/go.mod h1:gJJMA9wuX7AcqLy1+ffPatSCySA1FQ2S8Ya9AIoYBpE=
 go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
 go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
@@ -382,6 +400,7 @@ go.etcd.io/etcd v0.0.0-20190320044326-77d4b742cdbf/go.mod h1:KSGwdbiFchh5KIC9My2
 go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738 h1:lWF4f9Nypl1ZqSb4gLeh/DGvBYVaUYHuiB93teOmwgc=
 go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
 go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
+go.opencensus.io v0.22.0 h1:C9hSCOW830chIVkdja34wa6Ky+IzWllkUinR+BtRZd4=
 go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
 go.opencensus.io v0.22.2 h1:75k/FF0Q2YM8QYo07VPddOLBslDt1MZOdEslOHvmzAs=
 go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
@@ -457,6 +476,7 @@ golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn
 golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20190909003024-a7b16738d86b h1:XfVGCX+0T4WOStkaOsJRllbsiImhB2jgVBGc9L0lPGc=
 golang.org/x/net v0.0.0-20190909003024-a7b16738d86b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20191011234655-491137f69257 h1:ry8e2D+cwaV6hk7lb3aRTjjZo24shrbK0e11QEOkTIg=
 golang.org/x/net v0.0.0-20191011234655-491137f69257/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -585,6 +605,7 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep
 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
 gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
 gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
 gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/pkg/backup/client.go b/pkg/backup/client.go
index 5cba2d9bf..549c611ec 100644
--- a/pkg/backup/client.go
+++ b/pkg/backup/client.go
@@ -16,6 +16,7 @@ import (
 	"github.com/pingcap/log"
 	"github.com/pingcap/parser/model"
 	pd "github.com/pingcap/pd/client"
+	"github.com/pingcap/tidb-tools/pkg/filter"
 	"github.com/pingcap/tidb/distsql"
 	"github.com/pingcap/tidb/domain"
 	"github.com/pingcap/tidb/kv"
@@ -67,21 +68,17 @@ func NewBackupClient(ctx context.Context, mgr ClientMgr) (*Client, error) {
 }
 
 // GetTS returns the latest timestamp.
-func (bc *Client) GetTS(ctx context.Context, timeAgo string) (uint64, error) {
+func (bc *Client) GetTS(ctx context.Context, duration time.Duration) (uint64, error) {
 	p, l, err := bc.mgr.GetPDClient().GetTS(ctx)
 	if err != nil {
 		return 0, errors.Trace(err)
 	}
 	backupTS := oracle.ComposeTS(p, l)
 
-	if timeAgo != "" {
-		duration, err := time.ParseDuration(timeAgo)
-		if err != nil {
-			return 0, errors.Trace(err)
-		}
-		if duration <= 0 {
-			return 0, errors.New("negative timeago is not allowed")
-		}
+	switch {
+	case duration < 0:
+		return 0, errors.New("negative timeago is not allowed")
+	case duration > 0:
 		log.Info("backup time ago", zap.Duration("timeago", duration))
 
 		backupTime := oracle.GetTimeFromTS(backupTS)
@@ -102,9 +99,9 @@ func (bc *Client) GetTS(ctx context.Context, timeAgo string) (uint64, error) {
 }
 
 // SetStorage set ExternalStorage for client
-func (bc *Client) SetStorage(ctx context.Context, backend *backup.StorageBackend) error {
+func (bc *Client) SetStorage(ctx context.Context, backend *backup.StorageBackend, sendCreds bool) error {
 	var err error
-	bc.storage, err = storage.Create(ctx, backend)
+	bc.storage, err = storage.Create(ctx, backend, sendCreds)
 	if err != nil {
 		return err
 	}
@@ -173,63 +170,27 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) {
 func BuildBackupRangeAndSchema(
 	dom *domain.Domain,
 	storage kv.Storage,
+	tableFilter *filter.Filter,
 	backupTS uint64,
-	dbName, tableName string,
 ) ([]Range, *Schemas, error) {
-	SystemDatabases := [3]string{
-		"information_schema",
-		"performance_schema",
-		"mysql",
-	}
-
 	info, err := dom.GetSnapshotInfoSchema(backupTS)
 	if err != nil {
 		return nil, nil, errors.Trace(err)
 	}
 
-	var dbInfos []*model.DBInfo
-	var cTableName model.CIStr
-	switch {
-	case len(dbName) == 0 && len(tableName) != 0:
-		return nil, nil, errors.New("no database is not specified")
-	case len(dbName) != 0 && len(tableName) == 0:
-		// backup database
-		cDBName := model.NewCIStr(dbName)
-		dbInfo, exist := info.SchemaByName(cDBName)
-		if !exist {
-			return nil, nil, errors.Errorf("schema %s not found", dbName)
-		}
-		dbInfos = append(dbInfos, dbInfo)
-	case len(dbName) != 0 && len(tableName) != 0:
-		// backup table
-		cTableName = model.NewCIStr(tableName)
-		cDBName := model.NewCIStr(dbName)
-		dbInfo, exist := info.SchemaByName(cDBName)
-		if !exist {
-			return nil, nil, errors.Errorf("schema %s not found", dbName)
-		}
-		dbInfos = append(dbInfos, dbInfo)
-	case len(dbName) == 0 && len(tableName) == 0:
-		// backup full
-		dbInfos = info.AllSchemas()
-	}
 
 	ranges := make([]Range, 0)
 	backupSchemas := newBackupSchemas()
-LoadDb:
-	for _, dbInfo := range dbInfos {
+	for _, dbInfo := range info.AllSchemas() {
 		// skip system databases
-		for _, sysDbName := range SystemDatabases {
-			if sysDbName == dbInfo.Name.L {
-				continue LoadDb
-			}
-		}
-		dbData, err := json.Marshal(dbInfo)
-		if err != nil {
-			return nil, nil, errors.Trace(err)
+		if filter.IsSystemSchema(dbInfo.Name.L) {
+			continue
 		}
+
+		var dbData []byte
 		idAlloc := autoid.NewAllocator(storage, dbInfo.ID, false)
+
 		for _, tableInfo := range dbInfo.Tables {
-			if len(cTableName.L) != 0 && cTableName.L != tableInfo.Name.L {
+			if !tableFilter.Match(&filter.Table{Schema: dbInfo.Name.O, Name: tableInfo.Name.O}) {
 				// Skip tables other than the given table.
 				continue
 			}
@@ -243,6 +204,12 @@ LoadDb:
 				zap.Stringer("table", tableInfo.Name),
 				zap.Int64("AutoIncID", globalAutoID))
 
+			if dbData == nil {
+				dbData, err = json.Marshal(dbInfo)
+				if err != nil {
+					return nil, nil, errors.Trace(err)
+				}
+			}
 			tableData, err := json.Marshal(tableInfo)
 			if err != nil {
 				return nil, nil, errors.Trace(err)
@@ -267,11 +234,8 @@ LoadDb:
 		}
 	}
 
-	if len(cTableName.L) != 0 {
-		// Must find the given table.
-		if backupSchemas.Len() == 0 {
-			return nil, nil, errors.Errorf("table %s not found", cTableName)
-		}
+	if backupSchemas.Len() == 0 {
+		return nil, nil, errors.New("nothing to backup")
 	}
 	return ranges, backupSchemas, nil
 }
diff --git a/pkg/backup/client_test.go b/pkg/backup/client_test.go
index 44ca1ad5a..ddff45299 100644
--- a/pkg/backup/client_test.go
+++ b/pkg/backup/client_test.go
@@ -50,16 +50,10 @@ func (r *testBackup) TestGetTS(c *C) {
 		deviation = 100
 	)
 
-	// timeago not valid
-	timeAgo := "invalid"
-	_, err = r.backupClient.GetTS(r.ctx, timeAgo)
-	c.Assert(err, ErrorMatches, "time: invalid duration invalid")
-
 	// timeago not work
-	timeAgo = ""
 	expectedDuration := 0
 	currentTs := time.Now().UnixNano() / int64(time.Millisecond)
-	ts, err := r.backupClient.GetTS(r.ctx, timeAgo)
+	ts, err := r.backupClient.GetTS(r.ctx, 0)
 	c.Assert(err, IsNil)
 	pdTs := oracle.ExtractPhysical(ts)
 	duration := int(currentTs - pdTs)
@@ -67,10 +61,9 @@
 	c.Assert(duration, Less, expectedDuration+deviation)
 
 	// timeago = "1.5m"
-	timeAgo = "1.5m"
 	expectedDuration = 90000
 	currentTs = time.Now().UnixNano() / int64(time.Millisecond)
-	ts, err = r.backupClient.GetTS(r.ctx, timeAgo)
+	ts, err = r.backupClient.GetTS(r.ctx, 90*time.Second)
 	c.Assert(err, IsNil)
 	pdTs = oracle.ExtractPhysical(ts)
 	duration = int(currentTs - pdTs)
@@ -78,13 +71,11 @@
 	c.Assert(duration, Less, expectedDuration+deviation)
 
 	// timeago = "-1m"
-	timeAgo = "-1m"
-	_, err = r.backupClient.GetTS(r.ctx, timeAgo)
+	_, err = r.backupClient.GetTS(r.ctx, -time.Minute)
 	c.Assert(err, ErrorMatches, "negative timeago is not allowed")
 
 	// timeago = "1000000h" overflows
-	timeAgo = "1000000h"
-	_, err = r.backupClient.GetTS(r.ctx, timeAgo)
+	_, err = r.backupClient.GetTS(r.ctx, 1000000*time.Hour)
 	c.Assert(err, ErrorMatches, "backup ts overflow.*")
 
 	// timeago = "10h" exceed GCSafePoint
@@ -93,8 +84,7 @@
 	now := oracle.ComposeTS(p, l)
 	_, err = r.backupClient.mgr.GetPDClient().UpdateGCSafePoint(r.ctx, now)
 	c.Assert(err, IsNil)
-	timeAgo = "10h"
-	_, err = r.backupClient.GetTS(r.ctx, timeAgo)
+	_, err = r.backupClient.GetTS(r.ctx, 10*time.Hour)
 	c.Assert(err, ErrorMatches, "GC safepoint [0-9]+ exceed TS [0-9]+")
 }
diff --git a/pkg/backup/schema_test.go b/pkg/backup/schema_test.go
index 3d10fd967..f657310bf 100644
--- a/pkg/backup/schema_test.go
+++ b/pkg/backup/schema_test.go
@@ -5,6 +5,7 @@ import (
 	"math"
 
 	. "github.com/pingcap/check"
+	"github.com/pingcap/tidb-tools/pkg/filter"
 	"github.com/pingcap/tidb/util/testkit"
 	"github.com/pingcap/tidb/util/testleak"
 
@@ -34,28 +35,32 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) {
 	tk := testkit.NewTestKit(c, s.mock.Storage)
 
 	// Table t1 is not exist.
+	testFilter, err := filter.New(false, &filter.Rules{
+		DoTables: []*filter.Table{{Schema: "test", Name: "t1"}},
+	})
+	c.Assert(err, IsNil)
 	_, backupSchemas, err := BuildBackupRangeAndSchema(
-		s.mock.Domain, s.mock.Storage, math.MaxUint64, "test", "t1")
+		s.mock.Domain, s.mock.Storage, testFilter, math.MaxUint64)
 	c.Assert(err, NotNil)
 	c.Assert(backupSchemas, IsNil)
 
 	// Database is not exist.
+	fooFilter, err := filter.New(false, &filter.Rules{
+		DoTables: []*filter.Table{{Schema: "foo", Name: "t1"}},
+	})
+	c.Assert(err, IsNil)
 	_, backupSchemas, err = BuildBackupRangeAndSchema(
-		s.mock.Domain, s.mock.Storage, math.MaxUint64, "foo", "t1")
+		s.mock.Domain, s.mock.Storage, fooFilter, math.MaxUint64)
 	c.Assert(err, NotNil)
 	c.Assert(backupSchemas, IsNil)
 
 	// Empty databse.
-	_, backupSchemas, err = BuildBackupRangeAndSchema(
-		s.mock.Domain, s.mock.Storage, math.MaxUint64, "", "")
-	c.Assert(err, IsNil)
-	c.Assert(backupSchemas, NotNil)
-	c.Assert(backupSchemas.Len(), Equals, 0)
-	updateCh := make(chan struct{}, 2)
-	backupSchemas.Start(context.Background(), s.mock.Storage, math.MaxUint64, 1, updateCh)
-	schemas, err := backupSchemas.finishTableChecksum()
+	noFilter, err := filter.New(false, &filter.Rules{})
 	c.Assert(err, IsNil)
-	c.Assert(len(schemas), Equals, 0)
+	_, backupSchemas, err = BuildBackupRangeAndSchema(
+		s.mock.Domain, s.mock.Storage, noFilter, math.MaxUint64)
+	c.Assert(err, NotNil)
+	c.Assert(backupSchemas, IsNil)
 
 	tk.MustExec("use test")
 	tk.MustExec("drop table if exists t1;")
 	tk.MustExec("insert into t1 values (10);")
 
 	_, backupSchemas, err = BuildBackupRangeAndSchema(
-		s.mock.Domain, s.mock.Storage, math.MaxUint64, "test", "t1")
+		s.mock.Domain, s.mock.Storage, testFilter, math.MaxUint64)
 	c.Assert(err, IsNil)
 	c.Assert(backupSchemas.Len(), Equals, 1)
+	updateCh := make(chan struct{}, 2)
 	backupSchemas.Start(context.Background(), s.mock.Storage, math.MaxUint64, 1, updateCh)
-	schemas, err = backupSchemas.finishTableChecksum()
+	schemas, err := backupSchemas.finishTableChecksum()
 	<-updateCh
 	c.Assert(err, IsNil)
 	c.Assert(len(schemas), Equals, 1)
@@ -82,7 +88,7 @@
 	tk.MustExec("insert into t2 values (11);")
 
 	_, backupSchemas, err = BuildBackupRangeAndSchema(
-		s.mock.Domain, s.mock.Storage, math.MaxUint64, "", "")
+		s.mock.Domain, s.mock.Storage, noFilter, math.MaxUint64)
 	c.Assert(err, IsNil)
 	c.Assert(backupSchemas.Len(), Equals, 2)
 	backupSchemas.Start(context.Background(), s.mock.Storage, math.MaxUint64, 2, updateCh)
diff --git a/pkg/storage/flags.go b/pkg/storage/flags.go
index 51fd98af1..2340467ba 100644
--- a/pkg/storage/flags.go
+++ b/pkg/storage/flags.go
@@ -1,55 +1,19 @@
 package storage
 
 import (
-	"github.com/pingcap/errors"
-	"github.com/pingcap/kvproto/pkg/backup"
 	"github.com/spf13/pflag"
 )
 
-const (
-	// flagSendCredentialOption specify whether to send credentials to tikv
-	flagSendCredentialOption = "send-credentials-to-tikv"
-)
-
-var (
-	sendCredential bool
-)
-
 // DefineFlags adds flags to the flag set corresponding to all backend options.
 func DefineFlags(flags *pflag.FlagSet) {
-	flags.BoolP(flagSendCredentialOption, "c", true,
-		"Whether send credentials to tikv")
 	defineS3Flags(flags)
 	defineGCSFlags(flags)
 }
 
-// GetBackendOptionsFromFlags obtains the backend options from the flag set.
-func GetBackendOptionsFromFlags(flags *pflag.FlagSet) (options BackendOptions, err error) {
-	sendCredential, err = flags.GetBool(flagSendCredentialOption)
-	if err != nil {
-		err = errors.Trace(err)
-		return
-	}
-
-	if options.S3, err = getBackendOptionsFromS3Flags(flags); err != nil {
-		return
-	}
-	if options.GCS, err = getBackendOptionsFromGCSFlags(flags); err != nil {
-		return
-	}
-	return
-}
-
-// ParseBackendFromFlags is a convenient function to consecutively call
-// GetBackendOptionsFromFlags and ParseBackend.
-func ParseBackendFromFlags(flags *pflag.FlagSet, storageFlag string) (*backup.StorageBackend, error) {
-	u, err := flags.GetString(storageFlag)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	opts, err := GetBackendOptionsFromFlags(flags)
-	if err != nil {
-		return nil, err
+// ParseFromFlags obtains the backend options from the flag set.
+func (options *BackendOptions) ParseFromFlags(flags *pflag.FlagSet) error {
+	if err := options.S3.parseFromFlags(flags); err != nil {
+		return err
 	}
-	return ParseBackend(u, &opts)
+	return options.GCS.parseFromFlags(flags)
 }
diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go
index a0df5b03e..2eb310c3a 100644
--- a/pkg/storage/gcs.go
+++ b/pkg/storage/gcs.go
@@ -70,31 +70,28 @@ https://console.cloud.google.com/apis/credentials.`)
 	_ = flags.MarkHidden(gcsCredentialsFile)
 }
 
-func getBackendOptionsFromGCSFlags(flags *pflag.FlagSet) (options GCSBackendOptions, err error) {
+func (options *GCSBackendOptions) parseFromFlags(flags *pflag.FlagSet) error {
+	var err error
 	options.Endpoint, err = flags.GetString(gcsEndpointOption)
 	if err != nil {
-		err = errors.Trace(err)
-		return
+		return errors.Trace(err)
 	}
 
 	options.StorageClass, err = flags.GetString(gcsStorageClassOption)
 	if err != nil {
-		err = errors.Trace(err)
-		return
+		return errors.Trace(err)
 	}
 
 	options.PredefinedACL, err = flags.GetString(gcsPredefinedACL)
 	if err != nil {
-		err = errors.Trace(err)
-		return
+		return errors.Trace(err)
 	}
 
 	options.CredentialsFile, err = flags.GetString(gcsCredentialsFile)
 	if err != nil {
-		err = errors.Trace(err)
-		return
+		return errors.Trace(err)
 	}
-	return
+	return nil
 }
 
 type gcsStorage struct {
@@ -142,11 +139,16 @@ func (s *gcsStorage) FileExists(ctx context.Context, name string) (bool, error)
 	return true, nil
 }
 
-func newGCSStorage(ctx context.Context, gcs *backup.GCS) (*gcsStorage, error) {
-	return newGCSStorageWithHTTPClient(ctx, gcs, nil)
+func newGCSStorage(ctx context.Context, gcs *backup.GCS, sendCredential bool) (*gcsStorage, error) {
+	return newGCSStorageWithHTTPClient(ctx, gcs, nil, sendCredential)
 }
 
-func newGCSStorageWithHTTPClient(ctx context.Context, gcs *backup.GCS, hclient *http.Client) (*gcsStorage, error) {
+func newGCSStorageWithHTTPClient( // revive:disable-line:flag-parameter
+	ctx context.Context,
+	gcs *backup.GCS,
+	hclient *http.Client,
+	sendCredential bool,
+) (*gcsStorage, error) {
 	var clientOps []option.ClientOption
 	if gcs.CredentialsBlob == "" {
 		creds, err := google.FindDefaultCredentials(ctx, storage.ScopeReadWrite)
diff --git a/pkg/storage/gcs_test.go b/pkg/storage/gcs_test.go
index da990cfe7..10bb44371 100644
--- a/pkg/storage/gcs_test.go
+++ b/pkg/storage/gcs_test.go
@@ -28,7 +28,7 @@ func (r *testStorageSuite) TestGCS(c *C) {
 		PredefinedAcl:   "private",
 		CredentialsBlob: "Fake Credentials",
 	}
-	stg, err := newGCSStorageWithHTTPClient(ctx, gcs, server.HTTPClient())
+	stg, err := newGCSStorageWithHTTPClient(ctx, gcs, server.HTTPClient(), false)
 	c.Assert(err, IsNil)
 
 	err = stg.Write(ctx, "key", []byte("data"))
@@ -66,7 +66,6 @@ func (r *testStorageSuite) TestNewGCSStorage(c *C) {
 	server.CreateBucket(bucketName)
 
 	{
-		sendCredential = true
 		gcs := &backup.GCS{
 			Bucket:          bucketName,
 			Prefix:          "a/b/",
 			StorageClass:    "NEARLINE",
 			PredefinedAcl:   "private",
 			CredentialsBlob: "FakeCredentials",
 		}
-		_, err := newGCSStorageWithHTTPClient(ctx, gcs, server.HTTPClient())
+		_, err := newGCSStorageWithHTTPClient(ctx, gcs, server.HTTPClient(), true)
 		c.Assert(err, IsNil)
 		c.Assert(gcs.CredentialsBlob, Equals, "FakeCredentials")
 	}
 
 	{
-		sendCredential = false
 		gcs := &backup.GCS{
 			Bucket:          bucketName,
 			Prefix:          "a/b/",
 			StorageClass:    "NEARLINE",
 			PredefinedAcl:   "private",
 			CredentialsBlob: "FakeCredentials",
 		}
-		_, err := newGCSStorageWithHTTPClient(ctx, gcs, server.HTTPClient())
+		_, err := newGCSStorageWithHTTPClient(ctx, gcs, server.HTTPClient(), false)
 		c.Assert(err, IsNil)
 		c.Assert(gcs.CredentialsBlob, Equals, "")
 	}
@@ -106,7 +104,6 @@ func (r *testStorageSuite) TestNewGCSStorage(c *C) {
 		defer os.Unsetenv("GOOGLE_APPLICATION_CREDENTIALS")
 		c.Assert(err, IsNil)
-		sendCredential = true
 		gcs := &backup.GCS{
 			Bucket:          bucketName,
 			Prefix:          "a/b/",
 			StorageClass:    "NEARLINE",
 			PredefinedAcl:   "private",
 			CredentialsBlob: "",
 		}
-		_, err = newGCSStorageWithHTTPClient(ctx, gcs, server.HTTPClient())
+		_, err = newGCSStorageWithHTTPClient(ctx, gcs, server.HTTPClient(), true)
 		c.Assert(err, IsNil)
 		c.Assert(gcs.CredentialsBlob, Equals, `{"type": "service_account"}`)
 	}
@@ -132,7 +129,6 @@ func (r *testStorageSuite) TestNewGCSStorage(c *C) {
 		defer os.Unsetenv("GOOGLE_APPLICATION_CREDENTIALS")
 		c.Assert(err, IsNil)
-		sendCredential = false
 		gcs := &backup.GCS{
 			Bucket:          bucketName,
 			Prefix:          "a/b/",
 			StorageClass:    "NEARLINE",
 			PredefinedAcl:   "private",
 			CredentialsBlob: "",
 		}
-		_, err = newGCSStorageWithHTTPClient(ctx, gcs, server.HTTPClient())
+		_, err = newGCSStorageWithHTTPClient(ctx, gcs, server.HTTPClient(), false)
 		c.Assert(err, IsNil)
 		c.Assert(gcs.CredentialsBlob, Equals, "")
 	}
 
 	{
-		sendCredential = true
 		os.Unsetenv("GOOGLE_APPLICATION_CREDENTIALS")
 		gcs := &backup.GCS{
 			Bucket:          bucketName,
 			Prefix:          "a/b/",
 			StorageClass:    "NEARLINE",
 			PredefinedAcl:   "private",
 			CredentialsBlob: "",
 		}
-		_, err = newGCSStorageWithHTTPClient(ctx, gcs, server.HTTPClient())
+		_, err = newGCSStorageWithHTTPClient(ctx, gcs, server.HTTPClient(), true)
 		c.Assert(err, NotNil)
 	}
 }
diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go
index 5db54556c..8e04769b5 100644
--- a/pkg/storage/s3.go
+++ b/pkg/storage/s3.go
@@ -117,44 +117,41 @@ func defineS3Flags(flags *pflag.FlagSet) {
 	_ = flags.MarkHidden(s3ProviderOption)
 }
 
-func getBackendOptionsFromS3Flags(flags *pflag.FlagSet) (options S3BackendOptions, err error) {
+func (options *S3BackendOptions) parseFromFlags(flags *pflag.FlagSet) error {
+	var err error
 	options.Endpoint, err = flags.GetString(s3EndpointOption)
 	if err != nil {
-		err = errors.Trace(err)
-		return
+		return errors.Trace(err)
 	}
 	options.Region, err = flags.GetString(s3RegionOption)
 	if err != nil {
-		err = errors.Trace(err)
-		return
+		return errors.Trace(err)
 	}
 	options.SSE, err = flags.GetString(s3SSEOption)
 	if err != nil {
-		err = errors.Trace(err)
-		return
+		return errors.Trace(err)
 	}
 	options.ACL, err = flags.GetString(s3ACLOption)
 	if err != nil {
-		err = errors.Trace(err)
-		return
+		return errors.Trace(err)
 	}
 	options.StorageClass, err = flags.GetString(s3StorageClassOption)
 	if err != nil {
-		err = errors.Trace(err)
-		return
+		return errors.Trace(err)
 	}
 	options.ForcePathStyle = true
 	options.Provider, err = flags.GetString(s3ProviderOption)
 	if err != nil {
-		err = errors.Trace(err)
-		return
+		return errors.Trace(err)
 	}
-
-	return options, err
+	return nil
 }
 
 // newS3Storage initialize a new s3 storage for metadata
-func newS3Storage(backend *backup.S3) (*S3Storage, error) {
+func newS3Storage( // revive:disable-line:flag-parameter
+	backend *backup.S3,
+	sendCredential bool,
+) (*S3Storage, error) {
 	qs := *backend
 	awsConfig := aws.NewConfig().
 		WithMaxRetries(maxRetries).
diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go
index 92a5a8737..3eaf1c206 100644
--- a/pkg/storage/s3_test.go
+++ b/pkg/storage/s3_test.go
@@ -236,7 +236,7 @@ func (r *testStorageSuite) TestS3Storage(c *C) {
 	testFn := func(test *testcase, c *C) {
 		c.Log(test.name)
 		ctx := aws.BackgroundContext()
-		sendCredential = test.sendCredential
+		sendCredential := test.sendCredential
 		if test.hackCheck {
 			checkS3Bucket = func(svc *s3.S3, bucket string) error { return nil }
 		}
@@ -245,7 +245,7 @@
 				S3: test.s3,
 			},
 		}
-		_, err := Create(ctx, s3)
+		_, err := Create(ctx, s3, sendCredential)
 		if test.errReturn {
 			c.Assert(err, NotNil)
 			return
diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go
index 173638bdd..f9ae368ae 100644
--- a/pkg/storage/storage.go
+++ b/pkg/storage/storage.go
@@ -18,7 +18,7 @@ type ExternalStorage interface {
 }
 
 // Create creates ExternalStorage
-func Create(ctx context.Context, backend *backup.StorageBackend) (ExternalStorage, error) {
+func Create(ctx context.Context, backend *backup.StorageBackend, sendCreds bool) (ExternalStorage, error) {
 	switch backend := backend.Backend.(type) {
 	case *backup.StorageBackend_Local:
 		return newLocalStorage(backend.Local.Path)
@@ -26,14 +26,14 @@ func Create(ctx context.Context, backend *backup.StorageBackend) (ExternalStorag
 		if backend.S3 == nil {
 			return nil, errors.New("s3 config not found")
 		}
-		return newS3Storage(backend.S3)
+		return newS3Storage(backend.S3, sendCreds)
 	case *backup.StorageBackend_Noop:
 		return newNoopStorage(), nil
 	case *backup.StorageBackend_Gcs:
 		if backend.Gcs == nil {
 			return nil, errors.New("GCS config not found")
 		}
-		return newGCSStorage(ctx, backend.Gcs)
+		return newGCSStorage(ctx, backend.Gcs, sendCreds)
 	default:
 		return nil, errors.Errorf("storage %T is not supported yet", backend)
 	}
diff --git a/pkg/task/backup.go b/pkg/task/backup.go
new file mode 100644
index 000000000..b9613cd56
--- /dev/null
+++ b/pkg/task/backup.go
@@ -0,0 +1,157 @@
+package task
+
+import (
+	"context"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	"github.com/pingcap/tidb-tools/pkg/filter"
+	"github.com/spf13/pflag"
+
+	"github.com/pingcap/br/pkg/backup"
+	"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/summary" + "github.com/pingcap/br/pkg/utils" +) + +const ( + flagBackupTimeago = "timeago" + flagLastBackupTS = "lastbackupts" +) + +// BackupConfig is the configuration specific for backup tasks. +type BackupConfig struct { + Config + + TimeAgo time.Duration `json:"time-ago" toml:"time-ago"` + LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"` +} + +// DefineBackupFlags defines common flags for the backup command. +func DefineBackupFlags(flags *pflag.FlagSet) { + flags.Duration( + flagBackupTimeago, 0, + "The history version of the backup task, e.g. 1m, 1h. Do not exceed GCSafePoint") + + flags.Uint64(flagLastBackupTS, 0, "the last time backup ts") + _ = flags.MarkHidden(flagLastBackupTS) +} + +// ParseFromFlags parses the backup-related flags from the flag set. +func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error { + timeAgo, err := flags.GetDuration(flagBackupTimeago) + if err != nil { + return errors.Trace(err) + } + if timeAgo < 0 { + return errors.New("negative timeago is not allowed") + } + cfg.TimeAgo = timeAgo + cfg.LastBackupTS, err = flags.GetUint64(flagLastBackupTS) + if err != nil { + return errors.Trace(err) + } + if err = cfg.Config.ParseFromFlags(flags); err != nil { + return errors.Trace(err) + } + return nil +} + +// RunBackup starts a backup task inside the current goroutine. +func RunBackup(c context.Context, cmdName string, cfg *BackupConfig) error { + ctx, cancel := context.WithCancel(c) + defer cancel() + + u, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions) + if err != nil { + return err + } + tableFilter, err := filter.New(cfg.CaseSensitive, &cfg.Filter) + if err != nil { + return err + } + mgr, err := newMgr(ctx, cfg.PD) + if err != nil { + return err + } + defer mgr.Close() + + client, err := backup.NewBackupClient(ctx, mgr) + if err != nil { + return err + } + if err = client.SetStorage(ctx, u, cfg.SendCreds); err != nil { + return err + } + + backupTS, err := client.GetTS(ctx, cfg.TimeAgo) + if err != nil { + return err + } + + defer summary.Summary(cmdName) + + ranges, backupSchemas, err := backup.BuildBackupRangeAndSchema( + mgr.GetDomain(), mgr.GetTiKV(), tableFilter, backupTS) + if err != nil { + return err + } + + // The number of regions need to backup + approximateRegions := 0 + for _, r := range ranges { + var regionCount int + regionCount, err = mgr.GetRegionCount(ctx, r.StartKey, r.EndKey) + if err != nil { + return err + } + approximateRegions += regionCount + } + + summary.CollectInt("backup total regions", approximateRegions) + + // Backup + // Redirect to log if there is no log file to avoid unreadable output. 
+	updateCh := utils.StartProgress(
+		ctx, cmdName, int64(approximateRegions), !cfg.LogProgress)
+	err = client.BackupRanges(
+		ctx, ranges, cfg.LastBackupTS, backupTS, cfg.RateLimit, cfg.Concurrency, updateCh)
+	if err != nil {
+		return err
+	}
+	// Backup has finished
+	close(updateCh)
+
+	// Checksum
+	backupSchemasConcurrency := backup.DefaultSchemaConcurrency
+	if backupSchemas.Len() < backupSchemasConcurrency {
+		backupSchemasConcurrency = backupSchemas.Len()
+	}
+	updateCh = utils.StartProgress(
+		ctx, "Checksum", int64(backupSchemas.Len()), !cfg.LogProgress)
+	backupSchemas.SetSkipChecksum(!cfg.Checksum)
+	backupSchemas.Start(
+		ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh)
+
+	err = client.CompleteMeta(backupSchemas)
+	if err != nil {
+		return err
+	}
+
+	valid, err := client.FastChecksum()
+	if err != nil {
+		return err
+	}
+	if !valid {
+		log.Error("backup FastChecksum mismatch!")
+	}
+	// Checksum has finished
+	close(updateCh)
+
+	err = client.SaveBackupMeta(ctx)
+	if err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/pkg/task/common.go b/pkg/task/common.go
new file mode 100644
index 000000000..a926c417f
--- /dev/null
+++ b/pkg/task/common.go
@@ -0,0 +1,236 @@
+package task
+
+import (
+	"context"
+	"fmt"
+	"regexp"
+	"strings"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/pingcap/errors"
+	"github.com/pingcap/kvproto/pkg/backup"
+	"github.com/pingcap/tidb-tools/pkg/filter"
+	"github.com/pingcap/tidb/store/tikv"
+	"github.com/spf13/cobra"
+	"github.com/spf13/pflag"
+
+	"github.com/pingcap/br/pkg/conn"
+	"github.com/pingcap/br/pkg/storage"
+	"github.com/pingcap/br/pkg/utils"
+)
+
+const (
+	// flagSendCreds specifies whether to send credentials to TiKV.
+	flagSendCreds = "send-credentials-to-tikv"
+	// flagStorage is the name of the storage flag.
+	flagStorage = "storage"
+	// flagPD is the name of the PD URL flag.
+	flagPD = "pd"
+	// flagCA is the name of the TLS CA flag.
+	flagCA = "ca"
+	// flagCert is the name of the TLS cert flag.
+	flagCert = "cert"
+	// flagKey is the name of the TLS key flag.
+	flagKey = "key"
+
+	flagDatabase = "db"
+	flagTable    = "table"
+
+	flagRateLimit     = "ratelimit"
+	flagRateLimitUnit = "ratelimit-unit"
+	flagConcurrency   = "concurrency"
+	flagChecksum      = "checksum"
+)
+
+// TLSConfig is the common configuration for TLS connections.
+type TLSConfig struct {
+	CA   string `json:"ca" toml:"ca"`
+	Cert string `json:"cert" toml:"cert"`
+	Key  string `json:"key" toml:"key"`
+}
+
+// Config is the common configuration for all BRIE tasks.
+type Config struct {
+	storage.BackendOptions
+
+	Storage     string    `json:"storage" toml:"storage"`
+	PD          []string  `json:"pd" toml:"pd"`
+	TLS         TLSConfig `json:"tls" toml:"tls"`
+	RateLimit   uint64    `json:"rate-limit" toml:"rate-limit"`
+	Concurrency uint32    `json:"concurrency" toml:"concurrency"`
+	Checksum    bool      `json:"checksum" toml:"checksum"`
+	SendCreds   bool      `json:"send-credentials-to-tikv" toml:"send-credentials-to-tikv"`
+	// LogProgress, when true, means the progress bar is printed to the log instead of stdout.
+	LogProgress bool `json:"log-progress" toml:"log-progress"`
+
+	CaseSensitive bool         `json:"case-sensitive" toml:"case-sensitive"`
+	Filter        filter.Rules `json:"black-white-list" toml:"black-white-list"`
+}
+
+// DefineCommonFlags defines the flags common to all BRIE commands.
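// [Editorial aside, not part of the patch.] A minimal sketch of driving the
// new task API programmatically, which the exported RunBackup above is
// designed to allow; the PD endpoint and storage URL are placeholders, error
// handling is elided, and a reachable PD/TiKV cluster is assumed:
//
//	cfg := task.BackupConfig{
//		Config: task.Config{
//			PD:          []string{"127.0.0.1:2379"}, // placeholder PD address
//			Storage:     "local:///tmp/backup",      // placeholder backend URL
//			Concurrency: 4,
//			Checksum:    true,
//		},
//		TimeAgo: time.Hour, // back up the snapshot as of one hour ago
//	}
//	err := task.RunBackup(context.Background(), "backup", &cfg)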
+func DefineCommonFlags(flags *pflag.FlagSet) {
+	flags.BoolP(flagSendCreds, "c", true, "Whether to send credentials to TiKV")
+	flags.StringP(flagStorage, "s", "", `specify the URL of the backup storage, e.g. "local:///path/to/save"`)
+	flags.StringSliceP(flagPD, "u", []string{"127.0.0.1:2379"}, "PD address")
+	flags.String(flagCA, "", "CA certificate path for TLS connection")
+	flags.String(flagCert, "", "Certificate path for TLS connection")
+	flags.String(flagKey, "", "Private key path for TLS connection")
+
+	flags.Uint64(flagRateLimit, 0, "The rate limit of the task, MB/s per node")
+	flags.Uint32(flagConcurrency, 4, "The size of thread pool on each node that executes the task")
+	flags.Bool(flagChecksum, true, "Run checksum at end of task")
+
+	flags.Uint64(flagRateLimitUnit, utils.MB, "The unit of rate limit")
+	_ = flags.MarkHidden(flagRateLimitUnit)
+
+	storage.DefineFlags(flags)
+}
+
+// DefineDatabaseFlags defines the required --db flag.
+func DefineDatabaseFlags(command *cobra.Command) {
+	command.Flags().String(flagDatabase, "", "database name")
+	_ = command.MarkFlagRequired(flagDatabase)
+}
+
+// DefineTableFlags defines the required --db and --table flags.
+func DefineTableFlags(command *cobra.Command) {
+	DefineDatabaseFlags(command)
+	// Use StringP so that "t" is the shorthand rather than the default value.
+	command.Flags().StringP(flagTable, "t", "", "table name")
+	_ = command.MarkFlagRequired(flagTable)
+}
+
+// ParseFromFlags parses the TLS config from the flag set.
+func (tls *TLSConfig) ParseFromFlags(flags *pflag.FlagSet) error {
+	var err error
+	tls.CA, err = flags.GetString(flagCA)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	tls.Cert, err = flags.GetString(flagCert)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	tls.Key, err = flags.GetString(flagKey)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	return nil
+}
+
+// ParseFromFlags parses the config from the flag set.
+func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
+	var err error
+	cfg.Storage, err = flags.GetString(flagStorage)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	cfg.SendCreds, err = flags.GetBool(flagSendCreds)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	cfg.PD, err = flags.GetStringSlice(flagPD)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if len(cfg.PD) == 0 {
+		return errors.New("must provide at least one PD server address")
+	}
+	cfg.Concurrency, err = flags.GetUint32(flagConcurrency)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	cfg.Checksum, err = flags.GetBool(flagChecksum)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	var rateLimit, rateLimitUnit uint64
+	rateLimit, err = flags.GetUint64(flagRateLimit)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	rateLimitUnit, err = flags.GetUint64(flagRateLimitUnit)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	cfg.RateLimit = rateLimit * rateLimitUnit
+
+	if dbFlag := flags.Lookup(flagDatabase); dbFlag != nil {
+		db := escapeFilterName(dbFlag.Value.String())
+		if len(db) == 0 {
+			return errors.New("empty database name is not allowed")
+		}
+		if tblFlag := flags.Lookup(flagTable); tblFlag != nil {
+			tbl := escapeFilterName(tblFlag.Value.String())
+			if len(tbl) == 0 {
+				return errors.New("empty table name is not allowed")
+			}
+			cfg.Filter.DoTables = []*filter.Table{{Schema: db, Name: tbl}}
+		} else {
+			cfg.Filter.DoDBs = []string{db}
+		}
+	}
+
+	if err := cfg.BackendOptions.ParseFromFlags(flags); err != nil {
+		return err
+	}
+	return cfg.TLS.ParseFromFlags(flags)
+}
+
+// newMgr creates a new mgr at the given PD address.
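// [Editorial aside, not part of the patch.] The two rate-limit flags parsed
// above combine multiplicatively: --ratelimit supplies the number and the
// hidden --ratelimit-unit (default utils.MB) supplies the scale, so
// cfg.RateLimit ends up as bytes per second per node. A hedged sketch of
// that behavior, under the assumption that only --ratelimit is set:
//
//	fs := pflag.NewFlagSet("demo", pflag.ContinueOnError)
//	task.DefineCommonFlags(fs)
//	_ = fs.Parse([]string{"--ratelimit=128"})
//	var cfg task.Config
//	_ = cfg.ParseFromFlags(fs)
//	// cfg.RateLimit == 128 * utils.MB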
+func newMgr(ctx context.Context, pds []string) (*conn.Mgr, error) {
+	pdAddress := strings.Join(pds, ",")
+	if len(pdAddress) == 0 {
+		return nil, errors.New("pd address cannot be empty")
+	}
+
+	// Disable GC because TiDB enables GC already.
+	store, err := tikv.Driver{}.Open(fmt.Sprintf("tikv://%s?disableGC=true", pdAddress))
+	if err != nil {
+		return nil, err
+	}
+	return conn.NewMgr(ctx, pdAddress, store.(tikv.Storage))
+}
+
+// GetStorage gets the storage backend from the config.
+func GetStorage(
+	ctx context.Context,
+	cfg *Config,
+) (*backup.StorageBackend, storage.ExternalStorage, error) {
+	u, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions)
+	if err != nil {
+		return nil, nil, err
+	}
+	s, err := storage.Create(ctx, u, cfg.SendCreds)
+	if err != nil {
+		return nil, nil, errors.Annotate(err, "create storage failed")
+	}
+	return u, s, nil
+}
+
+// ReadBackupMeta reads the backupmeta file from the storage.
+func ReadBackupMeta(
+	ctx context.Context,
+	cfg *Config,
+) (*backup.StorageBackend, storage.ExternalStorage, *backup.BackupMeta, error) {
+	u, s, err := GetStorage(ctx, cfg)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	metaData, err := s.Read(ctx, utils.MetaFile)
+	if err != nil {
+		return nil, nil, nil, errors.Annotate(err, "load backupmeta failed")
+	}
+	backupMeta := &backup.BackupMeta{}
+	if err = proto.Unmarshal(metaData, backupMeta); err != nil {
+		return nil, nil, nil, errors.Annotate(err, "parse backupmeta failed")
+	}
+	return u, s, backupMeta, nil
+}
+
+func escapeFilterName(name string) string {
+	if !strings.HasPrefix(name, "~") {
+		return name
+	}
+	return "~^" + regexp.QuoteMeta(name) + "$"
+}
diff --git a/pkg/task/restore.go b/pkg/task/restore.go
new file mode 100644
index 000000000..f2f3caf43
--- /dev/null
+++ b/pkg/task/restore.go
@@ -0,0 +1,254 @@
+package task
+
+import (
+	"context"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/kvproto/pkg/backup"
+	"github.com/pingcap/log"
+	"github.com/pingcap/tidb-tools/pkg/filter"
+	"github.com/spf13/pflag"
+	"go.uber.org/zap"
+
+	"github.com/pingcap/br/pkg/conn"
+	"github.com/pingcap/br/pkg/restore"
+	"github.com/pingcap/br/pkg/summary"
+	"github.com/pingcap/br/pkg/utils"
+)
+
+const (
+	flagOnline = "online"
+)
+
+var schedulers = map[string]struct{}{
+	"balance-leader-scheduler":     {},
+	"balance-hot-region-scheduler": {},
+	"balance-region-scheduler":     {},
+
+	"shuffle-leader-scheduler":     {},
+	"shuffle-region-scheduler":     {},
+	"shuffle-hot-region-scheduler": {},
+}
+
+// RestoreConfig is the configuration specific to restore tasks.
+type RestoreConfig struct {
+	Config
+
+	Online bool `json:"online" toml:"online"`
+}
+
+// DefineRestoreFlags defines common flags for the restore command.
+func DefineRestoreFlags(flags *pflag.FlagSet) {
+	flags.Bool(flagOnline, false, "Whether to do the restore online (experimental)")
+	// TODO: unhide this flag once the feature is stable.
+	_ = flags.MarkHidden(flagOnline)
+}
+
+// ParseFromFlags parses the restore-related flags from the flag set.
+func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
+	var err error
+	cfg.Online, err = flags.GetBool(flagOnline)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	return cfg.Config.ParseFromFlags(flags)
+}
+
+// RunRestore starts a restore task inside the current goroutine.
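// [Editorial aside, not part of the patch.] escapeFilterName above (in
// pkg/task/common.go) protects database and table names that begin with "~",
// which the tidb-tools filter would otherwise interpret as a regular
// expression; such names are turned into an anchored, meta-quoted regex that
// matches only the literal name. Worked examples (outputs follow directly
// from regexp.QuoteMeta):
//
//	escapeFilterName("test") // "test": plain names pass through unchanged
//	escapeFilterName("~a.b") // "~^~a\.b$": a regex matching exactly the literal name "~a.b"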
+func RunRestore(c context.Context, cmdName string, cfg *RestoreConfig) error {
+	ctx, cancel := context.WithCancel(c)
+	defer cancel()
+
+	mgr, err := newMgr(ctx, cfg.PD)
+	if err != nil {
+		return err
+	}
+	defer mgr.Close()
+
+	client, err := restore.NewRestoreClient(ctx, mgr.GetPDClient(), mgr.GetTiKV())
+	if err != nil {
+		return err
+	}
+	defer client.Close()
+
+	client.SetRateLimit(cfg.RateLimit)
+	client.SetConcurrency(uint(cfg.Concurrency))
+	if cfg.Online {
+		client.EnableOnline()
+	}
+
+	defer summary.Summary(cmdName)
+
+	u, _, backupMeta, err := ReadBackupMeta(ctx, &cfg.Config)
+	if err != nil {
+		return err
+	}
+	if err = client.InitBackupMeta(backupMeta, u); err != nil {
+		return err
+	}
+
+	files, tables, err := filterRestoreFiles(client, cfg)
+	if err != nil {
+		return err
+	}
+	if len(files) == 0 {
+		return errors.New("all files are filtered out from the backup archive, nothing to restore")
+	}
+	summary.CollectInt("restore files", len(files))
+
+	var newTS uint64
+	if client.IsIncremental() {
+		newTS, err = client.GetTS(ctx)
+		if err != nil {
+			return err
+		}
+	}
+	rewriteRules, newTables, err := client.CreateTables(mgr.GetDomain(), tables, newTS)
+	if err != nil {
+		return err
+	}
+
+	ranges, err := restore.ValidateFileRanges(files, rewriteRules)
+	if err != nil {
+		return err
+	}
+	summary.CollectInt("restore ranges", len(ranges))
+
+	// Redirect the progress output to the log if there is no log file, to avoid unreadable output.
+	updateCh := utils.StartProgress(
+		ctx,
+		cmdName,
+		// Split/Scatter + Download/Ingest
+		int64(len(ranges)+len(files)),
+		!cfg.LogProgress)
+
+	err = restore.SplitRanges(ctx, client, ranges, rewriteRules, updateCh)
+	if err != nil {
+		log.Error("split regions failed", zap.Error(err))
+		return err
+	}
+
+	if !client.IsIncremental() {
+		if err = client.ResetTS(cfg.PD); err != nil {
+			log.Error("reset pd TS failed", zap.Error(err))
+			return err
+		}
+	}
+
+	removedSchedulers, err := restorePreWork(ctx, client, mgr)
+	if err != nil {
+		return err
+	}
+	err = client.RestoreAll(rewriteRules, updateCh)
+	// Always run the post-work, even on error, so we don't get stuck in
+	// import mode or leave the schedulers removed.
+	postErr := restorePostWork(ctx, client, mgr, removedSchedulers)
+
+	if err != nil {
+		return err
+	}
+	if postErr != nil {
+		return postErr
+	}
+
+	// Restore has finished.
+	close(updateCh)
+
+	// Checksum
+	updateCh = utils.StartProgress(
+		ctx, "Checksum", int64(len(newTables)), !cfg.LogProgress)
+	err = client.ValidateChecksum(
+		ctx, mgr.GetTiKV().GetClient(), tables, newTables, updateCh)
+	if err != nil {
+		return err
+	}
+	close(updateCh)
+
+	return nil
+}
+
+func filterRestoreFiles(
+	client *restore.Client,
+	cfg *RestoreConfig,
+) (files []*backup.File, tables []*utils.Table, err error) {
+	tableFilter, err := filter.New(cfg.CaseSensitive, &cfg.Filter)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	for _, db := range client.GetDatabases() {
+		createdDatabase := false
+		for _, table := range db.Tables {
+			if !tableFilter.Match(&filter.Table{Schema: db.Schema.Name.O, Name: table.Schema.Name.O}) {
+				continue
+			}
+
+			if !createdDatabase {
+				if err = client.CreateDatabase(db.Schema); err != nil {
+					return nil, nil, err
+				}
+				createdDatabase = true
+			}
+
+			files = append(files, table.Files...)
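// [Editorial aside, not part of the patch.] The createdDatabase latch above
// creates each database at most once, and only when at least one of its
// tables survives the filter. For illustration, a rule set built from
// --db/--table matches exactly one table; the schema and table names below
// are placeholders:
//
//	f, _ := filter.New(false, &filter.Rules{
//		DoTables: []*filter.Table{{Schema: "db1", Name: "t1"}},
//	})
//	f.Match(&filter.Table{Schema: "db1", Name: "t1"}) // true
//	f.Match(&filter.Table{Schema: "db1", Name: "t2"}) // false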
+			tables = append(tables, table)
+		}
+	}
+
+	return
+}
+
+// restorePreWork executes some preparation work before restore.
+func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) ([]string, error) {
+	if client.IsOnline() {
+		return nil, nil
+	}
+
+	if err := client.SwitchToImportMode(ctx); err != nil {
+		return nil, err
+	}
+
+	existSchedulers, err := mgr.ListSchedulers(ctx)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	needRemoveSchedulers := make([]string, 0, len(existSchedulers))
+	for _, s := range existSchedulers {
+		if _, ok := schedulers[s]; ok {
+			needRemoveSchedulers = append(needRemoveSchedulers, s)
+		}
+	}
+	return removePDLeaderScheduler(ctx, mgr, needRemoveSchedulers)
+}
+
+func removePDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, existSchedulers []string) ([]string, error) {
+	removedSchedulers := make([]string, 0, len(existSchedulers))
+	for _, scheduler := range existSchedulers {
+		err := mgr.RemoveScheduler(ctx, scheduler)
+		if err != nil {
+			return nil, err
+		}
+		removedSchedulers = append(removedSchedulers, scheduler)
+	}
+	return removedSchedulers, nil
+}
+
+// restorePostWork executes some post work after restore.
+func restorePostWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr, removedSchedulers []string) error {
+	if client.IsOnline() {
+		return nil
+	}
+	if err := client.SwitchToNormalMode(ctx); err != nil {
+		return err
+	}
+	return addPDLeaderScheduler(ctx, mgr, removedSchedulers)
+}
+
+func addPDLeaderScheduler(ctx context.Context, mgr *conn.Mgr, removedSchedulers []string) error {
+	for _, scheduler := range removedSchedulers {
+		err := mgr.AddScheduler(ctx, scheduler)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
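Editorial aside, not part of the patch: RunRestore above deliberately runs its post-work (switching TiKV back to normal mode and re-adding the removed schedulers) even when the restore itself fails, and only then reports the first error. A stripped-down, self-contained sketch of that idiom follows; runWithPostWork and the two closures are hypothetical stand-ins, not BR APIs:

	package main

	import (
		"errors"
		"fmt"
	)

	// runWithPostWork mirrors the error-precedence rule used by RunRestore:
	// cleanup always runs, and the primary error takes precedence over the
	// cleanup error.
	func runWithPostWork(work, cleanup func() error) error {
		err := work()
		postErr := cleanup() // always executed, even when work failed
		if err != nil {
			return err
		}
		return postErr
	}

	func main() {
		err := runWithPostWork(
			func() error { return errors.New("restore failed") },
			func() error { fmt.Println("schedulers restored"); return nil },
		)
		fmt.Println(err) // restore failed
	}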