Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve store timeouts #1789

Merged
merged 5 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 69 additions & 58 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,7 @@ func newRespCh(ctx context.Context, buffer int) (*ctxRespSender, <-chan *storepb
}

// send delivers r on s.ch, or gives up without sending once s.ctx is done.
// It blocks until one of those two things happens.
func (s ctxRespSender) send(r *storepb.SeriesResponse) {
	select {
	case <-s.ctx.Done():
		// The context finished before the response could be handed over; drop it.
	case s.ch <- r:
	}
}

// Series returns all series for a requested time range and label matcher. Requested series are taken from other
Expand Down Expand Up @@ -348,6 +343,21 @@ type streamSeriesSet struct {
closeSeries context.CancelFunc
}

// recvResponse bundles the two results of one s.stream.Recv() call so that
// both can be handed over a single channel.
type recvResponse struct {
// r is the response value returned by Recv.
r *storepb.SeriesResponse
// err is the error returned by the same Recv call.
err error
}

// frameCtx returns a context used to bound the wait for a single response
// frame, together with its cancel function.
//
// A zero responseTimeout disables the deadline: the returned context is
// context.Background() and the cancel function is a no-op. For any other value
// the context is created with context.WithTimeout on top of
// context.Background(), so it expires responseTimeout after the call and is
// independent of any request context.
//
// The returned cancel function is never nil; callers should always invoke it
// so the timer behind a timeout context is released.
func frameCtx(responseTimeout time.Duration) (context.Context, context.CancelFunc) {
	if responseTimeout == 0 {
		return context.Background(), func() {}
	}
	return context.WithTimeout(context.Background(), responseTimeout)
}

func startStreamSeriesSet(
ctx context.Context,
logger log.Logger,
Expand Down Expand Up @@ -383,78 +393,79 @@ func startStreamSeriesSet(
emptyStreamResponses.Inc()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to increment this if the context was actually cancelled, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes; that is why numResponses++ runs only after a received response has been processed.

}
}()

rCh := make(chan *recvResponse)
recvCancel := make(chan bool)
povilasv marked this conversation as resolved.
Show resolved Hide resolved
go func() {
for {
r, err := s.stream.Recv()
select {
case <-recvCancel:
close(rCh)
return
case rCh <- &recvResponse{r: r, err: err}:
}
}
}()
for {
r, err := s.stream.Recv()
frameTimeoutCtx, cancel := frameCtx(s.responseTimeout)
defer cancel()
var rr *recvResponse

if err == io.EOF {
var err error
select {
case <-ctx.Done():
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
close(recvCancel)
err = errors.Wrap(ctx.Err(), fmt.Sprintf("failed to receive any data from %s", s.name))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a separate err variable?

Also, can we use Wrapf instead of Sprintf?

s.handleErr(err)
return
case <-frameTimeoutCtx.Done():
close(recvCancel)
err = errors.Wrap(frameTimeoutCtx.Err(), fmt.Sprintf("failed to receive any data in %s from %s", s.responseTimeout.String(), s.name))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

s.handleErr(err)
return
case rr = <-rCh:
}

if err != nil {
wrapErr := errors.Wrapf(err, "receive series from %s", s.name)
if partialResponse {
s.warnCh.send(storepb.NewWarnSeriesResponse(wrapErr))
return
}

s.errMtx.Lock()
s.err = wrapErr
s.errMtx.Unlock()
if rr.err == io.EOF {
close(recvCancel)
return
}

if rr.err != nil {
wrapErr := errors.Wrapf(rr.err, "receive series from %s", s.name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why another var? (: We can inline this.. not a blocker though.

s.handleErr(wrapErr)
close(recvCancel)
return
}
numResponses++

if w := r.GetWarning(); w != "" {
if w := rr.r.GetWarning(); w != "" {
s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w)))
continue
}

select {
case s.recvCh <- r.GetSeries():
continue
case <-ctx.Done():
return
}

s.recvCh <- rr.r.GetSeries()
}
}()
return s
}

// Next blocks until new message is received or stream is closed or operation is timed out.
func (s *streamSeriesSet) Next() (ok bool) {
ctx := s.ctx
timeoutMsg := fmt.Sprintf("failed to receive any data from %s", s.name)

if s.responseTimeout != 0 {
timeoutMsg = fmt.Sprintf("failed to receive any data in %s from %s", s.responseTimeout.String(), s.name)

timeoutCtx, done := context.WithTimeout(s.ctx, s.responseTimeout)
defer done()
ctx = timeoutCtx
// handleErr reports err for this series set.
//
// It always calls closeSeries first. When partialResponse is set, err is
// logged at warn level and sent on warnCh as a warning response. Otherwise err
// is stored in s.err while holding errMtx.
func (s *streamSeriesSet) handleErr(err error) {
	s.closeSeries()

	if !s.partialResponse {
		// Strict mode: record the error under the lock.
		s.errMtx.Lock()
		defer s.errMtx.Unlock()
		s.err = err
		return
	}

	level.Warn(s.logger).Log("err", err, "msg", "returning partial response")
	s.warnCh.send(storepb.NewWarnSeriesResponse(err))
}

select {
case s.currSeries, ok = <-s.recvCh:
return ok
case <-ctx.Done():
// closeSeries to shutdown a goroutine in startStreamSeriesSet.
s.closeSeries()

err := errors.Wrap(ctx.Err(), timeoutMsg)
if s.partialResponse {
level.Warn(s.logger).Log("err", err, "msg", "returning partial response")
s.warnCh.send(storepb.NewWarnSeriesResponse(err))
return false
}
s.errMtx.Lock()
s.err = err
s.errMtx.Unlock()

level.Warn(s.logger).Log("err", err, "msg", "partial response disabled; aborting request")
return false
}
// Next blocks until a series arrives on recvCh or recvCh is closed.
// The received value is stored in currSeries; the result is false once the
// channel has been closed.
func (s *streamSeriesSet) Next() (ok bool) {
	series, open := <-s.recvCh
	s.currSeries = series
	return open
}

func (s *streamSeriesSet) At() ([]storepb.Label, []storepb.AggrChunk) {
Expand Down
Loading