Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

*: extracts runBackup/runRestore in cmd into pkg/task #156

Merged
merged 2 commits into from
Feb 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 11 additions & 192 deletions cmd/backup.go
Original file line number Diff line number Diff line change
@@ -1,176 +1,21 @@
package cmd

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/session"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/pingcap/br/pkg/backup"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/task"
"github.com/pingcap/br/pkg/utils"
)

const (
flagBackupTimeago = "timeago"
flagBackupRateLimit = "ratelimit"
flagBackupRateLimitUnit = "ratelimit-unit"
flagBackupConcurrency = "concurrency"
flagBackupChecksum = "checksum"
flagLastBackupTS = "lastbackupts"
)

func defineBackupFlags(flagSet *pflag.FlagSet) {
flagSet.StringP(
flagBackupTimeago, "", "",
"The history version of the backup task, e.g. 1m, 1h. Do not exceed GCSafePoint")
flagSet.Uint64P(
flagBackupRateLimit, "", 0, "The rate limit of the backup task, MB/s per node")
flagSet.Uint32P(
flagBackupConcurrency, "", 4, "The size of thread pool on each node that execute the backup task")
flagSet.BoolP(flagBackupChecksum, "", true,
"Run checksum after backup")
flagSet.Uint64P(flagLastBackupTS, "", 0, "the last time backup ts")
_ = flagSet.MarkHidden(flagLastBackupTS)

// Test only flag.
flagSet.Uint64P(
flagBackupRateLimitUnit, "", utils.MB, "The unit of rate limit of the backup task")
_ = flagSet.MarkHidden(flagBackupRateLimitUnit)
}

func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error {
ctx, cancel := context.WithCancel(defaultContext)
defer cancel()

mgr, err := GetDefaultMgr()
if err != nil {
return err
}
defer mgr.Close()

timeago, err := flagSet.GetString(flagBackupTimeago)
if err != nil {
return err
}

ratelimit, err := flagSet.GetUint64(flagBackupRateLimit)
if err != nil {
return err
}
ratelimitUnit, err := flagSet.GetUint64(flagBackupRateLimitUnit)
if err != nil {
return err
}
ratelimit *= ratelimitUnit

concurrency, err := flagSet.GetUint32(flagBackupConcurrency)
if err != nil {
return err
}
if concurrency == 0 {
err = errors.New("at least one thread required")
func runBackupCommand(command *cobra.Command, cmdName string) error {
cfg := task.BackupConfig{Config: task.Config{LogProgress: HasLogFile()}}
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
return err
}

checksum, err := flagSet.GetBool(flagBackupChecksum)
if err != nil {
return err
}

lastBackupTS, err := flagSet.GetUint64(flagLastBackupTS)
if err != nil {
return nil
}

u, err := storage.ParseBackendFromFlags(flagSet, FlagStorage)
if err != nil {
return err
}

client, err := backup.NewBackupClient(ctx, mgr)
if err != nil {
return nil
}

err = client.SetStorage(ctx, u)
if err != nil {
return err
}

backupTS, err := client.GetTS(ctx, timeago)
if err != nil {
return err
}

defer summary.Summary(cmdName)

ranges, backupSchemas, err := backup.BuildBackupRangeAndSchema(
mgr.GetDomain(), mgr.GetTiKV(), backupTS, db, table)
if err != nil {
return err
}

// The number of regions need to backup
approximateRegions := 0
for _, r := range ranges {
var regionCount int
regionCount, err = mgr.GetRegionCount(ctx, r.StartKey, r.EndKey)
if err != nil {
return err
}
approximateRegions += regionCount
}

summary.CollectInt("backup total regions", approximateRegions)
// Backup
// Redirect to log if there is no log file to avoid unreadable output.
updateCh := utils.StartProgress(
ctx, cmdName, int64(approximateRegions), !HasLogFile())
err = client.BackupRanges(
ctx, ranges, lastBackupTS, backupTS, ratelimit, concurrency, updateCh)
if err != nil {
return err
}
// Backup has finished
close(updateCh)

// Checksum
backupSchemasConcurrency := backup.DefaultSchemaConcurrency
if backupSchemas.Len() < backupSchemasConcurrency {
backupSchemasConcurrency = backupSchemas.Len()
}
updateCh = utils.StartProgress(
ctx, "Checksum", int64(backupSchemas.Len()), !HasLogFile())
backupSchemas.SetSkipChecksum(!checksum)
backupSchemas.Start(
ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh)

err = client.CompleteMeta(backupSchemas)
if err != nil {
return err
}

valid, err := client.FastChecksum()
if err != nil {
return err
}
if !valid {
log.Error("backup FastChecksum failed!")
}
// Checksum has finished
close(updateCh)

err = client.SaveBackupMeta(ctx)
if err != nil {
return err
}
return nil
return task.RunBackup(GetDefaultContext(), cmdName, &cfg)
}

// NewBackupCommand return a full backup subcommand.
Expand Down Expand Up @@ -200,7 +45,7 @@ func NewBackupCommand() *cobra.Command {
newTableBackupCommand(),
)

defineBackupFlags(command.PersistentFlags())
task.DefineBackupFlags(command.PersistentFlags())
return command
}

Expand All @@ -211,7 +56,7 @@ func newFullBackupCommand() *cobra.Command {
Short: "backup all database",
RunE: func(command *cobra.Command, _ []string) error {
// empty db/table means full backup.
return runBackup(command.Flags(), "Full backup", "", "")
return runBackupCommand(command, "Full backup")
},
}
return command
Expand All @@ -223,19 +68,10 @@ func newDbBackupCommand() *cobra.Command {
Use: "db",
Short: "backup a database",
RunE: func(command *cobra.Command, _ []string) error {
db, err := command.Flags().GetString(flagDatabase)
if err != nil {
return err
}
if len(db) == 0 {
return errors.Errorf("empty database name is not allowed")
}
return runBackup(command.Flags(), "Database backup", db, "")
return runBackupCommand(command, "Database backup")
},
}
command.Flags().StringP(flagDatabase, "", "", "backup a table in the specific db")
_ = command.MarkFlagRequired(flagDatabase)

task.DefineDatabaseFlags(command)
return command
}

Expand All @@ -245,26 +81,9 @@ func newTableBackupCommand() *cobra.Command {
Use: "table",
Short: "backup a table",
RunE: func(command *cobra.Command, _ []string) error {
db, err := command.Flags().GetString(flagDatabase)
if err != nil {
return err
}
if len(db) == 0 {
return errors.Errorf("empty database name is not allowed")
}
table, err := command.Flags().GetString(flagTable)
if err != nil {
return err
}
if len(table) == 0 {
return errors.Errorf("empty table name is not allowed")
}
return runBackup(command.Flags(), "Table backup", db, table)
return runBackupCommand(command, "Table backup")
},
}
command.Flags().StringP(flagDatabase, "", "", "backup a table in the specific db")
command.Flags().StringP(flagTable, "t", "", "backup the specific table")
_ = command.MarkFlagRequired(flagDatabase)
_ = command.MarkFlagRequired(flagTable)
task.DefineTableFlags(command)
return command
}
62 changes: 2 additions & 60 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,28 @@ package cmd

import (
"context"
"fmt"
"net/http"
"net/http/pprof"
"sync"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util/logutil"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/task"
"github.com/pingcap/br/pkg/utils"
)

var (
initOnce = sync.Once{}
defaultContext context.Context
pdAddress string
hasLogFile uint64

connOnce = sync.Once{}
defaultMgr *conn.Mgr
)

const (
// FlagPD is the name of url flag.
FlagPD = "pd"
// FlagCA is the name of CA flag.
FlagCA = "ca"
// FlagCert is the name of cert flag.
FlagCert = "cert"
// FlagKey is the name of key flag.
FlagKey = "key"
// FlagStorage is the name of storage flag.
FlagStorage = "storage"
// FlagLogLevel is the name of log-level flag.
FlagLogLevel = "log-level"
// FlagLogFile is the name of log-file flag.
Expand All @@ -52,9 +33,6 @@ const (
// FlagSlowLogFile is the name of slow-log-file flag.
FlagSlowLogFile = "slow-log-file"

flagDatabase = "db"
flagTable = "table"

flagVersion = "version"
flagVersionShort = "V"
)
Expand All @@ -65,19 +43,13 @@ func AddFlags(cmd *cobra.Command) {
cmd.Flags().BoolP(flagVersion, flagVersionShort, false, "Display version information about BR")
cmd.SetVersionTemplate("{{printf \"%s\" .Version}}\n")

cmd.PersistentFlags().StringP(FlagPD, "u", "127.0.0.1:2379", "PD address")
cmd.PersistentFlags().String(FlagCA, "", "CA certificate path for TLS connection")
cmd.PersistentFlags().String(FlagCert, "", "Certificate path for TLS connection")
cmd.PersistentFlags().String(FlagKey, "", "Private key path for TLS connection")
cmd.PersistentFlags().StringP(FlagStorage, "s", "",
`specify the url where backup storage, eg, "local:///path/to/save"`)
cmd.PersistentFlags().StringP(FlagLogLevel, "L", "info",
"Set the log level")
cmd.PersistentFlags().String(FlagLogFile, "",
"Set the log file path. If not set, logs will output to stdout")
cmd.PersistentFlags().String(FlagStatusAddr, "",
"Set the HTTP listening address for the status report service. Set to empty string to disable")
storage.DefineFlags(cmd.PersistentFlags())
task.DefineCommonFlags(cmd.PersistentFlags())

cmd.PersistentFlags().StringP(FlagSlowLogFile, "", "",
"Set the slow log file path. If not set, discard slow logs")
Expand Down Expand Up @@ -140,12 +112,6 @@ func Init(cmd *cobra.Command) (err error) {
}
}
}()
// Set the PD server address.
pdAddress, e = cmd.Flags().GetString(FlagPD)
if e != nil {
err = e
return
}
})
return err
}
Expand All @@ -155,30 +121,6 @@ func HasLogFile() bool {
return atomic.LoadUint64(&hasLogFile) != uint64(0)
}

// GetDefaultMgr returns the default mgr for command line usage.
func GetDefaultMgr() (*conn.Mgr, error) {
if pdAddress == "" {
return nil, errors.New("pd address can not be empty")
}

// Lazy initialize and defaultMgr
var err error
connOnce.Do(func() {
var storage kv.Storage
storage, err = tikv.Driver{}.Open(
// Disable GC because TiDB enables GC already.
fmt.Sprintf("tikv://%s?disableGC=true", pdAddress))
if err != nil {
return
}
defaultMgr, err = conn.NewMgr(defaultContext, pdAddress, storage.(tikv.Storage))
})
if err != nil {
return nil, err
}
return defaultMgr, nil
}

// SetDefaultContext sets the default context for command line usage.
func SetDefaultContext(ctx context.Context) {
defaultContext = ctx
Expand Down
Loading