Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that ingester returns safe errors #6019

Merged
merged 6 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/ingester/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,17 @@ import (
"github.com/grafana/mimir/pkg/util/validation"
)

const (
unavailable = int(codes.Unavailable)
duricanikolic marked this conversation as resolved.
Show resolved Hide resolved
)

var (
// This is the closest fitting Prometheus API error code for requests rejected due to limiting.
tooBusyError = httpgrpc.Errorf(http.StatusServiceUnavailable,
duricanikolic marked this conversation as resolved.
Show resolved Hide resolved
"the ingester is currently too busy to process queries, try again later")

errMaxSeriesPerMetricLimitExceeded = safeToWrapError("per-metric series limit exceeded")
errMaxSeriesPerUserLimitExceeded = safeToWrapError("per-user series limit exceeded")
)

type safeToWrap interface {
Expand Down
18 changes: 10 additions & 8 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,10 +809,11 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (

db, err := i.getOrCreateTSDB(userID, false)
if err != nil {
// Check for a particular per-instance limit and return that error directly
// since it contains extra information for gRPC and our logging middleware.
if errors.Is(err, errMaxTenantsReached) {
return nil, err
// If this is a safe error, we wrap it with userID and return it, because
// it might contain extra information for gRPC and our logging middleware.
duricanikolic marked this conversation as resolved.
Show resolved Hide resolved
var safe safeToWrap
if errors.As(err, &safe) {
return nil, wrapWithUser(err, userID)
}
return nil, annotateWithUser(err, userID)
}
Expand Down Expand Up @@ -859,10 +860,11 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (
level.Warn(i.logger).Log("msg", "failed to rollback appender on error", "user", userID, "err", err)
}

// Check for a particular per-instance limit and return that error directly
// since it contains extra information for gRPC and our logging middleware.
if errors.Is(err, errMaxInMemorySeriesReached) {
return nil, err
// If this is a safe error, we wrap it with userID and return it, because
// it might contain extra information for gRPC and our logging middleware.
var safe safeToWrap
if errors.As(err, &safe) {
return nil, wrapWithUser(err, userID)
}
return nil, annotateWithUser(err, userID)
duricanikolic marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5868,6 +5868,8 @@ func TestIngester_PushInstanceLimits(t *testing.T) {
assert.ErrorIs(t, err, testData.expectedErr)
var optional middleware.OptionalLogging
assert.ErrorAs(t, err, &optional)
var safe safeToWrap
assert.ErrorAs(t, err, &safe)
s, ok := status.FromError(err)
require.True(t, ok, "expected to be able to convert to gRPC status")
assert.Equal(t, codes.Unavailable, s.Code())
Expand Down Expand Up @@ -5998,10 +6000,13 @@ func TestIngester_inflightPushRequests(t *testing.T) {

time.Sleep(10 * time.Millisecond) // Give first goroutine a chance to start pushing...
req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase"), 1, 1024)
var optional middleware.OptionalLogging

_, err := i.Push(ctx, req)
require.ErrorIs(t, err, errMaxInflightRequestsReached)
var safe safeToWrap
require.ErrorAs(t, err, &safe)

var optional middleware.OptionalLogging
require.ErrorAs(t, err, &optional)
require.False(t, optional.ShouldLog(ctx, time.Duration(0)), "expected not to log via .ShouldLog()")

Expand Down
32 changes: 5 additions & 27 deletions pkg/ingester/instance_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,12 @@
package ingester

import (
"context"
"flag"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"gopkg.in/yaml.v3"

"github.com/grafana/mimir/pkg/util/globalerror"
"github.com/grafana/mimir/pkg/util/log"
)

const (
Expand All @@ -32,31 +29,12 @@ var (
errMaxInflightRequestsReached = newInstanceLimitError(globalerror.IngesterMaxInflightPushRequests.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of inflight push requests", maxInflightPushRequestsFlag))
)

type instanceLimitErr struct {
msg string
status *status.Status
}

func newInstanceLimitError(msg string) error {
return &instanceLimitErr{
return newErrorWithStatus(
log.DoNotLogError{Err: safeToWrapError(msg)},
// Errors from hitting per-instance limits are always "unavailable" for gRPC
status: status.New(codes.Unavailable, msg),
msg: msg,
}
}

func (e *instanceLimitErr) ShouldLog(context.Context, time.Duration) bool {
// We increment metrics when hitting per-instance limits, so there's no need to
// log them; the error doesn't contain any interesting information for us.
return false
}

func (e *instanceLimitErr) GRPCStatus() *status.Status {
return e.status
}

func (e *instanceLimitErr) Error() string {
return e.msg
unavailable,
)
}

// InstanceLimits describes limits used by ingester. Reaching any of these will result in Push method to return
Expand Down
40 changes: 19 additions & 21 deletions pkg/ingester/instance_limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package ingester

import (
"context"
"fmt"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -42,29 +41,28 @@ max_tenants: 50000
}

func TestInstanceLimitErr(t *testing.T) {
t.Run("bare error implements ShouldLog()", func(t *testing.T) {
var optional middleware.OptionalLogging
require.ErrorAs(t, errMaxInflightRequestsReached, &optional)
require.False(t, optional.ShouldLog(context.Background(), time.Duration(0)))
})
userID := "1"
limitErrors := []error{
errMaxIngestionRateReached,
wrapWithUser(errMaxIngestionRateReached, userID),
errMaxTenantsReached,
wrapWithUser(errMaxTenantsReached, userID),
errMaxInMemorySeriesReached,
wrapWithUser(errMaxInMemorySeriesReached, userID),
errMaxInflightRequestsReached,
wrapWithUser(errMaxInflightRequestsReached, userID),
}
for _, limitError := range limitErrors {
var safe safeToWrap
require.ErrorAs(t, limitError, &safe)

t.Run("wrapped error implements ShouldLog()", func(t *testing.T) {
err := fmt.Errorf("%w: oh no", errMaxTenantsReached)
var optional middleware.OptionalLogging
require.ErrorAs(t, err, &optional)
require.ErrorAs(t, limitError, &optional)
require.False(t, optional.ShouldLog(context.Background(), time.Duration(0)))
})

t.Run("bare error implements GRPCStatus()", func(t *testing.T) {
s, ok := status.FromError(errMaxInMemorySeriesReached)
stat, ok := status.FromError(limitError)
require.True(t, ok, "expected to be able to convert to gRPC status")
require.Equal(t, codes.Unavailable, s.Code())
})

t.Run("wrapped error implements GRPCStatus()", func(t *testing.T) {
err := fmt.Errorf("%w: oh no", errMaxIngestionRateReached)
s, ok := status.FromError(err)
require.True(t, ok, "expected to be able to convert to gRPC status")
require.Equal(t, codes.Unavailable, s.Code())
})
require.Equal(t, codes.Unavailable, stat.Code())
require.ErrorContains(t, stat.Err(), limitError.Error())
}
}
8 changes: 0 additions & 8 deletions pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,11 @@ package ingester
import (
"math"

"github.com/pkg/errors"

"github.com/grafana/mimir/pkg/util"
util_math "github.com/grafana/mimir/pkg/util/math"
"github.com/grafana/mimir/pkg/util/validation"
)

var (
// These errors are internal only; to change the API error messages, see Limiter's methods below.
errMaxSeriesPerMetricLimitExceeded = errors.New("per-metric series limit exceeded")
errMaxSeriesPerUserLimitExceeded = errors.New("per-user series limit exceeded")
)

// RingCount is the interface exposed by a ring implementation that allows
// counting its members.
type RingCount interface {
Expand Down
Loading