diff --git a/CHANGELOG.md b/CHANGELOG.md index 140bc43028f..19bc13ca3ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,7 +33,7 @@ * [CHANGE] Query-scheduler: Remove the experimental `-query-scheduler.use-multi-algorithm-query-queue` flag. The new multi-algorithm tree queue is always used for the scheduler. #9210 * [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173 * [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 * [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988 * What it is: * When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path. diff --git a/pkg/streamingpromql/aggregations/aggregations_test.go b/pkg/streamingpromql/aggregations/aggregations_test.go index 4b401fd5a4e..69788ceba8c 100644 --- a/pkg/streamingpromql/aggregations/aggregations_test.go +++ b/pkg/streamingpromql/aggregations/aggregations_test.go @@ -4,8 +4,10 @@ package aggregations import ( "testing" + "time" "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" @@ -26,6 +28,7 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) { for name, group := range groups { t.Run(name, func(t *testing.T) { memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil) + timeRange := types.NewRangeQueryTimeRange(timestamp.Time(0), timestamp.Time(4), time.Millisecond) // First series: all histograms should be nil-ed out after returning, as they're all retained for use. histograms, err := types.HPointSlicePool.Get(4, memoryConsumptionTracker) @@ -40,7 +43,7 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) { histograms = append(histograms, promql.HPoint{T: 4, H: h3}) series := types.InstantVectorSeriesData{Histograms: histograms} - require.NoError(t, group.AccumulateSeries(series, 5, 0, 1, memoryConsumptionTracker, nil)) + require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil)) require.Equal(t, []promql.HPoint{{T: 0, H: nil}, {T: 1, H: nil}, {T: 2, H: nil}, {T: 4, H: nil}}, series.Histograms, "all histograms retained should be nil-ed out after accumulating series") // Second series: all histograms that are not retained should be nil-ed out after returning. @@ -56,7 +59,7 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) { histograms = append(histograms, promql.HPoint{T: 4, H: h6}) series = types.InstantVectorSeriesData{Histograms: histograms} - require.NoError(t, group.AccumulateSeries(series, 5, 0, 1, memoryConsumptionTracker, nil)) + require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil)) expected := []promql.HPoint{ {T: 0, H: h4}, // h4 not retained (added to h1) diff --git a/pkg/streamingpromql/aggregations/avg.go b/pkg/streamingpromql/aggregations/avg.go index 11f0ca214f9..1066e229ddf 100644 --- a/pkg/streamingpromql/aggregations/avg.go +++ b/pkg/streamingpromql/aggregations/avg.go @@ -32,7 +32,7 @@ type AvgAggregationGroup struct { groupSeriesCounts []float64 } -func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error { +func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error { defer types.PutInstantVectorSeriesData(data, memoryConsumptionTracker) if len(data.Floats) == 0 && len(data.Histograms) == 0 { // Nothing to do @@ -42,18 +42,18 @@ func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesDat var err error if g.groupSeriesCounts == nil { - g.groupSeriesCounts, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker) + g.groupSeriesCounts, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker) if err != nil { return err } - g.groupSeriesCounts = g.groupSeriesCounts[:steps] + g.groupSeriesCounts = g.groupSeriesCounts[:timeRange.StepCount] } - err = g.accumulateFloats(data, steps, start, interval, memoryConsumptionTracker) + err = g.accumulateFloats(data, timeRange, memoryConsumptionTracker) if err != nil { return err } - err = g.accumulateHistograms(data, steps, start, interval, memoryConsumptionTracker, emitAnnotationFunc) + err = g.accumulateHistograms(data, timeRange, memoryConsumptionTracker, emitAnnotationFunc) if err != nil { return err } @@ -61,32 +61,32 @@ func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesDat return nil } -func (g *AvgAggregationGroup) accumulateFloats(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) error { +func (g *AvgAggregationGroup) accumulateFloats(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) error { var err error if len(data.Floats) > 0 && g.floats == nil { // First series with float values for this group, populate it. - g.floats, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker) + g.floats, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker) if err != nil { return err } - g.floatCompensatingMeans, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker) + g.floatCompensatingMeans, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker) if err != nil { return err } - g.floatPresent, err = types.BoolSlicePool.Get(steps, memoryConsumptionTracker) + g.floatPresent, err = types.BoolSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker) if err != nil { return err } - g.floats = g.floats[:steps] - g.floatCompensatingMeans = g.floatCompensatingMeans[:steps] - g.floatPresent = g.floatPresent[:steps] + g.floats = g.floats[:timeRange.StepCount] + g.floatCompensatingMeans = g.floatCompensatingMeans[:timeRange.StepCount] + g.floatPresent = g.floatPresent[:timeRange.StepCount] } for _, p := range data.Floats { - idx := (p.T - start) / interval + idx := (p.T - timeRange.StartT) / timeRange.IntervalMs g.groupSeriesCounts[idx]++ if !g.floatPresent[idx] { // The first point is just taken as the value @@ -108,19 +108,19 @@ func (g *AvgAggregationGroup) accumulateFloats(data types.InstantVectorSeriesDat // instead of continuing to sum up, we revert to incremental // calculation of the mean value from here on. if g.floatMeans == nil { - g.floatMeans, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker) + g.floatMeans, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker) if err != nil { return err } - g.floatMeans = g.floatMeans[:steps] + g.floatMeans = g.floatMeans[:timeRange.StepCount] } if g.incrementalMeans == nil { // First time we are using an incremental mean. Track which samples will be incremental. - g.incrementalMeans, err = types.BoolSlicePool.Get(steps, memoryConsumptionTracker) + g.incrementalMeans, err = types.BoolSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker) if err != nil { return err } - g.incrementalMeans = g.incrementalMeans[:steps] + g.incrementalMeans = g.incrementalMeans[:timeRange.StepCount] } g.incrementalMeans[idx] = true g.floatMeans[idx] = g.floats[idx] / (g.groupSeriesCounts[idx] - 1) @@ -153,21 +153,21 @@ func (g *AvgAggregationGroup) accumulateFloats(data types.InstantVectorSeriesDat return nil } -func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error { +func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error { var err error if len(data.Histograms) > 0 && g.histograms == nil { // First series with histogram values for this group, populate it. - g.histograms, err = types.HistogramSlicePool.Get(steps, memoryConsumptionTracker) + g.histograms, err = types.HistogramSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker) if err != nil { return err } - g.histograms = g.histograms[:steps] + g.histograms = g.histograms[:timeRange.StepCount] } var lastUncopiedHistogram *histogram.FloatHistogram for inputIdx, p := range data.Histograms { - outputIdx := (p.T - start) / interval + outputIdx := (p.T - timeRange.StartT) / timeRange.IntervalMs g.groupSeriesCounts[outputIdx]++ if g.histograms[outputIdx] == invalidCombinationOfHistograms { @@ -280,7 +280,7 @@ func (g *AvgAggregationGroup) reconcileAndCountFloatPoints() (int, bool) { return floatPointCount, haveMixedFloatsAndHistograms } -func (g *AvgAggregationGroup) ComputeOutputSeries(start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) { +func (g *AvgAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) { floatPointCount, hasMixedData := g.reconcileAndCountFloatPoints() var floatPoints []promql.FPoint var err error @@ -293,7 +293,7 @@ func (g *AvgAggregationGroup) ComputeOutputSeries(start int64, interval int64, m for i, havePoint := range g.floatPresent { if havePoint { - t := start + int64(i)*interval + t := timeRange.StartT + int64(i)*timeRange.IntervalMs var f float64 if g.incrementalMeans != nil && g.incrementalMeans[i] { f = g.floatMeans[i] + g.floatCompensatingMeans[i] @@ -314,7 +314,7 @@ func (g *AvgAggregationGroup) ComputeOutputSeries(start int64, interval int64, m for i, h := range g.histograms { if h != nil && h != invalidCombinationOfHistograms { - t := start + int64(i)*interval + t := timeRange.StartT + int64(i)*timeRange.IntervalMs histogramPoints = append(histogramPoints, promql.HPoint{T: t, H: h.Compact(0)}) } } diff --git a/pkg/streamingpromql/aggregations/common.go b/pkg/streamingpromql/aggregations/common.go index dadc60d9244..5cf8cfb06ca 100644 --- a/pkg/streamingpromql/aggregations/common.go +++ b/pkg/streamingpromql/aggregations/common.go @@ -14,9 +14,9 @@ import ( // AggregationGroup accumulates series that have been grouped together and computes the output series data. type AggregationGroup interface { // AccumulateSeries takes in a series as part of the group - AccumulateSeries(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error + AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error // ComputeOutputSeries does any final calculations and returns the grouped series data - ComputeOutputSeries(start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) + ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) } type AggregationGroupFactory func() AggregationGroup diff --git a/pkg/streamingpromql/aggregations/min_max.go b/pkg/streamingpromql/aggregations/min_max.go index 183814995ab..54810d595d4 100644 --- a/pkg/streamingpromql/aggregations/min_max.go +++ b/pkg/streamingpromql/aggregations/min_max.go @@ -49,35 +49,35 @@ func (g *MinMaxAggregationGroup) minAccumulatePoint(idx int64, f float64) { } } -func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ functions.EmitAnnotationFunc) error { +func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ functions.EmitAnnotationFunc) error { if (len(data.Floats) > 0 || len(data.Histograms) > 0) && g.floatValues == nil { // Even if we only have histograms, we have to populate the float slices, as we'll treat histograms as if they have value 0. // This is consistent with Prometheus but may not be the desired value: https://github.com/prometheus/prometheus/issues/14711 var err error // First series with float values for this group, populate it. - g.floatValues, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker) + g.floatValues, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker) if err != nil { return err } - g.floatPresent, err = types.BoolSlicePool.Get(steps, memoryConsumptionTracker) + g.floatPresent, err = types.BoolSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker) if err != nil { return err } - g.floatValues = g.floatValues[:steps] - g.floatPresent = g.floatPresent[:steps] + g.floatValues = g.floatValues[:timeRange.StepCount] + g.floatPresent = g.floatPresent[:timeRange.StepCount] } for _, p := range data.Floats { - idx := (p.T - start) / interval + idx := (p.T - timeRange.StartT) / timeRange.IntervalMs g.accumulatePoint(idx, p.F) } // If a histogram exists max treats it as 0. We have to detect this here so that we return a 0 value instead of nothing. // This is consistent with Prometheus but may not be the desired value: https://github.com/prometheus/prometheus/issues/14711 for _, p := range data.Histograms { - idx := (p.T - start) / interval + idx := (p.T - timeRange.StartT) / timeRange.IntervalMs g.accumulatePoint(idx, 0) } @@ -85,7 +85,7 @@ func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeries return nil } -func (g *MinMaxAggregationGroup) ComputeOutputSeries(start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) { +func (g *MinMaxAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) { floatPointCount := 0 for _, p := range g.floatPresent { if p { @@ -102,7 +102,7 @@ func (g *MinMaxAggregationGroup) ComputeOutputSeries(start int64, interval int64 for i, havePoint := range g.floatPresent { if havePoint { - t := start + int64(i)*interval + t := timeRange.StartT + int64(i)*timeRange.IntervalMs f := g.floatValues[i] floatPoints = append(floatPoints, promql.FPoint{T: t, F: f}) } diff --git a/pkg/streamingpromql/aggregations/sum.go b/pkg/streamingpromql/aggregations/sum.go index 318d5024c5d..f9c052af98d 100644 --- a/pkg/streamingpromql/aggregations/sum.go +++ b/pkg/streamingpromql/aggregations/sum.go @@ -24,18 +24,18 @@ type SumAggregationGroup struct { histogramPointCount int } -func (g *SumAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error { +func (g *SumAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error { defer types.PutInstantVectorSeriesData(data, memoryConsumptionTracker) if len(data.Floats) == 0 && len(data.Histograms) == 0 { // Nothing to do return nil } - err := g.accumulateFloats(data, steps, start, interval, memoryConsumptionTracker) + err := g.accumulateFloats(data, timeRange, memoryConsumptionTracker) if err != nil { return err } - err = g.accumulateHistograms(data, steps, start, interval, memoryConsumptionTracker, emitAnnotationFunc) + err = g.accumulateHistograms(data, timeRange, memoryConsumptionTracker, emitAnnotationFunc) if err != nil { return err } @@ -43,32 +43,32 @@ func (g *SumAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesDat return nil } -func (g *SumAggregationGroup) accumulateFloats(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) error { +func (g *SumAggregationGroup) accumulateFloats(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) error { var err error if len(data.Floats) > 0 && g.floatSums == nil { // First series with float values for this group, populate it. - g.floatSums, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker) + g.floatSums, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker) if err != nil { return err } - g.floatCompensatingValues, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker) + g.floatCompensatingValues, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker) if err != nil { return err } - g.floatPresent, err = types.BoolSlicePool.Get(steps, memoryConsumptionTracker) + g.floatPresent, err = types.BoolSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker) if err != nil { return err } - g.floatSums = g.floatSums[:steps] - g.floatCompensatingValues = g.floatCompensatingValues[:steps] - g.floatPresent = g.floatPresent[:steps] + g.floatSums = g.floatSums[:timeRange.StepCount] + g.floatCompensatingValues = g.floatCompensatingValues[:timeRange.StepCount] + g.floatPresent = g.floatPresent[:timeRange.StepCount] } for _, p := range data.Floats { - idx := (p.T - start) / interval + idx := (p.T - timeRange.StartT) / timeRange.IntervalMs g.floatSums[idx], g.floatCompensatingValues[idx] = floats.KahanSumInc(p.F, g.floatSums[idx], g.floatCompensatingValues[idx]) g.floatPresent[idx] = true } @@ -76,21 +76,21 @@ func (g *SumAggregationGroup) accumulateFloats(data types.InstantVectorSeriesDat return nil } -func (g *SumAggregationGroup) accumulateHistograms(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error { +func (g *SumAggregationGroup) accumulateHistograms(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error { var err error var lastUncopiedHistogram *histogram.FloatHistogram if len(data.Histograms) > 0 && g.histogramSums == nil { // First series with histogram values for this group, populate it. - g.histogramSums, err = types.HistogramSlicePool.Get(steps, memoryConsumptionTracker) + g.histogramSums, err = types.HistogramSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker) if err != nil { return err } - g.histogramSums = g.histogramSums[:steps] + g.histogramSums = g.histogramSums[:timeRange.StepCount] } for inputIdx, p := range data.Histograms { - outputIdx := (p.T - start) / interval + outputIdx := (p.T - timeRange.StartT) / timeRange.IntervalMs if g.histogramSums[outputIdx] == invalidCombinationOfHistograms { // We've already seen an invalid combination of histograms at this timestamp. Ignore this point. @@ -181,7 +181,7 @@ func (g *SumAggregationGroup) reconcileAndCountFloatPoints() (int, bool) { return floatPointCount, haveMixedFloatsAndHistograms } -func (g *SumAggregationGroup) ComputeOutputSeries(start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) { +func (g *SumAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) { floatPointCount, hasMixedData := g.reconcileAndCountFloatPoints() var floatPoints []promql.FPoint var err error @@ -193,7 +193,7 @@ func (g *SumAggregationGroup) ComputeOutputSeries(start int64, interval int64, m for i, havePoint := range g.floatPresent { if havePoint { - t := start + int64(i)*interval + t := timeRange.StartT + int64(i)*timeRange.IntervalMs f := g.floatSums[i] + g.floatCompensatingValues[i] floatPoints = append(floatPoints, promql.FPoint{T: t, F: f}) } @@ -209,7 +209,7 @@ func (g *SumAggregationGroup) ComputeOutputSeries(start int64, interval int64, m for i, h := range g.histogramSums { if h != nil && h != invalidCombinationOfHistograms { - t := start + int64(i)*interval + t := timeRange.StartT + int64(i)*timeRange.IntervalMs histogramPoints = append(histogramPoints, promql.HPoint{T: t, H: h.Compact(0)}) } } diff --git a/pkg/streamingpromql/functions.go b/pkg/streamingpromql/functions.go index 9a7e9ef6b57..c1c13db72f5 100644 --- a/pkg/streamingpromql/functions.go +++ b/pkg/streamingpromql/functions.go @@ -27,7 +27,7 @@ type ScalarFunctionOperatorFactory func( memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, - start, end, interval int64, + timeRange types.QueryTimeRange, ) (types.ScalarOperator, error) // SingleInputVectorFunctionOperatorFactory creates an InstantVectorFunctionOperatorFactory for functions @@ -206,16 +206,16 @@ func RegisterScalarFunctionOperatorFactory(functionName string, factory ScalarFu return nil } -func piOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, start, end, interval int64) (types.ScalarOperator, error) { +func piOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.ScalarOperator, error) { if len(args) != 0 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 0 arguments for pi, got %v", len(args)) } - return operators.NewScalarConstant(math.Pi, start, end, interval, memoryConsumptionTracker, expressionPosition), nil + return operators.NewScalarConstant(math.Pi, timeRange, memoryConsumptionTracker, expressionPosition), nil } -func instantVectorToScalarOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, start, end, interval int64) (types.ScalarOperator, error) { +func instantVectorToScalarOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.ScalarOperator, error) { if len(args) != 1 { // Should be caught by the PromQL parser, but we check here for safety. return nil, fmt.Errorf("expected exactly 1 argument for scalar, got %v", len(args)) @@ -227,7 +227,7 @@ func instantVectorToScalarOperatorFactory(args []types.Operator, memoryConsumpti return nil, fmt.Errorf("expected an instant vector argument for scalar, got %T", args[0]) } - return operators.NewInstantVectorToScalar(inner, start, end, interval, memoryConsumptionTracker, expressionPosition), nil + return operators.NewInstantVectorToScalar(inner, timeRange, memoryConsumptionTracker, expressionPosition), nil } func unaryNegationOfInstantVectorOperatorFactory(inner types.InstantVectorOperator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, expressionPosition posrange.PositionRange) types.InstantVectorOperator { diff --git a/pkg/streamingpromql/operators/aggregation.go b/pkg/streamingpromql/operators/aggregation.go index b55aed74ab8..359d84152d5 100644 --- a/pkg/streamingpromql/operators/aggregation.go +++ b/pkg/streamingpromql/operators/aggregation.go @@ -27,10 +27,7 @@ import ( type Aggregation struct { Inner types.InstantVectorOperator - Start int64 // Milliseconds since Unix epoch - End int64 // Milliseconds since Unix epoch - Interval int64 // In milliseconds - Steps int + TimeRange types.QueryTimeRange Grouping []string // If this is a 'without' aggregation, NewAggregation will ensure that this slice contains __name__. Without bool MemoryConsumptionTracker *limiting.MemoryConsumptionTracker @@ -53,9 +50,7 @@ type Aggregation struct { func NewAggregation( inner types.InstantVectorOperator, - startT int64, - endT int64, - intervalMs int64, + timeRange types.QueryTimeRange, grouping []string, without bool, op parser.ItemType, @@ -79,10 +74,7 @@ func NewAggregation( a := &Aggregation{ Inner: inner, - Start: startT, - End: endT, - Interval: intervalMs, - Steps: stepCount(startT, endT, intervalMs), + TimeRange: timeRange, Grouping: grouping, Without: without, MemoryConsumptionTracker: memoryConsumptionTracker, @@ -270,7 +262,7 @@ func (a *Aggregation) NextSeries(ctx context.Context) (types.InstantVectorSeries } // Construct the group and return it - seriesData, hasMixedData, err := thisGroup.aggregation.ComputeOutputSeries(a.Start, a.Interval, a.MemoryConsumptionTracker) + seriesData, hasMixedData, err := thisGroup.aggregation.ComputeOutputSeries(a.TimeRange, a.MemoryConsumptionTracker) if err != nil { return types.InstantVectorSeriesData{}, err } @@ -300,7 +292,7 @@ func (a *Aggregation) accumulateUntilGroupComplete(ctx context.Context, g *group thisSeriesGroup := a.remainingInnerSeriesToGroup[0] a.remainingInnerSeriesToGroup = a.remainingInnerSeriesToGroup[1:] - if err := thisSeriesGroup.aggregation.AccumulateSeries(s, a.Steps, a.Start, a.Interval, a.MemoryConsumptionTracker, a.emitAnnotationFunc); err != nil { + if err := thisSeriesGroup.aggregation.AccumulateSeries(s, a.TimeRange, a.MemoryConsumptionTracker, a.emitAnnotationFunc); err != nil { return err } thisSeriesGroup.remainingSeriesCount-- diff --git a/pkg/streamingpromql/operators/aggregation_test.go b/pkg/streamingpromql/operators/aggregation_test.go index 524fd42ff71..beaf605ea69 100644 --- a/pkg/streamingpromql/operators/aggregation_test.go +++ b/pkg/streamingpromql/operators/aggregation_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/stretchr/testify/require" @@ -235,7 +236,7 @@ func TestAggregation_GroupLabelling(t *testing.T) { for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - aggregator, err := NewAggregation(nil, 0, 0, 1, testCase.grouping, testCase.without, parser.SUM, nil, nil, posrange.PositionRange{}) + aggregator, err := NewAggregation(nil, types.NewInstantQueryTimeRange(timestamp.Time(0)), testCase.grouping, testCase.without, parser.SUM, nil, nil, posrange.PositionRange{}) require.NoError(t, err) bytesFunc, labelsFunc := aggregator.seriesToGroupFuncs() diff --git a/pkg/streamingpromql/operators/instant_vector_selector.go b/pkg/streamingpromql/operators/instant_vector_selector.go index 8282c58a50d..5e4ed31f57b 100644 --- a/pkg/streamingpromql/operators/instant_vector_selector.go +++ b/pkg/streamingpromql/operators/instant_vector_selector.go @@ -25,8 +25,6 @@ type InstantVectorSelector struct { Selector *Selector MemoryConsumptionTracker *limiting.MemoryConsumptionTracker - numSteps int - chunkIterator chunkenc.Iterator memoizedIterator *storage.MemoizedSeriesIterator } @@ -38,9 +36,6 @@ func (v *InstantVectorSelector) ExpressionPosition() posrange.PositionRange { } func (v *InstantVectorSelector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) { - // Compute value we need on every call to NextSeries() once, here. - v.numSteps = stepCount(v.Selector.Start, v.Selector.End, v.Selector.Interval) - return v.Selector.SeriesMetadata(ctx) } @@ -69,7 +64,7 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe lastHistogramT := int64(math.MinInt64) var lastHistogram *histogram.FloatHistogram - for stepT := v.Selector.Start; stepT <= v.Selector.End; stepT += v.Selector.Interval { + for stepT := v.Selector.TimeRange.StartT; stepT <= v.Selector.TimeRange.EndT; stepT += v.Selector.TimeRange.IntervalMs { var t int64 var f float64 var h *histogram.FloatHistogram @@ -137,7 +132,7 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe // Only create the slice once we know the series is a histogram or not. // (It is possible to over-allocate in the case where we have both floats and histograms, but that won't be common). var err error - if data.Histograms, err = types.HPointSlicePool.Get(v.numSteps, v.MemoryConsumptionTracker); err != nil { + if data.Histograms, err = types.HPointSlicePool.Get(v.Selector.TimeRange.StepCount, v.MemoryConsumptionTracker); err != nil { return types.InstantVectorSeriesData{}, err } } @@ -148,7 +143,7 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe if len(data.Floats) == 0 { // Only create the slice once we know the series is a histogram or not var err error - if data.Floats, err = types.FPointSlicePool.Get(v.numSteps, v.MemoryConsumptionTracker); err != nil { + if data.Floats, err = types.FPointSlicePool.Get(v.Selector.TimeRange.StepCount, v.MemoryConsumptionTracker); err != nil { return types.InstantVectorSeriesData{}, err } } diff --git a/pkg/streamingpromql/operators/instant_vector_selector_test.go b/pkg/streamingpromql/operators/instant_vector_selector_test.go index 9248ad048e0..42f6fdf5503 100644 --- a/pkg/streamingpromql/operators/instant_vector_selector_test.go +++ b/pkg/streamingpromql/operators/instant_vector_selector_test.go @@ -9,12 +9,12 @@ import ( "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/promqltest" "github.com/stretchr/testify/require" "github.com/grafana/mimir/pkg/streamingpromql/limiting" + "github.com/grafana/mimir/pkg/streamingpromql/types" ) func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { @@ -218,9 +218,7 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) { selector := &InstantVectorSelector{ Selector: &Selector{ Queryable: storage, - Start: timestamp.FromTime(startTime), - End: timestamp.FromTime(endTime), - Interval: time.Minute.Milliseconds(), + TimeRange: types.NewRangeQueryTimeRange(startTime, endTime, time.Minute), Matchers: []*labels.Matcher{ labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_metric"), }, diff --git a/pkg/streamingpromql/operators/instant_vector_to_scalar.go b/pkg/streamingpromql/operators/instant_vector_to_scalar.go index eabc84962c9..bbb21ca73ee 100644 --- a/pkg/streamingpromql/operators/instant_vector_to_scalar.go +++ b/pkg/streamingpromql/operators/instant_vector_to_scalar.go @@ -16,9 +16,7 @@ import ( // InstantVectorToScalar is an operator that implements the scalar() function. type InstantVectorToScalar struct { Inner types.InstantVectorOperator - Start int64 // Milliseconds since Unix epoch - End int64 // Milliseconds since Unix epoch - Interval int64 // In milliseconds + TimeRange types.QueryTimeRange MemoryConsumptionTracker *limiting.MemoryConsumptionTracker expressionPosition posrange.PositionRange @@ -28,17 +26,13 @@ var _ types.ScalarOperator = &InstantVectorToScalar{} func NewInstantVectorToScalar( inner types.InstantVectorOperator, - start int64, - end int64, - interval int64, + timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, expressionPosition posrange.PositionRange, ) *InstantVectorToScalar { return &InstantVectorToScalar{ Inner: inner, - Start: start, - End: end, - Interval: interval, + TimeRange: timeRange, MemoryConsumptionTracker: memoryConsumptionTracker, expressionPosition: expressionPosition, } @@ -50,21 +44,20 @@ func (i *InstantVectorToScalar) GetValues(ctx context.Context) (types.ScalarData return types.ScalarData{}, err } - stepCount := stepCount(i.Start, i.End, i.Interval) - seenPoint, err := types.BoolSlicePool.Get(stepCount, i.MemoryConsumptionTracker) + seenPoint, err := types.BoolSlicePool.Get(i.TimeRange.StepCount, i.MemoryConsumptionTracker) if err != nil { return types.ScalarData{}, err } defer types.BoolSlicePool.Put(seenPoint, i.MemoryConsumptionTracker) - seenPoint = seenPoint[:stepCount] + seenPoint = seenPoint[:i.TimeRange.StepCount] - output, err := types.FPointSlicePool.Get(stepCount, i.MemoryConsumptionTracker) + output, err := types.FPointSlicePool.Get(i.TimeRange.StepCount, i.MemoryConsumptionTracker) if err != nil { return types.ScalarData{}, err } - for t := i.Start; t <= i.End; t += i.Interval { + for t := i.TimeRange.StartT; t <= i.TimeRange.EndT; t += i.TimeRange.IntervalMs { output = append(output, promql.FPoint{ T: t, F: math.NaN(), @@ -78,7 +71,7 @@ func (i *InstantVectorToScalar) GetValues(ctx context.Context) (types.ScalarData } for _, p := range seriesData.Floats { - sampleIdx := (p.T - i.Start) / i.Interval + sampleIdx := (p.T - i.TimeRange.StartT) / i.TimeRange.IntervalMs if seenPoint[sampleIdx] { // We've already seen another point at this timestamp, so return NaN at this timestamp. diff --git a/pkg/streamingpromql/operators/range_vector_selector.go b/pkg/streamingpromql/operators/range_vector_selector.go index 79ae591c66a..117442ac6ca 100644 --- a/pkg/streamingpromql/operators/range_vector_selector.go +++ b/pkg/streamingpromql/operators/range_vector_selector.go @@ -22,10 +22,8 @@ type RangeVectorSelector struct { Selector *Selector rangeMilliseconds int64 - numSteps int - - chunkIterator chunkenc.Iterator - nextT int64 + chunkIterator chunkenc.Iterator + nextT int64 } var _ types.RangeVectorOperator = &RangeVectorSelector{} @@ -37,13 +35,12 @@ func (m *RangeVectorSelector) ExpressionPosition() posrange.PositionRange { func (m *RangeVectorSelector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) { // Compute value we need on every call to NextSeries() once, here. m.rangeMilliseconds = m.Selector.Range.Milliseconds() - m.numSteps = stepCount(m.Selector.Start, m.Selector.End, m.Selector.Interval) return m.Selector.SeriesMetadata(ctx) } func (m *RangeVectorSelector) StepCount() int { - return m.numSteps + return m.Selector.TimeRange.StepCount } func (m *RangeVectorSelector) Range() time.Duration { @@ -57,12 +54,12 @@ func (m *RangeVectorSelector) NextSeries(ctx context.Context) error { return err } - m.nextT = m.Selector.Start + m.nextT = m.Selector.TimeRange.StartT return nil } func (m *RangeVectorSelector) NextStepSamples(floats *types.FPointRingBuffer, histograms *types.HPointRingBuffer) (types.RangeVectorStepData, error) { - if m.nextT > m.Selector.End { + if m.nextT > m.Selector.TimeRange.EndT { return types.RangeVectorStepData{}, types.EOS } @@ -84,7 +81,7 @@ func (m *RangeVectorSelector) NextStepSamples(floats *types.FPointRingBuffer, hi return types.RangeVectorStepData{}, err } - m.nextT += m.Selector.Interval + m.nextT += m.Selector.TimeRange.IntervalMs return types.RangeVectorStepData{ StepT: stepT, diff --git a/pkg/streamingpromql/operators/scalar_constant.go b/pkg/streamingpromql/operators/scalar_constant.go index 5bd95e11404..a118d63c168 100644 --- a/pkg/streamingpromql/operators/scalar_constant.go +++ b/pkg/streamingpromql/operators/scalar_constant.go @@ -13,9 +13,7 @@ import ( type ScalarConstant struct { Value float64 - Start int64 // Milliseconds since Unix epoch - End int64 // Milliseconds since Unix epoch - Interval int64 // In milliseconds + TimeRange types.QueryTimeRange MemoryConsumptionTracker *limiting.MemoryConsumptionTracker expressionPosition posrange.PositionRange @@ -25,34 +23,29 @@ var _ types.ScalarOperator = &ScalarConstant{} func NewScalarConstant( value float64, - start int64, - end int64, - interval int64, + timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, expressionPosition posrange.PositionRange, ) *ScalarConstant { return &ScalarConstant{ Value: value, - Start: start, - End: end, - Interval: interval, + TimeRange: timeRange, MemoryConsumptionTracker: memoryConsumptionTracker, expressionPosition: expressionPosition, } } func (s *ScalarConstant) GetValues(_ context.Context) (types.ScalarData, error) { - numSteps := stepCount(s.Start, s.End, s.Interval) - samples, err := types.FPointSlicePool.Get(numSteps, s.MemoryConsumptionTracker) + samples, err := types.FPointSlicePool.Get(s.TimeRange.StepCount, s.MemoryConsumptionTracker) if err != nil { return types.ScalarData{}, err } - samples = samples[:numSteps] + samples = samples[:s.TimeRange.StepCount] - for step := 0; step < numSteps; step++ { - samples[step].T = s.Start + int64(step)*s.Interval + for step := 0; step < s.TimeRange.StepCount; step++ { + samples[step].T = s.TimeRange.StartT + int64(step)*s.TimeRange.IntervalMs samples[step].F = s.Value } diff --git a/pkg/streamingpromql/operators/selector.go b/pkg/streamingpromql/operators/selector.go index 63a22a3fa6c..d7a9d788aa7 100644 --- a/pkg/streamingpromql/operators/selector.go +++ b/pkg/streamingpromql/operators/selector.go @@ -18,10 +18,8 @@ import ( type Selector struct { Queryable storage.Queryable - Start int64 // Milliseconds since Unix epoch - End int64 // Milliseconds since Unix epoch + TimeRange types.QueryTimeRange Timestamp *int64 // Milliseconds since Unix epoch, only set if selector uses @ modifier (eg. metric{...} @ 123) - Interval int64 // In milliseconds Offset int64 // In milliseconds Matchers []*labels.Matcher @@ -48,8 +46,8 @@ func (s *Selector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, return nil, errors.New("invalid Selector configuration: both LookbackDelta and Range are non-zero") } - startTimestamp := s.Start - endTimestamp := s.End + startTimestamp := s.TimeRange.StartT + endTimestamp := s.TimeRange.EndT if s.Timestamp != nil { // Timestamp from @ modifier takes precedence over query evaluation timestamp. @@ -65,7 +63,7 @@ func (s *Selector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, hints := &storage.SelectHints{ Start: startTimestamp, End: endTimestamp, - Step: s.Interval, + Step: s.TimeRange.IntervalMs, Range: rangeMilliseconds, // Mimir doesn't use Grouping or By, so there's no need to include them here. diff --git a/pkg/streamingpromql/operators/time.go b/pkg/streamingpromql/operators/time.go deleted file mode 100644 index 259c2b4d97f..00000000000 --- a/pkg/streamingpromql/operators/time.go +++ /dev/null @@ -1,7 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package operators - -func stepCount(start, end, interval int64) int { - return int((end-start)/interval) + 1 -} diff --git a/pkg/streamingpromql/operators/vector_scalar_binary_operation.go b/pkg/streamingpromql/operators/vector_scalar_binary_operation.go index 06a8daac8ef..2acb7e0c79d 100644 --- a/pkg/streamingpromql/operators/vector_scalar_binary_operation.go +++ b/pkg/streamingpromql/operators/vector_scalar_binary_operation.go @@ -26,12 +26,8 @@ type VectorScalarBinaryOperation struct { Op parser.ItemType MemoryConsumptionTracker *limiting.MemoryConsumptionTracker - start int64 // Milliseconds since Unix epoch - end int64 // Milliseconds since Unix epoch - interval int64 // In milliseconds - stepCount int - - opFunc vectorScalarBinaryOperationFunc + timeRange types.QueryTimeRange + opFunc vectorScalarBinaryOperationFunc expressionPosition posrange.PositionRange emitAnnotation functions.EmitAnnotationFunc @@ -46,9 +42,7 @@ func NewVectorScalarBinaryOperation( vector types.InstantVectorOperator, scalarIsLeftSide bool, op parser.ItemType, - start int64, - end int64, - interval int64, + timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, @@ -65,11 +59,7 @@ func NewVectorScalarBinaryOperation( Op: op, MemoryConsumptionTracker: memoryConsumptionTracker, - start: start, - end: end, - interval: interval, - stepCount: stepCount(start, end, interval), - + timeRange: timeRange, expressionPosition: expressionPosition, } @@ -175,7 +165,7 @@ func (v *VectorScalarBinaryOperation) NextSeries(ctx context.Context) (types.Ins break } - scalarIdx := (t - v.start) / v.interval // Scalars always have a value at every step, so we can just compute the index of the corresponding scalar value from the timestamp. + scalarIdx := (t - v.timeRange.StartT) / v.timeRange.IntervalMs // Scalars always have a value at every step, so we can just compute the index of the corresponding scalar value from the timestamp. scalarValue := v.scalarData.Samples[scalarIdx].F f, h, ok, err := v.opFunc(scalarValue, vectorF, vectorH) diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index d2531f3896a..54033bbbe93 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -14,7 +14,6 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/cancellation" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" @@ -44,9 +43,7 @@ type Query struct { memoryConsumptionTracker *limiting.MemoryConsumptionTracker annotations *annotations.Annotations - startT int64 // Start timestamp, in milliseconds since Unix epoch. - endT int64 // End timestamp, in milliseconds since Unix epoch. - intervalMs int64 // Range query interval, or 1 for instant queries. Note that this is deliberately different to statement.Interval for instant queries (where it is 0) to simplify some loop conditions. + timeRange types.QueryTimeRange result *promql.Result } @@ -83,21 +80,18 @@ func newQuery(ctx context.Context, queryable storage.Queryable, opts promql.Quer Interval: interval, // 0 for instant queries LookbackDelta: opts.LookbackDelta(), }, - - startT: timestamp.FromTime(start), - endT: timestamp.FromTime(end), - intervalMs: interval.Milliseconds(), // 1 for instant queries (set below) } if q.IsInstant() { - q.intervalMs = 1 - } + q.timeRange = types.NewInstantQueryTimeRange(start) + } else { + q.timeRange = types.NewRangeQueryTimeRange(start, end, interval) - if !q.IsInstant() { if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar { return nil, fmt.Errorf("query expression produces a %s, but expression for range queries must produce an instant vector or scalar", parser.DocumentedType(expr.Type())) } } + q.root, err = q.convertToOperator(expr) if err != nil { return nil, err @@ -139,10 +133,8 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV MemoryConsumptionTracker: q.memoryConsumptionTracker, Selector: &operators.Selector{ Queryable: q.queryable, - Start: q.startT, - End: q.endT, + TimeRange: q.timeRange, Timestamp: e.Timestamp, - Interval: q.intervalMs, Offset: e.OriginalOffset.Milliseconds(), LookbackDelta: lookbackDelta, Matchers: e.LabelMatchers, @@ -166,9 +158,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV return operators.NewAggregation( inner, - q.startT, - q.endT, - q.intervalMs, + q.timeRange, e.Grouping, e.Without, e.Op, @@ -219,7 +209,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV scalarIsLeftSide := e.LHS.Type() == parser.ValueTypeScalar - o, err := operators.NewVectorScalarBinaryOperation(scalar, vector, scalarIsLeftSide, e.Op, q.startT, q.endT, q.intervalMs, q.memoryConsumptionTracker, q.annotations, e.PositionRange()) + o, err := operators.NewVectorScalarBinaryOperation(scalar, vector, scalarIsLeftSide, e.Op, q.timeRange, q.memoryConsumptionTracker, q.annotations, e.PositionRange()) if err != nil { return nil, err } @@ -308,10 +298,8 @@ func (q *Query) convertToRangeVectorOperator(expr parser.Expr) (types.RangeVecto return &operators.RangeVectorSelector{ Selector: &operators.Selector{ Queryable: q.queryable, - Start: q.startT, - End: q.endT, + TimeRange: q.timeRange, Timestamp: vectorSelector.Timestamp, - Interval: q.intervalMs, Offset: vectorSelector.OriginalOffset.Milliseconds(), Range: e.Range, Matchers: vectorSelector.LabelMatchers, @@ -342,9 +330,7 @@ func (q *Query) convertToScalarOperator(expr parser.Expr) (types.ScalarOperator, case *parser.NumberLiteral: o := operators.NewScalarConstant( e.Val, - q.startT, - q.endT, - q.intervalMs, + q.timeRange, q.memoryConsumptionTracker, e.PositionRange(), ) @@ -412,7 +398,7 @@ func (q *Query) convertFunctionCallToScalarOperator(e *parser.Call) (types.Scala args[i] = a } - return factory(args, q.memoryConsumptionTracker, q.annotations, e.PosRange, q.startT, q.endT, q.intervalMs) + return factory(args, q.memoryConsumptionTracker, q.annotations, e.PosRange, q.timeRange) } func (q *Query) IsInstant() bool { diff --git a/pkg/streamingpromql/types/data.go b/pkg/streamingpromql/types/data.go index 6ff8edb4559..7249a3da3af 100644 --- a/pkg/streamingpromql/types/data.go +++ b/pkg/streamingpromql/types/data.go @@ -6,8 +6,11 @@ package types import ( + "time" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" ) @@ -151,3 +154,35 @@ func HasDuplicateSeries(metadata []SeriesMetadata) bool { return false } } + +type QueryTimeRange struct { + StartT int64 // Start timestamp, in milliseconds since Unix epoch. + EndT int64 // End timestamp, in milliseconds since Unix epoch. + IntervalMs int64 // Range query interval, or 1 for instant queries. Note that this is deliberately different to parser.EvalStmt.Interval for instant queries (where it is 0) to simplify some loop conditions. + + StepCount int // 1 for instant queries. +} + +func NewInstantQueryTimeRange(t time.Time) QueryTimeRange { + ts := timestamp.FromTime(t) + + return QueryTimeRange{ + StartT: ts, + EndT: ts, + IntervalMs: 1, + StepCount: 1, + } +} + +func NewRangeQueryTimeRange(start time.Time, end time.Time, interval time.Duration) QueryTimeRange { + startT := timestamp.FromTime(start) + endT := timestamp.FromTime(end) + intervalMs := interval.Milliseconds() + + return QueryTimeRange{ + StartT: startT, + EndT: endT, + IntervalMs: intervalMs, + StepCount: int((endT-startT)/intervalMs) + 1, + } +}