From d04a78d67c59778fc40a4b494b8cf5faf2959a1f Mon Sep 17 00:00:00 2001 From: Yuri Nikolic Date: Thu, 12 Oct 2023 16:21:12 +0200 Subject: [PATCH] Allow ingester.Push to proceed on soft errors only Signed-off-by: Yuri Nikolic --- pkg/ingester/errors.go | 31 +++++++++++++++++- pkg/ingester/errors_test.go | 47 +++++++++++++++------------- pkg/ingester/ingester.go | 38 ++++++++++++---------- pkg/ingester/instance_limits_test.go | 4 +-- 4 files changed, 80 insertions(+), 40 deletions(-) diff --git a/pkg/ingester/errors.go b/pkg/ingester/errors.go index 43a4a8f0014..3131ed8e0e0 100644 --- a/pkg/ingester/errors.go +++ b/pkg/ingester/errors.go @@ -78,6 +78,14 @@ type ingesterError interface { errorType() ingesterErrorType } +// softError is a marker interface for the errors on which ingester.Push should not stop immediately. +type softError interface { + error + soft() +} + +type softErrorFunction func() softError + // wrapOrAnnotateWithUser prepends the given userID to the given error. // If the given error matches one of the errors from this package, the // returned error retains a reference to the former. @@ -114,6 +122,9 @@ func (e sampleError) errorType() ingesterErrorType { return badData } +// sampleError implements the softError interface. +func (e sampleError) soft() {} + func newSampleError(errID globalerror.ID, errMsg string, timestamp model.Time, labels []mimirpb.LabelAdapter) sampleError { return sampleError{ errID: errID, @@ -167,6 +178,9 @@ func (e exemplarError) errorType() ingesterErrorType { return badData } +// exemplarError implements the softError interface. 
+func (e exemplarError) soft() {} + func newExemplarError(errID globalerror.ID, errMsg string, timestamp model.Time, seriesLabels, exemplarLabels []mimirpb.LabelAdapter) exemplarError { return exemplarError{ errID: errID, @@ -207,7 +221,10 @@ func (e tsdbIngestExemplarErr) errorType() ingesterErrorType { return badData } -func newTSDBIngestExemplarErr(ingestErr error, timestamp model.Time, seriesLabels, exemplarLabels []mimirpb.LabelAdapter) error { +// tsdbIngestExemplarErr implements the softError interface. +func (e tsdbIngestExemplarErr) soft() {} + +func newTSDBIngestExemplarErr(ingestErr error, timestamp model.Time, seriesLabels, exemplarLabels []mimirpb.LabelAdapter) tsdbIngestExemplarErr { return tsdbIngestExemplarErr{ originalErr: ingestErr, timestamp: timestamp, @@ -240,6 +257,9 @@ func (e perUserSeriesLimitReachedError) errorType() ingesterErrorType { return badData } +// perUserSeriesLimitReachedError implements the softError interface. +func (e perUserSeriesLimitReachedError) soft() {} + // perUserMetadataLimitReachedError is an ingesterError indicating that a per-user metadata limit has been reached. type perUserMetadataLimitReachedError struct { limit int @@ -264,6 +284,9 @@ func (e perUserMetadataLimitReachedError) errorType() ingesterErrorType { return badData } +// perUserMetadataLimitReachedError implements the softError interface. +func (e perUserMetadataLimitReachedError) soft() {} + // perMetricSeriesLimitReachedError is an ingesterError indicating that a per-metric series limit has been reached. type perMetricSeriesLimitReachedError struct { limit int @@ -293,6 +316,9 @@ func (e perMetricSeriesLimitReachedError) errorType() ingesterErrorType { return badData } +// perMetricSeriesLimitReachedError implements the softError interface. +func (e perMetricSeriesLimitReachedError) soft() {} + // perMetricMetadataLimitReachedError is an ingesterError indicating that a per-metric metadata limit has been reached. 
type perMetricMetadataLimitReachedError struct { limit int @@ -322,6 +348,9 @@ func (e perMetricMetadataLimitReachedError) errorType() ingesterErrorType { return badData } +// perMetricMetadataLimitReachedError implements the softError interface. +func (e perMetricMetadataLimitReachedError) soft() {} + // unavailableError is an ingesterError indicating that the ingester is unavailable. type unavailableError struct { state services.State diff --git a/pkg/ingester/errors_test.go b/pkg/ingester/errors_test.go index 8950ea73e00..1a4342d2bf5 100644 --- a/pkg/ingester/errors_test.go +++ b/pkg/ingester/errors_test.go @@ -32,12 +32,12 @@ func TestUnavailableError(t *testing.T) { require.Error(t, err) expectedMsg := fmt.Sprintf(integerUnavailableMsgFormat, state) require.EqualError(t, err, expectedMsg) - checkIngesterError(t, err, unavailable) + checkIngesterError(t, err, unavailable, false) wrappedErr := wrapOrAnnotateWithUser(err, userID) require.ErrorIs(t, wrappedErr, err) require.ErrorAs(t, wrappedErr, &unavailableError{}) - checkIngesterError(t, wrappedErr, unavailable) + checkIngesterError(t, wrappedErr, unavailable, false) } func TestInstanceLimitReachedError(t *testing.T) { @@ -45,12 +45,12 @@ func TestInstanceLimitReachedError(t *testing.T) { err := newInstanceLimitReachedError(limitErrorMessage) require.Error(t, err) require.EqualError(t, err, limitErrorMessage) - checkIngesterError(t, err, instanceLimitReached) + checkIngesterError(t, err, instanceLimitReached, false) wrappedErr := wrapOrAnnotateWithUser(err, userID) require.ErrorIs(t, wrappedErr, err) require.ErrorAs(t, wrappedErr, &instanceLimitReachedError{}) - checkIngesterError(t, wrappedErr, instanceLimitReached) + checkIngesterError(t, wrappedErr, instanceLimitReached, false) } func TestNewTSDBUnavailableError(t *testing.T) { @@ -58,7 +58,7 @@ func TestNewTSDBUnavailableError(t *testing.T) { err := newTSDBUnavailableError(tsdbErrMsg) require.Error(t, err) require.EqualError(t, err, tsdbErrMsg) - 
checkIngesterError(t, err, tsdbUnavailable) + checkIngesterError(t, err, tsdbUnavailable, false) wrappedErr := fmt.Errorf("wrapped: %w", err) require.ErrorIs(t, wrappedErr, err) @@ -67,7 +67,7 @@ func TestNewTSDBUnavailableError(t *testing.T) { wrappedWithUserErr := wrapOrAnnotateWithUser(err, userID) require.ErrorIs(t, wrappedWithUserErr, err) require.ErrorAs(t, wrappedWithUserErr, &tsdbUnavailableError{}) - checkIngesterError(t, wrappedErr, tsdbUnavailable) + checkIngesterError(t, wrappedErr, tsdbUnavailable, false) } func TestNewPerUserSeriesLimitError(t *testing.T) { @@ -78,12 +78,12 @@ func TestNewPerUserSeriesLimitError(t *testing.T) { validation.MaxSeriesPerUserFlag, ) require.Equal(t, expectedErrMsg, err.Error()) - checkIngesterError(t, err, badData) + checkIngesterError(t, err, badData, true) wrappedErr := wrapOrAnnotateWithUser(err, userID) require.ErrorIs(t, wrappedErr, err) require.ErrorAs(t, wrappedErr, &perUserSeriesLimitReachedError{}) - checkIngesterError(t, wrappedErr, badData) + checkIngesterError(t, wrappedErr, badData, true) } func TestNewPerUserMetadataLimitError(t *testing.T) { @@ -94,12 +94,12 @@ func TestNewPerUserMetadataLimitError(t *testing.T) { validation.MaxMetadataPerUserFlag, ) require.Equal(t, expectedErrMsg, err.Error()) - checkIngesterError(t, err, badData) + checkIngesterError(t, err, badData, true) wrappedErr := wrapOrAnnotateWithUser(err, userID) require.ErrorIs(t, wrappedErr, err) require.ErrorAs(t, wrappedErr, &perUserMetadataLimitReachedError{}) - checkIngesterError(t, wrappedErr, badData) + checkIngesterError(t, wrappedErr, badData, true) } func TestNewPerMetricSeriesLimitError(t *testing.T) { @@ -116,12 +116,12 @@ func TestNewPerMetricSeriesLimitError(t *testing.T) { labels.String(), ) require.Equal(t, expectedErrMsg, err.Error()) - checkIngesterError(t, err, badData) + checkIngesterError(t, err, badData, true) wrappedErr := wrapOrAnnotateWithUser(err, userID) require.ErrorIs(t, wrappedErr, err) require.ErrorAs(t, 
wrappedErr, &perMetricSeriesLimitReachedError{}) - checkIngesterError(t, wrappedErr, badData) + checkIngesterError(t, wrappedErr, badData, true) } func TestNewPerMetricMetadataLimitError(t *testing.T) { @@ -138,12 +138,12 @@ func TestNewPerMetricMetadataLimitError(t *testing.T) { labels.String(), ) require.Equal(t, expectedErrMsg, err.Error()) - checkIngesterError(t, err, badData) + checkIngesterError(t, err, badData, true) wrappedErr := wrapOrAnnotateWithUser(err, userID) require.ErrorIs(t, wrappedErr, err) require.ErrorAs(t, wrappedErr, &perMetricMetadataLimitReachedError{}) - checkIngesterError(t, wrappedErr, badData) + checkIngesterError(t, wrappedErr, badData, true) } func TestNewSampleError(t *testing.T) { @@ -177,13 +177,13 @@ func TestNewSampleError(t *testing.T) { for testName, tc := range tests { t.Run(testName, func(t *testing.T) { require.Equal(t, tc.expectedMsg, tc.err.Error()) - checkIngesterError(t, tc.err, badData) + checkIngesterError(t, tc.err, badData, true) wrappedErr := wrapOrAnnotateWithUser(tc.err, userID) require.ErrorIs(t, wrappedErr, tc.err) var sampleErr sampleError require.ErrorAs(t, wrappedErr, &sampleErr) - checkIngesterError(t, wrappedErr, badData) + checkIngesterError(t, wrappedErr, badData, true) }) } } @@ -208,13 +208,13 @@ func TestNewExemplarError(t *testing.T) { for testName, tc := range tests { t.Run(testName, func(t *testing.T) { require.Equal(t, tc.expectedMsg, tc.err.Error()) - checkIngesterError(t, tc.err, badData) + checkIngesterError(t, tc.err, badData, true) wrappedErr := wrapOrAnnotateWithUser(tc.err, userID) require.ErrorIs(t, wrappedErr, tc.err) var exemplarErr exemplarError require.ErrorAs(t, wrappedErr, &exemplarErr) - checkIngesterError(t, wrappedErr, badData) + checkIngesterError(t, wrappedErr, badData, true) }) } } @@ -226,12 +226,12 @@ func TestNewTSDBIngestExemplarErr(t *testing.T) { err := newTSDBIngestExemplarErr(anotherErr, timestamp, seriesLabels, exemplarsLabels) expectedErrMsg := fmt.Sprintf("err: %v. 
timestamp=1970-01-19T05:30:43.969Z, series={__name__=\"test\"}, exemplar={traceID=\"123\"}", anotherErr) require.Equal(t, expectedErrMsg, err.Error()) - checkIngesterError(t, err, badData) + checkIngesterError(t, err, badData, true) wrappedErr := wrapOrAnnotateWithUser(err, userID) require.ErrorIs(t, wrappedErr, err) require.ErrorAs(t, wrappedErr, &tsdbIngestExemplarErr{}) - checkIngesterError(t, wrappedErr, badData) + checkIngesterError(t, wrappedErr, badData, true) } func TestErrorWithStatus(t *testing.T) { @@ -282,8 +282,13 @@ func TestWrapOrAnnotateWithUser(t *testing.T) { require.Equal(t, wrappingErr, errors.Unwrap(wrappedSafeErr)) } -func checkIngesterError(t *testing.T, err error, expectedType ingesterErrorType) { +func checkIngesterError(t *testing.T, err error, expectedType ingesterErrorType, isSoft bool) { var ingesterErr ingesterError require.ErrorAs(t, err, &ingesterErr) require.Equal(t, expectedType, ingesterErr.errorType()) + + if isSoft { + var softErr softError + require.ErrorAs(t, err, &softErr) + } } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 9b75a3f03e8..064cec7bcf9 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -852,8 +852,13 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques // successfully committed stats pushStats - firstPartialErr error - updateFirstPartial = func(sampler *util_log.Sampler, errFn func() error) { + firstPartialErr error + // updateFirstPartial is a function that, in case of a softError, stores that error + // in firstPartialErr, and makes PushWithCleanup proceed. This way all the valid + // samples and exemplars will actually be ingested, and the first softError that + // was encountered will be returned. If a sampler is specified, the softError gets + // wrapped by that sampler. 
+ updateFirstPartial = func(sampler *util_log.Sampler, errFn softErrorFunction) { if firstPartialErr == nil { firstPartialErr = errFn() if sampler != nil { @@ -963,9 +968,10 @@ func (i *Ingester) updateMetricsFromPushStats(userID string, group string, stats } // pushSamplesToAppender appends samples and exemplars to the appender. Most errors are handled via updateFirstPartial function, -// but in case of unhandled errors, appender is rolled back and such error is returned. +// but in case of unhandled errors, appender is rolled back and such error is returned. Errors handled by updateFirstPartial +// must be of type softError. func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.PreallocTimeseries, app extendedAppender, startAppend time.Time, - stats *pushStats, updateFirstPartial func(sampler *util_log.Sampler, errFn func() error), activeSeries *activeseries.ActiveSeries, + stats *pushStats, updateFirstPartial func(sampler *util_log.Sampler, errFn softErrorFunction), activeSeries *activeseries.ActiveSeries, outOfOrderWindow time.Duration, minAppendTimeAvailable bool, minAppendTime int64) error { // Return true if handled as soft error, and we can ingest more series. 
@@ -980,49 +986,49 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre switch cause := errors.Cause(err); cause { case storage.ErrOutOfBounds: stats.sampleOutOfBoundsCount++ - updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() error { + updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() softError { return newSampleTimestampTooOldError(model.Time(timestamp), labels) }) return true case storage.ErrOutOfOrderSample: stats.sampleOutOfOrderCount++ - updateFirstPartial(i.errorSamplers.sampleOutOfOrder, func() error { + updateFirstPartial(i.errorSamplers.sampleOutOfOrder, func() softError { return newSampleOutOfOrderError(model.Time(timestamp), labels) }) return true case storage.ErrTooOldSample: stats.sampleTooOldCount++ - updateFirstPartial(i.errorSamplers.sampleTimestampTooOldOOOEnabled, func() error { + updateFirstPartial(i.errorSamplers.sampleTimestampTooOldOOOEnabled, func() softError { return newSampleTimestampTooOldOOOEnabledError(model.Time(timestamp), labels, outOfOrderWindow) }) return true case globalerror.SampleTooFarInFuture: stats.sampleTooFarInFutureCount++ - updateFirstPartial(i.errorSamplers.sampleTimestampTooFarInFuture, func() error { + updateFirstPartial(i.errorSamplers.sampleTimestampTooFarInFuture, func() softError { return newSampleTimestampTooFarInFutureError(model.Time(timestamp), labels) }) return true case storage.ErrDuplicateSampleForTimestamp: stats.newValueForTimestampCount++ - updateFirstPartial(i.errorSamplers.sampleDuplicateTimestamp, func() error { + updateFirstPartial(i.errorSamplers.sampleDuplicateTimestamp, func() softError { return newSampleDuplicateTimestampError(model.Time(timestamp), labels) }) return true case globalerror.MaxSeriesPerUser: stats.perUserSeriesLimitCount++ - updateFirstPartial(i.errorSamplers.maxSeriesPerUserLimitExceeded, func() error { + updateFirstPartial(i.errorSamplers.maxSeriesPerUserLimitExceeded, func() softError { return 
newPerUserSeriesLimitReachedError(i.limiter.limits.MaxGlobalSeriesPerUser(userID)) }) return true case globalerror.MaxSeriesPerMetric: stats.perMetricSeriesLimitCount++ - updateFirstPartial(i.errorSamplers.maxSeriesPerMetricLimitExceeded, func() error { + updateFirstPartial(i.errorSamplers.maxSeriesPerMetricLimitExceeded, func() softError { return newPerMetricSeriesLimitReachedError(i.limiter.limits.MaxGlobalSeriesPerMetric(userID), mimirpb.FromLabelAdaptersToLabelsWithCopy(labels)) }) return true @@ -1063,7 +1069,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre firstTimestamp = ts.Histograms[0].Timestamp } - updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() error { + updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() softError { return newSampleTimestampTooOldError(model.Time(firstTimestamp), ts.Labels) }) continue @@ -1078,7 +1084,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre firstTimestamp := ts.Samples[0].TimestampMs - updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() error { + updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() softError { return newSampleTimestampTooOldError(model.Time(firstTimestamp), ts.Labels) }) continue @@ -1197,7 +1203,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre // app.AppendExemplar currently doesn't create the series, it must // already exist. If it does not then drop. 
if ref == 0 { - updateFirstPartial(nil, func() error { + updateFirstPartial(nil, func() softError { return newExemplarMissingSeriesError(model.Time(ts.Exemplars[0].TimestampMs), ts.Labels, ts.Exemplars[0].Labels) }) stats.failedExemplarsCount += len(ts.Exemplars) @@ -1205,7 +1211,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre for _, ex := range ts.Exemplars { if ex.TimestampMs > maxTimestampMs { stats.failedExemplarsCount++ - updateFirstPartial(nil, func() error { + updateFirstPartial(nil, func() softError { return newExemplarTimestampTooFarInFutureError(model.Time(ex.TimestampMs), ts.Labels, ex.Labels) }) continue @@ -1225,7 +1231,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre } // Error adding exemplar - updateFirstPartial(nil, func() error { + updateFirstPartial(nil, func() softError { if err == nil { return nil } diff --git a/pkg/ingester/instance_limits_test.go b/pkg/ingester/instance_limits_test.go index 7c15519519b..1133214812a 100644 --- a/pkg/ingester/instance_limits_test.go +++ b/pkg/ingester/instance_limits_test.go @@ -47,11 +47,11 @@ func TestInstanceLimitErr(t *testing.T) { var instanceLimitErr instanceLimitReachedError require.Error(t, instanceLimitErr) require.ErrorAs(t, limitError, &instanceLimitErr) - checkIngesterError(t, limitError, instanceLimitReached) + checkIngesterError(t, limitError, instanceLimitReached, false) wrappedWithUserErr := wrapOrAnnotateWithUser(limitError, userID) require.Error(t, wrappedWithUserErr) require.ErrorIs(t, wrappedWithUserErr, limitError) - checkIngesterError(t, wrappedWithUserErr, instanceLimitReached) + checkIngesterError(t, wrappedWithUserErr, instanceLimitReached, false) } }