Mimir query engine: extract query time range values to their own type (#9280)

* Move query time range information to its own type

* Use `types.QueryTimeRange` in aggregations

* Add changelog entry

* Fix linting issues

charleskorn authored Sep 12, 2024
1 parent 467bfda commit 0a06baa

Showing 19 changed files with 152 additions and 178 deletions.
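For orientation before the diff: below is a minimal sketch of what the new `types.QueryTimeRange` could look like, reconstructed only from how the changed code uses it (the `StartT`, `IntervalMs`, and `StepCount` fields, plus the `NewRangeQueryTimeRange` constructor called in the test). The `EndT` field and the inclusive step-count arithmetic are assumptions; the actual definition in `pkg/streamingpromql/types` may differ.

```go
package types

import "time"

// QueryTimeRange bundles the values that were previously passed around as
// separate steps/start/interval arguments. Field names follow the usage in
// this commit; everything else is an assumption.
type QueryTimeRange struct {
	StartT     int64 // start of the query range, in Unix milliseconds
	EndT       int64 // end of the query range, in Unix milliseconds (assumed field)
	IntervalMs int64 // interval between steps, in milliseconds
	StepCount  int   // number of steps in the range
}

// NewRangeQueryTimeRange mirrors the constructor called in aggregations_test.go.
// The inclusive step count reproduces the old positional arguments used by the
// test (steps=5 for a 0 to 4 ms range at 1 ms resolution).
func NewRangeQueryTimeRange(start, end time.Time, interval time.Duration) QueryTimeRange {
	startT := start.UnixMilli()
	endT := end.UnixMilli()
	intervalMs := interval.Milliseconds()

	return QueryTimeRange{
		StartT:     startT,
		EndT:       endT,
		IntervalMs: intervalMs,
		StepCount:  int((endT-startT)/intervalMs) + 1,
	}
}
```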
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -33,7 +33,7 @@
* [CHANGE] Query-scheduler: Remove the experimental `-query-scheduler.use-multi-algorithm-query-queue` flag. The new multi-algorithm tree queue is always used for the scheduler. #9210
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
7 changes: 5 additions & 2 deletions pkg/streamingpromql/aggregations/aggregations_test.go
@@ -4,8 +4,10 @@ package aggregations

import (
"testing"
"time"

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"

@@ -26,6 +28,7 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) {
for name, group := range groups {
t.Run(name, func(t *testing.T) {
memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil)
timeRange := types.NewRangeQueryTimeRange(timestamp.Time(0), timestamp.Time(4), time.Millisecond)

// First series: all histograms should be nil-ed out after returning, as they're all retained for use.
histograms, err := types.HPointSlicePool.Get(4, memoryConsumptionTracker)
@@ -40,7 +43,7 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) {
histograms = append(histograms, promql.HPoint{T: 4, H: h3})
series := types.InstantVectorSeriesData{Histograms: histograms}

require.NoError(t, group.AccumulateSeries(series, 5, 0, 1, memoryConsumptionTracker, nil))
require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil))
require.Equal(t, []promql.HPoint{{T: 0, H: nil}, {T: 1, H: nil}, {T: 2, H: nil}, {T: 4, H: nil}}, series.Histograms, "all histograms retained should be nil-ed out after accumulating series")

// Second series: all histograms that are not retained should be nil-ed out after returning.
@@ -56,7 +59,7 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) {
histograms = append(histograms, promql.HPoint{T: 4, H: h6})
series = types.InstantVectorSeriesData{Histograms: histograms}

require.NoError(t, group.AccumulateSeries(series, 5, 0, 1, memoryConsumptionTracker, nil))
require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil))

expected := []promql.HPoint{
{T: 0, H: h4}, // h4 not retained (added to h1)
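As a quick check on the test change above: the new `types.NewRangeQueryTimeRange(timestamp.Time(0), timestamp.Time(4), time.Millisecond)` call describes the same range as the old positional arguments `(5, 0, 1)` it replaces, assuming the constructor computes an inclusive step count as in the sketch near the top of this page.

```go
package aggregations_test

import (
	"fmt"
	"time"

	"github.com/prometheus/prometheus/model/timestamp"
)

// Example_timeRangeArithmetic reproduces the arithmetic only; it does not call
// the Mimir constructor itself.
func Example_timeRangeArithmetic() {
	start := timestamp.Time(0) // Unix-millisecond timestamp 0
	end := timestamp.Time(4)   // Unix-millisecond timestamp 4
	interval := time.Millisecond

	startT := timestamp.FromTime(start)   // 0: matches the old `start` argument
	intervalMs := interval.Milliseconds() // 1: matches the old `interval` argument
	stepCount := (timestamp.FromTime(end)-startT)/intervalMs + 1

	fmt.Println(startT, intervalMs, stepCount)
	// Output: 0 1 5
}
```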
48 changes: 24 additions & 24 deletions pkg/streamingpromql/aggregations/avg.go
@@ -32,7 +32,7 @@ type AvgAggregationGroup struct {
groupSeriesCounts []float64
}

func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
defer types.PutInstantVectorSeriesData(data, memoryConsumptionTracker)
if len(data.Floats) == 0 && len(data.Histograms) == 0 {
// Nothing to do
@@ -42,51 +42,51 @@ func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesDat
var err error

if g.groupSeriesCounts == nil {
g.groupSeriesCounts, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
g.groupSeriesCounts, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}
g.groupSeriesCounts = g.groupSeriesCounts[:steps]
g.groupSeriesCounts = g.groupSeriesCounts[:timeRange.StepCount]
}

err = g.accumulateFloats(data, steps, start, interval, memoryConsumptionTracker)
err = g.accumulateFloats(data, timeRange, memoryConsumptionTracker)
if err != nil {
return err
}
err = g.accumulateHistograms(data, steps, start, interval, memoryConsumptionTracker, emitAnnotationFunc)
err = g.accumulateHistograms(data, timeRange, memoryConsumptionTracker, emitAnnotationFunc)
if err != nil {
return err
}

return nil
}

func (g *AvgAggregationGroup) accumulateFloats(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) error {
func (g *AvgAggregationGroup) accumulateFloats(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) error {
var err error
if len(data.Floats) > 0 && g.floats == nil {
// First series with float values for this group, populate it.
g.floats, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
g.floats, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}

g.floatCompensatingMeans, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
g.floatCompensatingMeans, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}

g.floatPresent, err = types.BoolSlicePool.Get(steps, memoryConsumptionTracker)
g.floatPresent, err = types.BoolSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}

g.floats = g.floats[:steps]
g.floatCompensatingMeans = g.floatCompensatingMeans[:steps]
g.floatPresent = g.floatPresent[:steps]
g.floats = g.floats[:timeRange.StepCount]
g.floatCompensatingMeans = g.floatCompensatingMeans[:timeRange.StepCount]
g.floatPresent = g.floatPresent[:timeRange.StepCount]
}

for _, p := range data.Floats {
idx := (p.T - start) / interval
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.groupSeriesCounts[idx]++
if !g.floatPresent[idx] {
// The first point is just taken as the value
@@ -108,19 +108,19 @@ func (g *AvgAggregationGroup) accumulateFloats(data types.InstantVectorSeriesDat
// instead of continuing to sum up, we revert to incremental
// calculation of the mean value from here on.
if g.floatMeans == nil {
g.floatMeans, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
g.floatMeans, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}
g.floatMeans = g.floatMeans[:steps]
g.floatMeans = g.floatMeans[:timeRange.StepCount]
}
if g.incrementalMeans == nil {
// First time we are using an incremental mean. Track which samples will be incremental.
g.incrementalMeans, err = types.BoolSlicePool.Get(steps, memoryConsumptionTracker)
g.incrementalMeans, err = types.BoolSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}
g.incrementalMeans = g.incrementalMeans[:steps]
g.incrementalMeans = g.incrementalMeans[:timeRange.StepCount]
}
g.incrementalMeans[idx] = true
g.floatMeans[idx] = g.floats[idx] / (g.groupSeriesCounts[idx] - 1)
@@ -153,21 +153,21 @@ func (g *AvgAggregationGroup) accumulateFloats(data types.InstantVectorSeriesDat
return nil
}

func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
var err error
if len(data.Histograms) > 0 && g.histograms == nil {
// First series with histogram values for this group, populate it.
g.histograms, err = types.HistogramSlicePool.Get(steps, memoryConsumptionTracker)
g.histograms, err = types.HistogramSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}
g.histograms = g.histograms[:steps]
g.histograms = g.histograms[:timeRange.StepCount]
}

var lastUncopiedHistogram *histogram.FloatHistogram

for inputIdx, p := range data.Histograms {
outputIdx := (p.T - start) / interval
outputIdx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.groupSeriesCounts[outputIdx]++

if g.histograms[outputIdx] == invalidCombinationOfHistograms {
@@ -280,7 +280,7 @@ func (g *AvgAggregationGroup) reconcileAndCountFloatPoints() (int, bool) {
return floatPointCount, haveMixedFloatsAndHistograms
}

func (g *AvgAggregationGroup) ComputeOutputSeries(start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
func (g *AvgAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
floatPointCount, hasMixedData := g.reconcileAndCountFloatPoints()
var floatPoints []promql.FPoint
var err error
@@ -293,7 +293,7 @@ func (g *AvgAggregationGroup) ComputeOutputSeries(start int64, interval int64, m

for i, havePoint := range g.floatPresent {
if havePoint {
t := start + int64(i)*interval
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
var f float64
if g.incrementalMeans != nil && g.incrementalMeans[i] {
f = g.floatMeans[i] + g.floatCompensatingMeans[i]
@@ -314,7 +314,7 @@

for i, h := range g.histograms {
if h != nil && h != invalidCombinationOfHistograms {
t := start + int64(i)*interval
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
histogramPoints = append(histogramPoints, promql.HPoint{T: t, H: h.Compact(0)})
}
}
4 changes: 2 additions & 2 deletions pkg/streamingpromql/aggregations/common.go
@@ -14,9 +14,9 @@ import (
// AggregationGroup accumulates series that have been grouped together and computes the output series data.
type AggregationGroup interface {
// AccumulateSeries takes in a series as part of the group
AccumulateSeries(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error
AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error
// ComputeOutputSeries does any final calculations and returns the grouped series data
ComputeOutputSeries(start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error)
ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error)
}

type AggregationGroupFactory func() AggregationGroup
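Every aggregator updated in this commit maps sample timestamps to step indices (and back) the same way, now expressed through `QueryTimeRange` fields rather than loose `start`/`interval` arguments. The two helpers below are hypothetical, added only to isolate that arithmetic; they build on the `QueryTimeRange` sketch near the top of this page.

```go
// stepIndex maps a sample timestamp to its output slot, matching
// `idx := (p.T - timeRange.StartT) / timeRange.IntervalMs` in avg.go and min_max.go.
func stepIndex(pointT int64, timeRange QueryTimeRange) int64 {
	return (pointT - timeRange.StartT) / timeRange.IntervalMs
}

// stepTimestamp is the inverse, matching the output-point construction
// `t := timeRange.StartT + int64(i)*timeRange.IntervalMs` in ComputeOutputSeries.
func stepTimestamp(i int, timeRange QueryTimeRange) int64 {
	return timeRange.StartT + int64(i)*timeRange.IntervalMs
}
```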
18 changes: 9 additions & 9 deletions pkg/streamingpromql/aggregations/min_max.go
@@ -49,43 +49,43 @@ func (g *MinMaxAggregationGroup) minAccumulatePoint(idx int64, f float64) {
}
}

func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ functions.EmitAnnotationFunc) error {
func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ functions.EmitAnnotationFunc) error {
if (len(data.Floats) > 0 || len(data.Histograms) > 0) && g.floatValues == nil {
// Even if we only have histograms, we have to populate the float slices, as we'll treat histograms as if they have value 0.
// This is consistent with Prometheus but may not be the desired value: https://github.com/prometheus/prometheus/issues/14711

var err error
// First series with float values for this group, populate it.
g.floatValues, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
g.floatValues, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}

g.floatPresent, err = types.BoolSlicePool.Get(steps, memoryConsumptionTracker)
g.floatPresent, err = types.BoolSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}
g.floatValues = g.floatValues[:steps]
g.floatPresent = g.floatPresent[:steps]
g.floatValues = g.floatValues[:timeRange.StepCount]
g.floatPresent = g.floatPresent[:timeRange.StepCount]
}

for _, p := range data.Floats {
idx := (p.T - start) / interval
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.accumulatePoint(idx, p.F)
}

// If a histogram exists max treats it as 0. We have to detect this here so that we return a 0 value instead of nothing.
// This is consistent with Prometheus but may not be the desired value: https://github.com/prometheus/prometheus/issues/14711
for _, p := range data.Histograms {
idx := (p.T - start) / interval
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.accumulatePoint(idx, 0)
}

types.PutInstantVectorSeriesData(data, memoryConsumptionTracker)
return nil
}

func (g *MinMaxAggregationGroup) ComputeOutputSeries(start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
func (g *MinMaxAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
floatPointCount := 0
for _, p := range g.floatPresent {
if p {
@@ -102,7 +102,7 @@ func (g *MinMaxAggregationGroup) ComputeOutputSeries(start int64, interval int64

for i, havePoint := range g.floatPresent {
if havePoint {
t := start + int64(i)*interval
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
f := g.floatValues[i]
floatPoints = append(floatPoints, promql.FPoint{T: t, F: f})
}
(Diffs for the remaining 14 changed files are not shown here.)
