diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go
index c680686a11d..4eab4580de9 100644
--- a/pkg/store/proxy.go
+++ b/pkg/store/proxy.go
@@ -75,6 +75,25 @@ func (s *ProxyStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb
 	return res, nil
 }
 
+type ctxRespSender struct {
+	ctx context.Context
+	ch  chan<- *storepb.SeriesResponse
+}
+
+func newRespCh(ctx context.Context, buffer int) (*ctxRespSender, <-chan *storepb.SeriesResponse, func()) {
+	respCh := make(chan *storepb.SeriesResponse, buffer)
+	return &ctxRespSender{ctx: ctx, ch: respCh}, respCh, func() { close(respCh) }
+}
+
+func (s ctxRespSender) send(r *storepb.SeriesResponse) {
+	select {
+	case <-s.ctx.Done():
+		return
+	case s.ch <- r:
+		return
+	}
+}
+
 // Series returns all series for a requested time range and label matcher. Requested series are taken from other
 // stores and proxied to RPC client. NOTE: Resulted data are not trimmed exactly to min and max time range.
 func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
@@ -94,57 +113,69 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
 	}
 
 	var (
-		seriesSet []storepb.SeriesSet
-		respCh    = make(chan *storepb.SeriesResponse, len(stores)+1)
-		g, gctx   = errgroup.WithContext(srv.Context())
-	)
+		g, gctx = errgroup.WithContext(srv.Context())
 
-	var storeDebugMsgs []string
+		// Allow to buffer max 10 series responses.
+		// Each might be quite large (multi chunk long series given by sidecar).
+		respSender, respRecv, closeFn = newRespCh(gctx, 10)
+	)
 
-	for _, st := range stores {
-		// We might be able to skip the store if its meta information indicates
-		// it cannot have series matching our query.
-		// NOTE: all matchers are validated in labelsMatches method so we explicitly ignore error.
-		if ok, _ := storeMatches(st, r.MinTime, r.MaxTime, newMatchers...); !ok {
-			storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out", st))
-			continue
-		}
-		storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))
-
-		sc, err := st.Series(gctx, &storepb.SeriesRequest{
-			MinTime:                 r.MinTime,
-			MaxTime:                 r.MaxTime,
-			Matchers:                newMatchers,
-			Aggregates:              r.Aggregates,
-			MaxResolutionWindow:     r.MaxResolutionWindow,
-			PartialResponseDisabled: r.PartialResponseDisabled,
-		})
-		if err != nil {
-			storeID := fmt.Sprintf("%v", storepb.LabelsToString(st.Labels()))
-			if storeID == "" {
-				storeID = "Store Gateway"
+	g.Go(func() error {
+		var (
+			seriesSet      []storepb.SeriesSet
+			storeDebugMsgs []string
+			r              = &storepb.SeriesRequest{
+				MinTime:                 r.MinTime,
+				MaxTime:                 r.MaxTime,
+				Matchers:                newMatchers,
+				Aggregates:              r.Aggregates,
+				MaxResolutionWindow:     r.MaxResolutionWindow,
+				PartialResponseDisabled: r.PartialResponseDisabled,
 			}
-			err = errors.Wrapf(err, "fetch series for %s %s", storeID, st)
-			if r.PartialResponseDisabled {
-				level.Error(s.logger).Log("err", err, "msg", "partial response disabled; aborting request")
-				return err
+			wg = &sync.WaitGroup{}
+		)
+
+		defer func() {
+			wg.Wait()
+			closeFn()
+		}()
+
+		for _, st := range stores {
+			// We might be able to skip the store if its meta information indicates
+			// it cannot have series matching our query.
+			// NOTE: all matchers are validated in labelsMatches method so we explicitly ignore error.
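+			// For illustration only (hypothetical store, not part of this change): a store that
+			// advertises external labels {cluster="eu-1"} and a time range of [1000, 2000] would be
+			// filtered out here for a request whose matchers require cluster="us-1", or whose
+			// MaxTime is below 1000, since such a store cannot hold any matching series.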
+			if ok, _ := storeMatches(st, r.MinTime, r.MaxTime, r.Matchers...); !ok {
+				storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out", st))
+				continue
 			}
-			respCh <- storepb.NewWarnSeriesResponse(err)
-			continue
-		}
+			storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))
 
-		seriesSet = append(seriesSet, startStreamSeriesSet(sc, respCh, 10))
-	}
+			sc, err := st.Series(gctx, r)
+			if err != nil {
+				storeID := fmt.Sprintf("%v", storepb.LabelsToString(st.Labels()))
+				if storeID == "" {
+					storeID = "Store Gateway"
+				}
+				err = errors.Wrapf(err, "fetch series for %s %s", storeID, st)
+				if r.PartialResponseDisabled {
+					level.Error(s.logger).Log("err", err, "msg", "partial response disabled; aborting request")
+					return err
+				}
+				respSender.send(storepb.NewWarnSeriesResponse(err))
+				continue
+			}
 
-	level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))
-	g.Go(func() error {
-		defer close(respCh)
+			// Schedule streamSeriesSet that translates gRPC streamed response into seriesSet (if series) or respCh if warnings.
+			seriesSet = append(seriesSet, startStreamSeriesSet(gctx, wg, sc, respSender, st.String(), !r.PartialResponseDisabled))
+		}
+
+		level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))
 
 		if len(seriesSet) == 0 {
 			// This is indicates that configured StoreAPIs are not the ones end user expects
 			err := errors.New("No store matched for this query")
 			level.Warn(s.logger).Log("err", err, "stores", strings.Join(storeDebugMsgs, ";"))
-			respCh <- storepb.NewWarnSeriesResponse(err)
+			respSender.send(storepb.NewWarnSeriesResponse(err))
 			return nil
 		}
@@ -152,12 +183,12 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
 		for mergedSet.Next() {
 			var series storepb.Series
 			series.Labels, series.Chunks = mergedSet.At()
-			respCh <- storepb.NewSeriesResponse(&series)
+			respSender.send(storepb.NewSeriesResponse(&series))
 		}
 		return mergedSet.Err()
 	})
 
-	for resp := range respCh {
+	for resp := range respRecv {
 		if err := srv.Send(resp); err != nil {
 			return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
 		}
@@ -171,48 +202,74 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
 }
 
+type warnSender interface {
+	send(*storepb.SeriesResponse)
+}
+
 // streamSeriesSet iterates over incoming stream of series.
 // All errors are sent out of band via warning channel.
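+//
+// A rough usage sketch, for illustration only (the real wiring lives in Series above;
+// "client" stands for any storepb.Store_SeriesClient and "wg" for the caller's sync.WaitGroup):
+//
+//	sender, recvCh, closeFn := newRespCh(ctx, 10)
+//	set := startStreamSeriesSet(ctx, wg, client, sender, "some-store", true)
+//	for set.Next() {
+//		lset, chks := set.At() // merge into the outgoing response
+//	}
+//	err := set.Err() // non-nil only when partial responses are disabled and the stream failed
+//
+// Warnings are pushed to sender as they arrive; recvCh carries everything sent through sender,
+// and closeFn should be called once all sets are drained.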
 type streamSeriesSet struct {
 	stream storepb.Store_SeriesClient
-	warnCh chan<- *storepb.SeriesResponse
+	warnCh warnSender
 
 	currSeries *storepb.Series
 	recvCh     chan *storepb.Series
+
+	errMtx sync.Mutex
+	err    error
+
+	name string
 }
 
 func startStreamSeriesSet(
+	ctx context.Context,
+	wg *sync.WaitGroup,
 	stream storepb.Store_SeriesClient,
-	warnCh chan<- *storepb.SeriesResponse,
-	bufferSize int,
+	warnCh warnSender,
+	name string,
+	partialResponse bool,
 ) *streamSeriesSet {
 	s := &streamSeriesSet{
 		stream: stream,
 		warnCh: warnCh,
-		recvCh: make(chan *storepb.Series, bufferSize),
+		recvCh: make(chan *storepb.Series, 10),
+		name:   name,
 	}
-	go s.fetchLoop()
-	return s
-}
 
-func (s *streamSeriesSet) fetchLoop() {
-	defer close(s.recvCh)
-	for {
-		r, err := s.stream.Recv()
-		if err == io.EOF {
-			return
-		}
-		if err != nil {
-			s.warnCh <- storepb.NewWarnSeriesResponse(errors.Wrap(err, "receive series"))
-			return
-		}
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		defer close(s.recvCh)
+		for {
+			r, err := s.stream.Recv()
+			if err == io.EOF {
+				return
+			}
 
-		if w := r.GetWarning(); w != "" {
-			s.warnCh <- storepb.NewWarnSeriesResponse(errors.New(w))
-			continue
+			if ctx.Err() != nil {
+				return
+			}
+
+			if err != nil {
+				if partialResponse {
+					s.warnCh.send(storepb.NewWarnSeriesResponse(errors.Wrap(err, "receive series")))
+					return
+				}
+
+				s.errMtx.Lock()
+				defer s.errMtx.Unlock()
+				s.err = err
+				return
+			}
+
+			if w := r.GetWarning(); w != "" {
+				s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w)))
+				continue
+			}
+			s.recvCh <- r.GetSeries()
 		}
-		s.recvCh <- r.GetSeries()
-	}
+	}()
+	return s
 }
 
 // Next blocks until new message is received or stream is closed.
@@ -228,7 +285,9 @@ func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
 	return s.currSeries.Labels, s.currSeries.Chunks
 }
 func (s *streamSeriesSet) Err() error {
-	return nil
+	s.errMtx.Lock()
+	defer s.errMtx.Unlock()
+	return errors.Wrap(s.err, s.name)
 }
 
 // matchStore returns true if the given store may hold data for the given label matchers.
diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go
index c70d0afc099..5d5fb1a5d73 100644
--- a/pkg/store/proxy_test.go
+++ b/pkg/store/proxy_test.go
@@ -500,14 +500,13 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) {
 	ctx := context.Background()
 	s := newStoreSeriesServer(ctx)
-	err := q.Series(
+	testutil.Ok(t, q.Series(
 		&storepb.SeriesRequest{
 			MinTime:  1,
 			MaxTime:  300,
 			Matchers: []storepb.LabelMatcher{{Name: "fed", Value: "a", Type: storepb.LabelMatcher_EQ}},
 		}, s,
-	)
-	testutil.Ok(t, err)
+	))
 
 	testutil.Equals(t, 0, len(s.SeriesSet))
 	testutil.Equals(t, 110, len(s.Warnings))
 }
@@ -658,6 +657,7 @@ func TestStoreMatches(t *testing.T) {
 type storeSeriesServer struct {
 	// This field just exist to pseudo-implement the unused methods of the interface.
 	storepb.Store_SeriesServer
+	ctx context.Context
 
 	SeriesSet []storepb.Series