Read alertmanager state from storage if peer settling fails. #4021

Merged · 3 commits · Apr 7, 2021
Changes from 1 commit
4 changes: 3 additions & 1 deletion pkg/alertmanager/alertmanager.go
@@ -43,6 +43,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"

"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
"github.com/cortexproject/cortex/pkg/util/services"
)

@@ -71,6 +72,7 @@ type Config struct {
ShardingEnabled bool
ReplicationFactor int
Replicator Replicator
Store alertstore.AlertStore
}

// An Alertmanager manages the alerts for one user.
@@ -161,7 +163,7 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
am.state = cfg.Peer
} else if cfg.ShardingEnabled {
level.Debug(am.logger).Log("msg", "starting tenant alertmanager with ring-based replication")
am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, am.logger, am.registry)
am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, cfg.Store, am.logger, am.registry)
} else {
level.Debug(am.logger).Log("msg", "starting tenant alertmanager without replication")
am.state = &NilPeer{}
1 change: 1 addition & 0 deletions pkg/alertmanager/multitenant.go
@@ -855,6 +855,7 @@ func (am *MultitenantAlertmanager) newAlertmanager(userID string, amConfig *amco
ShardingEnabled: am.cfg.ShardingEnabled,
Replicator: am,
ReplicationFactor: am.cfg.ShardingRing.ReplicationFactor,
Store: am.store,
}, reg)
if err != nil {
return nil, fmt.Errorf("unable to start Alertmanager for user %v: %v", userID, err)
Expand Down
25 changes: 23 additions & 2 deletions pkg/alertmanager/state_replication.go
@@ -15,11 +15,13 @@ import (
"github.com/prometheus/alertmanager/cluster/clusterpb"
"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
"github.com/cortexproject/cortex/pkg/util/services"
)

const (
defaultSettleReadTimeout = 15 * time.Second
defaultStoreReadTimeout = 15 * time.Second
)

// state represents the Alertmanager silences and notification log internal state.
@@ -31,12 +33,14 @@ type state struct {
reg prometheus.Registerer

settleReadTimeout time.Duration
storeReadTimeout time.Duration

mtx sync.Mutex
states map[string]cluster.State

replicationFactor int
replicator Replicator
store alertstore.AlertStore

partialStateMergesTotal *prometheus.CounterVec
partialStateMergesFailed *prometheus.CounterVec
@@ -47,17 +51,19 @@ type state struct {
}

// newReplicatedStates creates a new state struct, which manages state to be replicated between alertmanagers.
func newReplicatedStates(userID string, rf int, re Replicator, l log.Logger, r prometheus.Registerer) *state {
func newReplicatedStates(userID string, rf int, re Replicator, st alertstore.AlertStore, l log.Logger, r prometheus.Registerer) *state {

s := &state{
logger: l,
userID: userID,
replicationFactor: rf,
replicator: re,
store: st,
states: make(map[string]cluster.State, 2), // we use two, one for the notifications and one for silences.
msgc: make(chan *clusterpb.Part),
reg: r,
settleReadTimeout: defaultSettleReadTimeout,
storeReadTimeout: defaultStoreReadTimeout,
partialStateMergesTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "alertmanager_partial_state_merges_total",
Help: "Number of times we have received a partial state to merge for a key.",
@@ -167,7 +173,22 @@ func (s *state) starting(ctx context.Context) error {
}
}

level.Info(s.logger).Log("msg", "state not settled but continuing anyway", "err", err)
level.Info(s.logger).Log("msg", "state not settled; trying to read from storage", "err", err)

// Attempt to read the state from persistent storage instead.
storeReadCtx, cancel := context.WithTimeout(ctx, s.storeReadTimeout)
defer cancel()

fullState, err := s.store.GetFullState(storeReadCtx, s.userID)
if err == nil {
if err = s.mergeFullStates([]*clusterpb.FullState{fullState.State}); err == nil {
level.Info(s.logger).Log("msg", "state read from storage; proceeding")
return nil
}
}

level.Info(s.logger).Log("msg", "failed to read state from storage; continuing anyway", "err", err)
Review comment (Contributor):

Assuming we circuit break in case of "object not found" error, then I would raise this to warning:

Suggested change
-	level.Info(s.logger).Log("msg", "failed to read state from storage; continuing anyway", "err", err)
+	level.Warn(s.logger).Log("msg", "failed to read state from storage; continuing anyway", "err", err)

Reply (Contributor):

By circuit break, you mean don't retry any calls to object storage when no object?

Reply (Contributor):

By circuit break I was meaning a "guard" (if condition then return). There's no retry mechanism here yet (to be discussed if we want it, given retries may also be offered by the bucket client).


return nil
}

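The review thread above suggests guarding on an "object not found" error before warning. As a rough sketch only (not the code merged in this PR), the storage fallback in starting() could be pulled into a helper like the one below. The helper name readFullStateFromStore is hypothetical, and it assumes the store surfaces alertspb.ErrNotFound (the sentinel the fake store in the test file below returns) when no state object exists for the tenant.

```go
package alertmanager

import (
	"context"
	"errors"

	"github.com/go-kit/kit/log/level"
	"github.com/prometheus/alertmanager/cluster/clusterpb"

	"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
)

// readFullStateFromStore is a hypothetical helper sketching the storage fallback
// with the "guard" discussed above: a missing state object is treated as a normal
// cold start rather than a failure.
func (s *state) readFullStateFromStore(ctx context.Context) error {
	storeReadCtx, cancel := context.WithTimeout(ctx, s.storeReadTimeout)
	defer cancel()

	fullState, err := s.store.GetFullState(storeReadCtx, s.userID)
	if errors.Is(err, alertspb.ErrNotFound) {
		// Guard ("circuit break"): no object means there is simply no state to
		// recover, so proceed with empty state without logging a warning.
		level.Info(s.logger).Log("msg", "no state found in storage; proceeding with empty state")
		return nil
	}
	if err == nil {
		if err = s.mergeFullStates([]*clusterpb.FullState{fullState.State}); err == nil {
			level.Info(s.logger).Log("msg", "state read from storage; proceeding")
			return nil
		}
	}

	// Any other read or merge error is unexpected, so log it at warning level.
	level.Warn(s.logger).Log("msg", "failed to read state from storage; continuing anyway", "err", err)
	return nil
}
```

With a guard like this, a genuinely missing object is treated as a clean start, and only unexpected read or merge failures would be logged at warning level, as suggested in the review.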
50 changes: 46 additions & 4 deletions pkg/alertmanager/state_replication_test.go
@@ -18,6 +18,8 @@ import (

"github.com/go-kit/kit/log"

"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
"github.com/cortexproject/cortex/pkg/util/services"
)

@@ -76,6 +78,25 @@ func (f *fakeReplicator) ReadFullStateForUser(ctx context.Context, userID string
return f.read.res, f.read.err
}

type fakeAlertStore struct {
alertstore.AlertStore

states map[string]alertspb.FullStateDesc
}

func newFakeAlertStore() *fakeAlertStore {
return &fakeAlertStore{
states: make(map[string]alertspb.FullStateDesc),
}
}

func (f *fakeAlertStore) GetFullState(ctx context.Context, user string) (alertspb.FullStateDesc, error) {
if result, ok := f.states[user]; ok {
return result, nil
}
return alertspb.FullStateDesc{}, alertspb.ErrNotFound
}

func TestStateReplication(t *testing.T) {
tc := []struct {
name string
@@ -102,7 +123,8 @@ func TestStateReplication(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
replicator := newFakeReplicator()
replicator.read = readStateResult{res: nil, err: nil}
s := newReplicatedStates("user-1", tt.replicationFactor, replicator, log.NewNopLogger(), reg)
store := newFakeAlertStore()
s := newReplicatedStates("user-1", tt.replicationFactor, replicator, store, log.NewNopLogger(), reg)

require.False(t, s.Ready())
{
@@ -163,6 +185,7 @@ func TestStateReplication_Settle(t *testing.T) {
name string
replicationFactor int
read readStateResult
states map[string]alertspb.FullStateDesc
results map[string][][]byte
}{
{
@@ -228,9 +251,26 @@
},
},
{
name: "when reading the full state fails, still become ready.",
name: "when reading from replicas fails, state is read from storage.",
replicationFactor: 3,
read: readStateResult{err: errors.New("Read Error 1")},
states: map[string]alertspb.FullStateDesc{
"user-1": {
State: &clusterpb.FullState{
Parts: []clusterpb.Part{{Key: "key1", Data: []byte("Datum1")}},
},
},
},
results: map[string][][]byte{
"key1": {[]byte("Datum1")},
"key2": nil,
},
},
{
name: "when reading from replicas and from storage fails, still become ready.",
replicationFactor: 3,
read: readStateResult{err: errors.New("Read Error 1")},
states: map[string]alertspb.FullStateDesc{},
results: map[string][][]byte{
"key1": nil,
"key2": nil,
@@ -253,7 +293,9 @@ func TestStateReplication_Settle(t *testing.T) {

replicator := newFakeReplicator()
replicator.read = tt.read
s := newReplicatedStates("user-1", tt.replicationFactor, replicator, log.NewNopLogger(), reg)
store := newFakeAlertStore()
store.states = tt.states
s := newReplicatedStates("user-1", tt.replicationFactor, replicator, store, log.NewNopLogger(), reg)

key1State := &fakeState{}
key2State := &fakeState{}
@@ -322,7 +364,7 @@ func TestStateReplication_GetFullState(t *testing.T) {
for _, tt := range tc {
t.Run(tt.name, func(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
s := newReplicatedStates("user-1", 1, nil, log.NewNopLogger(), reg)
s := newReplicatedStates("user-1", 1, nil, nil, log.NewNopLogger(), reg)

for key, datum := range tt.data {
state := &fakeState{binary: datum}