Skip to content

Commit

Permalink
Add alternative implementation
Browse files Browse the repository at this point in the history
Signed-off-by: gotjosh <josue.abreu@gmail.com>
  • Loading branch information
gotjosh committed Jul 19, 2024
1 parent 424fbd1 commit 112d186
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 57 deletions.
91 changes: 43 additions & 48 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/tenant"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -143,74 +144,68 @@ func (t *PusherAppendable) Appender(ctx context.Context) storage.Appender {
}
}

type multitenantConcurrencyController struct {
type MultitenantConcurrencyController struct {
globalConcurrency semaphore.Weighted
mtx sync.Mutex
tenantConcurrency map[string]*semaphore.Weighted
}

type tenantController struct {
doneFunc func()
allowFunc func() bool
}
func (c *MultitenantConcurrencyController) Done(ctx context.Context, group *rules.Group) {
c.globalConcurrency.Release(1)

func (c *multitenantConcurrencyController) ControllerFor(userID string) *tenantController {
// We need to create a new controller for each tenant, because each tenant has its own concurrency limits.
// TODO: We need to garbage collect the controllers when they are no longer needed.
return &tenantController{
doneFunc: c.doneFuncFor(userID),
allowFunc: c.allowFuncFor(userID),
userID, err := tenant.TenantID(ctx)
if err != nil {
// TODO: log error
return
}
}
func (tc *tenantController) Done() {
tc.doneFunc()
}

func (tc *tenantController) Allow() bool {
return tc.allowFunc()
}
c.mtx.Lock()
defer c.mtx.Unlock()
tc, ok := c.tenantConcurrency[userID]

func (c *multitenantConcurrencyController) doneFuncFor(userID string) func() {
return func() {
c.globalConcurrency.Release(1)
c.mtx.Lock()
tc, ok := c.tenantConcurrency[userID]
if ok {
tc.Release(1)
}
c.mtx.Unlock()
if ok {
// TODO: log error
tc.Release(1)
}
}

func (c *multitenantConcurrencyController) allowFuncFor(userID string) func() bool {
return func() bool {
if !c.globalConcurrency.TryAcquire(1) {
return false
}
const IntervalToEvaluationThreshold = 50.00

c.mtx.Lock()
defer c.mtx.Unlock()
tc, ok := c.tenantConcurrency[userID]
if !ok {
tc = semaphore.NewWeighted(4)
c.tenantConcurrency[userID] = tc
}
return tc.TryAcquire(1)
func (c *MultitenantConcurrencyController) Allow(ctx context.Context, group *rules.Group) bool {
if !c.globalConcurrency.TryAcquire(1) {
return false
}

userID, err := tenant.TenantID(ctx)
if err != nil {
return false
}

c.mtx.Lock()
defer c.mtx.Unlock()
tc, ok := c.tenantConcurrency[userID]
if !ok {
tc = semaphore.NewWeighted(4)
c.tenantConcurrency[userID] = tc
}

return tc.TryAcquire(1) && c.IsGroupAtRisk(group)
}

func NewMultitenantConcurrencyController(maxConcurrency int64) ConcurrencyController {
return &multitenantConcurrencyController{
func (c *MultitenantConcurrencyController) IsGroupAtRisk(group *rules.Group) bool {
interval := group.Interval().Seconds()
lastEvaluation := group.GetEvaluationTime().Seconds()

return lastEvaluation < interval*IntervalToEvaluationThreshold/100
}

func NewMultitenantConcurrencyController(maxConcurrency int64) *MultitenantConcurrencyController {
return &MultitenantConcurrencyController{
globalConcurrency: *semaphore.NewWeighted(maxConcurrency),
tenantConcurrency: make(map[string]*semaphore.Weighted),
}
}

type ConcurrencyController interface {
ControllerFor(userID string) *tenantController
}

// RulesLimits defines limits used by Ruler.
type RulesLimits interface {
EvaluationDelay(userID string) time.Duration
Expand Down Expand Up @@ -340,7 +335,7 @@ func DefaultTenantManagerFactory(
p Pusher,
queryable storage.Queryable,
queryFunc rules.QueryFunc,
concurrencyController ConcurrencyController,
concurrencyController *MultitenantConcurrencyController,
overrides RulesLimits,
reg prometheus.Registerer,
) ManagerFactory {
Expand Down Expand Up @@ -412,7 +407,7 @@ func DefaultTenantManagerFactory(
return overrides.EvaluationDelay(userID)
},
ConcurrentEvalsEnabled: concurrentEvaluationEnabled,
RuleConcurrencyController: concurrencyController.ControllerFor(userID),
RuleConcurrencyController: concurrencyController,
MaxConcurrentEvals: overrides.RulerMaxConcurrentRuleEvaluations(userID),
})
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type DefaultMultiTenantManager struct {
cfg Config
notifierCfg *config.Config
managerFactory ManagerFactory
concurrencyController ConcurrencyController
concurrencyController *MultitenantConcurrencyController

mapper *mapper

Expand All @@ -61,7 +61,7 @@ type DefaultMultiTenantManager struct {
rulerIsRunning atomic.Bool
}

func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, reg prometheus.Registerer, logger log.Logger, dnsResolver cache.AddressProvider, cc ConcurrencyController) (*DefaultMultiTenantManager, error) {
func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, reg prometheus.Registerer, logger log.Logger, dnsResolver cache.AddressProvider, cc *MultitenantConcurrencyController) (*DefaultMultiTenantManager, error) {
refreshMetrics := discovery.NewRefreshMetrics(reg)
ncfg, err := buildNotifierConfig(&cfg, dnsResolver, refreshMetrics)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion vendor/github.com/prometheus/prometheus/rules/group.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions vendor/github.com/prometheus/prometheus/rules/manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 112d186

Please sign in to comment.