diff --git a/cmd/validate.go b/cmd/validate.go
index 8bca7e553..d358995a3 100644
--- a/cmd/validate.go
+++ b/cmd/validate.go
@@ -19,6 +19,7 @@ import (
 	"go.uber.org/zap"
 
 	"github.com/pingcap/br/pkg/restore"
+	"github.com/pingcap/br/pkg/rtree"
 	"github.com/pingcap/br/pkg/task"
 	"github.com/pingcap/br/pkg/utils"
 )
@@ -166,15 +167,15 @@ func newBackupMetaCommand() *cobra.Command {
 				tables = append(tables, db.Tables...)
 			}
 			// Check if the ranges of files overlapped
-			rangeTree := restore.NewRangeTree()
+			rangeTree := rtree.NewRangeTree()
 			for _, file := range files {
-				if out := rangeTree.InsertRange(restore.Range{
+				if out := rangeTree.InsertRange(rtree.Range{
 					StartKey: file.GetStartKey(),
 					EndKey:   file.GetEndKey(),
 				}); out != nil {
 					log.Error(
 						"file ranges overlapped",
-						zap.Stringer("out", out.(*restore.Range)),
+						zap.Stringer("out", out),
 						zap.Stringer("file", file),
 					)
 				}
diff --git a/pkg/backup/client.go b/pkg/backup/client.go
index fb2960962..1b5b6b645 100644
--- a/pkg/backup/client.go
+++ b/pkg/backup/client.go
@@ -29,6 +29,7 @@ import (
 	"github.com/pingcap/tidb/util/ranger"
 	"go.uber.org/zap"
 
+	"github.com/pingcap/br/pkg/rtree"
 	"github.com/pingcap/br/pkg/storage"
 	"github.com/pingcap/br/pkg/summary"
 	"github.com/pingcap/br/pkg/utils"
 )
@@ -179,13 +180,13 @@ func BuildBackupRangeAndSchema(
 	storage kv.Storage,
 	tableFilter *filter.Filter,
 	backupTS uint64,
-) ([]Range, *Schemas, error) {
+) ([]rtree.Range, *Schemas, error) {
 	info, err := dom.GetSnapshotInfoSchema(backupTS)
 	if err != nil {
 		return nil, nil, errors.Trace(err)
 	}
 
-	ranges := make([]Range, 0)
+	ranges := make([]rtree.Range, 0)
 	backupSchemas := newBackupSchemas()
 	for _, dbInfo := range info.AllSchemas() {
 		// skip system databases
@@ -233,7 +234,7 @@ func BuildBackupRangeAndSchema(
 			return nil, nil, err
 		}
 		for _, r := range tableRanges {
-			ranges = append(ranges, Range{
+			ranges = append(ranges, rtree.Range{
 				StartKey: r.StartKey,
 				EndKey:   r.EndKey,
 			})
@@ -295,7 +296,7 @@ func GetBackupDDLJobs(dom *domain.Domain, lastBackupTS, backupTS uint64) ([]*mod
 // BackupRanges make a backup of the given key ranges.
 func (bc *Client) BackupRanges(
 	ctx context.Context,
-	ranges []Range,
+	ranges []rtree.Range,
 	req kvproto.BackupRequest,
 	updateCh chan<- struct{},
 ) error {
@@ -389,12 +390,12 @@ func (bc *Client) BackupRange(
 
 	push := newPushDown(ctx, bc.mgr, len(allStores))
 
-	var results RangeTree
+	var results rtree.RangeTree
 	results, err = push.pushBackup(req, allStores, updateCh)
 	if err != nil {
 		return err
 	}
-	log.Info("finish backup push down", zap.Int("Ok", results.len()))
+	log.Info("finish backup push down", zap.Int("Ok", results.Len()))
 
 	// Find and backup remaining ranges.
 	// TODO: test fine grained backup.
@@ -421,14 +422,14 @@ func (bc *Client) BackupRange(
 			zap.Reflect("EndVersion", req.EndVersion))
 	}
 
-	results.tree.Ascend(func(i btree.Item) bool {
-		r := i.(*Range)
+	results.Ascend(func(i btree.Item) bool {
+		r := i.(*rtree.Range)
 		bc.backupMeta.Files = append(bc.backupMeta.Files, r.Files...)
 		return true
 	})
 
 	// Check if there are duplicated files.
-	results.checkDupFiles()
+	rtree.CheckDupFiles(&results)
 
 	return nil
 }
@@ -466,13 +467,13 @@ func (bc *Client) fineGrainedBackup(
 	backupTS uint64,
 	rateLimit uint64,
 	concurrency uint32,
-	rangeTree RangeTree,
+	rangeTree rtree.RangeTree,
 	updateCh chan<- struct{},
 ) error {
 	bo := tikv.NewBackoffer(ctx, backupFineGrainedMaxBackoff)
 	for {
 		// Step1, check whether there is any incomplete range
-		incomplete := rangeTree.getIncompleteRange(startKey, endKey)
+		incomplete := rangeTree.GetIncompleteRange(startKey, endKey)
 		if len(incomplete) == 0 {
 			return nil
 		}
@@ -480,7 +481,7 @@ func (bc *Client) fineGrainedBackup(
 		// Step2, retry backup on incomplete range
 		respCh := make(chan *kvproto.BackupResponse, 4)
 		errCh := make(chan error, 4)
-		retry := make(chan Range, 4)
+		retry := make(chan rtree.Range, 4)
 
 		max := &struct {
 			ms int
@@ -539,7 +540,7 @@ func (bc *Client) fineGrainedBackup(
 					zap.Binary("StartKey", resp.StartKey),
 					zap.Binary("EndKey", resp.EndKey),
 				)
-				rangeTree.put(resp.StartKey, resp.EndKey, resp.Files)
+				rangeTree.Put(resp.StartKey, resp.EndKey, resp.Files)
 
 				// Update progress
 				updateCh <- struct{}{}
@@ -625,7 +626,7 @@ func onBackupResponse(
 func (bc *Client) handleFineGrained(
 	ctx context.Context,
 	bo *tikv.Backoffer,
-	rg Range,
+	rg rtree.Range,
 	lastBackupTS uint64,
 	backupTS uint64,
 	rateLimit uint64,
diff --git a/pkg/backup/push.go b/pkg/backup/push.go
index 23c4f01d4..803a8ec92 100644
--- a/pkg/backup/push.go
+++ b/pkg/backup/push.go
@@ -9,6 +9,8 @@ import (
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/log"
 	"go.uber.org/zap"
+
+	"github.com/pingcap/br/pkg/rtree"
 )
 
 // pushDown warps a backup task.
@@ -35,9 +37,9 @@ func (push *pushDown) pushBackup(
 	req backup.BackupRequest,
 	stores []*metapb.Store,
 	updateCh chan<- struct{},
-) (RangeTree, error) {
+) (rtree.RangeTree, error) {
 	// Push down backup tasks to all tikv instances.
-	res := newRangeTree()
+	res := rtree.NewRangeTree()
 	wg := new(sync.WaitGroup)
 	for _, s := range stores {
 		storeID := s.GetId()
@@ -82,7 +84,7 @@ func (push *pushDown) pushBackup(
 			}
 			if resp.GetError() == nil {
 				// None error means range has been backuped successfully.
-				res.put(
+				res.Put(
 					resp.GetStartKey(), resp.GetEndKey(), resp.GetFiles())
 
 				// Update progress
diff --git a/pkg/restore/client.go b/pkg/restore/client.go
index a7e5c4d08..46cdcaa24 100644
--- a/pkg/restore/client.go
+++ b/pkg/restore/client.go
@@ -39,11 +39,10 @@ type Client struct {
 	ctx    context.Context
 	cancel context.CancelFunc
 
-	pdClient        pd.Client
-	fileImporter    FileImporter
-	workerPool      *utils.WorkerPool
-	tableWorkerPool *utils.WorkerPool
-	tlsConf         *tls.Config
+	pdClient     pd.Client
+	fileImporter FileImporter
+	workerPool   *utils.WorkerPool
+	tlsConf      *tls.Config
 
 	databases map[string]*utils.Database
 	ddlJobs   []*model.Job
@@ -70,12 +69,11 @@ func NewRestoreClient(
 	}
 
 	return &Client{
-		ctx:             ctx,
-		cancel:          cancel,
-		pdClient:        pdClient,
-		tableWorkerPool: utils.NewWorkerPool(128, "table"),
-		db:              db,
-		tlsConf:         tlsConf,
+		ctx:      ctx,
+		cancel:   cancel,
+		pdClient: pdClient,
+		db:       db,
+		tlsConf:  tlsConf,
 	}, nil
 }
 
diff --git a/pkg/restore/range.go b/pkg/restore/range.go
index f3914539e..97e2469dc 100644
--- a/pkg/restore/range.go
+++ b/pkg/restore/range.go
@@ -1,45 +1,19 @@
 package restore
 
 import (
-	"bytes"
-	"fmt"
-
-	"github.com/google/btree"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/import_sstpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/tablecodec"
 	"go.uber.org/zap"
-)
-
-// Range represents a range of keys.
-type Range struct {
-	StartKey []byte
-	EndKey   []byte
-}
-
-// String formats a range to a string
-func (r *Range) String() string {
-	return fmt.Sprintf("[%x %x]", r.StartKey, r.EndKey)
-}
-// Less compares a range with a btree.Item
-func (r *Range) Less(than btree.Item) bool {
-	t := than.(*Range)
-	return len(r.EndKey) != 0 && bytes.Compare(r.EndKey, t.StartKey) <= 0
-}
-
-// contains returns if a key is included in the range.
-func (r *Range) contains(key []byte) bool {
-	start, end := r.StartKey, r.EndKey
-	return bytes.Compare(key, start) >= 0 &&
-		(len(end) == 0 || bytes.Compare(key, end) < 0)
-}
+
+	"github.com/pingcap/br/pkg/rtree"
+)
 
 // sortRanges checks if the range overlapped and sort them
-func sortRanges(ranges []Range, rewriteRules *RewriteRules) ([]Range, error) {
-	rangeTree := NewRangeTree()
+func sortRanges(ranges []rtree.Range, rewriteRules *RewriteRules) ([]rtree.Range, error) {
+	rangeTree := rtree.NewRangeTree()
 	for _, rg := range ranges {
 		if rewriteRules != nil {
 			startID := tablecodec.DecodeTableID(rg.StartKey)
@@ -77,64 +51,10 @@ func sortRanges(ranges []Range, rewriteRules *RewriteRules) ([]Range, error) {
 			return nil, errors.Errorf("ranges overlapped: %s, %s", out, rg)
 		}
 	}
-	sortedRanges := make([]Range, 0, len(ranges))
-	rangeTree.Ascend(func(rg *Range) bool {
-		if rg == nil {
-			return false
-		}
-		sortedRanges = append(sortedRanges, *rg)
-		return true
-	})
+	sortedRanges := rangeTree.GetSortedRanges()
 	return sortedRanges, nil
 }
 
-// RangeTree stores the ranges in an orderly manner.
-// All the ranges it stored do not overlap.
-type RangeTree struct {
-	tree *btree.BTree
-}
-
-// NewRangeTree returns a new RangeTree.
-func NewRangeTree() *RangeTree {
-	return &RangeTree{tree: btree.New(32)}
-}
-
-// Find returns nil or a range in the range tree
-func (rt *RangeTree) Find(key []byte) *Range {
-	var ret *Range
-	r := &Range{
-		StartKey: key,
-	}
-	rt.tree.DescendLessOrEqual(r, func(i btree.Item) bool {
-		ret = i.(*Range)
-		return false
-	})
-	if ret == nil || !ret.contains(key) {
-		return nil
-	}
-	return ret
-}
-
-// InsertRange inserts ranges into the range tree.
-// it returns true if all ranges inserted successfully.
-// it returns false if there are some overlapped ranges.
-func (rt *RangeTree) InsertRange(rg Range) btree.Item {
-	return rt.tree.ReplaceOrInsert(&rg)
-}
-
-// RangeIterator allows callers of Ascend to iterate in-order over portions of
-// the tree. When this function returns false, iteration will stop and the
-// associated Ascend function will immediately return.
-type RangeIterator func(rg *Range) bool
-
-// Ascend calls the iterator for every value in the tree within [first, last],
-// until the iterator returns false.
-func (rt *RangeTree) Ascend(iterator RangeIterator) {
-	rt.tree.Ascend(func(i btree.Item) bool {
-		return iterator(i.(*Range))
-	})
-}
-
 // RegionInfo includes a region and the leader of the region.
 type RegionInfo struct {
 	Region *metapb.Region
diff --git a/pkg/restore/range_test.go b/pkg/restore/range_test.go
index a9edc5b82..371e79ebb 100644
--- a/pkg/restore/range_test.go
+++ b/pkg/restore/range_test.go
@@ -6,6 +6,8 @@ import (
 	. "github.com/pingcap/check"
 	"github.com/pingcap/kvproto/pkg/import_sstpb"
 	"github.com/pingcap/tidb/tablecodec"
+
+	"github.com/pingcap/br/pkg/rtree"
 )
 
 type testRangeSuite struct{}
@@ -21,8 +23,8 @@ var RangeEquals Checker = &rangeEquals{
 }
 
 func (checker *rangeEquals) Check(params []interface{}, names []string) (result bool, error string) {
-	obtained := params[0].([]Range)
-	expected := params[1].([]Range)
+	obtained := params[0].([]rtree.Range)
+	expected := params[1].([]rtree.Range)
 	if len(obtained) != len(expected) {
 		return false, ""
 	}
@@ -44,20 +46,20 @@ func (s *testRangeSuite) TestSortRange(c *C) {
 		Table: make([]*import_sstpb.RewriteRule, 0),
 		Data:  dataRules,
 	}
-	ranges1 := []Range{
-		{append(tablecodec.GenTableRecordPrefix(1), []byte("aaa")...),
-			append(tablecodec.GenTableRecordPrefix(1), []byte("bbb")...)},
+	ranges1 := []rtree.Range{
+		{StartKey: append(tablecodec.GenTableRecordPrefix(1), []byte("aaa")...),
+			EndKey: append(tablecodec.GenTableRecordPrefix(1), []byte("bbb")...), Files: nil},
 	}
 	rs1, err := sortRanges(ranges1, rewriteRules)
 	c.Assert(err, IsNil, Commentf("sort range1 failed: %v", err))
-	c.Assert(rs1, RangeEquals, []Range{
-		{append(tablecodec.GenTableRecordPrefix(4), []byte("aaa")...),
-			append(tablecodec.GenTableRecordPrefix(4), []byte("bbb")...)},
+	c.Assert(rs1, RangeEquals, []rtree.Range{
+		{StartKey: append(tablecodec.GenTableRecordPrefix(4), []byte("aaa")...),
+			EndKey: append(tablecodec.GenTableRecordPrefix(4), []byte("bbb")...), Files: nil},
 	})
 
-	ranges2 := []Range{
-		{append(tablecodec.GenTableRecordPrefix(1), []byte("aaa")...),
-			append(tablecodec.GenTableRecordPrefix(2), []byte("bbb")...)},
+	ranges2 := []rtree.Range{
+		{StartKey: append(tablecodec.GenTableRecordPrefix(1), []byte("aaa")...),
+			EndKey: append(tablecodec.GenTableRecordPrefix(2), []byte("bbb")...), Files: nil},
 	}
 	_, err = sortRanges(ranges2, rewriteRules)
 	c.Assert(err, ErrorMatches, ".*table id does not match.*")
@@ -66,10 +68,10 @@ func (s *testRangeSuite) TestSortRange(c *C) {
 	rewriteRules1 := initRewriteRules()
 	rs3, err := sortRanges(ranges3, rewriteRules1)
 	c.Assert(err, IsNil, Commentf("sort range1 failed: %v", err))
-	c.Assert(rs3, RangeEquals, []Range{
-		{[]byte("bbd"), []byte("bbf")},
-		{[]byte("bbf"), []byte("bbj")},
-		{[]byte("xxa"), []byte("xxe")},
-		{[]byte("xxe"), []byte("xxz")},
+	c.Assert(rs3, RangeEquals, []rtree.Range{
+		{StartKey: []byte("bbd"), EndKey: []byte("bbf"), Files: nil},
+		{StartKey: []byte("bbf"), EndKey: []byte("bbj"), Files: nil},
+		{StartKey: []byte("xxa"), EndKey: []byte("xxe"), Files: nil},
+		{StartKey: []byte("xxe"), EndKey: []byte("xxz"), Files: nil},
 	})
 }
diff --git a/pkg/restore/split.go b/pkg/restore/split.go
index 64bf83e8c..4642ab853 100644
--- a/pkg/restore/split.go
+++ b/pkg/restore/split.go
@@ -13,6 +13,8 @@ import (
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/util/codec"
 	"go.uber.org/zap"
+
+	"github.com/pingcap/br/pkg/rtree"
 )
 
 // Constants for split retry machinery.
@@ -54,7 +56,7 @@ type OnSplitFunc func(key [][]byte)
 // note: all ranges and rewrite rules must have raw key.
 func (rs *RegionSplitter) Split(
 	ctx context.Context,
-	ranges []Range,
+	ranges []rtree.Range,
 	rewriteRules *RewriteRules,
 	onSplit OnSplitFunc,
 ) error {
@@ -252,7 +254,7 @@ func (rs *RegionSplitter) splitAndScatterRegions(
 
 // getSplitKeys checks if the regions should be split by the new prefix of the rewrites rule and the end key of
 // the ranges, groups the split keys by region id
-func getSplitKeys(rewriteRules *RewriteRules, ranges []Range, regions []*RegionInfo) map[uint64][][]byte {
+func getSplitKeys(rewriteRules *RewriteRules, ranges []rtree.Range, regions []*RegionInfo) map[uint64][][]byte {
 	splitKeyMap := make(map[uint64][][]byte)
 	checkKeys := make([][]byte, 0)
 	for _, rule := range rewriteRules.Table {
diff --git a/pkg/restore/split_test.go b/pkg/restore/split_test.go
index a0dbc3678..61896b114 100644
--- a/pkg/restore/split_test.go
+++ b/pkg/restore/split_test.go
@@ -13,6 +13,8 @@ import (
 	"github.com/pingcap/pd/server/core"
 	"github.com/pingcap/pd/server/schedule/placement"
 	"github.com/pingcap/tidb/util/codec"
+
+	"github.com/pingcap/br/pkg/rtree"
 )
 
 type testClient struct {
@@ -238,21 +240,21 @@ func initTestClient() *testClient {
 }
 
 // range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj)
-func initRanges() []Range {
-	var ranges [4]Range
-	ranges[0] = Range{
+func initRanges() []rtree.Range {
+	var ranges [4]rtree.Range
+	ranges[0] = rtree.Range{
 		StartKey: []byte("aaa"),
 		EndKey:   []byte("aae"),
 	}
-	ranges[1] = Range{
+	ranges[1] = rtree.Range{
 		StartKey: []byte("aae"),
 		EndKey:   []byte("aaz"),
 	}
-	ranges[2] = Range{
+	ranges[2] = rtree.Range{
 		StartKey: []byte("ccd"),
 		EndKey:   []byte("ccf"),
 	}
-	ranges[3] = Range{
+	ranges[3] = rtree.Range{
 		StartKey: []byte("ccf"),
 		EndKey:   []byte("ccj"),
 	}
diff --git a/pkg/restore/util.go b/pkg/restore/util.go
index 0936c1085..eb59e625f 100644
--- a/pkg/restore/util.go
+++ b/pkg/restore/util.go
@@ -19,6 +19,7 @@ import (
 	"github.com/pingcap/tidb/util/codec"
 	"go.uber.org/zap"
 
+	"github.com/pingcap/br/pkg/rtree"
 	"github.com/pingcap/br/pkg/summary"
 )
 
@@ -154,8 +155,8 @@ func getSSTMetaFromFile(
 func ValidateFileRanges(
 	files []*backup.File,
 	rewriteRules *RewriteRules,
-) ([]Range, error) {
-	ranges := make([]Range, 0, len(files))
+) ([]rtree.Range, error) {
+	ranges := make([]rtree.Range, 0, len(files))
 	fileAppended := make(map[string]bool)
 
 	for _, file := range files {
@@ -174,7 +175,7 @@ func ValidateFileRanges(
 				zap.Stringer("file", file))
 			return nil, errors.New("table ids dont match")
 		}
-		ranges = append(ranges, Range{
+		ranges = append(ranges, rtree.Range{
 			StartKey: file.GetStartKey(),
 			EndKey:   file.GetEndKey(),
 		})
@@ -184,6 +185,39 @@ func ValidateFileRanges(
 	return ranges, nil
 }
 
+// AttachFilesToRanges attaches files to ranges.
+// It panics if ranges overlap or a file has no matching range.
+func AttachFilesToRanges(
+	files []*backup.File,
+	ranges []rtree.Range,
+) []rtree.Range {
+	rangeTree := rtree.NewRangeTree()
+	for _, rg := range ranges {
+		rangeTree.Update(rg)
+	}
+	for _, f := range files {
+
+		rg := rangeTree.Find(&rtree.Range{
+			StartKey: f.GetStartKey(),
+			EndKey:   f.GetEndKey(),
+		})
+		if rg == nil {
+			log.Fatal("range not found",
+				zap.Binary("startKey", f.GetStartKey()),
+				zap.Binary("endKey", f.GetEndKey()))
+		}
+		file := *f
+		rg.Files = append(rg.Files, &file)
+	}
+	if rangeTree.Len() != len(ranges) {
+		log.Fatal("ranges overlapped",
+			zap.Int("ranges length", len(ranges)),
+			zap.Int("tree length", rangeTree.Len()))
+	}
+	sortedRanges := rangeTree.GetSortedRanges()
+	return sortedRanges
+}
+
 // ValidateFileRewriteRule uses rewrite rules to validate the ranges of a file
 func ValidateFileRewriteRule(file *backup.File, rewriteRules *RewriteRules) error {
 	// Check if the start key has a matched rewrite key
@@ -276,7 +310,7 @@ func truncateTS(key []byte) []byte {
 func SplitRanges(
 	ctx context.Context,
 	client *Client,
-	ranges []Range,
+	ranges []rtree.Range,
 	rewriteRules *RewriteRules,
 	updateCh chan<- struct{},
 ) error {
@@ -300,6 +334,10 @@ func rewriteFileKeys(file *backup.File, rewriteRules *RewriteRules) (startKey, e
 	if startID == endID {
 		startKey, rule = rewriteRawKey(file.GetStartKey(), rewriteRules)
 		if rewriteRules != nil && rule == nil {
+			log.Error("cannot find rewrite rule",
+				zap.Binary("startKey", file.GetStartKey()),
+				zap.Reflect("rewrite table", rewriteRules.Table),
+				zap.Reflect("rewrite data", rewriteRules.Data))
 			err = errors.New("cannot find rewrite rule for start key")
 			return
 		}
diff --git a/pkg/rtree/check.go b/pkg/rtree/check.go
new file mode 100644
index 000000000..08c98d2f4
--- /dev/null
+++ b/pkg/rtree/check.go
@@ -0,0 +1,31 @@
+package rtree
+
+import (
+	"encoding/hex"
+
+	"github.com/google/btree"
+	"github.com/pingcap/log"
+	"go.uber.org/zap"
+)
+
+// CheckDupFiles checks whether any files are duplicated.
+func CheckDupFiles(rangeTree *RangeTree) {
+	// Name -> SHA256
+	files := make(map[string][]byte)
+	rangeTree.Ascend(func(i btree.Item) bool {
+		rg := i.(*Range)
+		for _, f := range rg.Files {
+			old, ok := files[f.Name]
+			if ok {
+				log.Error("dup file",
+					zap.String("Name", f.Name),
+					zap.String("SHA256_1", hex.EncodeToString(old)),
+					zap.String("SHA256_2", hex.EncodeToString(f.Sha256)),
+				)
+			} else {
+				files[f.Name] = f.Sha256
+			}
+		}
+		return true
+	})
+}
diff --git a/pkg/backup/range_tree.go b/pkg/rtree/rtree.go
similarity index 56%
rename from pkg/backup/range_tree.go
rename to pkg/rtree/rtree.go
index 4d4b3c695..e3c136803 100644
--- a/pkg/backup/range_tree.go
+++ b/pkg/rtree/rtree.go
@@ -1,8 +1,21 @@
-package backup
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rtree
 
 import (
 	"bytes"
-	"encoding/hex"
+	"fmt"
 
 	"github.com/google/btree"
 	"github.com/pingcap/kvproto/pkg/backup"
@@ -15,10 +28,15 @@ type Range struct {
 	StartKey []byte
 	EndKey   []byte
 	Files    []*backup.File
-	Error    *backup.Error
 }
 
-func (rg *Range) intersect(
+// String formats a range to a string
+func (rg *Range) String() string {
+	return fmt.Sprintf("[%x %x]", rg.StartKey, rg.EndKey)
+}
+
+// Intersect returns the intersection of the range and the given [start, end) range.
+func (rg *Range) Intersect(
 	start, end []byte,
 ) (subStart, subEnd []byte, isIntersect bool) {
 	// empty mean the max end key
@@ -49,8 +67,8 @@ func (rg *Range) intersect(
 	return
 }
 
-// contains check if the range contains the given key, [start, end)
-func (rg *Range) contains(key []byte) bool {
+// Contains checks if the range contains the given key, [start, end)
+func (rg *Range) Contains(key []byte) bool {
 	start, end := rg.StartKey, rg.EndKey
 	return bytes.Compare(key, start) >= 0 &&
 		(len(end) == 0 || bytes.Compare(key, end) < 0)
@@ -65,31 +83,29 @@ func (rg *Range) Less(than btree.Item) bool {
 var _ btree.Item = &Range{}
 
-// RangeTree is the result of a backup task
+// RangeTree is a sorted tree for Ranges.
+// All the ranges it stored do not overlap.
 type RangeTree struct {
-	tree *btree.BTree
+	*btree.BTree
 }
 
-func newRangeTree() RangeTree {
+// NewRangeTree returns an empty range tree.
+func NewRangeTree() RangeTree {
 	return RangeTree{
-		tree: btree.New(32),
+		BTree: btree.New(32),
 	}
 }
 
-func (rangeTree *RangeTree) len() int {
-	return rangeTree.tree.Len()
-}
-
-// find is a helper function to find an item that contains the range start
+// Find is a helper function to find an item that contains the range start
 // key.
-func (rangeTree *RangeTree) find(rg *Range) *Range {
+func (rangeTree *RangeTree) Find(rg *Range) *Range {
 	var ret *Range
-	rangeTree.tree.DescendLessOrEqual(rg, func(i btree.Item) bool {
+	rangeTree.DescendLessOrEqual(rg, func(i btree.Item) bool {
 		ret = i.(*Range)
 		return false
 	})
-	if ret == nil || !ret.contains(rg.StartKey) {
+	if ret == nil || !ret.Contains(rg.StartKey) {
 		return nil
 	}
@@ -104,13 +120,13 @@ func (rangeTree *RangeTree) getOverlaps(rg *Range) []*Range {
 	// find() will return Range of range_a
 	// and both startKey of range_a and range_b are less than endKey of range_d,
 	// thus they are regarded as overlapped ranges.
-	found := rangeTree.find(rg)
+	found := rangeTree.Find(rg)
 	if found == nil {
 		found = rg
 	}
 
 	var overlaps []*Range
-	rangeTree.tree.AscendGreaterOrEqual(found, func(i btree.Item) bool {
+	rangeTree.AscendGreaterOrEqual(found, func(i btree.Item) bool {
 		over := i.(*Range)
 		if len(rg.EndKey) > 0 && bytes.Compare(rg.EndKey, over.StartKey) <= 0 {
 			return false
@@ -121,31 +137,57 @@ func (rangeTree *RangeTree) getOverlaps(rg *Range) []*Range {
 	return overlaps
 }
 
-func (rangeTree *RangeTree) update(rg *Range) {
-	overlaps := rangeTree.getOverlaps(rg)
+// Update inserts range into tree and deletes overlapping ranges.
+func (rangeTree *RangeTree) Update(rg Range) {
+	overlaps := rangeTree.getOverlaps(&rg)
 	// Range has backuped, overwrite overlapping range.
 	for _, item := range overlaps {
 		log.Info("delete overlapping range",
 			zap.Binary("StartKey", item.StartKey),
 			zap.Binary("EndKey", item.EndKey),
 		)
-		rangeTree.tree.Delete(item)
+		rangeTree.Delete(item)
 	}
-	rangeTree.tree.ReplaceOrInsert(rg)
+	rangeTree.ReplaceOrInsert(&rg)
 }
 
-func (rangeTree *RangeTree) put(
+// Put forms a range and inserts it into tree.
+func (rangeTree *RangeTree) Put(
 	startKey, endKey []byte, files []*backup.File,
 ) {
-	rg := &Range{
+	rg := Range{
 		StartKey: startKey,
 		EndKey:   endKey,
 		Files:    files,
 	}
-	rangeTree.update(rg)
+	rangeTree.Update(rg)
 }
 
-func (rangeTree *RangeTree) getIncompleteRange(
+// InsertRange inserts ranges into the range tree.
+// It returns a non-nil range if there are some overlapped ranges.
+func (rangeTree *RangeTree) InsertRange(rg Range) *Range {
+	out := rangeTree.ReplaceOrInsert(&rg)
+	if out == nil {
+		return nil
+	}
+	return out.(*Range)
+}
+
+// GetSortedRanges collects and returns sorted ranges.
+func (rangeTree *RangeTree) GetSortedRanges() []Range {
+	sortedRanges := make([]Range, 0, rangeTree.Len())
+	rangeTree.Ascend(func(rg btree.Item) bool {
+		if rg == nil {
+			return false
+		}
+		sortedRanges = append(sortedRanges, *rg.(*Range))
+		return true
+	})
+	return sortedRanges
+}
+
+// GetIncompleteRange returns the missing ranges within [startKey, endKey).
+func (rangeTree *RangeTree) GetIncompleteRange(
 	startKey, endKey []byte,
 ) []Range {
 	if len(startKey) != 0 && bytes.Equal(startKey, endKey) {
@@ -155,14 +197,14 @@ func (rangeTree *RangeTree) getIncompleteRange(
 	requsetRange := Range{StartKey: startKey, EndKey: endKey}
 	lastEndKey := startKey
 	pviot := &Range{StartKey: startKey}
-	if first := rangeTree.find(pviot); first != nil {
+	if first := rangeTree.Find(pviot); first != nil {
 		pviot.StartKey = first.StartKey
 	}
-	rangeTree.tree.AscendGreaterOrEqual(pviot, func(i btree.Item) bool {
+	rangeTree.AscendGreaterOrEqual(pviot, func(i btree.Item) bool {
 		rg := i.(*Range)
 		if bytes.Compare(lastEndKey, rg.StartKey) < 0 {
 			start, end, isIntersect :=
-				requsetRange.intersect(lastEndKey, rg.StartKey)
+				requsetRange.Intersect(lastEndKey, rg.StartKey)
 			if isIntersect {
 				// There is a gap between the last item and the current item.
 				incomplete =
@@ -176,7 +218,7 @@ func (rangeTree *RangeTree) getIncompleteRange(
 	// Check whether we need append the last range
 	if !bytes.Equal(lastEndKey, endKey) && len(lastEndKey) != 0 &&
 		(len(endKey) == 0 || bytes.Compare(lastEndKey, endKey) < 0) {
-		start, end, isIntersect := requsetRange.intersect(lastEndKey, endKey)
+		start, end, isIntersect := requsetRange.Intersect(lastEndKey, endKey)
 		if isIntersect {
 			incomplete =
 				append(incomplete, Range{StartKey: start, EndKey: end})
 		}
 	}
 	return incomplete
 }
-
-func (rangeTree *RangeTree) checkDupFiles() {
-	// Name -> SHA256
-	files := make(map[string][]byte)
-	rangeTree.tree.Ascend(func(i btree.Item) bool {
-		rg := i.(*Range)
-		for _, f := range rg.Files {
-			old, ok := files[f.Name]
-			if ok {
-				log.Error("dup file",
-					zap.String("Name", f.Name),
-					zap.String("SHA256_1", hex.EncodeToString(old)),
-					zap.String("SHA256_2", hex.EncodeToString(f.Sha256)),
-				)
-			} else {
-				files[f.Name] = f.Sha256
-			}
-		}
-		return true
-	})
-}
diff --git a/pkg/backup/range_tree_test.go b/pkg/rtree/rtree_test.go
similarity index 72%
rename from pkg/backup/range_tree_test.go
rename to pkg/rtree/rtree_test.go
index a7c2d1cd1..f4ec4f201 100644
--- a/pkg/backup/range_tree_test.go
+++ b/pkg/rtree/rtree_test.go
@@ -1,4 +1,4 @@
-// Copyright 2016 PingCAP, Inc.
+// Copyright 2020 PingCAP, Inc.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -11,7 +11,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package backup
+package rtree
 
 import (
 	"fmt"
@@ -31,63 +31,19 @@ func newRange(start, end []byte) *Range {
 	}
 }
 
-func (s *testRangeTreeSuite) TestRangeIntersect(c *C) {
-	rg := newRange([]byte("a"), []byte("c"))
-
-	start, end, isIntersect := rg.intersect([]byte(""), []byte(""))
-	c.Assert(isIntersect, Equals, true)
-	c.Assert(start, DeepEquals, []byte("a"))
-	c.Assert(end, DeepEquals, []byte("c"))
-
-	start, end, isIntersect = rg.intersect([]byte(""), []byte("a"))
-	c.Assert(isIntersect, Equals, false)
-	c.Assert(start, DeepEquals, []byte(nil))
-	c.Assert(end, DeepEquals, []byte(nil))
-
-	start, end, isIntersect = rg.intersect([]byte(""), []byte("b"))
-	c.Assert(isIntersect, Equals, true)
-	c.Assert(start, DeepEquals, []byte("a"))
-	c.Assert(end, DeepEquals, []byte("b"))
-
-	start, end, isIntersect = rg.intersect([]byte("a"), []byte("b"))
-	c.Assert(isIntersect, Equals, true)
-	c.Assert(start, DeepEquals, []byte("a"))
-	c.Assert(end, DeepEquals, []byte("b"))
-
-	start, end, isIntersect = rg.intersect([]byte("aa"), []byte("b"))
-	c.Assert(isIntersect, Equals, true)
-	c.Assert(start, DeepEquals, []byte("aa"))
-	c.Assert(end, DeepEquals, []byte("b"))
-
-	start, end, isIntersect = rg.intersect([]byte("b"), []byte("c"))
-	c.Assert(isIntersect, Equals, true)
-	c.Assert(start, DeepEquals, []byte("b"))
-	c.Assert(end, DeepEquals, []byte("c"))
-
-	start, end, isIntersect = rg.intersect([]byte(""), []byte{1})
-	c.Assert(isIntersect, Equals, false)
-	c.Assert(start, DeepEquals, []byte(nil))
-	c.Assert(end, DeepEquals, []byte(nil))
-
-	start, end, isIntersect = rg.intersect([]byte("c"), []byte(""))
-	c.Assert(isIntersect, Equals, false)
-	c.Assert(start, DeepEquals, []byte(nil))
-	c.Assert(end, DeepEquals, []byte(nil))
-}
-
 func (s *testRangeTreeSuite) TestRangeTree(c *C) {
-	rangeTree := newRangeTree()
-	c.Assert(rangeTree.tree.Get(newRange([]byte(""), []byte(""))), IsNil)
+	rangeTree := NewRangeTree()
+	c.Assert(rangeTree.Get(newRange([]byte(""), []byte(""))), IsNil)
 
 	search := func(key []byte) *Range {
-		rg := rangeTree.tree.Get(newRange(key, []byte("")))
+		rg := rangeTree.Get(newRange(key, []byte("")))
 		if rg == nil {
 			return nil
 		}
 		return rg.(*Range)
 	}
 	assertIncomplete := func(startKey, endKey []byte, ranges []Range) {
-		incomplete := rangeTree.getIncompleteRange(startKey, endKey)
+		incomplete := rangeTree.GetIncompleteRange(startKey, endKey)
 		c.Logf("%#v %#v\n%#v\n%#v\n", startKey, endKey, incomplete, ranges)
 		c.Assert(len(incomplete), Equals, len(ranges))
 		for idx, rg := range incomplete {
@@ -111,8 +67,8 @@ func (s *testRangeTreeSuite) TestRangeTree(c *C) {
 	rangeC := newRange([]byte("c"), []byte("d"))
 	rangeD := newRange([]byte("d"), []byte(""))
 
-	rangeTree.update(rangeA)
-	c.Assert(rangeTree.len(), Equals, 1)
+	rangeTree.Update(*rangeA)
+	c.Assert(rangeTree.Len(), Equals, 1)
 
 	assertIncomplete([]byte("a"), []byte("b"), []Range{})
 	assertIncomplete([]byte(""), []byte(""), []Range{
 		{StartKey: []byte(""), EndKey: []byte("a")},
 		{StartKey: []byte("b"), EndKey: []byte("")},
 	})
 
-	rangeTree.update(rangeC)
-	c.Assert(rangeTree.len(), Equals, 2)
+	rangeTree.Update(*rangeC)
+	c.Assert(rangeTree.Len(), Equals, 2)
 	assertIncomplete([]byte("a"), []byte("c"), []Range{
 		{StartKey: []byte("b"), EndKey: []byte("c")},
 	})
@@ -136,55 +92,99 @@ func (s *testRangeTreeSuite) TestRangeTree(c *C) {
 	})
 
 	c.Assert(search([]byte{}), IsNil)
-	c.Assert(search([]byte("a")), Equals, rangeA)
+	c.Assert(search([]byte("a")), DeepEquals, rangeA)
 	c.Assert(search([]byte("b")), IsNil)
-	c.Assert(search([]byte("c")), Equals, rangeC)
+	c.Assert(search([]byte("c")), DeepEquals, rangeC)
 	c.Assert(search([]byte("d")), IsNil)
 
-	rangeTree.update(rangeB)
-	c.Assert(rangeTree.len(), Equals, 3)
-	c.Assert(search([]byte("b")), Equals, rangeB)
+	rangeTree.Update(*rangeB)
+	c.Assert(rangeTree.Len(), Equals, 3)
+	c.Assert(search([]byte("b")), DeepEquals, rangeB)
 	assertIncomplete([]byte(""), []byte(""), []Range{
 		{StartKey: []byte(""), EndKey: []byte("a")},
 		{StartKey: []byte("d"), EndKey: []byte("")},
 	})
 
-	rangeTree.update(rangeD)
-	c.Assert(rangeTree.len(), Equals, 4)
-	c.Assert(search([]byte("d")), Equals, rangeD)
+	rangeTree.Update(*rangeD)
+	c.Assert(rangeTree.Len(), Equals, 4)
+	c.Assert(search([]byte("d")), DeepEquals, rangeD)
 	assertIncomplete([]byte(""), []byte(""), []Range{
 		{StartKey: []byte(""), EndKey: []byte("a")},
 	})
 
 	// None incomplete for any range after insert range 0
-	rangeTree.update(range0)
-	c.Assert(rangeTree.len(), Equals, 5)
+	rangeTree.Update(*range0)
+	c.Assert(rangeTree.Len(), Equals, 5)
 
 	// Overwrite range B and C.
 	rangeBD := newRange([]byte("b"), []byte("d"))
-	rangeTree.update(rangeBD)
-	c.Assert(rangeTree.len(), Equals, 4)
+	rangeTree.Update(*rangeBD)
+	c.Assert(rangeTree.Len(), Equals, 4)
 	assertAllComplete()
 
 	// Overwrite range BD, c-d should be empty
-	rangeTree.update(rangeB)
-	c.Assert(rangeTree.len(), Equals, 4)
+	rangeTree.Update(*rangeB)
+	c.Assert(rangeTree.Len(), Equals, 4)
 	assertIncomplete([]byte(""), []byte(""), []Range{
 		{StartKey: []byte("c"), EndKey: []byte("d")},
 	})
 
-	rangeTree.update(rangeC)
-	c.Assert(rangeTree.len(), Equals, 5)
+	rangeTree.Update(*rangeC)
+	c.Assert(rangeTree.Len(), Equals, 5)
 	assertAllComplete()
 }
 
+func (s *testRangeTreeSuite) TestRangeIntersect(c *C) {
+	rg := newRange([]byte("a"), []byte("c"))
+
+	start, end, isIntersect := rg.Intersect([]byte(""), []byte(""))
+	c.Assert(isIntersect, Equals, true)
+	c.Assert(start, DeepEquals, []byte("a"))
+	c.Assert(end, DeepEquals, []byte("c"))
+
+	start, end, isIntersect = rg.Intersect([]byte(""), []byte("a"))
+	c.Assert(isIntersect, Equals, false)
+	c.Assert(start, DeepEquals, []byte(nil))
+	c.Assert(end, DeepEquals, []byte(nil))
+
+	start, end, isIntersect = rg.Intersect([]byte(""), []byte("b"))
+	c.Assert(isIntersect, Equals, true)
+	c.Assert(start, DeepEquals, []byte("a"))
+	c.Assert(end, DeepEquals, []byte("b"))
+
+	start, end, isIntersect = rg.Intersect([]byte("a"), []byte("b"))
+	c.Assert(isIntersect, Equals, true)
+	c.Assert(start, DeepEquals, []byte("a"))
+	c.Assert(end, DeepEquals, []byte("b"))
+
+	start, end, isIntersect = rg.Intersect([]byte("aa"), []byte("b"))
+	c.Assert(isIntersect, Equals, true)
+	c.Assert(start, DeepEquals, []byte("aa"))
+	c.Assert(end, DeepEquals, []byte("b"))
+
+	start, end, isIntersect = rg.Intersect([]byte("b"), []byte("c"))
+	c.Assert(isIntersect, Equals, true)
+	c.Assert(start, DeepEquals, []byte("b"))
+	c.Assert(end, DeepEquals, []byte("c"))
+
+	start, end, isIntersect = rg.Intersect([]byte(""), []byte{1})
+	c.Assert(isIntersect, Equals, false)
+	c.Assert(start, DeepEquals, []byte(nil))
+	c.Assert(end, DeepEquals, []byte(nil))
+
+	start, end, isIntersect = rg.Intersect([]byte("c"), []byte(""))
+	c.Assert(isIntersect, Equals, false)
+	c.Assert(start, DeepEquals, []byte(nil))
+	c.Assert(end, DeepEquals, []byte(nil))
+}
+
 func BenchmarkRangeTreeUpdate(b *testing.B) {
-	rangeTree := newRangeTree()
+	rangeTree := NewRangeTree()
 	for i := 0; i < b.N; i++ {
-		item := &Range{
+		item := Range{
 			StartKey: []byte(fmt.Sprintf("%20d", i)),
 			EndKey:   []byte(fmt.Sprintf("%20d", i+1))}
-		rangeTree.update(item)
+		rangeTree.Update(item)
 	}
 }
diff --git a/pkg/summary/collector.go b/pkg/summary/collector.go
index cd5aac6c6..0fb1dfcf9 100644
--- a/pkg/summary/collector.go
+++ b/pkg/summary/collector.go
@@ -36,7 +36,9 @@ type LogCollector interface {
 	Summary(name string)
 }
 
-var collector = newLogCollector()
+type logFunc func(msg string, fields ...zap.Field)
+
+var collector = newLogCollector(log.Info)
 
 type logCollector struct {
 	mu sync.Mutex
@@ -45,16 +47,21 @@ type logCollector struct {
 	successCosts   map[string]time.Duration
 	successData    map[string]uint64
 	failureReasons map[string]error
-	fields         []zap.Field
+	durations      map[string]time.Duration
+	ints           map[string]int
+
+	log logFunc
 }
 
-func newLogCollector() LogCollector {
+func newLogCollector(log logFunc) LogCollector {
 	return &logCollector{
 		unitCount:      0,
-		fields:         make([]zap.Field, 0),
 		successCosts:   make(map[string]time.Duration),
 		successData:    make(map[string]uint64),
 		failureReasons: make(map[string]error),
+		durations:      make(map[string]time.Duration),
+		ints:           make(map[string]int),
+		log:            log,
 	}
 }
 
@@ -97,19 +104,20 @@ func (tc *logCollector) CollectFailureUnit(name string, reason error) {
 func (tc *logCollector) CollectDuration(name string, t time.Duration) {
 	tc.mu.Lock()
 	defer tc.mu.Unlock()
-	tc.fields = append(tc.fields, zap.Duration(name, t))
+	tc.durations[name] += t
 }
 
 func (tc *logCollector) CollectInt(name string, t int) {
 	tc.mu.Lock()
 	defer tc.mu.Unlock()
-	tc.fields = append(tc.fields, zap.Int(name, t))
+	tc.ints[name] += t
 }
 
 func (tc *logCollector) Summary(name string) {
 	tc.mu.Lock()
 	defer func() {
-		tc.fields = tc.fields[:0]
+		tc.durations = make(map[string]time.Duration)
+		tc.ints = make(map[string]int)
 		tc.successCosts = make(map[string]time.Duration)
 		tc.failureReasons = make(map[string]error)
 		tc.mu.Unlock()
@@ -131,11 +139,17 @@ func (tc *logCollector) Summary(name string) {
 		}
 	}
 
-	logFields := tc.fields
+	logFields := make([]zap.Field, 0, len(tc.durations)+len(tc.ints))
+	for key, val := range tc.durations {
+		logFields = append(logFields, zap.Duration(key, val))
+	}
+	for key, val := range tc.ints {
+		logFields = append(logFields, zap.Int(key, val))
+	}
+
 	if len(tc.failureReasons) != 0 {
 		names := make([]string, 0, len(tc.failureReasons))
 		for name := range tc.failureReasons {
-			// logFields = append(logFields, zap.NamedError(name, reason))
 			names = append(names, name)
 		}
 		logFields = append(logFields, zap.Strings(msg, names))
@@ -162,7 +176,7 @@ func (tc *logCollector) Summary(name string) {
 		msg += fmt.Sprintf(", %s: %d", name, data)
 	}
 
-	log.Info(name+" summary: "+msg, logFields...)
+	tc.log(name+" summary: "+msg, logFields...)
 }
 
 // SetLogCollector allow pass LogCollector outside
diff --git a/pkg/summary/collector_test.go b/pkg/summary/collector_test.go
new file mode 100644
index 000000000..6a8704db2
--- /dev/null
+++ b/pkg/summary/collector_test.go
@@ -0,0 +1,46 @@
+package summary
+
+import (
+	"testing"
+	"time"
+
+	. "github.com/pingcap/check"
+	"go.uber.org/zap"
+)
+
+func TestT(t *testing.T) {
+	TestingT(t)
+}
+
+var _ = Suite(&testCollectorSuite{})
+
+type testCollectorSuite struct {
+}
+
+func (suit *testCollectorSuite) TestSumDurationInt(c *C) {
+	fields := []zap.Field{}
+	logger := func(msg string, fs ...zap.Field) {
+		fields = append(fields, fs...)
+	}
+	col := newLogCollector(logger)
+	col.CollectDuration("a", time.Second)
+	col.CollectDuration("b", time.Second)
+	col.CollectDuration("b", time.Second)
+	col.CollectInt("c", 2)
+	col.CollectInt("c", 2)
+	col.Summary("foo")
+
+	c.Assert(len(fields), Equals, 3)
+	assertContains := func(field zap.Field) {
+		for _, f := range fields {
+			if f.Key == field.Key {
+				c.Assert(f, DeepEquals, field)
+				return
+			}
+		}
+		c.Error(fields, "do not contain", field)
+	}
+	assertContains(zap.Duration("a", time.Second))
+	assertContains(zap.Duration("b", 2*time.Second))
+	assertContains(zap.Int("c", 4))
+}
diff --git a/pkg/task/backup.go b/pkg/task/backup.go
index 2d9468394..4a2b6da12 100644
--- a/pkg/task/backup.go
+++ b/pkg/task/backup.go
@@ -22,6 +22,8 @@ import (
 const (
 	flagBackupTimeago = "timeago"
 	flagLastBackupTS  = "lastbackupts"
+
+	defaultBackupConcurrency = 4
 )
 
 // BackupConfig is the configuration specific for backup tasks.
@@ -59,6 +61,9 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
 	if err = cfg.Config.ParseFromFlags(flags); err != nil {
 		return errors.Trace(err)
 	}
+	if cfg.Config.Concurrency == 0 {
+		cfg.Config.Concurrency = defaultBackupConcurrency
+	}
 	return nil
 }
 
diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go
index 51d5267a5..721902afb 100644
--- a/pkg/task/backup_raw.go
+++ b/pkg/task/backup_raw.go
@@ -11,6 +11,7 @@ import (
 
 	"github.com/pingcap/br/pkg/backup"
 	"github.com/pingcap/br/pkg/glue"
+	"github.com/pingcap/br/pkg/rtree"
 	"github.com/pingcap/br/pkg/storage"
 	"github.com/pingcap/br/pkg/summary"
 	"github.com/pingcap/br/pkg/utils"
@@ -102,7 +103,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *BackupRaw
 	defer summary.Summary(cmdName)
 
-	backupRange := backup.Range{StartKey: cfg.StartKey, EndKey: cfg.EndKey}
+	backupRange := rtree.Range{StartKey: cfg.StartKey, EndKey: cfg.EndKey}
 
 	// The number of regions need to backup
 	approximateRegions, err := mgr.GetRegionCount(ctx, backupRange.StartKey, backupRange.EndKey)
diff --git a/pkg/task/common.go b/pkg/task/common.go
index 2de01b326..1e03177cb 100644
--- a/pkg/task/common.go
+++ b/pkg/task/common.go
@@ -100,9 +100,14 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
 	flags.String(flagKey, "", "Private key path for TLS connection")
 
 	flags.Uint64(flagRateLimit, 0, "The rate limit of the task, MB/s per node")
-	flags.Uint32(flagConcurrency, 4, "The size of thread pool on each node that executes the task")
 	flags.Bool(flagChecksum, true, "Run checksum at end of task")
 
+	// Default concurrency is different for backup and restore.
+	// Leave it 0 and let them adjust the value.
+	flags.Uint32(flagConcurrency, 0, "The size of thread pool on each node that executes the task")
+	// It may confuse users, so just hide it.
+	_ = flags.MarkHidden(flagConcurrency)
+
 	flags.Uint64(flagRateLimitUnit, utils.MB, "The unit of rate limit")
 	_ = flags.MarkHidden(flagRateLimitUnit)
 
diff --git a/pkg/task/restore.go b/pkg/task/restore.go
index ef1ac861f..f5020403a 100644
--- a/pkg/task/restore.go
+++ b/pkg/task/restore.go
@@ -13,6 +13,7 @@ import (
 	"github.com/pingcap/br/pkg/conn"
 	"github.com/pingcap/br/pkg/glue"
 	"github.com/pingcap/br/pkg/restore"
+	"github.com/pingcap/br/pkg/rtree"
 	"github.com/pingcap/br/pkg/summary"
 	"github.com/pingcap/br/pkg/utils"
 )
@@ -31,6 +32,11 @@ var schedulers = map[string]struct{}{
 	"shuffle-hot-region-scheduler": {},
 }
 
+const (
+	defaultRestoreConcurrency = 128
+	maxRestoreBatchSizeLimit  = 256
+)
+
 // RestoreConfig is the configuration specific for restore tasks.
 type RestoreConfig struct {
 	Config
@@ -52,7 +58,14 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
 	if err != nil {
 		return errors.Trace(err)
 	}
-	return cfg.Config.ParseFromFlags(flags)
+	err = cfg.Config.ParseFromFlags(flags)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if cfg.Config.Concurrency == 0 {
+		cfg.Config.Concurrency = defaultRestoreConcurrency
+	}
+	return nil
 }
 
 // RunRestore starts a restore task inside the current goroutine.
@@ -123,6 +136,8 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
 	}
 	summary.CollectInt("restore ranges", len(ranges))
 
+	ranges = restore.AttachFilesToRanges(files, ranges)
+
 	// Redirect to log if there is no log file to avoid unreadable output.
 	updateCh := utils.StartProgress(
 		ctx,
 		cmdName,
 		int64(len(ranges)+len(files)),
 		!cfg.LogProgress)
 
-	err = restore.SplitRanges(ctx, client, ranges, rewriteRules, updateCh)
+	clusterCfg, err := restorePreWork(ctx, client, mgr)
 	if err != nil {
-		log.Error("split regions failed", zap.Error(err))
 		return err
 	}
 
+	// Do not reset timestamp if we are doing incremental restore, because
+	// we are not allowed to decrease timestamp.
 	if !client.IsIncremental() {
 		if err = client.ResetTS(cfg.PD); err != nil {
 			log.Error("reset pd TS failed", zap.Error(err))
@@ -144,20 +160,47 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
 		}
 	}
 
-	removedSchedulers, err := restorePreWork(ctx, client, mgr)
-	if err != nil {
-		return err
+	// Restore sst files in batch.
+	batchSize := int(cfg.Concurrency)
+	if batchSize > maxRestoreBatchSizeLimit {
+		batchSize = maxRestoreBatchSizeLimit // 256
+	}
+	for {
+		if len(ranges) == 0 {
+			break
+		}
+		if batchSize > len(ranges) {
+			batchSize = len(ranges)
+		}
+		var rangeBatch []rtree.Range
+		ranges, rangeBatch = ranges[batchSize:], ranges[0:batchSize:batchSize]
+
+		// Split regions by the given rangeBatch.
+		err = restore.SplitRanges(ctx, client, rangeBatch, rewriteRules, updateCh)
+		if err != nil {
+			log.Error("split regions failed", zap.Error(err))
+			return err
+		}
+
+		// Collect related files in the given rangeBatch.
+		fileBatch := make([]*backup.File, 0, 2*len(rangeBatch))
+		for _, rg := range rangeBatch {
+			fileBatch = append(fileBatch, rg.Files...)
+		}
+
+		// After split, we can restore backup files.
+		err = client.RestoreFiles(fileBatch, rewriteRules, updateCh)
+		if err != nil {
+			break
+		}
 	}
-	err = client.RestoreFiles(files, rewriteRules, updateCh)
-	// always run the post-work even on error, so we don't stuck in the import mode or emptied schedulers
-	postErr := restorePostWork(ctx, client, mgr, removedSchedulers)
+	// Always run the post-work even on error, so we don't get stuck in the
+	// import mode or leave the schedulers removed.
+	err = restorePostWork(ctx, client, mgr, clusterCfg)
 	if err != nil {
 		return err
 	}
-	if postErr != nil {
-		return postErr
-	}
 
 	// Restore has finished.
 	close(updateCh)
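
The hunks above move the backup RangeTree into the shared pkg/rtree package and export its API (Put, Update, Find, GetIncompleteRange, GetSortedRanges). Below is a minimal, standalone sketch of how those exported calls fit together; it assumes the package is importable as github.com/pingcap/br/pkg/rtree exactly as added in this patch, and the concrete keys and nil file lists are illustrative only.

package main

import (
	"fmt"

	"github.com/pingcap/br/pkg/rtree"
)

func main() {
	// The tree keeps backed-up ranges sorted and non-overlapping:
	// Put/Update delete any overlapping entries before inserting.
	tree := rtree.NewRangeTree()
	tree.Put([]byte("a"), []byte("c"), nil) // backup files omitted here
	tree.Put([]byte("d"), []byte("f"), nil)

	// Find locates the stored range containing a sub-range's start key,
	// the way AttachFilesToRanges matches backup files to restore ranges.
	if rg := tree.Find(&rtree.Range{StartKey: []byte("b"), EndKey: []byte("bz")}); rg != nil {
		fmt.Println("containing range:", rg) // [61 63]
	}

	// GetIncompleteRange lists the gaps of [start, end) that are still
	// uncovered; fineGrainedBackup retries exactly these ranges.
	for _, gap := range tree.GetIncompleteRange([]byte("a"), []byte("f")) {
		fmt.Printf("missing: %s\n", &gap) // [63 64]
	}

	// GetSortedRanges flattens the tree into an ordered slice, as the
	// restore-side sortRanges now does.
	fmt.Println("ranges:", len(tree.GetSortedRanges())) // 2
}

Note that Find is queried with both StartKey and EndKey here, mirroring AttachFilesToRanges; the incomplete-range scan is the path that passes a pivot with an empty EndKey.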
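
RunRestore now restores SST files in batches of up to cfg.Concurrency ranges (capped at maxRestoreBatchSizeLimit), splitting regions per batch before restoring that batch's files. The following sketch isolates the slicing pattern the loop relies on; the nextBatch helper is hypothetical and ints stand in for rtree.Range.

package main

import "fmt"

// nextBatch takes up to batchSize elements off the front of the slice and
// returns the remainder plus the batch. The three-index slice caps the
// batch's capacity so appends to it cannot clobber the remaining elements.
func nextBatch(ranges []int, batchSize int) (rest, batch []int) {
	if batchSize > len(ranges) {
		batchSize = len(ranges)
	}
	return ranges[batchSize:], ranges[0:batchSize:batchSize]
}

func main() {
	ranges := []int{1, 2, 3, 4, 5}
	for len(ranges) > 0 {
		var batch []int
		ranges, batch = nextBatch(ranges, 2)
		fmt.Println(batch) // [1 2], then [3 4], then [5]
	}
}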