MQE: Use kahan in sum aggregation (#8923)
* MQE: Use kahan in sum aggregation

Implements the improvements from
prometheus/prometheus#14074
prometheus/prometheus#14362

* Update CHANGELOG
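
For background (an editor's illustration, not part of this commit): naive float64 accumulation drops low-order bits when terms differ greatly in magnitude, which is exactly what Kahan/Neumaier compensated summation avoids. A minimal, self-contained sketch of the difference, using the same Neumaier variant as the `kahanSumInc` helper added in this commit:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	values := []float64{1e16, 1, -1e16}

	// Naive accumulation: the +1 is absorbed into 1e16 and lost.
	naive := 0.0
	for _, v := range values {
		naive += v
	}

	// Neumaier-compensated accumulation: c carries the lost low-order bits.
	sum, c := 0.0, 0.0
	for _, inc := range values {
		t := sum + inc
		if math.Abs(sum) >= math.Abs(inc) {
			c += (sum - t) + inc
		} else {
			c += (inc - t) + sum
		}
		sum = t
	}

	fmt.Println(naive)   // 0
	fmt.Println(sum + c) // 1
}
```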
jhesketh authored Aug 7, 2024
1 parent 996150a commit f0e7d90
Showing 5 changed files with 112 additions and 22 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -22,7 +22,7 @@
* [CHANGE] Ingester client: experimental support for client-side circuit breakers, their configuration options (`-ingester.client.circuit-breaker.*`) and metrics (`cortex_ingester_client_circuit_breaker_results_total`, `cortex_ingester_client_circuit_breaker_transitions_total`) were removed. #8802
* [CHANGE] Ingester: circuit breakers do not open in case of per-instance limit errors anymore. Opening can be triggered only in case of push and pull requests exceeding the configured duration. #8854
* [CHANGE] Query-frontend: Return `413 Request Entity Too Large` if a response shard for an `/active_series` request is too large. #8861
-* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909
+* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
24 changes: 12 additions & 12 deletions pkg/streamingpromql/engine_test.go
@@ -865,39 +865,39 @@ func TestMemoryConsumptionLimit_SingleQueries(t *testing.T) {

// Each series has five samples, which will be rounded up to 8 (the nearest power of two) by the bucketed pool.
// At peak we'll hold in memory:
-// - the running total for the sum() (a float and a bool at each step, with the number of steps rounded to the nearest power of 2),
+// - the running total for the sum() (two floats (due to kahan) and a bool at each step, with the number of steps rounded to the nearest power of 2),
// - and the next series from the selector.
-rangeQueryExpectedPeak: 8*(types.Float64Size+types.BoolSize) + 8*types.FPointSize,
-rangeQueryLimit: 8*(types.Float64Size+types.BoolSize) + 8*types.FPointSize,
+rangeQueryExpectedPeak: 8*(2*types.Float64Size+types.BoolSize) + 8*types.FPointSize,
+rangeQueryLimit: 8*(2*types.Float64Size+types.BoolSize) + 8*types.FPointSize,

// Each series has one sample, which is already a power of two.
// At peak we'll hold in memory:
-// - the running total for the sum() (a float and a bool),
+// - the running total for the sum() (two floats and a bool),
// - the next series from the selector,
// - and the output sample.
-instantQueryExpectedPeak: types.Float64Size + types.BoolSize + types.FPointSize + types.VectorSampleSize,
-instantQueryLimit: types.Float64Size + types.BoolSize + types.FPointSize + types.VectorSampleSize,
+instantQueryExpectedPeak: 2*types.Float64Size + types.BoolSize + types.FPointSize + types.VectorSampleSize,
+instantQueryLimit: 2*types.Float64Size + types.BoolSize + types.FPointSize + types.VectorSampleSize,
},
"limit enabled, query selects more samples than limit but should not load all of them into memory at once, and peak consumption is over limit": {
expr: "sum(some_metric)",
shouldSucceed: false,

// Each series has five samples, which will be rounded up to 8 (the nearest power of two) by the bucketed pool.
// At peak we'll hold in memory:
-// - the running total for the sum() (a float and a bool at each step, with the number of steps rounded to the nearest power of 2),
+// - the running total for the sum() (two floats (due to kahan) and a bool at each step, with the number of steps rounded to the nearest power of 2),
// - and the next series from the selector.
// The last thing to be allocated is the bool slice for the running total, so that won't contribute to the peak before the query is aborted.
-rangeQueryExpectedPeak: 8*types.Float64Size + 8*types.FPointSize,
-rangeQueryLimit: 8*(types.Float64Size+types.BoolSize) + 8*types.FPointSize - 1,
+rangeQueryExpectedPeak: 8*2*types.Float64Size + 8*types.FPointSize,
+rangeQueryLimit: 8*(2*types.Float64Size+types.BoolSize) + 8*types.FPointSize - 1,

// Each series has one sample, which is already a power of two.
// At peak we'll hold in memory:
-// - the running total for the sum() (a float and a bool),
+// - the running total for the sum() (two floats and a bool),
// - the next series from the selector,
// - and the output sample.
// The last thing to be allocated is the bool slice for the running total, so that won't contribute to the peak before the query is aborted.
-instantQueryExpectedPeak: types.Float64Size + types.FPointSize + types.VectorSampleSize,
-instantQueryLimit: types.Float64Size + types.BoolSize + types.FPointSize + types.VectorSampleSize - 1,
+instantQueryExpectedPeak: 2*types.Float64Size + types.FPointSize + types.VectorSampleSize,
+instantQueryLimit: 2*types.Float64Size + types.BoolSize + types.FPointSize + types.VectorSampleSize - 1,
},
"histogram: limit enabled, but query does not exceed limit": {
expr: "sum(some_histogram)",
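A rough sanity check of the updated peak-accounting arithmetic above (the byte sizes here are assumptions for illustration only; the real constants live in the `types` package):

```go
package main

import "fmt"

// Assumed sizes for illustration; not the actual types package values.
const (
	float64Size = 8  // assumed types.Float64Size
	boolSize    = 1  // assumed types.BoolSize
	fPointSize  = 16 // assumed types.FPointSize (timestamp + value)
)

func main() {
	// Range query: 5 samples rounded up to 8 by the bucketed pool. Each step now
	// needs two floats (running sum + Kahan compensation) and a presence bool,
	// while the next series' 8 FPoints are held at the same time.
	fmt.Println(8*(2*float64Size+boolSize) + 8*fPointSize) // 136 + 128 = 264
}
```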
40 changes: 34 additions & 6 deletions pkg/streamingpromql/operators/aggregation.go
@@ -9,6 +9,7 @@ import (
"context"
"errors"
"fmt"
"math"
"slices"
"sort"
"time"
@@ -95,10 +96,11 @@ type group struct {
lastSeriesIndex int

// Sum, presence, and histograms for each step.
-floatSums []float64
-floatPresent []bool
-histogramSums []*histogram.FloatHistogram
-histogramPointCount int
+floatSums []float64
+floatCompensatingMeans []float64 // Mean, or "compensating value" for Kahan summation.
+floatPresent []bool
+histogramSums []*histogram.FloatHistogram
+histogramPointCount int
}

var _ types.InstantVectorOperator = &Aggregation{}
@@ -297,7 +299,8 @@ func (a *Aggregation) constructSeriesData(thisGroup *group, start int64, interva
for i, havePoint := range thisGroup.floatPresent {
if havePoint {
t := start + int64(i)*interval
-floatPoints = append(floatPoints, promql.FPoint{T: t, F: thisGroup.floatSums[i]})
+f := thisGroup.floatSums[i] + thisGroup.floatCompensatingMeans[i]
+floatPoints = append(floatPoints, promql.FPoint{T: t, F: f})
}
}
}
@@ -318,9 +321,11 @@
}

types.Float64SlicePool.Put(thisGroup.floatSums, a.MemoryConsumptionTracker)
+types.Float64SlicePool.Put(thisGroup.floatCompensatingMeans, a.MemoryConsumptionTracker)
types.BoolSlicePool.Put(thisGroup.floatPresent, a.MemoryConsumptionTracker)
types.HistogramSlicePool.Put(thisGroup.histogramSums, a.MemoryConsumptionTracker)
thisGroup.floatSums = nil
+thisGroup.floatCompensatingMeans = nil
thisGroup.floatPresent = nil
thisGroup.histogramSums = nil
thisGroup.histogramPointCount = 0
@@ -383,11 +388,17 @@ func (a *Aggregation) accumulateSeriesIntoGroup(s types.InstantVectorSeriesData,
return err
}

+seriesGroup.floatCompensatingMeans, err = types.Float64SlicePool.Get(steps, a.MemoryConsumptionTracker)
+if err != nil {
+return err
+}
+
seriesGroup.floatPresent, err = types.BoolSlicePool.Get(steps, a.MemoryConsumptionTracker)
if err != nil {
return err
}
seriesGroup.floatSums = seriesGroup.floatSums[:steps]
+seriesGroup.floatCompensatingMeans = seriesGroup.floatCompensatingMeans[:steps]
seriesGroup.floatPresent = seriesGroup.floatPresent[:steps]
}

@@ -402,7 +413,7 @@ func (a *Aggregation) accumulateSeriesIntoGroup(s types.InstantVectorSeriesData,

for _, p := range s.Floats {
idx := (p.T - start) / interval
-seriesGroup.floatSums[idx] += p.F
+seriesGroup.floatSums[idx], seriesGroup.floatCompensatingMeans[idx] = kahanSumInc(p.F, seriesGroup.floatSums[idx], seriesGroup.floatCompensatingMeans[idx])
seriesGroup.floatPresent[idx] = true
}

@@ -456,3 +467,20 @@ func (g groupSorter) Swap(i, j int) {
g.metadata[i], g.metadata[j] = g.metadata[j], g.metadata[i]
g.groups[i], g.groups[j] = g.groups[j], g.groups[i]
}

+// TODO(jhesketh): This will likely be useful elsewhere, so we may move this in the future.
+// (We could also consider exporting this from the upstream promql package).
+func kahanSumInc(inc, sum, c float64) (newSum, newC float64) {
+    t := sum + inc
+    switch {
+    case math.IsInf(t, 0):
+        c = 0
+
+    // Using Neumaier improvement, swap if next term larger than sum.
+    case math.Abs(sum) >= math.Abs(inc):
+        c += (sum - t) + inc
+    default:
+        c += (inc - t) + sum
+    }
+    return t, c
+}
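
One design note on `kahanSumInc` above: resetting `c` to 0 once the running sum overflows to ±Inf keeps infinities propagating the way plain summation would; otherwise the compensation term itself would turn NaN and poison the result. A small sketch (assuming it runs inside this package):

```go
// Hypothetical usage inside package operators.
sum, c := kahanSumInc(math.Inf(1), 1e300, 0) // t overflows to +Inf, so c is reset to 0
fmt.Println(sum + c)                         // +Inf; without the reset, c = (Inf-Inf)+1e300 = NaN
```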
63 changes: 63 additions & 0 deletions pkg/streamingpromql/operators/aggregation_test.go
@@ -4,6 +4,8 @@ package operators

import (
"context"
"fmt"
"math"
"testing"
"time"

@@ -262,3 +264,64 @@ func labelsToSeriesMetadata(lbls []labels.Labels) []types.SeriesMetadata {

return m
}

+// This test is copied from promql/functions_internal_test.go.
+// If kahanSumInc can be exported we can remove this.
+func TestKahanSumInc(t *testing.T) {
+    testCases := map[string]struct {
+        first    float64
+        second   float64
+        expected float64
+    }{
+        "+Inf + anything = +Inf": {
+            first:    math.Inf(1),
+            second:   2.0,
+            expected: math.Inf(1),
+        },
+        "-Inf + anything = -Inf": {
+            first:    math.Inf(-1),
+            second:   2.0,
+            expected: math.Inf(-1),
+        },
+        "+Inf + -Inf = NaN": {
+            first:    math.Inf(1),
+            second:   math.Inf(-1),
+            expected: math.NaN(),
+        },
+        "NaN + anything = NaN": {
+            first:    math.NaN(),
+            second:   2,
+            expected: math.NaN(),
+        },
+        "NaN + Inf = NaN": {
+            first:    math.NaN(),
+            second:   math.Inf(1),
+            expected: math.NaN(),
+        },
+        "NaN + -Inf = NaN": {
+            first:    math.NaN(),
+            second:   math.Inf(-1),
+            expected: math.NaN(),
+        },
+    }
+
+    runTest := func(t *testing.T, a, b, expected float64) {
+        t.Run(fmt.Sprintf("%v + %v = %v", a, b, expected), func(t *testing.T) {
+            sum, c := kahanSumInc(b, a, 0)
+            result := sum + c
+
+            if math.IsNaN(expected) {
+                require.Truef(t, math.IsNaN(result), "expected result to be NaN, but got %v (from %v + %v)", result, sum, c)
+            } else {
+                require.Equalf(t, expected, result, "expected result to be %v, but got %v (from %v + %v)", expected, result, sum, c)
+            }
+        })
+    }
+
+    for name, testCase := range testCases {
+        t.Run(name, func(t *testing.T) {
+            runTest(t, testCase.first, testCase.second, testCase.expected)
+            runTest(t, testCase.second, testCase.first, testCase.expected)
+        })
+    }
+}
5 changes: 2 additions & 3 deletions pkg/streamingpromql/testdata/upstream/aggregators.test
@@ -601,9 +601,8 @@ load 10s
data{test="nan",group="2",point="a"} 2
data{test="nan",group="2",point="b"} NaN

-# Unsupported by streaming engine.
-# eval instant at 1m sum(data{test="ten"})
-# {} 10
+eval instant at 1m sum(data{test="ten"})
+{} 10

eval instant at 1m sum by (group) (data{test="pos_inf"})
{group="1"} Inf
