From dd18bcb36fa33d3183ab1bc0069d37c8827338ef Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Mon, 14 Oct 2024 13:20:28 +1100 Subject: [PATCH] MQE: don't reuse `FloatHistogram` instances for successive points in instant vector selectors (#9588) * Don't allow returning the same FloatHistogram instance for multiple samples in a series * Simplify logic where we can, now that we don't need to check for FloatHistograms that have been used multiple times * Add reminder to comment * Add changelog entry * Fix another possible source of `FloatHistogram` corruption * Add test --- CHANGELOG.md | 2 +- .../aggregations/aggregations_test.go | 29 +++++---- pkg/streamingpromql/aggregations/avg.go | 37 ++---------- pkg/streamingpromql/aggregations/sum.go | 23 +------ pkg/streamingpromql/functions/math.go | 5 -- .../operators/instant_vector_selector.go | 18 +++--- .../operators/instant_vector_selector_test.go | 60 ++++++------------- .../operators/series_merging.go | 14 +++-- .../operators/series_merging_test.go | 22 ++++++- .../vector_vector_binary_operation.go | 1 + pkg/streamingpromql/types/data.go | 18 +----- 11 files changed, 87 insertions(+), 142 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e530644230b..47bdd28aef0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ * `cortex_alertmanager_state_replication_failed_total` * `cortex_alertmanager_alerts` * `cortex_alertmanager_silences` -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9588 * [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028 * [FEATURE] gRPC: Support S2 compression. #9322 * `-alertmanager.alertmanager-client.grpc-compression=s2` diff --git a/pkg/streamingpromql/aggregations/aggregations_test.go b/pkg/streamingpromql/aggregations/aggregations_test.go index 69788ceba8c..2dd25012c8f 100644 --- a/pkg/streamingpromql/aggregations/aggregations_test.go +++ b/pkg/streamingpromql/aggregations/aggregations_test.go @@ -37,10 +37,11 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) { h1 := &histogram.FloatHistogram{Sum: 1} h2 := &histogram.FloatHistogram{Sum: 2} h3 := &histogram.FloatHistogram{Sum: 3} + h4 := &histogram.FloatHistogram{Sum: 4} histograms = append(histograms, promql.HPoint{T: 0, H: h1}) histograms = append(histograms, promql.HPoint{T: 1, H: h2}) - histograms = append(histograms, promql.HPoint{T: 2, H: h2}) // T=2 is a lookback and refers to the same histogram as T=1. - histograms = append(histograms, promql.HPoint{T: 4, H: h3}) + histograms = append(histograms, promql.HPoint{T: 2, H: h3}) + histograms = append(histograms, promql.HPoint{T: 4, H: h4}) series := types.InstantVectorSeriesData{Histograms: histograms} require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil)) @@ -49,24 +50,26 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) { // Second series: all histograms that are not retained should be nil-ed out after returning. histograms, err = types.HPointSlicePool.Get(5, memoryConsumptionTracker) require.NoError(t, err) - h4 := &histogram.FloatHistogram{Sum: 4} h5 := &histogram.FloatHistogram{Sum: 5} h6 := &histogram.FloatHistogram{Sum: 6} - histograms = append(histograms, promql.HPoint{T: 0, H: h4}) - histograms = append(histograms, promql.HPoint{T: 1, H: h5}) - histograms = append(histograms, promql.HPoint{T: 2, H: h6}) - histograms = append(histograms, promql.HPoint{T: 3, H: h6}) // T=3 is a lookback and refers to the same histogram as T=2. - histograms = append(histograms, promql.HPoint{T: 4, H: h6}) + h7 := &histogram.FloatHistogram{Sum: 7} + h8 := &histogram.FloatHistogram{Sum: 8} + h9 := &histogram.FloatHistogram{Sum: 9} + histograms = append(histograms, promql.HPoint{T: 0, H: h5}) + histograms = append(histograms, promql.HPoint{T: 1, H: h6}) + histograms = append(histograms, promql.HPoint{T: 2, H: h7}) + histograms = append(histograms, promql.HPoint{T: 3, H: h8}) + histograms = append(histograms, promql.HPoint{T: 4, H: h9}) series = types.InstantVectorSeriesData{Histograms: histograms} require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil)) expected := []promql.HPoint{ - {T: 0, H: h4}, // h4 not retained (added to h1) - {T: 1, H: h5}, // h5 not retained (added to h2) - {T: 2, H: nil}, // h6 is retained for T=3 - {T: 3, H: nil}, // h6 is retained for this point - {T: 4, H: nil}, // h6 is retained for T=3 + {T: 0, H: h5}, // h5 not retained (added to h1) + {T: 1, H: h6}, // h6 not retained (added to h2) + {T: 2, H: h7}, // h7 not retained (added to h3) + {T: 3, H: nil}, // h8 is retained for this point + {T: 4, H: h9}, // h9 not retained (added to h4) } require.Equal(t, expected, series.Histograms, "all histograms retained should be nil-ed out after accumulating series") diff --git a/pkg/streamingpromql/aggregations/avg.go b/pkg/streamingpromql/aggregations/avg.go index 843170e0bdd..71c770be4f0 100644 --- a/pkg/streamingpromql/aggregations/avg.go +++ b/pkg/streamingpromql/aggregations/avg.go @@ -165,8 +165,6 @@ func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSerie g.histograms = g.histograms[:timeRange.StepCount] } - var lastUncopiedHistogram *histogram.FloatHistogram - for inputIdx, p := range data.Histograms { outputIdx := timeRange.PointIndex(p.T) g.groupSeriesCounts[outputIdx]++ @@ -176,41 +174,18 @@ func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSerie continue } - if lastUncopiedHistogram == p.H { - // Ensure the FloatHistogram instance is not reused when the HPoint slice is reused, as we're retaining a reference to it. - data.Histograms[inputIdx].H = nil - } - if g.histograms[outputIdx] == nil { - if lastUncopiedHistogram == p.H { - // We've already used this histogram for a previous point due to lookback. - // Make a copy of it so we don't modify the other point. - g.histograms[outputIdx] = p.H.Copy() - g.histogramPointCount++ - - continue - } - - // We have not previously used this histogram as the start of an output point. - // It is safe to store it and modify it later without copying, as we'll make copies above if the same histogram is used for subsequent points. + // First sample for this output point, retain the histogram as-is. g.histograms[outputIdx] = p.H g.histogramPointCount++ - lastUncopiedHistogram = p.H - // Ensure the FloatHistogram instance is not reused when the HPoint slice data.Histograms is reused, including if it was used at previous points. - data.RemoveReferencesToRetainedHistogram(p.H, inputIdx) + // Ensure the FloatHistogram instance is not reused when the HPoint slice data.Histograms is reused. + data.Histograms[inputIdx].H = nil continue } - // Check if the next point in data.Histograms is the same as the current point (due to lookback) - // If it is, create a copy before modifying it. - toAdd := p.H - if inputIdx+1 < len(data.Histograms) && data.Histograms[inputIdx+1].H == p.H { - toAdd = p.H.Copy() - } - - _, err = toAdd.Sub(g.histograms[outputIdx]) + _, err = p.H.Sub(g.histograms[outputIdx]) if err != nil { // Unable to subtract histograms (likely due to invalid combination of histograms). Make sure we don't emit a sample at this timestamp. g.histograms[outputIdx] = invalidCombinationOfHistograms @@ -223,8 +198,8 @@ func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSerie continue } - toAdd.Div(g.groupSeriesCounts[outputIdx]) - _, err = g.histograms[outputIdx].Add(toAdd) + p.H.Div(g.groupSeriesCounts[outputIdx]) + _, err = g.histograms[outputIdx].Add(p.H) if err != nil { // Unable to add histograms together (likely due to invalid combination of histograms). Make sure we don't emit a sample at this timestamp. g.histograms[outputIdx] = invalidCombinationOfHistograms diff --git a/pkg/streamingpromql/aggregations/sum.go b/pkg/streamingpromql/aggregations/sum.go index cf14ccb67a9..b182c87d2fb 100644 --- a/pkg/streamingpromql/aggregations/sum.go +++ b/pkg/streamingpromql/aggregations/sum.go @@ -78,7 +78,6 @@ func (g *SumAggregationGroup) accumulateFloats(data types.InstantVectorSeriesDat func (g *SumAggregationGroup) accumulateHistograms(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error { var err error - var lastUncopiedHistogram *histogram.FloatHistogram if len(data.Histograms) > 0 && g.histogramSums == nil { // First series with histogram values for this group, populate it. @@ -97,29 +96,13 @@ func (g *SumAggregationGroup) accumulateHistograms(data types.InstantVectorSerie continue } - if lastUncopiedHistogram == p.H { - // Ensure the FloatHistogram instance is not reused when the HPoint slice is reused, as we're retaining a reference to it. - data.Histograms[inputIdx].H = nil - } - if g.histogramSums[outputIdx] == nil { - if lastUncopiedHistogram == p.H { - // We've already used this histogram for a previous point due to lookback. - // Make a copy of it so we don't modify the other point. - g.histogramSums[outputIdx] = p.H.Copy() - g.histogramPointCount++ - - continue - } - - // We have not previously used this histogram as the start of an output point. - // It is safe to store it and modify it later without copying, as we'll make copies above if the same histogram is used for subsequent points. + // First sample for this output point, retain the histogram as-is. g.histogramSums[outputIdx] = p.H g.histogramPointCount++ - lastUncopiedHistogram = p.H - // Ensure the FloatHistogram instance is not reused when the HPoint slice data.Histograms is reused, including if it was used at previous points. - data.RemoveReferencesToRetainedHistogram(p.H, inputIdx) + // Ensure the FloatHistogram instance is not reused when the HPoint slice data.Histograms is reused. + data.Histograms[inputIdx].H = nil continue } diff --git a/pkg/streamingpromql/functions/math.go b/pkg/streamingpromql/functions/math.go index 7e2756596a2..f13a7de32b5 100644 --- a/pkg/streamingpromql/functions/math.go +++ b/pkg/streamingpromql/functions/math.go @@ -58,11 +58,6 @@ var UnaryNegation InstantVectorSeriesFunction = func(seriesData types.InstantVec } for i := range seriesData.Histograms { - if i > 0 && seriesData.Histograms[i].H == seriesData.Histograms[i-1].H { - // Previous point shares the same histogram instance, which we've already negated, so don't negate it again. - continue - } - seriesData.Histograms[i].H.Mul(-1) // Mul modifies the histogram in-place, so we don't need to do anything with the result here. } diff --git a/pkg/streamingpromql/operators/instant_vector_selector.go b/pkg/streamingpromql/operators/instant_vector_selector.go index 84807792a09..7c10ddaa4b7 100644 --- a/pkg/streamingpromql/operators/instant_vector_selector.go +++ b/pkg/streamingpromql/operators/instant_vector_selector.go @@ -89,9 +89,8 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe t, f = v.memoizedIterator.At() case chunkenc.ValHistogram, chunkenc.ValFloatHistogram: if atT := v.memoizedIterator.AtT(); atT == lastHistogramT && lastHistogram != nil { - // We're still looking at the last histogram we used, don't bother creating another FloatHistogram. - // Consuming operators are expected to check for the same FloatHistogram instance used at multiple points and copy it - // if they are going to mutate it, so this is safe to do. + // We're still looking at the last histogram we used, don't bother creating another FloatHistogram yet as we might not need it. + // If we're going to return this histogram, we'll make a copy below. t, h = atT, lastHistogram } else { t, h = v.memoizedIterator.AtFloatHistogram() @@ -109,12 +108,9 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe } if h != nil { if t == lastHistogramT && lastHistogram != nil { - // Reuse exactly the same FloatHistogram as last time. + // Reuse exactly the same FloatHistogram as last time, don't bother creating another FloatHistogram yet. // PeekPrev can return a new FloatHistogram instance with the same underlying bucket slices as a previous call - // to AtFloatHistogram. - // Consuming operators are expected to check for the same FloatHistogram instance used at multiple points and copy - // it if they are going to mutate it, but consuming operators don't check the underlying bucket slices, so without - // this, we can end up with incorrect query results. + // to AtFloatHistogram, so if we're going to return this histogram, we'll make a copy below. h = lastHistogram } } @@ -136,6 +132,12 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe return types.InstantVectorSeriesData{}, err } } + + if t == lastHistogramT { + // We're returning a histogram we've previously used, so make a copy of it now. + h = h.Copy() + } + data.Histograms = append(data.Histograms, promql.HPoint{T: stepT, H: h}) lastHistogramT = t lastHistogram = h diff --git a/pkg/streamingpromql/operators/instant_vector_selector_test.go b/pkg/streamingpromql/operators/instant_vector_selector_test.go index 42f6fdf5503..ec2aed797ff 100644 --- a/pkg/streamingpromql/operators/instant_vector_selector_test.go +++ b/pkg/streamingpromql/operators/instant_vector_selector_test.go @@ -4,6 +4,7 @@ package operators import ( "context" + "fmt" "testing" "time" @@ -18,14 +19,14 @@ import ( ) func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { - requireNotSame := func(t *testing.T, h1, h2 *histogram.FloatHistogram) { - require.NotSame(t, h1, h2, "must not point to the same *FloatHistogram") - - requireNotSameSlices(t, h1.PositiveSpans, h2.PositiveSpans, "positive spans") - requireNotSameSlices(t, h1.NegativeSpans, h2.NegativeSpans, "negative spans") - requireNotSameSlices(t, h1.PositiveBuckets, h2.PositiveBuckets, "positive buckets") - requireNotSameSlices(t, h1.NegativeBuckets, h2.NegativeBuckets, "negative buckets") - requireNotSameSlices(t, h1.CustomValues, h2.CustomValues, "custom values") + requireNotSame := func(t *testing.T, h1, h2 *histogram.FloatHistogram, context string) { + require.NotSamef(t, h1, h2, "%v: must not point to the same *FloatHistogram", context) + + requireNotSameSlices(t, h1.PositiveSpans, h2.PositiveSpans, "positive spans", context) + requireNotSameSlices(t, h1.NegativeSpans, h2.NegativeSpans, "negative spans", context) + requireNotSameSlices(t, h1.PositiveBuckets, h2.PositiveBuckets, "positive buckets", context) + requireNotSameSlices(t, h1.NegativeBuckets, h2.NegativeBuckets, "negative buckets", context) + requireNotSameSlices(t, h1.CustomValues, h2.CustomValues, "custom values", context) } testCases := map[string]struct { @@ -45,9 +46,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { require.Equal(t, 5.0, points[0].H.Sum) require.Equal(t, 20.0, points[1].H.Sum) require.Equal(t, 21.0, points[2].H.Sum) - - requireNotSame(t, points[0].H, points[1].H) - requireNotSame(t, points[1].H, points[2].H) }, }, "different histograms at each point, some due to lookback": { @@ -63,10 +61,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { require.Equal(t, 5.0, points[1].H.Sum) require.Equal(t, 20.0, points[2].H.Sum) require.Equal(t, 21.0, points[3].H.Sum) - - requireNotSame(t, points[0].H, points[1].H) - requireNotSame(t, points[1].H, points[2].H) - requireNotSame(t, points[2].H, points[3].H) }, }, "same histogram at each point due to lookback": { @@ -80,9 +74,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { require.Equal(t, 5.0, points[0].H.Sum) require.Equal(t, 5.0, points[1].H.Sum) require.Equal(t, 5.0, points[2].H.Sum) - - require.Same(t, points[0].H, points[1].H) - require.Same(t, points[1].H, points[2].H) }, }, "same histogram at each point not due to lookback": { @@ -95,8 +86,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { require.Len(t, points, 2) require.Equal(t, 5.0, points[0].H.Sum) require.Equal(t, 5.0, points[1].H.Sum) - - requireNotSame(t, points[0].H, points[1].H) }, }, "last point is from lookback and is the same as the previous point": { @@ -111,9 +100,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { require.Equal(t, 3.0, points[0].H.Sum) require.Equal(t, 5.0, points[1].H.Sum) require.Equal(t, 5.0, points[2].H.Sum) - - requireNotSame(t, points[0].H, points[1].H) - require.Same(t, points[1].H, points[2].H) }, }, "last point is from lookback but is not the same as the previous point": { @@ -128,9 +114,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { require.Equal(t, 3.0, points[0].H.Sum) require.Equal(t, 5.0, points[1].H.Sum) require.Equal(t, 20.0, points[2].H.Sum) - - requireNotSame(t, points[0].H, points[1].H) - requireNotSame(t, points[1].H, points[2].H) }, }, @@ -145,11 +128,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { require.Equal(t, 3.0, points[0].H.Sum) require.Equal(t, 5.0, points[1].H.Sum) require.Equal(t, 3.0, points[2].H.Sum) - - requireNotSame(t, points[0].H, points[1].H) - requireNotSame(t, points[1].H, points[2].H) - - requireNotSame(t, points[0].H, points[2].H) }, }, "different histograms should have different spans": { @@ -160,7 +138,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { stepCount: 2, check: func(t *testing.T, points []promql.HPoint, _ []promql.FPoint) { require.Len(t, points, 2) - requireNotSame(t, points[0].H, points[1].H) }, }, "successive histograms returned due to lookback should create different histograms at each point": { @@ -172,7 +149,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { stepCount: 3, check: func(t *testing.T, points []promql.HPoint, _ []promql.FPoint) { require.Len(t, points, 2) - requireNotSame(t, points[0].H, points[1].H) }, }, "lookback points in middle of series reuse existing histogram": { @@ -183,9 +159,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { stepCount: 5, check: func(t *testing.T, points []promql.HPoint, _ []promql.FPoint) { require.Len(t, points, 4) - requireNotSame(t, points[0].H, points[2].H) - require.Same(t, points[0].H, points[1].H) - require.Same(t, points[2].H, points[3].H) }, }, // FIXME: this test currently fails due to https://github.com/prometheus/prometheus/issues/14172 @@ -201,8 +174,6 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { // require.Equal(t, 3.0, hPoints[0].H.Sum) // require.Equal(t, 3.0, hPoints[1].H.Sum) // - // require.Same(t, hPoints[0].H, hPoints[1].H) - // // require.Equal(t, []promql.FPoint{{T: 60000, F: 2}}, fPoints) // }, //}, @@ -234,16 +205,23 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { series, err := selector.NextSeries(ctx) require.NoError(t, err) testCase.check(t, series.Histograms, series.Floats) + + for i := 1; i < len(series.Histograms); i++ { + first := series.Histograms[i-1].H + second := series.Histograms[i].H + + requireNotSame(t, first, second, fmt.Sprintf("histograms for points at index %v and %v in %v", i-1, i, series.Histograms)) + } }) } } -func requireNotSameSlices[T any](t *testing.T, s1, s2 []T, description string) { - require.NotSamef(t, s1, s2, "must not point to the same %v slice", description) +func requireNotSameSlices[T any](t *testing.T, s1, s2 []T, description string, context string) { + require.NotSamef(t, s1, s2, "%v: must not point to the same %v slice", context, description) // require.NotSame only checks the slice headers are different. It does not check that the slices do not point the same underlying arrays. // So specifically check if the first elements are different. if len(s1) > 0 && len(s2) > 0 { - require.NotSamef(t, &s1[0], &s2[0], "must not point to the same underlying %v array", description) + require.NotSamef(t, &s1[0], &s2[0], "%v: must not point to the same underlying %v array", context, description) } } diff --git a/pkg/streamingpromql/operators/series_merging.go b/pkg/streamingpromql/operators/series_merging.go index 7c3dd8b8077..fcddf1cd90e 100644 --- a/pkg/streamingpromql/operators/series_merging.go +++ b/pkg/streamingpromql/operators/series_merging.go @@ -223,13 +223,14 @@ func mergeOneSideHistograms(data []types.InstantVectorSeriesData, sourceSeriesIn // We're going to create a new slice, so return this one to the pool. // We must defer here, rather than at the end, as the merge loop below reslices Histograms. + // We deliberately want this to happen at the end of mergeOneSideHistograms, so calling defer like this in a loop is fine. // FIXME: this isn't correct for many-to-one / one-to-many matching - we'll need the series again (unless we store the result of the merge) - defer types.HPointSlicePool.Put(second.Histograms, memoryConsumptionTracker) + defer clearAndReturnHPointSlice(second.Histograms, memoryConsumptionTracker) // We're going to retain all the FloatHistogram instances, so don't leave them in the slice to be reused. if len(second.Histograms) == 0 { // We've reached the end of all series with histograms. - // However, continue iterating so we can return all of the slices. - // (As they may have length 0, but a non-zero capacity). + // However, continue iterating so we can return all of the slices to the pool. + // (As they may have length 0, but a non-zero capacity and so still need to be returned to the pool). continue } mergedSize += len(second.Histograms) @@ -252,7 +253,7 @@ func mergeOneSideHistograms(data []types.InstantVectorSeriesData, sourceSeriesIn // We'll return the other slices in the for loop below. // We must defer here, rather than at the end, as the merge loop below reslices Histograms. // FIXME: this isn't correct for many-to-one / one-to-many matching - we'll need the series again (unless we store the result of the merge) - defer types.HPointSlicePool.Put(data[0].Histograms, memoryConsumptionTracker) + defer clearAndReturnHPointSlice(data[0].Histograms, memoryConsumptionTracker) // We're going to retain all the FloatHistogram instances, so don't leave them in the slice to be reused. // Re-slice data with just the series with histograms to make the rest of our job easier // Because we aren't re-sorting here it doesn't matter that sourceSeriesIndices remains longer. @@ -322,6 +323,11 @@ func mergeOneSideHistograms(data []types.InstantVectorSeriesData, sourceSeriesIn } } +func clearAndReturnHPointSlice(s []promql.HPoint, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) { + clear(s) + types.HPointSlicePool.Put(s, memoryConsumptionTracker) +} + type MergeConflict struct { firstConflictingSeriesIndex int // Will be the index of any input series in the case of a mixed float / histogram conflict. secondConflictingSeriesIndex int // Will be -1 in the case of a mixed float / histogram conflict. diff --git a/pkg/streamingpromql/operators/series_merging_test.go b/pkg/streamingpromql/operators/series_merging_test.go index 0212c68b483..026947df4e5 100644 --- a/pkg/streamingpromql/operators/series_merging_test.go +++ b/pkg/streamingpromql/operators/series_merging_test.go @@ -18,8 +18,9 @@ func TestMergeSeries(t *testing.T) { input []types.InstantVectorSeriesData sourceSeriesIndices []int - expectedOutput types.InstantVectorSeriesData - expectedConflict *MergeConflict + expectedOutput types.InstantVectorSeriesData + expectedConflict *MergeConflict + expectInputHPointSlicesCleared bool }{ "no input series": { input: []types.InstantVectorSeriesData{}, @@ -120,6 +121,7 @@ func TestMergeSeries(t *testing.T) { {T: 6, H: &histogram.FloatHistogram{Count: 60, Sum: 600}}, }, }, + expectInputHPointSlicesCleared: true, }, "two float only input series with no overlap, series not in time order": { input: []types.InstantVectorSeriesData{ @@ -178,6 +180,7 @@ func TestMergeSeries(t *testing.T) { {T: 6, H: &histogram.FloatHistogram{Count: 60, Sum: 600}}, }, }, + expectInputHPointSlicesCleared: true, }, "three float only input series with no overlap": { input: []types.InstantVectorSeriesData{ @@ -256,6 +259,7 @@ func TestMergeSeries(t *testing.T) { {T: 9, H: &histogram.FloatHistogram{Count: 90, Sum: 900}}, }, }, + expectInputHPointSlicesCleared: true, }, "two float only input series with overlap": { input: []types.InstantVectorSeriesData{ @@ -314,6 +318,7 @@ func TestMergeSeries(t *testing.T) { {T: 6, H: &histogram.FloatHistogram{Count: 60, Sum: 600}}, }, }, + expectInputHPointSlicesCleared: true, }, "three float only input series with overlap": { input: []types.InstantVectorSeriesData{ @@ -380,6 +385,7 @@ func TestMergeSeries(t *testing.T) { {T: 6, H: &histogram.FloatHistogram{Count: 60, Sum: 600}}, }, }, + expectInputHPointSlicesCleared: true, }, "float only input series with conflict": { input: []types.InstantVectorSeriesData{ @@ -546,6 +552,7 @@ func TestMergeSeries(t *testing.T) { {T: 6, H: &histogram.FloatHistogram{Count: 6, Sum: 6}}, }, }, + expectInputHPointSlicesCleared: true, }, "mixed float and histogram input series, series not in time order": { input: []types.InstantVectorSeriesData{ @@ -581,6 +588,7 @@ func TestMergeSeries(t *testing.T) { {T: 6, H: &histogram.FloatHistogram{Count: 6, Sum: 6}}, }, }, + expectInputHPointSlicesCleared: true, }, "mixed float and histogram input series, series in conflict on different types": { input: []types.InstantVectorSeriesData{ @@ -690,6 +698,7 @@ func TestMergeSeries(t *testing.T) { {T: 10, H: &histogram.FloatHistogram{Count: 10, Sum: 10}}, }, }, + expectInputHPointSlicesCleared: true, }, "input series exclusively with floats, histograms, or mixed, all overlap": { input: []types.InstantVectorSeriesData{ @@ -743,6 +752,7 @@ func TestMergeSeries(t *testing.T) { {T: 10, H: &histogram.FloatHistogram{Count: 10, Sum: 10}}, }, }, + expectInputHPointSlicesCleared: true, }, } @@ -758,6 +768,14 @@ func TestMergeSeries(t *testing.T) { require.Nil(t, conflict) require.Equal(t, testCase.expectedOutput, result) } + + if testCase.expectInputHPointSlicesCleared { + for sliceIdx, slice := range testCase.input { + for pointIdx, point := range slice.Histograms { + require.Nilf(t, point.H, "expected point at index %v of HPoint slice at index %v to have been cleared, but it was not", pointIdx, sliceIdx) + } + } + } }) } } diff --git a/pkg/streamingpromql/operators/vector_vector_binary_operation.go b/pkg/streamingpromql/operators/vector_vector_binary_operation.go index a91278e80b9..1d9d708215d 100644 --- a/pkg/streamingpromql/operators/vector_vector_binary_operation.go +++ b/pkg/streamingpromql/operators/vector_vector_binary_operation.go @@ -654,6 +654,7 @@ func (b *VectorVectorBinaryOperation) Close() { type binaryOperationFunc func(lhs, rhs float64, hlhs, hrhs *histogram.FloatHistogram) (float64, *histogram.FloatHistogram, bool, error) // FIXME(jhesketh): Investigate avoiding copying histograms for binary ops. +// We would need nil-out the retained FloatHistogram instances in their original HPoint slices, to avoid them being modified when the slice is returned to the pool. var arithmeticAndComparisonOperationFuncs = map[parser.ItemType]binaryOperationFunc{ parser.ADD: func(lhs, rhs float64, hlhs, hrhs *histogram.FloatHistogram) (float64, *histogram.FloatHistogram, bool, error) { if hlhs != nil && hrhs != nil { diff --git a/pkg/streamingpromql/types/data.go b/pkg/streamingpromql/types/data.go index 240f19ba470..2ea14d9fa2c 100644 --- a/pkg/streamingpromql/types/data.go +++ b/pkg/streamingpromql/types/data.go @@ -27,26 +27,10 @@ type InstantVectorSeriesData struct { // Histograms contains histogram samples for this series. // Samples must be sorted in timestamp order, earliest timestamps first. // Samples must not have duplicate timestamps. - // HPoint contains a pointer to a histogram, and consecutive HPoints may contain a reference - // to the same FloatHistogram. - // It is therefore important to check for references to the same FloatHistogram in - // subsequent points before mutating it. + // Samples must not share FloatHistogram instances. Histograms []promql.HPoint } -// RemoveReferencesToRetainedHistogram searches backwards through d.Histograms, starting at lastIndex, removing any -// points that reference h, stopping once a different FloatHistogram is reached. -func (d InstantVectorSeriesData) RemoveReferencesToRetainedHistogram(h *histogram.FloatHistogram, lastIndex int) { - for i := lastIndex; i >= 0; i-- { - if d.Histograms[i].H != h { - // We've reached a different histogram. We're done. - return - } - - d.Histograms[i].H = nil - } -} - type InstantVectorSeriesDataIterator struct { data InstantVectorSeriesData fIndex int