Skip to content

Commit

Permalink
Introduce and use notification package
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed Aug 1, 2024
1 parent f021d9a commit fae5a5c
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 158 deletions.
49 changes: 14 additions & 35 deletions internal/incident/db_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,20 @@ func (r *RuleRow) TableName() string {

// HistoryRow represents a single incident history database entry.
type HistoryRow struct {
ID int64 `db:"id"`
IncidentID int64 `db:"incident_id"`
RuleEntryID types.Int `db:"rule_entry_id"`
EventID types.Int `db:"event_id"`
recipient.Key `db:",inline"`
RuleID types.Int `db:"rule_id"`
Time types.UnixMilli `db:"time"`
Type HistoryEventType `db:"type"`
ChannelID types.Int `db:"channel_id"`
NewSeverity event.Severity `db:"new_severity"`
OldSeverity event.Severity `db:"old_severity"`
NewRecipientRole ContactRole `db:"new_recipient_role"`
OldRecipientRole ContactRole `db:"old_recipient_role"`
Message types.String `db:"message"`
NotificationState NotificationState `db:"notification_state"`
SentAt types.UnixMilli `db:"sent_at"`
ID int64 `db:"id"`
IncidentID int64 `db:"incident_id"`
RuleEntryID types.Int `db:"rule_entry_id"`
EventID types.Int `db:"event_id"`
recipient.Key `db:",inline"`
RuleID types.Int `db:"rule_id"`
NotificationHistoryID types.Int `db:"notification_history_id"`
Time types.UnixMilli `db:"time"`
Type HistoryEventType `db:"type"`
NewSeverity event.Severity `db:"new_severity"`
OldSeverity event.Severity `db:"old_severity"`
NewRecipientRole ContactRole `db:"new_recipient_role"`
OldRecipientRole ContactRole `db:"old_recipient_role"`
Message types.String `db:"message"` // Is only used to store Incident (un)mute reason.
}

// TableName implements the contracts.TableNamer interface.
Expand All @@ -101,22 +99,3 @@ func (h *HistoryRow) Sync(ctx context.Context, db *database.DB, tx *sqlx.Tx) err

return nil
}

// NotificationEntry is used to cache a set of incident history fields of type Notified.
//
// The event processing workflow is performed in a separate transaction before trying to send the actual
// notifications. Thus, all resulting notification entries are marked as pending, and it creates a reference
// to them of this type. The cached entries are then used to actually notify the contacts and mark the pending
// notification entries as either NotificationStateSent or NotificationStateFailed.
type NotificationEntry struct {
HistoryRowID int64 `db:"id"`
ContactID int64 `db:"-"`
ChannelID int64 `db:"-"`
State NotificationState `db:"notification_state"`
SentAt types.UnixMilli `db:"sent_at"`
}

// TableName implements the contracts.TableNamer interface.
func (h *NotificationEntry) TableName() string {
return "incident_history"
}
43 changes: 17 additions & 26 deletions internal/incident/incident.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/icinga/icinga-notifications/internal/config"
"github.com/icinga/icinga-notifications/internal/daemon"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/notification"
"github.com/icinga/icinga-notifications/internal/object"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icinga-notifications/internal/rule"
Expand Down Expand Up @@ -188,8 +189,7 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error {
}
}

var notifications []*NotificationEntry
notifications, err = i.generateNotifications(ctx, tx, ev, i.getRecipientsChannel(ev.Time))
notifications, err := i.generateNotifications(ctx, tx, ev, i.getRecipientsChannel(ev.Time))
if err != nil {
return err
}
Expand Down Expand Up @@ -234,7 +234,7 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) {
return
}

var notifications []*NotificationEntry
notifications := make(notification.PendingNotifications)
ctx := context.Background()
err = utils.RunInTx(ctx, i.db, func(tx *sqlx.Tx) error {
err := ev.Sync(ctx, tx, i.db, i.Object.ID)
Expand Down Expand Up @@ -283,7 +283,6 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx,
Type: IncidentSeverityChanged,
NewSeverity: newSeverity,
OldSeverity: oldSeverity,
Message: utils.ToDBString(ev.Message),
}

if err := hr.Sync(ctx, i.db, tx); err != nil {
Expand Down Expand Up @@ -339,7 +338,6 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx,
Time: types.UnixMilli(ev.Time),
EventID: utils.ToDBInt(ev.ID),
NewSeverity: i.Severity,
Message: utils.ToDBString(ev.Message),
}

if err := hr.Sync(ctx, i.db, tx); err != nil {
Expand Down Expand Up @@ -538,7 +536,7 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even

// notifyContacts executes all the given pending notifications of the current incident.
// Returns error on database failure or if the provided context is cancelled.
func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notifications []*NotificationEntry) error {
func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notificationHistories notification.PendingNotifications) error {
baseUrl, err := url.Parse(daemon.Config().Icingaweb2URL)
if err != nil {
i.logger.Errorw("Failed to parse Icinga Web 2 URL", zap.String("url", daemon.Config().Icingaweb2URL), zap.Error(err))
Expand Down Expand Up @@ -568,26 +566,20 @@ func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notifica
},
}

for _, notification := range notifications {
contact := i.runtimeConfig.Contacts[notification.ContactID]
if contact == nil {
i.logger.Debugw("Incident refers unknown contact, might got deleted", zap.Int64("contact_id", notification.ContactID))
continue
}

if i.notifyContact(contact, req, notification.ChannelID) != nil {
notification.State = NotificationStateFailed
} else {
notification.State = NotificationStateSent
}
for contact, histories := range notificationHistories {
for _, history := range histories {
if i.notifyContact(contact, req, history.ChannelID) != nil {
history.State = notification.StateFailed
} else {
history.State = notification.StateSent
}

notification.SentAt = types.UnixMilli(time.Now())
stmt, _ := i.db.BuildUpdateStmt(notification)
if _, err := i.db.NamedExecContext(ctx, stmt, notification); err != nil {
i.logger.Errorw(
"Failed to update contact notified incident history", zap.String("contact", contact.String()),
zap.Error(err),
)
history.SentAt = types.UnixMilli(time.Now())
stmt, _ := i.db.BuildUpdateStmt(history)
if _, err := i.db.NamedExecContext(ctx, stmt, history); err != nil {
i.logger.Errorw("Failed to update contact notified history",
zap.String("contact", contact.String()), zap.Error(err))
}
}

if err := ctx.Err(); err != nil {
Expand Down Expand Up @@ -668,7 +660,6 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx,
Time: types.UnixMilli(time.Now()),
NewRecipientRole: newRole,
OldRecipientRole: oldRole,
Message: utils.ToDBString(ev.Message),
}

if err := hr.Sync(ctx, i.db, tx); err != nil {
Expand Down
71 changes: 0 additions & 71 deletions internal/incident/notification_state.go

This file was deleted.

53 changes: 27 additions & 26 deletions internal/incident/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/icinga/icinga-go-library/types"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/notification"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icinga-notifications/internal/rule"
"github.com/icinga/icinga-notifications/internal/utils"
Expand Down Expand Up @@ -125,28 +126,33 @@ func (i *Incident) AddRuleMatched(ctx context.Context, tx *sqlx.Tx, r *rule.Rule

// generateNotifications generates incident notification histories of the given recipients.
//
// This function will just insert NotificationStateSuppressed incident histories and return an empty slice if
// This function will just insert notification.StateSuppressed incident histories and return an empty slice if
// the current Object is muted, otherwise a slice of pending *NotificationEntry(ies) that can be used to update
// the corresponding histories after the actual notifications have been sent out.
func (i *Incident) generateNotifications(
ctx context.Context, tx *sqlx.Tx, ev *event.Event, contactChannels rule.ContactChannels,
) ([]*NotificationEntry, error) {
var notifications []*NotificationEntry
suppress := i.isMuted && i.Object.IsMuted()
for contact, channels := range contactChannels {
for chID := range channels {
) (notification.PendingNotifications, error) {
notifications, err := notification.AddNotifications(ctx, i.db, tx, contactChannels, func(n *notification.History) {
n.IncidentID = utils.ToDBInt(i.ID)
n.Message = utils.ToDBString(ev.Message)
if i.isMuted && i.Object.IsMuted() {
n.NotificationState = notification.StateSuppressed
}
})
if err != nil {
i.logger.Errorw("Failed to add pending notification histories", zap.Error(err))
return nil, err
}

for contact, entries := range notifications {
for _, entry := range entries {
hr := &HistoryRow{
IncidentID: i.ID,
Key: recipient.ToKey(contact),
EventID: utils.ToDBInt(ev.ID),
Time: types.UnixMilli(time.Now()),
Type: Notified,
ChannelID: utils.ToDBInt(chID),
NotificationState: NotificationStatePending,
Message: utils.ToDBString(ev.Message),
}
if suppress {
hr.NotificationState = NotificationStateSuppressed
IncidentID: i.ID,
Key: recipient.ToKey(contact),
EventID: utils.ToDBInt(ev.ID),
Time: types.UnixMilli(time.Now()),
Type: Notified,
NotificationHistoryID: utils.ToDBInt(entry.HistoryRowID),
}

if err := hr.Sync(ctx, i.db, tx); err != nil {
Expand All @@ -155,17 +161,12 @@ func (i *Incident) generateNotifications(
zap.Error(err))
return nil, err
}

if !suppress {
notifications = append(notifications, &NotificationEntry{
HistoryRowID: hr.ID,
ContactID: contact.ID,
State: NotificationStatePending,
ChannelID: chID,
})
}
}
}

if i.isMuted && i.Object.IsMuted() {
notifications = nil
}

return notifications, nil
}
Loading

0 comments on commit fae5a5c

Please sign in to comment.