From b43c41c7b6f2fd175bb4816ba7872c823c9d7714 Mon Sep 17 00:00:00 2001 From: XuWentao <30992981+wentaoxu@users.noreply.github.com> Date: Fri, 9 Feb 2018 23:17:48 +0800 Subject: [PATCH] store/tikv: do gc parallel (#5850) * do gc parallel --- cmd/benchdb/main.go | 21 --- store/tikv/gc_worker.go | 253 +++++++++++++++++++++++++---------- store/tikv/gc_worker_test.go | 24 ++++ 3 files changed, 209 insertions(+), 89 deletions(-) diff --git a/cmd/benchdb/main.go b/cmd/benchdb/main.go index 9c790ad3f24c1..a40a179f669d8 100644 --- a/cmd/benchdb/main.go +++ b/cmd/benchdb/main.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/terror" "github.com/pingcap/tidb/util/logutil" - "golang.org/x/net/context" ) var ( @@ -76,8 +75,6 @@ func main() { ut.updateRangeRows(spec) case "select": ut.selectRows(spec) - case "gc": - ut.manualGC(nil) case "query": ut.query(spec) default: @@ -272,24 +269,6 @@ func (ut *benchDB) selectRows(spec string) { }) } -// manualGC manually triggers GC and sends to done channel after finished. -func (ut *benchDB) manualGC(done chan bool) { - cLog("GC started") - start := time.Now() - ver, err := ut.store.CurrentVersion() - if err != nil { - log.Fatal(err) - } - err = tikv.RunGCJob(context.Background(), ut.store, ver.Ver, "benchDB") - if err != nil { - log.Fatal(err) - } - cLog("GC finished, duration ", time.Since(start)) - if done != nil { - done <- true - } -} - func (ut *benchDB) query(spec string) { strs := strings.Split(spec, ":") sql := strs[0] diff --git a/store/tikv/gc_worker.go b/store/tikv/gc_worker.go index 55abfa1741f86..40b4a1353baed 100644 --- a/store/tikv/gc_worker.go +++ b/store/tikv/gc_worker.go @@ -19,6 +19,7 @@ import ( "os" "strconv" "sync" + "sync/atomic" "time" log "github.com/Sirupsen/logrus" @@ -205,12 +206,17 @@ const ( gcDefaultLifeTime = time.Minute * 10 gcSafePointKey = "tikv_gc_safe_point" gcSavedSafePoint = "/tidb/store/gcworker/saved_safe_point" - gcSafePointCacheInterval = time.Second * 100 gcSafePointUpdateInterval = time.Second * 10 gcSafePointQuickRepeatInterval = time.Second gcCPUTimeInaccuracyBound = time.Second + gcConcurrencyKey = "tikv_gc_concurrency" + gcDefaultConcurrency = 1 + gcMinConcurrency = 1 + gcMaxConcurrency = 128 ) +var gcSafePointCacheInterval = time.Second * 100 + var gcVariableComments = map[string]string{ gcLeaderUUIDKey: "Current GC worker leader UUID. (DO NOT EDIT)", gcLeaderDescKey: "Host name and pid of current GC leader. (DO NOT EDIT)", @@ -219,6 +225,7 @@ var gcVariableComments = map[string]string{ gcRunIntervalKey: "GC run interval, at least 10m, in Go format.", gcLifeTimeKey: "All versions within life time will not be collected by GC, at least 10m, in Go format.", gcSafePointKey: "All versions after safe point can be accessed. (DO NOT EDIT)", + gcConcurrencyKey: "How many go routines used to do GC parallel, [1, 128], default 1", } func (w *GCWorker) start(ctx goctx.Context, wg *sync.WaitGroup) { @@ -243,7 +250,7 @@ func (w *GCWorker) start(ctx goctx.Context, wg *sync.WaitGroup) { break } case <-ctx.Done(): - log.Infof("[gc worker] (%s) quit.", w.uuid) + log.Infof("[gc worker] %s quit.", w.uuid) return } } @@ -394,42 +401,25 @@ func (w *GCWorker) calculateNewSafePoint(now time.Time) (*time.Time, error) { return &safePoint, nil } -// RunGCJob sends GC command to KV. it is exported for testing purpose, do not use it with GCWorker at the same time. 
-func RunGCJob(ctx goctx.Context, store kv.Storage, safePoint uint64, identifier string) error { - s, ok := store.(*tikvStore) - if !ok { - return errors.New("should use tikv driver") - } - err := resolveLocks(ctx, s, safePoint, identifier) - if err != nil { - return errors.Trace(err) - } - err = doGC(ctx, s, safePoint, identifier) - if err != nil { - return errors.Trace(err) - } - return nil -} - func (w *GCWorker) runGCJob(ctx goctx.Context, safePoint uint64) { gcWorkerCounter.WithLabelValues("run_job").Inc() - err := resolveLocks(ctx, w.store, safePoint, w.uuid) + err := w.resolveLocks(ctx, safePoint) if err != nil { - log.Errorf("[gc worker] %s resolve locks returns an error %v", w.uuid, err) + log.Errorf("[gc worker] %s resolve locks returns an error %v", w.uuid, errors.ErrorStack(err)) gcJobFailureCounter.WithLabelValues("resolve_lock").Inc() w.done <- errors.Trace(err) return } err = w.deleteRanges(ctx, safePoint) if err != nil { - log.Errorf("[gc worker] %s delete range returns an error %v", w.uuid, err) + log.Errorf("[gc worker] %s delete range returns an error %v", w.uuid, errors.ErrorStack(err)) gcJobFailureCounter.WithLabelValues("delete_range").Inc() w.done <- errors.Trace(err) return } - err = doGC(ctx, w.store, safePoint, w.uuid) + err = w.doGC(ctx, safePoint) if err != nil { - log.Errorf("[gc worker] %s do GC returns an error %v", w.uuid, err) + log.Errorf("[gc worker] %s do GC returns an error %v", w.uuid, errors.ErrorStack(err)) w.gcIsRunning = false gcJobFailureCounter.WithLabelValues("gc").Inc() w.done <- errors.Trace(err) @@ -519,7 +509,36 @@ func (w *GCWorker) deleteRanges(ctx goctx.Context, safePoint uint64) error { return nil } -func resolveLocks(ctx goctx.Context, store *tikvStore, safePoint uint64, identifier string) error { +func (w *GCWorker) loadGCConcurrencyWithDefault() (int, error) { + str, err := w.loadValueFromSysTable(gcConcurrencyKey, w.session) + if err != nil { + return gcDefaultConcurrency, errors.Trace(err) + } + if str == "" { + err = w.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(gcDefaultConcurrency), w.session) + if err != nil { + return gcDefaultConcurrency, errors.Trace(err) + } + return gcDefaultConcurrency, nil + } + + jobConcurrency, err := strconv.Atoi(str) + if err != nil { + return gcDefaultConcurrency, err + } + + if jobConcurrency < gcMinConcurrency { + jobConcurrency = gcMinConcurrency + } + + if jobConcurrency > gcMaxConcurrency { + jobConcurrency = gcMaxConcurrency + } + + return jobConcurrency, nil +} + +func (w *GCWorker) resolveLocks(ctx goctx.Context, safePoint uint64) error { gcWorkerCounter.WithLabelValues("resolve_locks").Inc() req := &tikvrpc.Request{ Type: tikvrpc.CmdScanLock, @@ -529,7 +548,7 @@ func resolveLocks(ctx goctx.Context, store *tikvStore, safePoint uint64, identif } bo := NewBackoffer(gcResolveLockMaxBackoff, ctx) - log.Infof("[gc worker] %s start resolve locks, safePoint: %v.", identifier, safePoint) + log.Infof("[gc worker] %s start resolve locks, safePoint: %v.", w.uuid, safePoint) startTime := time.Now() regions, totalResolvedLocks := 0, 0 @@ -541,11 +560,11 @@ func resolveLocks(ctx goctx.Context, store *tikvStore, safePoint uint64, identif default: } - loc, err := store.regionCache.LocateKey(bo, key) + loc, err := w.store.regionCache.LocateKey(bo, key) if err != nil { return errors.Trace(err) } - resp, err := store.SendReq(bo, req, loc.Region, readTimeoutMedium) + resp, err := w.store.SendReq(bo, req, loc.Region, readTimeoutMedium) if err != nil { return errors.Trace(err) } @@ -572,7 +591,7 @@ func 
resolveLocks(ctx goctx.Context, store *tikvStore, safePoint uint64, identif
 	for i := range locksInfo {
 		locks[i] = newLock(locksInfo[i])
 	}
-	ok, err1 := store.lockResolver.ResolveLocks(bo, locks)
+	ok, err1 := w.store.lockResolver.ResolveLocks(bo, locks)
 	if err1 != nil {
 		return errors.Trace(err1)
 	}
@@ -590,49 +609,69 @@ func resolveLocks(ctx goctx.Context, store *tikvStore, safePoint uint64, identif
 			break
 		}
 	}
-	log.Infof("[gc worker] %s finish resolve locks, safePoint: %v, regions: %v, total resolved: %v, cost time: %s", identifier, safePoint, regions, totalResolvedLocks, time.Since(startTime))
+	log.Infof("[gc worker] %s finish resolve locks, safePoint: %v, regions: %v, total resolved: %v, cost time: %s",
+		w.uuid, safePoint, regions, totalResolvedLocks, time.Since(startTime))
 	gcHistogram.WithLabelValues("resolve_locks").Observe(time.Since(startTime).Seconds())
 	return nil
 }
 
-func doGC(ctx goctx.Context, store *tikvStore, safePoint uint64, identifier string) error {
-	gcWorkerCounter.WithLabelValues("do_gc").Inc()
-
-	err := store.gcWorker.saveSafePoint(gcSavedSafePoint, safePoint)
-	if err != nil {
-		return errors.Trace(err)
-	}
-
-	// Sleep to wait for all other tidb instances update their safepoint cache.
-	time.Sleep(gcSafePointCacheInterval)
+type gcTask struct {
+	startKey  []byte
+	endKey    []byte
+	safePoint uint64
+}
 
-	log.Infof("[gc worker] %s start gc, safePoint: %v.", identifier, safePoint)
-	startTime := time.Now()
-	successRegions := 0
-	failedRegions := 0
+type gcTaskWorker struct {
+	identifier string
+	store      *tikvStore
+	taskCh     chan *gcTask
+	wg         *sync.WaitGroup
+	// successRegions and failedRegions are read and updated atomically.
+	successRegions *int32
+	failedRegions  *int32
+}
 
-	ticker := time.NewTicker(gcJobLogTickInterval)
-	defer ticker.Stop()
+func newGCTaskWorker(store *tikvStore, taskCh chan *gcTask, wg *sync.WaitGroup, identifier string, successRegions *int32, failedRegions *int32) *gcTaskWorker {
+	return &gcTaskWorker{
+		identifier,
+		store,
+		taskCh,
+		wg,
+		successRegions,
+		failedRegions,
+	}
+}
 
-	bo := NewBackoffer(gcOneRegionMaxBackoff, ctx)
-	var key []byte
-	for {
-		select {
-		case <-ctx.Done():
-			return errors.New("[gc worker] gc job canceled")
-		case <-ticker.C:
-			log.Infof("[gc worker] %s gc in process, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s",
-				identifier, safePoint, successRegions, failedRegions, time.Since(startTime))
-		default:
+func (w *gcTaskWorker) run() {
+	defer w.wg.Done()
+	for task := range w.taskCh {
+		err := w.doGCForRange(task.startKey, task.endKey, task.safePoint)
+		if err != nil {
+			log.Errorf("[gc worker] %s gc interrupted because get region(%v, %v) failed, err %v",
+				w.identifier, task.startKey, task.endKey, errors.Trace(err))
 		}
+	}
+}
 
-		loc, err := store.regionCache.LocateKey(bo, key)
+func (w *gcTaskWorker) doGCForRange(startKey []byte, endKey []byte, safePoint uint64) error {
+	var successRegions int32
+	var failedRegions int32
+	defer func() {
+		atomic.AddInt32(w.successRegions, successRegions)
+		atomic.AddInt32(w.failedRegions, failedRegions)
+		gcActionRegionResultCounter.WithLabelValues("success").Add(float64(successRegions))
+		gcActionRegionResultCounter.WithLabelValues("fail").Add(float64(failedRegions))
+	}()
+	key := startKey
+	for {
+		bo := NewBackoffer(gcOneRegionMaxBackoff, goctx.Background())
+		loc, err := w.store.GetRegionCache().LocateKey(bo, key)
 		if err != nil {
 			return errors.Trace(err)
 		}
 		var regionErr *errorpb.Error
-		regionErr, err = doGCForOneRegion(bo, store, safePoint, loc.Region)
+		regionErr, err = w.doGCForRegion(bo, safePoint, loc.Region)
 
 		// we check regionErr here first, because we know 'regionErr' and 'err' should not return together, to keep it to
 		// make the process correct.
@@ -644,28 +683,23 @@ func doGC(ctx goctx.Context, store *tikvStore, safePoint uint64, identifier stri
 		}
 
 		if err != nil {
+			log.Warnf("[gc worker] %s gc failed for range [%v, %v), safepoint: %v, err: %v", w.identifier, startKey, endKey, safePoint, err)
 			failedRegions++
-			gcActionRegionResultCounter.WithLabelValues("fail").Inc()
-			log.Warnf("[gc worker] %s failed to do gc on region(%s, %s), ignore it", identifier, string(loc.StartKey), string(loc.EndKey))
 		} else {
 			successRegions++
-			gcActionRegionResultCounter.WithLabelValues("success").Inc()
 		}
 
 		key = loc.EndKey
-		if len(key) == 0 {
+		if len(key) == 0 || bytes.Compare(key, endKey) >= 0 {
 			break
 		}
-		bo = NewBackoffer(gcOneRegionMaxBackoff, ctx)
 	}
-	log.Infof("[gc worker] %s finish gc, safePoint: %v, successful regions: %v, failed regions: %v, cost time: %s",
-		identifier, safePoint, successRegions, failedRegions, time.Since(startTime))
-	gcHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds())
+
 	return nil
 }
 
 // these two errors should not return together, for more, see the func 'doGC'
-func doGCForOneRegion(bo *Backoffer, store *tikvStore, safePoint uint64, region RegionVerID) (*errorpb.Error, error) {
+func (w *gcTaskWorker) doGCForRegion(bo *Backoffer, safePoint uint64, region RegionVerID) (*errorpb.Error, error) {
 	req := &tikvrpc.Request{
 		Type: tikvrpc.CmdGC,
 		GC: &kvrpcpb.GCRequest{
@@ -673,7 +707,7 @@ func doGCForOneRegion(bo *Backoffer, store *tikvStore, safePoint uint64, region
 		},
 	}
 
-	resp, err := store.SendReq(bo, req, region, gcTimeout)
+	resp, err := w.store.SendReq(bo, req, region, gcTimeout)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -696,6 +730,89 @@ func doGCForOneRegion(bo *Backoffer, store *tikvStore, safePoint uint64, region
 	return nil, nil
 }
 
+func (w *GCWorker) genNextGCTask(bo *Backoffer, safePoint uint64, key kv.Key) (*gcTask, error) {
+	loc, err := w.store.GetRegionCache().LocateKey(bo, key)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	task := &gcTask{
+		startKey:  key,
+		endKey:    loc.EndKey,
+		safePoint: safePoint,
+	}
+	return task, nil
+}
+
+func (w *GCWorker) doGC(ctx goctx.Context, safePoint uint64) error {
+	gcWorkerCounter.WithLabelValues("do_gc").Inc()
+
+	err := w.saveSafePoint(gcSavedSafePoint, safePoint)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	// Sleep to wait for all other tidb instances to update their safepoint cache.
+	time.Sleep(gcSafePointCacheInterval)
+
+	concurrency, err := w.loadGCConcurrencyWithDefault()
+	if err != nil {
+		log.Errorf("[gc worker] %s failed to load gcConcurrency, err %s", w.uuid, err)
+		concurrency = gcDefaultConcurrency
+	}
+
+	log.Infof("[gc worker] %s start gc, concurrency %v, safePoint: %v.", w.uuid, concurrency, safePoint)
+	startTime := time.Now()
+	var successRegions int32
+	var failedRegions int32
+
+	ticker := time.NewTicker(gcJobLogTickInterval)
+	defer ticker.Stop()
+
+	// Create task queue and start task workers.
+ gcTaskCh := make(chan *gcTask, concurrency) + var wg sync.WaitGroup + for i := 0; i < concurrency; i++ { + w := newGCTaskWorker(w.store, gcTaskCh, &wg, w.uuid, &successRegions, &failedRegions) + wg.Add(1) + go w.run() + } + + var key []byte + defer func() { + close(gcTaskCh) + wg.Wait() + log.Infof("[gc worker] %s finish gc, safePoint: %v, successful regions: %v, failed regions: %v, total cost time: %s", + w.uuid, safePoint, atomic.LoadInt32(&successRegions), atomic.LoadInt32(&failedRegions), time.Since(startTime)) + gcHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds()) + }() + + for { + select { + case <-ctx.Done(): + return errors.New("[gc worker] gc job canceled") + case <-ticker.C: + log.Infof("[gc worker] %s gc in process, safePoint: %v, successful regions: %v, failed regions: %v, total cost time: %s", + w.uuid, safePoint, atomic.LoadInt32(&successRegions), atomic.LoadInt32(&failedRegions), time.Since(startTime)) + default: + } + + bo := NewBackoffer(gcOneRegionMaxBackoff, ctx) + task, err := w.genNextGCTask(bo, safePoint, key) + if err != nil { + return errors.Trace(err) + } + if task != nil { + gcTaskCh <- task + key = task.endKey + } + + if len(key) == 0 { + return nil + } + } +} + func (w *GCWorker) checkLeader() (bool, error) { gcWorkerCounter.WithLabelValues("check_leader").Inc() session := createSession(w.store) diff --git a/store/tikv/gc_worker_test.go b/store/tikv/gc_worker_test.go index 6c3f6221b4b24..97a5202f740a1 100644 --- a/store/tikv/gc_worker_test.go +++ b/store/tikv/gc_worker_test.go @@ -15,6 +15,7 @@ package tikv import ( "math" + "strconv" "time" . "github.com/pingcap/check" @@ -101,4 +102,27 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) { safePoint, err = s.gcWorker.loadTime(gcSafePointKey, s.gcWorker.session) c.Assert(err, IsNil) s.timeEqual(c, safePoint.Add(time.Minute*30), now, 2*time.Second) + + // Change GC concurrency. + concurrency, err := s.gcWorker.loadGCConcurrencyWithDefault() + c.Assert(err, IsNil) + c.Assert(concurrency, Equals, gcDefaultConcurrency) + + err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(gcMinConcurrency), s.gcWorker.session) + c.Assert(err, IsNil) + concurrency, err = s.gcWorker.loadGCConcurrencyWithDefault() + c.Assert(err, IsNil) + c.Assert(concurrency, Equals, gcMinConcurrency) + + err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(-1), s.gcWorker.session) + c.Assert(err, IsNil) + concurrency, err = s.gcWorker.loadGCConcurrencyWithDefault() + c.Assert(err, IsNil) + c.Assert(concurrency, Equals, gcMinConcurrency) + + err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(1000000), s.gcWorker.session) + c.Assert(err, IsNil) + concurrency, err = s.gcWorker.loadGCConcurrencyWithDefault() + c.Assert(err, IsNil) + c.Assert(concurrency, Equals, gcMaxConcurrency) }
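The patch above turns the previously single-threaded doGC scan into a producer/worker pipeline: doGC walks the key space region by region, wraps each region-aligned range into a gcTask, and pushes it onto a bounded channel; a pool of gcTaskWorker goroutines (sized by the tikv_gc_concurrency setting, clamped to [1, 128]) drains the channel, runs GC per region, and folds per-range results into shared counters via sync/atomic; closing the channel and waiting on the WaitGroup yields the final success/failure totals. For reference, the snippet below is a minimal, self-contained sketch of that shape; the names (gcRange, runRangeGC) and the sample ranges are illustrative stand-ins, not TiDB APIs.

// sketch.go: illustrative worker-pool shape used by the parallel GC above.
// All names here are hypothetical stand-ins, not TiDB code.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type gcRange struct {
	startKey, endKey string
	safePoint        uint64
}

// runRangeGC stands in for doGCForRange: GC one range, report per-region results.
func runRangeGC(r gcRange, successRegions, failedRegions *int32) {
	// Real code would iterate regions in [startKey, endKey) and send GC requests;
	// here we simply count the whole range as one successful "region".
	atomic.AddInt32(successRegions, 1)
	_ = failedRegions
}

func main() {
	concurrency := 4
	taskCh := make(chan gcRange, concurrency) // bounded: the producer blocks while workers are busy
	var wg sync.WaitGroup
	var successRegions, failedRegions int32

	// Fixed pool of workers, mirroring gcTaskWorker.run: each drains taskCh until it is closed.
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for task := range taskCh {
				runRangeGC(task, &successRegions, &failedRegions)
			}
		}()
	}

	// Producer, mirroring doGC/genNextGCTask: one task per region-aligned range,
	// then close the channel so the workers exit.
	for _, r := range []gcRange{{"a", "m", 100}, {"m", "", 100}} {
		taskCh <- r
	}
	close(taskCh)
	wg.Wait()

	fmt.Printf("gc done, successful regions: %d, failed regions: %d\n",
		atomic.LoadInt32(&successRegions), atomic.LoadInt32(&failedRegions))
}

Closing the channel, rather than signalling each worker individually, is what lets the deferred close/Wait pair in the real doGC double as the "all tasks submitted and finished" barrier before the final log line and histogram observation.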