Reduced memory allocated by the chunks reader in the store gateway at query time #3814

Merged
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -22,7 +22,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
### Fixed

- [#3773](https://github.com/thanos-io/thanos/pull/3773) Compact: Pad compaction planner size check
-- [#3796](https://github.com/thanos-io/thanos/pull/3796) Store: Decreased memory allocations while fetching block's chunks.
+- [#3814](https://github.com/thanos-io/thanos/pull/3814) Store: Decreased memory utilisation while fetching block's chunks.
Contributor Author

This PR supersedes the smaller optimisation done in #3796.

- [#3815](https://github.com/thanos-io/thanos/pull/3815) Receive: Improve handling of empty time series from clients

### Changed
70 changes: 49 additions & 21 deletions pkg/store/bucket.go
@@ -8,7 +8,6 @@ import (
"context"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"math"
"os"
@@ -1461,33 +1460,30 @@ func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([]
return buf.Bytes(), nil
}

-func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length int64) (*[]byte, error) {
+func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length int64, chunkRanges byteRanges) (*[]byte, error) {
if seq < 0 || seq >= len(b.chunkObjs) {
return nil, errors.Errorf("unknown segment file for index %d", seq)
}

-// Request bytes.MinRead extra space to ensure the copy operation will not trigger
-// a memory reallocation.
-c, err := b.chunkPool.Get(int(length) + bytes.MinRead)
+// Get a reader for the required range.
+reader, err := b.bkt.GetRange(ctx, b.chunkObjs[seq], off, length)
if err != nil {
-return nil, errors.Wrap(err, "allocate chunk bytes")
+return nil, errors.Wrap(err, "get range reader")
}
+defer runutil.CloseWithLogOnErr(b.logger, reader, "readChunkRange close range reader")

-buf := bytes.NewBuffer(*c)

-r, err := b.bkt.GetRange(ctx, b.chunkObjs[seq], off, length)
+// Get a buffer from the pool.
+chunkBuffer, err := b.chunkPool.Get(chunkRanges.size())
if err != nil {
-b.chunkPool.Put(c)
-return nil, errors.Wrap(err, "get range reader")
+return nil, errors.Wrap(err, "allocate chunk bytes")
}
-defer runutil.CloseWithLogOnErr(b.logger, r, "readChunkRange close range reader")

-if _, err = io.Copy(buf, r); err != nil {
-b.chunkPool.Put(c)
-return nil, errors.Wrap(err, "read range")
+*chunkBuffer, err = readByteRanges(reader, *chunkBuffer, chunkRanges)
+if err != nil {
+return nil, err
}
-internalBuf := buf.Bytes()
-return &internalBuf, nil

+return chunkBuffer, nil
}

func (b *bucketBlock) indexReader(ctx context.Context) *bucketIndexReader {
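Note: the new readChunkRange() relies on the byteRange / byteRanges types, the byteRanges.size() helper and the readByteRanges() function, which are defined elsewhere in pkg/store (they are referenced but not modified by this change). Below is a minimal sketch of what those helpers could look like, assuming the ranges are sorted, non-overlapping and expressed relative to the start of the fetched object range — the actual implementation in the repository may differ:

package store

import (
	"io"
	"io/ioutil"

	"github.com/pkg/errors"
)

// byteRange describes a sub-range of a fetched object range, relative to its start.
type byteRange struct {
	offset int
	length int
}

type byteRanges []byteRange

// size returns the sum of all range lengths, i.e. the buffer size needed to
// hold every range back to back.
func (r byteRanges) size() int {
	size := 0
	for _, c := range r {
		size += c.length
	}
	return size
}

// readByteRanges reads the given ranges from src into buf, packing them
// contiguously and discarding the bytes in the gaps between them. It returns
// the re-sliced buffer containing exactly size() bytes.
func readByteRanges(src io.Reader, buf []byte, ranges byteRanges) ([]byte, error) {
	if len(ranges) == 0 {
		return buf[:0], nil
	}

	// The pooled buffer is expected to have enough capacity, since it was
	// requested with chunkPool.Get(chunkRanges.size()).
	if cap(buf) < ranges.size() {
		return nil, errors.New("readByteRanges: destination buffer too small")
	}
	buf = buf[:ranges.size()]

	cursor := 0  // Position within the source stream.
	written := 0 // Bytes written to buf so far.
	for _, r := range ranges {
		// Skip the gap between the previous range and this one.
		if gap := r.offset - cursor; gap > 0 {
			if _, err := io.CopyN(ioutil.Discard, src, int64(gap)); err != nil {
				return nil, errors.Wrap(err, "skip byte range gap")
			}
			cursor += gap
		}

		if _, err := io.ReadFull(src, buf[written:written+r.length]); err != nil {
			return nil, errors.Wrap(err, "read byte range")
		}
		cursor += r.length
		written += r.length
	}

	return buf, nil
}

The property loadChunks() depends on (see the next hunk) is that the bytes of the i-th range start immediately after the bytes of the previous ranges, which is exactly what the readOffset bookkeeping below assumes.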
@@ -2255,7 +2251,11 @@ func (r *bucketChunkReader) preload() error {
func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq int, start, end uint32) error {
fetchBegin := time.Now()

-b, err := r.block.readChunkRange(ctx, seq, int64(start), int64(end-start))
+// Compute the byte ranges of chunks we actually need. The total read data may be bigger
+// than required because of the partitioner.
+chunkRanges := chunkOffsetsToByteRanges(offs, start)

+b, err := r.block.readChunkRange(ctx, seq, int64(start), int64(end-start), chunkRanges)
if err != nil {
return errors.Wrapf(err, "read range for %d", seq)
}
@@ -2275,8 +2275,13 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq i
r.stats.chunksFetchDurationSum += time.Since(fetchBegin)
r.stats.chunksFetchedSizeSum += int(end - start)

-for _, o := range offs {
-cb := (*b)[o-start:]
+readOffset := 0
+for idx, o := range offs {
+chunkRange := chunkRanges[idx]

+// The chunks byte ranges are stored contiguously in the data buffer.
+cb := (*b)[readOffset : readOffset+chunkRange.length]
+readOffset += chunkRange.length

l, n := binary.Uvarint(cb)
if n < 1 {
@@ -2301,7 +2306,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq i
fetchBegin = time.Now()

// Read entire chunk into new buffer.
-nb, err := r.block.readChunkRange(ctx, seq, int64(o), int64(chLen))
+nb, err := r.block.readChunkRange(ctx, seq, int64(o), int64(chLen), []byteRange{{offset: 0, length: chLen}})
if err != nil {
return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chLen)
}
@@ -2324,6 +2329,29 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq i
return nil
}

// chunkOffsetsToByteRanges returns non-overlapping byte ranges with each range offset
// relative to start. The provided input offsets must be sorted.
func chunkOffsetsToByteRanges(offsets []uint32, start uint32) byteRanges {
ranges := make([]byteRange, len(offsets))

for idx := 0; idx < len(offsets); idx++ {
ranges[idx] = byteRange{
// The byte range offset is required to be relative to the start of the read slice.
offset: int(offsets[idx] - start),
length: maxChunkSize,
}

if idx > 0 {
// Ensure ranges are non-overlapping.
if prev := ranges[idx-1]; prev.length > ranges[idx].offset-prev.offset {
ranges[idx-1].length = ranges[idx].offset - prev.offset
}
}
}

return ranges
}
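Note: chunkOffsetsToByteRanges() initially assigns each chunk the maximum possible chunk size, because a chunk's real length is only known after decoding its length varint, and then trims each range so it ends where the next chunk begins. A hypothetical illustration, assuming maxChunkSize is 16000 bytes (as the expected lengths in TestChunkOffsetsToByteRanges below suggest):

package store

import "fmt"

// exampleChunkOffsetsToByteRanges is a hypothetical helper showing the trimming
// behaviour of chunkOffsetsToByteRanges defined above.
func exampleChunkOffsetsToByteRanges() {
	// Three chunks at offsets 1000, 5000 and 30000 in the segment file, with the
	// partitioned read starting at byte 1000:
	//   - the first range is trimmed to 4000 bytes, because the next chunk starts
	//     4000 bytes after it;
	//   - the second and third ranges keep the full 16000 bytes, since the gap to
	//     the following chunk (or the end of the data) is larger than maxChunkSize.
	ranges := chunkOffsetsToByteRanges([]uint32{1000, 5000, 30000}, 1000)

	fmt.Println(ranges)        // [{0 4000} {4000 16000} {29000 16000}]
	fmt.Println(ranges.size()) // 36000 bytes to buffer, rather than the whole partitioned range.
}

In loadChunks() these trimmed lengths drive the readOffset bookkeeping above, so the pooled buffer stays at the minimum size actually needed for the requested chunks.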

func (r *bucketChunkReader) Chunk(id uint64) (chunkenc.Chunk, error) {
c, ok := r.chunks[id]
if !ok {
160 changes: 159 additions & 1 deletion pkg/store/bucket_test.go
@@ -2309,9 +2309,167 @@ func BenchmarkBucketBlock_readChunkRange(b *testing.B) {
offset := int64(0)
length := readLengths[n%len(readLengths)]

-_, err := blk.readChunkRange(ctx, 0, offset, length)
+_, err := blk.readChunkRange(ctx, 0, offset, length, byteRanges{{offset: 0, length: int(length)}})
if err != nil {
b.Fatal(err.Error())
}
}
}

func BenchmarkBlockSeries(b *testing.B) {
var (
ctx = context.Background()
logger = log.NewNopLogger()
)

tmpDir, err := ioutil.TempDir("", "benchmark")
testutil.Ok(b, err)
b.Cleanup(func() {
testutil.Ok(b, os.RemoveAll(tmpDir))
})

bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
testutil.Ok(b, err)
b.Cleanup(func() {
testutil.Ok(b, bkt.Close())
})

// Create a block.
head, _ := storetestutil.CreateHeadWithSeries(b, 0, storetestutil.HeadGenOptions{
TSDBDir: filepath.Join(tmpDir, "head"),
SamplesPerSeries: 86400 / 15, // Simulate 1 day block with 15s scrape interval.
Series: 1000,
PrependLabels: nil,
Random: rand.New(rand.NewSource(120)),
SkipChunks: true,
})
blockID := createBlockFromHead(b, tmpDir, head)
testutil.Ok(b, head.Close())

// Upload the block to the bucket.
thanosMeta := metadata.Thanos{
Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
}

blockMeta, err := metadata.InjectThanos(logger, filepath.Join(tmpDir, blockID.String()), thanosMeta, nil)
testutil.Ok(b, err)

testutil.Ok(b, block.Upload(context.Background(), logger, bkt, filepath.Join(tmpDir, blockID.String())))

// Create chunk pool and partitioner using the same production settings.
chunkPool, err := NewDefaultChunkBytesPool(64 * 1024 * 1024 * 1024)
testutil.Ok(b, err)

partitioner := NewGapBasedPartitioner(PartitionerMaxGapSize)

// Create an index header reader.
indexHeaderReader, err := indexheader.NewBinaryReader(ctx, logger, bkt, tmpDir, blockMeta.ULID, DefaultPostingOffsetInMemorySampling)
testutil.Ok(b, err)
indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.DefaultInMemoryIndexCacheConfig)
testutil.Ok(b, err)

// Create a bucket block with only the dependencies we need for the benchmark.
blk, err := newBucketBlock(context.Background(), logger, newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, indexCache, chunkPool, indexHeaderReader, partitioner)
testutil.Ok(b, err)

for _, concurrency := range []int{1, 2, 4, 8, 16, 32} {
b.Run(fmt.Sprintf("concurrency: %d", concurrency), func(b *testing.B) {
benchmarkBlockSeriesWithConcurrency(b, concurrency, blockMeta, blk)
})
}
}

func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMeta *metadata.Meta, blk *bucketBlock) {
ctx := context.Background()

// Run the same number of queries per goroutine.
queriesPerWorker := b.N / concurrency

// No limits.
chunksLimiter := NewChunksLimiterFactory(0)(nil)
seriesLimiter := NewSeriesLimiterFactory(0)(nil)

// Run multiple workers to execute the queries.
wg := sync.WaitGroup{}
wg.Add(concurrency)

for w := 0; w < concurrency; w++ {
go func() {
defer wg.Done()

for n := 0; n < queriesPerWorker; n++ {
// Each query touches a subset of series. To make it reproducible, and to make sure
// we don't simply query consecutive series (which wouldn't be representative of the real world),
// we create a label matcher which looks for a short integer within the label value.
labelMatcher := fmt.Sprintf(".*%d.*", n%20)

req := &storepb.SeriesRequest{
MinTime: blockMeta.MinTime,
MaxTime: blockMeta.MaxTime,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_RE, Name: "i", Value: labelMatcher},
},
SkipChunks: false,
}

matchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
testutil.Ok(b, err)

indexReader := blk.indexReader(ctx)
chunkReader := blk.chunkReader(ctx)

seriesSet, _, err := blockSeries(nil, indexReader, chunkReader, matchers, req, chunksLimiter, seriesLimiter)
testutil.Ok(b, err)

// Ensure at least 1 series has been returned (as expected).
testutil.Equals(b, true, seriesSet.Next())

testutil.Ok(b, indexReader.Close())
testutil.Ok(b, chunkReader.Close())
}
}()
}

wg.Wait()
}

func TestChunkOffsetsToByteRanges(t *testing.T) {
tests := map[string]struct {
offsets []uint32
start uint32
expected byteRanges
}{
"no offsets in input": {
offsets: nil,
expected: byteRanges{},
},
"no overlapping ranges in input": {
offsets: []uint32{1000, 20000, 45000},
start: 1000,
expected: byteRanges{
{offset: 0, length: 16000},
{offset: 19000, length: 16000},
{offset: 44000, length: 16000},
},
},
"overlapping ranges in input": {
offsets: []uint32{1000, 5000, 9500, 30000},
start: 1000,
expected: byteRanges{
{offset: 0, length: 4000},
{offset: 4000, length: 4500},
{offset: 8500, length: 16000},
{offset: 29000, length: 16000},
},
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
testutil.Equals(t, len(testData.offsets), len(testData.expected))
testutil.Equals(t, testData.expected, chunkOffsetsToByteRanges(testData.offsets, testData.start))
})
}
}