From 73ee36b4438b5ecded39100481b0b11b9fba3949 Mon Sep 17 00:00:00 2001 From: Yuri Nikolic Date: Wed, 12 Jun 2024 08:13:23 +0200 Subject: [PATCH] Fixing review findings Signed-off-by: Yuri Nikolic --- pkg/distributor/errors.go | 32 ++++++- pkg/distributor/errors_test.go | 164 +++++++++++++++++++++++++++++++-- pkg/distributor/otel.go | 45 +++++---- pkg/distributor/otel_test.go | 50 ++++++++++ pkg/distributor/push.go | 39 ++------ 5 files changed, 267 insertions(+), 63 deletions(-) diff --git a/pkg/distributor/errors.go b/pkg/distributor/errors.go index ae5689582af..5c094cea5a9 100644 --- a/pkg/distributor/errors.go +++ b/pkg/distributor/errors.go @@ -5,6 +5,7 @@ package distributor import ( "context" "fmt" + "net/http" "time" "github.com/gogo/status" @@ -262,12 +263,12 @@ func toErrorWithGRPCStatus(pushErr error, serviceOverloadErrorEnabled bool) erro ) if errors.As(pushErr, &distributorErr) { errDetails = &mimirpb.ErrorDetails{Cause: distributorErr.Cause()} - errCode = toGRPCStatusCode(distributorErr.Cause(), serviceOverloadErrorEnabled) + errCode = errorCauseToGRPCStatusCode(distributorErr.Cause(), serviceOverloadErrorEnabled) } return globalerror.WrapErrorWithGRPCStatus(pushErr, errCode, errDetails).Err() } -func toGRPCStatusCode(errCause mimirpb.ErrorCause, serviceOverloadErrorEnabled bool) codes.Code { +func errorCauseToGRPCStatusCode(errCause mimirpb.ErrorCause, serviceOverloadErrorEnabled bool) codes.Code { switch errCause { case mimirpb.BAD_DATA: return codes.InvalidArgument @@ -288,6 +289,33 @@ func toGRPCStatusCode(errCause mimirpb.ErrorCause, serviceOverloadErrorEnabled b return codes.Internal } +func errorCauseToHTTPStatusCode(errCause mimirpb.ErrorCause, serviceOverloadErrorEnabled bool) int { + switch errCause { + case mimirpb.BAD_DATA: + return http.StatusBadRequest + case mimirpb.INGESTION_RATE_LIMITED, mimirpb.REQUEST_RATE_LIMITED: + // Return a 429 or a 529 here depending on configuration to tell the client it is going too fast. 
+ // Client may discard the data or slow down and re-send. + // Prometheus v2.26 added a remote-write option 'retry_on_http_429'. + if serviceOverloadErrorEnabled { + return StatusServiceOverloaded + } + return http.StatusTooManyRequests + case mimirpb.REPLICAS_DID_NOT_MATCH: + return http.StatusAccepted + case mimirpb.TOO_MANY_CLUSTERS: + return http.StatusBadRequest + case mimirpb.TSDB_UNAVAILABLE: + return http.StatusServiceUnavailable + case mimirpb.CIRCUIT_BREAKER_OPEN: + return http.StatusServiceUnavailable + case mimirpb.METHOD_NOT_ALLOWED: + // Return a 501 (and not 405) to explicitly signal a misconfiguration and to possibly track that amongst other 5xx errors. + return http.StatusNotImplemented + } + return http.StatusInternalServerError +} + func wrapIngesterPushError(err error, ingesterID string) error { if err == nil { return nil diff --git a/pkg/distributor/errors_test.go b/pkg/distributor/errors_test.go index bca7a413310..bd87034d966 100644 --- a/pkg/distributor/errors_test.go +++ b/pkg/distributor/errors_test.go @@ -253,39 +253,39 @@ func TestToErrorWithGRPCStatus(t *testing.T) { expectedErrorMsg: tooManyClustersErr.Error(), expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.TOO_MANY_CLUSTERS}, }, - "a validationError gets translated into gets translated into an InvalidArgument error with BAD_DATA cause": { + "a validationError gets translated into an InvalidArgument error with BAD_DATA cause": { err: newValidationError(originalErr), expectedGRPCCode: codes.InvalidArgument, expectedErrorMsg: originalMsg, expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.BAD_DATA}, }, - "a DoNotLogError of a validationError gets translated into gets translated into an InvalidArgument error with BAD_DATA cause": { + "a DoNotLogError of a validationError gets translated into an InvalidArgument error with BAD_DATA cause": { err: middleware.DoNotLogError{Err: newValidationError(originalErr)}, expectedGRPCCode: codes.InvalidArgument, expectedErrorMsg: 
originalMsg, expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.BAD_DATA}, }, - "an ingestionRateLimitedError with serviceOverloadErrorEnabled gets translated into gets translated into an Unavailable error with INGESTION_RATE_LIMITED cause": { + "an ingestionRateLimitedError with serviceOverloadErrorEnabled gets translated into an Unavailable error with INGESTION_RATE_LIMITED cause": { err: ingestionRateLimitedErr, serviceOverloadErrorEnabled: true, expectedGRPCCode: codes.Unavailable, expectedErrorMsg: ingestionRateLimitedErr.Error(), expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.INGESTION_RATE_LIMITED}, }, - "a DoNotLogError of an ingestionRateLimitedError with serviceOverloadErrorEnabled gets translated into gets translated into an Unavailable error with INGESTION_RATE_LIMITED cause": { + "a DoNotLogError of an ingestionRateLimitedError with serviceOverloadErrorEnabled gets translated into an Unavailable error with INGESTION_RATE_LIMITED cause": { err: middleware.DoNotLogError{Err: ingestionRateLimitedErr}, serviceOverloadErrorEnabled: true, expectedGRPCCode: codes.Unavailable, expectedErrorMsg: ingestionRateLimitedErr.Error(), expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.INGESTION_RATE_LIMITED}, }, - "an ingestionRateLimitedError without serviceOverloadErrorEnabled gets translated into gets translated into a ResourceExhausted error with INGESTION_RATE_LIMITED cause": { + "an ingestionRateLimitedError without serviceOverloadErrorEnabled gets translated into a ResourceExhausted error with INGESTION_RATE_LIMITED cause": { err: ingestionRateLimitedErr, expectedGRPCCode: codes.ResourceExhausted, expectedErrorMsg: ingestionRateLimitedErr.Error(), expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.INGESTION_RATE_LIMITED}, }, - "a DoNotLogError of an ingestionRateLimitedError without serviceOverloadErrorEnabled gets translated into gets translated into a ResourceExhausted error with INGESTION_RATE_LIMITED cause": { + "a 
DoNotLogError of an ingestionRateLimitedError without serviceOverloadErrorEnabled gets translated into a ResourceExhausted error with INGESTION_RATE_LIMITED cause": { err: middleware.DoNotLogError{Err: ingestionRateLimitedErr}, expectedGRPCCode: codes.ResourceExhausted, expectedErrorMsg: ingestionRateLimitedErr.Error(), @@ -604,6 +604,158 @@ func TestIsIngestionClientError(t *testing.T) { } } +func TestErrorCauseToGRPCStatusCode(t *testing.T) { + type testStruct struct { + errorCause mimirpb.ErrorCause + serviceOverloadErrorEnabled bool + expectedGRPCStatusCode codes.Code + } + testCases := map[string]testStruct{ + "an REPLICAS_DID_NOT_MATCH error cause gets translated into an AlreadyExist": { + errorCause: mimirpb.REPLICAS_DID_NOT_MATCH, + expectedGRPCStatusCode: codes.AlreadyExists, + }, + "an TOO_MANY_CLUSTERS error cause gets translated into a FailedPrecondition": { + errorCause: mimirpb.TOO_MANY_CLUSTERS, + expectedGRPCStatusCode: codes.FailedPrecondition, + }, + "an BAD_DATA error cause gets translated into a InvalidArgument": { + errorCause: mimirpb.BAD_DATA, + expectedGRPCStatusCode: codes.InvalidArgument, + }, + "an INGESTION_RATE_LIMITED error cause without serviceOverloadErrorEnabled gets translated into a ResourceExhausted": { + errorCause: mimirpb.INGESTION_RATE_LIMITED, + serviceOverloadErrorEnabled: false, + expectedGRPCStatusCode: codes.ResourceExhausted, + }, + "an INGESTION_RATE_LIMITED error cause with serviceOverloadErrorEnabled gets translated into an Unavailable": { + errorCause: mimirpb.INGESTION_RATE_LIMITED, + serviceOverloadErrorEnabled: true, + expectedGRPCStatusCode: codes.Unavailable, + }, + "an REQUEST_RATE_LIMITED error cause without serviceOverloadErrorEnabled gets translated into a ResourceExhausted": { + errorCause: mimirpb.REQUEST_RATE_LIMITED, + serviceOverloadErrorEnabled: false, + expectedGRPCStatusCode: codes.ResourceExhausted, + }, + "an REQUEST_RATE_LIMITED error cause with serviceOverloadErrorEnabled gets translated into an 
Unavailable": { + errorCause: mimirpb.REQUEST_RATE_LIMITED, + serviceOverloadErrorEnabled: true, + expectedGRPCStatusCode: codes.Unavailable, + }, + "an UNKNOWN error cause gets translated into an Internal": { + errorCause: mimirpb.UNKNOWN_CAUSE, + expectedGRPCStatusCode: codes.Internal, + }, + "an INSTANCE_LIMIT error cause gets translated into an Internal": { + errorCause: mimirpb.INSTANCE_LIMIT, + expectedGRPCStatusCode: codes.Internal, + }, + "an SERVICE_UNAVAILABLE error cause gets translated into an Internal": { + errorCause: mimirpb.SERVICE_UNAVAILABLE, + expectedGRPCStatusCode: codes.Internal, + }, + "an TOO_BUSY error cause gets translated into an Internal": { + errorCause: mimirpb.TOO_BUSY, + expectedGRPCStatusCode: codes.Internal, + }, + "an CIRCUIT_BREAKER_OPEN error cause gets translated into an Unavailable": { + errorCause: mimirpb.CIRCUIT_BREAKER_OPEN, + expectedGRPCStatusCode: codes.Unavailable, + }, + "an METHOD_NOT_ALLOWED error cause gets translated into an Unimplemented": { + errorCause: mimirpb.METHOD_NOT_ALLOWED, + expectedGRPCStatusCode: codes.Unimplemented, + }, + "an TSDB_UNAVAILABLE error cause gets translated into an Internal": { + errorCause: mimirpb.TSDB_UNAVAILABLE, + expectedGRPCStatusCode: codes.Internal, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + status := errorCauseToGRPCStatusCode(tc.errorCause, tc.serviceOverloadErrorEnabled) + assert.Equal(t, tc.expectedGRPCStatusCode, status) + }) + } +} + +func TestErrorCauseToHTTPStatusCode(t *testing.T) { + type testStruct struct { + errorCause mimirpb.ErrorCause + serviceOverloadErrorEnabled bool + expectedHTTPStatus int + } + testCases := map[string]testStruct{ + "an REPLICAS_DID_NOT_MATCH error cause gets translated into a HTTP 202": { + errorCause: mimirpb.REPLICAS_DID_NOT_MATCH, + expectedHTTPStatus: http.StatusAccepted, + }, + "an TOO_MANY_CLUSTERS error cause gets translated into a HTTP 400": { + errorCause: mimirpb.TOO_MANY_CLUSTERS, + 
expectedHTTPStatus: http.StatusBadRequest, + }, + "an BAD_DATA error cause gets translated into a HTTP 400": { + errorCause: mimirpb.BAD_DATA, + expectedHTTPStatus: http.StatusBadRequest, + }, + "an INGESTION_RATE_LIMITED error cause without serviceOverloadErrorEnabled gets translated into a HTTP 429": { + errorCause: mimirpb.INGESTION_RATE_LIMITED, + serviceOverloadErrorEnabled: false, + expectedHTTPStatus: http.StatusTooManyRequests, + }, + "an INGESTION_RATE_LIMITED error cause with serviceOverloadErrorEnabled gets translated into a HTTP 529": { + errorCause: mimirpb.INGESTION_RATE_LIMITED, + serviceOverloadErrorEnabled: true, + expectedHTTPStatus: StatusServiceOverloaded, + }, + "an REQUEST_RATE_LIMITED error cause without serviceOverloadErrorEnabled gets translated into a HTTP 429": { + errorCause: mimirpb.REQUEST_RATE_LIMITED, + serviceOverloadErrorEnabled: false, + expectedHTTPStatus: http.StatusTooManyRequests, + }, + "an REQUEST_RATE_LIMITED error cause with serviceOverloadErrorEnabled gets translated into a HTTP 529": { + errorCause: mimirpb.REQUEST_RATE_LIMITED, + serviceOverloadErrorEnabled: true, + expectedHTTPStatus: StatusServiceOverloaded, + }, + "an UNKNOWN error cause gets translated into a HTTP 500": { + errorCause: mimirpb.UNKNOWN_CAUSE, + expectedHTTPStatus: http.StatusInternalServerError, + }, + "an INSTANCE_LIMIT error cause gets translated into a HTTP 500": { + errorCause: mimirpb.INSTANCE_LIMIT, + expectedHTTPStatus: http.StatusInternalServerError, + }, + "an SERVICE_UNAVAILABLE error cause gets translated into a HTTP 500": { + errorCause: mimirpb.SERVICE_UNAVAILABLE, + expectedHTTPStatus: http.StatusInternalServerError, + }, + "an TOO_BUSY error cause gets translated into a HTTP 500": { + errorCause: mimirpb.TOO_BUSY, + expectedHTTPStatus: http.StatusInternalServerError, + }, + "an CIRCUIT_BREAKER_OPEN error cause gets translated into a HTTP 503": { + errorCause: mimirpb.CIRCUIT_BREAKER_OPEN, + expectedHTTPStatus: 
http.StatusServiceUnavailable, + }, + "an METHOD_NOT_ALLOWED error cause gets translated into a HTTP 501": { + errorCause: mimirpb.METHOD_NOT_ALLOWED, + expectedHTTPStatus: http.StatusNotImplemented, + }, + "an TSDB_UNAVAILABLE error cause gets translated into a HTTP 503": { + errorCause: mimirpb.TSDB_UNAVAILABLE, + expectedHTTPStatus: http.StatusServiceUnavailable, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + status := errorCauseToHTTPStatusCode(tc.errorCause, tc.serviceOverloadErrorEnabled) + assert.Equal(t, tc.expectedHTTPStatus, status) + }) + } +} + func checkDistributorError(t *testing.T, err error, expectedCause mimirpb.ErrorCause) { var distributorErr Error require.ErrorAs(t, err, &distributorErr) diff --git a/pkg/distributor/otel.go b/pkg/distributor/otel.go index 9ff0a1e2131..725fa198b9d 100644 --- a/pkg/distributor/otel.go +++ b/pkg/distributor/otel.go @@ -253,7 +253,10 @@ func otlpHandler( errorMsg string ) if st, ok := grpcutil.ErrorToStatus(err); ok { - httpCode = int(st.Code()) + // TODO: This code is needed for backwards compatibility, + // and can be removed once -ingester.return-only-grpc-errors + // is removed. + httpCode = httpRetryableToOtlpRetryable(int(st.Code())) grpcCode = st.Code() errorMsg = st.Message() } else { @@ -281,33 +284,27 @@ func toOtlpGRPCHTTPStatus(pushErr error) (codes.Code, int) { return codes.Internal, http.StatusServiceUnavailable } - grpcStatusCode := toGRPCStatusCode(distributorErr.Cause(), false) - otlpHTTPStatusCode := toOtlpHTTPStatus(distributorErr.Cause()) + grpcStatusCode := errorCauseToGRPCStatusCode(distributorErr.Cause(), false) + httpStatusCode := errorCauseToHTTPStatusCode(distributorErr.Cause(), false) + otlpHTTPStatusCode := httpRetryableToOtlpRetryable(httpStatusCode) return grpcStatusCode, otlpHTTPStatusCode } -// toOtlpHTTPStatus maps the given mimirpb.ErrorCause to its OTLP endpoint HTTP status. 
-// This function is slightly different from toHTTPStatus() due to the OTLP specifications -// (https://opentelemetry.io/docs/specs/otlp/#failures-1): unlike Prometheus, the OTLP -// client only retries on HTTP status codes 429, 502, 503, and 504. -func toOtlpHTTPStatus(errCause mimirpb.ErrorCause) int { - switch errCause { - case mimirpb.BAD_DATA: - return http.StatusBadRequest - case mimirpb.INGESTION_RATE_LIMITED, mimirpb.REQUEST_RATE_LIMITED: - return http.StatusTooManyRequests - case mimirpb.REPLICAS_DID_NOT_MATCH: - return http.StatusAccepted - case mimirpb.TOO_MANY_CLUSTERS: - return http.StatusBadRequest - case mimirpb.TSDB_UNAVAILABLE: - return http.StatusServiceUnavailable - case mimirpb.CIRCUIT_BREAKER_OPEN: - return http.StatusServiceUnavailable - case mimirpb.METHOD_NOT_ALLOWED: - return http.StatusNotImplemented +// httpRetryableToOtlpRetryable rewrites the 5xx HTTP status codes that the OTLP +// specifications (https://opentelemetry.io/docs/specs/otlp/#failures-1) do not +// treat as retryable, i.e. every 5xx other than 502, 503 and 504, into +// http.StatusServiceUnavailable. Any other status code is returned unchanged. +// Unlike Prometheus, which retries 429 and all 5xx HTTP status codes, +// the OTLP client only retries on HTTP status codes 429, 502, 503, and 504. +func httpRetryableToOtlpRetryable(httpStatusCode int) int { + if httpStatusCode/100 == 5 { + mask := httpStatusCode % 100 + // mask holds the last two digits of the code; all 5xx except 502, 503 and 504 are mapped into 503. + if mask <= 1 || mask > 4 { + return http.StatusServiceUnavailable + } } - return http.StatusServiceUnavailable + return httpStatusCode } // writeErrorToHTTPResponseBody converts the given error into a grpc status and marshals it into a byte slice, in order to be written to the response body. 
diff --git a/pkg/distributor/otel_test.go b/pkg/distributor/otel_test.go index fe1f75bc4eb..e24ef7cf68f 100644 --- a/pkg/distributor/otel_test.go +++ b/pkg/distributor/otel_test.go @@ -729,3 +729,53 @@ func TestHandler_toOtlpGRPCHTTPStatus(t *testing.T) { }) } } + +func TestHttpRetryableToOtlpRetryable(t *testing.T) { + testCases := map[string]struct { + httpStatusCode int + expectedOtlpHTTPStatusCode int + }{ + "HTTP status codes 2xx gets translated into themselves": { + httpStatusCode: http.StatusAccepted, + expectedOtlpHTTPStatusCode: http.StatusAccepted, + }, + "HTTP status code 400 gets translated into itself": { + httpStatusCode: http.StatusBadRequest, + expectedOtlpHTTPStatusCode: http.StatusBadRequest, + }, + "HTTP status code 429 gets translated into itself": { + httpStatusCode: http.StatusTooManyRequests, + expectedOtlpHTTPStatusCode: http.StatusTooManyRequests, + }, + "HTTP status code 500 gets translated into 503": { + httpStatusCode: http.StatusInternalServerError, + expectedOtlpHTTPStatusCode: http.StatusServiceUnavailable, + }, + "HTTP status code 501 gets translated into 503": { + httpStatusCode: http.StatusNotImplemented, + expectedOtlpHTTPStatusCode: http.StatusServiceUnavailable, + }, + "HTTP status code 502 gets translated into itself": { + httpStatusCode: http.StatusBadGateway, + expectedOtlpHTTPStatusCode: http.StatusBadGateway, + }, + "HTTP status code 503 gets translated into itself": { + httpStatusCode: http.StatusServiceUnavailable, + expectedOtlpHTTPStatusCode: http.StatusServiceUnavailable, + }, + "HTTP status code 504 gets translated into itself": { + httpStatusCode: http.StatusGatewayTimeout, + expectedOtlpHTTPStatusCode: http.StatusGatewayTimeout, + }, + "HTTP status code 507 gets translated into 503": { + httpStatusCode: http.StatusInsufficientStorage, + expectedOtlpHTTPStatusCode: http.StatusServiceUnavailable, + }, + } + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + otlpHTTPStatusCode := 
httpRetryableToOtlpRetryable(testCase.httpStatusCode) + require.Equal(t, testCase.expectedOtlpHTTPStatusCode, otlpHTTPStatusCode) + }) + } +} diff --git a/pkg/distributor/push.go b/pkg/distributor/push.go index ac3ceba0d19..10723d6a476 100644 --- a/pkg/distributor/push.go +++ b/pkg/distributor/push.go @@ -167,6 +167,9 @@ func handler( msg string ) if resp, ok := httpgrpc.HTTPResponseFromError(err); ok { + // TODO: This code is needed for backwards compatibility, + // and can be removed once -ingester.return-only-grpc-errors + // is removed. code, msg = int(resp.Code), string(resp.Body) } else { code = toHTTPStatus(ctx, err, limits) @@ -208,40 +211,14 @@ func calculateRetryAfter(retryAttemptHeader string, baseSeconds int, maxBackoffE // to that error, if the error is one of the errors from this package. Otherwise, an // http.StatusInternalServerError is returned. func toHTTPStatus(ctx context.Context, pushErr error, limits *validation.Overrides) int { - if errors.Is(pushErr, context.DeadlineExceeded) { - return http.StatusInternalServerError - } - var distributorErr Error if errors.As(pushErr, &distributorErr) { - switch distributorErr.Cause() { - case mimirpb.BAD_DATA: - return http.StatusBadRequest - case mimirpb.INGESTION_RATE_LIMITED, mimirpb.REQUEST_RATE_LIMITED: - serviceOverloadErrorEnabled := false - userID, err := tenant.TenantID(ctx) - if err == nil { - serviceOverloadErrorEnabled = limits.ServiceOverloadStatusCodeOnRateLimitEnabled(userID) - } - // Return a 429 or a 529 here depending on configuration to tell the client it is going too fast. - // Client may discard the data or slow down and re-send. - // Prometheus v2.26 added a remote-write option 'retry_on_http_429'. 
- if serviceOverloadErrorEnabled { - return StatusServiceOverloaded - } - return http.StatusTooManyRequests - case mimirpb.REPLICAS_DID_NOT_MATCH: - return http.StatusAccepted - case mimirpb.TOO_MANY_CLUSTERS: - return http.StatusBadRequest - case mimirpb.TSDB_UNAVAILABLE: - return http.StatusServiceUnavailable - case mimirpb.CIRCUIT_BREAKER_OPEN: - return http.StatusServiceUnavailable - case mimirpb.METHOD_NOT_ALLOWED: - // Return a 501 (and not 405) to explicitly signal a misconfiguration and to possibly track that amongst other 5xx errors. - return http.StatusNotImplemented + serviceOverloadErrorEnabled := false + userID, err := tenant.TenantID(ctx) + if err == nil { + serviceOverloadErrorEnabled = limits.ServiceOverloadStatusCodeOnRateLimitEnabled(userID) } + return errorCauseToHTTPStatusCode(distributorErr.Cause(), serviceOverloadErrorEnabled) } return http.StatusInternalServerError