diff --git a/internal/incident/db_types.go b/internal/incident/db_types.go index d9224430..12648640 100644 --- a/internal/incident/db_types.go +++ b/internal/incident/db_types.go @@ -66,22 +66,20 @@ func (r *RuleRow) TableName() string { // HistoryRow represents a single incident history database entry. type HistoryRow struct { - ID int64 `db:"id"` - IncidentID int64 `db:"incident_id"` - RuleEntryID types.Int `db:"rule_entry_id"` - EventID types.Int `db:"event_id"` - recipient.Key `db:",inline"` - RuleID types.Int `db:"rule_id"` - Time types.UnixMilli `db:"time"` - Type HistoryEventType `db:"type"` - ChannelID types.Int `db:"channel_id"` - NewSeverity event.Severity `db:"new_severity"` - OldSeverity event.Severity `db:"old_severity"` - NewRecipientRole ContactRole `db:"new_recipient_role"` - OldRecipientRole ContactRole `db:"old_recipient_role"` - Message types.String `db:"message"` - NotificationState NotificationState `db:"notification_state"` - SentAt types.UnixMilli `db:"sent_at"` + ID int64 `db:"id"` + IncidentID int64 `db:"incident_id"` + RuleEntryID types.Int `db:"rule_entry_id"` + EventID types.Int `db:"event_id"` + recipient.Key `db:",inline"` + RuleID types.Int `db:"rule_id"` + NotificationHistoryID types.Int `db:"notification_history_id"` + Time types.UnixMilli `db:"time"` + Type HistoryEventType `db:"type"` + NewSeverity event.Severity `db:"new_severity"` + OldSeverity event.Severity `db:"old_severity"` + NewRecipientRole ContactRole `db:"new_recipient_role"` + OldRecipientRole ContactRole `db:"old_recipient_role"` + Message types.String `db:"message"` // Is only used to store Incident (un)mute reason. } // TableName implements the contracts.TableNamer interface. @@ -101,22 +99,3 @@ func (h *HistoryRow) Sync(ctx context.Context, db *database.DB, tx *sqlx.Tx) err return nil } - -// NotificationEntry is used to cache a set of incident history fields of type Notified. -// -// The event processing workflow is performed in a separate transaction before trying to send the actual -// notifications. Thus, all resulting notification entries are marked as pending, and it creates a reference -// to them of this type. The cached entries are then used to actually notify the contacts and mark the pending -// notification entries as either NotificationStateSent or NotificationStateFailed. -type NotificationEntry struct { - HistoryRowID int64 `db:"id"` - ContactID int64 `db:"-"` - ChannelID int64 `db:"-"` - State NotificationState `db:"notification_state"` - SentAt types.UnixMilli `db:"sent_at"` -} - -// TableName implements the contracts.TableNamer interface. -func (h *NotificationEntry) TableName() string { - return "incident_history" -} diff --git a/internal/incident/incident.go b/internal/incident/incident.go index 76613500..9038c928 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -9,6 +9,7 @@ import ( "github.com/icinga/icinga-notifications/internal/config" "github.com/icinga/icinga-notifications/internal/daemon" "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/notification" "github.com/icinga/icinga-notifications/internal/object" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/rule" @@ -188,8 +189,7 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { } } - var notifications []*NotificationEntry - notifications, err = i.generateNotifications(ctx, tx, ev, i.getRecipientsChannel(ev.Time)) + notifications, err := i.generateNotifications(ctx, tx, ev, i.getRecipientsChannel(ev.Time)) if err != nil { return err } @@ -234,7 +234,7 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { return } - var notifications []*NotificationEntry + notifications := make(notification.PendingNotifications) ctx := context.Background() err = utils.RunInTx(ctx, i.db, func(tx *sqlx.Tx) error { err := ev.Sync(ctx, tx, i.db, i.Object.ID) @@ -283,7 +283,6 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, Type: IncidentSeverityChanged, NewSeverity: newSeverity, OldSeverity: oldSeverity, - Message: utils.ToDBString(ev.Message), } if err := hr.Sync(ctx, i.db, tx); err != nil { @@ -339,7 +338,6 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, Time: types.UnixMilli(ev.Time), EventID: utils.ToDBInt(ev.ID), NewSeverity: i.Severity, - Message: utils.ToDBString(ev.Message), } if err := hr.Sync(ctx, i.db, tx); err != nil { @@ -538,7 +536,7 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even // notifyContacts executes all the given pending notifications of the current incident. // Returns error on database failure or if the provided context is cancelled. -func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notifications []*NotificationEntry) error { +func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notificationHistories notification.PendingNotifications) error { baseUrl, err := url.Parse(daemon.Config().Icingaweb2URL) if err != nil { i.logger.Errorw("Failed to parse Icinga Web 2 URL", zap.String("url", daemon.Config().Icingaweb2URL), zap.Error(err)) @@ -568,26 +566,20 @@ func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notifica }, } - for _, notification := range notifications { - contact := i.runtimeConfig.Contacts[notification.ContactID] - if contact == nil { - i.logger.Debugw("Incident refers unknown contact, might got deleted", zap.Int64("contact_id", notification.ContactID)) - continue - } - - if i.notifyContact(contact, req, notification.ChannelID) != nil { - notification.State = NotificationStateFailed - } else { - notification.State = NotificationStateSent - } + for contact, histories := range notificationHistories { + for _, history := range histories { + if i.notifyContact(contact, req, history.ChannelID) != nil { + history.State = notification.StateFailed + } else { + history.State = notification.StateSent + } - notification.SentAt = types.UnixMilli(time.Now()) - stmt, _ := i.db.BuildUpdateStmt(notification) - if _, err := i.db.NamedExecContext(ctx, stmt, notification); err != nil { - i.logger.Errorw( - "Failed to update contact notified incident history", zap.String("contact", contact.String()), - zap.Error(err), - ) + history.SentAt = types.UnixMilli(time.Now()) + stmt, _ := i.db.BuildUpdateStmt(history) + if _, err := i.db.NamedExecContext(ctx, stmt, history); err != nil { + i.logger.Errorw("Failed to update contact notified history", + zap.String("contact", contact.String()), zap.Error(err)) + } } if err := ctx.Err(); err != nil { @@ -668,7 +660,6 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, Time: types.UnixMilli(time.Now()), NewRecipientRole: newRole, OldRecipientRole: oldRole, - Message: utils.ToDBString(ev.Message), } if err := hr.Sync(ctx, i.db, tx); err != nil { diff --git a/internal/incident/notification_state.go b/internal/incident/notification_state.go deleted file mode 100644 index 36196839..00000000 --- a/internal/incident/notification_state.go +++ /dev/null @@ -1,71 +0,0 @@ -package incident - -import ( - "database/sql/driver" - "fmt" -) - -type NotificationState int - -const ( - NotificationStateNull NotificationState = iota - NotificationStateSuppressed - NotificationStatePending - NotificationStateSent - NotificationStateFailed -) - -var notificationStatTypeByName = map[string]NotificationState{ - "suppressed": NotificationStateSuppressed, - "pending": NotificationStatePending, - "sent": NotificationStateSent, - "failed": NotificationStateFailed, -} - -var notificationStateTypeToName = func() map[NotificationState]string { - stateTypes := make(map[NotificationState]string) - for name, eventType := range notificationStatTypeByName { - stateTypes[eventType] = name - } - return stateTypes -}() - -// Scan implements the sql.Scanner interface. -// Supports SQL NULL. -func (n *NotificationState) Scan(src any) error { - if src == nil { - *n = NotificationStateNull - return nil - } - - var name string - switch val := src.(type) { - case string: - name = val - case []byte: - name = string(val) - default: - return fmt.Errorf("unable to scan type %T into NotificationState", src) - } - - historyType, ok := notificationStatTypeByName[name] - if !ok { - return fmt.Errorf("unknown notification state type %q", name) - } - - *n = historyType - - return nil -} - -func (n NotificationState) Value() (driver.Value, error) { - if n == NotificationStateNull { - return nil, nil - } - - return n.String(), nil -} - -func (n *NotificationState) String() string { - return notificationStateTypeToName[*n] -} diff --git a/internal/incident/sync.go b/internal/incident/sync.go index a1d376a6..c0621e57 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/notification" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/rule" "github.com/icinga/icinga-notifications/internal/utils" @@ -125,28 +126,33 @@ func (i *Incident) AddRuleMatched(ctx context.Context, tx *sqlx.Tx, r *rule.Rule // generateNotifications generates incident notification histories of the given recipients. // -// This function will just insert NotificationStateSuppressed incident histories and return an empty slice if +// This function will just insert notification.StateSuppressed incident histories and return an empty slice if // the current Object is muted, otherwise a slice of pending *NotificationEntry(ies) that can be used to update // the corresponding histories after the actual notifications have been sent out. func (i *Incident) generateNotifications( ctx context.Context, tx *sqlx.Tx, ev *event.Event, contactChannels rule.ContactChannels, -) ([]*NotificationEntry, error) { - var notifications []*NotificationEntry - suppress := i.isMuted && i.Object.IsMuted() - for contact, channels := range contactChannels { - for chID := range channels { +) (notification.PendingNotifications, error) { + notifications, err := notification.AddNotifications(ctx, i.db, tx, contactChannels, func(n *notification.History) { + n.IncidentID = utils.ToDBInt(i.ID) + n.Message = utils.ToDBString(ev.Message) + if i.isMuted && i.Object.IsMuted() { + n.NotificationState = notification.StateSuppressed + } + }) + if err != nil { + i.logger.Errorw("Failed to add pending notification histories", zap.Error(err)) + return nil, err + } + + for contact, entries := range notifications { + for _, entry := range entries { hr := &HistoryRow{ - IncidentID: i.ID, - Key: recipient.ToKey(contact), - EventID: utils.ToDBInt(ev.ID), - Time: types.UnixMilli(time.Now()), - Type: Notified, - ChannelID: utils.ToDBInt(chID), - NotificationState: NotificationStatePending, - Message: utils.ToDBString(ev.Message), - } - if suppress { - hr.NotificationState = NotificationStateSuppressed + IncidentID: i.ID, + Key: recipient.ToKey(contact), + EventID: utils.ToDBInt(ev.ID), + Time: types.UnixMilli(time.Now()), + Type: Notified, + NotificationHistoryID: utils.ToDBInt(entry.HistoryRowID), } if err := hr.Sync(ctx, i.db, tx); err != nil { @@ -155,17 +161,12 @@ func (i *Incident) generateNotifications( zap.Error(err)) return nil, err } - - if !suppress { - notifications = append(notifications, &NotificationEntry{ - HistoryRowID: hr.ID, - ContactID: contact.ID, - State: NotificationStatePending, - ChannelID: chID, - }) - } } } + if i.isMuted && i.Object.IsMuted() { + notifications = nil + } + return notifications, nil } diff --git a/internal/notification/history.go b/internal/notification/history.go new file mode 100644 index 00000000..fb252d3b --- /dev/null +++ b/internal/notification/history.go @@ -0,0 +1,104 @@ +package notification + +import ( + "context" + "github.com/icinga/icinga-go-library/database" + "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-notifications/internal/recipient" + "github.com/icinga/icinga-notifications/internal/rule" + "github.com/icinga/icinga-notifications/internal/utils" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" + "time" +) + +// History represents a single history database entry. +type History struct { + ID int64 `db:"id"` + IncidentID types.Int `db:"incident_id"` // Is only set for incident related notifications + RuleEntryID types.Int `db:"rule_entry_id"` + recipient.Key `db:",inline"` + Time types.UnixMilli `db:"time"` + ChannelID int64 `db:"channel_id"` + Message types.String `db:"message"` + NotificationState State `db:"notification_state"` + SentAt types.UnixMilli `db:"sent_at"` +} + +// Sync persists the current state of this history to the database and retrieves the just inserted history ID. +// Returns error when failed to execute the query. +func (h *History) Sync(ctx context.Context, db *database.DB, tx *sqlx.Tx) error { + historyId, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, h, "id"), h) + if err != nil { + return err + } + + h.ID = historyId + + return nil +} + +// TableName implements the contracts.TableNamer interface. +func (h *History) TableName() string { + return "notification_history" +} + +// Entry is used to cache a set of incident history fields of type Notified. +// +// The event processing workflow is performed in a separate transaction before trying to send the actual +// notifications. Thus, all resulting notification entries are marked as pending, and it creates a reference +// to them of this type. The cached entries are then used to actually notify the contacts and mark the pending +// notification entries as either StateSent or StateFailed. +type Entry struct { + HistoryRowID int64 `db:"id"` + ContactID int64 + ChannelID int64 + State State `db:"notification_state"` + SentAt types.UnixMilli `db:"sent_at"` +} + +// TableName implements the contracts.TableNamer interface. +func (h *Entry) TableName() string { + return "notification_history" +} + +// InitHistoryFunc is used to additionally initialise a History entry before persisting it to the database. +type InitHistoryFunc func(*History) + +// PendingNotifications is a map of per recipient.Contact pending notifications. +// Is just a short/readable form of the actual map. +type PendingNotifications map[*recipient.Contact][]*Entry + +// AddNotifications inserts by default pending notification histories into the global notification History table. +// If you need to set/override some additional fields of the History type, you can specify an InitHistoryFunc +// as an argument that is called prior to persisting the history entry to the database. +// +// Returns on success PendingNotifications referencing the just inserted entries and error on any database failure. +func AddNotifications( + ctx context.Context, db *database.DB, tx *sqlx.Tx, contactChannels rule.ContactChannels, initializer InitHistoryFunc, +) (PendingNotifications, error) { + notifications := make(PendingNotifications) + for contact, channels := range contactChannels { + for chID := range channels { + nh := &History{Key: recipient.ToKey(contact), Time: types.UnixMilli(time.Now()), ChannelID: chID} + nh.NotificationState = StatePending + if initializer != nil { + // Might be used to initialise some context specific fields like "incident_id", "rule_entry_id" etc. + initializer(nh) + } + + if err := nh.Sync(ctx, db, tx); err != nil { + return nil, errors.Wrapf(err, "cannot insert pending notification history for %q", contact.String()) + } + + notifications[contact] = append(notifications[contact], &Entry{ + HistoryRowID: nh.ID, + ContactID: contact.ID, + ChannelID: chID, + State: nh.NotificationState, + }) + } + } + + return notifications, nil +} diff --git a/internal/notification/state.go b/internal/notification/state.go new file mode 100644 index 00000000..b5f081c2 --- /dev/null +++ b/internal/notification/state.go @@ -0,0 +1,71 @@ +package notification + +import ( + "database/sql/driver" + "fmt" +) + +type State int + +const ( + StateNull State = iota + StateSuppressed + StatePending + StateSent + StateFailed +) + +var statTypeByName = map[string]State{ + "suppressed": StateSuppressed, + "pending": StatePending, + "sent": StateSent, + "failed": StateFailed, +} + +var stateTypeToName = func() map[State]string { + stateTypes := make(map[State]string) + for name, eventType := range statTypeByName { + stateTypes[eventType] = name + } + return stateTypes +}() + +// Scan implements the sql.Scanner interface. +// Supports SQL NULL. +func (n *State) Scan(src any) error { + if src == nil { + *n = StateNull + return nil + } + + var name string + switch val := src.(type) { + case string: + name = val + case []byte: + name = string(val) + default: + return fmt.Errorf("unable to scan type %T into NotificationState", src) + } + + historyType, ok := statTypeByName[name] + if !ok { + return fmt.Errorf("unknown notification state type %q", name) + } + + *n = historyType + + return nil +} + +func (n State) Value() (driver.Value, error) { + if n == StateNull { + return nil, nil + } + + return n.String(), nil +} + +func (n *State) String() string { + return stateTypeToName[*n] +}