-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merge quantile sketches as they arrive #11544
Merged
jeschkies
merged 14 commits into
grafana:main
from
jeschkies:karsten/merge-one-at-a-time
Jan 4, 2024
Merged
Changes from all commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
1d0d3cf
Merge quantile sketches as they arrive
jeschkies 6992149
Merge remote-tracking branch 'grafana/main' into karsten/merge-one-at…
jeschkies 375c683
Format
jeschkies b5521e2
Add convenient method to merge matrices
jeschkies e5be590
Do not cancel asyncfor
jeschkies 5f8b104
Forward result when there is no error
jeschkies 80c1f8a
Move quantile merge into accumulator
jeschkies 53c1b51
Add note to use for
jeschkies ced0809
improve sketch merge performance by using a pool for the maps
cstyan 6f5ef81
add benchmarks for bloated memory merge vs accumulator merge
cstyan 80179fe
Benchmark accumulator
jeschkies 8de58a1
Merge remote-tracking branch 'refs/remotes/origin/karsten/merge-one-a…
jeschkies adeab72
Merge remote-tracking branch 'grafana/main' into karsten/merge-one-at…
jeschkies a6c2fbd
Fix unit test
jeschkies File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ import ( | |
"time" | ||
|
||
"github.com/go-kit/log/level" | ||
"github.com/grafana/dskit/concurrency" | ||
"github.com/grafana/dskit/tenant" | ||
"github.com/opentracing/opentracing-go" | ||
"github.com/prometheus/prometheus/model/labels" | ||
|
@@ -125,58 +126,46 @@ func (in instance) For( | |
queries []logql.DownstreamQuery, | ||
fn func(logql.DownstreamQuery) (logqlmodel.Result, error), | ||
) ([]logqlmodel.Result, error) { | ||
type resp struct { | ||
i int | ||
res logqlmodel.Result | ||
err error | ||
} | ||
|
||
ctx, cancel := context.WithCancel(ctx) | ||
defer cancel() | ||
ch := make(chan resp) | ||
|
||
// Make one goroutine to dispatch the other goroutines, bounded by instance parallelism | ||
ch := make(chan logql.Resp) | ||
|
||
// ForEachJob blocks until all are done. However, we want to process the | ||
// results as they come in. That's why we start everything in another | ||
// goroutine. | ||
go func() { | ||
for i := 0; i < len(queries); i++ { | ||
err := concurrency.ForEachJob(ctx, len(queries), in.parallelism, func(ctx context.Context, i int) error { | ||
res, err := fn(queries[i]) | ||
response := logql.Resp{ | ||
I: i, | ||
Res: res, | ||
Err: err, | ||
} | ||
|
||
// Feed the result into the channel unless the work has completed. | ||
select { | ||
case <-ctx.Done(): | ||
break | ||
case <-in.locks: | ||
go func(i int) { | ||
// release lock back into pool | ||
defer func() { | ||
in.locks <- struct{}{} | ||
}() | ||
|
||
res, err := fn(queries[i]) | ||
response := resp{ | ||
i: i, | ||
res: res, | ||
err: err, | ||
} | ||
|
||
// Feed the result into the channel unless the work has completed. | ||
select { | ||
case <-ctx.Done(): | ||
case ch <- response: | ||
} | ||
}(i) | ||
case ch <- response: | ||
} | ||
return err | ||
}) | ||
if err != nil { | ||
ch <- logql.Resp{ | ||
I: -1, | ||
Err: err, | ||
} | ||
} | ||
close(ch) | ||
}() | ||
|
||
acc := newDownstreamAccumulator(queries[0].Params, len(queries)) | ||
for i := 0; i < len(queries); i++ { | ||
select { | ||
case <-ctx.Done(): | ||
return nil, ctx.Err() | ||
case resp := <-ch: | ||
if resp.err != nil { | ||
return nil, resp.err | ||
} | ||
if err := acc.Accumulate(ctx, resp.i, resp.res); err != nil { | ||
return nil, err | ||
} | ||
for resp := range ch { | ||
if resp.Err != nil { | ||
return nil, resp.Err | ||
} | ||
if err := acc.Accumulate(ctx, resp.I, resp.Res); err != nil { | ||
return nil, err | ||
} | ||
} | ||
return acc.Result(), nil | ||
|
@@ -222,8 +211,8 @@ func sampleStreamToVector(streams []queryrangebase.SampleStream) parser.Value { | |
return xs | ||
} | ||
|
||
// downstreamAccumulator is one of two variants: | ||
// a logsAccumulator or a bufferedAccumulator. | ||
// downstreamAccumulator is one of three variants: | ||
// a logsAccumulator, a bufferedAccumulator, or a quantileSketchAccumulator. | ||
// Which variant is detected on the first call to Accumulate. | ||
// Metric queries, which are generally small payloads, are buffered | ||
// since the memory overhead is negligible. | ||
|
@@ -232,6 +221,7 @@ func sampleStreamToVector(streams []queryrangebase.SampleStream) parser.Value { | |
// accumulate the results into a logsAccumulator, discarding values | ||
// over the limit to keep memory pressure down while other subqueries | ||
// are executing. | ||
// Sharded probabilistic quantile query results are merged as they come in. | ||
type downstreamAccumulator struct { | ||
acc resultAccumulator | ||
params logql.Params | ||
|
@@ -248,7 +238,8 @@ func newDownstreamAccumulator(params logql.Params, nQueries int) *downstreamAccu | |
} | ||
|
||
func (a *downstreamAccumulator) build(acc logqlmodel.Result) { | ||
if acc.Data.Type() == logqlmodel.ValueTypeStreams { | ||
switch acc.Data.Type() { | ||
case logqlmodel.ValueTypeStreams: | ||
|
||
// the stream accumulator stores a heap with reversed order | ||
// from the results we expect, so we need to reverse the direction | ||
|
@@ -258,8 +249,9 @@ func (a *downstreamAccumulator) build(acc logqlmodel.Result) { | |
} | ||
|
||
a.acc = newStreamAccumulator(direction, int(a.params.Limit())) | ||
|
||
} else { | ||
case logql.QuantileSketchMatrixType: | ||
a.acc = newQuantileSketchAccumulator() | ||
default: | ||
a.acc = &bufferedAccumulator{ | ||
results: make([]logqlmodel.Result, a.n), | ||
} | ||
|
@@ -297,6 +289,36 @@ func (a *bufferedAccumulator) Result() []logqlmodel.Result { | |
return a.results | ||
} | ||
|
||
type quantileSketchAccumulator struct { | ||
matrix logql.ProbabilisticQuantileMatrix | ||
} | ||
|
||
func newQuantileSketchAccumulator() *quantileSketchAccumulator { | ||
return &quantileSketchAccumulator{} | ||
} | ||
|
||
func (a *quantileSketchAccumulator) Accumulate(res logqlmodel.Result, _ int) error { | ||
if res.Data.Type() != logql.QuantileSketchMatrixType { | ||
return fmt.Errorf("unexpected matrix data type: got (%s), want (%s)", res.Data.Type(), logql.QuantileSketchMatrixType) | ||
} | ||
data, ok := res.Data.(logql.ProbabilisticQuantileMatrix) | ||
if !ok { | ||
return fmt.Errorf("unexpected matrix type: got (%T), want (ProbabilisticQuantileMatrix)", res.Data) | ||
} | ||
if a.matrix == nil { | ||
a.matrix = data | ||
return nil | ||
} | ||
|
||
var err error | ||
a.matrix, err = a.matrix.Merge(data) | ||
return err | ||
} | ||
|
||
func (a *quantileSketchAccumulator) Result() []logqlmodel.Result { | ||
return []logqlmodel.Result{{Data: a.matrix}} | ||
} | ||
Comment on lines
+292
to
+320
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should this be the place where we merge results? |
||
|
||
// heap impl for keeping only the top n results across m streams | ||
// importantly, accumulatedStreams is _bounded_, so it will only | ||
// store the top `limit` results across all streams. | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not too fond of this design. We do know whether the results are samples, logs or quantile sketches. So ideally the logql engine would inject the accumulator on the execution plan. Moving this into the logql package would also make the
MockQuerier
simpler. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this seems like a good change to me