Skip to content

Commit

Permalink
Code review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Feb 8, 2023
1 parent 0f5ddff commit 011dbb3
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 35 deletions.
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Changed

- [#6035](https://github.com/thanos-io/thanos/pull/6035) Replicate: Support all types of matchers to match blocks for replication. Change matcher parameter from string slice to a single string.
>>>>>>> 0dbde4f1 (Add CHANGELOG entry)

### Fixed

Expand Down
6 changes: 3 additions & 3 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func registerQuery(app *extkingpin.App) {
queryTelemetrySamplesQuantiles := cmd.Flag("query.telemetry.request-samples-quantiles", "The quantiles for exporting metrics about the samples count quantiles.").Default("100", "1000", "10000", "100000", "1000000").Int64List()
queryTelemetrySeriesQuantiles := cmd.Flag("query.telemetry.request-series-seconds-quantiles", "The quantiles for exporting metrics about the series count quantiles.").Default("10", "100", "1000", "10000", "100000").Int64List()

var storeRateLimits store.RateLimits
var storeRateLimits store.SeriesSelectLimits
storeRateLimits.RegisterFlags(cmd)

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
Expand Down Expand Up @@ -401,7 +401,7 @@ func runQuery(
queryTelemetrySamplesQuantiles []int64,
queryTelemetrySeriesQuantiles []int64,
promqlEngine promqlEngineType,
storeRateLimits store.RateLimits,
storeRateLimits store.SeriesSelectLimits,
) error {
if alertQueryURL == "" {
lastColon := strings.LastIndex(httpBindAddr, ":")
Expand Down Expand Up @@ -764,7 +764,7 @@ func runQuery(
)

grpcAPI := apiv1.NewGRPCAPI(time.Now, queryReplicaLabels, queryableCreator, queryEngine, lookbackDeltaCreator, instantDefaultMaxSourceResolution)
storeServer := store.NewRateLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, storeRateLimits)
storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, storeRateLimits)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)),
grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)),
Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func runReceive(
0,
store.LazyRetrieval,
)
mts := store.NewRateLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits)
mts := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits)
rw := store.ReadWriteTSDBStore{
StoreServer: mts,
WriteableStoreServer: webHandler,
Expand Down Expand Up @@ -800,7 +800,7 @@ type receiveConfig struct {
relabelConfigPath *extflag.PathOrContent

writeLimitsConfig *extflag.PathOrContent
storeRateLimits store.RateLimits
storeRateLimits store.SeriesSelectLimits
}

func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type ruleConfig struct {
dataDir string
lset labels.Labels
ignoredLabelNames []string
storeRateLimits store.RateLimits
storeRateLimits store.SeriesSelectLimits
}

func (rc *ruleConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -636,7 +636,7 @@ func runRule(
return nil
}),
)
storeServer := store.NewRateLimitedStoreServer(store.NewInstrumentedStoreServer(reg, tsdbStore), reg, conf.storeRateLimits)
storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, tsdbStore), reg, conf.storeRateLimits)
options = append(options, grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)))
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func runSidecar(
info.WithMetricMetadataInfoFunc(),
)

storeServer := store.NewRateLimitedStoreServer(store.NewInstrumentedStoreServer(reg, promStore), reg, conf.storeRateLimits)
storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, promStore), reg, conf.storeRateLimits)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)),
grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))),
Expand Down Expand Up @@ -484,7 +484,7 @@ type sidecarConfig struct {
objStore extflag.PathOrContent
shipper shipperConfig
limitMinTime thanosmodel.TimeOrDurationValue
storeRateLimits store.RateLimits
storeRateLimits store.SeriesSelectLimits
}

func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type storeConfig struct {
indexCacheSizeBytes units.Base2Bytes
chunkPoolSize units.Base2Bytes
seriesBatchSize int
storeRateLimits store.RateLimits
storeRateLimits store.SeriesSelectLimits
maxDownloadedBytes units.Base2Bytes
maxConcurrency int
component component.StoreAPI
Expand Down
39 changes: 22 additions & 17 deletions pkg/store/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,64 +100,69 @@ func NewBytesLimiterFactory(limit units.Base2Bytes) BytesLimiterFactory {
}
}

type RateLimits struct {
// SeriesSelectLimits are limits applied against individual Series calls.
type SeriesSelectLimits struct {
SeriesPerRequest uint64
SamplesPerRequest uint64
}

func (l *RateLimits) RegisterFlags(cmd extkingpin.FlagClause) {
func (l *SeriesSelectLimits) RegisterFlags(cmd extkingpin.FlagClause) {
cmd.Flag("store.grpc.series-limit", "The maximum series allowed for a single Series request. The Series call fails if this limit is exceeded. 0 means no limit.").Default("0").Uint64Var(&l.SeriesPerRequest)
cmd.Flag("store.grpc.samples-limit", "The maximum samples allowed for a single Series request, The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains a maximum of 120 samples.").Default("0").Uint64Var(&l.SamplesPerRequest)
}

// rateLimitedStoreServer is a storepb.StoreServer that can apply rate limits against Series requests.
type rateLimitedStoreServer struct {
var _ storepb.StoreServer = &limitedStoreServer{}

// limitedStoreServer is a storepb.StoreServer that can apply series and sample limits against individual Series requests.
type limitedStoreServer struct {
storepb.StoreServer
newSeriesLimiter SeriesLimiterFactory
newSamplesLimiter ChunksLimiterFactory
failedRequestsCounter *prometheus.CounterVec
}

// NewRateLimitedStoreServer creates a new rateLimitedStoreServer.
func NewRateLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer, rateLimits RateLimits) storepb.StoreServer {
return &rateLimitedStoreServer{
// NewLimitedStoreServer creates a new limitedStoreServer.
func NewLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer, selectLimits SeriesSelectLimits) storepb.StoreServer {
return &limitedStoreServer{
StoreServer: store,
newSeriesLimiter: NewSeriesLimiterFactory(rateLimits.SeriesPerRequest),
newSamplesLimiter: NewChunksLimiterFactory(rateLimits.SamplesPerRequest),
newSeriesLimiter: NewSeriesLimiterFactory(selectLimits.SeriesPerRequest),
newSamplesLimiter: NewChunksLimiterFactory(selectLimits.SamplesPerRequest),
failedRequestsCounter: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_store_selects_dropped_total",
Help: "Number of select queries that were dropped due to configured limits.",
}, []string{"reason"}),
}
}

func (s *rateLimitedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
func (s *limitedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
seriesLimiter := s.newSeriesLimiter(s.failedRequestsCounter.WithLabelValues("series"))
chunksLimiter := s.newSamplesLimiter(s.failedRequestsCounter.WithLabelValues("chunks"))
rateLimitedSrv := newRateLimitedServer(srv, seriesLimiter, chunksLimiter)
if err := s.StoreServer.Series(req, rateLimitedSrv); err != nil {
limitedSrv := newLimitedServer(srv, seriesLimiter, chunksLimiter)
if err := s.StoreServer.Series(req, limitedSrv); err != nil {
return err
}

return nil
}

// rateLimitedServer is a storepb.Store_SeriesServer that tracks statistics about sent series.
type rateLimitedServer struct {
var _ storepb.Store_SeriesServer = &limitedServer{}

// limitedServer is a storepb.Store_SeriesServer that tracks statistics about sent series.
type limitedServer struct {
storepb.Store_SeriesServer
seriesLimiter SeriesLimiter
samplesLimiter ChunksLimiter
}

func newRateLimitedServer(upstream storepb.Store_SeriesServer, seriesLimiter SeriesLimiter, chunksLimiter ChunksLimiter) *rateLimitedServer {
return &rateLimitedServer{
func newLimitedServer(upstream storepb.Store_SeriesServer, seriesLimiter SeriesLimiter, chunksLimiter ChunksLimiter) *limitedServer {
return &limitedServer{
Store_SeriesServer: upstream,
seriesLimiter: seriesLimiter,
samplesLimiter: chunksLimiter,
}
}

func (i *rateLimitedServer) Send(response *storepb.SeriesResponse) error {
func (i *limitedServer) Send(response *storepb.SeriesResponse) error {
series := response.GetSeries()
if series == nil {
return i.Store_SeriesServer.Send(response)
Expand Down
14 changes: 7 additions & 7 deletions pkg/store/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,29 +43,29 @@ func TestRateLimitedServer(t *testing.T) {
}
tests := []struct {
name string
limits RateLimits
limits SeriesSelectLimits
series []*storepb.SeriesResponse
err string
}{
{
name: "no limits",
limits: RateLimits{
limits: SeriesSelectLimits{
SeriesPerRequest: 0,
SamplesPerRequest: 0,
},
series: series,
},
{
name: "series bellow limit",
limits: RateLimits{
limits: SeriesSelectLimits{
SeriesPerRequest: 3,
SamplesPerRequest: 0,
},
series: series,
},
{
name: "series over limit",
limits: RateLimits{
limits: SeriesSelectLimits{
SeriesPerRequest: 2,
SamplesPerRequest: 0,
},
Expand All @@ -74,15 +74,15 @@ func TestRateLimitedServer(t *testing.T) {
},
{
name: "chunks bellow limit",
limits: RateLimits{
limits: SeriesSelectLimits{
SeriesPerRequest: 0,
SamplesPerRequest: uint64(3 * numSamples * MaxSamplesPerChunk),
},
series: series,
},
{
name: "chunks over limit",
limits: RateLimits{
limits: SeriesSelectLimits{
SeriesPerRequest: 0,
SamplesPerRequest: 50,
},
Expand All @@ -95,7 +95,7 @@ func TestRateLimitedServer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

store := NewRateLimitedStoreServer(newStoreServerStub(test.series), prometheus.NewRegistry(), test.limits)
store := NewLimitedStoreServer(newStoreServerStub(test.series), prometheus.NewRegistry(), test.limits)
seriesServer := storepb.NewInProcessStream(ctx, 10)
err := store.Series(&storepb.SeriesRequest{}, seriesServer)
if test.err == "" {
Expand Down

0 comments on commit 011dbb3

Please sign in to comment.