Skip to content

Commit

Permalink
Instrumented servers (thanos-io#6074)
Browse files Browse the repository at this point in the history
* Add instrumentation to Store servers

This commit adds an instrumented store server that exposes metrics
about Series requests and uses it as a decorator around all Store APIs.

The instrumented store currenly exposes only two metrics, series requested
and chunks requested. Additional metrics can be added as needed.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Add rate limits to Store servers

This commit implements a RateLimited store server which can be used
to apply various limits to Series calls in components that implement
the Store API.

Rate limits are disabled by default but can be enabled selectively
for each individual Thanos component.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Add CHANGELOG entry

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Reuse existing limiters

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Fix chunks limit binding

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Unify flag names

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Run make docs

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Add another series bucket

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Code review comments

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Fix changelog

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Rename flags

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

---------

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski authored and Nathaniel Graham committed Apr 17, 2023
1 parent d953f75 commit 4de733e
Show file tree
Hide file tree
Showing 14 changed files with 348 additions and 45 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5926](https://github.com/thanos-io/thanos/pull/5926) Receiver: Add experimental string interning in writer. Can be enabled with a hidden flag `writer.intern`.
- [#5773](https://github.com/thanos-io/thanos/pull/5773) Store: Support disable cache index header file.
- [#5653](https://github.com/thanos-io/thanos/pull/5653) Receive: Allow setting hashing algorithm per tenant in hashrings config
- [#6074](https://github.com/thanos-io/thanos/pull/6074) *: Add histogram metrics `thanos_store_server_series_requested` and `thanos_store_server_chunks_requested` to all Stores.
- [#6074](https://github.com/thanos-io/thanos/pull/6074) *: Allow configuring series and sample limits per `Series` request for all Stores.
- [#6104](https://github.com/thanos-io/thanos/pull/6104) Objstore: Support S3 session token.
- [#5548](https://github.com/thanos-io/thanos/pull/5548) Query: Added experimental support for load balancing across multiple Store endpoints.

Expand Down
8 changes: 7 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ func registerQuery(app *extkingpin.App) {
queryTelemetrySamplesQuantiles := cmd.Flag("query.telemetry.request-samples-quantiles", "The quantiles for exporting metrics about the samples count quantiles.").Default("100", "1000", "10000", "100000", "1000000").Int64List()
queryTelemetrySeriesQuantiles := cmd.Flag("query.telemetry.request-series-seconds-quantiles", "The quantiles for exporting metrics about the series count quantiles.").Default("10", "100", "1000", "10000", "100000").Int64List()

var storeRateLimits store.SeriesSelectLimits
storeRateLimits.RegisterFlags(cmd)

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
selectorLset, err := parseFlagLabels(*selectorLabels)
if err != nil {
Expand Down Expand Up @@ -329,6 +332,7 @@ func registerQuery(app *extkingpin.App) {
*queryTelemetrySamplesQuantiles,
*queryTelemetrySeriesQuantiles,
promqlEngineType(*promqlEngine),
storeRateLimits,
)
})
}
Expand Down Expand Up @@ -407,6 +411,7 @@ func runQuery(
queryTelemetrySamplesQuantiles []int64,
queryTelemetrySeriesQuantiles []int64,
promqlEngine promqlEngineType,
storeRateLimits store.SeriesSelectLimits,
) error {
if alertQueryURL == "" {
lastColon := strings.LastIndex(httpBindAddr, ":")
Expand Down Expand Up @@ -782,9 +787,10 @@ func runQuery(
)

grpcAPI := apiv1.NewGRPCAPI(time.Now, queryReplicaLabels, queryableCreator, queryEngine, lookbackDeltaCreator, instantDefaultMaxSourceResolution)
storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, storeRateLimits)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)),
grpcserver.WithServer(store.RegisterStoreServer(proxy, logger)),
grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)),
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
grpcserver.WithServer(targets.RegisterTargetsServer(targetsProxy)),
grpcserver.WithServer(metadata.RegisterMetadataServer(metadataProxy)),
Expand Down
19 changes: 11 additions & 8 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ func runReceive(
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, conf.writerInterning)

var limitsConfig *receive.RootLimitsConfig
if conf.limitsConfig != nil {
limitsContentYaml, err := conf.limitsConfig.Content()
if conf.writeLimitsConfig != nil {
limitsContentYaml, err := conf.writeLimitsConfig.Content()
if err != nil {
return errors.Wrap(err, "get content of limit configuration")
}
Expand All @@ -220,7 +220,7 @@ func runReceive(
return errors.Wrap(err, "parse limit configuration")
}
}
limiter, err := receive.NewLimiter(conf.limitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter"))
limiter, err := receive.NewLimiter(conf.writeLimitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter"))
if err != nil {
return errors.Wrap(err, "creating limiter")
}
Expand Down Expand Up @@ -305,7 +305,7 @@ func runReceive(
return errors.Wrap(err, "setup gRPC server")
}

mts := store.NewProxyStore(
proxy := store.NewProxyStore(
logger,
reg,
dbs.TSDBLocalClients,
Expand All @@ -314,17 +314,18 @@ func runReceive(
0,
store.LazyRetrieval,
)
mts := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits)
rw := store.ReadWriteTSDBStore{
StoreServer: mts,
WriteableStoreServer: webHandler,
}

infoSrv := info.NewInfoServer(
component.Receive.String(),
info.WithLabelSetFunc(func() []labelpb.ZLabelSet { return mts.LabelSet() }),
info.WithLabelSetFunc(func() []labelpb.ZLabelSet { return proxy.LabelSet() }),
info.WithStoreInfoFunc(func() *infopb.StoreInfo {
if httpProbe.IsReady() {
minTime, maxTime := mts.TimeRange()
minTime, maxTime := proxy.TimeRange()
return &infopb.StoreInfo{
MinTime: minTime,
MaxTime: maxTime,
Expand Down Expand Up @@ -798,12 +799,14 @@ type receiveConfig struct {
reqLogConfig *extflag.PathOrContent
relabelConfigPath *extflag.PathOrContent

limitsConfig *extflag.PathOrContent
writeLimitsConfig *extflag.PathOrContent
storeRateLimits store.SeriesSelectLimits
}

func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
rc.httpBindAddr, rc.httpGracePeriod, rc.httpTLSConfig = extkingpin.RegisterHTTPFlags(cmd)
rc.grpcBindAddr, rc.grpcGracePeriod, rc.grpcCert, rc.grpcKey, rc.grpcClientCA, rc.grpcMaxConnAge = extkingpin.RegisterGRPCFlags(cmd)
rc.storeRateLimits.RegisterFlags(cmd)

cmd.Flag("remote-write.address", "Address to listen on for remote write requests.").
Default("0.0.0.0:19291").StringVar(&rc.rwAddress)
Expand Down Expand Up @@ -919,7 +922,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)

rc.limitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden())
rc.writeLimitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden())
}

// determineMode returns the ReceiverMode that this receiver is configured to run in.
Expand Down
5 changes: 4 additions & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type ruleConfig struct {
dataDir string
lset labels.Labels
ignoredLabelNames []string
storeRateLimits store.SeriesSelectLimits
}

func (rc *ruleConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand All @@ -103,6 +104,7 @@ func (rc *ruleConfig) registerFlag(cmd extkingpin.FlagClause) {
rc.shipper.registerFlag(cmd)
rc.query.registerFlag(cmd)
rc.alertmgr.registerFlag(cmd)
rc.storeRateLimits.RegisterFlags(cmd)
}

// registerRule registers a rule command.
Expand Down Expand Up @@ -634,7 +636,8 @@ func runRule(
return nil
}),
)
options = append(options, grpcserver.WithServer(store.RegisterStoreServer(tsdbStore, logger)))
storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, tsdbStore), reg, conf.storeRateLimits)
options = append(options, grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)))
}

options = append(options, grpcserver.WithServer(
Expand Down
23 changes: 13 additions & 10 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,9 @@ func runSidecar(
info.WithMetricMetadataInfoFunc(),
)

storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, promStore), reg, conf.storeRateLimits)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(promStore, logger)),
grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)),
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(targets.RegisterTargetsServer(targets.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))),
Expand Down Expand Up @@ -474,15 +475,16 @@ func (s *promMetadata) Version() string {
}

type sidecarConfig struct {
http httpConfig
grpc grpcConfig
prometheus prometheusConfig
tsdb tsdbConfig
reloader reloaderConfig
reqLogConfig *extflag.PathOrContent
objStore extflag.PathOrContent
shipper shipperConfig
limitMinTime thanosmodel.TimeOrDurationValue
http httpConfig
grpc grpcConfig
prometheus prometheusConfig
tsdb tsdbConfig
reloader reloaderConfig
reqLogConfig *extflag.PathOrContent
objStore extflag.PathOrContent
shipper shipperConfig
limitMinTime thanosmodel.TimeOrDurationValue
storeRateLimits store.SeriesSelectLimits
}

func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand All @@ -494,6 +496,7 @@ func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
sc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)
sc.objStore = *extkingpin.RegisterCommonObjStoreFlags(cmd, "", false)
sc.shipper.registerFlag(cmd)
sc.storeRateLimits.RegisterFlags(cmd)
cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime)
}
20 changes: 8 additions & 12 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ type storeConfig struct {
indexCacheSizeBytes units.Base2Bytes
chunkPoolSize units.Base2Bytes
seriesBatchSize int
maxSampleCount uint64
maxTouchedSeriesCount uint64
storeRateLimits store.SeriesSelectLimits
maxDownloadedBytes units.Base2Bytes
maxConcurrency int
component component.StoreAPI
Expand All @@ -90,6 +89,7 @@ type storeConfig struct {
func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
sc.httpConfig = *sc.httpConfig.registerFlag(cmd)
sc.grpcConfig = *sc.grpcConfig.registerFlag(cmd)
sc.storeRateLimits.RegisterFlags(cmd)

cmd.Flag("data-dir", "Local data directory used for caching purposes (index-header, in-mem cache items and meta.jsons). If removed, no data will be lost, just store will have to rebuild the cache. NOTE: Putting raw blocks here will not cause the store to read them. For such use cases use Prometheus + sidecar. Ignored if -no-cache-index-header option is specified.").
Default("./data").StringVar(&sc.dataDir)
Expand All @@ -113,13 +113,8 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes reserved strictly to reuse for chunks in memory.").
Default("2GB").BytesVar(&sc.chunkPoolSize)

cmd.Flag("store.grpc.series-sample-limit",
"Maximum amount of samples returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains 120 samples (it's the max number of samples each chunk can contain), so the actual number of samples might be lower, even though the maximum could be hit.").
Default("0").Uint64Var(&sc.maxSampleCount)

cmd.Flag("store.grpc.touched-series-limit",
"Maximum amount of touched series returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit.").
Default("0").Uint64Var(&sc.maxTouchedSeriesCount)
cmd.Flag("store.grpc.touched-series-limit", "DEPRECATED: use store.limits.request-series.").Default("0").Uint64Var(&sc.storeRateLimits.SeriesPerRequest)
cmd.Flag("store.grpc.series-sample-limit", "DEPRECATED: use store.limits.request-samples.").Default("0").Uint64Var(&sc.storeRateLimits.SamplesPerRequest)

cmd.Flag("store.grpc.downloaded-bytes-limit",
"Maximum amount of downloaded (either fetched or touched) bytes in a single Series/LabelNames/LabelValues call. The Series call fails if this limit is exceeded. 0 means no limit.").
Expand Down Expand Up @@ -370,8 +365,8 @@ func runStore(
bkt,
metaFetcher,
dataDir,
store.NewChunksLimiterFactory(conf.maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.
store.NewSeriesLimiterFactory(conf.maxTouchedSeriesCount),
store.NewChunksLimiterFactory(conf.storeRateLimits.SamplesPerRequest/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.
store.NewSeriesLimiterFactory(conf.storeRateLimits.SeriesPerRequest),
store.NewBytesLimiterFactory(conf.maxDownloadedBytes),
store.NewGapBasedPartitioner(store.PartitionerMaxGapSize),
conf.blockSyncConcurrency,
Expand Down Expand Up @@ -453,8 +448,9 @@ func runStore(
return errors.Wrap(err, "setup gRPC server")
}

storeServer := store.NewInstrumentedStoreServer(reg, bs)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, conf.component, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(bs, logger)),
grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)),
grpcserver.WithServer(info.RegisterInfoServer(infoSrv)),
grpcserver.WithListen(conf.grpcConfig.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpcConfig.gracePeriod)),
Expand Down
11 changes: 11 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,17 @@ Flags:
that are always used, even if the health check
fails. Useful if you have a caching layer on
top.
--store.limits.request-samples=0
The maximum samples allowed for a single
Series request, The Series call fails if
this limit is exceeded. 0 means no limit.
NOTE: For efficiency the limit is internally
implemented as 'chunks limit' considering each
chunk contains a maximum of 120 samples.
--store.limits.request-series=0
The maximum series allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--store.response-timeout=0ms
If a Store doesn't send any data in this
specified duration then a Store will be ignored
Expand Down
11 changes: 11 additions & 0 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,17 @@ Flags:
Path to YAML file with request logging
configuration. See format details:
https://thanos.io/tip/thanos/logging.md/#configuration
--store.limits.request-samples=0
The maximum samples allowed for a single
Series request, The Series call fails if
this limit is exceeded. 0 means no limit.
NOTE: For efficiency the limit is internally
implemented as 'chunks limit' considering each
chunk contains a maximum of 120 samples.
--store.limits.request-series=0
The maximum series allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--tracing.config=<content>
Alternative to 'tracing.config-file' flag
(mutually exclusive). Content of YAML file
Expand Down
11 changes: 11 additions & 0 deletions docs/components/rule.md
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,17 @@ Flags:
Works only if compaction is disabled on
Prometheus. Do it once and then disable the
flag when done.
--store.limits.request-samples=0
The maximum samples allowed for a single
Series request, The Series call fails if
this limit is exceeded. 0 means no limit.
NOTE: For efficiency the limit is internally
implemented as 'chunks limit' considering each
chunk contains a maximum of 120 samples.
--store.limits.request-series=0
The maximum series allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--tracing.config=<content>
Alternative to 'tracing.config-file' flag
(mutually exclusive). Content of YAML file
Expand Down
11 changes: 11 additions & 0 deletions docs/components/sidecar.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,17 @@ Flags:
Works only if compaction is disabled on
Prometheus. Do it once and then disable the
flag when done.
--store.limits.request-samples=0
The maximum samples allowed for a single
Series request, The Series call fails if
this limit is exceeded. 0 means no limit.
NOTE: For efficiency the limit is internally
implemented as 'chunks limit' considering each
chunk contains a maximum of 120 samples.
--store.limits.request-series=0
The maximum series allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--tracing.config=<content>
Alternative to 'tracing.config-file' flag
(mutually exclusive). Content of YAML file
Expand Down
Loading

0 comments on commit 4de733e

Please sign in to comment.