diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index be67414c5a7..1cc1690bf14 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/grpcutil" + "github.com/grafana/dskit/tenant" "github.com/grafana/dskit/user" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -143,74 +144,68 @@ func (t *PusherAppendable) Appender(ctx context.Context) storage.Appender { } } -type multitenantConcurrencyController struct { +type MultitenantConcurrencyController struct { globalConcurrency semaphore.Weighted mtx sync.Mutex tenantConcurrency map[string]*semaphore.Weighted } -type tenantController struct { - doneFunc func() - allowFunc func() bool -} +func (c *MultitenantConcurrencyController) Done(ctx context.Context, group *rules.Group) { + c.globalConcurrency.Release(1) -func (c *multitenantConcurrencyController) ControllerFor(userID string) *tenantController { - // We need to create a new controller for each tenant, because each tenant has its own concurrency limits. - // TODO: We need to garbage collect the controllers when they are no longer needed. - return &tenantController{ - doneFunc: c.doneFuncFor(userID), - allowFunc: c.allowFuncFor(userID), + userID, err := tenant.TenantID(ctx) + if err != nil { + // TODO: log error + return } -} -func (tc *tenantController) Done() { - tc.doneFunc() -} -func (tc *tenantController) Allow() bool { - return tc.allowFunc() -} + c.mtx.Lock() + defer c.mtx.Unlock() + tc, ok := c.tenantConcurrency[userID] -func (c *multitenantConcurrencyController) doneFuncFor(userID string) func() { - return func() { - c.globalConcurrency.Release(1) - c.mtx.Lock() - tc, ok := c.tenantConcurrency[userID] - if ok { - tc.Release(1) - } - c.mtx.Unlock() + if ok { + // TODO: log error + tc.Release(1) } } -func (c *multitenantConcurrencyController) allowFuncFor(userID string) func() bool { - return func() bool { - if !c.globalConcurrency.TryAcquire(1) { - return false - } +const IntervalToEvaluationThreshold = 50.00 - c.mtx.Lock() - defer c.mtx.Unlock() - tc, ok := c.tenantConcurrency[userID] - if !ok { - tc = semaphore.NewWeighted(4) - c.tenantConcurrency[userID] = tc - } - return tc.TryAcquire(1) +func (c *MultitenantConcurrencyController) Allow(ctx context.Context, group *rules.Group) bool { + if !c.globalConcurrency.TryAcquire(1) { + return false + } + + userID, err := tenant.TenantID(ctx) + if err != nil { + return false } + c.mtx.Lock() + defer c.mtx.Unlock() + tc, ok := c.tenantConcurrency[userID] + if !ok { + tc = semaphore.NewWeighted(4) + c.tenantConcurrency[userID] = tc + } + + return tc.TryAcquire(1) && c.IsGroupAtRisk(group) } -func NewMultitenantConcurrencyController(maxConcurrency int64) ConcurrencyController { - return &multitenantConcurrencyController{ +func (c *MultitenantConcurrencyController) IsGroupAtRisk(group *rules.Group) bool { + interval := group.Interval().Seconds() + lastEvaluation := group.GetEvaluationTime().Seconds() + + return lastEvaluation < interval*IntervalToEvaluationThreshold/100 +} + +func NewMultitenantConcurrencyController(maxConcurrency int64) *MultitenantConcurrencyController { + return &MultitenantConcurrencyController{ globalConcurrency: *semaphore.NewWeighted(maxConcurrency), tenantConcurrency: make(map[string]*semaphore.Weighted), } } -type ConcurrencyController interface { - ControllerFor(userID string) *tenantController -} - // RulesLimits defines limits used by Ruler. type RulesLimits interface { EvaluationDelay(userID string) time.Duration @@ -340,7 +335,7 @@ func DefaultTenantManagerFactory( p Pusher, queryable storage.Queryable, queryFunc rules.QueryFunc, - concurrencyController ConcurrencyController, + concurrencyController *MultitenantConcurrencyController, overrides RulesLimits, reg prometheus.Registerer, ) ManagerFactory { @@ -412,7 +407,7 @@ func DefaultTenantManagerFactory( return overrides.EvaluationDelay(userID) }, ConcurrentEvalsEnabled: concurrentEvaluationEnabled, - RuleConcurrencyController: concurrencyController.ControllerFor(userID), + RuleConcurrencyController: concurrencyController, MaxConcurrentEvals: overrides.RulerMaxConcurrentRuleEvaluations(userID), }) } diff --git a/pkg/ruler/manager.go b/pkg/ruler/manager.go index a8fc988229f..766f2904124 100644 --- a/pkg/ruler/manager.go +++ b/pkg/ruler/manager.go @@ -36,7 +36,7 @@ type DefaultMultiTenantManager struct { cfg Config notifierCfg *config.Config managerFactory ManagerFactory - concurrencyController ConcurrencyController + concurrencyController *MultitenantConcurrencyController mapper *mapper @@ -61,7 +61,7 @@ type DefaultMultiTenantManager struct { rulerIsRunning atomic.Bool } -func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, reg prometheus.Registerer, logger log.Logger, dnsResolver cache.AddressProvider, cc ConcurrencyController) (*DefaultMultiTenantManager, error) { +func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, reg prometheus.Registerer, logger log.Logger, dnsResolver cache.AddressProvider, cc *MultitenantConcurrencyController) (*DefaultMultiTenantManager, error) { refreshMetrics := discovery.NewRefreshMetrics(reg) ncfg, err := buildNotifierConfig(&cfg, dnsResolver, refreshMetrics) if err != nil { diff --git a/vendor/github.com/prometheus/prometheus/rules/group.go b/vendor/github.com/prometheus/prometheus/rules/group.go index 43744779d74..2ea6c7e0e1e 100644 --- a/vendor/github.com/prometheus/prometheus/rules/group.go +++ b/vendor/github.com/prometheus/prometheus/rules/group.go @@ -603,7 +603,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output. // Try run concurrently if there are slots available. - if ctrl := g.concurrencyController; isRuleEligibleForConcurrentExecution(rule) && ctrl.Allow() { + if ctrl := g.concurrencyController; isRuleEligibleForConcurrentExecution(rule) && ctrl.Allow(ctx, g) { wg.Add(1) go eval(i, rule, func() { diff --git a/vendor/github.com/prometheus/prometheus/rules/manager.go b/vendor/github.com/prometheus/prometheus/rules/manager.go index 14e53470b66..0bef4a6f12a 100644 --- a/vendor/github.com/prometheus/prometheus/rules/manager.go +++ b/vendor/github.com/prometheus/prometheus/rules/manager.go @@ -488,10 +488,10 @@ func (c ruleDependencyController) AnalyseRules(rules []Rule) { type RuleConcurrencyController interface { // Allow determines whether any concurrent evaluation slots are available. // If Allow() returns true, then Done() must be called to release the acquired slot. - Allow() bool + Allow(ctx context.Context, group *Group) bool // Done releases a concurrent evaluation slot. - Done() + Done(ctx context.Context, group *Group) } // concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. @@ -521,11 +521,11 @@ func (c *concurrentRuleEvalController) RuleEligible(g *Group, r Rule) bool { return depMap.isIndependent(r) } -func (c *concurrentRuleEvalController) Allow() bool { +func (c *concurrentRuleEvalController) Allow(ctx context.Context, group *Group) bool { return c.sema.TryAcquire(1) } -func (c *concurrentRuleEvalController) Done() { +func (c *concurrentRuleEvalController) Done(ctx context.Context, group *Group) { c.sema.Release(1) } @@ -544,9 +544,9 @@ func (c sequentialRuleEvalController) RuleEligible(_ *Group, _ Rule) bool { return false } -func (c sequentialRuleEvalController) Allow() bool { +func (c sequentialRuleEvalController) Allow(ctx context.Context, group *Group) bool { return false } -func (c sequentialRuleEvalController) Done() {} +func (c sequentialRuleEvalController) Done(ctx context.Context, group *Group) {} func (c sequentialRuleEvalController) Invalidate() {}