Skip to content

Commit

Permalink
Enhanced bytes limiter with data type param (#7414)
Browse files Browse the repository at this point in the history
* Refactor existing stats incrementation for touched and fetched data

Signed-off-by: Justin Jung <jungjust@amazon.com>

* Add TypedBytesLimiter

Signed-off-by: Justin Jung <jungjust@amazon.com>

* Remove addAndCheck func

Signed-off-by: Justin Jung <jungjust@amazon.com>

* Update BytesLimiter interface to accept dataType param

Signed-off-by: Justin Jung <jungjust@amazon.com>

* Added tests

Signed-off-by: Justin Jung <jungjust@amazon.com>

* Fix build + changelog

Signed-off-by: Justin Jung <jungjust@amazon.com>

* Fix wrong data type

Signed-off-by: Justin Jung <jungjust@amazon.com>

* Changed storeDataType to be exported

Signed-off-by: Justin Jung <jungjust@amazon.com>

* Revert []BytesLimiter to BytesLimtier

Signed-off-by: Justin Jung <jungjust@amazon.com>

* Lint

Signed-off-by: Justin Jung <jungjust@amazon.com>

* More reverts

Signed-off-by: Justin Jung <jungjust@amazon.com>

* More

Signed-off-by: Justin Jung <jungjust@amazon.com>

* Rename DefaultBytesLimiterFactory back to NewBytesLimiterFactory

Signed-off-by: Justin Jung <jungjust@amazon.com>

* Changed StoreDataType from string to int

Signed-off-by: Justin Jung <jungjust@amazon.com>

* Removed nil check for bytesLimiter

Signed-off-by: Justin Jung <jungjust@amazon.com>

* nit

Signed-off-by: Justin Jung <jungjust@amazon.com>

* Removed changelog

Signed-off-by: Justin Jung <jungjust@amazon.com>

---------

Signed-off-by: Justin Jung <jungjust@amazon.com>
  • Loading branch information
justinjung04 authored Jun 13, 2024
1 parent 86382a8 commit 651a4a4
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 41 deletions.
101 changes: 62 additions & 39 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ import (
"github.com/thanos-io/thanos/pkg/tracing"
)

type StoreDataType int

const (
PostingsFetched StoreDataType = iota
PostingsTouched
SeriesFetched
SeriesTouched
ChunksFetched
ChunksTouched
)

const (
// MaxSamplesPerChunk is approximately the max number of samples that we may have in any given chunk. This is needed
// for precalculating the number of samples that we may have to retrieve and decode for any given query
Expand Down Expand Up @@ -388,10 +399,10 @@ type BucketStore struct {
// seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call,
// or LabelName and LabelValues calls when used with matchers.
seriesLimiterFactory SeriesLimiterFactory

// bytesLimiterFactory creates a new limiter used to limit the amount of bytes fetched/touched by each Series() call.
bytesLimiterFactory BytesLimiterFactory
partitioner Partitioner

partitioner Partitioner

filterConfig *FilterConfig
advLabelSets []labelpb.ZLabelSet
Expand Down Expand Up @@ -2869,12 +2880,11 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context,
if !hit {
return false, nil, nil
}
if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil {
if err := bytesLimiter.ReserveWithType(uint64(len(dataFromCache)), PostingsTouched); err != nil {
return false, nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading expanded postings from index cache: %s", err)
}
r.stats.DataDownloadedSizeSum += units.Base2Bytes(len(dataFromCache))
r.stats.postingsTouched++
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(dataFromCache))

r.stats.add(PostingsTouched, 1, len(dataFromCache))
p, closeFns, err := r.decodeCachedPostings(dataFromCache)
defer func() {
for _, closeFn := range closeFns {
Expand Down Expand Up @@ -2943,10 +2953,9 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
// Fetch postings from the cache with a single call.
fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys, tenant)
for _, dataFromCache := range fromCache {
if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil {
if err := bytesLimiter.ReserveWithType(uint64(len(dataFromCache)), PostingsTouched); err != nil {
return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading postings from index cache: %s", err)
}
r.stats.DataDownloadedSizeSum += units.Base2Bytes(len(dataFromCache))
}

// Iterate over all groups and fetch posting from cache.
Expand All @@ -2958,8 +2967,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
}
// Get postings for the given key from cache first.
if b, ok := fromCache[key]; ok {
r.stats.postingsTouched++
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(b))
r.stats.add(PostingsTouched, 1, len(b))

l, closer, err := r.decodeCachedPostings(b)
if err != nil {
Expand Down Expand Up @@ -3000,10 +3008,9 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
start := int64(part.Start)
length := int64(part.End) - start

if err := bytesLimiter.Reserve(uint64(length)); err != nil {
if err := bytesLimiter.ReserveWithType(uint64(length), PostingsFetched); err != nil {
return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching postings: %s", err)
}
r.stats.DataDownloadedSizeSum += units.Base2Bytes(length)
}

g, ctx := errgroup.WithContext(ctx)
Expand Down Expand Up @@ -3035,8 +3042,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
rdr := newPostingsReaderBuilder(ctx, brdr, ptrs[i:j], start, length)

stats.postingsFetchCount++
stats.postingsFetched += j - i
stats.PostingsFetchedSizeSum += units.Base2Bytes(int(length))
stats.add(PostingsFetched, j-i, int(length))

for rdr.Next() {
diffVarintPostings, postingsCount, keyID := rdr.AtDiffVarint()
Expand All @@ -3050,12 +3056,11 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
return errors.Wrap(err, "encoding with snappy")
}

stats.postingsTouched++
stats.PostingsTouchedSizeSum += units.Base2Bytes(int(len(diffVarintPostings)))
stats.cachedPostingsCompressions += 1
stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(diffVarintPostings))
stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(len(dataToCache))
stats.CachedPostingsCompressionTimeSum += time.Since(startCompression)
stats.add(PostingsTouched, 1, len(diffVarintPostings))

r.block.indexCache.StorePostings(r.block.meta.ULID, keys[keyID], dataToCache, tenant)
}
Expand Down Expand Up @@ -3178,10 +3183,9 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser
fromCache, ids := r.block.indexCache.FetchMultiSeries(ctx, r.block.meta.ULID, ids, tenant)
for id, b := range fromCache {
r.loadedSeries[id] = b
if err := bytesLimiter.Reserve(uint64(len(b))); err != nil {
if err := bytesLimiter.ReserveWithType(uint64(len(b)), SeriesTouched); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading series from index cache: %s", err)
}
r.stats.DataDownloadedSizeSum += units.Base2Bytes(len(b))
}

parts := r.block.partitioner.Partition(len(ids), func(i int) (start, end uint64) {
Expand All @@ -3207,11 +3211,8 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
r.stats.merge(stats)
}()

if bytesLimiter != nil {
if err := bytesLimiter.Reserve(uint64(end - start)); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching series: %s", err)
}
stats.DataDownloadedSizeSum += units.Base2Bytes(end - start)
if err := bytesLimiter.ReserveWithType(uint64(end-start), SeriesFetched); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching series: %s", err)
}

b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start), r.logger)
Expand All @@ -3220,9 +3221,8 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
}

stats.seriesFetchCount++
stats.seriesFetched += len(ids)
stats.SeriesFetchDurationSum += time.Since(begin)
stats.SeriesFetchedSizeSum += units.Base2Bytes(int(end - start))
stats.add(SeriesFetched, len(ids), int(end-start))

for i, id := range ids {
c := b[uint64(id)-start:]
Expand Down Expand Up @@ -3325,8 +3325,7 @@ func (r *bucketIndexReader) LoadSeriesForTime(ref storage.SeriesRef, lset *[]sym
return false, errors.Errorf("series %d not found", ref)
}

r.stats.seriesTouched++
r.stats.SeriesTouchedSizeSum += units.Base2Bytes(len(b))
r.stats.add(SeriesTouched, 1, len(b))
return decodeSeriesForTime(b, lset, chks, skipChunks, mint, maxt)
}

Expand Down Expand Up @@ -3514,10 +3513,9 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [
})

for _, p := range parts {
if err := bytesLimiter.Reserve(uint64(p.End - p.Start)); err != nil {
if err := bytesLimiter.ReserveWithType(uint64(p.End-p.Start), ChunksFetched); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching chunks: %s", err)
}
r.stats.DataDownloadedSizeSum += units.Base2Bytes(p.End - p.Start)
}

for _, p := range parts {
Expand Down Expand Up @@ -3551,8 +3549,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
bufReader := bufio.NewReaderSize(reader, r.block.estimatedMaxChunkSize)

stats.chunksFetchCount++
stats.chunksFetched += len(pIdxs)
stats.ChunksFetchedSizeSum += units.Base2Bytes(int(part.End - part.Start))
stats.add(ChunksFetched, len(pIdxs), int(part.End-part.Start))

var (
buf []byte
Expand Down Expand Up @@ -3613,8 +3610,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
if err != nil {
return errors.Wrap(err, "populate chunk")
}
stats.chunksTouched++
stats.ChunksTouchedSizeSum += units.Base2Bytes(int(chunkDataLen))
stats.add(ChunksTouched, 1, int(chunkDataLen))
continue
}

Expand All @@ -3623,10 +3619,9 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
fetchBegin = time.Now()
// Read entire chunk into new buffer.
// TODO: readChunkRange call could be avoided for any chunk but last in this particular part.
if err := bytesLimiter.Reserve(uint64(chunkLen)); err != nil {
if err := bytesLimiter.ReserveWithType(uint64(chunkLen), ChunksTouched); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching chunks: %s", err)
}
stats.DataDownloadedSizeSum += units.Base2Bytes(chunkLen)

nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}}, r.logger)
if err != nil {
Expand All @@ -3636,16 +3631,15 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
return errors.Errorf("preloaded chunk too small, expecting %d", chunkLen)
}

stats.chunksFetchCount++
stats.ChunksFetchedSizeSum += units.Base2Bytes(len(*nb))
stats.add(ChunksFetched, 1, len(*nb))
c := rawChunk((*nb)[n:])
err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), &c, aggrs, r.save, calculateChunkChecksum)
if err != nil {
r.block.chunkPool.Put(nb)
return errors.Wrap(err, "populate chunk")
}
stats.chunksTouched++
stats.ChunksTouchedSizeSum += units.Base2Bytes(int(chunkDataLen))

stats.add(ChunksTouched, 1, int(chunkDataLen))

r.block.chunkPool.Put(nb)
}
Expand Down Expand Up @@ -3746,6 +3740,35 @@ type queryStats struct {
DataDownloadedSizeSum units.Base2Bytes
}

func (s *queryStats) add(dataType StoreDataType, dataCount int, dataSize int) {
s.mtx.Lock()
defer s.mtx.Unlock()

switch dataType {
case PostingsFetched:
s.postingsFetched += dataCount
s.PostingsFetchedSizeSum += units.Base2Bytes(dataSize)
case PostingsTouched:
s.postingsTouched += dataCount
s.PostingsTouchedSizeSum += units.Base2Bytes(dataSize)
case SeriesFetched:
s.seriesFetched += dataCount
s.SeriesFetchedSizeSum += units.Base2Bytes(dataSize)
case SeriesTouched:
s.seriesTouched += dataCount
s.SeriesTouchedSizeSum += units.Base2Bytes(dataSize)
case ChunksFetched:
s.chunksFetched += dataCount
s.ChunksFetchedSizeSum += units.Base2Bytes(dataSize)
case ChunksTouched:
s.chunksTouched += dataCount
s.ChunksTouchedSizeSum += units.Base2Bytes(dataSize)
default:
return
}
s.DataDownloadedSizeSum += units.Base2Bytes(dataSize)
}

func (s *queryStats) merge(o *queryStats) {
s.mtx.Lock()
defer s.mtx.Unlock()
Expand Down
40 changes: 40 additions & 0 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,46 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
}
}

func TestBucketStore_Series_CustomBytesLimiters_e2e(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bkt := objstore.NewInMemBucket()

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), func(_ prometheus.Counter) BytesLimiter {
return &bytesLimiterMock{
limitFunc: func(_ uint64, dataType StoreDataType) error {
if dataType == PostingsFetched {
return fmt.Errorf("error reserving data type: PostingsFetched")
}

return nil
},
}
}, emptyRelabelConfig, allowAllFilterConf)
testutil.Ok(t, s.store.SyncBlocks(ctx))

req := &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
},
MinTime: minTimeDuration.PrometheusTimestamp(),
MaxTime: maxTimeDuration.PrometheusTimestamp(),
}

s.cache.SwapWith(noopCache{})
srv := newStoreSeriesServer(ctx)
err := s.store.Series(req, srv)

testutil.NotOk(t, err)
testutil.Assert(t, strings.Contains(err.Error(), "exceeded bytes limit"))
testutil.Assert(t, strings.Contains(err.Error(), "error reserving data type: PostingsFetched"))
status, ok := status.FromError(err)
testutil.Equals(t, true, ok)
testutil.Equals(t, codes.ResourceExhausted, status.Code())
}

func TestBucketStore_LabelNames_e2e(t *testing.T) {
objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down
45 changes: 44 additions & 1 deletion pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3852,6 +3852,9 @@ func TestBucketStoreStreamingSeriesLimit(t *testing.T) {
})
testutil.Ok(t, err)

firstBytesLimiterChecked := false
secondBytesLimiterChecked := false

// Set series limit to 2. Only pass if series limiter applies
// for lazy postings only.
bucketStore, err := NewBucketStore(
Expand All @@ -3860,7 +3863,24 @@ func TestBucketStoreStreamingSeriesLimit(t *testing.T) {
"",
NewChunksLimiterFactory(10e6),
NewSeriesLimiterFactory(2),
NewBytesLimiterFactory(10e6),
func(_ prometheus.Counter) BytesLimiter {
return &compositeBytesLimiterMock{
limiters: []BytesLimiter{
&bytesLimiterMock{
limitFunc: func(_ uint64, _ StoreDataType) error {
firstBytesLimiterChecked = true
return nil
},
},
&bytesLimiterMock{
limitFunc: func(_ uint64, _ StoreDataType) error {
secondBytesLimiterChecked = true
return nil
},
},
},
}
},
NewGapBasedPartitioner(PartitionerMaxGapSize),
20,
true,
Expand Down Expand Up @@ -3891,4 +3911,27 @@ func TestBucketStoreStreamingSeriesLimit(t *testing.T) {
srv := newStoreSeriesServer(context.Background())
testutil.Ok(t, bucketStore.Series(req, srv))
testutil.Equals(t, 2, len(srv.SeriesSet))
testutil.Equals(t, true, firstBytesLimiterChecked)
testutil.Equals(t, true, secondBytesLimiterChecked)
}

type bytesLimiterMock struct {
limitFunc func(uint64, StoreDataType) error
}

func (m *bytesLimiterMock) ReserveWithType(num uint64, dataType StoreDataType) error {
return m.limitFunc(num, dataType)
}

type compositeBytesLimiterMock struct {
limiters []BytesLimiter
}

func (m *compositeBytesLimiterMock) ReserveWithType(num uint64, dataType StoreDataType) error {
for _, l := range m.limiters {
if err := l.ReserveWithType(num, dataType); err != nil {
return err
}
}
return nil
}
6 changes: 5 additions & 1 deletion pkg/store/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type BytesLimiter interface {
// Reserve bytes out of the total amount of bytes enforced by the limiter.
// Returns an error if the limit has been exceeded. This function must be
// goroutine safe.
Reserve(num uint64) error
ReserveWithType(num uint64, dataType StoreDataType) error
}

// ChunksLimiterFactory is used to create a new ChunksLimiter. The factory is useful for
Expand Down Expand Up @@ -64,6 +64,10 @@ func NewLimiter(limit uint64, ctr prometheus.Counter) *Limiter {

// Reserve implements ChunksLimiter.
func (l *Limiter) Reserve(num uint64) error {
return l.ReserveWithType(num, 0)
}

func (l *Limiter) ReserveWithType(num uint64, _ StoreDataType) error {
if l == nil {
return nil
}
Expand Down

0 comments on commit 651a4a4

Please sign in to comment.