From d3db28f604f909a918e25568269306183a9186ea Mon Sep 17 00:00:00 2001 From: Aleskey Sin Date: Tue, 21 Jan 2020 18:24:18 +0300 Subject: [PATCH 1/5] Improve proxyStore timeouts. Signed-off-by: Aleskey Sin --- pkg/store/proxy.go | 113 +++++++----- pkg/store/proxy_test.go | 400 ++++++++++++++++++++++++++++++++++++---- 2 files changed, 427 insertions(+), 86 deletions(-) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 1af4242d28..882eee4b50 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -198,12 +198,7 @@ func newRespCh(ctx context.Context, buffer int) (*ctxRespSender, <-chan *storepb } func (s ctxRespSender) send(r *storepb.SeriesResponse) { - select { - case <-s.ctx.Done(): - return - case s.ch <- r: - return - } + s.ch <- r } // Series returns all series for a requested time range and label matcher. Requested series are taken from other @@ -348,6 +343,21 @@ type streamSeriesSet struct { closeSeries context.CancelFunc } +type recvResponse struct { + r *storepb.SeriesResponse + err error +} + +func startFrameCtx(responseTimeout time.Duration) (context.Context, context.CancelFunc) { + frameTimeoutCtx := context.Background() + var cancel context.CancelFunc + if responseTimeout != 0 { + frameTimeoutCtx, cancel = context.WithTimeout(frameTimeoutCtx, responseTimeout) + return frameTimeoutCtx, cancel + } + return frameTimeoutCtx, nil +} + func startStreamSeriesSet( ctx context.Context, logger log.Logger, @@ -384,14 +394,34 @@ func startStreamSeriesSet( } }() for { - r, err := s.stream.Recv() + frameTimeoutCtx, cancel := startFrameCtx(s.responseTimeout) + if cancel != nil { + defer cancel() + } + rCh := make(chan *recvResponse, 1) + var rr *recvResponse + go func() { + r, err := s.stream.Recv() + rCh <- &recvResponse{r: r, err: err} + }() - if err == io.EOF { + select { + case <-ctx.Done(): + s.timeoutHandling(true, ctx) + return + case <-frameTimeoutCtx.Done(): + s.timeoutHandling(false, frameTimeoutCtx) return + case rr = <-rCh: } + close(rCh) - if err != nil { - wrapErr := errors.Wrapf(err, "receive series from %s", s.name) + if rr.err == io.EOF { + return + } + + if rr.err != nil { + wrapErr := errors.Wrapf(rr.err, "receive series from %s", s.name) if partialResponse { s.warnCh.send(storepb.NewWarnSeriesResponse(wrapErr)) return @@ -402,59 +432,40 @@ func startStreamSeriesSet( s.errMtx.Unlock() return } - numResponses++ - if w := r.GetWarning(); w != "" { + if w := rr.r.GetWarning(); w != "" { s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w))) continue } - - select { - case s.recvCh <- r.GetSeries(): - continue - case <-ctx.Done(): - return - } - + s.recvCh <- rr.r.GetSeries() } }() return s } -// Next blocks until new message is received or stream is closed or operation is timed out. -func (s *streamSeriesSet) Next() (ok bool) { - ctx := s.ctx - timeoutMsg := fmt.Sprintf("failed to receive any data from %s", s.name) - - if s.responseTimeout != 0 { - timeoutMsg = fmt.Sprintf("failed to receive any data in %s from %s", s.responseTimeout.String(), s.name) - - timeoutCtx, done := context.WithTimeout(s.ctx, s.responseTimeout) - defer done() - ctx = timeoutCtx +func (s *streamSeriesSet) timeoutHandling(isQueryTimeout bool, ctx context.Context) { + var err error + if isQueryTimeout { + err = errors.Wrap(ctx.Err(), fmt.Sprintf("failed to receive any data from %s", s.name)) + } else { + err = errors.Wrap(ctx.Err(), fmt.Sprintf("failed to receive any data in %s from %s", s.responseTimeout.String(), s.name)) } - - select { - case s.currSeries, ok = <-s.recvCh: - return ok - case <-ctx.Done(): - // closeSeries to shutdown a goroutine in startStreamSeriesSet. - s.closeSeries() - - err := errors.Wrap(ctx.Err(), timeoutMsg) - if s.partialResponse { - level.Warn(s.logger).Log("err", err, "msg", "returning partial response") - s.warnCh.send(storepb.NewWarnSeriesResponse(err)) - return false - } - s.errMtx.Lock() - s.err = err - s.errMtx.Unlock() - - level.Warn(s.logger).Log("err", err, "msg", "partial response disabled; aborting request") - return false + s.closeSeries() + if s.partialResponse { + level.Warn(s.logger).Log("err", err, "msg", "returning partial response") + s.warnCh.send(storepb.NewWarnSeriesResponse(err)) + return } + s.errMtx.Lock() + s.err = err + s.errMtx.Unlock() +} + +// Next blocks until new message is received or stream is closed or operation is timed out. +func (s *streamSeriesSet) Next() (ok bool) { + s.currSeries, ok = <-s.recvCh + return ok } func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) { diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 0e5af9492f..cdc3ac7728 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -5,6 +5,7 @@ package store import ( "context" + "fmt" "io" "math" "os" @@ -49,6 +50,7 @@ func (c *testClient) String() string { func (c *testClient) Addr() string { return "testaddr" } + func TestProxyStore_Info(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() @@ -412,32 +414,6 @@ func TestProxyStore_Series(t *testing.T) { }, expectedErr: errors.New("fetch series for [name:\"ext\" value:\"1\" ] test: error!"), }, - { - title: "use no chunk to only get labels", - storeAPIs: []Client{ - &testClient{ - StoreClient: &mockedStoreAPI{ - RespSeries: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labels.FromStrings("a", "a")), - }, - }, - minTime: 1, - maxTime: 300, - labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, - }, - }, - req: &storepb.SeriesRequest{ - MinTime: 1, - MaxTime: 300, - Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, - SkipChunks: true, - }, - expectedSeries: []rawSeries{ - { - lset: []storepb.Label{{Name: "a", Value: "a"}}, - }, - }, - }, } { if ok := t.Run(tc.title, func(t *testing.T) { @@ -488,8 +464,54 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { expectedWarningsLen int }{ { - title: "partial response disabled one thanos query is slow to respond", + title: "partial response disabled; 1st store is slow, 2nd store is fast;", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + RespDuration: 10 * time.Second, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + PartialResponseDisabled: true, + }, + expectedErr: errors.New("test: failed to receive any data in 4s from test: context deadline exceeded"), + }, + { + title: "partial response disabled; 1st store is fast, 2nd store is slow;", storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, &testClient{ StoreClient: &mockedStoreAPI{ RespSeries: []*storepb.SeriesResponse{ @@ -502,6 +524,33 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { minTime: 1, maxTime: 300, }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + PartialResponseDisabled: true, + }, + expectedErr: errors.New("test: failed to receive any data in 4s from test: context deadline exceeded"), + }, + { + title: "partial response disabled; 1st store is slow on 2nd series, 2nd store is fast;", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{3, 1}, {4, 2}, {5, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{6, 1}, {7, 2}, {8, 3}}), + }, + RespDuration: 10 * time.Second, + SlowSeriesIndex: 2, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, &testClient{ StoreClient: &mockedStoreAPI{ RespSeries: []*storepb.SeriesResponse{ @@ -523,7 +572,125 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { expectedErr: errors.New("test: failed to receive any data in 4s from test: context deadline exceeded"), }, { - title: "partial response enabled one thanos query is slow to respond", + title: "partial response disabled; 1st store is fast to respond, 2nd store is slow on 2nd series;", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{3, 1}, {4, 2}, {5, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{6, 1}, {7, 2}, {8, 3}}), + }, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + RespDuration: 10 * time.Second, + SlowSeriesIndex: 2, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + PartialResponseDisabled: true, + }, + expectedErr: errors.New("test: failed to receive any data in 4s from test: context deadline exceeded"), + }, + { + title: "partial response enabled; 1st store is slow to respond, 2nd store is fast;", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + RespDuration: 10 * time.Second, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + }, + expectedSeries: []rawSeries{ + { + lset: []storepb.Label{{Name: "b", Value: "c"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, + }, + }, + expectedWarningsLen: 2, + }, + { + title: "partial response enabled; 1st store is fast, 2nd store is slow;", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + RespDuration: 10 * time.Second, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + }, + expectedSeries: []rawSeries{ + { + lset: []storepb.Label{{Name: "a", Value: "b"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, + }, + }, + expectedWarningsLen: 2, + }, + { + title: "partial response enabled; 1st store is fast, 2-3 is slow, 4th is fast;", storeAPIs: []Client{ &testClient{ StoreClient: &mockedStoreAPI{ @@ -548,6 +715,154 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { minTime: 1, maxTime: 300, }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("c", "d"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + RespDuration: 10 * time.Second, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("d", "f"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + }, + expectedSeries: []rawSeries{ + { + lset: []storepb.Label{{Name: "a", Value: "b"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, + }, + { + lset: []storepb.Label{{Name: "d", Value: "f"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, + }, + }, + expectedWarningsLen: 4, + }, + { + title: "partial response enabled; 1st store is slow on 2nd series, 2nd store is fast", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{7, 1}, {8, 2}, {9, 3}}), + }, + RespDuration: 10 * time.Second, + SlowSeriesIndex: 2, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storepb.NewWarnSeriesResponse(errors.New("warning")), + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + }, + expectedSeries: []rawSeries{ + { + lset: []storepb.Label{{Name: "a", Value: "b"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, + }, + { + lset: []storepb.Label{{Name: "b", Value: "c"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, + }, + }, + expectedWarningsLen: 3, + }, + { + title: "partial response disabled; all stores respond 3s", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{7, 1}, {8, 2}, {9, 3}}), + }, + RespDuration: 3 * time.Second, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + PartialResponseDisabled: true, + }, + expectedSeries: []rawSeries{ + { + lset: []storepb.Label{{Name: "a", Value: "b"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, + }, + }, + expectedErr: errors.New("test: failed to receive any data from test: context deadline exceeded"), + }, + { + title: "partial response enabled; all stores respond 3s", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{7, 1}, {8, 2}, {9, 3}}), + }, + RespDuration: 3 * time.Second, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{1, 1}, {2, 2}, {3, 3}}), + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{4, 1}, {5, 2}, {6, 3}}), + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{7, 1}, {8, 2}, {9, 3}}), + }, + RespDuration: 3 * time.Second, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + minTime: 1, + maxTime: 300, + }, }, req: &storepb.SeriesRequest{ MinTime: 1, @@ -559,6 +874,10 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { lset: []storepb.Label{{Name: "a", Value: "b"}}, chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, }, + { + lset: []storepb.Label{{Name: "b", Value: "c"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, + }, }, expectedWarningsLen: 2, }, @@ -572,9 +891,13 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { 4*time.Second, ) - s := newStoreSeriesServer(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + s := newStoreSeriesServer(ctx) + t0 := time.Now() err := q.Series(tc.req, s) + elapsedTime := time.Since(t0) if tc.expectedErr != nil { testutil.NotOk(t, err) testutil.Equals(t, tc.expectedErr.Error(), err.Error()) @@ -585,6 +908,8 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { seriesEquals(t, tc.expectedSeries, s.SeriesSet) testutil.Equals(t, tc.expectedWarningsLen, len(s.Warnings), "got %v", s.Warnings) + + testutil.Assert(t, elapsedTime < 5010*time.Millisecond, fmt.Sprintf("Request has taken %f, expected: <%d, it seems that responseTimeout doesn't work properly.", elapsedTime.Seconds(), 5)) }); !ok { return } @@ -1015,6 +1340,8 @@ type mockedStoreAPI struct { RespLabelNames *storepb.LabelNamesResponse RespError error RespDuration time.Duration + // Index of series in store to slow response. + SlowSeriesIndex int LastSeriesReq *storepb.SeriesRequest LastLabelValuesReq *storepb.LabelValuesRequest @@ -1028,7 +1355,7 @@ func (s *mockedStoreAPI) Info(ctx context.Context, req *storepb.InfoRequest, _ . func (s *mockedStoreAPI) Series(ctx context.Context, req *storepb.SeriesRequest, _ ...grpc.CallOption) (storepb.Store_SeriesClient, error) { s.LastSeriesReq = req - return &StoreSeriesClient{ctx: ctx, respSet: s.RespSeries, respDur: s.RespDuration}, s.RespError + return &StoreSeriesClient{ctx: ctx, respSet: s.RespSeries, respDur: s.RespDuration, slowSeriesIndex: s.SlowSeriesIndex}, s.RespError } func (s *mockedStoreAPI) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest, _ ...grpc.CallOption) (*storepb.LabelNamesResponse, error) { @@ -1047,14 +1374,17 @@ func (s *mockedStoreAPI) LabelValues(ctx context.Context, req *storepb.LabelValu type StoreSeriesClient struct { // This field just exist to pseudo-implement the unused methods of the interface. storepb.Store_SeriesClient - ctx context.Context - i int - respSet []*storepb.SeriesResponse - respDur time.Duration + ctx context.Context + i int + respSet []*storepb.SeriesResponse + respDur time.Duration + slowSeriesIndex int } func (c *StoreSeriesClient) Recv() (*storepb.SeriesResponse, error) { - time.Sleep(c.respDur) + if c.respDur != 0 && (c.slowSeriesIndex == c.i || c.slowSeriesIndex == 0) { + time.Sleep(c.respDur) + } if c.i >= len(c.respSet) { return nil, io.EOF From e260217f434ec9d7f8358995b201dc3d7777de92 Mon Sep 17 00:00:00 2001 From: Aleskey Sin Date: Mon, 3 Feb 2020 18:33:09 +0300 Subject: [PATCH 2/5] Fix send to closed channel. Signed-off-by: Aleskey Sin --- pkg/store/proxy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 882eee4b50..55deb89910 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -403,6 +403,7 @@ func startStreamSeriesSet( go func() { r, err := s.stream.Recv() rCh <- &recvResponse{r: r, err: err} + close(rCh) }() select { @@ -414,7 +415,6 @@ func startStreamSeriesSet( return case rr = <-rCh: } - close(rCh) if rr.err == io.EOF { return From 9c6412ce5dad56feb903e5569b28e171980afd19 Mon Sep 17 00:00:00 2001 From: Aleskey Sin Date: Thu, 13 Feb 2020 13:35:19 +0300 Subject: [PATCH 3/5] Update for PR. Signed-off-by: Aleskey Sin --- pkg/store/proxy.go | 58 +++++++++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 55deb89910..c425110777 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -348,14 +348,14 @@ type recvResponse struct { err error } -func startFrameCtx(responseTimeout time.Duration) (context.Context, context.CancelFunc) { +func frameCtx(responseTimeout time.Duration) (context.Context, context.CancelFunc) { frameTimeoutCtx := context.Background() var cancel context.CancelFunc if responseTimeout != 0 { frameTimeoutCtx, cancel = context.WithTimeout(frameTimeoutCtx, responseTimeout) return frameTimeoutCtx, cancel } - return frameTimeoutCtx, nil + return frameTimeoutCtx, func() {} } func startStreamSeriesSet( @@ -393,43 +393,49 @@ func startStreamSeriesSet( emptyStreamResponses.Inc() } }() - for { - frameTimeoutCtx, cancel := startFrameCtx(s.responseTimeout) - if cancel != nil { - defer cancel() + + rCh := make(chan *recvResponse) + recvCancel := make(chan bool) + go func() { + for { + r, err := s.stream.Recv() + select { + case <-recvCancel: + close(rCh) + return + case rCh <- &recvResponse{r: r, err: err}: + } } - rCh := make(chan *recvResponse, 1) + }() + for { + frameTimeoutCtx, cancel := frameCtx(s.responseTimeout) + defer cancel() var rr *recvResponse - go func() { - r, err := s.stream.Recv() - rCh <- &recvResponse{r: r, err: err} - close(rCh) - }() + var err error select { case <-ctx.Done(): - s.timeoutHandling(true, ctx) + close(recvCancel) + err = errors.Wrap(ctx.Err(), fmt.Sprintf("failed to receive any data from %s", s.name)) + s.handleErr(err) return case <-frameTimeoutCtx.Done(): - s.timeoutHandling(false, frameTimeoutCtx) + close(recvCancel) + err = errors.Wrap(frameTimeoutCtx.Err(), fmt.Sprintf("failed to receive any data in %s from %s", s.responseTimeout.String(), s.name)) + s.handleErr(err) return case rr = <-rCh: } if rr.err == io.EOF { + close(recvCancel) return } if rr.err != nil { wrapErr := errors.Wrapf(rr.err, "receive series from %s", s.name) - if partialResponse { - s.warnCh.send(storepb.NewWarnSeriesResponse(wrapErr)) - return - } - - s.errMtx.Lock() - s.err = wrapErr - s.errMtx.Unlock() + s.handleErr(wrapErr) + close(recvCancel) return } numResponses++ @@ -444,13 +450,7 @@ func startStreamSeriesSet( return s } -func (s *streamSeriesSet) timeoutHandling(isQueryTimeout bool, ctx context.Context) { - var err error - if isQueryTimeout { - err = errors.Wrap(ctx.Err(), fmt.Sprintf("failed to receive any data from %s", s.name)) - } else { - err = errors.Wrap(ctx.Err(), fmt.Sprintf("failed to receive any data in %s from %s", s.responseTimeout.String(), s.name)) - } +func (s *streamSeriesSet) handleErr(err error) { s.closeSeries() if s.partialResponse { level.Warn(s.logger).Log("err", err, "msg", "returning partial response") From ec2441d0a5284c798b144c59b70ff9bb483a39a1 Mon Sep 17 00:00:00 2001 From: Aleskey Sin Date: Fri, 14 Feb 2020 11:36:43 +0300 Subject: [PATCH 4/5] Fix recv done channel. Signed-off-by: Aleskey Sin --- pkg/store/proxy.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index c425110777..70562aac05 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -395,12 +395,12 @@ func startStreamSeriesSet( }() rCh := make(chan *recvResponse) - recvCancel := make(chan bool) + done := make(chan struct{}) go func() { for { r, err := s.stream.Recv() select { - case <-recvCancel: + case <-done: close(rCh) return case rCh <- &recvResponse{r: r, err: err}: @@ -415,12 +415,12 @@ func startStreamSeriesSet( var err error select { case <-ctx.Done(): - close(recvCancel) + close(done) err = errors.Wrap(ctx.Err(), fmt.Sprintf("failed to receive any data from %s", s.name)) s.handleErr(err) return case <-frameTimeoutCtx.Done(): - close(recvCancel) + close(done) err = errors.Wrap(frameTimeoutCtx.Err(), fmt.Sprintf("failed to receive any data in %s from %s", s.responseTimeout.String(), s.name)) s.handleErr(err) return @@ -428,14 +428,14 @@ func startStreamSeriesSet( } if rr.err == io.EOF { - close(recvCancel) + close(done) return } if rr.err != nil { wrapErr := errors.Wrapf(rr.err, "receive series from %s", s.name) s.handleErr(wrapErr) - close(recvCancel) + close(done) return } numResponses++ From 6fcb67780088d12fdab2cdd59dd72250c37727b9 Mon Sep 17 00:00:00 2001 From: Aleskey Sin Date: Tue, 18 Feb 2020 14:50:13 +0300 Subject: [PATCH 5/5] PR fixes. Signed-off-by: Aleskey Sin --- pkg/store/proxy.go | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 70562aac05..c422f516f8 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -411,18 +411,12 @@ func startStreamSeriesSet( frameTimeoutCtx, cancel := frameCtx(s.responseTimeout) defer cancel() var rr *recvResponse - - var err error select { case <-ctx.Done(): - close(done) - err = errors.Wrap(ctx.Err(), fmt.Sprintf("failed to receive any data from %s", s.name)) - s.handleErr(err) + s.handleErr(errors.Wrapf(ctx.Err(), "failed to receive any data from %s", s.name), done) return case <-frameTimeoutCtx.Done(): - close(done) - err = errors.Wrap(frameTimeoutCtx.Err(), fmt.Sprintf("failed to receive any data in %s from %s", s.responseTimeout.String(), s.name)) - s.handleErr(err) + s.handleErr(errors.Wrapf(ctx.Err(), "failed to receive any data in %s from %s", s.responseTimeout.String(), s.name), done) return case rr = <-rCh: } @@ -434,8 +428,7 @@ func startStreamSeriesSet( if rr.err != nil { wrapErr := errors.Wrapf(rr.err, "receive series from %s", s.name) - s.handleErr(wrapErr) - close(done) + s.handleErr(wrapErr, done) return } numResponses++ @@ -450,8 +443,10 @@ func startStreamSeriesSet( return s } -func (s *streamSeriesSet) handleErr(err error) { +func (s *streamSeriesSet) handleErr(err error, done chan struct{}) { + defer close(done) s.closeSeries() + if s.partialResponse { level.Warn(s.logger).Log("err", err, "msg", "returning partial response") s.warnCh.send(storepb.NewWarnSeriesResponse(err))