Skip to content

Commit

Permalink
Mimir query engine: fix issue where sum and avg results over nati…
Browse files Browse the repository at this point in the history
…ve histograms can be corrupted (#9260) (#9264)

* Fix issue where `sum` and `avg` results over native histograms can be corrupted

* Add changelog entry

* Add test and fix further issue

* Address PR feedback: add comments

* Address PR feedback: clarify comments

* Address PR feedback: move `RemoveReferencesToRetainedHistogram` to `InstantVectorSeriesData` type

* Remove references to uncopied histograms in later points

* Fix typo in comment

(cherry picked from commit 5a18dba)

Co-authored-by: Charles Korn <charleskorn@users.noreply.github.com>
  • Loading branch information
grafanabot and charleskorn authored Sep 11, 2024
1 parent 028f495 commit 4b9859a
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 24 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* [CHANGE] Query-scheduler: Remove the experimental `-query-scheduler.use-multi-algorithm-query-queue` flag. The new multi-algorithm tree queue is always used for the scheduler. #9210
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read paths are decoupled through a Kafka-compatible backend. The write path and Kafka load are a function of the incoming write traffic, while the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
Expand Down
72 changes: 72 additions & 0 deletions pkg/streamingpromql/aggregations/aggregations_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// SPDX-License-Identifier: AGPL-3.0-only

package aggregations

import (
"testing"

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"

"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

// TestAggregationGroupNativeHistogramSafety checks that AggregationGroup implementations clear the H field of
// every input point whose FloatHistogram they keep a reference to, so that a retained FloatHistogram cannot be
// mutated when the HPoint slice is later reused by something else.
func TestAggregationGroupNativeHistogramSafety(t *testing.T) {
	// Only the aggregations that retain the first FloatHistogram instance seen for an output timestamp are covered.
	groups := map[string]AggregationGroup{
		"sum": &SumAggregationGroup{},
		"avg": &AvgAggregationGroup{},
	}

	for name, group := range groups {
		t.Run(name, func(t *testing.T) {
			tracker := limiting.NewMemoryConsumptionTracker(0, nil)

			// Series one: the group has seen nothing yet, so every histogram is retained and every H must come back nil.
			firstPoints, err := types.HPointSlicePool.Get(4, tracker)
			require.NoError(t, err)

			h1 := &histogram.FloatHistogram{Sum: 1}
			h2 := &histogram.FloatHistogram{Sum: 2}
			h3 := &histogram.FloatHistogram{Sum: 3}
			firstPoints = append(firstPoints,
				promql.HPoint{T: 0, H: h1},
				promql.HPoint{T: 1, H: h2},
				promql.HPoint{T: 2, H: h2}, // Lookback: T=2 points at the same histogram as T=1.
				promql.HPoint{T: 4, H: h3},
			)
			firstSeries := types.InstantVectorSeriesData{Histograms: firstPoints}

			require.NoError(t, group.AccumulateSeries(firstSeries, 5, 0, 1, tracker, nil))

			wantFirst := []promql.HPoint{
				{T: 0, H: nil},
				{T: 1, H: nil},
				{T: 2, H: nil},
				{T: 4, H: nil},
			}
			require.Equal(t, wantFirst, firstSeries.Histograms, "all histograms retained should be nil-ed out after accumulating series")

			// Series two: only the histogram the group retains should be cleared; the others stay in place.
			secondPoints, err := types.HPointSlicePool.Get(5, tracker)
			require.NoError(t, err)

			h4 := &histogram.FloatHistogram{Sum: 4}
			h5 := &histogram.FloatHistogram{Sum: 5}
			h6 := &histogram.FloatHistogram{Sum: 6}
			secondPoints = append(secondPoints,
				promql.HPoint{T: 0, H: h4},
				promql.HPoint{T: 1, H: h5},
				promql.HPoint{T: 2, H: h6},
				promql.HPoint{T: 3, H: h6}, // Lookback: T=3 points at the same histogram as T=2.
				promql.HPoint{T: 4, H: h6},
			)
			secondSeries := types.InstantVectorSeriesData{Histograms: secondPoints}

			require.NoError(t, group.AccumulateSeries(secondSeries, 5, 0, 1, tracker, nil))

			wantSecond := []promql.HPoint{
				{T: 0, H: h4},  // Not retained: h4 was added to h1.
				{T: 1, H: h5},  // Not retained: h5 was added to h2.
				{T: 2, H: nil}, // h6 is retained for T=3.
				{T: 3, H: nil}, // h6 is retained for this point.
				{T: 4, H: nil}, // h6 is retained for T=4.
			}
			require.Equal(t, wantSecond, secondSeries.Histograms, "all histograms retained should be nil-ed out after accumulating series")
		})
	}
}
39 changes: 25 additions & 14 deletions pkg/streamingpromql/aggregations/avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,42 +166,53 @@ func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSerie

var lastUncopiedHistogram *histogram.FloatHistogram

for i, p := range data.Histograms {
idx := (p.T - start) / interval
g.groupSeriesCounts[idx]++
for inputIdx, p := range data.Histograms {
outputIdx := (p.T - start) / interval
g.groupSeriesCounts[outputIdx]++

if g.histograms[idx] == invalidCombinationOfHistograms {
if g.histograms[outputIdx] == invalidCombinationOfHistograms {
// We've already seen an invalid combination of histograms at this timestamp. Ignore this point.
continue
}

if g.histograms[idx] == nil {
if lastUncopiedHistogram == p.H {
// Ensure the FloatHistogram instance is not reused when the HPoint slice is reused, as we're retaining a reference to it.
data.Histograms[inputIdx].H = nil
}

if g.histograms[outputIdx] == nil {
if lastUncopiedHistogram == p.H {
// We've already used this histogram for a previous point due to lookback.
// Make a copy of it so we don't modify the other point.
g.histograms[idx] = p.H.Copy()
g.histograms[outputIdx] = p.H.Copy()
g.histogramPointCount++

continue
}
// This is the first time we have seen this histogram.

// We have not previously used this histogram as the start of an output point.
// It is safe to store it and modify it later without copying, as we'll make copies above if the same histogram is used for subsequent points.
g.histograms[idx] = p.H
g.histograms[outputIdx] = p.H
g.histogramPointCount++
lastUncopiedHistogram = p.H

// Ensure the FloatHistogram instance is not reused when the HPoint slice data.Histograms is reused, including if it was used at previous points.
data.RemoveReferencesToRetainedHistogram(p.H, inputIdx)

continue
}

// Check if the next point in data.Histograms is the same as the current point (due to lookback)
// If it is, create a copy before modifying it.
toAdd := p.H
if i+1 < len(data.Histograms) && data.Histograms[i+1].H == p.H {
if inputIdx+1 < len(data.Histograms) && data.Histograms[inputIdx+1].H == p.H {
toAdd = p.H.Copy()
}

_, err = toAdd.Sub(g.histograms[idx])
_, err = toAdd.Sub(g.histograms[outputIdx])
if err != nil {
// Unable to subtract histograms (likely due to invalid combination of histograms). Make sure we don't emit a sample at this timestamp.
g.histograms[idx] = invalidCombinationOfHistograms
g.histograms[outputIdx] = invalidCombinationOfHistograms
g.histogramPointCount--

if err := functions.NativeHistogramErrorToAnnotation(err, emitAnnotationFunc); err != nil {
Expand All @@ -211,11 +222,11 @@ func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSerie
continue
}

toAdd.Div(g.groupSeriesCounts[idx])
_, err = g.histograms[idx].Add(toAdd)
toAdd.Div(g.groupSeriesCounts[outputIdx])
_, err = g.histograms[outputIdx].Add(toAdd)
if err != nil {
// Unable to add histograms together (likely due to invalid combination of histograms). Make sure we don't emit a sample at this timestamp.
g.histograms[idx] = invalidCombinationOfHistograms
g.histograms[outputIdx] = invalidCombinationOfHistograms
g.histogramPointCount--

if err := functions.NativeHistogramErrorToAnnotation(err, emitAnnotationFunc); err != nil {
Expand Down
29 changes: 20 additions & 9 deletions pkg/streamingpromql/aggregations/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,34 +89,45 @@ func (g *SumAggregationGroup) accumulateHistograms(data types.InstantVectorSerie
g.histogramSums = g.histogramSums[:steps]
}

for _, p := range data.Histograms {
idx := (p.T - start) / interval
for inputIdx, p := range data.Histograms {
outputIdx := (p.T - start) / interval

if g.histogramSums[idx] == invalidCombinationOfHistograms {
if g.histogramSums[outputIdx] == invalidCombinationOfHistograms {
// We've already seen an invalid combination of histograms at this timestamp. Ignore this point.
continue
}

if g.histogramSums[idx] == nil {
if lastUncopiedHistogram == p.H {
// Ensure the FloatHistogram instance is not reused when the HPoint slice is reused, as we're retaining a reference to it.
data.Histograms[inputIdx].H = nil
}

if g.histogramSums[outputIdx] == nil {
if lastUncopiedHistogram == p.H {
// We've already used this histogram for a previous point due to lookback.
// Make a copy of it so we don't modify the other point.
g.histogramSums[idx] = p.H.Copy()
g.histogramSums[outputIdx] = p.H.Copy()
g.histogramPointCount++

continue
}
// This is the first time we have seen this histogram.

// We have not previously used this histogram as the start of an output point.
// It is safe to store it and modify it later without copying, as we'll make copies above if the same histogram is used for subsequent points.
g.histogramSums[idx] = p.H
g.histogramSums[outputIdx] = p.H
g.histogramPointCount++
lastUncopiedHistogram = p.H

// Ensure the FloatHistogram instance is not reused when the HPoint slice data.Histograms is reused, including if it was used at previous points.
data.RemoveReferencesToRetainedHistogram(p.H, inputIdx)

continue
}

g.histogramSums[idx], err = g.histogramSums[idx].Add(p.H)
g.histogramSums[outputIdx], err = g.histogramSums[outputIdx].Add(p.H)
if err != nil {
// Unable to add histograms together (likely due to invalid combination of histograms). Make sure we don't emit a sample at this timestamp.
g.histogramSums[idx] = invalidCombinationOfHistograms
g.histogramSums[outputIdx] = invalidCombinationOfHistograms
g.histogramPointCount--

if err := functions.NativeHistogramErrorToAnnotation(err, emitAnnotationFunc); err != nil {
Expand Down
13 changes: 13 additions & 0 deletions pkg/streamingpromql/types/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ type InstantVectorSeriesData struct {
Histograms []promql.HPoint
}

// RemoveReferencesToRetainedHistogram walks d.Histograms backwards from lastIndex and sets H to nil on each
// consecutive point whose H is h. The walk ends at the first point whose H is not h, or after index 0 has been
// handled. The points themselves stay in the slice; only their H pointers are cleared.
//
// d is received by value, but the writes go through the Histograms slice, so they are visible to any other
// holder of the same backing array.
func (d InstantVectorSeriesData) RemoveReferencesToRetainedHistogram(h *histogram.FloatHistogram, lastIndex int) {
	for idx := lastIndex; idx >= 0 && d.Histograms[idx].H == h; idx-- {
		d.Histograms[idx].H = nil
	}
}

type InstantVectorSeriesDataIterator struct {
data InstantVectorSeriesData
fIndex int
Expand Down

0 comments on commit 4b9859a

Please sign in to comment.