Skip to content

Commit

Permalink
store: proxy: fix queries never timing out bug (#2411)
Browse files Browse the repository at this point in the history
* store: proxy: add test for deadlocking problem

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>

* store: proxy: add fix for timeouts

Checking here if the series context has ended is the correct fix here.
We want to check it because if any of the other Series() calls error out
then the context is canceled. So, it is equal to checking for errors
"downstream", in `mergedSeriesSet`.

Also, `handleErr()` here is the correct function to use because in such
a case we want to set `s.err` -- if `io.EOF` still hasn't been received
then it means that StoreAPI still has some data that it wants to send
but hasn't yet.

With this, the previously added test passes.

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>
  • Loading branch information
GiedriusS authored Apr 14, 2020
1 parent d0d54d3 commit 91f5960
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2254](https://github.com/thanos-io/thanos/pull/2254) Bucket: Fix metrics registered multiple times in bucket replicate
- [#2271](https://github.com/thanos-io/thanos/pull/2271) Bucket Web: Fixed Issue #2260 bucket passes null when storage is empty
- [#2339](https://github.com/thanos-io/thanos/pull/2339) Query: fix a bug where `--store.unhealthy-timeout` was never respected
- [#2411](https://github.com/thanos-io/thanos/pull/2411) Query: fix a bug where queries might not time out sometimes due to issues with one or more StoreAPIs

### Added

Expand Down
7 changes: 6 additions & 1 deletion pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,12 @@ func startStreamSeriesSet(
s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w)))
continue
}
s.recvCh <- rr.r.GetSeries()
select {
case s.recvCh <- rr.r.GetSeries():
case <-ctx.Done():
s.handleErr(errors.Wrapf(ctx.Err(), "failed to receive any data from %s", s.name), done)
return
}
}
}()
return s
Expand Down
66 changes: 65 additions & 1 deletion pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,60 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
expectedErr error
expectedWarningsLen int
}{
{
title: "partial response disabled; 1st errors out after some delay; 2nd store is fast",
storeAPIs: []Client{
&testClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storepb.NewWarnSeriesResponse(errors.New("warning")),
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}),
},
RespDuration: 2 * time.Second,
SlowSeriesIndex: 1,
injectedError: errors.New("test"),
injectedErrorIndex: 1,
},
labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}},
minTime: 1,
maxTime: 300,
},
&testClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storepb.NewWarnSeriesResponse(errors.New("warning")),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),

storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),

storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}),
},
},
labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}},
minTime: 1,
maxTime: 300,
},
},
req: &storepb.SeriesRequest{
MinTime: 1,
MaxTime: 300,
Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}},
PartialResponseDisabled: true,
},
expectedErr: errors.New("test: receive series from test: test"),
},
{
title: "partial response disabled; 1st store is slow, 2nd store is fast;",
storeAPIs: []Client{
Expand Down Expand Up @@ -1349,6 +1403,10 @@ type mockedStoreAPI struct {
LastSeriesReq *storepb.SeriesRequest
LastLabelValuesReq *storepb.LabelValuesRequest
LastLabelNamesReq *storepb.LabelNamesRequest

// injectedError will be injected into Recv() if not nil.
injectedError error
injectedErrorIndex int
}

func (s *mockedStoreAPI) Info(ctx context.Context, req *storepb.InfoRequest, _ ...grpc.CallOption) (*storepb.InfoResponse, error) {
Expand All @@ -1358,7 +1416,7 @@ func (s *mockedStoreAPI) Info(ctx context.Context, req *storepb.InfoRequest, _ .
func (s *mockedStoreAPI) Series(ctx context.Context, req *storepb.SeriesRequest, _ ...grpc.CallOption) (storepb.Store_SeriesClient, error) {
s.LastSeriesReq = req

return &StoreSeriesClient{ctx: ctx, respSet: s.RespSeries, respDur: s.RespDuration, slowSeriesIndex: s.SlowSeriesIndex}, s.RespError
return &StoreSeriesClient{injectedErrorIndex: s.injectedErrorIndex, injectedError: s.injectedError, ctx: ctx, respSet: s.RespSeries, respDur: s.RespDuration, slowSeriesIndex: s.SlowSeriesIndex}, s.RespError
}

func (s *mockedStoreAPI) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest, _ ...grpc.CallOption) (*storepb.LabelNamesResponse, error) {
Expand All @@ -1382,12 +1440,18 @@ type StoreSeriesClient struct {
respSet []*storepb.SeriesResponse
respDur time.Duration
slowSeriesIndex int

injectedError error
injectedErrorIndex int
}

func (c *StoreSeriesClient) Recv() (*storepb.SeriesResponse, error) {
if c.respDur != 0 && (c.slowSeriesIndex == c.i || c.slowSeriesIndex == 0) {
time.Sleep(c.respDur)
}
if c.injectedError != nil && (c.injectedErrorIndex == c.i || c.injectedErrorIndex == 0) {
return nil, c.injectedError
}

if c.i >= len(c.respSet) {
return nil, io.EOF
Expand Down

0 comments on commit 91f5960

Please sign in to comment.