From 011dbb3d269d6e033b5996c22a8cf429f9c37b0e Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Wed, 8 Feb 2023 13:21:23 +0100 Subject: [PATCH] Code review comments Signed-off-by: Filip Petkovski --- CHANGELOG.md | 1 - cmd/thanos/query.go | 6 +++--- cmd/thanos/receive.go | 4 ++-- cmd/thanos/rule.go | 4 ++-- cmd/thanos/sidecar.go | 4 ++-- cmd/thanos/store.go | 2 +- pkg/store/limiter.go | 39 ++++++++++++++++++++++----------------- pkg/store/limiter_test.go | 14 +++++++------- 8 files changed, 39 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c2e3b5cb2..44b2697e7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,7 +23,6 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Changed - [#6035](https://github.com/thanos-io/thanos/pull/6035) Replicate: Support all types of matchers to match blocks for replication. Change matcher parameter from string slice to a single string. ->>>>>>> 0dbde4f1 (Add CHANGELOG entry) ### Fixed diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 94cdf5cdb8..dde61e9f37 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -205,7 +205,7 @@ func registerQuery(app *extkingpin.App) { queryTelemetrySamplesQuantiles := cmd.Flag("query.telemetry.request-samples-quantiles", "The quantiles for exporting metrics about the samples count quantiles.").Default("100", "1000", "10000", "100000", "1000000").Int64List() queryTelemetrySeriesQuantiles := cmd.Flag("query.telemetry.request-series-seconds-quantiles", "The quantiles for exporting metrics about the series count quantiles.").Default("10", "100", "1000", "10000", "100000").Int64List() - var storeRateLimits store.RateLimits + var storeRateLimits store.SeriesSelectLimits storeRateLimits.RegisterFlags(cmd) cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { @@ -401,7 +401,7 @@ func runQuery( queryTelemetrySamplesQuantiles []int64, queryTelemetrySeriesQuantiles []int64, promqlEngine promqlEngineType, - storeRateLimits store.RateLimits, + storeRateLimits store.SeriesSelectLimits, ) error { if alertQueryURL == "" { lastColon := strings.LastIndex(httpBindAddr, ":") @@ -764,7 +764,7 @@ func runQuery( ) grpcAPI := apiv1.NewGRPCAPI(time.Now, queryReplicaLabels, queryableCreator, queryEngine, lookbackDeltaCreator, instantDefaultMaxSourceResolution) - storeServer := store.NewRateLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, storeRateLimits) + storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, storeRateLimits) s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)), grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)), diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 632409ad1a..a70edc16d3 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -314,7 +314,7 @@ func runReceive( 0, store.LazyRetrieval, ) - mts := store.NewRateLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits) + mts := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits) rw := store.ReadWriteTSDBStore{ StoreServer: mts, WriteableStoreServer: webHandler, @@ -800,7 +800,7 @@ type receiveConfig struct { relabelConfigPath *extflag.PathOrContent writeLimitsConfig *extflag.PathOrContent - storeRateLimits store.RateLimits + storeRateLimits store.SeriesSelectLimits } func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index ce9433450b..01df532413 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -94,7 +94,7 @@ type ruleConfig struct { dataDir string lset labels.Labels ignoredLabelNames []string - storeRateLimits store.RateLimits + storeRateLimits store.SeriesSelectLimits } func (rc *ruleConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -636,7 +636,7 @@ func runRule( return nil }), ) - storeServer := store.NewRateLimitedStoreServer(store.NewInstrumentedStoreServer(reg, tsdbStore), reg, conf.storeRateLimits) + storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, tsdbStore), reg, conf.storeRateLimits) options = append(options, grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger))) } diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index be9db038d3..2206d8c9dc 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -282,7 +282,7 @@ func runSidecar( info.WithMetricMetadataInfoFunc(), ) - storeServer := store.NewRateLimitedStoreServer(store.NewInstrumentedStoreServer(reg, promStore), reg, conf.storeRateLimits) + storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, promStore), reg, conf.storeRateLimits) s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)), grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))), @@ -484,7 +484,7 @@ type sidecarConfig struct { objStore extflag.PathOrContent shipper shipperConfig limitMinTime thanosmodel.TimeOrDurationValue - storeRateLimits store.RateLimits + storeRateLimits store.SeriesSelectLimits } func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) { diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index d3e61c87e7..eee2525b94 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -64,7 +64,7 @@ type storeConfig struct { indexCacheSizeBytes units.Base2Bytes chunkPoolSize units.Base2Bytes seriesBatchSize int - storeRateLimits store.RateLimits + storeRateLimits store.SeriesSelectLimits maxDownloadedBytes units.Base2Bytes maxConcurrency int component component.StoreAPI diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go index eab0779767..a19d20b3ea 100644 --- a/pkg/store/limiter.go +++ b/pkg/store/limiter.go @@ -100,30 +100,33 @@ func NewBytesLimiterFactory(limit units.Base2Bytes) BytesLimiterFactory { } } -type RateLimits struct { +// SeriesSelectLimits are limits applied against individual Series calls. +type SeriesSelectLimits struct { SeriesPerRequest uint64 SamplesPerRequest uint64 } -func (l *RateLimits) RegisterFlags(cmd extkingpin.FlagClause) { +func (l *SeriesSelectLimits) RegisterFlags(cmd extkingpin.FlagClause) { cmd.Flag("store.grpc.series-limit", "The maximum series allowed for a single Series request. The Series call fails if this limit is exceeded. 0 means no limit.").Default("0").Uint64Var(&l.SeriesPerRequest) cmd.Flag("store.grpc.samples-limit", "The maximum samples allowed for a single Series request, The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains a maximum of 120 samples.").Default("0").Uint64Var(&l.SamplesPerRequest) } -// rateLimitedStoreServer is a storepb.StoreServer that can apply rate limits against Series requests. -type rateLimitedStoreServer struct { +var _ storepb.StoreServer = &limitedStoreServer{} + +// limitedStoreServer is a storepb.StoreServer that can apply series and sample limits against individual Series requests. +type limitedStoreServer struct { storepb.StoreServer newSeriesLimiter SeriesLimiterFactory newSamplesLimiter ChunksLimiterFactory failedRequestsCounter *prometheus.CounterVec } -// NewRateLimitedStoreServer creates a new rateLimitedStoreServer. -func NewRateLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer, rateLimits RateLimits) storepb.StoreServer { - return &rateLimitedStoreServer{ +// NewLimitedStoreServer creates a new limitedStoreServer. +func NewLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer, selectLimits SeriesSelectLimits) storepb.StoreServer { + return &limitedStoreServer{ StoreServer: store, - newSeriesLimiter: NewSeriesLimiterFactory(rateLimits.SeriesPerRequest), - newSamplesLimiter: NewChunksLimiterFactory(rateLimits.SamplesPerRequest), + newSeriesLimiter: NewSeriesLimiterFactory(selectLimits.SeriesPerRequest), + newSamplesLimiter: NewChunksLimiterFactory(selectLimits.SamplesPerRequest), failedRequestsCounter: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_selects_dropped_total", Help: "Number of select queries that were dropped due to configured limits.", @@ -131,33 +134,35 @@ func NewRateLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registe } } -func (s *rateLimitedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { +func (s *limitedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { seriesLimiter := s.newSeriesLimiter(s.failedRequestsCounter.WithLabelValues("series")) chunksLimiter := s.newSamplesLimiter(s.failedRequestsCounter.WithLabelValues("chunks")) - rateLimitedSrv := newRateLimitedServer(srv, seriesLimiter, chunksLimiter) - if err := s.StoreServer.Series(req, rateLimitedSrv); err != nil { + limitedSrv := newLimitedServer(srv, seriesLimiter, chunksLimiter) + if err := s.StoreServer.Series(req, limitedSrv); err != nil { return err } return nil } -// rateLimitedServer is a storepb.Store_SeriesServer that tracks statistics about sent series. -type rateLimitedServer struct { +var _ storepb.Store_SeriesServer = &limitedServer{} + +// limitedServer is a storepb.Store_SeriesServer that tracks statistics about sent series. +type limitedServer struct { storepb.Store_SeriesServer seriesLimiter SeriesLimiter samplesLimiter ChunksLimiter } -func newRateLimitedServer(upstream storepb.Store_SeriesServer, seriesLimiter SeriesLimiter, chunksLimiter ChunksLimiter) *rateLimitedServer { - return &rateLimitedServer{ +func newLimitedServer(upstream storepb.Store_SeriesServer, seriesLimiter SeriesLimiter, chunksLimiter ChunksLimiter) *limitedServer { + return &limitedServer{ Store_SeriesServer: upstream, seriesLimiter: seriesLimiter, samplesLimiter: chunksLimiter, } } -func (i *rateLimitedServer) Send(response *storepb.SeriesResponse) error { +func (i *limitedServer) Send(response *storepb.SeriesResponse) error { series := response.GetSeries() if series == nil { return i.Store_SeriesServer.Send(response) diff --git a/pkg/store/limiter_test.go b/pkg/store/limiter_test.go index 308520c069..97b225e8e6 100644 --- a/pkg/store/limiter_test.go +++ b/pkg/store/limiter_test.go @@ -43,13 +43,13 @@ func TestRateLimitedServer(t *testing.T) { } tests := []struct { name string - limits RateLimits + limits SeriesSelectLimits series []*storepb.SeriesResponse err string }{ { name: "no limits", - limits: RateLimits{ + limits: SeriesSelectLimits{ SeriesPerRequest: 0, SamplesPerRequest: 0, }, @@ -57,7 +57,7 @@ func TestRateLimitedServer(t *testing.T) { }, { name: "series bellow limit", - limits: RateLimits{ + limits: SeriesSelectLimits{ SeriesPerRequest: 3, SamplesPerRequest: 0, }, @@ -65,7 +65,7 @@ func TestRateLimitedServer(t *testing.T) { }, { name: "series over limit", - limits: RateLimits{ + limits: SeriesSelectLimits{ SeriesPerRequest: 2, SamplesPerRequest: 0, }, @@ -74,7 +74,7 @@ func TestRateLimitedServer(t *testing.T) { }, { name: "chunks bellow limit", - limits: RateLimits{ + limits: SeriesSelectLimits{ SeriesPerRequest: 0, SamplesPerRequest: uint64(3 * numSamples * MaxSamplesPerChunk), }, @@ -82,7 +82,7 @@ func TestRateLimitedServer(t *testing.T) { }, { name: "chunks over limit", - limits: RateLimits{ + limits: SeriesSelectLimits{ SeriesPerRequest: 0, SamplesPerRequest: 50, }, @@ -95,7 +95,7 @@ func TestRateLimitedServer(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - store := NewRateLimitedStoreServer(newStoreServerStub(test.series), prometheus.NewRegistry(), test.limits) + store := NewLimitedStoreServer(newStoreServerStub(test.series), prometheus.NewRegistry(), test.limits) seriesServer := storepb.NewInProcessStream(ctx, 10) err := store.Series(&storepb.SeriesRequest{}, seriesServer) if test.err == "" {