Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge quantile sketches as they arrive #11544

Merged
merged 14 commits into from
Jan 4, 2024

Conversation

jeschkies
Copy link
Contributor

@jeschkies jeschkies commented Dec 21, 2023

What this PR does / why we need it:
Currently, the quantile sketch merge logic in the query frontend waits for all shards to complete and merges them then. That puts a lot of pressure on the memory. Luckily the merges can happen out of order as they come in.

› benchstat before2.log after.log
goos: linux
goarch: amd64
pkg: github.com/grafana/loki/pkg/querier/queryrange
cpu: AMD Ryzen 7 3700X 8-Core Processor             
                                 │ before2.log │             after.log              │
                                 │   sec/op    │   sec/op     vs base               │
Accumulator/streams-16             46.51µ ± 2%   45.74µ ± 2%       ~ (p=0.280 n=10)
Accumulator/quantile_sketches-16   6.806m ± 2%   7.012m ± 0%  +3.02% (p=0.000 n=10)
geomean                            562.7µ        566.3µ       +0.65%

                                 │  before2.log  │              after.log               │
                                 │     B/op      │     B/op      vs base                │
Accumulator/streams-16              8.385Ki ± 0%   8.384Ki ± 0%        ~ (p=0.570 n=10)
Accumulator/quantile_sketches-16   68.482Ki ± 0%   4.465Ki ± 1%  -93.48% (p=0.000 n=10)
geomean                             23.96Ki        6.119Ki       -74.47%

                                 │ before2.log │              after.log              │
                                 │  allocs/op  │ allocs/op   vs base                 │
Accumulator/streams-16              79.00 ± 0%   79.00 ± 0%       ~ (p=1.000 n=10) ¹
Accumulator/quantile_sketches-16    69.00 ± 3%   67.00 ± 1%  -2.90% (p=0.010 n=10)
geomean                             73.83        72.75       -1.46%
¹ all samples are equal

I've simulated the later merge.

Checklist

  • Reviewed the CONTRIBUTING.md guide (required)
  • Documentation added
  • Tests updated
  • CHANGELOG.md updated
    • If the change is worth mentioning in the release notes, add add-to-release-notes label
  • Changes that require user attention or interaction to upgrade are documented in docs/sources/setup/upgrade/_index.md
  • For Helm chart changes bump the Helm chart version in production/helm/loki/Chart.yaml and update production/helm/loki/CHANGELOG.md and production/helm/loki/README.md. Example PR
  • If the change is deprecating or removing a configuration option, update the deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR

Copy link
Contributor

github-actions bot commented Dec 21, 2023

Trivy scan found the following vulnerabilities:

  • HIGH, Target: docker.io/grafana/loki:main-5c502e4 (alpine 3.18.4), Type: alpine openssl: Incorrect cipher key and IV length processing in libcrypto3 v3.1.3-r0. Fixed in v3.1.4-r0
  • HIGH, Target: docker.io/grafana/loki:main-5c502e4 (alpine 3.18.4), Type: alpine openssl: Incorrect cipher key and IV length processing in libssl3 v3.1.3-r0. Fixed in v3.1.4-r0
    \nTo see more details on these vulnerabilities, and how/where to fix them, please run docker build -t grafana/loki:main-5c502e4 -f cmd/loki/Dockerfile .
    trivy i grafana/loki:main-5c502e4 on your branch. If these were not introduced by your PR, please considering fixing them in via a subsequent PR. Thanks!

Comment on lines +298 to +326
type quantileSketchAccumulator struct {
matrix logql.ProbabilisticQuantileMatrix
}

func newQuantileSketchAccumulator() *quantileSketchAccumulator {
return &quantileSketchAccumulator{}
}

func (a *quantileSketchAccumulator) Accumulate(res logqlmodel.Result, _ int) error {
if res.Data.Type() != logql.QuantileSketchMatrixType {
return fmt.Errorf("unexpected matrix data type: got (%s), want (%s)", res.Data.Type(), logql.QuantileSketchMatrixType)
}
data, ok := res.Data.(logql.ProbabilisticQuantileMatrix)
if !ok {
return fmt.Errorf("unexpected matrix type: got (%T), want (ProbabilisticQuantileMatrix)", res.Data)
}
if a.matrix == nil {
a.matrix = data
return nil
}

var err error
a.matrix, err = a.matrix.Merge(data)
return err
}

func (a *quantileSketchAccumulator) Result() []logqlmodel.Result {
return []logqlmodel.Result{{Data: a.matrix}}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be the place where we merge results?

@jeschkies
Copy link
Contributor Author

I still need to benchmark this.

that we use to store and check stream hashes

Signed-off-by: Callum Styan <callumstyan@gmail.com>
Signed-off-by: Callum Styan <callumstyan@gmail.com>
@cstyan
Copy link
Contributor

cstyan commented Jan 2, 2024

I've pushed two commits to your branch, one with benchmarks for single merge vs accumulator and one for an optimization to the map usage in the quantile matrix/vector merging.

First looking at just the single merge as we have it prior to the accumulator change, with and without the pooling:

cpu: AMD Ryzen 9 5950X 16-Core Processor
               │   no-pool   │                pool                │
               │   sec/op    │   sec/op     vs base               │
SingleMerge-32   5.032m ± 6%   3.846m ± 5%  -23.56% (p=0.002 n=6)

               │     no-pool     │                pool                 │
               │      B/op       │     B/op      vs base               │
SingleMerge-32   1043.875Ki ± 0%   3.390Ki ± 3%  -99.68% (p=0.002 n=6)

               │   no-pool   │               pool                │
               │  allocs/op  │ allocs/op   vs base               │
SingleMerge-32   1771.0 ± 0%   129.5 ± 0%  -92.69% (p=0.002 n=6)

Then the pooled single merge vs accumulator merge:

     │ AccumulateMerge │         SingleMerge          │
     │     sec/op      │   sec/op     vs base         │
*-32       3.932m ± 2%   3.846m ± 5%  ~ (p=0.093 n=6)

     │ AccumulateMerge │             SingleMerge              │
     │      B/op       │     B/op      vs base                │
*-32      1.051Ki ± 4%   3.390Ki ± 3%  +222.58% (p=0.002 n=6)

     │ AccumulateMerge │             SingleMerge             │
     │    allocs/op    │  allocs/op   vs base                │
*-32        30.00 ± 3%   129.50 ± 0%  +331.67% (p=0.002 n=6)

And the final result with my two commits vs the single merge usage:

     │ SingleMerge │          AccumulateMerge           │
     │   sec/op    │   sec/op     vs base               │
*-32   5.032m ± 6%   3.932m ± 2%  -21.86% (p=0.002 n=6)

     │   SingleMerge   │           AccumulateMerge           │
     │      B/op       │     B/op      vs base               │
*-32   1043.875Ki ± 0%   1.051Ki ± 4%  -99.90% (p=0.002 n=6)

     │ SingleMerge  │          AccumulateMerge          │
     │  allocs/op   │ allocs/op   vs base               │
*-32   1771.00 ± 0%   30.00 ± 3%  -98.31% (p=0.002 n=6)

Comment on lines 613 to 639
func BenchmarkAccumulateMerge(b *testing.B) {
b.ReportAllocs()
results := make([]logql.ProbabilisticQuantileVector, 100)
for i := range results {
results[i] = make(logql.ProbabilisticQuantileVector, 100)
for j := range results[i] {
results[i][j] = logql.ProbabilisticQuantileSample{
T: int64(i),
F: newRandomSketch(),
Metric: []labels.Label{{Name: "foo", Value: fmt.Sprintf("bar-%d", j)}},
}
}
}
res := make([]logqlmodel.Result, 0, 100)
for _, r := range results {
res = append(res, logqlmodel.Result{
Data: logql.ProbabilisticQuantileMatrix([]logql.ProbabilisticQuantileVector{r}),
})
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
acc := newQuantileSketchAccumulator()
for _, r := range res {
require.NoError(b, acc.Accumulate(r, 0))
}
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. I was about to push my change and had a conflict. Your code was almost exactly as mine 🎉

@jeschkies jeschkies marked this pull request as ready for review January 3, 2024 11:31
@jeschkies jeschkies requested a review from a team as a code owner January 3, 2024 11:31
@@ -248,7 +238,8 @@ func newDownstreamAccumulator(params logql.Params, nQueries int) *downstreamAccu
}

func (a *downstreamAccumulator) build(acc logqlmodel.Result) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not too fond of this design. We do know whether the results are samples, logs or quantile sketches. So ideally the logql engine would inject the accumulator on the execution plan. Moving this into the logql package would also make the MockQuerier simpler.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems like a good change to me

@jeschkies jeschkies merged commit 8d01cbf into grafana:main Jan 4, 2024
7 checks passed
@jeschkies jeschkies deleted the karsten/merge-one-at-a-time branch January 4, 2024 11:38
rhnasc pushed a commit to inloco/loki that referenced this pull request Apr 12, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants