Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributor: replace ring.DoBatch with ring.DoBatchWithClientError #6636

Merged
merged 1 commit into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/golang/snappy v0.0.4
github.com/google/gopacket v1.1.19
github.com/gorilla/mux v1.8.1
github.com/grafana/dskit v0.0.0-20231110082806-620b5f187e90
github.com/grafana/dskit v0.0.0-20231113093401-0ae8e2e7ba38
github.com/grafana/e2e v0.1.1
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/json-iterator/go v1.1.12
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,8 @@ github.com/gosimple/slug v1.1.1 h1:fRu/digW+NMwBIP+RmviTK97Ho/bEj/C9swrCspN3D4=
github.com/gosimple/slug v1.1.1/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0=
github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85Tnn+WEvr8fDpfwibmEPgfgFEaC87G24=
github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4=
github.com/grafana/dskit v0.0.0-20231110082806-620b5f187e90 h1:+YsdVHl/VdXqCQDuVefswsTfHTT25mbergE9xPwvDlo=
github.com/grafana/dskit v0.0.0-20231110082806-620b5f187e90/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU=
github.com/grafana/dskit v0.0.0-20231113093401-0ae8e2e7ba38 h1:nHd5vwL0g4zvYFcjGDLAij5EelqhAWM+nxypldn5Wyk=
github.com/grafana/dskit v0.0.0-20231113093401-0ae8e2e7ba38/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU=
github.com/grafana/e2e v0.1.1 h1:/b6xcv5BtoBnx8cZnCiey9DbjEc8z7gXHO5edoeRYxc=
github.com/grafana/e2e v0.1.1/go.mod h1:RpNLgae5VT+BUHvPE+/zSypmOXKwEu4t+tnEMS1ATaE=
github.com/grafana/goautoneg v0.0.0-20231010094147-47ce5e72a9ae h1:Yxbw9jKGJVC6qAK5Ubzzb/qZwM6rRMMqaDc/d4Vp3pM=
Expand Down
74 changes: 26 additions & 48 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"

"github.com/grafana/mimir/pkg/cardinality"
ingester_client "github.com/grafana/mimir/pkg/ingester/client"
Expand Down Expand Up @@ -1306,33 +1305,37 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
localCtx = ingester_client.WithSlabPool(localCtx, slabPool)
}

err = ring.DoBatch(ctx, ring.WriteNoExtend, subRing, keys, func(ingester ring.InstanceDesc, indexes []int) error {
var timeseriesCount, metadataCount int
for _, i := range indexes {
if i >= initialMetadataIndex {
metadataCount++
} else {
timeseriesCount++
err = ring.DoBatchWithClientError(ctx, ring.WriteNoExtend, subRing, keys,
func(ingester ring.InstanceDesc, indexes []int) error {
var timeseriesCount, metadataCount int
for _, i := range indexes {
if i >= initialMetadataIndex {
metadataCount++
} else {
timeseriesCount++
}
}
}

timeseries := preallocSliceIfNeeded[mimirpb.PreallocTimeseries](timeseriesCount)
metadata := preallocSliceIfNeeded[*mimirpb.MetricMetadata](metadataCount)
timeseries := preallocSliceIfNeeded[mimirpb.PreallocTimeseries](timeseriesCount)
metadata := preallocSliceIfNeeded[*mimirpb.MetricMetadata](metadataCount)

for _, i := range indexes {
if i >= initialMetadataIndex {
metadata = append(metadata, req.Metadata[i-initialMetadataIndex])
} else {
timeseries = append(timeseries, req.Timeseries[i])
for _, i := range indexes {
if i >= initialMetadataIndex {
metadata = append(metadata, req.Metadata[i-initialMetadataIndex])
} else {
timeseries = append(timeseries, req.Timeseries[i])
}
}
}

err := d.send(localCtx, ingester, timeseries, metadata, req.Source)
if errors.Is(err, context.DeadlineExceeded) {
return errors.Wrap(err, deadlineExceededWrapMessage)
}
return err
}, func() { pushReq.CleanUp(); cancel() })
err := d.send(localCtx, ingester, timeseries, metadata, req.Source)
if errors.Is(err, context.DeadlineExceeded) {
return errors.Wrap(err, deadlineExceededWrapMessage)
}
return err
},
func() { pushReq.CleanUp(); cancel() },
isClientError,
)

return err
}
Expand Down Expand Up @@ -1391,31 +1394,6 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
return handleIngesterPushError(err)
}

func handleIngesterPushError(err error) error {
if err == nil {
return nil
}

stat, ok := grpcutil.ErrorToStatus(err)
if !ok {
return errors.Wrap(err, failedPushingToIngesterMessage)
}
statusCode := stat.Code()
if isHTTPStatusCode(statusCode) {
// TODO This code is needed for backwards compatibility, since ingesters may still return
// errors created by httpgrpc.Errorf(). If pushErr is one of those errors, we just propagate
// it. This code should be removed in mimir 2.12.0.
// Wrap HTTP gRPC error with more explanatory message.
return httpgrpc.Errorf(int(statusCode), "%s: %s", failedPushingToIngesterMessage, stat.Message())
}

return newIngesterPushError(stat)
}

func isHTTPStatusCode(statusCode codes.Code) bool {
return int(statusCode) >= 100 && int(statusCode) < 600
}

// forReplicationSet runs f, in parallel, for all ingesters in the input replication set.
func forReplicationSet[T any](ctx context.Context, d *Distributor, replicationSet ring.ReplicationSet, f func(context.Context, ingester_client.IngesterClient) (T, error)) ([]T, error) {
wrappedF := func(ctx context.Context, ing *ring.InstanceDesc) (T, error) {
Expand Down
82 changes: 0 additions & 82 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4910,88 +4910,6 @@ func TestSeriesAreShardedToCorrectIngesters(t *testing.T) {
assert.Equal(t, series, totalMetadata) // each series has unique metric name, and each metric name gets metadata
}

func TestHandleIngesterPushError(t *testing.T) {
testErrorMsg := "this is a test error message"
outputErrorMsgPrefix := "failed pushing to ingester"

// Ensure that no error gets translated into no error.
t.Run("no error gives no error", func(t *testing.T) {
err := handleIngesterPushError(nil)
require.NoError(t, err)
})

// Ensure that the errors created by httpgrpc get translated into
// other errors created by httpgrpc with the same code, and with
// a more explanatory message.
// TODO: this is needed for backwards compatibility and will be removed
// in mimir 2.12.0.
httpgrpcTests := map[string]struct {
ingesterPushError error
expectedStatus int32
expectedMessage string
}{
"a 4xx HTTP gRPC error gives a 4xx HTTP gRPC error": {
ingesterPushError: httpgrpc.Errorf(http.StatusBadRequest, testErrorMsg),
expectedStatus: http.StatusBadRequest,
expectedMessage: fmt.Sprintf("%s: %s", outputErrorMsgPrefix, testErrorMsg),
},
"a 5xx HTTP gRPC error gives a 5xx HTTP gRPC error": {
ingesterPushError: httpgrpc.Errorf(http.StatusServiceUnavailable, testErrorMsg),
expectedStatus: http.StatusServiceUnavailable,
expectedMessage: fmt.Sprintf("%s: %s", outputErrorMsgPrefix, testErrorMsg),
},
}
for testName, testData := range httpgrpcTests {
t.Run(testName, func(t *testing.T) {
err := handleIngesterPushError(testData.ingesterPushError)
res, ok := httpgrpc.HTTPResponseFromError(err)
require.True(t, ok)
require.NotNil(t, res)
require.Equal(t, testData.expectedStatus, res.GetCode())
require.Equal(t, testData.expectedMessage, string(res.Body))
})
}

// Ensure that the errors created by gogo/status package get translated
// into ingesterPushError messages.
statusTests := map[string]struct {
ingesterPushError error
expectedIngesterPushError ingesterPushError
}{
"a gRPC error with details gives an ingesterPushError with the same details": {
ingesterPushError: createStatusWithDetails(t, codes.Unavailable, testErrorMsg, mimirpb.UNKNOWN_CAUSE).Err(),
expectedIngesterPushError: newIngesterPushError(createStatusWithDetails(t, codes.Unavailable, testErrorMsg, mimirpb.UNKNOWN_CAUSE)),
},
"a DeadlineExceeded gRPC ingester error gives an ingesterPushError with UNKNOWN_CAUSE cause": {
// This is how context.DeadlineExceeded error is translated into a gRPC error.
ingesterPushError: status.Error(codes.DeadlineExceeded, context.DeadlineExceeded.Error()),
expectedIngesterPushError: newIngesterPushError(createStatusWithDetails(t, codes.Unavailable, context.DeadlineExceeded.Error(), mimirpb.UNKNOWN_CAUSE)),
},
"an Unavailable gRPC error without details gives an ingesterPushError with UNKNOWN_CAUSE cause": {
ingesterPushError: status.Error(codes.Unavailable, testErrorMsg),
expectedIngesterPushError: newIngesterPushError(createStatusWithDetails(t, codes.Unavailable, testErrorMsg, mimirpb.UNKNOWN_CAUSE)),
},
"an Internal gRPC ingester error without details gives an ingesterPushError with UNKNOWN_CAUSE cause": {
ingesterPushError: status.Error(codes.Internal, testErrorMsg),
expectedIngesterPushError: newIngesterPushError(createStatusWithDetails(t, codes.Unavailable, testErrorMsg, mimirpb.UNKNOWN_CAUSE)),
},
"an Unknown gRPC ingester error without details gives an ingesterPushError with UNKNOWN_CAUSE cause": {
ingesterPushError: status.Error(codes.Unknown, testErrorMsg),
expectedIngesterPushError: newIngesterPushError(createStatusWithDetails(t, codes.Unavailable, testErrorMsg, mimirpb.UNKNOWN_CAUSE)),
},
}
for testName, testData := range statusTests {
t.Run(testName, func(t *testing.T) {
err := handleIngesterPushError(testData.ingesterPushError)
ingesterPushErr, ok := err.(ingesterPushError)
require.True(t, ok)

require.Equal(t, testData.expectedIngesterPushError.Error(), ingesterPushErr.Error())
require.Equal(t, testData.expectedIngesterPushError.errorCause(), ingesterPushErr.errorCause())
})
}
}

func TestHandlePushError(t *testing.T) {
testErrorMsg := "this is a test error message"
userID := "test"
Expand Down
46 changes: 45 additions & 1 deletion pkg/distributor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
package distributor

import (
"errors"
"fmt"

"github.com/gogo/status"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/httpgrpc"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"

"github.com/grafana/mimir/pkg/mimirpb"
Expand Down Expand Up @@ -226,3 +228,45 @@ func toGRPCError(pushErr error, serviceOverloadErrorEnabled bool) error {
}
return stat.Err()
}

func handleIngesterPushError(err error) error {
if err == nil {
return nil
}

stat, ok := grpcutil.ErrorToStatus(err)
if !ok {
return errors.Wrap(err, failedPushingToIngesterMessage)
}
statusCode := stat.Code()
if isHTTPStatusCode(statusCode) {
// TODO This code is needed for backwards compatibility, since ingesters may still return
// errors created by httpgrpc.Errorf(). If pushErr is one of those errors, we just propagate
// it. This code should be removed in mimir 2.12.0.
// Wrap HTTP gRPC error with more explanatory message.
return httpgrpc.Errorf(int(statusCode), "%s: %s", failedPushingToIngesterMessage, stat.Message())
}

return newIngesterPushError(stat)
}

func isHTTPStatusCode(statusCode codes.Code) bool {
return int(statusCode) >= 100 && int(statusCode) < 600
}

func isClientError(err error) bool {
var ingesterPushErr ingesterPushError
if errors.As(err, &ingesterPushErr) {
return ingesterPushErr.errorCause() == mimirpb.BAD_DATA
}

// TODO This code is needed for backwards compatibility, since ingesters may still return
// errors with HTTP status code created by httpgrpc.Errorf(). If err is one of those errors,
// we treat 4xx errors as client errors. This code should be removed in mimir 2.12.0.

if code := grpcutil.ErrorToStatusCode(err); code/100 == 4 {
return true
}

return false
}
Loading
Loading