Don't load series multiple times when streaming chunks from store-gateways and only one batch is needed (grafana#8039)

Co-authored-by: Jeanette Tan <jeanette.tan@grafana.com>
2 people authored and narqo committed Jun 6, 2024
1 parent 5cdfa2a commit 05e9c4c
Showing 7 changed files with 605 additions and 163 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -38,6 +38,7 @@
* [ENHANCEMENT] Store-gateway: add `-blocks-storage.bucket-store.index-header.lazy-loading-concurrency-queue-timeout`. When set, loads of index-headers at the store-gateway's index-header lazy load gate will not wait longer than that to execute. If a load reaches the wait timeout, then the querier will retry the blocks on a different store-gateway. If all store-gateways are unavailable, then the query will fail with `err-mimir-store-consistency-check-failed`. #8138
* [ENHANCEMENT] Ingester: Optimize querying with regexp matchers. #8106
* [ENHANCEMENT] Distributor: Introduce `-distributor.max-request-pool-buffer-size` to allow configuring the maximum size of the request pool buffers. #8082
* [ENHANCEMENT] Store-gateway: improve performance when streaming chunks to queriers is enabled (`-querier.prefer-streaming-chunks-from-store-gateways=true`) and the query selects fewer than `-blocks-storage.bucket-store.batch-series-size` series (defaults to 5000 series). #8039
* [ENHANCEMENT] Ingester: active series are now updated along with owned series. They decrease when series change ownership between ingesters. This helps provide a more accurate total of active series when ingesters are added. This is only enabled when `-ingester.track-ingester-owned-series` or `-ingester.use-ingester-owned-series-for-limits` are enabled. #8084
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
* [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
102 changes: 42 additions & 60 deletions pkg/storegateway/bucket.go
@@ -606,10 +606,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor
defer done()

var (
// If we are streaming the series labels and chunks separately, we don't need to fetch the postings
// twice. So we use these slices to re-use them. Each reuse[i] corresponds to a single block.
reuse []*reusedPostingsAndMatchers
resHints = &hintspb.SeriesResponseHints{}
streamingIterators *streamingSeriesIterators
resHints = &hintspb.SeriesResponseHints{}
)
for _, b := range blocks {
resHints.AddQueriedBlock(b.meta.ULID)
@@ -633,7 +631,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor
seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))
)

seriesSet, reuse, err = s.streamingSeriesForBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
seriesSet, streamingIterators, err = s.createIteratorForChunksStreamingLabelsPhase(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
if err != nil {
return err
}
@@ -661,15 +659,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor

start := time.Now()
if req.StreamingChunksBatchSize > 0 {
var seriesChunkIt iterator[seriesChunksSet]
seriesChunkIt, err = s.streamingChunksSetForBlocks(ctx, req, blocks, indexReaders, readers, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, reuse)
if err != nil {
return err
}
seriesChunkIt := s.createIteratorForChunksStreamingChunksPhase(ctx, readers, stats, chunksLimiter, seriesLimiter, streamingIterators)
err = s.sendStreamingChunks(req, srv, seriesChunkIt, stats, streamingSeriesCount)
} else {
var seriesSet storepb.SeriesSet
seriesSet, err = s.nonStreamingSeriesSetForBlocks(ctx, req, blocks, indexReaders, readers, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
seriesSet, err = s.createIteratorForNonChunksStreamingRequest(ctx, req, blocks, indexReaders, readers, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
if err != nil {
return err
}
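For orientation, the hunk above replaces the old `streamingSeriesForBlocks`/`streamingChunksSetForBlocks` pair with the new labels-phase/chunks-phase helpers. The toy Go sketch below is only an illustration of the shape of the two branches in `Series()` — names such as `buildLabelsPhase` and `phaseOneIterators` are hypothetical, not Mimir's API: with streaming enabled, the state built while sending labels is handed to the chunks phase rather than rebuilt from the blocks.

```go
// Sketch only: toy types standing in for Mimir's real iterators. Names such as
// phaseOneIterators, buildLabelsPhase and buildChunksPhase are hypothetical.
package main

import "fmt"

type phaseOneIterators struct{ series []string }

// buildLabelsPhase stands in for createIteratorForChunksStreamingLabelsPhase:
// it resolves which series match and keeps the per-block state for later reuse.
func buildLabelsPhase(matching []string) ([]string, *phaseOneIterators) {
	return matching, &phaseOneIterators{series: matching}
}

// buildChunksPhase stands in for createIteratorForChunksStreamingChunksPhase:
// it only re-walks the state captured in phase one, so it cannot fail on
// postings lookups and therefore returns no error.
func buildChunksPhase(its *phaseOneIterators) []string {
	chunks := make([]string, 0, len(its.series))
	for _, s := range its.series {
		chunks = append(chunks, "chunks for "+s)
	}
	return chunks
}

func main() {
	streamingBatchSize := 128 // > 0 means the querier asked for streaming
	matching := []string{`up{job="a"}`, `up{job="b"}`}

	if streamingBatchSize > 0 {
		labels, its := buildLabelsPhase(matching) // phase 1: labels only
		fmt.Println("send labels:", labels)
		fmt.Println("send chunks:", buildChunksPhase(its)) // phase 2: reuse state
	} else {
		fmt.Println("send labels+chunks in one pass:", matching)
	}
}
```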
@@ -1027,24 +1021,24 @@ func chunksSize(chks []storepb.AggrChunk) (size int) {
return size
}

// nonStreamingSeriesSetForBlocks is used when the streaming feature is not enabled.
func (s *BucketStore) nonStreamingSeriesSetForBlocks(
// createIteratorForNonChunksStreamingRequest is used when the streaming feature is not enabled.
func (s *BucketStore) createIteratorForNonChunksStreamingRequest(
ctx context.Context,
req *storepb.SeriesRequest,
blocks []*bucketBlock,
indexReaders map[ulid.ULID]*bucketIndexReader,
chunkReaders *bucketChunkReaders,
shardSelector *sharding.ShardSelector,
matchers []*labels.Matcher,
chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
seriesLimiter SeriesLimiter, // Rate limiter for loading series.
chunksLimiter ChunksLimiter,
seriesLimiter SeriesLimiter,
stats *safeQueryStats,
) (storepb.SeriesSet, error) {
strategy := defaultStrategy
if req.SkipChunks {
strategy = noChunkRefs
}
it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, nil, strategy)
it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, strategy, nil)
if err != nil {
return nil, err
}
@@ -1059,56 +1053,45 @@ func (s *BucketStore) nonStreamingSeriesSetForBlocks(
return set, nil
}

// streamingSeriesForBlocks is used when streaming feature is enabled.
// createIteratorForChunksStreamingLabelsPhase is used when the streaming feature is enabled.
// It returns a series set that only contains the series labels without any chunks information.
// The returned postings (series ref) and matches should be re-used when getting chunks to save on computation.
func (s *BucketStore) streamingSeriesForBlocks(
// The streamingSeriesIterators should be re-used when getting chunks to save on computation.
func (s *BucketStore) createIteratorForChunksStreamingLabelsPhase(
ctx context.Context,
req *storepb.SeriesRequest,
blocks []*bucketBlock,
indexReaders map[ulid.ULID]*bucketIndexReader,
shardSelector *sharding.ShardSelector,
matchers []*labels.Matcher,
chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
seriesLimiter SeriesLimiter, // Rate limiter for loading series.
chunksLimiter ChunksLimiter,
seriesLimiter SeriesLimiter,
stats *safeQueryStats,
) (storepb.SeriesSet, []*reusedPostingsAndMatchers, error) {
var (
reuse = make([]*reusedPostingsAndMatchers, len(blocks))
strategy = noChunkRefs | overlapMintMaxt
)
for i := range reuse {
reuse[i] = &reusedPostingsAndMatchers{}
}
it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, reuse, strategy)
) (storepb.SeriesSet, *streamingSeriesIterators, error) {
streamingIterators := newStreamingSeriesIterators()
it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, overlapMintMaxt, streamingIterators)
if err != nil {
return nil, nil, err
}
return newSeriesSetWithoutChunks(ctx, it, stats), reuse, nil

return newSeriesSetWithoutChunks(ctx, it, stats), streamingIterators, nil
}

// streamingChunksSetForBlocks is used when streaming feature is enabled.
// It returns an iterator to go over the chunks for the series returned in the streamingSeriesForBlocks call.
// It is recommended to pass the reusePostings and reusePendingMatches returned by the streamingSeriesForBlocks call.
func (s *BucketStore) streamingChunksSetForBlocks(
// createIteratorForChunksStreamingChunksPhase is used when the streaming feature is enabled.
// It returns an iterator to go over the chunks for the series returned in the createIteratorForChunksStreamingLabelsPhase call.
// It is required to pass the iterators returned by the createIteratorForChunksStreamingLabelsPhase call for reuse.
func (s *BucketStore) createIteratorForChunksStreamingChunksPhase(
ctx context.Context,
req *storepb.SeriesRequest,
blocks []*bucketBlock,
indexReaders map[ulid.ULID]*bucketIndexReader,
chunkReaders *bucketChunkReaders,
shardSelector *sharding.ShardSelector,
matchers []*labels.Matcher,
chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
seriesLimiter SeriesLimiter, // Rate limiter for loading series.
stats *safeQueryStats,
reuse []*reusedPostingsAndMatchers, // Should come from streamingSeriesForBlocks.
) (iterator[seriesChunksSet], error) {
it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, reuse, defaultStrategy)
if err != nil {
return nil, err
}
chunksLimiter ChunksLimiter,
seriesLimiter SeriesLimiter,
iterators *streamingSeriesIterators,
) iterator[seriesChunksSet] {
preparedIterators := iterators.prepareForChunksStreamingPhase()
it := s.getSeriesIteratorFromPerBlockIterators(preparedIterators, chunksLimiter, seriesLimiter)
scsi := newChunksPreloadingIterator(ctx, s.logger, s.userID, *chunkReaders, it, s.maxSeriesPerBatch, stats)
return scsi, nil

return scsi
}

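The chunks-phase helper above no longer takes the request, blocks, or index readers, and no longer returns an error, because the per-block work happens once in the labels phase. The sketch below is a guess at the mechanism implied by the commit title ("only one batch is needed"): a hypothetical wrapper that caches the single loaded batch and replays it, falling back to rebuilding via a factory when more than one batch was involved. None of these names exist in Mimir.

```go
// Sketch only: a hypothetical reusable per-block iterator, guessing at the
// mechanism suggested by the commit title. Not Mimir's actual implementation.
package main

import "fmt"

type batch []string // stand-in for seriesChunkRefsSet

// reusableIterator wraps a per-block iterator for the labels phase and keeps
// enough state for the chunks phase to avoid repeating the work.
type reusableIterator struct {
	build      func() []batch // factory re-invoked only when needed
	labelsRun  []batch
	singleShot bool // true if everything fit in one batch
}

func newReusableIterator(build func() []batch) *reusableIterator {
	r := &reusableIterator{build: build}
	r.labelsRun = build()
	r.singleShot = len(r.labelsRun) <= 1
	return r
}

// chunksPhase returns the batches to use when streaming chunks: the cached
// batch if one was enough, otherwise a fresh run of the factory.
func (r *reusableIterator) chunksPhase() []batch {
	if r.singleShot {
		return r.labelsRun // reuse, no second load
	}
	return r.build()
}

func main() {
	loads := 0
	it := newReusableIterator(func() []batch {
		loads++
		return []batch{{"series-1", "series-2"}} // a single small batch
	})
	fmt.Println("labels phase:", it.labelsRun)
	fmt.Println("chunks phase:", it.chunksPhase())
	fmt.Println("block loads:", loads) // 1: the second phase reused the batch
}
```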
func (s *BucketStore) getSeriesIteratorFromBlocks(
@@ -1121,8 +1104,8 @@ func (s *BucketStore) getSeriesIteratorFromBlocks(
chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
seriesLimiter SeriesLimiter, // Rate limiter for loading series.
stats *safeQueryStats,
reuse []*reusedPostingsAndMatchers, // Used if not empty. If not empty, len(reuse) must be len(blocks).
strategy seriesIteratorStrategy,
streamingIterators *streamingSeriesIterators,
) (iterator[seriesChunkRefsSet], error) {
var (
mtx = sync.Mutex{}
@@ -1131,9 +1114,8 @@ func (s *BucketStore) getSeriesIteratorFromBlocks(
begin = time.Now()
blocksQueriedByBlockMeta = make(map[blockQueriedMeta]int)
)
for i, b := range blocks {
for _, b := range blocks {
b := b
i := i

// Keep track of queried blocks.
indexr := indexReaders[b.meta.ULID]
@@ -1144,10 +1126,6 @@ func (s *BucketStore) getSeriesIteratorFromBlocks(
if shardSelector != nil {
blockSeriesHashCache = s.seriesHashCache.GetBlockCache(b.meta.ULID.String())
}
var r *reusedPostingsAndMatchers
if len(reuse) > 0 {
r = reuse[i]
}
g.Go(func() error {
part, err := openBlockSeriesChunkRefsSetsIterator(
ctx,
@@ -1162,8 +1140,8 @@ func (s *BucketStore) getSeriesIteratorFromBlocks(
strategy,
req.MinTime, req.MaxTime,
stats,
r,
s.logger,
streamingIterators,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
@@ -1192,13 +1170,17 @@ func (s *BucketStore) getSeriesIteratorFromBlocks(
stats.streamingSeriesExpandPostingsDuration += time.Since(begin)
})

mergedIterator := mergedSeriesChunkRefsSetIterators(s.maxSeriesPerBatch, batches...)
return s.getSeriesIteratorFromPerBlockIterators(batches, chunksLimiter, seriesLimiter), nil
}

func (s *BucketStore) getSeriesIteratorFromPerBlockIterators(perBlockIterators []iterator[seriesChunkRefsSet], chunksLimiter ChunksLimiter, seriesLimiter SeriesLimiter) iterator[seriesChunkRefsSet] {
mergedIterator := mergedSeriesChunkRefsSetIterators(s.maxSeriesPerBatch, perBlockIterators...)

// Apply limits after the merging, so that if the same series is part of multiple blocks it just gets
// counted once towards the limit.
mergedIterator = newLimitingSeriesChunkRefsSetIterator(mergedIterator, chunksLimiter, seriesLimiter)

return mergedIterator, nil
return mergedIterator
}

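The comment above explains the ordering: per-block iterators are merged first, and the chunk/series limiters are applied to the merged stream so that a series present in several blocks is counted once. A minimal, hypothetical illustration with toy helpers (not Mimir's iterators):

```go
// Sketch only: why limits are applied after merging per-block results.
// mergeSorted and applySeriesLimit are invented helpers.
package main

import (
	"fmt"
	"sort"
)

// mergeSorted merges per-block series lists and collapses duplicates, the way
// the merged iterator collapses a series that appears in several blocks.
func mergeSorted(perBlock ...[]string) []string {
	seen := map[string]bool{}
	var out []string
	for _, b := range perBlock {
		for _, s := range b {
			if !seen[s] {
				seen[s] = true
				out = append(out, s)
			}
		}
	}
	sort.Strings(out)
	return out
}

// applySeriesLimit enforces the limit on the merged stream, so overlapping
// blocks do not make a query hit the limit earlier than it should.
func applySeriesLimit(series []string, limit int) ([]string, error) {
	if len(series) > limit {
		return nil, fmt.Errorf("limit of %d series exceeded", limit)
	}
	return series, nil
}

func main() {
	blockA := []string{`up{job="a"}`, `up{job="b"}`}
	blockB := []string{`up{job="b"}`, `up{job="c"}`} // overlaps with blockA
	merged := mergeSorted(blockA, blockB)
	kept, err := applySeriesLimit(merged, 3) // 3 unique series, not 4 raw ones
	fmt.Println(kept, err)
}
```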
func (s *BucketStore) recordSeriesCallResult(safeStats *safeQueryStats) {
@@ -1467,8 +1449,8 @@ func blockLabelNames(ctx context.Context, indexr *bucketIndexReader, matchers []
noChunkRefs,
minTime, maxTime,
stats,
nil,
logger,
nil,
)
if err != nil {
return nil, errors.Wrap(err, "fetch series")
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_chunk_reader_test.go
@@ -44,8 +44,8 @@ func TestBucketChunkReader_refetchChunks(t *testing.T) {
block.meta.MinTime,
block.meta.MaxTime,
newSafeQueryStats(),
nil,
log.NewNopLogger(),
nil,
)
require.NoError(t, err)

102 changes: 52 additions & 50 deletions pkg/storegateway/series_refs.go
@@ -139,6 +139,13 @@ func (b seriesChunkRefsSet) release() {
seriesChunkRefsSetPool.Put(&reuse)
}

// makeUnreleasable returns a new seriesChunkRefsSet that cannot be released on a subsequent call to release.
//
// This is useful for scenarios where a set will be used multiple times and so it is not safe for consumers to release it.
func (b seriesChunkRefsSet) makeUnreleasable() seriesChunkRefsSet {
return seriesChunkRefsSet{b.series, false}
}

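The new `makeUnreleasable` helper exists because a set that is consumed more than once (for example, once per streaming phase) must not be returned to the pool by whichever consumer finishes first. A self-contained toy sketch of that pattern, with hypothetical `refSet`/`pool` types standing in for `seriesChunkRefsSet` and its pool:

```go
// Sketch only: a toy pooled set illustrating why a set consumed twice must be
// made unreleasable. refSet and pool are hypothetical stand-ins.
package main

import (
	"fmt"
	"sync"
)

var pool = sync.Pool{New: func() any { s := make([]string, 0, 8); return &s }}

type refSet struct {
	series     []string
	releasable bool
}

func newRefSet(series ...string) refSet {
	buf := (*pool.Get().(*[]string))[:0]
	return refSet{series: append(buf, series...), releasable: true}
}

// release returns the backing slice to the pool, but only for releasable sets.
func (s refSet) release() {
	if !s.releasable {
		return
	}
	buf := s.series[:0]
	pool.Put(&buf)
}

// makeUnreleasable returns a view of the same series that release() ignores,
// so a consumer that releases sets after use cannot recycle shared data.
func (s refSet) makeUnreleasable() refSet {
	return refSet{series: s.series, releasable: false}
}

func main() {
	s := newRefSet("a", "b")
	shared := s.makeUnreleasable()
	shared.release() // no-op: one phase cannot free data another phase still needs
	fmt.Println(shared.series)
}
```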
// seriesChunkRefs holds a series with a list of chunk references.
type seriesChunkRefs struct {
lset labels.Labels
@@ -690,47 +697,58 @@ func openBlockSeriesChunkRefsSetsIterator(
ctx context.Context,
batchSize int,
tenantID string,
indexr *bucketIndexReader, // Index reader for block.
indexr *bucketIndexReader,
indexCache indexcache.IndexCache,
blockMeta *block.Meta,
matchers []*labels.Matcher, // Series matchers.
shard *sharding.ShardSelector, // Shard selector.
matchers []*labels.Matcher,
shard *sharding.ShardSelector,
seriesHasher seriesHasher,
strategy seriesIteratorStrategy,
minTime, maxTime int64, // Series must have data in this time range to be returned (ignored if skipChunks=true).
minTime, maxTime int64,
stats *safeQueryStats,
reuse *reusedPostingsAndMatchers, // If this is not nil, these posting and matchers are used as it is without fetching new ones.
logger log.Logger,
streamingIterators *streamingSeriesIterators,
) (iterator[seriesChunkRefsSet], error) {
if batchSize <= 0 {
return nil, errors.New("set size must be a positive number")
}

var (
ps []storage.SeriesRef
pendingMatchers []*labels.Matcher
fetchPostings = true
)
if reuse != nil {
fetchPostings = !reuse.isSet()
ps = reuse.ps
pendingMatchers = reuse.matchers
ps, pendingMatchers, err := indexr.ExpandedPostings(ctx, matchers, stats)
if err != nil {
return nil, errors.Wrap(err, "expanded matching postings")
}
if fetchPostings {
var err error
ps, pendingMatchers, err = indexr.ExpandedPostings(ctx, matchers, stats)
if err != nil {
return nil, errors.Wrap(err, "expanded matching postings")
}
if reuse != nil {
reuse.set(ps, pendingMatchers)
}

iteratorFactory := func(strategy seriesIteratorStrategy, psi *postingsSetsIterator) iterator[seriesChunkRefsSet] {
return openBlockSeriesChunkRefsSetsIteratorFromPostings(ctx, tenantID, indexr, indexCache, blockMeta, shard, seriesHasher, strategy, minTime, maxTime, stats, psi, pendingMatchers, logger)
}

if streamingIterators == nil {
psi := newPostingsSetsIterator(ps, batchSize)
return iteratorFactory(strategy, psi), nil
}

return streamingIterators.wrapIterator(strategy, ps, batchSize, iteratorFactory), nil
}

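`openBlockSeriesChunkRefsSetsIterator` now expands postings once and then either builds the per-block iterator directly (when `streamingIterators` is nil) or hands an `iteratorFactory` to `wrapIterator` so the iterator can be rebuilt later, presumably with a different strategy, over the same postings. A rough sketch of that callback shape, using invented toy types rather than Mimir's:

```go
// Sketch only: the factory-callback shape used above, with hypothetical toy
// types. The point is that postings are expanded once and the factory can be
// re-invoked later (e.g. with chunk refs enabled) without expanding them again.
package main

import "fmt"

type strategy int

const (
	labelsOnly strategy = iota
	withChunks
)

type iteratorFactory func(s strategy) string // describes the iterator it would build

// wrapForStreaming stands in for streamingSeriesIterators.wrapIterator: it keeps
// the factory so the chunks phase can call it again with a different strategy.
type wrapForStreaming struct{ factory iteratorFactory }

func (w wrapForStreaming) labelsPhase() string { return w.factory(labelsOnly) }
func (w wrapForStreaming) chunksPhase() string { return w.factory(withChunks) }

func main() {
	postings := []int{10, 42, 99} // expanded exactly once

	factory := func(s strategy) string {
		return fmt.Sprintf("iterator over %d postings, strategy=%d", len(postings), s)
	}

	streaming := true
	if !streaming {
		fmt.Println(factory(withChunks)) // non-streaming: build directly, chunks included
		return
	}
	w := wrapForStreaming{factory: factory}
	fmt.Println(w.labelsPhase()) // phase 1
	fmt.Println(w.chunksPhase()) // phase 2, same postings, different strategy
}
```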
func openBlockSeriesChunkRefsSetsIteratorFromPostings(
ctx context.Context,
tenantID string,
indexr *bucketIndexReader,
indexCache indexcache.IndexCache,
blockMeta *block.Meta,
shard *sharding.ShardSelector,
seriesHasher seriesHasher,
strategy seriesIteratorStrategy,
minTime, maxTime int64,
stats *safeQueryStats,
postingsSetsIterator *postingsSetsIterator,
pendingMatchers []*labels.Matcher,
logger log.Logger,
) iterator[seriesChunkRefsSet] {
var it iterator[seriesChunkRefsSet]
it = newLoadingSeriesChunkRefsSetIterator(
ctx,
newPostingsSetsIterator(ps, batchSize),
postingsSetsIterator,
indexr,
indexCache,
stats,
@@ -743,36 +761,12 @@ func openBlockSeriesChunkRefsSetsIterator(
tenantID,
logger,
)

if len(pendingMatchers) > 0 {
it = newFilteringSeriesChunkRefsSetIterator(pendingMatchers, it, stats)
}

return it, nil
}

// reusedPostings is used to share the postings and matches across function calls for re-use
// in case of streaming series. We have it as a separate struct so that we can give a safe way
// to use it by making a copy where required. You can use it to put items only once.
type reusedPostingsAndMatchers struct {
ps []storage.SeriesRef
matchers []*labels.Matcher
filled bool
}

func (p *reusedPostingsAndMatchers) set(ps []storage.SeriesRef, matchers []*labels.Matcher) {
if p.filled {
// We already have something here.
return
}
// Postings list can be modified later, so we make a copy here.
p.ps = make([]storage.SeriesRef, len(ps))
copy(p.ps, ps)
p.matchers = matchers
p.filled = true
}

func (p *reusedPostingsAndMatchers) isSet() bool {
return p.filled
return it
}

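`pendingMatchers` are the matchers that `ExpandedPostings` did not fully resolve through the index, so they are applied as a filter over the loaded series instead — that is the assumption behind the filtering iterator above. A toy, self-contained sketch of that post-filtering step (none of these types are Mimir's):

```go
// Sketch only: post-filtering series by matchers that were not applied via
// postings. matcher is a toy equality matcher, not labels.Matcher.
package main

import (
	"fmt"
	"strings"
)

type series struct{ labels map[string]string }

type matcher struct{ name, value string }

func (m matcher) matches(s series) bool { return s.labels[m.name] == m.value }

// filterSeries plays the role of newFilteringSeriesChunkRefsSetIterator: it keeps
// only the series accepted by every pending matcher.
func filterSeries(in []series, pending []matcher) []series {
	var out []series
	for _, s := range in {
		ok := true
		for _, m := range pending {
			if !m.matches(s) {
				ok = false
				break
			}
		}
		if ok {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	loaded := []series{
		{labels: map[string]string{"job": "api", "env": "prod"}},
		{labels: map[string]string{"job": "api", "env": "dev"}},
	}
	pending := []matcher{{name: "env", value: "prod"}}
	for _, s := range filterSeries(loaded, pending) {
		var parts []string
		for k, v := range s.labels {
			parts = append(parts, k+"="+v)
		}
		fmt.Println(strings.Join(parts, ","))
	}
}
```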
// seriesIteratorStrategy defines the strategy to use when loading the series and their chunk refs.
@@ -812,6 +806,14 @@ func (s seriesIteratorStrategy) isNoChunkRefsAndOverlapMintMaxt() bool {
return s.isNoChunkRefs() && s.isOverlapMintMaxt()
}

func (s seriesIteratorStrategy) withNoChunkRefs() seriesIteratorStrategy {
return s | noChunkRefs
}

func (s seriesIteratorStrategy) withChunkRefs() seriesIteratorStrategy {
return s & ^noChunkRefs
}

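The two new helpers above just flip the `noChunkRefs` bit: the labels phase can drop chunk refs and the chunks phase can switch them back on over the same base strategy. A runnable sketch with illustrative (not Mimir's actual) flag values:

```go
// Sketch only: how the two helpers above flip a single bit in the strategy
// bitmask. The concrete flag values here are illustrative.
package main

import "fmt"

type seriesIteratorStrategy uint8

const (
	noChunkRefs seriesIteratorStrategy = 1 << iota
	overlapMintMaxt
)

func (s seriesIteratorStrategy) withNoChunkRefs() seriesIteratorStrategy { return s | noChunkRefs }
func (s seriesIteratorStrategy) withChunkRefs() seriesIteratorStrategy   { return s &^ noChunkRefs }

func main() {
	// Labels phase: skip chunk refs while still honouring min/max time overlap.
	labels := overlapMintMaxt.withNoChunkRefs()
	// Chunks phase: same base strategy, but chunk refs switched back on.
	chunks := labels.withChunkRefs()
	fmt.Printf("labels=%b chunks=%b\n", labels, chunks) // labels=11 chunks=10
}
```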
func newLoadingSeriesChunkRefsSetIterator(
ctx context.Context,
postingsSetIterator *postingsSetsIterator,