Read alertmanager state from storage if peer settling fails. (#4021)
* Read alertmanager state from storage if peer settling fails.

Signed-off-by: Steve Simpson <steve.simpson@grafana.com>

* Review comments.

Signed-off-by: Steve Simpson <steve.simpson@grafana.com>

* Review comments.

Signed-off-by: Steve Simpson <steve.simpson@grafana.com>
stevesg authored Apr 7, 2021
1 parent 88ebc30 commit 9037020
Showing 4 changed files with 78 additions and 7 deletions.
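
Before this change, a replica whose peer settling failed simply logged the failure ("state not settled but continuing anyway") and started with empty state. This commit adds a second fallback: read the tenant's full state from the alert store before giving up. As a rough, self-contained sketch of the resulting fallback order — the types below are stand-ins, not the real `alertstore.AlertStore`, `alertspb.ErrNotFound`, or `state.starting` shown in the diffs:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errNotFound stands in for alertspb.ErrNotFound.
var errNotFound = errors.New("alertmanager state not found")

// stateStore is a stand-in for the subset of alertstore.AlertStore used here.
type stateStore interface {
	GetFullState(ctx context.Context, user string) ([]byte, error)
}

// memStore is a toy in-memory store keyed by tenant.
type memStore map[string][]byte

func (m memStore) GetFullState(_ context.Context, user string) ([]byte, error) {
	if st, ok := m[user]; ok {
		return st, nil
	}
	return nil, errNotFound
}

// settle sketches the fallback order this commit introduces:
//  1. settle state from peer replicas (the pre-existing path);
//  2. on failure, read the tenant's full state from storage;
//  3. if storage has nothing, or fails too, start empty anyway.
func settle(ctx context.Context, user string, fromPeers func(context.Context) ([]byte, error), store stateStore) []byte {
	if st, err := fromPeers(ctx); err == nil {
		return st // peers answered; nothing more to do
	}

	readCtx, cancel := context.WithTimeout(ctx, 15*time.Second) // cf. defaultStoreReadTimeout
	defer cancel()

	st, err := store.GetFullState(readCtx, user)
	switch {
	case errors.Is(err, errNotFound):
		fmt.Println("no state in storage; treating as a first start")
	case err != nil:
		fmt.Println("storage read failed; continuing with empty state:", err)
	default:
		return st // state recovered from storage
	}
	return nil // empty state; the replica still becomes ready
}

func main() {
	store := memStore{"user-1": []byte("persisted-state")}
	peersDown := func(context.Context) ([]byte, error) {
		return nil, errors.New("settling timed out")
	}

	fmt.Printf("user-1: %q\n", settle(context.Background(), "user-1", peersDown, store))
	fmt.Printf("user-2: %q\n", settle(context.Background(), "user-2", peersDown, store))
}
```

Note that only the sentinel not-found error is treated as a normal first start; any other storage error is logged and the replica still comes up, so a storage outage cannot block startup.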
pkg/alertmanager/alertmanager.go (3 additions, 1 deletion)

```diff
@@ -43,6 +43,7 @@ import (
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/common/route"
 
+	"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
 	"github.com/cortexproject/cortex/pkg/util/services"
 )
 
@@ -71,6 +72,7 @@ type Config struct {
 	ShardingEnabled   bool
 	ReplicationFactor int
 	Replicator        Replicator
+	Store             alertstore.AlertStore
 }
 
 // An Alertmanager manages the alerts for one user.
@@ -161,7 +163,7 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
 		am.state = cfg.Peer
 	} else if cfg.ShardingEnabled {
 		level.Debug(am.logger).Log("msg", "starting tenant alertmanager with ring-based replication")
-		am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, am.logger, am.registry)
+		am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, cfg.Store, am.logger, am.registry)
 	} else {
 		level.Debug(am.logger).Log("msg", "starting tenant alertmanager without replication")
 		am.state = &NilPeer{}
```
pkg/alertmanager/multitenant.go (1 addition, 0 deletions)

```diff
@@ -855,6 +855,7 @@ func (am *MultitenantAlertmanager) newAlertmanager(userID string, amConfig *amco
 		ShardingEnabled:   am.cfg.ShardingEnabled,
 		Replicator:        am,
 		ReplicationFactor: am.cfg.ShardingRing.ReplicationFactor,
+		Store:             am.store,
 	}, reg)
 	if err != nil {
 		return nil, fmt.Errorf("unable to start Alertmanager for user %v: %v", userID, err)
```
pkg/alertmanager/state_replication.go (28 additions, 2 deletions)

```diff
@@ -15,11 +15,14 @@ import (
 	"github.com/prometheus/alertmanager/cluster/clusterpb"
 	"github.com/prometheus/client_golang/prometheus"
 
+	"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
+	"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
 	"github.com/cortexproject/cortex/pkg/util/services"
 )
 
 const (
 	defaultSettleReadTimeout = 15 * time.Second
+	defaultStoreReadTimeout  = 15 * time.Second
 )
 
 // state represents the Alertmanager silences and notification log internal state.
@@ -31,12 +34,14 @@ type state struct {
 	reg prometheus.Registerer
 
 	settleReadTimeout time.Duration
+	storeReadTimeout  time.Duration
 
 	mtx    sync.Mutex
 	states map[string]cluster.State
 
 	replicationFactor int
 	replicator        Replicator
+	store             alertstore.AlertStore
 
 	partialStateMergesTotal  *prometheus.CounterVec
 	partialStateMergesFailed *prometheus.CounterVec
@@ -47,17 +52,19 @@ type state struct {
 }
 
 // newReplicatedStates creates a new state struct, which manages state to be replicated between alertmanagers.
-func newReplicatedStates(userID string, rf int, re Replicator, l log.Logger, r prometheus.Registerer) *state {
+func newReplicatedStates(userID string, rf int, re Replicator, st alertstore.AlertStore, l log.Logger, r prometheus.Registerer) *state {
 
 	s := &state{
 		logger:            l,
 		userID:            userID,
 		replicationFactor: rf,
 		replicator:        re,
+		store:             st,
 		states:            make(map[string]cluster.State, 2), // we use two, one for the notifications and one for silences.
 		msgc:              make(chan *clusterpb.Part),
 		reg:               r,
 		settleReadTimeout: defaultSettleReadTimeout,
+		storeReadTimeout:  defaultStoreReadTimeout,
 		partialStateMergesTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
 			Name: "alertmanager_partial_state_merges_total",
 			Help: "Number of times we have received a partial state to merge for a key.",
@@ -167,7 +174,26 @@ func (s *state) starting(ctx context.Context) error {
 		}
 	}
 
-	level.Info(s.logger).Log("msg", "state not settled but continuing anyway", "err", err)
+	level.Info(s.logger).Log("msg", "state not settled; trying to read from storage", "err", err)
+
+	// Attempt to read the state from persistent storage instead.
+	storeReadCtx, cancel := context.WithTimeout(ctx, s.storeReadTimeout)
+	defer cancel()
+
+	fullState, err := s.store.GetFullState(storeReadCtx, s.userID)
+	if errors.Is(err, alertspb.ErrNotFound) {
+		level.Info(s.logger).Log("msg", "no state for user in storage; proceeding", "user", s.userID)
+		return nil
+	}
+	if err == nil {
+		if err = s.mergeFullStates([]*clusterpb.FullState{fullState.State}); err == nil {
+			level.Info(s.logger).Log("msg", "state read from storage; proceeding")
+			return nil
+		}
+	}
+
+	level.Warn(s.logger).Log("msg", "failed to read state from storage; continuing anyway", "err", err)
+
 	return nil
 }
```
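When the storage read succeeds, the snapshot is handed to `mergeFullStates`, the same merge path used for peer-settled state. The test expectations below ("key1" recovers "Datum1" while "key2" stays empty) suggest each part is routed to the registered state matching its key. A rough sketch of that shape, with stand-in types for `clusterpb.FullState` and `clusterpb.Part`; how the real `mergeFullStates` handles unknown keys is an assumption here, not shown in this diff:

```go
package main

import "fmt"

// part mirrors the shape of clusterpb.Part: Key names a registered
// sub-state and Data holds its serialized snapshot.
type part struct {
	Key  string
	Data []byte
}

// fullState mirrors the shape of clusterpb.FullState.
type fullState struct {
	Parts []part
}

// mergeFullState routes each part to the state registered under its
// key. Skipping unknown keys is an assumption of this sketch, not a
// documented behavior of the real mergeFullStates.
func mergeFullState(states map[string][][]byte, fs fullState) {
	for _, p := range fs.Parts {
		if _, ok := states[p.Key]; !ok {
			continue
		}
		states[p.Key] = append(states[p.Key], p.Data)
	}
}

func main() {
	// Two registered sub-states, matching the comment in the diff:
	// "one for the notifications and one for silences".
	states := map[string][][]byte{"key1": nil, "key2": nil}

	mergeFullState(states, fullState{
		Parts: []part{{Key: "key1", Data: []byte("Datum1")}},
	})

	// Matches the new test case: key1 -> ["Datum1"], key2 -> nil.
	fmt.Printf("key1=%q key2=%q\n", states["key1"], states["key2"])
}
```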
pkg/alertmanager/state_replication_test.go (46 additions, 4 deletions)

```diff
@@ -18,6 +18,8 @@ import (
 
 	"github.com/go-kit/kit/log"
 
+	"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
+	"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
 	"github.com/cortexproject/cortex/pkg/util/services"
 )
 
@@ -76,6 +78,25 @@ func (f *fakeReplicator) ReadFullStateForUser(ctx context.Context, userID string
 	return f.read.res, f.read.err
 }
 
+type fakeAlertStore struct {
+	alertstore.AlertStore
+
+	states map[string]alertspb.FullStateDesc
+}
+
+func newFakeAlertStore() *fakeAlertStore {
+	return &fakeAlertStore{
+		states: make(map[string]alertspb.FullStateDesc),
+	}
+}
+
+func (f *fakeAlertStore) GetFullState(ctx context.Context, user string) (alertspb.FullStateDesc, error) {
+	if result, ok := f.states[user]; ok {
+		return result, nil
+	}
+	return alertspb.FullStateDesc{}, alertspb.ErrNotFound
+}
+
 func TestStateReplication(t *testing.T) {
 	tc := []struct {
 		name string
@@ -102,7 +123,8 @@ func TestStateReplication(t *testing.T) {
 			reg := prometheus.NewPedanticRegistry()
 			replicator := newFakeReplicator()
 			replicator.read = readStateResult{res: nil, err: nil}
-			s := newReplicatedStates("user-1", tt.replicationFactor, replicator, log.NewNopLogger(), reg)
+			store := newFakeAlertStore()
+			s := newReplicatedStates("user-1", tt.replicationFactor, replicator, store, log.NewNopLogger(), reg)
 
 			require.False(t, s.Ready())
 			{
@@ -163,6 +185,7 @@ func TestStateReplication_Settle(t *testing.T) {
 		name              string
 		replicationFactor int
 		read              readStateResult
+		storeStates       map[string]alertspb.FullStateDesc
 		results           map[string][][]byte
 	}{
 		{
@@ -228,9 +251,26 @@ func TestStateReplication_Settle(t *testing.T) {
 			},
 		},
 		{
-			name:              "when reading the full state fails, still become ready.",
+			name:              "when reading from replicas fails, state is read from storage.",
 			replicationFactor: 3,
 			read:              readStateResult{err: errors.New("Read Error 1")},
+			storeStates: map[string]alertspb.FullStateDesc{
+				"user-1": {
+					State: &clusterpb.FullState{
+						Parts: []clusterpb.Part{{Key: "key1", Data: []byte("Datum1")}},
+					},
+				},
+			},
+			results: map[string][][]byte{
+				"key1": {[]byte("Datum1")},
+				"key2": nil,
+			},
+		},
+		{
+			name:              "when reading from replicas and from storage fails, still become ready.",
+			replicationFactor: 3,
+			read:              readStateResult{err: errors.New("Read Error 1")},
+			storeStates:       map[string]alertspb.FullStateDesc{},
 			results: map[string][][]byte{
 				"key1": nil,
 				"key2": nil,
@@ -253,7 +293,9 @@ func TestStateReplication_Settle(t *testing.T) {
 
 			replicator := newFakeReplicator()
 			replicator.read = tt.read
-			s := newReplicatedStates("user-1", tt.replicationFactor, replicator, log.NewNopLogger(), reg)
+			store := newFakeAlertStore()
+			store.states = tt.storeStates
+			s := newReplicatedStates("user-1", tt.replicationFactor, replicator, store, log.NewNopLogger(), reg)
 
 			key1State := &fakeState{}
 			key2State := &fakeState{}
@@ -322,7 +364,7 @@ func TestStateReplication_GetFullState(t *testing.T) {
 	for _, tt := range tc {
 		t.Run(tt.name, func(t *testing.T) {
 			reg := prometheus.NewPedanticRegistry()
-			s := newReplicatedStates("user-1", 1, nil, log.NewNopLogger(), reg)
+			s := newReplicatedStates("user-1", 1, nil, nil, log.NewNopLogger(), reg)
 
 			for key, datum := range tt.data {
 				state := &fakeState{binary: datum}
```
