Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mimir query engine: add native histogram custom bucket-related annotations #8924

Merged
merged 10 commits into from
Aug 8, 2024
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* [CHANGE] Ingester client: experimental support for client-side circuit breakers, their configuration options (`-ingester.client.circuit-breaker.*`) and metrics (`cortex_ingester_client_circuit_breaker_results_total`, `cortex_ingester_client_circuit_breaker_transitions_total`) were removed. #8802
* [CHANGE] Ingester: circuit breakers do not open in case of per-instance limit errors anymore. Opening can be triggered only in case of push and pull requests exceeding the configured duration. #8854
* [CHANGE] Query-frontend: Return `413 Request Entity Too Large` if a response shard for an `/active_series` request is too large. #8861
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8925 #8932
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
Expand Down
60 changes: 54 additions & 6 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestOurTestCases(t *testing.T) {
prometheusEngine := promql.NewEngine(opts.CommonOpts)

testdataFS := os.DirFS("./testdata")
testFiles, err := fs.Glob(testdataFS, "ours/*.test")
testFiles, err := fs.Glob(testdataFS, "ours*/*.test")
require.NoError(t, err)

for _, testFile := range testFiles {
Expand All @@ -203,6 +203,10 @@ func TestOurTestCases(t *testing.T) {

// Run the tests against Prometheus' engine to ensure our test cases are valid.
t.Run("Prometheus' engine", func(t *testing.T) {
if strings.HasPrefix(testFile, "ours-only") {
t.Skip("disabled for Prometheus' engine due to bug in Prometheus' engine")
}

promqltest.RunTest(t, testScript, prometheusEngine)
})
})
Expand Down Expand Up @@ -1275,11 +1279,20 @@ func TestAnnotations(t *testing.T) {
metric{type="histogram", series="2"} {{schema:0 sum:1 count:1 buckets:[1]}}+{{schema:0 sum:5 count:4 buckets:[1 2 1]}}x3
`

nativeHistogramsWithCustomBucketsData := `
metric{series="exponential-buckets"} {{schema:0 sum:1 count:1 buckets:[1]}}+{{schema:0 sum:5 count:4 buckets:[1 2 1]}}x3
metric{series="custom-buckets-1"} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}}+{{schema:-53 sum:5 count:4 custom_values:[5 10] buckets:[1 2 1]}}x3
metric{series="custom-buckets-2"} {{schema:-53 sum:1 count:1 custom_values:[2 3] buckets:[1]}}+{{schema:-53 sum:5 count:4 custom_values:[2 3] buckets:[1 2 1]}}x3
metric{series="mixed-exponential-custom-buckets"} {{schema:0 sum:1 count:1 buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:0 sum:5 count:4 buckets:[1 2 1]}}
metric{series="incompatible-custom-buckets"} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1]}} {{schema:-53 sum:1 count:1 custom_values:[2 3] buckets:[1]}} {{schema:-53 sum:5 count:4 custom_values:[5 10] buckets:[1 2 1]}}
`

testCases := map[string]struct {
data string
expr string
expectedWarningAnnotations []string
expectedInfoAnnotations []string
data string
expr string
expectedWarningAnnotations []string
expectedInfoAnnotations []string
skipComparisonWithPrometheusReason string
}{
"sum() with float and native histogram at same step": {
data: mixedFloatHistogramData,
Expand Down Expand Up @@ -1361,6 +1374,37 @@ func TestAnnotations(t *testing.T) {
expectedWarningAnnotations: []string{`PromQL warning: this native histogram metric is not a counter: "some_metric" (1:6)`},
},

"sum() over native histograms with both exponential and custom buckets": {
data: nativeHistogramsWithCustomBucketsData,
expr: `sum(metric{series=~"exponential-buckets|custom-buckets-1"})`,
expectedWarningAnnotations: []string{
`PromQL warning: vector contains a mix of histograms with exponential and custom buckets schemas for metric name "metric" (1:5)`,
},
},
"sum() over native histograms with incompatible custom buckets": {
data: nativeHistogramsWithCustomBucketsData,
expr: `sum(metric{series=~"custom-buckets-(1|2)"})`,
expectedWarningAnnotations: []string{
`PromQL warning: vector contains histograms with incompatible custom buckets for metric name "metric" (1:5)`,
},
},

"rate() over native histograms with both exponential and custom buckets": {
data: nativeHistogramsWithCustomBucketsData,
expr: `rate(metric{series="mixed-exponential-custom-buckets"}[1m])`,
expectedWarningAnnotations: []string{
`PromQL warning: vector contains a mix of histograms with exponential and custom buckets schemas for metric name "metric" (1:6)`,
},
skipComparisonWithPrometheusReason: "Prometheus' engine panics for this test case, https://github.com/prometheus/prometheus/pull/14609 will fix this",
},
"rate() over native histograms with incompatible custom buckets": {
data: nativeHistogramsWithCustomBucketsData,
expr: `rate(metric{series="incompatible-custom-buckets"}[1m])`,
expectedWarningAnnotations: []string{
`PromQL warning: vector contains histograms with incompatible custom buckets for metric name "metric" (1:6)`,
},
},

"multiple annotations from different operators": {
data: `
mixed_metric_count 10 {{schema:0 sum:1 count:1 buckets:[1]}}
Expand All @@ -1385,11 +1429,12 @@ func TestAnnotations(t *testing.T) {
require.NoError(t, err)
prometheusEngine := promql.NewEngine(opts.CommonOpts)

const prometheusEngineName = "Prometheus' engine"
engines := map[string]promql.QueryEngine{
"Mimir's engine": mimirEngine,

// Compare against Prometheus' engine to verify our test cases are valid.
"Prometheus' engine": prometheusEngine,
prometheusEngineName: prometheusEngine,
}

for name, testCase := range testCases {
Expand All @@ -1399,6 +1444,9 @@ func TestAnnotations(t *testing.T) {

for engineName, engine := range engines {
t.Run(engineName, func(t *testing.T) {
if engineName == prometheusEngineName && testCase.skipComparisonWithPrometheusReason != "" {
t.Skipf("Skipping comparison with Prometheus' engine: %v", testCase.skipComparisonWithPrometheusReason)
}

queryTypes := map[string]func() (promql.Query, error){
"range": func() (promql.Query, error) {
Expand Down
22 changes: 22 additions & 0 deletions pkg/streamingpromql/functions/native_histograms.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
package functions

import (
"errors"

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/util/annotations"

"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
Expand Down Expand Up @@ -52,3 +56,21 @@ func HistogramSum(seriesData types.InstantVectorSeriesData, memoryConsumptionTra

return data, nil
}

// NativeHistogramErrorToAnnotation converts well-known native histogram
// incompatibility errors into query annotations, emitted via emitAnnotation.
//
// It returns nil when err is nil or was successfully converted to an
// annotation, and the original error unchanged otherwise.
func NativeHistogramErrorToAnnotation(err error, emitAnnotation EmitAnnotationFunc) error {
	switch {
	case err == nil:
		return nil
	case errors.Is(err, histogram.ErrHistogramsIncompatibleSchema):
		// Mixing exponential and custom bucket schemas: warn, don't fail the query.
		emitAnnotation(annotations.NewMixedExponentialCustomHistogramsWarning)
		return nil
	case errors.Is(err, histogram.ErrHistogramsIncompatibleBounds):
		// Custom bucket boundaries don't match between histograms: warn, don't fail.
		emitAnnotation(annotations.NewIncompatibleCustomBucketsHistogramsWarning)
		return nil
	default:
		// Not a known histogram-compatibility error: propagate to the caller.
		return err
	}
}
14 changes: 12 additions & 2 deletions pkg/streamingpromql/functions/rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func rate(step types.RangeVectorStepData, rangeSeconds float64, floatBuffer *typ
if hCount >= 2 {
val, err := histogramRate(histogramBuffer, step, hHead, hTail, rangeSeconds, hCount, emitAnnotation)
if err != nil {
err = NativeHistogramErrorToAnnotation(err, emitAnnotation)
return 0, false, nil, err
}
return 0, false, val, nil
Expand All @@ -54,6 +55,7 @@ func rate(step types.RangeVectorStepData, rangeSeconds float64, floatBuffer *typ

func histogramRate(histogramBuffer *types.HPointRingBuffer, step types.RangeVectorStepData, hHead []promql.HPoint, hTail []promql.HPoint, rangeSeconds float64, hCount int, emitAnnotation EmitAnnotationFunc) (*histogram.FloatHistogram, error) {
firstPoint := histogramBuffer.First()
usingCustomBuckets := firstPoint.H.UsesCustomBuckets()

var lastPoint promql.HPoint
if len(hTail) > 0 {
Expand All @@ -71,6 +73,10 @@ func histogramRate(histogramBuffer *types.HPointRingBuffer, step types.RangeVect
currentSchema = lastPoint.H.Schema
}

if lastPoint.H.UsesCustomBuckets() != usingCustomBuckets {
return nil, histogram.ErrHistogramsIncompatibleSchema
}

delta := lastPoint.H.CopyToSchema(currentSchema)
_, err := delta.Sub(firstPoint.H)
if err != nil {
Expand All @@ -82,11 +88,15 @@ func histogramRate(histogramBuffer *types.HPointRingBuffer, step types.RangeVect
for _, p := range points {
if p.H.DetectReset(previousValue) {
// Counter reset.
_, err = delta.Add(previousValue)
if err != nil {
if _, err := delta.Add(previousValue); err != nil {
return err
}
}

if p.H.UsesCustomBuckets() != usingCustomBuckets {
return histogram.ErrHistogramsIncompatibleSchema
}

if p.H.Schema < currentSchema {
delta = delta.CopyToSchema(p.H.Schema)
}
Expand Down
45 changes: 39 additions & 6 deletions pkg/streamingpromql/operators/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/zeropool"

"github.com/grafana/mimir/pkg/streamingpromql/functions"
"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)
Expand All @@ -38,7 +39,11 @@ type Aggregation struct {

Annotations *annotations.Annotations

metricNames *MetricNames
currentSeriesIndex int

expressionPosition posrange.PositionRange
emitAnnotationFunc functions.EmitAnnotationFunc

remainingInnerSeriesToGroup []*group // One entry per series produced by Inner, value is the group for that series
remainingGroups []*group // One entry per group, in the order we want to return them
Expand Down Expand Up @@ -68,7 +73,7 @@ func NewAggregation(

slices.Sort(grouping)

return &Aggregation{
a := &Aggregation{
Inner: inner,
Start: s,
End: e,
Expand All @@ -78,8 +83,13 @@ func NewAggregation(
Without: without,
MemoryConsumptionTracker: memoryConsumptionTracker,
Annotations: annotations,
metricNames: &MetricNames{},
expressionPosition: expressionPosition,
}

a.emitAnnotationFunc = a.emitAnnotation // This is an optimisation to avoid creating the EmitAnnotationFunc instance on every usage.

return a
}

type groupWithLabels struct {
Expand Down Expand Up @@ -127,6 +137,8 @@ func (a *Aggregation) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadat
return nil, nil
}

a.metricNames.CaptureMetricNames(innerSeries)

// Determine the groups we'll return.
// Note that we use a string here to uniquely identify the groups, while Prometheus' engine uses a hash without any handling of hash collisions.
// While rare, this may cause differences in the results returned by this engine and Prometheus' engine.
Expand Down Expand Up @@ -278,14 +290,20 @@ func (a *Aggregation) accumulateUntilGroupComplete(ctx context.Context, g *group

thisSeriesGroup := a.remainingInnerSeriesToGroup[0]
a.remainingInnerSeriesToGroup = a.remainingInnerSeriesToGroup[1:]
err = a.accumulateSeriesIntoGroup(s, thisSeriesGroup, a.Steps, a.Start, a.Interval)
if err != nil {
if err := a.accumulateSeriesIntoGroup(s, thisSeriesGroup, a.Steps, a.Start, a.Interval); err != nil {
return err
}

a.currentSeriesIndex++
}
return nil
}

// Sentinel value used to indicate a sample has seen an invalid combination of histograms and should be ignored.
//
// Invalid combinations include exponential and custom buckets, and histograms with incompatible custom buckets.
var invalidCombinationOfHistograms = &histogram.FloatHistogram{}
charleskorn marked this conversation as resolved.
Show resolved Hide resolved

func (a *Aggregation) constructSeriesData(thisGroup *group, start int64, interval int64) (types.InstantVectorSeriesData, error) {
floatPointCount := a.reconcileAndCountFloatPoints(thisGroup)
var floatPoints []promql.FPoint
Expand Down Expand Up @@ -313,7 +331,7 @@ func (a *Aggregation) constructSeriesData(thisGroup *group, start int64, interva
}

for i, h := range thisGroup.histogramSums {
if h != nil {
if h != nil && h != invalidCombinationOfHistograms {
t := start + int64(i)*interval
histogramPoints = append(histogramPoints, promql.HPoint{T: t, H: thisGroup.histogramSums[i]})
}
Expand Down Expand Up @@ -422,10 +440,20 @@ func (a *Aggregation) accumulateSeriesIntoGroup(s types.InstantVectorSeriesData,
for _, p := range s.Histograms {
idx := (p.T - start) / interval

if seriesGroup.histogramSums[idx] != nil {
if seriesGroup.histogramSums[idx] == invalidCombinationOfHistograms {
// We've already seen an invalid combination of histograms at this timestamp. Ignore this point.
continue
} else if seriesGroup.histogramSums[idx] != nil {
seriesGroup.histogramSums[idx], err = seriesGroup.histogramSums[idx].Add(p.H)
if err != nil {
return err
// Unable to add histograms together (likely due to invalid combination of histograms). Make sure we don't emit a sample at this timestamp.
seriesGroup.histogramSums[idx] = invalidCombinationOfHistograms
seriesGroup.histogramPointCount--

if err := functions.NativeHistogramErrorToAnnotation(err, a.emitAnnotationFunc); err != nil {
// Unknown error: we couldn't convert the error to an annotation. Give up.
return err
}
}
} else if lastUncopiedHistogram == p.H {
// We've already used this histogram for a previous point due to lookback.
Expand All @@ -446,6 +474,11 @@ func (a *Aggregation) accumulateSeriesIntoGroup(s types.InstantVectorSeriesData,
return nil
}

// emitAnnotation records an annotation for the inner series currently being
// accumulated, using that series' metric name and the inner operator's
// expression position.
func (a *Aggregation) emitAnnotation(generator functions.AnnotationGenerator) {
	name := a.metricNames.GetMetricNameForSeries(a.currentSeriesIndex)
	pos := a.Inner.ExpressionPosition()
	a.Annotations.Add(generator(name, pos))
}

// Close releases any resources held by this operator by closing the inner operator.
func (a *Aggregation) Close() {
	a.Inner.Close()
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/streamingpromql/operators/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ func TestAggregation_ReturnsGroupsFinishedFirstEarliest(t *testing.T) {
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
aggregator := &Aggregation{
Inner: &testOperator{series: testCase.inputSeries},
Grouping: testCase.grouping,
Inner: &testOperator{series: testCase.inputSeries},
Grouping: testCase.grouping,
metricNames: &MetricNames{},
}

outputSeries, err := aggregator.SeriesMetadata(context.Background())
Expand Down
10 changes: 10 additions & 0 deletions pkg/streamingpromql/testdata/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
This directory contains sets of test cases used to test Mimir's query engine (MQE).

There are three subdirectories, each with a different purpose:

- `ours`: test cases we have created. These are run against MQE by `TestOurTestCases`, and also run against Prometheus' engine to confirm they are valid test cases.
- `ours-only`: same as above, but the comparison against Prometheus' engine is skipped.
This is used for test cases that fail on Prometheus' engine due to a bug in Prometheus' engine.
These test cases will generally only remain in this directory temporarily, and move to `ours` once the bug in Prometheus' engine is resolved.
- `upstream`: test cases from https://github.com/prometheus/prometheus/tree/main/promql/promqltest/testdata, modified to disable test cases MQE does not yet support.
These are run against MQE by `TestUpstreamTestCases`, and kept in sync with the source test cases by `TestOurUpstreamTestCasesAreInSyncWithUpstream`.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# SPDX-License-Identifier: AGPL-3.0-only

# These test cases currently fail on Prometheus' engine due to https://github.com/prometheus/prometheus/pull/14611.
# Once that PR is merged, these tests can move to native_histograms.test.

# Test mixing exponential and custom buckets.
load 6m
metric{series="exponential"} {{sum:4 count:5 buckets:[1 3 1]}} _ {{sum:4 count:5 buckets:[1 3 1]}} {{sum:4 count:5 buckets:[1 3 1]}} _
metric{series="other-exponential"} {{sum:3 count:4 buckets:[1 2 1]}} _ {{sum:3 count:4 buckets:[1 2 1]}} {{sum:3 count:4 buckets:[1 2 1]}} _
metric{series="custom"} _ {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1 0]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1 0]}} _ {{schema:-53 sum:2 count:1 custom_values:[5 10] buckets:[1 0]}}
metric{series="other-custom"} _ {{schema:-53 sum:15 count:2 custom_values:[5 10] buckets:[0 2]}} {{schema:-53 sum:15 count:2 custom_values:[5 10] buckets:[0 2]}} _ {{schema:-53 sum:16 count:2 custom_values:[5 10] buckets:[0 2]}}

# T=0: only exponential
# T=6: only custom
# T=12: mixed, should be ignored and emit a warning
# T=18: only exponential
# T=24: only custom
eval_warn range from 0 to 24m step 6m sum(metric)
{} {{sum:7 count:9 buckets:[2 5 2]}} {{schema:-53 sum:16 count:3 custom_values:[5 10] buckets:[1 2]}} _ {{sum:7 count:9 buckets:[2 5 2]}} {{schema:-53 sum:18 count:3 custom_values:[5 10] buckets:[1 2]}}

clear

# Test incompatible custom bucket schemas.
load 6m
metric{series="1"} _ {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1 2]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1 4]}} {{schema:-53 sum:1 count:1 custom_values:[3] buckets:[1]}}
metric{series="2"} {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}} _ {{schema:-53 sum:1 count:1 custom_values:[2] buckets:[1]}} {{schema:-53 sum:1.5 count:3 custom_values:[3] buckets:[3]}}
metric{series="3"} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1 2]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1 2]}} {{schema:-53 sum:1 count:1 custom_values:[5 10] buckets:[1 4]}} {{schema:-53 sum:1.3 count:2 custom_values:[3] buckets:[2]}}

# T=0: incompatible, should be ignored and emit a warning
# T=6: compatible
# T=12: incompatible followed by compatible, should be ignored and emit a warning
# T=18: compatible
eval_warn range from 0 to 18m step 6m sum(metric)
{} _ {{schema:-53 sum:2 count:2 custom_values:[5 10] buckets:[2 4]}} _ {{schema:-53 sum:3.8 count:6 custom_values:[3] buckets:[6]}}
Loading
Loading