-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
store/tikv: run gc workers parallelly #5837
Conversation
store/tikv/gcworker/gc_worker.go
Outdated
return task, nil | ||
} | ||
|
||
func gcTaskWorker(store tikv.Storage, taskCh chan *gcTask, resultCh chan *gcResult) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about defining a struct type named gcTaskWorker
, then add a method run
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think it is a good idea.
store/tikv/gcworker/gc_worker.go
Outdated
bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx) | ||
// create task queue and task result queue, and start task workers | ||
gcTaskCh := make(chan *gcTask, gcConcurrency) | ||
gcResultCh := make(chan *gcResult, 1024) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why 1024
here?
store/tikv/gcworker/gc_worker.go
Outdated
@@ -106,6 +106,10 @@ const ( | |||
gcDefaultLifeTime = time.Minute * 10 | |||
gcSafePointKey = "tikv_gc_safe_point" | |||
gcSafePointCacheInterval = tikv.GcSafePointCacheInterval | |||
gcConcurrencyKey = "tikv_gc_concurrency" | |||
gcDefaultConcurrency = 32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As i have tested, the concurrency should less than 50% of the total count of TiKV nodes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I test a cluster with 5 tikv node and 2000 regions, when i set GC concurrency as 2, only 5 regions was skipped during a round of GC.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is somehow different from what I think it will do, but I'm glad it works well.
store/tikv/gcworker/gc_worker.go
Outdated
@@ -118,6 +123,7 @@ var gcVariableComments = map[string]string{ | |||
gcRunIntervalKey: "GC run interval, at least 10m, in Go format.", | |||
gcLifeTimeKey: "All versions within life time will not be collected by GC, at least 10m, in Go format.", | |||
gcSafePointKey: "All versions after safe point can be accessed. (DO NOT EDIT)", | |||
gcConcurrencyKey: "How many go routines used to do GC parallel, [1, 1024], default 32", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
default 32?
gcDefaultConcurrency = 2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[1, 128]
store/tikv/gcworker/gc_worker.go
Outdated
} | ||
|
||
func (w *gcTaskWorker) run() { | ||
for { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't use task == nil
to indicate work channel finish.
Go idiom:
for task := <-w.taskCh {
}
store/tikv/gcworker/gc_worker.go
Outdated
res = &gcResult{ | ||
successRegions: 0, | ||
failedRegions: 0, | ||
lastErr: err, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/err/errors.Trace(err)
failedRegions = 0 when error is not nil?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It means we don't get the region, so set it 0
here.
store/tikv/gcworker/gc_worker.go
Outdated
lastErr: err, | ||
} | ||
} | ||
if res != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not put it into line 565?
store/tikv/gcworker/gc_worker.go
Outdated
} | ||
} | ||
|
||
func (w *GCWorker) loadGcConcurrencyWithDefault() (int, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/Gc/GC
Use capital letters for abbreviation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
store/tikv/gcworker/gc_worker.go
Outdated
case <-ticker.C: | ||
log.Infof("[gc worker] %s gc in process, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s", | ||
identifier, safePoint, successRegions, failedRegions, time.Since(startTime)) | ||
default: | ||
} | ||
|
||
bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx) | ||
task, err := genNextGCTask(store, bo, safePoint, key) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
key is not initialized?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
start key is ""
, no need to do
store/tikv/gcworker/gc_worker.go
Outdated
} | ||
} | ||
|
||
waitTaskWorkersDone(gcResultCh, concurrency) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wg.Wait()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
} else { | ||
successRegions++ | ||
gcActionRegionResultCounter.WithLabelValues("success").Inc() | ||
} | ||
|
||
key = loc.EndKey |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is [startkey, endkey] a range and should we consider all regions in the range?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we do it in a range, not just a region if it splited
@@ -27,6 +27,7 @@ import ( | |||
"github.com/pingcap/tidb/store/mockstore" | |||
"github.com/pingcap/tidb/store/tikv" | |||
goctx "golang.org/x/net/context" | |||
"strconv" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put standard packages and third-party packages in different code block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
store/tikv/gcworker/gc_worker.go
Outdated
lastErr: lastErr, | ||
} | ||
|
||
return res, nil | ||
} | ||
|
||
// these two errors should not return together, for more, see the func 'doGC' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is strange, would you update it please?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be here, not outdated, we sadly rely on this.
store/tikv/gcworker/gc_worker.go
Outdated
if taskRes == nil { | ||
count++ | ||
if count == concurrency { | ||
close(gcResultCh) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to deal with gcResultCh
, its capacity is large enough, no task worker will be blocked on sending to this channel.
store/tikv/gcworker/gc_worker.go
Outdated
func (w *gcTaskWorker) run() { | ||
defer w.wg.Done() | ||
for { | ||
task := <-w.taskCh |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can write the for loop like for task := range w.taskCh {
When the channel is closed, the for loop will break.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
store/tikv/gcworker/gc_worker.go
Outdated
return | ||
} | ||
|
||
res, err := doGCForRange(w.store, task.startKey, task.endKey, task.safePoint) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be cleaner to define the doGCForRange
as a method of gcTaskWorker
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
store/tikv/gcworker/gc_worker.go
Outdated
func (w *GCWorker) doGC(ctx goctx.Context, safePoint uint64) error { | ||
gcConcurrency, err := w.loadGCConcurrencyWithDefault() | ||
if err != nil { | ||
log.Errorf("") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Miss err here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
store/tikv/gcworker/gc_worker.go
Outdated
ticker := time.NewTicker(gcJobLogTickInterval) | ||
defer ticker.Stop() | ||
|
||
// create task queue and task result queue, and start task workers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please fix comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And sentence begins with a capital letter, end with dot.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
store/tikv/gcworker/gc_worker.go
Outdated
gcHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds()) | ||
}() | ||
|
||
bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please move this line into below for loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
store/tikv/gcworker/gc_worker.go
Outdated
@@ -37,6 +37,7 @@ import ( | |||
tidbutil "github.com/pingcap/tidb/util" | |||
log "github.com/sirupsen/logrus" | |||
goctx "golang.org/x/net/context" | |||
"sync/atomic" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move to line 23
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
store/tikv/gcworker/gc_worker.go
Outdated
ticker := time.NewTicker(gcJobLogTickInterval) | ||
defer ticker.Stop() | ||
|
||
// create task queue and task result queue, and start task workers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And sentence begins with a capital letter, end with dot.
store/tikv/gcworker/gc_worker.go
Outdated
|
||
// create task queue and task result queue, and start task workers | ||
gcTaskCh := make(chan *gcTask, concurrency) | ||
wg := sync.WaitGroup{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var wg sync.WaitGroup
wg.Add(concurrency)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
store/tikv/gcworker/gc_worker.go
Outdated
close(gcTaskCh) | ||
wg.Wait() | ||
log.Infof("[gc worker] %s finish gc, safePoint: %v, successful regions: %v, failed regions: %v, total cost time: %s", | ||
identifier, safePoint, atomic.LoadInt32(&successRegions), atomic.LoadInt32(&failedRegions), time.Since(startTime)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't matter if it's not atomic load here, because when the code run here, no worker would use successRegions/failedRegions any more.
atomic load is okay, of course.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to keep it for future.
store/tikv/gcworker/gc_worker.go
Outdated
|
||
task, err := genNextGCTask(store, bo, safePoint, key) | ||
if err != nil { | ||
log.Errorf("[gc worker] %s, gc interupted because get region error, err %v", identifier, errors.Trace(err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/errors.Trace(err) / errors.ErrorStack(err)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will be reported in other, I think I'd better remove it.
store/tikv/gcworker/gc_worker.go
Outdated
default: | ||
} | ||
|
||
task, err := genNextGCTask(store, bo, safePoint, key) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where is the variable key
initialized?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what would happen when region split during different call of this function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need to init, default values ""
is ok.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
task is defined by range, not region, so the worker will deal with the two regions if it splited.
@@ -123,33 +124,76 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) { | |||
safePoint, err = s.gcWorker.loadTime(gcSafePointKey, s.gcWorker.session) | |||
c.Assert(err, IsNil) | |||
s.timeEqual(c, safePoint.Add(time.Minute*30), now, 2*time.Second) | |||
|
|||
// change GC concurrency |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Format
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
func (w *gcTaskWorker) run() { | ||
defer w.wg.Done() | ||
for task := range w.taskCh { | ||
err := w.doGCForRange(task.startKey, task.endKey, task.safePoint) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In region split case, will some worker GC the same region? Is there any problem for this situation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No problem here, see above
store/tikv/gcworker/gc_worker.go
Outdated
for task := range w.taskCh { | ||
err := w.doGCForRange(task.startKey, task.endKey, task.safePoint) | ||
if err != nil { | ||
log.Errorf("[gc worker] %s, gc interupted because get region error, err %v", w.identifier, errors.Trace(err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better log it with the task info.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
store/tikv/gcworker/gc_worker.go
Outdated
gcConcurrency = gcDefaultConcurrency | ||
} | ||
|
||
return doGC(ctx, w.store, safePoint, w.uuid, gcConcurrency) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So many doGC
makes me confused, could you change this function name to doGCParallelly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And this method can be merged to runGCJob
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Empty function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?this func is used to get the gcConcurrency
LGTM |
/run-all-tests |
@coocood PTAL |
LGTM @coocood |
store/tikv/gcworker/gc_worker.go
Outdated
return task, nil | ||
} | ||
|
||
func doGCParallel(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier string, concurrency int) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make this function as a method of GCWorker
is better.
We can remove RunGCJob
as it is only used by a bench program.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
LGTM |
/run-all-tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
No description provided.