Ruler shutdown behaviour improvements (#8346)
* Don't log `msg="error starting notifier discovery manager" err="context canceled"` when the ruler shuts down cleanly.

* Stop notifiers after all rule evaluations have finished.

* Run `Stop` calls in parallel, to mitigate the blocking behaviour of `Stop` introduced in prometheus/prometheus@086be26 when draining is enabled (see the sketch after this list)

* Add a test case that will fail until the Prometheus draining PR is merged

* Add changelog entry

* Add comments.

* Add CLI flag to enable draining
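
The parallel shutdown described above amounts to fanning the blocking `Stop` calls out over goroutines and waiting on a `sync.WaitGroup`, so total shutdown time is bounded by the slowest notifier rather than the sum of all of them. Below is a minimal, self-contained sketch of that pattern; the stub type, names and delays are illustrative and not taken from this commit.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// notifierStub stands in for a component whose Stop call may block for a
// while, e.g. a notifier draining its queue. Names and delays are
// illustrative only.
type notifierStub struct {
	name  string
	delay time.Duration
}

func (n notifierStub) Stop() {
	time.Sleep(n.delay) // simulate a slow, blocking shutdown
	fmt.Println(n.name, "stopped")
}

func main() {
	notifiers := []notifierStub{
		{name: "user-1", delay: 300 * time.Millisecond},
		{name: "user-2", delay: 200 * time.Millisecond},
	}

	// Fan out the blocking Stop calls and wait for all of them to finish.
	var wg sync.WaitGroup
	for _, n := range notifiers {
		wg.Add(1)
		go func(n notifierStub) {
			defer wg.Done()
			n.Stop()
		}(n)
	}
	wg.Wait()
	fmt.Println("all notifiers stopped")
}
```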
charleskorn authored Jul 8, 2024
1 parent 003d6e8 commit 43140f8
Showing 8 changed files with 175 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -35,6 +35,7 @@
* [ENHANCEMENT] Compactor: Added experimental `-compactor.in-memory-tenant-meta-cache-size` option to set the size of the in-memory cache (in number of items) for parsed meta.json files. This can help when a tenant has many meta.json files and parsing them before each compaction cycle uses a lot of CPU time. #8544
* [ENHANCEMENT] Distributor: Interrupt OTLP write request translation when context is canceled or has timed out. #8524
* [ENHANCEMENT] Ingester, store-gateway: optimised regular expression matching for patterns like `1.*|2.*|3.*|...|1000.*`. #8632
* [BUGFIX] Ruler: add support for draining any outstanding alert notifications before shutting down. This can be enabled with the `-ruler.drain-notification-queue-on-shutdown=true` CLI flag. #8346
* [BUGFIX] Query-frontend: fix `-querier.max-query-lookback` enforcement when `-compactor.blocks-retention-period` is not set, and vice versa. #8388
* [BUGFIX] Ingester: fix sporadic `not found` error causing an internal server error if label names are queried with matchers during head compaction. #8391
* [BUGFIX] Ingester, store-gateway: fix case insensitive regular expressions not matching correctly some Unicode characters. #8391
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -11828,6 +11828,17 @@
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "field",
"name": "drain_notification_queue_on_shutdown",
"required": false,
"desc": "Drain all outstanding alert notifications when shutting down. If false, any outstanding alert notifications are dropped when shutting down.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "ruler.drain-notification-queue-on-shutdown",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "for_outage_tolerance",
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -2613,6 +2613,8 @@ Usage of ./cmd/mimir/mimir:
Override the expected name on the server certificate.
-ruler.disabled-tenants comma-separated-list-of-strings
Comma separated list of tenants whose rules this ruler cannot evaluate. If specified, a ruler that would normally pick the specified tenant(s) for processing will ignore them instead. Subject to sharding.
-ruler.drain-notification-queue-on-shutdown
[experimental] Drain all outstanding alert notifications when shutting down. If false, any outstanding alert notifications are dropped when shutting down.
-ruler.enable-api
Enable the ruler config API. (default true)
-ruler.enabled-tenants comma-separated-list-of-strings
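
As a usage illustration (not part of this diff), the new flag is passed like any other ruler flag; the `-target=ruler` and `-config.file` flags below are assumed from a typical Mimir invocation.

```bash
./mimir -target=ruler -config.file=mimir.yaml \
  -ruler.drain-notification-queue-on-shutdown=true
```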
@@ -1947,6 +1947,11 @@ alertmanager_client:
# CLI flag: -ruler.alertmanager-client.basic-auth-password
[basic_auth_password: <string> | default = ""]
# (experimental) Drain all outstanding alert notifications when shutting down.
# If false, any outstanding alert notifications are dropped when shutting down.
# CLI flag: -ruler.drain-notification-queue-on-shutdown
[drain_notification_queue_on_shutdown: <boolean> | default = false]
# (advanced) Max time to tolerate outage for restoring "for" state of alert.
# CLI flag: -ruler.for-outage-tolerance
[for_outage_tolerance: <duration> | default = 1h]
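
For reference, the equivalent YAML configuration might look like the following, assuming the setting sits under the top-level `ruler:` block documented above (illustrative only, surrounding settings omitted):

```yaml
ruler:
  # Experimental: drain any queued alert notifications before the ruler exits
  # instead of dropping them.
  drain_notification_queue_on_shutdown: true
```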
34 changes: 26 additions & 8 deletions pkg/ruler/manager.go
@@ -298,8 +298,9 @@ func (r *DefaultMultiTenantManager) getOrCreateNotifier(userID string) (*notifie
reg = prometheus.WrapRegistererWithPrefix("cortex_", reg)
var err error
if n, err = newRulerNotifier(&notifier.Options{
QueueCapacity: r.cfg.NotificationQueueCapacity,
Registerer: reg,
QueueCapacity: r.cfg.NotificationQueueCapacity,
DrainOnShutdown: r.cfg.DrainNotificationQueueOnShutdown,
Registerer: reg,
Do: func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
// Note: The passed-in context comes from the Prometheus notifier
// and does *not* contain the userID. So it needs to be added to the context
@@ -342,6 +343,8 @@ func (r *DefaultMultiTenantManager) removeUsersIf(shouldRemove func(userID strin
continue
}

// Stop manager in the background, so we don't block further resharding operations.
// The manager won't terminate until any inflight evaluations are complete.
go mngr.Stop()
delete(r.userManagers, userID)

@@ -354,6 +357,10 @@ func (r *DefaultMultiTenantManager) removeUsersIf(shouldRemove func(userID strin
}

r.managersTotal.Set(float64(len(r.userManagers)))

// Note that we don't remove any notifiers here:
// - stopping a notifier can take quite some time, as it needs to drain the notification queue (if enabled), and we don't want to block further resharding operations
// - we can safely reuse the notifier if the tenant is resharded back to this ruler in the future
}

func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group {
@@ -368,12 +375,6 @@ func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group {
}

func (r *DefaultMultiTenantManager) Stop() {
r.notifiersMtx.Lock()
for _, n := range r.notifiers {
n.stop()
}
r.notifiersMtx.Unlock()

level.Info(r.logger).Log("msg", "stopping user managers")
wg := sync.WaitGroup{}
r.userManagerMtx.Lock()
@@ -391,6 +392,23 @@ func (r *DefaultMultiTenantManager) Stop() {
r.userManagerMtx.Unlock()
level.Info(r.logger).Log("msg", "all user managers stopped")

// Stop notifiers after all rule evaluations have finished, so that we have
// a chance to send any notifications generated while shutting down.
// rulerNotifier.stop() may take some time to complete if notifications need to be drained from the queue.
level.Info(r.logger).Log("msg", "stopping user notifiers")
wg = sync.WaitGroup{}
r.notifiersMtx.Lock()
for _, n := range r.notifiers {
wg.Add(1)
go func(n *rulerNotifier) {
defer wg.Done()
n.stop()
}(n)
}
wg.Wait()
r.notifiersMtx.Unlock()
level.Info(r.logger).Log("msg", "all user notifiers stopped")

// cleanup user rules directories
r.mapper.cleanup()
}
119 changes: 115 additions & 4 deletions pkg/ruler/manager_test.go
@@ -7,7 +7,10 @@ package ruler

import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"path/filepath"
"sort"
"testing"
@@ -266,6 +269,108 @@ func TestFilterRuleGroupsByNotEmptyUsers(t *testing.T) {
}
}

func TestDefaultMultiTenantManager_WaitsToDrainPendingNotificationsOnShutdown(t *testing.T) {
releaseReceiver := make(chan struct{})
receiverReceivedRequest := make(chan struct{}, 2)
alertsReceived := atomic.NewInt64(0)

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Let the test know we've received a request.
receiverReceivedRequest <- struct{}{}

var alerts []*Alert

b, err := io.ReadAll(r.Body)
require.NoError(t, err)

err = json.Unmarshal(b, &alerts)
require.NoError(t, err)

alertsReceived.Add(int64(len(alerts)))

// Wait for the test to release us.
<-releaseReceiver

w.WriteHeader(http.StatusOK)
}))
defer func() {
server.Close()
}()

const user = "user-1"
ctx := context.Background()
logger := testutil.NewTestingLogger(t)
user1Group1 := createRuleGroup("group-1", user, createRecordingRule("count:metric_1", "count(metric_1)"))

cfg := Config{
RulePath: t.TempDir(),
AlertmanagerURL: server.URL,
NotificationQueueCapacity: 1000,
NotificationTimeout: 10 * time.Second,
DrainNotificationQueueOnShutdown: true,
}
m, err := NewDefaultMultiTenantManager(cfg, managerMockFactory, nil, logger, nil)
require.NoError(t, err)

m.SyncFullRuleGroups(ctx, map[string]rulespb.RuleGroupList{
user: {user1Group1},
})
m.Start()

// Wait for the manager to be running and have discovered the Alertmanager, then queue a notification.
userManager := assertManagerMockRunningForUser(t, m, user)
waitForAlertmanagerToBeDiscovered(t, userManager.notifier)
userManager.notifier.Send(&notifier.Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")})

// Wait for the Alertmanager to receive the request.
select {
case <-receiverReceivedRequest:
// We can continue.
case <-time.After(time.Second):
require.FailNow(t, "gave up waiting for first notification request to be sent")
}

// Stop the manager, and queue a second notification once the manager is stopped.
// This second notification will remain in the queue until we release the first notification's request by closing releaseReceiver below.
userManager.onStop = func() {
userManager.notifier.Send(&notifier.Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")})
}

// Stop() blocks until the user managers and notifiers have stopped, so run it in the background.
stopped := make(chan struct{})
go func() {
defer close(stopped)
m.Stop()
}()

assertManagerMockStopped(t, userManager)

// Wait a bit for the notifier to be told to shut down.
// This is a hack, but we have no more robust way to ensure that the notifier has acknowledged the shutdown request.
time.Sleep(100 * time.Millisecond)

// The notifier should now be in the draining state.
// Release the first request so that the second notification is drained from the queue, then check that both notifications are received and the manager has stopped.
close(releaseReceiver)
require.Eventually(t, func() bool {
return alertsReceived.Load() == 2
}, time.Second, 10*time.Millisecond, "gave up waiting for second notification to be sent")

select {
case <-stopped:
// Manager has stopped, nothing more to do.
case <-time.After(time.Second):
require.FailNow(t, "gave up waiting for multi-tenant manager to stop")
}
}

func waitForAlertmanagerToBeDiscovered(t *testing.T, notifier *notifier.Manager) {
// There is a hardcoded 5 second refresh interval in discovery.Manager, so we need to wait for that to happen at least once.
require.Eventually(t, func() bool {
return len(notifier.Alertmanagers()) > 0
}, 10*time.Second, 100*time.Millisecond, "gave up waiting for static Alertmanager URL to be discovered")
}

func getManager(m *DefaultMultiTenantManager, user string) RulesManager {
m.userManagerMtx.RLock()
defer m.userManagerMtx.RUnlock()
@@ -339,19 +444,25 @@ func assertRuleGroupsMappedOnDisk(t *testing.T, m *DefaultMultiTenantManager, us
}
}

func managerMockFactory(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager {
return &managerMock{done: make(chan struct{})}
func managerMockFactory(_ context.Context, _ string, n *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager {
return &managerMock{done: make(chan struct{}), notifier: n}
}

type managerMock struct {
running atomic.Bool
done chan struct{}
running atomic.Bool
done chan struct{}
notifier *notifier.Manager
onStop func()
}

func (m *managerMock) Run() {
defer m.running.Store(false)
m.running.Store(true)
<-m.done

if m.onStop != nil {
m.onStop()
}
}

func (m *managerMock) Stop() {
15 changes: 12 additions & 3 deletions pkg/ruler/notifier.go
@@ -7,6 +7,7 @@ package ruler

import (
"context"
"errors"
"flag"
"net/url"
"strings"
@@ -69,10 +70,15 @@ func newRulerNotifier(o *notifier.Options, l gklog.Logger) (*rulerNotifier, erro
func (rn *rulerNotifier) run() {
rn.wg.Add(2)
go func() {
if err := rn.sdManager.Run(); err != nil {
level.Error(rn.logger).Log("msg", "error starting notifier discovery manager", "err", err)
defer rn.wg.Done()

// Ignore context cancelled errors: cancelling the context is how we stop the manager when shutting down normally.
if err := rn.sdManager.Run(); err != nil && !errors.Is(err, context.Canceled) {
level.Error(rn.logger).Log("msg", "error running notifier discovery manager", "err", err)
return
}
rn.wg.Done()

level.Info(rn.logger).Log("msg", "notifier discovery manager stopped")
}()
go func() {
rn.notifier.Run(rn.sdManager.SyncCh())
@@ -92,6 +98,9 @@ func (rn *rulerNotifier) applyConfig(cfg *config.Config) error {
return rn.sdManager.ApplyConfig(sdCfgs)
}

// stop stops the notifier and waits for it to terminate.
//
// Note that this can take quite some time if draining the notification queue is enabled.
func (rn *rulerNotifier) stop() {
rn.sdCancel(errRulerNotifierStopped)
rn.notifier.Stop()
3 changes: 3 additions & 0 deletions pkg/ruler/ruler.go
@@ -109,6 +109,8 @@ type Config struct {
NotificationTimeout time.Duration `yaml:"notification_timeout" category:"advanced"`
// Client configs for interacting with the Alertmanager
Notifier NotifierConfig `yaml:"alertmanager_client"`
// Enable draining the pending alert notification queue when shutting down.
DrainNotificationQueueOnShutdown bool `yaml:"drain_notification_queue_on_shutdown" category:"experimental"`

// Max time to tolerate outage for restoring "for" state of alert.
OutageTolerance time.Duration `yaml:"for_outage_tolerance" category:"advanced"`
@@ -170,6 +172,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.DurationVar(&cfg.AlertmanagerRefreshInterval, "ruler.alertmanager-refresh-interval", 1*time.Minute, "How long to wait between refreshing DNS resolutions of Alertmanager hosts.")
f.IntVar(&cfg.NotificationQueueCapacity, "ruler.notification-queue-capacity", 10000, "Capacity of the queue for notifications to be sent to the Alertmanager.")
f.DurationVar(&cfg.NotificationTimeout, "ruler.notification-timeout", 10*time.Second, "HTTP timeout duration when sending notifications to the Alertmanager.")
f.BoolVar(&cfg.DrainNotificationQueueOnShutdown, "ruler.drain-notification-queue-on-shutdown", false, "Drain all outstanding alert notifications when shutting down. If false, any outstanding alert notifications are dropped when shutting down.")

f.StringVar(&cfg.RulePath, "ruler.rule-path", "./data-ruler/", "Directory to store temporary rule files loaded by the Prometheus rule managers. This directory is not required to be persisted between restarts.")
f.BoolVar(&cfg.EnableAPI, "ruler.enable-api", true, "Enable the ruler config API.")
