diff --git a/CHANGELOG.md b/CHANGELOG.md index 017ec2e415..c4a5fae76b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2254](https://github.com/thanos-io/thanos/pull/2254) Bucket: Fix metrics registered multiple times in bucket replicate - [#2271](https://github.com/thanos-io/thanos/pull/2271) Bucket Web: Fixed Issue #2260 bucket passes null when storage is empty - [#2339](https://github.com/thanos-io/thanos/pull/2339) Query: fix a bug where `--store.unhealthy-timeout` was never respected +- [#2411](https://github.com/thanos-io/thanos/pull/2411) Query: fix a bug where queries might not time out sometimes due to issues with one or more StoreAPIs ### Added diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 8440de2f96..c071848c05 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -434,7 +434,12 @@ func startStreamSeriesSet( s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w))) continue } - s.recvCh <- rr.r.GetSeries() + select { + case s.recvCh <- rr.r.GetSeries(): + case <-ctx.Done(): + s.handleErr(errors.Wrapf(ctx.Err(), "failed to receive any data from %s", s.name), done) + return + } } }() return s diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 1e49cce456..8e8b6022d4 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -462,6 +462,60 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { expectedErr error expectedWarningsLen int }{ + { + title: "partial response disabled; 1st errors out after some delay; 2nd store is fast", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + RespDuration: 2 * time.Second, + SlowSeriesIndex: 1, + injectedError: errors.New("test"), + injectedErrorIndex: 1, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}), + + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}), + + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{4, 1}, {5, 2}, {6, 3}}), + }, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + PartialResponseDisabled: true, + }, + expectedErr: errors.New("test: receive series from test: test"), + }, { title: "partial response disabled; 1st store is slow, 2nd store is fast;", storeAPIs: []Client{ @@ -1349,6 +1403,10 @@ type mockedStoreAPI struct { LastSeriesReq *storepb.SeriesRequest LastLabelValuesReq *storepb.LabelValuesRequest LastLabelNamesReq *storepb.LabelNamesRequest + + // injectedError will be injected into Recv() if not nil. + injectedError error + injectedErrorIndex int } func (s *mockedStoreAPI) Info(ctx context.Context, req *storepb.InfoRequest, _ ...grpc.CallOption) (*storepb.InfoResponse, error) { @@ -1358,7 +1416,7 @@ func (s *mockedStoreAPI) Info(ctx context.Context, req *storepb.InfoRequest, _ . func (s *mockedStoreAPI) Series(ctx context.Context, req *storepb.SeriesRequest, _ ...grpc.CallOption) (storepb.Store_SeriesClient, error) { s.LastSeriesReq = req - return &StoreSeriesClient{ctx: ctx, respSet: s.RespSeries, respDur: s.RespDuration, slowSeriesIndex: s.SlowSeriesIndex}, s.RespError + return &StoreSeriesClient{injectedErrorIndex: s.injectedErrorIndex, injectedError: s.injectedError, ctx: ctx, respSet: s.RespSeries, respDur: s.RespDuration, slowSeriesIndex: s.SlowSeriesIndex}, s.RespError } func (s *mockedStoreAPI) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest, _ ...grpc.CallOption) (*storepb.LabelNamesResponse, error) { @@ -1382,12 +1440,18 @@ type StoreSeriesClient struct { respSet []*storepb.SeriesResponse respDur time.Duration slowSeriesIndex int + + injectedError error + injectedErrorIndex int } func (c *StoreSeriesClient) Recv() (*storepb.SeriesResponse, error) { if c.respDur != 0 && (c.slowSeriesIndex == c.i || c.slowSeriesIndex == 0) { time.Sleep(c.respDur) } + if c.injectedError != nil && (c.injectedErrorIndex == c.i || c.injectedErrorIndex == 0) { + return nil, c.injectedError + } if c.i >= len(c.respSet) { return nil, io.EOF