Skip to content

Commit

Permalink
proxy: Make sure all goroutines are scheduled using a single errgroup.
Browse files Browse the repository at this point in the history
This fixes a potential race condition. A bit more efficient version of #744.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Jan 17, 2019
1 parent d2078ff commit 3d8ccb1
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 69 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

### Fixed

- [#745](https://github.com/improbable-eng/thanos/pull/745) - Fixed race conditions and edge cases for Thanos Querier fanout logic.
- [#649](https://github.com/improbable-eng/thanos/issues/649) - Fixed store label values api to add also external label values.
- [#708](https://github.com/improbable-eng/thanos/issues/708) - `"X-Amz-Acl": "bucket-owner-full-control"` metadata for s3 upload operation is no longer set by default which was breaking some providers handled by minio client.

Expand Down
191 changes: 125 additions & 66 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,25 @@ func (s *ProxyStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb
return res, nil
}

// ctxRespSender couples a response channel with a context so that sends
// can be abandoned once the context is cancelled, preventing goroutines
// from blocking forever on a full channel after the request is done.
type ctxRespSender struct {
	ctx context.Context
	ch  chan<- *storepb.SeriesResponse
}

// newRespCh builds a buffered SeriesResponse channel together with a
// context-aware sender for it. It returns the sender, the receive side of
// the channel, and a function that closes the channel when streaming ends.
func newRespCh(ctx context.Context, buffer int) (*ctxRespSender, <-chan *storepb.SeriesResponse, func()) {
	ch := make(chan *storepb.SeriesResponse, buffer)
	sender := &ctxRespSender{ctx: ctx, ch: ch}
	closeFn := func() { close(ch) }
	return sender, ch, closeFn
}

// send delivers r on the underlying channel, giving up silently if the
// sender's context is cancelled first. This keeps producers from leaking
// when the consumer has already stopped reading.
func (s ctxRespSender) send(r *storepb.SeriesResponse) {
	select {
	case s.ch <- r:
	case <-s.ctx.Done():
	}
}

// Series returns all series for a requested time range and label matcher. Requested series are taken from other
// stores and proxied to RPC client. NOTE: Resulted data are not trimmed exactly to min and max time range.
func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
Expand All @@ -94,70 +113,82 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
}

var (
seriesSet []storepb.SeriesSet
respCh = make(chan *storepb.SeriesResponse, len(stores)+1)
g, gctx = errgroup.WithContext(srv.Context())
)
g, gctx = errgroup.WithContext(srv.Context())

var storeDebugMsgs []string
// Allow to buffer max 10 series response.
// Each might be quite large (multi chunk long series given by sidecar).
respSender, respRecv, closeFn = newRespCh(gctx, 10)
)

for _, st := range stores {
// We might be able to skip the store if its meta information indicates
// it cannot have series matching our query.
// NOTE: all matchers are validated in labelsMatches method so we explicitly ignore error.
if ok, _ := storeMatches(st, r.MinTime, r.MaxTime, newMatchers...); !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out", st))
continue
}
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))

sc, err := st.Series(gctx, &storepb.SeriesRequest{
MinTime: r.MinTime,
MaxTime: r.MaxTime,
Matchers: newMatchers,
Aggregates: r.Aggregates,
MaxResolutionWindow: r.MaxResolutionWindow,
PartialResponseDisabled: r.PartialResponseDisabled,
})
if err != nil {
storeID := fmt.Sprintf("%v", storepb.LabelsToString(st.Labels()))
if storeID == "" {
storeID = "Store Gateway"
g.Go(func() error {
var (
seriesSet []storepb.SeriesSet
storeDebugMsgs []string
r = &storepb.SeriesRequest{
MinTime: r.MinTime,
MaxTime: r.MaxTime,
Matchers: newMatchers,
Aggregates: r.Aggregates,
MaxResolutionWindow: r.MaxResolutionWindow,
PartialResponseDisabled: r.PartialResponseDisabled,
}
err = errors.Wrapf(err, "fetch series for %s %s", storeID, st)
if r.PartialResponseDisabled {
level.Error(s.logger).Log("err", err, "msg", "partial response disabled; aborting request")
return err
wg = &sync.WaitGroup{}
)

defer func() {
wg.Wait()
closeFn()
}()

for _, st := range stores {
// We might be able to skip the store if its meta information indicates
// it cannot have series matching our query.
// NOTE: all matchers are validated in labelsMatches method so we explicitly ignore error.
if ok, _ := storeMatches(st, r.MinTime, r.MaxTime, r.Matchers...); !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out", st))
continue
}
respCh <- storepb.NewWarnSeriesResponse(err)
continue
}
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))

seriesSet = append(seriesSet, startStreamSeriesSet(sc, respCh, 10))
}
sc, err := st.Series(gctx, r)
if err != nil {
storeID := fmt.Sprintf("%v", storepb.LabelsToString(st.Labels()))
if storeID == "" {
storeID = "Store Gateway"
}
err = errors.Wrapf(err, "fetch series for %s %s", storeID, st)
if r.PartialResponseDisabled {
level.Error(s.logger).Log("err", err, "msg", "partial response disabled; aborting request")
return err
}
respSender.send(storepb.NewWarnSeriesResponse(err))
continue
}

level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))
g.Go(func() error {
defer close(respCh)
// Schedule streamSeriesSet that translates gRPC streamed response into seriesSet (if series) or respCh if warnings.
seriesSet = append(seriesSet, startStreamSeriesSet(gctx, wg, sc, respSender, st.String(), !r.PartialResponseDisabled))
}

level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))

if len(seriesSet) == 0 {
// This is indicates that configured StoreAPIs are not the ones end user expects
err := errors.New("No store matched for this query")
level.Warn(s.logger).Log("err", err, "stores", strings.Join(storeDebugMsgs, ";"))
respCh <- storepb.NewWarnSeriesResponse(err)
respSender.send(storepb.NewWarnSeriesResponse(err))
return nil
}

mergedSet := storepb.MergeSeriesSets(seriesSet...)
for mergedSet.Next() {
var series storepb.Series
series.Labels, series.Chunks = mergedSet.At()
respCh <- storepb.NewSeriesResponse(&series)
respSender.send(storepb.NewSeriesResponse(&series))
}
return mergedSet.Err()
})

for resp := range respCh {
for resp := range respRecv {
if err := srv.Send(resp); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
}
Expand All @@ -171,48 +202,74 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe

}

// warnSender abstracts the delivery of out-of-band warning responses so
// that streamSeriesSet does not depend on a concrete channel type.
// Implemented by ctxRespSender.
type warnSender interface {
	send(*storepb.SeriesResponse)
}

// streamSeriesSet iterates over incoming stream of series.
// All errors are sent out of band via warning channel.
type streamSeriesSet struct {
stream storepb.Store_SeriesClient
warnCh chan<- *storepb.SeriesResponse
warnCh warnSender

currSeries *storepb.Series
recvCh chan *storepb.Series

errMtx sync.Mutex
err error

name string
}

func startStreamSeriesSet(
ctx context.Context,
wg *sync.WaitGroup,
stream storepb.Store_SeriesClient,
warnCh chan<- *storepb.SeriesResponse,
bufferSize int,
warnCh warnSender,
name string,
partialResponse bool,
) *streamSeriesSet {
s := &streamSeriesSet{
stream: stream,
warnCh: warnCh,
recvCh: make(chan *storepb.Series, bufferSize),
recvCh: make(chan *storepb.Series, 10),
name: name,
}
go s.fetchLoop()
return s
}

func (s *streamSeriesSet) fetchLoop() {
defer close(s.recvCh)
for {
r, err := s.stream.Recv()
if err == io.EOF {
return
}
if err != nil {
s.warnCh <- storepb.NewWarnSeriesResponse(errors.Wrap(err, "receive series"))
return
}
wg.Add(1)
go func() {
defer wg.Done()
defer close(s.recvCh)
for {
r, err := s.stream.Recv()
if err == io.EOF {
return
}

if w := r.GetWarning(); w != "" {
s.warnCh <- storepb.NewWarnSeriesResponse(errors.New(w))
continue
if ctx.Err() != nil {
return
}

if err != nil {
if partialResponse {
s.warnCh.send(storepb.NewWarnSeriesResponse(errors.Wrap(err, "receive series")))
return
}

s.errMtx.Lock()
defer s.errMtx.Unlock()
s.err = err
return
}

if w := r.GetWarning(); w != "" {
s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w)))
continue
}
s.recvCh <- r.GetSeries()
}
s.recvCh <- r.GetSeries()
}
}()
return s
}

// Next blocks until new message is received or stream is closed.
Expand All @@ -228,7 +285,9 @@ func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
return s.currSeries.Labels, s.currSeries.Chunks
}
func (s *streamSeriesSet) Err() error {
return nil
s.errMtx.Lock()
defer s.errMtx.Unlock()
return errors.Wrap(s.err, s.name)
}

// matchStore returns true if the given store may hold data for the given label matchers.
Expand Down
6 changes: 3 additions & 3 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,14 +500,13 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) {
ctx := context.Background()
s := newStoreSeriesServer(ctx)

err := q.Series(
testutil.Ok(t, q.Series(
&storepb.SeriesRequest{
MinTime: 1,
MaxTime: 300,
Matchers: []storepb.LabelMatcher{{Name: "fed", Value: "a", Type: storepb.LabelMatcher_EQ}},
}, s,
)
testutil.Ok(t, err)
))
testutil.Equals(t, 0, len(s.SeriesSet))
testutil.Equals(t, 110, len(s.Warnings))
}
Expand Down Expand Up @@ -658,6 +657,7 @@ func TestStoreMatches(t *testing.T) {
type storeSeriesServer struct {
// This field just exist to pseudo-implement the unused methods of the interface.
storepb.Store_SeriesServer

ctx context.Context

SeriesSet []storepb.Series
Expand Down

0 comments on commit 3d8ccb1

Please sign in to comment.