diff --git a/CHANGELOG.md b/CHANGELOG.md index 14eafae02d0..abc3d2ad4c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5926](https://github.com/thanos-io/thanos/pull/5926) Receiver: Add experimental string interning in writer. Can be enabled with a hidden flag `writer.intern`. - [#5773](https://github.com/thanos-io/thanos/pull/5773) Store: Support disable cache index header file. - [#5653](https://github.com/thanos-io/thanos/pull/5653) Receive: Allow setting hashing algorithm per tenant in hashrings config +- [#6074](https://github.com/thanos-io/thanos/pull/6074) *: Add histogram metrics `thanos_store_server_series_requested` and `thanos_store_server_chunks_requested` to all Stores. +- [#6074](https://github.com/thanos-io/thanos/pull/6074) *: Allow configuring series and sample limits per `Series` request for all Stores. - [#6104](https://github.com/thanos-io/thanos/pull/6104) Objstore: Support S3 session token. - [#5548](https://github.com/thanos-io/thanos/pull/5548) Query: Added experimental support for load balancing across multiple Store endpoints. 
diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 3a917623d62..13160344100 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -211,6 +211,9 @@ func registerQuery(app *extkingpin.App) { queryTelemetrySamplesQuantiles := cmd.Flag("query.telemetry.request-samples-quantiles", "The quantiles for exporting metrics about the samples count quantiles.").Default("100", "1000", "10000", "100000", "1000000").Int64List() queryTelemetrySeriesQuantiles := cmd.Flag("query.telemetry.request-series-seconds-quantiles", "The quantiles for exporting metrics about the series count quantiles.").Default("10", "100", "1000", "10000", "100000").Int64List() + var storeRateLimits store.SeriesSelectLimits + storeRateLimits.RegisterFlags(cmd) + cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { selectorLset, err := parseFlagLabels(*selectorLabels) if err != nil { @@ -329,6 +332,7 @@ func registerQuery(app *extkingpin.App) { *queryTelemetrySamplesQuantiles, *queryTelemetrySeriesQuantiles, promqlEngineType(*promqlEngine), + storeRateLimits, ) }) } @@ -407,6 +411,7 @@ func runQuery( queryTelemetrySamplesQuantiles []int64, queryTelemetrySeriesQuantiles []int64, promqlEngine promqlEngineType, + storeRateLimits store.SeriesSelectLimits, ) error { if alertQueryURL == "" { lastColon := strings.LastIndex(httpBindAddr, ":") @@ -782,9 +787,10 @@ func runQuery( ) grpcAPI := apiv1.NewGRPCAPI(time.Now, queryReplicaLabels, queryableCreator, queryEngine, lookbackDeltaCreator, instantDefaultMaxSourceResolution) + storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, storeRateLimits) s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)), - grpcserver.WithServer(store.RegisterStoreServer(proxy, logger)), + grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)), 
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)), grpcserver.WithServer(targets.RegisterTargetsServer(targetsProxy)), grpcserver.WithServer(metadata.RegisterMetadataServer(metadataProxy)), diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index eac2d6add42..a70edc16d3a 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -210,8 +210,8 @@ func runReceive( writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, conf.writerInterning) var limitsConfig *receive.RootLimitsConfig - if conf.limitsConfig != nil { - limitsContentYaml, err := conf.limitsConfig.Content() + if conf.writeLimitsConfig != nil { + limitsContentYaml, err := conf.writeLimitsConfig.Content() if err != nil { return errors.Wrap(err, "get content of limit configuration") } @@ -220,7 +220,7 @@ func runReceive( return errors.Wrap(err, "parse limit configuration") } } - limiter, err := receive.NewLimiter(conf.limitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter")) + limiter, err := receive.NewLimiter(conf.writeLimitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter")) if err != nil { return errors.Wrap(err, "creating limiter") } @@ -305,7 +305,7 @@ func runReceive( return errors.Wrap(err, "setup gRPC server") } - mts := store.NewProxyStore( + proxy := store.NewProxyStore( logger, reg, dbs.TSDBLocalClients, @@ -314,6 +314,7 @@ func runReceive( 0, store.LazyRetrieval, ) + mts := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits) rw := store.ReadWriteTSDBStore{ StoreServer: mts, WriteableStoreServer: webHandler, @@ -321,10 +322,10 @@ func runReceive( infoSrv := info.NewInfoServer( component.Receive.String(), - info.WithLabelSetFunc(func() []labelpb.ZLabelSet { return mts.LabelSet() }), + info.WithLabelSetFunc(func() []labelpb.ZLabelSet { return proxy.LabelSet() }), info.WithStoreInfoFunc(func() *infopb.StoreInfo { if httpProbe.IsReady() { - minTime, 
maxTime := mts.TimeRange() + minTime, maxTime := proxy.TimeRange() return &infopb.StoreInfo{ MinTime: minTime, MaxTime: maxTime, @@ -798,12 +799,14 @@ type receiveConfig struct { reqLogConfig *extflag.PathOrContent relabelConfigPath *extflag.PathOrContent - limitsConfig *extflag.PathOrContent + writeLimitsConfig *extflag.PathOrContent + storeRateLimits store.SeriesSelectLimits } func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { rc.httpBindAddr, rc.httpGracePeriod, rc.httpTLSConfig = extkingpin.RegisterHTTPFlags(cmd) rc.grpcBindAddr, rc.grpcGracePeriod, rc.grpcCert, rc.grpcKey, rc.grpcClientCA, rc.grpcMaxConnAge = extkingpin.RegisterGRPCFlags(cmd) + rc.storeRateLimits.RegisterFlags(cmd) cmd.Flag("remote-write.address", "Address to listen on for remote write requests."). Default("0.0.0.0:19291").StringVar(&rc.rwAddress) @@ -919,7 +922,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd) - rc.limitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden()) + rc.writeLimitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden()) } // determineMode returns the ReceiverMode that this receiver is configured to run in. 
diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 2d8d79cf1e9..01df5324134 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -94,6 +94,7 @@ type ruleConfig struct { dataDir string lset labels.Labels ignoredLabelNames []string + storeRateLimits store.SeriesSelectLimits } func (rc *ruleConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -103,6 +104,7 @@ func (rc *ruleConfig) registerFlag(cmd extkingpin.FlagClause) { rc.shipper.registerFlag(cmd) rc.query.registerFlag(cmd) rc.alertmgr.registerFlag(cmd) + rc.storeRateLimits.RegisterFlags(cmd) } // registerRule registers a rule command. @@ -634,7 +636,8 @@ func runRule( return nil }), ) - options = append(options, grpcserver.WithServer(store.RegisterStoreServer(tsdbStore, logger))) + storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, tsdbStore), reg, conf.storeRateLimits) + options = append(options, grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger))) } options = append(options, grpcserver.WithServer( diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 478ae29e13a..2206d8c9dc9 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -282,8 +282,9 @@ func runSidecar( info.WithMetricMetadataInfoFunc(), ) + storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, promStore), reg, conf.storeRateLimits) s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, - grpcserver.WithServer(store.RegisterStoreServer(promStore, logger)), + grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)), grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))), grpcserver.WithServer(targets.RegisterTargetsServer(targets.NewPrometheus(conf.prometheus.url, c, m.Labels))), grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))), @@ -474,15 +475,16 @@ func (s *promMetadata) Version() string { } type sidecarConfig 
struct { - http httpConfig - grpc grpcConfig - prometheus prometheusConfig - tsdb tsdbConfig - reloader reloaderConfig - reqLogConfig *extflag.PathOrContent - objStore extflag.PathOrContent - shipper shipperConfig - limitMinTime thanosmodel.TimeOrDurationValue + http httpConfig + grpc grpcConfig + prometheus prometheusConfig + tsdb tsdbConfig + reloader reloaderConfig + reqLogConfig *extflag.PathOrContent + objStore extflag.PathOrContent + shipper shipperConfig + limitMinTime thanosmodel.TimeOrDurationValue + storeRateLimits store.SeriesSelectLimits } func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -494,6 +496,7 @@ func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) { sc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd) sc.objStore = *extkingpin.RegisterCommonObjStoreFlags(cmd, "", false) sc.shipper.registerFlag(cmd) + sc.storeRateLimits.RegisterFlags(cmd) cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). 
Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime) } diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 207937f1ce7..c59154b6b6c 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -64,8 +64,7 @@ type storeConfig struct { indexCacheSizeBytes units.Base2Bytes chunkPoolSize units.Base2Bytes seriesBatchSize int - maxSampleCount uint64 - maxTouchedSeriesCount uint64 + storeRateLimits store.SeriesSelectLimits maxDownloadedBytes units.Base2Bytes maxConcurrency int component component.StoreAPI @@ -90,6 +89,7 @@ type storeConfig struct { func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { sc.httpConfig = *sc.httpConfig.registerFlag(cmd) sc.grpcConfig = *sc.grpcConfig.registerFlag(cmd) + sc.storeRateLimits.RegisterFlags(cmd) cmd.Flag("data-dir", "Local data directory used for caching purposes (index-header, in-mem cache items and meta.jsons). If removed, no data will be lost, just store will have to rebuild the cache. NOTE: Putting raw blocks here will not cause the store to read them. For such use cases use Prometheus + sidecar. Ignored if -no-cache-index-header option is specified."). Default("./data").StringVar(&sc.dataDir) @@ -113,13 +113,8 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes reserved strictly to reuse for chunks in memory."). Default("2GB").BytesVar(&sc.chunkPoolSize) - cmd.Flag("store.grpc.series-sample-limit", - "Maximum amount of samples returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains 120 samples (it's the max number of samples each chunk can contain), so the actual number of samples might be lower, even though the maximum could be hit."). 
- Default("0").Uint64Var(&sc.maxSampleCount) - - cmd.Flag("store.grpc.touched-series-limit", - "Maximum amount of touched series returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit."). - Default("0").Uint64Var(&sc.maxTouchedSeriesCount) + cmd.Flag("store.grpc.touched-series-limit", "DEPRECATED: use store.limits.request-series.").Default("0").Uint64Var(&sc.storeRateLimits.SeriesPerRequest) + cmd.Flag("store.grpc.series-sample-limit", "DEPRECATED: use store.limits.request-samples.").Default("0").Uint64Var(&sc.storeRateLimits.SamplesPerRequest) cmd.Flag("store.grpc.downloaded-bytes-limit", "Maximum amount of downloaded (either fetched or touched) bytes in a single Series/LabelNames/LabelValues call. The Series call fails if this limit is exceeded. 0 means no limit."). @@ -370,8 +365,8 @@ func runStore( bkt, metaFetcher, dataDir, - store.NewChunksLimiterFactory(conf.maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk. - store.NewSeriesLimiterFactory(conf.maxTouchedSeriesCount), + store.NewChunksLimiterFactory(conf.storeRateLimits.SamplesPerRequest/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk. 
+ store.NewSeriesLimiterFactory(conf.storeRateLimits.SeriesPerRequest), store.NewBytesLimiterFactory(conf.maxDownloadedBytes), store.NewGapBasedPartitioner(store.PartitionerMaxGapSize), conf.blockSyncConcurrency, @@ -453,8 +448,9 @@ func runStore( return errors.Wrap(err, "setup gRPC server") } + storeServer := store.NewInstrumentedStoreServer(reg, bs) s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, conf.component, grpcProbe, - grpcserver.WithServer(store.RegisterStoreServer(bs, logger)), + grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)), grpcserver.WithServer(info.RegisterInfoServer(infoSrv)), grpcserver.WithListen(conf.grpcConfig.bindAddress), grpcserver.WithGracePeriod(time.Duration(conf.grpcConfig.gracePeriod)), diff --git a/docs/components/query.md b/docs/components/query.md index b37dd9e9ea5..e68a8f68a16 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -440,6 +440,17 @@ Flags: that are always used, even if the health check fails. Useful if you have a caching layer on top. + --store.limits.request-samples=0 + The maximum samples allowed for a single + Series request, The Series call fails if + this limit is exceeded. 0 means no limit. + NOTE: For efficiency the limit is internally + implemented as 'chunks limit' considering each + chunk contains a maximum of 120 samples. + --store.limits.request-series=0 + The maximum series allowed for a single Series + request. The Series call fails if this limit is + exceeded. 0 means no limit. --store.response-timeout=0ms If a Store doesn't send any data in this specified duration then a Store will be ignored diff --git a/docs/components/receive.md b/docs/components/receive.md index 14925d54635..b9865807c3d 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -334,6 +334,17 @@ Flags: Path to YAML file with request logging configuration. 
See format details: https://thanos.io/tip/thanos/logging.md/#configuration + --store.limits.request-samples=0 + The maximum samples allowed for a single + Series request, The Series call fails if + this limit is exceeded. 0 means no limit. + NOTE: For efficiency the limit is internally + implemented as 'chunks limit' considering each + chunk contains a maximum of 120 samples. + --store.limits.request-series=0 + The maximum series allowed for a single Series + request. The Series call fails if this limit is + exceeded. 0 means no limit. --tracing.config= Alternative to 'tracing.config-file' flag (mutually exclusive). Content of YAML file diff --git a/docs/components/rule.md b/docs/components/rule.md index cd7558d902c..3736d6bb2ce 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -453,6 +453,17 @@ Flags: Works only if compaction is disabled on Prometheus. Do it once and then disable the flag when done. + --store.limits.request-samples=0 + The maximum samples allowed for a single + Series request, The Series call fails if + this limit is exceeded. 0 means no limit. + NOTE: For efficiency the limit is internally + implemented as 'chunks limit' considering each + chunk contains a maximum of 120 samples. + --store.limits.request-series=0 + The maximum series allowed for a single Series + request. The Series call fails if this limit is + exceeded. 0 means no limit. --tracing.config= Alternative to 'tracing.config-file' flag (mutually exclusive). Content of YAML file diff --git a/docs/components/sidecar.md b/docs/components/sidecar.md index d5aed396346..3e265c9db87 100644 --- a/docs/components/sidecar.md +++ b/docs/components/sidecar.md @@ -174,6 +174,17 @@ Flags: Works only if compaction is disabled on Prometheus. Do it once and then disable the flag when done. + --store.limits.request-samples=0 + The maximum samples allowed for a single + Series request, The Series call fails if + this limit is exceeded. 0 means no limit. 
+ NOTE: For efficiency the limit is internally + implemented as 'chunks limit' considering each + chunk contains a maximum of 120 samples. + --store.limits.request-series=0 + The maximum series allowed for a single Series + request. The Series call fails if this limit is + exceeded. 0 means no limit. --tracing.config= Alternative to 'tracing.config-file' flag (mutually exclusive). Content of YAML file diff --git a/docs/components/store.md b/docs/components/store.md index a67411934c1..64e1b0e05d7 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -178,19 +178,20 @@ Flags: --store.grpc.series-max-concurrency=20 Maximum number of concurrent Series calls. --store.grpc.series-sample-limit=0 - Maximum amount of samples returned via a - single Series call. The Series call fails - if this limit is exceeded. 0 means no limit. - NOTE: For efficiency the limit is internally - implemented as 'chunks limit' considering - each chunk contains 120 samples (it's the max - number of samples each chunk can contain), - so the actual number of samples might be lower, - even though the maximum could be hit. + DEPRECATED: use store.limits.request-samples. --store.grpc.touched-series-limit=0 - Maximum amount of touched series returned via - a single Series call. The Series call fails if + DEPRECATED: use store.limits.request-series. + --store.limits.request-samples=0 + The maximum samples allowed for a single + Series request, The Series call fails if this limit is exceeded. 0 means no limit. + NOTE: For efficiency the limit is internally + implemented as 'chunks limit' considering each + chunk contains a maximum of 120 samples. + --store.limits.request-series=0 + The maximum series allowed for a single Series + request. The Series call fails if this limit is + exceeded. 0 means no limit. --sync-block-duration=3m Repeat interval for syncing the blocks between local and remote view. 
--tracing.config= diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go index 6229b1a3836..f564e114432 100644 --- a/pkg/store/limiter.go +++ b/pkg/store/limiter.go @@ -9,7 +9,11 @@ import ( "github.com/alecthomas/units" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "go.uber.org/atomic" + + "github.com/thanos-io/thanos/pkg/extkingpin" + "github.com/thanos-io/thanos/pkg/store/storepb" ) type ChunksLimiter interface { @@ -82,16 +86,94 @@ func NewChunksLimiterFactory(limit uint64) ChunksLimiterFactory { } } -// NewSeriesLimiterFactory makes a new NewSeriesLimiterFactory with a static limit. +// NewSeriesLimiterFactory makes a new SeriesLimiterFactory with a static limit. func NewSeriesLimiterFactory(limit uint64) SeriesLimiterFactory { return func(failedCounter prometheus.Counter) SeriesLimiter { return NewLimiter(limit, failedCounter) } } -// NewSeriesLimiterFactory makes a new NewSeriesLimiterFactory with a static limit. +// NewBytesLimiterFactory makes a new BytesLimiterFactory with a static limit. func NewBytesLimiterFactory(limit units.Base2Bytes) BytesLimiterFactory { return func(failedCounter prometheus.Counter) BytesLimiter { return NewLimiter(uint64(limit), failedCounter) } } + +// SeriesSelectLimits are limits applied against individual Series calls. +type SeriesSelectLimits struct { + SeriesPerRequest uint64 + SamplesPerRequest uint64 +} + +func (l *SeriesSelectLimits) RegisterFlags(cmd extkingpin.FlagClause) { + cmd.Flag("store.limits.request-series", "The maximum series allowed for a single Series request. The Series call fails if this limit is exceeded. 0 means no limit.").Default("0").Uint64Var(&l.SeriesPerRequest) + cmd.Flag("store.limits.request-samples", "The maximum samples allowed for a single Series request, The Series call fails if this limit is exceeded. 0 means no limit. 
NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains a maximum of 120 samples.").Default("0").Uint64Var(&l.SamplesPerRequest) +} + +var _ storepb.StoreServer = &limitedStoreServer{} + +// limitedStoreServer is a storepb.StoreServer that can apply series and sample limits against individual Series requests. +type limitedStoreServer struct { + storepb.StoreServer + newSeriesLimiter SeriesLimiterFactory + newSamplesLimiter ChunksLimiterFactory + failedRequestsCounter *prometheus.CounterVec +} + +// NewLimitedStoreServer creates a new limitedStoreServer. +func NewLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer, selectLimits SeriesSelectLimits) storepb.StoreServer { + return &limitedStoreServer{ + StoreServer: store, + newSeriesLimiter: NewSeriesLimiterFactory(selectLimits.SeriesPerRequest), + newSamplesLimiter: NewChunksLimiterFactory(selectLimits.SamplesPerRequest), + failedRequestsCounter: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_selects_dropped_total", + Help: "Number of select queries that were dropped due to configured limits.", + }, []string{"reason"}), + } +} + +func (s *limitedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { + seriesLimiter := s.newSeriesLimiter(s.failedRequestsCounter.WithLabelValues("series")) + chunksLimiter := s.newSamplesLimiter(s.failedRequestsCounter.WithLabelValues("chunks")) + limitedSrv := newLimitedServer(srv, seriesLimiter, chunksLimiter) + if err := s.StoreServer.Series(req, limitedSrv); err != nil { + return err + } + + return nil +} + +var _ storepb.Store_SeriesServer = &limitedServer{} + +// limitedServer is a storepb.Store_SeriesServer that enforces series and sample limits on sent series. 
+type limitedServer struct { + storepb.Store_SeriesServer + seriesLimiter SeriesLimiter + samplesLimiter ChunksLimiter +} + +func newLimitedServer(upstream storepb.Store_SeriesServer, seriesLimiter SeriesLimiter, chunksLimiter ChunksLimiter) *limitedServer { + return &limitedServer{ + Store_SeriesServer: upstream, + seriesLimiter: seriesLimiter, + samplesLimiter: chunksLimiter, + } +} + +func (i *limitedServer) Send(response *storepb.SeriesResponse) error { + series := response.GetSeries() + if series == nil { + return i.Store_SeriesServer.Send(response) + } + + if err := i.seriesLimiter.Reserve(1); err != nil { + return errors.Wrapf(err, "failed to send series") + } + if err := i.samplesLimiter.Reserve(uint64(len(series.Chunks) * MaxSamplesPerChunk)); err != nil { + return errors.Wrapf(err, "failed to send samples") + } + + return i.Store_SeriesServer.Send(response) +} diff --git a/pkg/store/limiter_test.go b/pkg/store/limiter_test.go index addbf23174c..97b225e8e68 100644 --- a/pkg/store/limiter_test.go +++ b/pkg/store/limiter_test.go @@ -4,12 +4,17 @@ package store import ( + "context" "testing" + "time" "github.com/efficientgo/core/testutil" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/model/labels" + + "github.com/thanos-io/thanos/pkg/store/storepb" ) func TestLimiter(t *testing.T) { @@ -28,3 +33,103 @@ func TestLimiter(t *testing.T) { testutil.NotOk(t, l.Reserve(2)) testutil.Equals(t, float64(1), prom_testutil.ToFloat64(c)) } + +func TestRateLimitedServer(t *testing.T) { + numSamples := 60 + series := []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("series", "1"), makeSamples(numSamples)), + storeSeriesResponse(t, labels.FromStrings("series", "2"), makeSamples(numSamples)), + storeSeriesResponse(t, labels.FromStrings("series", "3"), makeSamples(numSamples)), + } + tests 
:= []struct { + name string + limits SeriesSelectLimits + series []*storepb.SeriesResponse + err string + }{ + { + name: "no limits", + limits: SeriesSelectLimits{ + SeriesPerRequest: 0, + SamplesPerRequest: 0, + }, + series: series, + }, + { + name: "series bellow limit", + limits: SeriesSelectLimits{ + SeriesPerRequest: 3, + SamplesPerRequest: 0, + }, + series: series, + }, + { + name: "series over limit", + limits: SeriesSelectLimits{ + SeriesPerRequest: 2, + SamplesPerRequest: 0, + }, + series: series, + err: "failed to send series: limit 2 violated (got 3)", + }, + { + name: "chunks bellow limit", + limits: SeriesSelectLimits{ + SeriesPerRequest: 0, + SamplesPerRequest: uint64(3 * numSamples * MaxSamplesPerChunk), + }, + series: series, + }, + { + name: "chunks over limit", + limits: SeriesSelectLimits{ + SeriesPerRequest: 0, + SamplesPerRequest: 50, + }, + series: series, + err: "failed to send samples: limit 50 violated (got 120)", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + store := NewLimitedStoreServer(newStoreServerStub(test.series), prometheus.NewRegistry(), test.limits) + seriesServer := storepb.NewInProcessStream(ctx, 10) + err := store.Series(&storepb.SeriesRequest{}, seriesServer) + if test.err == "" { + testutil.Ok(t, err) + } else { + testutil.NotOk(t, err) + testutil.Assert(t, test.err == err.Error(), "want %s, got %s", test.err, err.Error()) + } + }) + } +} + +func makeSamples(numSamples int) []sample { + samples := make([]sample, numSamples) + for i := range samples { + samples[i] = sample{t: int64(i), v: float64(i)} + } + return samples +} + +type testStoreServer struct { + storepb.StoreServer + responses []*storepb.SeriesResponse +} + +func newStoreServerStub(responses []*storepb.SeriesResponse) *testStoreServer { + return &testStoreServer{responses: responses} +} + +func (m *testStoreServer) Series(_ 
*storepb.SeriesRequest, server storepb.Store_SeriesServer) error { + for _, r := range m.responses { + if err := server.Send(r); err != nil { + return err + } + } + return nil +} diff --git a/pkg/store/telemetry.go b/pkg/store/telemetry.go index a854daaf0c6..5a5167c9375 100644 --- a/pkg/store/telemetry.go +++ b/pkg/store/telemetry.go @@ -8,6 +8,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -86,3 +87,60 @@ type NoopSeriesStatsAggregator struct{} func (s *NoopSeriesStatsAggregator) Aggregate(_ storepb.SeriesStatsCounter) {} func (s *NoopSeriesStatsAggregator) Observe(_ float64) {} + +// instrumentedStoreServer is a storepb.StoreServer that exposes metrics about Series requests. +type instrumentedStoreServer struct { + storepb.StoreServer + seriesRequested prometheus.Histogram + chunksRequested prometheus.Histogram +} + +// NewInstrumentedStoreServer creates a new instrumentedStoreServer. 
+func NewInstrumentedStoreServer(reg prometheus.Registerer, store storepb.StoreServer) storepb.StoreServer { + return &instrumentedStoreServer{ + StoreServer: store, + seriesRequested: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "thanos_store_server_series_requested", + Help: "Number of requested series for Series calls", + Buckets: []float64{1, 10, 100, 1000, 10000, 100000, 1000000}, + }), + chunksRequested: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "thanos_store_server_chunks_requested", + Help: "Number of requested chunks for Series calls", + Buckets: []float64{1, 100, 1000, 10000, 100000, 10000000, 100000000, 1000000000}, + }), + } +} + +func (s *instrumentedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { + instrumented := newInstrumentedServer(srv) + if err := s.StoreServer.Series(req, instrumented); err != nil { + return err + } + + s.seriesRequested.Observe(instrumented.seriesSent) + s.chunksRequested.Observe(instrumented.chunksSent) + return nil +} + +// instrumentedServer is a storepb.Store_SeriesServer that tracks statistics about sent series. +type instrumentedServer struct { + storepb.Store_SeriesServer + seriesSent float64 + chunksSent float64 +} + +func newInstrumentedServer(upstream storepb.Store_SeriesServer) *instrumentedServer { + return &instrumentedServer{Store_SeriesServer: upstream} +} + +func (i *instrumentedServer) Send(response *storepb.SeriesResponse) error { + if err := i.Store_SeriesServer.Send(response); err != nil { + return err + } + if series := response.GetSeries(); series != nil { + i.seriesSent++ + i.chunksSent += float64(len(series.Chunks)) + } + return nil +}