Receive: add per request limits for remote write #5527

Merged
Commits (27); the diff below shows the changes from one selected commit.
0dbf4ec
Add per request limits for remote write
douglascamata Jul 20, 2022
9ac2340
Remove useless TODO item
douglascamata Jul 20, 2022
7995468
Refactor write request limits test
douglascamata Jul 21, 2022
87cef59
Add write concurrency limit to Receive
douglascamata Jul 21, 2022
6cd031c
Change write limits config option name
douglascamata Jul 21, 2022
83ab7ca
Document remote write concurrency limit
douglascamata Jul 21, 2022
a6addd3
Merge branch 'main' of https://github.com/thanos-io/thanos into dougl…
douglascamata Jul 21, 2022
ab7fd37
Add changelog entry
douglascamata Jul 21, 2022
e836893
Format docs
douglascamata Jul 21, 2022
3240f69
Extract request limiting logic from handler
douglascamata Jul 21, 2022
1f38552
Add copyright header
douglascamata Jul 21, 2022
77a404b
Add a TODO for per-tenant limits
douglascamata Jul 22, 2022
d00ea15
Add default value and hide the request limit flags
douglascamata Jul 22, 2022
2400aac
Improve TODO comment in request limits
douglascamata Jul 22, 2022
510248a
Update Receive docs after flags were made hidden
douglascamata Jul 22, 2022
b8943a0
Add note about WIP in Receive request limits doc
douglascamata Jul 22, 2022
7f5c41b
Fix typo in Receive docs
douglascamata Jul 25, 2022
7071c22
Fix help text for concurrent request limit
douglascamata Jul 25, 2022
2cd1014
Use byte unit helpers for improved readability
douglascamata Jul 25, 2022
8561400
Removed check for nil writeGate
douglascamata Jul 25, 2022
7efee5e
Better organize linebreaks
douglascamata Jul 25, 2022
42d9f49
Fix help text for limits hit metric
douglascamata Jul 27, 2022
a98bcc3
Apply some english feedback
douglascamata Jul 27, 2022
cc83217
Improve limits & gates documentation
douglascamata Jul 28, 2022
a9b5529
Fix import clause
douglascamata Jul 28, 2022
58d362b
Use a 3 node hashring for write limits test
douglascamata Jul 28, 2022
003d9aa
Fix comment
douglascamata Aug 1, 2022
51 changes: 35 additions & 16 deletions cmd/thanos/receive.go
@@ -198,22 +198,25 @@ func runReceive(
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)
webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Writer: writer,
ListenAddress: conf.rwAddress,
Registry: reg,
Endpoint: conf.endpoint,
TenantHeader: conf.tenantHeader,
TenantField: conf.tenantField,
DefaultTenantID: conf.defaultTenantID,
ReplicaHeader: conf.replicaHeader,
ReplicationFactor: conf.replicationFactor,
RelabelConfigs: relabelConfig,
ReceiverMode: receiveMode,
Tracer: tracer,
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
TSDBStats: dbs,
Writer: writer,
ListenAddress: conf.rwAddress,
Registry: reg,
Endpoint: conf.endpoint,
TenantHeader: conf.tenantHeader,
TenantField: conf.tenantField,
DefaultTenantID: conf.defaultTenantID,
ReplicaHeader: conf.replicaHeader,
ReplicationFactor: conf.replicationFactor,
RelabelConfigs: relabelConfig,
ReceiverMode: receiveMode,
Tracer: tracer,
TLSConfig: rwTLSConfig,
DialOpts: dialOpts,
ForwardTimeout: time.Duration(*conf.forwardTimeout),
TSDBStats: dbs,
WriteSeriesLimit: conf.writeSeriesLimit,
WriteSamplesLimit: conf.writeSamplesLimit,
WriteRequestSizeLimit: conf.writeRequestSizeLimit,
})

grpcProbe := prober.NewGRPC()
@@ -763,6 +766,10 @@ type receiveConfig struct {

reqLogConfig *extflag.PathOrContent
relabelConfigPath *extflag.PathOrContent

writeSeriesLimit int
writeSamplesLimit int
writeRequestSizeLimit int
}

func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -853,6 +860,18 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
Default("false").Hidden().BoolVar(&rc.allowOutOfOrderUpload)

rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)

cmd.Flag("receive.request-limits.max-series",
"The maximum amount of series accepted in remote write requests.").
Default("0").IntVar(&rc.writeSeriesLimit)

cmd.Flag("receive.request-limits.max-samples",
"The maximum amount of samples accepted in remote write requests.").
Default("0").IntVar(&rc.writeSamplesLimit)

cmd.Flag("receive.request-limits.max-size-bytes",
"The maximum size (in bytes) of remote write requests.").
Default("0").IntVar(&rc.writeRequestSizeLimit)
Contributor: Do we want to support limits on the tenant level?

Author (douglascamata): Yup, this is something that I plan to add as a follow-up. I am also talking with @saswatamcode, because his active series work will also need different per-tenant limits. We will propose that all the limits be configured via a YAML file to make it simpler.

Contributor: So I am curious: if we are going the per-tenant-limit route, do we still want to keep these flags in this PR? Would they act as the default values when no limit is explicitly configured in the config file?

Author (douglascamata): Good question, @yeya24. I would prefer to have everything in the same place, the file. But defining the default value quickly through CLI args seems convenient too. I have no strong opinion on this.

We could support both the CLI and the file for the default limit and document that one of them takes priority over the other.
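
As a rough illustration of the file-based, per-tenant limits discussed above (a sketch only: none of these types, field names, or YAML keys exist in this PR, and the follow-up design had not been settled at this point), the configuration could be modeled along these lines in Go:

```go
// Hypothetical sketch of a file-based write-limits configuration.
// All names here are invented for illustration; this is not Thanos code.
type writeLimitsConfig struct {
    // Defaults apply to any tenant without an explicit override.
    Defaults requestLimits `yaml:"default"`
    // Tenants maps a tenant ID to limits that override the defaults.
    Tenants map[string]requestLimits `yaml:"tenants"`
}

type requestLimits struct {
    SizeBytesLimit int64 `yaml:"size_bytes_limit"`
    SeriesLimit    int64 `yaml:"series_limit"`
    SamplesLimit   int64 `yaml:"samples_limit"`
}

// limitsFor returns the limits for a tenant, falling back to the defaults.
func (c *writeLimitsConfig) limitsFor(tenant string) requestLimits {
    if l, ok := c.Tenants[tenant]; ok {
        return l
    }
    return c.Defaults
}
```

CLI flags like the ones added in this hunk could then feed the default values, with the file taking priority, as suggested in the thread.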

}

// determineMode returns the ReceiverMode that this receiver is configured to run in.
23 changes: 23 additions & 0 deletions docs/components/receive.md
@@ -77,6 +77,29 @@ The example content of `hashring.json`:

With such a configuration, any Receive node listens for remote write on `<ip>:10908/api/v1/receive` and will forward requests to the correct node in the hashring if needed for tenancy and replication.

## Limiting

### Request limits

Thanos Receive supports setting limits on incoming remote write requests. These
limits help prevent a single tenant from sending requests large enough to crash
the Receive instance.

These limits are applied per request and can be configured with the following
command line arguments:

- `--receive.request-limits.max-size-bytes`: the maximum body size.
- `--receive.request-limits.max-series`: the maximum number of series in a single
  remote write request.
- `--receive.request-limits.max-samples`: the maximum number of samples in a single
  remote write request (summed across all series).

Any request above these limits will cause a 413 HTTP response (_Entity Too Large_)
and should not be retried without modifications. It is up to remote write clients to
split up the data and retry, or to drop it entirely.

By default, all these limits are disabled.
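
As an illustration (the values below are arbitrary examples, not recommended defaults and not part of this PR's docs), the three flags could be combined like this:

```
thanos receive \
  --receive.request-limits.max-size-bytes=1048576 \
  --receive.request-limits.max-series=1000 \
  --receive.request-limits.max-samples=10000
```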

## Flags

```$ mdox-exec="thanos receive --help"
103 changes: 80 additions & 23 deletions pkg/receive/handler.go
@@ -83,22 +83,25 @@ var (

// Options for the web Handler.
type Options struct {
Writer *Writer
ListenAddress string
Registry *prometheus.Registry
TenantHeader string
TenantField string
DefaultTenantID string
ReplicaHeader string
Endpoint string
ReplicationFactor uint64
ReceiverMode ReceiverMode
Tracer opentracing.Tracer
TLSConfig *tls.Config
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
RelabelConfigs []*relabel.Config
TSDBStats TSDBStats
Writer *Writer
ListenAddress string
Registry *prometheus.Registry
TenantHeader string
TenantField string
DefaultTenantID string
ReplicaHeader string
Endpoint string
ReplicationFactor uint64
ReceiverMode ReceiverMode
Tracer opentracing.Tracer
TLSConfig *tls.Config
DialOpts []grpc.DialOption
ForwardTimeout time.Duration
RelabelConfigs []*relabel.Config
TSDBStats TSDBStats
WriteSeriesLimit int
WriteSamplesLimit int
WriteRequestSizeLimit int
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
@@ -120,8 +123,11 @@ type Handler struct {
replications *prometheus.CounterVec
replicationFactor prometheus.Gauge

writeSamplesTotal *prometheus.HistogramVec
writeTimeseriesTotal *prometheus.HistogramVec
writeSamplesTotal *prometheus.HistogramVec
writeTimeseriesTotal *prometheus.HistogramVec
writeSamplesLimitHit *prometheus.CounterVec
writeTimeseriesLimitHit *prometheus.CounterVec
writeRequestSizeLimitHit *prometheus.CounterVec
}

func NewHandler(logger log.Logger, o *Options) *Handler {
@@ -183,6 +189,30 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000},
}, []string{"code", "tenant"},
),
writeSamplesLimitHit: promauto.With(registerer).NewCounterVec(
prometheus.CounterOpts{
Namespace: "thanos",
Subsystem: "receive",
Help: "The number of times a request was refused due to hitting the samples limit.",
Name: "write_samples_limit_hit_total",
}, []string{"tenant"},
),
writeTimeseriesLimitHit: promauto.With(registerer).NewCounterVec(
prometheus.CounterOpts{
Namespace: "thanos",
Subsystem: "receive",
Help: "The number of times a request was refused due to hitting the series limit.",
Name: "write_series_limit_hit_total",
}, []string{"tenant"},
),
writeRequestSizeLimitHit: promauto.With(registerer).NewCounterVec(
prometheus.CounterOpts{
Namespace: "thanos",
Subsystem: "receive",
Help: "The number of times a request was refused due to hitting the request size limit.",
Name: "write_request_size_limit_hit_total",
}, []string{"tenant"},
),
}

h.forwardRequests.WithLabelValues(labelSuccess)
@@ -401,6 +431,14 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
// Since this is receive hot path, grow upfront saving allocations and CPU time.
compressed := bytes.Buffer{}
if r.ContentLength >= 0 {
// If the content length is known, we can reject the request based on the
// max size limit here already and avoid growing the buffer.
sizeLimit := int64(h.options.WriteRequestSizeLimit)
Member: I wonder if we can isolate the limiting logic into a dedicated function (apart from h.writeGate) and use that in receiveHTTP, maybe a function like requestUnderLimit that takes the request as an argument and returns the parsed request and a bool.

As these are a lot of ifs that would run every time a remote write request is received, benchmarking this logic would be nice! A separate requestUnderLimit function would allow this. 🙂

Author (douglascamata): Let me see what I can do to isolate the logic. 👍

I have no worries regarding the ifs: they should add such a tiny amount of time to the handler's execution that a benchmark would show mostly noise. Most of the time spent in the limiting logic will be the metrics increment and writing the error response back.

If others also worry about this, I can try to work out a benchmark and see whether it's useful.
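
A minimal sketch of the kind of extraction being suggested, reusing the options and metrics introduced in this diff. This only illustrates the reviewer's idea, not the refactor that later landed in the "Extract request limiting logic from handler" commit; the function name and signature follow the suggestion above.

```go
// requestUnderLimit reports whether the decoded write request is within the
// configured per-request limits. When a limit is exceeded it increments the
// matching counter, writes an HTTP 413, and returns false.
func (h *Handler) requestUnderLimit(w http.ResponseWriter, tenant string, wreq *prompb.WriteRequest) bool {
    if limit := h.options.WriteSeriesLimit; limit > 0 && len(wreq.Timeseries) > limit {
        h.writeTimeseriesLimitHit.WithLabelValues(tenant).Inc()
        http.Error(w, "too many timeseries", http.StatusRequestEntityTooLarge)
        return false
    }
    totalSamples := 0
    for _, ts := range wreq.Timeseries {
        totalSamples += len(ts.Samples)
    }
    if limit := h.options.WriteSamplesLimit; limit > 0 && totalSamples > limit {
        h.writeSamplesLimitHit.WithLabelValues(tenant).Inc()
        http.Error(w, "too many samples", http.StatusRequestEntityTooLarge)
        return false
    }
    return true
}
```

A testing.B benchmark over such a function would then be straightforward to add if the extra branches ever turn out to matter.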

if sizeLimit > 0 && r.ContentLength > sizeLimit {
h.writeRequestSizeLimitHit.WithLabelValues(tenant).Inc()
http.Error(w, "write request too large", http.StatusRequestEntityTooLarge)
return
}
compressed.Grow(int(r.ContentLength))
} else {
compressed.Grow(512)
@@ -410,14 +448,20 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, errors.Wrap(err, "read compressed request body").Error(), http.StatusInternalServerError)
return
}

reqBuf, err := s2.Decode(nil, compressed.Bytes())
if err != nil {
level.Error(tLogger).Log("msg", "snappy decode error", "err", err)
http.Error(w, errors.Wrap(err, "snappy decode error").Error(), http.StatusBadRequest)
return
}

sizeLimit := int64(h.options.WriteRequestSizeLimit)
if sizeLimit > 0 && int64(len(reqBuf)) > sizeLimit {
h.writeRequestSizeLimitHit.WithLabelValues(tenant).Inc()
http.Error(w, "write request too large", http.StatusRequestEntityTooLarge)
return
}

// NOTE: Due to zero copy ZLabels, Labels used from WriteRequests keeps memory
// from the whole request. Ensure that we always copy those when we want to
// store them for longer time.
@@ -449,6 +493,23 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
return
}

seriesLimit := h.options.WriteSeriesLimit
if seriesLimit > 0 && len(wreq.Timeseries) > seriesLimit {
h.writeTimeseriesLimitHit.WithLabelValues(tenant).Inc()
http.Error(w, "too many timeseries", http.StatusRequestEntityTooLarge)
return
}

totalSamples := 0
for _, timeseries := range wreq.Timeseries {
totalSamples += len(timeseries.Samples)
}
samplesLimit := h.options.WriteSamplesLimit
if samplesLimit > 0 && totalSamples > samplesLimit {
Member: Is there a reason for having separate sizeLimit, seriesLimit, and samplesLimit variables here instead of directly using h.options.*Limit? 🙂

Author (douglascamata): They let me make the condition inside the if much shorter and easier to read, no matter your screen's resolution. 😄

h.writeSamplesLimitHit.WithLabelValues(tenant).Inc()
http.Error(w, "too many samples", http.StatusRequestEntityTooLarge)
return
}

// Apply relabeling configs.
h.relabel(&wreq)
if len(wreq.Timeseries) == 0 {
@@ -475,10 +536,6 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), responseStatusCode)
}
h.writeTimeseriesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(len(wreq.Timeseries)))
totalSamples := 0
for _, timeseries := range wreq.Timeseries {
totalSamples += len(timeseries.Samples)
}
h.writeSamplesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(totalSamples))
}

102 changes: 102 additions & 0 deletions pkg/receive/handler_test.go
@@ -690,6 +690,108 @@ func TestReceiveQuorum(t *testing.T) {
}
}

// TODO(douglascamata): continue here
func TestReceiveWriteRequestLimits(t *testing.T) {
Member: Maybe an e2e test could help in understanding some real-world client behavior, e.g. Prometheus or Avalanche. WDYT?

Author (douglascamata): Will add. 👍

Author (douglascamata), Jul 21, 2022: @saswatamcode, actually, what do you mean by "understanding some real-world client behavior, e.g. Prometheus, Avalanche"?

Do you mean, for example, testing that a well-configured Prometheus or Avalanche will retry on a 429? I don't think it's worth adding an e2e test only for this reason. It would mean we're testing Prometheus/Avalanche and not Thanos.

I started working on the e2e tests, and the further I progress on them, the more I think they add no real value beyond what the handler test provides. The only additional value an e2e test adds, as far as I can see, is ensuring that the CLI args are properly passed down to the request-limiting code. Beyond this, the request-limiting code can be tested without an e2e test.

Member: I meant whether it would be useful to see how a hashring behaves with such request limits and multiple tenants. But yes, let's see what the maintainers have to say! Unit tests might be sufficient here. 🙂

Member: Fewer and simpler tests with the same level of verification = better, so I think a unit test will suffice here.

> It means we're testing Prometheus/Avalanche and not Thanos.

In some way yes, but also... are we sure those projects have those tests? 🙃

Anyway, let's investigate/create focused tests in a separate stream of work.

for _, tc := range []struct {
name string
status int
amountSeries int
amountSamples int
appendables []*fakeAppendable
}{
{
name: "Request above limit of series",
status: http.StatusRequestEntityTooLarge,
amountSeries: 21,
appendables: []*fakeAppendable{
{
appender: newFakeAppender(nil, nil, nil),
},
},
},
{
name: "Request under the limit of series",
status: http.StatusOK,
amountSeries: 20,
appendables: []*fakeAppendable{
{
appender: newFakeAppender(nil, nil, nil),
},
},
},
{
name: "Request above limit of samples (series * samples)",
status: http.StatusRequestEntityTooLarge,
amountSeries: 30,
amountSamples: 15,
appendables: []*fakeAppendable{
{
appender: newFakeAppender(nil, nil, nil),
},
},
},
{
name: "Request under the limit of samples (series * samples)",
status: http.StatusOK,
amountSeries: 10,
amountSamples: 2,
appendables: []*fakeAppendable{
{
appender: newFakeAppender(nil, nil, nil),
},
},
},
{
name: "Request above body size limit",
status: http.StatusRequestEntityTooLarge,
amountSeries: 300,
amountSamples: 150,
appendables: []*fakeAppendable{
{
appender: newFakeAppender(nil, nil, nil),
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
if tc.amountSamples == 0 {
tc.amountSamples = 1
Contributor: What's the reason for this mutation?

Author (douglascamata): This is to have 1 as the default value for the test scenarios that do not explicitly set amountSamples.

}
handlers, _ := newTestHandlerHashring(tc.appendables, 1)
handler := handlers[0]
handler.options.WriteRequestSizeLimit = 1 * 1024 * 1024
handler.options.WriteSamplesLimit = 200
handler.options.WriteSeriesLimit = 20
tenant := "test"

wreq := &prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{},
}

for i := 0; i < tc.amountSeries; i += 1 {
label := labelpb.ZLabel{Name: "foo", Value: "bar"}
series := prompb.TimeSeries{
Labels: []labelpb.ZLabel{label},
}
for j := 0; j < tc.amountSamples; j += 1 {
sample := prompb.Sample{Value: float64(j), Timestamp: int64(j)}
series.Samples = append(series.Samples, sample)
}
wreq.Timeseries = append(wreq.Timeseries, series)
}

// Test that the correct status is returned.
rec, err := makeRequest(handler, tenant, wreq)
if err != nil {
t.Fatalf("handler: unexpectedly failed making HTTP request: %v", err)
}
if rec.Code != tc.status {
t.Errorf("handler: got unexpected HTTP status code: expected %d, got %d; body: %s", tc.status, rec.Code, rec.Body.String())
}
})
}
}

func TestReceiveWithConsistencyDelay(t *testing.T) {
appenderErrFn := func() error { return errors.New("failed to get appender") }
conflictErrFn := func() error { return storage.ErrOutOfBounds }