diff --git a/CHANGELOG.md b/CHANGELOG.md index 03323404f49..b244573bac7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,11 +53,11 @@ * [ENHANCEMENT] OTLP: Speed up conversion from OTel to Mimir format by about 8% and reduce memory consumption by about 30%. Can be disabled via `-distributor.direct-otlp-translation-enabled=false` #7957 * [ENHANCEMENT] Ingester/Querier: Optimise regexps with long lists of alternates. #8221, #8234 * [ENHANCEMENT] Ingester: Include more detail in tracing of queries. #8242 -* [EHNAHCEMENT] Distributor: add `insight=true` to remote-write and OTLP write handlers when the HTTP response status code is 4xx. #8294 +* [ENHANCEMENT] Distributor: add `insight=true` to remote-write and OTLP write handlers when the HTTP response status code is 4xx. #8294 * [ENHANCEMENT] Ingester: reduce locked time while matching postings for a label, improving the write latency and compaction speed. #8327 * [ENHANCEMENT] Ingester: reduce the amount of locks taken during the Head compaction's garbage-collection process, improving the write latency and compaction speed. #8327 * [ENHANCEMENT] Query-frontend: log the start, end time and matchers for remote read requests to the query stats logs. #8326 -* [BUGFIX] Distributor: prometheus retry on 5xx and 429 errors, while otlp collector only retry on 429, 502, 503 and 504, mapping other 5xx errors to the retryable ones in otlp endpoint. #8324 +* [BUGFIX] Distributor: prometheus retry on 5xx and 429 errors, while otlp collector only retry on 429, 502, 503 and 504, mapping other 5xx errors to the retryable ones in otlp endpoint. #8324 #8339 * [BUGFIX] Distributor: make OTLP endpoint return marshalled proto bytes as response body for 4xx/5xx errors. #8227 * [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567 * [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520 @@ -428,10 +428,6 @@ * [CHANGE] Ingester: changed the default value for the experimental configuration parameter `-blocks-storage.tsdb.early-head-compaction-min-estimated-series-reduction-percentage` from 10 to 15. #6186 * [CHANGE] Ingester: `/ingester/push` HTTP endpoint has been removed. This endpoint was added for testing and troubleshooting, but was never documented or used for anything. #6299 * [CHANGE] Experimental setting `-log.rate-limit-logs-per-second-burst` renamed to `-log.rate-limit-logs-burst-size`. #6230 -* [CHANGE] Distributor: instead of errors with HTTP status codes, `Push()` now returns errors with gRPC codes: #6377 - * `http.StatusAccepted` (202) code is replaced with `codes.AlreadyExists`. - * `http.BadRequest` (400) code is replaced with `codes.FailedPrecondition`. - * `http.StatusTooManyRequests` (429) and the non-standard `529` (The service is overloaded) codes are replaced with `codes.ResourceExhausted`. * [CHANGE] Ingester: by setting the newly introduced experimental CLI flag `-ingester.return-only-grpc-errors` to true, ingester will return only gRPC errors. This feature changes the following status codes: #6443 #6680 #6723 * `http.StatusBadRequest` (400) is replaced with `codes.FailedPrecondition` on the write path. * `http.StatusServiceUnavailable` (503) is replaced with `codes.Internal` on the write path, and with `codes.ResourceExhausted` on the read path. diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 2662d1f9544..8b98f71ce8a 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -1414,7 +1414,7 @@ func (d *Distributor) handlePushError(ctx context.Context, pushErr error) error if err == nil { serviceOverloadErrorEnabled = d.limits.ServiceOverloadStatusCodeOnRateLimitEnabled(userID) } - return toGRPCError(pushErr, serviceOverloadErrorEnabled) + return toErrorWithGRPCStatus(pushErr, serviceOverloadErrorEnabled) } // push takes a write request and distributes it to ingesters using the ring. diff --git a/pkg/distributor/distributor_ingest_storage_test.go b/pkg/distributor/distributor_ingest_storage_test.go index 5c8a385384c..3ecea83ab5e 100644 --- a/pkg/distributor/distributor_ingest_storage_test.go +++ b/pkg/distributor/distributor_ingest_storage_test.go @@ -285,6 +285,9 @@ func TestDistributor_Push_ShouldReturnErrorMappedTo4xxStatusCodeIfWriteRequestCo limits := prepareDefaultLimits() limits.MaxLabelValueLength = hugeLabelValueLength + overrides, err := validation.NewOverrides(*limits, nil) + require.NoError(t, err) + testConfig := prepConfig{ numDistributors: 1, ingestStorageEnabled: true, @@ -305,7 +308,7 @@ func TestDistributor_Push_ShouldReturnErrorMappedTo4xxStatusCodeIfWriteRequestCo // We expect a gRPC error. errStatus, ok := grpcutil.ErrorToStatus(err) require.True(t, ok) - assert.Equal(t, codes.FailedPrecondition, errStatus.Code()) + assert.Equal(t, codes.InvalidArgument, errStatus.Code()) assert.ErrorContains(t, errStatus.Err(), ingest.ErrWriteRequestDataItemTooLarge.Error()) // We expect the gRPC error to be detected as client error. @@ -321,7 +324,7 @@ func TestDistributor_Push_ShouldReturnErrorMappedTo4xxStatusCodeIfWriteRequestCo sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)", false) // Send write request through the HTTP handler. - h := Handler(maxRecvMsgSize, nil, sourceIPs, false, nil, RetryConfig{}, distributors[0].PushWithMiddlewares, nil, log.NewNopLogger()) + h := Handler(maxRecvMsgSize, nil, sourceIPs, false, overrides, RetryConfig{}, distributors[0].PushWithMiddlewares, nil, log.NewNopLogger()) h.ServeHTTP(resp, createRequest(t, marshalledReq)) assert.Equal(t, http.StatusBadRequest, resp.Code) }) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 3af3123fc14..9bae65259ad 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1005,7 +1005,7 @@ func TestDistributor_PushHAInstances(t *testing.T) { testReplica: "instance1234567890123456789012345678901234567890", cluster: "cluster0", samples: 5, - expectedError: status.New(codes.FailedPrecondition, fmt.Sprintf(labelValueTooLongMsgFormat, "__replica__", "instance1234567890123456789012345678901234567890", mimirpb.FromLabelAdaptersToString(labelSetGenWithReplicaAndCluster("instance1234567890123456789012345678901234567890", "cluster0")(0)))), + expectedError: status.New(codes.InvalidArgument, fmt.Sprintf(labelValueTooLongMsgFormat, "__replica__", "instance1234567890123456789012345678901234567890", mimirpb.FromLabelAdaptersToString(labelSetGenWithReplicaAndCluster("instance1234567890123456789012345678901234567890", "cluster0")(0)))), expectedDetails: &mimirpb.ErrorDetails{Cause: mimirpb.BAD_DATA}, }, } { @@ -1528,14 +1528,14 @@ func TestDistributor_Push_HistogramValidation(t *testing.T) { }, "too new histogram": { req: makeWriteRequestHistogram([]string{model.MetricNameLabel, "test"}, math.MaxInt64, generateTestHistogram(0)), - expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(sampleTimestampTooNewMsgFormat, math.MaxInt64, "test")), + expectedErr: status.New(codes.InvalidArgument, fmt.Sprintf(sampleTimestampTooNewMsgFormat, math.MaxInt64, "test")), }, "valid float histogram": { req: makeWriteRequestFloatHistogram([]string{model.MetricNameLabel, "test"}, 1000, generateTestFloatHistogram(0)), }, "too new float histogram": { req: makeWriteRequestFloatHistogram([]string{model.MetricNameLabel, "test"}, math.MaxInt64, generateTestFloatHistogram(0)), - expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(sampleTimestampTooNewMsgFormat, math.MaxInt64, "test")), + expectedErr: status.New(codes.InvalidArgument, fmt.Sprintf(sampleTimestampTooNewMsgFormat, math.MaxInt64, "test")), }, "buckets at limit": { req: makeWriteRequestFloatHistogram([]string{model.MetricNameLabel, "test"}, 1000, testHistogram), @@ -1546,7 +1546,7 @@ func TestDistributor_Push_HistogramValidation(t *testing.T) { bucketLimit: 7, errMsg: "received a native histogram sample with too many buckets, timestamp", errID: globalerror.MaxNativeHistogramBuckets, - expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(maxNativeHistogramBucketsMsgFormat, 1000, "test", 8, 7)), + expectedErr: status.New(codes.InvalidArgument, fmt.Sprintf(maxNativeHistogramBucketsMsgFormat, 1000, "test", 8, 7)), }, } @@ -6797,7 +6797,7 @@ func TestDistributorValidation(t *testing.T) { TimestampMs: int64(future), Value: 4, }}, - expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(sampleTimestampTooNewMsgFormat, future, "testmetric")), + expectedErr: status.New(codes.InvalidArgument, fmt.Sprintf(sampleTimestampTooNewMsgFormat, future, "testmetric")), }, "validation does not fail for samples from the past without past_grace_period setting": { @@ -6809,7 +6809,7 @@ func TestDistributorValidation(t *testing.T) { labels: [][]mimirpb.LabelAdapter{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}}, samples: []mimirpb.Sample{{TimestampMs: int64(past), Value: 4}}, limits: func(limits *validation.Limits) { limits.PastGracePeriod = model.Duration(now.Sub(past) / 2) }, - expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(sampleTimestampTooOldMsgFormat, past, "testmetric")), + expectedErr: status.New(codes.InvalidArgument, fmt.Sprintf(sampleTimestampTooOldMsgFormat, past, "testmetric")), }, "exceeds maximum labels per series": { @@ -6818,7 +6818,7 @@ func TestDistributorValidation(t *testing.T) { TimestampMs: int64(now), Value: 2, }}, - expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(tooManyLabelsMsgFormat, 3, 2, `testmetric{foo="bar", foo2="bar2"}`, "")), + expectedErr: status.New(codes.InvalidArgument, fmt.Sprintf(tooManyLabelsMsgFormat, 3, 2, `testmetric{foo="bar", foo2="bar2"}`, "")), }, "exceeds maximum labels per series with a metric that exceeds 200 characters when formatted": { labels: [][]mimirpb.LabelAdapter{{ @@ -6832,7 +6832,7 @@ func TestDistributorValidation(t *testing.T) { TimestampMs: int64(now), Value: 2, }}, - expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(tooManyLabelsMsgFormat, 5, 2, `testmetric{foo-with-a-long-long-label="bar-with-a-long-long-value", foo2-with-a-long-long-label="bar2-with-a-long-long-value", foo3-with-a-long-long-label="bar3-with-a-long-long-value", foo4-with-a-lo`, "…")), + expectedErr: status.New(codes.InvalidArgument, fmt.Sprintf(tooManyLabelsMsgFormat, 5, 2, `testmetric{foo-with-a-long-long-label="bar-with-a-long-long-value", foo2-with-a-long-long-label="bar2-with-a-long-long-value", foo3-with-a-long-long-label="bar3-with-a-long-long-value", foo4-with-a-lo`, "…")), }, "exceeds maximum labels per series with a metric that exceeds 200 bytes when formatted": { labels: [][]mimirpb.LabelAdapter{{ @@ -6844,7 +6844,7 @@ func TestDistributorValidation(t *testing.T) { TimestampMs: int64(now), Value: 2, }}, - expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(tooManyLabelsMsgFormat, 3, 2, `testmetric{families="👩\u200d👦👨\u200d👧👨\u200d👩\u200d👧👩\u200d👧👩\u200d👩\u200d👦\u200d👦👨\u200d👩\u200d👧\u200d👦👨\u200d👧\u200d👦👨\u200d👩\u200d👦👪👨\u200d👦👨\u200d👦\u200d👦👨\u200d👨\u200d👧👨\u200d👧\u200d👧", foo="b"}`, "")), + expectedErr: status.New(codes.InvalidArgument, fmt.Sprintf(tooManyLabelsMsgFormat, 3, 2, `testmetric{families="👩\u200d👦👨\u200d👧👨\u200d👩\u200d👧👩\u200d👧👩\u200d👩\u200d👦\u200d👦👨\u200d👩\u200d👧\u200d👦👨\u200d👧\u200d👦👨\u200d👩\u200d👦👪👨\u200d👦👨\u200d👦\u200d👦👨\u200d👨\u200d👧👨\u200d👧\u200d👧", foo="b"}`, "")), }, "multiple validation failures should return the first failure": { labels: [][]mimirpb.LabelAdapter{ @@ -6855,7 +6855,7 @@ func TestDistributorValidation(t *testing.T) { {TimestampMs: int64(now), Value: 2}, {TimestampMs: int64(past), Value: 2}, }, - expectedErr: status.New(codes.FailedPrecondition, fmt.Sprintf(tooManyLabelsMsgFormat, 3, 2, `testmetric{foo="bar", foo2="bar2"}`, "")), + expectedErr: status.New(codes.InvalidArgument, fmt.Sprintf(tooManyLabelsMsgFormat, 3, 2, `testmetric{foo="bar", foo2="bar2"}`, "")), }, "metadata validation failure": { metadata: []*mimirpb.MetricMetadata{{MetricFamilyName: "", Help: "a test metric.", Unit: "", Type: mimirpb.COUNTER}}, @@ -6864,7 +6864,7 @@ func TestDistributorValidation(t *testing.T) { TimestampMs: int64(now), Value: 1, }}, - expectedErr: status.New(codes.FailedPrecondition, metadataMetricNameMissingMsgFormat), + expectedErr: status.New(codes.InvalidArgument, metadataMetricNameMissingMsgFormat), }, // Validation passes for empty exemplar labels, since we just want to skip the exemplars and not fail the time series as a whole. "empty exemplar labels": { @@ -7552,9 +7552,9 @@ func TestHandlePushError(t *testing.T) { pushError: httpGrpc5xxErr, expectedOtherError: httpGrpc5xxErr, }, - "an Error gives the error returned by toGRPCError()": { + "an Error gives the error returned by toErrorWithGRPCStatus()": { pushError: mockDistributorErr(testErrorMsg), - expectedGRPCError: status.Convert(toGRPCError(mockDistributorErr(testErrorMsg), false)), + expectedGRPCError: status.Convert(toErrorWithGRPCStatus(mockDistributorErr(testErrorMsg), false)), }, "a random error without status gives an Internal gRPC error": { pushError: errWithUserID, diff --git a/pkg/distributor/errors.go b/pkg/distributor/errors.go index 8bb59b8fffe..5c094cea5a9 100644 --- a/pkg/distributor/errors.go +++ b/pkg/distributor/errors.go @@ -5,6 +5,7 @@ package distributor import ( "context" "fmt" + "net/http" "time" "github.com/gogo/status" @@ -253,8 +254,8 @@ func (e circuitBreakerOpenError) Cause() mimirpb.ErrorCause { // Ensure that circuitBreakerOpenError implements Error. var _ Error = circuitBreakerOpenError{} -// toGRPCError converts the given error into an appropriate gRPC error. -func toGRPCError(pushErr error, serviceOverloadErrorEnabled bool) error { +// toErrorWithGRPCStatus converts the given error into an appropriate gRPC error. +func toErrorWithGRPCStatus(pushErr error, serviceOverloadErrorEnabled bool) error { var ( distributorErr Error errDetails *mimirpb.ErrorDetails @@ -262,30 +263,59 @@ func toGRPCError(pushErr error, serviceOverloadErrorEnabled bool) error { ) if errors.As(pushErr, &distributorErr) { errDetails = &mimirpb.ErrorDetails{Cause: distributorErr.Cause()} - switch distributorErr.Cause() { - case mimirpb.BAD_DATA: - errCode = codes.FailedPrecondition - case mimirpb.INGESTION_RATE_LIMITED: - errCode = codes.ResourceExhausted - case mimirpb.REQUEST_RATE_LIMITED: - if serviceOverloadErrorEnabled { - errCode = codes.Unavailable - } else { - errCode = codes.ResourceExhausted - } - case mimirpb.REPLICAS_DID_NOT_MATCH: - errCode = codes.AlreadyExists - case mimirpb.TOO_MANY_CLUSTERS: - errCode = codes.FailedPrecondition - case mimirpb.CIRCUIT_BREAKER_OPEN: - errCode = codes.Unavailable - case mimirpb.METHOD_NOT_ALLOWED: - errCode = codes.Unimplemented - } + errCode = errorCauseToGRPCStatusCode(distributorErr.Cause(), serviceOverloadErrorEnabled) } return globalerror.WrapErrorWithGRPCStatus(pushErr, errCode, errDetails).Err() } +func errorCauseToGRPCStatusCode(errCause mimirpb.ErrorCause, serviceOverloadErrorEnabled bool) codes.Code { + switch errCause { + case mimirpb.BAD_DATA: + return codes.InvalidArgument + case mimirpb.INGESTION_RATE_LIMITED, mimirpb.REQUEST_RATE_LIMITED: + if serviceOverloadErrorEnabled { + return codes.Unavailable + } + return codes.ResourceExhausted + case mimirpb.REPLICAS_DID_NOT_MATCH: + return codes.AlreadyExists + case mimirpb.TOO_MANY_CLUSTERS: + return codes.FailedPrecondition + case mimirpb.CIRCUIT_BREAKER_OPEN: + return codes.Unavailable + case mimirpb.METHOD_NOT_ALLOWED: + return codes.Unimplemented + } + return codes.Internal +} + +func errorCauseToHTTPStatusCode(errCause mimirpb.ErrorCause, serviceOverloadErrorEnabled bool) int { + switch errCause { + case mimirpb.BAD_DATA: + return http.StatusBadRequest + case mimirpb.INGESTION_RATE_LIMITED, mimirpb.REQUEST_RATE_LIMITED: + // Return a 429 or a 529 here depending on configuration to tell the client it is going too fast. + // Client may discard the data or slow down and re-send. + // Prometheus v2.26 added a remote-write option 'retry_on_http_429'. + if serviceOverloadErrorEnabled { + return StatusServiceOverloaded + } + return http.StatusTooManyRequests + case mimirpb.REPLICAS_DID_NOT_MATCH: + return http.StatusAccepted + case mimirpb.TOO_MANY_CLUSTERS: + return http.StatusBadRequest + case mimirpb.TSDB_UNAVAILABLE: + return http.StatusServiceUnavailable + case mimirpb.CIRCUIT_BREAKER_OPEN: + return http.StatusServiceUnavailable + case mimirpb.METHOD_NOT_ALLOWED: + // Return a 501 (and not 405) to explicitly signal a misconfiguration and to possibly track that amongst other 5xx errors. + return http.StatusNotImplemented + } + return http.StatusInternalServerError +} + func wrapIngesterPushError(err error, ingesterID string) error { if err == nil { return nil diff --git a/pkg/distributor/errors_test.go b/pkg/distributor/errors_test.go index d0169daba7b..1110990d42d 100644 --- a/pkg/distributor/errors_test.go +++ b/pkg/distributor/errors_test.go @@ -201,7 +201,7 @@ func TestNewCircuitBreakerOpenError(t *testing.T) { checkDistributorError(t, wrappedErr, mimirpb.CIRCUIT_BREAKER_OPEN) } -func TestToGRPCError(t *testing.T) { +func TestToErrorWithGRPCStatus(t *testing.T) { const ( ingesterID = "ingester-25" originalMsg = "this is an error" @@ -253,25 +253,39 @@ func TestToGRPCError(t *testing.T) { expectedErrorMsg: tooManyClustersErr.Error(), expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.TOO_MANY_CLUSTERS}, }, - "a validationError gets translated into gets translated into a FailedPrecondition error with VALIDATION cause": { + "a validationError gets translated into an InvalidArgument error with BAD_DATA cause": { err: newValidationError(originalErr), - expectedGRPCCode: codes.FailedPrecondition, + expectedGRPCCode: codes.InvalidArgument, expectedErrorMsg: originalMsg, expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.BAD_DATA}, }, - "a DoNotLogError of a validationError gets translated into gets translated into a FailedPrecondition error with VALIDATION cause": { + "a DoNotLogError of a validationError gets translated into an InvalidArgument error with BAD_DATA cause": { err: middleware.DoNotLogError{Err: newValidationError(originalErr)}, - expectedGRPCCode: codes.FailedPrecondition, + expectedGRPCCode: codes.InvalidArgument, expectedErrorMsg: originalMsg, expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.BAD_DATA}, }, - "an ingestionRateLimitedError gets translated into gets translated into a ResourceExhausted error with INGESTION_RATE_LIMITED cause": { + "an ingestionRateLimitedError with serviceOverloadErrorEnabled gets translated into an Unavailable error with INGESTION_RATE_LIMITED cause": { + err: ingestionRateLimitedErr, + serviceOverloadErrorEnabled: true, + expectedGRPCCode: codes.Unavailable, + expectedErrorMsg: ingestionRateLimitedErr.Error(), + expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.INGESTION_RATE_LIMITED}, + }, + "a DoNotLogError of an ingestionRateLimitedError with serviceOverloadErrorEnabled gets translated into an Unavailable error with INGESTION_RATE_LIMITED cause": { + err: middleware.DoNotLogError{Err: ingestionRateLimitedErr}, + serviceOverloadErrorEnabled: true, + expectedGRPCCode: codes.Unavailable, + expectedErrorMsg: ingestionRateLimitedErr.Error(), + expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.INGESTION_RATE_LIMITED}, + }, + "an ingestionRateLimitedError without serviceOverloadErrorEnabled gets translated into a ResourceExhausted error with INGESTION_RATE_LIMITED cause": { err: ingestionRateLimitedErr, expectedGRPCCode: codes.ResourceExhausted, expectedErrorMsg: ingestionRateLimitedErr.Error(), expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.INGESTION_RATE_LIMITED}, }, - "a DoNotLogError of an ingestionRateLimitedError gets translated into gets translated into a ResourceExhausted error with INGESTION_RATE_LIMITED cause": { + "a DoNotLogError of an ingestionRateLimitedError without serviceOverloadErrorEnabled gets translated into a ResourceExhausted error with INGESTION_RATE_LIMITED cause": { err: middleware.DoNotLogError{Err: ingestionRateLimitedErr}, expectedGRPCCode: codes.ResourceExhausted, expectedErrorMsg: ingestionRateLimitedErr.Error(), @@ -303,15 +317,15 @@ func TestToGRPCError(t *testing.T) { expectedErrorMsg: requestRateLimitedErr.Error(), expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.REQUEST_RATE_LIMITED}, }, - "an ingesterPushError with BAD_DATA cause gets translated into a FailedPrecondition error with BAD_DATA cause": { + "an ingesterPushError with BAD_DATA cause gets translated into an InvalidArgument error with BAD_DATA cause": { err: newIngesterPushError(createStatusWithDetails(t, codes.Internal, originalMsg, mimirpb.BAD_DATA), ingesterID), - expectedGRPCCode: codes.FailedPrecondition, + expectedGRPCCode: codes.InvalidArgument, expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.BAD_DATA}, }, - "a DoNotLogError of an ingesterPushError with BAD_DATA cause gets translated into a FailedPrecondition error with BAD_DATA cause": { + "a DoNotLogError of an ingesterPushError with BAD_DATA cause gets translated into an InvalidArgument error with BAD_DATA cause": { err: middleware.DoNotLogError{Err: newIngesterPushError(createStatusWithDetails(t, codes.Internal, originalMsg, mimirpb.BAD_DATA), ingesterID)}, - expectedGRPCCode: codes.FailedPrecondition, + expectedGRPCCode: codes.InvalidArgument, expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.BAD_DATA}, }, @@ -390,7 +404,7 @@ func TestToGRPCError(t *testing.T) { } for name, tc := range testCases { t.Run(name, func(t *testing.T) { - err := toGRPCError(tc.err, tc.serviceOverloadErrorEnabled) + err := toErrorWithGRPCStatus(tc.err, tc.serviceOverloadErrorEnabled) stat, ok := grpcutil.ErrorToStatus(err) require.True(t, ok) @@ -590,6 +604,158 @@ func TestIsIngestionClientError(t *testing.T) { } } +func TestErrorCauseToGRPCStatusCode(t *testing.T) { + type testStruct struct { + errorCause mimirpb.ErrorCause + serviceOverloadErrorEnabled bool + expectedGRPCStatusCode codes.Code + } + testCases := map[string]testStruct{ + "a REPLICAS_DID_NOT_MATCH error cause gets translated into an AlreadyExists": { + errorCause: mimirpb.REPLICAS_DID_NOT_MATCH, + expectedGRPCStatusCode: codes.AlreadyExists, + }, + "a TOO_MANY_CLUSTERS error cause gets translated into a FailedPrecondition": { + errorCause: mimirpb.TOO_MANY_CLUSTERS, + expectedGRPCStatusCode: codes.FailedPrecondition, + }, + "a BAD_DATA error cause gets translated into a InvalidArgument": { + errorCause: mimirpb.BAD_DATA, + expectedGRPCStatusCode: codes.InvalidArgument, + }, + "an INGESTION_RATE_LIMITED error cause without serviceOverloadErrorEnabled gets translated into a ResourceExhausted": { + errorCause: mimirpb.INGESTION_RATE_LIMITED, + serviceOverloadErrorEnabled: false, + expectedGRPCStatusCode: codes.ResourceExhausted, + }, + "an INGESTION_RATE_LIMITED error cause with serviceOverloadErrorEnabled gets translated into an Unavailable": { + errorCause: mimirpb.INGESTION_RATE_LIMITED, + serviceOverloadErrorEnabled: true, + expectedGRPCStatusCode: codes.Unavailable, + }, + "a REQUEST_RATE_LIMITED error cause without serviceOverloadErrorEnabled gets translated into a ResourceExhausted": { + errorCause: mimirpb.REQUEST_RATE_LIMITED, + serviceOverloadErrorEnabled: false, + expectedGRPCStatusCode: codes.ResourceExhausted, + }, + "a REQUEST_RATE_LIMITED error cause with serviceOverloadErrorEnabled gets translated into an Unavailable": { + errorCause: mimirpb.REQUEST_RATE_LIMITED, + serviceOverloadErrorEnabled: true, + expectedGRPCStatusCode: codes.Unavailable, + }, + "an UNKNOWN error cause gets translated into an Internal": { + errorCause: mimirpb.UNKNOWN_CAUSE, + expectedGRPCStatusCode: codes.Internal, + }, + "an INSTANCE_LIMIT error cause gets translated into an Internal": { + errorCause: mimirpb.INSTANCE_LIMIT, + expectedGRPCStatusCode: codes.Internal, + }, + "a SERVICE_UNAVAILABLE error cause gets translated into an Internal": { + errorCause: mimirpb.SERVICE_UNAVAILABLE, + expectedGRPCStatusCode: codes.Internal, + }, + "a TOO_BUSY error cause gets translated into an Internal": { + errorCause: mimirpb.TOO_BUSY, + expectedGRPCStatusCode: codes.Internal, + }, + "a CIRCUIT_BREAKER_OPEN error cause gets translated into an Unavailable": { + errorCause: mimirpb.CIRCUIT_BREAKER_OPEN, + expectedGRPCStatusCode: codes.Unavailable, + }, + "a METHOD_NOT_ALLOWED error cause gets translated into an Unimplemented": { + errorCause: mimirpb.METHOD_NOT_ALLOWED, + expectedGRPCStatusCode: codes.Unimplemented, + }, + "a TSDB_UNAVAILABLE error cause gets translated into an Internal": { + errorCause: mimirpb.TSDB_UNAVAILABLE, + expectedGRPCStatusCode: codes.Internal, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + status := errorCauseToGRPCStatusCode(tc.errorCause, tc.serviceOverloadErrorEnabled) + assert.Equal(t, tc.expectedGRPCStatusCode, status) + }) + } +} + +func TestErrorCauseToHTTPStatusCode(t *testing.T) { + type testStruct struct { + errorCause mimirpb.ErrorCause + serviceOverloadErrorEnabled bool + expectedHTTPStatus int + } + testCases := map[string]testStruct{ + "a REPLICAS_DID_NOT_MATCH error cause gets translated into a HTTP 202": { + errorCause: mimirpb.REPLICAS_DID_NOT_MATCH, + expectedHTTPStatus: http.StatusAccepted, + }, + "a TOO_MANY_CLUSTERS error cause gets translated into a HTTP 400": { + errorCause: mimirpb.TOO_MANY_CLUSTERS, + expectedHTTPStatus: http.StatusBadRequest, + }, + "a BAD_DATA error cause gets translated into a HTTP 400": { + errorCause: mimirpb.BAD_DATA, + expectedHTTPStatus: http.StatusBadRequest, + }, + "an INGESTION_RATE_LIMITED error cause without serviceOverloadErrorEnabled gets translated into a HTTP 429": { + errorCause: mimirpb.INGESTION_RATE_LIMITED, + serviceOverloadErrorEnabled: false, + expectedHTTPStatus: http.StatusTooManyRequests, + }, + "an INGESTION_RATE_LIMITED error cause with serviceOverloadErrorEnabled gets translated into a HTTP 529": { + errorCause: mimirpb.INGESTION_RATE_LIMITED, + serviceOverloadErrorEnabled: true, + expectedHTTPStatus: StatusServiceOverloaded, + }, + "a REQUEST_RATE_LIMITED error cause without serviceOverloadErrorEnabled gets translated into a HTTP 429": { + errorCause: mimirpb.REQUEST_RATE_LIMITED, + serviceOverloadErrorEnabled: false, + expectedHTTPStatus: http.StatusTooManyRequests, + }, + "a REQUEST_RATE_LIMITED error cause with serviceOverloadErrorEnabled gets translated into a HTTP 529": { + errorCause: mimirpb.REQUEST_RATE_LIMITED, + serviceOverloadErrorEnabled: true, + expectedHTTPStatus: StatusServiceOverloaded, + }, + "an UNKNOWN error cause gets translated into a HTTP 500": { + errorCause: mimirpb.UNKNOWN_CAUSE, + expectedHTTPStatus: http.StatusInternalServerError, + }, + "an INSTANCE_LIMIT error cause gets translated into a HTTP 500": { + errorCause: mimirpb.INSTANCE_LIMIT, + expectedHTTPStatus: http.StatusInternalServerError, + }, + "a SERVICE_UNAVAILABLE error cause gets translated into a HTTP 500": { + errorCause: mimirpb.SERVICE_UNAVAILABLE, + expectedHTTPStatus: http.StatusInternalServerError, + }, + "a TOO_BUSY error cause gets translated into a HTTP 500": { + errorCause: mimirpb.TOO_BUSY, + expectedHTTPStatus: http.StatusInternalServerError, + }, + "a METHOD_NOT_ALLOWED error cause gets translated into a HTTP 501": { + errorCause: mimirpb.METHOD_NOT_ALLOWED, + expectedHTTPStatus: http.StatusNotImplemented, + }, + "a CIRCUIT_BREAKER_OPEN error cause gets translated into a HTTP 503": { + errorCause: mimirpb.CIRCUIT_BREAKER_OPEN, + expectedHTTPStatus: http.StatusServiceUnavailable, + }, + "a TSDB_UNAVAILABLE error cause gets translated into a HTTP 503": { + errorCause: mimirpb.TSDB_UNAVAILABLE, + expectedHTTPStatus: http.StatusServiceUnavailable, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + status := errorCauseToHTTPStatusCode(tc.errorCause, tc.serviceOverloadErrorEnabled) + assert.Equal(t, tc.expectedHTTPStatus, status) + }) + } +} + func checkDistributorError(t *testing.T, err error, expectedCause mimirpb.ErrorCause) { var distributorErr Error require.ErrorAs(t, err, &distributorErr) diff --git a/pkg/distributor/otel.go b/pkg/distributor/otel.go index d2f3852cd74..aea82261809 100644 --- a/pkg/distributor/otel.go +++ b/pkg/distributor/otel.go @@ -13,6 +13,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gogo/protobuf/proto" + "github.com/gogo/status" "github.com/grafana/dskit/grpcutil" "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/middleware" @@ -28,7 +29,6 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.uber.org/multierr" "google.golang.org/grpc/codes" - grpcstatus "google.golang.org/grpc/status" "github.com/grafana/mimir/pkg/distributor/otlp" "github.com/grafana/mimir/pkg/mimirpb" @@ -252,12 +252,13 @@ func otlpHandler( grpcCode codes.Code errorMsg string ) - if resp, ok := httpgrpc.HTTPResponseFromError(err); ok { - // here the error would always be nil, since it is already checked in httpgrpc.HTTPResponseFromError - s, _ := grpcutil.ErrorToStatus(err) - httpCode = int(resp.Code) - grpcCode = s.Code() // this will be the same as httpCode. - errorMsg = string(resp.Body) + if st, ok := grpcutil.ErrorToStatus(err); ok { + // TODO: This code is needed for backwards compatibility, + // and can be removed once -ingester.return-only-grpc-errors + // is removed. + httpCode = httpRetryableToOTLPRetryable(int(st.Code())) + grpcCode = st.Code() + errorMsg = st.Message() } else { grpcCode, httpCode = toOtlpGRPCHTTPStatus(err) errorMsg = err.Error() @@ -277,30 +278,33 @@ func otlpHandler( } // toOtlpGRPCHTTPStatus is utilized by the OTLP endpoint. -// According to the OTLP specifications (https://opentelemetry.io/docs/specs/otlp/#failures-1), unlike Prometheus, the OTLP client only retries on HTTP status codes 429, 502, 503, and 504. func toOtlpGRPCHTTPStatus(pushErr error) (codes.Code, int) { - if !errors.Is(pushErr, context.DeadlineExceeded) { - var distributorErr Error - if errors.As(pushErr, &distributorErr) { - switch distributorErr.Cause() { - case mimirpb.BAD_DATA: - return codes.InvalidArgument, http.StatusBadRequest - case mimirpb.INGESTION_RATE_LIMITED, mimirpb.REQUEST_RATE_LIMITED: - return codes.ResourceExhausted, http.StatusTooManyRequests - case mimirpb.REPLICAS_DID_NOT_MATCH: - return codes.OK, http.StatusAccepted - case mimirpb.TOO_MANY_CLUSTERS: - return codes.InvalidArgument, http.StatusBadRequest - case mimirpb.TSDB_UNAVAILABLE: - return codes.Unavailable, http.StatusServiceUnavailable - case mimirpb.CIRCUIT_BREAKER_OPEN: - return codes.Unavailable, http.StatusServiceUnavailable - case mimirpb.METHOD_NOT_ALLOWED: - return codes.Unimplemented, http.StatusNotImplemented - } + var distributorErr Error + if errors.Is(pushErr, context.DeadlineExceeded) || !errors.As(pushErr, &distributorErr) { + return codes.Internal, http.StatusServiceUnavailable + } + + grpcStatusCode := errorCauseToGRPCStatusCode(distributorErr.Cause(), false) + httpStatusCode := errorCauseToHTTPStatusCode(distributorErr.Cause(), false) + otlpHTTPStatusCode := httpRetryableToOTLPRetryable(httpStatusCode) + return grpcStatusCode, otlpHTTPStatusCode +} + +// httpRetryableToOTLPRetryable maps non-retryable 5xx HTTP status codes according +// to the OTLP specifications (https://opentelemetry.io/docs/specs/otlp/#failures-1) +// to http.StatusServiceUnavailable. In case of a non-retryable HTTP status code, +// httpRetryableToOTLPRetryable returns the HTTP status code itself. +// Unlike Prometheus, which retries 429 and all 5xx HTTP status codes, +// the OTLP client only retries on HTTP status codes 429, 502, 503, and 504. +func httpRetryableToOTLPRetryable(httpStatusCode int) int { + if httpStatusCode/100 == 5 { + mask := httpStatusCode % 100 + // We map all 5xx except 502, 503 and 504 into 503. + if mask <= 1 || mask > 4 { + return http.StatusServiceUnavailable } } - return codes.Internal, http.StatusServiceUnavailable + return httpStatusCode } // writeErrorToHTTPResponseBody converts the given error into a grpc status and marshals it into a byte slice, in order to be written to the response body. @@ -310,10 +314,10 @@ func writeErrorToHTTPResponseBody(w http.ResponseWriter, httpCode int, grpcCode w.Header().Set("X-Content-Type-Options", "nosniff") w.WriteHeader(httpCode) - respBytes, err := proto.Marshal(grpcstatus.New(grpcCode, msg).Proto()) + respBytes, err := proto.Marshal(status.New(grpcCode, msg).Proto()) if err != nil { level.Error(logger).Log("msg", "otlp response marshal failed", "err", err) - writeResponseFailedBody, _ := proto.Marshal(grpcstatus.New(codes.Internal, "failed to marshal OTLP response").Proto()) + writeResponseFailedBody, _ := proto.Marshal(status.New(codes.Internal, "failed to marshal OTLP response").Proto()) _, _ = w.Write(writeResponseFailedBody) return } diff --git a/pkg/distributor/otel_test.go b/pkg/distributor/otel_test.go index 631ed45500e..9f58e7b114d 100644 --- a/pkg/distributor/otel_test.go +++ b/pkg/distributor/otel_test.go @@ -14,7 +14,6 @@ import ( "testing" "time" - "github.com/failsafe-go/failsafe-go/circuitbreaker" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" @@ -589,173 +588,194 @@ func TestHandler_toOtlpGRPCHTTPStatus(t *testing.T) { err error expectedHTTPStatus int expectedGRPCStatus codes.Code - expectedErrorMsg string } testCases := map[string]testStruct{ - "a generic error gets translated into a HTTP 503": { + "a generic error gets translated into gRPC code.Internal and HTTP 503 statuses": { err: originalErr, expectedHTTPStatus: http.StatusServiceUnavailable, expectedGRPCStatus: codes.Internal, - expectedErrorMsg: originalMsg, }, - "a DoNotLog of a generic error gets translated into a HTTP 503": { + "a DoNotLog of a generic error gets translated into gRPC codes.Internal and HTTP 503 statuses": { err: middleware.DoNotLogError{Err: originalErr}, expectedHTTPStatus: http.StatusServiceUnavailable, expectedGRPCStatus: codes.Internal, - expectedErrorMsg: originalMsg, }, - "a context.DeadlineExceeded gets translated into a HTTP 503": { + "a context.DeadlineExceeded gets translated into gRPC codes.Internal and HTTP 503 statuses": { err: context.DeadlineExceeded, expectedHTTPStatus: http.StatusServiceUnavailable, expectedGRPCStatus: codes.Internal, - expectedErrorMsg: context.DeadlineExceeded.Error(), }, - "a replicasDidNotMatchError gets translated into an HTTP 202": { + "a replicasDidNotMatchError gets translated into gRPC codes.AlreadyExists and HTTP 202 statuses": { err: replicasNotMatchErr, expectedHTTPStatus: http.StatusAccepted, - expectedGRPCStatus: codes.OK, - expectedErrorMsg: replicasNotMatchErr.Error(), + expectedGRPCStatus: codes.AlreadyExists, }, - "a DoNotLogError of a replicasDidNotMatchError gets translated into an HTTP 202": { + "a DoNotLogError of a replicasDidNotMatchError gets translated into gRPC codes.AlreadyExists and HTTP 202 statuses": { err: middleware.DoNotLogError{Err: replicasNotMatchErr}, expectedHTTPStatus: http.StatusAccepted, - expectedGRPCStatus: codes.OK, - expectedErrorMsg: replicasNotMatchErr.Error(), + expectedGRPCStatus: codes.AlreadyExists, }, - "a tooManyClustersError gets translated into an HTTP 400": { + "a tooManyClustersError gets translated into gRPC codes.FailedPrecondition and HTTP 400 statuses": { err: tooManyClustersErr, expectedHTTPStatus: http.StatusBadRequest, - expectedGRPCStatus: codes.InvalidArgument, - expectedErrorMsg: tooManyClustersErr.Error(), + expectedGRPCStatus: codes.FailedPrecondition, }, - "a DoNotLogError of a tooManyClustersError gets translated into an HTTP 400": { + "a DoNotLogError of a tooManyClustersError gets translated into gRPC codes.FailedPrecondition and HTTP 400 statuses": { err: middleware.DoNotLogError{Err: tooManyClustersErr}, expectedHTTPStatus: http.StatusBadRequest, - expectedGRPCStatus: codes.InvalidArgument, - expectedErrorMsg: tooManyClustersErr.Error(), + expectedGRPCStatus: codes.FailedPrecondition, }, - "a validationError gets translated into an HTTP 400": { + "a validationError gets translated into gRPC codes.InvalidArgument and HTTP 400 statuses": { err: newValidationError(originalErr), expectedHTTPStatus: http.StatusBadRequest, expectedGRPCStatus: codes.InvalidArgument, - expectedErrorMsg: originalMsg, }, - "a DoNotLogError of a validationError gets translated into an HTTP 400": { + "a DoNotLogError of a validationError gets translated into gRPC codes.InvalidArgument and HTTP 400 statuses": { err: middleware.DoNotLogError{Err: newValidationError(originalErr)}, expectedHTTPStatus: http.StatusBadRequest, expectedGRPCStatus: codes.InvalidArgument, - expectedErrorMsg: originalMsg, }, - "an ingestionRateLimitedError gets translated into an HTTP 429": { + "an ingestionRateLimitedError gets translated into gRPC codes.ResourceExhausted and HTTP 429 statuses": { err: ingestionRateLimitedErr, expectedHTTPStatus: http.StatusTooManyRequests, expectedGRPCStatus: codes.ResourceExhausted, - expectedErrorMsg: ingestionRateLimitedErr.Error(), }, - "a DoNotLogError of an ingestionRateLimitedError gets translated into an HTTP 429": { + "a DoNotLogError of an ingestionRateLimitedError gets translated into gRPC codes.ResourceExhausted and HTTP 429 statuses": { err: middleware.DoNotLogError{Err: ingestionRateLimitedErr}, expectedHTTPStatus: http.StatusTooManyRequests, expectedGRPCStatus: codes.ResourceExhausted, - expectedErrorMsg: ingestionRateLimitedErr.Error(), }, - "an ingesterPushError with BAD_DATA cause gets translated into an HTTP 400": { + "an ingesterPushError with BAD_DATA cause gets translated into gRPC codes.InvalidArgument and HTTP 400 statuses": { err: newIngesterPushError(createStatusWithDetails(t, codes.Internal, originalMsg, mimirpb.BAD_DATA), ingesterID), expectedHTTPStatus: http.StatusBadRequest, expectedGRPCStatus: codes.InvalidArgument, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, - "a DoNotLogError of an ingesterPushError with BAD_DATA cause gets translated into an HTTP 400": { + "a DoNotLogError of an ingesterPushError with BAD_DATA cause gets translated into gRPC codes.InvalidArgument and HTTP 400 statuses": { err: middleware.DoNotLogError{Err: newIngesterPushError(createStatusWithDetails(t, codes.FailedPrecondition, originalMsg, mimirpb.BAD_DATA), ingesterID)}, expectedHTTPStatus: http.StatusBadRequest, expectedGRPCStatus: codes.InvalidArgument, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, - "an ingesterPushError with METHOD_NOT_ALLOWED cause gets translated into an HTTP 501": { + "an ingesterPushError with METHOD_NOT_ALLOWED cause gets translated into gRPC codes.Unimplemented and HTTP 503 statuses": { err: newIngesterPushError(createStatusWithDetails(t, codes.Unimplemented, originalMsg, mimirpb.METHOD_NOT_ALLOWED), ingesterID), - expectedHTTPStatus: http.StatusNotImplemented, + expectedHTTPStatus: http.StatusServiceUnavailable, expectedGRPCStatus: codes.Unimplemented, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, - "a DoNotLogError of an ingesterPushError with METHOD_NOT_ALLOWED cause gets translated into an HTTP 501": { + "a DoNotLogError of an ingesterPushError with METHOD_NOT_ALLOWED cause gets translated into gRPC codes.Unimplemented and HTTP 503 statuses": { err: middleware.DoNotLogError{Err: newIngesterPushError(createStatusWithDetails(t, codes.Unimplemented, originalMsg, mimirpb.METHOD_NOT_ALLOWED), ingesterID)}, - expectedHTTPStatus: http.StatusNotImplemented, + expectedHTTPStatus: http.StatusServiceUnavailable, expectedGRPCStatus: codes.Unimplemented, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, - "an ingesterPushError with TSDB_UNAVAILABLE cause gets translated into an HTTP 503": { + "an ingesterPushError with TSDB_UNAVAILABLE cause gets translated into gRPC codes.Internal and HTTP 503 statuses": { err: newIngesterPushError(createStatusWithDetails(t, codes.Internal, originalMsg, mimirpb.TSDB_UNAVAILABLE), ingesterID), expectedHTTPStatus: http.StatusServiceUnavailable, - expectedGRPCStatus: codes.Unavailable, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), + expectedGRPCStatus: codes.Internal, }, - "a DoNotLogError of an ingesterPushError with TSDB_UNAVAILABLE cause gets translated into an HTTP 503": { + "a DoNotLogError of an ingesterPushError with TSDB_UNAVAILABLE cause gets translated into gRPC codes.Internal and HTTP 503 statuses": { err: middleware.DoNotLogError{Err: newIngesterPushError(createStatusWithDetails(t, codes.Internal, originalMsg, mimirpb.TSDB_UNAVAILABLE), ingesterID)}, expectedHTTPStatus: http.StatusServiceUnavailable, - expectedGRPCStatus: codes.Unavailable, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), + expectedGRPCStatus: codes.Internal, }, - "an ingesterPushError with SERVICE_UNAVAILABLE cause gets translated into an HTTP 503": { + "an ingesterPushError with SERVICE_UNAVAILABLE cause gets translated into gRPC codes.Internal and HTTP 503 statuses": { err: newIngesterPushError(createStatusWithDetails(t, codes.Unavailable, originalMsg, mimirpb.SERVICE_UNAVAILABLE), ingesterID), expectedHTTPStatus: http.StatusServiceUnavailable, expectedGRPCStatus: codes.Internal, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, - "a DoNotLogError of an ingesterPushError with SERVICE_UNAVAILABLE cause gets translated into an HTTP 503": { + "a DoNotLogError of an ingesterPushError with SERVICE_UNAVAILABLE cause gets translated gRPC codes.Internal and HTTP 503 statuses": { err: middleware.DoNotLogError{Err: newIngesterPushError(createStatusWithDetails(t, codes.Unavailable, originalMsg, mimirpb.SERVICE_UNAVAILABLE), ingesterID)}, expectedHTTPStatus: http.StatusServiceUnavailable, expectedGRPCStatus: codes.Internal, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, - "an ingesterPushError with INSTANCE_LIMIT cause gets translated into an HTTP 503": { + "an ingesterPushError with INSTANCE_LIMIT cause gets translated into gRPC codes.Internal and HTTP 503 statuses": { err: newIngesterPushError(createStatusWithDetails(t, codes.Unavailable, originalMsg, mimirpb.INSTANCE_LIMIT), ingesterID), expectedHTTPStatus: http.StatusServiceUnavailable, expectedGRPCStatus: codes.Internal, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, - "a DoNotLogError of an ingesterPushError with INSTANCE_LIMIT cause gets translated into an HTTP 503": { + "a DoNotLogError of an ingesterPushError with INSTANCE_LIMIT cause gets translated into gRPC codes.Internal and HTTP 503 statuses": { err: middleware.DoNotLogError{Err: newIngesterPushError(createStatusWithDetails(t, codes.Unavailable, originalMsg, mimirpb.INSTANCE_LIMIT), ingesterID)}, expectedHTTPStatus: http.StatusServiceUnavailable, expectedGRPCStatus: codes.Internal, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, - "an ingesterPushError with UNKNOWN_CAUSE cause gets translated into an HTTP 503": { + "an ingesterPushError with UNKNOWN_CAUSE cause gets translated into gRPC codes.Internal and HTTP 503 statuses": { err: newIngesterPushError(createStatusWithDetails(t, codes.Internal, originalMsg, mimirpb.UNKNOWN_CAUSE), ingesterID), expectedHTTPStatus: http.StatusServiceUnavailable, expectedGRPCStatus: codes.Internal, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, - "a DoNotLogError of an ingesterPushError with UNKNOWN_CAUSE cause gets translated into an HTTP 503": { + "a DoNotLogError of an ingesterPushError with UNKNOWN_CAUSE cause gets translated into gRPC codes.Internal and HTTP 503 statuses": { err: middleware.DoNotLogError{Err: newIngesterPushError(createStatusWithDetails(t, codes.Internal, originalMsg, mimirpb.UNKNOWN_CAUSE), ingesterID)}, expectedHTTPStatus: http.StatusServiceUnavailable, expectedGRPCStatus: codes.Internal, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, - "an ingesterPushError obtained from a DeadlineExceeded coming from the ingester gets translated into an HTTP 503": { + "an ingesterPushError obtained from a DeadlineExceeded coming from the ingester gets translated into gRPC codes.Internal and HTTP 503 statuses": { err: newIngesterPushError(createStatusWithDetails(t, codes.Internal, context.DeadlineExceeded.Error(), mimirpb.UNKNOWN_CAUSE), ingesterID), expectedHTTPStatus: http.StatusServiceUnavailable, expectedGRPCStatus: codes.Internal, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, context.DeadlineExceeded), }, - "a circuitBreakerOpenError gets translated into an HTTP 503": { + "a circuitBreakerOpenError gets translated into gRPC codes.Unavailable and HTTP 503 statuses": { err: newCircuitBreakerOpenError(client.ErrCircuitBreakerOpen{}), expectedHTTPStatus: http.StatusServiceUnavailable, expectedGRPCStatus: codes.Unavailable, - expectedErrorMsg: circuitbreaker.ErrOpen.Error(), }, - "a wrapped circuitBreakerOpenError gets translated into an HTTP 503": { + "a wrapped circuitBreakerOpenError gets translated into gRPC codes.Unavailable and HTTP 503 statuses": { err: errors.Wrap(newCircuitBreakerOpenError(client.ErrCircuitBreakerOpen{}), fmt.Sprintf("%s %s", failedPushingToIngesterMessage, ingesterID)), expectedHTTPStatus: http.StatusServiceUnavailable, expectedGRPCStatus: codes.Unavailable, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, circuitbreaker.ErrOpen), }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { gStatus, status := toOtlpGRPCHTTPStatus(tc.err) - msg := tc.err.Error() assert.Equal(t, tc.expectedHTTPStatus, status) assert.Equal(t, tc.expectedGRPCStatus, gStatus) - assert.Equal(t, tc.expectedErrorMsg, msg) + }) + } +} + +func TestHttpRetryableToOTLPRetryable(t *testing.T) { + testCases := map[string]struct { + httpStatusCode int + expectedOtlpHTTPStatusCode int + }{ + "HTTP status codes 2xx gets translated into themselves": { + httpStatusCode: http.StatusAccepted, + expectedOtlpHTTPStatusCode: http.StatusAccepted, + }, + "HTTP status code 400 gets translated into itself": { + httpStatusCode: http.StatusBadRequest, + expectedOtlpHTTPStatusCode: http.StatusBadRequest, + }, + "HTTP status code 429 gets translated into itself": { + httpStatusCode: http.StatusTooManyRequests, + expectedOtlpHTTPStatusCode: http.StatusTooManyRequests, + }, + "HTTP status code 500 gets translated into 503": { + httpStatusCode: http.StatusInternalServerError, + expectedOtlpHTTPStatusCode: http.StatusServiceUnavailable, + }, + "HTTP status code 501 gets translated into 503": { + httpStatusCode: http.StatusNotImplemented, + expectedOtlpHTTPStatusCode: http.StatusServiceUnavailable, + }, + "HTTP status code 502 gets translated into itself": { + httpStatusCode: http.StatusBadGateway, + expectedOtlpHTTPStatusCode: http.StatusBadGateway, + }, + "HTTP status code 503 gets translated into itself": { + httpStatusCode: http.StatusServiceUnavailable, + expectedOtlpHTTPStatusCode: http.StatusServiceUnavailable, + }, + "HTTP status code 504 gets translated into itself": { + httpStatusCode: http.StatusGatewayTimeout, + expectedOtlpHTTPStatusCode: http.StatusGatewayTimeout, + }, + "HTTP status code 507 gets translated into 503": { + httpStatusCode: http.StatusInsufficientStorage, + expectedOtlpHTTPStatusCode: http.StatusServiceUnavailable, + }, + } + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + otlpHTTPStatusCode := httpRetryableToOTLPRetryable(testCase.httpStatusCode) + require.Equal(t, testCase.expectedOtlpHTTPStatusCode, otlpHTTPStatusCode) }) } } diff --git a/pkg/distributor/push.go b/pkg/distributor/push.go index 4955b7b8f06..d38104573de 100644 --- a/pkg/distributor/push.go +++ b/pkg/distributor/push.go @@ -167,6 +167,9 @@ func handler( msg string ) if resp, ok := httpgrpc.HTTPResponseFromError(err); ok { + // TODO: This code is needed for backwards compatibility, + // and can be removed once -ingester.return-only-grpc-errors + // is removed. code, msg = int(resp.Code), string(resp.Body) } else { code = toHTTPStatus(ctx, err, limits) @@ -207,8 +210,6 @@ func calculateRetryAfter(retryAttemptHeader string, baseSeconds int, maxBackoffE // toHTTPStatus converts the given error into an appropriate HTTP status corresponding // to that error, if the error is one of the errors from this package. Otherwise, an // http.StatusInternalServerError is returned. -// to that error, if the error is one of the errors from this package. Otherwise, codes.Internal and -// http.StatusInternalServerError is returned. func toHTTPStatus(ctx context.Context, pushErr error, limits *validation.Overrides) int { if errors.Is(pushErr, context.DeadlineExceeded) { return http.StatusInternalServerError @@ -216,34 +217,12 @@ func toHTTPStatus(ctx context.Context, pushErr error, limits *validation.Overrid var distributorErr Error if errors.As(pushErr, &distributorErr) { - switch distributorErr.Cause() { - case mimirpb.BAD_DATA: - return http.StatusBadRequest - case mimirpb.INGESTION_RATE_LIMITED, mimirpb.REQUEST_RATE_LIMITED: - serviceOverloadErrorEnabled := false - userID, err := tenant.TenantID(ctx) - if err == nil { - serviceOverloadErrorEnabled = limits.ServiceOverloadStatusCodeOnRateLimitEnabled(userID) - } - // Return a 429 or a 529 here depending on configuration to tell the client it is going too fast. - // Client may discard the data or slow down and re-send. - // Prometheus v2.26 added a remote-write option 'retry_on_http_429'. - if serviceOverloadErrorEnabled { - return StatusServiceOverloaded - } - return http.StatusTooManyRequests - case mimirpb.REPLICAS_DID_NOT_MATCH: - return http.StatusAccepted - case mimirpb.TOO_MANY_CLUSTERS: - return http.StatusBadRequest - case mimirpb.TSDB_UNAVAILABLE: - return http.StatusServiceUnavailable - case mimirpb.CIRCUIT_BREAKER_OPEN: - return http.StatusServiceUnavailable - case mimirpb.METHOD_NOT_ALLOWED: - // Return a 501 (and not 405) to explicitly signal a misconfiguration and to possibly track that amongst other 5xx errors. - return http.StatusNotImplemented + serviceOverloadErrorEnabled := false + userID, err := tenant.TenantID(ctx) + if err == nil { + serviceOverloadErrorEnabled = limits.ServiceOverloadStatusCodeOnRateLimitEnabled(userID) } + return errorCauseToHTTPStatusCode(distributorErr.Cause(), serviceOverloadErrorEnabled) } return http.StatusInternalServerError diff --git a/pkg/distributor/push_test.go b/pkg/distributor/push_test.go index 056fb1359f3..8763b26fb42 100644 --- a/pkg/distributor/push_test.go +++ b/pkg/distributor/push_test.go @@ -16,7 +16,6 @@ import ( "testing" "time" - "github.com/failsafe-go/failsafe-go/circuitbreaker" "github.com/go-kit/log" "github.com/golang/snappy" "github.com/grafana/dskit/concurrency" @@ -670,168 +669,136 @@ func TestHandler_toHTTPStatus(t *testing.T) { err error serviceOverloadErrorEnabled bool expectedHTTPStatus int - expectedErrorMsg string } testCases := map[string]testStruct{ "a generic error gets translated into a HTTP 500": { err: originalErr, expectedHTTPStatus: http.StatusInternalServerError, - expectedErrorMsg: originalMsg, }, "a DoNotLog of a generic error gets translated into a HTTP 500": { err: middleware.DoNotLogError{Err: originalErr}, expectedHTTPStatus: http.StatusInternalServerError, - expectedErrorMsg: originalMsg, }, "a context.DeadlineExceeded gets translated into a HTTP 500": { err: context.DeadlineExceeded, expectedHTTPStatus: http.StatusInternalServerError, - expectedErrorMsg: context.DeadlineExceeded.Error(), }, "a replicasDidNotMatchError gets translated into an HTTP 202": { err: replicasNotMatchErr, expectedHTTPStatus: http.StatusAccepted, - expectedErrorMsg: replicasNotMatchErr.Error(), }, "a DoNotLogError of a replicasDidNotMatchError gets translated into an HTTP 202": { err: middleware.DoNotLogError{Err: replicasNotMatchErr}, expectedHTTPStatus: http.StatusAccepted, - expectedErrorMsg: replicasNotMatchErr.Error(), }, "a tooManyClustersError gets translated into an HTTP 400": { err: tooManyClustersErr, expectedHTTPStatus: http.StatusBadRequest, - expectedErrorMsg: tooManyClustersErr.Error(), }, "a DoNotLogError of a tooManyClustersError gets translated into an HTTP 400": { err: middleware.DoNotLogError{Err: tooManyClustersErr}, expectedHTTPStatus: http.StatusBadRequest, - expectedErrorMsg: tooManyClustersErr.Error(), }, "a validationError gets translated into an HTTP 400": { err: newValidationError(originalErr), expectedHTTPStatus: http.StatusBadRequest, - expectedErrorMsg: originalMsg, }, "a DoNotLogError of a validationError gets translated into an HTTP 400": { err: middleware.DoNotLogError{Err: newValidationError(originalErr)}, expectedHTTPStatus: http.StatusBadRequest, - expectedErrorMsg: originalMsg, }, "an ingestionRateLimitedError gets translated into an HTTP 429": { err: ingestionRateLimitedErr, expectedHTTPStatus: http.StatusTooManyRequests, - expectedErrorMsg: ingestionRateLimitedErr.Error(), }, "an ingestionRateLimitedError with serviceOverloadErrorEnabled gets translated into an HTTP 529": { err: ingestionRateLimitedErr, serviceOverloadErrorEnabled: true, expectedHTTPStatus: StatusServiceOverloaded, - expectedErrorMsg: ingestionRateLimitedErr.Error(), }, "a DoNotLogError of an ingestionRateLimitedError gets translated into an HTTP 429": { err: middleware.DoNotLogError{Err: ingestionRateLimitedErr}, expectedHTTPStatus: http.StatusTooManyRequests, - expectedErrorMsg: ingestionRateLimitedErr.Error(), }, "a requestRateLimitedError with serviceOverloadErrorEnabled gets translated into an HTTP 529": { err: requestRateLimitedErr, serviceOverloadErrorEnabled: true, expectedHTTPStatus: StatusServiceOverloaded, - expectedErrorMsg: requestRateLimitedErr.Error(), }, "a DoNotLogError of a requestRateLimitedError with serviceOverloadErrorEnabled gets translated into an HTTP 529": { err: middleware.DoNotLogError{Err: requestRateLimitedErr}, serviceOverloadErrorEnabled: true, expectedHTTPStatus: StatusServiceOverloaded, - expectedErrorMsg: requestRateLimitedErr.Error(), }, "a requestRateLimitedError without serviceOverloadErrorEnabled gets translated into an HTTP 429": { err: requestRateLimitedErr, serviceOverloadErrorEnabled: false, expectedHTTPStatus: http.StatusTooManyRequests, - expectedErrorMsg: requestRateLimitedErr.Error(), }, "a DoNotLogError of a requestRateLimitedError without serviceOverloadErrorEnabled gets translated into an HTTP 429": { err: middleware.DoNotLogError{Err: requestRateLimitedErr}, serviceOverloadErrorEnabled: false, expectedHTTPStatus: http.StatusTooManyRequests, - expectedErrorMsg: requestRateLimitedErr.Error(), }, "an ingesterPushError with BAD_DATA cause gets translated into an HTTP 400": { err: newIngesterPushError(createStatusWithDetails(t, codes.Internal, originalMsg, mimirpb.BAD_DATA), ingesterID), expectedHTTPStatus: http.StatusBadRequest, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, "a DoNotLogError of an ingesterPushError with BAD_DATA cause gets translated into an HTTP 400": { err: middleware.DoNotLogError{Err: newIngesterPushError(createStatusWithDetails(t, codes.FailedPrecondition, originalMsg, mimirpb.BAD_DATA), ingesterID)}, expectedHTTPStatus: http.StatusBadRequest, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, "an ingesterPushError with METHOD_NOT_ALLOWED cause gets translated into an HTTP 501": { err: newIngesterPushError(createStatusWithDetails(t, codes.Unimplemented, originalMsg, mimirpb.METHOD_NOT_ALLOWED), ingesterID), expectedHTTPStatus: http.StatusNotImplemented, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, "a DoNotLogError of an ingesterPushError with METHOD_NOT_ALLOWED cause gets translated into an HTTP 501": { err: middleware.DoNotLogError{Err: newIngesterPushError(createStatusWithDetails(t, codes.Unimplemented, originalMsg, mimirpb.METHOD_NOT_ALLOWED), ingesterID)}, expectedHTTPStatus: http.StatusNotImplemented, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, "an ingesterPushError with TSDB_UNAVAILABLE cause gets translated into an HTTP 503": { err: newIngesterPushError(createStatusWithDetails(t, codes.Internal, originalMsg, mimirpb.TSDB_UNAVAILABLE), ingesterID), expectedHTTPStatus: http.StatusServiceUnavailable, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, "a DoNotLogError of an ingesterPushError with TSDB_UNAVAILABLE cause gets translated into an HTTP 503": { err: middleware.DoNotLogError{Err: newIngesterPushError(createStatusWithDetails(t, codes.Internal, originalMsg, mimirpb.TSDB_UNAVAILABLE), ingesterID)}, expectedHTTPStatus: http.StatusServiceUnavailable, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, "an ingesterPushError with SERVICE_UNAVAILABLE cause gets translated into an HTTP 500": { err: newIngesterPushError(createStatusWithDetails(t, codes.Unavailable, originalMsg, mimirpb.SERVICE_UNAVAILABLE), ingesterID), expectedHTTPStatus: http.StatusInternalServerError, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, "a DoNotLogError of an ingesterPushError with SERVICE_UNAVAILABLE cause gets translated into an HTTP 500": { err: middleware.DoNotLogError{Err: newIngesterPushError(createStatusWithDetails(t, codes.Unavailable, originalMsg, mimirpb.SERVICE_UNAVAILABLE), ingesterID)}, expectedHTTPStatus: http.StatusInternalServerError, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, "an ingesterPushError with INSTANCE_LIMIT cause gets translated into an HTTP 500": { err: newIngesterPushError(createStatusWithDetails(t, codes.Unavailable, originalMsg, mimirpb.INSTANCE_LIMIT), ingesterID), expectedHTTPStatus: http.StatusInternalServerError, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, "a DoNotLogError of an ingesterPushError with INSTANCE_LIMIT cause gets translated into an HTTP 500": { err: middleware.DoNotLogError{Err: newIngesterPushError(createStatusWithDetails(t, codes.Unavailable, originalMsg, mimirpb.INSTANCE_LIMIT), ingesterID)}, expectedHTTPStatus: http.StatusInternalServerError, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, "an ingesterPushError with UNKNOWN_CAUSE cause gets translated into an HTTP 500": { err: newIngesterPushError(createStatusWithDetails(t, codes.Internal, originalMsg, mimirpb.UNKNOWN_CAUSE), ingesterID), expectedHTTPStatus: http.StatusInternalServerError, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, "a DoNotLogError of an ingesterPushError with UNKNOWN_CAUSE cause gets translated into an HTTP 500": { err: middleware.DoNotLogError{Err: newIngesterPushError(createStatusWithDetails(t, codes.Internal, originalMsg, mimirpb.UNKNOWN_CAUSE), ingesterID)}, expectedHTTPStatus: http.StatusInternalServerError, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg), }, "an ingesterPushError obtained from a DeadlineExceeded coming from the ingester gets translated into an HTTP 500": { err: newIngesterPushError(createStatusWithDetails(t, codes.Internal, context.DeadlineExceeded.Error(), mimirpb.UNKNOWN_CAUSE), ingesterID), expectedHTTPStatus: http.StatusInternalServerError, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, context.DeadlineExceeded), }, "a circuitBreakerOpenError gets translated into an HTTP 503": { err: newCircuitBreakerOpenError(client.ErrCircuitBreakerOpen{}), expectedHTTPStatus: http.StatusServiceUnavailable, - expectedErrorMsg: circuitbreaker.ErrOpen.Error(), }, "a wrapped circuitBreakerOpenError gets translated into an HTTP 503": { err: errors.Wrap(newCircuitBreakerOpenError(client.ErrCircuitBreakerOpen{}), fmt.Sprintf("%s %s", failedPushingToIngesterMessage, ingesterID)), expectedHTTPStatus: http.StatusServiceUnavailable, - expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, circuitbreaker.ErrOpen), }, } for name, tc := range testCases { @@ -850,9 +817,7 @@ func TestHandler_toHTTPStatus(t *testing.T) { require.NoError(t, err) status := toHTTPStatus(ctx, tc.err, limits) - msg := tc.err.Error() assert.Equal(t, tc.expectedHTTPStatus, status) - assert.Equal(t, tc.expectedErrorMsg, msg) }) } }