Receive: add per request limits for remote write #5527

Merged
Changes from 23 commits
Commits (27)
0dbf4ec
Add per request limits for remote write
douglascamata Jul 20, 2022
9ac2340
Remove useless TODO item
douglascamata Jul 20, 2022
7995468
Refactor write request limits test
douglascamata Jul 21, 2022
87cef59
Add write concurrency limit to Receive
douglascamata Jul 21, 2022
6cd031c
Change write limits config option name
douglascamata Jul 21, 2022
83ab7ca
Document remote write concurrency limit
douglascamata Jul 21, 2022
a6addd3
Merge branch 'main' of https://github.com/thanos-io/thanos into dougl…
douglascamata Jul 21, 2022
ab7fd37
Add changelog entry
douglascamata Jul 21, 2022
e836893
Format docs
douglascamata Jul 21, 2022
3240f69
Extract request limiting logic from handler
douglascamata Jul 21, 2022
1f38552
Add copyright header
douglascamata Jul 21, 2022
77a404b
Add a TODO for per-tenant limits
douglascamata Jul 22, 2022
d00ea15
Add default value and hide the request limit flags
douglascamata Jul 22, 2022
2400aac
Improve TODO comment in request limits
douglascamata Jul 22, 2022
510248a
Update Receive docs after flags were made hidden
douglascamata Jul 22, 2022
b8943a0
Add note about WIP in Receive request limits doc
douglascamata Jul 22, 2022
7f5c41b
Fix typo in Receive docs
douglascamata Jul 25, 2022
7071c22
Fix help text for concurrent request limit
douglascamata Jul 25, 2022
2cd1014
Use byte unit helpers for improved readability
douglascamata Jul 25, 2022
8561400
Removed check for nil writeGate
douglascamata Jul 25, 2022
7efee5e
Better organize linebreaks
douglascamata Jul 25, 2022
42d9f49
Fix help text for limits hit metric
douglascamata Jul 27, 2022
a98bcc3
Apply some english feedback
douglascamata Jul 27, 2022
cc83217
Improve limits & gates documentation
douglascamata Jul 28, 2022
a9b5529
Fix import clause
douglascamata Jul 28, 2022
58d362b
Use a 3 node hashring for write limits test
douglascamata Jul 28, 2022
003d9aa
Fix comment
douglascamata Aug 1, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction.
- [#5470](https://github.com/thanos-io/thanos/pull/5470) Receive: Implement exposing TSDB stats for all tenants
- [#5493](https://github.com/thanos-io/thanos/pull/5493) Compact: Added `--compact.blocks-fetch-concurrency` allowing to configure number of go routines for download blocks during compactions.
- [#5527](https://github.com/thanos-io/thanos/pull/5527) Receive: Add per request limits for remote write.

### Changed

64 changes: 48 additions & 16 deletions cmd/thanos/receive.go
@@ -198,22 +198,26 @@ func runReceive(
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Writer: writer,
ListenAddress: conf.rwAddress,
Registry: reg,
Endpoint: conf.endpoint,
TenantHeader: conf.tenantHeader,
TenantField: conf.tenantField,
DefaultTenantID: conf.defaultTenantID,
ReplicaHeader: conf.replicaHeader,
ReplicationFactor: conf.replicationFactor,
RelabelConfigs: relabelConfig,
ReceiverMode: receiveMode,
Tracer: tracer,
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
TSDBStats: dbs,
Writer: writer,
ListenAddress: conf.rwAddress,
Registry: reg,
Endpoint: conf.endpoint,
TenantHeader: conf.tenantHeader,
TenantField: conf.tenantField,
DefaultTenantID: conf.defaultTenantID,
ReplicaHeader: conf.replicaHeader,
ReplicationFactor: conf.replicationFactor,
RelabelConfigs: relabelConfig,
ReceiverMode: receiveMode,
Tracer: tracer,
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
TSDBStats: dbs,
WriteSeriesLimit: conf.writeSeriesLimit,
WriteSamplesLimit: conf.writeSamplesLimit,
WriteRequestSizeLimit: conf.writeRequestSizeLimit,
WriteRequestConcurrencyLimit: conf.writeRequestConcurrencyLimit,
})

grpcProbe := prober.NewGRPC()
@@ -763,6 +767,11 @@ type receiveConfig struct {

reqLogConfig *extflag.PathOrContent
relabelConfigPath *extflag.PathOrContent

writeSeriesLimit int64
writeSamplesLimit int64
writeRequestSizeLimit int64
writeRequestConcurrencyLimit int
}

func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -853,6 +862,29 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
Default("false").Hidden().BoolVar(&rc.allowOutOfOrderUpload)

rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)

// TODO(douglascamata): allow all these limits to be configured per tenant
// and move the configuration to a file. When this is done, remove the
// "hidden" modifier on all these flags.
cmd.Flag("receive.write-request-limits.max-series",
"The maximum amount of series accepted in remote write requests."+
"The default is no limit, represented by 0.").
Default("0").Hidden().Int64Var(&rc.writeSeriesLimit)

cmd.Flag("receive.write-request-limits.max-samples",
"The maximum amount of samples accepted in remote write requests."+
"The default is no limit, represented by 0.").
Default("0").Hidden().Int64Var(&rc.writeSamplesLimit)

cmd.Flag("receive.write-request-limits.max-size-bytes",
"The maximum size (in bytes) of remote write requests."+
"The default is no limit, represented by 0.").
Default("0").Hidden().Int64Var(&rc.writeRequestSizeLimit)

cmd.Flag("receive.write-request-limits.max-concurrency",
"The maximum amount of remote write requests that will be concurrently processed while others wait."+
"The default is no limit, represented by 0.").
Default("0").Hidden().IntVar(&rc.writeRequestConcurrencyLimit)
}

// determineMode returns the ReceiverMode that this receiver is configured to run in.
19 changes: 19 additions & 0 deletions docs/components/receive.md
@@ -77,6 +77,25 @@ The example content of `hashring.json`:

With such a configuration, any Receive instance listens for remote write on `<ip>:10908/api/v1/receive` and forwards the request to the correct node in the hashring if needed for tenancy and replication.

## Limiting

### Request limits (work in progress)

Thanos Receive supports setting limits on incoming remote write requests. These limits help prevent a single tenant from sending requests large enough to crash the Receive instance.

These limits are applied per request and can be configured with the following command line arguments:

- `--receive.write-request-limits.max-size-bytes`: the maximum body size.
- `--receive.write-request-limits.max-concurrency`: the maximum amount of remote write requests that will be concurrently processed while others wait.
- `--receive.write-request-limits.max-series`: the maximum amount of series in a single remote write request.
- `--receive.write-request-limits.max-samples`: the maximum amount of samples in a single remote write request (summed from all series).

Any request above these limits will cause a 413 HTTP response (*Request Entity Too Large*) and should not be retried without modification. It's up to remote write clients to split up the data and retry, or to drop it entirely (see the sketch below).

By default all these limits are disabled.

**IMPORTANT**: this feature is a work in progress and might change in the near future; for example, the configuration might move to a file to allow easy configuration of different request limits per tenant.
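
The sketch below is an illustration only and is not part of this PR: it shows one way a remote write client could follow the "split up the data and retry or drop it" guidance when it receives a 413. The `sendFunc` callback, the package name, and the halving strategy are assumptions; real clients such as Prometheus implement their own batching and retry policies.

```go
// Illustrative sketch (not part of this PR): splitting a remote write batch
// when Thanos Receive rejects it with 413 Request Entity Too Large.
// sendFunc stands in for the actual transport (snappy-compressed protobuf
// over HTTP POST in Prometheus remote write) and is an assumption.
package remotewriteclient

import (
	"fmt"
	"net/http"

	"github.com/prometheus/prometheus/prompb"
)

// sendFunc performs one remote write call and returns the HTTP status code.
type sendFunc func(req *prompb.WriteRequest) (int, error)

// writeWithSplit sends req; on a 413 it halves the series and retries each
// half. A single series that is still rejected is dropped, matching the
// guidance above.
func writeWithSplit(req *prompb.WriteRequest, send sendFunc) error {
	status, err := send(req)
	if err != nil {
		return err
	}
	if status != http.StatusRequestEntityTooLarge {
		return nil // accepted, or failed for a reason the caller handles elsewhere
	}
	if len(req.Timeseries) <= 1 {
		return fmt.Errorf("dropping %d series: over the per-request limits", len(req.Timeseries))
	}
	mid := len(req.Timeseries) / 2
	if err := writeWithSplit(&prompb.WriteRequest{Timeseries: req.Timeseries[:mid]}, send); err != nil {
		return err
	}
	return writeWithSplit(&prompb.WriteRequest{Timeseries: req.Timeseries[mid:]}, send)
}
```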

## Flags

```$ mdox-exec="thanos receive --help"
94 changes: 73 additions & 21 deletions pkg/receive/handler.go
@@ -19,6 +19,8 @@ import (

"github.com/thanos-io/thanos/pkg/api"
statusapi "github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/logging"

"github.com/go-kit/log"
@@ -83,22 +85,26 @@ var (

// Options for the web Handler.
type Options struct {
Writer *Writer
ListenAddress string
Registry *prometheus.Registry
TenantHeader string
TenantField string
DefaultTenantID string
ReplicaHeader string
Endpoint string
ReplicationFactor uint64
ReceiverMode ReceiverMode
Tracer opentracing.Tracer
TLSConfig *tls.Config
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
RelabelConfigs []*relabel.Config
TSDBStats TSDBStats
Writer *Writer
ListenAddress string
Registry *prometheus.Registry
TenantHeader string
TenantField string
DefaultTenantID string
ReplicaHeader string
Endpoint string
ReplicationFactor uint64
ReceiverMode ReceiverMode
Tracer opentracing.Tracer
TLSConfig *tls.Config
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
RelabelConfigs []*relabel.Config
TSDBStats TSDBStats
WriteSeriesLimit int64
WriteSamplesLimit int64
WriteRequestSizeLimit int64
WriteRequestConcurrencyLimit int
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
@@ -122,6 +128,9 @@ type Handler struct {

writeSamplesTotal *prometheus.HistogramVec
writeTimeseriesTotal *prometheus.HistogramVec

writeGate gate.Gate
requestLimiter requestLimiter
}

func NewHandler(logger log.Logger, o *Options) *Handler {
@@ -147,6 +156,13 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
Max: 30 * time.Second,
Jitter: true,
},
writeGate: gate.NewNoop(),
requestLimiter: newRequestLimiter(
Contributor:
Instead of extending the existing handler, is it possible to make the limiter a middleware that wraps the handler? This could be cleaner to maintain long term.

douglascamata (Contributor, Author), Jul 22, 2022:
Unfortunately I don't think that's a good idea, because then the limiter middleware will have to do work that is done again in the main handler:

  • Checking content length header is easy, but when it's not there we have to read the request body into a byte slice to check its size, which involves trying a snappy decompression too.
  • Unmarshalling the request body and looping over the data to count timeseries and samples.

I wanted to avoid doing extra work at all costs to keep the "hot path" of Receive fast.

Contributor:
I see, that's a good point. Well initially I thought the middleware would be on this handler which already has the decoded protobuf request, but here we don't have the raw body correct?

Also if I may ask, what is the practical use of setting limits on the request body? As an admin, I am not sure I would know what to set this limit to. I can understand the impact of limiting series or samples, but I would find it hard to know how to configure the body size limit.

douglascamata (Contributor, Author):
> I see, that's a good point. Well initially I thought the middleware would be on this handler which already has the decoded protobuf request, but here we don't have the raw body correct?

Got it. When it gets to that handler (it does only forward request across the hashring) almost all the "heavy" work is done and some of it will be wasted. Examples:

  • If we have the content length header, we could have hit the request size limit and returned the error. But we still grow the copy buffer, copy body to buffer, try to snappy-decode it, unmarshall the protobuffer, apply relabel rules, and theeeen block the request.
  • If we hit a timeseries/sample limit, we will apply relabel rules, and then block the request (this seems fine though).

> Also if I may ask, what is the practical use of setting limits on the request body? As an admin, I am not sure I would know what to set this limit to. I can understand the impact of limiting series or samples, but I would find it hard to know how to configure the body size limit.

The request size limit is more geared towards networking concerns, I would say. It prevents clients from writing very big requests (even when the other limits aren't configured or hit, i.e. very long label name/values, or when clients are writing "junk"). This can still cause OOMs or possibly create a situation similar to a slow client attack.

This makes me wonder why Receive's remote write endpoint doesn't have a configurable server-side timeout... possibly could add this in a follow up too.

o.WriteRequestSizeLimit,
o.WriteSeriesLimit,
o.WriteSamplesLimit,
registerer,
),
forwardRequests: promauto.With(registerer).NewCounterVec(
prometheus.CounterOpts{
Name: "thanos_receive_forward_requests_total",
Expand Down Expand Up @@ -185,6 +201,13 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
),
}

if o.WriteRequestConcurrencyLimit > 0 {
h.writeGate = gate.New(
extprom.WrapRegistererWithPrefix("thanos_receive_write_request_concurrent_", registerer),
o.WriteRequestConcurrencyLimit,
)
}

h.forwardRequests.WithLabelValues(labelSuccess)
h.forwardRequests.WithLabelValues(labelError)
h.replications.WithLabelValues(labelSuccess)
@@ -397,10 +420,25 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {

tLogger := log.With(h.logger, "tenant", tenant)

tracing.DoInSpan(r.Context(), "receive_write_gate_ismyturn", func(ctx context.Context) {
err = h.writeGate.Start(r.Context())
})
if err != nil {
level.Error(tLogger).Log("err", err, "msg", "internal server error")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

defer h.writeGate.Done()

// ioutil.ReadAll dynamically adjust the byte slice for read data, starting from 512B.
// Since this is receive hot path, grow upfront saving allocations and CPU time.
compressed := bytes.Buffer{}
if r.ContentLength >= 0 {
if !h.requestLimiter.AllowSizeBytes(tenant, r.ContentLength) {
http.Error(w, "write request too large", http.StatusRequestEntityTooLarge)
return
}
compressed.Grow(int(r.ContentLength))
} else {
compressed.Grow(512)
@@ -410,14 +448,18 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, errors.Wrap(err, "read compressed request body").Error(), http.StatusInternalServerError)
return
}

reqBuf, err := s2.Decode(nil, compressed.Bytes())
if err != nil {
level.Error(tLogger).Log("msg", "snappy decode error", "err", err)
http.Error(w, errors.Wrap(err, "snappy decode error").Error(), http.StatusBadRequest)
return
}

if !h.requestLimiter.AllowSizeBytes(tenant, int64(len(reqBuf))) {
http.Error(w, "write request too large", http.StatusRequestEntityTooLarge)
return
}

// NOTE: Due to zero copy ZLabels, Labels used from WriteRequests keeps memory
// from the whole request. Ensure that we always copy those when we want to
// store them for longer time.
@@ -449,6 +491,20 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
return
}

if !h.requestLimiter.AllowSeries(tenant, int64(len(wreq.Timeseries))) {
http.Error(w, "too many timeseries", http.StatusRequestEntityTooLarge)
return
}

totalSamples := 0
for _, timeseries := range wreq.Timeseries {
totalSamples += len(timeseries.Samples)
}
if !h.requestLimiter.AllowSamples(tenant, int64(totalSamples)) {
http.Error(w, "too many samples", http.StatusRequestEntityTooLarge)
return
}

// Apply relabeling configs.
h.relabel(&wreq)
if len(wreq.Timeseries) == 0 {
@@ -475,10 +531,6 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), responseStatusCode)
}
h.writeTimeseriesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(len(wreq.Timeseries)))
totalSamples := 0
for _, timeseries := range wreq.Timeseries {
totalSamples += len(timeseries.Samples)
}
h.writeSamplesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(totalSamples))
}

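The limiter referenced here (`newRequestLimiter` and the `AllowSizeBytes`/`AllowSeries`/`AllowSamples` calls) lives in a new `pkg/receive/limits.go` file that is not included in this excerpt. Below is a minimal sketch of a limiter matching those call sites; the field names, metric name, and labels are assumptions rather than the PR's actual implementation. A limit of `0` disables the check, mirroring the flag defaults.

```go
// Hypothetical sketch of a per-request limiter that fits the call sites in
// handler.go above; details such as the metric name and labels are assumptions.
package receive

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

type requestLimiter struct {
	sizeBytesLimit int64
	seriesLimit    int64
	samplesLimit   int64
	limitsHit      *prometheus.SummaryVec
}

func newRequestLimiter(sizeBytesLimit, seriesLimit, samplesLimit int64, reg prometheus.Registerer) requestLimiter {
	return requestLimiter{
		sizeBytesLimit: sizeBytesLimit,
		seriesLimit:    seriesLimit,
		samplesLimit:   samplesLimit,
		// promauto.With(nil) creates the metric without registering it,
		// which keeps a nil-registerer setup (as in the tests) working.
		limitsHit: promauto.With(reg).NewSummaryVec(
			prometheus.SummaryOpts{
				Name: "thanos_receive_write_request_limits_hit", // assumed name
				Help: "How far over the per-request limit refused remote write requests were.",
			},
			[]string{"tenant", "limit"},
		),
	}
}

// allow returns true when the limit is disabled (0) or not exceeded;
// otherwise it records by how much the request overshot the limit.
func (l requestLimiter) allow(limit, amount int64, tenant, limitName string) bool {
	if limit <= 0 || amount <= limit {
		return true
	}
	l.limitsHit.WithLabelValues(tenant, limitName).Observe(float64(amount - limit))
	return false
}

func (l requestLimiter) AllowSizeBytes(tenant string, contentLength int64) bool {
	return l.allow(l.sizeBytesLimit, contentLength, tenant, "size_bytes")
}

func (l requestLimiter) AllowSeries(tenant string, amount int64) bool {
	return l.allow(l.seriesLimit, amount, tenant, "series")
}

func (l requestLimiter) AllowSamples(tenant string, amount int64) bool {
	return l.allow(l.samplesLimit, amount, tenant, "samples")
}
```

Because a zero limit short-circuits before any metric work, disabled limits add essentially no overhead to the receive hot path, which matches the author's stated goal in the review thread above.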
80 changes: 80 additions & 0 deletions pkg/receive/handler_test.go
@@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/alecthomas/units"
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
@@ -690,6 +691,85 @@ func TestReceiveQuorum(t *testing.T) {
}
}

func TestReceiveWriteRequestLimits(t *testing.T) {
Member:
Maybe an e2e test could help in understanding some real-world client behavior eg Prometheus, avalanche. WDYT?

douglascamata (Contributor, Author):
Will add 👍

douglascamata (Contributor, Author), Jul 21, 2022:
@saswatamcode actually, what do you mean with understanding some real-world client behavior eg Prometheus, avalanche.?

Do you mean, for example, testing that a well configured Prometheus or Avalanche will retry on a 429? I don't think it's worth to add an e2e test only for this reason. It means we're testing Prometheus/Avalanche and not Thanos.

I started working on the e2e tests and the further I progress on them the more I think they have no real value beyond what the handler test provides. The only additional value the e2e adds that I can see is to ensure that the CLI args are properly passed down to the request limiting code. Besides this, the request limiting code can be tested without having to add an e2e test.

Member:
I meant if seeing how a hashring would behave, with such request limits and multiple tenants, would be useful?
But yes let's see what maintainers have to say! Unit tests might be sufficient here. 🙂

Member:
Less + simpler tests while having the same verifications level = better, so I think a unit tests will suffice here.

> It means we're testing Prometheus/Avalanche and not Thanos.

In some way yes, but also... are we sure those projects have those tests? 🙃

Anyway, let's investigate/create focus tests in separate stream of work

for _, tc := range []struct {
name string
status int
amountSeries int
amountSamples int
}{
{
name: "Request above limit of series",
status: http.StatusRequestEntityTooLarge,
amountSeries: 21,
},
{
name: "Request under the limit of series",
status: http.StatusOK,
amountSeries: 20,
},
{
name: "Request above limit of samples (series * samples)",
status: http.StatusRequestEntityTooLarge,
amountSeries: 30,
amountSamples: 15,
},
{
name: "Request under the limit of samples (series * samples)",
status: http.StatusOK,
amountSeries: 10,
amountSamples: 2,
},
{
name: "Request above body size limit",
status: http.StatusRequestEntityTooLarge,
amountSeries: 300,
amountSamples: 150,
},
} {
t.Run(tc.name, func(t *testing.T) {
if tc.amountSamples == 0 {
tc.amountSamples = 1
Contributor:
What's the reason for this mutation?

douglascamata (Contributor, Author):
This is to kind of have 1 as default value for the test scenarios that do not explicitly set amountSamples.

}

appendables := []*fakeAppendable{
{
appender: newFakeAppender(nil, nil, nil),
},
}
handlers, _ := newTestHandlerHashring(appendables, 1)
handler := handlers[0]
handler.requestLimiter = newRequestLimiter(int64(1*units.Megabyte), 20, 200, nil)
tenant := "test"

wreq := &prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{},
}

for i := 0; i < tc.amountSeries; i += 1 {
label := labelpb.ZLabel{Name: "foo", Value: "bar"}
series := prompb.TimeSeries{
Labels: []labelpb.ZLabel{label},
}
for j := 0; j < tc.amountSamples; j += 1 {
sample := prompb.Sample{Value: float64(j), Timestamp: int64(j)}
series.Samples = append(series.Samples, sample)
}
wreq.Timeseries = append(wreq.Timeseries, series)
}

// Test that the correct status is returned.
rec, err := makeRequest(handler, tenant, wreq)
if err != nil {
t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", tc.status, err)
}
if rec.Code != tc.status {
t.Errorf("handler: got unexpected HTTP status code: expected %d, got %d; body: %s", tc.status, rec.Code, rec.Body.String())
}
})
}
}

func TestReceiveWithConsistencyDelay(t *testing.T) {
appenderErrFn := func() error { return errors.New("failed to get appender") }
conflictErrFn := func() error { return storage.ErrOutOfBounds }