Enforce -validation.create-grace-period for exemplars too (#5761)
* Enforce -validation.create-grace-period for exemplars too

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Update CLI flag description

Signed-off-by: Marco Pracucci <marco@pracucci.com>

---------

Signed-off-by: Marco Pracucci <marco@pracucci.com>
pracucci authored Aug 17, 2023
1 parent 1902411 commit bb6e672
Showing 8 changed files with 92 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@
* [CHANGE] gRPC clients: use default connect timeout of 5s, and therefore enable default connect backoff max delay of 5s. #5562
* [CHANGE] The `-shutdown-delay` flag is no longer experimental. #5701
* [CHANGE] The `-validation.create-grace-period` is now enforced in the ingester too, other than distributor and query-frontend. If you've configured `-validation.create-grace-period` then make sure the configuration is applied to ingesters too. #5712
+* [CHANGE] The `-validation.create-grace-period` is now enforced for exemplars too in the distributor. If an exemplar has timestamp greater than "now + grace_period", then the exemplar will be dropped and the metric `cortex_discarded_exemplars_total{reason="exemplar_too_far_in_future",user="..."}` increased. #5761
* [FEATURE] Cardinality API: Add a new `count_method` parameter which enables counting active series #5136
* [FEATURE] Query-frontend: added experimental support to cache cardinality, label names and label values query responses. The cache will be used when `-query-frontend.cache-results` is enabled, and `-query-frontend.results-cache-ttl-for-cardinality-query` or `-query-frontend.results-cache-ttl-for-labels-query` set to a value greater than 0. The following metrics have been added to track the query results cache hit ratio per `request_type`: #5212 #5235 #5426 #5524
* `cortex_frontend_query_result_cache_requests_total{request_type="query_range|cardinality|label_names_and_values"}`
2 changes: 1 addition & 1 deletion cmd/mimir/config-descriptor.json
@@ -2982,7 +2982,7 @@
"kind": "field",
"name": "creation_grace_period",
"required": false,
-"desc": "Controls how far into the future incoming samples are accepted compared to the wall clock. Any sample will be rejected if its timestamp is greater than '(now + grace_period)'. This configuration is enforced in the distributor, ingester and query-frontend (to avoid querying too far into the future).",
+"desc": "Controls how far into the future incoming samples and exemplars are accepted compared to the wall clock. Any sample or exemplar will be rejected if its timestamp is greater than '(now + grace_period)'. This configuration is enforced in the distributor, ingester and query-frontend (to avoid querying too far into the future).",
"fieldValue": null,
"fieldDefaultValue": 600000000000,
"fieldFlag": "validation.create-grace-period",
2 changes: 1 addition & 1 deletion cmd/mimir/help-all.txt.tmpl
@@ -2584,7 +2584,7 @@ Usage of ./cmd/mimir/mimir:
-usage-stats.installation-mode string
Installation mode. Supported values: custom, helm, jsonnet. (default "custom")
-validation.create-grace-period duration
-Controls how far into the future incoming samples are accepted compared to the wall clock. Any sample will be rejected if its timestamp is greater than '(now + grace_period)'. This configuration is enforced in the distributor, ingester and query-frontend (to avoid querying too far into the future). (default 10m)
+Controls how far into the future incoming samples and exemplars are accepted compared to the wall clock. Any sample or exemplar will be rejected if its timestamp is greater than '(now + grace_period)'. This configuration is enforced in the distributor, ingester and query-frontend (to avoid querying too far into the future). (default 10m)
-validation.enforce-metadata-metric-name
Enforce every metadata has a metric name. (default true)
-validation.max-label-names-per-series int
10 changes: 5 additions & 5 deletions docs/sources/mimir/references/configuration-parameters/index.md
@@ -2728,11 +2728,11 @@ The `limits` block configures default and per-tenant limits imposed by component
# CLI flag: -validation.max-native-histogram-buckets
[max_native_histogram_buckets: <int> | default = 0]
-# (advanced) Controls how far into the future incoming samples are accepted
-# compared to the wall clock. Any sample will be rejected if its timestamp is
-# greater than '(now + grace_period)'. This configuration is enforced in the
-# distributor, ingester and query-frontend (to avoid querying too far into the
-# future).
+# (advanced) Controls how far into the future incoming samples and exemplars are
+# accepted compared to the wall clock. Any sample or exemplar will be rejected
+# if its timestamp is greater than '(now + grace_period)'. This configuration is
+# enforced in the distributor, ingester and query-frontend (to avoid querying
+# too far into the future).
# CLI flag: -validation.create-grace-period
[creation_grace_period: <duration> | default = 10m]
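
The config descriptor lists the default as `600000000000`, while the help text and the reference docs show `10m`. Assuming the descriptor value is a Go `time.Duration` expressed in nanoseconds, the two forms agree. The sketch below checks that with the standard library only; `graceNanos` is an illustrative name and not part of Mimir.

```go
package main

import (
	"fmt"
	"time"
)

// graceNanos parses a duration string such as "10m" and returns it in
// nanoseconds, the unit a time.Duration is stored in.
// Hypothetical helper, used only for this illustration.
func graceNanos(s string) (int64, error) {
	d, err := time.ParseDuration(s)
	if err != nil {
		return 0, err
	}
	return d.Nanoseconds(), nil
}

func main() {
	n, err := graceNanos("10m")
	if err != nil {
		panic(err)
	}
	fmt.Println(n) // 600000000000
}
```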
9 changes: 6 additions & 3 deletions pkg/distributor/distributor.go
@@ -603,7 +603,7 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica
// May alter timeseries data in-place.
// The returned error may retain the series labels.
// It uses the passed nowt time to observe the delay of sample timestamps.
-func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeseries, userID, group string, skipLabelNameValidation bool, minExemplarTS int64) error {
+func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeseries, userID, group string, skipLabelNameValidation bool, minExemplarTS, maxExemplarTS int64) error {
if err := validation.ValidateLabels(d.sampleValidationMetrics, d.limits, userID, group, ts.Labels, skipLabelNameValidation); err != nil {
return err
}
@@ -647,7 +647,7 @@ func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeser
// there never will be any.
return err
}
-if !validation.ExemplarTimestampOK(d.exemplarValidationMetrics, userID, minExemplarTS, e) {
+if !validation.ValidateExemplarTimestamp(d.exemplarValidationMetrics, userID, minExemplarTS, maxExemplarTS, e) {
ts.DeleteExemplarByMovingLast(i)
// Don't increase index i. After moving last exemplar to this index, we want to check it again.
continue
@@ -877,6 +877,9 @@ func (d *Distributor) prePushValidationMiddleware(next push.Func) push.Func {
minExemplarTS = earliestSampleTimestampMs - 5*time.Minute.Milliseconds()
}

+// Enforce the creation grace period on exemplars too.
+maxExemplarTS := now.Add(d.limits.CreationGracePeriod(userID)).UnixMilli()
+
var firstPartialErr error
var removeIndexes []int
for tsIdx, ts := range req.Timeseries {
@@ -889,7 +892,7 @@ func (d *Distributor) prePushValidationMiddleware(next push.Func) push.Func {

skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation()
// Note that validateSeries may drop some data in ts.
-validationErr := d.validateSeries(now, &req.Timeseries[tsIdx], userID, group, skipLabelNameValidation, minExemplarTS)
+validationErr := d.validateSeries(now, &req.Timeseries[tsIdx], userID, group, skipLabelNameValidation, minExemplarTS, maxExemplarTS)

// Errors in validation are considered non-fatal, as one series in a request may contain
// invalid data but all the remaining series could be perfectly valid.
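
The exemplar loop in `validateSeries` deletes a rejected exemplar by moving the last one into its slot and then rechecks the same index. The following is a minimal standalone sketch of that pattern on a plain slice of timestamps. It assumes `DeleteExemplarByMovingLast` behaves as its name suggests; `filterInPlace` is a hypothetical helper, not Mimir code.

```go
package main

import "fmt"

// filterInPlace drops every element for which keep returns false by
// overwriting it with the last element and shrinking the slice.
// The relative order of the remaining elements is not preserved.
func filterInPlace(items []int64, keep func(int64) bool) []int64 {
	for i := 0; i < len(items); {
		if !keep(items[i]) {
			last := len(items) - 1
			items[i] = items[last]
			items = items[:last]
			// Do not advance i: the element just moved into slot i
			// has not been checked yet.
			continue
		}
		i++
	}
	return items
}

func main() {
	timestamps := []int64{1000, 601000, 2000, 700000}
	maxTS := int64(100000)
	kept := filterInPlace(timestamps, func(ts int64) bool { return ts <= maxTS })
	fmt.Println(kept) // [1000 2000]
}
```

Skipping the index increment matters here: after 601000 is replaced by 700000, the same slot has to be examined again, otherwise the second out-of-range value would survive.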
72 changes: 67 additions & 5 deletions pkg/distributor/distributor_test.go
@@ -1399,14 +1399,17 @@ func TestDistributor_ExemplarValidation(t *testing.T) {
tests := map[string]struct {
prepareConfig func(limits *validation.Limits)
minExemplarTS int64
+maxExemplarTS int64
req *mimirpb.WriteRequest
expectedExemplars []mimirpb.PreallocTimeseries
+expectedMetrics string
}{
"disable exemplars": {
prepareConfig: func(limits *validation.Limits) {
limits.MaxGlobalExemplarsPerUser = 0
},
minExemplarTS: 0,
+maxExemplarTS: 0,
req: &mimirpb.WriteRequest{Timeseries: []mimirpb.PreallocTimeseries{
makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, []string{"foo", "bar"}),
}},
@@ -1422,6 +1425,7 @@ func TestDistributor_ExemplarValidation(t *testing.T) {
limits.MaxGlobalExemplarsPerUser = 1
},
minExemplarTS: 0,
+maxExemplarTS: math.MaxInt64,
req: &mimirpb.WriteRequest{Timeseries: []mimirpb.PreallocTimeseries{
makeExemplarTimeseries([]string{model.MetricNameLabel, "test1"}, 1000, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test2"}, 1000, []string{"foo", "bar"}),
@@ -1431,11 +1435,12 @@ func TestDistributor_ExemplarValidation(t *testing.T) {
makeExemplarTimeseries([]string{model.MetricNameLabel, "test2"}, 1000, []string{"foo", "bar"}),
},
},
-"one old, one new, separate series": {
+"should drop exemplars with timestamp lower than the accepted minimum, when the exemplars are specified in different series": {
prepareConfig: func(limits *validation.Limits) {
limits.MaxGlobalExemplarsPerUser = 1
},
minExemplarTS: 300000,
+maxExemplarTS: math.MaxInt64,
req: &mimirpb.WriteRequest{Timeseries: []mimirpb.PreallocTimeseries{
makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 1000, []string{"foo", "bar"}),
makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 601000, []string{"foo", "bar"}),
@@ -1447,12 +1452,18 @@ func TestDistributor_ExemplarValidation(t *testing.T) {
}},
makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 601000, []string{"foo", "bar"}),
},
+expectedMetrics: `
+# HELP cortex_discarded_exemplars_total The total number of exemplars that were discarded.
+# TYPE cortex_discarded_exemplars_total counter
+cortex_discarded_exemplars_total{reason="exemplar_too_old",user="user"} 1
+`,
},
-"multi exemplars": {
+"should drop exemplars with timestamp lower than the accepted minimum, when multiple exemplars are specified for the same series": {
prepareConfig: func(limits *validation.Limits) {
limits.MaxGlobalExemplarsPerUser = 2
},
minExemplarTS: 300000,
+maxExemplarTS: math.MaxInt64,
req: &mimirpb.WriteRequest{Timeseries: []mimirpb.PreallocTimeseries{
{
TimeSeries: &mimirpb.TimeSeries{
@@ -1474,12 +1485,18 @@ func TestDistributor_ExemplarValidation(t *testing.T) {
},
},
},
+expectedMetrics: `
+# HELP cortex_discarded_exemplars_total The total number of exemplars that were discarded.
+# TYPE cortex_discarded_exemplars_total counter
+cortex_discarded_exemplars_total{reason="exemplar_too_old",user="user"} 1
+`,
},
-"one old, one new, same series": {
+"should drop exemplars with timestamp lower than the accepted minimum, when multiple exemplars are specified in the same series": {
prepareConfig: func(limits *validation.Limits) {
limits.MaxGlobalExemplarsPerUser = 2
},
minExemplarTS: 300000,
+maxExemplarTS: math.MaxInt64,
req: &mimirpb.WriteRequest{Timeseries: []mimirpb.PreallocTimeseries{
{
TimeSeries: &mimirpb.TimeSeries{
@@ -1501,6 +1518,44 @@ func TestDistributor_ExemplarValidation(t *testing.T) {
},
},
},
+expectedMetrics: `
+# HELP cortex_discarded_exemplars_total The total number of exemplars that were discarded.
+# TYPE cortex_discarded_exemplars_total counter
+cortex_discarded_exemplars_total{reason="exemplar_too_old",user="user"} 1
+`,
+},
+"should drop exemplars with timestamp greater than the accepted maximum, when multiple exemplars are specified in the same series": {
+prepareConfig: func(limits *validation.Limits) {
+limits.MaxGlobalExemplarsPerUser = 2
+},
+minExemplarTS: 0,
+maxExemplarTS: 100000,
+req: &mimirpb.WriteRequest{Timeseries: []mimirpb.PreallocTimeseries{
+{
+TimeSeries: &mimirpb.TimeSeries{
+Labels: []mimirpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "test"}},
+Exemplars: []mimirpb.Exemplar{
+{Labels: []mimirpb.LabelAdapter{{Name: "foo", Value: "bar1"}}, TimestampMs: 1000},
+{Labels: []mimirpb.LabelAdapter{{Name: "foo", Value: "bar2"}}, TimestampMs: 601000},
+},
+},
+},
+}},
+expectedExemplars: []mimirpb.PreallocTimeseries{
+{
+TimeSeries: &mimirpb.TimeSeries{
+Labels: []mimirpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "test"}},
+Exemplars: []mimirpb.Exemplar{
+{Labels: []mimirpb.LabelAdapter{{Name: "foo", Value: "bar1"}}, TimestampMs: 1000},
+},
+},
+},
+},
+expectedMetrics: `
+# HELP cortex_discarded_exemplars_total The total number of exemplars that were discarded.
+# TYPE cortex_discarded_exemplars_total counter
+cortex_discarded_exemplars_total{reason="exemplar_too_far_in_future",user="user"} 1
+`,
},
}
now := mtime.Now()
@@ -1509,15 +1564,22 @@ func TestDistributor_ExemplarValidation(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
tc.prepareConfig(limits)
-ds, _, _ := prepare(t, prepConfig{
+ds, _, regs := prepare(t, prepConfig{
limits: limits,
numDistributors: 1,
})
+
+// Pre-condition check.
+require.Len(t, ds, 1)
+require.Len(t, regs, 1)
+
for _, ts := range tc.req.Timeseries {
-err := ds[0].validateSeries(now, &ts, "user", "test-group", false, tc.minExemplarTS)
+err := ds[0].validateSeries(now, &ts, "user", "test-group", false, tc.minExemplarTS, tc.maxExemplarTS)
assert.NoError(t, err)
}
+
assert.Equal(t, tc.expectedExemplars, tc.req.Timeseries)
+assert.NoError(t, testutil.GatherAndCompare(regs[0], strings.NewReader(tc.expectedMetrics), "cortex_discarded_exemplars_total"))
})
}
}
2 changes: 1 addition & 1 deletion pkg/util/validation/limits.go
@@ -199,7 +199,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxMetadataLength, maxMetadataLengthFlag, 1024, "Maximum length accepted for metric metadata. Metadata refers to Metric Name, HELP and UNIT. Longer metadata is dropped except for HELP which is truncated.")
f.IntVar(&l.MaxNativeHistogramBuckets, maxNativeHistogramBucketsFlag, 0, "Maximum number of buckets per native histogram sample. 0 to disable the limit.")
_ = l.CreationGracePeriod.Set("10m")
-f.Var(&l.CreationGracePeriod, creationGracePeriodFlag, "Controls how far into the future incoming samples are accepted compared to the wall clock. Any sample will be rejected if its timestamp is greater than '(now + grace_period)'. This configuration is enforced in the distributor, ingester and query-frontend (to avoid querying too far into the future).")
+f.Var(&l.CreationGracePeriod, creationGracePeriodFlag, "Controls how far into the future incoming samples and exemplars are accepted compared to the wall clock. Any sample or exemplar will be rejected if its timestamp is greater than '(now + grace_period)'. This configuration is enforced in the distributor, ingester and query-frontend (to avoid querying too far into the future).")
f.BoolVar(&l.EnforceMetadataMetricName, "validation.enforce-metadata-metric-name", true, "Enforce every metadata has a metric name.")

f.IntVar(&l.MaxGlobalSeriesPerUser, MaxSeriesPerUserFlag, 150000, "The maximum number of in-memory series per tenant, across the cluster before replication. 0 to disable.")
12 changes: 10 additions & 2 deletions pkg/util/validation/validate.go
@@ -44,6 +44,7 @@ var (
reasonExemplarTimestampInvalid = globalerror.ExemplarTimestampInvalid.LabelValue()
reasonExemplarLabelsBlank = "exemplar_labels_blank"
reasonExemplarTooOld = "exemplar_too_old"
+reasonExemplarTooFarInFuture = "exemplar_too_far_in_future"

// Discarded metadata reasons.
reasonMetadataMetricNameTooLong = globalerror.MetricMetadataMetricNameTooLong.LabelValue()
@@ -167,6 +168,7 @@ type ExemplarValidationMetrics struct {
labelsTooLong *prometheus.CounterVec
labelsBlank *prometheus.CounterVec
tooOld *prometheus.CounterVec
+tooFarInFuture *prometheus.CounterVec
}

func (m *ExemplarValidationMetrics) DeleteUserMetrics(userID string) {
@@ -175,6 +177,7 @@ func (m *ExemplarValidationMetrics) DeleteUserMetrics(userID string) {
m.labelsTooLong.DeleteLabelValues(userID)
m.labelsBlank.DeleteLabelValues(userID)
m.tooOld.DeleteLabelValues(userID)
+m.tooFarInFuture.DeleteLabelValues(userID)
}

func NewExemplarValidationMetrics(r prometheus.Registerer) *ExemplarValidationMetrics {
@@ -184,6 +187,7 @@ func NewExemplarValidationMetrics(r prometheus.Registerer) *ExemplarValidationMe
labelsTooLong: DiscardedExemplarsCounter(r, reasonExemplarLabelsTooLong),
labelsBlank: DiscardedExemplarsCounter(r, reasonExemplarLabelsBlank),
tooOld: DiscardedExemplarsCounter(r, reasonExemplarTooOld),
+tooFarInFuture: DiscardedExemplarsCounter(r, reasonExemplarTooFarInFuture),
}
}

@@ -277,13 +281,17 @@ func ValidateExemplar(m *ExemplarValidationMetrics, userID string, ls []mimirpb.
return nil
}

-// ExemplarTimestampOK returns true if the timestamp is newer than minTS.
+// ValidateExemplarTimestamp returns true if the exemplar timestamp is between minTS and maxTS.
// This is separate from ValidateExemplar() so we can silently drop old ones, not log an error.
-func ExemplarTimestampOK(m *ExemplarValidationMetrics, userID string, minTS int64, e mimirpb.Exemplar) bool {
+func ValidateExemplarTimestamp(m *ExemplarValidationMetrics, userID string, minTS, maxTS int64, e mimirpb.Exemplar) bool {
if e.TimestampMs < minTS {
m.tooOld.WithLabelValues(userID).Inc()
return false
}
+if e.TimestampMs > maxTS {
+m.tooFarInFuture.WithLabelValues(userID).Inc()
+return false
+}
return true
}
return true
}
