Add test case that will fail until Prometheus draining PR is merged
charleskorn committed Jun 26, 2024
1 parent 93783be commit 63af277
Showing 1 changed file with 115 additions and 4 deletions.
119 changes: 115 additions & 4 deletions pkg/ruler/manager_test.go
@@ -7,7 +7,10 @@ package ruler

import (
	"context"
	"encoding/json"
	"io"
	"net/http"
	"net/http/httptest"
	"path/filepath"
	"sort"
	"testing"
@@ -266,6 +269,108 @@ func TestFilterRuleGroupsByNotEmptyUsers(t *testing.T) {
	}
}

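// TestDefaultMultiTenantManager_WaitsToDrainPendingNotificationsOnShutdown checks that stopping the
// multi-tenant manager delivers notifications still queued in the notifier instead of dropping them.
// As the commit message notes, it is expected to fail until the Prometheus draining PR is merged and
// draining on shutdown can be configured (see the TODO below).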
func TestDefaultMultiTenantManager_WaitsToDrainPendingNotificationsOnShutdown(t *testing.T) {
	releaseReceiver := make(chan struct{})
	receiverReceivedRequest := make(chan struct{}, 2)
	alertsReceived := atomic.NewInt64(0)

	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Let the test know we've received a request.
		receiverReceivedRequest <- struct{}{}

		var alerts []*Alert

		b, err := io.ReadAll(r.Body)
		require.NoError(t, err)

		err = json.Unmarshal(b, &alerts)
		require.NoError(t, err)

		alertsReceived.Add(int64(len(alerts)))

		// Wait for the test to release us.
		<-releaseReceiver

		w.WriteHeader(http.StatusOK)
	}))
	defer func() {
		server.Close()
	}()

	const user = "user-1"
	ctx := context.Background()
	logger := testutil.NewTestingLogger(t)
	user1Group1 := createRuleGroup("group-1", user, createRecordingRule("count:metric_1", "count(metric_1)"))

	// TODO: configure draining on shutdown
	cfg := Config{
		RulePath:                  t.TempDir(),
		AlertmanagerURL:           server.URL,
		NotificationQueueCapacity: 1000,
		NotificationTimeout:       10 * time.Second,
	}
	m, err := NewDefaultMultiTenantManager(cfg, managerMockFactory, nil, logger, nil)
	require.NoError(t, err)

	m.SyncFullRuleGroups(ctx, map[string]rulespb.RuleGroupList{
		user: {user1Group1},
	})
	m.Start()

	// Wait for the manager to be running and have discovered the Alertmanager, then queue a notification.
	userManager := assertManagerMockRunningForUser(t, m, user)
	waitForAlertmanagerToBeDiscovered(t, userManager.notifier)
	userManager.notifier.Send(&notifier.Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")})

	// Wait for the Alertmanager to receive the request.
	select {
	case <-receiverReceivedRequest:
		// We can continue.
	case <-time.After(time.Second):
		require.FailNow(t, "gave up waiting for first notification request to be sent")
	}

	// Stop the manager, and queue a second notification once the manager is stopped.
	// This second notification will remain in the queue until we release the first notification's request by closing releaseReceiver below.
	userManager.onStop = func() {
		userManager.notifier.Send(&notifier.Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")})
	}

	// Stop() blocks until the user managers and notifiers have stopped, so run it in the background.
	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		m.Stop()
	}()

	assertManagerMockStopped(t, userManager)

	// Wait a bit for the notifier to be told to shut down.
	// This is a hack, but we have no more robust way to ensure that the notifier has acknowledged the shutdown request.
	time.Sleep(100 * time.Millisecond)

	// The notifier should now be in the draining state.
	// Release the first request so that the second notification is drained from the queue, then check that both notifications are received and the manager has stopped.
	close(releaseReceiver)
	require.Eventually(t, func() bool {
		return alertsReceived.Load() == 2
	}, time.Second, 10*time.Millisecond, "gave up waiting for second notification to be sent")

	select {
	case <-stopped:
		// Manager has stopped, nothing more to do.
	case <-time.After(time.Second):
		require.FailNow(t, "gave up waiting for multi-tenant manager to stop")
	}
}

func waitForAlertmanagerToBeDiscovered(t *testing.T, notifier *notifier.Manager) {
	// There is a hardcoded 5 second refresh interval in discovery.Manager, so we need to wait for that to happen at least once.
	require.Eventually(t, func() bool {
		return len(notifier.Alertmanagers()) > 0
	}, 10*time.Second, 100*time.Millisecond, "gave up waiting for static Alertmanager URL to be discovered")
}

func getManager(m *DefaultMultiTenantManager, user string) RulesManager {
	m.userManagerMtx.RLock()
	defer m.userManagerMtx.RUnlock()
@@ -339,19 +444,25 @@ func assertRuleGroupsMappedOnDisk(t *testing.T, m *DefaultMultiTenantManager, us
	}
}

-func managerMockFactory(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager {
-	return &managerMock{done: make(chan struct{})}
+func managerMockFactory(_ context.Context, _ string, n *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager {
+	return &managerMock{done: make(chan struct{}), notifier: n}
 }

 type managerMock struct {
-	running atomic.Bool
-	done    chan struct{}
+	running  atomic.Bool
+	done     chan struct{}
+	notifier *notifier.Manager
+	onStop   func()
 }

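// Run blocks until Stop closes the done channel, then invokes the test-supplied onStop hook
// (if any) before the deferred store marks the mock as no longer running.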
func (m *managerMock) Run() {
	defer m.running.Store(false)
	m.running.Store(true)
	<-m.done

	if m.onStop != nil {
		m.onStop()
	}
}

func (m *managerMock) Stop() {