Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store add touch series limit #3509

Merged
merged 9 commits into from
Dec 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re

- [#3469](https://github.com/thanos-io/thanos/pull/3469) StoreAPI: Added `hints` field to `LabelNamesRequest` and `LabelValuesRequest`. Hints in an opaque data structure that can be used to carry additional information from the store and its content is implementation specific.
- [#3421](https://github.com/thanos-io/thanos/pull/3421) Tools: Added `thanos tools bucket rewrite` command allowing to delete series from given block.
- [#3509](https://github.com/thanos-io/thanos/pull/3509) Store: Added a limit on the number of series touched by a single Series call (`--store.grpc.touched-series-limit` flag).
- [#3388](https://github.com/thanos-io/thanos/pull/3378) Tools: Bucket replicator now can specify block IDs to copy.

### Fixed
Expand Down
7 changes: 6 additions & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func registerStore(app *extkingpin.App) {
maxSampleCount := cmd.Flag("store.grpc.series-sample-limit",
"Maximum amount of samples returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains 120 samples (it's the max number of samples each chunk can contain), so the actual number of samples might be lower, even though the maximum could be hit.").
Default("0").Uint()
maxTouchedSeriesCount := cmd.Flag("store.grpc.touched-series-limit",
"Maximum amount of touched series returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit.").
Default("0").Uint()

maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int()

Expand Down Expand Up @@ -136,6 +139,7 @@ func registerStore(app *extkingpin.App) {
uint64(*indexCacheSize),
uint64(*chunkPoolSize),
uint64(*maxSampleCount),
uint64(*maxTouchedSeriesCount),
*maxConcurrent,
component.Store,
debugLogging,
Expand Down Expand Up @@ -173,7 +177,7 @@ func runStore(
grpcGracePeriod time.Duration,
grpcCert, grpcKey, grpcClientCA, httpBindAddr string,
httpGracePeriod time.Duration,
indexCacheSizeBytes, chunkPoolSizeBytes, maxSampleCount uint64,
indexCacheSizeBytes, chunkPoolSizeBytes, maxSampleCount, maxSeriesCount uint64,
maxConcurrency int,
component component.Component,
verbose bool,
Expand Down Expand Up @@ -303,6 +307,7 @@ func runStore(
queriesGate,
chunkPoolSizeBytes,
store.NewChunksLimiterFactory(maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.
store.NewSeriesLimiterFactory(maxSeriesCount),
verbose,
blockSyncConcurrency,
filterConf,
Expand Down
4 changes: 4 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ Flags:
samples each chunk can contain), so the actual
number of samples might be lower, even though
the maximum could be hit.
--store.grpc.touched-series-limit=0
Maximum amount of touched series returned via a
single Series call. The Series call fails if
this limit is exceeded. 0 means no limit.
--store.grpc.series-max-concurrency=20
Maximum number of concurrent Series calls.
--objstore.config-file=<file-path>
Expand Down
22 changes: 17 additions & 5 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type bucketStoreMetrics struct {
seriesMergeDuration prometheus.Histogram
resultSeriesCount prometheus.Summary
chunkSizeBytes prometheus.Histogram
queriesDropped prometheus.Counter
queriesDropped *prometheus.CounterVec
seriesRefetches prometheus.Counter

cachedPostingsCompressions *prometheus.CounterVec
Expand Down Expand Up @@ -186,10 +186,10 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
},
})

m.queriesDropped = promauto.With(reg).NewCounter(prometheus.CounterOpts{
m.queriesDropped = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_bucket_store_queries_dropped_total",
Help: "Number of queries that were dropped due to the sample limit.",
})
Help: "Number of queries that were dropped due to the limit.",
}, []string{"reason"})
m.seriesRefetches = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_bucket_store_series_refetches_total",
Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", maxSeriesSize),
Expand Down Expand Up @@ -276,6 +276,8 @@ type BucketStore struct {

// chunksLimiterFactory creates a new limiter used to limit the number of chunks fetched by each Series() call.
chunksLimiterFactory ChunksLimiterFactory
// seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call.
seriesLimiterFactory SeriesLimiterFactory
partitioner partitioner

filterConfig *FilterConfig
Expand All @@ -300,6 +302,7 @@ func NewBucketStore(
queryGate gate.Gate,
maxChunkPoolBytes uint64,
chunksLimiterFactory ChunksLimiterFactory,
seriesLimiterFactory SeriesLimiterFactory,
debugLogging bool,
blockSyncConcurrency int,
filterConfig *FilterConfig,
Expand Down Expand Up @@ -333,6 +336,7 @@ func NewBucketStore(
filterConfig: filterConfig,
queryGate: queryGate,
chunksLimiterFactory: chunksLimiterFactory,
seriesLimiterFactory: seriesLimiterFactory,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
enableCompatibilityLabel: enableCompatibilityLabel,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
Expand Down Expand Up @@ -683,6 +687,7 @@ func blockSeries(
matchers []*labels.Matcher,
req *storepb.SeriesRequest,
chunksLimiter ChunksLimiter,
seriesLimiter SeriesLimiter,
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(matchers)
if err != nil {
Expand All @@ -693,6 +698,11 @@ func blockSeries(
return storepb.EmptySeriesSet(), indexr.stats, nil
}

	// Reserve the expanded postings (one per candidate series) against the series limiter.
if err := seriesLimiter.Reserve(uint64(len(ps))); err != nil {
return nil, nil, errors.Wrap(err, "exceeded series limit")
}

// Preload all series index data.
// TODO(bwplotka): Consider not keeping all series in memory all the time.
// TODO(bwplotka): Do lazy loading in one step as `ExpandingPostings` method.
Expand Down Expand Up @@ -882,7 +892,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
g, gctx = errgroup.WithContext(ctx)
resHints = &hintspb.SeriesResponseHints{}
reqBlockMatchers []*labels.Matcher
chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped)
chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks"))
seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))
)

if req.Hints != nil {
Expand Down Expand Up @@ -938,6 +949,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
blockMatchers,
req,
chunksLimiter,
seriesLimiter,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
Expand Down
1 change: 1 addition & 0 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
nil,
0,
NewChunksLimiterFactory(maxChunksLimit),
NewSeriesLimiterFactory(0),
false,
20,
filterConf,
Expand Down
8 changes: 8 additions & 0 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,7 @@ func TestBucketStore_Info(t *testing.T) {
nil,
2e5,
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
false,
20,
allowAllFilterConf,
Expand Down Expand Up @@ -826,6 +827,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
nil,
0,
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
false,
20,
allowAllFilterConf,
Expand Down Expand Up @@ -1273,6 +1275,7 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer
},
queryGate: noopGate{},
chunksLimiterFactory: NewChunksLimiterFactory(0),
seriesLimiterFactory: NewSeriesLimiterFactory(0),
}

for _, block := range blocks {
Expand Down Expand Up @@ -1489,6 +1492,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
},
queryGate: noopGate{},
chunksLimiterFactory: NewChunksLimiterFactory(0),
seriesLimiterFactory: NewSeriesLimiterFactory(0),
}

t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) {
Expand Down Expand Up @@ -1642,6 +1646,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
nil,
1000000,
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
false,
10,
nil,
Expand Down Expand Up @@ -1735,6 +1740,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
nil,
1000000,
NewChunksLimiterFactory(100000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
false,
10,
nil,
Expand Down Expand Up @@ -1879,6 +1885,7 @@ func TestBlockWithLargeChunks(t *testing.T) {
nil,
1000000,
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
false,
10,
nil,
Expand Down Expand Up @@ -2039,6 +2046,7 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb
nil,
1000000,
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
NewSeriesLimiterFactory(0),
false,
10,
nil,
Expand Down
17 changes: 17 additions & 0 deletions pkg/store/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,20 @@ type ChunksLimiter interface {
Reserve(num uint64) error
}

// SeriesLimiter is used to limit the number of series touched by a single
// Series() call.
type SeriesLimiter interface {
	// Reserve num series out of the total number of series enforced by the limiter.
	// Returns an error if the limit has been exceeded. This function must be
	// goroutine safe.
	Reserve(num uint64) error
}

// ChunksLimiterFactory is used to create a new ChunksLimiter. The factory is useful for
// projects depending on Thanos (eg. Cortex) which have dynamic limits.
type ChunksLimiterFactory func(failedCounter prometheus.Counter) ChunksLimiter

// SeriesLimiterFactory is used to create a new SeriesLimiter. The factory is
// useful for projects depending on Thanos (eg. Cortex) which have dynamic limits.
type SeriesLimiterFactory func(failedCounter prometheus.Counter) SeriesLimiter
lisuo3389 marked this conversation as resolved.
Show resolved Hide resolved

// Limiter is a simple mechanism for checking if something has passed a certain threshold.
type Limiter struct {
limit uint64
Expand Down Expand Up @@ -57,3 +67,10 @@ func NewChunksLimiterFactory(limit uint64) ChunksLimiterFactory {
return NewLimiter(limit, failedCounter)
}
}

// NewSeriesLimiterFactory makes a new SeriesLimiterFactory with a static limit.
// The produced limiter increments droppedCounter whenever the limit is exceeded.
func NewSeriesLimiterFactory(limit uint64) SeriesLimiterFactory {
	return func(droppedCounter prometheus.Counter) SeriesLimiter {
		return NewLimiter(limit, droppedCounter)
	}
}