diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 217e89c9e37..d3f78348d3f 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -316,6 +316,7 @@ func runStore( chunkPool, store.NewChunksLimiterFactory(maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk. store.NewSeriesLimiterFactory(maxSeriesCount), + store.NewGapBasedPartitioner(store.PartitionerMaxGapSize), verbose, blockSyncConcurrency, filterConf, diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 51200ada599..c2bc9193529 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -82,7 +82,7 @@ const ( // not too small (too much memory). DefaultPostingOffsetInMemorySampling = 32 - partitionerMaxGapSize = 512 * 1024 + PartitionerMaxGapSize = 512 * 1024 // Labels for metrics. labelEncode = "encode" @@ -278,7 +278,7 @@ type BucketStore struct { chunksLimiterFactory ChunksLimiterFactory // seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call. seriesLimiterFactory SeriesLimiterFactory - partitioner partitioner + partitioner Partitioner filterConfig *FilterConfig advLabelSets []labelpb.ZLabelSet @@ -303,6 +303,7 @@ func NewBucketStore( chunkPool pool.BytesPool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, + partitioner Partitioner, debugLogging bool, blockSyncConcurrency int, filterConfig *FilterConfig, @@ -332,7 +333,7 @@ func NewBucketStore( queryGate: queryGate, chunksLimiterFactory: chunksLimiterFactory, seriesLimiterFactory: seriesLimiterFactory, - partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, + partitioner: partitioner, enableCompatibilityLabel: enableCompatibilityLabel, postingOffsetsInMemSampling: postingOffsetsInMemSampling, enableSeriesResponseHints: enableSeriesResponseHints, @@ -1378,7 +1379,7 @@ type bucketBlock struct { pendingReaders sync.WaitGroup - partitioner partitioner + partitioner Partitioner // Block's labels used by block-level matchers to filter blocks to query. These are used to select blocks using // request hints' BlockMatchers. @@ -1395,7 +1396,7 @@ func newBucketBlock( indexCache storecache.IndexCache, chunkPool pool.BytesPool, indexHeadReader indexheader.Reader, - p partitioner, + p Partitioner, ) (b *bucketBlock, err error) { b = &bucketBlock{ logger: logger, @@ -2034,7 +2035,7 @@ type part struct { elemRng [2]int } -type partitioner interface { +type Partitioner interface { // Partition partitions length entries into n <= length ranges that cover all // input ranges // It supports overlapping ranges. @@ -2046,6 +2047,12 @@ type gapBasedPartitioner struct { maxGapSize uint64 } +func NewGapBasedPartitioner(maxGapSize uint64) Partitioner { + return gapBasedPartitioner{ + maxGapSize: maxGapSize, + } +} + // Partition partitions length entries into n <= length ranges that cover all // input ranges by combining entries that are separated by reasonably small gaps. // It is used to combine multiple small ranges from object storage into bigger, more efficient/cheaper ones. diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index b5b329cc3dd..d09bfc889b3 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -168,6 +168,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m chunkPool, NewChunksLimiterFactory(maxChunksLimit), NewSeriesLimiterFactory(0), + NewGapBasedPartitioner(PartitionerMaxGapSize), false, 20, filterConf, diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 19153f5fbf7..59ff00c1158 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -579,6 +579,7 @@ func TestBucketStore_Info(t *testing.T) { chunkPool, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), + NewGapBasedPartitioner(PartitionerMaxGapSize), false, 20, allowAllFilterConf, @@ -834,6 +835,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul chunkPool, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), + NewGapBasedPartitioner(PartitionerMaxGapSize), false, 20, allowAllFilterConf, @@ -1146,7 +1148,7 @@ func benchmarkExpandedPostings( indexCache: noopCache{}, bkt: bkt, meta: &metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: id}}, - partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, + partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize), } indexr := newBucketIndexReader(context.Background(), b) @@ -1262,7 +1264,7 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer metrics: m, bkt: bkt, meta: meta, - partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, + partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize), chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")}, chunkPool: chunkPool, extLset: extLset, @@ -1436,7 +1438,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { metrics: newBucketStoreMetrics(nil), bkt: bkt, meta: meta, - partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, + partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize), chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")}, chunkPool: chunkPool, } @@ -1475,7 +1477,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { metrics: newBucketStoreMetrics(nil), bkt: bkt, meta: meta, - partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, + partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize), chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")}, chunkPool: chunkPool, } @@ -1656,6 +1658,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { chunkPool, NewChunksLimiterFactory(10000/MaxSamplesPerChunk), NewSeriesLimiterFactory(0), + NewGapBasedPartitioner(PartitionerMaxGapSize), false, 10, nil, @@ -1753,6 +1756,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { chunkPool, NewChunksLimiterFactory(100000/MaxSamplesPerChunk), NewSeriesLimiterFactory(0), + NewGapBasedPartitioner(PartitionerMaxGapSize), false, 10, nil, @@ -1901,6 +1905,7 @@ func TestBlockWithLargeChunks(t *testing.T) { chunkPool, NewChunksLimiterFactory(10000/MaxSamplesPerChunk), NewSeriesLimiterFactory(0), + NewGapBasedPartitioner(PartitionerMaxGapSize), false, 10, nil, @@ -2065,6 +2070,7 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb chunkPool, NewChunksLimiterFactory(10000/MaxSamplesPerChunk), NewSeriesLimiterFactory(0), + NewGapBasedPartitioner(PartitionerMaxGapSize), false, 10, nil,