Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mimir query engine: extract query time range values to their own type #9280

Merged
merged 6 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* [CHANGE] Query-scheduler: Remove the experimental `-query-scheduler.use-multi-algorithm-query-queue` flag. The new multi-algorithm tree queue is always used for the scheduler. #9210
* [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173
* [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886 #8988
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
Expand Down
7 changes: 5 additions & 2 deletions pkg/streamingpromql/aggregations/aggregations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ package aggregations

import (
"testing"
"time"

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"

Expand All @@ -26,6 +28,7 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) {
for name, group := range groups {
t.Run(name, func(t *testing.T) {
memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil)
timeRange := types.NewRangeQueryTimeRange(timestamp.Time(0), timestamp.Time(4), time.Millisecond)

// First series: all histograms should be nil-ed out after returning, as they're all retained for use.
histograms, err := types.HPointSlicePool.Get(4, memoryConsumptionTracker)
Expand All @@ -40,7 +43,7 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) {
histograms = append(histograms, promql.HPoint{T: 4, H: h3})
series := types.InstantVectorSeriesData{Histograms: histograms}

require.NoError(t, group.AccumulateSeries(series, 5, 0, 1, memoryConsumptionTracker, nil))
require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil))
require.Equal(t, []promql.HPoint{{T: 0, H: nil}, {T: 1, H: nil}, {T: 2, H: nil}, {T: 4, H: nil}}, series.Histograms, "all histograms retained should be nil-ed out after accumulating series")

// Second series: all histograms that are not retained should be nil-ed out after returning.
Expand All @@ -56,7 +59,7 @@ func TestAggregationGroupNativeHistogramSafety(t *testing.T) {
histograms = append(histograms, promql.HPoint{T: 4, H: h6})
series = types.InstantVectorSeriesData{Histograms: histograms}

require.NoError(t, group.AccumulateSeries(series, 5, 0, 1, memoryConsumptionTracker, nil))
require.NoError(t, group.AccumulateSeries(series, timeRange, memoryConsumptionTracker, nil))

expected := []promql.HPoint{
{T: 0, H: h4}, // h4 not retained (added to h1)
Expand Down
48 changes: 24 additions & 24 deletions pkg/streamingpromql/aggregations/avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type AvgAggregationGroup struct {
groupSeriesCounts []float64
}

func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
defer types.PutInstantVectorSeriesData(data, memoryConsumptionTracker)
if len(data.Floats) == 0 && len(data.Histograms) == 0 {
// Nothing to do
Expand All @@ -42,51 +42,51 @@ func (g *AvgAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesDat
var err error

if g.groupSeriesCounts == nil {
g.groupSeriesCounts, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
g.groupSeriesCounts, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}
g.groupSeriesCounts = g.groupSeriesCounts[:steps]
g.groupSeriesCounts = g.groupSeriesCounts[:timeRange.StepCount]
}

err = g.accumulateFloats(data, steps, start, interval, memoryConsumptionTracker)
err = g.accumulateFloats(data, timeRange, memoryConsumptionTracker)
if err != nil {
return err
}
err = g.accumulateHistograms(data, steps, start, interval, memoryConsumptionTracker, emitAnnotationFunc)
err = g.accumulateHistograms(data, timeRange, memoryConsumptionTracker, emitAnnotationFunc)
if err != nil {
return err
}

return nil
}

func (g *AvgAggregationGroup) accumulateFloats(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) error {
func (g *AvgAggregationGroup) accumulateFloats(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) error {
var err error
if len(data.Floats) > 0 && g.floats == nil {
// First series with float values for this group, populate it.
g.floats, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
g.floats, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}

g.floatCompensatingMeans, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
g.floatCompensatingMeans, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}

g.floatPresent, err = types.BoolSlicePool.Get(steps, memoryConsumptionTracker)
g.floatPresent, err = types.BoolSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}

g.floats = g.floats[:steps]
g.floatCompensatingMeans = g.floatCompensatingMeans[:steps]
g.floatPresent = g.floatPresent[:steps]
g.floats = g.floats[:timeRange.StepCount]
g.floatCompensatingMeans = g.floatCompensatingMeans[:timeRange.StepCount]
g.floatPresent = g.floatPresent[:timeRange.StepCount]
}

for _, p := range data.Floats {
idx := (p.T - start) / interval
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.groupSeriesCounts[idx]++
if !g.floatPresent[idx] {
// The first point is just taken as the value
Expand All @@ -108,19 +108,19 @@ func (g *AvgAggregationGroup) accumulateFloats(data types.InstantVectorSeriesDat
// instead of continuing to sum up, we revert to incremental
// calculation of the mean value from here on.
if g.floatMeans == nil {
g.floatMeans, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
g.floatMeans, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}
g.floatMeans = g.floatMeans[:steps]
g.floatMeans = g.floatMeans[:timeRange.StepCount]
}
if g.incrementalMeans == nil {
// First time we are using an incremental mean. Track which samples will be incremental.
g.incrementalMeans, err = types.BoolSlicePool.Get(steps, memoryConsumptionTracker)
g.incrementalMeans, err = types.BoolSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}
g.incrementalMeans = g.incrementalMeans[:steps]
g.incrementalMeans = g.incrementalMeans[:timeRange.StepCount]
}
g.incrementalMeans[idx] = true
g.floatMeans[idx] = g.floats[idx] / (g.groupSeriesCounts[idx] - 1)
Expand Down Expand Up @@ -153,21 +153,21 @@ func (g *AvgAggregationGroup) accumulateFloats(data types.InstantVectorSeriesDat
return nil
}

func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
func (g *AvgAggregationGroup) accumulateHistograms(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error {
var err error
if len(data.Histograms) > 0 && g.histograms == nil {
// First series with histogram values for this group, populate it.
g.histograms, err = types.HistogramSlicePool.Get(steps, memoryConsumptionTracker)
g.histograms, err = types.HistogramSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}
g.histograms = g.histograms[:steps]
g.histograms = g.histograms[:timeRange.StepCount]
}

var lastUncopiedHistogram *histogram.FloatHistogram

for inputIdx, p := range data.Histograms {
outputIdx := (p.T - start) / interval
outputIdx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.groupSeriesCounts[outputIdx]++

if g.histograms[outputIdx] == invalidCombinationOfHistograms {
Expand Down Expand Up @@ -280,7 +280,7 @@ func (g *AvgAggregationGroup) reconcileAndCountFloatPoints() (int, bool) {
return floatPointCount, haveMixedFloatsAndHistograms
}

func (g *AvgAggregationGroup) ComputeOutputSeries(start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
func (g *AvgAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
floatPointCount, hasMixedData := g.reconcileAndCountFloatPoints()
var floatPoints []promql.FPoint
var err error
Expand All @@ -293,7 +293,7 @@ func (g *AvgAggregationGroup) ComputeOutputSeries(start int64, interval int64, m

for i, havePoint := range g.floatPresent {
if havePoint {
t := start + int64(i)*interval
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
var f float64
if g.incrementalMeans != nil && g.incrementalMeans[i] {
f = g.floatMeans[i] + g.floatCompensatingMeans[i]
Expand All @@ -314,7 +314,7 @@ func (g *AvgAggregationGroup) ComputeOutputSeries(start int64, interval int64, m

for i, h := range g.histograms {
if h != nil && h != invalidCombinationOfHistograms {
t := start + int64(i)*interval
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
histogramPoints = append(histogramPoints, promql.HPoint{T: t, H: h.Compact(0)})
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/aggregations/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
// AggregationGroup accumulates series that have been grouped together and computes the output series data.
type AggregationGroup interface {
// AccumulateSeries takes in a series as part of the group
AccumulateSeries(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error
AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, emitAnnotationFunc functions.EmitAnnotationFunc) error
// ComputeOutputSeries does any final calculations and returns the grouped series data
ComputeOutputSeries(start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error)
ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error)
}

type AggregationGroupFactory func() AggregationGroup
Expand Down
18 changes: 9 additions & 9 deletions pkg/streamingpromql/aggregations/min_max.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,43 +49,43 @@ func (g *MinMaxAggregationGroup) minAccumulatePoint(idx int64, f float64) {
}
}

func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, steps int, start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ functions.EmitAnnotationFunc) error {
func (g *MinMaxAggregationGroup) AccumulateSeries(data types.InstantVectorSeriesData, timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ functions.EmitAnnotationFunc) error {
if (len(data.Floats) > 0 || len(data.Histograms) > 0) && g.floatValues == nil {
// Even if we only have histograms, we have to populate the float slices, as we'll treat histograms as if they have value 0.
// This is consistent with Prometheus but may not be the desired value: https://github.com/prometheus/prometheus/issues/14711

var err error
// First series with float values for this group, populate it.
g.floatValues, err = types.Float64SlicePool.Get(steps, memoryConsumptionTracker)
g.floatValues, err = types.Float64SlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}

g.floatPresent, err = types.BoolSlicePool.Get(steps, memoryConsumptionTracker)
g.floatPresent, err = types.BoolSlicePool.Get(timeRange.StepCount, memoryConsumptionTracker)
if err != nil {
return err
}
g.floatValues = g.floatValues[:steps]
g.floatPresent = g.floatPresent[:steps]
g.floatValues = g.floatValues[:timeRange.StepCount]
g.floatPresent = g.floatPresent[:timeRange.StepCount]
}

for _, p := range data.Floats {
idx := (p.T - start) / interval
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.accumulatePoint(idx, p.F)
}

// If a histogram exists max treats it as 0. We have to detect this here so that we return a 0 value instead of nothing.
// This is consistent with Prometheus but may not be the desired value: https://github.com/prometheus/prometheus/issues/14711
for _, p := range data.Histograms {
idx := (p.T - start) / interval
idx := (p.T - timeRange.StartT) / timeRange.IntervalMs
g.accumulatePoint(idx, 0)
}

types.PutInstantVectorSeriesData(data, memoryConsumptionTracker)
return nil
}

func (g *MinMaxAggregationGroup) ComputeOutputSeries(start int64, interval int64, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
func (g *MinMaxAggregationGroup) ComputeOutputSeries(timeRange types.QueryTimeRange, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) (types.InstantVectorSeriesData, bool, error) {
floatPointCount := 0
for _, p := range g.floatPresent {
if p {
Expand All @@ -102,7 +102,7 @@ func (g *MinMaxAggregationGroup) ComputeOutputSeries(start int64, interval int64

for i, havePoint := range g.floatPresent {
if havePoint {
t := start + int64(i)*interval
t := timeRange.StartT + int64(i)*timeRange.IntervalMs
f := g.floatValues[i]
floatPoints = append(floatPoints, promql.FPoint{T: t, F: f})
}
Expand Down
Loading
Loading