Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
conn, restore: paginate scan regions (#165)
Browse files Browse the repository at this point in the history
* conn, restore: paginate scan regions

Signed-off-by: Neil Shen <overvenus@gmail.com>

* tests: large timeout

Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus authored Mar 5, 2020
1 parent 0a1a044 commit 09fb715
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 24 deletions.
9 changes: 8 additions & 1 deletion pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ import (
"google.golang.org/grpc/keepalive"

"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/utils"
)

const (
// dialTimeout is a fixed 5-second timeout.
// NOTE(review): presumably applied when dialing cluster components; the
// call site is outside this view — confirm.
dialTimeout = 5 * time.Second
// The three prefixes below are relative paths (no leading slash).
// NOTE(review): they look like PD HTTP API endpoints for the cluster
// version, region statistics and schedulers — confirm against the callers.
clusterVersionPrefix = "pd/api/v1/config/cluster-version"
regionCountPrefix = "pd/api/v1/stats/region"
// NOTE(review): "schdulerPrefix" is probably a misspelling of
// "schedulerPrefix"; a rename has to update every reference in the package.
schdulerPrefix = "pd/api/v1/schedulers"
// maxMsgSize is the value passed to both grpc.MaxCallRecvMsgSize and
// grpc.MaxCallSendMsgSize when the PD client is created in NewMgr.
maxMsgSize = int(128 * utils.MB) // pd.ScanRegion may return a large response
)

// Mgr manages connections to a TiDB cluster.
Expand Down Expand Up @@ -125,7 +127,12 @@ func NewMgr(
return nil, errors.Annotatef(failure, "pd address (%s) not available, please check network", pdAddrs)
}

pdClient, err := pd.NewClient(addrs, securityOption)
maxCallMsgSize := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)),
}
pdClient, err := pd.NewClient(
addrs, securityOption, pd.WithGRPCDialOptions(maxCallMsgSize...))
if err != nil {
log.Error("fail to create pd client", zap.Error(err))
return nil, err
Expand Down
5 changes: 4 additions & 1 deletion pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
)

const (
	// importScanRegionTime bounds the context used for the region scan
	// performed at the start of FileImporter.Import.
	importScanRegionTime = 10 * time.Second
	// scanRegionPaginationLimit is the page size handed to
	// paginateScanRegion, i.e. the maximum number of regions requested per
	// ScanRegions call.
	scanRegionPaginationLimit = int(128)
)

// ImporterClient is used to import a file to TiKV
type ImporterClient interface {
Expand Down Expand Up @@ -171,7 +172,8 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul
ctx, cancel := context.WithTimeout(importer.ctx, importScanRegionTime)
defer cancel()
// Scan regions covered by the file range
regionInfos, errScanRegion := importer.metaClient.ScanRegions(ctx, startKey, endKey, 0)
regionInfos, errScanRegion := paginateScanRegion(
ctx, importer.metaClient, startKey, endKey, scanRegionPaginationLimit)
if errScanRegion != nil {
return errors.Trace(errScanRegion)
}
Expand Down Expand Up @@ -199,6 +201,7 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul
zap.Error(errDownload))
return errDownload
}

ingestResp, errIngest := importer.ingestSST(downloadMeta, info)
ingestRetry:
for errIngest == nil {
Expand Down
7 changes: 3 additions & 4 deletions pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,9 @@ func (rs *RegionSplitter) Split(
scatterRegions := make([]*RegionInfo, 0)
SplitRegions:
for i := 0; i < SplitRetryTimes; i++ {
var regions []*RegionInfo
regions, err = rs.client.ScanRegions(ctx, minKey, maxKey, 0)
if err != nil {
return errors.Trace(err)
regions, err1 := paginateScanRegion(ctx, rs.client, minKey, maxKey, scanRegionPaginationLimit)
if err1 != nil {
return errors.Trace(err1)
}
if len(regions) == 0 {
log.Warn("cannot scan any region")
Expand Down
24 changes: 14 additions & 10 deletions pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule/placement"
"github.com/pingcap/tidb/util/codec"
)
Expand All @@ -18,13 +19,19 @@ type testClient struct {
mu sync.RWMutex
stores map[uint64]*metapb.Store
regions map[uint64]*RegionInfo
regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions
nextRegionID uint64
}

// newTestClient returns a testClient backed by the given stores and regions.
// Every entry of regions is also registered in a fresh core.RegionsInfo,
// which is the index ScanRegions reads from.
func newTestClient(stores map[uint64]*metapb.Store, regions map[uint64]*RegionInfo, nextRegionID uint64) *testClient {
	client := &testClient{
		stores:       stores,
		regions:      regions,
		regionsInfo:  core.NewRegionsInfo(),
		nextRegionID: nextRegionID,
	}
	for _, r := range regions {
		client.regionsInfo.AddRegion(core.NewRegionInfo(r.Region, r.Leader))
	}
	return client
}
Expand Down Expand Up @@ -142,16 +149,13 @@ func (c *testClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.Ge
}

// ScanRegions is the test double for region scanning: it returns RegionInfo
// values selected by key, endKey and limit, and never returns an error.
//
// NOTE(review): this span appears to interleave two versions of the body
// taken from a diff view. The first loop (over c.regions) looks like the
// removed implementation and the regionsInfo.ScanRange section like the
// added one; as rendered, regions is declared twice and the first loop is
// never closed. Confirm which lines belong to the file before editing.
func (c *testClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error) {
regions := make([]*RegionInfo, 0)
// Map-iteration variant: walk c.regions (map order, so unordered) and stop
// once a positive limit has been reached.
for _, region := range c.regions {
if limit > 0 && len(regions) >= limit {
break
}
// Skip regions whose non-empty end key is <= key, and regions whose start
// key is > endKey.
if (len(region.Region.GetEndKey()) != 0 && bytes.Compare(region.Region.GetEndKey(), key) <= 0) ||
bytes.Compare(region.Region.GetStartKey(), endKey) > 0 {
continue
}
regions = append(regions, region)
// ScanRange variant: delegate range selection and the limit to
// regionsInfo.ScanRange, then wrap each result's meta and leader in a new
// RegionInfo.
infos := c.regionsInfo.ScanRange(key, endKey, limit)
regions := make([]*RegionInfo, 0, len(infos))
for _, info := range infos {
regions = append(regions, &RegionInfo{
Region: info.GetMeta(),
Leader: info.GetLeader(),
})
}
return regions, nil
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package restore
import (
"bytes"
"context"
"encoding/hex"
"strings"
"time"

Expand Down Expand Up @@ -324,3 +325,35 @@ func encodeKeyPrefix(key []byte) []byte {
encodedPrefix = append(encodedPrefix, codec.EncodeBytes([]byte{}, key[:len(key)-ungroupedLen])...)
return append(encodedPrefix[:len(encodedPrefix)-9], key[len(key)-ungroupedLen:]...)
}

// paginateScanRegion scans the regions between startKey and endKey in pages
// of at most limit regions and returns all of them in a single slice.
// Requesting the regions page by page keeps each ScanRegions response small,
// which reduces the maximum gRPC message size of any one call.
//
// An empty endKey disables the upper-bound checks: the scan then ends when a
// page comes back short or when the last returned region has an empty end
// key. An error is returned when endKey is non-empty and startKey >= endKey,
// or when any underlying ScanRegions call fails.
func paginateScanRegion(
	ctx context.Context, client SplitClient, startKey, endKey []byte, limit int,
) ([]*RegionInfo, error) {
	if len(endKey) != 0 && bytes.Compare(startKey, endKey) >= 0 {
		return nil, errors.Errorf("startKey >= endKey, startKey %s, endkey %s",
			hex.EncodeToString(startKey), hex.EncodeToString(endKey))
	}

	// Deliberately an empty, non-nil slice: TestPaginateScanRegion compares
	// the empty-cluster result against []*RegionInfo{} with DeepEquals.
	regions := []*RegionInfo{}
	for {
		batch, err := client.ScanRegions(ctx, startKey, endKey, limit)
		if err != nil {
			return nil, errors.Trace(err)
		}
		regions = append(regions, batch...)
		// A short page means there is nothing left to scan. The explicit
		// empty check also covers limit <= 0, where len(batch) < limit can
		// never hold and indexing the last element of an empty batch would
		// panic.
		if len(batch) == 0 || len(batch) < limit {
			break
		}
		// Resume from where the last returned region ends.
		startKey = batch[len(batch)-1].Region.GetEndKey()
		if len(startKey) == 0 ||
			(len(endKey) > 0 && bytes.Compare(startKey, endKey) >= 0) {
			// The whole requested key space has been scanned.
			break
		}
	}
	return regions, nil
}
106 changes: 106 additions & 0 deletions pkg/restore/util_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package restore

import (
"context"
"encoding/binary"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
)

var _ = Suite(&testRestoreUtilSuite{})
Expand Down Expand Up @@ -103,3 +107,105 @@ func (s *testRestoreUtilSuite) TestValidateFileRanges(c *C) {
)
c.Assert(err, ErrorMatches, "unexpected rewrite rules")
}

// TestPaginateScanRegion verifies that paginateScanRegion, using a page size
// of 3, returns exactly the expected regions in order for clusters of 0, 1,
// 2, 3 and 8 regions and for sub-ranges of the 8-region cluster, and that it
// rejects a start key that is not smaller than the end key.
func (s *testRestoreUtilSuite) TestPaginateScanRegion(c *C) {
	const limit = 3

	peers := []*metapb.Peer{{Id: 1, StoreId: 1}}
	stores := map[uint64]*metapb.Store{1: {Id: 1}}

	// boundary returns the encoded form of i written as an 8-byte big-endian
	// key; region i ends and region i+1 starts at boundary(i).
	boundary := func(i uint64) []byte {
		raw := make([]byte, 8)
		binary.BigEndian.PutUint64(raw, i)
		return codec.EncodeBytes([]byte{}, raw)
	}

	// makeRegions builds num contiguous regions with IDs 1..num. The last
	// region has an empty end key. The result is returned both as a map keyed
	// by region ID and as a slice in key order.
	makeRegions := func(num uint64) (map[uint64]*RegionInfo, []*RegionInfo) {
		regionsMap := make(map[uint64]*RegionInfo, num)
		regions := make([]*RegionInfo, 0, num)
		for id := uint64(1); id <= num; id++ {
			region := &metapb.Region{Id: id, Peers: peers, EndKey: []byte{}}
			switch {
			case id > 1:
				region.StartKey = boundary(id - 1)
			case num == 1:
				// A lone region carries an explicit empty start key; the
				// first of several regions leaves StartKey unset.
				region.StartKey = []byte{}
			}
			if id < num {
				region.EndKey = boundary(id)
			}

			ri := &RegionInfo{Region: region}
			regionsMap[id] = ri
			regions = append(regions, ri)
		}
		return regionsMap, regions
	}

	ctx := context.Background()
	// scan runs paginateScanRegion against a fresh test client holding
	// regionMap.
	scan := func(regionMap map[uint64]*RegionInfo, startKey, endKey []byte) ([]*RegionInfo, error) {
		return paginateScanRegion(ctx, newTestClient(stores, regionMap, 0), startKey, endKey, limit)
	}

	// An empty cluster yields an empty, non-nil result.
	batch, err := scan(make(map[uint64]*RegionInfo), []byte{}, []byte{})
	c.Assert(err, IsNil)
	c.Assert(batch, DeepEquals, []*RegionInfo{})

	// Full-range scans: fewer regions than a page, exactly one page, and
	// several pages.
	for _, num := range []uint64{1, 2, 3, 8} {
		regionMap, regions := makeRegions(num)
		batch, err = scan(regionMap, []byte{}, []byte{})
		c.Assert(err, IsNil)
		c.Assert(batch, DeepEquals, regions)
	}

	regionMap, regions := makeRegions(8)

	// Bounded on the left only.
	batch, err = scan(regionMap, regions[1].Region.StartKey, []byte{})
	c.Assert(err, IsNil)
	c.Assert(batch, DeepEquals, regions[1:])

	// Bounded on the right only.
	batch, err = scan(regionMap, []byte{}, regions[6].Region.EndKey)
	c.Assert(err, IsNil)
	c.Assert(batch, DeepEquals, regions[:7])

	// Exactly one region's range.
	batch, err = scan(regionMap, regions[1].Region.StartKey, regions[1].Region.EndKey)
	c.Assert(err, IsNil)
	c.Assert(batch, DeepEquals, regions[1:2])

	// An inverted range is rejected before any scan happens.
	_, err = scan(regionMap, []byte{2}, []byte{1})
	c.Assert(err, ErrorMatches, "startKey >= endKey.*")
}
16 changes: 8 additions & 8 deletions tests/_utils/run_services
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ start_services() {
i=0
while ! curl -o /dev/null -sf "http://$PD_ADDR/pd/api/v1/version"; do
i=$((i+1))
if [ "$i" -gt 10 ]; then
if [ "$i" -gt 20 ]; then
echo 'Failed to start PD'
exit 1
fi
Expand All @@ -70,7 +70,7 @@ start_services() {
echo "Waiting initializing TiKV..."
while ! curl -sf "http://$PD_ADDR/pd/api/v1/cluster/status" | grep '"is_initialized": true'; do
i=$((i+1))
if [ "$i" -gt 10 ]; then
if [ "$i" -gt 20 ]; then
echo 'Failed to initialize TiKV cluster'
exit 1
fi
Expand All @@ -90,7 +90,7 @@ start_services() {
i=0
while ! curl -o /dev/null -sf "http://$TIDB_IP:10080/status"; do
i=$((i+1))
if [ "$i" -gt 10 ]; then
if [ "$i" -gt 20 ]; then
echo 'Failed to start TiDB'
exit 1
fi
Expand All @@ -100,7 +100,7 @@ start_services() {
i=0
while ! curl "http://$PD_ADDR/pd/api/v1/cluster/status" -sf | grep -q "\"is_initialized\": true"; do
i=$((i+1))
if [ "$i" -gt 10 ]; then
if [ "$i" -gt 20 ]; then
echo 'Failed to bootstrap cluster'
exit 1
fi
Expand Down Expand Up @@ -132,7 +132,7 @@ start_services_withTLS() {
--key $1/certificates/client-key.pem \
-o /dev/null -sf "https://$PD_ADDR/pd/api/v1/version"; do
i=$((i+1))
if [ "$i" -gt 10 ]; then
if [ "$i" -gt 20 ]; then
echo 'Failed to start PD'
exit 1
fi
Expand All @@ -155,7 +155,7 @@ start_services_withTLS() {
--key $1/certificates/client-key.pem \
-sf "https://$PD_ADDR/pd/api/v1/cluster/status" | grep '"is_initialized": true'; do
i=$((i+1))
if [ "$i" -gt 10 ]; then
if [ "$i" -gt 20 ]; then
echo 'Failed to initialize TiKV cluster'
exit 1
fi
Expand All @@ -178,7 +178,7 @@ start_services_withTLS() {
--key $1/certificates/client-key.pem \
-o /dev/null -sf "https://$TIDB_IP:10080/status"; do
i=$((i+1))
if [ "$i" -gt 10 ]; then
if [ "$i" -gt 20 ]; then
echo 'Failed to start TiDB'
exit 1
fi
Expand All @@ -191,7 +191,7 @@ start_services_withTLS() {
--key $1/certificates/client-key.pem \
"https://$PD_ADDR/pd/api/v1/cluster/status" -sf | grep -q "\"is_initialized\": true"; do
i=$((i+1))
if [ "$i" -gt 10 ]; then
if [ "$i" -gt 20 ]; then
echo 'Failed to bootstrap cluster'
exit 1
fi
Expand Down

0 comments on commit 09fb715

Please sign in to comment.