br: refactor error handling mechanism to tolerate unexpected kv errors. (p…
ti-chi-bot authored Dec 11, 2023
1 parent 80f96ed commit eacc493
Showing 11 changed files with 311 additions and 92 deletions.
50 changes: 12 additions & 38 deletions br/pkg/backup/client.go
@@ -1185,17 +1185,18 @@ func OnBackupResponse(
backupTS uint64,
lockResolver *txnlock.LockResolver,
resp *backuppb.BackupResponse,
errContext *utils.ErrorContext,
) (*backuppb.BackupResponse, int, error) {
log.Debug("OnBackupResponse", zap.Reflect("resp", resp))
if resp.Error == nil {
return resp, 0, nil
}
backoffMs := 0
switch v := resp.Error.Detail.(type) {

err := resp.Error
switch v := err.Detail.(type) {
case *backuppb.Error_KvError:
if lockErr := v.KvError.Locked; lockErr != nil {
// Try to resolve lock.
log.Warn("backup occur kv error", zap.Reflect("error", v))
msBeforeExpired, err1 := lockResolver.ResolveLocks(
bo, backupTS, []*txnlock.Lock{txnlock.NewLock(lockErr)})
if err1 != nil {
@@ -1206,44 +1207,16 @@ func OnBackupResponse(
}
return nil, backoffMs, nil
}
// Backup should not meet error other than KeyLocked.
log.Error("unexpect kv error", zap.Reflect("KvError", v.KvError))
return nil, backoffMs, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %v", storeID, v)

case *backuppb.Error_RegionError:
regionErr := v.RegionError
// Ignore following errors.
if !(regionErr.EpochNotMatch != nil ||
regionErr.NotLeader != nil ||
regionErr.RegionNotFound != nil ||
regionErr.ServerIsBusy != nil ||
regionErr.StaleCommand != nil ||
regionErr.StoreNotMatch != nil ||
regionErr.ReadIndexNotReady != nil ||
regionErr.ProposalInMergingMode != nil) {
log.Error("unexpect region error", zap.Reflect("RegionError", regionErr))
return nil, backoffMs, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %v", storeID, v)
}
log.Warn("backup occur region error",
zap.Reflect("RegionError", regionErr),
zap.Uint64("storeID", storeID))
// TODO: a better backoff.
backoffMs = 1000 /* 1s */
return nil, backoffMs, nil
case *backuppb.Error_ClusterIdError:
log.Error("backup occur cluster ID error", zap.Reflect("error", v), zap.Uint64("storeID", storeID))
return nil, 0, errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v on storeID: %d", resp.Error, storeID)
default:
// UNSAFE! TODO: use meaningful error code instead of unstructured message to find failed to write error.
if utils.MessageIsRetryableStorageError(resp.GetError().GetMsg()) {
log.Warn("backup occur storage error", zap.String("error", resp.GetError().GetMsg()))
// back off 3000ms, for S3 is 99.99% available (i.e. the max outage time would less than 52.56mins per year),
// this time would be probably enough for s3 to resume.
res := errContext.HandleError(resp.Error, storeID)
switch res.Strategy {
case utils.GiveUpStrategy:
return nil, 0, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %s", storeID, res.Reason)
case utils.RetryStrategy:
return nil, 3000, nil
}
log.Error("backup occur unknown error", zap.String("error", resp.Error.GetMsg()), zap.Uint64("storeID", storeID))
return nil, 0, errors.Annotatef(berrors.ErrKVUnknown, "%v on storeID: %d", resp.Error, storeID)
}
return nil, 3000, errors.Annotatef(berrors.ErrKVUnknown, "unreachable")
}

func (bc *Client) handleFineGrained(
@@ -1273,12 +1246,13 @@ func (bc *Client) handleFineGrained(
}
hasProgress := false
backoffMill := 0
errContext := utils.NewErrorContext("handleFineGrainedBackup", 10)
err = SendBackup(
ctx, storeID, client, req,
// Handle responses with the same backoffer.
func(resp *backuppb.BackupResponse) error {
response, shouldBackoff, err1 :=
OnBackupResponse(storeID, bo, req.EndVersion, lockResolver, resp)
OnBackupResponse(storeID, bo, req.EndVersion, lockResolver, resp, errContext)
if err1 != nil {
return err1
}
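The refactor replaces the per-error-type switch in OnBackupResponse with a shared utils.ErrorContext: KV, region, and storage errors are all funneled into HandleError, which answers with a retry or give-up strategy, and retries now use a uniform 3000 ms backoff. The internals of ErrorContext are not part of this diff, so what follows is only a minimal, self-contained sketch of how such a strategy-returning handler could work; the per-store counting rule and the messages are assumptions for illustration, not the actual br/pkg/utils code.

package main

import "fmt"

type Strategy int

const (
	RetryStrategy Strategy = iota
	GiveUpStrategy
)

type Result struct {
	Strategy Strategy
	Reason   string
}

// ErrorContext tolerates transient errors per store until a threshold is reached.
// The per-store counter below is assumed for the sketch.
type ErrorContext struct {
	scope     string
	threshold int
	seen      map[uint64]int
}

func NewErrorContext(scope string, threshold int) *ErrorContext {
	return &ErrorContext{scope: scope, threshold: threshold, seen: map[uint64]int{}}
}

// HandleErrorMsg decides whether an unclassified error message is still worth retrying.
func (ec *ErrorContext) HandleErrorMsg(msg string, storeID uint64) Result {
	ec.seen[storeID]++
	if ec.seen[storeID] > ec.threshold {
		return Result{
			Strategy: GiveUpStrategy,
			Reason:   fmt.Sprintf("%s: store %d failed %d times: %s", ec.scope, storeID, ec.seen[storeID], msg),
		}
	}
	return Result{Strategy: RetryStrategy}
}

func main() {
	ec := NewErrorContext("handleFineGrainedBackup", 2)
	for i := 0; i < 3; i++ {
		res := ec.HandleErrorMsg("injected kv error", 1)
		fmt.Printf("attempt %d: give up=%v reason=%q\n", i, res.Strategy == GiveUpStrategy, res.Reason)
	}
}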
19 changes: 10 additions & 9 deletions br/pkg/backup/client_test.go
@@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/tablecodec"
@@ -230,20 +231,20 @@ func TestOnBackupRegionErrorResponse(t *testing.T) {
}

cases := []Case{
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{NotLeader: &errorpb.NotLeader{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{NotLeader: &errorpb.NotLeader{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{KeyNotInRegion: &errorpb.KeyNotInRegion{}}), exceptedBackoffMs: 0, exceptedErr: true},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StaleCommand: &errorpb.StaleCommand{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StoreNotMatch: &errorpb.StoreNotMatch{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StaleCommand: &errorpb.StaleCommand{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StoreNotMatch: &errorpb.StoreNotMatch{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RaftEntryTooLarge: &errorpb.RaftEntryTooLarge{}}), exceptedBackoffMs: 0, exceptedErr: true},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ReadIndexNotReady: &errorpb.ReadIndexNotReady{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ProposalInMergingMode: &errorpb.ProposalInMergingMode{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ReadIndexNotReady: &errorpb.ReadIndexNotReady{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ProposalInMergingMode: &errorpb.ProposalInMergingMode{}}), exceptedBackoffMs: 3000, exceptedErr: false},
}
for _, cs := range cases {
t.Log(cs)
_, backoffMs, err := backup.OnBackupResponse(cs.storeID, cs.bo, cs.backupTS, cs.lockResolver, cs.resp)
_, backoffMs, err := backup.OnBackupResponse(cs.storeID, cs.bo, cs.backupTS, cs.lockResolver, cs.resp, utils.NewErrorContext("test", 1))
require.Equal(t, cs.exceptedBackoffMs, backoffMs)
if cs.exceptedErr {
require.Error(t, err)
39 changes: 9 additions & 30 deletions br/pkg/backup/push.go
@@ -4,7 +4,6 @@ package backup

import (
"context"
"fmt"
"sync"

"github.com/opentracing/opentracing-go"
@@ -73,6 +72,7 @@ func (push *pushDown) pushBackup(
})

wg := new(sync.WaitGroup)
errContext := utils.NewErrorContext("pushBackup", 10)
for _, s := range stores {
store := s
storeID := s.GetId()
@@ -183,35 +183,10 @@ func (push *pushDown) pushBackup(
progressCallBack(RegionUnit)
} else {
errPb := resp.GetError()
switch v := errPb.Detail.(type) {
case *backuppb.Error_KvError:
logutil.CL(ctx).Warn("backup occur kv error", zap.Reflect("error", v))

case *backuppb.Error_RegionError:
logutil.CL(ctx).Warn("backup occur region error", zap.Reflect("error", v))

case *backuppb.Error_ClusterIdError:
logutil.CL(ctx).Error("backup occur cluster ID error", zap.Reflect("error", v))
return errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v", errPb)
default:
if utils.MessageIsRetryableStorageError(errPb.GetMsg()) {
logutil.CL(ctx).Warn("backup occur storage error", zap.String("error", errPb.GetMsg()))
continue
}
var errMsg string
if utils.MessageIsNotFoundStorageError(errPb.GetMsg()) {
errMsg = fmt.Sprintf("File or directory not found on TiKV Node (store id: %v; Address: %s). "+
"work around:please ensure br and tikv nodes share a same storage and the user of br and tikv has same uid.",
store.GetId(), redact.String(store.GetAddress()))
logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg))
}
if utils.MessageIsPermissionDeniedStorageError(errPb.GetMsg()) {
errMsg = fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v; Address: %s). "+
"work around:please ensure tikv has permission to read from & write to the storage.",
store.GetId(), redact.String(store.GetAddress()))
logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg))
}

res := errContext.HandleIgnorableError(errPb, store.GetId())
switch res.Strategy {
case utils.GiveUpStrategy:
errMsg := res.Reason
if len(errMsg) <= 0 {
errMsg = errPb.Msg
}
@@ -220,6 +195,10 @@ func (push *pushDown) pushBackup(
redact.String(store.GetAddress()),
errMsg,
)
default:
// Other error types: just continue to the next response;
// the range will be handled later by fineGrainedBackup.
continue
}
}
case err := <-push.errCh:
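In the push-down phase the long switch over error types is gone as well: the error context decides via HandleIgnorableError, and only GiveUpStrategy aborts the store-level backup, while everything else is skipped so the affected ranges can be retried later by fineGrainedBackup. Below is a rough, self-contained sketch of that control flow; backupError, the classification rule, and the message format are simplified stand-ins, not the real backuppb or utils types.

package main

import (
	"fmt"
	"strings"
)

type strategy int

const (
	retryStrategy strategy = iota
	giveUpStrategy
)

// backupError stands in for backuppb.Error in this sketch.
type backupError struct{ Msg string }

// handleIgnorableError mimics the idea of ErrorContext.HandleIgnorableError:
// most errors are tolerated, a few are fatal. Classifying by message text here
// is an assumption made only for the example.
func handleIgnorableError(e *backupError, storeID uint64) (strategy, string) {
	if strings.Contains(strings.ToLower(e.Msg), "cluster id mismatch") {
		return giveUpStrategy, e.Msg
	}
	return retryStrategy, ""
}

// onPushResponse aborts only when the context gives up; otherwise the caller
// moves on to the next response and leaves the range to fine-grained backup.
func onPushResponse(e *backupError, storeID uint64, addr string) error {
	strat, reason := handleIgnorableError(e, storeID)
	if strat == giveUpStrategy {
		if reason == "" {
			reason = e.Msg
		}
		return fmt.Errorf("error happen in store %d at %s: %s", storeID, addr, reason)
	}
	// Tolerated: skip this response; fineGrainedBackup will retry the range.
	return nil
}

func main() {
	fmt.Println(onPushResponse(&backupError{Msg: "EpochNotMatch"}, 1, "127.0.0.1:20160"))
	fmt.Println(onPushResponse(&backupError{Msg: "cluster id mismatch"}, 1, "127.0.0.1:20160"))
}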
2 changes: 2 additions & 0 deletions br/pkg/errors/errors.go
@@ -49,6 +49,8 @@ var (
ErrBackupInvalidRange = errors.Normalize("backup range invalid", errors.RFCCodeText("BR:Backup:ErrBackupInvalidRange"))
ErrBackupNoLeader = errors.Normalize("backup no leader", errors.RFCCodeText("BR:Backup:ErrBackupNoLeader"))
ErrBackupGCSafepointExceeded = errors.Normalize("backup GC safepoint exceeded", errors.RFCCodeText("BR:Backup:ErrBackupGCSafepointExceeded"))
ErrBackupKeyIsLocked = errors.Normalize("backup key is locked", errors.RFCCodeText("BR:Backup:ErrBackupKeyIsLocked"))
ErrBackupRegion = errors.Normalize("backup region error", errors.RFCCodeText("BR:Backup:ErrBackupRegion"))

ErrRestoreModeMismatch = errors.Normalize("restore mode mismatch", errors.RFCCodeText("BR:Restore:ErrRestoreModeMismatch"))
ErrRestoreRangeMismatch = errors.Normalize("restore range mismatch", errors.RFCCodeText("BR:Restore:ErrRestoreRangeMismatch"))
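ErrBackupKeyIsLocked and ErrBackupRegion give the backup path structured RFC codes for the two error classes it now tolerates. As a small illustration of how such a normalized code is typically wrapped and surfaced, mirroring the errors.Annotatef calls elsewhere in this diff; the store ID and message are made up for the example:

package main

import (
	"fmt"

	"github.com/pingcap/errors"
)

// A normalized BR-style error with an RFC code, mirroring the new definitions.
var errBackupRegion = errors.Normalize("backup region error",
	errors.RFCCodeText("BR:Backup:ErrBackupRegion"))

func main() {
	storeID := uint64(1) // hypothetical store ID for the example
	err := errors.Annotatef(errBackupRegion, "retry limit exceeded on storeID: %d", storeID)
	// Prints the annotation plus the normalized message; exact formatting
	// depends on the pingcap/errors version in use.
	fmt.Println(err)
}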
3 changes: 2 additions & 1 deletion br/pkg/utils/BUILD.bazel
@@ -94,7 +94,7 @@ go_test(
],
embed = [":utils"],
flaky = True,
shard_count = 33,
shard_count = 36,
deps = [
"//br/pkg/errors",
"//br/pkg/metautil",
@@ -115,6 +115,7 @@ go_test(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/encryptionpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_stretchr_testify//require",
20 changes: 13 additions & 7 deletions br/pkg/utils/backoff.go
@@ -120,28 +120,34 @@ type importerBackoffer struct {
attempt int
delayTime time.Duration
maxDelayTime time.Duration
errContext *ErrorContext
}

// NewBackoffer creates a new controller regulating a truncated exponential backoff.
func NewBackoffer(attempt int, delayTime, maxDelayTime time.Duration) Backoffer {
func NewBackoffer(attempt int, delayTime, maxDelayTime time.Duration, errContext *ErrorContext) Backoffer {
return &importerBackoffer{
attempt: attempt,
delayTime: delayTime,
maxDelayTime: maxDelayTime,
errContext: errContext,
}
}

func NewImportSSTBackoffer() Backoffer {
return NewBackoffer(importSSTRetryTimes, importSSTWaitInterval, importSSTMaxWaitInterval)
errContext := NewErrorContext("import sst", 3)
return NewBackoffer(importSSTRetryTimes, importSSTWaitInterval, importSSTMaxWaitInterval, errContext)
}

func NewDownloadSSTBackoffer() Backoffer {
return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval)
errContext := NewErrorContext("download sst", 3)
return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval, errContext)
}

func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
log.Warn("retry to import ssts", zap.Int("attempt", bo.attempt), zap.Error(err))
if MessageIsRetryableStorageError(err.Error()) {
// we don't care storeID here.
res := bo.errContext.HandleErrorMsg(err.Error(), 0)
if res.Strategy == RetryStrategy {
bo.delayTime = 2 * bo.delayTime
bo.attempt--
} else {
@@ -151,7 +157,7 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
bo.delayTime = 2 * bo.delayTime
bo.attempt--
case berrors.ErrKVRangeIsEmpty, berrors.ErrKVRewriteRuleNotFound:
// Excepted error, finish the operation
// Expected error, finish the operation
bo.delayTime = 0
bo.attempt = 0
default:
@@ -160,10 +166,10 @@
bo.delayTime = 2 * bo.delayTime
bo.attempt--
default:
// Unexcepted error
// Unexpected error
bo.delayTime = 0
bo.attempt = 0
log.Warn("unexcepted error, stop to retry", zap.Error(err))
log.Warn("unexpected error, stop retrying", zap.Error(err))
}
}
}
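NextBackoff now asks the error context (HandleErrorMsg) whether an unclassified message deserves another attempt instead of calling MessageIsRetryableStorageError directly, but the delay schedule itself is unchanged: truncated exponential backoff. A self-contained sketch of that schedule follows; the cap at maxDelayTime reflects how a truncated backoff is generally understood to behave, and the numbers are illustrative rather than the real importerBackoffer constants.

package main

import (
	"fmt"
	"time"
)

// backoffer is a stripped-down stand-in for importerBackoffer: it only models
// the truncated exponential delay, not the error classification.
type backoffer struct {
	attempt      int
	delayTime    time.Duration
	maxDelayTime time.Duration
}

// nextBackoff doubles the delay on a retryable error and stops retrying otherwise.
func (bo *backoffer) nextBackoff(retryable bool) time.Duration {
	if !retryable {
		bo.attempt = 0
		bo.delayTime = 0
		return 0
	}
	bo.attempt--
	bo.delayTime *= 2
	if bo.delayTime > bo.maxDelayTime {
		bo.delayTime = bo.maxDelayTime
	}
	return bo.delayTime
}

func main() {
	bo := &backoffer{attempt: 3, delayTime: time.Second, maxDelayTime: 5 * time.Second}
	for i := 0; i < 3; i++ {
		fmt.Printf("delay=%v attemptsLeft=%d\n", bo.nextBackoff(true), bo.attempt)
	}
	// delay=2s, then 4s, then 5s (capped at maxDelayTime).
}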
26 changes: 22 additions & 4 deletions br/pkg/utils/backoff_test.go
@@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/pingcap/errors"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/stretchr/testify/require"
@@ -18,7 +19,7 @@

func TestBackoffWithSuccess(t *testing.T) {
var counter int
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
switch counter {
@@ -35,9 +36,26 @@
require.NoError(t, err)
}

func TestBackoffWithUnknowneErrorSuccess(t *testing.T) {
var counter int
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
switch counter {
case 0:
return errors.New("unknown error: not in the allow list")
case 1:
return berrors.ErrKVEpochNotMatch
}
return nil
}, backoffer)
require.Equal(t, 3, counter)
require.NoError(t, err)
}

func TestBackoffWithFatalError(t *testing.T) {
var counter int
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
gRPCError := status.Error(codes.Unavailable, "transport is closing")
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
@@ -65,7 +83,7 @@ func TestBackoffWithFatalError(t *testing.T) {
func TestBackoffWithFatalRawGRPCError(t *testing.T) {
var counter int
canceledError := status.Error(codes.Canceled, "context canceled")
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
return canceledError // nolint:wrapcheck
@@ -76,7 +94,7 @@

func TestBackoffWithRetryableError(t *testing.T) {
var counter int
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
return berrors.ErrKVEpochNotMatch
6 changes: 3 additions & 3 deletions br/pkg/utils/permission.go
@@ -7,14 +7,14 @@ var (
permissionDeniedMsg = "permissiondenied"
)

// MessageIsNotFoundStorageError checks whether the message returning from TiKV is "NotFound" storage I/O error
func MessageIsNotFoundStorageError(msg string) bool {
// messageIsNotFoundStorageError checks whether the message returning from TiKV is "NotFound" storage I/O error
func messageIsNotFoundStorageError(msg string) bool {
msgLower := strings.ToLower(msg)
return strings.Contains(msgLower, "io") && strings.Contains(msgLower, ioNotFoundMsg)
}

// MessageIsPermissionDeniedStorageError checks whether the message returning from TiKV is "PermissionDenied" storage I/O error
func MessageIsPermissionDeniedStorageError(msg string) bool {
func messageIsPermissionDeniedStorageError(msg string) bool {
msgLower := strings.ToLower(msg)
return strings.Contains(msgLower, permissionDeniedMsg)
}
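With callers now going through the error context, the two message classifiers become package-private. They are plain case-insensitive substring checks, so a quick illustration is enough; the sample messages are invented, and "notfound" stands in for the ioNotFoundMsg constant, whose exact value is not shown in this diff.

package main

import (
	"fmt"
	"strings"
)

// Same shape as the unexported helpers above; the "notfound" keyword is assumed.
func messageIsNotFoundStorageError(msg string) bool {
	m := strings.ToLower(msg)
	return strings.Contains(m, "io") && strings.Contains(m, "notfound")
}

func messageIsPermissionDeniedStorageError(msg string) bool {
	return strings.Contains(strings.ToLower(msg), "permissiondenied")
}

func main() {
	fmt.Println(messageIsNotFoundStorageError("Io error: NotFound while reading SST"))   // true
	fmt.Println(messageIsPermissionDeniedStorageError("PermissionDenied: cannot write")) // true
	fmt.Println(messageIsNotFoundStorageError("unrelated failure"))                      // false
}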
