Skip to content

Commit

Permalink
proxy: Make sure all goroutines are scheduled using a single errgroup. (
Browse files Browse the repository at this point in the history
…#745)

This fixes a potential race condition. It is a slightly more efficient version of #744.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka authored Jan 21, 2019
1 parent 2f08fec commit 705d6a2
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 69 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

### Fixed

- [#745](https://github.com/improbable-eng/thanos/pull/745) - Fixed race conditions and edge cases for Thanos Querier fanout logic.
- [#649](https://github.com/improbable-eng/thanos/issues/649) - Fixed store label values api to add also external label values.
- [#708](https://github.com/improbable-eng/thanos/issues/708) - `"X-Amz-Acl": "bucket-owner-full-control"` metadata for s3 upload operation is no longer set by default which was breaking some providers handled by minio client.

Expand Down
191 changes: 125 additions & 66 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,25 @@ func (s *ProxyStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb
return res, nil
}

// ctxRespSender sends storepb.SeriesResponse values on ch, giving up once
// ctx is done (see send), so producers never block after the overall
// request has been cancelled.
type ctxRespSender struct {
	ctx context.Context
	ch  chan<- *storepb.SeriesResponse
}

// newRespCh creates a response channel with the given buffer size and
// returns a context-aware sender for it, the receive side of the channel,
// and a function that closes it.
func newRespCh(ctx context.Context, buffer int) (*ctxRespSender, <-chan *storepb.SeriesResponse, func()) {
	ch := make(chan *storepb.SeriesResponse, buffer)
	sender := &ctxRespSender{ctx: ctx, ch: ch}
	closeFn := func() { close(ch) }
	return sender, ch, closeFn
}

// send delivers r on the underlying channel, unless the context is
// cancelled first, in which case the response is dropped.
func (s ctxRespSender) send(r *storepb.SeriesResponse) {
	select {
	case s.ch <- r:
	case <-s.ctx.Done():
	}
}

// Series returns all series for a requested time range and label matcher. Requested series are taken from other
// stores and proxied to RPC client. NOTE: Resulted data are not trimmed exactly to min and max time range.
func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
Expand All @@ -94,70 +113,82 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
}

var (
seriesSet []storepb.SeriesSet
respCh = make(chan *storepb.SeriesResponse, len(stores)+1)
g, gctx = errgroup.WithContext(srv.Context())
)
g, gctx = errgroup.WithContext(srv.Context())

var storeDebugMsgs []string
// Allow to buffer max 10 series response.
// Each might be quite large (multi chunk long series given by sidecar).
respSender, respRecv, closeFn = newRespCh(gctx, 10)
)

for _, st := range stores {
// We might be able to skip the store if its meta information indicates
// it cannot have series matching our query.
// NOTE: all matchers are validated in labelsMatches method so we explicitly ignore error.
if ok, _ := storeMatches(st, r.MinTime, r.MaxTime, newMatchers...); !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out", st))
continue
}
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))

sc, err := st.Series(gctx, &storepb.SeriesRequest{
MinTime: r.MinTime,
MaxTime: r.MaxTime,
Matchers: newMatchers,
Aggregates: r.Aggregates,
MaxResolutionWindow: r.MaxResolutionWindow,
PartialResponseDisabled: r.PartialResponseDisabled,
})
if err != nil {
storeID := fmt.Sprintf("%v", storepb.LabelsToString(st.Labels()))
if storeID == "" {
storeID = "Store Gateway"
g.Go(func() error {
var (
seriesSet []storepb.SeriesSet
storeDebugMsgs []string
r = &storepb.SeriesRequest{
MinTime: r.MinTime,
MaxTime: r.MaxTime,
Matchers: newMatchers,
Aggregates: r.Aggregates,
MaxResolutionWindow: r.MaxResolutionWindow,
PartialResponseDisabled: r.PartialResponseDisabled,
}
err = errors.Wrapf(err, "fetch series for %s %s", storeID, st)
if r.PartialResponseDisabled {
level.Error(s.logger).Log("err", err, "msg", "partial response disabled; aborting request")
return err
wg = &sync.WaitGroup{}
)

defer func() {
wg.Wait()
closeFn()
}()

for _, st := range stores {
// We might be able to skip the store if its meta information indicates
// it cannot have series matching our query.
// NOTE: all matchers are validated in labelsMatches method so we explicitly ignore error.
if ok, _ := storeMatches(st, r.MinTime, r.MaxTime, r.Matchers...); !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out", st))
continue
}
respCh <- storepb.NewWarnSeriesResponse(err)
continue
}
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))

seriesSet = append(seriesSet, startStreamSeriesSet(sc, respCh, 10))
}
sc, err := st.Series(gctx, r)
if err != nil {
storeID := fmt.Sprintf("%v", storepb.LabelsToString(st.Labels()))
if storeID == "" {
storeID = "Store Gateway"
}
err = errors.Wrapf(err, "fetch series for %s %s", storeID, st)
if r.PartialResponseDisabled {
level.Error(s.logger).Log("err", err, "msg", "partial response disabled; aborting request")
return err
}
respSender.send(storepb.NewWarnSeriesResponse(err))
continue
}

level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))
g.Go(func() error {
defer close(respCh)
// Schedule streamSeriesSet that translates gRPC streamed response into seriesSet (if series) or respCh if warnings.
seriesSet = append(seriesSet, startStreamSeriesSet(gctx, wg, sc, respSender, st.String(), !r.PartialResponseDisabled))
}

level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))

if len(seriesSet) == 0 {
// This is indicates that configured StoreAPIs are not the ones end user expects
err := errors.New("No store matched for this query")
level.Warn(s.logger).Log("err", err, "stores", strings.Join(storeDebugMsgs, ";"))
respCh <- storepb.NewWarnSeriesResponse(err)
respSender.send(storepb.NewWarnSeriesResponse(err))
return nil
}

mergedSet := storepb.MergeSeriesSets(seriesSet...)
for mergedSet.Next() {
var series storepb.Series
series.Labels, series.Chunks = mergedSet.At()
respCh <- storepb.NewSeriesResponse(&series)
respSender.send(storepb.NewSeriesResponse(&series))
}
return mergedSet.Err()
})

for resp := range respCh {
for resp := range respRecv {
if err := srv.Send(resp); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
}
Expand All @@ -171,48 +202,74 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe

}

// warnSender abstracts sending of out-of-band warning responses
// (implemented by ctxRespSender).
type warnSender interface {
	send(*storepb.SeriesResponse)
}

// streamSeriesSet iterates over incoming stream of series.
// All errors are sent out of band via warning channel.
type streamSeriesSet struct {
stream storepb.Store_SeriesClient
warnCh chan<- *storepb.SeriesResponse
warnCh warnSender

currSeries *storepb.Series
recvCh chan *storepb.Series

errMtx sync.Mutex
err error

name string
}

func startStreamSeriesSet(
ctx context.Context,
wg *sync.WaitGroup,
stream storepb.Store_SeriesClient,
warnCh chan<- *storepb.SeriesResponse,
bufferSize int,
warnCh warnSender,
name string,
partialResponse bool,
) *streamSeriesSet {
s := &streamSeriesSet{
stream: stream,
warnCh: warnCh,
recvCh: make(chan *storepb.Series, bufferSize),
recvCh: make(chan *storepb.Series, 10),
name: name,
}
go s.fetchLoop()
return s
}

func (s *streamSeriesSet) fetchLoop() {
defer close(s.recvCh)
for {
r, err := s.stream.Recv()
if err == io.EOF {
return
}
if err != nil {
s.warnCh <- storepb.NewWarnSeriesResponse(errors.Wrap(err, "receive series"))
return
}
wg.Add(1)
go func() {
defer wg.Done()
defer close(s.recvCh)
for {
r, err := s.stream.Recv()
if err == io.EOF {
return
}

if w := r.GetWarning(); w != "" {
s.warnCh <- storepb.NewWarnSeriesResponse(errors.New(w))
continue
if ctx.Err() != nil {
return
}

if err != nil {
if partialResponse {
s.warnCh.send(storepb.NewWarnSeriesResponse(errors.Wrap(err, "receive series")))
return
}

s.errMtx.Lock()
defer s.errMtx.Unlock()
s.err = err
return
}

if w := r.GetWarning(); w != "" {
s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w)))
continue
}
s.recvCh <- r.GetSeries()
}
s.recvCh <- r.GetSeries()
}
}()
return s
}

// Next blocks until new message is received or stream is closed.
Expand All @@ -228,7 +285,9 @@ func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
return s.currSeries.Labels, s.currSeries.Chunks
}
func (s *streamSeriesSet) Err() error {
return nil
s.errMtx.Lock()
defer s.errMtx.Unlock()
return errors.Wrap(s.err, s.name)
}

// matchStore returns true if the given store may hold data for the given label matchers.
Expand Down
6 changes: 3 additions & 3 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,14 +500,13 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) {
ctx := context.Background()
s := newStoreSeriesServer(ctx)

err := q.Series(
testutil.Ok(t, q.Series(
&storepb.SeriesRequest{
MinTime: 1,
MaxTime: 300,
Matchers: []storepb.LabelMatcher{{Name: "fed", Value: "a", Type: storepb.LabelMatcher_EQ}},
}, s,
)
testutil.Ok(t, err)
))
testutil.Equals(t, 0, len(s.SeriesSet))
testutil.Equals(t, 110, len(s.Warnings))
}
Expand Down Expand Up @@ -658,6 +657,7 @@ func TestStoreMatches(t *testing.T) {
type storeSeriesServer struct {
// This field just exist to pseudo-implement the unused methods of the interface.
storepb.Store_SeriesServer

ctx context.Context

SeriesSet []storepb.Series
Expand Down

0 comments on commit 705d6a2

Please sign in to comment.