Ruler shutdown behaviour improvements #8346

Merged: 8 commits, Jul 8, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -32,6 +32,7 @@
* [ENHANCEMENT] Alertmanager: Reloading config and templates no longer needs to hit the disk. #4967
* [ENHANCEMENT] Compactor: Added experimental `-compactor.in-memory-tenant-meta-cache-size` option to set the size of the in-memory cache (in number of items) for parsed meta.json files. This can help when a tenant has many meta.json files and parsing them before each compaction cycle uses a lot of CPU time. #8544
* [ENHANCEMENT] Distributor: Interrupt OTLP write request translation when context is canceled or has timed out. #8524
* [BUGFIX] Ruler: add support for draining any outstanding alert notifications before shutting down. This can be enabled with the `-ruler.drain-notification-queue-on-shutdown=true` CLI flag. #8346
* [BUGFIX] Query-frontend: fix `-querier.max-query-lookback` enforcement when `-compactor.blocks-retention-period` is not set, and vice versa. #8388
* [BUGFIX] Ingester: fix sporadic `not found` error causing an internal server error if label names are queried with matchers during head compaction. #8391
* [BUGFIX] Ingester, store-gateway: fix case insensitive regular expressions not matching correctly some Unicode characters. #8391
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -11828,6 +11828,17 @@
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "field",
"name": "drain_notification_queue_on_shutdown",
"required": false,
"desc": "Drain all outstanding alert notifications when shutting down. If false, any outstanding alert notifications are dropped when shutting down.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "ruler.drain-notification-queue-on-shutdown",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "for_outage_tolerance",
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -2611,6 +2611,8 @@ Usage of ./cmd/mimir/mimir:
Override the expected name on the server certificate.
-ruler.disabled-tenants comma-separated-list-of-strings
Comma separated list of tenants whose rules this ruler cannot evaluate. If specified, a ruler that would normally pick the specified tenant(s) for processing will ignore them instead. Subject to sharding.
-ruler.drain-notification-queue-on-shutdown
[experimental] Drain all outstanding alert notifications when shutting down. If false, any outstanding alert notifications are dropped when shutting down.
-ruler.enable-api
Enable the ruler config API. (default true)
-ruler.enabled-tenants comma-separated-list-of-strings
@@ -1941,6 +1941,11 @@ alertmanager_client:
# CLI flag: -ruler.alertmanager-client.basic-auth-password
[basic_auth_password: <string> | default = ""]

# (experimental) Drain all outstanding alert notifications when shutting down.
# If false, any outstanding alert notifications are dropped when shutting down.
# CLI flag: -ruler.drain-notification-queue-on-shutdown
[drain_notification_queue_on_shutdown: <boolean> | default = false]

# (advanced) Max time to tolerate outage for restoring "for" state of alert.
# CLI flag: -ruler.for-outage-tolerance
[for_outage_tolerance: <duration> | default = 1h]
34 changes: 26 additions & 8 deletions pkg/ruler/manager.go
@@ -298,8 +298,9 @@ func (r *DefaultMultiTenantManager) getOrCreateNotifier(userID string) (*notifie
reg = prometheus.WrapRegistererWithPrefix("cortex_", reg)
var err error
if n, err = newRulerNotifier(&notifier.Options{
QueueCapacity: r.cfg.NotificationQueueCapacity,
Registerer: reg,
QueueCapacity: r.cfg.NotificationQueueCapacity,
DrainOnShutdown: r.cfg.DrainNotificationQueueOnShutdown,
Registerer: reg,
Do: func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
// Note: The passed-in context comes from the Prometheus notifier
// and does *not* contain the userID. So it needs to be added to the context
@@ -342,6 +343,8 @@ func (r *DefaultMultiTenantManager) removeUsersIf(shouldRemove func(userID strin
continue
}

// Stop manager in the background, so we don't block further resharding operations.
// The manager won't terminate until any inflight evaluations are complete.
go mngr.Stop()
delete(r.userManagers, userID)

@@ -354,6 +357,10 @@
}

r.managersTotal.Set(float64(len(r.userManagers)))

// Note that we don't remove any notifiers here:
// - stopping a notifier can take quite some time, as it needs to drain the notification queue (if enabled), and we don't want to block further resharding operations
// - we can safely reuse the notifier if the tenant is resharded back to this ruler in the future
}

func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group {
@@ -368,12 +375,6 @@ func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group {
}

func (r *DefaultMultiTenantManager) Stop() {
r.notifiersMtx.Lock()
for _, n := range r.notifiers {
n.stop()
}
r.notifiersMtx.Unlock()

level.Info(r.logger).Log("msg", "stopping user managers")
wg := sync.WaitGroup{}
r.userManagerMtx.Lock()
@@ -391,6 +392,23 @@ func (r *DefaultMultiTenantManager) Stop() {
r.userManagerMtx.Unlock()
level.Info(r.logger).Log("msg", "all user managers stopped")

// Stop notifiers after all rule evaluations have finished, so that we have
// a chance to send any notifications generated while shutting down.
// rulerNotifier.stop() may take some time to complete if notifications need to be drained from the queue.
level.Info(r.logger).Log("msg", "stopping user notifiers")
wg = sync.WaitGroup{}
r.notifiersMtx.Lock()
for _, n := range r.notifiers {
wg.Add(1)
go func(n *rulerNotifier) {
defer wg.Done()
n.stop()
}(n)
}
wg.Wait()
r.notifiersMtx.Unlock()
level.Info(r.logger).Log("msg", "all user notifiers stopped")

// cleanup user rules directories
r.mapper.cleanup()
}
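The reworked Stop() above waits for all rule evaluations to finish, then stops every per-tenant notifier concurrently so a slow drain for one tenant does not serialise shutdown. The following standalone Go sketch (not Mimir code; fakeNotifier and its timings are invented for illustration) shows the same WaitGroup-per-goroutine pattern in isolation.

package main

import (
	"fmt"
	"sync"
	"time"
)

// fakeNotifier stands in for rulerNotifier: stop() blocks until its queue is drained.
type fakeNotifier struct{ id int }

func (n *fakeNotifier) stop() {
	time.Sleep(100 * time.Millisecond) // simulate draining outstanding notifications
	fmt.Printf("notifier %d drained and stopped\n", n.id)
}

func main() {
	notifiers := []*fakeNotifier{{id: 1}, {id: 2}, {id: 3}}

	var wg sync.WaitGroup
	for _, n := range notifiers {
		wg.Add(1)
		go func(n *fakeNotifier) {
			defer wg.Done()
			n.stop()
		}(n)
	}

	// Block until every notifier has finished draining, mirroring the wg.Wait()
	// added to DefaultMultiTenantManager.Stop() above.
	wg.Wait()
	fmt.Println("all notifiers stopped")
}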
119 changes: 115 additions & 4 deletions pkg/ruler/manager_test.go
@@ -7,7 +7,10 @@ package ruler

import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"path/filepath"
"sort"
"testing"
@@ -266,6 +269,108 @@ func TestFilterRuleGroupsByNotEmptyUsers(t *testing.T) {
}
}

func TestDefaultMultiTenantManager_WaitsToDrainPendingNotificationsOnShutdown(t *testing.T) {
releaseReceiver := make(chan struct{})
receiverReceivedRequest := make(chan struct{}, 2)
alertsReceived := atomic.NewInt64(0)

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Let the test know we've received a request.
receiverReceivedRequest <- struct{}{}

var alerts []*Alert

b, err := io.ReadAll(r.Body)
require.NoError(t, err)

err = json.Unmarshal(b, &alerts)
require.NoError(t, err)

alertsReceived.Add(int64(len(alerts)))

// Wait for the test to release us.
<-releaseReceiver

w.WriteHeader(http.StatusOK)
}))
defer func() {
server.Close()
}()

const user = "user-1"
ctx := context.Background()
logger := testutil.NewTestingLogger(t)
user1Group1 := createRuleGroup("group-1", user, createRecordingRule("count:metric_1", "count(metric_1)"))

cfg := Config{
RulePath: t.TempDir(),
AlertmanagerURL: server.URL,
NotificationQueueCapacity: 1000,
NotificationTimeout: 10 * time.Second,
DrainNotificationQueueOnShutdown: true,
}
m, err := NewDefaultMultiTenantManager(cfg, managerMockFactory, nil, logger, nil)
require.NoError(t, err)

m.SyncFullRuleGroups(ctx, map[string]rulespb.RuleGroupList{
user: {user1Group1},
})
m.Start()

// Wait for the manager to be running and have discovered the Alertmanager, then queue a notification.
userManager := assertManagerMockRunningForUser(t, m, user)
waitForAlertmanagerToBeDiscovered(t, userManager.notifier)
userManager.notifier.Send(&notifier.Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")})

// Wait for the Alertmanager to receive the request.
select {
case <-receiverReceivedRequest:
// We can continue.
case <-time.After(time.Second):
require.FailNow(t, "gave up waiting for first notification request to be sent")
}

// Stop the manager, and queue a second notification once the manager is stopped.
// This second notification will remain in the queue until we release the first notification's request by closing releaseReceiver below.
userManager.onStop = func() {
userManager.notifier.Send(&notifier.Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")})
}

// Stop() blocks until the user managers and notifiers have stopped, so run it in the background.
stopped := make(chan struct{})
go func() {
defer close(stopped)
m.Stop()
}()

assertManagerMockStopped(t, userManager)

// Wait a bit for the notifier to be told to shut down.
// This is a hack, but we have no more robust way to ensure that the notifier has acknowledged the shutdown request.
time.Sleep(100 * time.Millisecond)

// The notifier should now be in the draining state.
// Release the first request so that the second notification is drained from the queue, then check that both notifications are received and the manager has stopped.
close(releaseReceiver)
require.Eventually(t, func() bool {
return alertsReceived.Load() == 2
}, time.Second, 10*time.Millisecond, "gave up waiting for second notification to be sent")

select {
case <-stopped:
// Manager has stopped, nothing more to do.
case <-time.After(time.Second):
require.FailNow(t, "gave up waiting for multi-tenant manager to stop")
}
}

func waitForAlertmanagerToBeDiscovered(t *testing.T, notifier *notifier.Manager) {
// There is a hardcoded 5 second refresh interval in discovery.Manager, so we need to wait for that to happen at least once.
require.Eventually(t, func() bool {
return len(notifier.Alertmanagers()) > 0
}, 10*time.Second, 100*time.Millisecond, "gave up waiting for static Alertmanager URL to be discovered")
}

func getManager(m *DefaultMultiTenantManager, user string) RulesManager {
m.userManagerMtx.RLock()
defer m.userManagerMtx.RUnlock()
Expand Down Expand Up @@ -339,19 +444,25 @@ func assertRuleGroupsMappedOnDisk(t *testing.T, m *DefaultMultiTenantManager, us
}
}

func managerMockFactory(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager {
return &managerMock{done: make(chan struct{})}
func managerMockFactory(_ context.Context, _ string, n *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager {
return &managerMock{done: make(chan struct{}), notifier: n}
}

type managerMock struct {
running atomic.Bool
done chan struct{}
running atomic.Bool
done chan struct{}
notifier *notifier.Manager
onStop func()
}

func (m *managerMock) Run() {
defer m.running.Store(false)
m.running.Store(true)
<-m.done

if m.onStop != nil {
m.onStop()
}
}

func (m *managerMock) Stop() {
15 changes: 12 additions & 3 deletions pkg/ruler/notifier.go
@@ -7,6 +7,7 @@ package ruler

import (
"context"
"errors"
"flag"
"net/url"
"strings"
@@ -69,10 +70,15 @@ func newRulerNotifier(o *notifier.Options, l gklog.Logger) (*rulerNotifier, erro
func (rn *rulerNotifier) run() {
rn.wg.Add(2)
go func() {
if err := rn.sdManager.Run(); err != nil {
level.Error(rn.logger).Log("msg", "error starting notifier discovery manager", "err", err)
defer rn.wg.Done()

// Ignore context cancelled errors: cancelling the context is how we stop the manager when shutting down normally.
if err := rn.sdManager.Run(); err != nil && !errors.Is(err, context.Canceled) {
level.Error(rn.logger).Log("msg", "error running notifier discovery manager", "err", err)
return
}
rn.wg.Done()

level.Info(rn.logger).Log("msg", "notifier discovery manager stopped")
}()
go func() {
rn.notifier.Run(rn.sdManager.SyncCh())
@@ -92,6 +98,9 @@ func (rn *rulerNotifier) applyConfig(cfg *config.Config) error {
return rn.sdManager.ApplyConfig(sdCfgs)
}

// stop stops the notifier and waits for it to terminate.
//
// Note that this can take quite some time if draining the notification queue is enabled.
func (rn *rulerNotifier) stop() {
rn.sdCancel(errRulerNotifierStopped)
rn.notifier.Stop()
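For context, the run() change above treats context.Canceled as the normal shutdown signal from the discovery manager rather than an error worth logging. A minimal standalone sketch of that pattern (assumed, not Mimir code; runWorker is a stand-in for sdManager.Run()):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWorker blocks until its context is cancelled, like the discovery manager does on shutdown.
func runWorker(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		time.Sleep(50 * time.Millisecond)
		cancel() // cancelling the context is how we ask the worker to stop on normal shutdown
	}()

	// Ignore context.Canceled: it only means we asked the worker to stop.
	if err := runWorker(ctx); err != nil && !errors.Is(err, context.Canceled) {
		fmt.Println("error running worker:", err)
		return
	}
	fmt.Println("worker stopped cleanly")
}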
3 changes: 3 additions & 0 deletions pkg/ruler/ruler.go
@@ -109,6 +109,8 @@ type Config struct {
NotificationTimeout time.Duration `yaml:"notification_timeout" category:"advanced"`
// Client configs for interacting with the Alertmanager
Notifier NotifierConfig `yaml:"alertmanager_client"`
// Enable draining the pending alert notification queue when shutting down.
DrainNotificationQueueOnShutdown bool `yaml:"drain_notification_queue_on_shutdown" category:"experimental"`

// Max time to tolerate outage for restoring "for" state of alert.
OutageTolerance time.Duration `yaml:"for_outage_tolerance" category:"advanced"`
@@ -170,6 +172,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.DurationVar(&cfg.AlertmanagerRefreshInterval, "ruler.alertmanager-refresh-interval", 1*time.Minute, "How long to wait between refreshing DNS resolutions of Alertmanager hosts.")
f.IntVar(&cfg.NotificationQueueCapacity, "ruler.notification-queue-capacity", 10000, "Capacity of the queue for notifications to be sent to the Alertmanager.")
f.DurationVar(&cfg.NotificationTimeout, "ruler.notification-timeout", 10*time.Second, "HTTP timeout duration when sending notifications to the Alertmanager.")
f.BoolVar(&cfg.DrainNotificationQueueOnShutdown, "ruler.drain-notification-queue-on-shutdown", false, "Drain all outstanding alert notifications when shutting down. If false, any outstanding alert notifications are dropped when shutting down.")

f.StringVar(&cfg.RulePath, "ruler.rule-path", "./data-ruler/", "Directory to store temporary rule files loaded by the Prometheus rule managers. This directory is not required to be persisted between restarts.")
f.BoolVar(&cfg.EnableAPI, "ruler.enable-api", true, "Enable the ruler config API.")