Skip to content

Commit

Permalink
convert getNextQueueForQuerier to use tenantQuerierState to report ne…
Browse files Browse the repository at this point in the history
…xt tenant for querier
  • Loading branch information
francoposa committed Sep 27, 2023
1 parent 5deea93 commit d753be3
Showing 1 changed file with 27 additions and 24 deletions.
51 changes: 27 additions & 24 deletions pkg/scheduler/queue/user_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,40 +149,43 @@ func (q *queues) getOrAddTenantQueue(tenantID string, maxQueriers int) *list.Lis
// to pass last user index returned by this function as argument. If there was no previous
// last user index, use -1.
func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (*list.List, string, int, error) {
userIndex := lastUserIndex

// Ensure the querier is not shutting down. If the querier is shutting down, we shouldn't forward
// any more queries to it.
if info := q.tenantQuerierState.queriersByID[querierID]; info == nil || info.shuttingDown {
return nil, "", userIndex, ErrQuerierShuttingDown
nextTenantID, nextTenantIndex, err := q.tenantQuerierState.getNextTenantIDForQuerier(lastUserIndex, querierID)
if err != nil || nextTenantID == "" {
return nil, nextTenantID, nextTenantIndex, err
}

for iters := 0; iters < len(q.tenantQuerierState.tenantIDOrder); iters++ {
userIndex = userIndex + 1
tenantQueue := q.tenantQueues[nextTenantID]
return tenantQueue.requests, nextTenantID, nextTenantIndex, nil
}

// Don't use "mod len(q.tenantQuerierState.tenantIDOrder)", as that could skip users at the beginning of the list
// for example when q.tenantQuerierState.tenantIDOrder has shrunk since last call.
if userIndex >= len(q.tenantQuerierState.tenantIDOrder) {
userIndex = 0
func (tqs *tenantQuerierState) getNextTenantIDForQuerier(lastTenantIndex int, querierID string) (string, int, error) {
// check if querier is registered and is not shutting down
if q := tqs.queriersByID[querierID]; q == nil || q.shuttingDown {
return "", lastTenantIndex, ErrQuerierShuttingDown
}
tenantOrderIndex := lastTenantIndex
for iters := 0; iters < len(tqs.tenantIDOrder); iters++ {
tenantOrderIndex++
if tenantOrderIndex >= len(tqs.tenantIDOrder) {
tenantOrderIndex = 0
}

userID := q.tenantQuerierState.tenantIDOrder[userIndex]
if userID == "" {
tenantID := tqs.tenantIDOrder[tenantOrderIndex]
if tenantID == "" {
continue
}

userQueue := q.tenantQueues[userID]

if querierSet := q.tenantQuerierState.tenantQuerierIDs[userID]; querierSet != nil {
if _, ok := querierSet[querierID]; !ok {
// This querier is not handling the user.
continue
}
tenantQuerierSet := tqs.tenantQuerierIDs[tenantID]
if tenantQuerierSet == nil {
// tenant can use all queriers
return tenantID, tenantOrderIndex, nil
} else if _, ok := tenantQuerierSet[querierID]; ok {
// tenant is assigned this querier
return tenantID, tenantOrderIndex, nil
}

return userQueue.requests, userID, userIndex, nil
}
return nil, "", userIndex, nil

return "", lastTenantIndex, nil
}

func (tqs *tenantQuerierState) getOrAddTenant(tenantID string, maxQueriers int) *queueTenant {
Expand Down

0 comments on commit d753be3

Please sign in to comment.