Skip to content

Commit

Permalink
Add support for avg_over_time
Browse files Browse the repository at this point in the history
  • Loading branch information
charleskorn committed Aug 8, 2024
1 parent f40c3a8 commit 33fef8f
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 55 deletions.
1 change: 1 addition & 0 deletions pkg/streamingpromql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type FeatureToggles struct {
}

var overTimeFunctionNames = []string{
"avg_over_time",
"count_over_time",
"last_over_time",
"max_over_time",
Expand Down
34 changes: 27 additions & 7 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
"metric{} or other_metric{}": "binary expression with many-to-many matching",
"metric{} + on() group_left() other_metric{}": "binary expression with many-to-one matching",
"metric{} + on() group_right() other_metric{}": "binary expression with one-to-many matching",
"1": "scalar value as top-level expression",
"metric{} offset 2h": "instant vector selector with 'offset'",
"avg(metric{})": "'avg' aggregation",
"rate(metric{}[5m] offset 2h)": "range vector selector with 'offset'",
"rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr",
"avg_over_time(metric{}[5m])": "'avg_over_time' function",
"-sum(metric{})": "PromQL expression type *parser.UnaryExpr",
"1": "scalar value as top-level expression",
"metric{} offset 2h": "instant vector selector with 'offset'",
"avg(metric{})": "'avg' aggregation",
"rate(metric{}[5m] offset 2h)": "range vector selector with 'offset'",
"rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr",
"quantile_over_time(0.4, metric{}[5m])": "'quantile_over_time' function",
"-sum(metric{})": "PromQL expression type *parser.UnaryExpr",
}

for expression, expectedError := range unsupportedExpressions {
Expand Down Expand Up @@ -1425,6 +1425,26 @@ func TestAnnotations(t *testing.T) {
},
},

"avg_over_time() over series with both floats and histograms": {
data: `some_metric 10 {{schema:0 sum:1 count:1 buckets:[1]}}`,
expr: `avg_over_time(some_metric[1m])`,
expectedWarningAnnotations: []string{`PromQL warning: encountered a mix of histograms and floats for metric name "some_metric" (1:15)`},
},
"avg_over_time() over native histograms with both exponential and custom buckets": {
data: nativeHistogramsWithCustomBucketsData,
expr: `avg_over_time(metric{series="mixed-exponential-custom-buckets"}[1m])`,
expectedWarningAnnotations: []string{
`PromQL warning: vector contains a mix of histograms with exponential and custom buckets schemas for metric name "metric" (1:15)`,
},
},
"avg_over_time() over native histograms with incompatible custom buckets": {
data: nativeHistogramsWithCustomBucketsData,
expr: `avg_over_time(metric{series="incompatible-custom-buckets"}[1m])`,
expectedWarningAnnotations: []string{
`PromQL warning: vector contains histograms with incompatible custom buckets for metric name "metric" (1:15)`,
},
},

"multiple annotations from different operators": {
data: `
mixed_metric_count 10 {{schema:0 sum:1 count:1 buckets:[1]}}
Expand Down
1 change: 1 addition & 0 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ var instantVectorFunctionOperatorFactories = map[string]InstantVectorFunctionOpe
"asinh": InstantVectorTransformationFunctionOperatorFactory("asinh", functions.Asinh),
"atan": InstantVectorTransformationFunctionOperatorFactory("atan", functions.Atan),
"atanh": InstantVectorTransformationFunctionOperatorFactory("atanh", functions.Atanh),
"avg_over_time": FunctionOverRangeVectorOperatorFactory("avg_over_time", functions.AvgOverTime),
"ceil": InstantVectorTransformationFunctionOperatorFactory("ceil", functions.Ceil),
"cos": InstantVectorTransformationFunctionOperatorFactory("cos", functions.Cos),
"cosh": InstantVectorTransformationFunctionOperatorFactory("cosh", functions.Cosh),
Expand Down
115 changes: 115 additions & 0 deletions pkg/streamingpromql/functions/range_vectors.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/promql/functions.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package functions

Expand Down Expand Up @@ -211,3 +214,115 @@ func sumHistograms(head, tail []promql.HPoint, emitAnnotation EmitAnnotationFunc

return sum, nil
}

// AvgOverTime is the FunctionOverRangeVector definition for avg_over_time.
// Each step of each series is evaluated by avgOverTime.
var AvgOverTime = FunctionOverRangeVector{
// NOTE(review): presumably removes the metric name from the output series - confirm against DropSeriesName.
SeriesMetadataFunc: DropSeriesName,
StepFunc: avgOverTime,
// avgOverTime can emit annotations (for example the mixed floats and histograms warning).
// NOTE(review): presumably this flag makes the series name available to those annotations - confirm.
NeedsSeriesNamesForAnnotations: true,
}

// avgOverTime evaluates avg_over_time for one step of one series.
//
// It reads the float and histogram samples up to step.RangeEnd from the two
// ring buffers and returns, in order: the float result, whether a float result
// is present, the histogram result, and an error.
//
//   - No samples at all: no result (0, false, nil, nil).
//   - Both floats and histograms present: a mixed floats/histograms warning is
//     emitted and no result is returned.
//   - Only floats: their average is returned with the bool set to true.
//   - Only histograms: their average is returned as the histogram result.
//     Any error from averaging is first passed through
//     NativeHistogramErrorToAnnotation (NOTE(review): presumably this converts
//     known histogram incompatibilities into annotations - confirm).
func avgOverTime(step types.RangeVectorStepData, _ float64, fPoints *types.FPointRingBuffer, hPoints *types.HPointRingBuffer, emitAnnotation EmitAnnotationFunc) (float64, bool, *histogram.FloatHistogram, error) {
	floatHead, floatTail := fPoints.UnsafePoints(step.RangeEnd)
	histHead, histTail := hPoints.UnsafePoints(step.RangeEnd)

	floatCount := len(floatHead) + len(floatTail)
	histCount := len(histHead) + len(histTail)

	switch {
	case floatCount == 0 && histCount == 0:
		// Nothing in range for this step.
		return 0, false, nil, nil

	case floatCount > 0 && histCount > 0:
		// Floats and histograms cannot be averaged together: warn and produce no sample.
		emitAnnotation(annotations.NewMixedFloatsHistogramsWarning)
		return 0, false, nil, nil

	case floatCount > 0:
		return avgFloats(floatHead, floatTail), true, nil, nil
	}

	// Only histograms remain.
	avg, err := avgHistograms(histHead, histTail)
	if err != nil {
		err = NativeHistogramErrorToAnnotation(err, emitAnnotation)
	}

	return 0, false, avg, err
}

// avgFloats returns the mean of the float samples in head followed by tail.
//
// The mean is updated incrementally, one point at a time, rather than by
// dividing a total by a count. Each update is the difference between the
// point's share (p.F/n) and the current mean's share (mean/n), accumulated
// via floats.KahanSumInc together with a running compensation term that is
// added back at the end.
func avgFloats(head, tail []promql.FPoint) float64 {
	var mean, compensation, n float64

	// head and tail are processed in that order; a fixed-size array keeps this a single loop.
	for _, segment := range [2][]promql.FPoint{head, tail} {
		for _, point := range segment {
			n++

			if math.IsInf(mean, 0) {
				// The mean is already +Inf or -Inf. Two kinds of point leave it unchanged,
				// and feeding them into the update below would compute Inf - Inf and
				// wrongly yield NaN:
				//  - an infinity with the same sign as the mean, and
				//  - any ordinary finite value.
				// NaN and the opposite infinity fall through to the update below.
				sameSignInf := math.IsInf(point.F, 0) && (mean > 0) == (point.F > 0)
				finite := !math.IsInf(point.F, 0) && !math.IsNaN(point.F)

				if sameSignInf || finite {
					continue
				}
			}

			mean, compensation = floats.KahanSumInc(point.F/n-mean/n, mean, compensation)
		}
	}

	return mean + compensation
}

// avgHistograms returns the mean of the histograms in head followed by tail.
//
// At least one of head and tail must be non-empty: the first histogram is
// taken from head if it has any points, otherwise from tail[0].
//
// The mean starts as a copy of the first histogram and is then updated
// incrementally: for the n-th point, (p.H/n - mean/n) is added to the mean.
// An error from subtracting or adding histograms is returned as-is with a
// nil result.
func avgHistograms(head, tail []promql.HPoint) (*histogram.FloatHistogram, error) {
	var mean *histogram.FloatHistogram

	// Seed the running mean with the first available histogram and drop it from the remaining points.
	if len(head) > 0 {
		mean, head = head[0].H, head[1:]
	} else {
		mean, tail = tail[0].H, tail[1:]
	}

	// Copy before mutating: the ring buffer may reuse the FloatHistogram instance on subsequent steps.
	mean = mean.Copy()
	n := 1.0

	for _, segment := range [2][]promql.HPoint{head, tail} {
		for _, point := range segment {
			n++

			// Div is applied to copies so that neither the buffered point nor the running mean is modified here.
			pointShare := point.H.Copy().Div(n)
			meanShare := mean.Copy().Div(n)

			delta, err := pointShare.Sub(meanShare)
			if err != nil {
				return nil, err
			}

			if mean, err = mean.Add(delta); err != nil {
				return nil, err
			}
		}
	}

	return mean, nil
}
27 changes: 21 additions & 6 deletions pkg/streamingpromql/testdata/ours/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ eval range from 0 to 6m step 1m floor(some_metric)
clear

load 1m
some_metric{foo="bar"} 0 1 2 3 _ _ {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:4 count:4 buckets:[1 2 1]}}
some_metric{foo="bar"} 0 1 2 3 _ _ {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}}
some_nhcb_metric{baz="bar"} {{schema:-53 sum:1 count:5 custom_values:[5 10] buckets:[1 4]}} {{schema:-53 sum:15 count:2 custom_values:[5 10] buckets:[0 2]}} {{schema:-53 sum:3 count:15 custom_values:[5 10] buckets:[7 8]}}

eval range from 0 to 7m step 1m count_over_time(some_metric[3m])
Expand All @@ -77,10 +77,10 @@ eval range from 0 to 7m step 1m count_over_time(some_metric[5s])
{foo="bar"} 1 1 1 1 _ _ 1 1

eval range from 0 to 7m step 1m last_over_time(some_metric[3m])
some_metric{foo="bar"} 0 1 2 3 3 3 {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:4 count:4 buckets:[1 2 1]}}
some_metric{foo="bar"} 0 1 2 3 3 3 {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}}

eval range from 0 to 7m step 1m last_over_time(some_metric[5s])
some_metric{foo="bar"} 0 1 2 3 _ _ {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:4 count:4 buckets:[1 2 1]}}
some_metric{foo="bar"} 0 1 2 3 _ _ {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}}

eval range from 0 to 7m step 1m present_over_time(some_metric[3m])
{foo="bar"} 1 1 1 1 1 1 1 1
Expand All @@ -101,16 +101,31 @@ eval range from 0 to 7m step 1m max_over_time(some_metric[5s])
{foo="bar"} 0 1 2 3 _ _ _ _

eval_warn range from 0 to 10m step 1m sum_over_time(some_metric[3m])
{foo="bar"} 0 1 3 6 6 5 _ {{schema:3 sum:8 count:8 buckets:[2 4 2]}} {{schema:3 sum:8 count:8 buckets:[2 4 2]}} {{schema:3 sum:8 count:8 buckets:[2 4 2]}} {{schema:3 sum:4 count:4 buckets:[1 2 1]}}
{foo="bar"} 0 1 3 6 6 5 _ {{schema:3 sum:9 count:7 buckets:[3 7 5]}} {{schema:3 sum:9 count:7 buckets:[3 7 5]}} {{schema:3 sum:9 count:7 buckets:[3 7 5]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}}

eval range from 0 to 5m step 1m sum_over_time(some_metric[3m])
{foo="bar"} 0 1 3 6 6 5

eval range from 7m to 10m step 1m sum_over_time(some_metric[3m])
{foo="bar"} {{schema:3 sum:8 count:8 buckets:[2 4 2]}} {{schema:3 sum:8 count:8 buckets:[2 4 2]}} {{schema:3 sum:8 count:8 buckets:[2 4 2]}} {{schema:3 sum:4 count:4 buckets:[1 2 1]}}
{foo="bar"} {{schema:3 sum:9 count:7 buckets:[3 7 5]}} {{schema:3 sum:9 count:7 buckets:[3 7 5]}} {{schema:3 sum:9 count:7 buckets:[3 7 5]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}}

eval range from 0 to 7m step 1m sum_over_time(some_metric[5s])
{foo="bar"} 0 1 2 3 _ _ {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:4 count:4 buckets:[1 2 1]}}
{foo="bar"} 0 1 2 3 _ _ {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}}

eval range from 0 to 2m step 1m sum_over_time(some_nhcb_metric[3m])
{baz="bar"} {{schema:-53 sum:1 count:5 custom_values:[5 10] buckets:[1 4]}} {{schema:-53 sum:16 count:7 custom_values:[5 10] buckets:[1 6]}} {{schema:-53 sum:19 count:22 custom_values:[5 10] buckets:[8 14]}}

eval_warn range from 0 to 10m step 1m avg_over_time(some_metric[3m])
{foo="bar"} 0 0.5 1 1.5 2 2.5 _ {{schema:3 sum:4.5 count:3.5 buckets:[1.5 3.5 2.5]}} {{schema:3 sum:4.5 count:3.5 buckets:[1.5 3.5 2.5]}} {{schema:3 sum:4.5 count:3.5 buckets:[1.5 3.5 2.5]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}}

eval range from 0 to 5m step 1m avg_over_time(some_metric[3m])
{foo="bar"} 0 0.5 1 1.5 2 2.5

eval range from 7m to 10m step 1m avg_over_time(some_metric[3m])
{foo="bar"} {{schema:3 sum:4.5 count:3.5 buckets:[1.5 3.5 2.5]}} {{schema:3 sum:4.5 count:3.5 buckets:[1.5 3.5 2.5]}} {{schema:3 sum:4.5 count:3.5 buckets:[1.5 3.5 2.5]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}}

eval range from 0 to 7m step 1m avg_over_time(some_metric[5s])
{foo="bar"} 0 1 2 3 _ _ {{schema:3 sum:4 count:4 buckets:[1 2 1]}} {{schema:3 sum:5 count:3 buckets:[2 5 4]}}

eval range from 0 to 2m step 1m avg_over_time(some_nhcb_metric[3m])
{baz="bar"} {{schema:-53 sum:1 count:5 custom_values:[5 10] buckets:[1 4]}} {{schema:-53 sum:8 count:3.5 custom_values:[5 10] buckets:[0.5 3]}} {{schema:-53 sum:6.333333333333334 count:7.333333333333333 custom_values:[5 10] buckets:[2.666666666666667 4.666666666666666]}}
70 changes: 28 additions & 42 deletions pkg/streamingpromql/testdata/upstream/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -772,103 +772,89 @@ load 10s
metric9 -9.988465674311579e+307 -9.988465674311579e+307 -9.988465674311579e+307
metric10 -9.988465674311579e+307 9.988465674311579e+307

# Unsupported by streaming engine.
# eval instant at 1m avg_over_time(metric[1m])
# {} 3
eval instant at 1m avg_over_time(metric[1m])
{} 3

eval instant at 1m sum_over_time(metric[1m])/count_over_time(metric[1m])
{} 3

# Unsupported by streaming engine.
# eval instant at 1m avg_over_time(metric2[1m])
# {} Inf
eval instant at 1m avg_over_time(metric2[1m])
{} Inf

eval instant at 1m sum_over_time(metric2[1m])/count_over_time(metric2[1m])
{} Inf

# Unsupported by streaming engine.
# eval instant at 1m avg_over_time(metric3[1m])
# {} -Inf
eval instant at 1m avg_over_time(metric3[1m])
{} -Inf

eval instant at 1m sum_over_time(metric3[1m])/count_over_time(metric3[1m])
{} -Inf

# Unsupported by streaming engine.
# eval instant at 1m avg_over_time(metric4[1m])
# {} NaN
eval instant at 1m avg_over_time(metric4[1m])
{} NaN

eval instant at 1m sum_over_time(metric4[1m])/count_over_time(metric4[1m])
{} NaN

# Unsupported by streaming engine.
# eval instant at 1m avg_over_time(metric5[1m])
# {} Inf
eval instant at 1m avg_over_time(metric5[1m])
{} Inf

eval instant at 1m sum_over_time(metric5[1m])/count_over_time(metric5[1m])
{} Inf

# Unsupported by streaming engine.
# eval instant at 1m avg_over_time(metric5b[1m])
# {} Inf
eval instant at 1m avg_over_time(metric5b[1m])
{} Inf

eval instant at 1m sum_over_time(metric5b[1m])/count_over_time(metric5b[1m])
{} Inf

# Unsupported by streaming engine.
# eval instant at 1m avg_over_time(metric5c[1m])
# {} NaN
eval instant at 1m avg_over_time(metric5c[1m])
{} NaN

eval instant at 1m sum_over_time(metric5c[1m])/count_over_time(metric5c[1m])
{} NaN

# Unsupported by streaming engine.
# eval instant at 1m avg_over_time(metric6[1m])
# {} -Inf
eval instant at 1m avg_over_time(metric6[1m])
{} -Inf

eval instant at 1m sum_over_time(metric6[1m])/count_over_time(metric6[1m])
{} -Inf

# Unsupported by streaming engine.
# eval instant at 1m avg_over_time(metric6b[1m])
# {} -Inf
eval instant at 1m avg_over_time(metric6b[1m])
{} -Inf

eval instant at 1m sum_over_time(metric6b[1m])/count_over_time(metric6b[1m])
{} -Inf

# Unsupported by streaming engine.
# eval instant at 1m avg_over_time(metric6c[1m])
# {} NaN
eval instant at 1m avg_over_time(metric6c[1m])
{} NaN

eval instant at 1m sum_over_time(metric6c[1m])/count_over_time(metric6c[1m])
{} NaN


# Unsupported by streaming engine.
# eval instant at 1m avg_over_time(metric7[1m])
# {} NaN
eval instant at 1m avg_over_time(metric7[1m])
{} NaN

eval instant at 1m sum_over_time(metric7[1m])/count_over_time(metric7[1m])
{} NaN

# Unsupported by streaming engine.
# eval instant at 1m avg_over_time(metric8[1m])
# {} 9.988465674311579e+307
eval instant at 1m avg_over_time(metric8[1m])
{} 9.988465674311579e+307

# This overflows float64.
eval instant at 1m sum_over_time(metric8[1m])/count_over_time(metric8[1m])
{} Inf

# Unsupported by streaming engine.
# eval instant at 1m avg_over_time(metric9[1m])
# {} -9.988465674311579e+307
eval instant at 1m avg_over_time(metric9[1m])
{} -9.988465674311579e+307

# This overflows float64.
eval instant at 1m sum_over_time(metric9[1m])/count_over_time(metric9[1m])
{} -Inf

# Unsupported by streaming engine.
# eval instant at 1m avg_over_time(metric10[1m])
# {} 0
eval instant at 1m avg_over_time(metric10[1m])
{} 0

eval instant at 1m sum_over_time(metric10[1m])/count_over_time(metric10[1m])
{} 0
Expand Down

0 comments on commit 33fef8f

Please sign in to comment.