
ignore existing tables when restoring tables (#384)
* ignore existing tables when restoring tables

* record BackupTS and size in the summary log

* backup: filter schemas without files in incremental backup

* address comment
3pointer authored Jun 29, 2020
1 parent 980627a · commit 1a63149
Showing 5 changed files with 77 additions and 20 deletions.
pkg/backup/client.go (30 additions, 0 deletions)
@@ -171,6 +171,7 @@ func (bc *Client) SaveBackupMeta(ctx context.Context, ddlJobs []*model.Job) erro
if err != nil {
return errors.Trace(err)
}

bc.backupMeta.Ddls = ddlJobsData
backupMetaData, err := proto.Marshal(&bc.backupMeta)
if err != nil {
@@ -943,3 +944,32 @@ func (bc *Client) CopyMetaFrom(backupSchemas *Schemas) {
}
bc.backupMeta.Schemas = schemas
}

// FilterSchema filters out schemas that have no backup files.
// This is useful during incremental backup: no files in the backup means no files to restore,
// so we can skip some DDLs during restore to speed up restoration.
func (bc *Client) FilterSchema() error {
dbs, err := utils.LoadBackupTables(&bc.backupMeta)
if err != nil {
return err
}
schemas := make([]*kvproto.Schema, 0, len(bc.backupMeta.Schemas))
for _, schema := range bc.backupMeta.Schemas {
dbInfo := &model.DBInfo{}
err := json.Unmarshal(schema.Db, dbInfo)
if err != nil {
return err
}
tblInfo := &model.TableInfo{}
err = json.Unmarshal(schema.Table, tblInfo)
if err != nil {
return err
}
tbl := dbs[dbInfo.Name.String()].GetTable(tblInfo.Name.String())
if len(tbl.Files) > 0 {
schemas = append(schemas, schema)
}
}
bc.backupMeta.Schemas = schemas
return nil
}
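For orientation, a sketch of where this filter is intended to run. The call site below mirrors the change to pkg/task/backup.go later in this diff; isIncrementalBackup, client, and ddlJobs are names taken from that file:

// Intended call site (see pkg/task/backup.go below): after an incremental
// backup has collected file info, drop schemas with no backed-up files
// before the backup meta is persisted.
if isIncrementalBackup {
	if err := client.FilterSchema(); err != nil {
		return err
	}
}
if err := client.SaveBackupMeta(ctx, ddlJobs); err != nil {
	return err
}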
pkg/gluetikv/glue.go (4 additions, 1 deletion)
@@ -12,6 +12,7 @@ import (
"github.com/pingcap/tidb/store/tikv"

"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/utils"
)

@@ -51,7 +52,9 @@ func (Glue) StartProgress(ctx context.Context, cmdName string, total int64, redi
}

// Record implements glue.Glue.
func (Glue) Record(string, uint64) {}
func (Glue) Record(name string, val uint64) {
summary.CollectUint(name, val)
}

type progress struct {
ch chan<- struct{}
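Record was previously a no-op; it now forwards named uint64 values to the summary collector. A hypothetical helper (not part of this commit) showing how a caller that only holds a glue.Glue can surface metrics in the summary log:

// Hypothetical helper, assuming the br module is importable.
package metrics

import "github.com/pingcap/br/pkg/glue"

// recordBackupStats pushes metrics through any glue.Glue implementation;
// with gluetikv.Glue they now end up in the summary log.
func recordBackupStats(g glue.Glue, backupTS, archiveSize uint64) {
	g.Record("BackupTS", backupTS) // forwarded to summary.CollectUint
	g.Record("Size", archiveSize)  // the "Size" field name is illustrative
}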
pkg/summary/collector.go (13 additions, 0 deletions)
@@ -35,6 +35,8 @@ type LogCollector interface {

CollectInt(name string, t int)

CollectUInt(name string, t uint64)

SetSuccessStatus(success bool)

Summary(name string)
@@ -73,6 +75,7 @@ type logCollector struct {
failureReasons map[string]error
durations map[string]time.Duration
ints map[string]int
uints map[string]uint64
successStatus bool

log logFunc
@@ -87,6 +90,7 @@ func newLogCollector(log logFunc) LogCollector {
failureReasons: make(map[string]error),
durations: make(map[string]time.Duration),
ints: make(map[string]int),
uints: make(map[string]uint64),
log: log,
}
}
@@ -131,6 +135,12 @@ func (tc *logCollector) CollectInt(name string, t int) {
tc.ints[name] += t
}

func (tc *logCollector) CollectUInt(name string, t uint64) {
tc.mu.Lock()
defer tc.mu.Unlock()
tc.uints[name] += t
}

func (tc *logCollector) SetSuccessStatus(success bool) {
tc.mu.Lock()
defer tc.mu.Unlock()
@@ -164,6 +174,9 @@ func (tc *logCollector) Summary(name string) {
for key, val := range tc.ints {
logFields = append(logFields, zap.Int(key, val))
}
for key, val := range tc.uints {
logFields = append(logFields, zap.Uint64(key, val))
}

if len(tc.failureReasons) != 0 || !tc.successStatus {
for unitName, reason := range tc.failureReasons {
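The new uints map mirrors the existing ints map: a mutex-guarded map of named counters where repeated collections accumulate. A self-contained sketch of that pattern, with illustrative names unrelated to the br codebase:

package main

import (
	"fmt"
	"sync"
)

// counters accumulates named uint64 values safely across goroutines,
// the same shape as logCollector.uints above.
type counters struct {
	mu    sync.Mutex
	uints map[string]uint64
}

func (c *counters) collectUint(name string, v uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.uints[name] += v // repeated collections add up
}

func main() {
	c := &counters{uints: make(map[string]uint64)}
	c.collectUint("Size", 100)
	c.collectUint("Size", 200)
	fmt.Println(c.uints["Size"]) // 300
}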
pkg/summary/summary.go (5 additions, 0 deletions)
@@ -29,6 +29,11 @@ func CollectInt(name string, t int) {
collector.CollectInt(name, t)
}

// CollectUint collects a uint64 log field.
func CollectUint(name string, t uint64) {
collector.CollectUInt(name, t)
}

// SetSuccessStatus sets final success status.
func SetSuccessStatus(success bool) {
collector.SetSuccessStatus(success)
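A hypothetical call sequence for the new wrapper; the values are made up, and summary.Summary is assumed to be the existing package-level wrapper around LogCollector.Summary:

summary.CollectUint("BackupTS", 418573924) // accumulated under "BackupTS"
summary.CollectUint("Size", 1<<30)         // repeated calls would add up
summary.SetSuccessStatus(true)
summary.Summary("backup") // one log line with BackupTS and Size as zap.Uint64 fields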
pkg/task/backup.go (25 additions, 19 deletions)
@@ -137,6 +137,8 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
}
g.Record("BackupTS", backupTS)

isIncrementalBackup := cfg.LastBackupTS > 0

ranges, backupSchemas, err := backup.BuildBackupRangeAndSchema(
mgr.GetDomain(), mgr.GetTiKV(), cfg.TableFilter, backupTS)
if err != nil {
@@ -148,7 +150,7 @@
}

ddlJobs := make([]*model.Job, 0)
if cfg.LastBackupTS > 0 {
if isIncrementalBackup {
if backupTS <= cfg.LastBackupTS {
log.Error("LastBackupTS is larger or equal to current TS")
return errors.New("LastBackupTS is larger or equal to current TS")
@@ -197,7 +199,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
updateCh.Close()

// Checksum from server, and then fulfill the backup metadata.
if cfg.Checksum {
if cfg.Checksum && !isIncrementalBackup {
backupSchemasConcurrency := utils.MinInt(backup.DefaultSchemaConcurrency, backupSchemas.Len())
updateCh = g.StartProgress(
ctx, "Checksum", int64(backupSchemas.Len()), !cfg.LogProgress)
@@ -210,17 +212,26 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
// Checksum has finished
updateCh.Close()
// collect file information.
err = checkChecksums(client, cfg)
err = checkChecksums(client)
if err != nil {
return err
}
} else {
// When user specified not to calculate checksum, don't calculate checksum.
// Just... copy schemas from origin.
log.Info("Skip fast checksum because user requirement.")
client.CopyMetaFrom(backupSchemas)
// Anyway, let's collect file info for summary.
client.CollectFileInfo()
if isIncrementalBackup {
// Since we don't support checksum for incremental data, fast checksum should be skipped.
log.Info("Skip fast checksum in incremental backup")
err = client.FilterSchema()
if err != nil {
return err
}
} else {
// When user specified not to calculate checksum, don't calculate checksum.
log.Info("Skip fast checksum because user requirement.")
}
}

err = client.SaveBackupMeta(ctx, ddlJobs)
@@ -237,25 +248,20 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig

// checkChecksums checks the checksum of the client, and once it fails,
// returns an error with the message "mismatched checksum".
func checkChecksums(client *backup.Client, cfg *BackupConfig) error {
func checkChecksums(client *backup.Client) error {
checksums, err := client.CollectChecksums()
if err != nil {
return err
}
if cfg.LastBackupTS == 0 {
var matches bool
matches, err = client.ChecksumMatches(checksums)
if err != nil {
return err
}
if !matches {
log.Error("backup FastChecksum mismatch!")
return errors.New("mismatched checksum")
}
return nil
var matches bool
matches, err = client.ChecksumMatches(checksums)
if err != nil {
return err
}
if !matches {
log.Error("backup FastChecksum mismatch!")
return errors.New("mismatched checksum")
}
// Since we don't support checksum for incremental data, fast checksum should be skipped.
log.Info("Skip fast checksum in incremental backup")
return nil
}

Expand Down
