avoid dataIsNotReady error while retrying stale read on the leader (#765)

* avoid dataIsNotReady error while retrying stale read on the leader

Signed-off-by: artem_danilov <artem_danilov@airbnb.com>

* move StaleRead flag reset to retry section

Signed-off-by: artem_danilov <artem_danilov@airbnb.com>

* move all logic to #next and allow retry on the leader

Signed-off-by: artem_danilov <artem_danilov@airbnb.com>

---------

Signed-off-by: artem_danilov <artem_danilov@airbnb.com>
Co-authored-by: artem_danilov <artem_danilov@airbnb.com>
Tema and artem_danilov authored Apr 19, 2023
1 parent 92db9f7 commit 4157137
Showing 3 changed files with 41 additions and 20 deletions.
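
The behavior being fixed: a cross-zone ("global") stale read that cannot be served by a follower is retried on the leader, and that retry must go out as a normal read, because keeping the stale-read flag set can make even the leader answer with DataIsNotReady. Below is a minimal, self-contained sketch of that flow; the types and the send helper are hypothetical stand-ins, not client-go's real request plumbing.

    package main

    import "fmt"

    // Hypothetical stand-ins for the request/response/replica plumbing (sketch only).
    type request struct{ staleRead bool }
    type response struct{ dataIsNotReady bool }
    type replica struct{ isLeader bool }

    // send simulates the RPC: a lagging follower rejects stale reads with
    // DataIsNotReady, while the leader can always serve the read.
    func send(req request, to replica) response {
    	return response{dataIsNotReady: req.staleRead && !to.isLeader}
    }

    // readWithRetry mirrors the behavior this commit aims at: first a stale read
    // on a follower; on DataIsNotReady, retry on the leader as a normal read
    // (stale-read flag cleared) and without backing off first.
    func readWithRetry(follower, leader replica) response {
    	req := request{staleRead: true}
    	if resp := send(req, follower); !resp.dataIsNotReady {
    		return resp
    	}
    	req.staleRead = false // the leader retry must not carry the stale-read flag
    	return send(req, leader)
    }

    func main() {
    	fmt.Printf("%+v\n", readWithRetry(replica{}, replica{isLeader: true}))
    }
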
4 changes: 4 additions & 0 deletions internal/locate/region_cache.go
@@ -585,12 +585,16 @@ func (c *RPCContext) String() string {
type contextPatcher struct {
replicaRead *bool
busyThreshold *time.Duration
staleRead *bool
}

func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context) {
if patcher.replicaRead != nil {
pbCtx.ReplicaRead = *patcher.replicaRead
}
if patcher.staleRead != nil {
pbCtx.StaleRead = *patcher.staleRead
}
if patcher.busyThreshold != nil {
millis := patcher.busyThreshold.Milliseconds()
if millis > 0 && millis <= math.MaxUint32 {
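
The new staleRead field follows the patcher's existing pattern: pointer-typed fields act as optional overrides, so nil means "leave the request's kvrpcpb.Context as it was", while a non-nil pointer forces the value, including forcing it to false. A standalone sketch of that pattern, using hypothetical mirror types rather than the real kvrpcpb structs:

    package main

    import "fmt"

    // reqContext mirrors the two kvrpcpb.Context fields patched here (sketch only).
    type reqContext struct {
    	ReplicaRead bool
    	StaleRead   bool
    }

    // patcher uses *bool so that "not set" (nil) and "force false" are distinguishable.
    type patcher struct {
    	replicaRead *bool
    	staleRead   *bool
    }

    func (p *patcher) applyTo(ctx *reqContext) {
    	if p.replicaRead != nil {
    		ctx.ReplicaRead = *p.replicaRead
    	}
    	if p.staleRead != nil {
    		ctx.StaleRead = *p.staleRead
    	}
    }

    func main() {
    	ctx := reqContext{StaleRead: true} // request originally marked as a stale read
    	staleRead := false
    	p := patcher{staleRead: &staleRead} // leader retry: force StaleRead off, leave ReplicaRead alone
    	p.applyTo(&ctx)
    	fmt.Printf("%+v\n", ctx) // {ReplicaRead:false StaleRead:false}
    }
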
24 changes: 19 additions & 5 deletions internal/locate/region_request.go
@@ -542,6 +542,7 @@ type accessFollower struct

func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
replicaSize := len(selector.replicas)
resetStaleRead := false
if state.lastIdx < 0 {
if state.tryLeader {
state.lastIdx = AccessIndex(rand.Intn(replicaSize))
@@ -562,6 +563,8 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
// if txnScope is local, we will retry both other peers and the leader by the strategy of replicaSelector.
if state.isGlobalStaleRead {
WithLeaderOnly()(&state.option)
// retry on the leader should not use stale read flag to avoid possible DataIsNotReady error as it always can serve any read
resetStaleRead = true
}
state.lastIdx++
}
@@ -592,7 +595,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
)
}
leader := selector.replicas[state.leaderIdx]
if leader.isEpochStale() || leader.isExhausted(1) {
if leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1)) {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
@@ -608,7 +611,15 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
selector.replicas[selector.targetIdx].store.recordReplicaFlowsStats(toLeader)
}
}
return selector.buildRPCContext(bo)
rpcCtx, err := selector.buildRPCContext(bo)
if err != nil || rpcCtx == nil {
return nil, err
}
if resetStaleRead {
staleRead := false
rpcCtx.contextPatcher.staleRead = &staleRead
}
return rpcCtx, nil
}

func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
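
One subtle piece of the change above: in leader-only retry mode the leader may legitimately be contacted a second time (the first attempt could already have gone to it as a stale read), so a single prior attempt must no longer invalidate the region. A condensed restatement of the updated check, as a sketch rather than the actual selector code:

    // giveUpOnLeader restates the guard from accessFollower.next (sketch only):
    // invalidate the region when the leader's epoch is stale, or when the leader
    // has already been tried once and the selector is NOT in leader-only retry mode.
    func giveUpOnLeader(epochStale, leaderOnly bool, attempts int) bool {
    	return epochStale || (!leaderOnly && attempts >= 1)
    }
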
@@ -1869,9 +1880,12 @@ func (s *RegionRequestSender) onRegionError(
zap.Uint64("safe-ts", regionErr.GetDataIsNotReady().GetSafeTs()),
zap.Stringer("ctx", ctx),
)
err = bo.Backoff(retry.BoMaxDataNotReady, errors.New("data is not ready"))
if err != nil {
return false, err
if !req.IsGlobalStaleRead() {
// only backoff local stale reads as global should retry immediately against the leader as a normal read
err = bo.Backoff(retry.BoMaxDataNotReady, errors.New("data is not ready"))
if err != nil {
return false, err
}
}
return true, nil
}
33 changes: 18 additions & 15 deletions internal/locate/region_request3_test.go
@@ -909,31 +909,34 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")})
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope
req.EnableStaleRead()
for i := 0; i < 5; i++ {
for i := 0; i < 10; i++ {
req.EnableStaleRead()
// The request may be sent to the leader directly. We have to distinguish it.
failureOnFollower := false
failureOnFollower := 0
failureOnLeader := 0
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
if addr != s.cluster.GetStore(s.storeIDs[0]).Address {
failureOnFollower = true
failureOnFollower++
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil
} else if failureOnLeader == 0 && i%2 == 0 {
failureOnLeader++
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil
} else {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}, nil
}}
sender.SendReq(bo, req, region.Region, time.Second)
state, ok := sender.replicaSelector.state.(*accessFollower)
s.True(ok)
s.True(!failureOnFollower || state.option.leaderOnly)
totalAttempts := 0
for idx, replica := range sender.replicaSelector.replicas {
totalAttempts += replica.attempts
if idx == int(state.leaderIdx) {
s.Equal(1, replica.attempts)
} else {
s.True(replica.attempts <= 1)
}
}
s.True(failureOnFollower <= 1) // any retry should go to the leader, hence at most one failure on the follower allowed
if failureOnFollower == 0 && failureOnLeader == 0 {
// if the request goes to the leader and succeeds then it is executed as a StaleRead
s.True(req.StaleRead)
} else {
// otherwise #leaderOnly flag should be set and retry request as a normal read
s.True(state.option.leaderOnly)
s.False(req.StaleRead)
}
s.True(totalAttempts <= 2)
}
}
