Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
*: extracts runBackup/runRestore in cmd into pkg/task
Browse files Browse the repository at this point in the history
Defines a "Config" structure to store the parsed flags.

Use the "black-white-list" structure to define what tables/databases to
backup/restore.
  • Loading branch information
kennytm committed Feb 8, 2020
1 parent 6b65080 commit 86b189f
Show file tree
Hide file tree
Showing 18 changed files with 812 additions and 787 deletions.
203 changes: 11 additions & 192 deletions cmd/backup.go
Original file line number Diff line number Diff line change
@@ -1,176 +1,21 @@
package cmd

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/session"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/pingcap/br/pkg/backup"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/task"
"github.com/pingcap/br/pkg/utils"
)

const (
flagBackupTimeago = "timeago"
flagBackupRateLimit = "ratelimit"
flagBackupRateLimitUnit = "ratelimit-unit"
flagBackupConcurrency = "concurrency"
flagBackupChecksum = "checksum"
flagLastBackupTS = "lastbackupts"
)

func defineBackupFlags(flagSet *pflag.FlagSet) {
flagSet.StringP(
flagBackupTimeago, "", "",
"The history version of the backup task, e.g. 1m, 1h. Do not exceed GCSafePoint")
flagSet.Uint64P(
flagBackupRateLimit, "", 0, "The rate limit of the backup task, MB/s per node")
flagSet.Uint32P(
flagBackupConcurrency, "", 4, "The size of thread pool on each node that execute the backup task")
flagSet.BoolP(flagBackupChecksum, "", true,
"Run checksum after backup")
flagSet.Uint64P(flagLastBackupTS, "", 0, "the last time backup ts")
_ = flagSet.MarkHidden(flagLastBackupTS)

// Test only flag.
flagSet.Uint64P(
flagBackupRateLimitUnit, "", utils.MB, "The unit of rate limit of the backup task")
_ = flagSet.MarkHidden(flagBackupRateLimitUnit)
}

func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error {
ctx, cancel := context.WithCancel(defaultContext)
defer cancel()

mgr, err := GetDefaultMgr()
if err != nil {
return err
}
defer mgr.Close()

timeago, err := flagSet.GetString(flagBackupTimeago)
if err != nil {
return err
}

ratelimit, err := flagSet.GetUint64(flagBackupRateLimit)
if err != nil {
return err
}
ratelimitUnit, err := flagSet.GetUint64(flagBackupRateLimitUnit)
if err != nil {
return err
}
ratelimit *= ratelimitUnit

concurrency, err := flagSet.GetUint32(flagBackupConcurrency)
if err != nil {
return err
}
if concurrency == 0 {
err = errors.New("at least one thread required")
func runBackupCommand(command *cobra.Command, cmdName string) error {
cfg := task.BackupConfig{Config: task.Config{LogProgress: HasLogFile()}}
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
return err
}

checksum, err := flagSet.GetBool(flagBackupChecksum)
if err != nil {
return err
}

lastBackupTS, err := flagSet.GetUint64(flagLastBackupTS)
if err != nil {
return nil
}

u, err := storage.ParseBackendFromFlags(flagSet, FlagStorage)
if err != nil {
return err
}

client, err := backup.NewBackupClient(ctx, mgr)
if err != nil {
return nil
}

err = client.SetStorage(ctx, u)
if err != nil {
return err
}

backupTS, err := client.GetTS(ctx, timeago)
if err != nil {
return err
}

defer summary.Summary(cmdName)

ranges, backupSchemas, err := backup.BuildBackupRangeAndSchema(
mgr.GetDomain(), mgr.GetTiKV(), backupTS, db, table)
if err != nil {
return err
}

// The number of regions need to backup
approximateRegions := 0
for _, r := range ranges {
var regionCount int
regionCount, err = mgr.GetRegionCount(ctx, r.StartKey, r.EndKey)
if err != nil {
return err
}
approximateRegions += regionCount
}

summary.CollectInt("backup total regions", approximateRegions)
// Backup
// Redirect to log if there is no log file to avoid unreadable output.
updateCh := utils.StartProgress(
ctx, cmdName, int64(approximateRegions), !HasLogFile())
err = client.BackupRanges(
ctx, ranges, lastBackupTS, backupTS, ratelimit, concurrency, updateCh)
if err != nil {
return err
}
// Backup has finished
close(updateCh)

// Checksum
backupSchemasConcurrency := backup.DefaultSchemaConcurrency
if backupSchemas.Len() < backupSchemasConcurrency {
backupSchemasConcurrency = backupSchemas.Len()
}
updateCh = utils.StartProgress(
ctx, "Checksum", int64(backupSchemas.Len()), !HasLogFile())
backupSchemas.SetSkipChecksum(!checksum)
backupSchemas.Start(
ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh)

err = client.CompleteMeta(backupSchemas)
if err != nil {
return err
}

valid, err := client.FastChecksum()
if err != nil {
return err
}
if !valid {
log.Error("backup FastChecksum failed!")
}
// Checksum has finished
close(updateCh)

err = client.SaveBackupMeta(ctx)
if err != nil {
return err
}
return nil
return task.RunBackup(GetDefaultContext(), cmdName, &cfg)
}

// NewBackupCommand return a full backup subcommand.
Expand Down Expand Up @@ -200,7 +45,7 @@ func NewBackupCommand() *cobra.Command {
newTableBackupCommand(),
)

defineBackupFlags(command.PersistentFlags())
task.DefineBackupFlags(command.PersistentFlags())
return command
}

Expand All @@ -211,7 +56,7 @@ func newFullBackupCommand() *cobra.Command {
Short: "backup all database",
RunE: func(command *cobra.Command, _ []string) error {
// empty db/table means full backup.
return runBackup(command.Flags(), "Full backup", "", "")
return runBackupCommand(command, "Full backup")
},
}
return command
Expand All @@ -223,19 +68,10 @@ func newDbBackupCommand() *cobra.Command {
Use: "db",
Short: "backup a database",
RunE: func(command *cobra.Command, _ []string) error {
db, err := command.Flags().GetString(flagDatabase)
if err != nil {
return err
}
if len(db) == 0 {
return errors.Errorf("empty database name is not allowed")
}
return runBackup(command.Flags(), "Database backup", db, "")
return runBackupCommand(command, "Database backup")
},
}
command.Flags().StringP(flagDatabase, "", "", "backup a table in the specific db")
_ = command.MarkFlagRequired(flagDatabase)

task.DefineDatabaseFlags(command)
return command
}

Expand All @@ -245,26 +81,9 @@ func newTableBackupCommand() *cobra.Command {
Use: "table",
Short: "backup a table",
RunE: func(command *cobra.Command, _ []string) error {
db, err := command.Flags().GetString(flagDatabase)
if err != nil {
return err
}
if len(db) == 0 {
return errors.Errorf("empty database name is not allowed")
}
table, err := command.Flags().GetString(flagTable)
if err != nil {
return err
}
if len(table) == 0 {
return errors.Errorf("empty table name is not allowed")
}
return runBackup(command.Flags(), "Table backup", db, table)
return runBackupCommand(command, "Table backup")
},
}
command.Flags().StringP(flagDatabase, "", "", "backup a table in the specific db")
command.Flags().StringP(flagTable, "t", "", "backup the specific table")
_ = command.MarkFlagRequired(flagDatabase)
_ = command.MarkFlagRequired(flagTable)
task.DefineTableFlags(command)
return command
}
62 changes: 2 additions & 60 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,28 @@ package cmd

import (
"context"
"fmt"
"net/http"
"net/http/pprof"
"sync"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util/logutil"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/task"
"github.com/pingcap/br/pkg/utils"
)

var (
initOnce = sync.Once{}
defaultContext context.Context
pdAddress string
hasLogFile uint64

connOnce = sync.Once{}
defaultMgr *conn.Mgr
)

const (
// FlagPD is the name of url flag.
FlagPD = "pd"
// FlagCA is the name of CA flag.
FlagCA = "ca"
// FlagCert is the name of cert flag.
FlagCert = "cert"
// FlagKey is the name of key flag.
FlagKey = "key"
// FlagStorage is the name of storage flag.
FlagStorage = "storage"
// FlagLogLevel is the name of log-level flag.
FlagLogLevel = "log-level"
// FlagLogFile is the name of log-file flag.
Expand All @@ -52,9 +33,6 @@ const (
// FlagSlowLogFile is the name of slow-log-file flag.
FlagSlowLogFile = "slow-log-file"

flagDatabase = "db"
flagTable = "table"

flagVersion = "version"
flagVersionShort = "V"
)
Expand All @@ -65,19 +43,13 @@ func AddFlags(cmd *cobra.Command) {
cmd.Flags().BoolP(flagVersion, flagVersionShort, false, "Display version information about BR")
cmd.SetVersionTemplate("{{printf \"%s\" .Version}}\n")

cmd.PersistentFlags().StringP(FlagPD, "u", "127.0.0.1:2379", "PD address")
cmd.PersistentFlags().String(FlagCA, "", "CA certificate path for TLS connection")
cmd.PersistentFlags().String(FlagCert, "", "Certificate path for TLS connection")
cmd.PersistentFlags().String(FlagKey, "", "Private key path for TLS connection")
cmd.PersistentFlags().StringP(FlagStorage, "s", "",
`specify the url where backup storage, eg, "local:///path/to/save"`)
cmd.PersistentFlags().StringP(FlagLogLevel, "L", "info",
"Set the log level")
cmd.PersistentFlags().String(FlagLogFile, "",
"Set the log file path. If not set, logs will output to stdout")
cmd.PersistentFlags().String(FlagStatusAddr, "",
"Set the HTTP listening address for the status report service. Set to empty string to disable")
storage.DefineFlags(cmd.PersistentFlags())
task.DefineCommonFlags(cmd.PersistentFlags())

cmd.PersistentFlags().StringP(FlagSlowLogFile, "", "",
"Set the slow log file path. If not set, discard slow logs")
Expand Down Expand Up @@ -140,12 +112,6 @@ func Init(cmd *cobra.Command) (err error) {
}
}
}()
// Set the PD server address.
pdAddress, e = cmd.Flags().GetString(FlagPD)
if e != nil {
err = e
return
}
})
return err
}
Expand All @@ -155,30 +121,6 @@ func HasLogFile() bool {
return atomic.LoadUint64(&hasLogFile) != uint64(0)
}

// GetDefaultMgr returns the default mgr for command line usage.
func GetDefaultMgr() (*conn.Mgr, error) {
if pdAddress == "" {
return nil, errors.New("pd address can not be empty")
}

// Lazy initialize and defaultMgr
var err error
connOnce.Do(func() {
var storage kv.Storage
storage, err = tikv.Driver{}.Open(
// Disable GC because TiDB enables GC already.
fmt.Sprintf("tikv://%s?disableGC=true", pdAddress))
if err != nil {
return
}
defaultMgr, err = conn.NewMgr(defaultContext, pdAddress, storage.(tikv.Storage))
})
if err != nil {
return nil, err
}
return defaultMgr, nil
}

// SetDefaultContext sets the default context for command line usage.
func SetDefaultContext(ctx context.Context) {
defaultContext = ctx
Expand Down
Loading

0 comments on commit 86b189f

Please sign in to comment.