Skip to content

Commit

Permalink
store/tikv/scan: Cut the scan range by region before sending request …
Browse files Browse the repository at this point in the history
…to TiKV (#9070)

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
  • Loading branch information
MyonKeminta authored Jan 16, 2019
1 parent 33b4c3e commit cf456f0
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
9 changes: 8 additions & 1 deletion store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tikv

import (
"bytes"
"context"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -151,11 +152,17 @@ func (s *Scanner) getData(bo *Backoffer) error {
if err != nil {
return errors.Trace(err)
}

reqEndKey := s.endKey
if len(reqEndKey) > 0 && len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, reqEndKey) < 0 {
reqEndKey = loc.EndKey
}

req := &tikvrpc.Request{
Type: tikvrpc.CmdScan,
Scan: &pb.ScanRequest{
StartKey: s.nextStartKey,
EndKey: s.endKey,
EndKey: reqEndKey,
Limit: uint32(s.batchSize),
Version: s.startTS(),
KeyOnly: s.snapshot.keyOnly,
Expand Down
12 changes: 11 additions & 1 deletion store/tikv/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *testScanSuite) SetUpSuite(c *C) {
s.OneByOneSuite.SetUpSuite(c)
s.store = NewTestStore(c).(*tikvStore)
s.prefix = fmt.Sprintf("seek_%d", time.Now().Unix())
s.rowNums = append(s.rowNums, 1, scanBatchSize, scanBatchSize+1)
s.rowNums = append(s.rowNums, 1, scanBatchSize, scanBatchSize+1, scanBatchSize*3)
}

func (s *testScanSuite) TearDownSuite(c *C) {
Expand Down Expand Up @@ -90,6 +90,16 @@ func (s *testScanSuite) TestScan(c *C) {
err := txn.Commit(context.Background())
c.Assert(err, IsNil)

if rowNum > 123 {
err = s.store.SplitRegion(encodeKey(s.prefix, s08d("key", 123)))
c.Assert(err, IsNil)
}

if rowNum > 456 {
err = s.store.SplitRegion(encodeKey(s.prefix, s08d("key", 456)))
c.Assert(err, IsNil)
}

txn2 := s.beginTxn(c)
val, err := txn2.Get(encodeKey(s.prefix, s08d("key", 0)))
c.Assert(err, IsNil)
Expand Down

0 comments on commit cf456f0

Please sign in to comment.