Add insight=true to push handlers (#8294)
* Add insight=true to push handlers.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* CHANGELOG.md entry.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Address review feedback.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Improve log message.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* One more iteration of error message clarification.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

* Add comment about messages.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>

---------

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
pstibrany authored Jun 6, 2024
1 parent 2ec3b86 commit 9c3c70b
Showing 4 changed files with 93 additions and 12 deletions.
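
Below is a minimal, standalone sketch (not code from this commit; the helper name logPushError and the sample values are illustrative) of the logging pattern this change adds to both push handlers: when the HTTP response code is in the 4xx range, the error log line gets an extra insight=true key/value pair, so client-caused write failures can be told apart from server-side ones.

package main

import (
	"errors"
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

// logPushError mirrors the pattern introduced by this commit: build the
// key/value pairs for the error log line, and append "insight"=true only
// for 4xx (client-caused) response codes.
func logPushError(logger log.Logger, httpCode int, err error) {
	msgs := []interface{}{
		"msg", "detected an error while ingesting Prometheus remote-write request (the request may have been partially ingested)",
		"httpCode", httpCode,
		"err", err,
	}
	if httpCode/100 == 4 {
		msgs = append(msgs, "insight", true)
	}
	level.Error(logger).Log(msgs...)
}

func main() {
	logger := log.NewLogfmtLogger(os.Stdout)
	// 4xx: logged with insight=true.
	logPushError(logger, 413, errors.New("request too large"))
	// 5xx: logged without the insight field.
	logPushError(logger, 500, errors.New("internal error"))
}

Operators can then filter ingest errors caused by clients (limits, oversized or malformed requests) by matching on insight=true in their log pipeline.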
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -49,6 +49,7 @@
* [ENHANCEMENT] Query-frontend: include route name in query stats log lines. #8191
* [ENHANCEMENT] OTLP: Speed up conversion from OTel to Mimir format by about 8% and reduce memory consumption by about 30%. Can be disabled via `-distributor.direct-otlp-translation-enabled=false` #7957
* [ENHANCEMENT] Ingester/Querier: Optimise regexps with long lists of alternates. #8221, #8234
* [ENHANCEMENT] Distributor: add `insight=true` to remote-write and OTLP write handlers when the HTTP response status code is 4xx. #8294
* [BUGFIX] Distributor: make OTLP endpoint return marshalled proto bytes as response body for 4xx/5xx errors. #8227
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
* [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
27 changes: 20 additions & 7 deletions pkg/distributor/push.go
@@ -177,7 +177,12 @@ func handler(
msg = err.Error()
}
if code != 202 {
level.Error(logger).Log("msg", "push error", "err", err)
// This error message is consistent with error message in OTLP handler, and ingester's ingest-storage pushToStorage method.
msgs := []interface{}{"msg", "detected an error while ingesting Prometheus remote-write request (the request may have been partially ingested)", "httpCode", code, "err", err}
if code/100 == 4 {
msgs = append(msgs, "insight", true)
}
level.Error(logger).Log(msgs...)
}
addHeaders(w, err, r, code, retryCfg)
http.Error(w, msg, code)
@@ -233,42 +238,50 @@ func otlpHandler(
var (
httpCode int
grpcCode codes.Code
errorMsg string
)
if resp, ok := httpgrpc.HTTPResponseFromError(err); ok {
// here the error would always be nil, since it is already checked in httpgrpc.HTTPResponseFromError
s, _ := grpcutil.ErrorToStatus(err)
writeErrorToHTTPResponseBody(w, int(resp.Code), s.Code(), string(resp.Body), logger)
httpCode = int(resp.Code)
grpcCode = s.Code() // this will be the same as httpCode.
errorMsg = string(resp.Body)
} else {
grpcCode, httpCode = toGRPCHTTPStatus(ctx, err, limits)
writeErrorToHTTPResponseBody(w, httpCode, grpcCode, err.Error(), logger)
errorMsg = err.Error()
}
if httpCode != 202 {
level.Error(logger).Log("msg", "push error", "err", err)
// This error message is consistent with error message in Prometheus remote-write handler, and ingester's ingest-storage pushToStorage method.
msgs := []interface{}{"msg", "detected an error while ingesting OTLP metrics request (the request may have been partially ingested)", "httpCode", httpCode, "err", err}
if httpCode/100 == 4 {
msgs = append(msgs, "insight", true)
}
level.Error(logger).Log(msgs...)
}
addHeaders(w, err, r, httpCode, retryCfg)
writeErrorToHTTPResponseBody(w, httpCode, grpcCode, errorMsg, logger)
}
})
}

// writeErrorToHTTPResponseBody converts the given error into a grpc status and marshals it into a byte slice, in order to be written to the response body.
// See doc https://opentelemetry.io/docs/specs/otlp/#failures-1
func writeErrorToHTTPResponseBody(w http.ResponseWriter, httpCode int, grpcCode codes.Code, msg string, logger log.Logger) {
// writeResponseFailedError would be returned when writeErrorToHTTPResponseBody fails to write the error to the response body.
writeResponseFailedBody, _ := proto.Marshal(grpcstatus.New(codes.Internal, "write error to response failed").Proto())
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.WriteHeader(httpCode)

respBytes, err := proto.Marshal(grpcstatus.New(grpcCode, msg).Proto())
if err != nil {
level.Error(logger).Log("msg", "otlp response marshal failed", "err", err)
writeResponseFailedBody, _ := proto.Marshal(grpcstatus.New(codes.Internal, "failed to marshal OTLP response").Proto())
_, _ = w.Write(writeResponseFailedBody)
return
}

_, err = w.Write(respBytes)
if err != nil {
level.Error(logger).Log("msg", "write error to otlp response failed", "err", err)
_, _ = w.Write(writeResponseFailedBody)
}
}
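
As a side note, here is a hedged client-side sketch (not part of this commit; decodeOTLPError is a hypothetical helper) of how an exporter could read the body produced by writeErrorToHTTPResponseBody: per the OTLP failures spec linked above, the body of a failed export response carries a marshalled google.rpc.Status protobuf.

package otlpclient

import (
	"fmt"
	"io"
	"net/http"

	statuspb "google.golang.org/genproto/googleapis/rpc/status"
	"google.golang.org/protobuf/proto"
)

// decodeOTLPError reads a non-2xx OTLP/HTTP export response and unmarshals
// the google.rpc.Status protobuf carried in its body.
func decodeOTLPError(resp *http.Response) (string, error) {
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("reading response body: %w", err)
	}
	st := &statuspb.Status{}
	if err := proto.Unmarshal(body, st); err != nil {
		return "", fmt.Errorf("unmarshalling status: %w", err)
	}
	return fmt.Sprintf("export failed: http=%d grpcCode=%d msg=%q", resp.StatusCode, st.GetCode(), st.GetMessage()), nil
}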

76 changes: 71 additions & 5 deletions pkg/distributor/push_test.go
@@ -14,12 +14,15 @@ import (
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"

"github.com/failsafe-go/failsafe-go/circuitbreaker"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/golang/snappy"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/httpgrpc/server"
@@ -186,6 +189,9 @@ func TestHandlerOTLPPush(t *testing.T) {
responseCode int
errMessage string
enableOtelMetadataStorage bool

expectedLogs []string
expectedRetryHeader bool
}{
{
name: "Write samples. No compression",
@@ -227,6 +233,7 @@
},
responseCode: http.StatusRequestEntityTooLarge,
errMessage: "the incoming push request has been rejected because its message size of 63 bytes is larger",
expectedLogs: []string{`level=error user=test msg="detected an error while ingesting OTLP metrics request (the request may have been partially ingested)" httpCode=413 err="rpc error: code = Code(413) desc = the incoming push request has been rejected because its message size of 63 bytes is larger than the allowed limit of 30 bytes (err-mimir-distributor-max-write-message-size). To adjust the related limit, configure -distributor.max-recv-msg-size, or contact your service administrator." insight=true`},
},
{
name: "Write samples. Unsupported compression",
@@ -240,6 +247,20 @@
},
responseCode: http.StatusUnsupportedMediaType,
errMessage: "Only \"gzip\" or no compression supported",
expectedLogs: []string{`level=error user=test msg="detected an error while ingesting OTLP metrics request (the request may have been partially ingested)" httpCode=415 err="rpc error: code = Code(415) desc = unsupported compression: snappy. Only \"gzip\" or no compression supported" insight=true`},
},
{
name: "Rate limited request",
maxMsgSize: 100000,
series: sampleSeries,
metadata: sampleMetadata,
verifyFunc: func(_ *testing.T, pushReq *Request) error {
return httpgrpc.Errorf(http.StatusTooManyRequests, "go slower")
},
responseCode: http.StatusTooManyRequests,
errMessage: "go slower",
expectedLogs: []string{`level=error user=test msg="detected an error while ingesting OTLP metrics request (the request may have been partially ingested)" httpCode=429 err="rpc error: code = Code(429) desc = go slower" insight=true`},
expectedRetryHeader: true,
},
{
name: "Write histograms",
@@ -302,7 +323,10 @@
t.Cleanup(pushReq.CleanUp)
return tt.verifyFunc(t, pushReq)
}
handler := OTLPHandler(tt.maxMsgSize, nil, nil, tt.enableOtelMetadataStorage, limits, RetryConfig{}, pusher, nil, nil, log.NewNopLogger(), true)

logs := &concurrency.SyncBuffer{}
retryConfig := RetryConfig{Enabled: true, BaseSeconds: 5, MaxBackoffExponent: 5}
handler := OTLPHandler(tt.maxMsgSize, nil, nil, tt.enableOtelMetadataStorage, limits, retryConfig, pusher, nil, nil, level.NewFilter(log.NewLogfmtLogger(logs), level.AllowInfo()), true)

resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
@@ -316,6 +340,15 @@
assert.NoError(t, err)
assert.Contains(t, respStatus.GetMessage(), tt.errMessage)
}

var logLines []string
if logsStr := logs.String(); logsStr != "" {
logLines = strings.Split(strings.TrimSpace(logsStr), "\n")
}
assert.Equal(t, tt.expectedLogs, logLines)

retryAfter := resp.Header().Get("Retry-After")
assert.Equal(t, tt.expectedRetryHeader, retryAfter != "")
})
}
}
@@ -759,18 +792,21 @@ func TestHandler_ErrorTranslation(t *testing.T) {
err error
expectedHTTPStatus int
expectedErrorMessage string
expectedLogs []string
}{
{
name: "a generic error during request parsing gets an HTTP 400",
err: fmt.Errorf(errMsg),
expectedHTTPStatus: http.StatusBadRequest,
expectedErrorMessage: errMsg,
expectedLogs: []string{`level=error user=testuser msg="detected an error while ingesting Prometheus remote-write request (the request may have been partially ingested)" httpCode=400 err="rpc error: code = Code(400) desc = this is an error" insight=true`},
},
{
name: "a gRPC error with a status during request parsing gets translated into HTTP error without DoNotLogError header",
err: httpgrpc.Errorf(http.StatusRequestEntityTooLarge, errMsg),
expectedHTTPStatus: http.StatusRequestEntityTooLarge,
expectedErrorMessage: errMsg,
expectedLogs: []string{`level=error user=testuser msg="detected an error while ingesting Prometheus remote-write request (the request may have been partially ingested)" httpCode=413 err="rpc error: code = Code(413) desc = this is an error" insight=true`},
},
}
for _, tc := range parserTestCases {
@@ -783,13 +819,21 @@
return err
}

h := handler(10, nil, nil, false, nil, RetryConfig{}, pushFunc, log.NewNopLogger(), parserFunc)
logs := &concurrency.SyncBuffer{}
h := handler(10, nil, nil, false, nil, RetryConfig{}, pushFunc, log.NewLogfmtLogger(logs), parserFunc)

recorder := httptest.NewRecorder()
h.ServeHTTP(recorder, httptest.NewRequest(http.MethodPost, "/push", bufCloser{&bytes.Buffer{}}))
ctxWithUser := user.InjectOrgID(context.Background(), "testuser")
h.ServeHTTP(recorder, httptest.NewRequest(http.MethodPost, "/push", bufCloser{&bytes.Buffer{}}).WithContext(ctxWithUser))

assert.Equal(t, tc.expectedHTTPStatus, recorder.Code)
assert.Equal(t, fmt.Sprintf("%s\n", tc.expectedErrorMessage), recorder.Body.String())

var logLines []string
if logsStr := logs.String(); logsStr != "" {
logLines = strings.Split(strings.TrimSpace(logsStr), "\n")
}
assert.Equal(t, tc.expectedLogs, logLines)
})
}

@@ -799,6 +843,7 @@
expectedHTTPStatus int
expectedErrorMessage string
expectedDoNotLogErrorHeader bool
expectedLogs []string
}{
{
name: "no error during push gets translated into a HTTP 200",
Expand All @@ -810,32 +855,44 @@ func TestHandler_ErrorTranslation(t *testing.T) {
err: fmt.Errorf(errMsg),
expectedHTTPStatus: http.StatusInternalServerError,
expectedErrorMessage: errMsg,
expectedLogs: []string{`level=error user=testuser msg="detected an error while ingesting Prometheus remote-write request (the request may have been partially ingested)" httpCode=500 err="this is an error"`},
},
{
name: "a DoNotLogError of a generic error during push gets a HTTP 500 with DoNotLogError header",
err: middleware.DoNotLogError{Err: fmt.Errorf(errMsg)},
expectedHTTPStatus: http.StatusInternalServerError,
expectedErrorMessage: errMsg,
expectedDoNotLogErrorHeader: true,
expectedLogs: []string{`level=error user=testuser msg="detected an error while ingesting Prometheus remote-write request (the request may have been partially ingested)" httpCode=500 err="this is an error"`},
},
{
name: "a gRPC error with a status during push gets translated into HTTP error without DoNotLogError header",
err: httpgrpc.Errorf(http.StatusRequestEntityTooLarge, errMsg),
expectedHTTPStatus: http.StatusRequestEntityTooLarge,
expectedErrorMessage: errMsg,
expectedLogs: []string{`level=error user=testuser msg="detected an error while ingesting Prometheus remote-write request (the request may have been partially ingested)" httpCode=413 err="rpc error: code = Code(413) desc = this is an error" insight=true`},
},
{
name: "a DoNotLogError of a gRPC error with a status during push gets translated into HTTP error without DoNotLogError header",
err: middleware.DoNotLogError{Err: httpgrpc.Errorf(http.StatusRequestEntityTooLarge, errMsg)},
expectedHTTPStatus: http.StatusRequestEntityTooLarge,
expectedErrorMessage: errMsg,
expectedDoNotLogErrorHeader: true,
expectedLogs: []string{`level=error user=testuser msg="detected an error while ingesting Prometheus remote-write request (the request may have been partially ingested)" httpCode=413 err="rpc error: code = Code(413) desc = this is an error" insight=true`},
},
{
name: "a context.Canceled error during push gets translated into a HTTP 499",
err: context.Canceled,
expectedHTTPStatus: statusClientClosedRequest,
expectedErrorMessage: context.Canceled.Error(),
expectedLogs: []string{`level=warn user=testuser msg="push request canceled" err="context canceled"`},
},
{
name: "StatusBadRequest is logged with insight=true",
err: httpgrpc.Errorf(http.StatusBadRequest, "limits reached"),
expectedHTTPStatus: http.StatusBadRequest,
expectedErrorMessage: "limits reached",
expectedLogs: []string{`level=error user=testuser msg="detected an error while ingesting Prometheus remote-write request (the request may have been partially ingested)" httpCode=400 err="rpc error: code = Code(400) desc = limits reached" insight=true`},
},
}

@@ -852,9 +909,12 @@
}
return tc.err
}
h := handler(10, nil, nil, false, nil, RetryConfig{}, pushFunc, log.NewNopLogger(), parserFunc)

logs := &concurrency.SyncBuffer{}
h := handler(10, nil, nil, false, nil, RetryConfig{}, pushFunc, log.NewLogfmtLogger(logs), parserFunc)
recorder := httptest.NewRecorder()
h.ServeHTTP(recorder, httptest.NewRequest(http.MethodPost, "/push", bufCloser{&bytes.Buffer{}}))
ctxWithUser := user.InjectOrgID(context.Background(), "testuser")
h.ServeHTTP(recorder, httptest.NewRequest(http.MethodPost, "/push", bufCloser{&bytes.Buffer{}}).WithContext(ctxWithUser))

assert.Equal(t, tc.expectedHTTPStatus, recorder.Code)
if tc.err != nil {
Expand All @@ -866,6 +926,12 @@ func TestHandler_ErrorTranslation(t *testing.T) {
} else {
require.Equal(t, "", header)
}

var logLines []string
if logsStr := logs.String(); logsStr != "" {
logLines = strings.Split(strings.TrimSpace(logsStr), "\n")
}
assert.Equal(t, tc.expectedLogs, logLines)
})
}
}
1 change: 1 addition & 0 deletions pkg/storage/ingest/pusher.go
@@ -128,6 +128,7 @@ func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req
if reason != "" {
err = fmt.Errorf("%w (%s)", err, reason)
}
// This error message is consistent with error message in Prometheus remote-write and OTLP handlers in distributors.
level.Warn(spanLog).Log("msg", "detected a client error while ingesting write request (the request may have been partially ingested)", "user", tenantID, "insight", true, "err", err)
}
}
