diff --git a/CHANGELOG.md b/CHANGELOG.md index 5a08f1f8ad8..f6f2d2a37b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,7 @@ * [CHANGE] Ingester client: experimental support for client-side circuit breakers, their configuration options (`-ingester.client.circuit-breaker.*`) and metrics (`cortex_ingester_client_circuit_breaker_results_total`, `cortex_ingester_client_circuit_breaker_transitions_total`) were removed. #8802 * [CHANGE] Ingester: circuit breakers do not open in case of per-instance limit errors anymore. Opening can be triggered only in case of push and pull requests exceeding the configured duration. #8854 * [CHANGE] Query-frontend: Return `413 Request Entity Too Large` if a response shard for an `/active_series` request is too large. #8861 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 * [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 * What it is: * When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path. diff --git a/pkg/streamingpromql/benchmarks/benchmarks.go b/pkg/streamingpromql/benchmarks/benchmarks.go index 663db5a0936..da23204d572 100644 --- a/pkg/streamingpromql/benchmarks/benchmarks.go +++ b/pkg/streamingpromql/benchmarks/benchmarks.go @@ -115,6 +115,18 @@ func TestCases(metricSizes []int) []BenchCase { { Expr: "rate(nh_X[1h])", }, + { + Expr: "avg_over_time(a_X[1m])", + }, + { + Expr: "avg_over_time(nh_X[1m])", + }, + { + Expr: "sum_over_time(a_X[1m])", + }, + { + Expr: "sum_over_time(nh_X[1m])", + }, //{ // Expr: "absent_over_time(a_X[1d])", //}, diff --git a/pkg/streamingpromql/config.go b/pkg/streamingpromql/config.go index b0bbaaa3238..641508525c7 100644 --- a/pkg/streamingpromql/config.go +++ b/pkg/streamingpromql/config.go @@ -19,9 +19,13 @@ type FeatureToggles struct { } var overTimeFunctionNames = []string{ + "avg_over_time", "count_over_time", "last_over_time", + "max_over_time", + "min_over_time", "present_over_time", + "sum_over_time", } // EnableAllFeatures enables all features supported by MQE, including experimental or incomplete features. diff --git a/pkg/streamingpromql/engine_concurrency_test.go b/pkg/streamingpromql/engine_concurrency_test.go index 11217b26931..29ff3de990f 100644 --- a/pkg/streamingpromql/engine_concurrency_test.go +++ b/pkg/streamingpromql/engine_concurrency_test.go @@ -42,7 +42,6 @@ func TestConcurrentQueries(t *testing.T) { testCases := []concurrentQueryTestCase{ { - // Rate over native histogram expr: `rate(native_histogram[5m])`, start: startT, end: startT.Add(10 * time.Minute), @@ -62,6 +61,24 @@ func TestConcurrentQueries(t *testing.T) { end: startT.Add(10 * time.Minute), step: time.Minute, }, + { + expr: `count_over_time(native_histogram[5m])`, + start: startT, + end: startT.Add(10 * time.Minute), + step: time.Minute, + }, + { + expr: `sum_over_time(native_histogram[5m])`, + start: startT, + end: startT.Add(10 * time.Minute), + step: time.Minute, + }, + { + expr: `avg_over_time(native_histogram[5m])`, + start: startT, + end: startT.Add(10 * time.Minute), + step: time.Minute, + }, } storage := promqltest.LoadedStorage(t, data) diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 805bf425ff5..201cadbb105 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -48,13 +48,13 @@ func TestUnsupportedPromQLFeatures(t *testing.T) { "metric{} or other_metric{}": "binary expression with many-to-many matching", "metric{} + on() group_left() other_metric{}": "binary expression with many-to-one matching", "metric{} + on() group_right() other_metric{}": "binary expression with one-to-many matching", - "1": "scalar value as top-level expression", - "metric{} offset 2h": "instant vector selector with 'offset'", - "avg(metric{})": "'avg' aggregation", - "rate(metric{}[5m] offset 2h)": "range vector selector with 'offset'", - "rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr", - "avg_over_time(metric{}[5m])": "'avg_over_time' function", - "-sum(metric{})": "PromQL expression type *parser.UnaryExpr", + "1": "scalar value as top-level expression", + "metric{} offset 2h": "instant vector selector with 'offset'", + "avg(metric{})": "'avg' aggregation", + "rate(metric{}[5m] offset 2h)": "range vector selector with 'offset'", + "rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr", + "quantile_over_time(0.4, metric{}[5m])": "'quantile_over_time' function", + "-sum(metric{})": "PromQL expression type *parser.UnaryExpr", } for expression, expectedError := range unsupportedExpressions { @@ -1395,7 +1395,7 @@ func TestAnnotations(t *testing.T) { expectedWarningAnnotations: []string{ `PromQL warning: vector contains a mix of histograms with exponential and custom buckets schemas for metric name "metric" (1:6)`, }, - skipComparisonWithPrometheusReason: "Prometheus' engine panics for this teste case, https://github.com/prometheus/prometheus/pull/14609 will fix this", + skipComparisonWithPrometheusReason: "Prometheus' engine panics for this test case, https://github.com/prometheus/prometheus/pull/14609 will fix this", }, "rate() over native histograms with incompatible custom buckets": { data: nativeHistogramsWithCustomBucketsData, @@ -1405,6 +1405,46 @@ func TestAnnotations(t *testing.T) { }, }, + "sum_over_time() over series with both floats and histograms": { + data: `some_metric 10 {{schema:0 sum:1 count:1 buckets:[1]}}`, + expr: `sum_over_time(some_metric[1m])`, + expectedWarningAnnotations: []string{`PromQL warning: encountered a mix of histograms and floats for metric name "some_metric" (1:15)`}, + }, + "sum_over_time() over native histograms with both exponential and custom buckets": { + data: nativeHistogramsWithCustomBucketsData, + expr: `sum_over_time(metric{series="mixed-exponential-custom-buckets"}[1m])`, + expectedWarningAnnotations: []string{ + `PromQL warning: vector contains a mix of histograms with exponential and custom buckets schemas for metric name "metric" (1:15)`, + }, + }, + "sum_over_time() over native histograms with incompatible custom buckets": { + data: nativeHistogramsWithCustomBucketsData, + expr: `sum_over_time(metric{series="incompatible-custom-buckets"}[1m])`, + expectedWarningAnnotations: []string{ + `PromQL warning: vector contains histograms with incompatible custom buckets for metric name "metric" (1:15)`, + }, + }, + + "avg_over_time() over series with both floats and histograms": { + data: `some_metric 10 {{schema:0 sum:1 count:1 buckets:[1]}}`, + expr: `avg_over_time(some_metric[1m])`, + expectedWarningAnnotations: []string{`PromQL warning: encountered a mix of histograms and floats for metric name "some_metric" (1:15)`}, + }, + "avg_over_time() over native histograms with both exponential and custom buckets": { + data: nativeHistogramsWithCustomBucketsData, + expr: `avg_over_time(metric{series="mixed-exponential-custom-buckets"}[1m])`, + expectedWarningAnnotations: []string{ + `PromQL warning: vector contains a mix of histograms with exponential and custom buckets schemas for metric name "metric" (1:15)`, + }, + }, + "avg_over_time() over native histograms with incompatible custom buckets": { + data: nativeHistogramsWithCustomBucketsData, + expr: `avg_over_time(metric{series="incompatible-custom-buckets"}[1m])`, + expectedWarningAnnotations: []string{ + `PromQL warning: vector contains histograms with incompatible custom buckets for metric name "metric" (1:15)`, + }, + }, + "multiple annotations from different operators": { data: ` mixed_metric_count 10 {{schema:0 sum:1 count:1 buckets:[1]}} diff --git a/pkg/streamingpromql/floats/kahan.go b/pkg/streamingpromql/floats/kahan.go new file mode 100644 index 00000000000..13156879deb --- /dev/null +++ b/pkg/streamingpromql/floats/kahan.go @@ -0,0 +1,23 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/engine.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Prometheus Authors + +package floats + +import "math" + +func KahanSumInc(inc, sum, c float64) (newSum, newC float64) { + t := sum + inc + switch { + case math.IsInf(t, 0): + c = 0 + + // Using Neumaier improvement, swap if next term larger than sum. + case math.Abs(sum) >= math.Abs(inc): + c += (sum - t) + inc + default: + c += (inc - t) + sum + } + return t, c +} diff --git a/pkg/streamingpromql/floats/kahan_test.go b/pkg/streamingpromql/floats/kahan_test.go new file mode 100644 index 00000000000..d9a24651a27 --- /dev/null +++ b/pkg/streamingpromql/floats/kahan_test.go @@ -0,0 +1,73 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/functions_internal_test.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Prometheus Authors + +package floats + +import ( + "fmt" + "math" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestKahanSumInc(t *testing.T) { + testCases := map[string]struct { + first float64 + second float64 + expected float64 + }{ + "+Inf + anything = +Inf": { + first: math.Inf(1), + second: 2.0, + expected: math.Inf(1), + }, + "-Inf + anything = -Inf": { + first: math.Inf(-1), + second: 2.0, + expected: math.Inf(-1), + }, + "+Inf + -Inf = NaN": { + first: math.Inf(1), + second: math.Inf(-1), + expected: math.NaN(), + }, + "NaN + anything = NaN": { + first: math.NaN(), + second: 2, + expected: math.NaN(), + }, + "NaN + Inf = NaN": { + first: math.NaN(), + second: math.Inf(1), + expected: math.NaN(), + }, + "NaN + -Inf = NaN": { + first: math.NaN(), + second: math.Inf(-1), + expected: math.NaN(), + }, + } + + runTest := func(t *testing.T, a, b, expected float64) { + t.Run(fmt.Sprintf("%v + %v = %v", a, b, expected), func(t *testing.T) { + sum, c := KahanSumInc(b, a, 0) + result := sum + c + + if math.IsNaN(expected) { + require.Truef(t, math.IsNaN(result), "expected result to be NaN, but got %v (from %v + %v)", result, sum, c) + } else { + require.Equalf(t, expected, result, "expected result to be %v, but got %v (from %v + %v)", expected, result, sum, c) + } + }) + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + runTest(t, testCase.first, testCase.second, testCase.expected) + runTest(t, testCase.second, testCase.first, testCase.expected) + }) + } +} diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index aa09a2b0369..761813b72ae 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -99,6 +99,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "asinh": InstantVectorTransformationFunctionOperatorFactory("asinh", functions.Asinh), "atan": InstantVectorTransformationFunctionOperatorFactory("atan", functions.Atan), "atanh": InstantVectorTransformationFunctionOperatorFactory("atanh", functions.Atanh), + "avg_over_time": FunctionOverRangeVectorOperatorFactory("avg_over_time", functions.AvgOverTime), "ceil": InstantVectorTransformationFunctionOperatorFactory("ceil", functions.Ceil), "cos": InstantVectorTransformationFunctionOperatorFactory("cos", functions.Cos), "cosh": InstantVectorTransformationFunctionOperatorFactory("cosh", functions.Cosh), @@ -109,6 +110,8 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "histogram_count": InstantVectorTransformationFunctionOperatorFactory("histogram_count", functions.HistogramCount), "histogram_sum": InstantVectorTransformationFunctionOperatorFactory("histogram_sum", functions.HistogramSum), "last_over_time": FunctionOverRangeVectorOperatorFactory("last_over_time", functions.LastOverTime), + "max_over_time": FunctionOverRangeVectorOperatorFactory("max_over_time", functions.MaxOverTime), + "min_over_time": FunctionOverRangeVectorOperatorFactory("min_over_time", functions.MinOverTime), "ln": InstantVectorTransformationFunctionOperatorFactory("ln", functions.Ln), "log10": InstantVectorTransformationFunctionOperatorFactory("log10", functions.Log10), "log2": InstantVectorTransformationFunctionOperatorFactory("log2", functions.Log2), @@ -119,6 +122,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe "sin": InstantVectorTransformationFunctionOperatorFactory("sin", functions.Sin), "sinh": InstantVectorTransformationFunctionOperatorFactory("sinh", functions.Sinh), "sqrt": InstantVectorTransformationFunctionOperatorFactory("sqrt", functions.Sqrt), + "sum_over_time": FunctionOverRangeVectorOperatorFactory("sum_over_time", functions.SumOverTime), "tan": InstantVectorTransformationFunctionOperatorFactory("tan", functions.Tan), "tanh": InstantVectorTransformationFunctionOperatorFactory("tanh", functions.Tanh), } diff --git a/pkg/streamingpromql/functions/range_vectors.go b/pkg/streamingpromql/functions/range_vectors.go index 7ce38ba265e..c00b9c03fae 100644 --- a/pkg/streamingpromql/functions/range_vectors.go +++ b/pkg/streamingpromql/functions/range_vectors.go @@ -1,10 +1,18 @@ // SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/functions.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Prometheus Authors package functions import ( + "math" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/util/annotations" + "github.com/grafana/mimir/pkg/streamingpromql/floats" "github.com/grafana/mimir/pkg/streamingpromql/types" ) @@ -13,7 +21,7 @@ var CountOverTime = FunctionOverRangeVector{ StepFunc: countOverTime, } -func countOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (f float64, hasFloat bool, h *histogram.FloatHistogram, err error) { +func countOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { fPointCount := fPoints.CountAtOrBefore(step.RangeEnd) hPointCount := hPoints.CountAtOrBefore(step.RangeEnd) @@ -29,7 +37,7 @@ var LastOverTime = FunctionOverRangeVector{ StepFunc: lastOverTime, } -func lastOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (f float64, hasFloat bool, h *histogram.FloatHistogram, err error) { +func lastOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { lastFloat, floatAvailable := fPoints.LastAtOrBefore(step.RangeEnd) lastHistogram, histogramAvailable := hPoints.LastAtOrBefore(step.RangeEnd) @@ -50,10 +58,314 @@ var PresentOverTime = FunctionOverRangeVector{ StepFunc: presentOverTime, } -func presentOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (f float64, hasFloat bool, h *histogram.FloatHistogram, err error) { +func presentOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { if fPoints.AnyAtOrBefore(step.RangeEnd) || hPoints.AnyAtOrBefore(step.RangeEnd) { return 1, true, nil, nil } return 0, false, nil, nil } + +var MaxOverTime = FunctionOverRangeVector{ + SeriesMetadataFunc: DropSeriesName, + StepFunc: maxOverTime, +} + +func maxOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, _ *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { + head, tail := fPoints.UnsafePoints(step.RangeEnd) + + if len(head) == 0 && len(tail) == 0 { + return 0, false, nil, nil + } + + var maxSoFar float64 + + if len(head) > 0 { + maxSoFar = head[0].F + head = head[1:] + } else { + maxSoFar = tail[0].F + tail = tail[1:] + } + + for _, p := range head { + if p.F > maxSoFar || math.IsNaN(maxSoFar) { + maxSoFar = p.F + } + } + + for _, p := range tail { + if p.F > maxSoFar || math.IsNaN(maxSoFar) { + maxSoFar = p.F + } + } + + return maxSoFar, true, nil, nil +} + +var MinOverTime = FunctionOverRangeVector{ + SeriesMetadataFunc: DropSeriesName, + StepFunc: minOverTime, +} + +func minOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, _ *types.HPointRingBuffer, _ EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { + head, tail := fPoints.UnsafePoints(step.RangeEnd) + + if len(head) == 0 && len(tail) == 0 { + return 0, false, nil, nil + } + + var minSoFar float64 + + if len(head) > 0 { + minSoFar = head[0].F + head = head[1:] + } else { + minSoFar = tail[0].F + tail = tail[1:] + } + + for _, p := range head { + if p.F < minSoFar || math.IsNaN(minSoFar) { + minSoFar = p.F + } + } + + for _, p := range tail { + if p.F < minSoFar || math.IsNaN(minSoFar) { + minSoFar = p.F + } + } + + return minSoFar, true, nil, nil +} + +var SumOverTime = FunctionOverRangeVector{ + SeriesMetadataFunc: DropSeriesName, + StepFunc: sumOverTime, + NeedsSeriesNamesForAnnotations: true, +} + +func sumOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, emitAnnotation EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { + fHead, fTail := fPoints.UnsafePoints(step.RangeEnd) + hHead, hTail := hPoints.UnsafePoints(step.RangeEnd) + + haveFloats := len(fHead) > 0 || len(fTail) > 0 + haveHistograms := len(hHead) > 0 || len(hTail) > 0 + + if !haveFloats && !haveHistograms { + return 0, false, nil, nil + } + + if haveFloats && haveHistograms { + emitAnnotation(annotations.NewMixedFloatsHistogramsWarning) + return 0, false, nil, nil + } + + if haveFloats { + return sumFloats(fHead, fTail), true, nil, nil + } + + h, err := sumHistograms(hHead, hTail, emitAnnotation) + return 0, false, h, err +} + +func sumFloats(head, tail []promql.FPoint) float64 { + sum, c := 0.0, 0.0 + + for _, p := range head { + sum, c = floats.KahanSumInc(p.F, sum, c) + } + + for _, p := range tail { + sum, c = floats.KahanSumInc(p.F, sum, c) + } + + return sum + c +} + +func sumHistograms(head, tail []promql.HPoint, emitAnnotation EmitAnnotationFunc) (*histogram.FloatHistogram, error) { + var sum *histogram.FloatHistogram + + if len(head) > 0 { + sum = head[0].H + head = head[1:] + } else { + sum = tail[0].H + tail = tail[1:] + } + + // We must make a copy of the histogram, as the ring buffer may reuse the FloatHistogram instance on subsequent steps. + sum = sum.Copy() + + for _, p := range head { + if _, err := sum.Add(p.H); err != nil { + err = NativeHistogramErrorToAnnotation(err, emitAnnotation) + return nil, err + } + } + + for _, p := range tail { + if _, err := sum.Add(p.H); err != nil { + err = NativeHistogramErrorToAnnotation(err, emitAnnotation) + return nil, err + } + } + + return sum, nil +} + +var AvgOverTime = FunctionOverRangeVector{ + SeriesMetadataFunc: DropSeriesName, + StepFunc: avgOverTime, + NeedsSeriesNamesForAnnotations: true, +} + +func avgOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, emitAnnotation EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) { + fHead, fTail := fPoints.UnsafePoints(step.RangeEnd) + hHead, hTail := hPoints.UnsafePoints(step.RangeEnd) + + haveFloats := len(fHead) > 0 || len(fTail) > 0 + haveHistograms := len(hHead) > 0 || len(hTail) > 0 + + if !haveFloats && !haveHistograms { + return 0, false, nil, nil + } + + if haveFloats && haveHistograms { + emitAnnotation(annotations.NewMixedFloatsHistogramsWarning) + return 0, false, nil, nil + } + + if haveFloats { + return avgFloats(fHead, fTail), true, nil, nil + } + + h, err := avgHistograms(hHead, hTail) + + if err != nil { + err = NativeHistogramErrorToAnnotation(err, emitAnnotation) + } + + return 0, false, h, err +} + +func avgFloats(head, tail []promql.FPoint) float64 { + sum, c, count := 0.0, 0.0, 0.0 + avgSoFar := 0.0 // Only used for incremental calculation method. + useIncrementalCalculation := false + + accumulate := func(points []promql.FPoint) { + for _, p := range points { + count++ + + if !useIncrementalCalculation { + newSum, newC := floats.KahanSumInc(p.F, sum, c) + + if count == 1 || !math.IsInf(newSum, 0) { + // Continue using simple average calculation provided we haven't overflowed, + // and also for first point to avoid dividing by zero below. + sum, c = newSum, newC + continue + } + + // We've just hit overflow, switch to incremental calculation. + useIncrementalCalculation = true + avgSoFar = sum / (count - 1) + c = c / (count - 1) + } + + // If we get here, we've hit overflow at some point in the range. + // Use incremental calculation method to produce more accurate results. + if math.IsInf(avgSoFar, 0) { + if math.IsInf(p.F, 0) && (avgSoFar > 0) == (p.F > 0) { + // Running average is infinite and the next point is also the same infinite. + // We already have the correct running value, so just continue. + continue + } + if !math.IsInf(p.F, 0) && !math.IsNaN(p.F) { + // Running average is infinite, and the next point is neither infinite nor NaN. + // The running average will still be infinite after considering this point, so just continue + // to avoid incorrectly introducing NaN below. + continue + } + } + + avgSoFar, c = floats.KahanSumInc(p.F/count-(avgSoFar+c)/count, avgSoFar, c) + } + } + + accumulate(head) + accumulate(tail) + + if useIncrementalCalculation { + return avgSoFar + c + } + + return (sum + c) / count +} + +func avgHistograms(head, tail []promql.HPoint) (*histogram.FloatHistogram, error) { + var avgSoFar *histogram.FloatHistogram + count := 1.0 + + if len(head) > 0 { + avgSoFar = head[0].H + head = head[1:] + } else { + avgSoFar = tail[0].H + tail = tail[1:] + } + + // We must make a copy of the histogram, as the ring buffer may reuse the FloatHistogram instance on subsequent steps. + avgSoFar = avgSoFar.Copy() + + // Reuse these instances if we need them, to avoid allocating two FloatHistograms for every remaining histogram in the range. + var contributionByP *histogram.FloatHistogram + var contributionByAvgSoFar *histogram.FloatHistogram + + accumulate := func(points []promql.HPoint) error { + for _, p := range points { + count++ + + // Make a copy of p.H, as the ring buffer may reuse the FloatHistogram instance on subsequent steps. + if contributionByP == nil { + contributionByP = p.H.Copy() + } else { + p.H.CopyTo(contributionByP) + } + + // Make a copy of avgSoFar so we can divide it below without modifying the running total. + if contributionByAvgSoFar == nil { + contributionByAvgSoFar = avgSoFar.Copy() + } else { + avgSoFar.CopyTo(contributionByAvgSoFar) + } + + contributionByP = contributionByP.Div(count) + contributionByAvgSoFar = contributionByAvgSoFar.Div(count) + + change, err := contributionByP.Sub(contributionByAvgSoFar) + if err != nil { + return err + } + + avgSoFar, err = avgSoFar.Add(change) + if err != nil { + return err + } + } + + return nil + } + + if err := accumulate(head); err != nil { + return nil, err + } + + if err := accumulate(tail); err != nil { + return nil, err + } + + return avgSoFar, nil +} diff --git a/pkg/streamingpromql/operators/aggregation.go b/pkg/streamingpromql/operators/aggregation.go index 996f837406b..17cc3e8c903 100644 --- a/pkg/streamingpromql/operators/aggregation.go +++ b/pkg/streamingpromql/operators/aggregation.go @@ -9,7 +9,6 @@ import ( "context" "errors" "fmt" - "math" "slices" "sort" "time" @@ -22,6 +21,7 @@ import ( "github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/zeropool" + "github.com/grafana/mimir/pkg/streamingpromql/floats" "github.com/grafana/mimir/pkg/streamingpromql/functions" "github.com/grafana/mimir/pkg/streamingpromql/limiting" "github.com/grafana/mimir/pkg/streamingpromql/types" @@ -431,7 +431,7 @@ func (a *Aggregation) accumulateSeriesIntoGroup(s types.InstantVectorSeriesData, for _, p := range s.Floats { idx := (p.T - start) / interval - seriesGroup.floatSums[idx], seriesGroup.floatCompensatingMeans[idx] = kahanSumInc(p.F, seriesGroup.floatSums[idx], seriesGroup.floatCompensatingMeans[idx]) + seriesGroup.floatSums[idx], seriesGroup.floatCompensatingMeans[idx] = floats.KahanSumInc(p.F, seriesGroup.floatSums[idx], seriesGroup.floatCompensatingMeans[idx]) seriesGroup.floatPresent[idx] = true } @@ -500,20 +500,3 @@ func (g groupSorter) Swap(i, j int) { g.metadata[i], g.metadata[j] = g.metadata[j], g.metadata[i] g.groups[i], g.groups[j] = g.groups[j], g.groups[i] } - -// TODO(jhesketh): This will likely be useful elsewhere, so we may move this in the future. -// (We could also consider exporting this from the upstream promql package). -func kahanSumInc(inc, sum, c float64) (newSum, newC float64) { - t := sum + inc - switch { - case math.IsInf(t, 0): - c = 0 - - // Using Neumaier improvement, swap if next term larger than sum. - case math.Abs(sum) >= math.Abs(inc): - c += (sum - t) + inc - default: - c += (inc - t) + sum - } - return t, c -} diff --git a/pkg/streamingpromql/operators/aggregation_test.go b/pkg/streamingpromql/operators/aggregation_test.go index 186b8bafc2b..5db9b3b8804 100644 --- a/pkg/streamingpromql/operators/aggregation_test.go +++ b/pkg/streamingpromql/operators/aggregation_test.go @@ -4,8 +4,6 @@ package operators import ( "context" - "fmt" - "math" "testing" "time" @@ -265,64 +263,3 @@ func labelsToSeriesMetadata(lbls []labels.Labels) []types.SeriesMetadata { return m } - -// This test is copied from promql/functions_internal_test.go. -// If kahanSumInc can be exported we can remove this. -func TestKahanSumInc(t *testing.T) { - testCases := map[string]struct { - first float64 - second float64 - expected float64 - }{ - "+Inf + anything = +Inf": { - first: math.Inf(1), - second: 2.0, - expected: math.Inf(1), - }, - "-Inf + anything = -Inf": { - first: math.Inf(-1), - second: 2.0, - expected: math.Inf(-1), - }, - "+Inf + -Inf = NaN": { - first: math.Inf(1), - second: math.Inf(-1), - expected: math.NaN(), - }, - "NaN + anything = NaN": { - first: math.NaN(), - second: 2, - expected: math.NaN(), - }, - "NaN + Inf = NaN": { - first: math.NaN(), - second: math.Inf(1), - expected: math.NaN(), - }, - "NaN + -Inf = NaN": { - first: math.NaN(), - second: math.Inf(-1), - expected: math.NaN(), - }, - } - - runTest := func(t *testing.T, a, b, expected float64) { - t.Run(fmt.Sprintf("%v + %v = %v", a, b, expected), func(t *testing.T) { - sum, c := kahanSumInc(b, a, 0) - result := sum + c - - if math.IsNaN(expected) { - require.Truef(t, math.IsNaN(result), "expected result to be NaN, but got %v (from %v + %v)", result, sum, c) - } else { - require.Equalf(t, expected, result, "expected result to be %v, but got %v (from %v + %v)", expected, result, sum, c) - } - }) - } - - for name, testCase := range testCases { - t.Run(name, func(t *testing.T) { - runTest(t, testCase.first, testCase.second, testCase.expected) - runTest(t, testCase.second, testCase.first, testCase.expected) - }) - } -} diff --git a/pkg/streamingpromql/testdata/ours-only/native_histograms_custom_buckets.test b/pkg/streamingpromql/testdata/ours-only/native_histograms_custom_buckets.test index 89ec644f811..e69d2c68742 100644 --- a/pkg/streamingpromql/testdata/ours-only/native_histograms_custom_buckets.test +++ b/pkg/streamingpromql/testdata/ours-only/native_histograms_custom_buckets.test @@ -12,7 +12,7 @@ load 6m # T=0: only exponential # T=6: only custom -# T=12: mixed, should be ignored and emit an warning +# T=12: mixed, should be ignored and emit a warning # T=18: only exponential # T=24: only custom eval_warn range from 0 to 24m step 6m sum(metric) diff --git a/pkg/streamingpromql/testdata/ours/functions.test b/pkg/streamingpromql/testdata/ours/functions.test index 6f99c0881d3..15982db8ead 100644 --- a/pkg/streamingpromql/testdata/ours/functions.test +++ b/pkg/streamingpromql/testdata/ours/functions.test @@ -67,22 +67,78 @@ eval range from 0 to 6m step 1m floor(some_metric) clear load 1m - some_metric 0 1 2 3 _ _ {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:4 count:4 buckets:[1 2 1]}} + some_metric{foo="bar"} 0 1 2 3 _ _ {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}} + some_nhcb_metric{baz="bar"} {{schema:-53 sum:1 count:5 custom_values:[5 10] buckets:[1 4]}} {{schema:-53 sum:15 count:2 custom_values:[5 10] buckets:[0 2]}} {{schema:-53 sum:3 count:15 custom_values:[5 10] buckets:[7 8]}} + some_inf_and_nan_metric{foo="baz"} 0 1 2 3 Inf Inf Inf NaN NaN NaN NaN 8 7 6 eval range from 0 to 7m step 1m count_over_time(some_metric[3m]) - {} 1 2 3 4 3 2 2 2 + {foo="bar"} 1 2 3 4 3 2 2 2 eval range from 0 to 7m step 1m count_over_time(some_metric[5s]) - {} 1 1 1 1 _ _ 1 1 + {foo="bar"} 1 1 1 1 _ _ 1 1 eval range from 0 to 7m step 1m last_over_time(some_metric[3m]) - some_metric 0 1 2 3 3 3 {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:4 count:4 buckets:[1 2 1]}} + some_metric{foo="bar"} 0 1 2 3 3 3 {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}} eval range from 0 to 7m step 1m last_over_time(some_metric[5s]) - some_metric 0 1 2 3 _ _ {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:4 count:4 buckets:[1 2 1]}} + some_metric{foo="bar"} 0 1 2 3 _ _ {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}} eval range from 0 to 7m step 1m present_over_time(some_metric[3m]) - {} 1 1 1 1 1 1 1 1 + {foo="bar"} 1 1 1 1 1 1 1 1 eval range from 0 to 7m step 1m present_over_time(some_metric[5s]) - {} 1 1 1 1 _ _ 1 1 + {foo="bar"} 1 1 1 1 _ _ 1 1 + +eval range from 0 to 7m step 1m min_over_time(some_metric[3m]) + {foo="bar"} 0 0 0 0 1 2 3 _ + +eval range from 0 to 7m step 1m min_over_time(some_metric[5s]) + {foo="bar"} 0 1 2 3 _ _ _ _ + +eval range from 0 to 16m step 1m min_over_time(some_inf_and_nan_metric[3m]) + {foo="baz"} 0 0 0 0 1 2 3 Inf Inf Inf NaN 8 7 6 6 6 6 + +eval range from 0 to 7m step 1m max_over_time(some_metric[3m]) + {foo="bar"} 0 1 2 3 3 3 3 _ + +eval range from 0 to 7m step 1m max_over_time(some_metric[5s]) + {foo="bar"} 0 1 2 3 _ _ _ _ + +eval range from 0 to 16m step 1m max_over_time(some_inf_and_nan_metric[3m]) + {foo="baz"} 0 1 2 3 Inf Inf Inf Inf Inf Inf NaN 8 8 8 8 7 6 + +eval_warn range from 0 to 10m step 1m sum_over_time(some_metric[3m]) + {foo="bar"} 0 1 3 6 6 5 _ {{schema:3 sum:9 count:7 buckets:[3 7 5]}} {{schema:3 sum:9 count:7 buckets:[3 7 5]}} {{schema:3 sum:9 count:7 buckets:[3 7 5]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}} + +eval range from 0 to 5m step 1m sum_over_time(some_metric[3m]) + {foo="bar"} 0 1 3 6 6 5 + +eval range from 7m to 10m step 1m sum_over_time(some_metric[3m]) + {foo="bar"} {{schema:3 sum:9 count:7 buckets:[3 7 5]}} {{schema:3 sum:9 count:7 buckets:[3 7 5]}} {{schema:3 sum:9 count:7 buckets:[3 7 5]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}} + +eval range from 0 to 7m step 1m sum_over_time(some_metric[5s]) + {foo="bar"} 0 1 2 3 _ _ {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}} + +eval range from 0 to 2m step 1m sum_over_time(some_nhcb_metric[3m]) + {baz="bar"} {{schema:-53 sum:1 count:5 custom_values:[5 10] buckets:[1 4]}} {{schema:-53 sum:16 count:7 custom_values:[5 10] buckets:[1 6]}} {{schema:-53 sum:19 count:22 custom_values:[5 10] buckets:[8 14]}} + +eval range from 0 to 16m step 1m sum_over_time(some_inf_and_nan_metric[3m]) + {foo="baz"} 0 1 3 6 Inf Inf Inf NaN NaN NaN NaN NaN NaN NaN 21 13 6 + +eval_warn range from 0 to 10m step 1m avg_over_time(some_metric[3m]) + {foo="bar"} 0 0.5 1 1.5 2 2.5 _ {{schema:3 sum:4.5 count:3.5 buckets:[1.5 3.5 2.5]}} {{schema:3 sum:4.5 count:3.5 buckets:[1.5 3.5 2.5]}} {{schema:3 sum:4.5 count:3.5 buckets:[1.5 3.5 2.5]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}} + +eval range from 0 to 5m step 1m avg_over_time(some_metric[3m]) + {foo="bar"} 0 0.5 1 1.5 2 2.5 + +eval range from 7m to 10m step 1m avg_over_time(some_metric[3m]) + {foo="bar"} {{schema:3 sum:4.5 count:3.5 buckets:[1.5 3.5 2.5]}} {{schema:3 sum:4.5 count:3.5 buckets:[1.5 3.5 2.5]}} {{schema:3 sum:4.5 count:3.5 buckets:[1.5 3.5 2.5]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}} + +eval range from 0 to 7m step 1m avg_over_time(some_metric[5s]) + {foo="bar"} 0 1 2 3 _ _ {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}} + +eval range from 0 to 2m step 1m avg_over_time(some_nhcb_metric[3m]) + {baz="bar"} {{schema:-53 sum:1 count:5 custom_values:[5 10] buckets:[1 4]}} {{schema:-53 sum:8 count:3.5 custom_values:[5 10] buckets:[0.5 3]}} {{schema:-53 sum:6.333333333333334 count:7.333333333333333 custom_values:[5 10] buckets:[2.666666666666667 4.666666666666666]}} + +eval range from 0 to 16m step 1m avg_over_time(some_inf_and_nan_metric[3m]) + {foo="baz"} 0 0.5 1 1.5 Inf Inf Inf NaN NaN NaN NaN NaN NaN NaN 7 6.5 6 diff --git a/pkg/streamingpromql/testdata/upstream/at_modifier.test b/pkg/streamingpromql/testdata/upstream/at_modifier.test index 3c275389a14..5f23f94f673 100644 --- a/pkg/streamingpromql/testdata/upstream/at_modifier.test +++ b/pkg/streamingpromql/testdata/upstream/at_modifier.test @@ -84,14 +84,12 @@ eval instant at 10s metric @ 1m40s # {job="2"} -20 # Millisecond precision. -# Unsupported by streaming engine. -# eval instant at 100s metric_ms @ 1.234 -# metric_ms 1234 +eval instant at 100s metric_ms @ 1.234 + metric_ms 1234 # Range vector selectors. -# Unsupported by streaming engine. -# eval instant at 25s sum_over_time(metric{job="1"}[100s] @ 100) -# {job="1"} 55 +eval instant at 25s sum_over_time(metric{job="1"}[100s] @ 100) + {job="1"} 55 # Unsupported by streaming engine. # eval instant at 25s sum_over_time(metric{job="1"}[100s] @ 100 offset 50s) diff --git a/pkg/streamingpromql/testdata/upstream/functions.test b/pkg/streamingpromql/testdata/upstream/functions.test index 1612f1adba8..308c168a07e 100644 --- a/pkg/streamingpromql/testdata/upstream/functions.test +++ b/pkg/streamingpromql/testdata/upstream/functions.test @@ -772,132 +772,102 @@ load 10s metric9 -9.988465674311579e+307 -9.988465674311579e+307 -9.988465674311579e+307 metric10 -9.988465674311579e+307 9.988465674311579e+307 -# Unsupported by streaming engine. -# eval instant at 1m avg_over_time(metric[1m]) -# {} 3 +eval instant at 1m avg_over_time(metric[1m]) + {} 3 -# Unsupported by streaming engine. -# eval instant at 1m sum_over_time(metric[1m])/count_over_time(metric[1m]) -# {} 3 +eval instant at 1m sum_over_time(metric[1m])/count_over_time(metric[1m]) + {} 3 -# Unsupported by streaming engine. -# eval instant at 1m avg_over_time(metric2[1m]) -# {} Inf +eval instant at 1m avg_over_time(metric2[1m]) + {} Inf -# Unsupported by streaming engine. -# eval instant at 1m sum_over_time(metric2[1m])/count_over_time(metric2[1m]) -# {} Inf +eval instant at 1m sum_over_time(metric2[1m])/count_over_time(metric2[1m]) + {} Inf -# Unsupported by streaming engine. -# eval instant at 1m avg_over_time(metric3[1m]) -# {} -Inf +eval instant at 1m avg_over_time(metric3[1m]) + {} -Inf -# Unsupported by streaming engine. -# eval instant at 1m sum_over_time(metric3[1m])/count_over_time(metric3[1m]) -# {} -Inf +eval instant at 1m sum_over_time(metric3[1m])/count_over_time(metric3[1m]) + {} -Inf -# Unsupported by streaming engine. -# eval instant at 1m avg_over_time(metric4[1m]) -# {} NaN +eval instant at 1m avg_over_time(metric4[1m]) + {} NaN -# Unsupported by streaming engine. -# eval instant at 1m sum_over_time(metric4[1m])/count_over_time(metric4[1m]) -# {} NaN +eval instant at 1m sum_over_time(metric4[1m])/count_over_time(metric4[1m]) + {} NaN -# Unsupported by streaming engine. -# eval instant at 1m avg_over_time(metric5[1m]) -# {} Inf +eval instant at 1m avg_over_time(metric5[1m]) + {} Inf -# Unsupported by streaming engine. -# eval instant at 1m sum_over_time(metric5[1m])/count_over_time(metric5[1m]) -# {} Inf +eval instant at 1m sum_over_time(metric5[1m])/count_over_time(metric5[1m]) + {} Inf -# Unsupported by streaming engine. -# eval instant at 1m avg_over_time(metric5b[1m]) -# {} Inf +eval instant at 1m avg_over_time(metric5b[1m]) + {} Inf -# Unsupported by streaming engine. -# eval instant at 1m sum_over_time(metric5b[1m])/count_over_time(metric5b[1m]) -# {} Inf +eval instant at 1m sum_over_time(metric5b[1m])/count_over_time(metric5b[1m]) + {} Inf -# Unsupported by streaming engine. -# eval instant at 1m avg_over_time(metric5c[1m]) -# {} NaN +eval instant at 1m avg_over_time(metric5c[1m]) + {} NaN -# Unsupported by streaming engine. -# eval instant at 1m sum_over_time(metric5c[1m])/count_over_time(metric5c[1m]) -# {} NaN +eval instant at 1m sum_over_time(metric5c[1m])/count_over_time(metric5c[1m]) + {} NaN -# Unsupported by streaming engine. -# eval instant at 1m avg_over_time(metric6[1m]) -# {} -Inf +eval instant at 1m avg_over_time(metric6[1m]) + {} -Inf -# Unsupported by streaming engine. -# eval instant at 1m sum_over_time(metric6[1m])/count_over_time(metric6[1m]) -# {} -Inf +eval instant at 1m sum_over_time(metric6[1m])/count_over_time(metric6[1m]) + {} -Inf -# Unsupported by streaming engine. -# eval instant at 1m avg_over_time(metric6b[1m]) -# {} -Inf +eval instant at 1m avg_over_time(metric6b[1m]) + {} -Inf -# Unsupported by streaming engine. -# eval instant at 1m sum_over_time(metric6b[1m])/count_over_time(metric6b[1m]) -# {} -Inf +eval instant at 1m sum_over_time(metric6b[1m])/count_over_time(metric6b[1m]) + {} -Inf -# Unsupported by streaming engine. -# eval instant at 1m avg_over_time(metric6c[1m]) -# {} NaN +eval instant at 1m avg_over_time(metric6c[1m]) + {} NaN -# Unsupported by streaming engine. -# eval instant at 1m sum_over_time(metric6c[1m])/count_over_time(metric6c[1m]) -# {} NaN +eval instant at 1m sum_over_time(metric6c[1m])/count_over_time(metric6c[1m]) + {} NaN -# Unsupported by streaming engine. -# eval instant at 1m avg_over_time(metric7[1m]) -# {} NaN +eval instant at 1m avg_over_time(metric7[1m]) + {} NaN -# Unsupported by streaming engine. -# eval instant at 1m sum_over_time(metric7[1m])/count_over_time(metric7[1m]) -# {} NaN +eval instant at 1m sum_over_time(metric7[1m])/count_over_time(metric7[1m]) + {} NaN -# Unsupported by streaming engine. -# eval instant at 1m avg_over_time(metric8[1m]) -# {} 9.988465674311579e+307 +eval instant at 1m avg_over_time(metric8[1m]) + {} 9.988465674311579e+307 # This overflows float64. -# Unsupported by streaming engine. -# eval instant at 1m sum_over_time(metric8[1m])/count_over_time(metric8[1m]) -# {} Inf +eval instant at 1m sum_over_time(metric8[1m])/count_over_time(metric8[1m]) + {} Inf -# Unsupported by streaming engine. -# eval instant at 1m avg_over_time(metric9[1m]) -# {} -9.988465674311579e+307 +eval instant at 1m avg_over_time(metric9[1m]) + {} -9.988465674311579e+307 # This overflows float64. -# Unsupported by streaming engine. -# eval instant at 1m sum_over_time(metric9[1m])/count_over_time(metric9[1m]) -# {} -Inf +eval instant at 1m sum_over_time(metric9[1m])/count_over_time(metric9[1m]) + {} -Inf -# Unsupported by streaming engine. -# eval instant at 1m avg_over_time(metric10[1m]) -# {} 0 +eval instant at 1m avg_over_time(metric10[1m]) + {} 0 -# Unsupported by streaming engine. -# eval instant at 1m sum_over_time(metric10[1m])/count_over_time(metric10[1m]) -# {} 0 +eval instant at 1m sum_over_time(metric10[1m])/count_over_time(metric10[1m]) + {} 0 # Test if very big intermediate values cause loss of detail. clear load 10s metric 1 1e100 1 -1e100 -# Unsupported by streaming engine. -# eval instant at 1m sum_over_time(metric[1m]) -# {} 2 +eval instant at 1m sum_over_time(metric[1m]) + {} 2 -# Unsupported by streaming engine. -# eval instant at 1m avg_over_time(metric[1m]) -# {} 0.5 +eval instant at 1m avg_over_time(metric[1m]) + {} 0.5 # Tests for stddev_over_time and stdvar_over_time. clear @@ -1122,21 +1092,19 @@ load 10s data{type="some_nan3"} NaN 0 1 data{type="only_nan"} NaN NaN NaN -# Unsupported by streaming engine. -# eval instant at 1m min_over_time(data[1m]) -# {type="numbers"} 0 -# {type="some_nan"} 0 -# {type="some_nan2"} 1 -# {type="some_nan3"} 0 -# {type="only_nan"} NaN - -# Unsupported by streaming engine. -# eval instant at 1m max_over_time(data[1m]) -# {type="numbers"} 3 -# {type="some_nan"} 2 -# {type="some_nan2"} 2 -# {type="some_nan3"} 1 -# {type="only_nan"} NaN +eval instant at 1m min_over_time(data[1m]) + {type="numbers"} 0 + {type="some_nan"} 0 + {type="some_nan2"} 1 + {type="some_nan3"} 0 + {type="only_nan"} NaN + +eval instant at 1m max_over_time(data[1m]) + {type="numbers"} 3 + {type="some_nan"} 2 + {type="some_nan2"} 2 + {type="some_nan3"} 1 + {type="only_nan"} NaN eval instant at 1m last_over_time(data[1m]) data{type="numbers"} 3 diff --git a/pkg/streamingpromql/testdata/upstream/range_queries.test b/pkg/streamingpromql/testdata/upstream/range_queries.test index 0e6658b6012..95fa9075684 100644 --- a/pkg/streamingpromql/testdata/upstream/range_queries.test +++ b/pkg/streamingpromql/testdata/upstream/range_queries.test @@ -7,9 +7,8 @@ load 30s bar 0 1 10 100 1000 -# Unsupported by streaming engine. -# eval range from 0 to 2m step 1m sum_over_time(bar[30s]) -# {} 0 11 1100 +eval range from 0 to 2m step 1m sum_over_time(bar[30s]) + {} 0 11 1100 clear @@ -17,9 +16,8 @@ clear load 30s bar 0 1 10 100 1000 0 0 0 0 -# Unsupported by streaming engine. -# eval range from 0 to 2m step 1m sum_over_time(bar[30s]) -# {} 0 11 1100 +eval range from 0 to 2m step 1m sum_over_time(bar[30s]) + {} 0 11 1100 clear @@ -27,9 +25,8 @@ clear load 30s bar 0 1 10 100 1000 10000 100000 1000000 10000000 -# Unsupported by streaming engine. -# eval range from 0 to 4m step 1m sum_over_time(bar[30s]) -# {} 0 11 1100 110000 11000000 +eval range from 0 to 4m step 1m sum_over_time(bar[30s]) + {} 0 11 1100 110000 11000000 clear @@ -37,9 +34,8 @@ clear load 30s bar 5 17 42 2 7 905 51 -# Unsupported by streaming engine. -# eval range from 0 to 3m step 1m sum_over_time(bar[30s]) -# {} 5 59 9 956 +eval range from 0 to 3m step 1m sum_over_time(bar[30s]) + {} 5 59 9 956 clear