Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge quantile sketches as they arrive #11544

Merged
merged 14 commits into from
Jan 4, 2024
Merged
28 changes: 14 additions & 14 deletions pkg/logql/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ type DownstreamQuery struct {
Params Params
}

// Resp is a single downstream query response tagged with the index of the
// query that produced it, so asynchronously delivered results can be matched
// back to input[i] by the consumer.
type Resp struct {
	I   int               // index of the originating query in the request slice
	Res logqlmodel.Result // query result; meaningful when Err is nil
	Err error             // non-nil if the downstream query failed
}

// Downstreamer is an interface for deferring responsibility for query execution.
// It is decoupled from but consumed by a downStreamEvaluator to dispatch ASTs.
type Downstreamer interface {
Expand Down Expand Up @@ -375,24 +381,18 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(

results, err := ev.Downstream(ctx, queries)
if err != nil {
return nil, fmt.Errorf("error running quantile sketch downstream query: %w", err)
return nil, err
}

xs := make([]StepEvaluator, 0, len(queries))
for _, res := range results {
if res.Data.Type() != QuantileSketchMatrixType {
return nil, fmt.Errorf("unexpected matrix data type: got (%s), want (%s)", res.Data.Type(), QuantileSketchMatrixType)
}
data, ok := res.Data.(ProbabilisticQuantileMatrix)
if !ok {
return nil, fmt.Errorf("unexpected matrix type: got (%T), want (ProbabilisticQuantileMatrix)", res.Data)
}
stepper := NewQuantileSketchMatrixStepEvaluator(data, params)
xs = append(xs, stepper)
if len(results) != 1 {
return nil, fmt.Errorf("unexpected results length for sharded quantile: got (%d), want (1)", len(results))
}

inner := NewQuantileSketchMergeStepEvaluator(xs)

matrix, ok := results[0].Data.(ProbabilisticQuantileMatrix)
if !ok {
return nil, fmt.Errorf("unexpected matrix type: got (%T), want (ProbabilisticQuantileMatrix)", results[0].Data)
}
inner := NewQuantileSketchMatrixStepEvaluator(matrix, params)
return NewQuantileSketchVectorStepEvaluator(inner, *e.quantile), nil

default:
Expand Down
29 changes: 28 additions & 1 deletion pkg/logql/quantile_over_time_sketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package logql
import (
"fmt"
"math"
"sync"
"time"

"github.com/prometheus/prometheus/model/labels"
Expand All @@ -23,9 +24,17 @@ const (
type ProbabilisticQuantileVector []ProbabilisticQuantileSample
type ProbabilisticQuantileMatrix []ProbabilisticQuantileVector

// streamHashPool recycles the labels-hash → vector-index maps used by
// ProbabilisticQuantileVector.Merge, avoiding a fresh map allocation on
// every merge call.
var streamHashPool = sync.Pool{
	New: func() interface{} { return make(map[uint64]int) },
}

func (q ProbabilisticQuantileVector) Merge(right ProbabilisticQuantileVector) (ProbabilisticQuantileVector, error) {
// labels hash to vector index map
groups := make(map[uint64]int)
groups := streamHashPool.Get().(map[uint64]int)
defer func() {
clear(groups)
streamHashPool.Put(groups)
}()
for i, sample := range q {
groups[sample.Metric.Hash()] = i
}
Expand Down Expand Up @@ -80,6 +89,21 @@ func (ProbabilisticQuantileMatrix) String() string {
return "QuantileSketchMatrix()"
}

// Merge combines two probabilistic quantile matrices by merging their vectors
// pairwise. Both matrices must contain the same number of vectors; the
// receiver is updated in place and returned.
func (m ProbabilisticQuantileMatrix) Merge(right ProbabilisticQuantileMatrix) (ProbabilisticQuantileMatrix, error) {
	if len(m) != len(right) {
		return nil, fmt.Errorf("failed to merge probabilistic quantile matrix: lengths differ %d!=%d", len(m), len(right))
	}

	var mergeErr error
	for i := range m {
		// Merge the vectors at matching positions, replacing the receiver's
		// entry with the merged vector.
		m[i], mergeErr = m[i].Merge(right[i])
		if mergeErr != nil {
			return nil, fmt.Errorf("failed to merge probabilistic quantile matrix: %w", mergeErr)
		}
	}

	return m, nil
}

// Type returns the promql parser value type for quantile sketch matrices.
func (ProbabilisticQuantileMatrix) Type() promql_parser.ValueType { return QuantileSketchMatrixType }

func (m ProbabilisticQuantileMatrix) Release() {
Expand Down Expand Up @@ -398,6 +422,9 @@ func NewQuantileSketchVectorStepEvaluator(inner StepEvaluator, quantile float64)

func (e *QuantileSketchVectorStepEvaluator) Next() (bool, int64, StepResult) {
ok, ts, r := e.inner.Next()
if !ok {
return false, 0, SampleVector{}
}
quantileSketchVec := r.QuantileSketchVec()

vec := make(promql.Vector, len(quantileSketchVec))
Expand Down
1 change: 1 addition & 0 deletions pkg/logql/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQu

results = append(results, res)
}
// TODO: use queryrange.instance.For
return results, nil
}

Expand Down
126 changes: 77 additions & 49 deletions pkg/querier/queryrange/downstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/tenant"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -119,64 +120,59 @@ func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQue
})
}

// For runs a function against a list of queries, collecting the results or returning an error. The indices are preserved such that input[i] maps to output[i].
func (in instance) For(
func (in instance) AsyncFor(
ctx context.Context,
queries []logql.DownstreamQuery,
fn func(logql.DownstreamQuery) (logqlmodel.Result, error),
) ([]logqlmodel.Result, error) {
type resp struct {
i int
res logqlmodel.Result
err error
}
) chan logql.Resp {
ch := make(chan logql.Resp)

ctx, cancel := context.WithCancel(ctx)
defer cancel()
ch := make(chan resp)

// Make one goroutine to dispatch the other goroutines, bounded by instance parallelism
go func() {
for i := 0; i < len(queries); i++ {
err := concurrency.ForEachJob(ctx, len(queries), in.parallelism, func(ctx context.Context, i int) error {
res, err := fn(queries[i])
response := logql.Resp{
I: i,
Res: res,
Err: err,
}

// Feed the result into the channel unless the work has completed.
select {
case <-ctx.Done():
break
case <-in.locks:
go func(i int) {
// release lock back into pool
defer func() {
in.locks <- struct{}{}
}()

res, err := fn(queries[i])
response := resp{
i: i,
res: res,
err: err,
}

// Feed the result into the channel unless the work has completed.
select {
case <-ctx.Done():
case ch <- response:
}
}(i)
case ch <- response:
}
return err
})
if err != nil {
ch <- logql.Resp{
I: -1,
Err: err,
}
}
close(ch)
}()

return ch
}

// For runs a function against a list of queries, collecting the results or returning an error. The indices are preserved such that input[i] maps to output[i].
func (in instance) For(
ctx context.Context,
queries []logql.DownstreamQuery,
fn func(logql.DownstreamQuery) (logqlmodel.Result, error),
) ([]logqlmodel.Result, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

ch := in.AsyncFor(ctx, queries, fn)

acc := newDownstreamAccumulator(queries[0].Params, len(queries))
for i := 0; i < len(queries); i++ {
select {
case <-ctx.Done():
return nil, ctx.Err()
case resp := <-ch:
if resp.err != nil {
return nil, resp.err
}
if err := acc.Accumulate(ctx, resp.i, resp.res); err != nil {
return nil, err
}
for resp := range ch {
if resp.Err != nil {
return nil, resp.Err
}
if err := acc.Accumulate(ctx, resp.I, resp.Res); err != nil {
return nil, err
}
}
return acc.Result(), nil
Expand Down Expand Up @@ -248,7 +244,8 @@ func newDownstreamAccumulator(params logql.Params, nQueries int) *downstreamAccu
}

func (a *downstreamAccumulator) build(acc logqlmodel.Result) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not too fond of this design. We do know whether the results are samples, logs or quantile sketches. So ideally the logql engine would inject the accumulator on the execution plan. Moving this into the logql package would also make the MockQuerier simpler.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems like a good change to me

if acc.Data.Type() == logqlmodel.ValueTypeStreams {
switch acc.Data.Type() {
case logqlmodel.ValueTypeStreams:

// the stream accumulator stores a heap with reversed order
// from the results we expect, so we need to reverse the direction
Expand All @@ -258,8 +255,9 @@ func (a *downstreamAccumulator) build(acc logqlmodel.Result) {
}

a.acc = newStreamAccumulator(direction, int(a.params.Limit()))

} else {
case logql.QuantileSketchMatrixType:
a.acc = newQuantileSketchAccumulator()
default:
a.acc = &bufferedAccumulator{
results: make([]logqlmodel.Result, a.n),
}
Expand Down Expand Up @@ -297,6 +295,36 @@ func (a *bufferedAccumulator) Result() []logqlmodel.Result {
return a.results
}

// quantileSketchAccumulator merges quantile sketch matrix results as they
// arrive, keeping only a single running matrix in memory rather than
// buffering every downstream response.
type quantileSketchAccumulator struct {
	matrix logql.ProbabilisticQuantileMatrix // running merge of accepted results; nil until the first Accumulate
}

// newQuantileSketchAccumulator returns an empty accumulator; its running
// matrix is initialized lazily by the first Accumulate call.
func newQuantileSketchAccumulator() *quantileSketchAccumulator {
	acc := new(quantileSketchAccumulator)
	return acc
}

// Accumulate folds one downstream result into the accumulator. The first
// result is adopted as the running matrix; every later result is merged into
// it. The index argument is ignored because merge order does not affect the
// accumulated matrix.
func (a *quantileSketchAccumulator) Accumulate(res logqlmodel.Result, _ int) error {
	if res.Data.Type() != logql.QuantileSketchMatrixType {
		return fmt.Errorf("unexpected matrix data type: got (%s), want (%s)", res.Data.Type(), logql.QuantileSketchMatrixType)
	}
	matrix, ok := res.Data.(logql.ProbabilisticQuantileMatrix)
	if !ok {
		return fmt.Errorf("unexpected matrix type: got (%T), want (ProbabilisticQuantileMatrix)", res.Data)
	}

	// First result seen: keep it and merge later arrivals into it.
	if a.matrix == nil {
		a.matrix = matrix
		return nil
	}

	merged, err := a.matrix.Merge(matrix)
	a.matrix = merged
	return err
}

// Result wraps the merged matrix in a single-element result slice, matching
// the accumulator interface used by the downstreamer.
func (a *quantileSketchAccumulator) Result() []logqlmodel.Result {
	merged := logqlmodel.Result{Data: a.matrix}
	return []logqlmodel.Result{merged}
}
Comment on lines +292 to +320
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be the place where we merge results?


// heap impl for keeping only the top n results across m streams
// importantly, accumulatedStreams is _bounded_, so it will only
// store the top `limit` results across all streams.
Expand Down
69 changes: 69 additions & 0 deletions pkg/querier/queryrange/downstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"github.com/grafana/loki/pkg/logql/sketch"
"math/rand"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -598,3 +600,70 @@ func TestDownstreamAccumulatorMultiMerge(t *testing.T) {
})
}
}

// newRandomSketch builds a DDSketch populated with 1000 pseudo-random values
// drawn from a fixed seed, so benchmark fixtures are deterministic across runs.
func newRandomSketch() sketch.QuantileSketch {
	rng := rand.New(rand.NewSource(42))
	qs := sketch.NewDDSketch()
	for added := 0; added < 1000; added++ {
		// Errors from Add are deliberately ignored for test fixtures.
		_ = qs.Add(rng.Float64())
	}
	return qs
}

// BenchmarkAccumulateMerge measures merging 100 single-vector quantile sketch
// matrices (100 series each) through the downstream accumulator.
//
// NOTE(review): the fixture matrices are shared across b.N iterations, and the
// accumulator appears to merge into the first matrix it receives in place, so
// later iterations may operate on already-merged sketches — confirm whether
// per-iteration copies are needed for a fair measurement.
func BenchmarkAccumulateMerge(b *testing.B) {
	b.ReportAllocs()
	vectors := make([]logql.ProbabilisticQuantileVector, 100)
	for i := range vectors {
		vectors[i] = make(logql.ProbabilisticQuantileVector, 100)
		for j := range vectors[i] {
			vectors[i][j] = logql.ProbabilisticQuantileSample{
				T:      int64(i),
				F:      newRandomSketch(),
				Metric: []labels.Label{{Name: "foo", Value: fmt.Sprintf("bar-%d", j)}},
			}
		}
	}
	results := make([]logqlmodel.Result, 0, 100)
	for _, vec := range vectors {
		results = append(results, logqlmodel.Result{
			Data: logql.ProbabilisticQuantileMatrix([]logql.ProbabilisticQuantileVector{vec}),
		})
	}
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		acc := newQuantileSketchAccumulator()
		for _, r := range results {
			require.NoError(b, acc.Accumulate(r, 0))
		}
	}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. I was about to push my change and had a conflict. Your code was almost exactly as mine 🎉


// BenchmarkSingleMerge measures the baseline strategy of buffering all sketch
// matrices in memory and folding them together with pairwise Merge calls, for
// comparison with the accumulator-based approach above.
//
// NOTE(review): the fixture vectors are shared across b.N iterations and the
// fold mutates the first matrix in place — confirm whether this skews timings
// for later iterations.
func BenchmarkSingleMerge(b *testing.B) {
	b.ReportAllocs()
	vectors := make([]logql.ProbabilisticQuantileVector, 100)
	for i := range vectors {
		vectors[i] = make(logql.ProbabilisticQuantileVector, 100)
		for j := range vectors[i] {
			vectors[i][j] = logql.ProbabilisticQuantileSample{
				T:      int64(i),
				F:      newRandomSketch(),
				Metric: []labels.Label{{Name: "foo", Value: fmt.Sprintf("bar-%d", j)}},
			}
		}
	}

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		// simulate receiving all sketches and keeping them in memory
		buffered := make([]logql.ProbabilisticQuantileMatrix, 0, 100)
		for _, vec := range vectors {
			buffered = append(buffered, logql.ProbabilisticQuantileMatrix{vec})
		}
		// Fold every matrix into the first one; Merge returns the mutated
		// receiver, so the returned value can be discarded here.
		final := buffered[0]
		var err error
		for i := 1; i < len(buffered); i++ {
			_, err = final.Merge(buffered[i])
			require.NoError(b, err)
		}
	}
}