Skip to content

Commit

Permalink
Allow configuration of tenant federation concurrency
Browse files Browse the repository at this point in the history
Allow the max number of per-tenant queries run concurrently to be
configured when executing a multi-tenant query. This can help mitigate
performance issues when running queries across more than 16 tenants at
once.

Fixes #2715

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>
  • Loading branch information
56quarters committed Aug 30, 2023
1 parent bdd6e14 commit 81adb3a
Show file tree
Hide file tree
Showing 13 changed files with 93 additions and 61 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
* [FEATURE] Ingester: add new metrics for tracking native histograms in active series: `cortex_ingester_active_native_histogram_series`, `cortex_ingester_active_native_histogram_series_custom_tracker`, `cortex_ingester_active_native_histogram_buckets`, `cortex_ingester_active_native_histogram_buckets_custom_tracker`. The first 2 are the subsets of the existing and unmodified `cortex_ingester_active_series` and `cortex_ingester_active_series_custom_tracker` respectively, only tracking native histogram series, and the last 2 are the equivalents for tracking the number of buckets in native histogram series. #5318
* [FEATURE] Add experimental CLI flag `-<prefix>.s3.native-aws-auth-enabled` that allows to enable the default credentials provider chain of the AWS SDK. #5636
* [FEATURE] Distributor: add experimental support for circuit breaking when writing to ingesters via `-ingester.client.circuit-breaker.enabled`, `-ingester.client.circuit-breaker.failure-threshold`, or `-ingester.client.circuit-breaker.cooldown-period` or their corresponding YAML. #5650
* [FEATURE] Querier: add experimental CLI flag `-tenant-federation.max-concurrent` to adjust the max number of per-tenant queries that can be run at a time when executing a single multi-tenant query. #5874
* [ENHANCEMENT] Overrides-exporter: Add new metrics for write path and alertmanager (`max_global_metadata_per_user`, `max_global_metadata_per_metric`, `request_rate`, `request_burst_size`, `alertmanager_notification_rate_limit`, `alertmanager_max_dispatcher_aggregation_groups`, `alertmanager_max_alerts_count`, `alertmanager_max_alerts_size_bytes`) and added flag `-overrides-exporter.enabled-metrics` to explicitly configure desired metrics, e.g. `-overrides-exporter.enabled-metrics=request_rate,ingestion_rate`. Default value for this flag is: `ingestion_rate,ingestion_burst_size,max_global_series_per_user,max_global_series_per_metric,max_global_exemplars_per_user,max_fetched_chunks_per_query,max_fetched_series_per_query,ruler_max_rules_per_rule_group,ruler_max_rule_groups_per_tenant`. #5376
* [ENHANCEMENT] Cardinality API: When zone aware replication is enabled, the label values cardinality API can now tolerate single zone failure #5178
* [ENHANCEMENT] Distributor: optimize sending requests to ingesters when incoming requests don't need to be modified. For now this feature can be disabled by setting `-timeseries-unmarshal-caching-optimization-enabled=false`. #5137
Expand Down
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -9284,6 +9284,17 @@
"fieldDefaultValue": false,
"fieldFlag": "tenant-federation.enabled",
"fieldType": "boolean"
},
{
"kind": "field",
"name": "max_concurrent",
"required": false,
"desc": "The number of workers used for each tenant federated query. This setting limits the maximum number of per-tenant queries executed at a time for a tenant federated query.",
"fieldValue": null,
"fieldDefaultValue": 16,
"fieldFlag": "tenant-federation.max-concurrent",
"fieldType": "int",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2595,6 +2595,8 @@ Usage of ./cmd/mimir/mimir:
Comma-separated list of components to include in the instantiated process. The default value 'all' includes all components that are required to form a functional Grafana Mimir instance in single-binary mode. Use the '-modules' command line flag to get a list of available components, and to see which components are included with 'all'. (default all)
-tenant-federation.enabled
If enabled on all services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a '|' character in the 'X-Scope-OrgID' header.
-tenant-federation.max-concurrent int
[experimental] The number of workers used for each tenant federated query. This setting limits the maximum number of per-tenant queries executed at a time for a tenant federated query. (default 16)
-timeseries-unmarshal-caching-optimization-enabled
[experimental] Enables optimized marshaling of timeseries. (default true)
-usage-stats.enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ tenant_federation:
# CLI flag: -tenant-federation.enabled
[enabled: <boolean> | default = false]

# (experimental) The number of workers used for each tenant federated query.
# This setting limits the maximum number of per-tenant queries executed at a
# time for a tenant federated query.
# CLI flag: -tenant-federation.max-concurrent
[max_concurrent: <int> | default = 16]

activity_tracker:
# File where ongoing activities are stored. If empty, activity tracking is
# disabled.
Expand Down
8 changes: 4 additions & 4 deletions pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,9 @@ func (t *Mimir) initTenantFederation() (serv services.Service, err error) {
// single tenant. This allows for a less impactful enabling of tenant
// federation.
const bypassForSingleQuerier = true
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, bypassForSingleQuerier, util_log.Logger))
t.ExemplarQueryable = tenantfederation.NewExemplarQueryable(t.ExemplarQueryable, bypassForSingleQuerier, util_log.Logger)
t.MetadataSupplier = tenantfederation.NewMetadataSupplier(t.MetadataSupplier, util_log.Logger)
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, bypassForSingleQuerier, t.Cfg.TenantFederation.MaxConcurrent, util_log.Logger))
t.ExemplarQueryable = tenantfederation.NewExemplarQueryable(t.ExemplarQueryable, bypassForSingleQuerier, t.Cfg.TenantFederation.MaxConcurrent, util_log.Logger)
t.MetadataSupplier = tenantfederation.NewMetadataSupplier(t.MetadataSupplier, t.Cfg.TenantFederation.MaxConcurrent, util_log.Logger)
}
return nil, nil
}
Expand Down Expand Up @@ -735,7 +735,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
// This makes this label more consistent and hopefully less confusing to users.
const bypassForSingleQuerier = false

federatedQueryable = tenantfederation.NewQueryable(queryable, bypassForSingleQuerier, util_log.Logger)
federatedQueryable = tenantfederation.NewQueryable(queryable, bypassForSingleQuerier, t.Cfg.TenantFederation.MaxConcurrent, util_log.Logger)

regularQueryFunc := rules.EngineQueryFunc(eng, queryable)
federatedQueryFunc := rules.EngineQueryFunc(eng, federatedQueryable)
Expand Down
32 changes: 18 additions & 14 deletions pkg/querier/tenantfederation/merge_exemplar_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
// By setting bypassWithSingleQuerier to true, tenant federation logic gets
// bypassed if the request is only for a single tenant. The requests will also
// not contain the pseudo series label __tenant_id__ in this case.
func NewExemplarQueryable(upstream storage.ExemplarQueryable, bypassWithSingleQuerier bool, logger log.Logger) storage.ExemplarQueryable {
return NewMergeExemplarQueryable(defaultTenantLabel, upstream, bypassWithSingleQuerier, logger)
func NewExemplarQueryable(upstream storage.ExemplarQueryable, bypassWithSingleQuerier bool, maxConcurrency int, logger log.Logger) storage.ExemplarQueryable {
return NewMergeExemplarQueryable(defaultTenantLabel, upstream, bypassWithSingleQuerier, maxConcurrency, logger)
}

// NewMergeExemplarQueryable returns an exemplar queryable that makes requests for
Expand All @@ -41,13 +41,14 @@ func NewExemplarQueryable(upstream storage.ExemplarQueryable, bypassWithSingleQu
// By setting bypassWithSingleQuerier to true, tenant federation logic gets
// bypassed if the request is only for a single tenant. The requests will also
// not contain the pseudo series label `idLabelName` in this case.
func NewMergeExemplarQueryable(idLabelName string, upstream storage.ExemplarQueryable, bypassWithSingleQuerier bool, logger log.Logger) storage.ExemplarQueryable {
func NewMergeExemplarQueryable(idLabelName string, upstream storage.ExemplarQueryable, bypassWithSingleQuerier bool, maxConcurrency int, logger log.Logger) storage.ExemplarQueryable {
return &mergeExemplarQueryable{
logger: logger,
idLabelName: idLabelName,
bypassWithSingleQuerier: bypassWithSingleQuerier,
upstream: upstream,
resolver: tenant.NewMultiResolver(),
maxConcurrency: maxConcurrency,
}
}

Expand All @@ -57,6 +58,7 @@ type mergeExemplarQueryable struct {
bypassWithSingleQuerier bool
upstream storage.ExemplarQueryable
resolver tenant.Resolver
maxConcurrency int
}

// tenantsAndQueriers returns a list of tenant IDs and corresponding queriers based on the context
Expand Down Expand Up @@ -96,11 +98,12 @@ func (m *mergeExemplarQueryable) ExemplarQuerier(ctx context.Context) (storage.E
}

return &mergeExemplarQuerier{
logger: m.logger,
ctx: ctx,
idLabelName: m.idLabelName,
tenants: ids,
queriers: queriers,
logger: m.logger,
ctx: ctx,
idLabelName: m.idLabelName,
tenants: ids,
queriers: queriers,
maxConcurrency: m.maxConcurrency,
}, nil
}

Expand All @@ -111,11 +114,12 @@ type exemplarJob struct {
}

type mergeExemplarQuerier struct {
logger log.Logger
ctx context.Context
idLabelName string
tenants []string
queriers []storage.ExemplarQuerier
logger log.Logger
ctx context.Context
idLabelName string
tenants []string
queriers []storage.ExemplarQuerier
maxConcurrency int
}

// Select returns the union exemplars within the time range that match each slice of
Expand Down Expand Up @@ -186,7 +190,7 @@ func (m *mergeExemplarQuerier) Select(start, end int64, matchers ...[]*labels.Ma
return nil
}

err := concurrency.ForEachJob(ctx, len(jobs), maxConcurrency, run)
err := concurrency.ForEachJob(ctx, len(jobs), m.maxConcurrency, run)
if err != nil {
return nil, err
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/querier/tenantfederation/merge_exemplar_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (m *mockExemplarQuerier) matches(res exemplar.QueryResult, matchers []*labe
func TestMergeExemplarQueryable_ExemplarQuerier(t *testing.T) {
t.Run("error getting tenant IDs", func(t *testing.T) {
upstream := &mockExemplarQueryable{}
federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t))
federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t))

q, err := federated.ExemplarQuerier(context.Background())
assert.ErrorIs(t, err, user.ErrNoOrgID)
Expand All @@ -105,7 +105,7 @@ func TestMergeExemplarQueryable_ExemplarQuerier(t *testing.T) {
t.Run("error getting upstream querier", func(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "123")
upstream := &mockExemplarQueryable{err: errors.New("unable to get querier")}
federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t))
federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t))

q, err := federated.ExemplarQuerier(ctx)
assert.Error(t, err)
Expand All @@ -116,7 +116,7 @@ func TestMergeExemplarQueryable_ExemplarQuerier(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "123")
querier := &mockExemplarQuerier{}
upstream := &mockExemplarQueryable{queriers: map[string]storage.ExemplarQuerier{"123": querier}}
federated := NewExemplarQueryable(upstream, true, test.NewTestingLogger(t))
federated := NewExemplarQueryable(upstream, true, defaultConcurrency, test.NewTestingLogger(t))

q, err := federated.ExemplarQuerier(ctx)
assert.NoError(t, err)
Expand All @@ -127,7 +127,7 @@ func TestMergeExemplarQueryable_ExemplarQuerier(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "123")
querier := &mockExemplarQuerier{}
upstream := &mockExemplarQueryable{queriers: map[string]storage.ExemplarQuerier{"123": querier}}
federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t))
federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t))

q, err := federated.ExemplarQuerier(ctx)
require.NoError(t, err)
Expand All @@ -146,7 +146,7 @@ func TestMergeExemplarQueryable_ExemplarQuerier(t *testing.T) {
"123": querier1,
"456": querier2,
}}
federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t))
federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t))

q, err := federated.ExemplarQuerier(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -208,7 +208,7 @@ func TestMergeExemplarQuerier_Select(t *testing.T) {
"456": &mockExemplarQuerier{res: res2},
}}

federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t))
federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t))
q, err := federated.ExemplarQuerier(user.InjectOrgID(context.Background(), "123|456"))
require.NoError(t, err)

Expand All @@ -233,7 +233,7 @@ func TestMergeExemplarQuerier_Select(t *testing.T) {
"456": &mockExemplarQuerier{res: res2},
}}

federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t))
federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t))
q, err := federated.ExemplarQuerier(user.InjectOrgID(context.Background(), "123|456"))
require.NoError(t, err)

Expand Down Expand Up @@ -263,7 +263,7 @@ func TestMergeExemplarQuerier_Select(t *testing.T) {
"456": &mockExemplarQuerier{res: res2},
}}

federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t))
federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t))
q, err := federated.ExemplarQuerier(user.InjectOrgID(context.Background(), "123|456"))
require.NoError(t, err)

Expand All @@ -284,7 +284,7 @@ func TestMergeExemplarQuerier_Select(t *testing.T) {
"456": &mockExemplarQuerier{res: res2},
}}

federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t))
federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t))
q, err := federated.ExemplarQuerier(user.InjectOrgID(context.Background(), "123|456"))
require.NoError(t, err)

Expand All @@ -306,7 +306,7 @@ func TestMergeExemplarQuerier_Select(t *testing.T) {
"456": &mockExemplarQuerier{err: errors.New("timeout running exemplar query")},
}}

federated := NewExemplarQueryable(upstream, false, test.NewTestingLogger(t))
federated := NewExemplarQueryable(upstream, false, defaultConcurrency, test.NewTestingLogger(t))
q, err := federated.ExemplarQuerier(user.InjectOrgID(context.Background(), "123|456"))
require.NoError(t, err)

Expand Down
18 changes: 10 additions & 8 deletions pkg/querier/tenantfederation/merge_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ import (
// metadata for all tenant IDs that are part of the request and merges the results.
//
// No deduplication of metadata is done before being returned.
func NewMetadataSupplier(next querier.MetadataSupplier, logger log.Logger) querier.MetadataSupplier {
func NewMetadataSupplier(next querier.MetadataSupplier, maxConcurrency int, logger log.Logger) querier.MetadataSupplier {
return &mergeMetadataSupplier{
next: next,
logger: logger,
resolver: tenant.NewMultiResolver(),
next: next,
maxConcurrency: maxConcurrency,
resolver: tenant.NewMultiResolver(),
logger: logger,
}
}

type mergeMetadataSupplier struct {
next querier.MetadataSupplier
resolver tenant.Resolver
logger log.Logger
next querier.MetadataSupplier
resolver tenant.Resolver
maxConcurrency int
logger log.Logger
}

func (m *mergeMetadataSupplier) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
Expand Down Expand Up @@ -62,7 +64,7 @@ func (m *mergeMetadataSupplier) MetricsMetadata(ctx context.Context) ([]scrape.M
return nil
}

err = concurrency.ForEachJob(ctx, len(tenantIDs), maxConcurrency, run)
err = concurrency.ForEachJob(ctx, len(tenantIDs), m.maxConcurrency, run)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/querier/tenantfederation/merge_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestMergeMetadataSupplier_MetricsMetadata(t *testing.T) {

t.Run("invalid tenant IDs", func(t *testing.T) {
upstream := &mockMetadataSupplier{}
supplier := NewMetadataSupplier(upstream, test.NewTestingLogger(t))
supplier := NewMetadataSupplier(upstream, defaultConcurrency, test.NewTestingLogger(t))
_, err := supplier.MetricsMetadata(context.Background())

assert.ErrorIs(t, err, user.ErrNoOrgID)
Expand All @@ -61,7 +61,7 @@ func TestMergeMetadataSupplier_MetricsMetadata(t *testing.T) {
},
}

supplier := NewMetadataSupplier(upstream, test.NewTestingLogger(t))
supplier := NewMetadataSupplier(upstream, defaultConcurrency, test.NewTestingLogger(t))
res, err := supplier.MetricsMetadata(user.InjectOrgID(context.Background(), "team-a"))

require.NoError(t, err)
Expand All @@ -77,7 +77,7 @@ func TestMergeMetadataSupplier_MetricsMetadata(t *testing.T) {
},
}

supplier := NewMetadataSupplier(upstream, test.NewTestingLogger(t))
supplier := NewMetadataSupplier(upstream, defaultConcurrency, test.NewTestingLogger(t))
res, err := supplier.MetricsMetadata(user.InjectOrgID(context.Background(), "team-a|team-b"))

require.NoError(t, err)
Expand All @@ -94,7 +94,7 @@ func TestMergeMetadataSupplier_MetricsMetadata(t *testing.T) {
},
}

supplier := NewMetadataSupplier(upstream, test.NewTestingLogger(t))
supplier := NewMetadataSupplier(upstream, defaultConcurrency, test.NewTestingLogger(t))
res, err := supplier.MetricsMetadata(user.InjectOrgID(context.Background(), "team-a|team-b"))

require.NoError(t, err)
Expand Down
Loading

0 comments on commit 81adb3a

Please sign in to comment.