store: Optimized common cases for time selecting smaller amount of series. Avoid looking up symbols. #3531

Merged
merged 3 commits on Dec 4, 2020
Changes from 2 commits
213 changes: 108 additions & 105 deletions pkg/store/bucket.go
@@ -703,51 +703,59 @@ func blockSeries(
// Transform all series into the response types and mark their relevant chunks
// for preloading.
var (
res []seriesEntry
lset labels.Labels
chks []chunks.Meta
res []seriesEntry
symbolizedLset []symbolizedLabel
lset labels.Labels
chks []chunks.Meta
)
for _, id := range ps {
if err := indexr.LoadedSeries(id, &lset, &chks, req); err != nil {
ok, err := indexr.LoadSeriesForTime(id, &symbolizedLset, &chks, req.SkipChunks, req.MinTime, req.MaxTime)
Member
Really a nitpick, but perhaps ok -> found? Also on line 2091.

if err != nil {
return nil, nil, errors.Wrap(err, "read series")
}
if len(chks) > 0 {
s := seriesEntry{lset: make(labels.Labels, 0, len(lset)+len(extLset))}
if !req.SkipChunks {
s.refs = make([]uint64, 0, len(chks))
s.chks = make([]storepb.AggrChunk, 0, len(chks))
for _, meta := range chks {
if err := chunkr.addPreload(meta.Ref); err != nil {
return nil, nil, errors.Wrap(err, "add chunk preload")
}
s.chks = append(s.chks, storepb.AggrChunk{
MinTime: meta.MinTime,
MaxTime: meta.MaxTime,
})
s.refs = append(s.refs, meta.Ref)
}
if !ok {
// No matching chunks for this time duration, skip series.
continue
}

// Reserve chunksLimiter if we save chunks.
if err := chunksLimiter.Reserve(uint64(len(s.chks))); err != nil {
return nil, nil, errors.Wrap(err, "exceeded chunks limit")
s := seriesEntry{lset: make(labels.Labels, 0, len(symbolizedLset)+len(extLset))}
if !req.SkipChunks {
// Schedule loading chunks.
s.refs = make([]uint64, 0, len(chks))
s.chks = make([]storepb.AggrChunk, 0, len(chks))
for _, meta := range chks {
if err := chunkr.addPreload(meta.Ref); err != nil {
return nil, nil, errors.Wrap(err, "add chunk preload")
}
s.chks = append(s.chks, storepb.AggrChunk{
MinTime: meta.MinTime,
MaxTime: meta.MaxTime,
})
s.refs = append(s.refs, meta.Ref)
}

for _, l := range lset {
// Skip if the external labels of the block overrule the series' label.
// NOTE(fabxc): maybe move it to a prefixed version to still ensure uniqueness of series?
if extLset[l.Name] != "" {
continue
}
s.lset = append(s.lset, l)
// Ensure sample limit through chunksLimiter if we return chunks.
if err := chunksLimiter.Reserve(uint64(len(s.chks))); err != nil {
return nil, nil, errors.Wrap(err, "exceeded chunks limit")
}
for ln, lv := range extLset {
s.lset = append(s.lset, labels.Label{Name: ln, Value: lv})
}
sort.Sort(s.lset)
}
if err := indexr.LookupLabelsSymbols(symbolizedLset, &lset); err != nil {
Contributor @yeya24 · Dec 3, 2020
Another idea to optimize this:

  1. We can pass s.lset and extLset to this function; if the label name is already in extLset, we don't need to decode that label value.

  2. Then we can simplify lines 746-756 and reduce the slice appends.
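
A rough sketch of what that suggestion could look like (hypothetical, not code from this PR; it reuses r.dec.LookupSymbol, symbolizedLabel, and the labels/errors/sort usage already shown in the diff above):

// lookupLabelsWithExt is a hypothetical variant of LookupLabelsSymbols following the suggestion above:
// labels whose names are overridden by the block's external labels are skipped before their value symbol
// is decoded, and the external labels are appended in the same pass, so the caller no longer needs the
// extra append/sort loop.
func (r *bucketIndexReader) lookupLabelsWithExt(symbolized []symbolizedLabel, extLset map[string]string, dst *labels.Labels) error {
	*dst = (*dst)[:0]
	for _, s := range symbolized {
		ln, err := r.dec.LookupSymbol(s.name)
		if err != nil {
			return errors.Wrap(err, "lookup label name")
		}
		// External labels of the block overrule the series' label; skip decoding its value symbol.
		if extLset[ln] != "" {
			continue
		}
		lv, err := r.dec.LookupSymbol(s.value)
		if err != nil {
			return errors.Wrap(err, "lookup label value")
		}
		*dst = append(*dst, labels.Label{Name: ln, Value: lv})
	}
	for ln, lv := range extLset {
		*dst = append(*dst, labels.Label{Name: ln, Value: lv})
	}
	sort.Sort(*dst)
	return nil
}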

Member Author

Nice, will try it out

Member Author

Hm, what about merging this improvement and focusing on the next ones in the next PR?

Contributor

SGTM 👍

Contributor

Did a simple benchmark, but it seems this idea is not very good.

name                                                        old time/op    new time/op    delta
BucketSeries/1000000SeriesWith1Samples/1of1000000              162ms ± 0%     205ms ± 0%  +26.23%
BucketSeries/1000000SeriesWith1Samples/10of1000000             206ms ± 0%     175ms ± 0%  -15.38%
BucketSeries/1000000SeriesWith1Samples/1000000of1000000        3.90s ± 0%     4.54s ± 0%  +16.47%
BucketSeries/100000SeriesWith100Samples/1of10000000           13.8ms ± 0%    12.3ms ± 0%  -11.25%
BucketSeries/100000SeriesWith100Samples/100of10000000         13.4ms ± 0%    13.0ms ± 0%   -2.59%
BucketSeries/100000SeriesWith100Samples/10000000of10000000     323ms ± 0%     356ms ± 0%  +10.39%
BucketSeries/1SeriesWith10000000Samples/1of10000000            229µs ± 0%     166µs ± 0%  -27.38%
BucketSeries/1SeriesWith10000000Samples/100of10000000          246µs ± 0%     168µs ± 0%  -31.50%
BucketSeries/1SeriesWith10000000Samples/10000000of10000000     105ms ± 0%      75ms ± 0%  -28.75%

name                                                        old alloc/op   new alloc/op   delta
BucketSeries/1000000SeriesWith1Samples/1of1000000             62.0MB ± 0%    62.1MB ± 0%   +0.01%
BucketSeries/1000000SeriesWith1Samples/10of1000000            62.0MB ± 0%    62.0MB ± 0%   +0.00%
BucketSeries/1000000SeriesWith1Samples/1000000of1000000       1.30GB ± 0%    1.31GB ± 0%   +1.17%
BucketSeries/100000SeriesWith100Samples/1of10000000           4.86MB ± 0%    4.86MB ± 0%   -0.00%
BucketSeries/100000SeriesWith100Samples/100of10000000         4.82MB ± 0%    4.82MB ± 0%   +0.00%
BucketSeries/100000SeriesWith100Samples/10000000of10000000     125MB ± 0%     127MB ± 0%   +1.50%
BucketSeries/1SeriesWith10000000Samples/1of10000000            225kB ± 0%     225kB ± 0%   -0.05%
BucketSeries/1SeriesWith10000000Samples/100of10000000          225kB ± 0%     225kB ± 0%   -0.04%
BucketSeries/1SeriesWith10000000Samples/10000000of10000000    38.0MB ± 0%    38.0MB ± 0%   -0.00%

name                                                        old allocs/op  new allocs/op  delta
BucketSeries/1000000SeriesWith1Samples/1of1000000              9.70k ± 0%     9.70k ± 0%   +0.07%
BucketSeries/1000000SeriesWith1Samples/10of1000000             9.81k ± 0%     9.81k ± 0%   -0.01%
BucketSeries/1000000SeriesWith1Samples/1000000of1000000        12.1M ± 0%     12.1M ± 0%   -0.00%
BucketSeries/100000SeriesWith100Samples/1of10000000            1.11k ± 0%     1.11k ± 0%   -0.27%
BucketSeries/100000SeriesWith100Samples/100of10000000          1.15k ± 0%     1.15k ± 0%   -0.17%
BucketSeries/100000SeriesWith100Samples/10000000of10000000     1.21M ± 0%     1.21M ± 0%   -0.00%
BucketSeries/1SeriesWith10000000Samples/1of10000000              210 ± 0%       208 ± 0%   -0.95%
BucketSeries/1SeriesWith10000000Samples/100of10000000            210 ± 0%       208 ± 0%   -0.95%
BucketSeries/1SeriesWith10000000Samples/10000000of10000000      170k ± 0%      170k ± 0%   -0.01%

return nil, nil, errors.Wrap(err, "Lookup labels symbols")
}

res = append(res, s)
for _, l := range lset {
// Skip if the external labels of the block overrule the series' label.
// NOTE(fabxc): maybe move it to a prefixed version to still ensure uniqueness of series?
if extLset[l.Name] != "" {
continue
}
s.lset = append(s.lset, l)
}
for ln, lv := range extLset {
s.lset = append(s.lset, labels.Label{Name: ln, Value: lv})
}
sort.Sort(s.lset)
res = append(res, s)
}

if req.SkipChunks {
@@ -771,7 +779,6 @@ func blockSeries(
}
}
}

return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil
}

@@ -2070,20 +2077,25 @@ func (g gapBasedPartitioner) Partition(length int, rng func(int) (uint64, uint64
return parts
}

// LoadedSeries populates the given labels and chunk metas for the series identified
// by the reference.
// Returns ErrNotFound if the ref does not resolve to a known series.
func (r *bucketIndexReader) LoadedSeries(ref uint64, lset *labels.Labels, chks *[]chunks.Meta,
req *storepb.SeriesRequest) error {
type symbolizedLabel struct {
name, value uint32
}

// LoadSeriesForTime populates the given symbolized labels for the series identified by the reference if at least one chunk is within
// the time selection.
// LoadSeriesForTime also populates the chunk metas slice if skipChunks is set to false. Chunks are also limited by the given time selection.
// LoadSeriesForTime returns false when there is no series data for the given time range.
//
// Error is returned on decoding error or if the reference does not resolve to a known series.
func (r *bucketIndexReader) LoadSeriesForTime(ref uint64, lset *[]symbolizedLabel, chks *[]chunks.Meta, skipChunks bool, mint, maxt int64) (ok bool, err error) {
b, ok := r.loadedSeries[ref]
if !ok {
return errors.Errorf("series %d not found", ref)
return false, errors.Errorf("series %d not found", ref)
}

r.stats.seriesTouched++
r.stats.seriesTouchedSizeSum += len(b)

return r.decodeSeriesWithReq(b, lset, chks, req)
return decodeSeriesForTime(b, lset, chks, skipChunks, mint, maxt)
}

// Close released the underlying resources of the reader.
@@ -2092,93 +2104,84 @@ func (r *bucketIndexReader) Close() error {
return nil
}

// decodeSeriesWithReq decodes a series entry from the given byte slice based on the SeriesRequest.
func (r *bucketIndexReader) decodeSeriesWithReq(b []byte, lbls *labels.Labels, chks *[]chunks.Meta,
req *storepb.SeriesRequest) error {
// LookupLabelsSymbols populates label set strings from a symbolized label set.
func (r *bucketIndexReader) LookupLabelsSymbols(symbolized []symbolizedLabel, lbls *labels.Labels) error {
*lbls = (*lbls)[:0]
*chks = (*chks)[:0]

d := encoding.Decbuf{B: b}

k := d.Uvarint()

for i := 0; i < k; i++ {
lno := uint32(d.Uvarint())
lvo := uint32(d.Uvarint())

if d.Err() != nil {
return errors.Wrap(d.Err(), "read series label offsets")
}

ln, err := r.dec.LookupSymbol(lno)
for _, s := range symbolized {
// TODO(bwplotka): Cache, it takes majority of query time.
ln, err := r.dec.LookupSymbol(s.name)
if err != nil {
return errors.Wrap(err, "lookup label name")
}
lv, err := r.dec.LookupSymbol(lvo)
lv, err := r.dec.LookupSymbol(s.value)
if err != nil {
return errors.Wrap(err, "lookup label value")
}

*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
}
return nil
}

// Read the chunks meta data.
k = d.Uvarint()
// decodeSeriesForTime decodes a series entry from the given byte slice, decoding only chunk metas that are within the given min and max time.
// If skipChunks is specified, decodeSeriesForTime does not return any chunks, only labels, and only if at least a single chunk is within the time range.
// decodeSeriesForTime returns false when there is no series data for the given time range.
func decodeSeriesForTime(b []byte, lset *[]symbolizedLabel, chks *[]chunks.Meta, skipChunks bool, selectMint, selectMaxt int64) (ok bool, err error) {
*lset = (*lset)[:0]

if k == 0 {
return nil
if !skipChunks {
*chks = (*chks)[:0]
}

t0 := d.Varint64()
maxt := int64(d.Uvarint64()) + t0
ref0 := int64(d.Uvarint64())
d := encoding.Decbuf{B: b}

// No chunk in the required time range.
if t0 > req.MaxTime {
return nil
// Read labels without looking up symbols.
k := d.Uvarint()
for i := 0; i < k; i++ {
lno := uint32(d.Uvarint())
lvo := uint32(d.Uvarint())
*lset = append(*lset, symbolizedLabel{name: lno, value: lvo})
}

if req.MinTime <= maxt {
*chks = append(*chks, chunks.Meta{
Ref: uint64(ref0),
MinTime: t0,
MaxTime: maxt,
})
// Get a valid chunk, return if it is a skip chunk request.
if req.SkipChunks {
return nil
}
// Read the chunks meta data.
k = d.Uvarint()
if k == 0 {
return false, d.Err()
}
t0 = maxt

for i := 1; i < k; i++ {
mint := int64(d.Uvarint64()) + t0
maxt := int64(d.Uvarint64()) + mint
ref0 += d.Varint64()
t0 = maxt
// First t0 is absolute, rest is just diff so different type is used (Uvarint64).
mint := d.Varint64()
maxt := int64(d.Uvarint64()) + mint
// Similar for first ref.
ref := int64(d.Uvarint64())

if maxt < req.MinTime {
continue
}
if mint > req.MaxTime {
break
for i := 0; i < k; i++ {
if i > 0 {
mint += int64(d.Uvarint64())
maxt = int64(d.Uvarint64()) + mint
ref += d.Varint64()
}

if d.Err() != nil {
return errors.Wrapf(d.Err(), "read meta for chunk %d", i)
if mint > selectMaxt {
break
}

*chks = append(*chks, chunks.Meta{
Ref: uint64(ref0),
MinTime: mint,
MaxTime: maxt,
})
if maxt >= selectMint {
// Found a chunk.
if skipChunks {
// We are not interested in chunks and we know there is at least one, that's enough to return series.
return true, nil
}

if req.SkipChunks {
return nil
*chks = append(*chks, chunks.Meta{
Ref: uint64(ref),
MinTime: mint,
MaxTime: maxt,
})
}

mint = maxt
}
return d.Err()

return len(*chks) > 0, d.Err()
}
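
For intuition, a hypothetical worked example of the chunk-meta encoding read above (all numbers are invented purely for illustration; only the first mint and ref are absolute, the rest are deltas, and mint is reset to the previous maxt after each chunk):

  k = 3 chunks, with selectMint = 1700 and selectMaxt = 2000:

  chunk 0: mint = Varint64 -> 1000 (absolute); maxt = 1000 + Uvarint64(500) = 1500; ref = Uvarint64 -> 8.
           maxt (1500) < selectMint (1700), so the chunk is not appended; mint is then set to maxt (1500).
  chunk 1: mint = 1500 + Uvarint64(100) = 1600; maxt = 1600 + Uvarint64(500) = 2100; ref = 8 + Varint64(120) = 128.
           maxt (2100) >= selectMint, so the chunk matches; with skipChunks=true the function returns true right here.
  chunk 2: mint = 2100 + delta, which is already > selectMaxt (2000), so the loop breaks without decoding further.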

type bucketChunkReader struct {