Skip to content

Commit

Permalink
Fixing review findings
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
  • Loading branch information
duricanikolic committed Jun 12, 2024
1 parent 4af491e commit 73ee36b
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 63 deletions.
32 changes: 30 additions & 2 deletions pkg/distributor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package distributor
import (
"context"
"fmt"
"net/http"
"time"

"github.com/gogo/status"
Expand Down Expand Up @@ -262,12 +263,12 @@ func toErrorWithGRPCStatus(pushErr error, serviceOverloadErrorEnabled bool) erro
)
if errors.As(pushErr, &distributorErr) {
errDetails = &mimirpb.ErrorDetails{Cause: distributorErr.Cause()}
errCode = toGRPCStatusCode(distributorErr.Cause(), serviceOverloadErrorEnabled)
errCode = errorCauseToGRPCStatusCode(distributorErr.Cause(), serviceOverloadErrorEnabled)
}
return globalerror.WrapErrorWithGRPCStatus(pushErr, errCode, errDetails).Err()
}

func toGRPCStatusCode(errCause mimirpb.ErrorCause, serviceOverloadErrorEnabled bool) codes.Code {
func errorCauseToGRPCStatusCode(errCause mimirpb.ErrorCause, serviceOverloadErrorEnabled bool) codes.Code {
switch errCause {
case mimirpb.BAD_DATA:
return codes.InvalidArgument
Expand All @@ -288,6 +289,33 @@ func toGRPCStatusCode(errCause mimirpb.ErrorCause, serviceOverloadErrorEnabled b
return codes.Internal
}

func errorCauseToHTTPStatusCode(errCause mimirpb.ErrorCause, serviceOverloadErrorEnabled bool) int {
switch errCause {
case mimirpb.BAD_DATA:
return http.StatusBadRequest
case mimirpb.INGESTION_RATE_LIMITED, mimirpb.REQUEST_RATE_LIMITED:
// Return a 429 or a 529 here depending on configuration to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
if serviceOverloadErrorEnabled {
return StatusServiceOverloaded
}
return http.StatusTooManyRequests
case mimirpb.REPLICAS_DID_NOT_MATCH:
return http.StatusAccepted
case mimirpb.TOO_MANY_CLUSTERS:
return http.StatusBadRequest
case mimirpb.TSDB_UNAVAILABLE:
return http.StatusServiceUnavailable
case mimirpb.CIRCUIT_BREAKER_OPEN:
return http.StatusServiceUnavailable
case mimirpb.METHOD_NOT_ALLOWED:
// Return a 501 (and not 405) to explicitly signal a misconfiguration and to possibly track that amongst other 5xx errors.
return http.StatusNotImplemented
}
return http.StatusInternalServerError
}

func wrapIngesterPushError(err error, ingesterID string) error {
if err == nil {
return nil
Expand Down
164 changes: 158 additions & 6 deletions pkg/distributor/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,39 +253,39 @@ func TestToErrorWithGRPCStatus(t *testing.T) {
expectedErrorMsg: tooManyClustersErr.Error(),
expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.TOO_MANY_CLUSTERS},
},
"a validationError gets translated into gets translated into an InvalidArgument error with BAD_DATA cause": {
"a validationError gets translated into an InvalidArgument error with BAD_DATA cause": {
err: newValidationError(originalErr),
expectedGRPCCode: codes.InvalidArgument,
expectedErrorMsg: originalMsg,
expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.BAD_DATA},
},
"a DoNotLogError of a validationError gets translated into gets translated into an InvalidArgument error with BAD_DATA cause": {
"a DoNotLogError of a validationError gets translated into an InvalidArgument error with BAD_DATA cause": {
err: middleware.DoNotLogError{Err: newValidationError(originalErr)},
expectedGRPCCode: codes.InvalidArgument,
expectedErrorMsg: originalMsg,
expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.BAD_DATA},
},
"an ingestionRateLimitedError with serviceOverloadErrorEnabled gets translated into gets translated into an Unavailable error with INGESTION_RATE_LIMITED cause": {
"an ingestionRateLimitedError with serviceOverloadErrorEnabled gets translated into an Unavailable error with INGESTION_RATE_LIMITED cause": {
err: ingestionRateLimitedErr,
serviceOverloadErrorEnabled: true,
expectedGRPCCode: codes.Unavailable,
expectedErrorMsg: ingestionRateLimitedErr.Error(),
expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.INGESTION_RATE_LIMITED},
},
"a DoNotLogError of an ingestionRateLimitedError with serviceOverloadErrorEnabled gets translated into gets translated into an Unavailable error with INGESTION_RATE_LIMITED cause": {
"a DoNotLogError of an ingestionRateLimitedError with serviceOverloadErrorEnabled gets translated into an Unavailable error with INGESTION_RATE_LIMITED cause": {
err: middleware.DoNotLogError{Err: ingestionRateLimitedErr},
serviceOverloadErrorEnabled: true,
expectedGRPCCode: codes.Unavailable,
expectedErrorMsg: ingestionRateLimitedErr.Error(),
expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.INGESTION_RATE_LIMITED},
},
"an ingestionRateLimitedError without serviceOverloadErrorEnabled gets translated into gets translated into a ResourceExhausted error with INGESTION_RATE_LIMITED cause": {
"an ingestionRateLimitedError without serviceOverloadErrorEnabled gets translated into a ResourceExhausted error with INGESTION_RATE_LIMITED cause": {
err: ingestionRateLimitedErr,
expectedGRPCCode: codes.ResourceExhausted,
expectedErrorMsg: ingestionRateLimitedErr.Error(),
expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.INGESTION_RATE_LIMITED},
},
"a DoNotLogError of an ingestionRateLimitedError without serviceOverloadErrorEnabled gets translated into gets translated into a ResourceExhausted error with INGESTION_RATE_LIMITED cause": {
"a DoNotLogError of an ingestionRateLimitedError without serviceOverloadErrorEnabled gets translated into a ResourceExhausted error with INGESTION_RATE_LIMITED cause": {
err: middleware.DoNotLogError{Err: ingestionRateLimitedErr},
expectedGRPCCode: codes.ResourceExhausted,
expectedErrorMsg: ingestionRateLimitedErr.Error(),
Expand Down Expand Up @@ -604,6 +604,158 @@ func TestIsIngestionClientError(t *testing.T) {
}
}

func TestErrorCauseToGRPCStatusCode(t *testing.T) {
type testStruct struct {
errorCause mimirpb.ErrorCause
serviceOverloadErrorEnabled bool
expectedGRPCStatusCode codes.Code
}
testCases := map[string]testStruct{
"an REPLICAS_DID_NOT_MATCH error cause gets translated into an AlreadyExist": {
errorCause: mimirpb.REPLICAS_DID_NOT_MATCH,
expectedGRPCStatusCode: codes.AlreadyExists,
},
"an TOO_MANY_CLUSTERS error cause gets translated into a FailedPrecondition": {
errorCause: mimirpb.TOO_MANY_CLUSTERS,
expectedGRPCStatusCode: codes.FailedPrecondition,
},
"an BAD_DATA error cause gets translated into a InvalidArgument": {
errorCause: mimirpb.BAD_DATA,
expectedGRPCStatusCode: codes.InvalidArgument,
},
"an INGESTION_RATE_LIMITED error cause without serviceOverloadErrorEnabled gets translated into a ResourceExhausted": {
errorCause: mimirpb.INGESTION_RATE_LIMITED,
serviceOverloadErrorEnabled: false,
expectedGRPCStatusCode: codes.ResourceExhausted,
},
"an INGESTION_RATE_LIMITED error cause with serviceOverloadErrorEnabled gets translated into an Unavailable": {
errorCause: mimirpb.INGESTION_RATE_LIMITED,
serviceOverloadErrorEnabled: true,
expectedGRPCStatusCode: codes.Unavailable,
},
"an REQUEST_RATE_LIMITED error cause without serviceOverloadErrorEnabled gets translated into a ResourceExhausted": {
errorCause: mimirpb.REQUEST_RATE_LIMITED,
serviceOverloadErrorEnabled: false,
expectedGRPCStatusCode: codes.ResourceExhausted,
},
"an REQUEST_RATE_LIMITED error cause with serviceOverloadErrorEnabled gets translated into an Unavailable": {
errorCause: mimirpb.REQUEST_RATE_LIMITED,
serviceOverloadErrorEnabled: true,
expectedGRPCStatusCode: codes.Unavailable,
},
"an UNKNOWN error cause gets translated into an Internal": {
errorCause: mimirpb.UNKNOWN_CAUSE,
expectedGRPCStatusCode: codes.Internal,
},
"an INSTANCE_LIMIT error cause gets translated into an Internal": {
errorCause: mimirpb.INSTANCE_LIMIT,
expectedGRPCStatusCode: codes.Internal,
},
"an SERVICE_UNAVAILABLE error cause gets translated into an Internal": {
errorCause: mimirpb.SERVICE_UNAVAILABLE,
expectedGRPCStatusCode: codes.Internal,
},
"an TOO_BUSY error cause gets translated into an Internal": {
errorCause: mimirpb.TOO_BUSY,
expectedGRPCStatusCode: codes.Internal,
},
"an CIRCUIT_BREAKER_OPEN error cause gets translated into an Unavailable": {
errorCause: mimirpb.CIRCUIT_BREAKER_OPEN,
expectedGRPCStatusCode: codes.Unavailable,
},
"an METHOD_NOT_ALLOWED error cause gets translated into an Unimplemented": {
errorCause: mimirpb.METHOD_NOT_ALLOWED,
expectedGRPCStatusCode: codes.Unimplemented,
},
"an TSDB_UNAVAILABLE error cause gets translated into an Internal": {
errorCause: mimirpb.TSDB_UNAVAILABLE,
expectedGRPCStatusCode: codes.Internal,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
status := errorCauseToGRPCStatusCode(tc.errorCause, tc.serviceOverloadErrorEnabled)
assert.Equal(t, tc.expectedGRPCStatusCode, status)
})
}
}

func TestErrorCauseToHTTPStatusCode(t *testing.T) {
type testStruct struct {
errorCause mimirpb.ErrorCause
serviceOverloadErrorEnabled bool
expectedHTTPStatus int
}
testCases := map[string]testStruct{
"an REPLICAS_DID_NOT_MATCH error cause gets translated into a HTTP 202": {
errorCause: mimirpb.REPLICAS_DID_NOT_MATCH,
expectedHTTPStatus: http.StatusAccepted,
},
"an TOO_MANY_CLUSTERS error cause gets translated into a HTTP 400": {
errorCause: mimirpb.TOO_MANY_CLUSTERS,
expectedHTTPStatus: http.StatusBadRequest,
},
"an BAD_DATA error cause gets translated into a HTTP 400": {
errorCause: mimirpb.BAD_DATA,
expectedHTTPStatus: http.StatusBadRequest,
},
"an INGESTION_RATE_LIMITED error cause without serviceOverloadErrorEnabled gets translated into a HTTP 429": {
errorCause: mimirpb.INGESTION_RATE_LIMITED,
serviceOverloadErrorEnabled: false,
expectedHTTPStatus: http.StatusTooManyRequests,
},
"an INGESTION_RATE_LIMITED error cause with serviceOverloadErrorEnabled gets translated into a HTTP 529": {
errorCause: mimirpb.INGESTION_RATE_LIMITED,
serviceOverloadErrorEnabled: true,
expectedHTTPStatus: StatusServiceOverloaded,
},
"an REQUEST_RATE_LIMITED error cause without serviceOverloadErrorEnabled gets translated into a HTTP 429": {
errorCause: mimirpb.REQUEST_RATE_LIMITED,
serviceOverloadErrorEnabled: false,
expectedHTTPStatus: http.StatusTooManyRequests,
},
"an REQUEST_RATE_LIMITED error cause with serviceOverloadErrorEnabled gets translated into a HTTP 529": {
errorCause: mimirpb.REQUEST_RATE_LIMITED,
serviceOverloadErrorEnabled: true,
expectedHTTPStatus: StatusServiceOverloaded,
},
"an UNKNOWN error cause gets translated into a HTTP 500": {
errorCause: mimirpb.UNKNOWN_CAUSE,
expectedHTTPStatus: http.StatusInternalServerError,
},
"an INSTANCE_LIMIT error cause gets translated into a HTTP 500": {
errorCause: mimirpb.INSTANCE_LIMIT,
expectedHTTPStatus: http.StatusInternalServerError,
},
"an SERVICE_UNAVAILABLE error cause gets translated into a HTTP 500": {
errorCause: mimirpb.SERVICE_UNAVAILABLE,
expectedHTTPStatus: http.StatusInternalServerError,
},
"an TOO_BUSY error cause gets translated into a HTTP 500": {
errorCause: mimirpb.TOO_BUSY,
expectedHTTPStatus: http.StatusInternalServerError,
},
"an CIRCUIT_BREAKER_OPEN error cause gets translated into a HTTP 500": {
errorCause: mimirpb.CIRCUIT_BREAKER_OPEN,
expectedHTTPStatus: http.StatusServiceUnavailable,
},
"an METHOD_NOT_ALLOWED error cause gets translated into a HTTP 501": {
errorCause: mimirpb.METHOD_NOT_ALLOWED,
expectedHTTPStatus: http.StatusNotImplemented,
},
"an TSDB_UNAVAILABLE error cause gets translated into a HTTP 503": {
errorCause: mimirpb.TSDB_UNAVAILABLE,
expectedHTTPStatus: http.StatusServiceUnavailable,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
status := errorCauseToHTTPStatusCode(tc.errorCause, tc.serviceOverloadErrorEnabled)
assert.Equal(t, tc.expectedHTTPStatus, status)
})
}
}

func checkDistributorError(t *testing.T, err error, expectedCause mimirpb.ErrorCause) {
var distributorErr Error
require.ErrorAs(t, err, &distributorErr)
Expand Down
45 changes: 21 additions & 24 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,10 @@ func otlpHandler(
errorMsg string
)
if st, ok := grpcutil.ErrorToStatus(err); ok {
httpCode = int(st.Code())
// TODO: This code is needed for backwards compatibility,
// and can be removed once -ingester.return-only-grpc-errors
// is removed.
httpCode = httpRetryableToOtlpRetryable(int(st.Code()))
grpcCode = st.Code()
errorMsg = st.Message()
} else {
Expand Down Expand Up @@ -281,33 +284,27 @@ func toOtlpGRPCHTTPStatus(pushErr error) (codes.Code, int) {
return codes.Internal, http.StatusServiceUnavailable
}

grpcStatusCode := toGRPCStatusCode(distributorErr.Cause(), false)
otlpHTTPStatusCode := toOtlpHTTPStatus(distributorErr.Cause())
grpcStatusCode := errorCauseToGRPCStatusCode(distributorErr.Cause(), false)
httpStatusCode := errorCauseToHTTPStatusCode(distributorErr.Cause(), false)
otlpHTTPStatusCode := httpRetryableToOtlpRetryable(httpStatusCode)
return grpcStatusCode, otlpHTTPStatusCode
}

// toOtlpHTTPStatus maps the given mimirpb.ErrorCause to its OTLP endpoint HTTP status.
// This function is slightly different from toHTTPStatus() due to the OTLP specifications
// (https://opentelemetry.io/docs/specs/otlp/#failures-1): unlike Prometheus, the OTLP
// client only retries on HTTP status codes 429, 502, 503, and 504.
func toOtlpHTTPStatus(errCause mimirpb.ErrorCause) int {
switch errCause {
case mimirpb.BAD_DATA:
return http.StatusBadRequest
case mimirpb.INGESTION_RATE_LIMITED, mimirpb.REQUEST_RATE_LIMITED:
return http.StatusTooManyRequests
case mimirpb.REPLICAS_DID_NOT_MATCH:
return http.StatusAccepted
case mimirpb.TOO_MANY_CLUSTERS:
return http.StatusBadRequest
case mimirpb.TSDB_UNAVAILABLE:
return http.StatusServiceUnavailable
case mimirpb.CIRCUIT_BREAKER_OPEN:
return http.StatusServiceUnavailable
case mimirpb.METHOD_NOT_ALLOWED:
return http.StatusNotImplemented
// httpRetryableToOtlpRetryable maps non-retryable 5xx HTTP status codes according
// to the OTLP specifications (https://opentelemetry.io/docs/specs/otlp/#failures-1)
// to http.StatusServiceUnavailable. In case of a non-retryable HTTP status code,
// httpRetryableToOtlpRetryable returns the HTTP status code itself.
// Unlike Prometheus, which retries 429 and all 5xx HTTP status codes,
// the OTLP client only retries on HTTP status codes 429, 502, 503, and 504.
func httpRetryableToOtlpRetryable(httpStatusCode int) int {
if httpStatusCode/100 == 5 {
mask := httpStatusCode % 100
// We map all 5xx except 502, 503 and 504 into 503.
if mask <= 1 || mask > 4 {
return http.StatusServiceUnavailable
}
}
return http.StatusServiceUnavailable
return httpStatusCode
}

// writeErrorToHTTPResponseBody converts the given error into a grpc status and marshals it into a byte slice, in order to be written to the response body.
Expand Down
50 changes: 50 additions & 0 deletions pkg/distributor/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,3 +729,53 @@ func TestHandler_toOtlpGRPCHTTPStatus(t *testing.T) {
})
}
}

func TestHttpRetryableToOtlpRetryable(t *testing.T) {
testCases := map[string]struct {
httpStatusCode int
expectedOtlpHTTPStatusCode int
}{
"HTTP status codes 2xx gets translated into themselves": {
httpStatusCode: http.StatusAccepted,
expectedOtlpHTTPStatusCode: http.StatusAccepted,
},
"HTTP status code 400 gets translated into itself": {
httpStatusCode: http.StatusBadRequest,
expectedOtlpHTTPStatusCode: http.StatusBadRequest,
},
"HTTP status code 429 gets translated into itself": {
httpStatusCode: http.StatusTooManyRequests,
expectedOtlpHTTPStatusCode: http.StatusTooManyRequests,
},
"HTTP status code 500 gets translated into 503": {
httpStatusCode: http.StatusInternalServerError,
expectedOtlpHTTPStatusCode: http.StatusServiceUnavailable,
},
"HTTP status code 501 gets translated into 503": {
httpStatusCode: http.StatusNotImplemented,
expectedOtlpHTTPStatusCode: http.StatusServiceUnavailable,
},
"HTTP status code 502 gets translated into itself": {
httpStatusCode: http.StatusBadGateway,
expectedOtlpHTTPStatusCode: http.StatusBadGateway,
},
"HTTP status code 503 gets translated into itself": {
httpStatusCode: http.StatusServiceUnavailable,
expectedOtlpHTTPStatusCode: http.StatusServiceUnavailable,
},
"HTTP status code 504 gets translated into itself": {
httpStatusCode: http.StatusGatewayTimeout,
expectedOtlpHTTPStatusCode: http.StatusGatewayTimeout,
},
"HTTP status code 507 gets translated into 503": {
httpStatusCode: http.StatusInsufficientStorage,
expectedOtlpHTTPStatusCode: http.StatusServiceUnavailable,
},
}
for testName, testCase := range testCases {
t.Run(testName, func(t *testing.T) {
otlpHTTPStatusCode := httpRetryableToOtlpRetryable(testCase.httpStatusCode)
require.Equal(t, testCase.expectedOtlpHTTPStatusCode, otlpHTTPStatusCode)
})
}
}
Loading

0 comments on commit 73ee36b

Please sign in to comment.