Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow ingester.Push() to proceed on soft errors only #6366

Merged
merged 1 commit into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion pkg/ingester/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ type ingesterError interface {
errorType() ingesterErrorType
}

// softError is a marker interface for the errors on which ingester.Push should not stop immediately.
type softError interface {
error
soft()
}

type softErrorFunction func() softError

// wrapOrAnnotateWithUser prepends the given userID to the given error.
// If the given error matches one of the errors from this package, the
// returned error retains a reference to the former.
Expand Down Expand Up @@ -114,6 +122,9 @@ func (e sampleError) errorType() ingesterErrorType {
return badData
}

// sampleError implements the softError interface.
func (e sampleError) soft() {}

func newSampleError(errID globalerror.ID, errMsg string, timestamp model.Time, labels []mimirpb.LabelAdapter) sampleError {
return sampleError{
errID: errID,
Expand Down Expand Up @@ -167,6 +178,9 @@ func (e exemplarError) errorType() ingesterErrorType {
return badData
}

// exemplarError implements the softError interface.
func (e exemplarError) soft() {}

func newExemplarError(errID globalerror.ID, errMsg string, timestamp model.Time, seriesLabels, exemplarLabels []mimirpb.LabelAdapter) exemplarError {
return exemplarError{
errID: errID,
Expand Down Expand Up @@ -207,7 +221,10 @@ func (e tsdbIngestExemplarErr) errorType() ingesterErrorType {
return badData
}

func newTSDBIngestExemplarErr(ingestErr error, timestamp model.Time, seriesLabels, exemplarLabels []mimirpb.LabelAdapter) error {
// tsdbIngestExemplarErr implements the softError interface.
func (e tsdbIngestExemplarErr) soft() {}

func newTSDBIngestExemplarErr(ingestErr error, timestamp model.Time, seriesLabels, exemplarLabels []mimirpb.LabelAdapter) tsdbIngestExemplarErr {
return tsdbIngestExemplarErr{
originalErr: ingestErr,
timestamp: timestamp,
Expand Down Expand Up @@ -240,6 +257,9 @@ func (e perUserSeriesLimitReachedError) errorType() ingesterErrorType {
return badData
}

// perUserSeriesLimitReachedError implements the softError interface.
func (e perUserSeriesLimitReachedError) soft() {}

// perUserMetadataLimitReachedError is an ingesterError indicating that a per-user metadata limit has been reached.
type perUserMetadataLimitReachedError struct {
limit int
Expand All @@ -264,6 +284,9 @@ func (e perUserMetadataLimitReachedError) errorType() ingesterErrorType {
return badData
}

// perUserMetadataLimitReachedError implements the softError interface.
func (e perUserMetadataLimitReachedError) soft() {}

// perMetricSeriesLimitReachedError is an ingesterError indicating that a per-metric series limit has been reached.
type perMetricSeriesLimitReachedError struct {
limit int
Expand Down Expand Up @@ -293,6 +316,9 @@ func (e perMetricSeriesLimitReachedError) errorType() ingesterErrorType {
return badData
}

// perMetricSeriesLimitReachedError implements the softError interface.
func (e perMetricSeriesLimitReachedError) soft() {}

// perMetricMetadataLimitReachedError is an ingesterError indicating that a per-metric metadata limit has been reached.
type perMetricMetadataLimitReachedError struct {
limit int
Expand Down Expand Up @@ -322,6 +348,9 @@ func (e perMetricMetadataLimitReachedError) errorType() ingesterErrorType {
return badData
}

// perMetricMetadataLimitReachedError implements the softError interface.
func (e perMetricMetadataLimitReachedError) soft() {}

// unavailableError is an ingesterError indicating that the ingester is unavailable.
type unavailableError struct {
state services.State
Expand Down
47 changes: 26 additions & 21 deletions pkg/ingester/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,33 +32,33 @@ func TestUnavailableError(t *testing.T) {
require.Error(t, err)
expectedMsg := fmt.Sprintf(integerUnavailableMsgFormat, state)
require.EqualError(t, err, expectedMsg)
checkIngesterError(t, err, unavailable)
checkIngesterError(t, err, unavailable, false)

wrappedErr := wrapOrAnnotateWithUser(err, userID)
require.ErrorIs(t, wrappedErr, err)
require.ErrorAs(t, wrappedErr, &unavailableError{})
checkIngesterError(t, wrappedErr, unavailable)
checkIngesterError(t, wrappedErr, unavailable, false)
}

func TestInstanceLimitReachedError(t *testing.T) {
limitErrorMessage := "this is a limit error message"
err := newInstanceLimitReachedError(limitErrorMessage)
require.Error(t, err)
require.EqualError(t, err, limitErrorMessage)
checkIngesterError(t, err, instanceLimitReached)
checkIngesterError(t, err, instanceLimitReached, false)

wrappedErr := wrapOrAnnotateWithUser(err, userID)
require.ErrorIs(t, wrappedErr, err)
require.ErrorAs(t, wrappedErr, &instanceLimitReachedError{})
checkIngesterError(t, wrappedErr, instanceLimitReached)
checkIngesterError(t, wrappedErr, instanceLimitReached, false)
}

func TestNewTSDBUnavailableError(t *testing.T) {
tsdbErrMsg := "TSDB Head forced compaction in progress and no write request is currently allowed"
err := newTSDBUnavailableError(tsdbErrMsg)
require.Error(t, err)
require.EqualError(t, err, tsdbErrMsg)
checkIngesterError(t, err, tsdbUnavailable)
checkIngesterError(t, err, tsdbUnavailable, false)

wrappedErr := fmt.Errorf("wrapped: %w", err)
require.ErrorIs(t, wrappedErr, err)
Expand All @@ -67,7 +67,7 @@ func TestNewTSDBUnavailableError(t *testing.T) {
wrappedWithUserErr := wrapOrAnnotateWithUser(err, userID)
require.ErrorIs(t, wrappedWithUserErr, err)
require.ErrorAs(t, wrappedWithUserErr, &tsdbUnavailableError{})
checkIngesterError(t, wrappedErr, tsdbUnavailable)
checkIngesterError(t, wrappedErr, tsdbUnavailable, false)
}

func TestNewPerUserSeriesLimitError(t *testing.T) {
Expand All @@ -78,12 +78,12 @@ func TestNewPerUserSeriesLimitError(t *testing.T) {
validation.MaxSeriesPerUserFlag,
)
require.Equal(t, expectedErrMsg, err.Error())
checkIngesterError(t, err, badData)
checkIngesterError(t, err, badData, true)

wrappedErr := wrapOrAnnotateWithUser(err, userID)
require.ErrorIs(t, wrappedErr, err)
require.ErrorAs(t, wrappedErr, &perUserSeriesLimitReachedError{})
checkIngesterError(t, wrappedErr, badData)
checkIngesterError(t, wrappedErr, badData, true)
}

func TestNewPerUserMetadataLimitError(t *testing.T) {
Expand All @@ -94,12 +94,12 @@ func TestNewPerUserMetadataLimitError(t *testing.T) {
validation.MaxMetadataPerUserFlag,
)
require.Equal(t, expectedErrMsg, err.Error())
checkIngesterError(t, err, badData)
checkIngesterError(t, err, badData, true)

wrappedErr := wrapOrAnnotateWithUser(err, userID)
require.ErrorIs(t, wrappedErr, err)
require.ErrorAs(t, wrappedErr, &perUserMetadataLimitReachedError{})
checkIngesterError(t, wrappedErr, badData)
checkIngesterError(t, wrappedErr, badData, true)
}

func TestNewPerMetricSeriesLimitError(t *testing.T) {
Expand All @@ -116,12 +116,12 @@ func TestNewPerMetricSeriesLimitError(t *testing.T) {
labels.String(),
)
require.Equal(t, expectedErrMsg, err.Error())
checkIngesterError(t, err, badData)
checkIngesterError(t, err, badData, true)

wrappedErr := wrapOrAnnotateWithUser(err, userID)
require.ErrorIs(t, wrappedErr, err)
require.ErrorAs(t, wrappedErr, &perMetricSeriesLimitReachedError{})
checkIngesterError(t, wrappedErr, badData)
checkIngesterError(t, wrappedErr, badData, true)
}

func TestNewPerMetricMetadataLimitError(t *testing.T) {
Expand All @@ -138,12 +138,12 @@ func TestNewPerMetricMetadataLimitError(t *testing.T) {
labels.String(),
)
require.Equal(t, expectedErrMsg, err.Error())
checkIngesterError(t, err, badData)
checkIngesterError(t, err, badData, true)

wrappedErr := wrapOrAnnotateWithUser(err, userID)
require.ErrorIs(t, wrappedErr, err)
require.ErrorAs(t, wrappedErr, &perMetricMetadataLimitReachedError{})
checkIngesterError(t, wrappedErr, badData)
checkIngesterError(t, wrappedErr, badData, true)
}

func TestNewSampleError(t *testing.T) {
Expand Down Expand Up @@ -177,13 +177,13 @@ func TestNewSampleError(t *testing.T) {
for testName, tc := range tests {
t.Run(testName, func(t *testing.T) {
require.Equal(t, tc.expectedMsg, tc.err.Error())
checkIngesterError(t, tc.err, badData)
checkIngesterError(t, tc.err, badData, true)

wrappedErr := wrapOrAnnotateWithUser(tc.err, userID)
require.ErrorIs(t, wrappedErr, tc.err)
var sampleErr sampleError
require.ErrorAs(t, wrappedErr, &sampleErr)
checkIngesterError(t, wrappedErr, badData)
checkIngesterError(t, wrappedErr, badData, true)
})
}
}
Expand All @@ -208,13 +208,13 @@ func TestNewExemplarError(t *testing.T) {
for testName, tc := range tests {
t.Run(testName, func(t *testing.T) {
require.Equal(t, tc.expectedMsg, tc.err.Error())
checkIngesterError(t, tc.err, badData)
checkIngesterError(t, tc.err, badData, true)

wrappedErr := wrapOrAnnotateWithUser(tc.err, userID)
require.ErrorIs(t, wrappedErr, tc.err)
var exemplarErr exemplarError
require.ErrorAs(t, wrappedErr, &exemplarErr)
checkIngesterError(t, wrappedErr, badData)
checkIngesterError(t, wrappedErr, badData, true)
})
}
}
Expand All @@ -226,12 +226,12 @@ func TestNewTSDBIngestExemplarErr(t *testing.T) {
err := newTSDBIngestExemplarErr(anotherErr, timestamp, seriesLabels, exemplarsLabels)
expectedErrMsg := fmt.Sprintf("err: %v. timestamp=1970-01-19T05:30:43.969Z, series={__name__=\"test\"}, exemplar={traceID=\"123\"}", anotherErr)
require.Equal(t, expectedErrMsg, err.Error())
checkIngesterError(t, err, badData)
checkIngesterError(t, err, badData, true)

wrappedErr := wrapOrAnnotateWithUser(err, userID)
require.ErrorIs(t, wrappedErr, err)
require.ErrorAs(t, wrappedErr, &tsdbIngestExemplarErr{})
checkIngesterError(t, wrappedErr, badData)
checkIngesterError(t, wrappedErr, badData, true)
}

func TestErrorWithStatus(t *testing.T) {
Expand Down Expand Up @@ -282,8 +282,13 @@ func TestWrapOrAnnotateWithUser(t *testing.T) {
require.Equal(t, wrappingErr, errors.Unwrap(wrappedSafeErr))
}

func checkIngesterError(t *testing.T, err error, expectedType ingesterErrorType) {
func checkIngesterError(t *testing.T, err error, expectedType ingesterErrorType, isSoft bool) {
var ingesterErr ingesterError
require.ErrorAs(t, err, &ingesterErr)
require.Equal(t, expectedType, ingesterErr.errorType())

if isSoft {
var softErr softError
require.ErrorAs(t, err, &softErr)
}
}
38 changes: 22 additions & 16 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,8 +852,13 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques
// successfully committed
stats pushStats

firstPartialErr error
updateFirstPartial = func(sampler *util_log.Sampler, errFn func() error) {
firstPartialErr error
// updateFirstPartial is a function that, in case of a softError, stores that error
// in firstPartialError, and makes PushWithCleanup proceed. This way all the valid
// samples and exemplars will be we actually ingested, and the first softError that
// was encountered will be returned. If a sampler is specified, the softError gets
// wrapped by that sampler.
updateFirstPartial = func(sampler *util_log.Sampler, errFn softErrorFunction) {
if firstPartialErr == nil {
firstPartialErr = errFn()
if sampler != nil {
Expand Down Expand Up @@ -963,9 +968,10 @@ func (i *Ingester) updateMetricsFromPushStats(userID string, group string, stats
}

// pushSamplesToAppender appends samples and exemplars to the appender. Most errors are handled via updateFirstPartial function,
// but in case of unhandled errors, appender is rolled back and such error is returned.
// but in case of unhandled errors, appender is rolled back and such error is returned. Errors handled by updateFirstPartial
// must be of type softError.
func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.PreallocTimeseries, app extendedAppender, startAppend time.Time,
stats *pushStats, updateFirstPartial func(sampler *util_log.Sampler, errFn func() error), activeSeries *activeseries.ActiveSeries,
stats *pushStats, updateFirstPartial func(sampler *util_log.Sampler, errFn softErrorFunction), activeSeries *activeseries.ActiveSeries,
outOfOrderWindow time.Duration, minAppendTimeAvailable bool, minAppendTime int64) error {

// Return true if handled as soft error, and we can ingest more series.
Expand All @@ -980,49 +986,49 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
switch cause := errors.Cause(err); cause {
case storage.ErrOutOfBounds:
stats.sampleOutOfBoundsCount++
updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() error {
updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() softError {
return newSampleTimestampTooOldError(model.Time(timestamp), labels)
})
return true

case storage.ErrOutOfOrderSample:
stats.sampleOutOfOrderCount++
updateFirstPartial(i.errorSamplers.sampleOutOfOrder, func() error {
updateFirstPartial(i.errorSamplers.sampleOutOfOrder, func() softError {
return newSampleOutOfOrderError(model.Time(timestamp), labels)
})
return true

case storage.ErrTooOldSample:
stats.sampleTooOldCount++
updateFirstPartial(i.errorSamplers.sampleTimestampTooOldOOOEnabled, func() error {
updateFirstPartial(i.errorSamplers.sampleTimestampTooOldOOOEnabled, func() softError {
return newSampleTimestampTooOldOOOEnabledError(model.Time(timestamp), labels, outOfOrderWindow)
})
return true

case globalerror.SampleTooFarInFuture:
stats.sampleTooFarInFutureCount++
updateFirstPartial(i.errorSamplers.sampleTimestampTooFarInFuture, func() error {
updateFirstPartial(i.errorSamplers.sampleTimestampTooFarInFuture, func() softError {
return newSampleTimestampTooFarInFutureError(model.Time(timestamp), labels)
})
return true

case storage.ErrDuplicateSampleForTimestamp:
stats.newValueForTimestampCount++
updateFirstPartial(i.errorSamplers.sampleDuplicateTimestamp, func() error {
updateFirstPartial(i.errorSamplers.sampleDuplicateTimestamp, func() softError {
return newSampleDuplicateTimestampError(model.Time(timestamp), labels)
})
return true

case globalerror.MaxSeriesPerUser:
stats.perUserSeriesLimitCount++
updateFirstPartial(i.errorSamplers.maxSeriesPerUserLimitExceeded, func() error {
updateFirstPartial(i.errorSamplers.maxSeriesPerUserLimitExceeded, func() softError {
return newPerUserSeriesLimitReachedError(i.limiter.limits.MaxGlobalSeriesPerUser(userID))
})
return true

case globalerror.MaxSeriesPerMetric:
stats.perMetricSeriesLimitCount++
updateFirstPartial(i.errorSamplers.maxSeriesPerMetricLimitExceeded, func() error {
updateFirstPartial(i.errorSamplers.maxSeriesPerMetricLimitExceeded, func() softError {
return newPerMetricSeriesLimitReachedError(i.limiter.limits.MaxGlobalSeriesPerMetric(userID), mimirpb.FromLabelAdaptersToLabelsWithCopy(labels))
})
return true
Expand Down Expand Up @@ -1063,7 +1069,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
firstTimestamp = ts.Histograms[0].Timestamp
}

updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() error {
updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() softError {
return newSampleTimestampTooOldError(model.Time(firstTimestamp), ts.Labels)
})
continue
Expand All @@ -1078,7 +1084,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre

firstTimestamp := ts.Samples[0].TimestampMs

updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() error {
updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() softError {
return newSampleTimestampTooOldError(model.Time(firstTimestamp), ts.Labels)
})
continue
Expand Down Expand Up @@ -1197,15 +1203,15 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
// app.AppendExemplar currently doesn't create the series, it must
// already exist. If it does not then drop.
if ref == 0 {
updateFirstPartial(nil, func() error {
updateFirstPartial(nil, func() softError {
return newExemplarMissingSeriesError(model.Time(ts.Exemplars[0].TimestampMs), ts.Labels, ts.Exemplars[0].Labels)
})
stats.failedExemplarsCount += len(ts.Exemplars)
} else { // Note that else is explicit, rather than a continue in the above if, in case of additional logic post exemplar processing.
for _, ex := range ts.Exemplars {
if ex.TimestampMs > maxTimestampMs {
stats.failedExemplarsCount++
updateFirstPartial(nil, func() error {
updateFirstPartial(nil, func() softError {
return newExemplarTimestampTooFarInFutureError(model.Time(ex.TimestampMs), ts.Labels, ex.Labels)
})
continue
Expand All @@ -1225,7 +1231,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
}

// Error adding exemplar
updateFirstPartial(nil, func() error {
updateFirstPartial(nil, func() softError {
if err == nil {
return nil
}
Expand Down
Loading
Loading