Skip to content

Commit

Permalink
Use types.QueryTimeRange in aggregations
Browse files Browse the repository at this point in the history
  • Loading branch information
charleskorn committed Sep 12, 2024
1 parent 58c24bb commit 0f4c351
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 57 deletions.
7 changes: 5 additions & 2 deletions pkg/streamingpromql/aggregations/aggregations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ package aggregations

import (
"testing"
"time"

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"

Expand All @@ -26,6 +28,7 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) {
for name, group := range groups {
t.Run(name, func(t *testing.T) {
memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil)
timeRange := types.NewRangeQueryTimeRange(timestamp.Time(0), timestamp.Time(4), time.Millisecond)

// First series: all histograms should be nil-ed out after returning, as they're all retained for use.
histograms, err := types.HPointSlicePool.Get(4, memoryConsumptionTracker)
Expand All @@ -40,7 +43,7 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) {
histograms = append(histograms, promql.HPoint{T: 4, H: h3})
series := types.InstantVectorSeriesData{Histograms: histograms}

require.NoError(t, group.AccumulateSeries(series, 5, 0, 1, memoryConsumptionTracker, nil))
require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil))
require.Equal(t, []promql.HPoint{{T: 0, H: nil}, {T: 1, H: nil}, {T: 2, H: nil}, {T: 4, H: nil}}, series.Histograms, "all histograms retained should be nil-ed out after accumulating series")

// Second series: all histograms that are not retained should be nil-ed out after returning.
Expand All @@ -56,7 +59,7 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) {
histograms = append(histograms, promql.HPoint{T: 4, H: h6})
series = types.InstantVectorSeriesData{Histograms: histograms}

require.NoError(t, group.AccumulateSeries(series, 5, 0, 1, memoryConsumptionTracker, nil))
require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil))

expected := []promql.HPoint{
{T: 0, H: h4}, // h4 not retained (added to h1)
Expand Down
48 changes: 24 additions & 24 deletions pkg/streamingpromql/aggregations/avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type AvgAggregationGroup struct {
groupSeriesCounts []float64
}

func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
defer types.PutInstantVectorSeriesData(data, memoryConsumptionTracker)
if len(data.Floats) == 0 && len(data.Histograms) == 0 {
// Nothing to do
Expand All @@ -42,51 +42,51 @@ func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesDat
var err error

if g.groupSeriesCounts == nil {
g.groupSeriesCounts, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
g.groupSeriesCounts, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}
g.groupSeriesCounts = g.groupSeriesCounts[:steps]
g.groupSeriesCounts = g.groupSeriesCounts[:timeRange.StepCount]
}

err = g.accumulateFloats(data, steps, start, interval, memoryConsumptionTracker)
err = g.accumulateFloats(data, timeRange, memoryConsumptionTracker)
if err != nil {
return err
}
err = g.accumulateHistograms(data, steps, start, interval, memoryConsumptionTracker, emitAnnotationFunc)
err = g.accumulateHistograms(data, timeRange, memoryConsumptionTracker, emitAnnotationFunc)
if err != nil {
return err
}

return nil
}

func (g *AvgAggregationGroup) accumulateFloats(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) error {
func (g *AvgAggregationGroup) accumulateFloats(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) error {
var err error
if len(data.Floats) > 0 && g.floats == nil {
// First series with float values for this group, populate it.
g.floats, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
g.floats, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}

g.floatCompensatingMeans, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
g.floatCompensatingMeans, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}

g.floatPresent, err = types.BoolSlicePool.Get(steps, memoryConsumptionTracker)
g.floatPresent, err = types.BoolSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}

g.floats = g.floats[:steps]
g.floatCompensatingMeans = g.floatCompensatingMeans[:steps]
g.floatPresent = g.floatPresent[:steps]
g.floats = g.floats[:timeRange.StepCount]
g.floatCompensatingMeans = g.floatCompensatingMeans[:timeRange.StepCount]
g.floatPresent = g.floatPresent[:timeRange.StepCount]
}

for _, p := range data.Floats {
idx := (p.T - start) / interval
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.groupSeriesCounts[idx]++
if !g.floatPresent[idx] {
// The first point is just taken as the value
Expand All @@ -108,19 +108,19 @@ func (g *AvgAggregationGroup) accumulateFloats(data types.InstantVectorSeriesDat
// instead of continuing to sum up, we revert to incremental
// calculation of the mean value from here on.
if g.floatMeans == nil {
g.floatMeans, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
g.floatMeans, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}
g.floatMeans = g.floatMeans[:steps]
g.floatMeans = g.floatMeans[:timeRange.StepCount]
}
if g.incrementalMeans == nil {
// First time we are using an incremental mean. Track which samples will be incremental.
g.incrementalMeans, err = types.BoolSlicePool.Get(steps, memoryConsumptionTracker)
g.incrementalMeans, err = types.BoolSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}
g.incrementalMeans = g.incrementalMeans[:steps]
g.incrementalMeans = g.incrementalMeans[:timeRange.StepCount]
}
g.incrementalMeans[idx] = true
g.floatMeans[idx] = g.floats[idx] / (g.groupSeriesCounts[idx] - 1)
Expand Down Expand Up @@ -153,21 +153,21 @@ func (g *AvgAggregationGroup) accumulateFloats(data types.InstantVectorSeriesDat
return nil
}

func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
var err error
if len(data.Histograms) > 0 && g.histograms == nil {
// First series with histogram values for this group, populate it.
g.histograms, err = types.HistogramSlicePool.Get(steps, memoryConsumptionTracker)
g.histograms, err = types.HistogramSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}
g.histograms = g.histograms[:steps]
g.histograms = g.histograms[:timeRange.StepCount]
}

var lastUncopiedHistogram *histogram.FloatHistogram

for inputIdx, p := range data.Histograms {
outputIdx := (p.T - start) / interval
outputIdx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.groupSeriesCounts[outputIdx]++

if g.histograms[outputIdx] == invalidCombinationOfHistograms {
Expand Down Expand Up @@ -280,7 +280,7 @@ func (g *AvgAggregationGroup) reconcileAndCountFloatPoints() (int, bool) {
return floatPointCount, haveMixedFloatsAndHistograms
}

func (g *AvgAggregationGroup) ComputeOutputSeries(start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
func (g *AvgAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
floatPointCount, hasMixedData := g.reconcileAndCountFloatPoints()
var floatPoints []promql.FPoint
var err error
Expand All @@ -293,7 +293,7 @@ func (g *AvgAggregationGroup) ComputeOutputSeries(start int64, interval int64, m

for i, havePoint := range g.floatPresent {
if havePoint {
t := start + int64(i)*interval
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
var f float64
if g.incrementalMeans != nil && g.incrementalMeans[i] {
f = g.floatMeans[i] + g.floatCompensatingMeans[i]
Expand All @@ -314,7 +314,7 @@ func (g *AvgAggregationGroup) ComputeOutputSeries(start int64, interval int64, m

for i, h := range g.histograms {
if h != nil && h != invalidCombinationOfHistograms {
t := start + int64(i)*interval
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
histogramPoints = append(histogramPoints, promql.HPoint{T: t, H: h.Compact(0)})
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/aggregations/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
// AggregationGroup accumulates series that have been grouped together and computes the output series data.
type AggregationGroup interface {
// AccumulateSeries takes in a series as part of the group
AccumulateSeries(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error
AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error
// ComputeOutputSeries does any final calculations and returns the grouped series data
ComputeOutputSeries(start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error)
ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error)
}

type AggregationGroupFactory func() AggregationGroup
Expand Down
18 changes: 9 additions & 9 deletions pkg/streamingpromql/aggregations/min_max.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,43 +49,43 @@ func (g *MinMaxAggregationGroup) minAccumulatePoint(idx int64, f float64) {
}
}

func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ functions.EmitAnnotationFunc) error {
func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ functions.EmitAnnotationFunc) error {
if (len(data.Floats) > 0 || len(data.Histograms) > 0) && g.floatValues == nil {
// Even if we only have histograms, we have to populate the float slices, as we'll treat histograms as if they have value 0.
// This is consistent with Prometheus but may not be the desired value: https://github.com/prometheus/prometheus/issues/14711

var err error
// First series with float values for this group, populate it.
g.floatValues, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
g.floatValues, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}

g.floatPresent, err = types.BoolSlicePool.Get(steps, memoryConsumptionTracker)
g.floatPresent, err = types.BoolSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}
g.floatValues = g.floatValues[:steps]
g.floatPresent = g.floatPresent[:steps]
g.floatValues = g.floatValues[:timeRange.StepCount]
g.floatPresent = g.floatPresent[:timeRange.StepCount]
}

for _, p := range data.Floats {
idx := (p.T - start) / interval
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.accumulatePoint(idx, p.F)
}

// If a histogram exists max treats it as 0. We have to detect this here so that we return a 0 value instead of nothing.
// This is consistent with Prometheus but may not be the desired value: https://github.com/prometheus/prometheus/issues/14711
for _, p := range data.Histograms {
idx := (p.T - start) / interval
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.accumulatePoint(idx, 0)
}

types.PutInstantVectorSeriesData(data, memoryConsumptionTracker)
return nil
}

func (g *MinMaxAggregationGroup) ComputeOutputSeries(start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
func (g *MinMaxAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
floatPointCount := 0
for _, p := range g.floatPresent {
if p {
Expand All @@ -102,7 +102,7 @@ func (g *MinMaxAggregationGroup) ComputeOutputSeries(start int64, interval int64

for i, havePoint := range g.floatPresent {
if havePoint {
t := start + int64(i)*interval
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
f := g.floatValues[i]
floatPoints = append(floatPoints, promql.FPoint{T: t, F: f})
}
Expand Down
36 changes: 18 additions & 18 deletions pkg/streamingpromql/aggregations/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,73 +24,73 @@ type SumAggregationGroup struct {
histogramPointCount int
}

func (g *SumAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
func (g *SumAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
defer types.PutInstantVectorSeriesData(data, memoryConsumptionTracker)
if len(data.Floats) == 0 && len(data.Histograms) == 0 {
// Nothing to do
return nil
}

err := g.accumulateFloats(data, steps, start, interval, memoryConsumptionTracker)
err := g.accumulateFloats(data, timeRange, memoryConsumptionTracker)
if err != nil {
return err
}
err = g.accumulateHistograms(data, steps, start, interval, memoryConsumptionTracker, emitAnnotationFunc)
err = g.accumulateHistograms(data, timeRange, memoryConsumptionTracker, emitAnnotationFunc)
if err != nil {
return err
}

return nil
}

func (g *SumAggregationGroup) accumulateFloats(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) error {
func (g *SumAggregationGroup) accumulateFloats(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) error {
var err error

if len(data.Floats) > 0 && g.floatSums == nil {
// First series with float values for this group, populate it.
g.floatSums, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
g.floatSums, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}

g.floatCompensatingValues, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
g.floatCompensatingValues, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}

g.floatPresent, err = types.BoolSlicePool.Get(steps, memoryConsumptionTracker)
g.floatPresent, err = types.BoolSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}
g.floatSums = g.floatSums[:steps]
g.floatCompensatingValues = g.floatCompensatingValues[:steps]
g.floatPresent = g.floatPresent[:steps]
g.floatSums = g.floatSums[:timeRange.StepCount]
g.floatCompensatingValues = g.floatCompensatingValues[:timeRange.StepCount]
g.floatPresent = g.floatPresent[:timeRange.StepCount]
}

for _, p := range data.Floats {
idx := (p.T - start) / interval
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.floatSums[idx], g.floatCompensatingValues[idx] = floats.KahanSumInc(p.F, g.floatSums[idx], g.floatCompensatingValues[idx])
g.floatPresent[idx] = true
}

return nil
}

func (g *SumAggregationGroup) accumulateHistograms(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
func (g *SumAggregationGroup) accumulateHistograms(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
var err error
var lastUncopiedHistogram *histogram.FloatHistogram

if len(data.Histograms) > 0 && g.histogramSums == nil {
// First series with histogram values for this group, populate it.
g.histogramSums, err = types.HistogramSlicePool.Get(steps, memoryConsumptionTracker)
g.histogramSums, err = types.HistogramSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}
g.histogramSums = g.histogramSums[:steps]
g.histogramSums = g.histogramSums[:timeRange.StepCount]
}

for inputIdx, p := range data.Histograms {
outputIdx := (p.T - start) / interval
outputIdx := (p.T - timeRange.StartT) / timeRange.IntervalMs

if g.histogramSums[outputIdx] == invalidCombinationOfHistograms {
// We've already seen an invalid combination of histograms at this timestamp. Ignore this point.
Expand Down Expand Up @@ -181,7 +181,7 @@ func (g *SumAggregationGroup) reconcileAndCountFloatPoints() (int, bool) {
return floatPointCount, haveMixedFloatsAndHistograms
}

func (g *SumAggregationGroup) ComputeOutputSeries(start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
func (g *SumAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
floatPointCount, hasMixedData := g.reconcileAndCountFloatPoints()
var floatPoints []promql.FPoint
var err error
Expand All @@ -193,7 +193,7 @@ func (g *SumAggregationGroup) ComputeOutputSeries(start int64, interval int64, m

for i, havePoint := range g.floatPresent {
if havePoint {
t := start + int64(i)*interval
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
f := g.floatSums[i] + g.floatCompensatingValues[i]
floatPoints = append(floatPoints, promql.FPoint{T: t, F: f})
}
Expand All @@ -209,7 +209,7 @@ func (g *SumAggregationGroup) ComputeOutputSeries(start int64, interval int64, m

for i, h := range g.histogramSums {
if h != nil && h != invalidCombinationOfHistograms {
t := start + int64(i)*interval
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
histogramPoints = append(histogramPoints, promql.HPoint{T: t, H: h.Compact(0)})
}
}
Expand Down
Loading

0 comments on commit 0f4c351

Please sign in to comment.