store/tikv: run gc workers parallelly #5837
Changes from 12 commits
9463c94
1945bd0
87693c9
50cefba
4fe2f76
8c51b33
9028003
0870b77
3f58cb2
d078864
b2dc868
59d44aa
d46d475
d8aa1b5
f4c459d
@@ -19,6 +19,7 @@ import (
 	"os"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/juju/errors"
@@ -102,14 +103,19 @@ const (
 	gcDefaultRunInterval = time.Minute * 10
 	gcWaitTime           = time.Minute * 1

-	gcLifeTimeKey            = "tikv_gc_life_time"
-	gcDefaultLifeTime        = time.Minute * 10
-	gcSafePointKey           = "tikv_gc_safe_point"
-	gcSafePointCacheInterval = tikv.GcSafePointCacheInterval
+	gcLifeTimeKey        = "tikv_gc_life_time"
+	gcDefaultLifeTime    = time.Minute * 10
+	gcSafePointKey       = "tikv_gc_safe_point"
+	gcConcurrencyKey     = "tikv_gc_concurrency"
+	gcDefaultConcurrency = 1
+	gcMinConcurrency     = 1
+	gcMaxConcurrency     = 128
 	// We don't want gc to sweep out the cached info belong to other processes, like coprocessor.
 	gcScanLockLimit = tikv.ResolvedCacheSize / 2
 )

+var gcSafePointCacheInterval = tikv.GcSafePointCacheInterval
+
 var gcVariableComments = map[string]string{
 	gcLeaderUUIDKey: "Current GC worker leader UUID. (DO NOT EDIT)",
 	gcLeaderDescKey: "Host name and pid of current GC leader. (DO NOT EDIT)",

@@ -118,6 +124,7 @@ var gcVariableComments = map[string]string{
 	gcRunIntervalKey: "GC run interval, at least 10m, in Go format.",
 	gcLifeTimeKey:    "All versions within life time will not be collected by GC, at least 10m, in Go format.",
 	gcSafePointKey:   "All versions after safe point can be accessed. (DO NOT EDIT)",
+	gcConcurrencyKey: "How many goroutines used to do GC in parallel, [1, 128], default 1",
 }

 func (w *GCWorker) start(ctx goctx.Context, wg *sync.WaitGroup) {

@@ -150,7 +157,7 @@ func (w *GCWorker) start(ctx goctx.Context, wg *sync.WaitGroup) {
 				break
 			}
 		case <-ctx.Done():
-			log.Infof("[gc worker] (%s) quit.", w.uuid)
+			log.Infof("[gc worker] %s quit.", w.uuid)
 			return
 		}
 	}
@@ -307,13 +314,22 @@ func RunGCJob(ctx goctx.Context, s tikv.Storage, safePoint uint64, identifier st
 	if err != nil {
 		return errors.Trace(err)
 	}
-	err = doGC(ctx, s, safePoint, identifier)
+	err = doGCParallel(ctx, s, safePoint, identifier, gcDefaultConcurrency)
 	if err != nil {
 		return errors.Trace(err)
 	}
 	return nil
 }

+func (w *GCWorker) doGC(ctx goctx.Context, safePoint uint64) error {
+	gcConcurrency, err := w.loadGCConcurrencyWithDefault()
+	if err != nil {
+		log.Errorf("[gc worker] %s failed to load gcConcurrency, err %s", w.uuid, err)
+		gcConcurrency = gcDefaultConcurrency
+	}
+
+	return doGCParallel(ctx, w.store, safePoint, w.uuid, gcConcurrency)
+}
 func (w *GCWorker) runGCJob(ctx goctx.Context, safePoint uint64) {
 	gcWorkerCounter.WithLabelValues("run_job").Inc()
 	err := resolveLocks(ctx, w.store, safePoint, w.uuid)

@@ -330,7 +346,7 @@ func (w *GCWorker) runGCJob(ctx goctx.Context, safePoint uint64) {
 		w.done <- errors.Trace(err)
 		return
 	}
-	err = doGC(ctx, w.store, safePoint, w.uuid)
+	err = w.doGC(ctx, safePoint)
 	if err != nil {
 		log.Errorf("[gc worker] %s do GC returns an error %v", w.uuid, err)
 		w.gcIsRunning = false
@@ -422,6 +438,35 @@ func (w *GCWorker) deleteRanges(ctx goctx.Context, safePoint uint64) error {
 	return nil
 }

+func (w *GCWorker) loadGCConcurrencyWithDefault() (int, error) {
+	str, err := w.loadValueFromSysTable(gcConcurrencyKey, w.session)
+	if err != nil {
+		return gcDefaultConcurrency, errors.Trace(err)
+	}
+	if str == "" {
+		err = w.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(gcDefaultConcurrency), w.session)
+		if err != nil {
+			return gcDefaultConcurrency, errors.Trace(err)
+		}
+		return gcDefaultConcurrency, nil
+	}
+
+	jobConcurrency, err := strconv.Atoi(str)
+	if err != nil {
+		return gcDefaultConcurrency, err
+	}
+
+	if jobConcurrency < gcMinConcurrency {
+		jobConcurrency = gcMinConcurrency
+	}
+
+	if jobConcurrency > gcMaxConcurrency {
+		jobConcurrency = gcMaxConcurrency
+	}
+
+	return jobConcurrency, nil
+}
+
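To make the clamp behavior above concrete, here is a small standalone sketch (not code from this PR): clampConcurrency is a hypothetical helper that mirrors how loadGCConcurrencyWithDefault turns the stored string into a usable worker count.

package main

import (
	"fmt"
	"strconv"
)

const (
	gcDefaultConcurrency = 1
	gcMinConcurrency     = 1
	gcMaxConcurrency     = 128
)

// clampConcurrency mirrors the parse-and-clamp logic of
// loadGCConcurrencyWithDefault for a raw stored string value.
func clampConcurrency(str string) int {
	if str == "" {
		return gcDefaultConcurrency // unset: fall back to the default
	}
	n, err := strconv.Atoi(str)
	if err != nil {
		return gcDefaultConcurrency // unparsable: fall back to the default
	}
	if n < gcMinConcurrency {
		n = gcMinConcurrency
	}
	if n > gcMaxConcurrency {
		n = gcMaxConcurrency
	}
	return n
}

func main() {
	for _, v := range []string{"", "0", "8", "1000", "oops"} {
		fmt.Printf("%q -> %d\n", v, clampConcurrency(v))
	}
	// "" -> 1, "0" -> 1, "8" -> 8, "1000" -> 128, "oops" -> 1
}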
 func resolveLocks(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier string) error {
 	gcWorkerCounter.WithLabelValues("resolve_locks").Inc()

@@ -511,44 +556,63 @@ func resolveLocks(ctx goctx.Context, store tikv.Storage, safePoint uint64, ident
 	return nil
 }

-func doGC(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier string) error {
-	gcWorkerCounter.WithLabelValues("do_gc").Inc()
-
-	err := saveSafePoint(store.GetSafePointKV(), tikv.GcSavedSafePoint, safePoint)
-	if err != nil {
-		return errors.Trace(err)
-	}
-
-	// Sleep to wait for all other tidb instances update their safepoint cache.
-	time.Sleep(gcSafePointCacheInterval)
+type gcTask struct {
+	startKey  []byte
+	endKey    []byte
+	safePoint uint64
+}

-	log.Infof("[gc worker] %s start gc, safePoint: %v.", identifier, safePoint)
-	startTime := time.Now()
-	successRegions := 0
-	failedRegions := 0
+type gcTaskWorker struct {
+	identifier string
+	store      tikv.Storage
+	taskCh     chan *gcTask
+	wg         *sync.WaitGroup
+	// use atomic to read and set
+	successRegions *int32
+	failedRegions  *int32
+}

-	ticker := time.NewTicker(gcJobLogTickInterval)
-	defer ticker.Stop()
+func newGCTaskWorker(store tikv.Storage, taskCh chan *gcTask, wg *sync.WaitGroup, identifier string, successRegions *int32, failedRegions *int32) *gcTaskWorker {
+	return &gcTaskWorker{
+		identifier,
+		store,
+		taskCh,
+		wg,
+		successRegions,
+		failedRegions,
+	}
+}

-	bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx)
-	var key []byte
-	for {
-		select {
-		case <-ctx.Done():
-			return errors.New("[gc worker] gc job canceled")
-		case <-ticker.C:
-			log.Infof("[gc worker] %s gc in process, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s",
-				identifier, safePoint, successRegions, failedRegions, time.Since(startTime))
-		default:
+func (w *gcTaskWorker) run() {
+	defer w.wg.Done()
+	for task := range w.taskCh {
+		err := w.doGCForRange(task.startKey, task.endKey, task.safePoint)
+		if err != nil {
+			log.Errorf("[gc worker] %s, gc interrupted because get region(%v, %v) error, err %v",
+				w.identifier, task.startKey, task.endKey, errors.Trace(err))
 		}
+	}
+}

-		loc, err := store.GetRegionCache().LocateKey(bo, key)
+func (w *gcTaskWorker) doGCForRange(startKey []byte, endKey []byte, safePoint uint64) error {
+	var successRegions int32
+	var failedRegions int32
+	defer func() {
+		atomic.AddInt32(w.successRegions, successRegions)
+		atomic.AddInt32(w.failedRegions, failedRegions)
+		gcActionRegionResultCounter.WithLabelValues("success").Add(float64(successRegions))
+		gcActionRegionResultCounter.WithLabelValues("fail").Add(float64(failedRegions))
+	}()
+	key := startKey
+	for {
+		bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, goctx.Background())
+		loc, err := w.store.GetRegionCache().LocateKey(bo, key)
 		if err != nil {
 			return errors.Trace(err)
 		}

 		var regionErr *errorpb.Error
-		regionErr, err = doGCForOneRegion(bo, store, safePoint, loc.Region)
+		regionErr, err = w.doGCForRegion(bo, safePoint, loc.Region)

 		// we check regionErr here first, because we know 'regionErr' and 'err' should not return together, to keep it to
 		// make the process correct.

@@ -560,36 +624,31 @@ func doGC(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier st
 		}

 		if err != nil {
+			log.Warnf("[gc worker] %s gc for range [%v, %v) safepoint: %v, failed, err: %v", w.identifier, startKey, endKey, safePoint, err)
 			failedRegions++
-			gcActionRegionResultCounter.WithLabelValues("fail").Inc()
-			log.Warnf("[gc worker] %s failed to do gc on region(%s, %s), ignore it", identifier, string(loc.StartKey), string(loc.EndKey))
 		} else {
 			successRegions++
-			gcActionRegionResultCounter.WithLabelValues("success").Inc()
 		}

 		key = loc.EndKey
-		if len(key) == 0 {
+		if len(key) == 0 || bytes.Compare(key, endKey) >= 0 {
 			break
 		}
-		bo = tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx)
 	}
-	log.Infof("[gc worker] %s finish gc, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s",
-		identifier, safePoint, successRegions, failedRegions, time.Since(startTime))
-	gcHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds())

 	return nil
 }

-// these two errors should not return together, for more, see the func 'doGC'
-func doGCForOneRegion(bo *tikv.Backoffer, store tikv.Storage, safePoint uint64, region tikv.RegionVerID) (*errorpb.Error, error) {
+// these two errors should not return together, for more, see the func 'doGCParallel'
+func (w *gcTaskWorker) doGCForRegion(bo *tikv.Backoffer, safePoint uint64, region tikv.RegionVerID) (*errorpb.Error, error) {
 	req := &tikvrpc.Request{
 		Type: tikvrpc.CmdGC,
 		GC: &kvrpcpb.GCRequest{
 			SafePoint: safePoint,
 		},
 	}

-	resp, err := store.SendReq(bo, req, region, tikv.GCTimeout)
+	resp, err := w.store.SendReq(bo, req, region, tikv.GCTimeout)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}

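The range walk in doGCForRange above is what makes split handling work: a worker owns the half-open range [startKey, endKey) and advances region by region until it passes endKey, so a region that splits mid-job just means one extra iteration. Below is a standalone sketch of that loop shape; region and locate are hypothetical stand-ins for TiKV's region descriptor and RegionCache.LocateKey, not the PR's types.

package main

import (
	"bytes"
	"fmt"
)

// region is a hypothetical stand-in for a TiKV region descriptor:
// it owns the half-open key range [start, end), where an empty end
// means "to the end of the keyspace".
type region struct {
	start, end []byte
}

// locate is a hypothetical stand-in for RegionCache.LocateKey:
// it returns the region (from a sorted list) containing key.
func locate(regions []region, key []byte) region {
	for _, r := range regions {
		if len(r.end) == 0 || bytes.Compare(key, r.end) < 0 {
			return r
		}
	}
	return regions[len(regions)-1]
}

// walkRange visits every region overlapping [startKey, endKey),
// mirroring the loop shape of doGCForRange: locate the region that
// contains key, process it, then advance key to the region's end.
func walkRange(regions []region, startKey, endKey []byte) {
	key := startKey
	for {
		r := locate(regions, key)
		fmt.Printf("gc region [%q, %q)\n", r.start, r.end)
		key = r.end
		// Stop at the end of the keyspace or once past the task's end key.
		// A freshly split region simply shows up as one more iteration.
		if len(key) == 0 || bytes.Compare(key, endKey) >= 0 {
			return
		}
	}
}

func main() {
	regions := []region{
		{[]byte(""), []byte("b")},
		{[]byte("b"), []byte("d")}, // imagine [b, f) split into [b, d) and [d, f)
		{[]byte("d"), []byte("f")},
		{[]byte("f"), []byte("")},
	}
	walkRange(regions, []byte("a"), []byte("e"))
}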
@@ -612,6 +671,83 @@ func doGCForOneRegion(bo *tikv.Backoffer, store tikv.Storage, safePoint uint64,
 	return nil, nil
 }

+func genNextGCTask(store tikv.Storage, bo *tikv.Backoffer, safePoint uint64, key kv.Key) (*gcTask, error) {
+	loc, err := store.GetRegionCache().LocateKey(bo, key)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	task := &gcTask{
+		startKey:  key,
+		endKey:    loc.EndKey,
+		safePoint: safePoint,
+	}
+	return task, nil
+}
+
+func doGCParallel(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier string, concurrency int) error {

Review comment: Make this function a method of …
Reply: ok

+	gcWorkerCounter.WithLabelValues("do_gc").Inc()
+
+	err := saveSafePoint(store.GetSafePointKV(), tikv.GcSavedSafePoint, safePoint)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	// Sleep to wait for all other tidb instances update their safepoint cache.
+	time.Sleep(gcSafePointCacheInterval)
+
+	log.Infof("[gc worker] %s start gc, concurrency %v, safePoint: %v.", identifier, concurrency, safePoint)
+	startTime := time.Now()
+	var successRegions int32
+	var failedRegions int32
+
+	ticker := time.NewTicker(gcJobLogTickInterval)
+	defer ticker.Stop()
+
+	// Create task queue and start task workers.
+	gcTaskCh := make(chan *gcTask, concurrency)
+	var wg sync.WaitGroup
+	for i := 0; i < concurrency; i++ {
+		w := newGCTaskWorker(store, gcTaskCh, &wg, identifier, &successRegions, &failedRegions)
+		wg.Add(1)
+		go w.run()
+	}
+
+	var key []byte
+	defer func() {
+		close(gcTaskCh)
+		wg.Wait()
+		log.Infof("[gc worker] %s finish gc, safePoint: %v, successful regions: %v, failed regions: %v, total cost time: %s",
+			identifier, safePoint, atomic.LoadInt32(&successRegions), atomic.LoadInt32(&failedRegions), time.Since(startTime))

Review comment: It doesn't matter if this isn't an atomic load: by the time this code runs, no worker uses successRegions/failedRegions any more.
Reply: I'd like to keep it for the future.

+		gcHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds())
+	}()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return errors.New("[gc worker] gc job canceled")
+		case <-ticker.C:
+			log.Infof("[gc worker] %s gc in process, safePoint: %v, successful regions: %v, failed regions: %v, total cost time: %s",
+				identifier, safePoint, atomic.LoadInt32(&successRegions), atomic.LoadInt32(&failedRegions), time.Since(startTime))
+		default:
+		}
+
+		bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx)
+		task, err := genNextGCTask(store, bo, safePoint, key)

Review comment: key is not initialized?
Reply: No need to init; it takes the default zero value, and a nil start key means the scan begins at the first region.
Review comment: What would happen when a region splits between different calls of this function?
Reply: A task is defined by a key range, not a region, so the worker will handle both resulting regions if one splits.

+		if err != nil {
+			return errors.Trace(err)
+		}
+		if task != nil {
+			gcTaskCh <- task
+			key = task.endKey
+		}
+
+		if len(key) == 0 {
+			return nil
+		}
+	}
+}
|
||
func (w *GCWorker) checkLeader() (bool, error) { | ||
gcWorkerCounter.WithLabelValues("check_leader").Inc() | ||
session := createSession(w.store) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In region split case, will some worker GC the same region? Is there any problem for this situation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No problem here, see above
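Stripped of the GC specifics, the dispatch structure of doGCParallel is a standard bounded producer/consumer: a buffered channel provides backpressure, close plus a WaitGroup gives clean shutdown, and shared progress counters are updated atomically. A minimal self-contained sketch of that pattern follows (illustrative only, not the PR's code).

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type task struct{ id int }

func main() {
	const concurrency = 4
	taskCh := make(chan *task, concurrency)
	var wg sync.WaitGroup
	var done int32

	// Start a fixed pool of workers draining the task channel.
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Ranging over the channel exits cleanly once it is closed.
			for t := range taskCh {
				_ = t // a real worker would call doGCForRange here
				atomic.AddInt32(&done, 1)
			}
		}()
	}

	// Single producer feeding the pool.
	for i := 0; i < 16; i++ {
		taskCh <- &task{id: i} // blocks when the buffer is full: natural backpressure
	}
	close(taskCh) // signals the workers that no more work is coming
	wg.Wait()     // wait for all in-flight tasks to finish

	fmt.Printf("processed %d tasks\n", atomic.LoadInt32(&done))
}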