Skip to content

Commit

Permalink
Finer is_retry_request settings (#763)
Browse files Browse the repository at this point in the history
* set is_retry_request only for requests that is possible to have undetermined errors

Signed-off-by: ekexium <eke@fastmail.com>

* rename tryTimes to retryTimes

Signed-off-by: ekexium <eke@fastmail.com>

---------

Signed-off-by: ekexium <eke@fastmail.com>
Co-authored-by: cfzjywxk <lsswxrxr@163.com>
  • Loading branch information
ekexium and cfzjywxk authored Apr 10, 2023
1 parent 1ec0ff5 commit f3e8703
Show file tree
Hide file tree
Showing 11 changed files with 344 additions and 193 deletions.
221 changes: 149 additions & 72 deletions internal/locate/region_request.go

Large diffs are not rendered by default.

36 changes: 18 additions & 18 deletions internal/locate/region_request3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit() {
kv.StoreLimit.Store(500)
s.cache.getStoreByStoreID(s.storeIDs[0]).tokenCount.Store(500)
// cause there is only one region in this cluster, regionID maps this leader.
resp, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
resp, _, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
s.NotNil(err)
s.Nil(resp)
e, ok := errors.Cause(err).(*tikverr.ErrTokenLimit)
Expand Down Expand Up @@ -130,7 +130,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSwitchPeerWhenNoLeader() {
bo := retry.NewBackofferWithVars(context.Background(), 5, nil)
loc, err := s.cache.LocateKey(s.bo, []byte("key"))
s.Nil(err)
resp, err := s.regionRequestSender.SendReq(bo, req, loc.Region, time.Second)
resp, _, err := s.regionRequestSender.SendReq(bo, req, loc.Region, time.Second)
s.Nil(err)
s.NotNil(resp)
}
Expand Down Expand Up @@ -181,7 +181,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
Key: []byte("k"),
Value: []byte("v1"),
})
resp, ctx, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
resp, ctx, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
s.Nil(err)
regionErr, err := resp.GetRegionError()
s.Nil(err)
Expand All @@ -208,7 +208,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
atomic.StoreUint32(&storeState, uint32(unreachable))

req = tikvrpc.NewRequest(tikvrpc.CmdRawGet, &kvrpcpb.RawGetRequest{Key: []byte("k")})
resp, ctx, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
resp, ctx, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
s.Nil(err)
regionErr, err = resp.GetRegionError()
s.Nil(err)
Expand Down Expand Up @@ -241,7 +241,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
Key: []byte("k"),
Value: []byte("v2"),
})
resp, ctx, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
resp, ctx, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
s.Nil(err)
regionErr, err = resp.GetRegionError()
s.Nil(err)
Expand All @@ -261,7 +261,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
Key: []byte("k"),
Value: []byte("v2"),
})
resp, ctx, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
resp, ctx, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
s.Nil(err)
regionErr, err = resp.GetRegionError()
s.Nil(err)
Expand Down Expand Up @@ -700,7 +700,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
// Normal
bo := retry.NewBackoffer(context.Background(), -1)
sender := s.regionRequestSender
resp, err := sender.SendReq(bo, req, region.Region, time.Second)
resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp)
s.True(bo.GetTotalBackoffTimes() == 0)
Expand All @@ -709,7 +709,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
bo = retry.NewBackoffer(context.Background(), -1)
s.cluster.ChangeLeader(s.regionID, s.peerIDs[1])
s.cluster.StopStore(s.storeIDs[0])
resp, err = sender.SendReq(bo, req, region.Region, time.Second)
resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp)
s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1))
Expand All @@ -718,7 +718,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {

// Leader is updated because of send success, so no backoff.
bo = retry.NewBackoffer(context.Background(), -1)
resp, err = sender.SendReq(bo, req, region.Region, time.Second)
resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp)
s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1))
Expand All @@ -729,7 +729,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
reloadRegion()
s.cluster.StopStore(s.storeIDs[1])
bo = retry.NewBackoffer(context.Background(), -1)
resp, err = sender.SendReq(bo, req, region.Region, time.Second)
resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.Equal(bo.GetTotalBackoffTimes(), 1)
Expand All @@ -739,15 +739,15 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
reloadRegion()
s.cluster.ChangeLeader(s.regionID, s.peerIDs[0])
bo = retry.NewBackoffer(context.Background(), -1)
resp, err = sender.SendReq(bo, req, region.Region, time.Second)
resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp)
s.Equal(bo.GetTotalBackoffTimes(), 0)

// No leader. Backoff for each replica and runs out all replicas.
s.cluster.GiveUpLeader(s.regionID)
bo = retry.NewBackoffer(context.Background(), -1)
resp, err = sender.SendReq(bo, req, region.Region, time.Second)
resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.Equal(bo.GetTotalBackoffTimes(), 2) // The unreachable leader is skipped
Expand All @@ -770,7 +770,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
reloadRegion()
s.cluster.StopStore(s.storeIDs[0])
bo = retry.NewBackoffer(context.Background(), -1)
resp, err = sender.SendReq(bo, req, region.Region, time.Second)
resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(sender.replicaSelector.region.isValid())
Expand Down Expand Up @@ -804,7 +804,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
}}
reloadRegion()
bo = retry.NewBackoffer(context.Background(), -1)
resp, err := sender.SendReq(bo, req, region.Region, time.Second)
resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(sender.replicaSelector.region.isValid())
Expand All @@ -824,7 +824,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
}}
reloadRegion()
bo = retry.NewBackoffer(context.Background(), -1)
resp, err := sender.SendReq(bo, req, region.Region, time.Second)
resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(sender.replicaSelector.region.isValid())
Expand All @@ -843,7 +843,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
}}
reloadRegion()
bo = retry.NewBackoffer(context.Background(), -1)
resp, err := sender.SendReq(bo, req, region.Region, time.Second)
resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(sender.replicaSelector.region.isValid())
Expand All @@ -867,7 +867,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
}}
reloadRegion()
bo = retry.NewBackoffer(context.Background(), -1)
resp, err := sender.SendReq(bo, req, region.Region, time.Second)
resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)

// Return a sendError when meets NotLeader and can't find the leader in the region.
if i == 3 {
Expand All @@ -894,7 +894,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
s.cluster.StopStore(store)
}
bo = retry.NewBackoffer(context.Background(), -1)
resp, err = sender.SendReq(bo, req, region.Region, time.Second)
resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.True(bo.GetTotalBackoffTimes() == 3)
Expand Down
30 changes: 15 additions & 15 deletions internal/locate/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnRegionError() {
return staleResp, nil
}}
bo := retry.NewBackofferWithVars(context.Background(), 5, nil)
resp, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp)
regionErr, _ := resp.GetRegionError()
Expand All @@ -148,14 +148,14 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithStoreRestart()
region, err := s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)
resp, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
resp, _, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp.Resp)
s.Nil(s.regionRequestSender.rpcError)

// stop store.
s.cluster.StopStore(s.store)
_, err = s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
_, _, err = s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
s.NotNil(err)
// The RPC error shouldn't be nil since it failed to sent the request.
s.NotNil(s.regionRequestSender.rpcError)
Expand All @@ -169,7 +169,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithStoreRestart()
s.Nil(err)
s.NotNil(region)
s.NotNil(s.regionRequestSender.rpcError)
resp, err = s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
resp, _, err = s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp.Resp)
}
Expand All @@ -190,7 +190,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCloseKnownStor
region, err := s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)
resp, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
resp, _, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp.Resp)

Expand All @@ -200,7 +200,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCloseKnownStor

// send to store2 fail and send to new leader store1.
bo2 := retry.NewBackofferWithVars(context.Background(), 100, nil)
resp, err = s.regionRequestSender.SendReq(bo2, req, region.Region, time.Second)
resp, _, err = s.regionRequestSender.SendReq(bo2, req, region.Region, time.Second)
s.Nil(err)
regionErr, err := resp.GetRegionError()
s.Nil(err)
Expand All @@ -216,12 +216,12 @@ func (s *testRegionRequestToSingleStoreSuite) TestSendReqCtx() {
region, err := s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)
resp, ctx, err := s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
resp, ctx, _, err := s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
s.Nil(err)
s.NotNil(resp.Resp)
s.NotNil(ctx)
req.ReplicaRead = true
resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
resp, ctx, _, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
s.Nil(err)
s.NotNil(resp.Resp)
s.NotNil(ctx)
Expand All @@ -235,15 +235,15 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCancelled() {
region, err := s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)
resp, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
resp, _, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp.Resp)

// set store to cancel state.
s.cluster.CancelStore(s.store)
// locate region again is needed
// since last request on the region failed and region's info had been cleared.
_, err = s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
_, _, err = s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
s.NotNil(err)
s.Equal(errors.Cause(err), context.Canceled)

Expand All @@ -252,7 +252,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCancelled() {
region, err = s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)
resp, err = s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
resp, _, err = s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp.Resp)
}
Expand All @@ -270,7 +270,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionWhenCtxCanceled(
bo, cancel := s.bo.Fork()
cancel()
// Call SendKVReq with a canceled context.
_, err = sender.SendReq(bo, req, region.Region, time.Second)
_, _, err = sender.SendReq(bo, req, region.Region, time.Second)
// Check this kind of error won't cause region cache drop.
s.Equal(errors.Cause(err), context.Canceled)
s.NotNil(sender.regionCache.getRegionByIDFromCache(s.region))
Expand Down Expand Up @@ -533,7 +533,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCa

bo, cancel := s.bo.Fork()
cancel()
_, err = sender.SendReq(bo, req, region.Region, 3*time.Second)
_, _, err = sender.SendReq(bo, req, region.Region, 3*time.Second)
s.Equal(errors.Cause(err), context.Canceled)
s.NotNil(s.cache.getRegionByIDFromCache(s.region))

Expand Down Expand Up @@ -578,7 +578,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnMaxTimestampNotSyncedError()
return resp, nil
}}
bo := retry.NewBackofferWithVars(context.Background(), 5, nil)
resp, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp)
}()
Expand Down Expand Up @@ -646,7 +646,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestCloseConnectionOnStoreNotMatch

s.regionRequestSender.client = client
bo := retry.NewBackofferWithVars(context.Background(), 5, nil)
resp, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp)
regionErr, _ := resp.GetRegionError()
Expand Down
8 changes: 4 additions & 4 deletions rawkv/rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ func (c *Client) sendReq(ctx context.Context, key []byte, req *tikvrpc.Request,
if err != nil {
return nil, nil, err
}
resp, err := sender.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
resp, _, err := sender.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -785,7 +785,7 @@ func (c *Client) doBatchReq(bo *retry.Backoffer, batch kvrpc.Batch, options *raw

sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient)
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
resp, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort)
resp, _, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort)

batchResp := kvrpc.BatchResult{}
if err != nil {
Expand Down Expand Up @@ -852,7 +852,7 @@ func (c *Client) sendDeleteRangeReq(ctx context.Context, startKey []byte, endKey
})

req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
resp, err := sender.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
resp, _, err := sender.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -938,7 +938,7 @@ func (c *Client) doBatchPut(bo *retry.Backoffer, batch kvrpc.Batch, opts *rawOpt
sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient)
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
req.ApiVersion = c.apiVersion
resp, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort)
resp, _, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit f3e8703

Please sign in to comment.