Skip to content

Commit

Permalink
Allow ingester.Push to proceed on soft errors only
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
  • Loading branch information
duricanikolic committed Oct 13, 2023
1 parent 231d362 commit d04a78d
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 40 deletions.
31 changes: 30 additions & 1 deletion pkg/ingester/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ type ingesterError interface {
errorType() ingesterErrorType
}

// softError is a marker interface for the errors on which ingester.Push should not stop immediately.
type softError interface {
error
soft()
}

type softErrorFunction func() softError

// wrapOrAnnotateWithUser prepends the given userID to the given error.
// If the given error matches one of the errors from this package, the
// returned error retains a reference to the former.
Expand Down Expand Up @@ -114,6 +122,9 @@ func (e sampleError) errorType() ingesterErrorType {
return badData
}

// sampleError implements the softError interface.
func (e sampleError) soft() {}

func newSampleError(errID globalerror.ID, errMsg string, timestamp model.Time, labels []mimirpb.LabelAdapter) sampleError {
return sampleError{
errID: errID,
Expand Down Expand Up @@ -167,6 +178,9 @@ func (e exemplarError) errorType() ingesterErrorType {
return badData
}

// exemplarError implements the softError interface.
func (e exemplarError) soft() {}

func newExemplarError(errID globalerror.ID, errMsg string, timestamp model.Time, seriesLabels, exemplarLabels []mimirpb.LabelAdapter) exemplarError {
return exemplarError{
errID: errID,
Expand Down Expand Up @@ -207,7 +221,10 @@ func (e tsdbIngestExemplarErr) errorType() ingesterErrorType {
return badData
}

func newTSDBIngestExemplarErr(ingestErr error, timestamp model.Time, seriesLabels, exemplarLabels []mimirpb.LabelAdapter) error {
// tsdbIngestExemplarErr implements the softError interface.
func (e tsdbIngestExemplarErr) soft() {}

func newTSDBIngestExemplarErr(ingestErr error, timestamp model.Time, seriesLabels, exemplarLabels []mimirpb.LabelAdapter) tsdbIngestExemplarErr {
return tsdbIngestExemplarErr{
originalErr: ingestErr,
timestamp: timestamp,
Expand Down Expand Up @@ -240,6 +257,9 @@ func (e perUserSeriesLimitReachedError) errorType() ingesterErrorType {
return badData
}

// perUserSeriesLimitReachedError implements the softError interface.
func (e perUserSeriesLimitReachedError) soft() {}

// perUserMetadataLimitReachedError is an ingesterError indicating that a per-user metadata limit has been reached.
type perUserMetadataLimitReachedError struct {
limit int
Expand All @@ -264,6 +284,9 @@ func (e perUserMetadataLimitReachedError) errorType() ingesterErrorType {
return badData
}

// perUserMetadataLimitReachedError implements the softError interface.
func (e perUserMetadataLimitReachedError) soft() {}

// perMetricSeriesLimitReachedError is an ingesterError indicating that a per-metric series limit has been reached.
type perMetricSeriesLimitReachedError struct {
limit int
Expand Down Expand Up @@ -293,6 +316,9 @@ func (e perMetricSeriesLimitReachedError) errorType() ingesterErrorType {
return badData
}

// perMetricSeriesLimitReachedError implements the softError interface.
func (e perMetricSeriesLimitReachedError) soft() {}

// perMetricMetadataLimitReachedError is an ingesterError indicating that a per-metric metadata limit has been reached.
type perMetricMetadataLimitReachedError struct {
limit int
Expand Down Expand Up @@ -322,6 +348,9 @@ func (e perMetricMetadataLimitReachedError) errorType() ingesterErrorType {
return badData
}

// perMetricMetadataLimitReachedError implements the softError interface.
func (e perMetricMetadataLimitReachedError) soft() {}

// unavailableError is an ingesterError indicating that the ingester is unavailable.
type unavailableError struct {
state services.State
Expand Down
47 changes: 26 additions & 21 deletions pkg/ingester/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,33 +32,33 @@ func TestUnavailableError(t *testing.T) {
require.Error(t, err)
expectedMsg := fmt.Sprintf(integerUnavailableMsgFormat, state)
require.EqualError(t, err, expectedMsg)
checkIngesterError(t, err, unavailable)
checkIngesterError(t, err, unavailable, false)

wrappedErr := wrapOrAnnotateWithUser(err, userID)
require.ErrorIs(t, wrappedErr, err)
require.ErrorAs(t, wrappedErr, &unavailableError{})
checkIngesterError(t, wrappedErr, unavailable)
checkIngesterError(t, wrappedErr, unavailable, false)
}

func TestInstanceLimitReachedError(t *testing.T) {
limitErrorMessage := "this is a limit error message"
err := newInstanceLimitReachedError(limitErrorMessage)
require.Error(t, err)
require.EqualError(t, err, limitErrorMessage)
checkIngesterError(t, err, instanceLimitReached)
checkIngesterError(t, err, instanceLimitReached, false)

wrappedErr := wrapOrAnnotateWithUser(err, userID)
require.ErrorIs(t, wrappedErr, err)
require.ErrorAs(t, wrappedErr, &instanceLimitReachedError{})
checkIngesterError(t, wrappedErr, instanceLimitReached)
checkIngesterError(t, wrappedErr, instanceLimitReached, false)
}

func TestNewTSDBUnavailableError(t *testing.T) {
tsdbErrMsg := "TSDB Head forced compaction in progress and no write request is currently allowed"
err := newTSDBUnavailableError(tsdbErrMsg)
require.Error(t, err)
require.EqualError(t, err, tsdbErrMsg)
checkIngesterError(t, err, tsdbUnavailable)
checkIngesterError(t, err, tsdbUnavailable, false)

wrappedErr := fmt.Errorf("wrapped: %w", err)
require.ErrorIs(t, wrappedErr, err)
Expand All @@ -67,7 +67,7 @@ func TestNewTSDBUnavailableError(t *testing.T) {
wrappedWithUserErr := wrapOrAnnotateWithUser(err, userID)
require.ErrorIs(t, wrappedWithUserErr, err)
require.ErrorAs(t, wrappedWithUserErr, &tsdbUnavailableError{})
checkIngesterError(t, wrappedErr, tsdbUnavailable)
checkIngesterError(t, wrappedErr, tsdbUnavailable, false)
}

func TestNewPerUserSeriesLimitError(t *testing.T) {
Expand All @@ -78,12 +78,12 @@ func TestNewPerUserSeriesLimitError(t *testing.T) {
validation.MaxSeriesPerUserFlag,
)
require.Equal(t, expectedErrMsg, err.Error())
checkIngesterError(t, err, badData)
checkIngesterError(t, err, badData, true)

wrappedErr := wrapOrAnnotateWithUser(err, userID)
require.ErrorIs(t, wrappedErr, err)
require.ErrorAs(t, wrappedErr, &perUserSeriesLimitReachedError{})
checkIngesterError(t, wrappedErr, badData)
checkIngesterError(t, wrappedErr, badData, true)
}

func TestNewPerUserMetadataLimitError(t *testing.T) {
Expand All @@ -94,12 +94,12 @@ func TestNewPerUserMetadataLimitError(t *testing.T) {
validation.MaxMetadataPerUserFlag,
)
require.Equal(t, expectedErrMsg, err.Error())
checkIngesterError(t, err, badData)
checkIngesterError(t, err, badData, true)

wrappedErr := wrapOrAnnotateWithUser(err, userID)
require.ErrorIs(t, wrappedErr, err)
require.ErrorAs(t, wrappedErr, &perUserMetadataLimitReachedError{})
checkIngesterError(t, wrappedErr, badData)
checkIngesterError(t, wrappedErr, badData, true)
}

func TestNewPerMetricSeriesLimitError(t *testing.T) {
Expand All @@ -116,12 +116,12 @@ func TestNewPerMetricSeriesLimitError(t *testing.T) {
labels.String(),
)
require.Equal(t, expectedErrMsg, err.Error())
checkIngesterError(t, err, badData)
checkIngesterError(t, err, badData, true)

wrappedErr := wrapOrAnnotateWithUser(err, userID)
require.ErrorIs(t, wrappedErr, err)
require.ErrorAs(t, wrappedErr, &perMetricSeriesLimitReachedError{})
checkIngesterError(t, wrappedErr, badData)
checkIngesterError(t, wrappedErr, badData, true)
}

func TestNewPerMetricMetadataLimitError(t *testing.T) {
Expand All @@ -138,12 +138,12 @@ func TestNewPerMetricMetadataLimitError(t *testing.T) {
labels.String(),
)
require.Equal(t, expectedErrMsg, err.Error())
checkIngesterError(t, err, badData)
checkIngesterError(t, err, badData, true)

wrappedErr := wrapOrAnnotateWithUser(err, userID)
require.ErrorIs(t, wrappedErr, err)
require.ErrorAs(t, wrappedErr, &perMetricMetadataLimitReachedError{})
checkIngesterError(t, wrappedErr, badData)
checkIngesterError(t, wrappedErr, badData, true)
}

func TestNewSampleError(t *testing.T) {
Expand Down Expand Up @@ -177,13 +177,13 @@ func TestNewSampleError(t *testing.T) {
for testName, tc := range tests {
t.Run(testName, func(t *testing.T) {
require.Equal(t, tc.expectedMsg, tc.err.Error())
checkIngesterError(t, tc.err, badData)
checkIngesterError(t, tc.err, badData, true)

wrappedErr := wrapOrAnnotateWithUser(tc.err, userID)
require.ErrorIs(t, wrappedErr, tc.err)
var sampleErr sampleError
require.ErrorAs(t, wrappedErr, &sampleErr)
checkIngesterError(t, wrappedErr, badData)
checkIngesterError(t, wrappedErr, badData, true)
})
}
}
Expand All @@ -208,13 +208,13 @@ func TestNewExemplarError(t *testing.T) {
for testName, tc := range tests {
t.Run(testName, func(t *testing.T) {
require.Equal(t, tc.expectedMsg, tc.err.Error())
checkIngesterError(t, tc.err, badData)
checkIngesterError(t, tc.err, badData, true)

wrappedErr := wrapOrAnnotateWithUser(tc.err, userID)
require.ErrorIs(t, wrappedErr, tc.err)
var exemplarErr exemplarError
require.ErrorAs(t, wrappedErr, &exemplarErr)
checkIngesterError(t, wrappedErr, badData)
checkIngesterError(t, wrappedErr, badData, true)
})
}
}
Expand All @@ -226,12 +226,12 @@ func TestNewTSDBIngestExemplarErr(t *testing.T) {
err := newTSDBIngestExemplarErr(anotherErr, timestamp, seriesLabels, exemplarsLabels)
expectedErrMsg := fmt.Sprintf("err: %v. timestamp=1970-01-19T05:30:43.969Z, series={__name__=\"test\"}, exemplar={traceID=\"123\"}", anotherErr)
require.Equal(t, expectedErrMsg, err.Error())
checkIngesterError(t, err, badData)
checkIngesterError(t, err, badData, true)

wrappedErr := wrapOrAnnotateWithUser(err, userID)
require.ErrorIs(t, wrappedErr, err)
require.ErrorAs(t, wrappedErr, &tsdbIngestExemplarErr{})
checkIngesterError(t, wrappedErr, badData)
checkIngesterError(t, wrappedErr, badData, true)
}

func TestErrorWithStatus(t *testing.T) {
Expand Down Expand Up @@ -282,8 +282,13 @@ func TestWrapOrAnnotateWithUser(t *testing.T) {
require.Equal(t, wrappingErr, errors.Unwrap(wrappedSafeErr))
}

func checkIngesterError(t *testing.T, err error, expectedType ingesterErrorType) {
func checkIngesterError(t *testing.T, err error, expectedType ingesterErrorType, isSoft bool) {
var ingesterErr ingesterError
require.ErrorAs(t, err, &ingesterErr)
require.Equal(t, expectedType, ingesterErr.errorType())

if isSoft {
var softErr softError
require.ErrorAs(t, err, &softErr)
}
}
38 changes: 22 additions & 16 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,8 +852,13 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques
// successfully committed
stats pushStats

firstPartialErr error
updateFirstPartial = func(sampler *util_log.Sampler, errFn func() error) {
firstPartialErr error
// updateFirstPartial is a function that, in case of a softError, stores that error
// in firstPartialError, and makes PushWithCleanup proceed. This way all the valid
// samples and exemplars will be we actually ingested, and the first softError that
// was encountered will be returned. If a sampler is specified, the softError gets
// wrapped by that sampler.
updateFirstPartial = func(sampler *util_log.Sampler, errFn softErrorFunction) {
if firstPartialErr == nil {
firstPartialErr = errFn()
if sampler != nil {
Expand Down Expand Up @@ -963,9 +968,10 @@ func (i *Ingester) updateMetricsFromPushStats(userID string, group string, stats
}

// pushSamplesToAppender appends samples and exemplars to the appender. Most errors are handled via updateFirstPartial function,
// but in case of unhandled errors, appender is rolled back and such error is returned.
// but in case of unhandled errors, appender is rolled back and such error is returned. Errors handled by updateFirstPartial
// must be of type softError.
func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.PreallocTimeseries, app extendedAppender, startAppend time.Time,
stats *pushStats, updateFirstPartial func(sampler *util_log.Sampler, errFn func() error), activeSeries *activeseries.ActiveSeries,
stats *pushStats, updateFirstPartial func(sampler *util_log.Sampler, errFn softErrorFunction), activeSeries *activeseries.ActiveSeries,
outOfOrderWindow time.Duration, minAppendTimeAvailable bool, minAppendTime int64) error {

// Return true if handled as soft error, and we can ingest more series.
Expand All @@ -980,49 +986,49 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
switch cause := errors.Cause(err); cause {
case storage.ErrOutOfBounds:
stats.sampleOutOfBoundsCount++
updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() error {
updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() softError {
return newSampleTimestampTooOldError(model.Time(timestamp), labels)
})
return true

case storage.ErrOutOfOrderSample:
stats.sampleOutOfOrderCount++
updateFirstPartial(i.errorSamplers.sampleOutOfOrder, func() error {
updateFirstPartial(i.errorSamplers.sampleOutOfOrder, func() softError {
return newSampleOutOfOrderError(model.Time(timestamp), labels)
})
return true

case storage.ErrTooOldSample:
stats.sampleTooOldCount++
updateFirstPartial(i.errorSamplers.sampleTimestampTooOldOOOEnabled, func() error {
updateFirstPartial(i.errorSamplers.sampleTimestampTooOldOOOEnabled, func() softError {
return newSampleTimestampTooOldOOOEnabledError(model.Time(timestamp), labels, outOfOrderWindow)
})
return true

case globalerror.SampleTooFarInFuture:
stats.sampleTooFarInFutureCount++
updateFirstPartial(i.errorSamplers.sampleTimestampTooFarInFuture, func() error {
updateFirstPartial(i.errorSamplers.sampleTimestampTooFarInFuture, func() softError {
return newSampleTimestampTooFarInFutureError(model.Time(timestamp), labels)
})
return true

case storage.ErrDuplicateSampleForTimestamp:
stats.newValueForTimestampCount++
updateFirstPartial(i.errorSamplers.sampleDuplicateTimestamp, func() error {
updateFirstPartial(i.errorSamplers.sampleDuplicateTimestamp, func() softError {
return newSampleDuplicateTimestampError(model.Time(timestamp), labels)
})
return true

case globalerror.MaxSeriesPerUser:
stats.perUserSeriesLimitCount++
updateFirstPartial(i.errorSamplers.maxSeriesPerUserLimitExceeded, func() error {
updateFirstPartial(i.errorSamplers.maxSeriesPerUserLimitExceeded, func() softError {
return newPerUserSeriesLimitReachedError(i.limiter.limits.MaxGlobalSeriesPerUser(userID))
})
return true

case globalerror.MaxSeriesPerMetric:
stats.perMetricSeriesLimitCount++
updateFirstPartial(i.errorSamplers.maxSeriesPerMetricLimitExceeded, func() error {
updateFirstPartial(i.errorSamplers.maxSeriesPerMetricLimitExceeded, func() softError {
return newPerMetricSeriesLimitReachedError(i.limiter.limits.MaxGlobalSeriesPerMetric(userID), mimirpb.FromLabelAdaptersToLabelsWithCopy(labels))
})
return true
Expand Down Expand Up @@ -1063,7 +1069,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
firstTimestamp = ts.Histograms[0].Timestamp
}

updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() error {
updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() softError {
return newSampleTimestampTooOldError(model.Time(firstTimestamp), ts.Labels)
})
continue
Expand All @@ -1078,7 +1084,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre

firstTimestamp := ts.Samples[0].TimestampMs

updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() error {
updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() softError {
return newSampleTimestampTooOldError(model.Time(firstTimestamp), ts.Labels)
})
continue
Expand Down Expand Up @@ -1197,15 +1203,15 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
// app.AppendExemplar currently doesn't create the series, it must
// already exist. If it does not then drop.
if ref == 0 {
updateFirstPartial(nil, func() error {
updateFirstPartial(nil, func() softError {
return newExemplarMissingSeriesError(model.Time(ts.Exemplars[0].TimestampMs), ts.Labels, ts.Exemplars[0].Labels)
})
stats.failedExemplarsCount += len(ts.Exemplars)
} else { // Note that else is explicit, rather than a continue in the above if, in case of additional logic post exemplar processing.
for _, ex := range ts.Exemplars {
if ex.TimestampMs > maxTimestampMs {
stats.failedExemplarsCount++
updateFirstPartial(nil, func() error {
updateFirstPartial(nil, func() softError {
return newExemplarTimestampTooFarInFutureError(model.Time(ex.TimestampMs), ts.Labels, ex.Labels)
})
continue
Expand All @@ -1225,7 +1231,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
}

// Error adding exemplar
updateFirstPartial(nil, func() error {
updateFirstPartial(nil, func() softError {
if err == nil {
return nil
}
Expand Down
Loading

0 comments on commit d04a78d

Please sign in to comment.