diff --git a/CHANGELOG.md b/CHANGELOG.md index 607d3b5599c..baa4f0a6480 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,7 @@ * [FEATURE] Ingester: add new metrics for tracking native histograms in active series: `cortex_ingester_active_native_histogram_series`, `cortex_ingester_active_native_histogram_series_custom_tracker`, `cortex_ingester_active_native_histogram_buckets`, `cortex_ingester_active_native_histogram_buckets_custom_tracker`. The first 2 are the subsets of the existing and unmodified `cortex_ingester_active_series` and `cortex_ingester_active_series_custom_tracker` respectively, only tracking native histogram series, and the last 2 are the equivalents for tracking the number of buckets in native histogram series. #5318 * [FEATURE] Add experimental CLI flag `-<prefix>.s3.native-aws-auth-enabled` that allows to enable the default credentials provider chain of the AWS SDK. #5636 * [FEATURE] Distributor: add experimental support for circuit breaking when writing to ingesters via `-ingester.client.circuit-breaker.enabled`, `-ingester.client.circuit-breaker.failure-threshold`, or `-ingester.client.circuit-breaker.cooldown-period` or their corresponding YAML. #5650 +* [FEATURE] Querier: add experimental CLI flag `-tenant-federation.max-concurrent` to adjust the max number of per-tenant queries that can be run at a time when executing a single multi-tenant query. #5874 * [ENHANCEMENT] Overrides-exporter: Add new metrics for write path and alertmanager (`max_global_metadata_per_user`, `max_global_metadata_per_metric`, `request_rate`, `request_burst_size`, `alertmanager_notification_rate_limit`, `alertmanager_max_dispatcher_aggregation_groups`, `alertmanager_max_alerts_count`, `alertmanager_max_alerts_size_bytes`) and added flag `-overrides-exporter.enabled-metrics` to explicitly configure desired metrics, e.g. `-overrides-exporter.enabled-metrics=request_rate,ingestion_rate`. 
Default value for this flag is: `ingestion_rate,ingestion_burst_size,max_global_series_per_user,max_global_series_per_metric,max_global_exemplars_per_user,max_fetched_chunks_per_query,max_fetched_series_per_query,ruler_max_rules_per_rule_group,ruler_max_rule_groups_per_tenant`. #5376 * [ENHANCEMENT] Cardinality API: When zone aware replication is enabled, the label values cardinality API can now tolerate single zone failure #5178 * [ENHANCEMENT] Distributor: optimize sending requests to ingesters when incoming requests don't need to be modified. For now this feature can be disabled by setting `-timeseries-unmarshal-caching-optimization-enabled=false`. #5137 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 1f00349de26..9ef48248030 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -9284,6 +9284,17 @@ "fieldDefaultValue": false, "fieldFlag": "tenant-federation.enabled", "fieldType": "boolean" + }, + { + "kind": "field", + "name": "max_concurrent", + "required": false, + "desc": "The number of workers used for each tenant federated query. This setting limits the maximum number of per-tenant queries executed at a time for a tenant federated query.", + "fieldValue": null, + "fieldDefaultValue": 16, + "fieldFlag": "tenant-federation.max-concurrent", + "fieldType": "int", + "fieldCategory": "experimental" } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 91c024377d7..2e753e82634 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -2595,6 +2595,8 @@ Usage of ./cmd/mimir/mimir: Comma-separated list of components to include in the instantiated process. The default value 'all' includes all components that are required to form a functional Grafana Mimir instance in single-binary mode. Use the '-modules' command line flag to get a list of available components, and to see which components are included with 'all'. 
(default all) -tenant-federation.enabled If enabled on all services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a '|' character in the 'X-Scope-OrgID' header. + -tenant-federation.max-concurrent int + [experimental] The number of workers used for each tenant federated query. This setting limits the maximum number of per-tenant queries executed at a time for a tenant federated query. (default 16) -timeseries-unmarshal-caching-optimization-enabled [experimental] Enables optimized marshaling of timeseries. (default true) -usage-stats.enabled diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md index de1633d0b32..27e16b23f4e 100644 --- a/docs/sources/mimir/configure/about-versioning.md +++ b/docs/sources/mimir/configure/about-versioning.md @@ -113,6 +113,7 @@ The following features are currently experimental: - Streaming chunks from store-gateway to querier (`-querier.prefer-streaming-chunks-from-store-gateways`, `-querier.streaming-chunks-per-store-gateway-buffer-size`) - Ingester query request minimisation (`-querier.minimize-ingester-requests`, `-querier.minimize-ingester-requests-hedging-delay`) - Limiting queries based on the estimated number of chunks that will be used (`-querier.max-estimated-fetched-chunks-per-query-multiplier`) + - Max concurrency for tenant federated queries (`-tenant-federation.max-concurrent`) - Query-frontend - `-query-frontend.querier-forget-delay` - Instant query splitting (`-query-frontend.split-instant-queries-by-interval`) diff --git a/docs/sources/mimir/references/configuration-parameters/index.md b/docs/sources/mimir/references/configuration-parameters/index.md index 49fab12742d..724350fad9d 100644 --- a/docs/sources/mimir/references/configuration-parameters/index.md +++ b/docs/sources/mimir/references/configuration-parameters/index.md @@ -198,6 +198,12 @@ tenant_federation: # CLI flag: 
-tenant-federation.enabled [enabled: <boolean> | default = false] + # (experimental) The number of workers used for each tenant federated query. + # This setting limits the maximum number of per-tenant queries executed at a + # time for a tenant federated query. + # CLI flag: -tenant-federation.max-concurrent + [max_concurrent: <int> | default = 16] + activity_tracker: # File where ongoing activities are stored. If empty, activity tracking is # disabled. diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index af26db37a71..48aaa2a0439 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -409,9 +409,9 @@ func (t *Mimir) initTenantFederation() (serv services.Service, err error) { // single tenant. This allows for a less impactful enabling of tenant // federation. const bypassForSingleQuerier = true - t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, bypassForSingleQuerier, util_log.Logger)) - t.ExemplarQueryable = tenantfederation.NewExemplarQueryable(t.ExemplarQueryable, bypassForSingleQuerier, util_log.Logger) - t.MetadataSupplier = tenantfederation.NewMetadataSupplier(t.MetadataSupplier, util_log.Logger) + t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, bypassForSingleQuerier, t.Cfg.TenantFederation.MaxConcurrent, util_log.Logger)) + t.ExemplarQueryable = tenantfederation.NewExemplarQueryable(t.ExemplarQueryable, bypassForSingleQuerier, t.Cfg.TenantFederation.MaxConcurrent, util_log.Logger) + t.MetadataSupplier = tenantfederation.NewMetadataSupplier(t.MetadataSupplier, t.Cfg.TenantFederation.MaxConcurrent, util_log.Logger) } return nil, nil } @@ -735,7 +735,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) { // This makes this label more consistent and hopefully less confusing to users. 
const bypassForSingleQuerier = false - federatedQueryable = tenantfederation.NewQueryable(queryable, bypassForSingleQuerier, util_log.Logger) + federatedQueryable = tenantfederation.NewQueryable(queryable, bypassForSingleQuerier, t.Cfg.TenantFederation.MaxConcurrent, util_log.Logger) regularQueryFunc := rules.EngineQueryFunc(eng, queryable) federatedQueryFunc := rules.EngineQueryFunc(eng, federatedQueryable) diff --git a/pkg/querier/tenantfederation/merge_exemplar_queryable.go b/pkg/querier/tenantfederation/merge_exemplar_queryable.go index 067cc09e3a1..276da4006e9 100644 --- a/pkg/querier/tenantfederation/merge_exemplar_queryable.go +++ b/pkg/querier/tenantfederation/merge_exemplar_queryable.go @@ -27,8 +27,8 @@ import ( // By setting bypassWithSingleQuerier to true, tenant federation logic gets // bypassed if the request is only for a single tenant. The requests will also // not contain the pseudo series label __tenant_id__ in this case. -func NewExemplarQueryable(upstream storage.ExemplarQueryable, bypassWithSingleQuerier bool, logger log.Logger) storage.ExemplarQueryable { - return NewMergeExemplarQueryable(defaultTenantLabel, upstream, bypassWithSingleQuerier, logger) +func NewExemplarQueryable(upstream storage.ExemplarQueryable, bypassWithSingleQuerier bool, maxConcurrency int, logger log.Logger) storage.ExemplarQueryable { + return NewMergeExemplarQueryable(defaultTenantLabel, upstream, bypassWithSingleQuerier, maxConcurrency, logger) } // NewMergeExemplarQueryable returns an exemplar queryable that makes requests for @@ -41,13 +41,14 @@ func NewExemplarQueryable(upstream storage.ExemplarQueryable, bypassWithSingleQu // By setting bypassWithSingleQuerier to true, tenant federation logic gets // bypassed if the request is only for a single tenant. The requests will also // not contain the pseudo series label `idLabelName` in this case. 
-func NewMergeExemplarQueryable(idLabelName string, upstream storage.ExemplarQueryable, bypassWithSingleQuerier bool, logger log.Logger) storage.ExemplarQueryable { +func NewMergeExemplarQueryable(idLabelName string, upstream storage.ExemplarQueryable, bypassWithSingleQuerier bool, maxConcurrency int, logger log.Logger) storage.ExemplarQueryable { return &mergeExemplarQueryable{ logger: logger, idLabelName: idLabelName, bypassWithSingleQuerier: bypassWithSingleQuerier, upstream: upstream, resolver: tenant.NewMultiResolver(), + maxConcurrency: maxConcurrency, } } @@ -57,6 +58,7 @@ type mergeExemplarQueryable struct { bypassWithSingleQuerier bool upstream storage.ExemplarQueryable resolver tenant.Resolver + maxConcurrency int } // tenantsAndQueriers returns a list of tenant IDs and corresponding queriers based on the context @@ -96,11 +98,12 @@ func (m *mergeExemplarQueryable) ExemplarQuerier(ctx context.Context) (storage.E } return &mergeExemplarQuerier{ - logger: m.logger, - ctx: ctx, - idLabelName: m.idLabelName, - tenants: ids, - queriers: queriers, + logger: m.logger, + ctx: ctx, + idLabelName: m.idLabelName, + tenants: ids, + queriers: queriers, + maxConcurrency: m.maxConcurrency, }, nil } @@ -111,11 +114,12 @@ type exemplarJob struct { } type mergeExemplarQuerier struct { - logger log.Logger - ctx context.Context - idLabelName string - tenants []string - queriers []storage.ExemplarQuerier + logger log.Logger + ctx context.Context + idLabelName string + tenants []string + queriers []storage.ExemplarQuerier + maxConcurrency int } // Select returns the union exemplars within the time range that match each slice of @@ -186,7 +190,7 @@ func (m *mergeExemplarQuerier) Select(start, end int64, matchers ...[]*labels.Ma return nil } - err := concurrency.ForEachJob(ctx, len(jobs), maxConcurrency, run) + err := concurrency.ForEachJob(ctx, len(jobs), m.maxConcurrency, run) if err != nil { return nil, err } diff --git 
a/pkg/querier/tenantfederation/merge_exemplar_queryable_test.go b/pkg/querier/tenantfederation/merge_exemplar_queryable_test.go index 0f6df098d72..536a74436da 100644 --- a/pkg/querier/tenantfederation/merge_exemplar_queryable_test.go +++ b/pkg/querier/tenantfederation/merge_exemplar_queryable_test.go @@ -95,7 +95,7 @@ func (m *mockExemplarQuerier) matches(res exemplar.QueryResult, matchers []*labe func TestMergeExemplarQueryable_ExemplarQuerier(t *testing.T) { t.Run("error getting tenant IDs", func(t *testing.T) { upstream := &mockExemplarQueryable{} - federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t)) + federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t)) q, err := federated.ExemplarQuerier(context.Background()) assert.ErrorIs(t, err, user.ErrNoOrgID) @@ -105,7 +105,7 @@ func TestMergeExemplarQueryable_ExemplarQuerier(t *testing.T) { t.Run("error getting upstream querier", func(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "123") upstream := &mockExemplarQueryable{err: errors.New("unable to get querier")} - federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t)) + federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t)) q, err := federated.ExemplarQuerier(ctx) assert.Error(t, err) @@ -116,7 +116,7 @@ func TestMergeExemplarQueryable_ExemplarQuerier(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "123") querier := &mockExemplarQuerier{} upstream := &mockExemplarQueryable{queriers: map[string]storage.ExemplarQuerier{"123": querier}} - federated := NewExemplarQueryable(upstream, true, test.NewTestingLogger(t)) + federated := NewExemplarQueryable(upstream, true, defaultConcurrency, test.NewTestingLogger(t)) q, err := federated.ExemplarQuerier(ctx) assert.NoError(t, err) @@ -127,7 +127,7 @@ func TestMergeExemplarQueryable_ExemplarQuerier(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "123") 
querier := &mockExemplarQuerier{} upstream := &mockExemplarQueryable{queriers: map[string]storage.ExemplarQuerier{"123": querier}} - federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t)) + federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t)) q, err := federated.ExemplarQuerier(ctx) require.NoError(t, err) @@ -146,7 +146,7 @@ func TestMergeExemplarQueryable_ExemplarQuerier(t *testing.T) { "123": querier1, "456": querier2, }} - federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t)) + federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t)) q, err := federated.ExemplarQuerier(ctx) require.NoError(t, err) @@ -208,7 +208,7 @@ func TestMergeExemplarQuerier_Select(t *testing.T) { "456": &mockExemplarQuerier{res: res2}, }} - federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t)) + federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t)) q, err := federated.ExemplarQuerier(user.InjectOrgID(context.Background(), "123|456")) require.NoError(t, err) @@ -233,7 +233,7 @@ func TestMergeExemplarQuerier_Select(t *testing.T) { "456": &mockExemplarQuerier{res: res2}, }} - federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t)) + federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t)) q, err := federated.ExemplarQuerier(user.InjectOrgID(context.Background(), "123|456")) require.NoError(t, err) @@ -263,7 +263,7 @@ func TestMergeExemplarQuerier_Select(t *testing.T) { "456": &mockExemplarQuerier{res: res2}, }} - federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t)) + federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t)) q, err := federated.ExemplarQuerier(user.InjectOrgID(context.Background(), "123|456")) require.NoError(t, err) @@ -284,7 +284,7 @@ func 
TestMergeExemplarQuerier_Select(t *testing.T) { "456": &mockExemplarQuerier{res: res2}, }} - federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t)) + federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t)) q, err := federated.ExemplarQuerier(user.InjectOrgID(context.Background(), "123|456")) require.NoError(t, err) @@ -306,7 +306,7 @@ func TestMergeExemplarQuerier_Select(t *testing.T) { "456": &mockExemplarQuerier{err: errors.New("timeout running exemplar query")}, }} - federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t)) + federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t)) q, err := federated.ExemplarQuerier(user.InjectOrgID(context.Background(), "123|456")) require.NoError(t, err) diff --git a/pkg/querier/tenantfederation/merge_metadata.go b/pkg/querier/tenantfederation/merge_metadata.go index e079d98a020..a132a51a97a 100644 --- a/pkg/querier/tenantfederation/merge_metadata.go +++ b/pkg/querier/tenantfederation/merge_metadata.go @@ -21,18 +21,20 @@ import ( // metadata for all tenant IDs that are part of the request and merges the results. // // No deduplication of metadata is done before being returned. 
-func NewMetadataSupplier(next querier.MetadataSupplier, logger log.Logger) querier.MetadataSupplier { +func NewMetadataSupplier(next querier.MetadataSupplier, maxConcurrency int, logger log.Logger) querier.MetadataSupplier { return &mergeMetadataSupplier{ - next: next, - logger: logger, - resolver: tenant.NewMultiResolver(), + next: next, + maxConcurrency: maxConcurrency, + resolver: tenant.NewMultiResolver(), + logger: logger, } } type mergeMetadataSupplier struct { - next querier.MetadataSupplier - resolver tenant.Resolver - logger log.Logger + next querier.MetadataSupplier + resolver tenant.Resolver + maxConcurrency int + logger log.Logger } func (m *mergeMetadataSupplier) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) { @@ -62,7 +64,7 @@ func (m *mergeMetadataSupplier) MetricsMetadata(ctx context.Context) ([]scrape.M return nil } - err = concurrency.ForEachJob(ctx, len(tenantIDs), maxConcurrency, run) + err = concurrency.ForEachJob(ctx, len(tenantIDs), m.maxConcurrency, run) if err != nil { return nil, err } diff --git a/pkg/querier/tenantfederation/merge_metadata_test.go b/pkg/querier/tenantfederation/merge_metadata_test.go index 9cadf11708c..8c7644e6d63 100644 --- a/pkg/querier/tenantfederation/merge_metadata_test.go +++ b/pkg/querier/tenantfederation/merge_metadata_test.go @@ -48,7 +48,7 @@ func TestMergeMetadataSupplier_MetricsMetadata(t *testing.T) { t.Run("invalid tenant IDs", func(t *testing.T) { upstream := &mockMetadataSupplier{} - supplier := NewMetadataSupplier(upstream, test.NewTestingLogger(t)) + supplier := NewMetadataSupplier(upstream, defaultConcurrency, test.NewTestingLogger(t)) _, err := supplier.MetricsMetadata(context.Background()) assert.ErrorIs(t, err, user.ErrNoOrgID) @@ -61,7 +61,7 @@ func TestMergeMetadataSupplier_MetricsMetadata(t *testing.T) { }, } - supplier := NewMetadataSupplier(upstream, test.NewTestingLogger(t)) + supplier := NewMetadataSupplier(upstream, defaultConcurrency, test.NewTestingLogger(t)) res, 
err := supplier.MetricsMetadata(user.InjectOrgID(context.Background(), "team-a")) require.NoError(t, err) @@ -77,7 +77,7 @@ func TestMergeMetadataSupplier_MetricsMetadata(t *testing.T) { }, } - supplier := NewMetadataSupplier(upstream, test.NewTestingLogger(t)) + supplier := NewMetadataSupplier(upstream, defaultConcurrency, test.NewTestingLogger(t)) res, err := supplier.MetricsMetadata(user.InjectOrgID(context.Background(), "team-a|team-b")) require.NoError(t, err) @@ -94,7 +94,7 @@ func TestMergeMetadataSupplier_MetricsMetadata(t *testing.T) { }, } - supplier := NewMetadataSupplier(upstream, test.NewTestingLogger(t)) + supplier := NewMetadataSupplier(upstream, defaultConcurrency, test.NewTestingLogger(t)) res, err := supplier.MetricsMetadata(user.InjectOrgID(context.Background(), "team-a|team-b")) require.NoError(t, err) diff --git a/pkg/querier/tenantfederation/merge_queryable.go b/pkg/querier/tenantfederation/merge_queryable.go index 1a792c97925..b19c42619b0 100644 --- a/pkg/querier/tenantfederation/merge_queryable.go +++ b/pkg/querier/tenantfederation/merge_queryable.go @@ -36,8 +36,8 @@ import ( // If the label "__tenant_id__" is already existing, its value is overwritten // by the tenant ID and the previous value is exposed through a new label // prefixed with "original_". This behaviour is not implemented recursively. 
-func NewQueryable(upstream storage.Queryable, byPassWithSingleQuerier bool, logger log.Logger) storage.Queryable { - return NewMergeQueryable(defaultTenantLabel, tenantQuerierCallback(upstream), byPassWithSingleQuerier, logger) +func NewQueryable(upstream storage.Queryable, byPassWithSingleQuerier bool, maxConcurrency int, logger log.Logger) storage.Queryable { + return NewMergeQueryable(defaultTenantLabel, tenantQuerierCallback(upstream), byPassWithSingleQuerier, maxConcurrency, logger) } func tenantQuerierCallback(queryable storage.Queryable) MergeQuerierCallback { @@ -80,12 +80,13 @@ type MergeQuerierCallback func(ctx context.Context, mint int64, maxt int64) (ids // If the label `idLabelName` is already existing, its value is overwritten and // the previous value is exposed through a new label prefixed with "original_". // This behaviour is not implemented recursively. -func NewMergeQueryable(idLabelName string, callback MergeQuerierCallback, byPassWithSingleQuerier bool, logger log.Logger) storage.Queryable { +func NewMergeQueryable(idLabelName string, callback MergeQuerierCallback, byPassWithSingleQuerier bool, maxConcurrency int, logger log.Logger) storage.Queryable { return &mergeQueryable{ logger: logger, idLabelName: idLabelName, callback: callback, bypassWithSingleQuerier: byPassWithSingleQuerier, + maxConcurrency: maxConcurrency, } } @@ -94,6 +95,7 @@ type mergeQueryable struct { idLabelName string bypassWithSingleQuerier bool callback MergeQuerierCallback + maxConcurrency int } // Querier returns a new mergeQuerier, which aggregates results from multiple @@ -113,11 +115,12 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s } return &mergeQuerier{ - logger: m.logger, - ctx: ctx, - idLabelName: m.idLabelName, - queriers: queriers, - ids: ids, + logger: m.logger, + ctx: ctx, + idLabelName: m.idLabelName, + queriers: queriers, + ids: ids, + maxConcurrency: m.maxConcurrency, }, nil } @@ -128,11 +131,12 @@ func (m 
*mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s // the previous value is exposed through a new label prefixed with "original_". // This behaviour is not implemented recursively type mergeQuerier struct { - logger log.Logger - ctx context.Context - queriers []storage.Querier - idLabelName string - ids []string + logger log.Logger + ctx context.Context + queriers []storage.Querier + idLabelName string + ids []string + maxConcurrency int } // LabelValues returns all potential values for a label name. It is not safe @@ -246,7 +250,7 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(f stringSliceFunc, te return nil } - err := concurrency.ForEachJob(m.ctx, len(jobs), maxConcurrency, run) + err := concurrency.ForEachJob(m.ctx, len(jobs), m.maxConcurrency, run) if err != nil { return nil, nil, err } @@ -324,7 +328,7 @@ func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, match return nil } - err := concurrency.ForEachJob(ctx, len(jobs), maxConcurrency, run) + err := concurrency.ForEachJob(ctx, len(jobs), m.maxConcurrency, run) if err != nil { return storage.ErrSeriesSet(err) } diff --git a/pkg/querier/tenantfederation/merge_queryable_test.go b/pkg/querier/tenantfederation/merge_queryable_test.go index c707b19e3fe..39c297ded44 100644 --- a/pkg/querier/tenantfederation/merge_queryable_test.go +++ b/pkg/querier/tenantfederation/merge_queryable_test.go @@ -266,7 +266,7 @@ type mergeQueryableScenario struct { func (s *mergeQueryableScenario) init() (storage.Querier, error) { // initialize with default tenant label - q := NewQueryable(&s.queryable, !s.doNotByPassSingleQuerier, log.NewNopLogger()) + q := NewQueryable(&s.queryable, !s.doNotByPassSingleQuerier, defaultConcurrency, log.NewNopLogger()) // inject tenants into context ctx := context.Background() @@ -345,7 +345,7 @@ type labelValuesScenario struct { func TestMergeQueryable_Querier(t *testing.T) { t.Run("querying without a tenant specified should error", func(t 
*testing.T) { queryable := &mockTenantQueryableWithFilter{logger: log.NewNopLogger()} - q := NewQueryable(queryable, false /* bypassWithSingleQuerier */, log.NewNopLogger()) + q := NewQueryable(queryable, false /* bypassWithSingleQuerier */, defaultConcurrency, log.NewNopLogger()) // Create a context with no tenant specified. ctx := context.Background() @@ -899,7 +899,7 @@ func TestTracingMergeQueryable(t *testing.T) { // set a multi tenant resolver tenant.WithDefaultResolver(tenant.NewMultiResolver()) filter := mockTenantQueryableWithFilter{} - q := NewQueryable(&filter, false, log.NewNopLogger()) + q := NewQueryable(&filter, false, defaultConcurrency, log.NewNopLogger()) // retrieve querier if set querier, err := q.Querier(ctx, mint, maxt) require.NoError(t, err) diff --git a/pkg/querier/tenantfederation/tenant_federation.go b/pkg/querier/tenantfederation/tenant_federation.go index e5d3a65e6e9..e25106c5d20 100644 --- a/pkg/querier/tenantfederation/tenant_federation.go +++ b/pkg/querier/tenantfederation/tenant_federation.go @@ -14,16 +14,18 @@ import ( const ( defaultTenantLabel = "__tenant_id__" retainExistingPrefix = "original_" - maxConcurrency = 16 + defaultConcurrency = 16 ) type Config struct { // Enabled switches on support for multi tenant query federation - Enabled bool `yaml:"enabled"` + Enabled bool `yaml:"enabled"` + MaxConcurrent int `yaml:"max_concurrent" category:"experimental"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.Enabled, "tenant-federation.enabled", false, "If enabled on all services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a '|' character in the 'X-Scope-OrgID' header.") + f.IntVar(&cfg.MaxConcurrent, "tenant-federation.max-concurrent", defaultConcurrency, "The number of workers used for each tenant federated query. 
This setting limits the maximum number of per-tenant queries executed at a time for a tenant federated query.") } // filterValuesByMatchers applies matchers to inputed `idLabelName` and diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 43f1922e43f..b051dd85250 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -188,7 +188,6 @@ type Limits struct { func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.IngestionTenantShardSize, "distributor.ingestion-tenant-shard-size", 0, "The tenant's shard size used by shuffle-sharding. This value is the total size of the shard (ie. it is not the number of ingesters in the shard per zone, but the number of ingesters in the shard across all zones, if zone-awareness is enabled). Must be set both on ingesters and distributors. 0 disables shuffle sharding.") f.Float64Var(&l.RequestRate, requestRateFlag, 0, "Per-tenant push request rate limit in requests per second. 0 to disable.") - f.BoolVar(&l.ServiceOverloadStatusCodeOnRateLimitEnabled, "distributor.service-overload-status-code-on-rate-limit-enabled", false, "If enabled, rate limit errors will be reported to the client with HTTP status code 529 (Service is overloaded). If disabled, status code 429 (Too Many Requests) is used.") f.IntVar(&l.RequestBurstSize, requestBurstSizeFlag, 0, "Per-tenant allowed push request burst size. 0 to disable.") f.Float64Var(&l.IngestionRate, ingestionRateFlag, 10000, "Per-tenant ingestion rate limit in samples per second.") f.IntVar(&l.IngestionBurstSize, ingestionBurstSizeFlag, 200000, "Per-tenant allowed ingestion burst size (in number of samples).") @@ -205,6 +204,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { _ = l.CreationGracePeriod.Set("10m") f.Var(&l.CreationGracePeriod, creationGracePeriodFlag, "Controls how far into the future incoming samples and exemplars are accepted compared to the wall clock. 
Any sample or exemplar will be rejected if its timestamp is greater than '(now + grace_period)'. This configuration is enforced in the distributor, ingester and query-frontend (to avoid querying too far into the future).") f.BoolVar(&l.EnforceMetadataMetricName, "validation.enforce-metadata-metric-name", true, "Enforce every metadata has a metric name.") + f.BoolVar(&l.ServiceOverloadStatusCodeOnRateLimitEnabled, "distributor.service-overload-status-code-on-rate-limit-enabled", false, "If enabled, rate limit errors will be reported to the client with HTTP status code 529 (Service is overloaded). If disabled, status code 429 (Too Many Requests) is used.") f.IntVar(&l.MaxGlobalSeriesPerUser, MaxSeriesPerUserFlag, 150000, "The maximum number of in-memory series per tenant, across the cluster before replication. 0 to disable.") f.IntVar(&l.MaxGlobalSeriesPerMetric, MaxSeriesPerMetricFlag, 0, "The maximum number of in-memory series per metric name, across the cluster before replication. 0 to disable.")