Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename variables and types in query-scheduler RequestQueue #6141

Merged
merged 6 commits into from
Oct 3, 2023
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 42 additions & 43 deletions pkg/scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ type RequestQueue struct {

connectedQuerierWorkers *atomic.Int32

stopRequested chan struct{} // Written to by stop() to wake up dispatcherLoop() in response to a stop request.
stopCompleted chan struct{} // Closed by dispatcherLoop() after a stop is requested and the dispatcher has stopped.
querierOperations chan querierOperation
enqueueRequests chan enqueueRequest
availableQuerierConnections chan *querierConnection
stopRequested chan struct{} // Written to by stop() to wake up dispatcherLoop() in response to a stop request.
stopCompleted chan struct{} // Closed by dispatcherLoop() after a stop is requested and the dispatcher has stopped.
querierOperations chan querierOperation
requestsToEnqueue chan requestToEnqueue
nextRequestForQuerierCalls chan *waitingNextRequestForQuerierCall

queueLength *prometheus.GaugeVec // Per user and reason.
discardedRequests *prometheus.CounterVec // Per user.
Expand All @@ -87,7 +87,7 @@ const (
forgetDisconnected
)

type enqueueRequest struct {
type requestToEnqueue struct {
userID string
req Request
maxQueriers int
Expand All @@ -108,9 +108,9 @@ func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, que
stopCompleted: make(chan struct{}),

// These channels must not be buffered so that we can detect when dispatcherLoop() has finished.
querierOperations: make(chan querierOperation),
enqueueRequests: make(chan enqueueRequest),
availableQuerierConnections: make(chan *querierConnection),
querierOperations: make(chan querierOperation),
requestsToEnqueue: make(chan requestToEnqueue),
nextRequestForQuerierCalls: make(chan *waitingNextRequestForQuerierCall),
}

q.Service = services.NewTimerService(forgetCheckPeriod, q.starting, q.forgetDisconnectedQueriers, q.stop).WithName("request queue")
Expand All @@ -128,7 +128,7 @@ func (q *RequestQueue) starting(_ context.Context) error {
func (q *RequestQueue) dispatcherLoop() {
stopping := false
queues := newUserQueues(q.maxOutstandingPerTenant, q.forgetDelay)
waitingQuerierConnections := list.New()
waitingGetNextRequestForQuerierCalls := list.New()

for {
needToDispatchQueries := false
Expand Down Expand Up @@ -157,7 +157,7 @@ func (q *RequestQueue) dispatcherLoop() {
// If the querier shuts down without notifying us, this is OK: we'll never mark it as shutting down, so we'll
// dispatch a query to GetNextRequestForQuerier and Scheduler.QuerierLoop will try to send the query to it
// later. This will fail because the connection is broken, and GetNextRequestForQuerier won't be called again.
q.cancelWaitingConnectionsForQuerier(qe.querierID, waitingQuerierConnections)
q.cancelWaitingConnectionsForQuerier(qe.querierID, waitingGetNextRequestForQuerierCalls)
case forgetDisconnected:
if queues.forgetDisconnectedQueriers(time.Now()) > 0 {
// Removing some queriers may have caused a resharding.
Expand All @@ -166,29 +166,29 @@ func (q *RequestQueue) dispatcherLoop() {
default:
panic(fmt.Sprintf("received unknown querier event %v for querier ID %v", qe.operation, qe.querierID))
}
case r := <-q.enqueueRequests:
case r := <-q.requestsToEnqueue:
err := q.handleEnqueueRequest(queues, r)
r.processed <- err

if err == nil {
needToDispatchQueries = true
}
case querierConn := <-q.availableQuerierConnections:
if !q.dispatchRequestToQuerier(queues, querierConn) {
case call := <-q.nextRequestForQuerierCalls:
if !q.tryDispatchRequest(queues, call) {
// No requests available for this querier connection right now. Add it to the list to try later.
querierConn.element = waitingQuerierConnections.PushBack(querierConn)
waitingGetNextRequestForQuerierCalls.PushBack(call)
}
}

if needToDispatchQueries {
currentElement := waitingQuerierConnections.Front()
currentElement := waitingGetNextRequestForQuerierCalls.Front()

for currentElement != nil && queues.len() > 0 {
querierConn := currentElement.Value.(*querierConnection)
call := currentElement.Value.(*waitingNextRequestForQuerierCall)
nextElement := currentElement.Next() // We have to capture the next element before calling Remove(), as Remove() clears it.

if q.dispatchRequestToQuerier(queues, querierConn) {
waitingQuerierConnections.Remove(currentElement)
if q.tryDispatchRequest(queues, call) {
waitingGetNextRequestForQuerierCalls.Remove(currentElement)
}

currentElement = nextElement
Expand All @@ -197,11 +197,11 @@ func (q *RequestQueue) dispatcherLoop() {

if stopping && (queues.len() == 0 || q.connectedQuerierWorkers.Load() == 0) {
// Tell any waiting GetNextRequestForQuerier calls that nothing is coming.
currentElement := waitingQuerierConnections.Front()
currentElement := waitingGetNextRequestForQuerierCalls.Front()

for currentElement != nil {
querierConn := currentElement.Value.(*querierConnection)
querierConn.sendError(ErrStopped)
call := currentElement.Value.(*waitingNextRequestForQuerierCall)
call.sendError(ErrStopped)
currentElement = currentElement.Next()
}

Expand All @@ -212,7 +212,7 @@ func (q *RequestQueue) dispatcherLoop() {
}
}

func (q *RequestQueue) handleEnqueueRequest(queues *queues, r enqueueRequest) error {
func (q *RequestQueue) handleEnqueueRequest(queues *queues, r requestToEnqueue) error {
queue := queues.getOrAddQueue(r.userID, r.maxQueriers)
if queue == nil {
// This can only happen if userID is "".
Expand All @@ -235,18 +235,18 @@ func (q *RequestQueue) handleEnqueueRequest(queues *queues, r enqueueRequest) er
return nil
}

// dispatchRequestToQuerier finds and forwards a request to a querier, if a suitable request is available.
// Returns true if this querier should be removed from the list of waiting queriers (eg. because a request has been forwarded to it), false otherwise.
func (q *RequestQueue) dispatchRequestToQuerier(queues *queues, querierConn *querierConnection) bool {
// tryDispatchRequest finds and forwards a request to a waiting GetNextRequestForQuerier call, if a suitable request is available.
// Returns true if call should be removed from the list of waiting calls (eg. because a request has been forwarded to it), false otherwise.
func (q *RequestQueue) tryDispatchRequest(queues *queues, call *waitingNextRequestForQuerierCall) bool {
// If this querier has told us it's shutting down, don't bother trying to find a query request for it.
// Terminate GetNextRequestForQuerier with an error now.
queue, userID, idx, err := queues.getNextQueueForQuerier(querierConn.lastUserIndex.last, querierConn.querierID)
queue, userID, idx, err := queues.getNextQueueForQuerier(call.lastUserIndex.last, call.querierID)
if err != nil {
querierConn.sendError(err)
call.sendError(err)
return true
}

querierConn.lastUserIndex.last = idx
call.lastUserIndex.last = idx
if queue == nil {
// Nothing available for this querier, try again next time.
return false
Expand All @@ -255,9 +255,9 @@ func (q *RequestQueue) dispatchRequestToQuerier(queues *queues, querierConn *que
// Pick next request from the queue. The queue is guaranteed not to be empty because we remove empty queues.
queueElement := queue.Front()

requestSent := querierConn.send(nextRequestForQuerier{
requestSent := call.send(nextRequestForQuerier{
req: queueElement.Value,
lastUserIndex: querierConn.lastUserIndex,
lastUserIndex: call.lastUserIndex,
err: nil,
})

Expand All @@ -280,11 +280,11 @@ func (q *RequestQueue) cancelWaitingConnectionsForQuerier(querierID string, wait
currentElement := waitingQuerierConnections.Front()

for currentElement != nil {
querierConn := currentElement.Value.(*querierConnection)
call := currentElement.Value.(*waitingNextRequestForQuerierCall)
nextElement := currentElement.Next() // We have to capture the next element before calling Remove(), as Remove() clears it.

if querierConn.querierID == querierID {
querierConn.sendError(ErrQuerierShuttingDown)
if call.querierID == querierID {
call.sendError(ErrQuerierShuttingDown)
waitingQuerierConnections.Remove(currentElement)
}

Expand All @@ -303,7 +303,7 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers in
q.enqueueDuration.Observe(time.Since(start).Seconds())
}()

r := enqueueRequest{
r := requestToEnqueue{
userID: userID,
req: req,
maxQueriers: maxQueriers,
Expand All @@ -312,7 +312,7 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers in
}

select {
case q.enqueueRequests <- r:
case q.requestsToEnqueue <- r:
return <-r.processed
case <-q.stopCompleted:
return ErrStopped
Expand All @@ -323,18 +323,18 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers in
// By passing user index from previous call of this method, querier guarantees that it iterates over all users fairly.
// If querier finds that request from the user is already expired, it can get a request for the same user by using UserIndex.ReuseLastUser.
func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, querierID string) (Request, UserIndex, error) {
querierConn := &querierConnection{
call := &waitingNextRequestForQuerierCall{
ctx: ctx,
querierID: querierID,
lastUserIndex: last,
processed: make(chan nextRequestForQuerier),
}

select {
case q.availableQuerierConnections <- querierConn:
case q.nextRequestForQuerierCalls <- call:
// The dispatcher now knows we're waiting. Either we'll get a request to send to a querier, or we'll cancel.
select {
case result := <-querierConn.processed:
case result := <-call.processed:
return result.req, result.lastUserIndex, result.err
case <-ctx.Done():
return nil, last, ctx.Err()
Expand Down Expand Up @@ -389,25 +389,24 @@ func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64 {
return float64(q.connectedQuerierWorkers.Load())
}

type querierConnection struct {
type waitingNextRequestForQuerierCall struct {
ctx context.Context
querierID string
lastUserIndex UserIndex
processed chan nextRequestForQuerier

haveUsed bool // Must be set to true after sending a message to processed, to ensure we only ever try to send one message to processed.
element *list.Element
}

func (q *querierConnection) sendError(err error) {
func (q *waitingNextRequestForQuerierCall) sendError(err error) {
// If GetNextRequestForQuerier is already gone, we don't care, so ignore the result from send.
_ = q.send(nextRequestForQuerier{err: err})
}

// send sends req to the GetNextRequestForQuerier call that is waiting for a new query.
// Returns true if sending succeeds, or false otherwise (eg. because the GetNextRequestForQuerier call has already returned due to a context
// cancellation).
func (q *querierConnection) send(req nextRequestForQuerier) bool {
func (q *waitingNextRequestForQuerierCall) send(req nextRequestForQuerier) bool {
if q.haveUsed {
panic("bug: should not try to send multiple messages to a querier")
}
Expand Down
Loading