Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Batch restore #167

Merged
merged 11 commits into from
Mar 6, 2020
7 changes: 4 additions & 3 deletions cmd/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.uber.org/zap"

"github.com/pingcap/br/pkg/restore"
"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/task"
"github.com/pingcap/br/pkg/utils"
)
Expand Down Expand Up @@ -166,15 +167,15 @@ func newBackupMetaCommand() *cobra.Command {
tables = append(tables, db.Tables...)
}
// Check if the ranges of files overlapped
rangeTree := restore.NewRangeTree()
rangeTree := rtree.NewRangeTree()
for _, file := range files {
if out := rangeTree.InsertRange(restore.Range{
if out := rangeTree.InsertRange(rtree.Range{
StartKey: file.GetStartKey(),
EndKey: file.GetEndKey(),
}); out != nil {
log.Error(
"file ranges overlapped",
zap.Stringer("out", out.(*restore.Range)),
zap.Stringer("out", out),
zap.Stringer("file", file),
)
}
Expand Down
29 changes: 15 additions & 14 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/util/ranger"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/utils"
Expand Down Expand Up @@ -179,13 +180,13 @@ func BuildBackupRangeAndSchema(
storage kv.Storage,
tableFilter *filter.Filter,
backupTS uint64,
) ([]Range, *Schemas, error) {
) ([]rtree.Range, *Schemas, error) {
info, err := dom.GetSnapshotInfoSchema(backupTS)
if err != nil {
return nil, nil, errors.Trace(err)
}

ranges := make([]Range, 0)
ranges := make([]rtree.Range, 0)
backupSchemas := newBackupSchemas()
for _, dbInfo := range info.AllSchemas() {
// skip system databases
Expand Down Expand Up @@ -233,7 +234,7 @@ func BuildBackupRangeAndSchema(
return nil, nil, err
}
for _, r := range tableRanges {
ranges = append(ranges, Range{
ranges = append(ranges, rtree.Range{
StartKey: r.StartKey,
EndKey: r.EndKey,
})
Expand Down Expand Up @@ -295,7 +296,7 @@ func GetBackupDDLJobs(dom *domain.Domain, lastBackupTS, backupTS uint64) ([]*mod
// BackupRanges make a backup of the given key ranges.
func (bc *Client) BackupRanges(
ctx context.Context,
ranges []Range,
ranges []rtree.Range,
req kvproto.BackupRequest,
updateCh chan<- struct{},
) error {
Expand Down Expand Up @@ -389,12 +390,12 @@ func (bc *Client) BackupRange(

push := newPushDown(ctx, bc.mgr, len(allStores))

var results RangeTree
var results rtree.RangeTree
results, err = push.pushBackup(req, allStores, updateCh)
if err != nil {
return err
}
log.Info("finish backup push down", zap.Int("Ok", results.len()))
log.Info("finish backup push down", zap.Int("Ok", results.Len()))

// Find and backup remaining ranges.
// TODO: test fine grained backup.
Expand All @@ -421,14 +422,14 @@ func (bc *Client) BackupRange(
zap.Reflect("EndVersion", req.EndVersion))
}

results.tree.Ascend(func(i btree.Item) bool {
r := i.(*Range)
results.Ascend(func(i btree.Item) bool {
r := i.(*rtree.Range)
bc.backupMeta.Files = append(bc.backupMeta.Files, r.Files...)
return true
})

// Check if there are duplicated files.
results.checkDupFiles()
rtree.CheckDupFiles(&results)

return nil
}
Expand Down Expand Up @@ -466,21 +467,21 @@ func (bc *Client) fineGrainedBackup(
backupTS uint64,
rateLimit uint64,
concurrency uint32,
rangeTree RangeTree,
rangeTree rtree.RangeTree,
updateCh chan<- struct{},
) error {
bo := tikv.NewBackoffer(ctx, backupFineGrainedMaxBackoff)
for {
// Step1, check whether there is any incomplete range
incomplete := rangeTree.getIncompleteRange(startKey, endKey)
incomplete := rangeTree.GetIncompleteRange(startKey, endKey)
if len(incomplete) == 0 {
return nil
}
log.Info("start fine grained backup", zap.Int("incomplete", len(incomplete)))
// Step2, retry backup on incomplete range
respCh := make(chan *kvproto.BackupResponse, 4)
errCh := make(chan error, 4)
retry := make(chan Range, 4)
retry := make(chan rtree.Range, 4)

max := &struct {
ms int
Expand Down Expand Up @@ -539,7 +540,7 @@ func (bc *Client) fineGrainedBackup(
zap.Binary("StartKey", resp.StartKey),
zap.Binary("EndKey", resp.EndKey),
)
rangeTree.put(resp.StartKey, resp.EndKey, resp.Files)
rangeTree.Put(resp.StartKey, resp.EndKey, resp.Files)

// Update progress
updateCh <- struct{}{}
Expand Down Expand Up @@ -625,7 +626,7 @@ func onBackupResponse(
func (bc *Client) handleFineGrained(
ctx context.Context,
bo *tikv.Backoffer,
rg Range,
rg rtree.Range,
lastBackupTS uint64,
backupTS uint64,
rateLimit uint64,
Expand Down
8 changes: 5 additions & 3 deletions pkg/backup/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/rtree"
)

// pushDown warps a backup task.
Expand All @@ -35,9 +37,9 @@ func (push *pushDown) pushBackup(
req backup.BackupRequest,
stores []*metapb.Store,
updateCh chan<- struct{},
) (RangeTree, error) {
) (rtree.RangeTree, error) {
// Push down backup tasks to all tikv instances.
res := newRangeTree()
res := rtree.NewRangeTree()
wg := new(sync.WaitGroup)
for _, s := range stores {
storeID := s.GetId()
Expand Down Expand Up @@ -82,7 +84,7 @@ func (push *pushDown) pushBackup(
}
if resp.GetError() == nil {
// None error means range has been backuped successfully.
res.put(
res.Put(
resp.GetStartKey(), resp.GetEndKey(), resp.GetFiles())

// Update progress
Expand Down
20 changes: 9 additions & 11 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ type Client struct {
ctx context.Context
cancel context.CancelFunc

pdClient pd.Client
fileImporter FileImporter
workerPool *utils.WorkerPool
tableWorkerPool *utils.WorkerPool
tlsConf *tls.Config
pdClient pd.Client
fileImporter FileImporter
workerPool *utils.WorkerPool
tlsConf *tls.Config

databases map[string]*utils.Database
ddlJobs []*model.Job
Expand All @@ -70,12 +69,11 @@ func NewRestoreClient(
}

return &Client{
ctx: ctx,
cancel: cancel,
pdClient: pdClient,
tableWorkerPool: utils.NewWorkerPool(128, "table"),
db: db,
tlsConf: tlsConf,
ctx: ctx,
cancel: cancel,
pdClient: pdClient,
db: db,
tlsConf: tlsConf,
}, nil
}

Expand Down
90 changes: 5 additions & 85 deletions pkg/restore/range.go
Original file line number Diff line number Diff line change
@@ -1,45 +1,19 @@
package restore

import (
"bytes"
"fmt"

"github.com/google/btree"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/tablecodec"
"go.uber.org/zap"
)

// Range represents a range of keys.
type Range struct {
StartKey []byte
EndKey []byte
}

// String formats a range to a string
func (r *Range) String() string {
return fmt.Sprintf("[%x %x]", r.StartKey, r.EndKey)
}

// Less compares a range with a btree.Item
func (r *Range) Less(than btree.Item) bool {
t := than.(*Range)
return len(r.EndKey) != 0 && bytes.Compare(r.EndKey, t.StartKey) <= 0
}

// contains returns if a key is included in the range.
func (r *Range) contains(key []byte) bool {
start, end := r.StartKey, r.EndKey
return bytes.Compare(key, start) >= 0 &&
(len(end) == 0 || bytes.Compare(key, end) < 0)
}
"github.com/pingcap/br/pkg/rtree"
)

// sortRanges checks if the range overlapped and sort them
func sortRanges(ranges []Range, rewriteRules *RewriteRules) ([]Range, error) {
rangeTree := NewRangeTree()
func sortRanges(ranges []rtree.Range, rewriteRules *RewriteRules) ([]rtree.Range, error) {
rangeTree := rtree.NewRangeTree()
for _, rg := range ranges {
if rewriteRules != nil {
startID := tablecodec.DecodeTableID(rg.StartKey)
Expand Down Expand Up @@ -77,64 +51,10 @@ func sortRanges(ranges []Range, rewriteRules *RewriteRules) ([]Range, error) {
return nil, errors.Errorf("ranges overlapped: %s, %s", out, rg)
}
}
sortedRanges := make([]Range, 0, len(ranges))
rangeTree.Ascend(func(rg *Range) bool {
if rg == nil {
return false
}
sortedRanges = append(sortedRanges, *rg)
return true
})
sortedRanges := rangeTree.GetSortedRanges()
return sortedRanges, nil
}

// RangeTree stores the ranges in an orderly manner.
// All the ranges it stored do not overlap.
type RangeTree struct {
tree *btree.BTree
}

// NewRangeTree returns a new RangeTree.
func NewRangeTree() *RangeTree {
return &RangeTree{tree: btree.New(32)}
}

// Find returns nil or a range in the range tree
func (rt *RangeTree) Find(key []byte) *Range {
var ret *Range
r := &Range{
StartKey: key,
}
rt.tree.DescendLessOrEqual(r, func(i btree.Item) bool {
ret = i.(*Range)
return false
})
if ret == nil || !ret.contains(key) {
return nil
}
return ret
}

// InsertRange inserts ranges into the range tree.
// it returns true if all ranges inserted successfully.
// it returns false if there are some overlapped ranges.
func (rt *RangeTree) InsertRange(rg Range) btree.Item {
return rt.tree.ReplaceOrInsert(&rg)
}

// RangeIterator allows callers of Ascend to iterate in-order over portions of
// the tree. When this function returns false, iteration will stop and the
// associated Ascend function will immediately return.
type RangeIterator func(rg *Range) bool

// Ascend calls the iterator for every value in the tree within [first, last],
// until the iterator returns false.
func (rt *RangeTree) Ascend(iterator RangeIterator) {
rt.tree.Ascend(func(i btree.Item) bool {
return iterator(i.(*Range))
})
}

// RegionInfo includes a region and the leader of the region.
type RegionInfo struct {
Region *metapb.Region
Expand Down
34 changes: 18 additions & 16 deletions pkg/restore/range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/tidb/tablecodec"

"github.com/pingcap/br/pkg/rtree"
)

type testRangeSuite struct{}
Expand All @@ -21,8 +23,8 @@ var RangeEquals Checker = &rangeEquals{
}

func (checker *rangeEquals) Check(params []interface{}, names []string) (result bool, error string) {
obtained := params[0].([]Range)
expected := params[1].([]Range)
obtained := params[0].([]rtree.Range)
expected := params[1].([]rtree.Range)
if len(obtained) != len(expected) {
return false, ""
}
Expand All @@ -44,20 +46,20 @@ func (s *testRangeSuite) TestSortRange(c *C) {
Table: make([]*import_sstpb.RewriteRule, 0),
Data: dataRules,
}
ranges1 := []Range{
{append(tablecodec.GenTableRecordPrefix(1), []byte("aaa")...),
append(tablecodec.GenTableRecordPrefix(1), []byte("bbb")...)},
ranges1 := []rtree.Range{
{StartKey: append(tablecodec.GenTableRecordPrefix(1), []byte("aaa")...),
EndKey: append(tablecodec.GenTableRecordPrefix(1), []byte("bbb")...), Files: nil},
}
rs1, err := sortRanges(ranges1, rewriteRules)
c.Assert(err, IsNil, Commentf("sort range1 failed: %v", err))
c.Assert(rs1, RangeEquals, []Range{
{append(tablecodec.GenTableRecordPrefix(4), []byte("aaa")...),
append(tablecodec.GenTableRecordPrefix(4), []byte("bbb")...)},
c.Assert(rs1, RangeEquals, []rtree.Range{
{StartKey: append(tablecodec.GenTableRecordPrefix(4), []byte("aaa")...),
EndKey: append(tablecodec.GenTableRecordPrefix(4), []byte("bbb")...), Files: nil},
})

ranges2 := []Range{
{append(tablecodec.GenTableRecordPrefix(1), []byte("aaa")...),
append(tablecodec.GenTableRecordPrefix(2), []byte("bbb")...)},
ranges2 := []rtree.Range{
{StartKey: append(tablecodec.GenTableRecordPrefix(1), []byte("aaa")...),
EndKey: append(tablecodec.GenTableRecordPrefix(2), []byte("bbb")...), Files: nil},
}
_, err = sortRanges(ranges2, rewriteRules)
c.Assert(err, ErrorMatches, ".*table id does not match.*")
Expand All @@ -66,10 +68,10 @@ func (s *testRangeSuite) TestSortRange(c *C) {
rewriteRules1 := initRewriteRules()
rs3, err := sortRanges(ranges3, rewriteRules1)
c.Assert(err, IsNil, Commentf("sort range1 failed: %v", err))
c.Assert(rs3, RangeEquals, []Range{
{[]byte("bbd"), []byte("bbf")},
{[]byte("bbf"), []byte("bbj")},
{[]byte("xxa"), []byte("xxe")},
{[]byte("xxe"), []byte("xxz")},
c.Assert(rs3, RangeEquals, []rtree.Range{
{StartKey: []byte("bbd"), EndKey: []byte("bbf"), Files: nil},
{StartKey: []byte("bbf"), EndKey: []byte("bbj"), Files: nil},
{StartKey: []byte("xxa"), EndKey: []byte("xxe"), Files: nil},
{StartKey: []byte("xxe"), EndKey: []byte("xxz"), Files: nil},
})
}
Loading