diff --git a/CHANGELOG.md b/CHANGELOG.md
index 95ee8ff054a..e466542fb2e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -20,6 +20,7 @@
 * [ENHANCEMENT] Fetch secrets used to configure server-side TLS from Vault when `-vault.enabled` is true. #6052.
 * [BUGFIX] Query-frontend: Don't retry read requests rejected by the ingester due to utilization based read path limiting. #6032
 * [BUGFIX] Ring: Ensure network addresses used for component hash rings are formatted correctly when using IPv6. #6068
+* [BUGFIX] Query-scheduler: don't retain connections from queriers that have shut down, leading to gradually increasing enqueue latency over time. #6100
 
 ### Mixin
 
diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go
index 17e05ed43be..e1b446a91e9 100644
--- a/pkg/scheduler/queue/queue.go
+++ b/pkg/scheduler/queue/queue.go
@@ -23,8 +23,9 @@ const (
 )
 
 var (
-	ErrTooManyRequests = errors.New("too many outstanding requests")
-	ErrStopped         = errors.New("queue is stopped")
+	ErrTooManyRequests     = errors.New("too many outstanding requests")
+	ErrStopped             = errors.New("queue is stopped")
+	ErrQuerierShuttingDown = errors.New("querier has informed the scheduler it is shutting down")
 )
 
 // UserIndex is opaque type that allows to resume iteration over users between successive calls
@@ -151,6 +152,12 @@ func (q *RequestQueue) dispatcherLoop() {
 		case notifyShutdown:
 			queues.notifyQuerierShutdown(qe.querierID)
 			needToDispatchQueries = true
+
+			// Tell any waiting GetNextRequestForQuerier calls for this querier that nothing is coming.
+			// If the querier shuts down without notifying us, this is OK: we'll never mark it as shutting down, so we'll
+			// dispatch a query to GetNextRequestForQuerier and Scheduler.QuerierLoop will try to send the query to it
+			// later. This will fail because the connection is broken, and GetNextRequestForQuerier won't be called again.
+			q.cancelWaitingConnectionsForQuerier(qe.querierID, waitingQuerierConnections)
 		case forgetDisconnected:
 			if queues.forgetDisconnectedQueriers(time.Now()) > 0 {
 				// Removing some queriers may have caused a resharding.
@@ -194,7 +201,7 @@ func (q *RequestQueue) dispatcherLoop() {
 
 			for currentElement != nil {
 				querierConn := currentElement.Value.(*querierConnection)
-				_ = querierConn.send(nextRequestForQuerier{err: ErrStopped}) // If GetNextRequestForQuerier is already gone, we don't care, so ignore the result.
+				querierConn.sendError(ErrStopped)
 				currentElement = currentElement.Next()
 			}
 
@@ -231,7 +238,14 @@ func (q *RequestQueue) handleEnqueueRequest(queues *queues, r enqueueRequest) er
 // dispatchRequestToQuerier finds and forwards a request to a querier, if a suitable request is available.
 // Returns true if this querier should be removed from the list of waiting queriers (eg. because a request has been forwarded to it), false otherwise.
 func (q *RequestQueue) dispatchRequestToQuerier(queues *queues, querierConn *querierConnection) bool {
-	queue, userID, idx := queues.getNextQueueForQuerier(querierConn.lastUserIndex.last, querierConn.querierID)
+	// If this querier has told us it's shutting down, don't bother trying to find a query request for it.
+	// Terminate GetNextRequestForQuerier with an error now.
+	queue, userID, idx, err := queues.getNextQueueForQuerier(querierConn.lastUserIndex.last, querierConn.querierID)
+	if err != nil {
+		querierConn.sendError(err)
+		return true
+	}
+
 	querierConn.lastUserIndex.last = idx
 	if queue == nil {
 		// Nothing available for this querier, try again next time.
@@ -262,6 +276,22 @@ func (q *RequestQueue) dispatchRequestToQuerier(queues *queues, querierConn *que
 	return true
 }
 
+func (q *RequestQueue) cancelWaitingConnectionsForQuerier(querierID string, waitingQuerierConnections *list.List) {
+	currentElement := waitingQuerierConnections.Front()
+
+	for currentElement != nil {
+		querierConn := currentElement.Value.(*querierConnection)
+		nextElement := currentElement.Next() // We have to capture the next element before calling Remove(), as Remove() clears it.
+
+		if querierConn.querierID == querierID {
+			querierConn.sendError(ErrQuerierShuttingDown)
+			waitingQuerierConnections.Remove(currentElement)
+		}
+
+		currentElement = nextElement
+	}
+}
+
 // EnqueueRequest puts the request into the queue. maxQueries is user-specific value that specifies how many queriers can
 // this user use (zero or negative = all queriers). It is passed to each EnqueueRequest, because it can change
 // between calls.
@@ -369,6 +399,11 @@ type querierConnection struct {
 	element *list.Element
 }
 
+func (q *querierConnection) sendError(err error) {
+	// If GetNextRequestForQuerier is already gone, we don't care, so ignore the result from send.
+	_ = q.send(nextRequestForQuerier{err: err})
+}
+
 // send sends req to the GetNextRequestForQuerier call that is waiting for a new query.
 // Returns true if sending succeeds, or false otherwise (eg. because the GetNextRequestForQuerier call has already returned due to a context
 // cancellation).
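Review note, not part of the patch itself: cancelWaitingConnectionsForQuerier mutates a container/list while walking it. That is only safe because Next() is captured before Remove() is called; list.Remove zeroes the removed element's links, so calling Next() on it afterwards would end the loop early. A minimal, self-contained sketch of the same pattern, with ints standing in for querier connections:

```go
package main

import (
	"container/list"
	"fmt"
)

// removeMatching deletes every element whose value satisfies match,
// capturing Next() before Remove(), exactly as
// cancelWaitingConnectionsForQuerier does above.
func removeMatching(l *list.List, match func(int) bool) {
	e := l.Front()
	for e != nil {
		next := e.Next() // must happen before Remove(), which clears e's links
		if match(e.Value.(int)) {
			l.Remove(e)
		}
		e = next
	}
}

func main() {
	l := list.New()
	for _, v := range []int{1, 2, 3, 4} {
		l.PushBack(v)
	}
	removeMatching(l, func(v int) bool { return v%2 == 0 })
	for e := l.Front(); e != nil; e = e.Next() {
		fmt.Println(e.Value) // prints 1, then 3
	}
}
```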
diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go
index 974c1e7cbf5..70f3c07970c 100644
--- a/pkg/scheduler/queue/queue_test.go
+++ b/pkg/scheduler/queue/queue_test.go
@@ -183,3 +183,59 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe
 	// We expect that querier-2 got the request only after querier-1 forget delay is passed.
 	assert.GreaterOrEqual(t, waitTime.Milliseconds(), forgetDelay.Milliseconds())
 }
+
+func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnAfterGracefulShutdownNotification(t *testing.T) {
+	const forgetDelay = 3 * time.Second
+	const querierID = "querier-1"
+
+	queue := NewRequestQueue(1, forgetDelay,
+		promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
+		promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
+		promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}))
+
+	ctx := context.Background()
+	require.NoError(t, services.StartAndAwaitRunning(ctx, queue))
+	t.Cleanup(func() {
+		require.NoError(t, services.StopAndAwaitTerminated(ctx, queue))
+	})
+
+	queue.RegisterQuerierConnection(querierID)
+	errChan := make(chan error)
+
+	go func() {
+		_, _, err := queue.GetNextRequestForQuerier(context.Background(), FirstUser(), querierID)
+		errChan <- err
+	}()
+
+	time.Sleep(20 * time.Millisecond) // Wait for GetNextRequestForQuerier to be waiting for a query.
+	queue.NotifyQuerierShutdown(querierID)
+
+	select {
+	case err := <-errChan:
+		require.EqualError(t, err, "querier has informed the scheduler it is shutting down")
+	case <-time.After(time.Second):
+		require.Fail(t, "gave up waiting for GetNextRequestForQuerier to return")
+	}
+}
+
+func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnImmediatelyIfQuerierIsAlreadyShuttingDown(t *testing.T) {
+	const forgetDelay = 3 * time.Second
+	const querierID = "querier-1"
+
+	queue := NewRequestQueue(1, forgetDelay,
+		promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
+		promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
+		promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}))
+
+	ctx := context.Background()
+	require.NoError(t, services.StartAndAwaitRunning(ctx, queue))
+	t.Cleanup(func() {
+		require.NoError(t, services.StopAndAwaitTerminated(ctx, queue))
+	})
+
+	queue.RegisterQuerierConnection(querierID)
+	queue.NotifyQuerierShutdown(querierID)
+
+	_, _, err := queue.GetNextRequestForQuerier(context.Background(), FirstUser(), querierID)
+	require.EqualError(t, err, "querier has informed the scheduler it is shutting down")
+}
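The new tests pin the exact message text with require.EqualError. Because ErrQuerierShuttingDown is an exported errors.New sentinel returned unwrapped, an identity check via errors.Is (or testify's require.ErrorIs) would also pass and would survive future rewording of the message; that would be a possible follow-up, not something this patch does. A self-contained illustration of the matching behaviour, using a local stand-in for the sentinel:

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-in for queue.ErrQuerierShuttingDown.
var errQuerierShuttingDown = errors.New("querier has informed the scheduler it is shutting down")

func main() {
	// Returned directly, as getNextQueueForQuerier does, the sentinel matches by identity.
	var err error = errQuerierShuttingDown
	fmt.Println(errors.Is(err, errQuerierShuttingDown)) // true

	// It keeps matching even if a caller wraps it with extra context.
	wrapped := fmt.Errorf("querier-1: %w", err)
	fmt.Println(errors.Is(wrapped, errQuerierShuttingDown)) // true
}
```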
diff --git a/pkg/scheduler/queue/user_queues.go b/pkg/scheduler/queue/user_queues.go
index 7136cf7343e..49dc866060f 100644
--- a/pkg/scheduler/queue/user_queues.go
+++ b/pkg/scheduler/queue/user_queues.go
@@ -146,15 +146,15 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) *list.List {
 }
 
 // Finds next queue for the querier. To support fair scheduling between users, client is expected
-// to pass last user index returned by this function as argument. Is there was no previous
+// to pass last user index returned by this function as argument. If there was no previous
 // last user index, use -1.
-func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (*list.List, string, int) {
+func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (*list.List, string, int, error) {
 	uid := lastUserIndex
 
 	// Ensure the querier is not shutting down. If the querier is shutting down, we shouldn't forward
 	// any more queries to it.
 	if info := q.queriers[querierID]; info == nil || info.shuttingDown {
-		return nil, "", uid
+		return nil, "", uid, ErrQuerierShuttingDown
 	}
 
 	for iters := 0; iters < len(q.users); iters++ {
@@ -180,9 +180,9 @@ func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (*l
 			}
 		}
 
-		return userQueue.requests, u, uid
+		return userQueue.requests, u, uid, nil
 	}
-	return nil, "", uid
+	return nil, "", uid, nil
 }
 
 func (q *queues) addQuerierConnection(querierID string) {
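The extra return value gives getNextQueueForQuerier three distinguishable outcomes where the old signature conflated two of them: a non-nil error (querier unknown or shutting down, so the waiting call should be failed), a nil queue with a nil error (no work right now, keep the querier parked), and a non-nil queue (dispatch). A runnable sketch of that contract, using simplified, hypothetical types rather than Mimir's:

```go
package main

import (
	"errors"
	"fmt"
)

var errQuerierShuttingDown = errors.New("querier has informed the scheduler it is shutting down")

type querierInfo struct{ shuttingDown bool }

// toyQueues collapses the per-tenant queues into one slice, keeping only the
// three-way result contract of getNextQueueForQuerier.
type toyQueues struct {
	queriers map[string]*querierInfo
	requests []string
}

// next returns an error for an unknown or shutting-down querier, ("", nil)
// when nothing is queued, and the next request otherwise.
func (q *toyQueues) next(querierID string) (string, error) {
	info := q.queriers[querierID]
	if info == nil || info.shuttingDown {
		return "", errQuerierShuttingDown
	}
	if len(q.requests) == 0 {
		return "", nil
	}
	req := q.requests[0]
	q.requests = q.requests[1:]
	return req, nil
}

func main() {
	q := &toyQueues{
		queriers: map[string]*querierInfo{"querier-1": {}, "querier-2": {shuttingDown: true}},
		requests: []string{"req-A"},
	}
	fmt.Println(q.next("querier-1")) // "req-A" <nil>  -> dispatch it
	fmt.Println(q.next("querier-1")) // ""      <nil>  -> keep the querier waiting
	fmt.Println(q.next("querier-2")) // ""      error  -> fail the waiting GetNextRequestForQuerier call
}
```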
diff --git a/pkg/scheduler/queue/user_queues_test.go b/pkg/scheduler/queue/user_queues_test.go
index ee8ede54ab0..8a81a2f100f 100644
--- a/pkg/scheduler/queue/user_queues_test.go
+++ b/pkg/scheduler/queue/user_queues_test.go
@@ -27,9 +27,10 @@ func TestQueues(t *testing.T) {
 	uq.addQuerierConnection("querier-1")
 	uq.addQuerierConnection("querier-2")
 
-	q, u, lastUserIndex := uq.getNextQueueForQuerier(-1, "querier-1")
+	q, u, lastUserIndex, err := uq.getNextQueueForQuerier(-1, "querier-1")
 	assert.Nil(t, q)
 	assert.Equal(t, "", u)
+	assert.NoError(t, err)
 
 	// Add queues: [one]
 	qOne := getOrAdd(t, uq, "one", 0)
@@ -73,8 +74,9 @@ func TestQueues(t *testing.T) {
 	uq.deleteQueue("four")
 	assert.NoError(t, isConsistent(uq))
 
-	q, _, _ = uq.getNextQueueForQuerier(lastUserIndex, "querier-1")
+	q, _, _, err = uq.getNextQueueForQuerier(lastUserIndex, "querier-1")
 	assert.Nil(t, q)
+	assert.NoError(t, err)
 }
 
 func TestQueuesOnTerminatingQuerier(t *testing.T) {
@@ -93,18 +95,20 @@ func TestQueuesOnTerminatingQuerier(t *testing.T) {
 
 	// After notify shutdown for querier-2, it's expected to own no queue.
 	uq.notifyQuerierShutdown("querier-2")
-	q, u, _ := uq.getNextQueueForQuerier(-1, "querier-2")
+	q, u, _, err := uq.getNextQueueForQuerier(-1, "querier-2")
 	assert.Nil(t, q)
 	assert.Equal(t, "", u)
+	assert.Equal(t, ErrQuerierShuttingDown, err)
 
 	// However, querier-1 still get queues because it's still running.
 	confirmOrderForQuerier(t, uq, "querier-1", -1, qOne, qTwo, qOne, qTwo)
 
 	// After disconnecting querier-2, it's expected to own no queue.
 	uq.removeQuerier("querier-2")
-	q, u, _ = uq.getNextQueueForQuerier(-1, "querier-2")
+	q, u, _, err = uq.getNextQueueForQuerier(-1, "querier-2")
 	assert.Nil(t, q)
 	assert.Equal(t, "", u)
+	assert.Equal(t, ErrQuerierShuttingDown, err)
 }
 
 func TestQueuesWithQueriers(t *testing.T) {
@@ -122,9 +126,10 @@ func TestQueuesWithQueriers(t *testing.T) {
 		uq.addQuerierConnection(qid)
 
 		// No querier has any queues yet.
-		q, u, _ := uq.getNextQueueForQuerier(-1, qid)
+		q, u, _, err := uq.getNextQueueForQuerier(-1, qid)
 		assert.Nil(t, q)
 		assert.Equal(t, "", u)
+		assert.NoError(t, err)
 	}
 
 	assert.NoError(t, isConsistent(uq))
@@ -148,7 +153,8 @@ func TestQueuesWithQueriers(t *testing.T) {
 
 		lastUserIndex := -1
 		for {
-			_, _, newIx := uq.getNextQueueForQuerier(lastUserIndex, qid)
+			_, _, newIx, err := uq.getNextQueueForQuerier(lastUserIndex, qid)
+			assert.NoError(t, err)
 			if newIx < lastUserIndex {
 				break
 			}
@@ -201,7 +207,7 @@ func TestQueuesConsistency(t *testing.T) {
 			assert.NotNil(t, uq.getOrAddQueue(generateTenant(r), 3))
 		case 1:
 			qid := generateQuerier(r)
-			_, _, luid := uq.getNextQueueForQuerier(lastUserIndexes[qid], qid)
+			_, _, luid, _ := uq.getNextQueueForQuerier(lastUserIndexes[qid], qid)
 			lastUserIndexes[qid] = luid
 		case 2:
 			uq.deleteQueue(generateTenant(r))
@@ -410,9 +416,11 @@ func getOrAdd(t *testing.T, uq *queues, tenant string, maxQueriers int) *list.Li
 func confirmOrderForQuerier(t *testing.T, uq *queues, querier string, lastUserIndex int, qs ...*list.List) int {
 	var n *list.List
 	for _, q := range qs {
-		n, _, lastUserIndex = uq.getNextQueueForQuerier(lastUserIndex, querier)
+		var err error
+		n, _, lastUserIndex, err = uq.getNextQueueForQuerier(lastUserIndex, querier)
 		assert.Equal(t, q, n)
 		assert.NoError(t, isConsistent(uq))
+		assert.NoError(t, err)
 	}
 	return lastUserIndex
 }
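Why cancelling the waiting connections fixes the reported symptom: previously, when a querier called NotifyQuerierShutdown, dispatchRequestToQuerier never found work for it (the old getNextQueueForQuerier returned a nil queue), so its entry was never removed from waitingQuerierConnections, and the dispatcher walked an ever-growing list on each dispatch pass; that is the gradually increasing enqueue latency the changelog entry refers to. On the querier-facing side, the new error is expected to be terminal. A hedged, illustrative sketch of that expectation (the real caller is Scheduler.QuerierLoop; the loop below is not its actual code, and forwardToQuerier is a hypothetical helper):

```go
// Illustrative only: FirstUser and GetNextRequestForQuerier are the package's
// API as exercised in the tests above; everything else here is assumed.
last := FirstUser()
for {
	req, next, err := queue.GetNextRequestForQuerier(ctx, last, querierID)
	if err != nil {
		// ErrQuerierShuttingDown (like ErrStopped) is terminal: return rather
		// than calling GetNextRequestForQuerier again for this querier.
		return err
	}
	last = next
	forwardToQuerier(req) // hypothetical helper
}
```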