Skip to content

Commit

Permalink
tikvclient: refine region-cache (#10256)
Browse files Browse the repository at this point in the history
1. mark store after send
* mark store be failed when sending data failure, it let tidb blackout this store in following in a period(continue fail count + last fail timestamp).
* mark store as success when sending data success.

2. invalidate region cache
* cache item never be deleted and only invalidate it to trigger pd re-fetch
* make cache item validate again to keep use old data if fetch failure caused by pd down.
  • Loading branch information
lysu authored and tiancaiamao committed May 21, 2019
1 parent 2e9ddb2 commit f6346a1
Show file tree
Hide file tree
Showing 9 changed files with 818 additions and 293 deletions.
2 changes: 1 addition & 1 deletion ddl/table_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func checkRegionStartWithTableID(c *C, id int64, store kvStore) {
c.Assert(err, IsNil)

// Region cache may be out of date, so we need to drop this expired region and load it again.
cache.DropRegion(loc.Region)
cache.InvalidateCachedRegion(loc.Region)
if bytes.Equal(loc.StartKey, []byte(regionStartKey)) {
return
}
Expand Down
5 changes: 4 additions & 1 deletion store/tikv/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
_, regionIDs, _ := mocktikv.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t"))
pdCli := &codecPDClient{mocktikv.NewPDClient(cluster)}
cache := NewRegionCache(pdCli)
defer cache.Close()

bo := NewBackoffer(context.Background(), 3000)

Expand Down Expand Up @@ -96,6 +97,7 @@ func (s *testCoprocessorSuite) TestSplitRegionRanges(c *C) {
mocktikv.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t"))
pdCli := &codecPDClient{mocktikv.NewPDClient(cluster)}
cache := NewRegionCache(pdCli)
defer cache.Close()

bo := NewBackoffer(context.Background(), 3000)

Expand Down Expand Up @@ -148,6 +150,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
storeID, regionIDs, peerIDs := mocktikv.BootstrapWithMultiRegions(cluster, []byte("m"))
pdCli := &codecPDClient{mocktikv.NewPDClient(cluster)}
cache := NewRegionCache(pdCli)
defer cache.Close()
bo := NewBackoffer(context.Background(), 3000)

tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), false, false)
Expand All @@ -161,7 +164,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
regionIDs = append(regionIDs, cluster.AllocID())
peerIDs = append(peerIDs, cluster.AllocID())
cluster.Split(regionIDs[1], regionIDs[2], []byte("q"), []uint64{peerIDs[2]}, storeID)
cache.DropRegion(tasks[1].region)
cache.InvalidateCachedRegion(tasks[1].region)

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), true, false)
c.Assert(err, IsNil)
Expand Down
1 change: 1 addition & 0 deletions store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func (s *tikvStore) Close() error {
if s.txnLatches != nil {
s.txnLatches.Close()
}
s.regionCache.Close()
return nil
}

Expand Down
10 changes: 9 additions & 1 deletion store/tikv/rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,15 @@ func NewRawKVClient(pdAddrs []string, security config.Security) (*RawKVClient, e

// Close closes the client.
func (c *RawKVClient) Close() error {
c.pdClient.Close()
if c.pdClient != nil {
c.pdClient.Close()
}
if c.regionCache != nil {
c.regionCache.Close()
}
if c.rpcClient == nil {
return nil
}
return c.rpcClient.Close()
}

Expand Down
Loading

0 comments on commit f6346a1

Please sign in to comment.