diff --git a/go.mod b/go.mod index 053eb0562..2b22cc1d0 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/teambition/rrule-go v1.8.2 go.uber.org/zap v1.23.0 golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c ) require ( @@ -30,7 +31,6 @@ require ( github.com/ssgreg/journald v1.0.0 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.1.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/internal/incident/db_types.go b/internal/incident/db_types.go index 4ef2bdaea..8749a1ef0 100644 --- a/internal/incident/db_types.go +++ b/internal/incident/db_types.go @@ -1,6 +1,7 @@ package incident import ( + "context" "fmt" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/recipient" @@ -33,15 +34,15 @@ func (i *IncidentRow) Upsert() interface{} { // Sync synchronizes incidents to the database. // Fetches the last inserted incident id and modifies this incident's id. // Returns an error on database failure. -func (i *IncidentRow) Sync(tx *sqlx.Tx, db *icingadb.DB, upsert bool) error { +func (i *IncidentRow) Sync(ctx context.Context, tx *sqlx.Tx, db *icingadb.DB, upsert bool) error { if upsert { stmt, _ := db.BuildUpsertStmt(i) - _, err := tx.NamedExec(stmt, i) + _, err := tx.NamedExecContext(ctx, stmt, i) if err != nil { return fmt.Errorf("failed to upsert incident: %s", err) } } else { - incidentId, err := utils.InsertAndFetchId(tx, utils.BuildInsertStmtWithout(db, i, "id"), i) + incidentId, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, i, "id"), i) if err != nil { return err } diff --git a/internal/incident/history.go b/internal/incident/history.go index 7e0d14e9e..b9b54cc03 100644 --- a/internal/incident/history.go +++ b/internal/incident/history.go @@ -1,6 +1,7 @@ package incident import ( + "context" "fmt" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/recipient" @@ -14,7 +15,7 @@ import ( // Sync initiates an *incident.IncidentRow from the current incident state and syncs it with the database. // Before syncing any incident related database entries, this method should be called at least once. // Returns an error on db failure. -func (i *Incident) Sync(tx *sqlx.Tx) error { +func (i *Incident) Sync(ctx context.Context, tx *sqlx.Tx) error { incidentRow := &IncidentRow{ ID: i.incidentRowID, ObjectID: i.Object.ID, @@ -23,7 +24,7 @@ func (i *Incident) Sync(tx *sqlx.Tx) error { Severity: i.Severity(), } - err := incidentRow.Sync(tx, i.db, i.incidentRowID != 0) + err := incidentRow.Sync(ctx, tx, i.db, i.incidentRowID != 0) if err != nil { return err } @@ -33,19 +34,19 @@ func (i *Incident) Sync(tx *sqlx.Tx) error { return nil } -func (i *Incident) AddHistory(tx *sqlx.Tx, historyRow *HistoryRow, fetchId bool) (types.Int, error) { +func (i *Incident) AddHistory(ctx context.Context, tx *sqlx.Tx, historyRow *HistoryRow, fetchId bool) (types.Int, error) { historyRow.IncidentID = i.incidentRowID stmt := utils.BuildInsertStmtWithout(i.db, historyRow, "id") if fetchId { - historyId, err := utils.InsertAndFetchId(tx, stmt, historyRow) + historyId, err := utils.InsertAndFetchId(ctx, tx, stmt, historyRow) if err != nil { return types.Int{}, err } return utils.ToDBInt(historyId), nil } else { - _, err := tx.NamedExec(stmt, historyRow) + _, err := tx.NamedExecContext(ctx, stmt, historyRow) if err != nil { return types.Int{}, err } @@ -54,30 +55,30 @@ func (i *Incident) AddHistory(tx *sqlx.Tx, historyRow *HistoryRow, fetchId bool) return types.Int{}, nil } -func (i *Incident) AddEscalationTriggered(tx *sqlx.Tx, state *EscalationState, hr *HistoryRow) (types.Int, error) { +func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, state *EscalationState, hr *HistoryRow) (types.Int, error) { state.IncidentID = i.incidentRowID stmt, _ := i.db.BuildUpsertStmt(state) - _, err := tx.NamedExec(stmt, state) + _, err := tx.NamedExecContext(ctx, stmt, state) if err != nil { return types.Int{}, err } - return i.AddHistory(tx, hr, true) + return i.AddHistory(ctx, tx, hr, true) } // AddEvent Inserts incident history record to the database and returns an error on db failure. -func (i *Incident) AddEvent(tx *sqlx.Tx, ev *event.Event) error { +func (i *Incident) AddEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { ie := &EventRow{IncidentID: i.incidentRowID, EventID: ev.ID} stmt, _ := i.db.BuildInsertStmt(ie) - _, err := tx.NamedExec(stmt, ie) + _, err := tx.NamedExecContext(ctx, stmt, ie) return err } // AddRecipient adds recipient from the given *rule.Escalation to this incident. // Syncs also all the recipients with the database and returns an error on db failure. -func (i *Incident) AddRecipient(tx *sqlx.Tx, escalation *rule.Escalation, eventId int64) error { +func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *rule.Escalation, eventId int64) error { newRole := RoleRecipient if i.HasManager() { newRole = RoleSubscriber @@ -110,7 +111,7 @@ func (i *Incident) AddRecipient(tx *sqlx.Tx, escalation *rule.Escalation, eventI OldRecipientRole: oldRole, } - _, err := i.AddHistory(tx, hr, false) + _, err := i.AddHistory(ctx, tx, hr, false) if err != nil { return err } @@ -119,7 +120,7 @@ func (i *Incident) AddRecipient(tx *sqlx.Tx, escalation *rule.Escalation, eventI } stmt, _ := i.db.BuildUpsertStmt(cr) - _, err := tx.NamedExec(stmt, cr) + _, err := tx.NamedExecContext(ctx, stmt, cr) if err != nil { return fmt.Errorf("failed to upsert incident contact %s: %w", r, err) } @@ -130,18 +131,18 @@ func (i *Incident) AddRecipient(tx *sqlx.Tx, escalation *rule.Escalation, eventI // AddRuleMatchedHistory syncs the given *rule.Rule and history entry to the database. // Returns an error on database failure. -func (i *Incident) AddRuleMatchedHistory(tx *sqlx.Tx, r *rule.Rule, hr *HistoryRow) (types.Int, error) { +func (i *Incident) AddRuleMatchedHistory(ctx context.Context, tx *sqlx.Tx, r *rule.Rule, hr *HistoryRow) (types.Int, error) { rr := &RuleRow{IncidentID: i.incidentRowID, RuleID: r.ID} stmt, _ := i.db.BuildUpsertStmt(rr) - _, err := tx.NamedExec(stmt, rr) + _, err := tx.NamedExecContext(ctx, stmt, rr) if err != nil { return types.Int{}, err } - return i.AddHistory(tx, hr, true) + return i.AddHistory(ctx, tx, hr, true) } -func (i *Incident) AddSourceSeverity(tx *sqlx.Tx, severity event.Severity, sourceID int64) error { +func (i *Incident) AddSourceSeverity(ctx context.Context, tx *sqlx.Tx, severity event.Severity, sourceID int64) error { i.SeverityBySource[sourceID] = severity sourceSeverity := &SourceSeverity{ @@ -151,7 +152,7 @@ func (i *Incident) AddSourceSeverity(tx *sqlx.Tx, severity event.Severity, sourc } stmt, _ := i.db.BuildUpsertStmt(sourceSeverity) - _, err := tx.NamedExec(stmt, sourceSeverity) + _, err := tx.NamedExecContext(ctx, stmt, sourceSeverity) return err } diff --git a/internal/incident/incident.go b/internal/incident/incident.go index 797acb04b..c98271815 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -1,6 +1,7 @@ package incident import ( + "context" "errors" "fmt" "github.com/icinga/icinga-notifications/internal/config" @@ -77,7 +78,7 @@ func (i *Incident) HasManager() bool { } // ProcessEvent processes the given event for the current incident. -func (i *Incident) ProcessEvent(tx *sqlx.Tx, ev event.Event, icingaweb2Url string, created bool) error { +func (i *Incident) ProcessEvent(ctx context.Context, tx *sqlx.Tx, ev event.Event, icingaweb2Url string, created bool) error { i.Lock() i.runtimeConfig.RLock() @@ -88,18 +89,28 @@ func (i *Incident) ProcessEvent(tx *sqlx.Tx, ev event.Event, icingaweb2Url strin i.Icingaweb2Url = icingaweb2Url - if err := i.processIncidentAndSourceSeverity(tx, ev, created); err != nil { + err := i.Object.UpdateMetadata(ctx, tx, ev.SourceId, ev.Name, utils.ToDBString(ev.URL), ev.ExtraTags) + if err != nil { + i.logger.Errorw("can't update object metadata", zap.String("object", i.Object.DisplayName()), zap.Error(err)) + + return errors.New("can't update object metadata") + } + + if err := i.processIncidentAndSourceSeverity(ctx, tx, ev, created); err != nil { return err } if ev.Type == event.TypeAcknowledgement { + // The current request has been processed successfully, so update the in-memory objects cache. + i.Object.UpdateCache() + // Ack events must not trigger escalations! return nil } // Check if any (additional) rules match this object. Filters of rules that already have a state don't have // to be checked again, these rules already matched and stay effective for the ongoing incident. - if err := i.evaluateRules(tx, ev.ID); err != nil { + if err := i.evaluateRules(ctx, tx, ev.ID); err != nil { return err } @@ -108,20 +119,21 @@ func (i *Incident) ProcessEvent(tx *sqlx.Tx, ev event.Event, icingaweb2Url strin return err } - return i.notifyContacts(tx, &ev) -} - -func (i *Incident) processIncidentAndSourceSeverity(tx *sqlx.Tx, ev event.Event, created bool) error { - err := i.Object.UpdateMetadata(tx, ev.SourceId, ev.Name, utils.ToDBString(ev.URL), ev.ExtraTags) + err = i.notifyContacts(ctx, tx, &ev) if err != nil { - i.logger.Errorw("can't update object metadata", zap.String("object", i.Object.DisplayName()), zap.Error(err)) - - return errors.New("can't update object metadata") + return err } + // The current request has been processed successfully, so update the in-memory objects cache. + i.Object.UpdateCache() + + return nil +} + +func (i *Incident) processIncidentAndSourceSeverity(ctx context.Context, tx *sqlx.Tx, ev event.Event, created bool) error { if ev.ID == 0 { eventRow := event.NewEventRow(ev, i.Object.ID) - eventID, err := utils.InsertAndFetchId(tx, utils.BuildInsertStmtWithout(i.db, eventRow, "id"), eventRow) + eventID, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(i.db, eventRow, "id"), eventRow) if err != nil { i.logger.Errorw("can't insert event and fetch its ID", zap.Error(err)) @@ -133,7 +145,7 @@ func (i *Incident) processIncidentAndSourceSeverity(tx *sqlx.Tx, ev event.Event, if created { i.StartedAt = ev.Time - if err := i.Sync(tx); err != nil { + if err := i.Sync(ctx, tx); err != nil { i.logger.Errorw("can't insert incident to the database", zap.Error(err)) return errors.New("can't insert incident to the database") @@ -146,21 +158,21 @@ func (i *Incident) processIncidentAndSourceSeverity(tx *sqlx.Tx, ev event.Event, EventID: utils.ToDBInt(ev.ID), } - if _, err := i.AddHistory(tx, historyRow, false); err != nil { + if _, err := i.AddHistory(ctx, tx, historyRow, false); err != nil { i.logger.Errorw("can't insert incident opened history event", zap.String("incident", i.String()), zap.Error(err)) return errors.New("can't insert incident opened history event") } } - if err := i.AddEvent(tx, &ev); err != nil { + if err := i.AddEvent(ctx, tx, &ev); err != nil { i.logger.Errorw("can't insert incident event to the database", zap.String("incident", i.String()), zap.Error(err)) return errors.New("can't insert incident event to the database") } if ev.Type == event.TypeAcknowledgement { - return i.processAcknowledgmentEvent(tx, ev) + return i.processAcknowledgmentEvent(ctx, tx, ev) } oldIncidentSeverity := i.Severity() @@ -189,7 +201,7 @@ func (i *Incident) processIncidentAndSourceSeverity(tx *sqlx.Tx, ev event.Event, OldSeverity: oldSourceSeverity, Message: utils.ToDBString(ev.Message), } - causedByHistoryId, err := i.AddHistory(tx, history, true) + causedByHistoryId, err := i.AddHistory(ctx, tx, history, true) if err != nil { i.logger.Errorw("failed to insert source severity changed incident history", zap.Error(err)) @@ -198,7 +210,7 @@ func (i *Incident) processIncidentAndSourceSeverity(tx *sqlx.Tx, ev event.Event, i.causedByHistoryID = causedByHistoryId - if err = i.AddSourceSeverity(tx, ev.Severity, ev.SourceId); err != nil { + if err = i.AddSourceSeverity(ctx, tx, ev.Severity, ev.SourceId); err != nil { i.logger.Errorw("failed to update source severity", err) return errors.New("failed to update source severity") @@ -215,7 +227,7 @@ func (i *Incident) processIncidentAndSourceSeverity(tx *sqlx.Tx, ev event.Event, i.Object.DisplayName(), i.String(), oldIncidentSeverity.String(), newIncidentSeverity.String(), ) - if err = i.Sync(tx); err != nil { + if err = i.Sync(ctx, tx); err != nil { i.logger.Errorw("failed to update incident severity", zap.Error(err)) return errors.New("failed to update incident severity") @@ -229,7 +241,7 @@ func (i *Incident) processIncidentAndSourceSeverity(tx *sqlx.Tx, ev event.Event, OldSeverity: oldIncidentSeverity, CausedByIncidentHistoryID: i.causedByHistoryID, } - if causedByHistoryId, err = i.AddHistory(tx, history, true); err != nil { + if causedByHistoryId, err = i.AddHistory(ctx, tx, history, true); err != nil { i.logger.Errorw("failed to insert incident severity changed history", zap.Error(err)) return errors.New("failed to insert incident severity changed history") @@ -245,7 +257,7 @@ func (i *Incident) processIncidentAndSourceSeverity(tx *sqlx.Tx, ev event.Event, RemoveCurrent(i.Object) incidentRow := &IncidentRow{ID: i.incidentRowID, RecoveredAt: types.UnixMilli(i.RecoveredAt)} - _, err = tx.NamedExec(`UPDATE "incident" SET "recovered_at" = :recovered_at WHERE id = :id`, incidentRow) + _, err = tx.NamedExecContext(ctx, `UPDATE "incident" SET "recovered_at" = :recovered_at WHERE id = :id`, incidentRow) if err != nil { i.logger.Errorw("failed to close incident", zap.String("incident", i.String()), zap.Error(err)) @@ -258,7 +270,7 @@ func (i *Incident) processIncidentAndSourceSeverity(tx *sqlx.Tx, ev event.Event, Type: Closed, } - _, err = i.AddHistory(tx, history, false) + _, err = i.AddHistory(ctx, tx, history, false) if err != nil { i.logger.Errorw("can't insert incident closed history to the database", zap.String("incident", i.String()), zap.Error(err)) @@ -276,7 +288,7 @@ func (i *Incident) processIncidentAndSourceSeverity(tx *sqlx.Tx, ev event.Event, // evaluateRules evaluates all the configured rules for this *incident.Object and // generates history entries for each matched rule. // Returns error on database failure. -func (i *Incident) evaluateRules(tx *sqlx.Tx, eventID int64) error { +func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64) error { for _, r := range i.runtimeConfig.Rules { if !r.IsActive.Valid || !r.IsActive.Bool { continue @@ -304,7 +316,7 @@ func (i *Incident) evaluateRules(tx *sqlx.Tx, eventID int64) error { Type: RuleMatched, CausedByIncidentHistoryID: i.causedByHistoryID, } - insertedID, err := i.AddRuleMatchedHistory(tx, r, history) + insertedID, err := i.AddRuleMatchedHistory(ctx, tx, r, history) if err != nil { i.logger.Errorw("failed to add incident rule matched history", zap.String("rule", r.Name), zap.Error(err)) @@ -368,7 +380,7 @@ func (i *Incident) evaluateEscalations() error { // notifyContacts evaluates the incident.EscalationState and checks if escalations need to be triggered // as well as builds the incident recipients along with their channel types and sends notifications based on that. // Returns error on database failure. -func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event) error { +func (i *Incident) notifyContacts(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { managed := i.HasManager() contactChannels := make(map[*recipient.Contact]map[string]struct{}) @@ -396,7 +408,7 @@ func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event) error { CausedByIncidentHistoryID: i.causedByHistoryID, } - causedByHistoryId, err := i.AddEscalationTriggered(tx, state, history) + causedByHistoryId, err := i.AddEscalationTriggered(ctx, tx, state, history) if err != nil { i.logger.Errorw("failed to add escalation triggered history", zap.String("escalation", escalation.Name), zap.Error(err)) @@ -405,7 +417,7 @@ func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event) error { i.causedByHistoryID = causedByHistoryId - err = i.AddRecipient(tx, escalation, ev.ID) + err = i.AddRecipient(ctx, tx, escalation, ev.ID) if err != nil { i.logger.Errorw("failed to add incident recipients", zap.String("recipients", escalation.DisplayName()), zap.Error(err)) @@ -462,32 +474,37 @@ func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event) error { CausedByIncidentHistoryID: i.causedByHistoryID, } - for chType := range channels { - i.logger.Infof("[%s %s] notify %q via %q", i.Object.DisplayName(), i.String(), contact.FullName, chType) + select { + case <-ctx.Done(): + return ctx.Err() + default: + for chType := range channels { + i.logger.Infof("[%s %s] notify %q via %q", i.Object.DisplayName(), i.String(), contact.FullName, chType) - hr.ChannelType = utils.ToDBString(chType) + hr.ChannelType = utils.ToDBString(chType) - _, err := i.AddHistory(tx, hr, false) - if err != nil { - i.logger.Errorln(err) - } + _, err := i.AddHistory(ctx, tx, hr, false) + if err != nil { + i.logger.Errorln(err) + } - chConf := i.runtimeConfig.Channels[chType] - if chConf == nil { - i.logger.Errorf("could not find config for channel type %q", chType) - continue - } + chConf := i.runtimeConfig.Channels[chType] + if chConf == nil { + i.logger.Errorf("could not find config for channel type %q", chType) + continue + } - plugin, err := chConf.GetPlugin() - if err != nil { - i.logger.Errorw("couldn't initialize channel", zap.String("type", chType), zap.Error(err)) - continue - } + plugin, err := chConf.GetPlugin() + if err != nil { + i.logger.Errorw("couldn't initialize channel", zap.String("type", chType), zap.Error(err)) + continue + } - err = plugin.Send(contact, i, ev, i.Icingaweb2Url) - if err != nil { - i.logger.Errorw("failed to send via channel", zap.String("type", chType), zap.Error(err)) - continue + err = plugin.Send(contact, i, ev, i.Icingaweb2Url) + if err != nil { + i.logger.Errorw("failed to send via channel", zap.String("type", chType), zap.Error(err)) + continue + } } } } @@ -498,7 +515,7 @@ func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event) error { // processAcknowledgmentEvent processes the given ack event. // Promotes the ack author to incident.RoleManager if it's not already the case and generates a history entry. // Returns error on database failure. -func (i *Incident) processAcknowledgmentEvent(tx *sqlx.Tx, ev event.Event) error { +func (i *Incident) processAcknowledgmentEvent(ctx context.Context, tx *sqlx.Tx, ev event.Event) error { contact := i.runtimeConfig.GetContact(ev.Username) if contact == nil { return fmt.Errorf("unknown acknowledgment author %q", ev.Username) @@ -531,7 +548,7 @@ func (i *Incident) processAcknowledgmentEvent(tx *sqlx.Tx, ev event.Event) error Message: utils.ToDBString(ev.Message), } - _, err := i.AddHistory(tx, hr, false) + _, err := i.AddHistory(ctx, tx, hr, false) if err != nil { i.logger.Errorw( "failed to add recipient role changed history", zap.String("contact", contact.String()), @@ -544,7 +561,7 @@ func (i *Incident) processAcknowledgmentEvent(tx *sqlx.Tx, ev event.Event) error cr := &ContactRow{IncidentID: hr.IncidentID, Key: recipientKey, Role: newRole} stmt, _ := i.db.BuildUpsertStmt(cr) - _, err = tx.NamedExec(stmt, cr) + _, err = tx.NamedExecContext(ctx, stmt, cr) if err != nil { i.logger.Errorw("failed to upsert incident contact", zap.String("contact", contact.String()), zap.Error(err)) diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index 20e3f3163..eb0e548dc 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -1,6 +1,7 @@ package incident import ( + "context" "database/sql" "fmt" "github.com/icinga/icinga-notifications/internal/config" @@ -9,6 +10,7 @@ import ( "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" + "golang.org/x/sync/errgroup" "sync" ) @@ -17,7 +19,7 @@ var ( currentIncidentsMu sync.Mutex ) -func GetCurrent(db *icingadb.DB, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, create bool) (*Incident, bool, error) { +func GetCurrent(ctx context.Context, db *icingadb.DB, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, create bool) (*Incident, bool, error) { currentIncidentsMu.Lock() defer currentIncidentsMu.Unlock() @@ -31,37 +33,50 @@ func GetCurrent(db *icingadb.DB, obj *object.Object, logger *logging.Logger, run incident.EscalationState = make(map[escalationID]*EscalationState) incident.Recipients = make(map[recipient.Key]*RecipientState) - err := db.QueryRowx(db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir) + err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir) if err != nil && err != sql.ErrNoRows { return nil, false, fmt.Errorf("incident query failed with: %w", err) } else if err == nil { incident.incidentRowID = ir.ID incident.StartedAt = ir.StartedAt.Time() - sourceSeverity := &SourceSeverity{IncidentID: ir.ID} - var sources []SourceSeverity - err := db.Select( - &sources, - db.Rebind(db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`), - ir.ID, event.SeverityOK, - ) - if err != nil { - return nil, false, fmt.Errorf("failed to fetch incident sources Severity: %w", err) - } - - for _, source := range sources { - incident.SeverityBySource[source.SourceID] = source.Severity - } - - state := &EscalationState{} - var states []*EscalationState - err = db.Select(&states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID) - if err != nil { - return nil, false, fmt.Errorf("failed to fetch incident rule escalation state: %w", err) - } - - for _, state := range states { - incident.EscalationState[state.RuleEscalationID] = state + g, childCtx := errgroup.WithContext(ctx) + g.Go(func() error { + sourceSeverity := &SourceSeverity{IncidentID: ir.ID} + var sources []SourceSeverity + err := db.SelectContext( + childCtx, &sources, + db.Rebind(db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`), + ir.ID, event.SeverityOK, + ) + if err != nil { + return fmt.Errorf("failed to fetch incident sources Severity: %w", err) + } + + for _, source := range sources { + incident.SeverityBySource[source.SourceID] = source.Severity + } + + return childCtx.Err() + }) + + g.Go(func() error { + state := &EscalationState{} + var states []*EscalationState + err = db.SelectContext(childCtx, &states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID) + if err != nil { + return fmt.Errorf("failed to fetch incident rule escalation state: %w", err) + } + + for _, state := range states { + incident.EscalationState[state.RuleEscalationID] = state + } + + return childCtx.Err() + }) + + if err := g.Wait(); err != nil { + return nil, false, err } currentIncident = incident @@ -83,7 +98,7 @@ func GetCurrent(db *icingadb.DB, obj *object.Object, logger *logging.Logger, run contact := &ContactRow{} var contacts []*ContactRow - err := db.Select(&contacts, db.Rebind(db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), currentIncident.ID()) + err := db.SelectContext(ctx, &contacts, db.Rebind(db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), currentIncident.ID()) if err != nil { return nil, false, fmt.Errorf("failed to fetch incident recipients: %w", err) } diff --git a/internal/listener/listener.go b/internal/listener/listener.go index 18d5b8a12..f042b2b5e 100644 --- a/internal/listener/listener.go +++ b/internal/listener/listener.go @@ -1,7 +1,6 @@ package listener import ( - "context" "crypto/subtle" "encoding/json" "fmt" @@ -86,7 +85,8 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { } } - tx, err := l.db.BeginTxx(context.TODO(), nil) + ctx := req.Context() + tx, err := l.db.BeginTxx(ctx, nil) if err != nil { l.logger.Errorw("can't start a db transaction", zap.Error(err)) @@ -96,7 +96,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { } defer func() { _ = tx.Rollback() }() - obj, err := object.FromTags(l.db, tx, ev.Tags) + obj, err := object.FromTags(ctx, l.db, tx, ev.Tags) if err != nil { l.logger.Errorln(err) @@ -106,7 +106,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { } createIncident := ev.Severity != event.SeverityNone && ev.Severity != event.SeverityOK - currentIncident, created, err := incident.GetCurrent(l.db, obj, l.logs.GetChildLogger("incident"), l.runtimeConfig, createIncident) + currentIncident, created, err := incident.GetCurrent(ctx, l.db, obj, l.logs.GetChildLogger("incident"), l.runtimeConfig, createIncident) if err != nil { w.WriteHeader(http.StatusInternalServerError) _, _ = fmt.Fprintln(w, err) @@ -138,7 +138,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { l.logger.Infof("processing event") - if err := currentIncident.ProcessEvent(tx, ev, l.configFile.Icingaweb2URL, created); err != nil { + if err := currentIncident.ProcessEvent(ctx, tx, ev, l.configFile.Icingaweb2URL, created); err != nil { w.WriteHeader(http.StatusInternalServerError) _, _ = fmt.Fprintln(w, err) return diff --git a/internal/object/object.go b/internal/object/object.go index bea7b453c..147895857 100644 --- a/internal/object/object.go +++ b/internal/object/object.go @@ -2,6 +2,7 @@ package object import ( "bytes" + "context" "crypto/sha256" "encoding/hex" "encoding/json" @@ -31,7 +32,7 @@ var ( cacheMu sync.Mutex ) -func FromTags(db *icingadb.DB, tx *sqlx.Tx, tags map[string]string) (*Object, error) { +func FromTags(ctx context.Context, db *icingadb.DB, tx *sqlx.Tx, tags map[string]string) (*Object, error) { id := ID(tags) cacheMu.Lock() @@ -43,7 +44,6 @@ func FromTags(db *icingadb.DB, tx *sqlx.Tx, tags map[string]string) (*Object, er } object = &Object{ID: id, Tags: tags, db: db} - cache[id.String()] = object stmt, _ := object.db.BuildInsertIgnoreStmt(&ObjectRow{}) dbObj := &ObjectRow{ @@ -55,7 +55,7 @@ func FromTags(db *icingadb.DB, tx *sqlx.Tx, tags map[string]string) (*Object, er dbObj.Service = utils.ToDBString(service) } - _, err := tx.NamedExec(stmt, dbObj) + _, err := tx.NamedExecContext(ctx, stmt, dbObj) if err != nil { return nil, fmt.Errorf("failed to insert object: %s", err) } @@ -104,7 +104,16 @@ func (o *Object) String() string { return b.String() } -func (o *Object) UpdateMetadata(tx *sqlx.Tx, source int64, name string, url types.String, extraTags map[string]string) error { +func (o *Object) UpdateCache() { + cacheMu.Lock() + defer cacheMu.Unlock() + + cache[o.ID.String()] = o +} + +func (o *Object) UpdateMetadata( + ctx context.Context, tx *sqlx.Tx, source int64, name string, url types.String, extraTags map[string]string, +) error { o.mu.Lock() defer o.mu.Unlock() @@ -117,20 +126,20 @@ func (o *Object) UpdateMetadata(tx *sqlx.Tx, source int64, name string, url type } stmt, _ := o.db.BuildUpsertStmt(&SourceMetadata{}) - _, err := tx.NamedExec(stmt, sourceMetadata) + _, err := tx.NamedExecContext(ctx, stmt, sourceMetadata) if err != nil { return err } extraTag := &ExtraTagRow{ObjectId: o.ID, SourceId: source} - _, err = tx.NamedExec(`DELETE FROM "object_extra_tag" WHERE "object_id" = :object_id AND "source_id" = :source_id`, extraTag) + _, err = tx.NamedExecContext(ctx, `DELETE FROM "object_extra_tag" WHERE "object_id" = :object_id AND "source_id" = :source_id`, extraTag) if err != nil { return err } if len(extraTags) > 0 { stmt, _ = o.db.BuildInsertStmt(extraTag) - _, err = tx.NamedExec(stmt, sourceMetadata.mapToExtraTags()) + _, err = tx.NamedExecContext(ctx, stmt, sourceMetadata.mapToExtraTags()) if err != nil { return err } @@ -140,13 +149,7 @@ func (o *Object) UpdateMetadata(tx *sqlx.Tx, source int64, name string, url type o.Metadata = make(map[int64]*SourceMetadata) } - if m := o.Metadata[source]; m != nil { - m.Name = name - m.URL = url - m.ExtraTags = extraTags - } else { - o.Metadata[source] = sourceMetadata - } + o.Metadata[source] = sourceMetadata return nil } diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 6b3a73a32..7bf6b53b0 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -1,6 +1,7 @@ package utils import ( + "context" "database/sql" "fmt" "github.com/icinga/icingadb/pkg/driver" @@ -30,21 +31,21 @@ func BuildInsertStmtWithout(db *icingadb.DB, into interface{}, withoutColumn str } // InsertAndFetchId executes the given query and fetches the last inserted ID. -func InsertAndFetchId(tx *sqlx.Tx, stmt string, args any) (int64, error) { +func InsertAndFetchId(ctx context.Context, tx *sqlx.Tx, stmt string, args any) (int64, error) { var lastInsertId int64 if tx.DriverName() == driver.PostgreSQL { - preparedStmt, err := tx.PrepareNamed(stmt + " RETURNING id") + preparedStmt, err := tx.PrepareNamedContext(ctx, stmt+" RETURNING id") if err != nil { return 0, err } - defer preparedStmt.Close() + defer func() { _ = preparedStmt.Close() }() err = preparedStmt.Get(&lastInsertId, args) if err != nil { return 0, fmt.Errorf("failed to insert entry for type %T: %s", args, err) } } else { - result, err := tx.NamedExec(stmt, args) + result, err := tx.NamedExecContext(ctx, stmt, args) if err != nil { return 0, fmt.Errorf("failed to insert entry for type %T: %s", args, err) }