Skip to content

Commit

Permalink
Fix issue where query-schedulers accumulate stale querier connections (
Browse files Browse the repository at this point in the history
…#6100)

* Fix issue where query-scheduler accumulates connections from queriers that have shut down.

* Add test to ensure GetNextRequestForQuerier still returns if racing with NotifyQuerierShutdown and NotifyQuerierShutdown executes first.

* Add changelog entry.

* Address PR feedback: add comments

* Clarify comment

(cherry picked from commit a6d8fea)
  • Loading branch information
charleskorn authored and grafanabot committed Sep 25, 2023
1 parent b3d4f61 commit 0996f8e
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* [ENHANCEMENT] Fetch secrets used to configure server-side TLS from Vault when `-vault.enabled` is true. #6052.
* [BUGFIX] Query-frontend: Don't retry read requests rejected by the ingester due to utilization based read path limiting. #6032
* [BUGFIX] Ring: Ensure network addresses used for component hash rings are formatted correctly when using IPv6. #6068
* [BUGFIX] Query-scheduler: don't retain connections from queriers that have shut down, leading to gradually increasing enqueue latency over time. #6100

### Mixin

Expand Down
43 changes: 39 additions & 4 deletions pkg/scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ const (
)

var (
ErrTooManyRequests = errors.New("too many outstanding requests")
ErrStopped = errors.New("queue is stopped")
ErrTooManyRequests = errors.New("too many outstanding requests")
ErrStopped = errors.New("queue is stopped")
ErrQuerierShuttingDown = errors.New("querier has informed the scheduler it is shutting down")
)

// UserIndex is opaque type that allows to resume iteration over users between successive calls
Expand Down Expand Up @@ -151,6 +152,12 @@ func (q *RequestQueue) dispatcherLoop() {
case notifyShutdown:
queues.notifyQuerierShutdown(qe.querierID)
needToDispatchQueries = true

// Tell any waiting GetNextRequestForQuerier calls for this querier that nothing is coming.
// If the querier shuts down without notifying us, this is OK: we'll never mark it as shutting down, so we'll
// dispatch a query to GetNextRequestForQuerier and Scheduler.QuerierLoop will try to send the query to it
// later. This will fail because the connection is broken, and GetNextRequestForQuerier won't be called again.
q.cancelWaitingConnectionsForQuerier(qe.querierID, waitingQuerierConnections)
case forgetDisconnected:
if queues.forgetDisconnectedQueriers(time.Now()) > 0 {
// Removing some queriers may have caused a resharding.
Expand Down Expand Up @@ -194,7 +201,7 @@ func (q *RequestQueue) dispatcherLoop() {

for currentElement != nil {
querierConn := currentElement.Value.(*querierConnection)
_ = querierConn.send(nextRequestForQuerier{err: ErrStopped}) // If GetNextRequestForQuerier is already gone, we don't care, so ignore the result.
querierConn.sendError(ErrStopped)
currentElement = currentElement.Next()
}

Expand Down Expand Up @@ -231,7 +238,14 @@ func (q *RequestQueue) handleEnqueueRequest(queues *queues, r enqueueRequest) er
// dispatchRequestToQuerier finds and forwards a request to a querier, if a suitable request is available.
// Returns true if this querier should be removed from the list of waiting queriers (eg. because a request has been forwarded to it), false otherwise.
func (q *RequestQueue) dispatchRequestToQuerier(queues *queues, querierConn *querierConnection) bool {
queue, userID, idx := queues.getNextQueueForQuerier(querierConn.lastUserIndex.last, querierConn.querierID)
// If this querier has told us it's shutting down, don't bother trying to find a query request for it.
// Terminate GetNextRequestForQuerier with an error now.
queue, userID, idx, err := queues.getNextQueueForQuerier(querierConn.lastUserIndex.last, querierConn.querierID)
if err != nil {
querierConn.sendError(err)
return true
}

querierConn.lastUserIndex.last = idx
if queue == nil {
// Nothing available for this querier, try again next time.
Expand Down Expand Up @@ -262,6 +276,22 @@ func (q *RequestQueue) dispatchRequestToQuerier(queues *queues, querierConn *que
return true
}

func (q *RequestQueue) cancelWaitingConnectionsForQuerier(querierID string, waitingQuerierConnections *list.List) {
currentElement := waitingQuerierConnections.Front()

for currentElement != nil {
querierConn := currentElement.Value.(*querierConnection)
nextElement := currentElement.Next() // We have to capture the next element before calling Remove(), as Remove() clears it.

if querierConn.querierID == querierID {
querierConn.sendError(ErrQuerierShuttingDown)
waitingQuerierConnections.Remove(currentElement)
}

currentElement = nextElement
}
}

// EnqueueRequest puts the request into the queue. maxQueries is user-specific value that specifies how many queriers can
// this user use (zero or negative = all queriers). It is passed to each EnqueueRequest, because it can change
// between calls.
Expand Down Expand Up @@ -369,6 +399,11 @@ type querierConnection struct {
element *list.Element
}

func (q *querierConnection) sendError(err error) {
// If GetNextRequestForQuerier is already gone, we don't care, so ignore the result from send.
_ = q.send(nextRequestForQuerier{err: err})
}

// send sends req to the GetNextRequestForQuerier call that is waiting for a new query.
// Returns true if sending succeeds, or false otherwise (eg. because the GetNextRequestForQuerier call has already returned due to a context
// cancellation).
Expand Down
56 changes: 56 additions & 0 deletions pkg/scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,59 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe
// We expect that querier-2 got the request only after querier-1 forget delay is passed.
assert.GreaterOrEqual(t, waitTime.Milliseconds(), forgetDelay.Milliseconds())
}

func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnAfterGracefulShutdownNotification(t *testing.T) {
const forgetDelay = 3 * time.Second
const querierID = "querier-1"

queue := NewRequestQueue(1, forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}))

ctx := context.Background()
require.NoError(t, services.StartAndAwaitRunning(ctx, queue))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, queue))
})

queue.RegisterQuerierConnection(querierID)
errChan := make(chan error)

go func() {
_, _, err := queue.GetNextRequestForQuerier(context.Background(), FirstUser(), querierID)
errChan <- err
}()

time.Sleep(20 * time.Millisecond) // Wait for GetNextRequestForQuerier to be waiting for a query.
queue.NotifyQuerierShutdown(querierID)

select {
case err := <-errChan:
require.EqualError(t, err, "querier has informed the scheduler it is shutting down")
case <-time.After(time.Second):
require.Fail(t, "gave up waiting for GetNextRequestForQuerierToReturn")
}
}

func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnImmediatelyIfQuerierIsAlreadyShuttingDown(t *testing.T) {
const forgetDelay = 3 * time.Second
const querierID = "querier-1"

queue := NewRequestQueue(1, forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}))

ctx := context.Background()
require.NoError(t, services.StartAndAwaitRunning(ctx, queue))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, queue))
})

queue.RegisterQuerierConnection(querierID)
queue.NotifyQuerierShutdown(querierID)

_, _, err := queue.GetNextRequestForQuerier(context.Background(), FirstUser(), querierID)
require.EqualError(t, err, "querier has informed the scheduler it is shutting down")
}
10 changes: 5 additions & 5 deletions pkg/scheduler/queue/user_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,15 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) *list.List {
}

// Finds next queue for the querier. To support fair scheduling between users, client is expected
// to pass last user index returned by this function as argument. Is there was no previous
// to pass last user index returned by this function as argument. If there was no previous
// last user index, use -1.
func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (*list.List, string, int) {
func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (*list.List, string, int, error) {
uid := lastUserIndex

// Ensure the querier is not shutting down. If the querier is shutting down, we shouldn't forward
// any more queries to it.
if info := q.queriers[querierID]; info == nil || info.shuttingDown {
return nil, "", uid
return nil, "", uid, ErrQuerierShuttingDown
}

for iters := 0; iters < len(q.users); iters++ {
Expand All @@ -180,9 +180,9 @@ func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (*l
}
}

return userQueue.requests, u, uid
return userQueue.requests, u, uid, nil
}
return nil, "", uid
return nil, "", uid, nil
}

func (q *queues) addQuerierConnection(querierID string) {
Expand Down
24 changes: 16 additions & 8 deletions pkg/scheduler/queue/user_queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ func TestQueues(t *testing.T) {
uq.addQuerierConnection("querier-1")
uq.addQuerierConnection("querier-2")

q, u, lastUserIndex := uq.getNextQueueForQuerier(-1, "querier-1")
q, u, lastUserIndex, err := uq.getNextQueueForQuerier(-1, "querier-1")
assert.Nil(t, q)
assert.Equal(t, "", u)
assert.NoError(t, err)

// Add queues: [one]
qOne := getOrAdd(t, uq, "one", 0)
Expand Down Expand Up @@ -73,8 +74,9 @@ func TestQueues(t *testing.T) {
uq.deleteQueue("four")
assert.NoError(t, isConsistent(uq))

q, _, _ = uq.getNextQueueForQuerier(lastUserIndex, "querier-1")
q, _, _, err = uq.getNextQueueForQuerier(lastUserIndex, "querier-1")
assert.Nil(t, q)
assert.NoError(t, err)
}

func TestQueuesOnTerminatingQuerier(t *testing.T) {
Expand All @@ -93,18 +95,20 @@ func TestQueuesOnTerminatingQuerier(t *testing.T) {

// After notify shutdown for querier-2, it's expected to own no queue.
uq.notifyQuerierShutdown("querier-2")
q, u, _ := uq.getNextQueueForQuerier(-1, "querier-2")
q, u, _, err := uq.getNextQueueForQuerier(-1, "querier-2")
assert.Nil(t, q)
assert.Equal(t, "", u)
assert.Equal(t, ErrQuerierShuttingDown, err)

// However, querier-1 still get queues because it's still running.
confirmOrderForQuerier(t, uq, "querier-1", -1, qOne, qTwo, qOne, qTwo)

// After disconnecting querier-2, it's expected to own no queue.
uq.removeQuerier("querier-2")
q, u, _ = uq.getNextQueueForQuerier(-1, "querier-2")
q, u, _, err = uq.getNextQueueForQuerier(-1, "querier-2")
assert.Nil(t, q)
assert.Equal(t, "", u)
assert.Equal(t, ErrQuerierShuttingDown, err)
}

func TestQueuesWithQueriers(t *testing.T) {
Expand All @@ -122,9 +126,10 @@ func TestQueuesWithQueriers(t *testing.T) {
uq.addQuerierConnection(qid)

// No querier has any queues yet.
q, u, _ := uq.getNextQueueForQuerier(-1, qid)
q, u, _, err := uq.getNextQueueForQuerier(-1, qid)
assert.Nil(t, q)
assert.Equal(t, "", u)
assert.NoError(t, err)
}

assert.NoError(t, isConsistent(uq))
Expand All @@ -148,7 +153,8 @@ func TestQueuesWithQueriers(t *testing.T) {

lastUserIndex := -1
for {
_, _, newIx := uq.getNextQueueForQuerier(lastUserIndex, qid)
_, _, newIx, err := uq.getNextQueueForQuerier(lastUserIndex, qid)
assert.NoError(t, err)
if newIx < lastUserIndex {
break
}
Expand Down Expand Up @@ -201,7 +207,7 @@ func TestQueuesConsistency(t *testing.T) {
assert.NotNil(t, uq.getOrAddQueue(generateTenant(r), 3))
case 1:
qid := generateQuerier(r)
_, _, luid := uq.getNextQueueForQuerier(lastUserIndexes[qid], qid)
_, _, luid, _ := uq.getNextQueueForQuerier(lastUserIndexes[qid], qid)
lastUserIndexes[qid] = luid
case 2:
uq.deleteQueue(generateTenant(r))
Expand Down Expand Up @@ -410,9 +416,11 @@ func getOrAdd(t *testing.T, uq *queues, tenant string, maxQueriers int) *list.Li
func confirmOrderForQuerier(t *testing.T, uq *queues, querier string, lastUserIndex int, qs ...*list.List) int {
var n *list.List
for _, q := range qs {
n, _, lastUserIndex = uq.getNextQueueForQuerier(lastUserIndex, querier)
var err error
n, _, lastUserIndex, err = uq.getNextQueueForQuerier(lastUserIndex, querier)
assert.Equal(t, q, n)
assert.NoError(t, isConsistent(uq))
assert.NoError(t, err)
}
return lastUserIndex
}
Expand Down

0 comments on commit 0996f8e

Please sign in to comment.