Skip to content

Commit

Permalink
Backport part of #9588 to r310 (#9605)
Browse files Browse the repository at this point in the history
* Fix another possible source of `FloatHistogram` corruption

* Add test
  • Loading branch information
charleskorn authored Oct 14, 2024
1 parent a74218d commit a492668
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 6 deletions.
14 changes: 10 additions & 4 deletions pkg/streamingpromql/operators/series_merging.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,14 @@ func mergeOneSideHistograms(data []types.InstantVectorSeriesData, sourceSeriesIn

// We're going to create a new slice, so return this one to the pool.
// We must defer here, rather than at the end, as the merge loop below reslices Histograms.
// We deliberately want this to happen at the end of mergeOneSideHistograms, so calling defer like this in a loop is fine.
// FIXME: this isn't correct for many-to-one / one-to-many matching - we'll need the series again (unless we store the result of the merge)
defer types.HPointSlicePool.Put(second.Histograms, memoryConsumptionTracker)
defer clearAndReturnHPointSlice(second.Histograms, memoryConsumptionTracker) // We're going to retain all the FloatHistogram instances, so don't leave them in the slice to be reused.

if len(second.Histograms) == 0 {
// We've reached the end of all series with histograms.
// However, continue iterating so we can return all of the slices.
// (As they may have length 0, but a non-zero capacity).
// However, continue iterating so we can return all of the slices to the pool.
// (As they may have length 0, but a non-zero capacity and so still need to be returned to the pool).
continue
}
mergedSize += len(second.Histograms)
Expand All @@ -252,7 +253,7 @@ func mergeOneSideHistograms(data []types.InstantVectorSeriesData, sourceSeriesIn
// We'll return the other slices in the for loop below.
// We must defer here, rather than at the end, as the merge loop below reslices Histograms.
// FIXME: this isn't correct for many-to-one / one-to-many matching - we'll need the series again (unless we store the result of the merge)
defer types.HPointSlicePool.Put(data[0].Histograms, memoryConsumptionTracker)
defer clearAndReturnHPointSlice(data[0].Histograms, memoryConsumptionTracker) // We're going to retain all the FloatHistogram instances, so don't leave them in the slice to be reused.

// Re-slice data with just the series with histograms to make the rest of our job easier
// Because we aren't re-sorting here it doesn't matter that sourceSeriesIndices remains longer.
Expand Down Expand Up @@ -322,6 +323,11 @@ func mergeOneSideHistograms(data []types.InstantVectorSeriesData, sourceSeriesIn
}
}

func clearAndReturnHPointSlice(s []promql.HPoint, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) {
clear(s)
types.HPointSlicePool.Put(s, memoryConsumptionTracker)
}

type MergeConflict struct {
firstConflictingSeriesIndex int // Will be the index of any input series in the case of a mixed float / histogram conflict.
secondConflictingSeriesIndex int // Will be -1 in the case of a mixed float / histogram conflict.
Expand Down
22 changes: 20 additions & 2 deletions pkg/streamingpromql/operators/series_merging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ func TestMergeSeries(t *testing.T) {
input []types.InstantVectorSeriesData
sourceSeriesIndices []int

expectedOutput types.InstantVectorSeriesData
expectedConflict *MergeConflict
expectedOutput types.InstantVectorSeriesData
expectedConflict *MergeConflict
expectInputHPointSlicesCleared bool
}{
"no input series": {
input: []types.InstantVectorSeriesData{},
Expand Down Expand Up @@ -120,6 +121,7 @@ func TestMergeSeries(t *testing.T) {
{T: 6, H: &histogram.FloatHistogram{Count: 60, Sum: 600}},
},
},
expectInputHPointSlicesCleared: true,
},
"two float only input series with no overlap, series not in time order": {
input: []types.InstantVectorSeriesData{
Expand Down Expand Up @@ -178,6 +180,7 @@ func TestMergeSeries(t *testing.T) {
{T: 6, H: &histogram.FloatHistogram{Count: 60, Sum: 600}},
},
},
expectInputHPointSlicesCleared: true,
},
"three float only input series with no overlap": {
input: []types.InstantVectorSeriesData{
Expand Down Expand Up @@ -256,6 +259,7 @@ func TestMergeSeries(t *testing.T) {
{T: 9, H: &histogram.FloatHistogram{Count: 90, Sum: 900}},
},
},
expectInputHPointSlicesCleared: true,
},
"two float only input series with overlap": {
input: []types.InstantVectorSeriesData{
Expand Down Expand Up @@ -314,6 +318,7 @@ func TestMergeSeries(t *testing.T) {
{T: 6, H: &histogram.FloatHistogram{Count: 60, Sum: 600}},
},
},
expectInputHPointSlicesCleared: true,
},
"three float only input series with overlap": {
input: []types.InstantVectorSeriesData{
Expand Down Expand Up @@ -380,6 +385,7 @@ func TestMergeSeries(t *testing.T) {
{T: 6, H: &histogram.FloatHistogram{Count: 60, Sum: 600}},
},
},
expectInputHPointSlicesCleared: true,
},
"float only input series with conflict": {
input: []types.InstantVectorSeriesData{
Expand Down Expand Up @@ -546,6 +552,7 @@ func TestMergeSeries(t *testing.T) {
{T: 6, H: &histogram.FloatHistogram{Count: 6, Sum: 6}},
},
},
expectInputHPointSlicesCleared: true,
},
"mixed float and histogram input series, series not in time order": {
input: []types.InstantVectorSeriesData{
Expand Down Expand Up @@ -581,6 +588,7 @@ func TestMergeSeries(t *testing.T) {
{T: 6, H: &histogram.FloatHistogram{Count: 6, Sum: 6}},
},
},
expectInputHPointSlicesCleared: true,
},
"mixed float and histogram input series, series in conflict on different types": {
input: []types.InstantVectorSeriesData{
Expand Down Expand Up @@ -690,6 +698,7 @@ func TestMergeSeries(t *testing.T) {
{T: 10, H: &histogram.FloatHistogram{Count: 10, Sum: 10}},
},
},
expectInputHPointSlicesCleared: true,
},
"input series exclusively with floats, histograms, or mixed, all overlap": {
input: []types.InstantVectorSeriesData{
Expand Down Expand Up @@ -743,6 +752,7 @@ func TestMergeSeries(t *testing.T) {
{T: 10, H: &histogram.FloatHistogram{Count: 10, Sum: 10}},
},
},
expectInputHPointSlicesCleared: true,
},
}

Expand All @@ -758,6 +768,14 @@ func TestMergeSeries(t *testing.T) {
require.Nil(t, conflict)
require.Equal(t, testCase.expectedOutput, result)
}

if testCase.expectInputHPointSlicesCleared {
for sliceIdx, slice := range testCase.input {
for pointIdx, point := range slice.Histograms {
require.Nilf(t, point.H, "expected point at index %v of HPoint slice at index %v to have been cleared, but it was not", pointIdx, sliceIdx)
}
}
}
})
}
}

0 comments on commit a492668

Please sign in to comment.