Mimir query engine: add support for min_over_time, max_over_time, sum_over_time and avg_over_time #8934

Merged · 14 commits · Aug 12, 2024

2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -22,7 +22,7 @@
* [CHANGE] Ingester client: experimental support for client-side circuit breakers, their configuration options (`-ingester.client.circuit-breaker.*`) and metrics (`cortex_ingester_client_circuit_breaker_results_total`, `cortex_ingester_client_circuit_breaker_transitions_total`) were removed. #8802
* [CHANGE] Ingester: circuit breakers do not open in case of per-instance limit errors anymore. Opening can be triggered only in case of push and pull requests exceeding the configured duration. #8854
* [CHANGE] Query-frontend: Return `413 Request Entity Too Large` if a response shard for an `/active_series` request is too large. #8861
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
12 changes: 12 additions & 0 deletions pkg/streamingpromql/benchmarks/benchmarks.go
@@ -115,6 +115,18 @@ func TestCases(metricSizes []int) []BenchCase {
		{
			Expr: "rate(nh_X[1h])",
		},
		{
			Expr: "avg_over_time(a_X[1m])",
		},
		{
			Expr: "avg_over_time(nh_X[1m])",
		},
		{
			Expr: "sum_over_time(a_X[1m])",
		},
		{
			Expr: "sum_over_time(nh_X[1m])",
		},
		//{
		// Expr: "absent_over_time(a_X[1d])",
		//},
4 changes: 4 additions & 0 deletions pkg/streamingpromql/config.go
@@ -19,9 +19,13 @@ type FeatureToggles struct {
}

var overTimeFunctionNames = []string{
	"avg_over_time",
	"count_over_time",
	"last_over_time",
	"max_over_time",
	"min_over_time",
	"present_over_time",
	"sum_over_time",
}

// EnableAllFeatures enables all features supported by MQE, including experimental or incomplete features.
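The `overTimeFunctionNames` list suggests that all of the `..._over_time` functions are gated behind a single feature toggle rather than enabled unconditionally. The wiring isn't part of this diff, so the following is only a sketch of how such a name list could be used to filter the operator factory map that appears further down in `functions.go`; the `EnableOverTimeFunctions` field and the helper are hypothetical.

```go
// Hypothetical sketch: the actual toggle field and filtering logic in MQE are
// not shown in this diff. Assumes the standard library "slices" package is imported.
func enabledFunctionFactories(toggles FeatureToggles) map[string]InstantVectorFunctionOperatorFactory {
	enabled := make(map[string]InstantVectorFunctionOperatorFactory, len(instantVectorFunctionOperatorFactories))

	for name, factory := range instantVectorFunctionOperatorFactories {
		if !toggles.EnableOverTimeFunctions && slices.Contains(overTimeFunctionNames, name) {
			// Keep the ..._over_time functions disabled unless the toggle is set.
			continue
		}

		enabled[name] = factory
	}

	return enabled
}
```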
19 changes: 18 additions & 1 deletion pkg/streamingpromql/engine_concurrency_test.go
@@ -42,7 +42,6 @@ func TestConcurrentQueries(t *testing.T) {

	testCases := []concurrentQueryTestCase{
		{
			// Rate over native histogram
			expr:  `rate(native_histogram[5m])`,
			start: startT,
			end:   startT.Add(10 * time.Minute),
@@ -62,6 +61,24 @@
			end:   startT.Add(10 * time.Minute),
			step:  time.Minute,
		},
		{
			expr:  `count_over_time(native_histogram[5m])`,
			start: startT,
			end:   startT.Add(10 * time.Minute),
			step:  time.Minute,
		},
		{
			expr:  `sum_over_time(native_histogram[5m])`,
			start: startT,
			end:   startT.Add(10 * time.Minute),
			step:  time.Minute,
		},
		{
			expr:  `avg_over_time(native_histogram[5m])`,
			start: startT,
			end:   startT.Add(10 * time.Minute),
			step:  time.Minute,
		},
	}

storage := promqltest.LoadedStorage(t, data)
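For readers unfamiliar with the `promqltest` harness used above, here is a minimal, self-contained sketch of evaluating one of the new expressions as a range query. It deliberately uses the stock Prometheus engine and test storage for illustration; it is not MQE's engine construction or the concurrency harness in this file.

```go
package example

import (
	"context"
	"testing"
	"time"

	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/promql/promqltest"
)

func TestAvgOverTimeExample(t *testing.T) {
	// Load ten float samples, one per minute, starting at t=0.
	storage := promqltest.LoadedStorage(t, `
		load 1m
			some_metric 0+1x10
	`)
	t.Cleanup(func() { _ = storage.Close() })

	engine := promql.NewEngine(promql.EngineOpts{
		MaxSamples: 1000000,
		Timeout:    time.Minute,
	})

	start := time.Unix(0, 0)
	q, err := engine.NewRangeQuery(context.Background(), storage, nil, `avg_over_time(some_metric[5m])`, start, start.Add(10*time.Minute), time.Minute)
	if err != nil {
		t.Fatal(err)
	}
	defer q.Close()

	res := q.Exec(context.Background())
	if res.Err != nil {
		t.Fatal(res.Err)
	}

	// One series, eleven steps: the average of the samples in each 5m window.
	t.Log(res.Value)
}
```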
56 changes: 48 additions & 8 deletions pkg/streamingpromql/engine_test.go
@@ -48,13 +48,13 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
"metric{} or other_metric{}": "binary expression with many-to-many matching",
"metric{} + on() group_left() other_metric{}": "binary expression with many-to-one matching",
"metric{} + on() group_right() other_metric{}": "binary expression with one-to-many matching",
"1": "scalar value as top-level expression",
"metric{} offset 2h": "instant vector selector with 'offset'",
"avg(metric{})": "'avg' aggregation",
"rate(metric{}[5m] offset 2h)": "range vector selector with 'offset'",
"rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr",
"avg_over_time(metric{}[5m])": "'avg_over_time' function",
"-sum(metric{})": "PromQL expression type *parser.UnaryExpr",
"1": "scalar value as top-level expression",
"metric{} offset 2h": "instant vector selector with 'offset'",
"avg(metric{})": "'avg' aggregation",
"rate(metric{}[5m] offset 2h)": "range vector selector with 'offset'",
"rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr",
"quantile_over_time(0.4, metric{}[5m])": "'quantile_over_time' function",
"-sum(metric{})": "PromQL expression type *parser.UnaryExpr",
}

for expression, expectedError := range unsupportedExpressions {
@@ -1395,7 +1395,7 @@ func TestAnnotations(t *testing.T) {
expectedWarningAnnotations: []string{
`PromQL warning: vector contains a mix of histograms with exponential and custom buckets schemas for metric name "metric" (1:6)`,
},
skipComparisonWithPrometheusReason: "Prometheus' engine panics for this teste case, https://github.com/prometheus/prometheus/pull/14609 will fix this",
skipComparisonWithPrometheusReason: "Prometheus' engine panics for this test case, https://github.com/prometheus/prometheus/pull/14609 will fix this",
},
"rate() over native histograms with incompatible custom buckets": {
data: nativeHistogramsWithCustomBucketsData,
@@ -1405,6 +1405,46 @@
},
},

"sum_over_time() over series with both floats and histograms": {
Review comment (Contributor):

(nit)
It might be worth splitting out the _over_time functions into their own test so we can check the common case of mixed metrics etc. iterating over the function.

Reply (Contributor Author):

I'm inclined to leave this as-is for the time being - until prometheus/prometheus#14609 is vendored into Mimir, we need to skip one of the rate test cases that would fall into this group of tests, and I don't think it's so bad that the tests are repeated.

data: `some_metric 10 {{schema:0 sum:1 count:1 buckets:[1]}}`,
expr: `sum_over_time(some_metric[1m])`,
expectedWarningAnnotations: []string{`PromQL warning: encountered a mix of histograms and floats for metric name "some_metric" (1:15)`},
},
"sum_over_time() over native histograms with both exponential and custom buckets": {
data: nativeHistogramsWithCustomBucketsData,
expr: `sum_over_time(metric{series="mixed-exponential-custom-buckets"}[1m])`,
expectedWarningAnnotations: []string{
`PromQL warning: vector contains a mix of histograms with exponential and custom buckets schemas for metric name "metric" (1:15)`,
},
},
"sum_over_time() over native histograms with incompatible custom buckets": {
data: nativeHistogramsWithCustomBucketsData,
expr: `sum_over_time(metric{series="incompatible-custom-buckets"}[1m])`,
expectedWarningAnnotations: []string{
`PromQL warning: vector contains histograms with incompatible custom buckets for metric name "metric" (1:15)`,
},
},

"avg_over_time() over series with both floats and histograms": {
data: `some_metric 10 {{schema:0 sum:1 count:1 buckets:[1]}}`,
expr: `avg_over_time(some_metric[1m])`,
expectedWarningAnnotations: []string{`PromQL warning: encountered a mix of histograms and floats for metric name "some_metric" (1:15)`},
},
"avg_over_time() over native histograms with both exponential and custom buckets": {
data: nativeHistogramsWithCustomBucketsData,
expr: `avg_over_time(metric{series="mixed-exponential-custom-buckets"}[1m])`,
expectedWarningAnnotations: []string{
`PromQL warning: vector contains a mix of histograms with exponential and custom buckets schemas for metric name "metric" (1:15)`,
},
},
"avg_over_time() over native histograms with incompatible custom buckets": {
data: nativeHistogramsWithCustomBucketsData,
expr: `avg_over_time(metric{series="incompatible-custom-buckets"}[1m])`,
expectedWarningAnnotations: []string{
`PromQL warning: vector contains histograms with incompatible custom buckets for metric name "metric" (1:15)`,
},
},

"multiple annotations from different operators": {
data: `
mixed_metric_count 10 {{schema:0 sum:1 count:1 buckets:[1]}}
23 changes: 23 additions & 0 deletions pkg/streamingpromql/floats/kahan.go
@@ -0,0 +1,23 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/engine.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package floats

import "math"

func KahanSumInc(inc, sum, c float64) (newSum, newC float64) {
	t := sum + inc
	switch {
	case math.IsInf(t, 0):
		c = 0

	// Using Neumaier improvement, swap if next term larger than sum.
	case math.Abs(sum) >= math.Abs(inc):
		c += (sum - t) + inc
	default:
		c += (inc - t) + sum
	}
	return t, c
}
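`KahanSumInc` is the compensated (Kahan–Neumaier) summation step that the new `sum_over_time` and `avg_over_time` implementations can use to reduce floating-point error when accumulating many samples. A quick illustration of what the compensation term buys, assuming the import path follows the `pkg/streamingpromql/floats` location added above: naive summation of `1, 1e100, 1, -1e100` returns 0 because both 1s are absorbed by the large term, while carrying the compensation value recovers the correct answer, 2.

```go
package main

import (
	"fmt"

	// Import path assumed from the file location added in this PR.
	"github.com/grafana/mimir/pkg/streamingpromql/floats"
)

func main() {
	values := []float64{1, 1e100, 1, -1e100}

	// Naive summation: the two 1s are lost to rounding against 1e100.
	naive := 0.0
	for _, v := range values {
		naive += v
	}

	// Compensated summation: c accumulates the low-order bits that the
	// running sum discards.
	sum, c := 0.0, 0.0
	for _, v := range values {
		sum, c = floats.KahanSumInc(v, sum, c)
	}

	fmt.Println(naive)   // 0
	fmt.Println(sum + c) // 2
}
```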
73 changes: 73 additions & 0 deletions pkg/streamingpromql/floats/kahan_test.go
@@ -0,0 +1,73 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/functions_internal_test.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package floats

import (
"fmt"
"math"
"testing"

"github.com/stretchr/testify/require"
)

func TestKahanSumInc(t *testing.T) {
testCases := map[string]struct {
first float64
second float64
expected float64
}{
"+Inf + anything = +Inf": {
first: math.Inf(1),
second: 2.0,
expected: math.Inf(1),
},
"-Inf + anything = -Inf": {
first: math.Inf(-1),
second: 2.0,
expected: math.Inf(-1),
},
"+Inf + -Inf = NaN": {
first: math.Inf(1),
second: math.Inf(-1),
expected: math.NaN(),
},
"NaN + anything = NaN": {
first: math.NaN(),
second: 2,
expected: math.NaN(),
},
"NaN + Inf = NaN": {
first: math.NaN(),
second: math.Inf(1),
expected: math.NaN(),
},
"NaN + -Inf = NaN": {
first: math.NaN(),
second: math.Inf(-1),
expected: math.NaN(),
},
}

runTest := func(t *testing.T, a, b, expected float64) {
t.Run(fmt.Sprintf("%v + %v = %v", a, b, expected), func(t *testing.T) {
sum, c := KahanSumInc(b, a, 0)
result := sum + c

if math.IsNaN(expected) {
require.Truef(t, math.IsNaN(result), "expected result to be NaN, but got %v (from %v + %v)", result, sum, c)
} else {
require.Equalf(t, expected, result, "expected result to be %v, but got %v (from %v + %v)", expected, result, sum, c)
}
})
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
runTest(t, testCase.first, testCase.second, testCase.expected)
runTest(t, testCase.second, testCase.first, testCase.expected)
})
}
}
4 changes: 4 additions & 0 deletions pkg/streamingpromql/functions.go
@@ -99,6 +99,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"asinh": InstantVectorTransformationFunctionOperatorFactory("asinh", functions.Asinh),
"atan": InstantVectorTransformationFunctionOperatorFactory("atan", functions.Atan),
"atanh": InstantVectorTransformationFunctionOperatorFactory("atanh", functions.Atanh),
"avg_over_time": FunctionOverRangeVectorOperatorFactory("avg_over_time", functions.AvgOverTime),
"ceil": InstantVectorTransformationFunctionOperatorFactory("ceil", functions.Ceil),
"cos": InstantVectorTransformationFunctionOperatorFactory("cos", functions.Cos),
"cosh": InstantVectorTransformationFunctionOperatorFactory("cosh", functions.Cosh),
@@ -109,6 +110,8 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"histogram_count": InstantVectorTransformationFunctionOperatorFactory("histogram_count", functions.HistogramCount),
"histogram_sum": InstantVectorTransformationFunctionOperatorFactory("histogram_sum", functions.HistogramSum),
"last_over_time": FunctionOverRangeVectorOperatorFactory("last_over_time", functions.LastOverTime),
"max_over_time": FunctionOverRangeVectorOperatorFactory("max_over_time", functions.MaxOverTime),
"min_over_time": FunctionOverRangeVectorOperatorFactory("min_over_time", functions.MinOverTime),
"ln": InstantVectorTransformationFunctionOperatorFactory("ln", functions.Ln),
"log10": InstantVectorTransformationFunctionOperatorFactory("log10", functions.Log10),
"log2": InstantVectorTransformationFunctionOperatorFactory("log2", functions.Log2),
Expand All @@ -119,6 +122,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"sin": InstantVectorTransformationFunctionOperatorFactory("sin", functions.Sin),
"sinh": InstantVectorTransformationFunctionOperatorFactory("sinh", functions.Sinh),
"sqrt": InstantVectorTransformationFunctionOperatorFactory("sqrt", functions.Sqrt),
"sum_over_time": FunctionOverRangeVectorOperatorFactory("sum_over_time", functions.SumOverTime),
"tan": InstantVectorTransformationFunctionOperatorFactory("tan", functions.Tan),
"tanh": InstantVectorTransformationFunctionOperatorFactory("tanh", functions.Tanh),
}
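The factory entries above only wire each function name to an implementation in the `functions` package; those implementations are not part of the fragment shown here. As a rough, self-contained sketch of the per-step float logic for `sum_over_time` and `avg_over_time` (the types below are illustrative stand-ins, not MQE's actual operator or ring-buffer types, and histogram handling and annotations are omitted):

```go
package example

import "github.com/grafana/mimir/pkg/streamingpromql/floats"

// fpoint is an illustrative stand-in for one float sample inside a
// range-vector window.
type fpoint struct {
	T int64   // timestamp in milliseconds
	F float64 // sample value
}

// sumAndAvgOverTime computes sum_over_time and avg_over_time for the float
// samples that fall inside a single window, using compensated summation.
func sumAndAvgOverTime(points []fpoint) (sum, avg float64, ok bool) {
	if len(points) == 0 {
		// An empty window produces no sample for this step.
		return 0, 0, false
	}

	s, c := 0.0, 0.0
	for _, p := range points {
		s, c = floats.KahanSumInc(p.F, s, c)
	}

	sum = s + c
	avg = sum / float64(len(points))
	return sum, avg, true
}
```

min_over_time and max_over_time follow the same shape, tracking a running minimum or maximum instead of a compensated sum.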