Allow configuration of tenant federation concurrency #5874

Merged
merged 2 commits into from
Aug 31, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -59,6 +59,7 @@
* [FEATURE] Ingester: add new metrics for tracking native histograms in active series: `cortex_ingester_active_native_histogram_series`, `cortex_ingester_active_native_histogram_series_custom_tracker`, `cortex_ingester_active_native_histogram_buckets`, `cortex_ingester_active_native_histogram_buckets_custom_tracker`. The first 2 are the subsets of the existing and unmodified `cortex_ingester_active_series` and `cortex_ingester_active_series_custom_tracker` respectively, only tracking native histogram series, and the last 2 are the equivalents for tracking the number of buckets in native histogram series. #5318
* [FEATURE] Add experimental CLI flag `-<prefix>.s3.native-aws-auth-enabled` that allows to enable the default credentials provider chain of the AWS SDK. #5636
* [FEATURE] Distributor: add experimental support for circuit breaking when writing to ingesters via `-ingester.client.circuit-breaker.enabled`, `-ingester.client.circuit-breaker.failure-threshold`, or `-ingester.client.circuit-breaker.cooldown-period` or their corresponding YAML. #5650
+ * [FEATURE] Querier: add experimental CLI flag `-tenant-federation.max-concurrent` to adjust the max number of per-tenant queries that can be run at a time when executing a single multi-tenant query. #5874
* [ENHANCEMENT] Overrides-exporter: Add new metrics for write path and alertmanager (`max_global_metadata_per_user`, `max_global_metadata_per_metric`, `request_rate`, `request_burst_size`, `alertmanager_notification_rate_limit`, `alertmanager_max_dispatcher_aggregation_groups`, `alertmanager_max_alerts_count`, `alertmanager_max_alerts_size_bytes`) and added flag `-overrides-exporter.enabled-metrics` to explicitly configure desired metrics, e.g. `-overrides-exporter.enabled-metrics=request_rate,ingestion_rate`. Default value for this flag is: `ingestion_rate,ingestion_burst_size,max_global_series_per_user,max_global_series_per_metric,max_global_exemplars_per_user,max_fetched_chunks_per_query,max_fetched_series_per_query,ruler_max_rules_per_rule_group,ruler_max_rule_groups_per_tenant`. #5376
* [ENHANCEMENT] Cardinality API: When zone aware replication is enabled, the label values cardinality API can now tolerate single zone failure #5178
* [ENHANCEMENT] Distributor: optimize sending requests to ingesters when incoming requests don't need to be modified. For now this feature can be disabled by setting `-timeseries-unmarshal-caching-optimization-enabled=false`. #5137
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -9284,6 +9284,17 @@
"fieldDefaultValue": false,
"fieldFlag": "tenant-federation.enabled",
"fieldType": "boolean"
+ },
+ {
+ "kind": "field",
+ "name": "max_concurrent",
+ "required": false,
+ "desc": "The number of workers used for each tenant federated query. This setting limits the maximum number of per-tenant queries executed at a time for a tenant federated query.",
+ "fieldValue": null,
+ "fieldDefaultValue": 16,
+ "fieldFlag": "tenant-federation.max-concurrent",
+ "fieldType": "int",
+ "fieldCategory": "experimental"
}
],
"fieldValue": null,
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -2595,6 +2595,8 @@ Usage of ./cmd/mimir/mimir:
Comma-separated list of components to include in the instantiated process. The default value 'all' includes all components that are required to form a functional Grafana Mimir instance in single-binary mode. Use the '-modules' command line flag to get a list of available components, and to see which components are included with 'all'. (default all)
-tenant-federation.enabled
If enabled on all services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a '|' character in the 'X-Scope-OrgID' header.
+ -tenant-federation.max-concurrent int
+ [experimental] The number of workers used for each tenant federated query. This setting limits the maximum number of per-tenant queries executed at a time for a tenant federated query. (default 16)
-timeseries-unmarshal-caching-optimization-enabled
[experimental] Enables optimized marshaling of timeseries. (default true)
-usage-stats.enabled
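For readers unfamiliar with how such a flag usually reaches the code, here is a minimal sketch using only Go's standard `flag` package. The `Config` struct, its `RegisterFlags` method and the `parse` helper are assumptions made for illustration; the PR's actual config file is not part of this diff. Only the flag names, the YAML key and the default of 16 are taken from the changes shown here.

```go
package main

import (
	"flag"
	"fmt"
)

// Config is a hypothetical stand-in for the tenant federation config block.
// Field names are assumptions; flag names and defaults come from the diff.
type Config struct {
	Enabled       bool `yaml:"enabled"`
	MaxConcurrent int  `yaml:"max_concurrent"`
}

// RegisterFlags binds the two tenant federation flags to cfg.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&cfg.Enabled, "tenant-federation.enabled", false,
		"If enabled on all services, queries can be federated across multiple tenants.")
	f.IntVar(&cfg.MaxConcurrent, "tenant-federation.max-concurrent", 16,
		"The number of workers used for each tenant federated query.")
}

// parse is a small helper (hypothetical) that parses args into a Config.
func parse(args []string) (Config, error) {
	var cfg Config
	fs := flag.NewFlagSet("sketch", flag.ContinueOnError)
	cfg.RegisterFlags(fs)
	err := fs.Parse(args)
	return cfg, err
}

func main() {
	cfg, err := parse([]string{"-tenant-federation.enabled", "-tenant-federation.max-concurrent=4"})
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.Enabled, cfg.MaxConcurrent)
}
```

With no arguments the sketch falls back to the documented default of 16, which matches the `(default 16)` shown in the help text.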
@@ -198,6 +198,12 @@ tenant_federation:
# CLI flag: -tenant-federation.enabled
[enabled: <boolean> | default = false]

+ # (experimental) The number of workers used for each tenant federated query.
+ # This setting limits the maximum number of per-tenant queries executed at a
+ # time for a tenant federated query.
+ # CLI flag: -tenant-federation.max-concurrent
+ [max_concurrent: <int> | default = 16]
+
activity_tracker:
# File where ongoing activities are stored. If empty, activity tracking is
# disabled.
8 changes: 4 additions & 4 deletions pkg/mimir/modules.go
@@ -409,9 +409,9 @@ func (t *Mimir) initTenantFederation() (serv services.Service, err error) {
// single tenant. This allows for a less impactful enabling of tenant
// federation.
const bypassForSingleQuerier = true
- t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, bypassForSingleQuerier, util_log.Logger))
- t.ExemplarQueryable = tenantfederation.NewExemplarQueryable(t.ExemplarQueryable, bypassForSingleQuerier, util_log.Logger)
- t.MetadataSupplier = tenantfederation.NewMetadataSupplier(t.MetadataSupplier, util_log.Logger)
+ t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, bypassForSingleQuerier, t.Cfg.TenantFederation.MaxConcurrent, util_log.Logger))
+ t.ExemplarQueryable = tenantfederation.NewExemplarQueryable(t.ExemplarQueryable, bypassForSingleQuerier, t.Cfg.TenantFederation.MaxConcurrent, util_log.Logger)
+ t.MetadataSupplier = tenantfederation.NewMetadataSupplier(t.MetadataSupplier, t.Cfg.TenantFederation.MaxConcurrent, util_log.Logger)
}
return nil, nil
}
@@ -735,7 +735,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
// This makes this label more consistent and hopefully less confusing to users.
const bypassForSingleQuerier = false

- federatedQueryable = tenantfederation.NewQueryable(queryable, bypassForSingleQuerier, util_log.Logger)
+ federatedQueryable = tenantfederation.NewQueryable(queryable, bypassForSingleQuerier, t.Cfg.TenantFederation.MaxConcurrent, util_log.Logger)

regularQueryFunc := rules.EngineQueryFunc(eng, queryable)
federatedQueryFunc := rules.EngineQueryFunc(eng, federatedQueryable)
32 changes: 18 additions & 14 deletions pkg/querier/tenantfederation/merge_exemplar_queryable.go
@@ -27,8 +27,8 @@ import (
// By setting bypassWithSingleQuerier to true, tenant federation logic gets
// bypassed if the request is only for a single tenant. The requests will also
// not contain the pseudo series label __tenant_id__ in this case.
- func NewExemplarQueryable(upstream storage.ExemplarQueryable, bypassWithSingleQuerier bool, logger log.Logger) storage.ExemplarQueryable {
- return NewMergeExemplarQueryable(defaultTenantLabel, upstream, bypassWithSingleQuerier, logger)
+ func NewExemplarQueryable(upstream storage.ExemplarQueryable, bypassWithSingleQuerier bool, maxConcurrency int, logger log.Logger) storage.ExemplarQueryable {
+ return NewMergeExemplarQueryable(defaultTenantLabel, upstream, bypassWithSingleQuerier, maxConcurrency, logger)
}

// NewMergeExemplarQueryable returns an exemplar queryable that makes requests for
@@ -41,13 +41,14 @@ func NewExemplarQueryable(upstream storage.ExemplarQueryable, bypassWithSingleQu
// By setting bypassWithSingleQuerier to true, tenant federation logic gets
// bypassed if the request is only for a single tenant. The requests will also
// not contain the pseudo series label `idLabelName` in this case.
- func NewMergeExemplarQueryable(idLabelName string, upstream storage.ExemplarQueryable, bypassWithSingleQuerier bool, logger log.Logger) storage.ExemplarQueryable {
+ func NewMergeExemplarQueryable(idLabelName string, upstream storage.ExemplarQueryable, bypassWithSingleQuerier bool, maxConcurrency int, logger log.Logger) storage.ExemplarQueryable {
return &mergeExemplarQueryable{
logger: logger,
idLabelName: idLabelName,
bypassWithSingleQuerier: bypassWithSingleQuerier,
upstream: upstream,
resolver: tenant.NewMultiResolver(),
+ maxConcurrency: maxConcurrency,
}
}

@@ -57,6 +58,7 @@ type mergeExemplarQueryable struct {
bypassWithSingleQuerier bool
upstream storage.ExemplarQueryable
resolver tenant.Resolver
+ maxConcurrency int
}

// tenantsAndQueriers returns a list of tenant IDs and corresponding queriers based on the context
@@ -96,11 +98,12 @@ func (m *mergeExemplarQueryable) ExemplarQuerier(ctx context.Context) (storage.E
}

return &mergeExemplarQuerier{
- logger: m.logger,
- ctx: ctx,
- idLabelName: m.idLabelName,
- tenants: ids,
- queriers: queriers,
+ logger: m.logger,
+ ctx: ctx,
+ idLabelName: m.idLabelName,
+ tenants: ids,
+ queriers: queriers,
+ maxConcurrency: m.maxConcurrency,
}, nil
}

@@ -111,11 +114,12 @@ type exemplarJob struct {
}

type mergeExemplarQuerier struct {
- logger log.Logger
- ctx context.Context
- idLabelName string
- tenants []string
- queriers []storage.ExemplarQuerier
+ logger log.Logger
+ ctx context.Context
+ idLabelName string
+ tenants []string
+ queriers []storage.ExemplarQuerier
+ maxConcurrency int
}

// Select returns the union exemplars within the time range that match each slice of
@@ -186,7 +190,7 @@ func (m *mergeExemplarQuerier) Select(start, end int64, matchers ...[]*labels.Ma
return nil
}

- err := concurrency.ForEachJob(ctx, len(jobs), maxConcurrency, run)
+ err := concurrency.ForEachJob(ctx, len(jobs), m.maxConcurrency, run)
if err != nil {
return nil, err
}
20 changes: 10 additions & 10 deletions pkg/querier/tenantfederation/merge_exemplar_queryable_test.go
@@ -95,7 +95,7 @@ func (m *mockExemplarQuerier) matches(res exemplar.QueryResult, matchers []*labe
func TestMergeExemplarQueryable_ExemplarQuerier(t *testing.T) {
t.Run("error getting tenant IDs", func(t *testing.T) {
upstream := &mockExemplarQueryable{}
- federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t))
+ federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t))

q, err := federated.ExemplarQuerier(context.Background())
assert.ErrorIs(t, err, user.ErrNoOrgID)
@@ -105,7 +105,7 @@ func TestMergeExemplarQueryable_ExemplarQuerier(t *testing.T) {
t.Run("error getting upstream querier", func(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "123")
upstream := &mockExemplarQueryable{err: errors.New("unable to get querier")}
- federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t))
+ federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t))

q, err := federated.ExemplarQuerier(ctx)
assert.Error(t, err)
@@ -116,7 +116,7 @@ func TestMergeExemplarQueryable_ExemplarQuerier(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "123")
querier := &mockExemplarQuerier{}
upstream := &mockExemplarQueryable{queriers: map[string]storage.ExemplarQuerier{"123": querier}}
- federated := NewExemplarQueryable(upstream, true, test.NewTestingLogger(t))
+ federated := NewExemplarQueryable(upstream, true, defaultConcurrency, test.NewTestingLogger(t))

q, err := federated.ExemplarQuerier(ctx)
assert.NoError(t, err)
@@ -127,7 +127,7 @@ func TestMergeExemplarQueryable_ExemplarQuerier(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "123")
querier := &mockExemplarQuerier{}
upstream := &mockExemplarQueryable{queriers: map[string]storage.ExemplarQuerier{"123": querier}}
- federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t))
+ federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t))

q, err := federated.ExemplarQuerier(ctx)
require.NoError(t, err)
@@ -146,7 +146,7 @@ func TestMergeExemplarQueryable_ExemplarQuerier(t *testing.T) {
"123": querier1,
"456": querier2,
}}
- federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t))
+ federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t))

q, err := federated.ExemplarQuerier(ctx)
require.NoError(t, err)
@@ -208,7 +208,7 @@ func TestMergeExemplarQuerier_Select(t *testing.T) {
"456": &mockExemplarQuerier{res: res2},
}}

- federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t))
+ federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t))
q, err := federated.ExemplarQuerier(user.InjectOrgID(context.Background(), "123|456"))
require.NoError(t, err)

@@ -233,7 +233,7 @@ func TestMergeExemplarQuerier_Select(t *testing.T) {
"456": &mockExemplarQuerier{res: res2},
}}

- federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t))
+ federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t))
q, err := federated.ExemplarQuerier(user.InjectOrgID(context.Background(), "123|456"))
require.NoError(t, err)

@@ -263,7 +263,7 @@ func TestMergeExemplarQuerier_Select(t *testing.T) {
"456": &mockExemplarQuerier{res: res2},
}}

- federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t))
+ federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t))
q, err := federated.ExemplarQuerier(user.InjectOrgID(context.Background(), "123|456"))
require.NoError(t, err)

@@ -284,7 +284,7 @@ func TestMergeExemplarQuerier_Select(t *testing.T) {
"456": &mockExemplarQuerier{res: res2},
}}

- federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t))
+ federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t))
q, err := federated.ExemplarQuerier(user.InjectOrgID(context.Background(), "123|456"))
require.NoError(t, err)

@@ -306,7 +306,7 @@ func TestMergeExemplarQuerier_Select(t *testing.T) {
"456": &mockExemplarQuerier{err: errors.New("timeout running exemplar query")},
}}

- federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t))
+ federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t))
q, err := federated.ExemplarQuerier(user.InjectOrgID(context.Background(), "123|456"))
require.NoError(t, err)

18 changes: 10 additions & 8 deletions pkg/querier/tenantfederation/merge_metadata.go
@@ -21,18 +21,20 @@ import (
// metadata for all tenant IDs that are part of the request and merges the results.
//
// No deduplication of metadata is done before being returned.
- func NewMetadataSupplier(next querier.MetadataSupplier, logger log.Logger) querier.MetadataSupplier {
+ func NewMetadataSupplier(next querier.MetadataSupplier, maxConcurrency int, logger log.Logger) querier.MetadataSupplier {
return &mergeMetadataSupplier{
- next: next,
- logger: logger,
- resolver: tenant.NewMultiResolver(),
+ next: next,
+ maxConcurrency: maxConcurrency,
+ resolver: tenant.NewMultiResolver(),
+ logger: logger,
}
}

type mergeMetadataSupplier struct {
- next querier.MetadataSupplier
- resolver tenant.Resolver
- logger log.Logger
+ next querier.MetadataSupplier
+ resolver tenant.Resolver
+ maxConcurrency int
+ logger log.Logger
}

func (m *mergeMetadataSupplier) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
@@ -62,7 +64,7 @@ func (m *mergeMetadataSupplier) MetricsMetadata(ctx context.Context) ([]scrape.M
return nil
}

- err = concurrency.ForEachJob(ctx, len(tenantIDs), maxConcurrency, run)
+ err = concurrency.ForEachJob(ctx, len(tenantIDs), m.maxConcurrency, run)
if err != nil {
return nil, err
}
8 changes: 4 additions & 4 deletions pkg/querier/tenantfederation/merge_metadata_test.go
@@ -48,7 +48,7 @@ func TestMergeMetadataSupplier_MetricsMetadata(t *testing.T) {

t.Run("invalid tenant IDs", func(t *testing.T) {
upstream := &mockMetadataSupplier{}
- supplier := NewMetadataSupplier(upstream, test.NewTestingLogger(t))
+ supplier := NewMetadataSupplier(upstream, defaultConcurrency, test.NewTestingLogger(t))
_, err := supplier.MetricsMetadata(context.Background())

assert.ErrorIs(t, err, user.ErrNoOrgID)
@@ -61,7 +61,7 @@ func TestMergeMetadataSupplier_MetricsMetadata(t *testing.T) {
},
}

- supplier := NewMetadataSupplier(upstream, test.NewTestingLogger(t))
+ supplier := NewMetadataSupplier(upstream, defaultConcurrency, test.NewTestingLogger(t))
res, err := supplier.MetricsMetadata(user.InjectOrgID(context.Background(), "team-a"))

require.NoError(t, err)
@@ -77,7 +77,7 @@ func TestMergeMetadataSupplier_MetricsMetadata(t *testing.T) {
},
}

- supplier := NewMetadataSupplier(upstream, test.NewTestingLogger(t))
+ supplier := NewMetadataSupplier(upstream, defaultConcurrency, test.NewTestingLogger(t))
res, err := supplier.MetricsMetadata(user.InjectOrgID(context.Background(), "team-a|team-b"))

require.NoError(t, err)
@@ -94,7 +94,7 @@ func TestMergeMetadataSupplier_MetricsMetadata(t *testing.T) {
},
}

- supplier := NewMetadataSupplier(upstream, test.NewTestingLogger(t))
+ supplier := NewMetadataSupplier(upstream, defaultConcurrency, test.NewTestingLogger(t))
res, err := supplier.MetricsMetadata(user.InjectOrgID(context.Background(), "team-a|team-b"))

require.NoError(t, err)