store/tikv: run gc workers parallelly #5837

Merged: 15 commits, Feb 9, 2018
228 changes: 182 additions & 46 deletions store/tikv/gcworker/gc_worker.go
@@ -19,6 +19,7 @@ import (
"os"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/juju/errors"
@@ -102,14 +103,19 @@ const (
gcDefaultRunInterval = time.Minute * 10
gcWaitTime = time.Minute * 1

gcLifeTimeKey = "tikv_gc_life_time"
gcDefaultLifeTime = time.Minute * 10
gcSafePointKey = "tikv_gc_safe_point"
gcSafePointCacheInterval = tikv.GcSafePointCacheInterval
gcLifeTimeKey = "tikv_gc_life_time"
gcDefaultLifeTime = time.Minute * 10
gcSafePointKey = "tikv_gc_safe_point"
gcConcurrencyKey = "tikv_gc_concurrency"
gcDefaultConcurrency = 1
gcMinConcurrency = 1
gcMaxConcurrency = 128
// We don't want gc to sweep out the cached info belonging to other processes, like coprocessor.
gcScanLockLimit = tikv.ResolvedCacheSize / 2
)

var gcSafePointCacheInterval = tikv.GcSafePointCacheInterval

var gcVariableComments = map[string]string{
gcLeaderUUIDKey: "Current GC worker leader UUID. (DO NOT EDIT)",
gcLeaderDescKey: "Host name and pid of current GC leader. (DO NOT EDIT)",
@@ -118,6 +124,7 @@ var gcVariableComments = map[string]string{
gcRunIntervalKey: "GC run interval, at least 10m, in Go format.",
gcLifeTimeKey: "All versions within life time will not be collected by GC, at least 10m, in Go format.",
gcSafePointKey: "All versions after safe point can be accessed. (DO NOT EDIT)",
gcConcurrencyKey: "How many go routines used to do GC parallel, [1, 128], default 1",
}

func (w *GCWorker) start(ctx goctx.Context, wg *sync.WaitGroup) {
@@ -150,7 +157,7 @@ func (w *GCWorker) start(ctx goctx.Context, wg *sync.WaitGroup) {
break
}
case <-ctx.Done():
log.Infof("[gc worker] (%s) quit.", w.uuid)
log.Infof("[gc worker] %s quit.", w.uuid)
return
}
}
@@ -307,13 +314,22 @@ func RunGCJob(ctx goctx.Context, s tikv.Storage, safePoint uint64, identifier st
if err != nil {
return errors.Trace(err)
}
err = doGC(ctx, s, safePoint, identifier)
err = doGCParallel(ctx, s, safePoint, identifier, gcDefaultConcurrency)
if err != nil {
return errors.Trace(err)
}
return nil
}

func (w *GCWorker) doGC(ctx goctx.Context, safePoint uint64) error {
gcConcurrency, err := w.loadGCConcurrencyWithDefault()
if err != nil {
log.Errorf("[gc worker] %s failed to load gcConcurrency, err %s", w.uuid, err)
gcConcurrency = gcDefaultConcurrency
}

return doGCParallel(ctx, w.store, safePoint, w.uuid, gcConcurrency)
}
func (w *GCWorker) runGCJob(ctx goctx.Context, safePoint uint64) {
gcWorkerCounter.WithLabelValues("run_job").Inc()
err := resolveLocks(ctx, w.store, safePoint, w.uuid)
@@ -330,7 +346,7 @@ func (w *GCWorker) runGCJob(ctx goctx.Context, safePoint uint64) {
w.done <- errors.Trace(err)
return
}
err = doGC(ctx, w.store, safePoint, w.uuid)
err = w.doGC(ctx, safePoint)
if err != nil {
log.Errorf("[gc worker] %s do GC returns an error %v", w.uuid, err)
w.gcIsRunning = false
@@ -422,6 +438,35 @@ func (w *GCWorker) deleteRanges(ctx goctx.Context, safePoint uint64) error {
return nil
}

func (w *GCWorker) loadGCConcurrencyWithDefault() (int, error) {
str, err := w.loadValueFromSysTable(gcConcurrencyKey, w.session)
if err != nil {
return gcDefaultConcurrency, errors.Trace(err)
}
if str == "" {
err = w.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(gcDefaultConcurrency), w.session)
if err != nil {
return gcDefaultConcurrency, errors.Trace(err)
}
return gcDefaultConcurrency, nil
}

jobConcurrency, err := strconv.Atoi(str)
if err != nil {
return gcDefaultConcurrency, err
}

if jobConcurrency < gcMinConcurrency {
jobConcurrency = gcMinConcurrency
}

if jobConcurrency > gcMaxConcurrency {
jobConcurrency = gcMaxConcurrency
}

return jobConcurrency, nil
}
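
loadGCConcurrencyWithDefault falls back to gcDefaultConcurrency when the system-table value is missing or unparsable, and clamps anything else into [gcMinConcurrency, gcMaxConcurrency]. A minimal standalone sketch of that parse-and-clamp logic; the helper name parseConcurrency is chosen here for illustration and is not part of this PR:

package main

import (
	"fmt"
	"strconv"
)

const (
	defaultConcurrency = 1
	minConcurrency     = 1
	maxConcurrency     = 128
)

// parseConcurrency mirrors the fallback-and-clamp behaviour above without the
// system-table round trip: empty or unparsable input yields the default, and
// out-of-range values are pulled back into [minConcurrency, maxConcurrency].
func parseConcurrency(s string) int {
	if s == "" {
		return defaultConcurrency
	}
	n, err := strconv.Atoi(s)
	if err != nil {
		return defaultConcurrency
	}
	if n < minConcurrency {
		n = minConcurrency
	}
	if n > maxConcurrency {
		n = maxConcurrency
	}
	return n
}

func main() {
	fmt.Println(parseConcurrency(""), parseConcurrency("16"), parseConcurrency("1000")) // 1 16 128
}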

func resolveLocks(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier string) error {
gcWorkerCounter.WithLabelValues("resolve_locks").Inc()

@@ -511,44 +556,63 @@ func resolveLocks(ctx goctx.Context, store tikv.Storage, safePoint uint64, ident
return nil
}

func doGC(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier string) error {
gcWorkerCounter.WithLabelValues("do_gc").Inc()

err := saveSafePoint(store.GetSafePointKV(), tikv.GcSavedSafePoint, safePoint)
if err != nil {
return errors.Trace(err)
}

// Sleep to wait for all other tidb instances to update their safepoint cache.
time.Sleep(gcSafePointCacheInterval)
type gcTask struct {
startKey []byte
endKey []byte
safePoint uint64
}

log.Infof("[gc worker] %s start gc, safePoint: %v.", identifier, safePoint)
startTime := time.Now()
successRegions := 0
failedRegions := 0
type gcTaskWorker struct {
identifier string
store tikv.Storage
taskCh chan *gcTask
wg *sync.WaitGroup
// use atomic to read and set
successRegions *int32
failedRegions *int32
}

ticker := time.NewTicker(gcJobLogTickInterval)
defer ticker.Stop()
func newGCTaskWorker(store tikv.Storage, taskCh chan *gcTask, wg *sync.WaitGroup, identifer string, successRegions *int32, failedRegions *int32) *gcTaskWorker {
return &gcTaskWorker{
identifer,
store,
taskCh,
wg,
successRegions,
failedRegions,
}
}

bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx)
var key []byte
for {
select {
case <-ctx.Done():
return errors.New("[gc worker] gc job canceled")
case <-ticker.C:
log.Infof("[gc worker] %s gc in process, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s",
identifier, safePoint, successRegions, failedRegions, time.Since(startTime))
default:
func (w *gcTaskWorker) run() {
defer w.wg.Done()
for task := range w.taskCh {
err := w.doGCForRange(task.startKey, task.endKey, task.safePoint)
Review comment (Contributor): In the region-split case, will some worker GC the same region? Is there any problem with this situation?

Reply (Contributor): No problem here, see above.

if err != nil {
log.Errorf("[gc worker] %s, gc interupted because get region(%v, %v) error, err %v",
w.identifier, task.startKey, task.endKey, errors.Trace(err))
}
}
}
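
Each gcTaskWorker simply drains the shared task channel until it is closed, then signals the WaitGroup through the deferred Done. A self-contained sketch of that consume-until-closed idiom, with the actual GC call replaced by a print; all names below are illustrative only:

package main

import (
	"fmt"
	"sync"
)

type task struct {
	startKey, endKey []byte
}

// worker drains taskCh until it is closed, then signals wg, mirroring
// gcTaskWorker.run above with the GC work stubbed out.
func worker(id int, taskCh <-chan *task, wg *sync.WaitGroup) {
	defer wg.Done()
	for t := range taskCh {
		fmt.Printf("worker %d: gc range [%q, %q)\n", id, t.startKey, t.endKey)
	}
}

func main() {
	taskCh := make(chan *task, 2)
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go worker(i, taskCh, &wg)
	}
	for _, r := range [][2]string{{"", "b"}, {"b", "d"}, {"d", ""}} {
		taskCh <- &task{startKey: []byte(r[0]), endKey: []byte(r[1])}
	}
	close(taskCh) // workers exit their range loops once the channel is drained
	wg.Wait()
}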

loc, err := store.GetRegionCache().LocateKey(bo, key)
func (w *gcTaskWorker) doGCForRange(startKey []byte, endKey []byte, safePoint uint64) error {
var successRegions int32
var failedRegions int32
defer func() {
atomic.AddInt32(w.successRegions, successRegions)
atomic.AddInt32(w.failedRegions, failedRegions)
gcActionRegionResultCounter.WithLabelValues("success").Add(float64(successRegions))
gcActionRegionResultCounter.WithLabelValues("fail").Add(float64(failedRegions))
}()
key := startKey
for {
bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, goctx.Background())
loc, err := w.store.GetRegionCache().LocateKey(bo, key)
if err != nil {
return errors.Trace(err)
}

var regionErr *errorpb.Error
regionErr, err = doGCForOneRegion(bo, store, safePoint, loc.Region)
regionErr, err = w.doGCForRegion(bo, safePoint, loc.Region)

// We check regionErr first because we know 'regionErr' and 'err' should not be returned together;
// checking it first keeps the process correct.
@@ -560,36 +624,31 @@ func doGC(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier st
}

if err != nil {
log.Warnf("[gc worker] %s gc for range [%v, %v) safepoint: %v, failed, err: %v", w.identifier, startKey, endKey, safePoint, err)
failedRegions++
gcActionRegionResultCounter.WithLabelValues("fail").Inc()
log.Warnf("[gc worker] %s failed to do gc on region(%s, %s), ignore it", identifier, string(loc.StartKey), string(loc.EndKey))
} else {
successRegions++
gcActionRegionResultCounter.WithLabelValues("success").Inc()
}

key = loc.EndKey
if len(key) == 0 {
if len(key) == 0 || bytes.Compare(key, endKey) >= 0 {
break
}
bo = tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx)
}
log.Infof("[gc worker] %s finish gc, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s",
identifier, safePoint, successRegions, failedRegions, time.Since(startTime))
gcHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds())

return nil
}
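
doGCForRange keeps per-range counters on the stack and folds them into the shared totals exactly once, in the deferred atomic.AddInt32 calls, so the producer goroutine can read the totals with atomic.LoadInt32 without racing the workers. A reduced sketch of that counter-aggregation pattern (the values and goroutine bodies are illustrative):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var success, failed int32 // shared totals, touched only via sync/atomic
	var wg sync.WaitGroup

	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Accumulate locally per "range", then fold in once at the end,
			// like the deferred AddInt32 calls in doGCForRange above.
			localSuccess, localFailed := int32(10), int32(i%2)
			atomic.AddInt32(&success, localSuccess)
			atomic.AddInt32(&failed, localFailed)
		}(i)
	}

	wg.Wait()
	fmt.Println(atomic.LoadInt32(&success), atomic.LoadInt32(&failed)) // 40 2
}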

// these two errors should not return together, for more, see the func 'doGC'
func doGCForOneRegion(bo *tikv.Backoffer, store tikv.Storage, safePoint uint64, region tikv.RegionVerID) (*errorpb.Error, error) {
// these two errors should not return together, for more, see the func 'doGCParallel'
func (w *gcTaskWorker) doGCForRegion(bo *tikv.Backoffer, safePoint uint64, region tikv.RegionVerID) (*errorpb.Error, error) {
req := &tikvrpc.Request{
Type: tikvrpc.CmdGC,
GC: &kvrpcpb.GCRequest{
SafePoint: safePoint,
},
}

resp, err := store.SendReq(bo, req, region, tikv.GCTimeout)
resp, err := w.store.SendReq(bo, req, region, tikv.GCTimeout)
if err != nil {
return nil, errors.Trace(err)
}
@@ -612,6 +671,83 @@ func doGCForOneRegion(bo *tikv.Backoffer, store tikv.Storage, safePoint uint64,
return nil, nil
}

func genNextGCTask(store tikv.Storage, bo *tikv.Backoffer, safePoint uint64, key kv.Key) (*gcTask, error) {
loc, err := store.GetRegionCache().LocateKey(bo, key)
if err != nil {
return nil, errors.Trace(err)
}

task := &gcTask{
startKey: key,
endKey: loc.EndKey,
safePoint: safePoint,
}
return task, nil
}
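
genNextGCTask aligns every task with a region boundary: the task covers [key, loc.EndKey), and an empty EndKey marks the last region of the key space, which is what stops the dispatch loop below. A toy sketch of walking a key space under that convention, using a hypothetical in-memory region layout instead of the region cache:

package main

import "fmt"

// regionEnd stands in for RegionCache.LocateKey: it returns the end key of
// the region containing key, where "" means the last region.
func regionEnd(key string) string {
	splitPoints := []string{"b", "d", ""} // hypothetical region split points
	for _, p := range splitPoints {
		if p == "" || key < p {
			return p
		}
	}
	return ""
}

func main() {
	key := "" // GC starts from the beginning of the key space
	for {
		end := regionEnd(key)
		fmt.Printf("task: [%q, %q)\n", key, end)
		key = end
		if len(key) == 0 { // empty end key: the whole key space is covered
			break
		}
	}
}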

func doGCParallel(ctx goctx.Context, store tikv.Storage, safePoint uint64, identifier string, concurrency int) error {
Review comment (Member): Making this function a method of GCWorker would be better. We can remove RunGCJob, as it is only used by a bench program.

Reply (Contributor): OK.

gcWorkerCounter.WithLabelValues("do_gc").Inc()

err := saveSafePoint(store.GetSafePointKV(), tikv.GcSavedSafePoint, safePoint)
if err != nil {
return errors.Trace(err)
}

// Sleep to wait for all other tidb instances to update their safepoint cache.
time.Sleep(gcSafePointCacheInterval)

log.Infof("[gc worker] %s start gc, concurrency %v, safePoint: %v.", identifier, concurrency, safePoint)
startTime := time.Now()
var successRegions int32
var failedRegions int32

ticker := time.NewTicker(gcJobLogTickInterval)
defer ticker.Stop()

// Create task queue and start task workers.
gcTaskCh := make(chan *gcTask, concurrency)
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
Review comment (Contributor): Suggestion:

    var wg sync.WaitGroup
    wg.Add(concurrency)
    for i := 0; i < concurrency; i++ {
        go w.run(&wg)
    }

with wg.Done called when each worker finishes.

Reply (Contributor): It is in the defer func.

w := newGCTaskWorker(store, gcTaskCh, &wg, identifier, &successRegions, &failedRegions)
wg.Add(1)
go w.run()
}

var key []byte
defer func() {
close(gcTaskCh)
wg.Wait()
log.Infof("[gc worker] %s finish gc, safePoint: %v, successful regions: %v, failed regions: %v, total cost time: %s",
identifier, safePoint, atomic.LoadInt32(&successRegions), atomic.LoadInt32(&failedRegions), time.Since(startTime))
Review comment (Contributor): It doesn't matter if the load is not atomic here, because by the time this code runs no worker uses successRegions/failedRegions any more. An atomic load is okay, of course.

Reply (Contributor): I'd like to keep it for the future.

gcHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds())
}()

for {
select {
case <-ctx.Done():
return errors.New("[gc worker] gc job canceled")
case <-ticker.C:
log.Infof("[gc worker] %s gc in process, safePoint: %v, successful regions: %v, failed regions: %v, total cost time: %s",
identifier, safePoint, atomic.LoadInt32(&successRegions), atomic.LoadInt32(&failedRegions), time.Since(startTime))
default:
}

bo := tikv.NewBackoffer(tikv.GcOneRegionMaxBackoff, ctx)
task, err := genNextGCTask(store, bo, safePoint, key)
Review comment (Contributor): key is not initialized?

Reply (Contributor): The start key is "", so there is no need to.

Review comment (Contributor): Where is the variable key initialized?

Review comment (Contributor): What would happen if a region splits between different calls of this function?

Reply (Contributor): No need to initialize it; the default value "" is OK.

Reply (Contributor): A task is defined by a range, not a region, so the worker will handle both regions if one splits.

if err != nil {
return errors.Trace(err)
}
if task != nil {
gcTaskCh <- task
key = task.endKey
}

if len(key) == 0 {
return nil
}
}
}
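
On the producer side, the buffered channel (capacity equal to concurrency) gives natural backpressure, ctx.Done() allows cancellation between tasks, and the deferred close/Wait pair guarantees the workers drain and exit before the function returns, even on an error path. A stripped-down sketch of that control flow with the GC work stubbed out; everything here is illustrative, not the PR's API:

package main

import (
	"context"
	"fmt"
	"sync"
)

type gcTask struct{ startKey, endKey string }

func runGC(ctx context.Context, concurrency int) error {
	taskCh := make(chan gcTask, concurrency) // producer blocks once workers fall behind
	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range taskCh {
				fmt.Printf("gc range [%q, %q)\n", t.startKey, t.endKey)
			}
		}()
	}

	// Mirrors the deferred cleanup above: always drain workers before returning.
	defer func() {
		close(taskCh)
		wg.Wait()
	}()

	for _, t := range []gcTask{{"", "b"}, {"b", "d"}, {"d", ""}} { // hypothetical region layout
		select {
		case <-ctx.Done():
			return ctx.Err() // job canceled between tasks
		default:
		}
		taskCh <- t
	}
	return nil
}

func main() {
	_ = runGC(context.Background(), 2)
}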

func (w *GCWorker) checkLeader() (bool, error) {
gcWorkerCounter.WithLabelValues("check_leader").Inc()
session := createSession(w.store)