do gc parallel #5832

Closed · wants to merge 1 commit
1 change: 1 addition & 0 deletions store/tikv/backoff.go
@@ -148,6 +148,7 @@ const (
getMaxBackoff = 20000
prewriteMaxBackoff = 20000
cleanupMaxBackoff = 20000
GcGetTaskMaxBackoff = 600000
GcOneRegionMaxBackoff = 20000
GcResolveLockMaxBackoff = 100000
GcDeleteRangeMaxBackoff = 100000
161 changes: 133 additions & 28 deletions store/tikv/gcworker/gc_worker.go
@@ -107,7 +107,8 @@ const (
gcSafePointKey = "tikv_gc_safe_point"
gcSafePointCacheInterval = tikv.GcSafePointCacheInterval
// We don't want gc to sweep out the cached info belong to other processes, like coprocessor.
gcScanLockLimit = tikv.ResolvedCacheSize / 2
gcScanLockLimit = tikv.ResolvedCacheSize / 2
gcDefaultGCConcurrency = 8
)

var gcVariableComments = map[string]string{
@@ -422,6 +423,127 @@ func (w *GCWorker) deleteRanges(ctx goctx.Context, safePoint uint64) error {
return nil
}

type gcTask struct {
startKey []byte
endKey []byte
safePoint uint64
}

type gcResult struct {
task *gcTask
successRegions int
failedRegions int
err error
}

func getNextGCTask(store tikv.Storage, bo *tikv.Backoffer, safePoint uint64, lastKey kv.Key) (*gcTask, error) {
Contributor:

s/getNextGCTask/genNextGCTask

Contributor (Author):

ok

loc, err := store.GetRegionCache().LocateKey(bo, lastKey)
if err != nil {
return nil, errors.Trace(err)
}

task := &gcTask{
startKey: lastKey,
endKey: loc.EndKey,
safePoint: safePoint,
}
return task, nil
}

func doGC(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier string) error {
gcWorkerCounter.WithLabelValues("do_gc").Inc()

err := saveSafePoint(store.GetSafePointKV(), tikv.GcSavedSafePoint, safePoint)
if err != nil {
return errors.Trace(err)
}

// Sleep to wait for all other tidb instances update their safepoint cache.
time.Sleep(gcSafePointCacheInterval)

successRegions := 0
failedRegions := 0
remained := 0
startTime := time.Now()
log.Infof("[gc worker] %s start gc, safePoint: %v.", identifier, safePoint)
defer func() {
log.Infof("[gc worker] %s finish gc, running jobs %v, safePoint: %v, successful regions: %v,"+
" failed regions: %v, cost time: %s",
identifier, remained, safePoint, successRegions, failedRegions, time.Since(startTime))
gcHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds())
}()

var key []byte
gcResultChan := make(chan *gcResult, gcDefaultGCConcurrency)
Contributor:

gcTaskQueue := make(chan *gcTask, gcDefaultGCConcurrency)
gcResultCh := make(chan *gcResult, gcDefaultGCConcurrency)
for i := 0; i < gcDefaultGCConcurrency; i++ {
    go runGcTaskWorker(gcTaskQueue, gcResultCh)
}
var key []byte
for {
    task, err := genNextGCTask(...)
    if err != nil {...}
    if task == nil {
        // send N nils to gcTaskQueue to notify all workers to quit
        break
    }
    gcTaskQueue <- task

    // ...receive any task results that are already available
}
// receive task results until N nils have been seen
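
To make the suggested pattern concrete, below is a minimal, self-contained Go sketch of a fixed worker pool that shuts down on nil sentinel tasks. It only illustrates the review comment and is not code from this PR: task, result, handle, worker, and the concurrency constant are stand-ins for gcTask, gcResult, doGCForOneRange, runGcTaskWorker, and gcDefaultGCConcurrency.

package main

import (
    "fmt"
    "sync"
)

// task and result stand in for the PR's gcTask and gcResult types; a real
// worker would call doGCForOneRange instead of handle.
type task struct{ id int }

type result struct{ id int }

func handle(t *task) *result { return &result{id: t.id} }

// worker handles tasks one by one and quits when it receives a nil task,
// which is the shutdown signal the review comment describes.
func worker(tasks <-chan *task, results chan<- *result, wg *sync.WaitGroup) {
    defer wg.Done()
    for t := range tasks {
        if t == nil {
            return
        }
        results <- handle(t)
    }
}

func main() {
    const concurrency = 8 // plays the role of gcDefaultGCConcurrency

    tasks := make(chan *task, concurrency)
    results := make(chan *result, concurrency)

    var wg sync.WaitGroup
    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go worker(tasks, results, &wg)
    }

    // Drain results concurrently so the producer never blocks on a full
    // result channel.
    done := make(chan struct{})
    go func() {
        defer close(done)
        for r := range results {
            fmt.Println("range finished:", r.id)
        }
    }()

    // Produce tasks; in the PR this loop would call getNextGCTask until the
    // returned end key is empty.
    for i := 0; i < 20; i++ {
        tasks <- &task{id: i}
    }
    // One nil per worker tells every worker to quit.
    for i := 0; i < concurrency; i++ {
        tasks <- nil
    }

    wg.Wait()      // all workers have exited
    close(results) // lets the collector goroutine finish
    <-done
}

Closing the task channel would be an alternative shutdown signal, since only the producer sends on it; the nil sentinels are kept here to stay close to the comment's wording.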

bo := tikv.NewBackoffer(tikv.GcGetTaskMaxBackoff, goctx.Background())
jobCtx, cancel := goctx.WithCancel(ctx)
defer cancel()
for i := 0; i < gcDefaultGCConcurrency; i++ {
task, err := getNextGCTask(store, bo, safePoint, key)
if err != nil {
return errors.Trace(err)
}
key = task.endKey

if task != nil {
go doGCTask(jobCtx, store, identifier, gcResultChan, task)
remained++
}

if len(key) == 0 {
break
}
}

ticker := time.NewTicker(gcJobLogTickInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return errors.New("[gc worker] gc job canceled")
case <-ticker.C:
log.Infof("[gc worker] %s gc in process, running jobs %v, safePoint: %v, successful regions: %v, "+
"failed regions: %v, cost time: %s",
identifier, remained, safePoint, successRegions, failedRegions, time.Since(startTime))
case res := <-gcResultChan:
remained--

successRegions += res.successRegions
failedRegions += res.failedRegions

if len(key) == 0 {
if remained == 0 {
return nil
}

continue
}

task, err := getNextGCTask(store, bo, safePoint, key)
if err != nil {
return errors.Trace(err)
}
key = task.endKey

if task != nil {
go doGCTask(jobCtx, store, identifier, gcResultChan, task)
remained++
}
}
}
}

func doGCTask(ctx goctx.Context, store tikv.Storage, identifier string, resChan chan *gcResult, task *gcTask) {
Contributor:

This function should wait on a task queue and handle tasks one by one until it receives nil (see the sketch after this function).

res := doGCForOneRange(ctx, store, task, identifier)

select {
case <-ctx.Done():
return
default:
resChan <- res
}
}
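
One possible shape for that worker, assuming a hypothetical runGCTaskWorker helper that is not part of this PR; it reuses the diff's gcTask, gcResult, and doGCForOneRange and exits on a nil task.

func runGCTaskWorker(ctx goctx.Context, store tikv.Storage, identifier string,
    taskCh <-chan *gcTask, resCh chan<- *gcResult) {
    for task := range taskCh {
        if task == nil {
            // nil is the sentinel telling this worker to quit.
            return
        }
        resCh <- doGCForOneRange(ctx, store, task, identifier)
    }
}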

func resolveLocks(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier string) error {
gcWorkerCounter.WithLabelValues("resolve_locks").Inc()

@@ -511,44 +633,30 @@ func resolveLocks(ctx goctx.Context, store tikv.Storage, safePoint uint64, ident
return nil
}

func doGC(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier string) error {
gcWorkerCounter.WithLabelValues("do_gc").Inc()

err := saveSafePoint(store.GetSafePointKV(), tikv.GcSavedSafePoint, safePoint)
if err != nil {
return errors.Trace(err)
}

// Sleep to wait for all other tidb instances update their safepoint cache.
time.Sleep(gcSafePointCacheInterval)

log.Infof("[gc worker] %s start gc, safePoint: %v.", identifier, safePoint)
startTime := time.Now()
func doGCForOneRange(ctx goctx.Context, store tikv.Storage, task *gcTask, identifier string) *gcResult {
Contributor:

Judging from the function name, it seems this should take a range as its input arguments.

successRegions := 0
failedRegions := 0

ticker := time.NewTicker(gcJobLogTickInterval)
defer ticker.Stop()

bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx)
var key []byte
key := task.startKey
for {
select {
case <-ctx.Done():
return errors.New("[gc worker] gc job canceled")
case <-ticker.C:
log.Infof("[gc worker] %s gc in process, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s",
identifier, safePoint, successRegions, failedRegions, time.Since(startTime))
return &gcResult{
task,
successRegions,
failedRegions,
errors.New("[gc worker] gc job canceled")}
default:
}

loc, err := store.GetRegionCache().LocateKey(bo, key)
if err != nil {
return errors.Trace(err)
return &gcResult{task, successRegions, failedRegions, errors.Trace(err)}
}

var regionErr *errorpb.Error
regionErr, err = doGCForOneRegion(bo, store, safePoint, loc.Region)
regionErr, err = doGCForOneRegion(bo, store, task.safePoint, loc.Region)

// we check regionErr here first, because we know 'regionErr' and 'err' should not return together, to keep it to
// make the process correct.
@@ -569,14 +677,11 @@ func doGC(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier st
}

key = loc.EndKey
if len(key) == 0 {
if len(key) == 0 || bytes.Compare(key, task.endKey) >= 0 {
break
}
bo = tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx)
}
log.Infof("[gc worker] %s finish gc, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s",
identifier, safePoint, successRegions, failedRegions, time.Since(startTime))
gcHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds())
return nil
}
