From 976fc7eed703e0cae69266802eafcb925e4ae2a3 Mon Sep 17 00:00:00 2001 From: utukj Date: Tue, 18 Oct 2022 16:57:19 +0100 Subject: [PATCH] Revert "Receive: Reload tenant limit configuration on file change (#5673)" This reverts commit 24e1cc0faf219049174020955f8e3c8251106d87. Signed-off-by: utukj --- CHANGELOG.md | 1 - cmd/thanos/receive.go | 46 ++--- docs/components/receive.md | 2 +- go.mod | 8 +- go.sum | 4 +- pkg/extkingpin/path_content_reloader.go | 128 ------------ pkg/extkingpin/path_content_reloader_test.go | 105 ---------- pkg/receive/handler.go | 22 +- pkg/receive/handler_test.go | 38 ++-- pkg/receive/limiter.go | 189 ++---------------- pkg/receive/limiter_config.go | 4 +- pkg/receive/limiter_config_test.go | 6 +- pkg/receive/limiter_test.go | 100 --------- pkg/receive/request_limiter.go | 31 ++- pkg/receive/request_limiter_test.go | 20 +- pkg/receive/testdata/limits.yaml | 22 -- .../limits_config/invalid_limits.yaml | 17 -- 17 files changed, 97 insertions(+), 646 deletions(-) delete mode 100644 pkg/extkingpin/path_content_reloader.go delete mode 100644 pkg/extkingpin/path_content_reloader_test.go delete mode 100644 pkg/receive/limiter_test.go delete mode 100644 pkg/receive/testdata/limits.yaml delete mode 100644 pkg/receive/testdata/limits_config/invalid_limits.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e1d2143c3..9ed82d6525 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,7 +29,6 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5734](https://github.com/thanos-io/thanos/pull/5734) Store: Support disable block viewer UI. - [#5411](https://github.com/thanos-io/thanos/pull/5411) Tracing: Add OpenTelemetry Protocol exporter. - [#5779](https://github.com/thanos-io/thanos/pull/5779) Objstore: Support specifying S3 storage class. -- [#5673](https://github.com/thanos-io/thanos/pull/5673) Receive: Reload tenant limit configuration on file change. ### Changed diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index d86b560983..5c47b91dd5 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -192,19 +192,6 @@ func runReceive( return errors.Wrap(err, "parse relabel configuration") } - dbs := receive.NewMultiTSDB( - conf.dataDir, - logger, - reg, - tsdbOpts, - lset, - conf.tenantLabelName, - bkt, - conf.allowOutOfOrderUpload, - hashFunc, - ) - writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs) - var limitsConfig *receive.RootLimitsConfig if conf.limitsConfig != nil { limitsContentYaml, err := conf.limitsConfig.Content() @@ -216,11 +203,20 @@ func runReceive( return errors.Wrap(err, "parse limit configuration") } } - limiter, err := receive.NewLimiter(conf.limitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter")) - if err != nil { - return errors.Wrap(err, "creating limiter") - } + limiter := receive.NewLimiter(limitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter")) + dbs := receive.NewMultiTSDB( + conf.dataDir, + logger, + reg, + tsdbOpts, + lset, + conf.tenantLabelName, + bkt, + conf.allowOutOfOrderUpload, + hashFunc, + ) + writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs) webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ Writer: writer, ListenAddress: conf.rwAddress, @@ -403,22 +399,6 @@ func runReceive( }) } - { - if limiter.CanReload() { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - level.Debug(logger).Log("msg", "limits config initialized with file watcher.") - if err := limiter.StartConfigReloader(ctx); err != nil { - return err - } - <-ctx.Done() - return nil - }, func(err error) { - cancel() - }) - } - } - level.Info(logger).Log("msg", "starting receiver") return nil } diff --git a/docs/components/receive.md b/docs/components/receive.md index ef4e39e35e..6fa13938e9 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -86,7 +86,7 @@ Thanos Receive has some limits and gates that can be configured to control resou To configure the gates and limits you can use one of the two options: -- `--receive.limits-config-file=`: where `` is the path to the YAML file. Any modification to the indicated file will trigger a configuration reload. If the updated configuration is invalid an error will be logged and it won't replace the previous valid configuration. +- `--receive.limits-config-file=`: where `` is the path to the YAML file. - `--receive.limits-config=`: where `` is the content of YAML file. By default all the limits and gates are **disabled**. diff --git a/go.mod b/go.mod index bee3e97fe7..13743c8020 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/davecgh/go-spew v1.1.1 github.com/dustin/go-humanize v1.0.0 github.com/efficientgo/e2e v0.13.1-0.20220923082810-8fa9daa8af8a - github.com/efficientgo/tools/extkingpin v0.0.0-20220817170617-6c25e3b627dd + github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb github.com/fatih/structtag v1.2.0 github.com/felixge/fgprof v0.9.2 @@ -108,7 +108,6 @@ require ( require ( github.com/efficientgo/core v1.0.0-rc.0 - github.com/efficientgo/tools/core v0.0.0-20220817170617-6c25e3b627dd github.com/minio/sha256-simd v1.0.0 ) @@ -128,7 +127,10 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.10.0 ) -require go.opentelemetry.io/contrib/propagators/autoprop v0.34.0 +require ( + github.com/efficientgo/tools/core v0.0.0-20220817170617-6c25e3b627dd + go.opentelemetry.io/contrib/propagators/autoprop v0.34.0 +) require ( github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.32.3 // indirect diff --git a/go.sum b/go.sum index 97fc0d0411..5ee9bab6be 100644 --- a/go.sum +++ b/go.sum @@ -252,8 +252,8 @@ github.com/efficientgo/e2e v0.13.1-0.20220923082810-8fa9daa8af8a h1:cnJajqeh/Hjv github.com/efficientgo/e2e v0.13.1-0.20220923082810-8fa9daa8af8a/go.mod h1:Hi+sz0REtlhVZ8zcdeTC3j6LUEEpJpPtNjOaOKuNcgI= github.com/efficientgo/tools/core v0.0.0-20220817170617-6c25e3b627dd h1:svR6KxSP1xiPw10RN4Pd7g6BAVkEcNN628PAqZH31mM= github.com/efficientgo/tools/core v0.0.0-20220817170617-6c25e3b627dd/go.mod h1:OmVcnJopJL8d3X3sSXTiypGoUSgFq1aDGmlrdi9dn/M= -github.com/efficientgo/tools/extkingpin v0.0.0-20220817170617-6c25e3b627dd h1:VaYzzXeUbC5fVheskcKVNOyJMEYD+HgrJNzIAg/mRIM= -github.com/efficientgo/tools/extkingpin v0.0.0-20220817170617-6c25e3b627dd/go.mod h1:ZV0utlglOczUWv3ih2AbqPSoLoFzdplUYxwV62eZi6Q= +github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d h1:WZV/mrUyKS9w9r+Jdw+zq/tdGAb5LwB+H37EkMLhEMA= +github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d/go.mod h1:ZV0utlglOczUWv3ih2AbqPSoLoFzdplUYxwV62eZi6Q= github.com/elastic/go-sysinfo v1.1.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= github.com/elastic/go-sysinfo v1.8.1 h1:4Yhj+HdV6WjbCRgGdZpPJ8lZQlXZLKDAeIkmQ/VRvi4= github.com/elastic/go-sysinfo v1.8.1/go.mod h1:JfllUnzoQV/JRYymbH3dO1yggI3mV2oTKSXsDHM+uIM= diff --git a/pkg/extkingpin/path_content_reloader.go b/pkg/extkingpin/path_content_reloader.go deleted file mode 100644 index 68c2cd252c..0000000000 --- a/pkg/extkingpin/path_content_reloader.go +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package extkingpin - -import ( - "context" - "fmt" - "os" - "path" - "path/filepath" - "time" - - "github.com/fsnotify/fsnotify" - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/pkg/errors" -) - -type fileContent interface { - Content() ([]byte, error) - Path() string -} - -// PathContentReloader starts a file watcher that monitors the file indicated by fileContent.Path() and runs -// reloadFunc whenever a change is detected. -// A debounce timer can be configured via opts to handle situations where many "write" events are received together or -// a "create" event is followed up by a "write" event, for example. Files will be effectively reloaded at the latest -// after 2 times the debounce timer. By default the debouncer timer is 1 second. -// To ensure renames and deletes are properly handled, the file watcher is put at the file's parent folder. See -// https://github.com/fsnotify/fsnotify/issues/214 for more details. -func PathContentReloader(ctx context.Context, fileContent fileContent, logger log.Logger, reloadFunc func(), debounceTime time.Duration) error { - filePath, err := filepath.Abs(fileContent.Path()) - if err != nil { - return errors.Wrap(err, "getting absolute file path") - } - - watcher, err := fsnotify.NewWatcher() - if filePath == "" { - level.Debug(logger).Log("msg", "no path detected for config reload") - } - if err != nil { - return errors.Wrap(err, "creating file watcher") - } - go func() { - var reloadTimer *time.Timer - if debounceTime != 0 { - reloadTimer = time.AfterFunc(debounceTime, func() { - reloadFunc() - level.Debug(logger).Log("msg", "configuration reloaded after debouncing") - }) - } - defer watcher.Close() - for { - select { - case <-ctx.Done(): - if reloadTimer != nil { - reloadTimer.Stop() - } - return - case event := <-watcher.Events: - // fsnotify sometimes sends a bunch of events without name or operation. - // It's unclear what they are and why they are sent - filter them out. - if event.Name == "" { - break - } - // We are watching the file's parent folder (more details on this is done can be found below), but are - // only interested in changed to the target file. Discard every other file as quickly as possible. - if event.Name != filePath { - break - } - // We only react to files being written or created. - // On chmod or remove we have nothing to do. - // On rename we have the old file name (not useful). A create event for the new file will come later. - if event.Op&fsnotify.Write == 0 && event.Op&fsnotify.Create == 0 { - break - } - level.Debug(logger).Log("msg", fmt.Sprintf("change detected for %s", filePath), "eventName", event.Name, "eventOp", event.Op) - if reloadTimer != nil { - reloadTimer.Reset(debounceTime) - } - case err := <-watcher.Errors: - level.Error(logger).Log("msg", "watcher error", "error", err) - } - } - }() - // We watch the file's parent folder and not the file itself to better handle DELETE and RENAME events. Check - // https://github.com/fsnotify/fsnotify/issues/214 for more details. - if err := watcher.Add(path.Dir(filePath)); err != nil { - return errors.Wrapf(err, "adding path %s to file watcher", filePath) - } - return nil -} - -type staticPathContent struct { - content []byte - path string -} - -var _ fileContent = (*staticPathContent)(nil) - -// Content returns the cached content. -func (t *staticPathContent) Content() ([]byte, error) { - return t.content, nil -} - -// Path returns the path to the file that contains the content. -func (t *staticPathContent) Path() string { - return t.path -} - -// NewStaticPathContent creates a new content that can be used to serve a static configuration. It copies the -// configuration from `fromPath` into `destPath` to avoid confusion with file watchers. -func NewStaticPathContent(fromPath string) (*staticPathContent, error) { - content, err := os.ReadFile(fromPath) - if err != nil { - return nil, errors.Wrapf(err, "could not load test content: %s", fromPath) - } - return &staticPathContent{content, fromPath}, nil -} - -// Rewrite rewrites the file backing this staticPathContent and swaps the local content cache. The file writing -// is needed to trigger the file system monitor. -func (t *staticPathContent) Rewrite(newContent []byte) error { - t.content = newContent - // Write the file to ensure possible file watcher reloaders get triggered. - return os.WriteFile(t.path, newContent, 0666) -} diff --git a/pkg/extkingpin/path_content_reloader_test.go b/pkg/extkingpin/path_content_reloader_test.go deleted file mode 100644 index fb20f83d5c..0000000000 --- a/pkg/extkingpin/path_content_reloader_test.go +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package extkingpin - -import ( - "context" - "os" - "path" - "sync" - "testing" - "time" - - "github.com/go-kit/log" - "github.com/thanos-io/thanos/pkg/testutil" -) - -func TestPathContentReloader(t *testing.T) { - type args struct { - runSteps func(t *testing.T, testFile string, pathContent *staticPathContent) - } - tests := []struct { - name string - args args - wantReloads int - }{ - { - name: "Many operations, only rewrite triggers one reload", - args: args{ - runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) { - testutil.Ok(t, os.Chmod(testFile, 0777)) - testutil.Ok(t, os.Remove(testFile)) - testutil.Ok(t, pathContent.Rewrite([]byte("test modified"))) - }, - }, - wantReloads: 1, - }, - { - name: "Many operations, only rename triggers one reload", - args: args{ - runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) { - testutil.Ok(t, os.Chmod(testFile, 0777)) - testutil.Ok(t, os.Rename(testFile, testFile+".tmp")) - testutil.Ok(t, os.Rename(testFile+".tmp", testFile)) - }, - }, - wantReloads: 1, - }, - { - name: "Many operations, two rewrites trigger two reloads", - args: args{ - runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) { - testutil.Ok(t, os.Chmod(testFile, 0777)) - testutil.Ok(t, os.Remove(testFile)) - testutil.Ok(t, pathContent.Rewrite([]byte("test modified"))) - time.Sleep(2 * time.Second) - testutil.Ok(t, pathContent.Rewrite([]byte("test modified again"))) - }, - }, - wantReloads: 1, - }, - { - name: "Chmod doesn't trigger reload", - args: args{ - runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) { - testutil.Ok(t, os.Chmod(testFile, 0777)) - }, - }, - wantReloads: 0, - }, - { - name: "Remove doesn't trigger reload", - args: args{ - runSteps: func(t *testing.T, testFile string, pathContent *staticPathContent) { - testutil.Ok(t, os.Remove(testFile)) - }, - }, - wantReloads: 0, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - testFile := path.Join(t.TempDir(), "test") - testutil.Ok(t, os.WriteFile(testFile, []byte("test"), 0666)) - pathContent, err := NewStaticPathContent(testFile) - testutil.Ok(t, err) - - wg := &sync.WaitGroup{} - wg.Add(tt.wantReloads) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - reloadCount := 0 - err = PathContentReloader(ctx, pathContent, log.NewLogfmtLogger(os.Stdout), func() { - reloadCount++ - wg.Done() - }, 100*time.Millisecond) - testutil.Ok(t, err) - - tt.args.runSteps(t, testFile, pathContent) - wg.Wait() - testutil.Equals(t, tt.wantReloads, reloadCount) - }) - } -} diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 12afb752b8..156bb74566 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -17,6 +17,10 @@ import ( "sync" "time" + "github.com/thanos-io/thanos/pkg/api" + statusapi "github.com/thanos-io/thanos/pkg/api/status" + "github.com/thanos-io/thanos/pkg/logging" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gogo/protobuf/proto" @@ -31,9 +35,6 @@ import ( "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" - "github.com/thanos-io/thanos/pkg/api" - statusapi "github.com/thanos-io/thanos/pkg/api/status" - "github.com/thanos-io/thanos/pkg/logging" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -98,7 +99,7 @@ type Options struct { ForwardTimeout time.Duration RelabelConfigs []*relabel.Config TSDBStats TSDBStats - Limiter *Limiter + Limiter *limiter } // Handler serves a Prometheus remote write receiving HTTP endpoint. @@ -123,7 +124,7 @@ type Handler struct { writeSamplesTotal *prometheus.HistogramVec writeTimeseriesTotal *prometheus.HistogramVec - Limiter *Limiter + limiter *limiter } func NewHandler(logger log.Logger, o *Options) *Handler { @@ -149,7 +150,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler { Max: 30 * time.Second, Jitter: true, }, - Limiter: o.Limiter, + limiter: o.Limiter, forwardRequests: promauto.With(registerer).NewCounterVec( prometheus.CounterOpts{ Name: "thanos_receive_forward_requests_total", @@ -406,18 +407,17 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { tLogger := log.With(h.logger, "tenant", tenant) - writeGate := h.Limiter.WriteGate() tracing.DoInSpan(r.Context(), "receive_write_gate_ismyturn", func(ctx context.Context) { - err = writeGate.Start(r.Context()) + err = h.limiter.writeGate.Start(r.Context()) }) - defer writeGate.Done() if err != nil { level.Error(tLogger).Log("err", err, "msg", "internal server error") http.Error(w, err.Error(), http.StatusInternalServerError) return } + defer h.limiter.writeGate.Done() - under, err := h.Limiter.HeadSeriesLimiter.isUnderLimit(tenant) + under, err := h.limiter.HeadSeriesLimiter.isUnderLimit(tenant) if err != nil { level.Error(tLogger).Log("msg", "error while limiting", "err", err.Error()) } @@ -428,7 +428,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { return } - requestLimiter := h.Limiter.RequestLimiter() + requestLimiter := h.limiter.requestLimiter // io.ReadAll dynamically adjust the byte slice for read data, starting from 512B. // Since this is receive hot path, grow upfront saving allocations and CPU time. compressed := bytes.Buffer{} diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 4a2a536038..44076de141 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -13,7 +13,6 @@ import ( "net/http" "net/http/httptest" "os" - "path" "path/filepath" "runtime" "runtime/pprof" @@ -22,8 +21,6 @@ import ( "testing" "time" - "gopkg.in/yaml.v3" - "github.com/alecthomas/units" "github.com/go-kit/log" "github.com/gogo/protobuf/proto" @@ -43,7 +40,6 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/errutil" - "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -366,7 +362,6 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin }, } - limiter, _ := NewLimiter(NewNopConfig(), nil, RouterIngestor, log.NewNopLogger()) for i := range appendables { h := NewHandler(nil, &Options{ TenantHeader: DefaultTenantHeader, @@ -374,7 +369,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin ReplicationFactor: replicationFactor, ForwardTimeout: 5 * time.Second, Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])), - Limiter: limiter, + Limiter: NewLimiter(nil, nil, RouterIngestor, nil), }) handlers = append(handlers, h) h.peers = peers @@ -780,28 +775,23 @@ func TestReceiveWriteRequestLimits(t *testing.T) { } handlers, _ := newTestHandlerHashring(appendables, 3) handler := handlers[0] - tenant := "test" - tenantConfig, err := yaml.Marshal(&RootLimitsConfig{ - WriteLimits: WriteLimitsConfig{ - TenantsLimits: TenantsWriteLimitsConfig{ - tenant: &WriteLimitConfig{ - RequestLimits: NewEmptyRequestLimitsConfig(). - SetSizeBytesLimit(int64(1 * units.Megabyte)). - SetSeriesLimit(20). - SetSamplesLimit(200), + handler.limiter = NewLimiter( + &RootLimitsConfig{ + WriteLimits: WriteLimitsConfig{ + TenantsLimits: TenantsWriteLimitsConfig{ + tenant: &WriteLimitConfig{ + RequestLimits: newEmptyRequestLimitsConfig(). + SetSizeBytesLimit(int64(1 * units.Megabyte)). + SetSeriesLimit(20). + SetSamplesLimit(200), + }, }, }, }, - }) - if err != nil { - t.Fatal("handler: failed to generate limit configuration") - } - tmpLimitsPath := path.Join(t.TempDir(), "limits.yaml") - testutil.Ok(t, os.WriteFile(tmpLimitsPath, tenantConfig, 0666)) - limitConfig, _ := extkingpin.NewStaticPathContent(tmpLimitsPath) - handler.Limiter, _ = NewLimiter( - limitConfig, nil, RouterIngestor, log.NewNopLogger(), + nil, + RouterIngestor, + log.NewNopLogger(), ) wreq := &prompb.WriteRequest{ diff --git a/pkg/receive/limiter.go b/pkg/receive/limiter.go index ff5bbe3199..bc3c4d8358 100644 --- a/pkg/receive/limiter.go +++ b/pkg/receive/limiter.go @@ -5,204 +5,59 @@ package receive import ( "context" - "fmt" - "sync" - "time" "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/thanos-io/thanos/pkg/extkingpin" - - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/gate" ) -// Limiter is responsible for managing the configuration and initialization of -// different types that apply limits to the Receive instance. -type Limiter struct { - sync.RWMutex - requestLimiter requestLimiter - HeadSeriesLimiter headSeriesLimiter - writeGate gate.Gate - registerer prometheus.Registerer - configPathOrContent fileContent - logger log.Logger - configReloadCounter prometheus.Counter - configReloadFailedCounter prometheus.Counter - receiverMode ReceiverMode -} - -// headSeriesLimiter encompasses active/head series limiting logic. -type headSeriesLimiter interface { - QueryMetaMonitoring(context.Context) error - isUnderLimit(tenant string) (bool, error) +type limiter struct { + requestLimiter requestLimiter + writeGate gate.Gate + HeadSeriesLimiter headSeriesLimiter } +// requestLimiter encompasses logic for limiting remote write requests. type requestLimiter interface { AllowSizeBytes(tenant string, contentLengthBytes int64) bool AllowSeries(tenant string, amount int64) bool AllowSamples(tenant string, amount int64) bool } -// fileContent is an interface to avoid a direct dependency on kingpin or extkingpin. -type fileContent interface { - Content() ([]byte, error) - Path() string +// headSeriesLimiter encompasses active/head series limiting logic. +type headSeriesLimiter interface { + QueryMetaMonitoring(context.Context) error + isUnderLimit(tenant string) (bool, error) } -// NewLimiter creates a new *Limiter given a configuration and prometheus -// registerer. -func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMode, logger log.Logger) (*Limiter, error) { - limiter := &Limiter{ +func NewLimiter(root *RootLimitsConfig, reg prometheus.Registerer, r ReceiverMode, logger log.Logger) *limiter { + limiter := &limiter{ writeGate: gate.NewNoop(), requestLimiter: &noopRequestLimiter{}, HeadSeriesLimiter: NewNopSeriesLimit(), - logger: logger, - receiverMode: r, - } - - if reg != nil { - limiter.registerer = NewUnRegisterer(reg) - limiter.configReloadCounter = promauto.With(limiter.registerer).NewCounter( - prometheus.CounterOpts{ - Namespace: "thanos", - Subsystem: "receive", - Name: "limits_config_reload_total", - Help: "How many times the limit configuration was reloaded", - }, - ) - limiter.configReloadFailedCounter = promauto.With(limiter.registerer).NewCounter( - prometheus.CounterOpts{ - Namespace: "thanos", - Subsystem: "receive", - Name: "limits_config_reload_err_total", - Help: "How many times the limit configuration failed to reload.", - }, - ) - } - - if configFile == nil { - return limiter, nil - } - - limiter.configPathOrContent = configFile - if err := limiter.loadConfig(); err != nil { - return nil, errors.Wrap(err, "load tenant limits config") - } - - return limiter, nil -} - -// StartConfigReloader starts the automatic configuration reloader based off of -// the file indicated by pathOrContent. It starts a Go routine in the given -// *run.Group. -func (l *Limiter) StartConfigReloader(ctx context.Context) error { - if !l.CanReload() { - return nil } - - return extkingpin.PathContentReloader(ctx, l.configPathOrContent, l.logger, func() { - level.Info(l.logger).Log("msg", "reloading limit config") - if err := l.loadConfig(); err != nil { - if failedReload := l.configReloadCounter; failedReload != nil { - failedReload.Inc() - } - errMsg := fmt.Sprintf("error reloading tenant limits config from %s", l.configPathOrContent.Path()) - level.Error(l.logger).Log("msg", errMsg, "err", err) - } - if reloadCounter := l.configReloadCounter; reloadCounter != nil { - reloadCounter.Inc() - } - }, 1*time.Second) -} - -func (l *Limiter) CanReload() bool { - if l.configPathOrContent == nil { - return false + if root == nil { + return limiter } - if l.configPathOrContent.Path() == "" { - return false - } - return true -} -func (l *Limiter) loadConfig() error { - config, err := ParseLimitConfigContent(l.configPathOrContent) - if err != nil { - return err - } - l.Lock() - defer l.Unlock() - maxWriteConcurrency := config.WriteLimits.GlobalLimits.MaxConcurrency + maxWriteConcurrency := root.WriteLimits.GlobalLimits.MaxConcurrency if maxWriteConcurrency > 0 { - l.writeGate = gate.New( + limiter.writeGate = gate.New( extprom.WrapRegistererWithPrefix( "thanos_receive_write_request_concurrent_", - l.registerer, + reg, ), int(maxWriteConcurrency), ) } - l.requestLimiter = newConfigRequestLimiter( - l.registerer, - &config.WriteLimits, - ) - seriesLimitSupported := (l.receiverMode == RouterOnly || l.receiverMode == RouterIngestor) && (len(config.WriteLimits.TenantsLimits) != 0 || config.WriteLimits.DefaultLimits.HeadSeriesLimit != 0) - if seriesLimitSupported { - l.HeadSeriesLimiter = NewHeadSeriesLimit(config.WriteLimits, l.registerer, l.logger) - } - return nil -} + limiter.requestLimiter = newConfigRequestLimiter(reg, &root.WriteLimits) -// RequestLimiter is a safe getter for the request limiter. -func (l *Limiter) RequestLimiter() requestLimiter { - l.RLock() - defer l.RUnlock() - return l.requestLimiter -} - -// WriteGate is a safe getter for the write gate. -func (l *Limiter) WriteGate() gate.Gate { - l.RLock() - defer l.RUnlock() - return l.writeGate -} - -// ParseLimitConfigContent parses the limit configuration from the path or -// content. -func ParseLimitConfigContent(limitsConfig fileContent) (*RootLimitsConfig, error) { - if limitsConfig == nil { - return &RootLimitsConfig{}, nil - } - limitsContentYaml, err := limitsConfig.Content() - if err != nil { - return nil, errors.Wrap(err, "get content of limit configuration") - } - parsedConfig, err := ParseRootLimitConfig(limitsContentYaml) - if err != nil { - return nil, errors.Wrap(err, "parse limit configuration") + // Impose active series limit only if Receiver is in Router or RouterIngestor mode, and config is provided. + seriesLimitSupported := (r == RouterOnly || r == RouterIngestor) && (len(root.WriteLimits.TenantsLimits) != 0 || root.WriteLimits.DefaultLimits.HeadSeriesLimit != 0) + if seriesLimitSupported { + limiter.HeadSeriesLimiter = NewHeadSeriesLimit(root.WriteLimits, reg, logger) } - return parsedConfig, nil -} - -type nopConfigContent struct{} - -var _ fileContent = (*nopConfigContent)(nil) - -// Content returns no content and no error. -func (n nopConfigContent) Content() ([]byte, error) { - return nil, nil -} - -// Path returns an empty path. -func (n nopConfigContent) Path() string { - return "" -} -// NewNopConfig creates a no-op config content (no configuration). -func NewNopConfig() nopConfigContent { - return nopConfigContent{} + return limiter } diff --git a/pkg/receive/limiter_config.go b/pkg/receive/limiter_config.go index c3bd330b6e..67aa5ef93a 100644 --- a/pkg/receive/limiter_config.go +++ b/pkg/receive/limiter_config.go @@ -78,7 +78,6 @@ type DefaultLimitsConfig struct { HeadSeriesLimit uint64 `yaml:"head_series_limit"` } -// TenantsWriteLimitsConfig is a map of tenant IDs to their *WriteLimitConfig. type TenantsWriteLimitsConfig map[string]*WriteLimitConfig // A tenant might not always have limits configured, so things here must @@ -111,7 +110,8 @@ type requestLimitsConfig struct { SamplesLimit *int64 `yaml:"samples_limit"` } -func NewEmptyRequestLimitsConfig() *requestLimitsConfig { +// Utils for initializing. +func newEmptyRequestLimitsConfig() *requestLimitsConfig { return &requestLimitsConfig{} } diff --git a/pkg/receive/limiter_config_test.go b/pkg/receive/limiter_config_test.go index 3e32ea41e8..b080680162 100644 --- a/pkg/receive/limiter_config_test.go +++ b/pkg/receive/limiter_config_test.go @@ -35,7 +35,7 @@ func TestParseLimiterConfig(t *testing.T) { }, }, DefaultLimits: DefaultLimitsConfig{ - RequestLimits: *NewEmptyRequestLimitsConfig(). + RequestLimits: *newEmptyRequestLimitsConfig(). SetSizeBytesLimit(1024). SetSeriesLimit(1000). SetSamplesLimit(10), @@ -44,7 +44,7 @@ func TestParseLimiterConfig(t *testing.T) { TenantsLimits: TenantsWriteLimitsConfig{ "acme": NewEmptyWriteLimitConfig(). SetRequestLimits( - NewEmptyRequestLimitsConfig(). + newEmptyRequestLimitsConfig(). SetSizeBytesLimit(0). SetSeriesLimit(0). SetSamplesLimit(0), @@ -52,7 +52,7 @@ func TestParseLimiterConfig(t *testing.T) { SetHeadSeriesLimit(2000), "ajax": NewEmptyWriteLimitConfig(). SetRequestLimits( - NewEmptyRequestLimitsConfig(). + newEmptyRequestLimitsConfig(). SetSeriesLimit(50000). SetSamplesLimit(500), ), diff --git a/pkg/receive/limiter_test.go b/pkg/receive/limiter_test.go deleted file mode 100644 index be7e8790c1..0000000000 --- a/pkg/receive/limiter_test.go +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package receive - -import ( - "context" - "os" - "path" - "testing" - "time" - - "github.com/thanos-io/thanos/pkg/extkingpin" - - "github.com/efficientgo/tools/core/pkg/testutil" - "github.com/go-kit/log" -) - -func TestLimiter_StartConfigReloader(t *testing.T) { - origLimitsFile, err := os.ReadFile(path.Join("testdata", "limits_config", "good_limits.yaml")) - testutil.Ok(t, err) - copyLimitsFile := path.Join(t.TempDir(), "limits.yaml") - testutil.Ok(t, os.WriteFile(copyLimitsFile, origLimitsFile, 0666)) - - goodLimits, err := extkingpin.NewStaticPathContent(copyLimitsFile) - if err != nil { - t.Fatalf("error trying to save static limit config: %s", err) - } - invalidLimitsPath := path.Join("./testdata", "limits_config", "invalid_limits.yaml") - invalidLimits, err := os.ReadFile(invalidLimitsPath) - if err != nil { - t.Fatalf("could not load test content at %s: %s", invalidLimitsPath, err) - } - - limiter, err := NewLimiter(goodLimits, nil, RouterIngestor, log.NewLogfmtLogger(os.Stdout)) - testutil.Ok(t, err) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - err = limiter.StartConfigReloader(ctx) - testutil.Ok(t, err) - - time.Sleep(1 * time.Second) - testutil.Ok(t, goodLimits.Rewrite(invalidLimits)) -} - -type emptyPathFile struct{} - -func (e emptyPathFile) Content() ([]byte, error) { - return []byte{}, nil -} - -func (e emptyPathFile) Path() string { - return "" -} - -func TestLimiter_CanReload(t *testing.T) { - validLimitsPath, err := extkingpin.NewStaticPathContent( - path.Join("testdata", "limits_config", "good_limits.yaml"), - ) - testutil.Ok(t, err) - emptyLimitsPath := emptyPathFile{} - - type args struct { - configFilePath fileContent - } - tests := []struct { - name string - args args - wantReload bool - }{ - { - name: "Nil config file path cannot be reloaded", - args: args{configFilePath: nil}, - wantReload: false, - }, - { - name: "Empty config file path cannot be reloaded", - args: args{configFilePath: emptyLimitsPath}, - wantReload: false, - }, - { - name: "Valid config file path can be reloaded", - args: args{configFilePath: validLimitsPath}, - wantReload: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - configFile := tt.args.configFilePath - limiter, err := NewLimiter(configFile, nil, RouterIngestor, log.NewLogfmtLogger(os.Stdout)) - testutil.Ok(t, err) - if tt.wantReload { - testutil.Assert(t, limiter.CanReload()) - } else { - testutil.Assert(t, !limiter.CanReload()) - } - }) - } -} diff --git a/pkg/receive/request_limiter.go b/pkg/receive/request_limiter.go index 7da0c64a6d..de7554de2f 100644 --- a/pkg/receive/request_limiter.go +++ b/pkg/receive/request_limiter.go @@ -14,7 +14,7 @@ const ( sizeBytesLimitName = "body_size" ) -var unlimitedRequestLimitsConfig = NewEmptyRequestLimitsConfig(). +var unlimitedRequestLimitsConfig = newEmptyRequestLimitsConfig(). SetSizeBytesLimit(0). SetSeriesLimit(0). SetSamplesLimit(0) @@ -49,12 +49,7 @@ func newConfigRequestLimiter(reg prometheus.Registerer, writeLimits *WriteLimits tenantLimits: tenantRequestLimits, cachedDefaultLimits: defaultRequestLimits, } - limiter.registerMetrics(reg) - return &limiter -} - -func (l *configRequestLimiter) registerMetrics(reg prometheus.Registerer) { - l.limitsHit = promauto.With(reg).NewSummaryVec( + limiter.limitsHit = promauto.With(reg).NewSummaryVec( prometheus.SummaryOpts{ Namespace: "thanos", Subsystem: "receive", @@ -63,7 +58,7 @@ func (l *configRequestLimiter) registerMetrics(reg prometheus.Registerer) { Objectives: map[float64]float64{0.50: 0.1, 0.95: 0.1, 0.99: 0.001}, }, []string{"tenant", "limit"}, ) - l.configuredLimits = promauto.With(reg).NewGaugeVec( + limiter.configuredLimits = promauto.With(reg).NewGaugeVec( prometheus.GaugeOpts{ Namespace: "thanos", Subsystem: "receive", @@ -71,14 +66,16 @@ func (l *configRequestLimiter) registerMetrics(reg prometheus.Registerer) { Help: "The configured write limits.", }, []string{"tenant", "limit"}, ) - for tenant, limits := range l.tenantLimits { - l.configuredLimits.WithLabelValues(tenant, sizeBytesLimitName).Set(float64(*limits.SizeBytesLimit)) - l.configuredLimits.WithLabelValues(tenant, seriesLimitName).Set(float64(*limits.SeriesLimit)) - l.configuredLimits.WithLabelValues(tenant, samplesLimitName).Set(float64(*limits.SamplesLimit)) + for tenant, limits := range tenantRequestLimits { + limiter.configuredLimits.WithLabelValues(tenant, sizeBytesLimitName).Set(float64(*limits.SizeBytesLimit)) + limiter.configuredLimits.WithLabelValues(tenant, seriesLimitName).Set(float64(*limits.SeriesLimit)) + limiter.configuredLimits.WithLabelValues(tenant, samplesLimitName).Set(float64(*limits.SamplesLimit)) } - l.configuredLimits.WithLabelValues("", sizeBytesLimitName).Set(float64(*l.cachedDefaultLimits.SizeBytesLimit)) - l.configuredLimits.WithLabelValues("", seriesLimitName).Set(float64(*l.cachedDefaultLimits.SeriesLimit)) - l.configuredLimits.WithLabelValues("", samplesLimitName).Set(float64(*l.cachedDefaultLimits.SamplesLimit)) + limiter.configuredLimits.WithLabelValues("", sizeBytesLimitName).Set(float64(*defaultRequestLimits.SizeBytesLimit)) + limiter.configuredLimits.WithLabelValues("", seriesLimitName).Set(float64(*defaultRequestLimits.SeriesLimit)) + limiter.configuredLimits.WithLabelValues("", samplesLimitName).Set(float64(*defaultRequestLimits.SamplesLimit)) + + return &limiter } func (l *configRequestLimiter) AllowSizeBytes(tenant string, contentLengthBytes int64) bool { @@ -103,7 +100,7 @@ func (l *configRequestLimiter) AllowSeries(tenant string, amount int64) bool { } allowed := *limit >= amount - if !allowed && l.limitsHit != nil { + if !allowed { l.limitsHit. WithLabelValues(tenant, seriesLimitName). Observe(float64(amount - *limit)) @@ -117,7 +114,7 @@ func (l *configRequestLimiter) AllowSamples(tenant string, amount int64) bool { return true } allowed := *limit >= amount - if !allowed && l.limitsHit != nil { + if !allowed { l.limitsHit. WithLabelValues(tenant, samplesLimitName). Observe(float64(amount - *limit)) diff --git a/pkg/receive/request_limiter_test.go b/pkg/receive/request_limiter_test.go index dfbea066d9..e654cd1cdf 100644 --- a/pkg/receive/request_limiter_test.go +++ b/pkg/receive/request_limiter_test.go @@ -15,12 +15,12 @@ func TestRequestLimiter_limitsFor(t *testing.T) { limits := WriteLimitsConfig{ DefaultLimits: DefaultLimitsConfig{ - RequestLimits: *NewEmptyRequestLimitsConfig(). + RequestLimits: *newEmptyRequestLimitsConfig(). SetSeriesLimit(10), }, TenantsLimits: TenantsWriteLimitsConfig{ tenantWithLimits: &WriteLimitConfig{ - RequestLimits: NewEmptyRequestLimitsConfig(). + RequestLimits: newEmptyRequestLimitsConfig(). SetSeriesLimit(30), }, }, @@ -33,7 +33,7 @@ func TestRequestLimiter_limitsFor(t *testing.T) { { name: "Gets the default limits when tenant's limits aren't present", tenant: tenantWithoutLimits, - wantLimits: NewEmptyRequestLimitsConfig(). + wantLimits: newEmptyRequestLimitsConfig(). SetSeriesLimit(10). SetSamplesLimit(0). SetSizeBytesLimit(0), @@ -41,7 +41,7 @@ func TestRequestLimiter_limitsFor(t *testing.T) { { name: "Gets the tenant's limits when it is present", tenant: tenantWithLimits, - wantLimits: NewEmptyRequestLimitsConfig(). + wantLimits: newEmptyRequestLimitsConfig(). SetSeriesLimit(30). SetSamplesLimit(0). SetSizeBytesLimit(0), @@ -102,11 +102,11 @@ func TestRequestLimiter_AllowRequestBodySizeBytes(t *testing.T) { tenant := "tenant" limits := WriteLimitsConfig{ DefaultLimits: DefaultLimitsConfig{ - RequestLimits: *NewEmptyRequestLimitsConfig().SetSeriesLimit(10), + RequestLimits: *newEmptyRequestLimitsConfig().SetSeriesLimit(10), }, TenantsLimits: TenantsWriteLimitsConfig{ tenant: &WriteLimitConfig{ - RequestLimits: NewEmptyRequestLimitsConfig().SetSizeBytesLimit(tt.sizeByteLimit), + RequestLimits: newEmptyRequestLimitsConfig().SetSizeBytesLimit(tt.sizeByteLimit), }, }, } @@ -159,11 +159,11 @@ func TestRequestLimiter_AllowSeries(t *testing.T) { tenant := "tenant" limits := WriteLimitsConfig{ DefaultLimits: DefaultLimitsConfig{ - RequestLimits: *NewEmptyRequestLimitsConfig().SetSeriesLimit(10), + RequestLimits: *newEmptyRequestLimitsConfig().SetSeriesLimit(10), }, TenantsLimits: TenantsWriteLimitsConfig{ tenant: &WriteLimitConfig{ - RequestLimits: NewEmptyRequestLimitsConfig().SetSeriesLimit(tt.seriesLimit), + RequestLimits: newEmptyRequestLimitsConfig().SetSeriesLimit(tt.seriesLimit), }, }, } @@ -217,11 +217,11 @@ func TestRequestLimiter_AllowSamples(t *testing.T) { tenant := "tenant" limits := WriteLimitsConfig{ DefaultLimits: DefaultLimitsConfig{ - RequestLimits: *NewEmptyRequestLimitsConfig().SetSeriesLimit(10), + RequestLimits: *newEmptyRequestLimitsConfig().SetSeriesLimit(10), }, TenantsLimits: TenantsWriteLimitsConfig{ tenant: &WriteLimitConfig{ - RequestLimits: NewEmptyRequestLimitsConfig().SetSamplesLimit(tt.samplesLimit), + RequestLimits: newEmptyRequestLimitsConfig().SetSamplesLimit(tt.samplesLimit), }, }, } diff --git a/pkg/receive/testdata/limits.yaml b/pkg/receive/testdata/limits.yaml deleted file mode 100644 index 2345756179..0000000000 --- a/pkg/receive/testdata/limits.yaml +++ /dev/null @@ -1,22 +0,0 @@ -write: - global: - max_concurrency: 30 - meta_monitoring_url: "http://localhost:9090" - meta_monitoring_limit_query: "sum(prometheus_tsdb_head_series) by (tenant)" - default: - request: - size_bytes_limit: 1024 - series_limit: 1000 - samples_limit: 10 - head_series_limit: 1000 - tenants: - acme: - request: - size_bytes_limit: 0 - series_limit: 0 - samples_limit: 0 - head_series_limit: 2000 - ajax: - request: - series_limit: 50000 - samples_limit: 500 diff --git a/pkg/receive/testdata/limits_config/invalid_limits.yaml b/pkg/receive/testdata/limits_config/invalid_limits.yaml deleted file mode 100644 index 74db0453f8..0000000000 --- a/pkg/receive/testdata/limits_config/invalid_limits.yaml +++ /dev/null @@ -1,17 +0,0 @@ -write: - global: - max_concurrency: 30 - request: - size_bytes_limit: 1024 - series_limit: 1000 - samples_limit: 10 - tenants: - acme: - request: - size_bytes_limit: 0 - series_limit: 0 - samples_limit: 0 - ajax: - request: - series_limit: 50000 - samples_limit: 500