Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: run gc workers parallelly #5837

Merged
merged 15 commits into from
Feb 9, 2018
Merged

Conversation

zhangjinpeng87
Copy link
Contributor

No description provided.

return task, nil
}

func gcTaskWorker(store tikv.Storage, taskCh chan *gcTask, resultCh chan *gcResult) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about defining a struct type named gcTaskWorker, then add a method run?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think it is a good idea.

bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx)
// create task queue and task result queue, and start task workers
gcTaskCh := make(chan *gcTask, gcConcurrency)
gcResultCh := make(chan *gcResult, 1024)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why 1024 here?

@@ -106,6 +106,10 @@ const (
gcDefaultLifeTime = time.Minute * 10
gcSafePointKey = "tikv_gc_safe_point"
gcSafePointCacheInterval = tikv.GcSafePointCacheInterval
gcConcurrencyKey = "tikv_gc_concurrency"
gcDefaultConcurrency = 32
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As i have tested, the concurrency should less than 50% of the total count of TiKV nodes.

Copy link
Contributor Author

@zhangjinpeng87 zhangjinpeng87 Feb 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I test a cluster with 5 tikv node and 2000 regions, when i set GC concurrency as 2, only 5 regions was skipped during a round of GC.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is somehow different from what I think it will do, but I'm glad it works well.

@wentaoxu wentaoxu changed the title [DNM] run gc workers parallelly store/tikv: run gc workers parallelly Feb 9, 2018
@wentaoxu
Copy link
Contributor

wentaoxu commented Feb 9, 2018

@@ -118,6 +123,7 @@ var gcVariableComments = map[string]string{
gcRunIntervalKey: "GC run interval, at least 10m, in Go format.",
gcLifeTimeKey: "All versions within life time will not be collected by GC, at least 10m, in Go format.",
gcSafePointKey: "All versions after safe point can be accessed. (DO NOT EDIT)",
gcConcurrencyKey: "How many go routines used to do GC parallel, [1, 1024], default 32",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default 32?

gcDefaultConcurrency = 2

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[1, 128]

}

func (w *gcTaskWorker) run() {
for {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't use task == nil to indicate work channel finish.

Go idiom:

for task := <-w.taskCh {
}

res = &gcResult{
successRegions: 0,
failedRegions: 0,
lastErr: err,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/err/errors.Trace(err)
failedRegions = 0 when error is not nil?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It means we don't get the region, so set it 0 here.

lastErr: err,
}
}
if res != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not put it into line 565?

}
}

func (w *GCWorker) loadGcConcurrencyWithDefault() (int, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Gc/GC
Use capital letters for abbreviation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

case <-ticker.C:
log.Infof("[gc worker] %s gc in process, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s",
identifier, safePoint, successRegions, failedRegions, time.Since(startTime))
default:
}

bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx)
task, err := genNextGCTask(store, bo, safePoint, key)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

key is not initialized?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

start key is "", no need to do

}
}

waitTaskWorkersDone(gcResultCh, concurrency)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wg.Wait()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

} else {
successRegions++
gcActionRegionResultCounter.WithLabelValues("success").Inc()
}

key = loc.EndKey
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is [startkey, endkey] a range and should we consider all regions in the range?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we do it in a range, not just a region if it splited

@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv"
goctx "golang.org/x/net/context"
"strconv"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put standard packages and third-party packages in different code block.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

lastErr: lastErr,
}

return res, nil
}

// these two errors should not return together, for more, see the func 'doGC'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is strange, would you update it please?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be here, not outdated, we sadly rely on this.

if taskRes == nil {
count++
if count == concurrency {
close(gcResultCh)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to deal with gcResultCh, its capacity is large enough, no task worker will be blocked on sending to this channel.

func (w *gcTaskWorker) run() {
defer w.wg.Done()
for {
task := <-w.taskCh
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can write the for loop like for task := range w.taskCh {
When the channel is closed, the for loop will break.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

return
}

res, err := doGCForRange(w.store, task.startKey, task.endKey, task.safePoint)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be cleaner to define the doGCForRange as a method of gcTaskWorker

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

@wentaoxu
Copy link
Contributor

wentaoxu commented Feb 9, 2018

func (w *GCWorker) doGC(ctx goctx.Context, safePoint uint64) error {
gcConcurrency, err := w.loadGCConcurrencyWithDefault()
if err != nil {
log.Errorf("")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Miss err here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

ticker := time.NewTicker(gcJobLogTickInterval)
defer ticker.Stop()

// create task queue and task result queue, and start task workers
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And sentence begins with a capital letter, end with dot.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

gcHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds())
}()

bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move this line into below for loop.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

@@ -37,6 +37,7 @@ import (
tidbutil "github.com/pingcap/tidb/util"
log "github.com/sirupsen/logrus"
goctx "golang.org/x/net/context"
"sync/atomic"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to line 23

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

ticker := time.NewTicker(gcJobLogTickInterval)
defer ticker.Stop()

// create task queue and task result queue, and start task workers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And sentence begins with a capital letter, end with dot.


// create task queue and task result queue, and start task workers
gcTaskCh := make(chan *gcTask, concurrency)
wg := sync.WaitGroup{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var wg sync.WaitGroup
wg.Add(concurrency)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

close(gcTaskCh)
wg.Wait()
log.Infof("[gc worker] %s finish gc, safePoint: %v, successful regions: %v, failed regions: %v, total cost time: %s",
identifier, safePoint, atomic.LoadInt32(&successRegions), atomic.LoadInt32(&failedRegions), time.Since(startTime))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't matter if it's not atomic load here, because when the code run here, no worker would use successRegions/failedRegions any more.
atomic load is okay, of course.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to keep it for future.


task, err := genNextGCTask(store, bo, safePoint, key)
if err != nil {
log.Errorf("[gc worker] %s, gc interupted because get region error, err %v", identifier, errors.Trace(err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/errors.Trace(err) / errors.ErrorStack(err)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be reported in other, I think I'd better remove it.

default:
}

task, err := genNextGCTask(store, bo, safePoint, key)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is the variable key initialized?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would happen when region split during different call of this function?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to init, default values "" is ok.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

task is defined by range, not region, so the worker will deal with the two regions if it splited.

@@ -123,33 +124,76 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) {
safePoint, err = s.gcWorker.loadTime(gcSafePointKey, s.gcWorker.session)
c.Assert(err, IsNil)
s.timeEqual(c, safePoint.Add(time.Minute*30), now, 2*time.Second)

// change GC concurrency
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Format

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

func (w *gcTaskWorker) run() {
defer w.wg.Done()
for task := range w.taskCh {
err := w.doGCForRange(task.startKey, task.endKey, task.safePoint)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In region split case, will some worker GC the same region? Is there any problem for this situation?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem here, see above

for task := range w.taskCh {
err := w.doGCForRange(task.startKey, task.endKey, task.safePoint)
if err != nil {
log.Errorf("[gc worker] %s, gc interupted because get region error, err %v", w.identifier, errors.Trace(err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better log it with the task info.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

gcConcurrency = gcDefaultConcurrency
}

return doGC(ctx, w.store, safePoint, w.uuid, gcConcurrency)
Copy link
Contributor

@tiancaiamao tiancaiamao Feb 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So many doGC makes me confused, could you change this function name to doGCParallelly

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this method can be merged to runGCJob

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Empty function?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?this func is used to get the gcConcurrency

@zhangjinpeng87
Copy link
Contributor Author

LGTM

@wentaoxu
Copy link
Contributor

wentaoxu commented Feb 9, 2018

/run-all-tests

@zhangjinpeng87
Copy link
Contributor Author

@coocood PTAL

@tiancaiamao
Copy link
Contributor

LGTM @coocood

return task, nil
}

func doGCParallel(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier string, concurrency int) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this function as a method of GCWorker is better.
We can remove RunGCJob as it is only used by a bench program.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

@coocood
Copy link
Member

coocood commented Feb 9, 2018

LGTM

@coocood
Copy link
Member

coocood commented Feb 9, 2018

/run-all-tests

@wentaoxu wentaoxu merged commit a63601a into master Feb 9, 2018
Copy link
Contributor Author

@zhangjinpeng87 zhangjinpeng87 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@ngaut ngaut deleted the zhangjinpeng/parallel-gc branch February 11, 2018 10:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants