Skip to content

Commit

Permalink
Move query time range information to its own type
Browse files Browse the repository at this point in the history
  • Loading branch information
charleskorn committed Sep 12, 2024
1 parent db75986 commit 58c24bb
Show file tree
Hide file tree
Showing 13 changed files with 92 additions and 122 deletions.
10 changes: 5 additions & 5 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type ScalarFunctionOperatorFactory func(
memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
annotations *annotations.Annotations,
expressionPosition posrange.PositionRange,
start, end, interval int64,
timeRange types.QueryTimeRange,
) (types.ScalarOperator, error)

// SingleInputVectorFunctionOperatorFactory creates an InstantVectorFunctionOperatorFactory for functions
Expand Down Expand Up @@ -206,16 +206,16 @@ func RegisterScalarFunctionOperatorFactory(functionName string, factory ScalarFu
return nil
}

func piOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, start, end, interval int64) (types.ScalarOperator, error) {
func piOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.ScalarOperator, error) {
if len(args) != 0 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 0 arguments for pi, got %v", len(args))
}

return operators.NewScalarConstant(math.Pi, start, end, interval, memoryConsumptionTracker, expressionPosition), nil
return operators.NewScalarConstant(math.Pi, timeRange, memoryConsumptionTracker, expressionPosition), nil
}

func instantVectorToScalarOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, start, end, interval int64) (types.ScalarOperator, error) {
func instantVectorToScalarOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, _ *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.ScalarOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for scalar, got %v", len(args))
Expand All @@ -227,7 +227,7 @@ func instantVectorToScalarOperatorFactory(args []types.Operator, memoryConsumpti
return nil, fmt.Errorf("expected an instant vector argument for scalar, got %T", args[0])
}

return operators.NewInstantVectorToScalar(inner, start, end, interval, memoryConsumptionTracker, expressionPosition), nil
return operators.NewInstantVectorToScalar(inner, timeRange, memoryConsumptionTracker, expressionPosition), nil
}

func unaryNegationOfInstantVectorOperatorFactory(inner types.InstantVectorOperator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, expressionPosition posrange.PositionRange) types.InstantVectorOperator {
Expand Down
18 changes: 5 additions & 13 deletions pkg/streamingpromql/operators/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ import (

type Aggregation struct {
Inner types.InstantVectorOperator
Start int64 // Milliseconds since Unix epoch
End int64 // Milliseconds since Unix epoch
Interval int64 // In milliseconds
Steps int
TimeRange types.QueryTimeRange
Grouping []string // If this is a 'without' aggregation, NewAggregation will ensure that this slice contains __name__.
Without bool
MemoryConsumptionTracker *limiting.MemoryConsumptionTracker
Expand All @@ -53,9 +50,7 @@ type Aggregation struct {

func NewAggregation(
inner types.InstantVectorOperator,
startT int64,
endT int64,
intervalMs int64,
timeRange types.QueryTimeRange,
grouping []string,
without bool,
op parser.ItemType,
Expand All @@ -79,10 +74,7 @@ func NewAggregation(

a := &Aggregation{
Inner: inner,
Start: startT,
End: endT,
Interval: intervalMs,
Steps: stepCount(startT, endT, intervalMs),
TimeRange: timeRange,
Grouping: grouping,
Without: without,
MemoryConsumptionTracker: memoryConsumptionTracker,
Expand Down Expand Up @@ -270,7 +262,7 @@ func (a *Aggregation) NextSeries(ctx context.Context) (types.InstantVectorSeries
}

// Construct the group and return it
seriesData, hasMixedData, err := thisGroup.aggregation.ComputeOutputSeries(a.Start, a.Interval, a.MemoryConsumptionTracker)
seriesData, hasMixedData, err := thisGroup.aggregation.ComputeOutputSeries(a.TimeRange.StartT, a.TimeRange.IntervalMs, a.MemoryConsumptionTracker)
if err != nil {
return types.InstantVectorSeriesData{}, err
}
Expand Down Expand Up @@ -300,7 +292,7 @@ func (a *Aggregation) accumulateUntilGroupComplete(ctx context.Context, g *group

thisSeriesGroup := a.remainingInnerSeriesToGroup[0]
a.remainingInnerSeriesToGroup = a.remainingInnerSeriesToGroup[1:]
if err := thisSeriesGroup.aggregation.AccumulateSeries(s, a.Steps, a.Start, a.Interval, a.MemoryConsumptionTracker, a.emitAnnotationFunc); err != nil {
if err := thisSeriesGroup.aggregation.AccumulateSeries(s, a.TimeRange.StepCount, a.TimeRange.StartT, a.TimeRange.IntervalMs, a.MemoryConsumptionTracker, a.emitAnnotationFunc); err != nil {
return err
}
thisSeriesGroup.remainingSeriesCount--
Expand Down
3 changes: 2 additions & 1 deletion pkg/streamingpromql/operators/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package operators

import (
"context"
"github.com/prometheus/prometheus/model/timestamp"
"testing"

"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -235,7 +236,7 @@ func TestAggregation_GroupLabelling(t *testing.T) {

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
aggregator, err := NewAggregation(nil, 0, 0, 1, testCase.grouping, testCase.without, parser.SUM, nil, nil, posrange.PositionRange{})
aggregator, err := NewAggregation(nil, types.NewInstantQueryTimeRange(timestamp.Time(0)), testCase.grouping, testCase.without, parser.SUM, nil, nil, posrange.PositionRange{})
require.NoError(t, err)
bytesFunc, labelsFunc := aggregator.seriesToGroupFuncs()

Expand Down
11 changes: 3 additions & 8 deletions pkg/streamingpromql/operators/instant_vector_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ type InstantVectorSelector struct {
Selector *Selector
MemoryConsumptionTracker *limiting.MemoryConsumptionTracker

numSteps int

chunkIterator chunkenc.Iterator
memoizedIterator *storage.MemoizedSeriesIterator
}
Expand All @@ -38,9 +36,6 @@ func (v *InstantVectorSelector) ExpressionPosition() posrange.PositionRange {
}

func (v *InstantVectorSelector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
// Compute value we need on every call to NextSeries() once, here.
v.numSteps = stepCount(v.Selector.Start, v.Selector.End, v.Selector.Interval)

return v.Selector.SeriesMetadata(ctx)
}

Expand Down Expand Up @@ -69,7 +64,7 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe
lastHistogramT := int64(math.MinInt64)
var lastHistogram *histogram.FloatHistogram

for stepT := v.Selector.Start; stepT <= v.Selector.End; stepT += v.Selector.Interval {
for stepT := v.Selector.TimeRange.StartT; stepT <= v.Selector.TimeRange.EndT; stepT += v.Selector.TimeRange.IntervalMs {
var t int64
var f float64
var h *histogram.FloatHistogram
Expand Down Expand Up @@ -137,7 +132,7 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe
// Only create the slice once we know the series is a histogram or not.
// (It is possible to over-allocate in the case where we have both floats and histograms, but that won't be common).
var err error
if data.Histograms, err = types.HPointSlicePool.Get(v.numSteps, v.MemoryConsumptionTracker); err != nil {
if data.Histograms, err = types.HPointSlicePool.Get(v.Selector.TimeRange.StepCount, v.MemoryConsumptionTracker); err != nil {
return types.InstantVectorSeriesData{}, err
}
}
Expand All @@ -148,7 +143,7 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe
if len(data.Floats) == 0 {
// Only create the slice once we know the series is a histogram or not
var err error
if data.Floats, err = types.FPointSlicePool.Get(v.numSteps, v.MemoryConsumptionTracker); err != nil {
if data.Floats, err = types.FPointSlicePool.Get(v.Selector.TimeRange.StepCount, v.MemoryConsumptionTracker); err != nil {
return types.InstantVectorSeriesData{}, err
}
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/streamingpromql/operators/instant_vector_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ package operators

import (
"context"
"github.com/grafana/mimir/pkg/streamingpromql/types"
"testing"
"time"

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/promqltest"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -218,9 +218,7 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) {
selector := &InstantVectorSelector{
Selector: &Selector{
Queryable: storage,
Start: timestamp.FromTime(startTime),
End: timestamp.FromTime(endTime),
Interval: time.Minute.Milliseconds(),
TimeRange: types.NewRangeQueryTimeRange(startTime, endTime, time.Minute),
Matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "my_metric"),
},
Expand Down
23 changes: 8 additions & 15 deletions pkg/streamingpromql/operators/instant_vector_to_scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ import (
// InstantVectorToScalar is an operator that implements the scalar() function.
type InstantVectorToScalar struct {
Inner types.InstantVectorOperator
Start int64 // Milliseconds since Unix epoch
End int64 // Milliseconds since Unix epoch
Interval int64 // In milliseconds
TimeRange types.QueryTimeRange
MemoryConsumptionTracker *limiting.MemoryConsumptionTracker

expressionPosition posrange.PositionRange
Expand All @@ -28,17 +26,13 @@ var _ types.ScalarOperator = &InstantVectorToScalar{}

func NewInstantVectorToScalar(
inner types.InstantVectorOperator,
start int64,
end int64,
interval int64,
timeRange types.QueryTimeRange,
memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
expressionPosition posrange.PositionRange,
) *InstantVectorToScalar {
return &InstantVectorToScalar{
Inner: inner,
Start: start,
End: end,
Interval: interval,
TimeRange: timeRange,
MemoryConsumptionTracker: memoryConsumptionTracker,
expressionPosition: expressionPosition,
}
Expand All @@ -50,21 +44,20 @@ func (i *InstantVectorToScalar) GetValues(ctx context.Context) (types.ScalarData
return types.ScalarData{}, err
}

stepCount := stepCount(i.Start, i.End, i.Interval)
seenPoint, err := types.BoolSlicePool.Get(stepCount, i.MemoryConsumptionTracker)
seenPoint, err := types.BoolSlicePool.Get(i.TimeRange.StepCount, i.MemoryConsumptionTracker)
if err != nil {
return types.ScalarData{}, err
}

defer types.BoolSlicePool.Put(seenPoint, i.MemoryConsumptionTracker)
seenPoint = seenPoint[:stepCount]
seenPoint = seenPoint[:i.TimeRange.StepCount]

output, err := types.FPointSlicePool.Get(stepCount, i.MemoryConsumptionTracker)
output, err := types.FPointSlicePool.Get(i.TimeRange.StepCount, i.MemoryConsumptionTracker)
if err != nil {
return types.ScalarData{}, err
}

for t := i.Start; t <= i.End; t += i.Interval {
for t := i.TimeRange.StartT; t <= i.TimeRange.EndT; t += i.TimeRange.IntervalMs {
output = append(output, promql.FPoint{
T: t,
F: math.NaN(),
Expand All @@ -78,7 +71,7 @@ func (i *InstantVectorToScalar) GetValues(ctx context.Context) (types.ScalarData
}

for _, p := range seriesData.Floats {
sampleIdx := (p.T - i.Start) / i.Interval
sampleIdx := (p.T - i.TimeRange.StartT) / i.TimeRange.IntervalMs

if seenPoint[sampleIdx] {
// We've already seen another point at this timestamp, so return NaN at this timestamp.
Expand Down
15 changes: 6 additions & 9 deletions pkg/streamingpromql/operators/range_vector_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ type RangeVectorSelector struct {
Selector *Selector

rangeMilliseconds int64
numSteps int

chunkIterator chunkenc.Iterator
nextT int64
chunkIterator chunkenc.Iterator
nextT int64
}

var _ types.RangeVectorOperator = &RangeVectorSelector{}
Expand All @@ -37,13 +35,12 @@ func (m *RangeVectorSelector) ExpressionPosition() posrange.PositionRange {
func (m *RangeVectorSelector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
// Compute value we need on every call to NextSeries() once, here.
m.rangeMilliseconds = m.Selector.Range.Milliseconds()
m.numSteps = stepCount(m.Selector.Start, m.Selector.End, m.Selector.Interval)

return m.Selector.SeriesMetadata(ctx)
}

func (m *RangeVectorSelector) StepCount() int {
return m.numSteps
return m.Selector.TimeRange.StepCount
}

func (m *RangeVectorSelector) Range() time.Duration {
Expand All @@ -57,12 +54,12 @@ func (m *RangeVectorSelector) NextSeries(ctx context.Context) error {
return err
}

m.nextT = m.Selector.Start
m.nextT = m.Selector.TimeRange.StartT
return nil
}

func (m *RangeVectorSelector) NextStepSamples(floats *types.FPointRingBuffer, histograms *types.HPointRingBuffer) (types.RangeVectorStepData, error) {
if m.nextT > m.Selector.End {
if m.nextT > m.Selector.TimeRange.EndT {
return types.RangeVectorStepData{}, types.EOS
}

Expand All @@ -84,7 +81,7 @@ func (m *RangeVectorSelector) NextStepSamples(floats *types.FPointRingBuffer, hi
return types.RangeVectorStepData{}, err
}

m.nextT += m.Selector.Interval
m.nextT += m.Selector.TimeRange.IntervalMs

return types.RangeVectorStepData{
StepT: stepT,
Expand Down
21 changes: 7 additions & 14 deletions pkg/streamingpromql/operators/scalar_constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import (

type ScalarConstant struct {
Value float64
Start int64 // Milliseconds since Unix epoch
End int64 // Milliseconds since Unix epoch
Interval int64 // In milliseconds
TimeRange types.QueryTimeRange
MemoryConsumptionTracker *limiting.MemoryConsumptionTracker

expressionPosition posrange.PositionRange
Expand All @@ -25,34 +23,29 @@ var _ types.ScalarOperator = &ScalarConstant{}

func NewScalarConstant(
value float64,
start int64,
end int64,
interval int64,
timeRange types.QueryTimeRange,
memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
expressionPosition posrange.PositionRange,
) *ScalarConstant {
return &ScalarConstant{
Value: value,
Start: start,
End: end,
Interval: interval,
TimeRange: timeRange,
MemoryConsumptionTracker: memoryConsumptionTracker,
expressionPosition: expressionPosition,
}
}

func (s *ScalarConstant) GetValues(_ context.Context) (types.ScalarData, error) {
numSteps := stepCount(s.Start, s.End, s.Interval)
samples, err := types.FPointSlicePool.Get(numSteps, s.MemoryConsumptionTracker)
samples, err := types.FPointSlicePool.Get(s.TimeRange.StepCount, s.MemoryConsumptionTracker)

if err != nil {
return types.ScalarData{}, err
}

samples = samples[:numSteps]
samples = samples[:s.TimeRange.StepCount]

for step := 0; step < numSteps; step++ {
samples[step].T = s.Start + int64(step)*s.Interval
for step := 0; step < s.TimeRange.StepCount; step++ {
samples[step].T = s.TimeRange.StartT + int64(step)*s.TimeRange.IntervalMs
samples[step].F = s.Value
}

Expand Down
10 changes: 4 additions & 6 deletions pkg/streamingpromql/operators/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ import (

type Selector struct {
Queryable storage.Queryable
Start int64 // Milliseconds since Unix epoch
End int64 // Milliseconds since Unix epoch
TimeRange types.QueryTimeRange
Timestamp *int64 // Milliseconds since Unix epoch, only set if selector uses @ modifier (eg. metric{...} @ 123)
Interval int64 // In milliseconds
Offset int64 // In milliseconds
Matchers []*labels.Matcher

Expand All @@ -48,8 +46,8 @@ func (s *Selector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata,
return nil, errors.New("invalid Selector configuration: both LookbackDelta and Range are non-zero")
}

startTimestamp := s.Start
endTimestamp := s.End
startTimestamp := s.TimeRange.StartT
endTimestamp := s.TimeRange.EndT

if s.Timestamp != nil {
// Timestamp from @ modifier takes precedence over query evaluation timestamp.
Expand All @@ -65,7 +63,7 @@ func (s *Selector) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata,
hints := &storage.SelectHints{
Start: startTimestamp,
End: endTimestamp,
Step: s.Interval,
Step: s.TimeRange.IntervalMs,
Range: rangeMilliseconds,

// Mimir doesn't use Grouping or By, so there's no need to include them here.
Expand Down
7 changes: 0 additions & 7 deletions pkg/streamingpromql/operators/time.go

This file was deleted.

Loading

0 comments on commit 58c24bb

Please sign in to comment.