Skip to content

Commit

Permalink
Ingester: replace grpc/status usages with gogo/status (#6416)
Browse files Browse the repository at this point in the history
* Ingester: replace grpc/status usages with gogo/status

* Move handlePushError from ingester.go to errors.go

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Fixing review findings

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

---------

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
  • Loading branch information
duricanikolic authored Oct 18, 2023
1 parent 74edb76 commit 2be6899
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 172 deletions.
36 changes: 32 additions & 4 deletions pkg/ingester/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ package ingester
import (
"errors"
"fmt"
"net/http"
"time"

"github.com/gogo/status"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/services"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
grpcstatus "google.golang.org/grpc/status"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util/globalerror"
Expand All @@ -33,13 +35,19 @@ type errorWithStatus struct {
status *status.Status
}

// newErrorWithStatus creates a new errorWithStatus backed by the given error,
// and containing the given gRPC code.
func newErrorWithStatus(err error, code codes.Code) errorWithStatus {
return errorWithStatus{
err: err,
status: status.New(code, err.Error()),
}
}

// newErrorWithHTTPStatus creates a new errorWithStatus backed by the given error,
// and containing the given HTTP status code.
// TODO this is needed for backwards compatibility only and should be removed
// once httpgrpc.Errorf() usages in ingester are removed.
func newErrorWithHTTPStatus(err error, code int) errorWithStatus {
errWithHTTPStatus := httpgrpc.Errorf(code, err.Error())
stat, _ := status.FromError(errWithHTTPStatus)
Expand All @@ -50,15 +58,18 @@ func newErrorWithHTTPStatus(err error, code int) errorWithStatus {
}

func (e errorWithStatus) Error() string {
return e.status.String()
return e.status.Message()
}

func (e errorWithStatus) Unwrap() error {
return e.err
}

func (e errorWithStatus) GRPCStatus() *status.Status {
return e.status
func (e errorWithStatus) GRPCStatus() *grpcstatus.Status {
if stat, ok := e.status.Err().(interface{ GRPCStatus() *grpcstatus.Status }); ok {
return stat.GRPCStatus()
}
return nil
}

// TODO move this type into ingester.proto once httpgrpc is removed from ingester.go.
Expand Down Expand Up @@ -430,3 +441,20 @@ func newIngesterErrSamplers(freq int64) ingesterErrSamplers {
log.NewSampler(freq),
}
}

func handlePushError(err error) error {
var ingesterErr ingesterError
if errors.As(err, &ingesterErr) {
switch ingesterErr.errorType() {
case badData:
return newErrorWithHTTPStatus(err, http.StatusBadRequest)
case unavailable:
return newErrorWithStatus(err, codes.Unavailable)
case instanceLimitReached:
return newErrorWithStatus(log.DoNotLogError{Err: err}, codes.Unavailable)
case tsdbUnavailable:
return newErrorWithHTTPStatus(err, http.StatusServiceUnavailable)
}
}
return err
}
174 changes: 173 additions & 1 deletion pkg/ingester/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,25 @@
package ingester

import (
"context"
"errors"
"fmt"
"net/http"
"testing"
"time"

"github.com/gogo/status"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/services"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
grpcstatus "google.golang.org/grpc/status"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util/globalerror"
"github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/validation"
)

Expand Down Expand Up @@ -239,23 +242,44 @@ func TestErrorWithStatus(t *testing.T) {
err := newSampleTimestampTooOldError(timestamp, metricLabelAdapters)
errWithStatus := newErrorWithStatus(err, codes.Unavailable)
require.Error(t, errWithStatus)
// Ensure gogo's status.FromError recognizes errWithStatus.
stat, ok := status.FromError(errWithStatus)
require.True(t, ok)
require.Equal(t, codes.Unavailable, stat.Code())
require.Errorf(t, err, stat.Message())
require.Empty(t, stat.Details())

// Ensure grpc's status.FromError recognizes errWithStatus.
st, ok := grpcstatus.FromError(errWithStatus)
require.True(t, ok)
require.Equal(t, codes.Unavailable, st.Code())
require.Errorf(t, err, st.Message())

// Ensure httpgrpc's HTTPResponseFromError does not recognize errWithStatus.
resp, ok := httpgrpc.HTTPResponseFromError(errWithStatus)
require.False(t, ok)
require.Nil(t, resp)
}

func TestErrorWithHTTPStatus(t *testing.T) {
metricLabelAdapters := []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "test"}}
err := newSampleTimestampTooOldError(timestamp, metricLabelAdapters)
errWithHTTPStatus := newErrorWithHTTPStatus(err, http.StatusBadRequest)
require.Error(t, errWithHTTPStatus)
// Ensure gogo's status.FromError recognizes errWithHTTPStatus.
stat, ok := status.FromError(errWithHTTPStatus)
require.True(t, ok)
require.Equal(t, http.StatusBadRequest, int(stat.Code()))
require.Errorf(t, err, stat.Message())
require.NotEmpty(t, stat.Details())

// Ensure grpc's status.FromError recognizes errWithHTTPStatus.
st, ok := grpcstatus.FromError(errWithHTTPStatus)
require.True(t, ok)
require.Equal(t, http.StatusBadRequest, int(st.Code()))
require.Errorf(t, err, st.Message())

// Ensure httpgrpc's HTTPResponseFromError recognizes errWithHTTPStatus.
resp, ok := httpgrpc.HTTPResponseFromError(errWithHTTPStatus)
require.True(t, ok)
require.Equal(t, int32(http.StatusBadRequest), resp.Code)
Expand Down Expand Up @@ -292,3 +316,151 @@ func checkIngesterError(t *testing.T, err error, expectedType ingesterErrorType,
require.ErrorAs(t, err, &softErr)
}
}

func TestHandlePushError(t *testing.T) {
originalMsg := "this is an error"
originalErr := errors.New(originalMsg)
labelAdapters := []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "biz"}}
labels := mimirpb.FromLabelAdaptersToLabels(labelAdapters)

timestamp := model.Time(1)
testCases := []struct {
name string
err error
doNotLogExpected bool
expectedTranslation error
}{
{
name: "a generic error is not translated",
err: originalErr,
expectedTranslation: originalErr,
},
{
name: "a DoNotLog error of a generic error is not translated",
err: log.DoNotLogError{Err: originalErr},
expectedTranslation: log.DoNotLogError{Err: originalErr},
doNotLogExpected: true,
},
{
name: "an unavailableError gets translated into an errorWithStatus Unavailable error",
err: newUnavailableError(services.Stopping),
expectedTranslation: newErrorWithStatus(newUnavailableError(services.Stopping), codes.Unavailable),
},
{
name: "a wrapped unavailableError gets translated into a non-loggable errorWithStatus Unavailable error",
err: fmt.Errorf("wrapped: %w", newUnavailableError(services.Stopping)),
expectedTranslation: newErrorWithStatus(
fmt.Errorf("wrapped: %w", newUnavailableError(services.Stopping)),
codes.Unavailable,
),
},
{
name: "an instanceLimitReachedError gets translated into a non-loggable errorWithStatus Unavailable error",
err: newInstanceLimitReachedError("instance limit reached"),
expectedTranslation: newErrorWithStatus(
log.DoNotLogError{Err: newInstanceLimitReachedError("instance limit reached")},
codes.Unavailable,
),
doNotLogExpected: true,
},
{
name: "a wrapped instanceLimitReachedError gets translated into a non-loggable errorWithStatus Unavailable error",
err: fmt.Errorf("wrapped: %w", newInstanceLimitReachedError("instance limit reached")),
expectedTranslation: newErrorWithStatus(
log.DoNotLogError{Err: fmt.Errorf("wrapped: %w", newInstanceLimitReachedError("instance limit reached"))},
codes.Unavailable,
),
doNotLogExpected: true,
},
{
name: "a tsdbUnavailableError gets translated into an errorWithHTTPStatus 503 error",
err: newTSDBUnavailableError("tsdb stopping"),
expectedTranslation: newErrorWithHTTPStatus(
newTSDBUnavailableError("tsdb stopping"),
http.StatusServiceUnavailable,
),
},
{
name: "a wrapped tsdbUnavailableError gets translated into an errorWithHTTPStatus 503 error",
err: fmt.Errorf("wrapped: %w", newTSDBUnavailableError("tsdb stopping")),
expectedTranslation: newErrorWithHTTPStatus(
fmt.Errorf("wrapped: %w", newTSDBUnavailableError("tsdb stopping")),
http.StatusServiceUnavailable,
),
},
{
name: "a sampleError gets translated into an errorWithHTTPStatus 400 error",
err: newSampleError("id", "sample error", timestamp, labelAdapters),
expectedTranslation: newErrorWithHTTPStatus(
newSampleError("id", "sample error", timestamp, labelAdapters),
http.StatusBadRequest,
),
},
{
name: "a wrapped exemplarError gets translated into an errorWithHTTPStatus 400 error",
err: fmt.Errorf("wrapped: %w", newSampleError("id", "sample error", timestamp, labelAdapters)),
expectedTranslation: newErrorWithHTTPStatus(
fmt.Errorf("wrapped: %w", newSampleError("id", "sample error", timestamp, labelAdapters)),
http.StatusBadRequest,
),
},
{
name: "a exemplarError gets translated into an errorWithHTTPStatus 400 error",
err: newExemplarError("id", "exemplar error", timestamp, labelAdapters, labelAdapters),
expectedTranslation: newErrorWithHTTPStatus(
newExemplarError("id", "exemplar error", timestamp, labelAdapters, labelAdapters),
http.StatusBadRequest,
),
},
{
name: "a wrapped exemplarError gets translated into an errorWithHTTPStatus 400 error",
err: fmt.Errorf("wrapped: %w", newExemplarError("id", "exemplar error", timestamp, labelAdapters, labelAdapters)),
expectedTranslation: newErrorWithHTTPStatus(
fmt.Errorf("wrapped: %w", newExemplarError("id", "exemplar error", timestamp, labelAdapters, labelAdapters)),
http.StatusBadRequest,
),
},
{
name: "a perUserMetadataLimitReachedError gets translated into an errorWithHTTPStatus 400 error",
err: newPerUserSeriesLimitReachedError(10),
expectedTranslation: newErrorWithHTTPStatus(
newPerUserSeriesLimitReachedError(10),
http.StatusBadRequest,
),
},
{
name: "a wrapped perUserMetadataLimitReachedError gets translated into an errorWithHTTPStatus 400 error",
err: fmt.Errorf("wrapped: %w", newPerUserSeriesLimitReachedError(10)),
expectedTranslation: newErrorWithHTTPStatus(
fmt.Errorf("wrapped: %w", newPerUserSeriesLimitReachedError(10)),
http.StatusBadRequest,
),
},
{
name: "a perMetricMetadataLimitReachedError gets translated into an errorWithHTTPStatus 400 error",
err: newPerMetricSeriesLimitReachedError(10, labels),
expectedTranslation: newErrorWithHTTPStatus(
newPerMetricSeriesLimitReachedError(10, labels),
http.StatusBadRequest,
),
},
{
name: "a wrapped perMetricMetadataLimitReachedError gets translated into an errorWithHTTPStatus 400 error",
err: fmt.Errorf("wrapped: %w", newPerMetricSeriesLimitReachedError(10, labels)),
expectedTranslation: newErrorWithHTTPStatus(
fmt.Errorf("wrapped: %w", newPerMetricSeriesLimitReachedError(10, labels)),
http.StatusBadRequest,
),
},
}

for _, tc := range testCases {
handledErr := handlePushError(tc.err)
require.Equal(t, tc.expectedTranslation, handledErr)
if tc.doNotLogExpected {
var doNotLogError log.DoNotLogError
require.ErrorAs(t, handledErr, &doNotLogError)
require.False(t, doNotLogError.ShouldLog(context.Background(), 0))
}
}
}
17 changes: 0 additions & 17 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -3154,23 +3154,6 @@ func (i *Ingester) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirp
return nil, handledErr
}

func handlePushError(err error) error {
var ingesterErr ingesterError
if errors.As(err, &ingesterErr) {
switch ingesterErr.errorType() {
case badData:
return newErrorWithHTTPStatus(err, http.StatusBadRequest)
case unavailable:
return newErrorWithStatus(err, codes.Unavailable)
case instanceLimitReached:
return newErrorWithStatus(util_log.DoNotLogError{Err: err}, codes.Unavailable)
case tsdbUnavailable:
return newErrorWithHTTPStatus(err, http.StatusServiceUnavailable)
}
}
return err
}

// pushMetadata returns number of ingested metadata.
func (i *Ingester) pushMetadata(ctx context.Context, userID string, metadata []*mimirpb.MetricMetadata) int {
ingestedMetadata := 0
Expand Down
Loading

0 comments on commit 2be6899

Please sign in to comment.