Skip to content

Commit

Permalink
HACKY PoC
Browse files Browse the repository at this point in the history
This showcases that we *can* make it work; just have to do a lot of
cleanup to get there
  • Loading branch information
jacksontj committed Aug 17, 2023
1 parent f032b91 commit 5b354c3
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 13 deletions.
6 changes: 3 additions & 3 deletions cmd/promxy/alert_example.rule
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
groups:
- name: example
rules:
- alert: HighErrorRate
expr: job:request_latency_seconds:mean5m{job="myjob"} > 0.5
- alert: testAlert
expr: prometheus_build_info == 1
for: 10m
labels:
severity: page
annotations:
summary: High request latency
summary: Example alert

7 changes: 6 additions & 1 deletion cmd/promxy/demo_robust.conf
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
# Rule files specifies a list of globs. Rules and alerts are read from
# all matching files.
rule_files:
- "*rule"

##
## Regular prometheus configuration
##
global:
evaluation_interval: 60s
evaluation_interval: 5s
external_labels:
source: promxy

Expand Down
9 changes: 7 additions & 2 deletions cmd/promxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"go.uber.org/atomic"
"k8s.io/klog"

"github.com/jacksontj/promxy/pkg/alertbackfill"
proxyconfig "github.com/jacksontj/promxy/pkg/config"
"github.com/jacksontj/promxy/pkg/logging"
"github.com/jacksontj/promxy/pkg/middleware"
Expand Down Expand Up @@ -307,19 +308,23 @@ func main() {
logrus.Infof("Notifier manager stopped")
}()

ruleQueryable := alertbackfill.NewAlertBackfillQueryable(engine, proxyStorage)
ruleManager := rules.NewManager(&rules.ManagerOptions{
Context: ctx, // base context for all background tasks
ExternalURL: externalUrl, // URL listed as URL for "who fired this alert"
QueryFunc: rules.EngineQueryFunc(engine, proxyStorage),
NotifyFunc: sendAlerts(notifierManager, externalUrl.String()),
Appendable: proxyStorage,
Queryable: proxyStorage,
Queryable: ruleQueryable,
Logger: logger,
Registerer: prometheus.DefaultRegisterer,
OutageTolerance: opts.ForOutageTolerance,
ForGracePeriod: opts.ForGracePeriod,
ResendDelay: opts.ResendDelay,
})

ruleQueryable.SetManager(ruleManager)

go ruleManager.Run()

reloadables = append(reloadables, proxyconfig.WrapPromReloadable(&proxyconfig.ApplyConfigFunc{func(cfg *config.Config) error {
Expand Down Expand Up @@ -413,7 +418,7 @@ func main() {
r.NotFound = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Have our fallback rules
if strings.HasPrefix(r.URL.Path, path.Join(webOptions.RoutePrefix, "/debug")) {
http.DefaultServeMux.ServeHTTP(w, r)
http.StripPrefix(webOptions.RoutePrefix, http.DefaultServeMux).ServeHTTP(w, r)
} else if r.URL.Path == path.Join(webOptions.RoutePrefix, "/-/ready") {
if stopping {
w.WriteHeader(http.StatusServiceUnavailable)
Expand Down
7 changes: 0 additions & 7 deletions cmd/promxy/recording_example.rule

This file was deleted.

255 changes: 255 additions & 0 deletions pkg/alertbackfill/backfill.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
package alertbackfill

import (
"context"
"fmt"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"

"github.com/jacksontj/promxy/pkg/promclient"
"github.com/jacksontj/promxy/pkg/proxyquerier"
)

// NewAlertBackfillQueryable builds an AlertBackfillQueryable around the given
// promql engine and downstream storage.Queryable. The rules.Manager is not
// known at this point and has to be supplied afterwards through SetManager.
func NewAlertBackfillQueryable(e *promql.Engine, q storage.Queryable) *AlertBackfillQueryable {
	queryable := new(AlertBackfillQueryable)
	queryable.e = e
	queryable.q = q
	return queryable
}

// AlertBackfillQueryable is a storage.Queryable that handles returning
// results for the RuleManager alert backfill. This is done by first attempting
// to query the downstream store and, if no result is found, "recreating" the
// series by re-running the necessary query to get the data back.
type AlertBackfillQueryable struct {
// e is the engine used to re-run a rule's query when backfilling
e *promql.Engine
// m is the rule manager whose groups are searched for the matching rule;
// it is not set by the constructor and must be provided via SetManager
m *rules.Manager
// q is the downstream queryable that is consulted before backfilling
q storage.Queryable
}

// SetManager sets the rules.Manager used to look up alerting rules.
// This is a separate step because the manager needs the Queryable at creation,
// so the Queryable has to exist before the manager does. Queriers copy the
// manager reference when they are created by Querier.
func (q *AlertBackfillQueryable) SetManager(m *rules.Manager) {
q.m = m
}

// Querier creates an AlertBackfillQuerier covering [mint, maxt]. The querier
// receives the engine, rule manager and downstream queryable currently held by
// q, plus a fresh (empty) per-querier cache of rule results. The passed
// context is not used.
func (q *AlertBackfillQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
	querier := &AlertBackfillQuerier{
		ruleValues: make(map[string]*promql.Result),
		mint:       mint,
		maxt:       maxt,
	}
	querier.e, querier.m, querier.q = q.e, q.m, q.q
	return querier, nil
}

// AlertBackfillQuerier will Query a downstream storage.Queryable for the
// ALERTS_FOR_STATE series, if that series is not found -- it will then
// run the appropriate query_range equivalent to re-generate the data.
type AlertBackfillQuerier struct {
// e is the engine used to run the backfill range query
e *promql.Engine
// m is the rule manager searched for the alerting rule being restored
m *rules.Manager
// q is the downstream queryable; it is both queried directly and used as
// the storage for the backfill range query
q storage.Queryable

// ruleValues caches the range query result per rule so a rule is only
// evaluated once per querier; map of (groupidx.ruleidx) -> result
ruleValues map[string]*promql.Result
// mint and maxt are the time bounds passed to the downstream Querier
mint int64
maxt int64
}

// Select will fetch and return the ALERTS_FOR_STATE series for the given matchers.
//
// The downstream queryable is consulted first; if it returns any series those
// are returned untouched. Otherwise the series is "backfilled": the alerting
// rule identified by the `alertname` matcher (and any matchers on labels the
// rule itself sets) is looked up in the rule manager, its expression is
// evaluated as a range query over the rule's hold duration (plus one step),
// and the result is converted into the shape of the stored series, where each
// sample's value is the unix timestamp at which the alert became active.
//
// An empty series set is returned when no rule manager has been set, when the
// matchers do not include both `__name__` and `alertname`, when no rule
// matches, or when the query yields something other than a matrix. Errors from
// creating the downstream querier, creating the range query, or executing it
// are returned through the series set.
func (q *AlertBackfillQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
	// first, we call the actual downstream to see if we have the correct data
	// this will return something if the remote_write from promxy has been saved
	// somewhere where promxy is also configured to read from
	querier, err := q.q.Querier(context.TODO(), q.mint, q.maxt)
	if err != nil {
		return proxyquerier.NewSeriesSet(nil, nil, err)
	}
	ret := querier.Select(sortSeries, hints, matchers...)
	downstreamSeries := make([]storage.Series, 0)
	for ret.Next() {
		downstreamSeries = append(downstreamSeries, ret.At())
	}
	// If the raw queryable had something; return that
	// NOTE(review): when the downstream yields zero series its error (if any)
	// is not surfaced; the backfill below is attempted instead -- confirm
	// that this best-effort behavior is intended.
	if len(downstreamSeries) > 0 {
		return proxyquerier.NewSeriesSet(downstreamSeries, ret.Warnings(), ret.Err())
	}

	// Without a rule manager there is nothing to regenerate the series from
	if q.m == nil {
		return proxyquerier.NewSeriesSet(nil, nil, nil)
	}

	// Locate the `__name__` and `alertname` matchers by label name instead of
	// by position: indexing matchers[0]/matchers[1] panics on short matcher
	// lists and picks the wrong matcher if they are not first and second.
	var nameMatcher, alertnameMatcher *labels.Matcher
	labelMatchers := make([]*labels.Matcher, 0, len(matchers))
	for _, m := range matchers {
		switch m.Name {
		case model.MetricNameLabel:
			if nameMatcher == nil {
				nameMatcher = m
			}
		case model.AlertNameLabel:
			if alertnameMatcher == nil {
				alertnameMatcher = m
			}
		default:
			labelMatchers = append(labelMatchers, m)
		}
	}
	if nameMatcher == nil || alertnameMatcher == nil {
		return proxyquerier.NewSeriesSet(nil, nil, nil)
	}

	// right now upstream alert state restore *just* uses the name of the alert and labels
	// this causes issues if there are any additional alerts (as only one will be queried
	// and its state will be used for all alerts with the same name + labels).
	// TODO: Once upstream fixes this (https://github.com/prometheus/prometheus/issues/12714)
	// we'll want to adjust this logic. For now we're effectively mirroring upstream logic
	// Now we need to "backfill" the data (regenerate the series that it would have queried)
	matchingGroup, matchingRule, key := q.findAlertingRule(alertnameMatcher.Value, labelMatchers)

	// If we can't find a matching rule; return an empty set
	if matchingRule == nil {
		return proxyquerier.NewSeriesSet(nil, nil, nil)
	}

	step := matchingGroup.Interval()
	result, ok := q.ruleValues[key]
	// If we haven't queried this *rule* before; lets load that
	if !ok {
		now := time.Now()
		// Look back far enough to cover the full `for` duration plus one evaluation step
		start := now.Add(-1 * matchingRule.HoldDuration()).Add(-1 * step)
		query, err := q.e.NewRangeQuery(q.q, &promql.QueryOpts{}, matchingRule.Query().String(), start, now, step)
		if err != nil {
			return proxyquerier.NewSeriesSet(nil, nil, err)
		}

		result = query.Exec(context.TODO())
		q.ruleValues[key] = result
	}

	if result.Err != nil {
		return proxyquerier.NewSeriesSet(nil, result.Warnings, result.Err)
	}

	// Now we need to filter+convert the result (promql.Value -> model.Value)
	// TODO: check about scalar or subquery?
	// We should only get a Matrix result; but just in case we bail out instead of panicking
	resultMatrix, isMatrix := result.Value.(promql.Matrix)
	if !isMatrix {
		return proxyquerier.NewSeriesSet(nil, nil, nil)
	}

	ruleLabels := matchingRule.Labels()
	matrix := make(model.Matrix, 0, resultMatrix.Len())
MATRIXVALUE_LOOP:
	for _, item := range resultMatrix {
		metric := make(model.Metric)
		for _, label := range item.Metric {
			metric[model.LabelName(label.Name)] = model.LabelValue(label.Value)
		}

		// Filter to results that match our matchers (the `__name__` and
		// `alertname` matchers were already split out above)
		for _, matcher := range labelMatchers {
			// Matchers against a label the rule will add were checked when finding the rule
			if ruleLabels.Has(matcher.Name) {
				continue
			}
			// If the matcher doesn't match; skip this series
			if !matcher.Matches(string(metric[model.LabelName(matcher.Name)])) {
				continue MATRIXVALUE_LOOP
			}
		}

		// Overwrite the __name__ and alertname
		metric[model.MetricNameLabel] = model.LabelValue(nameMatcher.Value)
		metric[model.AlertNameLabel] = model.LabelValue(alertnameMatcher.Value)

		// TODO: check that the rule manager doesn't add any more labels
		// Add the labels which the alert would add
		for _, label := range ruleLabels {
			metric[model.LabelName(label.Name)] = model.LabelValue(label.Value)
		}

		matrix = append(matrix, &model.SampleStream{
			Metric: metric,
			Values: alertForStateSamples(item.Points, step),
		})
	}

	iterators := promclient.IteratorsForValue(matrix)

	series := make([]storage.Series, len(iterators))
	for i, iterator := range iterators {
		series[i] = &proxyquerier.Series{iterator}
	}

	return proxyquerier.NewSeriesSet(series, result.Warnings, nil)
}

// findAlertingRule returns the first alerting rule (and its group) whose name
// equals alertname and whose own labels satisfy labelMatchers, together with a
// "groupidx.ruleidx" key identifying it. It returns a nil rule if none matches.
func (q *AlertBackfillQuerier) findAlertingRule(alertname string, labelMatchers []*labels.Matcher) (*rules.Group, *rules.AlertingRule, string) {
	for groupIdx, group := range q.m.RuleGroups() {
	RULE_LOOP:
		for ruleIdx, rule := range group.Rules() {
			alertingRule, ok := rule.(*rules.AlertingRule)
			// For now we check if the name and all given labels match
			// which is both the best we can do, and equivalent to
			// direct prometheus behavior
			if !ok || alertingRule.Name() != alertname {
				continue
			}

			// Check the rule labels fit the matcher set; only the first
			// matcher for a given label name is considered
			for _, lbl := range alertingRule.Labels() {
				for _, m := range labelMatchers {
					if lbl.Name != m.Name {
						continue
					}
					if !m.Matches(lbl.Value) {
						continue RULE_LOOP
					}
					break
				}
			}

			return group, alertingRule, fmt.Sprintf("%d.%d", groupIdx, ruleIdx)
		}
	}
	return nil, nil, ""
}

// alertForStateSamples converts the points of a rule's range query result into
// the samples stored for the alert state series: one sample per point, whose
// value is the unix timestamp of the first point in the current uninterrupted
// run (i.e. when the alert became active).
func alertForStateSamples(points []promql.Point, step time.Duration) []model.SamplePair {
	var (
		activeAt  model.SampleValue
		lastPoint time.Time
	)
	samples := make([]model.SamplePair, len(points))
	for i, point := range points {
		sampleTime := model.Time(point.T).Time()

		// If we are missing a point in the matrix; then we are going to assume
		// that the series cleared, so we need to reset activeAt
		if sampleTime.Sub(lastPoint) > step {
			activeAt = 0
		}
		lastPoint = sampleTime

		// if there is no `activeAt` set; lets set this timestamp (earliest timestamp in the steps that has a point)
		if activeAt == 0 {
			activeAt = model.SampleValue(model.Time(point.T).Unix())
		}

		samples[i] = model.SamplePair{
			Timestamp: model.Time(point.T),
			// The stored value is the unix timestamp of ActiveAt, not the query's sample value
			Value: activeAt,
		}
	}
	return samples
}

// LabelValues is part of the querier interface but is not supported by the
// backfill querier; it always returns a "not implemented" error.
func (q *AlertBackfillQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
	err := fmt.Errorf("not implemented")
	return nil, nil, err
}

// LabelNames is part of the querier interface but is not supported by the
// backfill querier; it always returns a "not implemented" error.
func (q *AlertBackfillQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
	err := fmt.Errorf("not implemented")
	return nil, nil, err
}

// Close is a no-op for the backfill querier and always returns nil.
func (q *AlertBackfillQuerier) Close() error {
	return nil
}

0 comments on commit 5b354c3

Please sign in to comment.