diff --git a/CHANGELOG.md b/CHANGELOG.md
index a92b18ac7e1..498e8cd1287 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -22,7 +22,7 @@
 * [CHANGE] Ingester client: experimental support for client-side circuit breakers, their configuration options (`-ingester.client.circuit-breaker.*`) and metrics (`cortex_ingester_client_circuit_breaker_results_total`, `cortex_ingester_client_circuit_breaker_transitions_total`) were removed. #8802
 * [CHANGE] Ingester: circuit breakers do not open in case of per-instance limit errors anymore. Opening can be triggered only in case of push and pull requests exceeding the configured duration. #8854
 * [CHANGE] Query-frontend: Return `413 Request Entity Too Large` if a response shard for an `/active_series` request is too large. #8861
-* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909
+* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923
 * [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853
   * What it is:
     * When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka.
       In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go
index 4e469971450..63ec485351a 100644
--- a/pkg/streamingpromql/engine_test.go
+++ b/pkg/streamingpromql/engine_test.go
@@ -865,18 +865,18 @@ func TestMemoryConsumptionLimit_SingleQueries(t *testing.T) {
 			// Each series has five samples, which will be rounded up to 8 (the nearest power of two) by the bucketed pool.
 			// At peak we'll hold in memory:
-			// - the running total for the sum() (a float and a bool at each step, with the number of steps rounded to the nearest power of 2),
+			// - the running total for the sum() (two floats (due to kahan) and a bool at each step, with the number of steps rounded to the nearest power of 2),
 			// - and the next series from the selector.
-			rangeQueryExpectedPeak: 8*(types.Float64Size+types.BoolSize) + 8*types.FPointSize,
-			rangeQueryLimit:        8*(types.Float64Size+types.BoolSize) + 8*types.FPointSize,
+			rangeQueryExpectedPeak: 8*(2*types.Float64Size+types.BoolSize) + 8*types.FPointSize,
+			rangeQueryLimit:        8*(2*types.Float64Size+types.BoolSize) + 8*types.FPointSize,
 			// Each series has one sample, which is already a power of two.
 			// At peak we'll hold in memory:
-			// - the running total for the sum() (a float and a bool),
+			// - the running total for the sum() (two floats and a bool),
 			// - the next series from the selector,
 			// - and the output sample.
-			instantQueryExpectedPeak: types.Float64Size + types.BoolSize + types.FPointSize + types.VectorSampleSize,
-			instantQueryLimit:        types.Float64Size + types.BoolSize + types.FPointSize + types.VectorSampleSize,
+			instantQueryExpectedPeak: 2*types.Float64Size + types.BoolSize + types.FPointSize + types.VectorSampleSize,
+			instantQueryLimit:        2*types.Float64Size + types.BoolSize + types.FPointSize + types.VectorSampleSize,
 		},
 		"limit enabled, query selects more samples than limit but should not load all of them into memory at once, and peak consumption is over limit": {
 			expr: "sum(some_metric)",
@@ -884,20 +884,20 @@ func TestMemoryConsumptionLimit_SingleQueries(t *testing.T) {
 			// Each series has five samples, which will be rounded up to 8 (the nearest power of two) by the bucketed pool.
 			// At peak we'll hold in memory:
-			// - the running total for the sum() (a float and a bool at each step, with the number of steps rounded to the nearest power of 2),
+			// - the running total for the sum() (two floats (due to kahan) and a bool at each step, with the number of steps rounded to the nearest power of 2),
 			// - and the next series from the selector.
 			// The last thing to be allocated is the bool slice for the running total, so that won't contribute to the peak before the query is aborted.
-			rangeQueryExpectedPeak: 8*types.Float64Size + 8*types.FPointSize,
-			rangeQueryLimit:        8*(types.Float64Size+types.BoolSize) + 8*types.FPointSize - 1,
+			rangeQueryExpectedPeak: 8*2*types.Float64Size + 8*types.FPointSize,
+			rangeQueryLimit:        8*(2*types.Float64Size+types.BoolSize) + 8*types.FPointSize - 1,
 			// Each series has one sample, which is already a power of two.
 			// At peak we'll hold in memory:
-			// - the running total for the sum() (a float and a bool),
+			// - the running total for the sum() (two floats and a bool),
 			// - the next series from the selector,
 			// - and the output sample.
 			// The last thing to be allocated is the bool slice for the running total, so that won't contribute to the peak before the query is aborted.
-			instantQueryExpectedPeak: types.Float64Size + types.FPointSize + types.VectorSampleSize,
-			instantQueryLimit:        types.Float64Size + types.BoolSize + types.FPointSize + types.VectorSampleSize - 1,
+			instantQueryExpectedPeak: 2*types.Float64Size + types.FPointSize + types.VectorSampleSize,
+			instantQueryLimit:        2*types.Float64Size + types.BoolSize + types.FPointSize + types.VectorSampleSize - 1,
 		},
 		"histogram: limit enabled, but query does not exceed limit": {
 			expr: "sum(some_histogram)",
diff --git a/pkg/streamingpromql/operators/aggregation.go b/pkg/streamingpromql/operators/aggregation.go
index 43fd7419dd8..d8821907fc8 100644
--- a/pkg/streamingpromql/operators/aggregation.go
+++ b/pkg/streamingpromql/operators/aggregation.go
@@ -9,6 +9,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"math"
 	"slices"
 	"sort"
 	"time"
@@ -95,10 +96,11 @@ type group struct {
 	lastSeriesIndex int
 
 	// Sum, presence, and histograms for each step.
-	floatSums           []float64
-	floatPresent        []bool
-	histogramSums       []*histogram.FloatHistogram
-	histogramPointCount int
+	floatSums              []float64
+	floatCompensatingMeans []float64 // Mean, or "compensating value" for Kahan summation.
+	floatPresent           []bool
+	histogramSums          []*histogram.FloatHistogram
+	histogramPointCount    int
 }
 
 var _ types.InstantVectorOperator = &Aggregation{}
@@ -297,7 +299,8 @@ func (a *Aggregation) constructSeriesData(thisGroup *group, start int64, interva
 		for i, havePoint := range thisGroup.floatPresent {
 			if havePoint {
 				t := start + int64(i)*interval
-				floatPoints = append(floatPoints, promql.FPoint{T: t, F: thisGroup.floatSums[i]})
+				f := thisGroup.floatSums[i] + thisGroup.floatCompensatingMeans[i]
+				floatPoints = append(floatPoints, promql.FPoint{T: t, F: f})
 			}
 		}
 	}
@@ -318,9 +321,11 @@ func (a *Aggregation) constructSeriesData(thisGroup *group, start int64, interva
 	}
 
 	types.Float64SlicePool.Put(thisGroup.floatSums, a.MemoryConsumptionTracker)
+	types.Float64SlicePool.Put(thisGroup.floatCompensatingMeans, a.MemoryConsumptionTracker)
 	types.BoolSlicePool.Put(thisGroup.floatPresent, a.MemoryConsumptionTracker)
 	types.HistogramSlicePool.Put(thisGroup.histogramSums, a.MemoryConsumptionTracker)
 	thisGroup.floatSums = nil
+	thisGroup.floatCompensatingMeans = nil
 	thisGroup.floatPresent = nil
 	thisGroup.histogramSums = nil
 	thisGroup.histogramPointCount = 0
@@ -383,11 +388,17 @@ func (a *Aggregation) accumulateSeriesIntoGroup(s types.InstantVectorSeriesData,
 		return err
 	}
 
+	seriesGroup.floatCompensatingMeans, err = types.Float64SlicePool.Get(steps, a.MemoryConsumptionTracker)
+	if err != nil {
+		return err
+	}
+
 	seriesGroup.floatPresent, err = types.BoolSlicePool.Get(steps, a.MemoryConsumptionTracker)
 	if err != nil {
 		return err
 	}
 
 	seriesGroup.floatSums = seriesGroup.floatSums[:steps]
+	seriesGroup.floatCompensatingMeans = seriesGroup.floatCompensatingMeans[:steps]
 	seriesGroup.floatPresent = seriesGroup.floatPresent[:steps]
 }
@@ -402,7 +413,7 @@ func (a *Aggregation) accumulateSeriesIntoGroup(s types.InstantVectorSeriesData,
 	for _, p := range s.Floats {
 		idx := (p.T - start) / interval
-		seriesGroup.floatSums[idx] += p.F
+		seriesGroup.floatSums[idx], seriesGroup.floatCompensatingMeans[idx] =
+			kahanSumInc(p.F, seriesGroup.floatSums[idx], seriesGroup.floatCompensatingMeans[idx])
 		seriesGroup.floatPresent[idx] = true
 	}
@@ -456,3 +467,20 @@ func (g groupSorter) Swap(i, j int) {
 	g.metadata[i], g.metadata[j] = g.metadata[j], g.metadata[i]
 	g.groups[i], g.groups[j] = g.groups[j], g.groups[i]
 }
+
+// TODO(jhesketh): This will likely be useful elsewhere, so we may move this in the future.
+// (We could also consider exporting this from the upstream promql package).
+func kahanSumInc(inc, sum, c float64) (newSum, newC float64) {
+	t := sum + inc
+	switch {
+	case math.IsInf(t, 0):
+		c = 0
+
+	// Using Neumaier improvement, swap if next term larger than sum.
+	case math.Abs(sum) >= math.Abs(inc):
+		c += (sum - t) + inc
+	default:
+		c += (inc - t) + sum
+	}
+	return t, c
+}
diff --git a/pkg/streamingpromql/operators/aggregation_test.go b/pkg/streamingpromql/operators/aggregation_test.go
index 59a9e0a0a03..42036165022 100644
--- a/pkg/streamingpromql/operators/aggregation_test.go
+++ b/pkg/streamingpromql/operators/aggregation_test.go
@@ -4,6 +4,8 @@ package operators
 import (
 	"context"
+	"fmt"
+	"math"
 	"testing"
 	"time"
@@ -262,3 +264,64 @@ func labelsToSeriesMetadata(lbls []labels.Labels) []types.SeriesMetadata {
 	return m
 }
+
+// This test is copied from promql/functions_internal_test.go.
+// If kahanSumInc can be exported we can remove this.
+func TestKahanSumInc(t *testing.T) {
+	testCases := map[string]struct {
+		first    float64
+		second   float64
+		expected float64
+	}{
+		"+Inf + anything = +Inf": {
+			first:    math.Inf(1),
+			second:   2.0,
+			expected: math.Inf(1),
+		},
+		"-Inf + anything = -Inf": {
+			first:    math.Inf(-1),
+			second:   2.0,
+			expected: math.Inf(-1),
+		},
+		"+Inf + -Inf = NaN": {
+			first:    math.Inf(1),
+			second:   math.Inf(-1),
+			expected: math.NaN(),
+		},
+		"NaN + anything = NaN": {
+			first:    math.NaN(),
+			second:   2,
+			expected: math.NaN(),
+		},
+		"NaN + Inf = NaN": {
+			first:    math.NaN(),
+			second:   math.Inf(1),
+			expected: math.NaN(),
+		},
+		"NaN + -Inf = NaN": {
+			first:    math.NaN(),
+			second:   math.Inf(-1),
+			expected: math.NaN(),
+		},
+	}
+
+	runTest := func(t *testing.T, a, b, expected float64) {
+		t.Run(fmt.Sprintf("%v + %v = %v", a, b, expected), func(t *testing.T) {
+			sum, c := kahanSumInc(b, a, 0)
+			result := sum + c
+
+			if math.IsNaN(expected) {
+				require.Truef(t, math.IsNaN(result), "expected result to be NaN, but got %v (from %v + %v)", result, sum, c)
+			} else {
+				require.Equalf(t, expected, result, "expected result to be %v, but got %v (from %v + %v)", expected, result, sum, c)
+			}
+		})
+	}
+
+	for name, testCase := range testCases {
+		t.Run(name, func(t *testing.T) {
+			runTest(t, testCase.first, testCase.second, testCase.expected)
+			runTest(t, testCase.second, testCase.first, testCase.expected)
+		})
+	}
+}
diff --git a/pkg/streamingpromql/testdata/upstream/aggregators.test b/pkg/streamingpromql/testdata/upstream/aggregators.test
index 9559be17cb6..5e18e28cbbb 100644
--- a/pkg/streamingpromql/testdata/upstream/aggregators.test
+++ b/pkg/streamingpromql/testdata/upstream/aggregators.test
@@ -601,9 +601,8 @@ load 10s
   data{test="nan",group="2",point="a"} 2
   data{test="nan",group="2",point="b"} NaN
 
-# Unsupported by streaming engine.
-# eval instant at 1m sum(data{test="ten"})
-#   {} 10
+eval instant at 1m sum(data{test="ten"})
+  {} 10
 
 eval instant at 1m sum by (group) (data{test="pos_inf"})
   {group="1"} Inf