Instrumented servers #6074

Merged: 12 commits, Feb 13, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -17,6 +17,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (relevant only to rolling back)
- [#5926](https://github.com/thanos-io/thanos/pull/5926) Receiver: Add experimental string interning in writer. Can be enabled with a hidden flag `writer.intern`.
- [#5773](https://github.com/thanos-io/thanos/pull/5773) Store: Support disable cache index header file.
- [#5653](https://github.com/thanos-io/thanos/pull/5653) Receive: Allow setting hashing algorithm per tenant in hashrings config
- [#6074](https://github.com/thanos-io/thanos/pull/6074) *: Add histogram metrics `thanos_store_server_series_requested` and `thanos_store_server_chunks_requested` to all Stores.
- [#6074](https://github.com/thanos-io/thanos/pull/6074) *: Allow configuring series and sample limits per `Series` request for all Stores.

### Fixed

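For context on the two histograms named above, the following is a minimal sketch of the instrumentation pattern this PR applies to every StoreAPI server. It is not the actual `store.NewInstrumentedStoreServer` implementation: the `seriesServer` interface, the result type, and the bucket layout are simplified assumptions standing in for the real `storepb.StoreServer` streaming API; only the metric names come from the changelog entry.

```go
package storesketch

import "github.com/prometheus/client_golang/prometheus"

// seriesResult and seriesServer are simplified stand-ins for the real
// storepb.StoreServer streaming API.
type seriesResult struct {
	Series int
	Chunks int
}

type seriesServer interface {
	Series(matchers []string) (seriesResult, error)
}

// instrumentedServer wraps another seriesServer and records how many series
// and chunks each Series call touched, mirroring the metrics named in the
// CHANGELOG entry above.
type instrumentedServer struct {
	inner           seriesServer
	seriesRequested prometheus.Histogram
	chunksRequested prometheus.Histogram
}

func newInstrumentedServer(reg prometheus.Registerer, inner seriesServer) *instrumentedServer {
	s := &instrumentedServer{
		inner: inner,
		seriesRequested: prometheus.NewHistogram(prometheus.HistogramOpts{
			Name:    "thanos_store_server_series_requested",
			Help:    "Number of series requested per Series call.",
			Buckets: prometheus.ExponentialBuckets(10, 10, 8), // Bucket layout is illustrative only.
		}),
		chunksRequested: prometheus.NewHistogram(prometheus.HistogramOpts{
			Name:    "thanos_store_server_chunks_requested",
			Help:    "Number of chunks requested per Series call.",
			Buckets: prometheus.ExponentialBuckets(10, 10, 8),
		}),
	}
	reg.MustRegister(s.seriesRequested, s.chunksRequested)
	return s
}

func (s *instrumentedServer) Series(matchers []string) (seriesResult, error) {
	res, err := s.inner.Series(matchers)
	if err != nil {
		return res, err
	}
	// Record how much data this request touched.
	s.seriesRequested.Observe(float64(res.Series))
	s.chunksRequested.Observe(float64(res.Chunks))
	return res, nil
}
```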
8 changes: 7 additions & 1 deletion cmd/thanos/query.go
@@ -205,6 +205,9 @@ func registerQuery(app *extkingpin.App) {
queryTelemetrySamplesQuantiles := cmd.Flag("query.telemetry.request-samples-quantiles", "The quantiles for exporting metrics about the samples count quantiles.").Default("100", "1000", "10000", "100000", "1000000").Int64List()
queryTelemetrySeriesQuantiles := cmd.Flag("query.telemetry.request-series-seconds-quantiles", "The quantiles for exporting metrics about the series count quantiles.").Default("10", "100", "1000", "10000", "100000").Int64List()

var storeRateLimits store.SeriesSelectLimits
storeRateLimits.RegisterFlags(cmd)

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
selectorLset, err := parseFlagLabels(*selectorLabels)
if err != nil {
@@ -321,6 +324,7 @@ func registerQuery(app *extkingpin.App) {
*queryTelemetrySamplesQuantiles,
*queryTelemetrySeriesQuantiles,
promqlEngineType(*promqlEngine),
storeRateLimits,
)
})
}
@@ -397,6 +401,7 @@ func runQuery(
queryTelemetrySamplesQuantiles []int64,
queryTelemetrySeriesQuantiles []int64,
promqlEngine promqlEngineType,
storeRateLimits store.SeriesSelectLimits,
) error {
if alertQueryURL == "" {
lastColon := strings.LastIndex(httpBindAddr, ":")
@@ -759,9 +764,10 @@
)

grpcAPI := apiv1.NewGRPCAPI(time.Now, queryReplicaLabels, queryableCreator, queryEngine, lookbackDeltaCreator, instantDefaultMaxSourceResolution)
storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, storeRateLimits)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)),
-grpcserver.WithServer(store.RegisterStoreServer(proxy, logger)),
+grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)),
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
grpcserver.WithServer(targets.RegisterTargetsServer(targetsProxy)),
grpcserver.WithServer(metadata.RegisterMetadataServer(metadataProxy)),
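The querier hunk above wraps its proxy twice: `store.NewInstrumentedStoreServer` records the request-size histograms and `store.NewLimitedStoreServer` enforces the new per-request budgets. Below is a sketch of the limiting half under the same simplified interface as the previous sketch; checking the totals after the call, the error wording, and the interface itself are assumptions for illustration, while the real server works on a gRPC stream and can abort a request in flight.

```go
package limitsketch

import "fmt"

// Simplified stand-ins, repeated from the instrumentation sketch above.
type seriesResult struct {
	Series int
	Chunks int
}

type seriesServer interface {
	Series(matchers []string) (seriesResult, error)
}

// SeriesSelectLimits holds the per-request budgets configured by the new
// --store.limits.request-series and --store.limits.request-samples flags.
// The field names match the ones wired up elsewhere in this diff.
type SeriesSelectLimits struct {
	SeriesPerRequest  uint64
	SamplesPerRequest uint64
}

type limitedServer struct {
	inner  seriesServer
	limits SeriesSelectLimits
}

// Series fails the call when a budget is exceeded; a zero budget disables the
// corresponding check.
func (l *limitedServer) Series(matchers []string) (seriesResult, error) {
	res, err := l.inner.Series(matchers)
	if err != nil {
		return res, err
	}
	if l.limits.SeriesPerRequest > 0 && uint64(res.Series) > l.limits.SeriesPerRequest {
		return seriesResult{}, fmt.Errorf("exceeded series limit: got %d, limit %d", res.Series, l.limits.SeriesPerRequest)
	}
	// The sample budget is enforced as a chunk budget, assuming at most 120
	// samples per chunk (see the store gateway changes below).
	const maxSamplesPerChunk = 120
	if limit := l.limits.SamplesPerRequest / maxSamplesPerChunk; limit > 0 && uint64(res.Chunks) > limit {
		return seriesResult{}, fmt.Errorf("exceeded sample limit: ~%d samples, limit %d", res.Chunks*maxSamplesPerChunk, l.limits.SamplesPerRequest)
	}
	return res, nil
}
```

In the diff the limiter always wraps the instrumented server, e.g. `store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, storeRateLimits)`.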
19 changes: 11 additions & 8 deletions cmd/thanos/receive.go
@@ -210,8 +210,8 @@ func runReceive(
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, conf.writerInterning)

var limitsConfig *receive.RootLimitsConfig
-if conf.limitsConfig != nil {
-limitsContentYaml, err := conf.limitsConfig.Content()
+if conf.writeLimitsConfig != nil {
+limitsContentYaml, err := conf.writeLimitsConfig.Content()
if err != nil {
return errors.Wrap(err, "get content of limit configuration")
}
@@ -220,7 +220,7 @@
return errors.Wrap(err, "parse limit configuration")
}
}
-limiter, err := receive.NewLimiter(conf.limitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter"))
+limiter, err := receive.NewLimiter(conf.writeLimitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter"))
if err != nil {
return errors.Wrap(err, "creating limiter")
}
@@ -305,7 +305,7 @@ func runReceive(
return errors.Wrap(err, "setup gRPC server")
}

-mts := store.NewProxyStore(
+proxy := store.NewProxyStore(
logger,
reg,
dbs.TSDBLocalClients,
@@ -314,17 +314,18 @@
0,
store.LazyRetrieval,
)
mts := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits)
rw := store.ReadWriteTSDBStore{
StoreServer: mts,
WriteableStoreServer: webHandler,
}

infoSrv := info.NewInfoServer(
component.Receive.String(),
-info.WithLabelSetFunc(func() []labelpb.ZLabelSet { return mts.LabelSet() }),
+info.WithLabelSetFunc(func() []labelpb.ZLabelSet { return proxy.LabelSet() }),
info.WithStoreInfoFunc(func() *infopb.StoreInfo {
if httpProbe.IsReady() {
-minTime, maxTime := mts.TimeRange()
+minTime, maxTime := proxy.TimeRange()
return &infopb.StoreInfo{
MinTime: minTime,
MaxTime: maxTime,
@@ -798,12 +799,14 @@ type receiveConfig struct {
reqLogConfig *extflag.PathOrContent
relabelConfigPath *extflag.PathOrContent

-limitsConfig *extflag.PathOrContent
+writeLimitsConfig *extflag.PathOrContent
+storeRateLimits store.SeriesSelectLimits
}

func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
rc.httpBindAddr, rc.httpGracePeriod, rc.httpTLSConfig = extkingpin.RegisterHTTPFlags(cmd)
rc.grpcBindAddr, rc.grpcGracePeriod, rc.grpcCert, rc.grpcKey, rc.grpcClientCA, rc.grpcMaxConnAge = extkingpin.RegisterGRPCFlags(cmd)
rc.storeRateLimits.RegisterFlags(cmd)

cmd.Flag("remote-write.address", "Address to listen on for remote write requests.").
Default("0.0.0.0:19291").StringVar(&rc.rwAddress)
@@ -919,7 +922,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)

-rc.limitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden())
+rc.writeLimitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden())
}

// determineMode returns the ReceiverMode that this receiver is configured to run in.
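Receive, like rule, sidecar, query, and store elsewhere in this diff, embeds a `store.SeriesSelectLimits` in its config struct and calls `RegisterFlags(cmd)` during flag registration. The sketch below is a guess at the general shape of that type, written against plain kingpin rather than Thanos's `extkingpin.FlagClause`; the field names match the ones used in cmd/thanos/store.go, the flag names and defaults come from the generated docs further down, and the help text is abbreviated.

```go
package flagsketch

import kingpin "gopkg.in/alecthomas/kingpin.v2"

// SeriesSelectLimits carries the per-request budgets. The field names mirror
// SeriesPerRequest and SamplesPerRequest as referenced in this diff; the
// method body is only an illustration of the registration pattern.
type SeriesSelectLimits struct {
	SeriesPerRequest  uint64
	SamplesPerRequest uint64
}

// RegisterFlags wires the two budgets onto a command. Flag names and the
// zero defaults are taken from the generated docs in this PR.
func (l *SeriesSelectLimits) RegisterFlags(app *kingpin.Application) {
	app.Flag("store.limits.request-series",
		"The maximum series allowed for a single Series request. 0 means no limit.").
		Default("0").Uint64Var(&l.SeriesPerRequest)
	app.Flag("store.limits.request-samples",
		"The maximum samples allowed for a single Series request. 0 means no limit.").
		Default("0").Uint64Var(&l.SamplesPerRequest)
}
```

Usage then follows the query.go hunk above: declare the struct, call `RegisterFlags` at setup time, and hand the populated value to `store.NewLimitedStoreServer`.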
5 changes: 4 additions & 1 deletion cmd/thanos/rule.go
@@ -94,6 +94,7 @@ type ruleConfig struct {
dataDir string
lset labels.Labels
ignoredLabelNames []string
storeRateLimits store.SeriesSelectLimits
}

func (rc *ruleConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -103,6 +104,7 @@ func (rc *ruleConfig) registerFlag(cmd extkingpin.FlagClause) {
rc.shipper.registerFlag(cmd)
rc.query.registerFlag(cmd)
rc.alertmgr.registerFlag(cmd)
rc.storeRateLimits.RegisterFlags(cmd)
}

// registerRule registers a rule command.
@@ -634,7 +636,8 @@ func runRule(
return nil
}),
)
-options = append(options, grpcserver.WithServer(store.RegisterStoreServer(tsdbStore, logger)))
+storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, tsdbStore), reg, conf.storeRateLimits)
+options = append(options, grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)))
}

options = append(options, grpcserver.WithServer(
23 changes: 13 additions & 10 deletions cmd/thanos/sidecar.go
@@ -282,8 +282,9 @@ func runSidecar(
info.WithMetricMetadataInfoFunc(),
)

storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, promStore), reg, conf.storeRateLimits)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
-grpcserver.WithServer(store.RegisterStoreServer(promStore, logger)),
+grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)),
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(targets.RegisterTargetsServer(targets.NewPrometheus(conf.prometheus.url, c, m.Labels))),
grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))),
@@ -474,15 +475,16 @@ func (s *promMetadata) Version() string {
}

type sidecarConfig struct {
http httpConfig
grpc grpcConfig
prometheus prometheusConfig
tsdb tsdbConfig
reloader reloaderConfig
reqLogConfig *extflag.PathOrContent
objStore extflag.PathOrContent
shipper shipperConfig
limitMinTime thanosmodel.TimeOrDurationValue
storeRateLimits store.SeriesSelectLimits
}

func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -494,6 +496,7 @@ func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
sc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)
sc.objStore = *extkingpin.RegisterCommonObjStoreFlags(cmd, "", false)
sc.shipper.registerFlag(cmd)
sc.storeRateLimits.RegisterFlags(cmd)
cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime)
}
20 changes: 8 additions & 12 deletions cmd/thanos/store.go
@@ -64,8 +64,7 @@ type storeConfig struct {
indexCacheSizeBytes units.Base2Bytes
chunkPoolSize units.Base2Bytes
seriesBatchSize int
-maxSampleCount uint64
-maxTouchedSeriesCount uint64
+storeRateLimits store.SeriesSelectLimits
maxDownloadedBytes units.Base2Bytes
maxConcurrency int
component component.StoreAPI
@@ -90,6 +89,7 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
sc.httpConfig = *sc.httpConfig.registerFlag(cmd)
sc.grpcConfig = *sc.grpcConfig.registerFlag(cmd)
sc.storeRateLimits.RegisterFlags(cmd)

cmd.Flag("data-dir", "Local data directory used for caching purposes (index-header, in-mem cache items and meta.jsons). If removed, no data will be lost, just store will have to rebuild the cache. NOTE: Putting raw blocks here will not cause the store to read them. For such use cases use Prometheus + sidecar. Ignored if -no-cache-index-header option is specified.").
Default("./data").StringVar(&sc.dataDir)
@@ -113,13 +113,8 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes reserved strictly to reuse for chunks in memory.").
Default("2GB").BytesVar(&sc.chunkPoolSize)

cmd.Flag("store.grpc.series-sample-limit",
"Maximum amount of samples returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains 120 samples (it's the max number of samples each chunk can contain), so the actual number of samples might be lower, even though the maximum could be hit.").
Default("0").Uint64Var(&sc.maxSampleCount)

cmd.Flag("store.grpc.touched-series-limit",
"Maximum amount of touched series returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit.").
Default("0").Uint64Var(&sc.maxTouchedSeriesCount)
cmd.Flag("store.grpc.touched-series-limit", "DEPRECATED: use store.limits.request-series.").Default("0").Uint64Var(&sc.storeRateLimits.SeriesPerRequest)
cmd.Flag("store.grpc.series-sample-limit", "DEPRECATED: use store.limits.request-samples.").Default("0").Uint64Var(&sc.storeRateLimits.SamplesPerRequest)

cmd.Flag("store.grpc.downloaded-bytes-limit",
"Maximum amount of downloaded (either fetched or touched) bytes in a single Series/LabelNames/LabelValues call. The Series call fails if this limit is exceeded. 0 means no limit.").
@@ -370,8 +365,8 @@ func runStore(
bkt,
metaFetcher,
dataDir,
-store.NewChunksLimiterFactory(conf.maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.
-store.NewSeriesLimiterFactory(conf.maxTouchedSeriesCount),
+store.NewChunksLimiterFactory(conf.storeRateLimits.SamplesPerRequest/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.
+store.NewSeriesLimiterFactory(conf.storeRateLimits.SeriesPerRequest),
store.NewBytesLimiterFactory(conf.maxDownloadedBytes),
store.NewGapBasedPartitioner(store.PartitionerMaxGapSize),
conf.blockSyncConcurrency,
@@ -453,8 +448,9 @@ func runStore(
return errors.Wrap(err, "setup gRPC server")
}

storeServer := store.NewInstrumentedStoreServer(reg, bs)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, conf.component, grpcProbe,
-grpcserver.WithServer(store.RegisterStoreServer(bs, logger)),
+grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)),
grpcserver.WithServer(info.RegisterInfoServer(infoSrv)),
grpcserver.WithListen(conf.grpcConfig.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpcConfig.gracePeriod)),
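Note how runStore keeps converting the per-request sample budget into a chunk budget (`conf.storeRateLimits.SamplesPerRequest/store.MaxSamplesPerChunk` above); per the deprecated flag's help text, the limit is implemented as a chunk limit for efficiency, with each chunk holding at most 120 samples. A small sketch of the arithmetic:

```go
package chunksketch

// maxSamplesPerChunk mirrors store.MaxSamplesPerChunk: a chunk holds at most
// 120 samples, so a sample budget can be enforced as a chunk budget.
const maxSamplesPerChunk = 120

// chunksLimitFor converts a per-request sample limit into the chunk limit
// that is actually enforced; 0 means unlimited on both sides.
func chunksLimitFor(samplesPerRequest uint64) uint64 {
	return samplesPerRequest / maxSamplesPerChunk // e.g. 1_200_000 samples -> 10_000 chunks
}
```

Because the division truncates, a sample limit below 120 becomes a chunk limit of 0, which the limiter treats as no limit; the deprecated `--store.grpc.series-sample-limit` flag behaved the same way.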
11 changes: 11 additions & 0 deletions docs/components/query.md
@@ -429,6 +429,17 @@ Flags:
that are always used, even if the health check
fails. Useful if you have a caching layer on
top.
--store.limits.request-samples=0
The maximum samples allowed for a single
Series request. The Series call fails if
this limit is exceeded. 0 means no limit.
NOTE: For efficiency the limit is internally
implemented as 'chunks limit' considering each
chunk contains a maximum of 120 samples.
--store.limits.request-series=0
The maximum series allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--store.response-timeout=0ms
If a Store doesn't send any data in this
specified duration then a Store will be ignored
11 changes: 11 additions & 0 deletions docs/components/receive.md
@@ -334,6 +334,17 @@ Flags:
Path to YAML file with request logging
configuration. See format details:
https://thanos.io/tip/thanos/logging.md/#configuration
--store.limits.request-samples=0
The maximum samples allowed for a single
Series request. The Series call fails if
this limit is exceeded. 0 means no limit.
NOTE: For efficiency the limit is internally
implemented as 'chunks limit' considering each
chunk contains a maximum of 120 samples.
--store.limits.request-series=0
The maximum series allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--tracing.config=<content>
Alternative to 'tracing.config-file' flag
(mutually exclusive). Content of YAML file
11 changes: 11 additions & 0 deletions docs/components/rule.md
@@ -453,6 +453,17 @@ Flags:
Works only if compaction is disabled on
Prometheus. Do it once and then disable the
flag when done.
--store.limits.request-samples=0
The maximum samples allowed for a single
Series request. The Series call fails if
this limit is exceeded. 0 means no limit.
NOTE: For efficiency the limit is internally
implemented as 'chunks limit' considering each
chunk contains a maximum of 120 samples.
--store.limits.request-series=0
The maximum series allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--tracing.config=<content>
Alternative to 'tracing.config-file' flag
(mutually exclusive). Content of YAML file
11 changes: 11 additions & 0 deletions docs/components/sidecar.md
@@ -174,6 +174,17 @@ Flags:
Works only if compaction is disabled on
Prometheus. Do it once and then disable the
flag when done.
--store.limits.request-samples=0
The maximum samples allowed for a single
Series request. The Series call fails if
this limit is exceeded. 0 means no limit.
NOTE: For efficiency the limit is internally
implemented as 'chunks limit' considering each
chunk contains a maximum of 120 samples.
--store.limits.request-series=0
The maximum series allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--tracing.config=<content>
Alternative to 'tracing.config-file' flag
(mutually exclusive). Content of YAML file
23 changes: 12 additions & 11 deletions docs/components/store.md
@@ -178,19 +178,20 @@ Flags:
--store.grpc.series-max-concurrency=20
Maximum number of concurrent Series calls.
--store.grpc.series-sample-limit=0
-Maximum amount of samples returned via a
-single Series call. The Series call fails
-if this limit is exceeded. 0 means no limit.
-NOTE: For efficiency the limit is internally
-implemented as 'chunks limit' considering
-each chunk contains 120 samples (it's the max
-number of samples each chunk can contain),
-so the actual number of samples might be lower,
-even though the maximum could be hit.
+DEPRECATED: use store.limits.request-samples.
--store.grpc.touched-series-limit=0
-Maximum amount of touched series returned via
-a single Series call. The Series call fails if
+DEPRECATED: use store.limits.request-series.
--store.limits.request-samples=0
The maximum samples allowed for a single
Series request. The Series call fails if
this limit is exceeded. 0 means no limit.
NOTE: For efficiency the limit is internally
implemented as 'chunks limit' considering each
chunk contains a maximum of 120 samples.
--store.limits.request-series=0
The maximum series allowed for a single Series
request. The Series call fails if this limit is
exceeded. 0 means no limit.
--sync-block-duration=3m Repeat interval for syncing the blocks between
local and remote view.
--tracing.config=<content>