Implement series limit using ingester own series (#6718)
The ingester can now track "owned series" and use them for the tenant series limit, instead of using in-memory series.

This helps users avoid hitting the limit on ingester scale-up, scale-down, or when changing a user's ingester shard size -- in each of these cases, the ingester recomputes owned series based on the updated ring.

This feature is currently limited to setups with multi-zone ingesters, where the number of zones is equal to the replication factor.

Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
Co-authored-by: Patryk Prus <patryk.prus@grafana.com>
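
For intuition, here is a minimal, hypothetical sketch of the core idea behind "owned series": hash each series' labels to a ring token and count only those series whose token maps to this ingester in the current ring. The types and helpers below (`ringEntry`, `tokenOwner`, `countOwnedSeries`) are illustrative only and are not Mimir's actual implementation, which lives in the owned-series service added by this commit.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// series is a simplified stand-in for a TSDB series: just its label set.
type series struct {
	labels map[string]string
}

// ringEntry maps a ring token to the ingester instance that owns it.
type ringEntry struct {
	token uint32
	owner string // ingester instance ID
}

// hashSeries derives a ring token from the tenant and the series labels.
// Labels are hashed in a fixed order (sorted keys) so the result is stable.
func hashSeries(tenant string, s series) uint32 {
	h := fnv.New32a()
	h.Write([]byte(tenant))
	keys := make([]string, 0, len(s.labels))
	for k := range s.labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		h.Write([]byte(k))
		h.Write([]byte(s.labels[k]))
	}
	return h.Sum32()
}

// tokenOwner returns the instance owning the given token: the owner of the
// first ring token greater than or equal to it, wrapping around the ring.
func tokenOwner(ring []ringEntry, token uint32) string {
	i := sort.Search(len(ring), func(i int) bool { return ring[i].token >= token })
	if i == len(ring) {
		i = 0 // wrap around
	}
	return ring[i].owner
}

// countOwnedSeries counts how many in-memory series map onto this instance in
// the current ring. With owned-series limiting enabled, only this count would
// be compared against the series limit, not the total in-memory series count.
func countOwnedSeries(ring []ringEntry, tenant, instance string, inMemory []series) int {
	owned := 0
	for _, s := range inMemory {
		if tokenOwner(ring, hashSeries(tenant, s)) == instance {
			owned++
		}
	}
	return owned
}

func main() {
	ring := []ringEntry{
		{token: 1 << 30, owner: "ingester-zone-a-0"},
		{token: 2 << 30, owner: "ingester-zone-a-1"},
		{token: 3 << 30, owner: "ingester-zone-a-0"},
	}
	sort.Slice(ring, func(i, j int) bool { return ring[i].token < ring[j].token })

	inMemory := []series{
		{labels: map[string]string{"__name__": "up", "job": "api"}},
		{labels: map[string]string{"__name__": "up", "job": "db"}},
		{labels: map[string]string{"__name__": "http_requests_total", "job": "api"}},
	}

	fmt.Println("owned:", countOwnedSeries(ring, "tenant-1", "ingester-zone-a-0", inMemory))
}
```

During scale-up, scale-down, or a shard-size change, tokens move between ingesters, so re-running this computation against the updated ring shrinks the owned count on ingesters that lost tokens; this is why the commit recomputes owned series on ring changes and, as the ingester.go diff below shows, after head compaction.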
pstibrany and pr00se authored Nov 28, 2023
1 parent 63f2f62 commit 6f421c7
Showing 21 changed files with 1,112 additions and 92 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -41,6 +41,7 @@
* [FEATURE] Add the experimental `-<prefix>.s3.send-content-md5` flag (defaults to `false`) to configure S3 Put Object requests to send a `Content-MD5` header. Setting this flag is not recommended unless your object storage does not support checksums. #6622
* [FEATURE] Distributor: add an experimental flag `-distributor.reusable-ingester-push-worker` that can be used to pre-allocate a pool of workers to be used to send push requests to the ingesters. #6660
* [FEATURE] Distributor: Support enabling of automatically generated name suffixes for metrics ingested via OTLP, through the flag `-distributor.otel-metric-suffixes-enabled`. #6542
* [FEATURE] Ingester: the ingester can now track which of a user's series it actually owns according to the ring, and only consider owned series when checking the user's series limit. This helps to avoid hitting the user's series limit when scaling up ingesters or changing a user's ingester shard size. The feature is currently experimental and disabled by default. It can be enabled by setting `-ingester.use-ingester-owned-series-for-limits` (to use owned series for limiting). This is currently limited to a multi-zone ingester setup, with the replication factor equal to the number of zones. #6718
* [ENHANCEMENT] Query-frontend: don't treat cancel as an error. #4648
* [ENHANCEMENT] Ingester: exported summary `cortex_ingester_inflight_push_requests_summary` tracking total number of inflight requests in percentile buckets. #5845
* [ENHANCEMENT] Query-scheduler: add `cortex_query_scheduler_enqueue_duration_seconds` metric that records the time taken to enqueue or reject a query request. #5879
33 changes: 33 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -3031,6 +3031,39 @@
"fieldFlag": "ingester.return-only-grpc-errors",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "use_ingester_owned_series_for_limits",
"required": false,
"desc": "When enabled, only series currently owned by ingester according to the ring are used when checking user per-tenant series limit.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "ingester.use-ingester-owned-series-for-limits",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "track_ingester_owned_series",
"required": false,
"desc": "This option enables tracking of ingester-owned series based on ring state, even if -ingester.use-ingester-owned-series-for-limits is disabled.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "ingester.track-ingester-owned-series",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "owned_series_update_interval",
"required": false,
"desc": "How often to check for ring changes and possibly recompute owned series as a result of detected change.",
"fieldValue": null,
"fieldDefaultValue": 15000000000,
"fieldFlag": "ingester.owned-series-update-interval",
"fieldType": "duration",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
6 changes: 6 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1345,6 +1345,8 @@ Usage of ./cmd/mimir/mimir:
[experimental] Whether the shipper should label out-of-order blocks with an external label before uploading them. Setting this label will compact out-of-order blocks separately from non-out-of-order blocks
-ingester.out-of-order-time-window duration
[experimental] Non-zero value enables out-of-order support for most recent samples that are within the time window in relation to the TSDB's maximum time, i.e., within [db.maxTime-timeWindow, db.maxTime]). The ingester will need more memory as a factor of rate of out-of-order samples being ingested and the number of series that are getting out-of-order samples. If query falls into this window, cached results will use value from -query-frontend.results-cache-ttl-for-out-of-order-time-window option to specify TTL for resulting cache entry.
-ingester.owned-series-update-interval duration
[experimental] How often to check for ring changes and possibly recompute owned series as a result of detected change. (default 15s)
-ingester.rate-update-period duration
Period with which to update the per-tenant ingestion rates. (default 15s)
-ingester.read-path-cpu-utilization-limit float
@@ -1447,8 +1449,12 @@ Usage of ./cmd/mimir/mimir:
True to enable the zone-awareness and replicate ingested samples across different availability zones. This option needs be set on ingesters, distributors, queriers and rulers when running in microservices mode.
-ingester.stream-chunks-when-using-blocks
Stream chunks from ingesters to queriers. (default true)
-ingester.track-ingester-owned-series
[experimental] This option enables tracking of ingester-owned series based on ring state, even if -ingester.use-ingester-owned-series-for-limits is disabled.
-ingester.tsdb-config-update-period duration
[experimental] Period with which to update the per-tenant TSDB configuration. (default 15s)
-ingester.use-ingester-owned-series-for-limits
[experimental] When enabled, only series currently owned by ingester according to the ring are used when checking user per-tenant series limit.
-log.buffered
[deprecated] Use a buffered logger to reduce write contention. (default true)
-log.format string
4 changes: 4 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
@@ -161,6 +161,10 @@ The following features are currently experimental:
- `-distributor.limit-inflight-requests-using-grpc-method-limiter`
- `-ingester.limit-inflight-requests-using-grpc-method-limiter`
- Logging of requests that did not send any HTTP request: `-server.http-log-closed-connections-without-response-enabled`.
- Ingester: track "owned series" and use owned series instead of in-memory series for tenant limits.
- `-ingester.use-ingester-owned-series-for-limits`
- `-ingester.track-ingester-owned-series`
- `-ingester.owned-series-update-interval`

## Deprecated features

16 changes: 16 additions & 0 deletions docs/sources/mimir/references/configuration-parameters/index.md
@@ -1169,6 +1169,22 @@ instance_limits:
# (experimental) When enabled only gRPC errors will be returned by the ingester.
# CLI flag: -ingester.return-only-grpc-errors
[return_only_grpc_errors: <boolean> | default = false]
# (experimental) When enabled, only series currently owned by ingester according
# to the ring are used when checking user per-tenant series limit.
# CLI flag: -ingester.use-ingester-owned-series-for-limits
[use_ingester_owned_series_for_limits: <boolean> | default = false]
# (experimental) This option enables tracking of ingester-owned series based on
# ring state, even if -ingester.use-ingester-owned-series-for-limits is
# disabled.
# CLI flag: -ingester.track-ingester-owned-series
[track_ingester_owned_series: <boolean> | default = false]
# (experimental) How often to check for ring changes and possibly recompute
# owned series as a result of detected change.
# CLI flag: -ingester.owned-series-update-interval
[owned_series_update_interval: <duration> | default = 15s]
```

### querier
28 changes: 2 additions & 26 deletions pkg/distributor/distributor.go
@@ -573,35 +573,11 @@ func (d *Distributor) stopping(_ error) error {
}

func (d *Distributor) tokenForLabels(userID string, labels []mimirpb.LabelAdapter) uint32 {
return shardByAllLabels(userID, labels)
return mimirpb.ShardByAllLabelAdapters(userID, labels)
}

func (d *Distributor) tokenForMetadata(userID string, metricName string) uint32 {
return shardByMetricName(userID, metricName)
}

// shardByMetricName returns the token for the given metric. The provided metricName
// is guaranteed to not be retained.
func shardByMetricName(userID string, metricName string) uint32 {
h := shardByUser(userID)
h = ingester_client.HashAdd32(h, metricName)
return h
}

func shardByUser(userID string) uint32 {
h := ingester_client.HashNew32()
h = ingester_client.HashAdd32(h, userID)
return h
}

// This function generates different values for different order of same labels.
func shardByAllLabels(userID string, labels []mimirpb.LabelAdapter) uint32 {
h := shardByUser(userID)
for _, label := range labels {
h = ingester_client.HashAdd32(h, label.Name)
h = ingester_client.HashAdd32(h, label.Value)
}
return h
return mimirpb.ShardByMetricName(userID, metricName)
}

// Returns a boolean that indicates whether or not we want to remove the replica label going forward,
21 changes: 2 additions & 19 deletions pkg/distributor/distributor_test.go
@@ -3818,7 +3818,7 @@ func (i *mockIngester) Push(ctx context.Context, req *mimirpb.WriteRequest, _ ..
}

for _, series := range req.Timeseries {
hash := shardByAllLabels(orgid, series.Labels)
hash := mimirpb.ShardByAllLabelAdapters(orgid, series.Labels)
existing, ok := i.timeseries[hash]
if !ok {
// Make a copy because the request Timeseries are reused
@@ -3840,7 +3840,7 @@
}

for _, m := range req.Metadata {
hash := shardByMetricName(orgid, m.MetricFamilyName)
hash := mimirpb.ShardByMetricName(orgid, m.MetricFamilyName)
set, ok := i.metadata[hash]
if !ok {
set = map[mimirpb.MetricMetadata]struct{}{}
@@ -4540,23 +4540,6 @@ func TestDistributorValidation(t *testing.T) {
}
}

// This is not great, but we deal with unsorted labels in prePushRelabelMiddleware.
func TestShardByAllLabelsReturnsWrongResultsForUnsortedLabels(t *testing.T) {
val1 := shardByAllLabels("test", []mimirpb.LabelAdapter{
{Name: "__name__", Value: "foo"},
{Name: "bar", Value: "baz"},
{Name: "sample", Value: "1"},
})

val2 := shardByAllLabels("test", []mimirpb.LabelAdapter{
{Name: "__name__", Value: "foo"},
{Name: "sample", Value: "1"},
{Name: "bar", Value: "baz"},
})

assert.NotEqual(t, val1, val2)
}

func TestDistributor_Push_Relabel(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "user")

91 changes: 69 additions & 22 deletions pkg/ingester/ingester.go
@@ -175,6 +175,10 @@ type Config struct {
ErrorSampleRate int64 `yaml:"error_sample_rate" json:"error_sample_rate" category:"experimental"`

ReturnOnlyGRPCErrors bool `yaml:"return_only_grpc_errors" json:"return_only_grpc_errors" category:"experimental"`

UseIngesterOwnedSeriesForLimits bool `yaml:"use_ingester_owned_series_for_limits" category:"experimental"`
UpdateIngesterOwnedSeries bool `yaml:"track_ingester_owned_series" category:"experimental"`
OwnedSeriesUpdateInterval time.Duration `yaml:"owned_series_update_interval" category:"experimental"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -194,6 +198,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcMethodLimiter, "ingester.limit-inflight-requests-using-grpc-method-limiter", false, "Use experimental method of limiting push requests.")
f.Int64Var(&cfg.ErrorSampleRate, "ingester.error-sample-rate", 0, "Each error will be logged once in this many times. Use 0 to log all of them.")
f.BoolVar(&cfg.ReturnOnlyGRPCErrors, "ingester.return-only-grpc-errors", false, "When enabled only gRPC errors will be returned by the ingester.")
f.BoolVar(&cfg.UseIngesterOwnedSeriesForLimits, "ingester.use-ingester-owned-series-for-limits", false, "When enabled, only series currently owned by ingester according to the ring are used when checking user per-tenant series limit.")
f.BoolVar(&cfg.UpdateIngesterOwnedSeries, "ingester.track-ingester-owned-series", false, "This option enables tracking of ingester-owned series based on ring state, even if -ingester.use-ingester-owned-series-for-limits is disabled.")
f.DurationVar(&cfg.OwnedSeriesUpdateInterval, "ingester.owned-series-update-interval", 15*time.Second, "How often to check for ring changes and possibly recompute owned series as a result of detected change.")
}

func (cfg *Config) Validate() error {
@@ -239,6 +246,7 @@ type Ingester struct {
limits *validation.Overrides
limiter *Limiter
subservicesWatcher *services.FailureWatcher
ownedSeriesService *ownedSeriesService

// Mimir blocks storage.
tsdbsMtx sync.RWMutex
@@ -333,7 +341,7 @@ func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus
}

// New returns an Ingester that uses Mimir block storage.
func New(cfg Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) {
func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, activeGroupsCleanupService *util.ActiveGroupsCleanupService, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) {
i, err := newIngester(cfg, limits, registerer, logger)
if err != nil {
return nil, err
@@ -366,6 +374,10 @@ func New(cfg Config, limits *validation.Overrides, activeGroupsCleanupService *u
i.subservicesWatcher = services.NewFailureWatcher()
i.subservicesWatcher.WatchService(i.lifecycler)

if cfg.UseIngesterOwnedSeriesForLimits || cfg.UpdateIngesterOwnedSeries {
i.ownedSeriesService = newOwnedSeriesService(i.cfg.OwnedSeriesUpdateInterval, i.lifecycler.ID, ingestersRing, log.With(i.logger, "component", "owned series"), registerer, i.limits.IngestionTenantShardSize, i.getTSDBUsers, i.getTSDB)
}

// Init the limter and instantiate the user states which depend on it
i.limiter = NewLimiter(
limits,
@@ -461,6 +473,10 @@ func (i *Ingester) starting(ctx context.Context) error {
servs = append(servs, i.utilizationBasedLimiter)
}

if i.ownedSeriesService != nil {
servs = append(servs, i.ownedSeriesService)
}

shutdownMarkerPath := shutdownmarker.GetPath(i.cfg.BlocksStorageConfig.TSDB.Dir)
shutdownMarkerFound, err := shutdownmarker.Exists(shutdownMarkerPath)
if err != nil {
@@ -553,8 +569,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
usageStatsUpdateTicker := time.NewTicker(usageStatsUpdateInterval)
defer usageStatsUpdateTicker.Stop()

localLimitMetricUpdateTicker := time.NewTicker(time.Second * 15)
defer localLimitMetricUpdateTicker.Stop()
limitMetricsUpdateTicker := time.NewTicker(time.Second * 15)
defer limitMetricsUpdateTicker.Stop()

for {
select {
@@ -575,8 +591,8 @@
i.updateActiveSeries(time.Now())
case <-usageStatsUpdateTicker.C:
i.updateUsageStats()
case <-localLimitMetricUpdateTicker.C:
i.updateLocalLimitMetrics()
case <-limitMetricsUpdateTicker.C:
i.updateMetrics()
case <-ctx.Done():
return nil
case err := <-i.subservicesWatcher.Chan():
@@ -697,7 +713,7 @@ func (i *Ingester) updateUsageStats() {
func (i *Ingester) applyTSDBSettings() {
for _, userID := range i.getTSDBUsers() {
globalValue := i.limits.MaxGlobalExemplarsPerUser(userID)
localValue := i.limiter.convertGlobalToLocalLimit(userID, globalValue)
localValue := i.limiter.convertGlobalToLocalLimit(i.limiter.getShardSize(userID), globalValue)

oooTW := i.limits.OutOfOrderTimeWindow(userID)
if oooTW < 0 {
@@ -734,12 +750,25 @@
}
}

func (i *Ingester) updateLocalLimitMetrics() {
func (i *Ingester) updateMetrics() {
for _, userID := range i.getTSDBUsers() {
localValue := i.limiter.maxSeriesPerUser(userID)
db := i.getTSDB(userID)
if db == nil {
continue
}

// update metrics
i.metrics.maxLocalSeriesPerUser.WithLabelValues(userID).Set(float64(localValue))
localLimitShards := i.limiter.getShardSize(userID)
if i.cfg.UseIngesterOwnedSeriesForLimits || i.cfg.UpdateIngesterOwnedSeries {
ownedSeries, shards := db.ownedSeriesAndShards()
i.metrics.ownedSeriesPerUser.WithLabelValues(userID).Set(float64(ownedSeries))

if i.cfg.UseIngesterOwnedSeriesForLimits {
localLimitShards = shards
}
}

localLimit := i.limiter.maxSeriesPerUser(userID, localLimitShards)
i.metrics.maxLocalSeriesPerUser.WithLabelValues(userID).Set(float64(localLimit))
}
}

@@ -2203,18 +2232,21 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD
matchersConfig := i.limits.ActiveSeriesCustomTrackersConfig(userID)

userDB := &userTSDB{
userID: userID,
activeSeries: activeseries.NewActiveSeries(activeseries.NewMatchers(matchersConfig), i.cfg.ActiveSeriesMetrics.IdleTimeout),
seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()),
ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),
ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),
instanceLimitsFn: i.getInstanceLimits,
instanceSeriesCount: &i.seriesCount,
instanceErrors: i.metrics.rejected,
blockMinRetention: i.cfg.BlocksStorageConfig.TSDB.Retention,
}

maxExemplars := i.limiter.convertGlobalToLocalLimit(userID, i.limits.MaxGlobalExemplarsPerUser(userID))
userID: userID,
activeSeries: activeseries.NewActiveSeries(activeseries.NewMatchers(matchersConfig), i.cfg.ActiveSeriesMetrics.IdleTimeout),
seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()),
ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),
ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),
instanceLimitsFn: i.getInstanceLimits,
instanceSeriesCount: &i.seriesCount,
instanceErrors: i.metrics.rejected,
blockMinRetention: i.cfg.BlocksStorageConfig.TSDB.Retention,
useOwnedSeriesForLimits: i.cfg.UseIngesterOwnedSeriesForLimits,
ownedSeriesShardSize: i.limits.IngestionTenantShardSize(userID), // initialize series shard size so that it's correct even before we update ownedSeries for the first time (during WAL replay).
}
userDB.triggerRecomputeOwnedSeries(recomputeOwnedSeriesReasonNewUser)

maxExemplars := i.limiter.convertGlobalToLocalLimit(i.limits.IngestionTenantShardSize(userID), i.limits.MaxGlobalExemplarsPerUser(userID))
oooTW := i.limits.OutOfOrderTimeWindow(userID)
// Create a new user database
db, err := tsdb.Open(udir, userLogger, tsdbPromReg, &tsdb.Options{
@@ -2248,6 +2280,7 @@
BlockPostingsForMatchersCacheMaxBytes: i.cfg.BlocksStorageConfig.TSDB.BlockPostingsForMatchersCacheMaxBytes,
BlockPostingsForMatchersCacheForce: i.cfg.BlocksStorageConfig.TSDB.BlockPostingsForMatchersCacheForce,
EnableNativeHistograms: i.limits.NativeHistogramsIngestionEnabled(userID),
SecondaryHashFunction: secondaryTSDBHashFunctionForUser(userID),
}, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)
@@ -2704,6 +2737,8 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool, forcedCompacti

i.metrics.compactionsTriggered.Inc()

minTimeBefore := userDB.Head().MinTime()

reason := ""
switch {
case force:
@@ -2729,6 +2764,18 @@
level.Debug(i.logger).Log("msg", "TSDB blocks compaction completed successfully", "user", userID, "compactReason", reason)
}

minTimeAfter := userDB.Head().MinTime()

// If head was compacted, its MinTime has changed. We need to recalculate series owned by this ingester,
// because in-memory series are removed during compaction.
if minTimeBefore != minTimeAfter {
r := recomputeOwnedSeriesReasonCompaction
if force && forcedCompactionMaxTime != math.MaxInt64 {
r = recomputeOwnedSeriesReasonEarlyCompaction
}
userDB.triggerRecomputeOwnedSeries(r)
}

return nil
})
}
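
As the hunks above show, the limiter's helpers such as `convertGlobalToLocalLimit` and `maxSeriesPerUser` now take a shard size explicitly, so they can be driven either by the configured `IngestionTenantShardSize` or by the shard size recorded when owned series were last recomputed. The sketch below illustrates one plausible way a per-tenant global limit is spread into a per-ingester local limit in the zone-aware case this feature targets (replication factor equal to the number of zones, so each zone holds a full copy of the tenant's series). It is an approximation for intuition only, not the exact formula in Mimir's limiter.

```go
package main

import "fmt"

// convertGlobalToLocalLimit approximates how a per-tenant global series limit
// becomes a per-ingester local limit in a zone-aware cluster where the
// replication factor equals the number of zones (each zone stores a full copy
// of the tenant's series). Illustrative approximation, not Mimir's exact code.
func convertGlobalToLocalLimit(globalLimit, shardSize, ingestersPerZone, zones int) int {
	if globalLimit == 0 {
		return 0 // 0 conventionally means "unlimited"
	}

	// Number of ingesters in one zone that receive this tenant's series.
	// A shard size of 0 means the tenant is spread across all ingesters.
	ingestersInShardPerZone := ingestersPerZone
	if shardSize > 0 {
		ingestersInShardPerZone = (shardSize + zones - 1) / zones // ceil(shardSize / zones)
		if ingestersInShardPerZone > ingestersPerZone {
			ingestersInShardPerZone = ingestersPerZone
		}
	}

	// Each zone holds all of the tenant's series, so within a zone the global
	// limit is divided across the ingesters of the tenant's shard.
	return globalLimit / ingestersInShardPerZone
}

func main() {
	// 1M global series, shard size 6, 3 zones, 10 ingesters per zone:
	// each of the 2 shard ingesters per zone gets a local limit of 500000.
	fmt.Println(convertGlobalToLocalLimit(1_000_000, 6, 10, 3))
}
```

With `-ingester.use-ingester-owned-series-for-limits` enabled, the ingester compares its owned-series count rather than its total in-memory series count against this local limit, which is what lets scale-ups, scale-downs, and shard-size changes take effect without temporarily pushing tenants over their limit.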