diff --git a/pkg/alertmanager/alertmanager.go b/pkg/alertmanager/alertmanager.go
index e6880a89e5..6d605ce245 100644
--- a/pkg/alertmanager/alertmanager.go
+++ b/pkg/alertmanager/alertmanager.go
@@ -43,6 +43,7 @@ import (
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/common/route"
 
+	"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
 	"github.com/cortexproject/cortex/pkg/util/services"
 )
 
@@ -71,6 +72,7 @@ type Config struct {
 	ShardingEnabled   bool
 	ReplicationFactor int
 	Replicator        Replicator
+	Store             alertstore.AlertStore
 }
 
 // An Alertmanager manages the alerts for one user.
@@ -161,7 +163,7 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
 		am.state = cfg.Peer
 	} else if cfg.ShardingEnabled {
 		level.Debug(am.logger).Log("msg", "starting tenant alertmanager with ring-based replication")
-		am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, am.logger, am.registry)
+		am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, cfg.Store, am.logger, am.registry)
 	} else {
 		level.Debug(am.logger).Log("msg", "starting tenant alertmanager without replication")
 		am.state = &NilPeer{}
diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go
index 57be1fe8c5..204500def5 100644
--- a/pkg/alertmanager/multitenant.go
+++ b/pkg/alertmanager/multitenant.go
@@ -855,6 +855,7 @@ func (am *MultitenantAlertmanager) newAlertmanager(userID string, amConfig *amco
 		ShardingEnabled:   am.cfg.ShardingEnabled,
 		Replicator:        am,
 		ReplicationFactor: am.cfg.ShardingRing.ReplicationFactor,
+		Store:             am.store,
 	}, reg)
 	if err != nil {
 		return nil, fmt.Errorf("unable to start Alertmanager for user %v: %v", userID, err)
diff --git a/pkg/alertmanager/state_replication.go b/pkg/alertmanager/state_replication.go
index 23dac007bf..f749e151d3 100644
--- a/pkg/alertmanager/state_replication.go
+++ b/pkg/alertmanager/state_replication.go
@@ -15,11 +15,14 @@ import (
 	"github.com/prometheus/alertmanager/cluster/clusterpb"
 	"github.com/prometheus/client_golang/prometheus"
 
+	"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
+	"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
 	"github.com/cortexproject/cortex/pkg/util/services"
 )
 
 const (
 	defaultSettleReadTimeout = 15 * time.Second
+	defaultStoreReadTimeout  = 15 * time.Second
 )
 
 // state represents the Alertmanager silences and notification log internal state.
@@ -31,12 +34,14 @@ type state struct {
 	reg    prometheus.Registerer
 
 	settleReadTimeout time.Duration
+	storeReadTimeout  time.Duration
 
 	mtx    sync.Mutex
 	states map[string]cluster.State
 
 	replicationFactor int
 	replicator        Replicator
+	store             alertstore.AlertStore
 
 	partialStateMergesTotal  *prometheus.CounterVec
 	partialStateMergesFailed *prometheus.CounterVec
@@ -47,17 +52,19 @@ type state struct {
 }
 
 // newReplicatedStates creates a new state struct, which manages state to be replicated between alertmanagers.
-func newReplicatedStates(userID string, rf int, re Replicator, l log.Logger, r prometheus.Registerer) *state {
+func newReplicatedStates(userID string, rf int, re Replicator, st alertstore.AlertStore, l log.Logger, r prometheus.Registerer) *state {
 	s := &state{
 		logger:            l,
 		userID:            userID,
 		replicationFactor: rf,
 		replicator:        re,
+		store:             st,
 		states:            make(map[string]cluster.State, 2), // we use two, one for the notifications and one for silences.
 		msgc:              make(chan *clusterpb.Part),
 		reg:               r,
 		settleReadTimeout: defaultSettleReadTimeout,
+		storeReadTimeout:  defaultStoreReadTimeout,
 		partialStateMergesTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
 			Name: "alertmanager_partial_state_merges_total",
 			Help: "Number of times we have received a partial state to merge for a key.",
@@ -167,7 +174,26 @@ func (s *state) starting(ctx context.Context) error {
 		}
 	}
 
-	level.Info(s.logger).Log("msg", "state not settled but continuing anyway", "err", err)
+	level.Info(s.logger).Log("msg", "state not settled; trying to read from storage", "err", err)
+
+	// Attempt to read the state from persistent storage instead.
+	storeReadCtx, cancel := context.WithTimeout(ctx, s.storeReadTimeout)
+	defer cancel()
+
+	fullState, err := s.store.GetFullState(storeReadCtx, s.userID)
+	if errors.Is(err, alertspb.ErrNotFound) {
+		level.Info(s.logger).Log("msg", "no state for user in storage; proceeding", "user", s.userID)
+		return nil
+	}
+	if err == nil {
+		if err = s.mergeFullStates([]*clusterpb.FullState{fullState.State}); err == nil {
+			level.Info(s.logger).Log("msg", "state read from storage; proceeding")
+			return nil
+		}
+	}
+
+	level.Warn(s.logger).Log("msg", "failed to read state from storage; continuing anyway", "err", err)
+
 	return nil
 }
 
diff --git a/pkg/alertmanager/state_replication_test.go b/pkg/alertmanager/state_replication_test.go
index 59c758dc90..8e4bd90e7e 100644
--- a/pkg/alertmanager/state_replication_test.go
+++ b/pkg/alertmanager/state_replication_test.go
@@ -18,6 +18,8 @@ import (
 
 	"github.com/go-kit/kit/log"
 
+	"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
+	"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
 	"github.com/cortexproject/cortex/pkg/util/services"
 )
 
@@ -76,6 +78,25 @@ func (f *fakeReplicator) ReadFullStateForUser(ctx context.Context, userID string
 	return f.read.res, f.read.err
 }
 
+type fakeAlertStore struct {
+	alertstore.AlertStore
+
+	states map[string]alertspb.FullStateDesc
+}
+
+func newFakeAlertStore() *fakeAlertStore {
+	return &fakeAlertStore{
+		states: make(map[string]alertspb.FullStateDesc),
+	}
+}
+
+func (f *fakeAlertStore) GetFullState(ctx context.Context, user string) (alertspb.FullStateDesc, error) {
+	if result, ok := f.states[user]; ok {
+		return result, nil
+	}
+	return alertspb.FullStateDesc{}, alertspb.ErrNotFound
+}
+
 func TestStateReplication(t *testing.T) {
 	tc := []struct {
 		name              string
@@ -102,7 +123,8 @@ func TestStateReplication(t *testing.T) {
 			reg := prometheus.NewPedanticRegistry()
 			replicator := newFakeReplicator()
 			replicator.read = readStateResult{res: nil, err: nil}
-			s := newReplicatedStates("user-1", tt.replicationFactor, replicator, log.NewNopLogger(), reg)
+			store := newFakeAlertStore()
+			s := newReplicatedStates("user-1", tt.replicationFactor, replicator, store, log.NewNopLogger(), reg)
 
 			require.False(t, s.Ready())
 			{
@@ -163,6 +185,7 @@ func TestStateReplication_Settle(t *testing.T) {
 		name              string
 		replicationFactor int
 		read              readStateResult
+		storeStates       map[string]alertspb.FullStateDesc
 		results           map[string][][]byte
 	}{
 		{
@@ -228,9 +251,26 @@ func TestStateReplication_Settle(t *testing.T) {
 			},
 		},
 		{
-			name:              "when reading the full state fails, still become ready.",
+			name:              "when reading from replicas fails, state is read from storage.",
+			replicationFactor: 3,
+			read:              readStateResult{err: errors.New("Read Error 1")},
+			storeStates: map[string]alertspb.FullStateDesc{
+				"user-1": {
+					State: &clusterpb.FullState{
+						Parts: []clusterpb.Part{{Key: "key1", Data: []byte("Datum1")}},
+					},
+				},
+			},
+			results: map[string][][]byte{
+				"key1": {[]byte("Datum1")},
+				"key2": nil,
+			},
+		},
+		{
+			name:              "when reading from replicas and from storage fails, still become ready.",
 			replicationFactor: 3,
 			read:              readStateResult{err: errors.New("Read Error 1")},
+			storeStates:       map[string]alertspb.FullStateDesc{},
 			results: map[string][][]byte{
 				"key1": nil,
 				"key2": nil,
@@ -253,7 +293,9 @@ func TestStateReplication_Settle(t *testing.T) {
 
 			replicator := newFakeReplicator()
 			replicator.read = tt.read
-			s := newReplicatedStates("user-1", tt.replicationFactor, replicator, log.NewNopLogger(), reg)
+			store := newFakeAlertStore()
+			store.states = tt.storeStates
+			s := newReplicatedStates("user-1", tt.replicationFactor, replicator, store, log.NewNopLogger(), reg)
 
 			key1State := &fakeState{}
 			key2State := &fakeState{}
@@ -322,7 +364,7 @@ func TestStateReplication_GetFullState(t *testing.T) {
 	for _, tt := range tc {
 		t.Run(tt.name, func(t *testing.T) {
 			reg := prometheus.NewPedanticRegistry()
-			s := newReplicatedStates("user-1", 1, nil, log.NewNopLogger(), reg)
+			s := newReplicatedStates("user-1", 1, nil, nil, log.NewNopLogger(), reg)
 
 			for key, datum := range tt.data {
 				state := &fakeState{binary: datum}