From be7a687088d0dd018c40f07321fe4bfa4a8d5ae3 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Wed, 4 Sep 2024 18:09:40 +0530 Subject: [PATCH] chore: make prepare task configurable (#5806) --- pkg/query-service/rules/manager.go | 203 ++++++++++++---------- pkg/query-service/rules/prom_rule.go | 8 +- pkg/query-service/rules/prom_rule_task.go | 5 +- pkg/query-service/rules/promrule_test.go | 12 +- 4 files changed, 123 insertions(+), 105 deletions(-) diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go index c21873f230..768c753cb8 100644 --- a/pkg/query-service/rules/manager.go +++ b/pkg/query-service/rules/manager.go @@ -12,8 +12,6 @@ import ( "github.com/google/uuid" - "github.com/go-kit/log" - "go.uber.org/zap" "errors" @@ -27,6 +25,17 @@ import ( "go.signoz.io/signoz/pkg/query-service/utils/labels" ) +type PrepareTaskOptions struct { + Rule *PostableRule + TaskName string + RuleDB RuleDB + Logger *zap.Logger + Reader interfaces.Reader + FF interfaces.FeatureLookup + ManagerOpts *ManagerOptions + NotifyFunc NotifyFunc +} + const taskNamesuffix = "webAppEditor" func ruleIdFromTaskName(n string) string { @@ -56,13 +65,15 @@ type ManagerOptions struct { DBConn *sqlx.DB Context context.Context - Logger log.Logger + Logger *zap.Logger ResendDelay time.Duration DisableRules bool FeatureFlags interfaces.FeatureLookup Reader interfaces.Reader EvalDelay time.Duration + + PrepareTaskFunc func(opts PrepareTaskOptions) (Task, error) } // The Manager manages recording and alerting rules. @@ -78,10 +89,12 @@ type Manager struct { // datastore to store alert definitions ruleDB RuleDB - logger log.Logger + logger *zap.Logger featureFlags interfaces.FeatureLookup reader interfaces.Reader + + prepareTaskFunc func(opts PrepareTaskOptions) (Task, error) } func defaultOptions(o *ManagerOptions) *ManagerOptions { @@ -94,9 +107,69 @@ func defaultOptions(o *ManagerOptions) *ManagerOptions { if o.ResendDelay == time.Duration(0) { o.ResendDelay = 1 * time.Minute } + if o.Logger == nil { + o.Logger = zap.L() + } + if o.PrepareTaskFunc == nil { + o.PrepareTaskFunc = defaultPrepareTaskFunc + } return o } +func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) { + + rules := make([]Rule, 0) + var task Task + + ruleId := ruleIdFromTaskName(opts.TaskName) + if opts.Rule.RuleType == RuleTypeThreshold { + // create a threshold rule + tr, err := NewThresholdRule( + ruleId, + opts.Rule, + ThresholdRuleOpts{ + EvalDelay: opts.ManagerOpts.EvalDelay, + }, + opts.FF, + opts.Reader, + ) + + if err != nil { + return task, err + } + + rules = append(rules, tr) + + // create ch rule task for evalution + task = newTask(TaskTypeCh, opts.TaskName, taskNamesuffix, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.RuleDB) + + } else if opts.Rule.RuleType == RuleTypeProm { + + // create promql rule + pr, err := NewPromRule( + ruleId, + opts.Rule, + opts.Logger, + PromRuleOpts{}, + opts.Reader, + ) + + if err != nil { + return task, err + } + + rules = append(rules, pr) + + // create promql rule task for evalution + task = newTask(TaskTypeProm, opts.TaskName, taskNamesuffix, time.Duration(opts.Rule.Frequency), rules, opts.ManagerOpts, opts.NotifyFunc, opts.RuleDB) + + } else { + return nil, fmt.Errorf("unsupported rule type. Supported types: %s, %s", RuleTypeProm, RuleTypeThreshold) + } + + return task, nil +} + // NewManager returns an implementation of Manager, ready to be started // by calling the Run method. func NewManager(o *ManagerOptions) (*Manager, error) { @@ -116,15 +189,16 @@ func NewManager(o *ManagerOptions) (*Manager, error) { telemetry.GetInstance().SetAlertsInfoCallback(db.GetAlertsInfo) m := &Manager{ - tasks: map[string]Task{}, - rules: map[string]Rule{}, - notifier: notifier, - ruleDB: db, - opts: o, - block: make(chan struct{}), - logger: o.Logger, - featureFlags: o.FeatureFlags, - reader: o.Reader, + tasks: map[string]Task{}, + rules: map[string]Rule{}, + notifier: notifier, + ruleDB: db, + opts: o, + block: make(chan struct{}), + logger: o.Logger, + featureFlags: o.FeatureFlags, + reader: o.Reader, + prepareTaskFunc: o.PrepareTaskFunc, } return m, nil } @@ -251,13 +325,26 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error { zap.L().Debug("editing a rule task", zap.String("name", taskName)) - newTask, err := m.prepareTask(false, rule, taskName) + newTask, err := m.prepareTaskFunc(PrepareTaskOptions{ + Rule: rule, + TaskName: taskName, + RuleDB: m.ruleDB, + Logger: m.logger, + Reader: m.reader, + FF: m.featureFlags, + ManagerOpts: m.opts, + NotifyFunc: m.prepareNotifyFunc(), + }) if err != nil { zap.L().Error("loading tasks failed", zap.Error(err)) return errors.New("error preparing rule with given parameters, previous rule set restored") } + for _, r := range newTask.Rules() { + m.rules[r.ID()] = r + } + // If there is an old task with the same identifier, stop it and wait for // it to finish the current iteration. Then copy it into the new group. oldTask, ok := m.tasks[taskName] @@ -357,7 +444,20 @@ func (m *Manager) addTask(rule *PostableRule, taskName string) error { defer m.mtx.Unlock() zap.L().Debug("adding a new rule task", zap.String("name", taskName)) - newTask, err := m.prepareTask(false, rule, taskName) + newTask, err := m.prepareTaskFunc(PrepareTaskOptions{ + Rule: rule, + TaskName: taskName, + RuleDB: m.ruleDB, + Logger: m.logger, + Reader: m.reader, + FF: m.featureFlags, + ManagerOpts: m.opts, + NotifyFunc: m.prepareNotifyFunc(), + }) + + for _, r := range newTask.Rules() { + m.rules[r.ID()] = r + } if err != nil { zap.L().Error("creating rule task failed", zap.String("name", taskName), zap.Error(err)) @@ -382,77 +482,6 @@ func (m *Manager) addTask(rule *PostableRule, taskName string) error { return nil } -// prepareTask prepares a rule task from postable rule -func (m *Manager) prepareTask(acquireLock bool, r *PostableRule, taskName string) (Task, error) { - - if acquireLock { - m.mtx.Lock() - defer m.mtx.Unlock() - } - - rules := make([]Rule, 0) - var task Task - - if r.AlertName == "" { - zap.L().Error("task load failed, at least one rule must be set", zap.String("name", taskName)) - return task, fmt.Errorf("task load failed, at least one rule must be set") - } - - ruleId := ruleIdFromTaskName(taskName) - if r.RuleType == RuleTypeThreshold { - // create a threshold rule - tr, err := NewThresholdRule( - ruleId, - r, - ThresholdRuleOpts{ - EvalDelay: m.opts.EvalDelay, - }, - m.featureFlags, - m.reader, - ) - - if err != nil { - return task, err - } - - rules = append(rules, tr) - - // create ch rule task for evalution - task = newTask(TaskTypeCh, taskName, taskNamesuffix, time.Duration(r.Frequency), rules, m.opts, m.prepareNotifyFunc(), m.ruleDB) - - // add rule to memory - m.rules[ruleId] = tr - - } else if r.RuleType == RuleTypeProm { - - // create promql rule - pr, err := NewPromRule( - ruleId, - r, - log.With(m.logger, "alert", r.AlertName), - PromRuleOpts{}, - m.reader, - ) - - if err != nil { - return task, err - } - - rules = append(rules, pr) - - // create promql rule task for evalution - task = newTask(TaskTypeProm, taskName, taskNamesuffix, time.Duration(r.Frequency), rules, m.opts, m.prepareNotifyFunc(), m.ruleDB) - - // add rule to memory - m.rules[ruleId] = pr - - } else { - return nil, fmt.Errorf("unsupported rule type. Supported types: %s, %s", RuleTypeProm, RuleTypeThreshold) - } - - return task, nil -} - // RuleTasks returns the list of manager's rule tasks. func (m *Manager) RuleTasks() []Task { m.mtx.RLock() @@ -783,7 +812,7 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m rule, err = NewPromRule( alertname, parsedRule, - log.With(m.logger, "alert", alertname), + m.logger, PromRuleOpts{ SendAlways: true, }, diff --git a/pkg/query-service/rules/prom_rule.go b/pkg/query-service/rules/prom_rule.go index 06f9ae311d..c8159c49c8 100644 --- a/pkg/query-service/rules/prom_rule.go +++ b/pkg/query-service/rules/prom_rule.go @@ -8,8 +8,6 @@ import ( "sync" "time" - "github.com/go-kit/log" - "github.com/go-kit/log/level" "go.uber.org/zap" plabels "github.com/prometheus/prometheus/model/labels" @@ -54,7 +52,7 @@ type PromRule struct { // map of active alerts active map[uint64]*Alert - logger log.Logger + logger *zap.Logger opts PromRuleOpts reader interfaces.Reader @@ -63,7 +61,7 @@ type PromRule struct { func NewPromRule( id string, postableRule *PostableRule, - logger log.Logger, + logger *zap.Logger, opts PromRuleOpts, reader interfaces.Reader, ) (*PromRule, error) { @@ -405,7 +403,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) ( result, err := tmpl.Expand() if err != nil { result = fmt.Sprintf("", err) - level.Warn(r.logger).Log("msg", "Expanding alert template failed", "err", err, "data", tmplData) + r.logger.Warn("Expanding alert template failed", zap.Error(err), zap.Any("data", tmplData)) } return result } diff --git a/pkg/query-service/rules/prom_rule_task.go b/pkg/query-service/rules/prom_rule_task.go index 13c24ca1fa..f2f11cd494 100644 --- a/pkg/query-service/rules/prom_rule_task.go +++ b/pkg/query-service/rules/prom_rule_task.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "github.com/go-kit/log" opentracing "github.com/opentracing/opentracing-go" plabels "github.com/prometheus/prometheus/model/labels" "go.signoz.io/signoz/pkg/query-service/common" @@ -33,7 +32,7 @@ type PromRuleTask struct { terminated chan struct{} pause bool - logger log.Logger + logger *zap.Logger notify NotifyFunc ruleDB RuleDB @@ -60,7 +59,7 @@ func newPromRuleTask(name, file string, frequency time.Duration, rules []Rule, o terminated: make(chan struct{}), notify: notify, ruleDB: ruleDB, - logger: log.With(opts.Logger, "group", name), + logger: opts.Logger, } } diff --git a/pkg/query-service/rules/promrule_test.go b/pkg/query-service/rules/promrule_test.go index a06b510f2e..6b67253668 100644 --- a/pkg/query-service/rules/promrule_test.go +++ b/pkg/query-service/rules/promrule_test.go @@ -7,17 +7,9 @@ import ( pql "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/assert" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.uber.org/zap" ) -type testLogger struct { - t *testing.T -} - -func (l testLogger) Log(args ...interface{}) error { - l.t.Log(args...) - return nil -} - func TestPromRuleShouldAlert(t *testing.T) { postableRule := PostableRule{ AlertName: "Test Rule", @@ -611,7 +603,7 @@ func TestPromRuleShouldAlert(t *testing.T) { postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.Target = &c.target - rule, err := NewPromRule("69", &postableRule, testLogger{t}, PromRuleOpts{}, nil) + rule, err := NewPromRule("69", &postableRule, zap.NewNop(), PromRuleOpts{}, nil) if err != nil { assert.NoError(t, err) }