Receive: add per request limits for remote write #5527

Merged
Changes from all commits (27 commits)
0dbf4ec
Add per request limits for remote write
douglascamata Jul 20, 2022
9ac2340
Remove useless TODO item
douglascamata Jul 20, 2022
7995468
Refactor write request limits test
douglascamata Jul 21, 2022
87cef59
Add write concurrency limit to Receive
douglascamata Jul 21, 2022
6cd031c
Change write limits config option name
douglascamata Jul 21, 2022
83ab7ca
Document remote write concurrenty limit
douglascamata Jul 21, 2022
a6addd3
Merge branch 'main' of https://github.com/thanos-io/thanos into dougl…
douglascamata Jul 21, 2022
ab7fd37
Add changelog entry
douglascamata Jul 21, 2022
e836893
Format docs
douglascamata Jul 21, 2022
3240f69
Extract request limiting logic from handler
douglascamata Jul 21, 2022
1f38552
Add copyright header
douglascamata Jul 21, 2022
77a404b
Add a TODO for per-tenant limits
douglascamata Jul 22, 2022
d00ea15
Add default value and hide the request limit flags
douglascamata Jul 22, 2022
2400aac
Improve TODO comment in request limits
douglascamata Jul 22, 2022
510248a
Update Receive docs after flags wre made hidden
douglascamata Jul 22, 2022
b8943a0
Add note about WIP in Receive request limits doc
douglascamata Jul 22, 2022
7f5c41b
Fix typo in Receive docs
douglascamata Jul 25, 2022
7071c22
Fix help text for concurrent request limit
douglascamata Jul 25, 2022
2cd1014
Use byte unit helpers for improved readability
douglascamata Jul 25, 2022
8561400
Removed check for nil writeGate
douglascamata Jul 25, 2022
7efee5e
Better organize linebreaks
douglascamata Jul 25, 2022
42d9f49
Fix help text for limits hit metric
douglascamata Jul 27, 2022
a98bcc3
Apply some english feedback
douglascamata Jul 27, 2022
cc83217
Improve limits & gates documentationb
douglascamata Jul 28, 2022
a9b5529
Fix import clause
douglascamata Jul 28, 2022
58d362b
Use a 3 node hashring for write limits test
douglascamata Jul 28, 2022
003d9aa
Fix comment
douglascamata Aug 1, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction.
- [#5470](https://github.com/thanos-io/thanos/pull/5470) Receive: Implement exposing TSDB stats for all tenants
- [#5493](https://github.com/thanos-io/thanos/pull/5493) Compact: Added `--compact.blocks-fetch-concurrency` allowing to configure number of go routines for download blocks during compactions.
- [#5527](https://github.com/thanos-io/thanos/pull/5527) Receive: Add per request limits for remote write.

### Changed

64 changes: 48 additions & 16 deletions cmd/thanos/receive.go
@@ -198,22 +198,26 @@ func runReceive(
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Writer: writer,
ListenAddress: conf.rwAddress,
Registry: reg,
Endpoint: conf.endpoint,
TenantHeader: conf.tenantHeader,
TenantField: conf.tenantField,
DefaultTenantID: conf.defaultTenantID,
ReplicaHeader: conf.replicaHeader,
ReplicationFactor: conf.replicationFactor,
RelabelConfigs: relabelConfig,
ReceiverMode: receiveMode,
Tracer: tracer,
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
TSDBStats: dbs,
Writer: writer,
ListenAddress: conf.rwAddress,
Registry: reg,
Endpoint: conf.endpoint,
TenantHeader: conf.tenantHeader,
TenantField: conf.tenantField,
DefaultTenantID: conf.defaultTenantID,
ReplicaHeader: conf.replicaHeader,
ReplicationFactor: conf.replicationFactor,
RelabelConfigs: relabelConfig,
ReceiverMode: receiveMode,
Tracer: tracer,
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
TSDBStats: dbs,
WriteSeriesLimit: conf.writeSeriesLimit,
WriteSamplesLimit: conf.writeSamplesLimit,
WriteRequestSizeLimit: conf.writeRequestSizeLimit,
WriteRequestConcurrencyLimit: conf.writeRequestConcurrencyLimit,
})

grpcProbe := prober.NewGRPC()
@@ -763,6 +767,11 @@ type receiveConfig struct {

reqLogConfig *extflag.PathOrContent
relabelConfigPath *extflag.PathOrContent

writeSeriesLimit int64
writeSamplesLimit int64
writeRequestSizeLimit int64
writeRequestConcurrencyLimit int
}

func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -853,6 +862,29 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
Default("false").Hidden().BoolVar(&rc.allowOutOfOrderUpload)

rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)

// TODO(douglascamata): Allow all these limits to be configured per tenant
// and move the configuration to a file. When this is done, remove the
// "hidden" modifier on all these flags.
cmd.Flag("receive.write-request-limits.max-series",
"The maximum amount of series accepted in remote write requests."+
"The default is no limit, represented by 0.").
Default("0").Hidden().Int64Var(&rc.writeSeriesLimit)

cmd.Flag("receive.write-request-limits.max-samples",
"The maximum amount of samples accepted in remote write requests."+
"The default is no limit, represented by 0.").
Default("0").Hidden().Int64Var(&rc.writeSamplesLimit)

cmd.Flag("receive.write-request-limits.max-size-bytes",
"The maximum size (in bytes) of remote write requests."+
"The default is no limit, represented by 0.").
Default("0").Hidden().Int64Var(&rc.writeRequestSizeLimit)

cmd.Flag("receive.write-request-limits.max-concurrency",
"The maximum amount of remote write requests that will be concurrently processed while others wait."+
"The default is no limit, represented by 0.").
Default("0").Hidden().IntVar(&rc.writeRequestConcurrencyLimit)
}

// determineMode returns the ReceiverMode that this receiver is configured to run in.
38 changes: 38 additions & 0 deletions docs/components/receive.md
@@ -77,6 +77,44 @@ The example content of `hashring.json`:

With such configuration any receive listens for remote write on `<ip>10908/api/v1/receive` and will forward to correct one in hashring if needed for tenancy and replication.

## Limits & gates (experimental)

Thanos Receive has some limits and gates that can be configured to control resource usage. Here's the difference between limits and gates:

- **Limits**: if a request hits any configured limit, the client receives an error response from the server.
- **Gates**: if a request arrives while a gate is at capacity, it waits until capacity is freed before being processed; no error response is returned (see the sketch below).
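
To make the distinction concrete, here is a minimal, hypothetical Go sketch of how a gate and a limit behave inside a write handler. It is not code from this PR (the actual handler changes appear further down in this diff); the `gate` and `limiter` interfaces below are stand-ins for Thanos' own types.

```go
package receiveexample

import (
	"context"
	"net/http"
)

// A gate only delays callers while it is at capacity; it never rejects them.
type gate interface {
	Start(ctx context.Context) error
	Done()
}

// A limiter answers yes/no; a "no" turns into an error response for the client.
type limiter interface {
	AllowSizeBytes(tenant string, contentLength int64) bool
}

// handleWrite illustrates the difference: the gate makes the request wait,
// while a hit limit rejects it with 413 Entity Too Large.
func handleWrite(w http.ResponseWriter, r *http.Request, g gate, lim limiter, tenant string) {
	if err := g.Start(r.Context()); err != nil { // wait here for free capacity
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer g.Done()

	if !lim.AllowSizeBytes(tenant, r.ContentLength) { // reject oversized requests
		http.Error(w, "write request too large", http.StatusRequestEntityTooLarge)
		return
	}
	// ...decode, validate, and forward the request...
}
```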

**IMPORTANT**: this feature is experimental and a work-in-progress. It might change in the near future, e.g. the configuration might move to a file (to allow easy configuration of different request limits per tenant) or its structure could change.

### Request limits

Thanos Receive supports setting limits on incoming remote write requests. These limits help prevent a single tenant from sending requests big enough to crash the Receive.

These limits are applied per request and can be configured with the following command line arguments:

- `--receive.write-request-limits.max-size-bytes`: the maximum body size.
- `--receive.write-request-limits.max-series`: the maximum amount of series in a single remote write request.
- `--receive.write-request-limits.max-samples`: the maximum amount of samples in a single remote write request (summed from all series).

Any request above these limits will cause a 413 HTTP response (*Entity Too Large*) and should not be retried without modifications.

Currently, a 413 HTTP response will cause data loss at the client, as no client (Prometheus included) will break a rejected request down into smaller ones and retry. The recommendation is to monitor these errors on the client side and to contact the owners of your Receive instance for information about its configured limits.

Future work that can improve this scenario:

- Proper handling of 413 responses in clients, provided Receive can somehow communicate which limit was reached.
- Including in the 413 response the current limits that apply to the tenant.

By default all these limits are disabled.
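
As a rough, hypothetical illustration (not part of this PR, and not how Prometheus itself behaves today), a custom Go remote-write client could treat a 413 from Receive as a permanent error for that payload instead of retrying it unchanged:

```go
package clientexample

import (
	"fmt"
	"net/http"
)

// sendRemoteWrite treats 413 as a permanent failure for this payload:
// resending the exact same request would hit the same limit again.
func sendRemoteWrite(client *http.Client, req *http.Request) error {
	resp, err := client.Do(req)
	if err != nil {
		return err // network errors may still be retried by the caller
	}
	defer resp.Body.Close()

	switch {
	case resp.StatusCode == http.StatusRequestEntityTooLarge:
		// Do not retry unchanged: drop (or re-batch) the data and alert on it.
		return fmt.Errorf("write request rejected by Receive limits (413): %s", resp.Status)
	case resp.StatusCode/100 != 2:
		return fmt.Errorf("remote write failed: %s", resp.Status)
	}
	return nil
}
```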

### Request gates

The available request gates in Thanos Receive can be configured with the following command line arguments:

- `--receive.write-request-limits.max-concurrency`: the maximum amount of remote write requests that will be concurrently processed. Any request that would exceed this limit will still be accepted, but it will wait until the gate allows it to be processed.

By default all gates are disabled.
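
For illustration only, the waiting behaviour of a gate can be pictured as a semaphore. The sketch below is a toy stand-in, assuming nothing about Thanos' actual `pkg/gate` implementation beyond the `Start`/`Done` contract used by the handler in this PR:

```go
package gateexample

import "context"

// concurrencyGate lets at most `limit` requests proceed at once; the rest
// wait for a free slot (or give up when their context is cancelled).
type concurrencyGate struct {
	slots chan struct{}
}

func newConcurrencyGate(limit int) *concurrencyGate {
	return &concurrencyGate{slots: make(chan struct{}, limit)}
}

// Start blocks until a slot is available or the context is done.
func (g *concurrencyGate) Start(ctx context.Context) error {
	select {
	case g.slots <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Done releases the slot acquired by Start.
func (g *concurrencyGate) Done() {
	<-g.slots
}
```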

## Flags

```$ mdox-exec="thanos receive --help"
94 changes: 73 additions & 21 deletions pkg/receive/handler.go
@@ -19,6 +19,8 @@ import (

"github.com/thanos-io/thanos/pkg/api"
statusapi "github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/logging"

"github.com/go-kit/log"
@@ -83,22 +85,26 @@

// Options for the web Handler.
type Options struct {
Writer *Writer
ListenAddress string
Registry *prometheus.Registry
TenantHeader string
TenantField string
DefaultTenantID string
ReplicaHeader string
Endpoint string
ReplicationFactor uint64
ReceiverMode ReceiverMode
Tracer opentracing.Tracer
TLSConfig *tls.Config
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
RelabelConfigs []*relabel.Config
TSDBStats TSDBStats
Writer *Writer
ListenAddress string
Registry *prometheus.Registry
TenantHeader string
TenantField string
DefaultTenantID string
ReplicaHeader string
Endpoint string
ReplicationFactor uint64
ReceiverMode ReceiverMode
Tracer opentracing.Tracer
TLSConfig *tls.Config
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
RelabelConfigs []*relabel.Config
TSDBStats TSDBStats
WriteSeriesLimit int64
WriteSamplesLimit int64
WriteRequestSizeLimit int64
WriteRequestConcurrencyLimit int
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
@@ -122,6 +128,9 @@ type Handler struct {

writeSamplesTotal *prometheus.HistogramVec
writeTimeseriesTotal *prometheus.HistogramVec

writeGate gate.Gate
requestLimiter requestLimiter
}

func NewHandler(logger log.Logger, o *Options) *Handler {
@@ -147,6 +156,13 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
Max: 30 * time.Second,
Jitter: true,
},
writeGate: gate.NewNoop(),
requestLimiter: newRequestLimiter(
Review thread on this line:

**Contributor:** Instead of extending the existing handler, is it possible to make the limiter a middleware that wraps the handler? This could be cleaner to maintain long term.

**douglascamata (author), Jul 22, 2022:** Unfortunately I don't think that's a good idea, because then the limiter middleware would have to do work that is then done again in the main handler:

- Checking the content length header is easy, but when it's not there we have to read the request body into a byte slice to check its size, which involves trying a snappy decompression too.
- Unmarshalling the request body and looping over the data to count timeseries and samples.

I wanted to avoid doing extra work at all costs to keep the "hot path" of Receive fast.

**Contributor:** I see, that's a good point. Initially I thought the middleware would be on this handler, which already has the decoded protobuf request, but here we don't have the raw body, correct?

Also, if I may ask, what is the practical use of setting limits on the request body? As an admin, I am not sure I would know what to set this limit to. I can understand the impact of limiting series or samples, but I would find it hard to know how to configure the body size limit.

**douglascamata (author):**

> I see, that's a good point. Initially I thought the middleware would be on this handler, which already has the decoded protobuf request, but here we don't have the raw body, correct?

Got it. When the request gets to that handler (it only forwards requests across the hashring), almost all the "heavy" work is already done and some of it would be wasted. Examples:

- If we have the content length header, we could have hit the request size limit and returned the error. But we would still grow the copy buffer, copy the body into it, try to snappy-decode it, unmarshal the protobuf, apply relabel rules, and only then block the request.
- If we hit a timeseries/sample limit, we will apply relabel rules and then block the request (this seems fine, though).

> Also, if I may ask, what is the practical use of setting limits on the request body? As an admin, I am not sure I would know what to set this limit to. I can understand the impact of limiting series or samples, but I would find it hard to know how to configure the body size limit.

The request size limit is more geared towards networking concerns, I would say. It prevents clients from writing very big requests even when the other limits aren't configured or hit (e.g. very long label names/values, or clients writing "junk"). This can still cause OOMs or possibly create a situation similar to a slow client attack.

This makes me wonder why Receive's remote write endpoint doesn't have a configurable server-side timeout... we could possibly add this in a follow-up too.

o.WriteRequestSizeLimit,
o.WriteSeriesLimit,
o.WriteSamplesLimit,
registerer,
),
forwardRequests: promauto.With(registerer).NewCounterVec(
prometheus.CounterOpts{
Name: "thanos_receive_forward_requests_total",
@@ -185,6 +201,13 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
),
}

if o.WriteRequestConcurrencyLimit > 0 {
h.writeGate = gate.New(
extprom.WrapRegistererWithPrefix("thanos_receive_write_request_concurrent_", registerer),
o.WriteRequestConcurrencyLimit,
)
}

h.forwardRequests.WithLabelValues(labelSuccess)
h.forwardRequests.WithLabelValues(labelError)
h.replications.WithLabelValues(labelSuccess)
@@ -397,10 +420,25 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {

tLogger := log.With(h.logger, "tenant", tenant)

tracing.DoInSpan(r.Context(), "receive_write_gate_ismyturn", func(ctx context.Context) {
err = h.writeGate.Start(r.Context())
})
if err != nil {
level.Error(tLogger).Log("err", err, "msg", "internal server error")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

defer h.writeGate.Done()

// ioutil.ReadAll dynamically adjust the byte slice for read data, starting from 512B.
// Since this is receive hot path, grow upfront saving allocations and CPU time.
compressed := bytes.Buffer{}
if r.ContentLength >= 0 {
if !h.requestLimiter.AllowSizeBytes(tenant, r.ContentLength) {
http.Error(w, "write request too large", http.StatusRequestEntityTooLarge)
return
}
compressed.Grow(int(r.ContentLength))
} else {
compressed.Grow(512)
@@ -410,14 +448,18 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, errors.Wrap(err, "read compressed request body").Error(), http.StatusInternalServerError)
return
}

reqBuf, err := s2.Decode(nil, compressed.Bytes())
if err != nil {
level.Error(tLogger).Log("msg", "snappy decode error", "err", err)
http.Error(w, errors.Wrap(err, "snappy decode error").Error(), http.StatusBadRequest)
return
}

if !h.requestLimiter.AllowSizeBytes(tenant, int64(len(reqBuf))) {
http.Error(w, "write request too large", http.StatusRequestEntityTooLarge)
return
}

// NOTE: Due to zero copy ZLabels, Labels used from WriteRequests keeps memory
// from the whole request. Ensure that we always copy those when we want to
// store them for longer time.
@@ -449,6 +491,20 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
return
}

if !h.requestLimiter.AllowSeries(tenant, int64(len(wreq.Timeseries))) {
http.Error(w, "too many timeseries", http.StatusRequestEntityTooLarge)
return
}

totalSamples := 0
for _, timeseries := range wreq.Timeseries {
totalSamples += len(timeseries.Samples)
}
if !h.requestLimiter.AllowSamples(tenant, int64(totalSamples)) {
http.Error(w, "too many samples", http.StatusRequestEntityTooLarge)
return
}

// Apply relabeling configs.
h.relabel(&wreq)
if len(wreq.Timeseries) == 0 {
@@ -475,10 +531,6 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), responseStatusCode)
}
h.writeTimeseriesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(len(wreq.Timeseries)))
totalSamples := 0
for _, timeseries := range wreq.Timeseries {
totalSamples += len(timeseries.Samples)
}
h.writeSamplesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(totalSamples))
}
