Skip to content

Commit

Permalink
gc_worker: reduce GC scan locks when meeting region cache miss (#18385)…
Browse files Browse the repository at this point in the history
… (#18874)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
ti-srebot authored Aug 26, 2020
1 parent ac5d389 commit dae7cba
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 17 deletions.
67 changes: 50 additions & 17 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,18 @@ import (

// GCWorker periodically triggers GC process on tikv server.
type GCWorker struct {
uuid string
desc string
store tikv.Storage
pdClient pd.Client
gcIsRunning bool
lastFinish time.Time
cancel context.CancelFunc
done chan error
uuid string
desc string
store tikv.Storage
pdClient pd.Client
gcIsRunning bool
lastFinish time.Time
cancel context.CancelFunc
done chan error
testingKnobs struct {
scanLocks func(key []byte) []*tikv.Lock
resolveLocks func(regionID tikv.RegionVerID) (ok bool, err error)
}
}

// NewGCWorker creates a GCWorker instance.
Expand Down Expand Up @@ -823,6 +827,7 @@ func (w *GCWorker) resolveLocksForRange(
ctx = context.WithValue(ctx, "injectedBackoff", struct{}{})
bo = tikv.NewBackoffer(ctx, sleep)
})
retryScanAndResolve:
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -862,17 +867,33 @@ func (w *GCWorker) resolveLocksForRange(
for i := range locksInfo {
locks[i] = tikv.NewLock(locksInfo[i])
}

ok, err1 := w.store.GetLockResolver().BatchResolveLocks(bo, locks, loc.Region)
if err1 != nil {
return regions, errors.Trace(err1)
if w.testingKnobs.scanLocks != nil {
locks = append(locks, w.testingKnobs.scanLocks(key)...)
}
if !ok {
err = bo.Backoff(tikv.BoTxnLock, errors.Errorf("remain locks: %d", len(locks)))
if err != nil {
return regions, errors.Trace(err)
for {
ok, err1 := w.store.GetLockResolver().BatchResolveLocks(bo, locks, loc.Region)
if w.testingKnobs.resolveLocks != nil {
ok, err1 = w.testingKnobs.resolveLocks(loc.Region)
}
continue
if err1 != nil {
return regions, errors.Trace(err1)
}
if !ok {
err = bo.Backoff(tikv.BoTxnLock, errors.Errorf("remain locks: %d", len(locks)))
if err != nil {
return regions, errors.Trace(err)
}
stillInSame, refreshedLoc, err := w.tryRelocateLocksRegion(bo, locks)
if err != nil {
return regions, errors.Trace(err)
}
if stillInSame {
loc = refreshedLoc
continue
}
continue retryScanAndResolve
}
break
}
if len(locks) < gcScanLockLimit {
regions++
Expand All @@ -898,6 +919,18 @@ func (w *GCWorker) resolveLocksForRange(
return regions, nil
}

func (w *GCWorker) tryRelocateLocksRegion(bo *tikv.Backoffer, locks []*tikv.Lock) (stillInSameRegion bool, refreshedLoc *tikv.KeyLocation, err error) {
if len(locks) == 0 {
return
}
refreshedLoc, err = w.store.GetRegionCache().LocateKey(bo, locks[0].Key)
if err != nil {
return
}
stillInSameRegion = refreshedLoc.Contains(locks[len(locks)-1].Key)
return
}

func (w *GCWorker) uploadSafePointToPD(ctx context.Context, safePoint uint64) error {
var newSafePoint uint64
var err error
Expand Down
33 changes: 33 additions & 0 deletions store/tikv/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,39 @@ func (s *testGCWorkerSuite) TestResolveLockRangeInfine(c *C) {
c.Assert(err, NotNil)
}

func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionCacheMiss(c *C) {
var (
scanCnt int
scanCntRef = &scanCnt
resolveCnt int
resolveCntRef = &resolveCnt
)
s.gcWorker.testingKnobs.scanLocks = func(key []byte) []*tikv.Lock {
*scanCntRef++
return []*tikv.Lock{
{
Key: []byte{1},
},
{
Key: []byte{1},
},
}
}
s.gcWorker.testingKnobs.resolveLocks = func(regionID tikv.RegionVerID) (ok bool, err error) {
*resolveCntRef++
if *resolveCntRef == 1 {
s.gcWorker.store.GetRegionCache().InvalidateCachedRegion(regionID)
// mock the region cache miss error
return false, nil
}
return true, nil
}
_, err := s.gcWorker.resolveLocksForRange(context.Background(), 1, []byte{0}, []byte{10})
c.Assert(err, IsNil)
c.Assert(resolveCnt, Equals, 2)
c.Assert(scanCnt, Equals, 1)
}

func (s *testGCWorkerSuite) TestRunGCJob(c *C) {
gcSafePointCacheInterval = 0
err := RunGCJob(context.Background(), s.store, 0, "mock", 1)
Expand Down

0 comments on commit dae7cba

Please sign in to comment.