Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributor: improve error handling in otlp and push handler #8339

Merged
merged 9 commits into from
Jun 12, 2024
8 changes: 2 additions & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@
* [ENHANCEMENT] OTLP: Speed up conversion from OTel to Mimir format by about 8% and reduce memory consumption by about 30%. Can be disabled via `-distributor.direct-otlp-translation-enabled=false` #7957
* [ENHANCEMENT] Ingester/Querier: Optimise regexps with long lists of alternates. #8221, #8234
* [ENHANCEMENT] Ingester: Include more detail in tracing of queries. #8242
* [EHNAHCEMENT] Distributor: add `insight=true` to remote-write and OTLP write handlers when the HTTP response status code is 4xx. #8294
* [ENHANCEMENT] Distributor: add `insight=true` to remote-write and OTLP write handlers when the HTTP response status code is 4xx. #8294
* [ENHANCEMENT] Ingester: reduce locked time while matching postings for a label, improving the write latency and compaction speed. #8327
* [ENHANCEMENT] Ingester: reduce the amount of locks taken during the Head compaction's garbage-collection process, improving the write latency and compaction speed. #8327
* [ENHANCEMENT] Query-frontend: log the start, end time and matchers for remote read requests to the query stats logs. #8326
* [BUGFIX] Distributor: prometheus retry on 5xx and 429 errors, while otlp collector only retry on 429, 502, 503 and 504, mapping other 5xx errors to the retryable ones in otlp endpoint. #8324
* [BUGFIX] Distributor: Prometheus retries on 5xx and 429 errors, while the OTLP collector only retries on 429, 502, 503 and 504; the OTLP endpoint now maps other 5xx errors to the retryable ones. #8324 #8339
* [BUGFIX] Distributor: make OTLP endpoint return marshalled proto bytes as response body for 4xx/5xx errors. #8227
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
* [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
Expand Down Expand Up @@ -428,10 +428,6 @@
* [CHANGE] Ingester: changed the default value for the experimental configuration parameter `-blocks-storage.tsdb.early-head-compaction-min-estimated-series-reduction-percentage` from 10 to 15. #6186
* [CHANGE] Ingester: `/ingester/push` HTTP endpoint has been removed. This endpoint was added for testing and troubleshooting, but was never documented or used for anything. #6299
* [CHANGE] Experimental setting `-log.rate-limit-logs-per-second-burst` renamed to `-log.rate-limit-logs-burst-size`. #6230
* [CHANGE] Distributor: instead of errors with HTTP status codes, `Push()` now returns errors with gRPC codes: #6377
* `http.StatusAccepted` (202) code is replaced with `codes.AlreadyExists`.
* `http.BadRequest` (400) code is replaced with `codes.FailedPrecondition`.
* `http.StatusTooManyRequests` (429) and the non-standard `529` (The service is overloaded) codes are replaced with `codes.ResourceExhausted`.
* [CHANGE] Ingester: by setting the newly introduced experimental CLI flag `-ingester.return-only-grpc-errors` to true, ingester will return only gRPC errors. This feature changes the following status codes: #6443 #6680 #6723
* `http.StatusBadRequest` (400) is replaced with `codes.FailedPrecondition` on the write path.
* `http.StatusServiceUnavailable` (503) is replaced with `codes.Internal` on the write path, and with `codes.ResourceExhausted` on the read path.
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1414,7 +1414,7 @@ func (d *Distributor) handlePushError(ctx context.Context, pushErr error) error
if err == nil {
serviceOverloadErrorEnabled = d.limits.ServiceOverloadStatusCodeOnRateLimitEnabled(userID)
}
return toGRPCError(pushErr, serviceOverloadErrorEnabled)
return toErrorWithGRPCStatus(pushErr, serviceOverloadErrorEnabled)
}

// push takes a write request and distributes it to ingesters using the ring.
Expand Down
7 changes: 5 additions & 2 deletions pkg/distributor/distributor_ingest_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ func TestDistributor_Push_ShouldReturnErrorMappedTo4xxStatusCodeIfWriteRequestCo
limits := prepareDefaultLimits()
limits.MaxLabelValueLength = hugeLabelValueLength

overrides, err := validation.NewOverrides(*limits, nil)
require.NoError(t, err)

testConfig := prepConfig{
numDistributors: 1,
ingestStorageEnabled: true,
Expand All @@ -305,7 +308,7 @@ func TestDistributor_Push_ShouldReturnErrorMappedTo4xxStatusCodeIfWriteRequestCo
// We expect a gRPC error.
errStatus, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok)
assert.Equal(t, codes.FailedPrecondition, errStatus.Code())
assert.Equal(t, codes.InvalidArgument, errStatus.Code())
assert.ErrorContains(t, errStatus.Err(), ingest.ErrWriteRequestDataItemTooLarge.Error())

// We expect the gRPC error to be detected as client error.
Expand All @@ -321,7 +324,7 @@ func TestDistributor_Push_ShouldReturnErrorMappedTo4xxStatusCodeIfWriteRequestCo
sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)", false)

// Send write request through the HTTP handler.
h := Handler(maxRecvMsgSize, nil, sourceIPs, false, nil, RetryConfig{}, distributors[0].PushWithMiddlewares, nil, log.NewNopLogger())
h := Handler(maxRecvMsgSize, nil, sourceIPs, false, overrides, RetryConfig{}, distributors[0].PushWithMiddlewares, nil, log.NewNopLogger())
h.ServeHTTP(resp, createRequest(t, marshalledReq))
assert.Equal(t, http.StatusBadRequest, resp.Code)
})
Expand Down
26 changes: 13 additions & 13 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ func TestDistributor_PushHAInstances(t *testing.T) {
testReplica: "instance1234567890123456789012345678901234567890",
cluster: "cluster0",
samples: 5,
expectedError: status.New(codes.FailedPrecondition, fmt.Sprintf(labelValueTooLongMsgFormat, "__replica__", "instance1234567890123456789012345678901234567890", mimirpb.FromLabelAdaptersToString(labelSetGenWithReplicaAndCluster("instance1234567890123456789012345678901234567890", "cluster0")(0)))),
expectedError: status.New(codes.InvalidArgument, fmt.Sprintf(labelValueTooLongMsgFormat, "__replica__", "instance1234567890123456789012345678901234567890", mimirpb.FromLabelAdaptersToString(labelSetGenWithReplicaAndCluster("instance1234567890123456789012345678901234567890", "cluster0")(0)))),
expectedDetails: &mimirpb.ErrorDetails{Cause: mimirpb.BAD_DATA},
},
} {
Expand Down Expand Up @@ -1528,14 +1528,14 @@ func TestDistributor_Push_HistogramValidation(t *testing.T) {
},
"too new histogram": {
req: makeWriteRequestHistogram([]string{model.MetricNameLabel, "test"}, math.MaxInt64, generateTestHistogram(0)),
expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(sampleTimestampTooNewMsgFormat, math.MaxInt64, "test")),
expectedErr: status.New(codes.InvalidArgument, fmt.Sprintf(sampleTimestampTooNewMsgFormat, math.MaxInt64, "test")),
},
"valid float histogram": {
req: makeWriteRequestFloatHistogram([]string{model.MetricNameLabel, "test"}, 1000, generateTestFloatHistogram(0)),
},
"too new float histogram": {
req: makeWriteRequestFloatHistogram([]string{model.MetricNameLabel, "test"}, math.MaxInt64, generateTestFloatHistogram(0)),
expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(sampleTimestampTooNewMsgFormat, math.MaxInt64, "test")),
expectedErr: status.New(codes.InvalidArgument, fmt.Sprintf(sampleTimestampTooNewMsgFormat, math.MaxInt64, "test")),
},
"buckets at limit": {
req: makeWriteRequestFloatHistogram([]string{model.MetricNameLabel, "test"}, 1000, testHistogram),
Expand All @@ -1546,7 +1546,7 @@ func TestDistributor_Push_HistogramValidation(t *testing.T) {
bucketLimit: 7,
errMsg: "received a native histogram sample with too many buckets, timestamp",
errID: globalerror.MaxNativeHistogramBuckets,
expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(maxNativeHistogramBucketsMsgFormat, 1000, "test", 8, 7)),
expectedErr: status.New(codes.InvalidArgument, fmt.Sprintf(maxNativeHistogramBucketsMsgFormat, 1000, "test", 8, 7)),
},
}

Expand Down Expand Up @@ -6797,7 +6797,7 @@ func TestDistributorValidation(t *testing.T) {
TimestampMs: int64(future),
Value: 4,
}},
expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(sampleTimestampTooNewMsgFormat, future, "testmetric")),
expectedErr: status.New(codes.InvalidArgument, fmt.Sprintf(sampleTimestampTooNewMsgFormat, future, "testmetric")),
},

"validation does not fail for samples from the past without past_grace_period setting": {
Expand All @@ -6809,7 +6809,7 @@ func TestDistributorValidation(t *testing.T) {
labels: [][]mimirpb.LabelAdapter{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},
samples: []mimirpb.Sample{{TimestampMs: int64(past), Value: 4}},
limits: func(limits *validation.Limits) { limits.PastGracePeriod = model.Duration(now.Sub(past) / 2) },
expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(sampleTimestampTooOldMsgFormat, past, "testmetric")),
expectedErr: status.New(codes.InvalidArgument, fmt.Sprintf(sampleTimestampTooOldMsgFormat, past, "testmetric")),
},

"exceeds maximum labels per series": {
Expand All @@ -6818,7 +6818,7 @@ func TestDistributorValidation(t *testing.T) {
TimestampMs: int64(now),
Value: 2,
}},
expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(tooManyLabelsMsgFormat, 3, 2, `testmetric{foo="bar", foo2="bar2"}`, "")),
expectedErr: status.New(codes.InvalidArgument, fmt.Sprintf(tooManyLabelsMsgFormat, 3, 2, `testmetric{foo="bar", foo2="bar2"}`, "")),
},
"exceeds maximum labels per series with a metric that exceeds 200 characters when formatted": {
labels: [][]mimirpb.LabelAdapter{{
Expand All @@ -6832,7 +6832,7 @@ func TestDistributorValidation(t *testing.T) {
TimestampMs: int64(now),
Value: 2,
}},
expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(tooManyLabelsMsgFormat, 5, 2, `testmetric{foo-with-a-long-long-label="bar-with-a-long-long-value", foo2-with-a-long-long-label="bar2-with-a-long-long-value", foo3-with-a-long-long-label="bar3-with-a-long-long-value", foo4-with-a-lo`, "…")),
expectedErr: status.New(codes.InvalidArgument, fmt.Sprintf(tooManyLabelsMsgFormat, 5, 2, `testmetric{foo-with-a-long-long-label="bar-with-a-long-long-value", foo2-with-a-long-long-label="bar2-with-a-long-long-value", foo3-with-a-long-long-label="bar3-with-a-long-long-value", foo4-with-a-lo`, "…")),
},
"exceeds maximum labels per series with a metric that exceeds 200 bytes when formatted": {
labels: [][]mimirpb.LabelAdapter{{
Expand All @@ -6844,7 +6844,7 @@ func TestDistributorValidation(t *testing.T) {
TimestampMs: int64(now),
Value: 2,
}},
expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(tooManyLabelsMsgFormat, 3, 2, `testmetric{families="👩\u200d👦👨\u200d👧👨\u200d👩\u200d👧👩\u200d👧👩\u200d👩\u200d👦\u200d👦👨\u200d👩\u200d👧\u200d👦👨\u200d👧\u200d👦👨\u200d👩\u200d👦👪👨\u200d👦👨\u200d👦\u200d👦👨\u200d👨\u200d👧👨\u200d👧\u200d👧", foo="b"}`, "")),
expectedErr: status.New(codes.InvalidArgument, fmt.Sprintf(tooManyLabelsMsgFormat, 3, 2, `testmetric{families="👩\u200d👦👨\u200d👧👨\u200d👩\u200d👧👩\u200d👧👩\u200d👩\u200d👦\u200d👦👨\u200d👩\u200d👧\u200d👦👨\u200d👧\u200d👦👨\u200d👩\u200d👦👪👨\u200d👦👨\u200d👦\u200d👦👨\u200d👨\u200d👧👨\u200d👧\u200d👧", foo="b"}`, "")),
},
"multiple validation failures should return the first failure": {
labels: [][]mimirpb.LabelAdapter{
Expand All @@ -6855,7 +6855,7 @@ func TestDistributorValidation(t *testing.T) {
{TimestampMs: int64(now), Value: 2},
{TimestampMs: int64(past), Value: 2},
},
expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(tooManyLabelsMsgFormat, 3, 2, `testmetric{foo="bar", foo2="bar2"}`, "")),
expectedErr: status.New(codes.InvalidArgument, fmt.Sprintf(tooManyLabelsMsgFormat, 3, 2, `testmetric{foo="bar", foo2="bar2"}`, "")),
},
"metadata validation failure": {
metadata: []*mimirpb.MetricMetadata{{MetricFamilyName: "", Help: "a test metric.", Unit: "", Type: mimirpb.COUNTER}},
Expand All @@ -6864,7 +6864,7 @@ func TestDistributorValidation(t *testing.T) {
TimestampMs: int64(now),
Value: 1,
}},
expectedErr: status.New(codes.FailedPrecondition, metadataMetricNameMissingMsgFormat),
expectedErr: status.New(codes.InvalidArgument, metadataMetricNameMissingMsgFormat),
},
// Validation passes for empty exemplar labels, since we just want to skip the exemplars and not fail the time series as a whole.
"empty exemplar labels": {
Expand Down Expand Up @@ -7552,9 +7552,9 @@ func TestHandlePushError(t *testing.T) {
pushError: httpGrpc5xxErr,
expectedOtherError: httpGrpc5xxErr,
},
"an Error gives the error returned by toGRPCError()": {
"an Error gives the error returned by toErrorWithGRPCStatus()": {
pushError: mockDistributorErr(testErrorMsg),
expectedGRPCError: status.Convert(toGRPCError(mockDistributorErr(testErrorMsg), false)),
expectedGRPCError: status.Convert(toErrorWithGRPCStatus(mockDistributorErr(testErrorMsg), false)),
},
"a random error without status gives an Internal gRPC error": {
pushError: errWithUserID,
Expand Down
74 changes: 52 additions & 22 deletions pkg/distributor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package distributor
import (
"context"
"fmt"
"net/http"
"time"

"github.com/gogo/status"
Expand Down Expand Up @@ -253,39 +254,68 @@ func (e circuitBreakerOpenError) Cause() mimirpb.ErrorCause {
// Ensure that circuitBreakerOpenError implements Error.
var _ Error = circuitBreakerOpenError{}

// toGRPCError converts the given error into an appropriate gRPC error.
func toGRPCError(pushErr error, serviceOverloadErrorEnabled bool) error {
// toErrorWithGRPCStatus converts the given error into an appropriate gRPC error.
func toErrorWithGRPCStatus(pushErr error, serviceOverloadErrorEnabled bool) error {
var (
distributorErr Error
errDetails *mimirpb.ErrorDetails
errCode = codes.Internal
)
if errors.As(pushErr, &distributorErr) {
errDetails = &mimirpb.ErrorDetails{Cause: distributorErr.Cause()}
switch distributorErr.Cause() {
case mimirpb.BAD_DATA:
errCode = codes.FailedPrecondition
case mimirpb.INGESTION_RATE_LIMITED:
errCode = codes.ResourceExhausted
case mimirpb.REQUEST_RATE_LIMITED:
if serviceOverloadErrorEnabled {
errCode = codes.Unavailable
} else {
errCode = codes.ResourceExhausted
}
case mimirpb.REPLICAS_DID_NOT_MATCH:
errCode = codes.AlreadyExists
case mimirpb.TOO_MANY_CLUSTERS:
errCode = codes.FailedPrecondition
case mimirpb.CIRCUIT_BREAKER_OPEN:
errCode = codes.Unavailable
case mimirpb.METHOD_NOT_ALLOWED:
errCode = codes.Unimplemented
}
errCode = errorCauseToGRPCStatusCode(distributorErr.Cause(), serviceOverloadErrorEnabled)
}
return globalerror.WrapErrorWithGRPCStatus(pushErr, errCode, errDetails).Err()
}

func errorCauseToGRPCStatusCode(errCause mimirpb.ErrorCause, serviceOverloadErrorEnabled bool) codes.Code {
switch errCause {
case mimirpb.BAD_DATA:
return codes.InvalidArgument
aknuds1 marked this conversation as resolved.
Show resolved Hide resolved
case mimirpb.INGESTION_RATE_LIMITED, mimirpb.REQUEST_RATE_LIMITED:
if serviceOverloadErrorEnabled {
return codes.Unavailable
}
return codes.ResourceExhausted
case mimirpb.REPLICAS_DID_NOT_MATCH:
return codes.AlreadyExists
case mimirpb.TOO_MANY_CLUSTERS:
return codes.FailedPrecondition
case mimirpb.CIRCUIT_BREAKER_OPEN:
return codes.Unavailable
case mimirpb.METHOD_NOT_ALLOWED:
return codes.Unimplemented
}
return codes.Internal
}

// errorCauseToHTTPStatusCode maps a distributor error cause to the HTTP status
// code that should be returned for it.
//
// serviceOverloadErrorEnabled only influences the rate-limiting causes, where
// it selects StatusServiceOverloaded over http.StatusTooManyRequests. Any cause
// without an explicit mapping yields http.StatusInternalServerError.
func errorCauseToHTTPStatusCode(errCause mimirpb.ErrorCause, serviceOverloadErrorEnabled bool) int {
	switch errCause {
	case mimirpb.BAD_DATA, mimirpb.TOO_MANY_CLUSTERS:
		return http.StatusBadRequest
	case mimirpb.REPLICAS_DID_NOT_MATCH:
		return http.StatusAccepted
	case mimirpb.INGESTION_RATE_LIMITED, mimirpb.REQUEST_RATE_LIMITED:
		// Tell the client it is going too fast with either a 429 or a 529,
		// depending on configuration. The client may discard the data or slow
		// down and re-send.
		// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
		if !serviceOverloadErrorEnabled {
			return http.StatusTooManyRequests
		}
		return StatusServiceOverloaded
	case mimirpb.TSDB_UNAVAILABLE, mimirpb.CIRCUIT_BREAKER_OPEN:
		return http.StatusServiceUnavailable
	case mimirpb.METHOD_NOT_ALLOWED:
		// A 501 (and not 405) explicitly signals a misconfiguration and lets it
		// be tracked amongst other 5xx errors.
		return http.StatusNotImplemented
	default:
		return http.StatusInternalServerError
	}
}

func wrapIngesterPushError(err error, ingesterID string) error {
if err == nil {
return nil
Expand Down
Loading
Loading