Skip to content

Commit

Permalink
fix: special case the return values from a sharded first/last_over_ti…
Browse files Browse the repository at this point in the history
…me query (#13578)

Signed-off-by: Callum Styan <callumstyan@gmail.com>
  • Loading branch information
cstyan committed Jul 25, 2024
1 parent bd20171 commit 29a37d5
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 29 deletions.
1 change: 0 additions & 1 deletion pkg/logql/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,6 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type())
}
}

return NewMergeLastOverTimeStepEvaluator(params, xs), nil
default:
return ev.defaultEvaluator.NewStepEvaluator(ctx, nextEvFactory, e, params)
Expand Down
60 changes: 59 additions & 1 deletion pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestMappingEquivalence(t *testing.T) {
regular := NewEngine(opts, q, NoLimits, log.NewNopLogger())
sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger())

t.Run(tc.query, func(t *testing.T) {
t.Run(tc.query+"_range", func(t *testing.T) {
params, err := NewLiteralParams(
tc.query,
start,
Expand Down Expand Up @@ -125,6 +125,46 @@ func TestMappingEquivalence(t *testing.T) {
require.Equal(t, res.Data, shardedRes.Data)
}
})
t.Run(tc.query+"_instant", func(t *testing.T) {
// for an instant query we set the start and end to the same timestamp
// plus set step and interval to 0
params, err := NewLiteralParams(
tc.query,
time.Unix(0, int64(rounds+1)),
time.Unix(0, int64(rounds+1)),
0,
0,
logproto.FORWARD,
uint32(limit),
nil,
nil,
)
require.NoError(t, err)
qry := regular.Query(params)
ctx := user.InjectOrgID(context.Background(), "fake")

strategy := NewPowerOfTwoStrategy(ConstantShards(shards))
mapper := NewShardMapper(strategy, nilShardMetrics, tc.shardAgg)
_, _, mapped, err := mapper.Parse(params.GetExpression())
require.NoError(t, err)

shardedQry := sharded.Query(ctx, ParamsWithExpressionOverride{
Params: params,
ExpressionOverride: mapped,
})

res, err := qry.Exec(ctx)
require.NoError(t, err)

shardedRes, err := shardedQry.Exec(ctx)
require.NoError(t, err)

if tc.approximate {
approximatelyEqualsVector(t, res.Data.(promql.Vector), shardedRes.Data.(promql.Vector)) //, tc.realtiveError)
} else {
require.Equal(t, res.Data, shardedRes.Data)
}
})
}
}

Expand Down Expand Up @@ -579,6 +619,24 @@ func approximatelyEquals(t *testing.T, as, bs promql.Matrix) {
}
}

// approximatelyEqualsVector ensures two responses are approximately equal,
// up to 6 decimals precision per sample
func approximatelyEqualsVector(t *testing.T, as, bs promql.Vector) {
require.Len(t, bs, len(as))

for i := 0; i < len(as); i++ {
a := as[i]
b := bs[i]
require.Equal(t, a.Metric, b.Metric)

aSample := a.F
aSample = math.Round(aSample*1e6) / 1e6
bSample := b.F
bSample = math.Round(bSample*1e6) / 1e6
require.Equalf(t, aSample, bSample, "metric %s differs from %s at %d", a.Metric, b.Metric, i)
}
}

func relativeError(t *testing.T, expected, actual promql.Matrix, alpha float64) {
require.Len(t, actual, len(expected))

Expand Down
68 changes: 44 additions & 24 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,11 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
case SampleVector:
maxSeriesCapture := func(id string) int { return q.limits.MaxQuerySeries(ctx, id) }
maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture)
return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries)
mfl := false
if rae, ok := expr.(*syntax.RangeAggregationExpr); ok && (rae.Operation == syntax.OpRangeTypeFirstWithTimestamp || rae.Operation == syntax.OpRangeTypeLastWithTimestamp) {
mfl = true
}
return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries, mfl)
case ProbabilisticQuantileVector:
return MergeQuantileSketchVector(next, vec, stepEvaluator, q.params)
default:
Expand All @@ -381,9 +385,31 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
return nil, errors.New("unexpected empty result")
}

func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) {
seriesIndex := map[uint64]*promql.Series{}
func vectorsToSeries(vec promql.Vector, sm map[uint64]promql.Series) {
for _, p := range vec {
var (
series promql.Series
hash = p.Metric.Hash()
ok bool
)

series, ok = sm[hash]
if !ok {
series = promql.Series{
Metric: p.Metric,
Floats: make([]promql.FPoint, 0, 1),
}
sm[hash] = series
}
series.Floats = append(series.Floats, promql.FPoint{
T: p.T,
F: p.F,
})
sm[hash] = series
}
}

func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int, mergeFirstLast bool) (promql_parser.Value, error) {
vec := promql.Vector{}
if next {
vec = r.SampleVector()
Expand All @@ -393,8 +419,21 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval
if len(vec) > maxSeries {
return nil, logqlmodel.NewSeriesLimitError(maxSeries)
}
seriesIndex := map[uint64]promql.Series{}

if GetRangeType(q.params) == InstantType {
// an instant query sharded first/last_over_time can return a single vector
if mergeFirstLast {
vectorsToSeries(vec, seriesIndex)
series := make([]promql.Series, 0, len(seriesIndex))
for _, s := range seriesIndex {
series = append(series, s)
}
result := promql.Matrix(series)
sort.Sort(result)
return result, stepEvaluator.Error()
}

sortByValue, err := Sortable(q.params)
if err != nil {
return nil, fmt.Errorf("fail to check Sortable, logql: %s ,err: %s", q.params.QueryString(), err)
Expand All @@ -412,26 +451,7 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval

for next {
vec = r.SampleVector()
for _, p := range vec {
var (
series *promql.Series
hash = p.Metric.Hash()
ok bool
)

series, ok = seriesIndex[hash]
if !ok {
series = &promql.Series{
Metric: p.Metric,
Floats: make([]promql.FPoint, 0, stepCount),
}
seriesIndex[hash] = series
}
series.Floats = append(series.Floats, promql.FPoint{
T: p.T,
F: p.F,
})
}
vectorsToSeries(vec, seriesIndex)
// as we slowly build the full query for each steps, make sure we don't go over the limit of unique series.
if len(seriesIndex) > maxSeries {
return nil, logqlmodel.NewSeriesLimitError(maxSeries)
Expand All @@ -444,7 +464,7 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval

series := make([]promql.Series, 0, len(seriesIndex))
for _, s := range seriesIndex {
series = append(series, *s)
series = append(series, s)
}
result := promql.Matrix(series)
sort.Sort(result)
Expand Down
7 changes: 5 additions & 2 deletions pkg/logql/first_last_over_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ type mergeOverTimeStepEvaluator struct {

// Next returns the first or last element within one step of each matrix.
func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) {
var vec promql.Vector
vec := promql.Vector{}

e.ts = e.ts.Add(e.step)
if e.ts.After(e.end) {
Expand All @@ -142,7 +142,6 @@ func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) {
// Merge other results
for i, m := range e.matrices {
for j, series := range m {

if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) {
continue
}
Expand Down Expand Up @@ -171,6 +170,10 @@ func (e *mergeOverTimeStepEvaluator) pop(r, s int) {

// inRange returns true if t is in step range of ts.
func (e *mergeOverTimeStepEvaluator) inRange(t, ts int64) bool {
// special case instant queries
if e.step.Milliseconds() == 0 {
return true
}
return (ts-e.step.Milliseconds()) <= t && t < ts
}

Expand Down
1 change: 0 additions & 1 deletion pkg/logql/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQu
if err != nil {
return nil, err
}

results = append(results, res)
}

Expand Down

0 comments on commit 29a37d5

Please sign in to comment.