From ef517600f608537dc5e9357455c48d0ec2280f23 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Wed, 22 May 2024 10:13:11 +0200 Subject: [PATCH] Extract notify recipients process from incident package --- internal/incident/incident.go | 242 ++++++++++-------------------- internal/incident/incidents.go | 8 +- internal/incident/sync.go | 26 ++-- internal/incident/utils.go | 32 ++++ internal/notification/notifier.go | 103 +++++++++++++ 5 files changed, 228 insertions(+), 183 deletions(-) create mode 100644 internal/incident/utils.go create mode 100644 internal/notification/notifier.go diff --git a/internal/incident/incident.go b/internal/incident/incident.go index 9038c928..ef6dbfcd 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -7,17 +7,14 @@ import ( "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/config" - "github.com/icinga/icinga-notifications/internal/daemon" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/notification" "github.com/icinga/icinga-notifications/internal/object" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/rule" "github.com/icinga/icinga-notifications/internal/utils" - "github.com/icinga/icinga-notifications/pkg/plugin" "github.com/jmoiron/sqlx" "go.uber.org/zap" - "net/url" "sync" "time" ) @@ -50,9 +47,9 @@ type Incident struct { // This prevents us from generating multiple muted histories when receiving several events that mute our Object. isMuted bool - db *database.DB - logger *zap.SugaredLogger - runtimeConfig *config.RuntimeConfig + // notification.Notifier is a helper type used to send notifications. + // It is embedded to allow direct access to its members, such as logger, DB etc. + notification.Notifier sync.Mutex } @@ -61,10 +58,8 @@ func NewIncident( db *database.DB, obj *object.Object, runtimeConfig *config.RuntimeConfig, logger *zap.SugaredLogger, ) *Incident { i := &Incident{ - db: db, Object: obj, - logger: logger, - runtimeConfig: runtimeConfig, + Notifier: notification.Notifier{DB: db, RuntimeConfig: runtimeConfig, Logger: logger}, EscalationState: map[escalationID]*EscalationState{}, Rules: map[ruleID]struct{}{}, Recipients: map[recipient.Key]*RecipientState{}, @@ -83,8 +78,8 @@ func (i *Incident) String() string { func (i *Incident) HasManager() bool { for recipientKey, state := range i.Recipients { - if i.runtimeConfig.GetRecipient(recipientKey) == nil { - i.logger.Debugw("Incident refers unknown recipient key, might got deleted", zap.Inline(recipientKey)) + if i.RuntimeConfig.GetRecipient(recipientKey) == nil { + i.Logger.Debugw("Incident refers unknown recipient key, might got deleted", zap.Inline(recipientKey)) continue } if state.Role == RoleManager { @@ -112,29 +107,29 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { i.Lock() defer i.Unlock() - i.runtimeConfig.RLock() - defer i.runtimeConfig.RUnlock() + i.RuntimeConfig.RLock() + defer i.RuntimeConfig.RUnlock() // These event types are not like the others used to mute an object/incident, such as DowntimeStart, which // uniquely identify themselves why an incident is being muted, but are rather super generic types, and as // such, we are ignoring superfluous ones that don't have any effect on that incident. if i.isMuted && ev.Type == event.TypeMute { - i.logger.Debugw("Ignoring superfluous mute event", zap.String("event", ev.String())) + i.Logger.Debugw("Ignoring superfluous mute event", zap.String("event", ev.String())) return event.ErrSuperfluousMuteUnmuteEvent } else if !i.isMuted && ev.Type == event.TypeUnmute { - i.logger.Debugw("Ignoring superfluous unmute event", zap.String("event", ev.String())) + i.Logger.Debugw("Ignoring superfluous unmute event", zap.String("event", ev.String())) return event.ErrSuperfluousMuteUnmuteEvent } - tx, err := i.db.BeginTxx(ctx, nil) + tx, err := i.DB.BeginTxx(ctx, nil) if err != nil { - i.logger.Errorw("Cannot start a db transaction", zap.Error(err)) + i.Logger.Errorw("Cannot start a db transaction", zap.Error(err)) return err } defer func() { _ = tx.Rollback() }() - if err = ev.Sync(ctx, tx, i.db, i.Object.ID); err != nil { - i.logger.Errorw("Failed to insert event and fetch its ID", zap.String("event", ev.String()), zap.Error(err)) + if err = ev.Sync(ctx, tx, i.DB, i.Object.ID); err != nil { + i.Logger.Errorw("Failed to insert event and fetch its ID", zap.String("event", ev.String()), zap.Error(err)) return err } @@ -145,11 +140,11 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { return err } - i.logger = i.logger.With(zap.String("incident", i.String())) + i.Logger = i.Logger.With(zap.String("incident", i.String())) } if err := i.handleMuteUnmute(ctx, tx, ev); err != nil { - i.logger.Errorw("Cannot insert incident muted history", zap.String("event", ev.String()), zap.Error(err)) + i.Logger.Errorw("Cannot insert incident muted history", zap.String("event", ev.String()), zap.Error(err)) return err } @@ -195,14 +190,14 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { } if err = tx.Commit(); err != nil { - i.logger.Errorw("Cannot commit db transaction", zap.Error(err)) + i.Logger.Errorw("Cannot commit db transaction", zap.Error(err)) return err } // We've just committed the DB transaction and can safely update the incident muted flag. i.isMuted = i.Object.IsMuted() - return i.notifyContacts(ctx, ev, notifications) + return i.NotifyContacts(ctx, i.makeNotificationRequest(ev), notifications) } // RetriggerEscalations tries to re-evaluate the escalations and notify contacts. @@ -210,8 +205,8 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { i.Lock() defer i.Unlock() - i.runtimeConfig.RLock() - defer i.runtimeConfig.RUnlock() + i.RuntimeConfig.RLock() + defer i.RuntimeConfig.RUnlock() if !i.RecoveredAt.Time().IsZero() { // Incident is recovered in the meantime. @@ -219,25 +214,25 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { } if !time.Now().After(ev.Time) { - i.logger.DPanicw("Event from the future", zap.Time("event_time", ev.Time), zap.Any("event", ev)) + i.Logger.DPanicw("Event from the future", zap.Time("event_time", ev.Time), zap.Any("event", ev)) return } escalations, err := i.evaluateEscalations(ev.Time) if err != nil { - i.logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err)) + i.Logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err)) return } if len(escalations) == 0 { - i.logger.Debug("Reevaluated escalations, no new escalations triggered") + i.Logger.Debug("Reevaluated escalations, no new escalations triggered") return } notifications := make(notification.PendingNotifications) ctx := context.Background() - err = utils.RunInTx(ctx, i.db, func(tx *sqlx.Tx) error { - err := ev.Sync(ctx, tx, i.db, i.Object.ID) + err = utils.RunInTx(ctx, i.DB, func(tx *sqlx.Tx) error { + err := ev.Sync(ctx, tx, i.DB, i.Object.ID) if err != nil { return err } @@ -255,14 +250,14 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { return err }) if err != nil { - i.logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err)) + i.Logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err)) } else { - if err = i.notifyContacts(ctx, ev, notifications); err != nil { - i.logger.Errorw("Failed to notify reevaluated escalation recipients", zap.Error(err)) + if err = i.NotifyContacts(ctx, i.makeNotificationRequest(ev), notifications); err != nil { + i.Logger.Errorw("Failed to notify reevaluated escalation recipients", zap.Error(err)) return } - i.logger.Info("Successfully reevaluated time-based escalations") + i.Logger.Info("Successfully reevaluated time-based escalations") } } @@ -274,7 +269,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, return err } - i.logger.Infof("Incident severity changed from %s to %s", oldSeverity.String(), newSeverity.String()) + i.Logger.Infof("Incident severity changed from %s to %s", oldSeverity.String(), newSeverity.String()) hr := &HistoryRow{ IncidentID: i.ID, @@ -285,14 +280,14 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, OldSeverity: oldSeverity, } - if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw("Failed to insert incident severity changed history", zap.Error(err)) + if err := hr.Sync(ctx, i.DB, tx); err != nil { + i.Logger.Errorw("Failed to insert incident severity changed history", zap.Error(err)) return err } if newSeverity == event.SeverityOK { i.RecoveredAt = types.UnixMilli(time.Now()) - i.logger.Info("All sources recovered, closing incident") + i.Logger.Info("All sources recovered, closing incident") RemoveCurrent(i.Object) @@ -303,8 +298,8 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, Type: Closed, } - if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw("Cannot insert incident closed history to the database", zap.Error(err)) + if err := hr.Sync(ctx, i.DB, tx); err != nil { + i.Logger.Errorw("Cannot insert incident closed history to the database", zap.Error(err)) return err } @@ -315,7 +310,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, i.Severity = newSeverity if err := i.Sync(ctx, tx); err != nil { - i.logger.Errorw("Failed to update incident severity", zap.Error(err)) + i.Logger.Errorw("Failed to update incident severity", zap.Error(err)) return err } @@ -326,11 +321,11 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, i.StartedAt = types.UnixMilli(ev.Time) i.Severity = ev.Severity if err := i.Sync(ctx, tx); err != nil { - i.logger.Errorw("Cannot insert incident to the database", zap.Error(err)) + i.Logger.Errorw("Cannot insert incident to the database", zap.Error(err)) return err } - i.logger.Infow(fmt.Sprintf("Source %d opened incident at severity %q", ev.SourceId, i.Severity.String()), zap.String("message", ev.Message)) + i.Logger.Infow(fmt.Sprintf("Source %d opened incident at severity %q", ev.SourceId, i.Severity.String()), zap.String("message", ev.Message)) hr := &HistoryRow{ IncidentID: i.ID, @@ -340,8 +335,8 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, NewSeverity: i.Severity, } - if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw("Cannot insert incident opened history event", zap.Error(err)) + if err := hr.Sync(ctx, i.DB, tx); err != nil { + i.Logger.Errorw("Cannot insert incident opened history event", zap.Error(err)) return err } @@ -356,7 +351,7 @@ func (i *Incident) handleMuteUnmute(ctx context.Context, tx *sqlx.Tx, ev *event. } hr := &HistoryRow{IncidentID: i.ID, EventID: utils.ToDBInt(ev.ID), Time: types.UnixMilli(time.Now())} - logger := i.logger.With(zap.String("event", ev.String())) + logger := i.Logger.With(zap.String("event", ev.String())) if i.Object.IsMuted() { hr.Type = Muted // Since the object may have already been muted with previous events before this incident even @@ -370,7 +365,7 @@ func (i *Incident) handleMuteUnmute(ctx context.Context, tx *sqlx.Tx, ev *event. logger.Infow("Unmuting incident", zap.String("reason", ev.MuteReason)) } - return hr.Sync(ctx, i.db, tx) + return hr.Sync(ctx, i.DB, tx) } // evaluateRules evaluates all the configured rules for this *incident.Object and @@ -381,11 +376,11 @@ func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64 i.Rules = make(map[int64]struct{}) } - for _, r := range i.runtimeConfig.Rules { + for _, r := range i.RuntimeConfig.Rules { if _, ok := i.Rules[r.ID]; !ok { matched, err := r.Eval(i.Object) if err != nil { - i.logger.Warnw("Failed to evaluate object filter", zap.Object("rule", r), zap.Error(err)) + i.Logger.Warnw("Failed to evaluate object filter", zap.Object("rule", r), zap.Error(err)) } if err != nil || !matched { @@ -393,11 +388,11 @@ func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64 } i.Rules[r.ID] = struct{}{} - i.logger.Infow("Rule matches", zap.Object("rule", r)) + i.Logger.Infow("Rule matches", zap.Object("rule", r)) err = i.AddRuleMatched(ctx, tx, r) if err != nil { - i.logger.Errorw("Failed to upsert incident rule", zap.Object("rule", r), zap.Error(err)) + i.Logger.Errorw("Failed to upsert incident rule", zap.Object("rule", r), zap.Error(err)) return err } @@ -408,8 +403,8 @@ func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64 RuleID: utils.ToDBInt(r.ID), Type: RuleMatched, } - if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw("Failed to insert rule matched incident history", zap.Object("rule", r), zap.Error(err)) + if err := hr.Sync(ctx, i.DB, tx); err != nil { + i.Logger.Errorw("Failed to insert rule matched incident history", zap.Object("rule", r), zap.Error(err)) return err } } @@ -428,7 +423,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Entry, erro // Entries are reevaluated now, reset any existing timer, if there might be future time-based escalations, // this function will start a new timer. if i.timer != nil { - i.logger.Info("Stopping reevaluate timer due to escalation evaluation") + i.Logger.Info("Stopping reevaluate timer due to escalation evaluation") i.timer.Stop() i.timer = nil } @@ -439,9 +434,9 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Entry, erro retryAfter := rule.RetryNever for rID := range i.Rules { - r := i.runtimeConfig.Rules[rID] + r := i.RuntimeConfig.Rules[rID] if r == nil { - i.logger.Debugw("Incident refers unknown rule, might got deleted", zap.Int64("rule_id", rID)) + i.Logger.Debugw("Incident refers unknown rule, might got deleted", zap.Int64("rule_id", rID)) continue } @@ -450,7 +445,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Entry, erro if _, ok := i.EscalationState[escalation.ID]; !ok { matched, err := escalation.Eval(filterContext) if err != nil { - i.logger.Warnw( + i.Logger.Warnw( "Failed to evaluate escalation condition", zap.Object("rule", r), zap.Object("escalation", escalation), zap.Error(err), ) @@ -471,9 +466,9 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Entry, erro // start time here. nextEvalAt := eventTime.Add(retryAfter) - i.logger.Infow("Scheduling escalation reevaluation", zap.Duration("after", retryAfter), zap.Time("at", nextEvalAt)) + i.Logger.Infow("Scheduling escalation reevaluation", zap.Duration("after", retryAfter), zap.Time("at", nextEvalAt)) i.timer = time.AfterFunc(retryAfter, func() { - i.logger.Info("Reevaluating escalations") + i.Logger.Info("Reevaluating escalations") i.RetriggerEscalations(&event.Event{ Time: nextEvalAt, @@ -490,19 +485,19 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Entry, erro // Returns an error on database failure. func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event, escalations []*rule.Entry) error { for _, escalation := range escalations { - r := i.runtimeConfig.Rules[escalation.RuleID] + r := i.RuntimeConfig.Rules[escalation.RuleID] if r == nil { - i.logger.Debugw("Incident refers unknown rule, might got deleted", zap.Int64("rule_id", escalation.RuleID)) + i.Logger.Debugw("Incident refers unknown rule, might got deleted", zap.Int64("rule_id", escalation.RuleID)) continue } - i.logger.Infow("Rule reached escalation", zap.Object("rule", r), zap.Object("escalation", escalation)) + i.Logger.Infow("Rule reached escalation", zap.Object("rule", r), zap.Object("escalation", escalation)) state := &EscalationState{RuleEscalationID: escalation.ID, TriggeredAt: types.UnixMilli(time.Now())} i.EscalationState[escalation.ID] = state if err := i.AddEscalationTriggered(ctx, tx, state); err != nil { - i.logger.Errorw( + i.Logger.Errorw( "Failed to upsert escalation state", zap.Object("rule", r), zap.Object("escalation", escalation), zap.Error(err), ) @@ -518,8 +513,8 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even Type: EscalationTriggered, } - if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw( + if err := hr.Sync(ctx, i.DB, tx); err != nil { + i.Logger.Errorw( "Failed to insert escalation triggered incident history", zap.Object("rule", r), zap.Object("escalation", escalation), zap.Error(err), ) @@ -534,91 +529,6 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even return nil } -// notifyContacts executes all the given pending notifications of the current incident. -// Returns error on database failure or if the provided context is cancelled. -func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notificationHistories notification.PendingNotifications) error { - baseUrl, err := url.Parse(daemon.Config().Icingaweb2URL) - if err != nil { - i.logger.Errorw("Failed to parse Icinga Web 2 URL", zap.String("url", daemon.Config().Icingaweb2URL), zap.Error(err)) - return err - } - - incidentUrl := baseUrl.JoinPath("/notifications/incident") - incidentUrl.RawQuery = fmt.Sprintf("id=%d", i.ID) - - req := &plugin.NotificationRequest{ - Object: &plugin.Object{ - Name: i.Object.DisplayName(), - Url: ev.URL, - Tags: i.Object.Tags, - ExtraTags: i.Object.ExtraTags, - }, - Incident: &plugin.Incident{ - Id: i.ID, - Url: incidentUrl.String(), - Severity: i.Severity.String(), - }, - Event: &plugin.Event{ - Time: ev.Time, - Type: ev.Type, - Username: ev.Username, - Message: ev.Message, - }, - } - - for contact, histories := range notificationHistories { - for _, history := range histories { - if i.notifyContact(contact, req, history.ChannelID) != nil { - history.State = notification.StateFailed - } else { - history.State = notification.StateSent - } - - history.SentAt = types.UnixMilli(time.Now()) - stmt, _ := i.db.BuildUpdateStmt(history) - if _, err := i.db.NamedExecContext(ctx, stmt, history); err != nil { - i.logger.Errorw("Failed to update contact notified history", - zap.String("contact", contact.String()), zap.Error(err)) - } - } - - if err := ctx.Err(); err != nil { - return err - } - } - - return nil -} - -// notifyContact notifies the given recipient via a channel matching the given ID. -func (i *Incident) notifyContact(contact *recipient.Contact, req *plugin.NotificationRequest, chID int64) error { - ch := i.runtimeConfig.Channels[chID] - if ch == nil { - i.logger.Errorw("Could not find config for channel", zap.Int64("channel_id", chID)) - - return fmt.Errorf("could not find config for channel ID: %d", chID) - } - - i.logger.Infow(fmt.Sprintf("Notify contact %q via %q of type %q", contact.FullName, ch.Name, ch.Type), - zap.Int64("channel_id", chID), zap.String("event_type", req.Event.Type)) - - contactStruct := &plugin.Contact{FullName: contact.FullName} - for _, addr := range contact.Addresses { - contactStruct.Addresses = append(contactStruct.Addresses, &plugin.Address{Type: addr.Type, Address: addr.Address}) - } - req.Contact = contactStruct - - if err := ch.Notify(req); err != nil { - i.logger.Errorw("Failed to send notification via channel plugin", zap.String("type", ch.Type), zap.Error(err)) - return err - } - - i.logger.Infow("Successfully sent a notification via channel plugin", zap.String("type", ch.Type), - zap.String("contact", contact.FullName), zap.String("event_type", req.Event.Type)) - - return nil -} - // errSuperfluousAckEvent is returned when the same ack author submits two successive ack set events on an incident. // This is error is going to be used only within this incident package. var errSuperfluousAckEvent = errors.New("superfluous acknowledgement set event, author is already a manager") @@ -627,9 +537,9 @@ var errSuperfluousAckEvent = errors.New("superfluous acknowledgement set event, // Promotes the ack author to incident.RoleManager if it's not already the case and generates a history entry. // Returns error on database failure. func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { - contact := i.runtimeConfig.GetContact(ev.Username) + contact := i.RuntimeConfig.GetContact(ev.Username) if contact == nil { - i.logger.Warnw("Ignoring acknowledgement event from an unknown author", zap.String("author", ev.Username)) + i.Logger.Warnw("Ignoring acknowledgement event from an unknown author", zap.String("author", ev.Username)) return fmt.Errorf("unknown acknowledgment author %q", ev.Username) } @@ -643,14 +553,14 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, if oldRole == RoleManager { // The user is already a manager - i.logger.Debugw("Ignoring acknowledgement-set event, author is already a manager", zap.String("author", ev.Username)) + i.Logger.Debugw("Ignoring acknowledgement-set event, author is already a manager", zap.String("author", ev.Username)) return errSuperfluousAckEvent } } else { i.Recipients[recipientKey] = &RecipientState{Role: newRole} } - i.logger.Infof("Contact %q role changed from %s to %s", contact.String(), oldRole.String(), newRole.String()) + i.Logger.Infof("Contact %q role changed from %s to %s", contact.String(), oldRole.String(), newRole.String()) hr := &HistoryRow{ IncidentID: i.ID, @@ -662,17 +572,17 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, OldRecipientRole: oldRole, } - if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw("Failed to add recipient role changed history", zap.String("recipient", contact.String()), zap.Error(err)) + if err := hr.Sync(ctx, i.DB, tx); err != nil { + i.Logger.Errorw("Failed to add recipient role changed history", zap.String("recipient", contact.String()), zap.Error(err)) return err } cr := &ContactRow{IncidentID: hr.IncidentID, Key: recipientKey, Role: newRole} - stmt, _ := i.db.BuildUpsertStmt(cr) + stmt, _ := i.DB.BuildUpsertStmt(cr) _, err := tx.NamedExecContext(ctx, stmt, cr) if err != nil { - i.logger.Errorw("Failed to upsert incident contact", zap.String("contact", contact.String()), zap.Error(err)) + i.Logger.Errorw("Failed to upsert incident contact", zap.String("contact", contact.String()), zap.Error(err)) return err } @@ -684,9 +594,9 @@ func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { contactChs := make(rule.ContactChannels) // Load all escalations recipients channels for escalationID := range i.EscalationState { - escalation := i.runtimeConfig.GetRuleEntry(escalationID) + escalation := i.RuntimeConfig.GetRuleEntry(escalationID) if escalation == nil { - i.logger.Debugw("Incident refers unknown escalation, might got deleted", zap.Int64("escalation_id", escalationID)) + i.Logger.Debugw("Incident refers unknown escalation, might got deleted", zap.Int64("escalation_id", escalationID)) continue } @@ -697,16 +607,16 @@ func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { // When a recipient has subscribed/managed this incident via the UI or using an ACK, fallback // to the default contact channel. for recipientKey, state := range i.Recipients { - r := i.runtimeConfig.GetRecipient(recipientKey) + r := i.RuntimeConfig.GetRecipient(recipientKey) if r == nil { - i.logger.Debugw("Incident refers unknown recipient key, might got deleted", zap.Inline(recipientKey)) + i.Logger.Debugw("Incident refers unknown recipient key, might got deleted", zap.Inline(recipientKey)) continue } if i.IsNotifiable(state.Role) { contacts := r.GetContactsAt(t) if len(contacts) > 0 { - i.logger.Debugw("Expanded recipient to contacts", + i.Logger.Debugw("Expanded recipient to contacts", zap.Object("recipient", r), zap.Objects("contacts", contacts)) @@ -717,7 +627,7 @@ func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { } } } else { - i.logger.Warnw("Recipient expanded to no contacts", zap.Object("recipient", r)) + i.Logger.Warnw("Recipient expanded to no contacts", zap.Object("recipient", r)) } } } @@ -730,9 +640,9 @@ func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { func (i *Incident) restoreRecipients(ctx context.Context) error { contact := &ContactRow{} var contacts []*ContactRow - err := i.db.SelectContext(ctx, &contacts, i.db.Rebind(i.db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.ID) + err := i.DB.SelectContext(ctx, &contacts, i.DB.Rebind(i.DB.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.ID) if err != nil { - i.logger.Errorw("Failed to restore incident recipients from the database", zap.Error(err)) + i.Logger.Errorw("Failed to restore incident recipients from the database", zap.Error(err)) return err } diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index 651650c6..1f130c98 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -98,10 +98,10 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log i.EscalationState[state.RuleEscalationID] = state // Restore the incident rule matching the current escalation state if any. - i.runtimeConfig.RLock() - defer i.runtimeConfig.RUnlock() + i.RuntimeConfig.RLock() + defer i.RuntimeConfig.RUnlock() - escalation := i.runtimeConfig.GetRuleEntry(state.RuleEscalationID) + escalation := i.RuntimeConfig.GetRuleEntry(state.RuleEscalationID) if escalation != nil { i.Rules[escalation.RuleID] = struct{}{} } @@ -121,7 +121,7 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log for _, i := range incidentsById { i.Object = object.GetFromCache(i.ObjectID) i.isMuted = i.Object.IsMuted() - i.logger = logger.With(zap.String("object", i.Object.DisplayName()), + i.Logger = logger.With(zap.String("object", i.Object.DisplayName()), zap.String("incident", i.String())) currentIncidentsMu.Lock() diff --git a/internal/incident/sync.go b/internal/incident/sync.go index c0621e57..ccbb59ee 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -27,13 +27,13 @@ func (i *Incident) Upsert() interface{} { // Returns an error on db failure. func (i *Incident) Sync(ctx context.Context, tx *sqlx.Tx) error { if i.ID != 0 { - stmt, _ := i.db.BuildUpsertStmt(i) + stmt, _ := i.DB.BuildUpsertStmt(i) _, err := tx.NamedExecContext(ctx, stmt, i) if err != nil { return fmt.Errorf("failed to upsert incident: %w", err) } } else { - stmt := utils.BuildInsertStmtWithout(i.db, i, "id") + stmt := utils.BuildInsertStmtWithout(i.DB, i, "id") incidentId, err := utils.InsertAndFetchId(ctx, tx, stmt, i) if err != nil { return err @@ -48,7 +48,7 @@ func (i *Incident) Sync(ctx context.Context, tx *sqlx.Tx) error { func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, state *EscalationState) error { state.IncidentID = i.ID - stmt, _ := i.db.BuildUpsertStmt(state) + stmt, _ := i.DB.BuildUpsertStmt(state) _, err := tx.NamedExecContext(ctx, stmt, state) return err @@ -77,7 +77,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru oldRole := state.Role state.Role = newRole - i.logger.Infof("Contact %q role changed from %s to %s", r, state.Role.String(), newRole.String()) + i.Logger.Infof("Contact %q role changed from %s to %s", r, state.Role.String(), newRole.String()) hr := &HistoryRow{ IncidentID: i.ID, @@ -89,8 +89,8 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru OldRecipientRole: oldRole, } - if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw( + if err := hr.Sync(ctx, i.DB, tx); err != nil { + i.Logger.Errorw( "Failed to insert recipient role changed incident history", zap.Object("escalation", escalation), zap.String("recipients", r.String()), zap.Error(err), ) @@ -100,10 +100,10 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru cr.Role = state.Role } - stmt, _ := i.db.BuildUpsertStmt(cr) + stmt, _ := i.DB.BuildUpsertStmt(cr) _, err := tx.NamedExecContext(ctx, stmt, cr) if err != nil { - i.logger.Errorw( + i.Logger.Errorw( "Failed to upsert incident recipient", zap.Object("escalation", escalation), zap.String("recipient", r.String()), zap.Error(err), ) @@ -118,7 +118,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru // Returns an error on database failure. func (i *Incident) AddRuleMatched(ctx context.Context, tx *sqlx.Tx, r *rule.Rule) error { rr := &RuleRow{IncidentID: i.ID, RuleID: r.ID} - stmt, _ := i.db.BuildUpsertStmt(rr) + stmt, _ := i.DB.BuildUpsertStmt(rr) _, err := tx.NamedExecContext(ctx, stmt, rr) return err @@ -132,7 +132,7 @@ func (i *Incident) AddRuleMatched(ctx context.Context, tx *sqlx.Tx, r *rule.Rule func (i *Incident) generateNotifications( ctx context.Context, tx *sqlx.Tx, ev *event.Event, contactChannels rule.ContactChannels, ) (notification.PendingNotifications, error) { - notifications, err := notification.AddNotifications(ctx, i.db, tx, contactChannels, func(n *notification.History) { + notifications, err := notification.AddNotifications(ctx, i.DB, tx, contactChannels, func(n *notification.History) { n.IncidentID = utils.ToDBInt(i.ID) n.Message = utils.ToDBString(ev.Message) if i.isMuted && i.Object.IsMuted() { @@ -140,7 +140,7 @@ func (i *Incident) generateNotifications( } }) if err != nil { - i.logger.Errorw("Failed to add pending notification histories", zap.Error(err)) + i.Logger.Errorw("Failed to add pending notification histories", zap.Error(err)) return nil, err } @@ -155,8 +155,8 @@ func (i *Incident) generateNotifications( NotificationHistoryID: utils.ToDBInt(entry.HistoryRowID), } - if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw("Failed to insert incident notification history", + if err := hr.Sync(ctx, i.DB, tx); err != nil { + i.Logger.Errorw("Failed to insert incident notification history", zap.String("contact", contact.FullName), zap.Bool("incident_muted", i.Object.IsMuted()), zap.Error(err)) return nil, err diff --git a/internal/incident/utils.go b/internal/incident/utils.go new file mode 100644 index 00000000..357aa0e2 --- /dev/null +++ b/internal/incident/utils.go @@ -0,0 +1,32 @@ +package incident + +import ( + "fmt" + "github.com/icinga/icinga-notifications/internal/daemon" + "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/notification" + "github.com/icinga/icinga-notifications/pkg/plugin" + "go.uber.org/zap" + "net/url" +) + +// makeNotificationRequest generates a *plugin.NotificationRequest for the provided event. +// Fails fatally when fails to parse the Icinga Web 2 url. +func (i *Incident) makeNotificationRequest(ev *event.Event) *plugin.NotificationRequest { + baseUrl, err := url.Parse(daemon.Config().Icingaweb2URL) + if err != nil { + i.Logger.Panicw("Failed to parse Icinga Web 2 URL", zap.String("url", daemon.Config().Icingaweb2URL), zap.Error(err)) + } + + incidentUrl := baseUrl.JoinPath("/notifications/incident") + incidentUrl.RawQuery = fmt.Sprintf("id=%d", i.ID) + + req := notification.NewPluginRequest(i.Object, ev) + req.Incident = &plugin.Incident{ + Id: i.ID, + Url: incidentUrl.String(), + Severity: i.Severity.String(), + } + + return req +} diff --git a/internal/notification/notifier.go b/internal/notification/notifier.go new file mode 100644 index 00000000..79a32cea --- /dev/null +++ b/internal/notification/notifier.go @@ -0,0 +1,103 @@ +package notification + +import ( + "context" + "fmt" + "github.com/icinga/icinga-go-library/database" + "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-notifications/internal/config" + "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/object" + "github.com/icinga/icinga-notifications/internal/recipient" + "github.com/icinga/icinga-notifications/pkg/plugin" + "go.uber.org/zap" + "time" +) + +// Notifier is helper type used to send notifications requests to their recipients. +type Notifier struct { + DB *database.DB `db:"-" json:"-"` + RuntimeConfig *config.RuntimeConfig `db:"-" json:"-"` + Logger *zap.SugaredLogger `db:"-" json:"-"` +} + +// NotifyContacts delivers all the provided pending notifications to their corresponding contacts. +// +// Each of the given notifications will either be marked as StateSent or StateFailed in the database. +// When a specific notification fails to be sent, it won't interrupt the subsequent notifications, instead +// it will simply log the error and continue sending the remaining ones. +// +// Returns an error if the specified context is cancelled, otherwise always nil. +func (n *Notifier) NotifyContacts(ctx context.Context, req *plugin.NotificationRequest, notifications PendingNotifications) error { + for contact, entries := range notifications { + for _, notification := range entries { + if n.NotifyContact(contact, req, notification.ChannelID) != nil { + notification.State = StateFailed + } else { + notification.State = StateSent + } + notification.SentAt = types.UnixMilli(time.Now()) + + stmt, _ := n.DB.BuildUpdateStmt(notification) + if _, err := n.DB.NamedExecContext(ctx, stmt, notification); err != nil { + n.Logger.Errorw("Failed to update contact notified history", + zap.String("contact", contact.String()), zap.Error(err)) + } + } + + if err := ctx.Err(); err != nil { + return err + } + } + + return nil +} + +// NotifyContact notifies the given recipient via a channel matching the given ID. +// +// Please make sure to call this method while holding the config.RuntimeConfig lock. +// Returns an error if unable to find a channel with the specified ID or fails to send the notification. +func (n *Notifier) NotifyContact(c *recipient.Contact, req *plugin.NotificationRequest, chID int64) error { + ch := n.RuntimeConfig.Channels[chID] + if ch == nil { + n.Logger.Errorw("Cannot not find config for channel", zap.Int64("channel_id", chID)) + return fmt.Errorf("cannot not find config for channel ID '%d'", chID) + } + + n.Logger.Infow(fmt.Sprintf("Notify contact %q via %q of type %q", c, ch.Name, ch.Type), + zap.Int64("channel_id", chID), zap.String("event_tye", req.Event.Type)) + + contactStruct := &plugin.Contact{FullName: c.FullName} + for _, addr := range c.Addresses { + contactStruct.Addresses = append(contactStruct.Addresses, &plugin.Address{Type: addr.Type, Address: addr.Address}) + } + req.Contact = contactStruct + + if err := ch.Notify(req); err != nil { + n.Logger.Errorw("Failed to send notification via channel plugin", zap.String("type", ch.Type), zap.Error(err)) + return err + } + + n.Logger.Infow("Successfully sent a notification via channel plugin", zap.String("type", ch.Type), + zap.String("contact", c.String()), zap.String("event_type", req.Event.Type)) + + return nil +} + +// NewPluginRequest returns a new plugin.NotificationRequest from the given arguments. +func NewPluginRequest(obj *object.Object, ev *event.Event) *plugin.NotificationRequest { + return &plugin.NotificationRequest{ + Object: &plugin.Object{ + Name: obj.DisplayName(), + Url: ev.URL, + Tags: obj.Tags, + ExtraTags: obj.ExtraTags, + }, + Event: &plugin.Event{ + Time: ev.Time, + Type: ev.Type, + Username: ev.Username, + Message: ev.Message, + }, + } +}