Skip to content

Commit

Permalink
Distributor: replace ring.DoBatch with ring.DoBatchWithClientError (#6636)
Browse files Browse the repository at this point in the history

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
(cherry picked from commit 8c6032f)
  • Loading branch information
duricanikolic authored and grafanabot committed Nov 13, 2023
1 parent 709a8ad commit 231976b
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 176 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/golang/snappy v0.0.4
github.com/google/gopacket v1.1.19
github.com/gorilla/mux v1.8.1
github.com/grafana/dskit v0.0.0-20231110082806-620b5f187e90
github.com/grafana/dskit v0.0.0-20231113093401-0ae8e2e7ba38
github.com/grafana/e2e v0.1.1
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/json-iterator/go v1.1.12
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,8 @@ github.com/gosimple/slug v1.1.1 h1:fRu/digW+NMwBIP+RmviTK97Ho/bEj/C9swrCspN3D4=
github.com/gosimple/slug v1.1.1/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0=
github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85Tnn+WEvr8fDpfwibmEPgfgFEaC87G24=
github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4=
github.com/grafana/dskit v0.0.0-20231110082806-620b5f187e90 h1:+YsdVHl/VdXqCQDuVefswsTfHTT25mbergE9xPwvDlo=
github.com/grafana/dskit v0.0.0-20231110082806-620b5f187e90/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU=
github.com/grafana/dskit v0.0.0-20231113093401-0ae8e2e7ba38 h1:nHd5vwL0g4zvYFcjGDLAij5EelqhAWM+nxypldn5Wyk=
github.com/grafana/dskit v0.0.0-20231113093401-0ae8e2e7ba38/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU=
github.com/grafana/e2e v0.1.1 h1:/b6xcv5BtoBnx8cZnCiey9DbjEc8z7gXHO5edoeRYxc=
github.com/grafana/e2e v0.1.1/go.mod h1:RpNLgae5VT+BUHvPE+/zSypmOXKwEu4t+tnEMS1ATaE=
github.com/grafana/goautoneg v0.0.0-20231010094147-47ce5e72a9ae h1:Yxbw9jKGJVC6qAK5Ubzzb/qZwM6rRMMqaDc/d4Vp3pM=
Expand Down
74 changes: 26 additions & 48 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"

"github.com/grafana/mimir/pkg/cardinality"
ingester_client "github.com/grafana/mimir/pkg/ingester/client"
Expand Down Expand Up @@ -1306,33 +1305,37 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
localCtx = ingester_client.WithSlabPool(localCtx, slabPool)
}

err = ring.DoBatch(ctx, ring.WriteNoExtend, subRing, keys, func(ingester ring.InstanceDesc, indexes []int) error {
var timeseriesCount, metadataCount int
for _, i := range indexes {
if i >= initialMetadataIndex {
metadataCount++
} else {
timeseriesCount++
err = ring.DoBatchWithClientError(ctx, ring.WriteNoExtend, subRing, keys,
func(ingester ring.InstanceDesc, indexes []int) error {
var timeseriesCount, metadataCount int
for _, i := range indexes {
if i >= initialMetadataIndex {
metadataCount++
} else {
timeseriesCount++
}
}
}

timeseries := preallocSliceIfNeeded[mimirpb.PreallocTimeseries](timeseriesCount)
metadata := preallocSliceIfNeeded[*mimirpb.MetricMetadata](metadataCount)
timeseries := preallocSliceIfNeeded[mimirpb.PreallocTimeseries](timeseriesCount)
metadata := preallocSliceIfNeeded[*mimirpb.MetricMetadata](metadataCount)

for _, i := range indexes {
if i >= initialMetadataIndex {
metadata = append(metadata, req.Metadata[i-initialMetadataIndex])
} else {
timeseries = append(timeseries, req.Timeseries[i])
for _, i := range indexes {
if i >= initialMetadataIndex {
metadata = append(metadata, req.Metadata[i-initialMetadataIndex])
} else {
timeseries = append(timeseries, req.Timeseries[i])
}
}
}

err := d.send(localCtx, ingester, timeseries, metadata, req.Source)
if errors.Is(err, context.DeadlineExceeded) {
return errors.Wrap(err, deadlineExceededWrapMessage)
}
return err
}, func() { pushReq.CleanUp(); cancel() })
err := d.send(localCtx, ingester, timeseries, metadata, req.Source)
if errors.Is(err, context.DeadlineExceeded) {
return errors.Wrap(err, deadlineExceededWrapMessage)
}
return err
},
func() { pushReq.CleanUp(); cancel() },
isClientError,
)

return err
}
Expand Down Expand Up @@ -1391,31 +1394,6 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
return handleIngesterPushError(err)
}

// handleIngesterPushError translates the error returned by an ingester push
// into the error reported by the distributor:
//   - a nil error is returned unchanged;
//   - an error that grpcutil.ErrorToStatus cannot convert into a status is
//     wrapped with failedPushingToIngesterMessage;
//   - a status whose code falls in the HTTP status code range is rebuilt with
//     httpgrpc.Errorf, keeping the code and prefixing the message;
//   - any other status is turned into an error via newIngesterPushError.
func handleIngesterPushError(err error) error {
	if err == nil {
		return nil
	}

	stat, isStatus := grpcutil.ErrorToStatus(err)
	if !isStatus {
		return errors.Wrap(err, failedPushingToIngesterMessage)
	}

	code := stat.Code()
	if !isHTTPStatusCode(code) {
		return newIngesterPushError(stat)
	}

	// TODO Kept for backwards compatibility: ingesters may still return errors
	// created by httpgrpc.Errorf(). When err is one of those, the same status
	// code is propagated and the message gets a more explanatory prefix.
	// This code should be removed in mimir 2.12.0.
	return httpgrpc.Errorf(int(code), "%s: %s", failedPushingToIngesterMessage, stat.Message())
}

// isHTTPStatusCode reports whether statusCode, read as a plain integer, lies
// in the half-open range [100, 600).
func isHTTPStatusCode(statusCode codes.Code) bool {
	code := int(statusCode)
	return !(code < 100 || code >= 600)
}

// forReplicationSet runs f, in parallel, for all ingesters in the input replication set.
func forReplicationSet[T any](ctx context.Context, d *Distributor, replicationSet ring.ReplicationSet, f func(context.Context, ingester_client.IngesterClient) (T, error)) ([]T, error) {
wrappedF := func(ctx context.Context, ing *ring.InstanceDesc) (T, error) {
Expand Down
82 changes: 0 additions & 82 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4910,88 +4910,6 @@ func TestSeriesAreShardedToCorrectIngesters(t *testing.T) {
assert.Equal(t, series, totalMetadata) // each series has unique metric name, and each metric name gets metadata
}

func TestHandleIngesterPushError(t *testing.T) {
testErrorMsg := "this is a test error message"
outputErrorMsgPrefix := "failed pushing to ingester"

// Ensure that no error gets translated into no error.
t.Run("no error gives no error", func(t *testing.T) {
err := handleIngesterPushError(nil)
require.NoError(t, err)
})

// Ensure that the errors created by httpgrpc get translated into
// other errors created by httpgrpc with the same code, and with
// a more explanatory message.
// TODO: this is needed for backwards compatibility and will be removed
// in mimir 2.12.0.
httpgrpcTests := map[string]struct {
ingesterPushError error
expectedStatus int32
expectedMessage string
}{
"a 4xx HTTP gRPC error gives a 4xx HTTP gRPC error": {
ingesterPushError: httpgrpc.Errorf(http.StatusBadRequest, testErrorMsg),
expectedStatus: http.StatusBadRequest,
expectedMessage: fmt.Sprintf("%s: %s", outputErrorMsgPrefix, testErrorMsg),
},
"a 5xx HTTP gRPC error gives a 5xx HTTP gRPC error": {
ingesterPushError: httpgrpc.Errorf(http.StatusServiceUnavailable, testErrorMsg),
expectedStatus: http.StatusServiceUnavailable,
expectedMessage: fmt.Sprintf("%s: %s", outputErrorMsgPrefix, testErrorMsg),
},
}
for testName, testData := range httpgrpcTests {
t.Run(testName, func(t *testing.T) {
err := handleIngesterPushError(testData.ingesterPushError)
res, ok := httpgrpc.HTTPResponseFromError(err)
require.True(t, ok)
require.NotNil(t, res)
require.Equal(t, testData.expectedStatus, res.GetCode())
require.Equal(t, testData.expectedMessage, string(res.Body))
})
}

// Ensure that the errors created by gogo/status package get translated
// into ingesterPushError messages.
statusTests := map[string]struct {
ingesterPushError error
expectedIngesterPushError ingesterPushError
}{
"a gRPC error with details gives an ingesterPushError with the same details": {
ingesterPushError: createStatusWithDetails(t, codes.Unavailable, testErrorMsg, mimirpb.UNKNOWN_CAUSE).Err(),
expectedIngesterPushError: newIngesterPushError(createStatusWithDetails(t, codes.Unavailable, testErrorMsg, mimirpb.UNKNOWN_CAUSE)),
},
"a DeadlineExceeded gRPC ingester error gives an ingesterPushError with UNKNOWN_CAUSE cause": {
// This is how context.DeadlineExceeded error is translated into a gRPC error.
ingesterPushError: status.Error(codes.DeadlineExceeded, context.DeadlineExceeded.Error()),
expectedIngesterPushError: newIngesterPushError(createStatusWithDetails(t, codes.Unavailable, context.DeadlineExceeded.Error(), mimirpb.UNKNOWN_CAUSE)),
},
"an Unavailable gRPC error without details gives an ingesterPushError with UNKNOWN_CAUSE cause": {
ingesterPushError: status.Error(codes.Unavailable, testErrorMsg),
expectedIngesterPushError: newIngesterPushError(createStatusWithDetails(t, codes.Unavailable, testErrorMsg, mimirpb.UNKNOWN_CAUSE)),
},
"an Internal gRPC ingester error without details gives an ingesterPushError with UNKNOWN_CAUSE cause": {
ingesterPushError: status.Error(codes.Internal, testErrorMsg),
expectedIngesterPushError: newIngesterPushError(createStatusWithDetails(t, codes.Unavailable, testErrorMsg, mimirpb.UNKNOWN_CAUSE)),
},
"an Unknown gRPC ingester error without details gives an ingesterPushError with UNKNOWN_CAUSE cause": {
ingesterPushError: status.Error(codes.Unknown, testErrorMsg),
expectedIngesterPushError: newIngesterPushError(createStatusWithDetails(t, codes.Unavailable, testErrorMsg, mimirpb.UNKNOWN_CAUSE)),
},
}
for testName, testData := range statusTests {
t.Run(testName, func(t *testing.T) {
err := handleIngesterPushError(testData.ingesterPushError)
ingesterPushErr, ok := err.(ingesterPushError)
require.True(t, ok)

require.Equal(t, testData.expectedIngesterPushError.Error(), ingesterPushErr.Error())
require.Equal(t, testData.expectedIngesterPushError.errorCause(), ingesterPushErr.errorCause())
})
}
}

func TestHandlePushError(t *testing.T) {
testErrorMsg := "this is a test error message"
userID := "test"
Expand Down
46 changes: 45 additions & 1 deletion pkg/distributor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
package distributor

import (
"errors"
"fmt"

"github.com/gogo/status"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/httpgrpc"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"

"github.com/grafana/mimir/pkg/mimirpb"
Expand Down Expand Up @@ -226,3 +228,45 @@ func toGRPCError(pushErr error, serviceOverloadErrorEnabled bool) error {
}
return stat.Err()
}

// handleIngesterPushError translates the error returned by an ingester push
// into the error reported by the distributor:
//   - a nil error is returned unchanged;
//   - an error that grpcutil.ErrorToStatus cannot convert into a status is
//     wrapped with failedPushingToIngesterMessage;
//   - a status whose code falls in the HTTP status code range is rebuilt with
//     httpgrpc.Errorf, keeping the code and prefixing the message;
//   - any other status is turned into an error via newIngesterPushError.
func handleIngesterPushError(err error) error {
	if err == nil {
		return nil
	}

	stat, isStatus := grpcutil.ErrorToStatus(err)
	if !isStatus {
		return errors.Wrap(err, failedPushingToIngesterMessage)
	}

	code := stat.Code()
	if !isHTTPStatusCode(code) {
		return newIngesterPushError(stat)
	}

	// TODO Kept for backwards compatibility: ingesters may still return errors
	// created by httpgrpc.Errorf(). When err is one of those, the same status
	// code is propagated and the message gets a more explanatory prefix.
	// This code should be removed in mimir 2.12.0.
	return httpgrpc.Errorf(int(code), "%s: %s", failedPushingToIngesterMessage, stat.Message())
}

// isHTTPStatusCode reports whether statusCode, read as a plain integer, lies
// in the half-open range [100, 600).
func isHTTPStatusCode(statusCode codes.Code) bool {
	code := int(statusCode)
	return !(code < 100 || code >= 600)
}

// isClientError reports whether err should be treated as a client error.
// When err is (or wraps) an ingesterPushError, the answer depends solely on
// whether its cause is mimirpb.BAD_DATA. Otherwise err counts as a client
// error when the code from grpcutil.ErrorToStatusCode is in the 4xx range.
func isClientError(err error) bool {
	var pushErr ingesterPushError
	if errors.As(err, &pushErr) {
		return pushErr.errorCause() == mimirpb.BAD_DATA
	}

	// TODO Kept for backwards compatibility: ingesters may still return errors
	// carrying an HTTP status code created by httpgrpc.Errorf(). For those
	// errors, 4xx codes are treated as client errors. This code should be
	// removed in mimir 2.12.0.
	return grpcutil.ErrorToStatusCode(err)/100 == 4
}
Loading

0 comments on commit 231976b

Please sign in to comment.