From 63af2774c69d1d0948a3ffcc2b58cedae69f0326 Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Wed, 26 Jun 2024 11:53:40 +1000
Subject: [PATCH] Add test case that will fail until Prometheus draining PR is merged

---
 pkg/ruler/manager_test.go | 119 ++++++++++++++++++++++++++++++++++++--
 1 file changed, 115 insertions(+), 4 deletions(-)

diff --git a/pkg/ruler/manager_test.go b/pkg/ruler/manager_test.go
index 073f03ebacf..c71c0ce5755 100644
--- a/pkg/ruler/manager_test.go
+++ b/pkg/ruler/manager_test.go
@@ -7,7 +7,10 @@ package ruler
 
 import (
 	"context"
+	"encoding/json"
 	"io"
+	"net/http"
+	"net/http/httptest"
 	"path/filepath"
 	"sort"
 	"testing"
@@ -266,6 +269,108 @@ func TestFilterRuleGroupsByNotEmptyUsers(t *testing.T) {
 	}
 }
 
+func TestDefaultMultiTenantManager_WaitsToDrainPendingNotificationsOnShutdown(t *testing.T) {
+	releaseReceiver := make(chan struct{})
+	receiverReceivedRequest := make(chan struct{}, 2)
+	alertsReceived := atomic.NewInt64(0)
+
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		// Let the test know we've received a request.
+		receiverReceivedRequest <- struct{}{}
+
+		var alerts []*Alert
+
+		b, err := io.ReadAll(r.Body)
+		require.NoError(t, err)
+
+		err = json.Unmarshal(b, &alerts)
+		require.NoError(t, err)
+
+		alertsReceived.Add(int64(len(alerts)))
+
+		// Wait for the test to release us.
+		<-releaseReceiver
+
+		w.WriteHeader(http.StatusOK)
+	}))
+	defer func() {
+		server.Close()
+	}()
+
+	const user = "user-1"
+	ctx := context.Background()
+	logger := testutil.NewTestingLogger(t)
+	user1Group1 := createRuleGroup("group-1", user, createRecordingRule("count:metric_1", "count(metric_1)"))
+
+	// TODO: configure draining on shutdown
+	cfg := Config{
+		RulePath:                  t.TempDir(),
+		AlertmanagerURL:           server.URL,
+		NotificationQueueCapacity: 1000,
+		NotificationTimeout:       10 * time.Second,
+	}
+	m, err := NewDefaultMultiTenantManager(cfg, managerMockFactory, nil, logger, nil)
+	require.NoError(t, err)
+
+	m.SyncFullRuleGroups(ctx, map[string]rulespb.RuleGroupList{
+		user: {user1Group1},
+	})
+	m.Start()
+
+	// Wait for the manager to be running and have discovered the Alertmanager, then queue a notification.
+	userManager := assertManagerMockRunningForUser(t, m, user)
+	waitForAlertmanagerToBeDiscovered(t, userManager.notifier)
+	userManager.notifier.Send(&notifier.Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")})
+
+	// Wait for the Alertmanager to receive the request.
+	select {
+	case <-receiverReceivedRequest:
+		// We can continue.
+	case <-time.After(time.Second):
+		require.FailNow(t, "gave up waiting for first notification request to be sent")
+	}
+
+	// Stop the manager, and queue a second notification once the manager is stopped.
+	// This second notification will remain in the queue until we release the first notification's request by closing releaseReceiver below.
+	userManager.onStop = func() {
+		userManager.notifier.Send(&notifier.Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")})
+	}
+
+	// Stop() blocks until the user managers and notifiers have stopped, so run it in the background.
+	stopped := make(chan struct{})
+	go func() {
+		defer close(stopped)
+		m.Stop()
+	}()
+
+	assertManagerMockStopped(t, userManager)
+
+	// Wait a bit for the notifier to be told to shut down.
+	// This is a hack, but we have no more robust way to ensure that the notifier has acknowledged the shutdown request.
+	time.Sleep(100 * time.Millisecond)
+
+	// The notifier should now be in the draining state.
+	// Release the first request so that the second notification is drained from the queue, then check that both notifications are received and the manager has stopped.
+	close(releaseReceiver)
+	require.Eventually(t, func() bool {
+		return alertsReceived.Load() == 2
+	}, time.Second, 10*time.Millisecond, "gave up waiting for second notification to be sent")
+
+	select {
+	case <-stopped:
+		// Manager has stopped, nothing more to do.
+	case <-time.After(time.Second):
+		require.FailNow(t, "gave up waiting for multi-tenant manager to stop")
+	}
+}
+
+func waitForAlertmanagerToBeDiscovered(t *testing.T, notifier *notifier.Manager) {
+	// There is a hardcoded 5 second refresh interval in discovery.Manager, so we need to wait for that to happen at least once.
+	require.Eventually(t, func() bool {
+		return len(notifier.Alertmanagers()) > 0
+	}, 10*time.Second, 100*time.Millisecond, "gave up waiting for static Alertmanager URL to be discovered")
+}
+
 func getManager(m *DefaultMultiTenantManager, user string) RulesManager {
 	m.userManagerMtx.RLock()
 	defer m.userManagerMtx.RUnlock()
@@ -339,19 +444,25 @@ func assertRuleGroupsMappedOnDisk(t *testing.T, m *DefaultMultiTenantManager, us
 	}
 }
 
-func managerMockFactory(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager {
-	return &managerMock{done: make(chan struct{})}
+func managerMockFactory(_ context.Context, _ string, n *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager {
+	return &managerMock{done: make(chan struct{}), notifier: n}
 }
 
 type managerMock struct {
-	running atomic.Bool
-	done    chan struct{}
+	running  atomic.Bool
+	done     chan struct{}
+	notifier *notifier.Manager
+	onStop   func()
 }
 
 func (m *managerMock) Run() {
	defer m.running.Store(false)
 	m.running.Store(true)
 	<-m.done
+
+	if m.onStop != nil {
+		m.onStop()
+	}
 }
 
 func (m *managerMock) Stop() {
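Note on the "TODO: configure draining on shutdown" comment in the new test: the cfg deliberately omits any draining option because, as the commit subject says, the test is expected to fail until the upstream Prometheus notifier draining change is merged and vendored, at which point the ruler would expose a setting for it. As a rough sketch only (the DrainNotificationQueueOnShutdown field and the notifier-level DrainOnShutdown option are assumed names for illustration, not part of this patch), the config in the test might eventually look like:

	cfg := Config{
		RulePath:                  t.TempDir(),
		AlertmanagerURL:           server.URL,
		NotificationQueueCapacity: 1000,
		NotificationTimeout:       10 * time.Second,
		// Hypothetical option: asks the ruler to keep the Prometheus notifier
		// running during shutdown until its queue is empty (for example by
		// setting a DrainOnShutdown-style option on the notifier), so the
		// second alert queued in this test is delivered before Stop() returns.
		DrainNotificationQueueOnShutdown: true,
	}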