Skip to content

Commit

Permalink
Propagate errors from store-gateway to querier when hitting concurrency limit
Browse files Browse the repository at this point in the history

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
  • Loading branch information
dimitarvdimitrov committed May 3, 2024
1 parent 68a0024 commit ed7012c
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 18 deletions.
30 changes: 30 additions & 0 deletions pkg/querier/blocks_store_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

Expand All @@ -54,6 +55,7 @@ import (
"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
"github.com/grafana/mimir/pkg/storegateway/storepb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/globalerror"
"github.com/grafana/mimir/pkg/util/limiter"
"github.com/grafana/mimir/pkg/util/test"
)
Expand Down Expand Up @@ -3286,6 +3288,34 @@ func TestShouldStopQueryFunc(t *testing.T) {
err: errors.New("test"),
expected: false,
},
"should not stop query on store-gateway instance limit": {
err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Aborted, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT}),
expected: false,
},
"should not stop query on store-gateway instance limit; shouldn't look at the gRPC code, only Mimir error cause": {
err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Internal, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT}),
expected: false,
},
"should not stop query on any other mimirpb error": {
err: globalerror.NewErrorWithGRPCStatus(errors.New("instance limit"), codes.Internal, &mimirpb.ErrorDetails{Cause: mimirpb.TOO_BUSY}),
expected: false,
},
"should not stop query on any unknown error detail": {
err: func() error {
st, createErr := status.New(codes.Internal, "test").WithDetails(&hintspb.Block{Id: "123"})
require.NoError(t, createErr)
return st.Err()
}(),
expected: false,
},
"should not stop query on multiple error details": {
err: func() error {
st, createErr := status.New(codes.Internal, "test").WithDetails(&hintspb.Block{Id: "123"}, &mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT})
require.NoError(t, createErr)
return st.Err()
}(),
expected: false,
},
}

for testName, testData := range tests {
Expand Down
44 changes: 27 additions & 17 deletions pkg/storegateway/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
"github.com/grafana/mimir/pkg/storegateway/storepb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/globalerror"
"github.com/grafana/mimir/pkg/util/pool"
"github.com/grafana/mimir/pkg/util/spanlogger"
)
Expand Down Expand Up @@ -546,18 +547,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor
// We don't do the streaming call if we are not requesting the chunks.
req.StreamingChunksBatchSize = 0
}
defer func() {
if err == nil {
return
}
code := codes.Internal
if st, ok := grpcutil.ErrorToStatus(err); ok {
code = st.Code()
} else if errors.Is(err, context.Canceled) {
code = codes.Canceled
}
err = status.Error(code, err.Error())
}()
defer func() { err = mapSeriesError(err) }()

matchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
if err != nil {
Expand Down Expand Up @@ -611,11 +601,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor
// but sometimes it can take minutes if the block isn't loaded and there is a surge in queries for unloaded blocks.
done, err := s.limitConcurrentQueries(ctx, stats)
if err != nil {
if errors.Is(err, errGateTimeout) {
// If the gate timed out, then we behave in the same way as if the blocks aren't discovered yet.
// The querier will try the blocks again on a different store-gateway replica.
return nil
}
return err
}
defer done()
Expand Down Expand Up @@ -714,6 +699,31 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor
return nil
}

// mapSeriesError converts an error produced while serving a Series request
// into the error that is handed back to the gRPC caller.
//
// A nil error is returned as nil.
//
// When err wraps a storeGatewayError, the result is built with
// globalerror.NewErrorWithGRPCStatus from that storeGatewayError (not from the
// outer err), with the error's cause attached as mimirpb.ErrorDetails. The
// gRPC code is codes.Unavailable for mimirpb.INSTANCE_LIMIT and codes.Internal
// for every other cause.
//
// Any other error becomes a plain status.Error carrying err.Error() as its
// message. Its code is taken from grpcutil.ErrorToStatus when that yields a
// status, is codes.Canceled when err wraps context.Canceled, and is
// codes.Internal otherwise.
func mapSeriesError(err error) error {
	if err == nil {
		return nil
	}

	var sgErr storeGatewayError
	if errors.As(err, &sgErr) {
		cause := sgErr.errorCause()
		grpcCode := codes.Internal
		if cause == mimirpb.INSTANCE_LIMIT {
			grpcCode = codes.Unavailable
		}
		return globalerror.NewErrorWithGRPCStatus(sgErr, grpcCode, &mimirpb.ErrorDetails{Cause: cause})
	}

	// Not a store-gateway error: keep an existing gRPC code if there is one,
	// and only then fall back to detecting a cancelled context.
	grpcCode := codes.Internal
	st, isStatus := grpcutil.ErrorToStatus(err)
	switch {
	case isStatus:
		grpcCode = st.Code()
	case errors.Is(err, context.Canceled):
		grpcCode = codes.Canceled
	}
	return status.Error(grpcCode, err.Error())
}

func (s *BucketStore) recordRequestAmbientTime(stats *safeQueryStats, requestStart time.Time) {
stats.update(func(stats *queryStats) {
stats.streamingSeriesAmbientTime += time.Since(requestStart)
Expand Down
3 changes: 2 additions & 1 deletion pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/thanos-io/objstore"
"google.golang.org/grpc/metadata"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/storage/bucket"
"github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
Expand Down Expand Up @@ -430,7 +431,7 @@ type timeoutGate struct {
timeout time.Duration
}

var errGateTimeout = errors.New("concurrency gate waiting timeout")
// errGateTimeout is a staticError carrying the mimirpb.INSTANCE_LIMIT cause.
// mapSeriesError translates errors with that cause into a gRPC Unavailable
// status with the cause attached as error details.
// NOTE(review): presumably returned by timeoutGate.Start when waiting for the
// gate exceeds the configured timeout — confirm against Start's body.
var errGateTimeout = staticError{cause: mimirpb.INSTANCE_LIMIT, msg: "timeout waiting for concurrency gate"}

func (t timeoutGate) Start(ctx context.Context) error {
if t.timeout == 0 {
Expand Down
21 changes: 21 additions & 0 deletions pkg/storegateway/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package storegateway

import "github.com/grafana/mimir/pkg/mimirpb"

// storeGatewayError is an error that additionally reports a mimirpb.ErrorCause.
// mapSeriesError looks for this interface (via errors.As) to decide which gRPC
// status code and error details to send back to the caller.
type storeGatewayError interface {
error
// errorCause returns the mimirpb.ErrorCause associated with this error.
errorCause() mimirpb.ErrorCause
}

// staticError is a storeGatewayError whose message and cause are fixed when
// the value is constructed.
type staticError struct {
	cause mimirpb.ErrorCause // reported by errorCause
	msg   string             // reported by Error
}

// Error implements the error interface and returns the stored message.
func (e staticError) Error() string { return e.msg }

// errorCause implements storeGatewayError and returns the stored cause.
func (e staticError) errorCause() mimirpb.ErrorCause { return e.cause }

0 comments on commit ed7012c

Please sign in to comment.