diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go
index a6fd34438972c..0e87b58afcfa2 100644
--- a/store/tikv/gcworker/gc_worker.go
+++ b/store/tikv/gcworker/gc_worker.go
@@ -48,14 +48,18 @@ import (
 
 // GCWorker periodically triggers GC process on tikv server.
 type GCWorker struct {
-	uuid        string
-	desc        string
-	store       tikv.Storage
-	pdClient    pd.Client
-	gcIsRunning bool
-	lastFinish  time.Time
-	cancel      context.CancelFunc
-	done        chan error
+	uuid         string
+	desc         string
+	store        tikv.Storage
+	pdClient     pd.Client
+	gcIsRunning  bool
+	lastFinish   time.Time
+	cancel       context.CancelFunc
+	done         chan error
+	testingKnobs struct {
+		scanLocks    func(key []byte) []*tikv.Lock
+		resolveLocks func(regionID tikv.RegionVerID) (ok bool, err error)
+	}
 }
 
 // NewGCWorker creates a GCWorker instance.
@@ -870,6 +874,7 @@ func (w *GCWorker) resolveLocksForRange(
 		ctx = context.WithValue(ctx, "injectedBackoff", struct{}{})
 		bo = tikv.NewBackoffer(ctx, sleep)
 	})
+retryScanAndResolve:
 	for {
 		select {
 		case <-ctx.Done():
@@ -910,16 +915,33 @@ func (w *GCWorker) resolveLocksForRange(
 			locks[i] = tikv.NewLock(locksInfo[i])
 		}
 
-		ok, err1 := w.store.GetLockResolver().BatchResolveLocks(bo, locks, loc.Region)
-		if err1 != nil {
-			return regions, errors.Trace(err1)
+		if w.testingKnobs.scanLocks != nil {
+			locks = append(locks, w.testingKnobs.scanLocks(key)...)
 		}
-		if !ok {
-			err = bo.Backoff(tikv.BoTxnLock, errors.Errorf("remain locks: %d", len(locks)))
-			if err != nil {
-				return regions, errors.Trace(err)
+		for {
+			ok, err1 := w.store.GetLockResolver().BatchResolveLocks(bo, locks, loc.Region)
+			if w.testingKnobs.resolveLocks != nil {
+				ok, err1 = w.testingKnobs.resolveLocks(loc.Region)
 			}
-			continue
+			if err1 != nil {
+				return regions, errors.Trace(err1)
+			}
+			if !ok {
+				err = bo.Backoff(tikv.BoTxnLock, errors.Errorf("remain locks: %d", len(locks)))
+				if err != nil {
+					return regions, errors.Trace(err)
+				}
+				stillInSame, refreshedLoc, err := w.tryRelocateLocksRegion(bo, locks)
+				if err != nil {
+					return regions, errors.Trace(err)
+				}
+				if stillInSame {
+					loc = refreshedLoc
+					continue
+				}
+				continue retryScanAndResolve
+			}
+			break
 		}
 		if len(locks) < gcScanLockLimit {
 			regions++
@@ -945,6 +967,18 @@ func (w *GCWorker) resolveLocksForRange(
 	return regions, nil
 }
 
+func (w *GCWorker) tryRelocateLocksRegion(bo *tikv.Backoffer, locks []*tikv.Lock) (stillInSameRegion bool, refreshedLoc *tikv.KeyLocation, err error) {
+	if len(locks) == 0 {
+		return
+	}
+	refreshedLoc, err = w.store.GetRegionCache().LocateKey(bo, locks[0].Key)
+	if err != nil {
+		return
+	}
+	stillInSameRegion = refreshedLoc.Contains(locks[len(locks)-1].Key)
+	return
+}
+
 func (w *GCWorker) uploadSafePointToPD(ctx context.Context, safePoint uint64) error {
 	var newSafePoint uint64
 	var err error
diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go
index 342694b1dadc8..3fde8d1d73a61 100644
--- a/store/tikv/gcworker/gc_worker_test.go
+++ b/store/tikv/gcworker/gc_worker_test.go
@@ -370,6 +370,39 @@ func (s *testGCWorkerSuite) TestResolveLockRangeInfine(c *C) {
 	c.Assert(err, NotNil)
 }
 
+func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionCacheMiss(c *C) {
+	var (
+		scanCnt       int
+		scanCntRef    = &scanCnt
+		resolveCnt    int
+		resolveCntRef = &resolveCnt
+	)
+	s.gcWorker.testingKnobs.scanLocks = func(key []byte) []*tikv.Lock {
+		*scanCntRef++
+		return []*tikv.Lock{
+			{
+				Key: []byte{1},
+			},
+			{
+				Key: []byte{1},
+			},
+		}
+	}
+	s.gcWorker.testingKnobs.resolveLocks = func(regionID tikv.RegionVerID) (ok bool, err error) {
+		*resolveCntRef++
+		if *resolveCntRef == 1 {
+			s.gcWorker.store.GetRegionCache().InvalidateCachedRegion(regionID)
+			// mock the region cache miss error
+			return false, nil
+		}
+		return true, nil
+	}
+	_, err := s.gcWorker.resolveLocksForRange(context.Background(), 1, []byte{0}, []byte{10})
+	c.Assert(err, IsNil)
+	c.Assert(resolveCnt, Equals, 2)
+	c.Assert(scanCnt, Equals, 1)
+}
+
 func (s *testGCWorkerSuite) TestRunGCJob(c *C) {
 	gcSafePointCacheInterval = 0
 	err := RunGCJob(context.Background(), s.store, 0, "mock", 1)
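
A side note on the control flow this patch introduces (not part of the diff itself): the fix hinges on Go's labeled continue, which lets the inner resolve-retry loop jump back to the outer scan loop when a region-cache refresh shows the scanned locks no longer fit in one region, instead of retrying the stale region forever. Below is a minimal, runnable sketch of that two-level retry shape; all names in it (scanLocksOnce, resolveOnce, stillInSameRegion) are hypothetical stand-ins, not TiDB APIs.

package main

import "fmt"

// Hypothetical stand-ins for the patch's scan/resolve/relocate steps;
// none of these exist in the TiDB code base.
func scanLocksOnce() []int { return []int{1, 2, 3} }

func resolveOnce(attempt int) bool { return attempt > 0 } // first call fails, as in the cache-miss test

func stillInSameRegion() bool { return false } // pretend the region boundary moved

func main() {
	attempt := 0
retryScanAndResolve: // the label names the outer loop, mirroring the patch
	for {
		locks := scanLocksOnce()
		for {
			if resolveOnce(attempt) {
				break // all locks resolved: leave the inner retry loop
			}
			attempt++
			if stillInSameRegion() {
				continue // locks still fit in one region: retry resolution only
			}
			// region membership changed: restart the outer loop and rescan
			continue retryScanAndResolve
		}
		fmt.Printf("resolved %d locks after %d failed attempts\n", len(locks), attempt)
		return
	}
}

Running the sketch prints "resolved 3 locks after 1 failed attempts": one failed resolution triggers a rescan, and the second attempt succeeds, which is the same shape the test above asserts with resolveCnt == 2 and scanCnt == 1 (the knob-injected locks make the second scan unnecessary in the real test).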