Skip to content

Commit

Permalink
Merge quantile sketches as they arrive (grafana#11544)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeschkies authored and rhnasc committed Apr 12, 2024
1 parent 592810f commit 5095cfc
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 61 deletions.
28 changes: 14 additions & 14 deletions pkg/logql/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ type DownstreamQuery struct {
Params Params
}

// Resp bundles the outcome of a single downstream query so that it can be
// passed over a channel: the index of the query, its result and its error.
type Resp struct {
// I is the index of the query this response belongs to.
I int
// Res is the result returned for the query.
Res logqlmodel.Result
// Err is the error returned for the query, if any.
Err error
}

// Downstreamer is an interface for deferring responsibility for query execution.
// It is decoupled from but consumed by a downStreamEvaluator to dispatch ASTs.
type Downstreamer interface {
Expand Down Expand Up @@ -375,24 +381,18 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(

results, err := ev.Downstream(ctx, queries)
if err != nil {
return nil, fmt.Errorf("error running quantile sketch downstream query: %w", err)
return nil, err
}

xs := make([]StepEvaluator, 0, len(queries))
for _, res := range results {
if res.Data.Type() != QuantileSketchMatrixType {
return nil, fmt.Errorf("unexpected matrix data type: got (%s), want (%s)", res.Data.Type(), QuantileSketchMatrixType)
}
data, ok := res.Data.(ProbabilisticQuantileMatrix)
if !ok {
return nil, fmt.Errorf("unexpected matrix type: got (%T), want (ProbabilisticQuantileMatrix)", res.Data)
}
stepper := NewQuantileSketchMatrixStepEvaluator(data, params)
xs = append(xs, stepper)
if len(results) != 1 {
return nil, fmt.Errorf("unexpected results length for sharded quantile: got (%d), want (1)", len(results))
}

inner := NewQuantileSketchMergeStepEvaluator(xs)

matrix, ok := results[0].Data.(ProbabilisticQuantileMatrix)
if !ok {
return nil, fmt.Errorf("unexpected matrix type: got (%T), want (ProbabilisticQuantileMatrix)", results[0].Data)
}
inner := NewQuantileSketchMatrixStepEvaluator(matrix, params)
return NewQuantileSketchVectorStepEvaluator(inner, *e.quantile), nil

default:
Expand Down
29 changes: 28 additions & 1 deletion pkg/logql/quantile_over_time_sketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package logql
import (
"fmt"
"math"
"sync"
"time"

"github.com/prometheus/prometheus/model/labels"
Expand All @@ -23,9 +24,17 @@ const (
type ProbabilisticQuantileVector []ProbabilisticQuantileSample
type ProbabilisticQuantileMatrix []ProbabilisticQuantileVector

// streamHashPool recycles the maps from label hash to vector index that are
// needed while merging probabilistic quantile vectors. A map taken from the
// pool must be emptied before it is put back.
var streamHashPool = sync.Pool{
	New: func() any {
		return make(map[uint64]int)
	},
}

func (q ProbabilisticQuantileVector) Merge(right ProbabilisticQuantileVector) (ProbabilisticQuantileVector, error) {
// labels hash to vector index map
groups := make(map[uint64]int)
groups := streamHashPool.Get().(map[uint64]int)
defer func() {
clear(groups)
streamHashPool.Put(groups)
}()
for i, sample := range q {
groups[sample.Metric.Hash()] = i
}
Expand Down Expand Up @@ -80,6 +89,21 @@ func (ProbabilisticQuantileMatrix) String() string {
return "QuantileSketchMatrix()"
}

// Merge folds right into m one step at a time by merging the vectors found at
// the same index. Both matrices must have the same number of steps. The
// receiver is updated in place and returned; on error nil is returned instead.
func (m ProbabilisticQuantileMatrix) Merge(right ProbabilisticQuantileMatrix) (ProbabilisticQuantileMatrix, error) {
	if left, other := len(m), len(right); left != other {
		return nil, fmt.Errorf("failed to merge probabilistic quantile matrix: lengths differ %d!=%d", left, other)
	}

	for step := range m {
		merged, mergeErr := m[step].Merge(right[step])
		// The slot is overwritten before the error is inspected.
		m[step] = merged
		if mergeErr != nil {
			return nil, fmt.Errorf("failed to merge probabilistic quantile matrix: %w", mergeErr)
		}
	}

	return m, nil
}

// Type reports QuantileSketchMatrixType as the value type of the matrix.
func (ProbabilisticQuantileMatrix) Type() promql_parser.ValueType {
	return QuantileSketchMatrixType
}

func (m ProbabilisticQuantileMatrix) Release() {
Expand Down Expand Up @@ -398,6 +422,9 @@ func NewQuantileSketchVectorStepEvaluator(inner StepEvaluator, quantile float64)

func (e *QuantileSketchVectorStepEvaluator) Next() (bool, int64, StepResult) {
ok, ts, r := e.inner.Next()
if !ok {
return false, 0, SampleVector{}
}
quantileSketchVec := r.QuantileSketchVec()

vec := make(promql.Vector, len(quantileSketchVec))
Expand Down
10 changes: 10 additions & 0 deletions pkg/logql/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,16 @@ func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQu

results = append(results, res)
}

if matrix, ok := results[0].Data.(ProbabilisticQuantileMatrix); ok {
if len(results) == 1 {
return results, nil
}
for _, m := range results[1:] {
matrix, _ = matrix.Merge(m.Data.(ProbabilisticQuantileMatrix))
}
return []logqlmodel.Result{{Data: matrix}}, nil
}
return results, nil
}

Expand Down
114 changes: 68 additions & 46 deletions pkg/querier/queryrange/downstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/tenant"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -125,58 +126,46 @@ func (in instance) For(
queries []logql.DownstreamQuery,
fn func(logql.DownstreamQuery) (logqlmodel.Result, error),
) ([]logqlmodel.Result, error) {
type resp struct {
i int
res logqlmodel.Result
err error
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()
ch := make(chan resp)

// Make one goroutine to dispatch the other goroutines, bounded by instance parallelism
ch := make(chan logql.Resp)

// ForEachJob blocks until all are done. However, we want to process the
// results as they come in. That's why we start everything in another
// goroutine.
go func() {
for i := 0; i < len(queries); i++ {
err := concurrency.ForEachJob(ctx, len(queries), in.parallelism, func(ctx context.Context, i int) error {
res, err := fn(queries[i])
response := logql.Resp{
I: i,
Res: res,
Err: err,
}

// Feed the result into the channel unless the work has completed.
select {
case <-ctx.Done():
break
case <-in.locks:
go func(i int) {
// release lock back into pool
defer func() {
in.locks <- struct{}{}
}()

res, err := fn(queries[i])
response := resp{
i: i,
res: res,
err: err,
}

// Feed the result into the channel unless the work has completed.
select {
case <-ctx.Done():
case ch <- response:
}
}(i)
case ch <- response:
}
return err
})
if err != nil {
ch <- logql.Resp{
I: -1,
Err: err,
}
}
close(ch)
}()

acc := newDownstreamAccumulator(queries[0].Params, len(queries))
for i := 0; i < len(queries); i++ {
select {
case <-ctx.Done():
return nil, ctx.Err()
case resp := <-ch:
if resp.err != nil {
return nil, resp.err
}
if err := acc.Accumulate(ctx, resp.i, resp.res); err != nil {
return nil, err
}
for resp := range ch {
if resp.Err != nil {
return nil, resp.Err
}
if err := acc.Accumulate(ctx, resp.I, resp.Res); err != nil {
return nil, err
}
}
return acc.Result(), nil
Expand Down Expand Up @@ -222,8 +211,8 @@ func sampleStreamToVector(streams []queryrangebase.SampleStream) parser.Value {
return xs
}

// downstreamAccumulator is one of two variants:
// a logsAccumulator or a bufferedAccumulator.
// downstreamAccumulator is one of three variants:
// a logsAccumulator, a bufferedAccumulator, or a quantileSketchAccumulator.
// Which variant is detected on the first call to Accumulate.
// Metric queries, which are generally small payloads, are buffered
// since the memory overhead is negligible.
Expand All @@ -232,6 +221,7 @@ func sampleStreamToVector(streams []queryrangebase.SampleStream) parser.Value {
// accumulate the results into a logsAccumulator, discarding values
// over the limit to keep memory pressure down while other subqueries
// are executing.
// Sharded probabilistic quantile query results are merged as they come in.
type downstreamAccumulator struct {
acc resultAccumulator
params logql.Params
Expand All @@ -248,7 +238,8 @@ func newDownstreamAccumulator(params logql.Params, nQueries int) *downstreamAccu
}

func (a *downstreamAccumulator) build(acc logqlmodel.Result) {
if acc.Data.Type() == logqlmodel.ValueTypeStreams {
switch acc.Data.Type() {
case logqlmodel.ValueTypeStreams:

// the stream accumulator stores a heap with reversed order
// from the results we expect, so we need to reverse the direction
Expand All @@ -258,8 +249,9 @@ func (a *downstreamAccumulator) build(acc logqlmodel.Result) {
}

a.acc = newStreamAccumulator(direction, int(a.params.Limit()))

} else {
case logql.QuantileSketchMatrixType:
a.acc = newQuantileSketchAccumulator()
default:
a.acc = &bufferedAccumulator{
results: make([]logqlmodel.Result, a.n),
}
Expand Down Expand Up @@ -297,6 +289,36 @@ func (a *bufferedAccumulator) Result() []logqlmodel.Result {
return a.results
}

// quantileSketchAccumulator merges probabilistic quantile matrices into a
// single matrix as they are accumulated.
type quantileSketchAccumulator struct {
// matrix is the merged matrix so far; nil until the first result is accumulated.
matrix logql.ProbabilisticQuantileMatrix
}

// newQuantileSketchAccumulator returns an accumulator that holds no matrix yet.
func newQuantileSketchAccumulator() *quantileSketchAccumulator {
	return new(quantileSketchAccumulator)
}

// Accumulate merges the probabilistic quantile matrix carried by res into the
// accumulator. The first matrix is stored as is; later ones are merged into
// it. The index argument is ignored. An error is returned when res does not
// hold a ProbabilisticQuantileMatrix or when merging fails.
func (a *quantileSketchAccumulator) Accumulate(res logqlmodel.Result, _ int) error {
	if got := res.Data.Type(); got != logql.QuantileSketchMatrixType {
		return fmt.Errorf("unexpected matrix data type: got (%s), want (%s)", got, logql.QuantileSketchMatrixType)
	}

	incoming, isMatrix := res.Data.(logql.ProbabilisticQuantileMatrix)
	switch {
	case !isMatrix:
		return fmt.Errorf("unexpected matrix type: got (%T), want (ProbabilisticQuantileMatrix)", res.Data)
	case a.matrix == nil:
		// Nothing accumulated yet: adopt the first matrix.
		a.matrix = incoming
		return nil
	}

	merged, err := a.matrix.Merge(incoming)
	a.matrix = merged
	return err
}

// Result returns a single-element slice whose only result carries the
// accumulated matrix as its data.
func (a *quantileSketchAccumulator) Result() []logqlmodel.Result {
	merged := logqlmodel.Result{Data: a.matrix}
	return []logqlmodel.Result{merged}
}

// heap impl for keeping only the top n results across m streams
// importantly, accumulatedStreams is _bounded_, so it will only
// store the top `limit` results across all streams.
Expand Down
Loading

0 comments on commit 5095cfc

Please sign in to comment.