store-gateway: read series from bucket with io.Reader (#4926)
Uses an io.Reader to read series data from the bucket, so we never hold everything in a single buffer. It also introduces pooling for those bytes.

This PR introduces offsetTrackingReader, a reader which keeps track of its offset within the underlying reader. I plan to introduce this reader in loadChunks as well to simplify the code there a bit.
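For readers skimming the diff below: the new loadSeries path skips to each (sorted) series offset, reads the series' uvarint length prefix, and then reads exactly that many bytes into a pooled buffer, instead of slicing one large pre-fetched byte slice. A minimal, self-contained sketch of that read pattern (the type and names here are illustrative only, not the exact Mimir code):

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// offsetReader is a simplified stand-in for the offsetTrackingReader added in
// this PR: it wraps a bufio.Reader and tracks how many bytes have been
// consumed, so callers can skip forward to an absolute offset without ever
// buffering the whole range.
type offsetReader struct {
	offset uint64
	r      *bufio.Reader
}

func (o *offsetReader) SkipTo(at uint64) error {
	if at < o.offset {
		return fmt.Errorf("cannot seek backwards: offset %d, requested %d", o.offset, at)
	}
	n, err := o.r.Discard(int(at - o.offset))
	o.offset += uint64(n)
	return err
}

// ReadByte makes the reader usable with binary.ReadUvarint.
func (o *offsetReader) ReadByte() (byte, error) {
	b, err := o.r.ReadByte()
	if err == nil {
		o.offset++
	}
	return b, err
}

func (o *offsetReader) Read(p []byte) (int, error) {
	n, err := o.r.Read(p)
	o.offset += uint64(n)
	return n, err
}

func main() {
	// Build a fake index range with two length-prefixed series entries.
	var buf bytes.Buffer
	var tmp [binary.MaxVarintLen64]byte
	for _, s := range []string{"aaaaaaaaaa", "bbbbbbbbbb"} {
		n := binary.PutUvarint(tmp[:], uint64(len(s)))
		buf.Write(tmp[:n])
		buf.WriteString(s)
	}

	ids := []uint64{0, 11} // offsets of the two entries, sorted

	r := &offsetReader{r: bufio.NewReader(&buf)}
	for _, id := range ids {
		if err := r.SkipTo(id); err != nil { // jump to the series start
			panic(err)
		}
		size, err := binary.ReadUvarint(r) // uvarint length prefix
		if err != nil {
			panic(err)
		}
		data := make([]byte, size) // the real code takes this from a slab pool
		if _, err := io.ReadFull(r, data); err != nil {
			panic(err)
		}
		fmt.Printf("series at offset %d: %q\n", id, data)
	}
}
```

The actual implementation additionally pools the readers, takes the destination buffers from a slab pool, and refetches the range with a larger size estimate when a series turns out to be longer than expected.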


---------

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
dimitarvdimitrov authored May 5, 2023
1 parent c9b54e2 commit 14080d0
Showing 11 changed files with 183 additions and 83 deletions.
13 changes: 11 additions & 2 deletions pkg/storegateway/bucket.go
@@ -900,6 +900,7 @@ func (s *BucketStore) recordSeriesStats(stats *queryStats) {
s.metrics.seriesDataFetched.WithLabelValues("series", "").Observe(float64(stats.seriesFetched))
s.metrics.seriesDataSizeTouched.WithLabelValues("series", "").Observe(float64(stats.seriesProcessedSizeSum))
s.metrics.seriesDataSizeFetched.WithLabelValues("series", "").Observe(float64(stats.seriesFetchedSizeSum))
s.metrics.seriesRefetches.Add(float64(stats.seriesRefetches))
}

func (s *BucketStore) recordStreamingSeriesStats(stats *queryStats) {
@@ -1549,10 +1550,18 @@ func (b *bucketBlock) indexFilename() string {
return path.Join(b.meta.ULID.String(), block.IndexFilename)
}

func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([]byte, error) {
func (b *bucketBlock) indexRangeReader(ctx context.Context, off, length int64) (io.ReadCloser, error) {
r, err := b.bkt.GetRange(ctx, b.indexFilename(), off, length)
if err != nil {
return nil, errors.Wrap(err, "get range reader")
return nil, errors.Wrap(err, "get index range reader")
}
return r, nil
}

func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([]byte, error) {
r, err := b.indexRangeReader(ctx, off, length)
if err != nil {
return nil, err
}
defer runutil.CloseWithLogOnErr(b.logger, r, "readIndexRange close range reader")

88 changes: 60 additions & 28 deletions pkg/storegateway/bucket_index_reader.go
@@ -6,17 +6,20 @@
package storegateway

import (
"bufio"
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"sort"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/runutil"
"github.com/oklog/ulid"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
@@ -33,6 +36,7 @@ import (
streamindex "github.com/grafana/mimir/pkg/storegateway/indexheader/index"
"github.com/grafana/mimir/pkg/util"
util_math "github.com/grafana/mimir/pkg/util/math"
"github.com/grafana/mimir/pkg/util/pool"
"github.com/grafana/mimir/pkg/util/spanlogger"
)

@@ -61,12 +65,14 @@ func newBucketIndexReader(block *bucketBlock, postingsStrategy postingsSelection

// ExpandedPostings returns postings in expanded list instead of index.Postings.
// This is because we need to have them buffered anyway to perform efficient lookup
// on object storage.
// Found posting IDs (ps) are not strictly required to point to a valid Series, e.g. during
// background garbage collections.
// on object storage. The returned postings are sorted.
//
// Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first
// chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
// Depending on the postingsSelectionStrategy there may be some pendingMatchers returned.
// If pendingMatchers is not empty, then the returned postings may or may not match the pendingMatchers.
// The caller is responsible for filtering the series of the postings with the pendingMatchers.
//
// Reminder: A posting is a reference (represented as a uint64) to a series, which points to the first
// byte of a series in the index for a given block of data. Postings can be fetched by
// single label name=value.
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, stats *safeQueryStats) (returnRefs []storage.SeriesRef, pendingMatchers []*labels.Matcher, returnErr error) {
var (
@@ -594,6 +600,7 @@ func (r *bucketIndexReader) decodePostings(b []byte, stats *safeQueryStats) (ind
return l, key, pendingMatchers, err
}

// preloadSeries expects the provided ids to be sorted.
func (r *bucketIndexReader) preloadSeries(ctx context.Context, ids []storage.SeriesRef, stats *safeQueryStats) (*bucketIndexLoadedSeries, error) {
span, ctx := tracing.StartSpan(ctx, "preloadSeries()")
defer span.Finish()
@@ -625,49 +632,74 @@ func (r *bucketIndexReader) preloadSeries(ctx context.Context, ids []storage.Ser
return loaded, g.Wait()
}

var seriesOffsetReaders = &sync.Pool{New: func() any {
return &offsetTrackingReader{r: bufio.NewReaderSize(nil, 32*1024)}
}}

// loadSeries expects the provided ids to be sorted.
func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64, loaded *bucketIndexLoadedSeries, stats *safeQueryStats) error {
begin := time.Now()
defer r.recordLoadSeriesStats(stats, refetch, len(ids), start, end, time.Now())

b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start))
reader, err := r.block.indexRangeReader(ctx, int64(start), int64(end-start))
if err != nil {
return errors.Wrap(err, "read series range")
}
defer runutil.CloseWithLogOnErr(r.block.logger, reader, "loadSeries close range reader")

stats.update(func(stats *queryStats) {
stats.seriesFetchCount++
stats.seriesFetched += len(ids)
stats.seriesFetchDurationSum += time.Since(begin)
stats.seriesFetchedSizeSum += int(end - start)
})
offsetReader := seriesOffsetReaders.Get().(*offsetTrackingReader)
defer seriesOffsetReaders.Put(offsetReader)

for i, id := range ids {
c := b[uint64(id)-start:]
offsetReader.Reset(start, reader)
defer offsetReader.Release()

// Use a slab pool to reduce allocations by sharding one large slice of bytes instead of allocating each series' bytes separately.
// But in order to avoid a race condition with an async cache, we never release the pool and let the GC collect it.
bytesPool := pool.NewSlabPool[byte](pool.NoopPool{}, seriesBytesSlabSize)

l, n := binary.Uvarint(c)
if n < 1 {
return errors.New("reading series length failed")
for i, id := range ids {
// We iterate the series in order assuming they are sorted.
err := offsetReader.SkipTo(uint64(id))
if err != nil {
return err
}
seriesSize, err := binary.ReadUvarint(offsetReader)
if err != nil {
return err
}
if len(c) < n+int(l) {
seriesBytes := bytesPool.Get(int(seriesSize))
n, err := io.ReadFull(offsetReader, seriesBytes)
if errors.Is(err, io.ErrUnexpectedEOF) {
if i == 0 && refetch {
return errors.Errorf("invalid remaining size, even after refetch, remaining: %d, expected %d", len(c), n+int(l))
return errors.Errorf("invalid remaining size, even after refetch, read %d, expected %d", n, seriesSize)
}

level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "series_id", id, "series_length", seriesSize, "max_series_size", maxSeriesSize)
// Inefficient, but should be rare.
r.block.metrics.seriesRefetches.Inc()
level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", maxSeriesSize)

// Fetch plus to get the size of next one if exists.
return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1), loaded, stats)
return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+binary.MaxVarintLen64+seriesSize+1, loaded, stats)
} else if err != nil {
return errors.Wrapf(err, "fetching series %d from block %s", id, r.block.meta.ULID)
}
c = c[n : n+int(l)]

loaded.addSeries(id, c)
loaded.addSeries(id, seriesBytes)

r.block.indexCache.StoreSeriesForRef(r.block.userID, r.block.meta.ULID, id, c)
r.block.indexCache.StoreSeriesForRef(r.block.userID, r.block.meta.ULID, id, seriesBytes)
}
return nil
}

func (r *bucketIndexReader) recordLoadSeriesStats(stats *safeQueryStats, refetch bool, numSeries int, start, end uint64, loadStartTime time.Time) {
stats.update(func(stats *queryStats) {
if !refetch {
// only the root loadSeries will record the time
stats.seriesFetchDurationSum += time.Since(loadStartTime)
} else {
stats.seriesRefetches++
}
stats.seriesFetched += numSeries
stats.seriesFetchedSizeSum += int(end - start)
})
}

// Close releases the underlying resources of the reader.
func (r *bucketIndexReader) Close() error {
r.block.pendingReaders.Done()
20 changes: 10 additions & 10 deletions pkg/storegateway/bucket_test.go
@@ -191,10 +191,9 @@ func TestBucketBlockSet_remove(t *testing.T) {
}

// Regression tests against: https://github.com/thanos-io/thanos/issues/1983.
func TestReadIndexCache_LoadSeries(t *testing.T) {
func TestBucketIndexReader_RefetchSeries(t *testing.T) {
bkt := objstore.NewInMemBucket()

s := NewBucketStoreMetrics(nil)
b := &bucketBlock{
meta: &metadata.Meta{
BlockMeta: tsdb.BlockMeta{
@@ -203,7 +202,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) {
},
bkt: bkt,
logger: log.NewNopLogger(),
metrics: s,
metrics: NewBucketStoreMetrics(nil),
indexCache: noopCache{},
}

@@ -223,32 +222,33 @@ }
}

// Success with no refetches.
stats := newSafeQueryStats()
loaded := newBucketIndexLoadedSeries()
assert.NoError(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 100, loaded, newSafeQueryStats()))
assert.NoError(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 100, loaded, stats))
assert.Equal(t, map[storage.SeriesRef][]byte{
2: []byte("aaaaaaaaaa"),
13: []byte("bbbbbbbbbb"),
24: []byte("cccccccccc"),
}, loaded.series)
assert.Equal(t, float64(0), promtest.ToFloat64(s.seriesRefetches))
assert.Equal(t, 0, stats.export().seriesRefetches)

// Success with 2 refetches.
loaded = newBucketIndexLoadedSeries()
assert.NoError(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 15, loaded, newSafeQueryStats()))
assert.NoError(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 15, loaded, stats))
assert.Equal(t, map[storage.SeriesRef][]byte{
2: []byte("aaaaaaaaaa"),
13: []byte("bbbbbbbbbb"),
24: []byte("cccccccccc"),
}, loaded.series)
assert.Equal(t, float64(2), promtest.ToFloat64(s.seriesRefetches))
assert.Equal(t, 2, stats.export().seriesRefetches)

// Success with refetch on first element.
loaded = newBucketIndexLoadedSeries()
assert.NoError(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2}, false, 2, 5, loaded, newSafeQueryStats()))
assert.NoError(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2}, false, 2, 5, loaded, stats))
assert.Equal(t, map[storage.SeriesRef][]byte{
2: []byte("aaaaaaaaaa"),
}, loaded.series)
assert.Equal(t, float64(3), promtest.ToFloat64(s.seriesRefetches))
assert.Equal(t, 3, stats.export().seriesRefetches)

buf.Reset()
buf.PutByte(0)
@@ -258,7 +258,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) {
assert.NoError(t, bkt.Upload(context.Background(), filepath.Join(b.meta.ULID.String(), block.IndexFilename), bytes.NewReader(buf.Get())))

// Fail, but no recursion at least.
assert.Error(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 1, 15, newBucketIndexLoadedSeries(), newSafeQueryStats()))
assert.Error(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 1, 15, newBucketIndexLoadedSeries(), stats))
}

func TestBlockLabelNames(t *testing.T) {
1 change: 1 addition & 0 deletions pkg/storegateway/indexcache/cache.go
@@ -56,6 +56,7 @@ type IndexCache interface {

// FetchMultiSeriesForRefs fetches multiple series - each identified by ID - from the cache
// and returns a map containing cache hits, along with a list of missing IDs.
// The order of the returned misses should be the same as their relative order in the provided ids.
FetchMultiSeriesForRefs(ctx context.Context, userID string, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef)

// StoreExpandedPostings stores the result of ExpandedPostings, encoded with an unspecified codec.
7 changes: 5 additions & 2 deletions pkg/storegateway/indexcache/remote.go
@@ -228,7 +228,7 @@ func (c *RemoteIndexCache) FetchMultiSeriesForRefs(ctx context.Context, userID s
// Build the cache keys, while keeping a map between input id and the cache key
// so that we can easily reverse it back after the GetMulti().
keys := make([]string, 0, len(ids))
keysMapping := map[storage.SeriesRef]string{}
keysMapping := make(map[storage.SeriesRef]string, len(ids))

for _, id := range ids {
key := seriesForRefCacheKey(userID, blockID, id)
@@ -246,7 +246,10 @@ func (c *RemoteIndexCache) FetchMultiSeriesForRefs(ctx context.Context, userID s

// Construct the resulting hits map and list of missing keys. We iterate on the input
// list of ids to be able to easily create the list of ones in a single iteration.
hits = map[storage.SeriesRef][]byte{}
hits = make(map[storage.SeriesRef][]byte, len(results))
if numMisses := len(ids) - len(results); numMisses > 0 {
misses = make([]storage.SeriesRef, 0, numMisses)
}

for _, id := range ids {
key, ok := keysMapping[id]
43 changes: 43 additions & 0 deletions pkg/storegateway/io.go
@@ -7,6 +7,7 @@ package storegateway

import (
"bufio"
"fmt"
"io"

"github.com/pkg/errors"
@@ -125,3 +126,45 @@ func readByteRanges(src io.Reader, dst []byte, byteRanges byteRanges) ([]byte, e

return dst, nil
}

type offsetTrackingReader struct {
offset uint64
r *bufio.Reader
}

// SkipTo skips to the provided offset.
func (r *offsetTrackingReader) SkipTo(at uint64) error {
if diff := int(at - r.offset); diff < 0 {
return fmt.Errorf("cannot reverse reader: offset %d, at %d", r.offset, at)
} else if diff > 0 {
n, err := r.r.Discard(diff)
r.offset += uint64(n)
if err != nil {
return fmt.Errorf("discarding sequence reader: %w", err)
}
}
return nil
}

// ReadByte implement io.ByteReader for compatibility with binary.ReadUvarint.
func (r *offsetTrackingReader) ReadByte() (byte, error) {
r.offset++
return r.r.ReadByte()
}

// Read implements io.Reader.
func (r *offsetTrackingReader) Read(p []byte) (int, error) {
n, err := r.r.Read(p)
r.offset += uint64(n)
return n, err
}

func (r *offsetTrackingReader) Reset(offset uint64, reader io.Reader) {
r.r.Reset(reader)
r.offset = offset
}

// Release removes the reference to the underlying reader
func (r *offsetTrackingReader) Release() {
r.Reset(0, nil)
}
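A note on reuse: in this change the offsetTrackingReader above is taken from a sync.Pool (seriesOffsetReaders in bucket_index_reader.go), pointed at the bucket range reader with Reset, and detached again with Release before being returned, so its 32 KiB bufio buffer is reused across requests. A rough sketch of that pool-and-reset lifecycle with a plain bufio.Reader (illustrative only; readerPool and readAll are made-up names):

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"sync"
)

// readerPool mirrors the seriesOffsetReaders pool in this PR: the 32 KiB
// bufio buffer is allocated once and reused across calls instead of being
// re-allocated for every request.
var readerPool = sync.Pool{New: func() any {
	return bufio.NewReaderSize(nil, 32*1024)
}}

func readAll(src io.Reader) ([]byte, error) {
	br := readerPool.Get().(*bufio.Reader)
	br.Reset(src) // point the pooled buffer at the new source
	defer func() {
		br.Reset(nil)      // drop the reference so the source can be collected
		readerPool.Put(br) // return the reader (and its buffer) to the pool
	}()
	return io.ReadAll(br)
}

func main() {
	out, err := readAll(bytes.NewReader([]byte("some series bytes")))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

The Release method in the diff plays the same detach-before-Put role: it resets the pooled reader to a nil source so it never keeps a reference to the GetRange reader after the request finishes.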
4 changes: 4 additions & 0 deletions pkg/storegateway/series_chunks.go
@@ -34,6 +34,10 @@ const (
// fine-grained chunks cache is enabled (byte slices have variable size and contain many chunks) or disabled (byte slices
// are at most 16KB each).
chunkBytesSlabSize = 160 * 1024

// Selected so that most series fit it and at the same time it's not too large for requests with few series.
// Most series are less than 4096 B.
seriesBytesSlabSize = 16 * 1024
)

var (
1 change: 1 addition & 0 deletions pkg/storegateway/series_refs.go
@@ -1224,6 +1224,7 @@ func shardOwned(shard *sharding.ShardSelector, hasher seriesHasher, id storage.S
return hash%shard.ShardCount == shard.ShardIndex
}

// postingsSetsIterator splits the provided postings into sets, while retaining their original order.
type postingsSetsIterator struct {
postings []storage.SeriesRef
