Enhanced bytes limiter with data type param #7414

Merged (17 commits) on Jun 13, 2024
101 changes: 62 additions & 39 deletions pkg/store/bucket.go
@@ -62,6 +62,17 @@ import (
"github.com/thanos-io/thanos/pkg/tracing"
)

type StoreDataType int

const (
PostingsFetched StoreDataType = iota
PostingsTouched
SeriesFetched
SeriesTouched
ChunksFetched
ChunksTouched
)

const (
// MaxSamplesPerChunk is approximately the max number of samples that we may have in any given chunk. This is needed
// for precalculating the number of samples that we may have to retrieve and decode for any given query
@@ -388,10 +399,10 @@ type BucketStore struct {
// seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call,
// or LabelName and LabelValues calls when used with matchers.
seriesLimiterFactory SeriesLimiterFactory

// bytesLimiterFactory creates a new limiter used to limit the amount of bytes fetched/touched by each Series() call.
bytesLimiterFactory BytesLimiterFactory
partitioner Partitioner

partitioner Partitioner

filterConfig *FilterConfig
advLabelSets []labelpb.ZLabelSet
@@ -2869,12 +2880,11 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context,
if !hit {
return false, nil, nil
}
if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil {
if err := bytesLimiter.ReserveWithType(uint64(len(dataFromCache)), PostingsTouched); err != nil {
return false, nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading expanded postings from index cache: %s", err)
}
r.stats.DataDownloadedSizeSum += units.Base2Bytes(len(dataFromCache))
r.stats.postingsTouched++
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(dataFromCache))

r.stats.add(PostingsTouched, 1, len(dataFromCache))
p, closeFns, err := r.decodeCachedPostings(dataFromCache)
defer func() {
for _, closeFn := range closeFns {
@@ -2943,10 +2953,9 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
// Fetch postings from the cache with a single call.
fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys, tenant)
for _, dataFromCache := range fromCache {
if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil {
if err := bytesLimiter.ReserveWithType(uint64(len(dataFromCache)), PostingsTouched); err != nil {
return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading postings from index cache: %s", err)
}
r.stats.DataDownloadedSizeSum += units.Base2Bytes(len(dataFromCache))
}

// Iterate over all groups and fetch posting from cache.
@@ -2958,8 +2967,7 @@ }
}
// Get postings for the given key from cache first.
if b, ok := fromCache[key]; ok {
r.stats.postingsTouched++
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(b))
r.stats.add(PostingsTouched, 1, len(b))

l, closer, err := r.decodeCachedPostings(b)
if err != nil {
@@ -3000,10 +3008,9 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
start := int64(part.Start)
length := int64(part.End) - start

if err := bytesLimiter.Reserve(uint64(length)); err != nil {
if err := bytesLimiter.ReserveWithType(uint64(length), PostingsFetched); err != nil {
return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching postings: %s", err)
}
r.stats.DataDownloadedSizeSum += units.Base2Bytes(length)
}

g, ctx := errgroup.WithContext(ctx)
@@ -3035,8 +3042,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
rdr := newPostingsReaderBuilder(ctx, brdr, ptrs[i:j], start, length)

stats.postingsFetchCount++
stats.postingsFetched += j - i
stats.PostingsFetchedSizeSum += units.Base2Bytes(int(length))
stats.add(PostingsFetched, j-i, int(length))

for rdr.Next() {
diffVarintPostings, postingsCount, keyID := rdr.AtDiffVarint()
@@ -3050,12 +3056,11 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
return errors.Wrap(err, "encoding with snappy")
}

stats.postingsTouched++
stats.PostingsTouchedSizeSum += units.Base2Bytes(int(len(diffVarintPostings)))
stats.cachedPostingsCompressions += 1
stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(diffVarintPostings))
stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(len(dataToCache))
stats.CachedPostingsCompressionTimeSum += time.Since(startCompression)
stats.add(PostingsTouched, 1, len(diffVarintPostings))

r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache, tenant)
}
@@ -3178,10 +3183,9 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser
fromCache, ids := r.block.indexCache.FetchMultiSeries(ctx, r.block.meta.ULID, ids, tenant)
for id, b := range fromCache {
r.loadedSeries[id] = b
if err := bytesLimiter.Reserve(uint64(len(b))); err != nil {
if err := bytesLimiter.ReserveWithType(uint64(len(b)), SeriesTouched); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading series from index cache: %s", err)
}
r.stats.DataDownloadedSizeSum += units.Base2Bytes(len(b))
}

parts := r.block.partitioner.Partition(len(ids), func(i int) (start, end uint64) {
@@ -3207,11 +3211,8 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
r.stats.merge(stats)
}()

if bytesLimiter != nil {
if err := bytesLimiter.Reserve(uint64(end - start)); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching series: %s", err)
}
stats.DataDownloadedSizeSum += units.Base2Bytes(end - start)
if err := bytesLimiter.ReserveWithType(uint64(end-start), SeriesFetched); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching series: %s", err)
}

b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start), r.logger)
@@ -3220,9 +3221,8 @@ }
}

stats.seriesFetchCount++
stats.seriesFetched += len(ids)
stats.SeriesFetchDurationSum += time.Since(begin)
stats.SeriesFetchedSizeSum += units.Base2Bytes(int(end - start))
stats.add(SeriesFetched, len(ids), int(end-start))

for i, id := range ids {
c := b[uint64(id)-start:]
@@ -3325,8 +3325,7 @@ func (r *bucketIndexReader) LoadSeriesForTime(ref storage.SeriesRef, lset *[]sym
return false, errors.Errorf("series %d not found", ref)
}

r.stats.seriesTouched++
r.stats.SeriesTouchedSizeSum += units.Base2Bytes(len(b))
r.stats.add(SeriesTouched, 1, len(b))
return decodeSeriesForTime(b, lset, chks, skipChunks, mint, maxt)
}

@@ -3514,10 +3513,9 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [
})

for _, p := range parts {
if err := bytesLimiter.Reserve(uint64(p.End - p.Start)); err != nil {
if err := bytesLimiter.ReserveWithType(uint64(p.End-p.Start), ChunksFetched); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching chunks: %s", err)
}
r.stats.DataDownloadedSizeSum += units.Base2Bytes(p.End - p.Start)
}

for _, p := range parts {
@@ -3551,8 +3549,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
bufReader := bufio.NewReaderSize(reader, r.block.estimatedMaxChunkSize)

stats.chunksFetchCount++
stats.chunksFetched += len(pIdxs)
stats.ChunksFetchedSizeSum += units.Base2Bytes(int(part.End - part.Start))
stats.add(ChunksFetched, len(pIdxs), int(part.End-part.Start))

var (
buf []byte
@@ -3613,8 +3610,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
if err != nil {
return errors.Wrap(err, "populate chunk")
}
stats.chunksTouched++
stats.ChunksTouchedSizeSum += units.Base2Bytes(int(chunkDataLen))
stats.add(ChunksTouched, 1, int(chunkDataLen))
continue
}

@@ -3623,10 +3619,9 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
fetchBegin = time.Now()
// Read entire chunk into new buffer.
// TODO: readChunkRange call could be avoided for any chunk but last in this particular part.
if err := bytesLimiter.Reserve(uint64(chunkLen)); err != nil {
if err := bytesLimiter.ReserveWithType(uint64(chunkLen), ChunksTouched); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching chunks: %s", err)
}
stats.DataDownloadedSizeSum += units.Base2Bytes(chunkLen)

nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}}, r.logger)
if err != nil {
Expand All @@ -3636,16 +3631,15 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
return errors.Errorf("preloaded chunk too small, expecting %d", chunkLen)
}

stats.chunksFetchCount++
stats.ChunksFetchedSizeSum += units.Base2Bytes(len(*nb))
stats.add(ChunksFetched, 1, len(*nb))
c := rawChunk((*nb)[n:])
err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), &c, aggrs, r.save, calculateChunkChecksum)
if err != nil {
r.block.chunkPool.Put(nb)
return errors.Wrap(err, "populate chunk")
}
stats.chunksTouched++
stats.ChunksTouchedSizeSum += units.Base2Bytes(int(chunkDataLen))

stats.add(ChunksTouched, 1, int(chunkDataLen))

r.block.chunkPool.Put(nb)
}
@@ -3746,6 +3740,35 @@ type queryStats struct {
DataDownloadedSizeSum units.Base2Bytes
}

func (s *queryStats) add(dataType StoreDataType, dataCount int, dataSize int) {
s.mtx.Lock()
defer s.mtx.Unlock()

switch dataType {
case PostingsFetched:
s.postingsFetched += dataCount
s.PostingsFetchedSizeSum += units.Base2Bytes(dataSize)
case PostingsTouched:
s.postingsTouched += dataCount
s.PostingsTouchedSizeSum += units.Base2Bytes(dataSize)
case SeriesFetched:
s.seriesFetched += dataCount
s.SeriesFetchedSizeSum += units.Base2Bytes(dataSize)
case SeriesTouched:
s.seriesTouched += dataCount
s.SeriesTouchedSizeSum += units.Base2Bytes(dataSize)
case ChunksFetched:
s.chunksFetched += dataCount
s.ChunksFetchedSizeSum += units.Base2Bytes(dataSize)
case ChunksTouched:
s.chunksTouched += dataCount
s.ChunksTouchedSizeSum += units.Base2Bytes(dataSize)
default:
return
}
s.DataDownloadedSizeSum += units.Base2Bytes(dataSize)
}

func (s *queryStats) merge(o *queryStats) {
s.mtx.Lock()
defer s.mtx.Unlock()
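In short, every call site in bucket.go that previously called bytesLimiter.Reserve and then updated several stat fields by hand now reserves bytes with an explicit StoreDataType and records the counters through the new queryStats.add helper. A minimal sketch of the resulting call-site pattern, simplified from the hunks above (illustrative only, not an exact excerpt):

if err := bytesLimiter.ReserveWithType(uint64(len(b)), SeriesTouched); err != nil {
	return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading series from index cache: %s", err)
}
// A single call now bumps the per-type count, the per-type size sum, and DataDownloadedSizeSum.
r.stats.add(SeriesTouched, 1, len(b))
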
40 changes: 40 additions & 0 deletions pkg/store/bucket_e2e_test.go
@@ -677,6 +677,46 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
}
}

func TestBucketStore_Series_CustomBytesLimiters_e2e(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bkt := objstore.NewInMemBucket()

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), func(_ prometheus.Counter) BytesLimiter {
return &bytesLimiterMock{
limitFunc: func(_ uint64, dataType StoreDataType) error {
if dataType == PostingsFetched {
return fmt.Errorf("error reserving data type: PostingsFetched")
}

return nil
},
}
}, emptyRelabelConfig, allowAllFilterConf)
testutil.Ok(t, s.store.SyncBlocks(ctx))

req := &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
},
MinTime: minTimeDuration.PrometheusTimestamp(),
MaxTime: maxTimeDuration.PrometheusTimestamp(),
}

s.cache.SwapWith(noopCache{})
srv := newStoreSeriesServer(ctx)
err := s.store.Series(req, srv)

testutil.NotOk(t, err)
testutil.Assert(t, strings.Contains(err.Error(), "exceeded bytes limit"))
testutil.Assert(t, strings.Contains(err.Error(), "error reserving data type: PostingsFetched"))
status, ok := status.FromError(err)
testutil.Equals(t, true, ok)
testutil.Equals(t, codes.ResourceExhausted, status.Code())
}

func TestBucketStore_LabelNames_e2e(t *testing.T) {
objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
ctx, cancel := context.WithCancel(context.Background())
45 changes: 44 additions & 1 deletion pkg/store/bucket_test.go
@@ -3852,6 +3852,9 @@ func TestBucketStoreStreamingSeriesLimit(t *testing.T) {
})
testutil.Ok(t, err)

firstBytesLimiterChecked := false
secondBytesLimiterChecked := false

// Set series limit to 2. Only pass if series limiter applies
// for lazy postings only.
bucketStore, err := NewBucketStore(
@@ -3860,7 +3863,24 @@
"",
NewChunksLimiterFactory(10e6),
NewSeriesLimiterFactory(2),
NewBytesLimiterFactory(10e6),
func(_ prometheus.Counter) BytesLimiter {
return &compositeBytesLimiterMock{
limiters: []BytesLimiter{
&bytesLimiterMock{
limitFunc: func(_ uint64, _ StoreDataType) error {
firstBytesLimiterChecked = true
return nil
},
},
&bytesLimiterMock{
limitFunc: func(_ uint64, _ StoreDataType) error {
secondBytesLimiterChecked = true
return nil
},
},
},
}
},
NewGapBasedPartitioner(PartitionerMaxGapSize),
20,
true,
@@ -3891,4 +3911,27 @@
srv := newStoreSeriesServer(context.Background())
testutil.Ok(t, bucketStore.Series(req, srv))
testutil.Equals(t, 2, len(srv.SeriesSet))
testutil.Equals(t, true, firstBytesLimiterChecked)
testutil.Equals(t, true, secondBytesLimiterChecked)
}

type bytesLimiterMock struct {
limitFunc func(uint64, StoreDataType) error
}

func (m *bytesLimiterMock) ReserveWithType(num uint64, dataType StoreDataType) error {
return m.limitFunc(num, dataType)
}

type compositeBytesLimiterMock struct {
limiters []BytesLimiter
}

func (m *compositeBytesLimiterMock) ReserveWithType(num uint64, dataType StoreDataType) error {
for _, l := range m.limiters {
if err := l.ReserveWithType(num, dataType); err != nil {
return err
}
}
return nil
}
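Because ReserveWithType is the only method on BytesLimiter, several budgets can be layered behind one factory, which is what compositeBytesLimiterMock exercises above. A hypothetical helper along those lines, not part of this PR and assuming BytesLimiterFactory has the func(prometheus.Counter) BytesLimiter signature implied by the tests:

// Hypothetical sketch: wraps an existing factory so an extra limiter is consulted
// on every reservation alongside the limiter the base factory produces.
func chainBytesLimiters(base BytesLimiterFactory, extra BytesLimiter) BytesLimiterFactory {
	return func(failedCounter prometheus.Counter) BytesLimiter {
		return &compositeBytesLimiterMock{
			limiters: []BytesLimiter{base(failedCounter), extra},
		}
	}
}

A production version would use its own composite type rather than the test mock, but the call pattern is the same.
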
6 changes: 5 additions & 1 deletion pkg/store/limiter.go
@@ -34,7 +34,7 @@ type BytesLimiter interface {
// Reserve bytes out of the total amount of bytes enforced by the limiter.
// Returns an error if the limit has been exceeded. This function must be
// goroutine safe.
Reserve(num uint64) error
ReserveWithType(num uint64, dataType StoreDataType) error
}

// ChunksLimiterFactory is used to create a new ChunksLimiter. The factory is useful for
@@ -64,6 +64,10 @@ func NewLimiter(limit uint64, ctr prometheus.Counter) *Limiter {

// Reserve implements ChunksLimiter.
func (l *Limiter) Reserve(num uint64) error {
return l.ReserveWithType(num, 0)
}

func (l *Limiter) ReserveWithType(num uint64, _ StoreDataType) error {
if l == nil {
return nil
}
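With the data type threaded through the interface, a limiter implementation is free to keep a separate budget per StoreDataType instead of one shared budget (the built-in Limiter above still ignores the type). A hypothetical sketch, not part of this PR, assuming only the standard sync and fmt packages:

// perTypeBytesLimiter enforces an independent byte budget per StoreDataType.
// Data types without a configured limit are left unrestricted.
type perTypeBytesLimiter struct {
	mtx      sync.Mutex
	limits   map[StoreDataType]uint64
	reserved map[StoreDataType]uint64
}

func (l *perTypeBytesLimiter) ReserveWithType(num uint64, dataType StoreDataType) error {
	l.mtx.Lock()
	defer l.mtx.Unlock()

	limit, ok := l.limits[dataType]
	if !ok {
		return nil // no budget configured for this data type
	}
	if l.reserved == nil {
		l.reserved = make(map[StoreDataType]uint64)
	}
	if l.reserved[dataType]+num > limit {
		return fmt.Errorf("data type %d would exceed its bytes limit: reserved %d + %d > %d",
			dataType, l.reserved[dataType], num, limit)
	}
	l.reserved[dataType] += num
	return nil
}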