Skip to content

Commit

Permalink
Distributor: Add limit for exemplar per series (#7989)
Browse files Browse the repository at this point in the history
* Distributor: Add limit for exemplar per series

Signed-off-by: Ying WANG <ying.wang@grafana.com>

* Address comments

Signed-off-by: Ying WANG <ying.wang@grafana.com>

---------

Signed-off-by: Ying WANG <ying.wang@grafana.com>
  • Loading branch information
ying-jeanne authored Apr 29, 2024
1 parent 37e4b89 commit 6af4273
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693 #7899
* [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [ENHANCEMENT] Distributor: add experimental limit for exemplars per series per request, enabled with `-distributor.max-exemplars-per-series-per-request`. The number of discarded exemplars is tracked with `cortex_discarded_exemplars_total{reason="too_many_exemplars_per_series"}`. #7989
* [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456
* [ENHANCEMENT] Store-gateway: Add `stage="wait_max_concurrent"` to `cortex_bucket_store_series_request_stage_duration_seconds` which records how long the query had to wait for its turn for `-blocks-storage.bucket-store.max-concurrent`. #7609
* [ENHANCEMENT] Querier: add `cortex_querier_federation_upstream_query_wait_duration_seconds` to observe time from when a querier picks up a cross-tenant query to when work begins on its single-tenant counterparts. #7209
Expand Down
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -3291,6 +3291,17 @@
"fieldFlag": "validation.max-native-histogram-buckets",
"fieldType": "int"
},
{
"kind": "field",
"name": "max_exemplars_per_series_per_request",
"required": false,
"desc": "Maximum number of exemplars per series per request. 0 to disable limit in request. The exceeding exemplars are dropped.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "distributor.max-exemplars-per-series-per-request",
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "reduce_native_histogram_over_max_buckets",
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,8 @@ Usage of ./cmd/mimir/mimir:
Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.
-distributor.limit-inflight-requests-using-grpc-method-limiter
[deprecated] When enabled, in-flight write requests limit is checked as soon as the gRPC request is received, before the request is decoded and parsed. (default true)
-distributor.max-exemplars-per-series-per-request int
[experimental] Maximum number of exemplars per series per request. 0 to disable limit in request. The exceeding exemplars are dropped.
-distributor.max-recv-msg-size int
Max message size in bytes that the distributors will accept for incoming push requests to the remote write API. If exceeded, the request will be rejected. (default 104857600)
-distributor.metric-relabeling-enabled
Expand Down
2 changes: 2 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ The following features are currently experimental:
- `-distributor.retry-after-header.enabled`
- `-distributor.retry-after-header.base-seconds`
- `-distributor.retry-after-header.max-backoff-exponent`
- Limit exemplars per series per request
- `-distributor.max-exemplars-per-series-per-request`
- Hash ring
- Disabling ring heartbeat timeouts
- `-distributor.ring.heartbeat-timeout=0`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3007,6 +3007,11 @@ The `limits` block configures default and per-tenant limits imposed by component
# CLI flag: -validation.max-native-histogram-buckets
[max_native_histogram_buckets: <int> | default = 0]
# (experimental) Maximum number of exemplars per series per request. 0 to
# disable limit in request. The exceeding exemplars are dropped.
# CLI flag: -distributor.max-exemplars-per-series-per-request
[max_exemplars_per_series_per_request: <int> | default = 0]
# Whether to reduce or reject native histogram samples with more buckets than
# the configured limit.
# CLI flag: -validation.reduce-native-histogram-over-max-buckets
Expand Down
6 changes: 6 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,12 @@ func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeser
return nil
}

allowedExemplars := d.limits.MaxExemplarsPerSeriesPerRequest(userID)
if allowedExemplars > 0 && len(ts.Exemplars) > allowedExemplars {
d.exemplarValidationMetrics.tooManyExemplars.WithLabelValues(userID).Add(float64(len(ts.Exemplars) - allowedExemplars))
ts.ResizeExemplars(allowedExemplars)
}

for i := 0; i < len(ts.Exemplars); {
e := ts.Exemplars[i]
if err := validateExemplar(d.exemplarValidationMetrics, userID, ts.Labels, e); err != nil {
Expand Down
60 changes: 47 additions & 13 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1617,10 +1617,10 @@ func TestDistributor_ExemplarValidation(t *testing.T) {
makeExemplarTimeseries([]string{model.MetricNameLabel, "test"}, 601000, []string{"foo", "bar"}),
},
expectedMetrics: `
# HELP cortex_discarded_exemplars_total The total number of exemplars that were discarded.
# TYPE cortex_discarded_exemplars_total counter
cortex_discarded_exemplars_total{reason="exemplar_too_old",user="user"} 1
`,
# HELP cortex_discarded_exemplars_total The total number of exemplars that were discarded.
# TYPE cortex_discarded_exemplars_total counter
cortex_discarded_exemplars_total{reason="exemplar_too_old",user="user"} 1
`,
},
"should drop exemplars with timestamp lower than the accepted minimum, when multiple exemplars are specified for the same series": {
prepareConfig: func(limits *validation.Limits) {
Expand Down Expand Up @@ -1648,10 +1648,10 @@ func TestDistributor_ExemplarValidation(t *testing.T) {
},
},
expectedMetrics: `
# HELP cortex_discarded_exemplars_total The total number of exemplars that were discarded.
# TYPE cortex_discarded_exemplars_total counter
cortex_discarded_exemplars_total{reason="exemplar_too_old",user="user"} 1
`,
# HELP cortex_discarded_exemplars_total The total number of exemplars that were discarded.
# TYPE cortex_discarded_exemplars_total counter
cortex_discarded_exemplars_total{reason="exemplar_too_old",user="user"} 1
`,
},
"should drop exemplars with timestamp lower than the accepted minimum, when multiple exemplars are specified in the same series": {
prepareConfig: func(limits *validation.Limits) {
Expand Down Expand Up @@ -1679,10 +1679,10 @@ func TestDistributor_ExemplarValidation(t *testing.T) {
},
},
expectedMetrics: `
# HELP cortex_discarded_exemplars_total The total number of exemplars that were discarded.
# TYPE cortex_discarded_exemplars_total counter
cortex_discarded_exemplars_total{reason="exemplar_too_old",user="user"} 1
`,
# HELP cortex_discarded_exemplars_total The total number of exemplars that were discarded.
# TYPE cortex_discarded_exemplars_total counter
cortex_discarded_exemplars_total{reason="exemplar_too_old",user="user"} 1
`,
},
"should drop exemplars with timestamp greater than the accepted maximum, when multiple exemplars are specified in the same series": {
prepareConfig: func(limits *validation.Limits) {
Expand Down Expand Up @@ -1710,9 +1710,43 @@ func TestDistributor_ExemplarValidation(t *testing.T) {
},
},
expectedMetrics: `
# HELP cortex_discarded_exemplars_total The total number of exemplars that were discarded.
# TYPE cortex_discarded_exemplars_total counter
cortex_discarded_exemplars_total{reason="exemplar_too_far_in_future",user="user"} 1
`,
},
"should drop exemplars above the allowed exemplars per series limit, when multiple exemplars are specified in the same series": {
prepareConfig: func(limits *validation.Limits) {
limits.MaxGlobalExemplarsPerUser = 2
limits.MaxExemplarsPerSeriesPerRequest = 2
},
minExemplarTS: 300000,
maxExemplarTS: math.MaxInt64,
req: makeWriteRequestWith(mimirpb.PreallocTimeseries{
TimeSeries: &mimirpb.TimeSeries{
Labels: []mimirpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "test"}},
Exemplars: []mimirpb.Exemplar{
{Labels: []mimirpb.LabelAdapter{{Name: "foo", Value: "bar1"}}, TimestampMs: 600000},
{Labels: []mimirpb.LabelAdapter{{Name: "foo", Value: "bar2"}}, TimestampMs: 601000},
{Labels: []mimirpb.LabelAdapter{{Name: "foo", Value: "bar3"}}, TimestampMs: 602000},
},
},
}),
expectedExemplars: []mimirpb.PreallocTimeseries{
{
TimeSeries: &mimirpb.TimeSeries{
Labels: []mimirpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "test"}},
Exemplars: []mimirpb.Exemplar{
{Labels: []mimirpb.LabelAdapter{{Name: "foo", Value: "bar1"}}, TimestampMs: 600000},
{Labels: []mimirpb.LabelAdapter{{Name: "foo", Value: "bar2"}}, TimestampMs: 601000},
},
},
},
},
expectedMetrics: `
# HELP cortex_discarded_exemplars_total The total number of exemplars that were discarded.
# TYPE cortex_discarded_exemplars_total counter
cortex_discarded_exemplars_total{reason="exemplar_too_far_in_future",user="user"} 1
cortex_discarded_exemplars_total{reason="too_many_exemplars_per_series",user="user"} 1
`,
},
}
Expand Down
16 changes: 10 additions & 6 deletions pkg/distributor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ var (
reasonTooFarInFuture = globalerror.SampleTooFarInFuture.LabelValue()

// Discarded exemplars reasons.
reasonExemplarLabelsMissing = globalerror.ExemplarLabelsMissing.LabelValue()
reasonExemplarLabelsTooLong = globalerror.ExemplarLabelsTooLong.LabelValue()
reasonExemplarTimestampInvalid = globalerror.ExemplarTimestampInvalid.LabelValue()
reasonExemplarLabelsBlank = "exemplar_labels_blank"
reasonExemplarTooOld = "exemplar_too_old"
reasonExemplarTooFarInFuture = "exemplar_too_far_in_future"
reasonExemplarLabelsMissing = globalerror.ExemplarLabelsMissing.LabelValue()
reasonExemplarLabelsTooLong = globalerror.ExemplarLabelsTooLong.LabelValue()
reasonExemplarTimestampInvalid = globalerror.ExemplarTimestampInvalid.LabelValue()
reasonExemplarLabelsBlank = "exemplar_labels_blank"
reasonExemplarTooOld = "exemplar_too_old"
reasonExemplarTooFarInFuture = "exemplar_too_far_in_future"
reasonTooManyExemplarsPerSeries = "too_many_exemplars_per_series"

// Discarded metadata reasons.
reasonMetadataMetricNameTooLong = globalerror.MetricMetadataMetricNameTooLong.LabelValue()
Expand Down Expand Up @@ -175,6 +176,7 @@ type exemplarValidationMetrics struct {
labelsBlank *prometheus.CounterVec
tooOld *prometheus.CounterVec
tooFarInFuture *prometheus.CounterVec
tooManyExemplars *prometheus.CounterVec
}

func (m *exemplarValidationMetrics) deleteUserMetrics(userID string) {
Expand All @@ -184,6 +186,7 @@ func (m *exemplarValidationMetrics) deleteUserMetrics(userID string) {
m.labelsBlank.DeleteLabelValues(userID)
m.tooOld.DeleteLabelValues(userID)
m.tooFarInFuture.DeleteLabelValues(userID)
m.tooManyExemplars.DeleteLabelValues(userID)
}

func newExemplarValidationMetrics(r prometheus.Registerer) *exemplarValidationMetrics {
Expand All @@ -194,6 +197,7 @@ func newExemplarValidationMetrics(r prometheus.Registerer) *exemplarValidationMe
labelsBlank: validation.DiscardedExemplarsCounter(r, reasonExemplarLabelsBlank),
tooOld: validation.DiscardedExemplarsCounter(r, reasonExemplarTooOld),
tooFarInFuture: validation.DiscardedExemplarsCounter(r, reasonExemplarTooFarInFuture),
tooManyExemplars: validation.DiscardedExemplarsCounter(r, reasonTooManyExemplarsPerSeries),
}
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/mimirpb/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,21 @@ func (p *PreallocTimeseries) ClearExemplars() {
p.clearUnmarshalData()
}

// ResizeExemplars truncates the series' exemplar list to at most newSize
// entries and invalidates any cached marshalled representation. It is a no-op
// when the series already holds newSize or fewer exemplars.
//
// The Name and Value strings of the dropped exemplars' labels may point into a
// large gRPC receive buffer, so they are cleared first to allow the buffer to
// be garbage collected.
func (p *PreallocTimeseries) ResizeExemplars(newSize int) {
	if newSize < 0 {
		// Be defensive: treat a negative target as "drop all exemplars"
		// rather than panicking on a negative slice bound.
		newSize = 0
	}
	if len(p.Exemplars) <= newSize {
		return
	}
	for i := newSize; i < len(p.Exemplars); i++ {
		labels := p.Exemplars[i].Labels
		for j := range labels {
			labels[j].Name = ""
			labels[j].Value = ""
		}
	}
	p.Exemplars = p.Exemplars[:newSize]
	p.clearUnmarshalData()
}

func (p *PreallocTimeseries) HistogramsUpdated() {
p.clearUnmarshalData()
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/mimirpb/timeseries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,24 @@ func TestPreallocTimeseries_SetLabels(t *testing.T) {
require.Nil(t, p.marshalledData)
}

// TestPreallocTimeseries_ResizeExemplars verifies that ResizeExemplars
// truncates the exemplar list, clears the label strings of dropped exemplars
// (so a shared gRPC buffer can be garbage collected), invalidates the cached
// marshalled data, and leaves the series untouched when no truncation is
// needed.
func TestPreallocTimeseries_ResizeExemplars(t *testing.T) {
	t.Run("should resize Exemplars when size is bigger than target size", func(t *testing.T) {
		p := PreallocTimeseries{
			TimeSeries: &TimeSeries{
				Exemplars: make([]Exemplar, 10),
			},
			marshalledData: []byte{1, 2, 3},
		}

		for i := range p.Exemplars {
			p.Exemplars[i] = Exemplar{Labels: []LabelAdapter{{Name: "trace", Value: "1"}, {Name: "service", Value: "A"}}, Value: 1, TimestampMs: int64(i)}
		}
		// Keep a reference to the full backing slice so we can check that the
		// truncated exemplars had their label strings cleared.
		original := p.Exemplars
		p.ResizeExemplars(5)
		require.Len(t, p.Exemplars, 5)
		require.Nil(t, p.marshalledData)
		for i := 5; i < len(original); i++ {
			for _, l := range original[i].Labels {
				require.Empty(t, l.Name)
				require.Empty(t, l.Value)
			}
		}
	})

	t.Run("should be a no-op when size is not bigger than target size", func(t *testing.T) {
		p := PreallocTimeseries{
			TimeSeries: &TimeSeries{
				Exemplars: make([]Exemplar, 10),
			},
			marshalledData: []byte{1, 2, 3},
		}

		p.ResizeExemplars(10)
		require.Len(t, p.Exemplars, 10)
		// The cached marshalled data must be kept because nothing changed.
		require.NotNil(t, p.marshalledData)
	})
}

func BenchmarkPreallocTimeseries_SortLabelsIfNeeded(b *testing.B) {
bcs := []int{10, 40, 100}

Expand Down
6 changes: 6 additions & 0 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type Limits struct {
MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"`
MaxMetadataLength int `yaml:"max_metadata_length" json:"max_metadata_length"`
MaxNativeHistogramBuckets int `yaml:"max_native_histogram_buckets" json:"max_native_histogram_buckets"`
MaxExemplarsPerSeriesPerRequest int `yaml:"max_exemplars_per_series_per_request" json:"max_exemplars_per_series_per_request" category:"experimental"`
ReduceNativeHistogramOverMaxBuckets bool `yaml:"reduce_native_histogram_over_max_buckets" json:"reduce_native_histogram_over_max_buckets"`
CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period" category:"advanced"`
EnforceMetadataMetricName bool `yaml:"enforce_metadata_metric_name" json:"enforce_metadata_metric_name" category:"advanced"`
Expand Down Expand Up @@ -243,6 +244,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxLabelNamesPerSeries, MaxLabelNamesPerSeriesFlag, 30, "Maximum number of label names per series.")
f.IntVar(&l.MaxMetadataLength, MaxMetadataLengthFlag, 1024, "Maximum length accepted for metric metadata. Metadata refers to Metric Name, HELP and UNIT. Longer metadata is dropped except for HELP which is truncated.")
f.IntVar(&l.MaxNativeHistogramBuckets, maxNativeHistogramBucketsFlag, 0, "Maximum number of buckets per native histogram sample. 0 to disable the limit.")
f.IntVar(&l.MaxExemplarsPerSeriesPerRequest, "distributor.max-exemplars-per-series-per-request", 0, "Maximum number of exemplars per series per request. 0 to disable limit in request. The exceeding exemplars are dropped.")
f.BoolVar(&l.ReduceNativeHistogramOverMaxBuckets, ReduceNativeHistogramOverMaxBucketsFlag, true, "Whether to reduce or reject native histogram samples with more buckets than the configured limit.")
_ = l.CreationGracePeriod.Set("10m")
f.Var(&l.CreationGracePeriod, CreationGracePeriodFlag, "Controls how far into the future incoming samples and exemplars are accepted compared to the wall clock. Any sample or exemplar will be rejected if its timestamp is greater than '(now + grace_period)'. This configuration is enforced in the distributor, ingester and query-frontend (to avoid querying too far into the future).")
Expand Down Expand Up @@ -797,6 +799,10 @@ func (o *Overrides) NativeHistogramsIngestionEnabled(userID string) bool {
return o.getOverridesForUser(userID).NativeHistogramsIngestionEnabled
}

// MaxExemplarsPerSeriesPerRequest returns the per-tenant maximum number of
// exemplars allowed per series in a single write request. 0 disables the
// limit; exceeding exemplars are dropped.
func (o *Overrides) MaxExemplarsPerSeriesPerRequest(userID string) int {
	return o.getOverridesForUser(userID).MaxExemplarsPerSeriesPerRequest
}

// RulerTenantShardSize returns shard size (number of rulers) used by this tenant when using shuffle-sharding strategy.
func (o *Overrides) RulerTenantShardSize(userID string) int {
return o.getOverridesForUser(userID).RulerTenantShardSize
Expand Down

0 comments on commit 6af4273

Please sign in to comment.