diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index dd3e615f4..6243f62b6 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -585,12 +585,16 @@ func (c *RPCContext) String() string {
 type contextPatcher struct {
 	replicaRead   *bool
 	busyThreshold *time.Duration
+	staleRead     *bool
 }
 
 func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context) {
 	if patcher.replicaRead != nil {
 		pbCtx.ReplicaRead = *patcher.replicaRead
 	}
+	if patcher.staleRead != nil {
+		pbCtx.StaleRead = *patcher.staleRead
+	}
 	if patcher.busyThreshold != nil {
 		millis := patcher.busyThreshold.Milliseconds()
 		if millis > 0 && millis <= math.MaxUint32 {
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index f10580bd1..52d81b338 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -542,6 +542,7 @@ type accessFollower struct {
 
 func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
 	replicaSize := len(selector.replicas)
+	resetStaleRead := false
 	if state.lastIdx < 0 {
 		if state.tryLeader {
 			state.lastIdx = AccessIndex(rand.Intn(replicaSize))
@@ -562,6 +563,8 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
 		// if txnScope is local, we will retry both other peers and the leader by the strategy of replicaSelector.
 		if state.isGlobalStaleRead {
 			WithLeaderOnly()(&state.option)
+			// The retry on the leader should not use the stale-read flag: the leader can always serve the read, which avoids a possible DataIsNotReady error.
+			resetStaleRead = true
 		}
 		state.lastIdx++
 	}
@@ -592,7 +595,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
 			)
 		}
 		leader := selector.replicas[state.leaderIdx]
-		if leader.isEpochStale() || leader.isExhausted(1) {
+		if leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1)) {
 			metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
 			selector.invalidateRegion()
 			return nil, nil
@@ -608,7 +611,15 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
 			selector.replicas[selector.targetIdx].store.recordReplicaFlowsStats(toLeader)
 		}
 	}
-	return selector.buildRPCContext(bo)
+	rpcCtx, err := selector.buildRPCContext(bo)
+	if err != nil || rpcCtx == nil {
+		return nil, err
+	}
+	if resetStaleRead {
+		staleRead := false
+		rpcCtx.contextPatcher.staleRead = &staleRead
+	}
+	return rpcCtx, nil
 }
 
 func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
@@ -1869,9 +1880,12 @@ func (s *RegionRequestSender) onRegionError(
 			zap.Uint64("safe-ts", regionErr.GetDataIsNotReady().GetSafeTs()),
 			zap.Stringer("ctx", ctx),
 		)
-		err = bo.Backoff(retry.BoMaxDataNotReady, errors.New("data is not ready"))
-		if err != nil {
-			return false, err
+		if !req.IsGlobalStaleRead() {
+			// Only back off for local stale reads; a global stale read is retried immediately against the leader as a normal read.
+			err = bo.Backoff(retry.BoMaxDataNotReady, errors.New("data is not ready"))
+			if err != nil {
+				return false, err
+			}
 		}
 		return true, nil
 	}
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index 3a3287e05..c8fe0f614 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -909,31 +909,34 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")})
 	req.ReadReplicaScope = oracle.GlobalTxnScope
 	req.TxnScope = oracle.GlobalTxnScope
-	req.EnableStaleRead()
-	for i := 0; i < 5; i++ {
+	for i := 0; i < 10; i++ {
+		req.EnableStaleRead()
 		// The request may be sent to the leader directly. We have to distinguish it.
-		failureOnFollower := false
+		failureOnFollower := 0
+		failureOnLeader := 0
 		s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
 			if addr != s.cluster.GetStore(s.storeIDs[0]).Address {
-				failureOnFollower = true
+				failureOnFollower++
+				return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil
+			} else if failureOnLeader == 0 && i%2 == 0 {
+				failureOnLeader++
 				return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil
+			} else {
+				return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}, nil
 			}
-			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}, nil
 		}}
 		sender.SendReq(bo, req, region.Region, time.Second)
 		state, ok := sender.replicaSelector.state.(*accessFollower)
 		s.True(ok)
-		s.True(!failureOnFollower || state.option.leaderOnly)
-		totalAttempts := 0
-		for idx, replica := range sender.replicaSelector.replicas {
-			totalAttempts += replica.attempts
-			if idx == int(state.leaderIdx) {
-				s.Equal(1, replica.attempts)
-			} else {
-				s.True(replica.attempts <= 1)
-			}
+		s.True(failureOnFollower <= 1) // any retry should go to the leader, so at most one failure on a follower is allowed
+		if failureOnFollower == 0 && failureOnLeader == 0 {
+			// if the request goes directly to the leader and succeeds, it is executed as a stale read
+			s.True(req.StaleRead)
+		} else {
+			// otherwise the leaderOnly flag should be set and the retry is sent as a normal read
+			s.True(state.option.leaderOnly)
+			s.False(req.StaleRead)
 		}
-		s.True(totalAttempts <= 2)
 	}
 }
 
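For context on how the pieces above fit together: a global stale read that gets a region error from a follower is now retried only on the leader, and that retry clears the stale-read flag through the new `contextPatcher.staleRead` override, so the leader serves it as a normal read; this is also why the `DataIsNotReady` backoff is skipped for global stale reads. The sketch below is a minimal, self-contained illustration of the patching idea only; `reqContext` and the `main` driver are made-up stand-ins for `kvrpcpb.Context` and the request sender, not client-go code.

```go
package main

import "fmt"

// reqContext stands in for kvrpcpb.Context in this sketch.
type reqContext struct {
	ReplicaRead bool
	StaleRead   bool
}

// contextPatcher mirrors the idea from the diff: a nil field means
// "leave the request as-is", a non-nil pointer overrides the value
// right before the RPC is sent.
type contextPatcher struct {
	replicaRead *bool
	staleRead   *bool
}

func (p *contextPatcher) applyTo(ctx *reqContext) {
	if p.replicaRead != nil {
		ctx.ReplicaRead = *p.replicaRead
	}
	if p.staleRead != nil {
		ctx.StaleRead = *p.staleRead
	}
}

func main() {
	// The original request was issued as a stale read...
	ctx := &reqContext{StaleRead: true}

	// ...but the retry on the leader clears the flag (resetStaleRead in the
	// diff), so the leader serves a normal read and cannot report DataIsNotReady.
	staleRead := false
	patcher := &contextPatcher{staleRead: &staleRead}
	patcher.applyTo(ctx)

	fmt.Println("StaleRead after patch:", ctx.StaleRead) // false
}
```

Pointer fields give the patcher a three-way state (unset vs. explicitly true or false), so a retry only overrides the fields it actually cares about and leaves everything else on the request untouched.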