Skip to content

Commit

Permalink
chore: make prepare task configurable (#5806)
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv committed Sep 4, 2024
1 parent 1066b21 commit be7a687
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 105 deletions.
203 changes: 116 additions & 87 deletions pkg/query-service/rules/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (

"github.com/google/uuid"

"github.com/go-kit/log"

"go.uber.org/zap"

"errors"
Expand All @@ -27,6 +25,17 @@ import (
"go.signoz.io/signoz/pkg/query-service/utils/labels"
)

type PrepareTaskOptions struct {
Rule *PostableRule
TaskName string
RuleDB RuleDB
Logger *zap.Logger
Reader interfaces.Reader
FF interfaces.FeatureLookup
ManagerOpts *ManagerOptions
NotifyFunc NotifyFunc
}

const taskNamesuffix = "webAppEditor"

func ruleIdFromTaskName(n string) string {
Expand Down Expand Up @@ -56,13 +65,15 @@ type ManagerOptions struct {
DBConn *sqlx.DB

Context context.Context
Logger log.Logger
Logger *zap.Logger
ResendDelay time.Duration
DisableRules bool
FeatureFlags interfaces.FeatureLookup
Reader interfaces.Reader

EvalDelay time.Duration

PrepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
}

// The Manager manages recording and alerting rules.
Expand All @@ -78,10 +89,12 @@ type Manager struct {
// datastore to store alert definitions
ruleDB RuleDB

logger log.Logger
logger *zap.Logger

featureFlags interfaces.FeatureLookup
reader interfaces.Reader

prepareTaskFunc func(opts PrepareTaskOptions) (Task, error)
}

func defaultOptions(o *ManagerOptions) *ManagerOptions {
Expand All @@ -94,9 +107,69 @@ func defaultOptions(o *ManagerOptions) *ManagerOptions {
if o.ResendDelay == time.Duration(0) {
o.ResendDelay = 1 * time.Minute
}
if o.Logger == nil {
o.Logger = zap.L()
}
if o.PrepareTaskFunc == nil {
o.PrepareTaskFunc = defaultPrepareTaskFunc
}
return o
}

func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) {

rules := make([]Rule, 0)
var task Task

ruleId := ruleIdFromTaskName(opts.TaskName)
if opts.Rule.RuleType == RuleTypeThreshold {
// create a threshold rule
tr, err := NewThresholdRule(
ruleId,
opts.Rule,
ThresholdRuleOpts{
EvalDelay: opts.ManagerOpts.EvalDelay,
},
opts.FF,
opts.Reader,
)

if err != nil {
return task, err
}

rules = append(rules, tr)

// create ch rule task for evalution
task = newTask(TaskTypeCh, opts.TaskName, taskNamesuffix, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.RuleDB)

} else if opts.Rule.RuleType == RuleTypeProm {

// create promql rule
pr, err := NewPromRule(
ruleId,
opts.Rule,
opts.Logger,
PromRuleOpts{},
opts.Reader,
)

if err != nil {
return task, err
}

rules = append(rules, pr)

// create promql rule task for evalution
task = newTask(TaskTypeProm, opts.TaskName, taskNamesuffix, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.RuleDB)

} else {
return nil, fmt.Errorf("unsupported rule type. Supported types: %s, %s", RuleTypeProm, RuleTypeThreshold)
}

return task, nil
}

// NewManager returns an implementation of Manager, ready to be started
// by calling the Run method.
func NewManager(o *ManagerOptions) (*Manager, error) {
Expand All @@ -116,15 +189,16 @@ func NewManager(o *ManagerOptions) (*Manager, error) {
telemetry.GetInstance().SetAlertsInfoCallback(db.GetAlertsInfo)

m := &Manager{
tasks: map[string]Task{},
rules: map[string]Rule{},
notifier: notifier,
ruleDB: db,
opts: o,
block: make(chan struct{}),
logger: o.Logger,
featureFlags: o.FeatureFlags,
reader: o.Reader,
tasks: map[string]Task{},
rules: map[string]Rule{},
notifier: notifier,
ruleDB: db,
opts: o,
block: make(chan struct{}),
logger: o.Logger,
featureFlags: o.FeatureFlags,
reader: o.Reader,
prepareTaskFunc: o.PrepareTaskFunc,
}
return m, nil
}
Expand Down Expand Up @@ -251,13 +325,26 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error {

zap.L().Debug("editing a rule task", zap.String("name", taskName))

newTask, err := m.prepareTask(false, rule, taskName)
newTask, err := m.prepareTaskFunc(PrepareTaskOptions{
Rule: rule,
TaskName: taskName,
RuleDB: m.ruleDB,
Logger: m.logger,
Reader: m.reader,
FF: m.featureFlags,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
})

if err != nil {
zap.L().Error("loading tasks failed", zap.Error(err))
return errors.New("error preparing rule with given parameters, previous rule set restored")
}

for _, r := range newTask.Rules() {
m.rules[r.ID()] = r
}

// If there is an old task with the same identifier, stop it and wait for
// it to finish the current iteration. Then copy it into the new group.
oldTask, ok := m.tasks[taskName]
Expand Down Expand Up @@ -357,7 +444,20 @@ func (m *Manager) addTask(rule *PostableRule, taskName string) error {
defer m.mtx.Unlock()

zap.L().Debug("adding a new rule task", zap.String("name", taskName))
newTask, err := m.prepareTask(false, rule, taskName)
newTask, err := m.prepareTaskFunc(PrepareTaskOptions{
Rule: rule,
TaskName: taskName,
RuleDB: m.ruleDB,
Logger: m.logger,
Reader: m.reader,
FF: m.featureFlags,
ManagerOpts: m.opts,
NotifyFunc: m.prepareNotifyFunc(),
})

for _, r := range newTask.Rules() {
m.rules[r.ID()] = r
}

if err != nil {
zap.L().Error("creating rule task failed", zap.String("name", taskName), zap.Error(err))
Expand All @@ -382,77 +482,6 @@ func (m *Manager) addTask(rule *PostableRule, taskName string) error {
return nil
}

// prepareTask prepares a rule task from postable rule
func (m *Manager) prepareTask(acquireLock bool, r *PostableRule, taskName string) (Task, error) {

if acquireLock {
m.mtx.Lock()
defer m.mtx.Unlock()
}

rules := make([]Rule, 0)
var task Task

if r.AlertName == "" {
zap.L().Error("task load failed, at least one rule must be set", zap.String("name", taskName))
return task, fmt.Errorf("task load failed, at least one rule must be set")
}

ruleId := ruleIdFromTaskName(taskName)
if r.RuleType == RuleTypeThreshold {
// create a threshold rule
tr, err := NewThresholdRule(
ruleId,
r,
ThresholdRuleOpts{
EvalDelay: m.opts.EvalDelay,
},
m.featureFlags,
m.reader,
)

if err != nil {
return task, err
}

rules = append(rules, tr)

// create ch rule task for evalution
task = newTask(TaskTypeCh, taskName, taskNamesuffix, time.Duration(r.Frequency), rules, m.opts, m.prepareNotifyFunc(), m.ruleDB)

// add rule to memory
m.rules[ruleId] = tr

} else if r.RuleType == RuleTypeProm {

// create promql rule
pr, err := NewPromRule(
ruleId,
r,
log.With(m.logger, "alert", r.AlertName),
PromRuleOpts{},
m.reader,
)

if err != nil {
return task, err
}

rules = append(rules, pr)

// create promql rule task for evalution
task = newTask(TaskTypeProm, taskName, taskNamesuffix, time.Duration(r.Frequency), rules, m.opts, m.prepareNotifyFunc(), m.ruleDB)

// add rule to memory
m.rules[ruleId] = pr

} else {
return nil, fmt.Errorf("unsupported rule type. Supported types: %s, %s", RuleTypeProm, RuleTypeThreshold)
}

return task, nil
}

// RuleTasks returns the list of manager's rule tasks.
func (m *Manager) RuleTasks() []Task {
m.mtx.RLock()
Expand Down Expand Up @@ -783,7 +812,7 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m
rule, err = NewPromRule(
alertname,
parsedRule,
log.With(m.logger, "alert", alertname),
m.logger,
PromRuleOpts{
SendAlways: true,
},
Expand Down
8 changes: 3 additions & 5 deletions pkg/query-service/rules/prom_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"go.uber.org/zap"

plabels "github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -54,7 +52,7 @@ type PromRule struct {
// map of active alerts
active map[uint64]*Alert

logger log.Logger
logger *zap.Logger
opts PromRuleOpts

reader interfaces.Reader
Expand All @@ -63,7 +61,7 @@ type PromRule struct {
func NewPromRule(
id string,
postableRule *PostableRule,
logger log.Logger,
logger *zap.Logger,
opts PromRuleOpts,
reader interfaces.Reader,
) (*PromRule, error) {
Expand Down Expand Up @@ -405,7 +403,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (
result, err := tmpl.Expand()
if err != nil {
result = fmt.Sprintf("<error expanding template: %s>", err)
level.Warn(r.logger).Log("msg", "Expanding alert template failed", "err", err, "data", tmplData)
r.logger.Warn("Expanding alert template failed", zap.Error(err), zap.Any("data", tmplData))
}
return result
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/query-service/rules/prom_rule_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync"
"time"

"github.com/go-kit/log"
opentracing "github.com/opentracing/opentracing-go"
plabels "github.com/prometheus/prometheus/model/labels"
"go.signoz.io/signoz/pkg/query-service/common"
Expand All @@ -33,7 +32,7 @@ type PromRuleTask struct {
terminated chan struct{}

pause bool
logger log.Logger
logger *zap.Logger
notify NotifyFunc

ruleDB RuleDB
Expand All @@ -60,7 +59,7 @@ func newPromRuleTask(name, file string, frequency time.Duration, rules []Rule, o
terminated: make(chan struct{}),
notify: notify,
ruleDB: ruleDB,
logger: log.With(opts.Logger, "group", name),
logger: opts.Logger,
}
}

Expand Down
12 changes: 2 additions & 10 deletions pkg/query-service/rules/promrule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,9 @@ import (
pql "github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/assert"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.uber.org/zap"
)

type testLogger struct {
t *testing.T
}

func (l testLogger) Log(args ...interface{}) error {
l.t.Log(args...)
return nil
}

func TestPromRuleShouldAlert(t *testing.T) {
postableRule := PostableRule{
AlertName: "Test Rule",
Expand Down Expand Up @@ -611,7 +603,7 @@ func TestPromRuleShouldAlert(t *testing.T) {
postableRule.RuleCondition.MatchType = MatchType(c.matchType)
postableRule.RuleCondition.Target = &c.target

rule, err := NewPromRule("69", &postableRule, testLogger{t}, PromRuleOpts{}, nil)
rule, err := NewPromRule("69", &postableRule, zap.NewNop(), PromRuleOpts{}, nil)
if err != nil {
assert.NoError(t, err)
}
Expand Down

0 comments on commit be7a687

Please sign in to comment.