From cb15ebe8941c9e046e3932443681b7743718e858 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Wed, 17 May 2023 16:49:58 +0200 Subject: [PATCH] Use `sqlx.Tx` for event processing --- internal/event/event.go | 44 ++++---------------- internal/incident/db_types.go | 7 ++-- internal/incident/incident.go | 78 +++++++++++++++++++++++------------ internal/incident/sync.go | 37 +++++++++-------- internal/listener/listener.go | 32 +++++++------- internal/object/object.go | 26 ++++-------- internal/utils/utils.go | 9 ++-- 7 files changed, 114 insertions(+), 119 deletions(-) diff --git a/internal/event/event.go b/internal/event/event.go index c8c94f4cc..e2a99c0f9 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -4,7 +4,6 @@ import ( "bytes" "fmt" "github.com/icinga/icinga-notifications/internal/utils" - "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/types" "time" ) @@ -73,30 +72,6 @@ func (e *Event) FullString() string { return b.String() } -// Sync transforms this event to *event.EventRow and calls its sync method. -func (e *Event) Sync(db *icingadb.DB, objectId types.Binary) error { - if e.ID != 0 { - return nil - } - - eventRow := &EventRow{ - Time: types.UnixMilli(e.Time), - SourceID: e.SourceId, - ObjectID: objectId, - Type: utils.ToDBString(e.Type), - Severity: e.Severity, - Username: utils.ToDBString(e.Username), - Message: utils.ToDBString(e.Message), - } - - err := eventRow.Sync(db) - if err == nil { - e.ID = eventRow.ID - } - - return err -} - // EventRow represents a single event database row and isn't an in-memory representation of an event. type EventRow struct { ID int64 `db:"id"` @@ -114,15 +89,14 @@ func (er *EventRow) TableName() string { return "event" } -// Sync synchronizes this types data to the database. -// Returns an error when any of the database operation fails. -func (er *EventRow) Sync(db *icingadb.DB) error { - eventId, err := utils.InsertAndFetchId(db, utils.BuildInsertStmtWithout(db, er, "id"), er) - if err != nil { - return err +func NewEventRow(e Event, objectId types.Binary) *EventRow { + return &EventRow{ + Time: types.UnixMilli(e.Time), + SourceID: e.SourceId, + ObjectID: objectId, + Type: utils.ToDBString(e.Type), + Severity: e.Severity, + Username: utils.ToDBString(e.Username), + Message: utils.ToDBString(e.Message), } - - er.ID = eventId - - return nil } diff --git a/internal/incident/db_types.go b/internal/incident/db_types.go index 9dfa81de1..4ef2bdaea 100644 --- a/internal/incident/db_types.go +++ b/internal/incident/db_types.go @@ -7,6 +7,7 @@ import ( "github.com/icinga/icinga-notifications/internal/utils" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/types" + "github.com/jmoiron/sqlx" ) type IncidentRow struct { @@ -32,15 +33,15 @@ func (i *IncidentRow) Upsert() interface{} { // Sync synchronizes incidents to the database. // Fetches the last inserted incident id and modifies this incident's id. // Returns an error on database failure. -func (i *IncidentRow) Sync(db *icingadb.DB, upsert bool) error { +func (i *IncidentRow) Sync(tx *sqlx.Tx, db *icingadb.DB, upsert bool) error { if upsert { stmt, _ := db.BuildUpsertStmt(i) - _, err := db.NamedExec(stmt, i) + _, err := tx.NamedExec(stmt, i) if err != nil { return fmt.Errorf("failed to upsert incident: %s", err) } } else { - incidentId, err := utils.InsertAndFetchId(db, utils.BuildInsertStmtWithout(db, i, "id"), i) + incidentId, err := utils.InsertAndFetchId(tx, utils.BuildInsertStmtWithout(db, i, "id"), i) if err != nil { return err } diff --git a/internal/incident/incident.go b/internal/incident/incident.go index dd1eca226..fce03e087 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -13,6 +13,7 @@ import ( "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/types" + "github.com/jmoiron/sqlx" "go.uber.org/zap" "sync" "time" @@ -74,21 +75,43 @@ func (i *Incident) HasManager() bool { } // ProcessEvent processes the given event for the current incident. -func (i *Incident) ProcessEvent(ev event.Event, created bool) error { +func (i *Incident) ProcessEvent(tx *sqlx.Tx, ev event.Event, created bool) error { i.Lock() defer i.Unlock() i.runtimeConfig.RLock() defer i.runtimeConfig.RUnlock() + err := i.Object.UpdateMetadata(tx, ev.SourceId, ev.Name, utils.ToDBString(ev.URL), ev.ExtraTags) + if err != nil { + i.logger.Errorw("can't update object metadata", zap.String("object", i.Object.DisplayName()), zap.Error(err)) + + return errors.New("can't update object metadata") + } + + if ev.ID == 0 { + eventRow := event.NewEventRow(ev, i.Object.ID) + eventID, err := utils.InsertAndFetchId(tx, utils.BuildInsertStmtWithout(i.db, eventRow, "id"), eventRow) + if err != nil { + i.logger.Errorw( + "failed insert event and fetch its ID", zap.String("object", i.ObjectDisplayName()), + zap.String("incident", i.String()), zap.Error(err), + ) + + return errors.New("can't insert event and fetch its ID") + } + + ev.ID = eventID + } + if created { - err := i.processIncidentOpenedEvent(ev) + err := i.processIncidentOpenedEvent(tx, ev) if err != nil { return err } } - if err := i.AddEvent(&ev); err != nil { + if err := i.AddEvent(tx, &ev); err != nil { i.logger.Errorw( "can't insert incident event to the database", zap.String("object", i.ObjectDisplayName()), zap.String("incident", i.String()), zap.Error(err), @@ -98,17 +121,17 @@ func (i *Incident) ProcessEvent(ev event.Event, created bool) error { } if ev.Type == event.TypeAcknowledgement { - return i.processAcknowledgementEvent(ev) + return i.processAcknowledgementEvent(tx, ev) } - causedBy, err := i.processSeverityChangedEvent(ev) + causedBy, err := i.processSeverityChangedEvent(tx, ev) if err != nil { return err } // Check if any (additional) rules match this object. Filters of rules that already have a state don't have // to be checked again, these rules already matched and stay effective for the ongoing incident. - causedBy, err = i.evaluateRules(ev.ID, causedBy) + causedBy, err = i.evaluateRules(tx, ev.ID, causedBy) if err != nil { return err } @@ -116,10 +139,10 @@ func (i *Incident) ProcessEvent(ev event.Event, created bool) error { // Re-evaluate escalations based on the newly evaluated rules. i.evaluateEscalations() - return i.notifyContacts(&ev, causedBy) + return i.notifyContacts(tx, &ev, causedBy) } -func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error) { +func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (types.Int, error) { oldIncidentSeverity := i.Severity() oldSourceSeverity := i.SeverityBySource[ev.SourceId] if oldSourceSeverity == event.SeverityNone { @@ -146,7 +169,7 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error OldSeverity: oldSourceSeverity, Message: utils.ToDBString(ev.Message), } - causedByHistoryId, err := i.AddHistory(history, true) + causedByHistoryId, err := i.AddHistory(tx, history, true) if err != nil { i.logger.Errorw( "can't insert source severity changed history to the database", zap.String("object", i.ObjectDisplayName()), @@ -156,7 +179,7 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error return types.Int{}, errors.New("can't insert source severity changed history to the database") } - if err = i.AddSourceSeverity(ev.Severity, ev.SourceId); err != nil { + if err = i.AddSourceSeverity(tx, ev.Severity, ev.SourceId); err != nil { i.logger.Errorw( "failed to upsert source severity", zap.String("object", i.ObjectDisplayName()), zap.String("incident", i.String()), zap.Error(err), @@ -176,7 +199,7 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error zap.String("object", i.ObjectDisplayName()), zap.String("incident", i.String()), ) - if err = i.Sync(); err != nil { + if err = i.Sync(tx); err != nil { i.logger.Errorw( "failed to update incident severity", zap.String("object", i.ObjectDisplayName()), zap.String("incident", i.String()), zap.Error(err), @@ -193,7 +216,7 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error OldSeverity: oldIncidentSeverity, CausedByIncidentHistoryID: causedByHistoryId, } - if causedByHistoryId, err = i.AddHistory(history, true); err != nil { + if causedByHistoryId, err = i.AddHistory(tx, history, true); err != nil { i.logger.Errorw( "failed to insert incident severity changed history", zap.String("object", i.ObjectDisplayName()), zap.String("incident", i.String()), zap.Error(err), @@ -210,7 +233,7 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error RemoveCurrent(i.Object) incidentRow := &IncidentRow{ID: i.incidentRowID, RecoveredAt: types.UnixMilli(i.RecoveredAt)} - _, err = i.db.NamedExec(`UPDATE "incident" SET "recovered_at" = :recovered_at WHERE id = :id`, incidentRow) + _, err = tx.NamedExec(`UPDATE "incident" SET "recovered_at" = :recovered_at WHERE id = :id`, incidentRow) if err != nil { i.logger.Errorw( "failed to close incident", zap.String("object", i.ObjectDisplayName()), zap.String("incident", i.String()), @@ -225,7 +248,8 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error Time: types.UnixMilli(i.RecoveredAt), Type: Closed, } - _, err = i.AddHistory(history, false) + + _, err = i.AddHistory(tx, history, false) if err != nil { i.logger.Errorw( "can't insert incident closed history to the database", zap.String("object", i.ObjectDisplayName()), @@ -239,9 +263,9 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error return causedByHistoryId, nil } -func (i *Incident) processIncidentOpenedEvent(ev event.Event) error { +func (i *Incident) processIncidentOpenedEvent(tx *sqlx.Tx, ev event.Event) error { i.StartedAt = ev.Time - if err := i.Sync(); err != nil { + if err := i.Sync(tx); err != nil { i.logger.Errorw( "can't insert incident to the database", zap.String("incident", i.String()), zap.String("object", i.ObjectDisplayName()), zap.Error(err), @@ -257,7 +281,7 @@ func (i *Incident) processIncidentOpenedEvent(ev event.Event) error { EventID: utils.ToDBInt(ev.ID), } - if _, err := i.AddHistory(historyRow, false); err != nil { + if _, err := i.AddHistory(tx, historyRow, false); err != nil { i.logger.Errorw( "can't insert incident opened history event", zap.String("object", i.ObjectDisplayName()), zap.String("incident", i.String()), zap.Error(err), @@ -272,7 +296,7 @@ func (i *Incident) processIncidentOpenedEvent(ev event.Event) error { // evaluateRules evaluates all the configured rules for this *incident.Object and // generates history entries for each matched rule. // Returns error on database failure. -func (i *Incident) evaluateRules(eventID int64, causedBy types.Int) (types.Int, error) { +func (i *Incident) evaluateRules(tx *sqlx.Tx, eventID int64, causedBy types.Int) (types.Int, error) { if i.Rules == nil { i.Rules = make(map[int64]struct{}) } @@ -309,7 +333,7 @@ func (i *Incident) evaluateRules(eventID int64, causedBy types.Int) (types.Int, Type: RuleMatched, CausedByIncidentHistoryID: causedBy, } - insertedID, err := i.AddRuleMatchedHistory(r, history) + insertedID, err := i.AddRuleMatchedHistory(tx, r, history) if err != nil { i.logger.Errorw( "failed to add incident rule matched history", zap.String("rule", r.Name), zap.String("object", i.ObjectDisplayName()), @@ -379,7 +403,7 @@ func (i *Incident) evaluateEscalations() { // notifyContacts evaluates the incident.EscalationState and checks if escalations need to be triggered // as well as builds the incident recipients along with their channel types and sends notifications based on that. // Returns error on database failure. -func (i *Incident) notifyContacts(ev *event.Event, causedBy types.Int) error { +func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.Int) error { managed := i.HasManager() contactChannels := make(map[*recipient.Contact]map[string]struct{}) @@ -414,7 +438,7 @@ func (i *Incident) notifyContacts(ev *event.Event, causedBy types.Int) error { CausedByIncidentHistoryID: causedBy, } - causedByHistoryId, err := i.AddEscalationTriggered(state, history) + causedByHistoryId, err := i.AddEscalationTriggered(tx, state, history) if err != nil { i.logger.Errorw( "failed to add escalation triggered history", zap.String("incident", i.String()), @@ -427,7 +451,7 @@ func (i *Incident) notifyContacts(ev *event.Event, causedBy types.Int) error { causedBy = causedByHistoryId - err = i.AddRecipient(escalation, ev.ID) + err = i.AddRecipient(tx, escalation, ev.ID) if err != nil { i.logger.Errorw( "failed to add incident recipients", zap.String("object", i.ObjectDisplayName()), zap.String("incident", i.String()), @@ -496,7 +520,7 @@ func (i *Incident) notifyContacts(ev *event.Event, causedBy types.Int) error { hr.ChannelType = utils.ToDBString(chType) - _, err := i.AddHistory(hr, false) + _, err := i.AddHistory(tx, hr, false) if err != nil { i.logger.Errorln(err) } @@ -533,10 +557,10 @@ func (i *Incident) notifyContacts(ev *event.Event, causedBy types.Int) error { return nil } -// processAcknowledgmentEvent processes the given ack event. +// processAcknowledgementEvent processes the given ack event. // Promotes the ack author to incident.RoleManager if it's not already the case and generates a history entry. // Returns error on database failure. -func (i *Incident) processAcknowledgementEvent(ev event.Event) error { +func (i *Incident) processAcknowledgementEvent(tx *sqlx.Tx, ev event.Event) error { contact := i.runtimeConfig.GetContact(ev.Username) if contact == nil { i.logger.Warnw( @@ -577,7 +601,7 @@ func (i *Incident) processAcknowledgementEvent(ev event.Event) error { Message: utils.ToDBString(ev.Message), } - _, err := i.AddHistory(hr, false) + _, err := i.AddHistory(tx, hr, false) if err != nil { i.logger.Errorw( "failed to add recipient role changed history", zap.String("recipient", contact.String()), @@ -590,7 +614,7 @@ func (i *Incident) processAcknowledgementEvent(ev event.Event) error { cr := &ContactRow{IncidentID: hr.IncidentID, Key: recipientKey, Role: newRole} stmt, _ := i.db.BuildUpsertStmt(cr) - _, err = i.db.NamedExec(stmt, cr) + _, err = tx.NamedExec(stmt, cr) if err != nil { i.logger.Errorw( "failed to upsert incident contact", zap.String("contact", contact.String()), zap.String("object", i.ObjectDisplayName()), diff --git a/internal/incident/sync.go b/internal/incident/sync.go index b7498b068..d0f874afe 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -7,6 +7,7 @@ import ( "github.com/icinga/icinga-notifications/internal/rule" "github.com/icinga/icinga-notifications/internal/utils" "github.com/icinga/icingadb/pkg/types" + "github.com/jmoiron/sqlx" "go.uber.org/zap" "time" ) @@ -14,7 +15,7 @@ import ( // Sync initiates an *incident.IncidentRow from the current incident state and syncs it with the database. // Before syncing any incident related database entries, this method should be called at least once. // Returns an error on db failure. -func (i *Incident) Sync() error { +func (i *Incident) Sync(tx *sqlx.Tx) error { incidentRow := &IncidentRow{ ID: i.incidentRowID, ObjectID: i.Object.ID, @@ -23,7 +24,7 @@ func (i *Incident) Sync() error { Severity: i.Severity(), } - err := incidentRow.Sync(i.db, i.incidentRowID != 0) + err := incidentRow.Sync(tx, i.db, i.incidentRowID != 0) if err != nil { return err } @@ -33,19 +34,19 @@ func (i *Incident) Sync() error { return nil } -func (i *Incident) AddHistory(historyRow *HistoryRow, fetchId bool) (types.Int, error) { +func (i *Incident) AddHistory(tx *sqlx.Tx, historyRow *HistoryRow, fetchId bool) (types.Int, error) { historyRow.IncidentID = i.incidentRowID stmt := utils.BuildInsertStmtWithout(i.db, historyRow, "id") if fetchId { - historyId, err := utils.InsertAndFetchId(i.db, stmt, historyRow) + historyId, err := utils.InsertAndFetchId(tx, stmt, historyRow) if err != nil { return types.Int{}, err } return utils.ToDBInt(historyId), nil } else { - _, err := i.db.NamedExec(stmt, historyRow) + _, err := tx.NamedExec(stmt, historyRow) if err != nil { return types.Int{}, err } @@ -54,30 +55,30 @@ func (i *Incident) AddHistory(historyRow *HistoryRow, fetchId bool) (types.Int, return types.Int{}, nil } -func (i *Incident) AddEscalationTriggered(state *EscalationState, hr *HistoryRow) (types.Int, error) { +func (i *Incident) AddEscalationTriggered(tx *sqlx.Tx, state *EscalationState, hr *HistoryRow) (types.Int, error) { state.IncidentID = i.incidentRowID stmt, _ := i.db.BuildUpsertStmt(state) - _, err := i.db.NamedExec(stmt, state) + _, err := tx.NamedExec(stmt, state) if err != nil { return types.Int{}, err } - return i.AddHistory(hr, true) + return i.AddHistory(tx, hr, true) } // AddEvent Inserts incident history record to the database and returns an error on db failure. -func (i *Incident) AddEvent(ev *event.Event) error { +func (i *Incident) AddEvent(tx *sqlx.Tx, ev *event.Event) error { ie := &EventRow{IncidentID: i.incidentRowID, EventID: ev.ID} stmt, _ := i.db.BuildInsertStmt(ie) - _, err := i.db.NamedExec(stmt, ie) + _, err := tx.NamedExec(stmt, ie) return err } // AddRecipient adds recipient from the given *rule.Escalation to this incident. // Syncs also all the recipients with the database and returns an error on db failure. -func (i *Incident) AddRecipient(escalation *rule.Escalation, eventId int64) error { +func (i *Incident) AddRecipient(tx *sqlx.Tx, escalation *rule.Escalation, eventId int64) error { newRole := RoleRecipient if i.HasManager() { newRole = RoleSubscriber @@ -113,7 +114,7 @@ func (i *Incident) AddRecipient(escalation *rule.Escalation, eventId int64) erro OldRecipientRole: oldRole, } - _, err := i.AddHistory(hr, false) + _, err := i.AddHistory(tx, hr, false) if err != nil { return err } @@ -122,7 +123,7 @@ func (i *Incident) AddRecipient(escalation *rule.Escalation, eventId int64) erro } stmt, _ := i.db.BuildUpsertStmt(cr) - _, err := i.db.NamedExec(stmt, cr) + _, err := tx.NamedExec(stmt, cr) if err != nil { return fmt.Errorf("failed to upsert incident contact %s: %w", r, err) } @@ -133,18 +134,18 @@ func (i *Incident) AddRecipient(escalation *rule.Escalation, eventId int64) erro // AddRuleMatchedHistory syncs the given *rule.Rule and history entry to the database. // Returns an error on database failure. -func (i *Incident) AddRuleMatchedHistory(r *rule.Rule, hr *HistoryRow) (types.Int, error) { +func (i *Incident) AddRuleMatchedHistory(tx *sqlx.Tx, r *rule.Rule, hr *HistoryRow) (types.Int, error) { rr := &RuleRow{IncidentID: i.incidentRowID, RuleID: r.ID} stmt, _ := i.db.BuildUpsertStmt(rr) - _, err := i.db.NamedExec(stmt, rr) + _, err := tx.NamedExec(stmt, rr) if err != nil { return types.Int{}, err } - return i.AddHistory(hr, true) + return i.AddHistory(tx, hr, true) } -func (i *Incident) AddSourceSeverity(severity event.Severity, sourceID int64) error { +func (i *Incident) AddSourceSeverity(tx *sqlx.Tx, severity event.Severity, sourceID int64) error { i.SeverityBySource[sourceID] = severity sourceSeverity := &SourceSeverity{ @@ -154,7 +155,7 @@ func (i *Incident) AddSourceSeverity(severity event.Severity, sourceID int64) er } stmt, _ := i.db.BuildUpsertStmt(sourceSeverity) - _, err := i.db.NamedExec(stmt, sourceSeverity) + _, err := tx.NamedExec(stmt, sourceSeverity) return err } diff --git a/internal/listener/listener.go b/internal/listener/listener.go index d328d6d5b..c8dea4bb4 100644 --- a/internal/listener/listener.go +++ b/internal/listener/listener.go @@ -1,6 +1,7 @@ package listener import ( + "context" "crypto/subtle" "encoding/json" "fmt" @@ -8,7 +9,6 @@ import ( "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/incident" "github.com/icinga/icinga-notifications/internal/object" - "github.com/icinga/icinga-notifications/internal/utils" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" "go.uber.org/zap" @@ -86,16 +86,17 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { } } - obj, err := object.FromTags(l.db, ev.Tags) + tx, err := l.db.BeginTxx(context.TODO(), nil) if err != nil { - l.logger.Errorln(err) + l.logger.Errorw("can't start a db transaction", zap.Error(err)) w.WriteHeader(http.StatusInternalServerError) - _, _ = fmt.Fprintln(w, err.Error()) + _, _ = fmt.Fprintln(w, "can't start a db transaction") return } + defer func() { _ = tx.Rollback() }() - err = obj.UpdateMetadata(ev.SourceId, ev.Name, utils.ToDBString(ev.URL), ev.ExtraTags) + obj, err := object.FromTags(l.db, tx, ev.Tags) if err != nil { l.logger.Errorln(err) @@ -104,14 +105,6 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { return } - if err = ev.Sync(l.db, obj.ID); err != nil { - l.logger.Errorln(err) - - w.WriteHeader(http.StatusInternalServerError) - _, _ = fmt.Fprintln(w, err.Error()) - return - } - createIncident := ev.Severity != event.SeverityNone && ev.Severity != event.SeverityOK currentIncident, created, err := incident.GetCurrent(l.db, obj, l.logs.GetChildLogger("incident"), l.runtimeConfig, l.configFile, createIncident) if err != nil { @@ -144,12 +137,23 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { l.logger.Infof("processing event") - if err := currentIncident.ProcessEvent(ev, created); err != nil { + if err := currentIncident.ProcessEvent(tx, ev, created); err != nil { w.WriteHeader(http.StatusInternalServerError) _, _ = fmt.Fprintln(w, err) return } + if err = tx.Commit(); err != nil { + l.logger.Errorw( + "can't commit db transaction", zap.String("object", obj.DisplayName()), + zap.String("incident", currentIncident.String()), zap.Error(err), + ) + + w.WriteHeader(http.StatusInternalServerError) + _, _ = fmt.Fprintln(w, "can't commit db transaction") + return + } + w.WriteHeader(http.StatusOK) _, _ = fmt.Fprintln(w, "received event") _, _ = fmt.Fprintln(w) diff --git a/internal/object/object.go b/internal/object/object.go index c33535f76..bea7b453c 100644 --- a/internal/object/object.go +++ b/internal/object/object.go @@ -2,7 +2,6 @@ package object import ( "bytes" - "context" "crypto/sha256" "encoding/hex" "encoding/json" @@ -10,6 +9,7 @@ import ( "github.com/icinga/icinga-notifications/internal/utils" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/types" + "github.com/jmoiron/sqlx" "regexp" "sort" "strings" @@ -31,7 +31,7 @@ var ( cacheMu sync.Mutex ) -func FromTags(db *icingadb.DB, tags map[string]string) (*Object, error) { +func FromTags(db *icingadb.DB, tx *sqlx.Tx, tags map[string]string) (*Object, error) { id := ID(tags) cacheMu.Lock() @@ -55,7 +55,7 @@ func FromTags(db *icingadb.DB, tags map[string]string) (*Object, error) { dbObj.Service = utils.ToDBString(service) } - _, err := object.db.NamedExec(stmt, dbObj) + _, err := tx.NamedExec(stmt, dbObj) if err != nil { return nil, fmt.Errorf("failed to insert object: %s", err) } @@ -104,7 +104,7 @@ func (o *Object) String() string { return b.String() } -func (o *Object) UpdateMetadata(source int64, name string, url types.String, extraTags map[string]string) error { +func (o *Object) UpdateMetadata(tx *sqlx.Tx, source int64, name string, url types.String, extraTags map[string]string) error { o.mu.Lock() defer o.mu.Unlock() @@ -117,35 +117,25 @@ func (o *Object) UpdateMetadata(source int64, name string, url types.String, ext } stmt, _ := o.db.BuildUpsertStmt(&SourceMetadata{}) - _, err := o.db.NamedExec(stmt, sourceMetadata) + _, err := tx.NamedExec(stmt, sourceMetadata) if err != nil { - return fmt.Errorf("failed to upsert object metadata: %s", err) + return err } - tx, err := o.db.BeginTxx(context.TODO(), nil) - if err != nil { - return fmt.Errorf("failed to start transaction for object extra tags: %s", err) - } - defer tx.Rollback() - extraTag := &ExtraTagRow{ObjectId: o.ID, SourceId: source} _, err = tx.NamedExec(`DELETE FROM "object_extra_tag" WHERE "object_id" = :object_id AND "source_id" = :source_id`, extraTag) if err != nil { - return fmt.Errorf("failed to delete object extra tags: %s", err) + return err } if len(extraTags) > 0 { stmt, _ = o.db.BuildInsertStmt(extraTag) _, err = tx.NamedExec(stmt, sourceMetadata.mapToExtraTags()) if err != nil { - return fmt.Errorf("failed to insert object extra tags: %s", err) + return err } } - if err = tx.Commit(); err != nil { - return fmt.Errorf("failed to commit object extrag tags transaction: %s", err) - } - if o.Metadata == nil { o.Metadata = make(map[int64]*SourceMetadata) } diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 783bf8b7b..6b3a73a32 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -7,6 +7,7 @@ import ( "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/types" "github.com/icinga/icingadb/pkg/utils" + "github.com/jmoiron/sqlx" "strings" ) @@ -29,10 +30,10 @@ func BuildInsertStmtWithout(db *icingadb.DB, into interface{}, withoutColumn str } // InsertAndFetchId executes the given query and fetches the last inserted ID. -func InsertAndFetchId(db *icingadb.DB, stmt string, args any) (int64, error) { +func InsertAndFetchId(tx *sqlx.Tx, stmt string, args any) (int64, error) { var lastInsertId int64 - if db.DriverName() == driver.PostgreSQL { - preparedStmt, err := db.PrepareNamed(stmt + " RETURNING id") + if tx.DriverName() == driver.PostgreSQL { + preparedStmt, err := tx.PrepareNamed(stmt + " RETURNING id") if err != nil { return 0, err } @@ -43,7 +44,7 @@ func InsertAndFetchId(db *icingadb.DB, stmt string, args any) (int64, error) { return 0, fmt.Errorf("failed to insert entry for type %T: %s", args, err) } } else { - result, err := db.NamedExec(stmt, args) + result, err := tx.NamedExec(stmt, args) if err != nil { return 0, fmt.Errorf("failed to insert entry for type %T: %s", args, err) }