-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merge quantile sketches as they arrive #11544
Merge quantile sketches as they arrive #11544
Conversation
Trivy scan found the following vulnerabilities:
|
type quantileSketchAccumulator struct { | ||
matrix logql.ProbabilisticQuantileMatrix | ||
} | ||
|
||
func newQuantileSketchAccumulator() *quantileSketchAccumulator { | ||
return &quantileSketchAccumulator{} | ||
} | ||
|
||
func (a *quantileSketchAccumulator) Accumulate(res logqlmodel.Result, _ int) error { | ||
if res.Data.Type() != logql.QuantileSketchMatrixType { | ||
return fmt.Errorf("unexpected matrix data type: got (%s), want (%s)", res.Data.Type(), logql.QuantileSketchMatrixType) | ||
} | ||
data, ok := res.Data.(logql.ProbabilisticQuantileMatrix) | ||
if !ok { | ||
return fmt.Errorf("unexpected matrix type: got (%T), want (ProbabilisticQuantileMatrix)", res.Data) | ||
} | ||
if a.matrix == nil { | ||
a.matrix = data | ||
return nil | ||
} | ||
|
||
var err error | ||
a.matrix, err = a.matrix.Merge(data) | ||
return err | ||
} | ||
|
||
func (a *quantileSketchAccumulator) Result() []logqlmodel.Result { | ||
return []logqlmodel.Result{{Data: a.matrix}} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be the place where we merge results?
I still need to benchmark this. |
that we use to store and check stream hashes Signed-off-by: Callum Styan <callumstyan@gmail.com>
Signed-off-by: Callum Styan <callumstyan@gmail.com>
I've pushed two commits to your branch, one with benchmarks for single merge vs accumulator and one for an optimization to the map usage in the quantile matrix/vector merging. First looking at just the single merge as we have it prior to the accumulator change, with and without the pooling:
Then the pooled single merge vs accumulator merge:
And the final result with my two commits vs the single merge usage:
|
func BenchmarkAccumulateMerge(b *testing.B) { | ||
b.ReportAllocs() | ||
results := make([]logql.ProbabilisticQuantileVector, 100) | ||
for i := range results { | ||
results[i] = make(logql.ProbabilisticQuantileVector, 100) | ||
for j := range results[i] { | ||
results[i][j] = logql.ProbabilisticQuantileSample{ | ||
T: int64(i), | ||
F: newRandomSketch(), | ||
Metric: []labels.Label{{Name: "foo", Value: fmt.Sprintf("bar-%d", j)}}, | ||
} | ||
} | ||
} | ||
res := make([]logqlmodel.Result, 0, 100) | ||
for _, r := range results { | ||
res = append(res, logqlmodel.Result{ | ||
Data: logql.ProbabilisticQuantileMatrix([]logql.ProbabilisticQuantileVector{r}), | ||
}) | ||
} | ||
b.ResetTimer() | ||
for n := 0; n < b.N; n++ { | ||
acc := newQuantileSketchAccumulator() | ||
for _, r := range res { | ||
require.NoError(b, acc.Accumulate(r, 0)) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice. I was about to push my change and had a conflict. Your code was almost exactly as mine 🎉
…t-a-time' into karsten/merge-one-at-a-time
@@ -248,7 +238,8 @@ func newDownstreamAccumulator(params logql.Params, nQueries int) *downstreamAccu | |||
} | |||
|
|||
func (a *downstreamAccumulator) build(acc logqlmodel.Result) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not too fond of this design. We do know whether the results are samples, logs or quantile sketches. So ideally the logql engine would inject the accumulator on the execution plan. Moving this into the logql package would also make the MockQuerier
simpler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this seems like a good change to me
What this PR does / why we need it:
Currently, the quantile sketch merge logic in the query frontend waits for all shards to complete and merges them then. That puts a lot of pressure on the memory. Luckily the merges can happen out of order as they come in.
I've simulated the later merge.
Checklist
CONTRIBUTING.md
guide (required)CHANGELOG.md
updatedadd-to-release-notes
labeldocs/sources/setup/upgrade/_index.md
production/helm/loki/Chart.yaml
and updateproduction/helm/loki/CHANGELOG.md
andproduction/helm/loki/README.md
. Example PRdeprecated-config.yaml
anddeleted-config.yaml
files respectively in thetools/deprecated-config-checker
directory. Example PR