From e44733a16e650b8ef8e4bd37b1e6c4d99b00fc11 Mon Sep 17 00:00:00 2001
From: fengou1 <85682690+fengou1@users.noreply.github.com>
Date: Tue, 7 Feb 2023 00:11:58 +0800
Subject: [PATCH] br: add retry for prepare flashback when the backup cluster
 is empty and there is only one region (#41059)

close pingcap/tidb#41058
---
 br/pkg/restore/data.go  | 35 +++++++++++++++++++++--------------
 br/pkg/utils/backoff.go | 36 ++++++++++++++++++++++++++++++++++++
 2 files changed, 57 insertions(+), 14 deletions(-)

diff --git a/br/pkg/restore/data.go b/br/pkg/restore/data.go
index d4254c60adbff..265126b9411af 100644
--- a/br/pkg/restore/data.go
+++ b/br/pkg/restore/data.go
@@ -302,22 +302,29 @@ func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {
 
 // prepare the region for flashback the data, the purpose is to stop region service, put region in flashback state
 func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error) {
-	handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
-		stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, startTS, r)
-		return stats, err
-	}
+	retryErr := utils.WithRetry(
+		ctx,
+		func() error {
+			handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
+				stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, startTS, r)
+				return stats, err
+			}
 
-	runner := rangetask.NewRangeTaskRunner("br-flashback-prepare-runner", recovery.mgr.GetStorage().(tikv.Storage), int(recovery.concurrency), handler)
-	// Run prepare flashback on the entire TiKV cluster. Empty keys means the range is unbounded.
-	err = runner.RunOnRange(ctx, []byte(""), []byte(""))
-	if err != nil {
-		log.Error("region flashback prepare get error")
-		return errors.Trace(err)
-	}
-	recovery.progress.Inc()
-	log.Info("region flashback prepare complete", zap.Int("regions", runner.CompletedRegions()))
+			runner := rangetask.NewRangeTaskRunner("br-flashback-prepare-runner", recovery.mgr.GetStorage().(tikv.Storage), int(recovery.concurrency), handler)
+			// Run prepare flashback on the entire TiKV cluster. Empty keys means the range is unbounded.
+			err = runner.RunOnRange(ctx, []byte(""), []byte(""))
+			if err != nil {
+				log.Warn("region flashback prepare failed, will retry")
+				return errors.Trace(err)
+			}
+			log.Info("region flashback prepare complete", zap.Int("regions", runner.CompletedRegions()))
+			return nil
+		},
+		utils.NewFlashBackBackoffer(),
+	)
 
-	return nil
+	recovery.progress.Inc()
+	return retryErr
 }
 
 // flashback the region data to version resolveTS
diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go
index 5353c972d24ad..bff2490b56650 100644
--- a/br/pkg/utils/backoff.go
+++ b/br/pkg/utils/backoff.go
@@ -33,6 +33,11 @@ const (
 	resetTSRetryTimeExt       = 600
 	resetTSWaitIntervalExt    = 500 * time.Millisecond
 	resetTSMaxWaitIntervalExt = 300 * time.Second
+
+	// Region heartbeats are sent every 10 seconds by default; if a region misses two heartbeats (15 seconds), it appears to be a network issue between PD and TiKV.
+	flashbackRetryTime       = 3
+	flashbackWaitInterval    = 3000 * time.Millisecond
+	flashbackMaxWaitInterval = 15 * time.Second
 )
 
 // RetryState is the mutable state needed for retrying.
@@ -204,3 +209,34 @@ func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration {
 func (bo *pdReqBackoffer) Attempt() int {
 	return bo.attempt
 }
+
+type flashbackBackoffer struct {
+	attempt      int
+	delayTime    time.Duration
+	maxDelayTime time.Duration
+}
+
+// NewFlashBackBackoffer creates a new controller regulating a truncated exponential backoff.
+func NewFlashBackBackoffer() Backoffer {
+	return &flashbackBackoffer{
+		attempt:      flashbackRetryTime,
+		delayTime:    flashbackWaitInterval,
+		maxDelayTime: flashbackMaxWaitInterval,
+	}
+}
+
+// NextBackoff retries up to 3 times when the flashback prepare fails.
+func (bo *flashbackBackoffer) NextBackoff(err error) time.Duration {
+	bo.delayTime = 2 * bo.delayTime
+	bo.attempt--
+	log.Warn("region may not be ready to serve, retrying", zap.Error(err))
+
+	if bo.delayTime > bo.maxDelayTime {
+		return bo.maxDelayTime
+	}
+	return bo.delayTime
+}
+
+func (bo *flashbackBackoffer) Attempt() int {
+	return bo.attempt
+}
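
Note on the retry semantics: as used here, utils.WithRetry re-invokes the closure until it returns nil or the backoffer's Attempt() drops to zero, sleeping NextBackoff(err) between failed attempts. With the constants above, the prepare step therefore runs up to 3 times, waiting roughly 6s and then 12s between attempts (the 3s base is doubled before each sleep and truncated at the 15s cap). Below is a minimal, self-contained sketch of that contract; withRetry and demoBackoffer are illustrative stand-ins, not the br implementation.

package main

import (
	"context"
	"fmt"
	"time"
)

// Backoffer mirrors the interface assumed by utils.WithRetry: NextBackoff
// returns how long to sleep before the next attempt, Attempt how many
// attempts remain.
type Backoffer interface {
	NextBackoff(err error) time.Duration
	Attempt() int
}

// demoBackoffer is a truncated exponential backoff in the same shape as
// flashbackBackoffer, with smaller delays so the demo runs quickly.
type demoBackoffer struct {
	attempt      int
	delayTime    time.Duration
	maxDelayTime time.Duration
}

func (bo *demoBackoffer) NextBackoff(err error) time.Duration {
	bo.delayTime = 2 * bo.delayTime // double the delay on every failure
	bo.attempt--
	if bo.delayTime > bo.maxDelayTime {
		return bo.maxDelayTime // truncate at the configured cap
	}
	return bo.delayTime
}

func (bo *demoBackoffer) Attempt() int { return bo.attempt }

// withRetry sketches the retry loop: run fn until it succeeds or the
// backoffer runs out of attempts, honoring context cancellation.
func withRetry(ctx context.Context, fn func() error, bo Backoffer) error {
	var lastErr error
	for bo.Attempt() > 0 {
		if lastErr = fn(); lastErr == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(bo.NextBackoff(lastErr)):
		}
	}
	return lastErr
}

func main() {
	calls := 0
	err := withRetry(context.Background(), func() error {
		calls++
		if calls < 3 {
			return fmt.Errorf("region not ready (call %d)", calls)
		}
		return nil
	}, &demoBackoffer{attempt: 3, delayTime: 100 * time.Millisecond, maxDelayTime: time.Second})
	fmt.Println("calls:", calls, "err:", err) // prints: calls: 3 err: <nil>
}

Doubling before the first sleep is why the initial wait is 6s rather than 3s in the flashback case; the cap keeps the total wait bounded (about 18s across the 3 attempts) even if more attempts were configured.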