From efc1de823942ccf53c5357154538aae618190c2e Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 12 Jun 2024 13:45:47 +1000 Subject: [PATCH 1/7] Don't log `msg="error starting notifier discovery manager" err="context canceled"` when the ruler shuts down cleanly. --- pkg/ruler/notifier.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/ruler/notifier.go b/pkg/ruler/notifier.go index fb6f9a48aa9..50a5ff47fa0 100644 --- a/pkg/ruler/notifier.go +++ b/pkg/ruler/notifier.go @@ -7,6 +7,7 @@ package ruler import ( "context" + "errors" "flag" "net/url" "strings" @@ -69,10 +70,15 @@ func newRulerNotifier(o *notifier.Options, l gklog.Logger) (*rulerNotifier, erro func (rn *rulerNotifier) run() { rn.wg.Add(2) go func() { - if err := rn.sdManager.Run(); err != nil { - level.Error(rn.logger).Log("msg", "error starting notifier discovery manager", "err", err) + defer rn.wg.Done() + + // Ignore context cancelled errors: cancelling the context is how we stop the manager when shutting down normally. + if err := rn.sdManager.Run(); err != nil && !errors.Is(err, context.Canceled) { + level.Error(rn.logger).Log("msg", "error running notifier discovery manager", "err", err) + return } - rn.wg.Done() + + level.Info(rn.logger).Log("msg", "notifier discovery manager stopped") }() go func() { rn.notifier.Run(rn.sdManager.SyncCh()) From 3927f0622599a2b7aed680551755c8fb0fa46a2e Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 12 Jun 2024 14:08:31 +1000 Subject: [PATCH 2/7] Stop notifiers after all rule evaluations have finished. --- pkg/ruler/manager.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/ruler/manager.go b/pkg/ruler/manager.go index cb89b3e93af..5b5865810ba 100644 --- a/pkg/ruler/manager.go +++ b/pkg/ruler/manager.go @@ -368,12 +368,6 @@ func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group { } func (r *DefaultMultiTenantManager) Stop() { - r.notifiersMtx.Lock() - for _, n := range r.notifiers { - n.stop() - } - r.notifiersMtx.Unlock() - level.Info(r.logger).Log("msg", "stopping user managers") wg := sync.WaitGroup{} r.userManagerMtx.Lock() @@ -391,6 +385,14 @@ func (r *DefaultMultiTenantManager) Stop() { r.userManagerMtx.Unlock() level.Info(r.logger).Log("msg", "all user managers stopped") + // Stop notifiers after all rule evaluations have finished, so that we have + // a chance to send any notifications generated while shutting down. + r.notifiersMtx.Lock() + for _, n := range r.notifiers { + n.stop() + } + r.notifiersMtx.Unlock() + // cleanup user rules directories r.mapper.cleanup() } From 91db91173d1148e400ccc66eeb3873ad0fc4edf2 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Fri, 14 Jun 2024 13:53:16 +1000 Subject: [PATCH 3/7] Run `Stop` calls in parallel, to mitigate blocking behaviour of `Stop` introduced in https://github.com/prometheus/prometheus/pull/14290/commits/086be262a8483589400828cd7f5f79e7b7b367dd when draining is enabled --- pkg/ruler/manager.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/ruler/manager.go b/pkg/ruler/manager.go index 5b5865810ba..f7cea08434b 100644 --- a/pkg/ruler/manager.go +++ b/pkg/ruler/manager.go @@ -387,11 +387,20 @@ func (r *DefaultMultiTenantManager) Stop() { // Stop notifiers after all rule evaluations have finished, so that we have // a chance to send any notifications generated while shutting down. + // rulerNotifier.stop() may take some time to complete if notifications need to be drained from the queue. 
+ level.Info(r.logger).Log("msg", "stopping user notifiers") + wg = sync.WaitGroup{} r.notifiersMtx.Lock() for _, n := range r.notifiers { - n.stop() + wg.Add(1) + go func(n *rulerNotifier) { + defer wg.Done() + n.stop() + }(n) } + wg.Wait() r.notifiersMtx.Unlock() + level.Info(r.logger).Log("msg", "all user notifiers stopped") // cleanup user rules directories r.mapper.cleanup() From de40862e820730947d542ab94e53cf8e2bc5ddcc Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 26 Jun 2024 11:53:40 +1000 Subject: [PATCH 4/7] Add test case that will fail until Prometheus draining PR is merged --- pkg/ruler/manager_test.go | 119 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 115 insertions(+), 4 deletions(-) diff --git a/pkg/ruler/manager_test.go b/pkg/ruler/manager_test.go index 073f03ebacf..c71c0ce5755 100644 --- a/pkg/ruler/manager_test.go +++ b/pkg/ruler/manager_test.go @@ -7,7 +7,10 @@ package ruler import ( "context" + "encoding/json" "io" + "net/http" + "net/http/httptest" "path/filepath" "sort" "testing" @@ -266,6 +269,108 @@ func TestFilterRuleGroupsByNotEmptyUsers(t *testing.T) { } } +func TestDefaultMultiTenantManager_WaitsToDrainPendingNotificationsOnShutdown(t *testing.T) { + releaseReceiver := make(chan struct{}) + receiverReceivedRequest := make(chan struct{}, 2) + alertsReceived := atomic.NewInt64(0) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Let the test know we've received a request. + receiverReceivedRequest <- struct{}{} + + var alerts []*Alert + + b, err := io.ReadAll(r.Body) + require.NoError(t, err) + + err = json.Unmarshal(b, &alerts) + require.NoError(t, err) + + alertsReceived.Add(int64(len(alerts))) + + // Wait for the test to release us. + <-releaseReceiver + + w.WriteHeader(http.StatusOK) + })) + defer func() { + server.Close() + }() + + const user = "user-1" + ctx := context.Background() + logger := testutil.NewTestingLogger(t) + user1Group1 := createRuleGroup("group-1", user, createRecordingRule("count:metric_1", "count(metric_1)")) + + // TODO: configure draining on shutdown + cfg := Config{ + RulePath: t.TempDir(), + AlertmanagerURL: server.URL, + NotificationQueueCapacity: 1000, + NotificationTimeout: 10 * time.Second, + } + m, err := NewDefaultMultiTenantManager(cfg, managerMockFactory, nil, logger, nil) + require.NoError(t, err) + + m.SyncFullRuleGroups(ctx, map[string]rulespb.RuleGroupList{ + user: {user1Group1}, + }) + m.Start() + + // Wait for the manager to be running and have discovered the Alertmanager, then queue a notification. + userManager := assertManagerMockRunningForUser(t, m, user) + waitForAlertmanagerToBeDiscovered(t, userManager.notifier) + userManager.notifier.Send(¬ifier.Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")}) + + // Wait for the Alertmanager to receive the request. + select { + case <-receiverReceivedRequest: + // We can continue. + case <-time.After(time.Second): + require.FailNow(t, "gave up waiting for first notification request to be sent") + } + + // Stop the manager, and queue a second notification once the manager is stopped. + // This second notification will remain in the queue until we release the first notification's request by closing releaseReceiver below. + userManager.onStop = func() { + userManager.notifier.Send(¬ifier.Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")}) + } + + // Stop() blocks until the user managers and notifiers have stopped, so run it in the background. 
+ stopped := make(chan struct{}) + go func() { + defer close(stopped) + m.Stop() + }() + + assertManagerMockStopped(t, userManager) + + // Wait a bit for the notifier to be told to shut down. + // This is a hack, but we have no more robust way to ensure that the notifier has acknowledged the shutdown request. + time.Sleep(100 * time.Millisecond) + + // The notifier should now be in the draining state. + // Release the first request so that the second notification is drained from the queue, then check that both notifications are received and the manager has stopped. + close(releaseReceiver) + require.Eventually(t, func() bool { + return alertsReceived.Load() == 2 + }, time.Second, 10*time.Millisecond, "gave up waiting for second notification to be sent") + + select { + case <-stopped: + // Manager has stopped, nothing more to do. + case <-time.After(time.Second): + require.FailNow(t, "gave up waiting for multi-tenant manager to stop") + } +} + +func waitForAlertmanagerToBeDiscovered(t *testing.T, notifier *notifier.Manager) { + // There is a hardcoded 5 second refresh interval in discovery.Manager, so we need to wait for that to happen at least once. + require.Eventually(t, func() bool { + return len(notifier.Alertmanagers()) > 0 + }, 10*time.Second, 100*time.Millisecond, "gave up waiting for static Alertmanager URL to be discovered") +} + func getManager(m *DefaultMultiTenantManager, user string) RulesManager { m.userManagerMtx.RLock() defer m.userManagerMtx.RUnlock() @@ -339,19 +444,25 @@ func assertRuleGroupsMappedOnDisk(t *testing.T, m *DefaultMultiTenantManager, us } } -func managerMockFactory(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager { - return &managerMock{done: make(chan struct{})} +func managerMockFactory(_ context.Context, _ string, n *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager { + return &managerMock{done: make(chan struct{}), notifier: n} } type managerMock struct { - running atomic.Bool - done chan struct{} + running atomic.Bool + done chan struct{} + notifier *notifier.Manager + onStop func() } func (m *managerMock) Run() { defer m.running.Store(false) m.running.Store(true) <-m.done + + if m.onStop != nil { + m.onStop() + } } func (m *managerMock) Stop() { From f3d6c2ec9d0b804339ae201e9a0dc5708cdd1509 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 26 Jun 2024 11:56:15 +1000 Subject: [PATCH 5/7] Add changelog entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a11f9284a9a..24928ecf52f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ * [ENHANCEMENT] Alertmanager: Reloading config and templates no longer needs to hit the disk. #4967 * [ENHANCEMENT] Compactor: Added experimantal `-compactor.in-memory-tenant-meta-cache-size` option to set size of in-memory cache (in number of items) for parsed meta.json files. This can help when tenant has many meta.json files and their parsing before each compaction cycle is using a lot of CPU time. #8544 * [ENHANCEMENT] Distributor: Interrupt OTLP write request translation when context is canceled or has timed out. #8524 +* [BUGFIX] Ruler: add support for draining any outstanding alert notifications before shutting down. This can be enabled with the `-ruler.drain-notification-queue-on-shutdown=true` CLI flag. #8346 * [BUGFIX] Query-frontend: fix `-querier.max-query-lookback` enforcement when `-compactor.blocks-retention-period` is not set, and viceversa. 
#8388 * [BUGFIX] Ingester: fix sporadic `not found` error causing an internal server error if label names are queried with matchers during head compaction. #8391 * [BUGFIX] Ingester, store-gateway: fix case insensitive regular expressions not matching correctly some Unicode characters. #8391 From 7a5ccdf8aefe7ed1e7641ed41fd1ee0038f22156 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 26 Jun 2024 17:09:15 +1000 Subject: [PATCH 6/7] Add comments. --- pkg/ruler/manager.go | 6 ++++++ pkg/ruler/notifier.go | 3 +++ 2 files changed, 9 insertions(+) diff --git a/pkg/ruler/manager.go b/pkg/ruler/manager.go index f7cea08434b..cddfaca08c1 100644 --- a/pkg/ruler/manager.go +++ b/pkg/ruler/manager.go @@ -342,6 +342,8 @@ func (r *DefaultMultiTenantManager) removeUsersIf(shouldRemove func(userID strin continue } + // Stop manager in the background, so we don't block further resharding operations. + // The manager won't terminate until any inflight evaluations are complete. go mngr.Stop() delete(r.userManagers, userID) @@ -354,6 +356,10 @@ func (r *DefaultMultiTenantManager) removeUsersIf(shouldRemove func(userID strin } r.managersTotal.Set(float64(len(r.userManagers))) + + // Note that we don't remove any notifiers here: + // - stopping a notifier can take quite some time, as it needs to drain the notification queue (if enabled), and we don't want to block further resharding operations + // - we can safely reuse the notifier if the tenant is resharded back to this ruler in the future } func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group { diff --git a/pkg/ruler/notifier.go b/pkg/ruler/notifier.go index 50a5ff47fa0..f8447aef9e7 100644 --- a/pkg/ruler/notifier.go +++ b/pkg/ruler/notifier.go @@ -98,6 +98,9 @@ func (rn *rulerNotifier) applyConfig(cfg *config.Config) error { return rn.sdManager.ApplyConfig(sdCfgs) } +// stop stops the notifier and waits for it to terminate. +// +// Note that this can take quite some time if draining the notification queue is enabled. func (rn *rulerNotifier) stop() { rn.sdCancel(errRulerNotifierStopped) rn.notifier.Stop() From b696239c3004b28e528b762dd329d668e1d54f48 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Fri, 5 Jul 2024 19:45:42 +1000 Subject: [PATCH 7/7] Add CLI flag to enable draining --- cmd/mimir/config-descriptor.json | 11 +++++++++++ cmd/mimir/help-all.txt.tmpl | 2 ++ .../mimir/configure/configuration-parameters/index.md | 5 +++++ pkg/ruler/manager.go | 5 +++-- pkg/ruler/manager_test.go | 10 +++++----- pkg/ruler/ruler.go | 3 +++ 6 files changed, 29 insertions(+), 7 deletions(-) diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 8875eebc45c..b9590046923 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -11828,6 +11828,17 @@ "fieldValue": null, "fieldDefaultValue": null }, + { + "kind": "field", + "name": "drain_notification_queue_on_shutdown", + "required": false, + "desc": "Drain all outstanding alert notifications when shutting down. 
If false, any outstanding alert notifications are dropped when shutting down.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "ruler.drain-notification-queue-on-shutdown", + "fieldType": "boolean", + "fieldCategory": "experimental" + }, { "kind": "field", "name": "for_outage_tolerance", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 70d2a2e69e8..34aa5916bbe 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -2611,6 +2611,8 @@ Usage of ./cmd/mimir/mimir: Override the expected name on the server certificate. -ruler.disabled-tenants comma-separated-list-of-strings Comma separated list of tenants whose rules this ruler cannot evaluate. If specified, a ruler that would normally pick the specified tenant(s) for processing will ignore them instead. Subject to sharding. + -ruler.drain-notification-queue-on-shutdown + [experimental] Drain all outstanding alert notifications when shutting down. If false, any outstanding alert notifications are dropped when shutting down. -ruler.enable-api Enable the ruler config API. (default true) -ruler.enabled-tenants comma-separated-list-of-strings diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index a058f11712c..d73f3f939ca 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -1941,6 +1941,11 @@ alertmanager_client: # CLI flag: -ruler.alertmanager-client.basic-auth-password [basic_auth_password: | default = ""] +# (experimental) Drain all outstanding alert notifications when shutting down. +# If false, any outstanding alert notifications are dropped when shutting down. +# CLI flag: -ruler.drain-notification-queue-on-shutdown +[drain_notification_queue_on_shutdown: | default = false] + # (advanced) Max time to tolerate outage for restoring "for" state of alert. # CLI flag: -ruler.for-outage-tolerance [for_outage_tolerance: | default = 1h] diff --git a/pkg/ruler/manager.go b/pkg/ruler/manager.go index cddfaca08c1..40d304ee64a 100644 --- a/pkg/ruler/manager.go +++ b/pkg/ruler/manager.go @@ -298,8 +298,9 @@ func (r *DefaultMultiTenantManager) getOrCreateNotifier(userID string) (*notifie reg = prometheus.WrapRegistererWithPrefix("cortex_", reg) var err error if n, err = newRulerNotifier(¬ifier.Options{ - QueueCapacity: r.cfg.NotificationQueueCapacity, - Registerer: reg, + QueueCapacity: r.cfg.NotificationQueueCapacity, + DrainOnShutdown: r.cfg.DrainNotificationQueueOnShutdown, + Registerer: reg, Do: func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) { // Note: The passed-in context comes from the Prometheus notifier // and does *not* contain the userID. 
So it needs to be added to the context diff --git a/pkg/ruler/manager_test.go b/pkg/ruler/manager_test.go index c71c0ce5755..fd73713f589 100644 --- a/pkg/ruler/manager_test.go +++ b/pkg/ruler/manager_test.go @@ -302,12 +302,12 @@ func TestDefaultMultiTenantManager_WaitsToDrainPendingNotificationsOnShutdown(t logger := testutil.NewTestingLogger(t) user1Group1 := createRuleGroup("group-1", user, createRecordingRule("count:metric_1", "count(metric_1)")) - // TODO: configure draining on shutdown cfg := Config{ - RulePath: t.TempDir(), - AlertmanagerURL: server.URL, - NotificationQueueCapacity: 1000, - NotificationTimeout: 10 * time.Second, + RulePath: t.TempDir(), + AlertmanagerURL: server.URL, + NotificationQueueCapacity: 1000, + NotificationTimeout: 10 * time.Second, + DrainNotificationQueueOnShutdown: true, } m, err := NewDefaultMultiTenantManager(cfg, managerMockFactory, nil, logger, nil) require.NoError(t, err) diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index bcb90e512c7..3c1afed7892 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -109,6 +109,8 @@ type Config struct { NotificationTimeout time.Duration `yaml:"notification_timeout" category:"advanced"` // Client configs for interacting with the Alertmanager Notifier NotifierConfig `yaml:"alertmanager_client"` + // Enable draining the pending alert notification queue when shutting down. + DrainNotificationQueueOnShutdown bool `yaml:"drain_notification_queue_on_shutdown" category:"experimental"` // Max time to tolerate outage for restoring "for" state of alert. OutageTolerance time.Duration `yaml:"for_outage_tolerance" category:"advanced"` @@ -170,6 +172,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.DurationVar(&cfg.AlertmanagerRefreshInterval, "ruler.alertmanager-refresh-interval", 1*time.Minute, "How long to wait between refreshing DNS resolutions of Alertmanager hosts.") f.IntVar(&cfg.NotificationQueueCapacity, "ruler.notification-queue-capacity", 10000, "Capacity of the queue for notifications to be sent to the Alertmanager.") f.DurationVar(&cfg.NotificationTimeout, "ruler.notification-timeout", 10*time.Second, "HTTP timeout duration when sending notifications to the Alertmanager.") + f.BoolVar(&cfg.DrainNotificationQueueOnShutdown, "ruler.drain-notification-queue-on-shutdown", false, "Drain all outstanding alert notifications when shutting down. If false, any outstanding alert notifications are dropped when shutting down.") f.StringVar(&cfg.RulePath, "ruler.rule-path", "./data-ruler/", "Directory to store temporary rule files loaded by the Prometheus rule managers. This directory is not required to be persisted between restarts.") f.BoolVar(&cfg.EnableAPI, "ruler.enable-api", true, "Enable the ruler config API.")
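
For illustration, a minimal, self-contained Go sketch of the shutdown ordering this patch series establishes: per-tenant rule managers are stopped first so that notifications generated while shutting down can still be queued, and the notifiers are then stopped in parallel because each stop call may block while its queue drains (when `-ruler.drain-notification-queue-on-shutdown` is enabled). The `stopper` type and the timings below are hypothetical stand-ins for the real per-tenant managers and `rulerNotifier` instances; this is a sketch of the pattern, not the actual implementation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// stopper is a hypothetical stand-in for anything with a blocking stop method,
// such as a per-tenant rule manager finishing inflight evaluations or a
// rulerNotifier draining its notification queue.
type stopper struct {
	name  string
	delay time.Duration
}

func (s stopper) stop() {
	time.Sleep(s.delay) // simulate waiting for inflight evaluations or queue draining
	fmt.Println(s.name, "stopped")
}

// stopAll stops every item concurrently and waits for all of them,
// mirroring the WaitGroup pattern used in DefaultMultiTenantManager.Stop().
func stopAll(items []stopper) {
	var wg sync.WaitGroup
	for _, item := range items {
		wg.Add(1)
		go func(s stopper) {
			defer wg.Done()
			s.stop()
		}(item)
	}
	wg.Wait()
}

// shutdown stops rule managers before notifiers, so notifications produced
// during shutdown still have a chance to be queued and drained.
func shutdown(managers, notifiers []stopper) {
	stopAll(managers)
	stopAll(notifiers)
	fmt.Println("shutdown complete")
}

func main() {
	managers := []stopper{
		{"rule manager for user-1", 50 * time.Millisecond},
		{"rule manager for user-2", 80 * time.Millisecond},
	}
	notifiers := []stopper{
		{"notifier for user-1", 100 * time.Millisecond},
		{"notifier for user-2", 120 * time.Millisecond},
	}
	shutdown(managers, notifiers)
}
```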