Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributor write path: put httpgrpc.Errorf() calls at the topmost level #6191

Merged
merged 24 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d04d395
Distributor write path: put httpgrpc.Errorf() calls at the topmost level
duricanikolic Oct 2, 2023
dc32aae
Implementing review findings
duricanikolic Oct 3, 2023
b00249b
Get rid of replicasNotMatchError and tooManyClustersError
duricanikolic Oct 3, 2023
944082e
Remove wrapping from req.WriteRequest() and tenant.TenantID() errors
duricanikolic Oct 3, 2023
728d47c
Get rid of distrbutorerror.Error interface
duricanikolic Oct 3, 2023
5bcd840
Get rid of push.getHTTPStatusAndMessage()
duricanikolic Oct 3, 2023
0f784ce
Revert validate.CleanAndValidateMetadata signature change
duricanikolic Oct 3, 2023
b05f212
Make DistributorPushError private
duricanikolic Oct 3, 2023
d05e965
Rename errors by removing the Error suffix
duricanikolic Oct 3, 2023
4f1bdf6
Handle service overloaded error correctly
duricanikolic Oct 3, 2023
3c6ede5
Fixing review findings
duricanikolic Oct 3, 2023
34c1913
Ensure errors.As and erros.Is work correctly
duricanikolic Oct 4, 2023
5c2a168
Revert distributor_test.go
duricanikolic Oct 4, 2023
96c82a0
Fix wrong status code 429 instead of 400 in case of TooManyCluster error
duricanikolic Oct 4, 2023
592c6d4
Some refactorings
duricanikolic Oct 4, 2023
ac43e20
Fixing review findings
duricanikolic Oct 4, 2023
8218c4e
Adding TestToHTTPStatus
duricanikolic Oct 4, 2023
430144d
Update pkg/distributor/distributorerror/errors.go
duricanikolic Oct 4, 2023
c1d2212
Re-declare ReplicasNotMatch and TooManyClusters errors
duricanikolic Oct 4, 2023
10480cd
Fixing Validation error
duricanikolic Oct 4, 2023
865a7f3
Get rid of most of the methods from validation/errors.go
duricanikolic Oct 4, 2023
a179533
Get rid of remaining methods in validation/errors
duricanikolic Oct 4, 2023
55f3944
Rename all distributorerr constructors
duricanikolic Oct 5, 2023
c055c8d
Fixing review findings
duricanikolic Oct 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/grafana/mimir/pkg/util/gziphandler"
util_log "github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/push"
"github.com/grafana/mimir/pkg/util/validation"
"github.com/grafana/mimir/pkg/util/validation/exporter"
)

Expand Down Expand Up @@ -227,11 +228,11 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc, userL
}

// RegisterDistributor registers the endpoints associated with the distributor.
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, reg prometheus.Registerer) {
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, reg prometheus.Registerer, limits *validation.Overrides) {
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, d.PushWithMiddlewares), true, false, "POST")
a.RegisterRoute("/otlp/v1/metrics", push.OTLPHandler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.enableOtelMetadataStorage, reg, d.PushWithMiddlewares), true, false, "POST")
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, limits, d.PushWithMiddlewares), true, false, "POST")
a.RegisterRoute("/otlp/v1/metrics", push.OTLPHandler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.enableOtelMetadataStorage, limits, reg, d.PushWithMiddlewares), true, false, "POST")

a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{
{Desc: "Ring status", Path: "/distributor/ring"},
Expand All @@ -258,7 +259,7 @@ type Ingester interface {
}

// RegisterIngester registers the ingester HTTP and gRPC services.
func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config, limits *validation.Overrides) {
client.RegisterIngesterServer(a.server.GRPC, i)

a.indexPage.AddLinks(dangerousWeight, "Dangerous", []IndexPageLink{
Expand All @@ -269,7 +270,7 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
a.RegisterRoute("/ingester/flush", http.HandlerFunc(i.FlushHandler), false, true, "GET", "POST")
a.RegisterRoute("/ingester/prepare-shutdown", http.HandlerFunc(i.PrepareShutdownHandler), false, true, "GET", "POST", "DELETE")
a.RegisterRoute("/ingester/shutdown", http.HandlerFunc(i.ShutdownHandler), false, true, "GET", "POST")
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, i.PushWithCleanup), true, false, "POST") // For testing and debugging.
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, limits, i.PushWithCleanup), true, false, "POST") // For testing and debugging.
a.RegisterRoute("/ingester/tsdb_metrics", http.HandlerFunc(i.UserRegistryHandler), true, true, "GET")

a.indexPage.AddLinks(defaultWeight, "Ingester", []IndexPageLink{
Expand Down
61 changes: 33 additions & 28 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/grafana/mimir/pkg/cardinality"
"github.com/grafana/mimir/pkg/distributor/distributorerror"
ingester_client "github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util"
Expand Down Expand Up @@ -77,9 +78,6 @@ const (
// Size of "slab" when using pooled buffers for marshaling write requests. When handling single Push request
// buffers for multiple write requests sent to ingesters will be allocated from single "slab", if there is enough space.
writeRequestSlabPoolSize = 512 * 1024

// 529 is non-standard status code used by some services to signal that "The service is overloaded".
statusServiceOverload = 529
)

// Distributor forwards appends and queries to individual ingesters.
Expand Down Expand Up @@ -728,15 +726,15 @@ func (d *Distributor) prePushHaDedupeMiddleware(next push.Func) push.Func {

removeReplica, err := d.checkSample(ctx, userID, cluster, replica)
if err != nil {
if errors.Is(err, replicasNotMatchError{}) {
var replicasNotMatchErr distributorerror.ReplicasNotMatch
if errors.As(err, &replicasNotMatchErr) {
duricanikolic marked this conversation as resolved.
Show resolved Hide resolved
// These samples have been deduped.
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples))
return httpgrpc.Errorf(http.StatusAccepted, err.Error())
}

if errors.Is(err, tooManyClustersError{}) {
var tooManyClustersErr distributorerror.TooManyClusters
if errors.As(err, &tooManyClustersErr) {
duricanikolic marked this conversation as resolved.
Show resolved Hide resolved
d.discardedSamplesTooManyHaClusters.WithLabelValues(userID, group).Add(float64(numSamples))
return httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

return err
Expand Down Expand Up @@ -904,9 +902,8 @@ func (d *Distributor) prePushValidationMiddleware(next push.Func) push.Func {
// invalid data but all the remaining series could be perfectly valid.
if validationErr != nil {
if firstPartialErr == nil {
// The series labels may be retained by validationErr but that's not a problem for this
// use case because we format it calling Error() and then we discard it.
firstPartialErr = httpgrpc.Errorf(http.StatusBadRequest, validationErr.Error())
// The series are never retained by validationErr. This is guaranteed by the way the latter is built.
firstPartialErr = validationErr
}
removeIndexes = append(removeIndexes, tsIdx)
continue
Expand All @@ -926,9 +923,8 @@ func (d *Distributor) prePushValidationMiddleware(next push.Func) push.Func {
for mIdx, m := range req.Metadata {
if validationErr := validation.CleanAndValidateMetadata(d.metadataValidationMetrics, d.limits, userID, m); validationErr != nil {
if firstPartialErr == nil {
// The metadata info may be retained by validationErr but that's not a problem for this
// use case because we format it calling Error() and then we discard it.
firstPartialErr = httpgrpc.Errorf(http.StatusBadRequest, validationErr.Error())
// The series are never retained by validationErr. This is guaranteed by the way the latter is built.
firstPartialErr = validationErr
}

removeIndexes = append(removeIndexes, mIdx)
Expand All @@ -950,10 +946,7 @@ func (d *Distributor) prePushValidationMiddleware(next push.Func) push.Func {
d.discardedSamplesRateLimited.WithLabelValues(userID, group).Add(float64(validatedSamples))
d.discardedExemplarsRateLimited.WithLabelValues(userID).Add(float64(validatedExemplars))
d.discardedMetadataRateLimited.WithLabelValues(userID).Add(float64(validatedMetadata))
// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
return httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(d.limits.IngestionRate(userID), d.limits.IngestionBurstSize(userID)).Error())
return distributorerror.IngestionRateLimitedErrorf(validation.IngestionRateLimitedMsgFormat, d.limits.IngestionRate(userID), d.limits.IngestionBurstSize(userID))
}

// totalN included samples, exemplars and metadata. Ingester follows this pattern when computing its ingestion rate.
Expand Down Expand Up @@ -1054,13 +1047,7 @@ func (d *Distributor) limitsMiddleware(next push.Func) push.Func {
if !d.requestRateLimiter.AllowN(now, userID, 1) {
d.discardedRequestsRateLimited.WithLabelValues(userID).Add(1)

// Return a 429 or a 529 here depending on configuration to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
if d.limits.ServiceOverloadStatusCodeOnRateLimitEnabled(userID) {
return httpgrpc.Errorf(statusServiceOverload, validation.NewRequestRateLimitedError(d.limits.RequestRate(userID), d.limits.RequestBurstSize(userID)).Error())
}
return httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewRequestRateLimitedError(d.limits.RequestRate(userID), d.limits.RequestBurstSize(userID)).Error())
return distributorerror.RequestRateLimitedErrorf(validation.RequestRateLimitedMsgFormat, d.limits.RequestRate(userID), d.limits.RequestBurstSize(userID))
}

// Note that we don't enforce the per-user ingestion rate limit here since we need to apply validation
Expand Down Expand Up @@ -1093,11 +1080,29 @@ func (d *Distributor) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mim
mimirpb.ReuseSlice(req.Timeseries)
})

err := d.PushWithMiddlewares(ctx, pushReq)
if err != nil {
return nil, err
pushErr := d.PushWithMiddlewares(ctx, pushReq)
if pushErr == nil {
return &mimirpb.WriteResponse{}, nil
}
handledErr := d.handlePushError(ctx, pushErr)
return nil, handledErr
}

func (d *Distributor) handlePushError(ctx context.Context, pushErr error) error {
if errors.Is(pushErr, context.Canceled) {
return pushErr
}

serviceOverloadErrorEnabled := false
userID, err := tenant.TenantID(ctx)
if err == nil {
serviceOverloadErrorEnabled = d.limits.ServiceOverloadStatusCodeOnRateLimitEnabled(userID)
}
httpStatus, ok := distributorerror.ToHTTPStatus(pushErr, serviceOverloadErrorEnabled)
if ok {
duricanikolic marked this conversation as resolved.
Show resolved Hide resolved
return httpgrpc.Errorf(httpStatus, pushErr.Error())
}
return &mimirpb.WriteResponse{}, nil
return pushErr
}

// push takes a write request and distributes it to ingesters using the ring.
Expand Down
24 changes: 13 additions & 11 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"google.golang.org/grpc/status"

"github.com/grafana/mimir/pkg/cardinality"
"github.com/grafana/mimir/pkg/distributor/distributorerror"
"github.com/grafana/mimir/pkg/ingester"
"github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
Expand Down Expand Up @@ -189,7 +190,7 @@ func TestDistributor_Push(t *testing.T) {
happyIngesters: 3,
samples: samplesIn{num: 25, startTimestampMs: 123456789000},
metadata: 5,
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(20, 20).Error()),
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, distributorerror.IngestionRateLimitedErrorf(validation.IngestionRateLimitedMsgFormat, 20, 20).Error()),
metricNames: []string{lastSeenTimestamp},
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
Expand Down Expand Up @@ -487,7 +488,7 @@ func TestDistributor_PushRequestRateLimiter(t *testing.T) {
pushes: []testPush{
{expectedError: nil},
{expectedError: nil},
{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewRequestRateLimitedError(4, 2).Error())},
{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, distributorerror.RequestRateLimitedErrorf(validation.RequestRateLimitedMsgFormat, 4, 2).Error())},
},
},
"request limit is disabled when set to 0": {
Expand All @@ -508,7 +509,7 @@ func TestDistributor_PushRequestRateLimiter(t *testing.T) {
{expectedError: nil},
{expectedError: nil},
{expectedError: nil},
{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewRequestRateLimitedError(2, 3).Error())},
{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, distributorerror.RequestRateLimitedErrorf(validation.RequestRateLimitedMsgFormat, 2, 3).Error())},
},
},
"request limit is reached return 529 when enable service overload error set to true": {
Expand All @@ -519,7 +520,7 @@ func TestDistributor_PushRequestRateLimiter(t *testing.T) {
pushes: []testPush{
{expectedError: nil},
{expectedError: nil},
{expectedError: httpgrpc.Errorf(statusServiceOverload, validation.NewRequestRateLimitedError(4, 2).Error())},
{expectedError: httpgrpc.Errorf(distributorerror.StatusServiceOverloaded, distributorerror.RequestRateLimitedErrorf(validation.RequestRateLimitedMsgFormat, 4, 2).Error())},
},
},
}
Expand Down Expand Up @@ -580,10 +581,10 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
pushes: []testPush{
{samples: 2, expectedError: nil},
{samples: 1, expectedError: nil},
{samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(10, 5).Error())},
{samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, distributorerror.IngestionRateLimitedErrorf(validation.IngestionRateLimitedMsgFormat, 10, 5).Error())},
{samples: 2, expectedError: nil},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(10, 5).Error())},
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(10, 5).Error())},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, distributorerror.IngestionRateLimitedErrorf(validation.IngestionRateLimitedMsgFormat, 10, 5).Error())},
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, distributorerror.IngestionRateLimitedErrorf(validation.IngestionRateLimitedMsgFormat, 10, 5).Error())},
},
},
"for each distributor, set an ingestion burst limit.": {
Expand All @@ -593,10 +594,10 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
pushes: []testPush{
{samples: 10, expectedError: nil},
{samples: 5, expectedError: nil},
{samples: 5, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(10, 20).Error())},
{samples: 5, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, distributorerror.IngestionRateLimitedErrorf(validation.IngestionRateLimitedMsgFormat, 10, 20).Error())},
{samples: 5, expectedError: nil},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(10, 20).Error())},
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(10, 20).Error())},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, distributorerror.IngestionRateLimitedErrorf(validation.IngestionRateLimitedMsgFormat, 10, 20).Error())},
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, distributorerror.IngestionRateLimitedErrorf(validation.IngestionRateLimitedMsgFormat, 10, 20).Error())},
},
},
}
Expand Down Expand Up @@ -2721,7 +2722,8 @@ func TestHaDedupeMiddleware(t *testing.T) {
pushReq := push.NewParsedRequest(req)
pushReq.AddCleanup(cleanup)
err := middleware(tc.ctx, pushReq)
gotErrs = append(gotErrs, err)
handledErr := ds[0].handlePushError(tc.ctx, err)
gotErrs = append(gotErrs, handledErr)
}

assert.Equal(t, tc.expectedReqs, gotReqs)
Expand Down
150 changes: 150 additions & 0 deletions pkg/distributor/distributorerror/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// SPDX-License-Identifier: AGPL-3.0-only

package distributorerror

import (
"errors"
"fmt"
"net/http"
)

const (
// 529 is non-standard status code used by some services to signal that "The service is overloaded".
StatusServiceOverloaded = 529
)

// ReplicasNotMatch is an error stating that replicas do not match.
type ReplicasNotMatch struct {
duricanikolic marked this conversation as resolved.
Show resolved Hide resolved
replica, elected, format string
}

// ReplicasNotMatchErrorf formats and returns a ReplicasNotMatch error.
// The error message is formatted according to the given format specifier, replica and elected.
// In order to print the limit and the burst, the format specifier must contain two %s verbs.
func ReplicasNotMatchErrorf(format, replica, elected string) ReplicasNotMatch {
return ReplicasNotMatch{
replica: replica,
elected: elected,
format: format,
}
}
duricanikolic marked this conversation as resolved.
Show resolved Hide resolved

func (e ReplicasNotMatch) Error() string {
return fmt.Sprintf(e.format, e.replica, e.elected)
}

// TooManyClusters is an error stating that there are too many HA clusters.
type TooManyClusters struct {
limit int
format string
duricanikolic marked this conversation as resolved.
Show resolved Hide resolved
}

// TooManyClustersErrorf formats and returns a TooManyClusters error.
// The error message is formatted according to the given format specifier and limit.
// In order to print the limit, the format specifier must contain a %d verb.
func TooManyClustersErrorf(format string, limit int) TooManyClusters {
return TooManyClusters{limit: limit, format: format}
}

func (e TooManyClusters) Error() string {
return fmt.Sprintf(e.format, e.limit)
}

// Validation is an error, used to represent all validation errors from the validation package.
type Validation struct {
error
}

// ValidationErrorf formats and returns a Validation error.
// The error message is formatted according to the given format specifier and arguments.
// The new error does not retain any of the passed parameters.
func ValidationErrorf(format string, args ...any) Validation {
// In order to ensure that no label is retained, we first create a string out of the
// given format and args, and then create an error containing that message.
msg := fmt.Sprintf(format, args...)
return Validation{error: errors.New(msg)}
}

// IngestionRateLimited is an error used to represent the ingestion rate limited error.
type IngestionRateLimited struct {
format string
limit float64
burst int
}

// IngestionRateLimitedErrorf formats and returns a IngestionRateLimited error.
// The error message is formatted according to the given format specifier, limit and burst.
// In order to print the limit and the burst, the format specifier must contain %v and %d verbs.
func IngestionRateLimitedErrorf(format string, limit float64, burst int) IngestionRateLimited {
return IngestionRateLimited{
format: format,
limit: limit,
burst: burst,
}
}

func (e IngestionRateLimited) Error() string {
return fmt.Sprintf(e.format, e.limit, e.burst)
}

// RequestRateLimited is an error used to represent the request rate limited error.
type RequestRateLimited struct {
format string
limit float64
burst int
}

// RequestRateLimitedErrorf formats and returns a RequestRateLimited error.
// The error message is formatted according to the given format specifier, limit and burst.
// In order to print the limit and the burst, the format specifier must contain %v and %d verbs.
func RequestRateLimitedErrorf(format string, limit float64, burst int) RequestRateLimited {
return RequestRateLimited{
format: format,
limit: limit,
burst: burst,
}
}

func (e RequestRateLimited) Error() string {
return fmt.Sprintf(e.format, e.limit, e.burst)
}

// ToHTTPStatus converts the given error into an appropriate HTTP status corresponding
// to that error, if the error is one of the errors from this package. In that case,
// the resulting HTTP status is returned with status true. Otherwise, -1 and the status
// false are returned.
// TODO Remove this method once HTTP status codes are removed from distributor.Push.
// TODO This method should be moved into the push package.
func ToHTTPStatus(pushErr error, serviceOverloadErrorEnabled bool) (int, bool) {
var (
replicasNotMatchErr ReplicasNotMatch
tooManyClustersErr TooManyClusters
validationErr Validation
ingestionRateErr IngestionRateLimited
requestRateErr RequestRateLimited
)

switch {
case errors.As(pushErr, &replicasNotMatchErr):
return http.StatusAccepted, true
case errors.As(pushErr, &tooManyClustersErr):
return http.StatusBadRequest, true
case errors.As(pushErr, &validationErr):
return http.StatusBadRequest, true
case errors.As(pushErr, &ingestionRateErr):
// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
return http.StatusTooManyRequests, true
case errors.As(pushErr, &requestRateErr):
// Return a 429 or a 529 here depending on configuration to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
if serviceOverloadErrorEnabled {
return StatusServiceOverloaded, true
}
return http.StatusTooManyRequests, true
default:
return -1, false
}
}
Loading
Loading