Skip to content

Commit

Permalink
Use sqlx.Tx for event processing
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed May 25, 2023
1 parent 9125802 commit cb15ebe
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 119 deletions.
44 changes: 9 additions & 35 deletions internal/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"fmt"
"github.com/icinga/icinga-notifications/internal/utils"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/types"
"time"
)
Expand Down Expand Up @@ -73,30 +72,6 @@ func (e *Event) FullString() string {
return b.String()
}

// Sync transforms this event to *event.EventRow and calls its sync method.
func (e *Event) Sync(db *icingadb.DB, objectId types.Binary) error {
if e.ID != 0 {
return nil
}

eventRow := &EventRow{
Time: types.UnixMilli(e.Time),
SourceID: e.SourceId,
ObjectID: objectId,
Type: utils.ToDBString(e.Type),
Severity: e.Severity,
Username: utils.ToDBString(e.Username),
Message: utils.ToDBString(e.Message),
}

err := eventRow.Sync(db)
if err == nil {
e.ID = eventRow.ID
}

return err
}

// EventRow represents a single event database row and isn't an in-memory representation of an event.
type EventRow struct {
ID int64 `db:"id"`
Expand All @@ -114,15 +89,14 @@ func (er *EventRow) TableName() string {
return "event"
}

// Sync synchronizes this types data to the database.
// Returns an error when any of the database operation fails.
func (er *EventRow) Sync(db *icingadb.DB) error {
eventId, err := utils.InsertAndFetchId(db, utils.BuildInsertStmtWithout(db, er, "id"), er)
if err != nil {
return err
func NewEventRow(e Event, objectId types.Binary) *EventRow {
return &EventRow{
Time: types.UnixMilli(e.Time),
SourceID: e.SourceId,
ObjectID: objectId,
Type: utils.ToDBString(e.Type),
Severity: e.Severity,
Username: utils.ToDBString(e.Username),
Message: utils.ToDBString(e.Message),
}

er.ID = eventId

return nil
}
7 changes: 4 additions & 3 deletions internal/incident/db_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/icinga/icinga-notifications/internal/utils"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/types"
"github.com/jmoiron/sqlx"
)

type IncidentRow struct {
Expand All @@ -32,15 +33,15 @@ func (i *IncidentRow) Upsert() interface{} {
// Sync synchronizes incidents to the database.
// Fetches the last inserted incident id and modifies this incident's id.
// Returns an error on database failure.
func (i *IncidentRow) Sync(db *icingadb.DB, upsert bool) error {
func (i *IncidentRow) Sync(tx *sqlx.Tx, db *icingadb.DB, upsert bool) error {
if upsert {
stmt, _ := db.BuildUpsertStmt(i)
_, err := db.NamedExec(stmt, i)
_, err := tx.NamedExec(stmt, i)
if err != nil {
return fmt.Errorf("failed to upsert incident: %s", err)
}
} else {
incidentId, err := utils.InsertAndFetchId(db, utils.BuildInsertStmtWithout(db, i, "id"), i)
incidentId, err := utils.InsertAndFetchId(tx, utils.BuildInsertStmtWithout(db, i, "id"), i)
if err != nil {
return err
}
Expand Down
78 changes: 51 additions & 27 deletions internal/incident/incident.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/types"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
"sync"
"time"
Expand Down Expand Up @@ -74,21 +75,43 @@ func (i *Incident) HasManager() bool {
}

// ProcessEvent processes the given event for the current incident.
func (i *Incident) ProcessEvent(ev event.Event, created bool) error {
func (i *Incident) ProcessEvent(tx *sqlx.Tx, ev event.Event, created bool) error {
i.Lock()
defer i.Unlock()

i.runtimeConfig.RLock()
defer i.runtimeConfig.RUnlock()

err := i.Object.UpdateMetadata(tx, ev.SourceId, ev.Name, utils.ToDBString(ev.URL), ev.ExtraTags)
if err != nil {
i.logger.Errorw("can't update object metadata", zap.String("object", i.Object.DisplayName()), zap.Error(err))

return errors.New("can't update object metadata")
}

if ev.ID == 0 {
eventRow := event.NewEventRow(ev, i.Object.ID)
eventID, err := utils.InsertAndFetchId(tx, utils.BuildInsertStmtWithout(i.db, eventRow, "id"), eventRow)
if err != nil {
i.logger.Errorw(
"failed insert event and fetch its ID", zap.String("object", i.ObjectDisplayName()),
zap.String("incident", i.String()), zap.Error(err),
)

return errors.New("can't insert event and fetch its ID")
}

ev.ID = eventID
}

if created {
err := i.processIncidentOpenedEvent(ev)
err := i.processIncidentOpenedEvent(tx, ev)
if err != nil {
return err
}
}

if err := i.AddEvent(&ev); err != nil {
if err := i.AddEvent(tx, &ev); err != nil {
i.logger.Errorw(
"can't insert incident event to the database", zap.String("object", i.ObjectDisplayName()),
zap.String("incident", i.String()), zap.Error(err),
Expand All @@ -98,28 +121,28 @@ func (i *Incident) ProcessEvent(ev event.Event, created bool) error {
}

if ev.Type == event.TypeAcknowledgement {
return i.processAcknowledgementEvent(ev)
return i.processAcknowledgementEvent(tx, ev)
}

causedBy, err := i.processSeverityChangedEvent(ev)
causedBy, err := i.processSeverityChangedEvent(tx, ev)
if err != nil {
return err
}

// Check if any (additional) rules match this object. Filters of rules that already have a state don't have
// to be checked again, these rules already matched and stay effective for the ongoing incident.
causedBy, err = i.evaluateRules(ev.ID, causedBy)
causedBy, err = i.evaluateRules(tx, ev.ID, causedBy)
if err != nil {
return err
}

// Re-evaluate escalations based on the newly evaluated rules.
i.evaluateEscalations()

return i.notifyContacts(&ev, causedBy)
return i.notifyContacts(tx, &ev, causedBy)
}

func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error) {
func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (types.Int, error) {
oldIncidentSeverity := i.Severity()
oldSourceSeverity := i.SeverityBySource[ev.SourceId]
if oldSourceSeverity == event.SeverityNone {
Expand All @@ -146,7 +169,7 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error
OldSeverity: oldSourceSeverity,
Message: utils.ToDBString(ev.Message),
}
causedByHistoryId, err := i.AddHistory(history, true)
causedByHistoryId, err := i.AddHistory(tx, history, true)
if err != nil {
i.logger.Errorw(
"can't insert source severity changed history to the database", zap.String("object", i.ObjectDisplayName()),
Expand All @@ -156,7 +179,7 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error
return types.Int{}, errors.New("can't insert source severity changed history to the database")
}

if err = i.AddSourceSeverity(ev.Severity, ev.SourceId); err != nil {
if err = i.AddSourceSeverity(tx, ev.Severity, ev.SourceId); err != nil {
i.logger.Errorw(
"failed to upsert source severity", zap.String("object", i.ObjectDisplayName()), zap.String("incident", i.String()),
zap.Error(err),
Expand All @@ -176,7 +199,7 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error
zap.String("object", i.ObjectDisplayName()), zap.String("incident", i.String()),
)

if err = i.Sync(); err != nil {
if err = i.Sync(tx); err != nil {
i.logger.Errorw(
"failed to update incident severity", zap.String("object", i.ObjectDisplayName()), zap.String("incident", i.String()),
zap.Error(err),
Expand All @@ -193,7 +216,7 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error
OldSeverity: oldIncidentSeverity,
CausedByIncidentHistoryID: causedByHistoryId,
}
if causedByHistoryId, err = i.AddHistory(history, true); err != nil {
if causedByHistoryId, err = i.AddHistory(tx, history, true); err != nil {
i.logger.Errorw(
"failed to insert incident severity changed history", zap.String("object", i.ObjectDisplayName()),
zap.String("incident", i.String()), zap.Error(err),
Expand All @@ -210,7 +233,7 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error
RemoveCurrent(i.Object)

incidentRow := &IncidentRow{ID: i.incidentRowID, RecoveredAt: types.UnixMilli(i.RecoveredAt)}
_, err = i.db.NamedExec(`UPDATE "incident" SET "recovered_at" = :recovered_at WHERE id = :id`, incidentRow)
_, err = tx.NamedExec(`UPDATE "incident" SET "recovered_at" = :recovered_at WHERE id = :id`, incidentRow)
if err != nil {
i.logger.Errorw(
"failed to close incident", zap.String("object", i.ObjectDisplayName()), zap.String("incident", i.String()),
Expand All @@ -225,7 +248,8 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error
Time: types.UnixMilli(i.RecoveredAt),
Type: Closed,
}
_, err = i.AddHistory(history, false)

_, err = i.AddHistory(tx, history, false)
if err != nil {
i.logger.Errorw(
"can't insert incident closed history to the database", zap.String("object", i.ObjectDisplayName()),
Expand All @@ -239,9 +263,9 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error
return causedByHistoryId, nil
}

func (i *Incident) processIncidentOpenedEvent(ev event.Event) error {
func (i *Incident) processIncidentOpenedEvent(tx *sqlx.Tx, ev event.Event) error {
i.StartedAt = ev.Time
if err := i.Sync(); err != nil {
if err := i.Sync(tx); err != nil {
i.logger.Errorw(
"can't insert incident to the database", zap.String("incident", i.String()),
zap.String("object", i.ObjectDisplayName()), zap.Error(err),
Expand All @@ -257,7 +281,7 @@ func (i *Incident) processIncidentOpenedEvent(ev event.Event) error {
EventID: utils.ToDBInt(ev.ID),
}

if _, err := i.AddHistory(historyRow, false); err != nil {
if _, err := i.AddHistory(tx, historyRow, false); err != nil {
i.logger.Errorw(
"can't insert incident opened history event", zap.String("object", i.ObjectDisplayName()),
zap.String("incident", i.String()), zap.Error(err),
Expand All @@ -272,7 +296,7 @@ func (i *Incident) processIncidentOpenedEvent(ev event.Event) error {
// evaluateRules evaluates all the configured rules for this *incident.Object and
// generates history entries for each matched rule.
// Returns error on database failure.
func (i *Incident) evaluateRules(eventID int64, causedBy types.Int) (types.Int, error) {
func (i *Incident) evaluateRules(tx *sqlx.Tx, eventID int64, causedBy types.Int) (types.Int, error) {
if i.Rules == nil {
i.Rules = make(map[int64]struct{})
}
Expand Down Expand Up @@ -309,7 +333,7 @@ func (i *Incident) evaluateRules(eventID int64, causedBy types.Int) (types.Int,
Type: RuleMatched,
CausedByIncidentHistoryID: causedBy,
}
insertedID, err := i.AddRuleMatchedHistory(r, history)
insertedID, err := i.AddRuleMatchedHistory(tx, r, history)
if err != nil {
i.logger.Errorw(
"failed to add incident rule matched history", zap.String("rule", r.Name), zap.String("object", i.ObjectDisplayName()),
Expand Down Expand Up @@ -379,7 +403,7 @@ func (i *Incident) evaluateEscalations() {
// notifyContacts evaluates the incident.EscalationState and checks if escalations need to be triggered
// as well as builds the incident recipients along with their channel types and sends notifications based on that.
// Returns error on database failure.
func (i *Incident) notifyContacts(ev *event.Event, causedBy types.Int) error {
func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.Int) error {
managed := i.HasManager()

contactChannels := make(map[*recipient.Contact]map[string]struct{})
Expand Down Expand Up @@ -414,7 +438,7 @@ func (i *Incident) notifyContacts(ev *event.Event, causedBy types.Int) error {
CausedByIncidentHistoryID: causedBy,
}

causedByHistoryId, err := i.AddEscalationTriggered(state, history)
causedByHistoryId, err := i.AddEscalationTriggered(tx, state, history)
if err != nil {
i.logger.Errorw(
"failed to add escalation triggered history", zap.String("incident", i.String()),
Expand All @@ -427,7 +451,7 @@ func (i *Incident) notifyContacts(ev *event.Event, causedBy types.Int) error {

causedBy = causedByHistoryId

err = i.AddRecipient(escalation, ev.ID)
err = i.AddRecipient(tx, escalation, ev.ID)
if err != nil {
i.logger.Errorw(
"failed to add incident recipients", zap.String("object", i.ObjectDisplayName()), zap.String("incident", i.String()),
Expand Down Expand Up @@ -496,7 +520,7 @@ func (i *Incident) notifyContacts(ev *event.Event, causedBy types.Int) error {

hr.ChannelType = utils.ToDBString(chType)

_, err := i.AddHistory(hr, false)
_, err := i.AddHistory(tx, hr, false)
if err != nil {
i.logger.Errorln(err)
}
Expand Down Expand Up @@ -533,10 +557,10 @@ func (i *Incident) notifyContacts(ev *event.Event, causedBy types.Int) error {
return nil
}

// processAcknowledgmentEvent processes the given ack event.
// processAcknowledgementEvent processes the given ack event.
// Promotes the ack author to incident.RoleManager if it's not already the case and generates a history entry.
// Returns error on database failure.
func (i *Incident) processAcknowledgementEvent(ev event.Event) error {
func (i *Incident) processAcknowledgementEvent(tx *sqlx.Tx, ev event.Event) error {
contact := i.runtimeConfig.GetContact(ev.Username)
if contact == nil {
i.logger.Warnw(
Expand Down Expand Up @@ -577,7 +601,7 @@ func (i *Incident) processAcknowledgementEvent(ev event.Event) error {
Message: utils.ToDBString(ev.Message),
}

_, err := i.AddHistory(hr, false)
_, err := i.AddHistory(tx, hr, false)
if err != nil {
i.logger.Errorw(
"failed to add recipient role changed history", zap.String("recipient", contact.String()),
Expand All @@ -590,7 +614,7 @@ func (i *Incident) processAcknowledgementEvent(ev event.Event) error {
cr := &ContactRow{IncidentID: hr.IncidentID, Key: recipientKey, Role: newRole}

stmt, _ := i.db.BuildUpsertStmt(cr)
_, err = i.db.NamedExec(stmt, cr)
_, err = tx.NamedExec(stmt, cr)
if err != nil {
i.logger.Errorw(
"failed to upsert incident contact", zap.String("contact", contact.String()), zap.String("object", i.ObjectDisplayName()),
Expand Down
Loading

0 comments on commit cb15ebe

Please sign in to comment.