Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQE: Use kahan in sum aggregation #8923

Merged
merged 3 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* [CHANGE] Ingester client: experimental support for client-side circuit breakers, their configuration options (`-ingester.client.circuit-breaker.*`) and metrics (`cortex_ingester_client_circuit_breaker_results_total`, `cortex_ingester_client_circuit_breaker_transitions_total`) were removed. #8802
* [CHANGE] Ingester: circuit breakers do not open in case of per-instance limit errors anymore. Opening can be triggered only in case of push and pull requests exceeding the configured duration. #8854
* [CHANGE] Query-frontend: Return `413 Request Entity Too Large` if a response shard for an `/active_series` request is too large. #8861
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
Expand Down
24 changes: 12 additions & 12 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,39 +865,39 @@ func TestMemoryConsumptionLimit_SingleQueries(t *testing.T) {

// Each series has five samples, which will be rounded up to 8 (the nearest power of two) by the bucketed pool.
// At peak we'll hold in memory:
// - the running total for the sum() (a float and a bool at each step, with the number of steps rounded to the nearest power of 2),
// - the running total for the sum() (two floats (due to kahan) and a bool at each step, with the number of steps rounded to the nearest power of 2),
// - and the next series from the selector.
rangeQueryExpectedPeak: 8*(types.Float64Size+types.BoolSize) + 8*types.FPointSize,
rangeQueryLimit: 8*(types.Float64Size+types.BoolSize) + 8*types.FPointSize,
rangeQueryExpectedPeak: 8*(2*types.Float64Size+types.BoolSize) + 8*types.FPointSize,
rangeQueryLimit: 8*(2*types.Float64Size+types.BoolSize) + 8*types.FPointSize,

// Each series has one sample, which is already a power of two.
// At peak we'll hold in memory:
// - the running total for the sum() (a float and a bool),
// - the running total for the sum() (two floats and a bool),
// - the next series from the selector,
// - and the output sample.
instantQueryExpectedPeak: types.Float64Size + types.BoolSize + types.FPointSize + types.VectorSampleSize,
instantQueryLimit: types.Float64Size + types.BoolSize + types.FPointSize + types.VectorSampleSize,
instantQueryExpectedPeak: 2*types.Float64Size + types.BoolSize + types.FPointSize + types.VectorSampleSize,
instantQueryLimit: 2*types.Float64Size + types.BoolSize + types.FPointSize + types.VectorSampleSize,
},
"limit enabled, query selects more samples than limit but should not load all of them into memory at once, and peak consumption is over limit": {
expr: "sum(some_metric)",
shouldSucceed: false,

// Each series has five samples, which will be rounded up to 8 (the nearest power of two) by the bucketed pool.
// At peak we'll hold in memory:
// - the running total for the sum() (a float and a bool at each step, with the number of steps rounded to the nearest power of 2),
// - the running total for the sum() (two floats (due to kahan) and a bool at each step, with the number of steps rounded to the nearest power of 2),
// - and the next series from the selector.
// The last thing to be allocated is the bool slice for the running total, so that won't contribute to the peak before the query is aborted.
rangeQueryExpectedPeak: 8*types.Float64Size + 8*types.FPointSize,
rangeQueryLimit: 8*(types.Float64Size+types.BoolSize) + 8*types.FPointSize - 1,
rangeQueryExpectedPeak: 8*2*types.Float64Size + 8*types.FPointSize,
rangeQueryLimit: 8*(2*types.Float64Size+types.BoolSize) + 8*types.FPointSize - 1,

// Each series has one sample, which is already a power of two.
// At peak we'll hold in memory:
// - the running total for the sum() (a float and a bool),
// - the running total for the sum() (two floats and a bool),
// - the next series from the selector,
// - and the output sample.
// The last thing to be allocated is the bool slice for the running total, so that won't contribute to the peak before the query is aborted.
instantQueryExpectedPeak: types.Float64Size + types.FPointSize + types.VectorSampleSize,
instantQueryLimit: types.Float64Size + types.BoolSize + types.FPointSize + types.VectorSampleSize - 1,
instantQueryExpectedPeak: 2*types.Float64Size + types.FPointSize + types.VectorSampleSize,
instantQueryLimit: 2*types.Float64Size + types.BoolSize + types.FPointSize + types.VectorSampleSize - 1,
},
"histogram: limit enabled, but query does not exceed limit": {
expr: "sum(some_histogram)",
Expand Down
32 changes: 30 additions & 2 deletions pkg/streamingpromql/operators/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"errors"
"fmt"
"math"
"slices"
"sort"
"time"
Expand Down Expand Up @@ -96,6 +97,7 @@ type group struct {

// Sum, presence, and histograms for each step.
floatSums []float64
floatMeans []float64 // Mean, or "compensating value" for Kahan summation.
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
floatPresent []bool
histogramSums []*histogram.FloatHistogram
histogramPointCount int
Expand Down Expand Up @@ -297,7 +299,8 @@ func (a *Aggregation) constructSeriesData(thisGroup *group, start int64, interva
for i, havePoint := range thisGroup.floatPresent {
if havePoint {
t := start + int64(i)*interval
floatPoints = append(floatPoints, promql.FPoint{T: t, F: thisGroup.floatSums[i]})
f := thisGroup.floatSums[i] + thisGroup.floatMeans[i]
floatPoints = append(floatPoints, promql.FPoint{T: t, F: f})
}
}
}
Expand All @@ -318,9 +321,11 @@ func (a *Aggregation) constructSeriesData(thisGroup *group, start int64, interva
}

types.Float64SlicePool.Put(thisGroup.floatSums, a.MemoryConsumptionTracker)
types.Float64SlicePool.Put(thisGroup.floatMeans, a.MemoryConsumptionTracker)
types.BoolSlicePool.Put(thisGroup.floatPresent, a.MemoryConsumptionTracker)
types.HistogramSlicePool.Put(thisGroup.histogramSums, a.MemoryConsumptionTracker)
thisGroup.floatSums = nil
thisGroup.floatMeans = nil
thisGroup.floatPresent = nil
thisGroup.histogramSums = nil
thisGroup.histogramPointCount = 0
Expand Down Expand Up @@ -383,11 +388,17 @@ func (a *Aggregation) accumulateSeriesIntoGroup(s types.InstantVectorSeriesData,
return err
}

seriesGroup.floatMeans, err = types.Float64SlicePool.Get(steps, a.MemoryConsumptionTracker)
if err != nil {
return err
}

seriesGroup.floatPresent, err = types.BoolSlicePool.Get(steps, a.MemoryConsumptionTracker)
if err != nil {
return err
}
seriesGroup.floatSums = seriesGroup.floatSums[:steps]
seriesGroup.floatMeans = seriesGroup.floatMeans[:steps]
seriesGroup.floatPresent = seriesGroup.floatPresent[:steps]
}

Expand All @@ -402,7 +413,7 @@ func (a *Aggregation) accumulateSeriesIntoGroup(s types.InstantVectorSeriesData,

for _, p := range s.Floats {
idx := (p.T - start) / interval
seriesGroup.floatSums[idx] += p.F
seriesGroup.floatSums[idx], seriesGroup.floatMeans[idx] = kahanSumInc(p.F, seriesGroup.floatSums[idx], seriesGroup.floatMeans[idx])
seriesGroup.floatPresent[idx] = true
}

Expand Down Expand Up @@ -456,3 +467,20 @@ func (g groupSorter) Swap(i, j int) {
g.metadata[i], g.metadata[j] = g.metadata[j], g.metadata[i]
g.groups[i], g.groups[j] = g.groups[j], g.groups[i]
}

// TODO(jhesketh): This will likely be useful elsewhere, so we may move this in the future.
// (We could also consider exporting this from the upstream promql package).
//
// kahanSumInc performs a single step of Kahan summation with the Neumaier
// improvement: it adds inc to the running sum, returning the new sum and the
// updated compensation term c that tracks the low-order bits lost to rounding.
// The true running total is newSum + newC.
func kahanSumInc(inc, sum, c float64) (newSum, newC float64) {
	total := sum + inc

	if math.IsInf(total, 0) {
		// The sum overflowed to ±Inf: any compensation is meaningless, so reset it.
		return total, 0
	}

	// Neumaier improvement: recover the rounding error from whichever
	// operand has the larger magnitude (swap if the next term is larger
	// than the running sum).
	if math.Abs(sum) >= math.Abs(inc) {
		return total, c + ((sum - total) + inc)
	}
	return total, c + ((inc - total) + sum)
}
63 changes: 63 additions & 0 deletions pkg/streamingpromql/operators/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package operators

import (
"context"
"fmt"
"math"
"testing"
"time"

Expand Down Expand Up @@ -262,3 +264,64 @@ func labelsToSeriesMetadata(lbls []labels.Labels) []types.SeriesMetadata {

return m
}

// This test is copied from promql/functions_internal_test.go.
// If kahanSumInc can be exported we can remove this.
func TestKahanSumInc(t *testing.T) {
	testCases := map[string]struct {
		first    float64
		second   float64
		expected float64
	}{
		"+Inf + anything = +Inf": {
			first:    math.Inf(1),
			second:   2.0,
			expected: math.Inf(1),
		},
		"-Inf + anything = -Inf": {
			first:    math.Inf(-1),
			second:   2.0,
			expected: math.Inf(-1),
		},
		"+Inf + -Inf = NaN": {
			first:    math.Inf(1),
			second:   math.Inf(-1),
			expected: math.NaN(),
		},
		"NaN + anything = NaN": {
			first:    math.NaN(),
			second:   2,
			expected: math.NaN(),
		},
		"NaN + Inf = NaN": {
			first:    math.NaN(),
			second:   math.Inf(1),
			expected: math.NaN(),
		},
		"NaN + -Inf = NaN": {
			first:    math.NaN(),
			second:   math.Inf(-1),
			expected: math.NaN(),
		},
	}

	// checkSum runs one addition in a subtest and asserts on the
	// compensated total (sum + c).
	checkSum := func(t *testing.T, a, b, expected float64) {
		t.Run(fmt.Sprintf("%v + %v = %v", a, b, expected), func(t *testing.T) {
			sum, c := kahanSumInc(b, a, 0)
			result := sum + c

			if !math.IsNaN(expected) {
				require.Equalf(t, expected, result, "expected result to be %v, but got %v (from %v + %v)", expected, result, sum, c)
			} else {
				require.Truef(t, math.IsNaN(result), "expected result to be NaN, but got %v (from %v + %v)", result, sum, c)
			}
		})
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			// Addition must be commutative, so check both orderings.
			checkSum(t, tc.first, tc.second, tc.expected)
			checkSum(t, tc.second, tc.first, tc.expected)
		})
	}
}
4 changes: 2 additions & 2 deletions pkg/streamingpromql/testdata/upstream/aggregators.test
Original file line number Diff line number Diff line change
Expand Up @@ -602,8 +602,8 @@ load 10s
data{test="nan",group="2",point="b"} NaN

# Unsupported by streaming engine.
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
# eval instant at 1m sum(data{test="ten"})
# {} 10
eval instant at 1m sum(data{test="ten"})
{} 10

eval instant at 1m sum by (group) (data{test="pos_inf"})
{group="1"} Inf
Expand Down
Loading