diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 080c6e1f56b83..237d147a97ed2 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -1185,17 +1185,18 @@ func OnBackupResponse( backupTS uint64, lockResolver *txnlock.LockResolver, resp *backuppb.BackupResponse, + errContext *utils.ErrorContext, ) (*backuppb.BackupResponse, int, error) { log.Debug("OnBackupResponse", zap.Reflect("resp", resp)) if resp.Error == nil { return resp, 0, nil } backoffMs := 0 - switch v := resp.Error.Detail.(type) { + + err := resp.Error + switch v := err.Detail.(type) { case *backuppb.Error_KvError: if lockErr := v.KvError.Locked; lockErr != nil { - // Try to resolve lock. - log.Warn("backup occur kv error", zap.Reflect("error", v)) msBeforeExpired, err1 := lockResolver.ResolveLocks( bo, backupTS, []*txnlock.Lock{txnlock.NewLock(lockErr)}) if err1 != nil { @@ -1206,44 +1207,16 @@ func OnBackupResponse( } return nil, backoffMs, nil } - // Backup should not meet error other than KeyLocked. - log.Error("unexpect kv error", zap.Reflect("KvError", v.KvError)) - return nil, backoffMs, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %v", storeID, v) - - case *backuppb.Error_RegionError: - regionErr := v.RegionError - // Ignore following errors. - if !(regionErr.EpochNotMatch != nil || - regionErr.NotLeader != nil || - regionErr.RegionNotFound != nil || - regionErr.ServerIsBusy != nil || - regionErr.StaleCommand != nil || - regionErr.StoreNotMatch != nil || - regionErr.ReadIndexNotReady != nil || - regionErr.ProposalInMergingMode != nil) { - log.Error("unexpect region error", zap.Reflect("RegionError", regionErr)) - return nil, backoffMs, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %v", storeID, v) - } - log.Warn("backup occur region error", - zap.Reflect("RegionError", regionErr), - zap.Uint64("storeID", storeID)) - // TODO: a better backoff. - backoffMs = 1000 /* 1s */ - return nil, backoffMs, nil - case *backuppb.Error_ClusterIdError: - log.Error("backup occur cluster ID error", zap.Reflect("error", v), zap.Uint64("storeID", storeID)) - return nil, 0, errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v on storeID: %d", resp.Error, storeID) default: - // UNSAFE! TODO: use meaningful error code instead of unstructured message to find failed to write error. - if utils.MessageIsRetryableStorageError(resp.GetError().GetMsg()) { - log.Warn("backup occur storage error", zap.String("error", resp.GetError().GetMsg())) - // back off 3000ms, for S3 is 99.99% available (i.e. the max outage time would less than 52.56mins per year), - // this time would be probably enough for s3 to resume. + res := errContext.HandleError(resp.Error, storeID) + switch res.Strategy { + case utils.GiveUpStrategy: + return nil, 0, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %s", storeID, res.Reason) + case utils.RetryStrategy: return nil, 3000, nil } - log.Error("backup occur unknown error", zap.String("error", resp.Error.GetMsg()), zap.Uint64("storeID", storeID)) - return nil, 0, errors.Annotatef(berrors.ErrKVUnknown, "%v on storeID: %d", resp.Error, storeID) } + return nil, 3000, errors.Annotatef(berrors.ErrKVUnknown, "unreachable") } func (bc *Client) handleFineGrained( @@ -1273,12 +1246,13 @@ func (bc *Client) handleFineGrained( } hasProgress := false backoffMill := 0 + errContext := utils.NewErrorContext("handleFineGrainedBackup", 10) err = SendBackup( ctx, storeID, client, req, // Handle responses with the same backoffer. func(resp *backuppb.BackupResponse) error { response, shouldBackoff, err1 := - OnBackupResponse(storeID, bo, req.EndVersion, lockResolver, resp) + OnBackupResponse(storeID, bo, req.EndVersion, lockResolver, resp, errContext) if err1 != nil { return err1 } diff --git a/br/pkg/backup/client_test.go b/br/pkg/backup/client_test.go index 592416e8ec03c..f296111789ea9 100644 --- a/br/pkg/backup/client_test.go +++ b/br/pkg/backup/client_test.go @@ -20,12 +20,22 @@ import ( "github.com/pingcap/tidb/br/pkg/mock" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/storage" +<<<<<<< HEAD "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/codec" +======= + "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/tablecodec" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/codec" +>>>>>>> 6c30c6e6aa7 (br: refactor error handle mechanism to tolerant unexpect kv errors. (#48646)) "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/testutils" @@ -230,20 +240,20 @@ func TestOnBackupRegionErrorResponse(t *testing.T) { } cases := []Case{ - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{NotLeader: &errorpb.NotLeader{}}), exceptedBackoffMs: 1000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}}), exceptedBackoffMs: 1000, exceptedErr: false}, + {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{NotLeader: &errorpb.NotLeader{}}), exceptedBackoffMs: 3000, exceptedErr: false}, + {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}}), exceptedBackoffMs: 3000, exceptedErr: false}, {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{KeyNotInRegion: &errorpb.KeyNotInRegion{}}), exceptedBackoffMs: 0, exceptedErr: true}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}), exceptedBackoffMs: 1000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}), exceptedBackoffMs: 1000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StaleCommand: &errorpb.StaleCommand{}}), exceptedBackoffMs: 1000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StoreNotMatch: &errorpb.StoreNotMatch{}}), exceptedBackoffMs: 1000, exceptedErr: false}, + {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}), exceptedBackoffMs: 3000, exceptedErr: false}, + {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}), exceptedBackoffMs: 3000, exceptedErr: false}, + {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StaleCommand: &errorpb.StaleCommand{}}), exceptedBackoffMs: 3000, exceptedErr: false}, + {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StoreNotMatch: &errorpb.StoreNotMatch{}}), exceptedBackoffMs: 3000, exceptedErr: false}, {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RaftEntryTooLarge: &errorpb.RaftEntryTooLarge{}}), exceptedBackoffMs: 0, exceptedErr: true}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ReadIndexNotReady: &errorpb.ReadIndexNotReady{}}), exceptedBackoffMs: 1000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ProposalInMergingMode: &errorpb.ProposalInMergingMode{}}), exceptedBackoffMs: 1000, exceptedErr: false}, + {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ReadIndexNotReady: &errorpb.ReadIndexNotReady{}}), exceptedBackoffMs: 3000, exceptedErr: false}, + {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ProposalInMergingMode: &errorpb.ProposalInMergingMode{}}), exceptedBackoffMs: 3000, exceptedErr: false}, } for _, cs := range cases { t.Log(cs) - _, backoffMs, err := backup.OnBackupResponse(cs.storeID, cs.bo, cs.backupTS, cs.lockResolver, cs.resp) + _, backoffMs, err := backup.OnBackupResponse(cs.storeID, cs.bo, cs.backupTS, cs.lockResolver, cs.resp, utils.NewErrorContext("test", 1)) require.Equal(t, cs.exceptedBackoffMs, backoffMs) if cs.exceptedErr { require.Error(t, err) diff --git a/br/pkg/backup/push.go b/br/pkg/backup/push.go index 4713509024a25..16d2fa0da06ea 100644 --- a/br/pkg/backup/push.go +++ b/br/pkg/backup/push.go @@ -4,7 +4,6 @@ package backup import ( "context" - "fmt" "sync" "github.com/opentracing/opentracing-go" @@ -73,6 +72,7 @@ func (push *pushDown) pushBackup( }) wg := new(sync.WaitGroup) + errContext := utils.NewErrorContext("pushBackup", 10) for _, s := range stores { store := s storeID := s.GetId() @@ -183,35 +183,10 @@ func (push *pushDown) pushBackup( progressCallBack(RegionUnit) } else { errPb := resp.GetError() - switch v := errPb.Detail.(type) { - case *backuppb.Error_KvError: - logutil.CL(ctx).Warn("backup occur kv error", zap.Reflect("error", v)) - - case *backuppb.Error_RegionError: - logutil.CL(ctx).Warn("backup occur region error", zap.Reflect("error", v)) - - case *backuppb.Error_ClusterIdError: - logutil.CL(ctx).Error("backup occur cluster ID error", zap.Reflect("error", v)) - return errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v", errPb) - default: - if utils.MessageIsRetryableStorageError(errPb.GetMsg()) { - logutil.CL(ctx).Warn("backup occur storage error", zap.String("error", errPb.GetMsg())) - continue - } - var errMsg string - if utils.MessageIsNotFoundStorageError(errPb.GetMsg()) { - errMsg = fmt.Sprintf("File or directory not found on TiKV Node (store id: %v; Address: %s). "+ - "work around:please ensure br and tikv nodes share a same storage and the user of br and tikv has same uid.", - store.GetId(), redact.String(store.GetAddress())) - logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg)) - } - if utils.MessageIsPermissionDeniedStorageError(errPb.GetMsg()) { - errMsg = fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v; Address: %s). "+ - "work around:please ensure tikv has permission to read from & write to the storage.", - store.GetId(), redact.String(store.GetAddress())) - logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg)) - } - + res := errContext.HandleIgnorableError(errPb, store.GetId()) + switch res.Strategy { + case utils.GiveUpStrategy: + errMsg := res.Reason if len(errMsg) <= 0 { errMsg = errPb.Msg } @@ -220,6 +195,10 @@ func (push *pushDown) pushBackup( redact.String(store.GetAddress()), errMsg, ) + default: + // other type just continue for next response + // and finally handle the range in fineGrainedBackup + continue } } case err := <-push.errCh: diff --git a/br/pkg/errors/errors.go b/br/pkg/errors/errors.go index 1c18f0b45fe6e..9c032bbbb3259 100644 --- a/br/pkg/errors/errors.go +++ b/br/pkg/errors/errors.go @@ -49,6 +49,8 @@ var ( ErrBackupInvalidRange = errors.Normalize("backup range invalid", errors.RFCCodeText("BR:Backup:ErrBackupInvalidRange")) ErrBackupNoLeader = errors.Normalize("backup no leader", errors.RFCCodeText("BR:Backup:ErrBackupNoLeader")) ErrBackupGCSafepointExceeded = errors.Normalize("backup GC safepoint exceeded", errors.RFCCodeText("BR:Backup:ErrBackupGCSafepointExceeded")) + ErrBackupKeyIsLocked = errors.Normalize("backup key is locked", errors.RFCCodeText("BR:Backup:ErrBackupKeyIsLocked")) + ErrBackupRegion = errors.Normalize("backup region error", errors.RFCCodeText("BR:Backup:ErrBackupRegion")) ErrRestoreModeMismatch = errors.Normalize("restore mode mismatch", errors.RFCCodeText("BR:Restore:ErrRestoreModeMismatch")) ErrRestoreRangeMismatch = errors.Normalize("restore range mismatch", errors.RFCCodeText("BR:Restore:ErrRestoreRangeMismatch")) diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index ad17b725129b7..b8bf9356d929e 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -94,7 +94,11 @@ go_test( ], embed = [":utils"], flaky = True, +<<<<<<< HEAD shard_count = 33, +======= + shard_count = 37, +>>>>>>> 6c30c6e6aa7 (br: refactor error handle mechanism to tolerant unexpect kv errors. (#48646)) deps = [ "//br/pkg/errors", "//br/pkg/metautil", @@ -115,6 +119,7 @@ go_test( "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/encryptionpb", + "@com_github_pingcap_kvproto//pkg/errorpb", "@com_github_pingcap_kvproto//pkg/import_sstpb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_stretchr_testify//require", diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index 8c7712ecb1b1d..8f872e46a2b3b 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -120,28 +120,34 @@ type importerBackoffer struct { attempt int delayTime time.Duration maxDelayTime time.Duration + errContext *ErrorContext } // NewBackoffer creates a new controller regulating a truncated exponential backoff. -func NewBackoffer(attempt int, delayTime, maxDelayTime time.Duration) Backoffer { +func NewBackoffer(attempt int, delayTime, maxDelayTime time.Duration, errContext *ErrorContext) Backoffer { return &importerBackoffer{ attempt: attempt, delayTime: delayTime, maxDelayTime: maxDelayTime, + errContext: errContext, } } func NewImportSSTBackoffer() Backoffer { - return NewBackoffer(importSSTRetryTimes, importSSTWaitInterval, importSSTMaxWaitInterval) + errContext := NewErrorContext("import sst", 3) + return NewBackoffer(importSSTRetryTimes, importSSTWaitInterval, importSSTMaxWaitInterval, errContext) } func NewDownloadSSTBackoffer() Backoffer { - return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval) + errContext := NewErrorContext("download sst", 3) + return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval, errContext) } func (bo *importerBackoffer) NextBackoff(err error) time.Duration { log.Warn("retry to import ssts", zap.Int("attempt", bo.attempt), zap.Error(err)) - if MessageIsRetryableStorageError(err.Error()) { + // we don't care storeID here. + res := bo.errContext.HandleErrorMsg(err.Error(), 0) + if res.Strategy == RetryStrategy { bo.delayTime = 2 * bo.delayTime bo.attempt-- } else { @@ -151,7 +157,7 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration { bo.delayTime = 2 * bo.delayTime bo.attempt-- case berrors.ErrKVRangeIsEmpty, berrors.ErrKVRewriteRuleNotFound: - // Excepted error, finish the operation + // Expected error, finish the operation bo.delayTime = 0 bo.attempt = 0 default: @@ -160,10 +166,10 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration { bo.delayTime = 2 * bo.delayTime bo.attempt-- default: - // Unexcepted error + // Unexpected error bo.delayTime = 0 bo.attempt = 0 - log.Warn("unexcepted error, stop to retry", zap.Error(err)) + log.Warn("unexpected error, stop retrying", zap.Error(err)) } } } diff --git a/br/pkg/utils/backoff_test.go b/br/pkg/utils/backoff_test.go index 31778052f77e1..dc09826fd7806 100644 --- a/br/pkg/utils/backoff_test.go +++ b/br/pkg/utils/backoff_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/pingcap/errors" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/utils" "github.com/stretchr/testify/require" @@ -18,7 +19,7 @@ import ( func TestBackoffWithSuccess(t *testing.T) { var counter int - backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond) + backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() switch counter { @@ -35,9 +36,26 @@ func TestBackoffWithSuccess(t *testing.T) { require.NoError(t, err) } +func TestBackoffWithUnknowneErrorSuccess(t *testing.T) { + var counter int + backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) + err := utils.WithRetry(context.Background(), func() error { + defer func() { counter++ }() + switch counter { + case 0: + return errors.New("unknown error: not in the allow list") + case 1: + return berrors.ErrKVEpochNotMatch + } + return nil + }, backoffer) + require.Equal(t, 3, counter) + require.NoError(t, err) +} + func TestBackoffWithFatalError(t *testing.T) { var counter int - backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond) + backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) gRPCError := status.Error(codes.Unavailable, "transport is closing") err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() @@ -65,7 +83,7 @@ func TestBackoffWithFatalError(t *testing.T) { func TestBackoffWithFatalRawGRPCError(t *testing.T) { var counter int canceledError := status.Error(codes.Canceled, "context canceled") - backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond) + backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() return canceledError // nolint:wrapcheck @@ -76,7 +94,7 @@ func TestBackoffWithFatalRawGRPCError(t *testing.T) { func TestBackoffWithRetryableError(t *testing.T) { var counter int - backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond) + backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() return berrors.ErrKVEpochNotMatch diff --git a/br/pkg/utils/permission.go b/br/pkg/utils/permission.go index e18c28dbbbe1c..3c0795db11c47 100644 --- a/br/pkg/utils/permission.go +++ b/br/pkg/utils/permission.go @@ -7,14 +7,14 @@ var ( permissionDeniedMsg = "permissiondenied" ) -// MessageIsNotFoundStorageError checks whether the message returning from TiKV is "NotFound" storage I/O error -func MessageIsNotFoundStorageError(msg string) bool { +// messageIsNotFoundStorageError checks whether the message returning from TiKV is "NotFound" storage I/O error +func messageIsNotFoundStorageError(msg string) bool { msgLower := strings.ToLower(msg) return strings.Contains(msgLower, "io") && strings.Contains(msgLower, ioNotFoundMsg) } // MessageIsPermissionDeniedStorageError checks whether the message returning from TiKV is "PermissionDenied" storage I/O error -func MessageIsPermissionDeniedStorageError(msg string) bool { +func messageIsPermissionDeniedStorageError(msg string) bool { msgLower := strings.ToLower(msg) return strings.Contains(msgLower, permissionDeniedMsg) } diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 20482d7c423a2..0baf07a6cf555 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -4,16 +4,25 @@ package utils import ( "context" + "fmt" "strings" "sync" "time" "github.com/cznic/mathutil" "github.com/pingcap/errors" +<<<<<<< HEAD tmysql "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/parser/terror" +======= + backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/log" + tmysql "github.com/pingcap/tidb/pkg/errno" + "github.com/pingcap/tidb/pkg/parser/terror" +>>>>>>> 6c30c6e6aa7 (br: refactor error handle mechanism to tolerant unexpect kv errors. (#48646)) "github.com/tikv/client-go/v2/tikv" "go.uber.org/multierr" + "go.uber.org/zap" ) var retryableServerError = []string{ @@ -33,6 +42,157 @@ var retryableServerError = []string{ "end of file before message length reached", } +type ErrorResult struct { + Strategy ErrorStrategy + Reason string +} + +type ErrorStrategy int + +const ( + // This type can be retry but consume the backoffer attempts. + RetryStrategy ErrorStrategy = iota + // This type means unrecoverable error and the whole progress should exits + // for example: + // 1. permission not valid. + // 2. data has not found. + // 3. retry too many times + GiveUpStrategy + // This type represents Unknown error + UnknownStrategy +) + +type ErrorContext struct { + mu sync.Mutex + // encounter times for one context on a store + // we may use this value to determine the retry policy + encounterTimes map[uint64]int + // unknown error retry limitation. + // encouter many times error makes Retry to GiveUp. + encounterTimesLimitation int + // whether in backup or restore + scenario string +} + +func NewErrorContext(scenario string, limitation int) *ErrorContext { + return &ErrorContext{ + scenario: scenario, + encounterTimes: make(map[uint64]int), + encounterTimesLimitation: limitation, + } +} + +func NewDefaultContext() *ErrorContext { + return &ErrorContext{ + scenario: "default", + encounterTimes: make(map[uint64]int), + encounterTimesLimitation: 1, + } +} + +func (ec *ErrorContext) HandleError(err *backuppb.Error, uuid uint64) ErrorResult { + if err == nil { + return ErrorResult{RetryStrategy, "unreachable retry"} + } + res := ec.handleErrorPb(err, uuid) + // try the best effort to save progress from error here + if res.Strategy == UnknownStrategy && len(err.Msg) != 0 { + return ec.HandleErrorMsg(err.Msg, uuid) + } + return res +} + +func (ec *ErrorContext) HandleIgnorableError(err *backuppb.Error, uuid uint64) ErrorResult { + if err == nil { + return ErrorResult{RetryStrategy, "unreachable retry"} + } + res := ec.handleIgnorableErrorPb(err, uuid) + // try the best effort to save progress from error here + if res.Strategy == UnknownStrategy && len(err.Msg) != 0 { + return ec.HandleErrorMsg(err.Msg, uuid) + } + return res +} + +func (ec *ErrorContext) HandleErrorMsg(msg string, uuid uint64) ErrorResult { + // UNSAFE! TODO: use meaningful error code instead of unstructured message to find failed to write error. + logger := log.L().With(zap.String("scenario", ec.scenario)) + if messageIsNotFoundStorageError(msg) { + reason := fmt.Sprintf("File or directory not found on TiKV Node (store id: %v). "+ + "work around:please ensure br and tikv nodes share a same storage and the user of br and tikv has same uid.", + uuid) + return ErrorResult{GiveUpStrategy, reason} + } + if messageIsPermissionDeniedStorageError(msg) { + reason := fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v). "+ + "work around:please ensure tikv has permission to read from & write to the storage.", + uuid) + return ErrorResult{GiveUpStrategy, reason} + } + msgLower := strings.ToLower(msg) + if strings.Contains(msgLower, "context canceled") { + return ErrorResult{GiveUpStrategy, "context canceled, give up"} + } + + if MessageIsRetryableStorageError(msg) { + logger.Warn("occur storage error", zap.String("error", msg)) + return ErrorResult{RetryStrategy, "retrable error"} + } + // retry enough on same store + ec.mu.Lock() + defer ec.mu.Unlock() + ec.encounterTimes[uuid]++ + if ec.encounterTimes[uuid] <= ec.encounterTimesLimitation { + return ErrorResult{RetryStrategy, "unknown error, retry it for few times"} + } + return ErrorResult{GiveUpStrategy, "unknown error and retry too many times, give up"} +} + +func (ec *ErrorContext) handleIgnorableErrorPb(e *backuppb.Error, uuid uint64) ErrorResult { + switch e.Detail.(type) { + case *backuppb.Error_KvError: + return ErrorResult{RetryStrategy, "retry outside because the error can be ignored"} + case *backuppb.Error_RegionError: + return ErrorResult{RetryStrategy, "retry outside because the error can be ignored"} + case *backuppb.Error_ClusterIdError: + return ErrorResult{GiveUpStrategy, "cluster ID mismatch"} + } + return ErrorResult{UnknownStrategy, "unreachable code"} +} + +func (ec *ErrorContext) handleErrorPb(e *backuppb.Error, uuid uint64) ErrorResult { + logger := log.L().With(zap.String("scenario", ec.scenario)) + switch v := e.Detail.(type) { + case *backuppb.Error_KvError: + // should not meet error other than KeyLocked. + return ErrorResult{GiveUpStrategy, "unknown kv error"} + + case *backuppb.Error_RegionError: + regionErr := v.RegionError + // Ignore following errors. + if !(regionErr.EpochNotMatch != nil || + regionErr.NotLeader != nil || + regionErr.RegionNotFound != nil || + regionErr.ServerIsBusy != nil || + regionErr.StaleCommand != nil || + regionErr.StoreNotMatch != nil || + regionErr.ReadIndexNotReady != nil || + regionErr.ProposalInMergingMode != nil) { + logger.Error("unexpect region error", zap.Reflect("RegionError", regionErr)) + return ErrorResult{GiveUpStrategy, "unknown kv error"} + } + logger.Warn("occur region error", + zap.Reflect("RegionError", regionErr), + zap.Uint64("uuid", uuid)) + return ErrorResult{RetryStrategy, "retrable error"} + + case *backuppb.Error_ClusterIdError: + logger.Error("occur cluster ID error", zap.Reflect("error", v), zap.Uint64("uuid", uuid)) + return ErrorResult{GiveUpStrategy, "cluster ID mismatch"} + } + return ErrorResult{UnknownStrategy, "unreachable code"} +} + // RetryableFunc presents a retryable operation. type RetryableFunc func() error diff --git a/br/pkg/utils/retry_test.go b/br/pkg/utils/retry_test.go index eeef8c61c0480..eeaef51ab8dfd 100644 --- a/br/pkg/utils/retry_test.go +++ b/br/pkg/utils/retry_test.go @@ -9,6 +9,8 @@ import ( "time" "github.com/pingcap/errors" + backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/tidb/br/pkg/utils" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/tikv" @@ -47,3 +49,74 @@ func TestRetryAdapter(t *testing.T) { req.Greater(time.Since(begin), 200*time.Millisecond) } + +func TestHandleError(t *testing.T) { + ec := utils.NewErrorContext("test", 3) + // Test case 1: Error is nil + result := ec.HandleError(nil, 123) + require.Equal(t, utils.ErrorResult{utils.RetryStrategy, "unreachable retry"}, result) + + // Test case 2: Error is KvError and can be ignored + kvError := &backuppb.Error_KvError{} + result = ec.HandleIgnorableError(&backuppb.Error{Detail: kvError}, 123) + require.Equal(t, utils.ErrorResult{utils.RetryStrategy, "retry outside because the error can be ignored"}, result) + + // Test case 3: Error is KvError and cannot be ignored + result = ec.HandleError(&backuppb.Error{Detail: kvError}, 123) + require.Equal(t, utils.ErrorResult{utils.GiveUpStrategy, "unknown kv error"}, result) + + // Test case 4: Error is RegionError and can be ignored + regionError := &backuppb.Error_RegionError{ + RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{RegionId: 1}}} + result = ec.HandleIgnorableError(&backuppb.Error{Detail: regionError}, 123) + require.Equal(t, utils.ErrorResult{utils.RetryStrategy, "retry outside because the error can be ignored"}, result) + + // Test case 5: Error is RegionError and cannot be ignored + regionError = &backuppb.Error_RegionError{ + RegionError: &errorpb.Error{DiskFull: &errorpb.DiskFull{}}} + result = ec.HandleError(&backuppb.Error{Detail: regionError}, 123) + require.Equal(t, utils.ErrorResult{utils.GiveUpStrategy, "unknown kv error"}, result) + + // Test case 6: Error is ClusterIdError + clusterIdError := &backuppb.Error_ClusterIdError{} + result = ec.HandleError(&backuppb.Error{Detail: clusterIdError}, 123) + require.Equal(t, utils.ErrorResult{utils.GiveUpStrategy, "cluster ID mismatch"}, result) +} + +func TestHandleErrorMsg(t *testing.T) { + ec := utils.NewErrorContext("test", 3) + + // Test messageIsNotFoundStorageError + msg := "IO: files Notfound error" + uuid := uint64(456) + expectedReason := "File or directory not found on TiKV Node (store id: 456). work around:please ensure br and tikv nodes share a same storage and the user of br and tikv has same uid." + expectedResult := utils.ErrorResult{utils.GiveUpStrategy, expectedReason} + actualResult := ec.HandleErrorMsg(msg, uuid) + require.Equal(t, expectedResult, actualResult) + + // Test messageIsPermissionDeniedStorageError + msg = "I/O permissiondenied error occurs on TiKV Node(store id: 456)." + expectedReason = "I/O permission denied error occurs on TiKV Node(store id: 456). work around:please ensure tikv has permission to read from & write to the storage." + expectedResult = utils.ErrorResult{utils.GiveUpStrategy, expectedReason} + actualResult = ec.HandleErrorMsg(msg, uuid) + require.Equal(t, expectedResult, actualResult) + + // Test MessageIsRetryableStorageError + msg = "server closed" + expectedResult = utils.ErrorResult{utils.RetryStrategy, "retrable error"} + actualResult = ec.HandleErrorMsg(msg, uuid) + require.Equal(t, expectedResult, actualResult) + + // Test unknown error + msg = "unknown error" + expectedResult = utils.ErrorResult{utils.RetryStrategy, "unknown error, retry it for few times"} + actualResult = ec.HandleErrorMsg(msg, uuid) + require.Equal(t, expectedResult, actualResult) + + // Test retry too many times + _ = ec.HandleErrorMsg(msg, uuid) + _ = ec.HandleErrorMsg(msg, uuid) + expectedResult = utils.ErrorResult{utils.GiveUpStrategy, "unknown error and retry too many times, give up"} + actualResult = ec.HandleErrorMsg(msg, uuid) + require.Equal(t, expectedResult, actualResult) +} diff --git a/errors.toml b/errors.toml index 757fa81a07a0c..2677afeab2d68 100644 --- a/errors.toml +++ b/errors.toml @@ -16,11 +16,21 @@ error = ''' backup range invalid ''' +["BR:Backup:ErrBackupKeyIsLocked"] +error = ''' +backup key is locked +''' + ["BR:Backup:ErrBackupNoLeader"] error = ''' backup no leader ''' +["BR:Backup:ErrBackupRegion"] +error = ''' +backup region error +''' + ["BR:Common:ErrEnvNotSpecified"] error = ''' environment variable not found