Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingester: replace grpc/status usages with gogo/status #6416

Merged
merged 3 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions pkg/ingester/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ package ingester
import (
"errors"
"fmt"
"net/http"
"time"

"github.com/gogo/status"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/services"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
grpcstatus "google.golang.org/grpc/status"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util/globalerror"
Expand All @@ -33,13 +35,19 @@ type errorWithStatus struct {
status *status.Status
}

// newErrorWithStatus creates a new errorWithStatus backed by the given error,
// and containing the given gRPC code.
func newErrorWithStatus(err error, code codes.Code) errorWithStatus {
return errorWithStatus{
err: err,
status: status.New(code, err.Error()),
}
}

// newErrorWithHTTPStatus creates a new errorWithStatus backed by the given error,
// and containing the given HTTP status code.
// TODO this is needed for backwards compatibility only and should be removed
// once httpgrpc.Errorf() usages in ingester are removed.
func newErrorWithHTTPStatus(err error, code int) errorWithStatus {
errWithHTTPStatus := httpgrpc.Errorf(code, err.Error())
stat, _ := status.FromError(errWithHTTPStatus)
Expand All @@ -50,15 +58,18 @@ func newErrorWithHTTPStatus(err error, code int) errorWithStatus {
}

func (e errorWithStatus) Error() string {
return e.status.String()
return e.status.Message()
}

func (e errorWithStatus) Unwrap() error {
return e.err
}

func (e errorWithStatus) GRPCStatus() *status.Status {
return e.status
func (e errorWithStatus) GRPCStatus() *grpcstatus.Status {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for reviewer: this is needed to ensure that gRPC engine will be able to recognizer errorWithStatus by calling grpc.status.FromError().

if stat, ok := e.status.Err().(interface{ GRPCStatus() *grpcstatus.Status }); ok {
return stat.GRPCStatus()
}
return nil
}

// TODO move this type into ingester.proto once httpgrpc is removed from ingester.go.
Expand Down Expand Up @@ -430,3 +441,20 @@ func newIngesterErrSamplers(freq int64) ingesterErrSamplers {
log.NewSampler(freq),
}
}

func handlePushError(err error) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for reviewers: this method has been moved from ingester.go to errors.go.

var ingesterErr ingesterError
if errors.As(err, &ingesterErr) {
switch ingesterErr.errorType() {
case badData:
return newErrorWithHTTPStatus(err, http.StatusBadRequest)
case unavailable:
return newErrorWithStatus(err, codes.Unavailable)
case instanceLimitReached:
return newErrorWithStatus(log.DoNotLogError{Err: err}, codes.Unavailable)
case tsdbUnavailable:
return newErrorWithHTTPStatus(err, http.StatusServiceUnavailable)
}
}
return err
}
174 changes: 173 additions & 1 deletion pkg/ingester/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,25 @@
package ingester

import (
"context"
"errors"
"fmt"
"net/http"
"testing"
"time"

"github.com/gogo/status"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/services"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
grpcstatus "google.golang.org/grpc/status"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util/globalerror"
"github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/validation"
)

Expand Down Expand Up @@ -239,23 +242,44 @@ func TestErrorWithStatus(t *testing.T) {
err := newSampleTimestampTooOldError(timestamp, metricLabelAdapters)
errWithStatus := newErrorWithStatus(err, codes.Unavailable)
require.Error(t, errWithStatus)
// Ensure gogo's status.FromError recognizes errWithStatus.
stat, ok := status.FromError(errWithStatus)
require.True(t, ok)
require.Equal(t, codes.Unavailable, stat.Code())
require.Errorf(t, err, stat.Message())
require.Empty(t, stat.Details())

// Ensure grpc's status.FromError recognizes errWithStatus.
st, ok := grpcstatus.FromError(errWithStatus)
require.True(t, ok)
require.Equal(t, codes.Unavailable, st.Code())
require.Errorf(t, err, st.Message())

// Ensure httpgrpc's HTTPResponseFromError does not recognize errWithStatus.
resp, ok := httpgrpc.HTTPResponseFromError(errWithStatus)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we want it to not recognize the status? is that just to ensure that no behavior changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

httpgrpc.HttpResponseFromError() ensures that the given error is built by httpgrpc.Errorf(). Since we need to guarantee backwards compatibility, methods that translate errors to the corresponding HTTP or gRPC codes will need to be able to distinguish between errors built by httpgrpc.Errorf() and status.Error().
status.FromError() recognizes both types of errors, but httpgrpc.HttpResponseFromError() recognzies only the former. This test ensures that this property holds.

require.False(t, ok)
require.Nil(t, resp)
}

func TestErrorWithHTTPStatus(t *testing.T) {
metricLabelAdapters := []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "test"}}
err := newSampleTimestampTooOldError(timestamp, metricLabelAdapters)
errWithHTTPStatus := newErrorWithHTTPStatus(err, http.StatusBadRequest)
require.Error(t, errWithHTTPStatus)
// Ensure gogo's status.FromError recognizes errWithHTTPStatus.
stat, ok := status.FromError(errWithHTTPStatus)
require.True(t, ok)
require.Equal(t, http.StatusBadRequest, int(stat.Code()))
require.Errorf(t, err, stat.Message())
require.NotEmpty(t, stat.Details())

// Ensure grpc's status.FromError recognizes errWithHTTPStatus.
st, ok := grpcstatus.FromError(errWithHTTPStatus)
require.True(t, ok)
require.Equal(t, http.StatusBadRequest, int(st.Code()))
require.Errorf(t, err, st.Message())

// Ensure httpgrpc's HTTPResponseFromError recognizes errWithHTTPStatus.
resp, ok := httpgrpc.HTTPResponseFromError(errWithHTTPStatus)
require.True(t, ok)
require.Equal(t, int32(http.StatusBadRequest), resp.Code)
Expand Down Expand Up @@ -292,3 +316,151 @@ func checkIngesterError(t *testing.T, err error, expectedType ingesterErrorType,
require.ErrorAs(t, err, &softErr)
}
}

func TestHandlePushError(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for reviewers: this test has been moved from ingester.go to errors.go.

originalMsg := "this is an error"
originalErr := errors.New(originalMsg)
labelAdapters := []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "biz"}}
labels := mimirpb.FromLabelAdaptersToLabels(labelAdapters)

timestamp := model.Time(1)
testCases := []struct {
name string
err error
doNotLogExpected bool
expectedTranslation error
}{
{
name: "a generic error is not translated",
err: originalErr,
expectedTranslation: originalErr,
},
{
name: "a DoNotLog error of a generic error is not translated",
err: log.DoNotLogError{Err: originalErr},
expectedTranslation: log.DoNotLogError{Err: originalErr},
doNotLogExpected: true,
},
{
name: "an unavailableError gets translated into an errorWithStatus Unavailable error",
err: newUnavailableError(services.Stopping),
expectedTranslation: newErrorWithStatus(newUnavailableError(services.Stopping), codes.Unavailable),
},
{
name: "a wrapped unavailableError gets translated into a non-loggable errorWithStatus Unavailable error",
err: fmt.Errorf("wrapped: %w", newUnavailableError(services.Stopping)),
expectedTranslation: newErrorWithStatus(
fmt.Errorf("wrapped: %w", newUnavailableError(services.Stopping)),
codes.Unavailable,
),
},
{
name: "an instanceLimitReachedError gets translated into a non-loggable errorWithStatus Unavailable error",
err: newInstanceLimitReachedError("instance limit reached"),
expectedTranslation: newErrorWithStatus(
log.DoNotLogError{Err: newInstanceLimitReachedError("instance limit reached")},
codes.Unavailable,
),
doNotLogExpected: true,
},
{
name: "a wrapped instanceLimitReachedError gets translated into a non-loggable errorWithStatus Unavailable error",
err: fmt.Errorf("wrapped: %w", newInstanceLimitReachedError("instance limit reached")),
expectedTranslation: newErrorWithStatus(
log.DoNotLogError{Err: fmt.Errorf("wrapped: %w", newInstanceLimitReachedError("instance limit reached"))},
codes.Unavailable,
),
doNotLogExpected: true,
},
{
name: "a tsdbUnavailableError gets translated into an errorWithHTTPStatus 503 error",
err: newTSDBUnavailableError("tsdb stopping"),
expectedTranslation: newErrorWithHTTPStatus(
newTSDBUnavailableError("tsdb stopping"),
http.StatusServiceUnavailable,
),
},
{
name: "a wrapped tsdbUnavailableError gets translated into an errorWithHTTPStatus 503 error",
err: fmt.Errorf("wrapped: %w", newTSDBUnavailableError("tsdb stopping")),
expectedTranslation: newErrorWithHTTPStatus(
fmt.Errorf("wrapped: %w", newTSDBUnavailableError("tsdb stopping")),
http.StatusServiceUnavailable,
),
},
{
name: "a sampleError gets translated into an errorWithHTTPStatus 400 error",
err: newSampleError("id", "sample error", timestamp, labelAdapters),
expectedTranslation: newErrorWithHTTPStatus(
newSampleError("id", "sample error", timestamp, labelAdapters),
http.StatusBadRequest,
),
},
{
name: "a wrapped exemplarError gets translated into an errorWithHTTPStatus 400 error",
err: fmt.Errorf("wrapped: %w", newSampleError("id", "sample error", timestamp, labelAdapters)),
expectedTranslation: newErrorWithHTTPStatus(
fmt.Errorf("wrapped: %w", newSampleError("id", "sample error", timestamp, labelAdapters)),
http.StatusBadRequest,
),
},
{
name: "a exemplarError gets translated into an errorWithHTTPStatus 400 error",
err: newExemplarError("id", "exemplar error", timestamp, labelAdapters, labelAdapters),
expectedTranslation: newErrorWithHTTPStatus(
newExemplarError("id", "exemplar error", timestamp, labelAdapters, labelAdapters),
http.StatusBadRequest,
),
},
{
name: "a wrapped exemplarError gets translated into an errorWithHTTPStatus 400 error",
err: fmt.Errorf("wrapped: %w", newExemplarError("id", "exemplar error", timestamp, labelAdapters, labelAdapters)),
expectedTranslation: newErrorWithHTTPStatus(
fmt.Errorf("wrapped: %w", newExemplarError("id", "exemplar error", timestamp, labelAdapters, labelAdapters)),
http.StatusBadRequest,
),
},
{
name: "a perUserMetadataLimitReachedError gets translated into an errorWithHTTPStatus 400 error",
err: newPerUserSeriesLimitReachedError(10),
expectedTranslation: newErrorWithHTTPStatus(
newPerUserSeriesLimitReachedError(10),
http.StatusBadRequest,
),
},
{
name: "a wrapped perUserMetadataLimitReachedError gets translated into an errorWithHTTPStatus 400 error",
err: fmt.Errorf("wrapped: %w", newPerUserSeriesLimitReachedError(10)),
expectedTranslation: newErrorWithHTTPStatus(
fmt.Errorf("wrapped: %w", newPerUserSeriesLimitReachedError(10)),
http.StatusBadRequest,
),
},
{
name: "a perMetricMetadataLimitReachedError gets translated into an errorWithHTTPStatus 400 error",
err: newPerMetricSeriesLimitReachedError(10, labels),
expectedTranslation: newErrorWithHTTPStatus(
newPerMetricSeriesLimitReachedError(10, labels),
http.StatusBadRequest,
),
},
{
name: "a wrapped perMetricMetadataLimitReachedError gets translated into an errorWithHTTPStatus 400 error",
err: fmt.Errorf("wrapped: %w", newPerMetricSeriesLimitReachedError(10, labels)),
expectedTranslation: newErrorWithHTTPStatus(
fmt.Errorf("wrapped: %w", newPerMetricSeriesLimitReachedError(10, labels)),
http.StatusBadRequest,
),
},
}

for _, tc := range testCases {
handledErr := handlePushError(tc.err)
require.Equal(t, tc.expectedTranslation, handledErr)
if tc.doNotLogExpected {
var doNotLogError log.DoNotLogError
require.ErrorAs(t, handledErr, &doNotLogError)
require.False(t, doNotLogError.ShouldLog(context.Background(), 0))
}
}
}
17 changes: 0 additions & 17 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -3154,23 +3154,6 @@ func (i *Ingester) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirp
return nil, handledErr
}

func handlePushError(err error) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for reviewers: this method has been moved from ingester.go to errors.go.

var ingesterErr ingesterError
if errors.As(err, &ingesterErr) {
switch ingesterErr.errorType() {
case badData:
return newErrorWithHTTPStatus(err, http.StatusBadRequest)
case unavailable:
return newErrorWithStatus(err, codes.Unavailable)
case instanceLimitReached:
return newErrorWithStatus(util_log.DoNotLogError{Err: err}, codes.Unavailable)
case tsdbUnavailable:
return newErrorWithHTTPStatus(err, http.StatusServiceUnavailable)
}
}
return err
}

// pushMetadata returns number of ingested metadata.
func (i *Ingester) pushMetadata(ctx context.Context, userID string, metadata []*mimirpb.MetricMetadata) int {
ingestedMetadata := 0
Expand Down
Loading
Loading