diff --git a/internal/event/event.go b/internal/event/event.go index c8c94f4cc..e2a99c0f9 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -4,7 +4,6 @@ import ( "bytes" "fmt" "github.com/icinga/icinga-notifications/internal/utils" - "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/types" "time" ) @@ -73,30 +72,6 @@ func (e *Event) FullString() string { return b.String() } -// Sync transforms this event to *event.EventRow and calls its sync method. -func (e *Event) Sync(db *icingadb.DB, objectId types.Binary) error { - if e.ID != 0 { - return nil - } - - eventRow := &EventRow{ - Time: types.UnixMilli(e.Time), - SourceID: e.SourceId, - ObjectID: objectId, - Type: utils.ToDBString(e.Type), - Severity: e.Severity, - Username: utils.ToDBString(e.Username), - Message: utils.ToDBString(e.Message), - } - - err := eventRow.Sync(db) - if err == nil { - e.ID = eventRow.ID - } - - return err -} - // EventRow represents a single event database row and isn't an in-memory representation of an event. type EventRow struct { ID int64 `db:"id"` @@ -114,15 +89,14 @@ func (er *EventRow) TableName() string { return "event" } -// Sync synchronizes this types data to the database. -// Returns an error when any of the database operation fails. -func (er *EventRow) Sync(db *icingadb.DB) error { - eventId, err := utils.InsertAndFetchId(db, utils.BuildInsertStmtWithout(db, er, "id"), er) - if err != nil { - return err +func NewEventRow(e Event, objectId types.Binary) *EventRow { + return &EventRow{ + Time: types.UnixMilli(e.Time), + SourceID: e.SourceId, + ObjectID: objectId, + Type: utils.ToDBString(e.Type), + Severity: e.Severity, + Username: utils.ToDBString(e.Username), + Message: utils.ToDBString(e.Message), } - - er.ID = eventId - - return nil } diff --git a/internal/incident/db_types.go b/internal/incident/db_types.go index 9dfa81de1..4ef2bdaea 100644 --- a/internal/incident/db_types.go +++ b/internal/incident/db_types.go @@ -7,6 +7,7 @@ import ( "github.com/icinga/icinga-notifications/internal/utils" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/types" + "github.com/jmoiron/sqlx" ) type IncidentRow struct { @@ -32,15 +33,15 @@ func (i *IncidentRow) Upsert() interface{} { // Sync synchronizes incidents to the database. // Fetches the last inserted incident id and modifies this incident's id. // Returns an error on database failure. -func (i *IncidentRow) Sync(db *icingadb.DB, upsert bool) error { +func (i *IncidentRow) Sync(tx *sqlx.Tx, db *icingadb.DB, upsert bool) error { if upsert { stmt, _ := db.BuildUpsertStmt(i) - _, err := db.NamedExec(stmt, i) + _, err := tx.NamedExec(stmt, i) if err != nil { return fmt.Errorf("failed to upsert incident: %s", err) } } else { - incidentId, err := utils.InsertAndFetchId(db, utils.BuildInsertStmtWithout(db, i, "id"), i) + incidentId, err := utils.InsertAndFetchId(tx, utils.BuildInsertStmtWithout(db, i, "id"), i) if err != nil { return err } diff --git a/internal/incident/incident.go b/internal/incident/incident.go index d375b7218..98bf81f0b 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -12,6 +12,7 @@ import ( "github.com/icinga/icinga-notifications/internal/utils" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/types" + "github.com/jmoiron/sqlx" "go.uber.org/zap" "sync" "time" @@ -73,15 +74,34 @@ func (i *Incident) HasManager() bool { } // ProcessEvent processes the given event for the current incident. -func (i *Incident) ProcessEvent(ev event.Event, created bool) error { +func (i *Incident) ProcessEvent(tx *sqlx.Tx, ev event.Event, created bool) error { i.Lock() defer i.Unlock() i.runtimeConfig.RLock() defer i.runtimeConfig.RUnlock() + err := i.Object.UpdateMetadata(tx, ev.SourceId, ev.Name, utils.ToDBString(ev.URL), ev.ExtraTags) + if err != nil { + i.logger.Errorw("Can't update object metadata", zap.Error(err)) + + return errors.New("can't update object metadata") + } + + if ev.ID == 0 { + eventRow := event.NewEventRow(ev, i.Object.ID) + eventID, err := utils.InsertAndFetchId(tx, utils.BuildInsertStmtWithout(i.db, eventRow, "id"), eventRow) + if err != nil { + i.logger.Errorw("Failed to insert event and fetch its ID", zap.Error(err)) + + return errors.New("can't insert event and fetch its ID") + } + + ev.ID = eventID + } + if created { - err := i.processIncidentOpenedEvent(ev) + err := i.processIncidentOpenedEvent(tx, ev) if err != nil { return err } @@ -89,24 +109,24 @@ func (i *Incident) ProcessEvent(ev event.Event, created bool) error { i.logger = i.logger.With(zap.String("incident", i.String())) } - if err := i.AddEvent(&ev); err != nil { + if err := i.AddEvent(tx, &ev); err != nil { i.logger.Errorw("Can't insert incident event to the database", zap.Error(err)) return errors.New("can't insert incident event to the database") } if ev.Type == event.TypeAcknowledgement { - return i.processAcknowledgementEvent(ev) + return i.processAcknowledgementEvent(tx, ev) } - causedBy, err := i.processSeverityChangedEvent(ev) + causedBy, err := i.processSeverityChangedEvent(tx, ev) if err != nil { return err } // Check if any (additional) rules match this object. Filters of rules that already have a state don't have // to be checked again, these rules already matched and stay effective for the ongoing incident. - causedBy, err = i.evaluateRules(ev.ID, causedBy) + causedBy, err = i.evaluateRules(tx, ev.ID, causedBy) if err != nil { return err } @@ -114,10 +134,10 @@ func (i *Incident) ProcessEvent(ev event.Event, created bool) error { // Re-evaluate escalations based on the newly evaluated rules. i.evaluateEscalations() - return i.notifyContacts(&ev, causedBy) + return i.notifyContacts(tx, &ev, causedBy) } -func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error) { +func (i *Incident) processSeverityChangedEvent(tx *sqlx.Tx, ev event.Event) (types.Int, error) { oldIncidentSeverity := i.Severity() oldSourceSeverity := i.SeverityBySource[ev.SourceId] if oldSourceSeverity == event.SeverityNone { @@ -143,14 +163,14 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error OldSeverity: oldSourceSeverity, Message: utils.ToDBString(ev.Message), } - causedByHistoryId, err := i.AddHistory(history, true) + causedByHistoryId, err := i.AddHistory(tx, history, true) if err != nil { i.logger.Errorw("Can't insert source severity changed history to the database", zap.Error(err)) return types.Int{}, errors.New("can't insert source severity changed history to the database") } - if err = i.AddSourceSeverity(ev.Severity, ev.SourceId); err != nil { + if err = i.AddSourceSeverity(tx, ev.Severity, ev.SourceId); err != nil { i.logger.Errorw("Failed to upsert source severity", zap.Error(err)) return types.Int{}, errors.New("failed to upsert source severity") @@ -166,7 +186,7 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error "Incident severity changed from %s to %s", oldIncidentSeverity.String(), newIncidentSeverity.String(), ) - if err = i.Sync(); err != nil { + if err = i.Sync(tx); err != nil { i.logger.Errorw("Failed to update incident severity", zap.Error(err)) return types.Int{}, errors.New("failed to update incident severity") @@ -180,7 +200,7 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error OldSeverity: oldIncidentSeverity, CausedByIncidentHistoryID: causedByHistoryId, } - if causedByHistoryId, err = i.AddHistory(history, true); err != nil { + if causedByHistoryId, err = i.AddHistory(tx, history, true); err != nil { i.logger.Errorw("Failed to insert incident severity changed history", zap.Error(err)) return types.Int{}, errors.New("failed to insert incident severity changed history") @@ -194,7 +214,7 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error RemoveCurrent(i.Object) incidentRow := &IncidentRow{ID: i.incidentRowID, RecoveredAt: types.UnixMilli(i.RecoveredAt)} - _, err = i.db.NamedExec(`UPDATE "incident" SET "recovered_at" = :recovered_at WHERE id = :id`, incidentRow) + _, err = tx.NamedExec(`UPDATE "incident" SET "recovered_at" = :recovered_at WHERE id = :id`, incidentRow) if err != nil { i.logger.Errorw("Failed to close incident", zap.Error(err)) @@ -206,7 +226,8 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error Time: types.UnixMilli(i.RecoveredAt), Type: Closed, } - _, err = i.AddHistory(history, false) + + _, err = i.AddHistory(tx, history, false) if err != nil { i.logger.Errorw("Can't insert incident closed history to the database", zap.Error(err)) @@ -217,9 +238,9 @@ func (i *Incident) processSeverityChangedEvent(ev event.Event) (types.Int, error return causedByHistoryId, nil } -func (i *Incident) processIncidentOpenedEvent(ev event.Event) error { +func (i *Incident) processIncidentOpenedEvent(tx *sqlx.Tx, ev event.Event) error { i.StartedAt = ev.Time - if err := i.Sync(); err != nil { + if err := i.Sync(tx); err != nil { i.logger.Errorw("Can't insert incident to the database", zap.Error(err)) return errors.New("can't insert incident to the database") @@ -232,7 +253,7 @@ func (i *Incident) processIncidentOpenedEvent(ev event.Event) error { EventID: utils.ToDBInt(ev.ID), } - if _, err := i.AddHistory(historyRow, false); err != nil { + if _, err := i.AddHistory(tx, historyRow, false); err != nil { i.logger.Errorw("Can't insert incident opened history event", zap.Error(err)) return errors.New("can't insert incident opened history event") @@ -244,7 +265,7 @@ func (i *Incident) processIncidentOpenedEvent(ev event.Event) error { // evaluateRules evaluates all the configured rules for this *incident.Object and // generates history entries for each matched rule. // Returns error on database failure. -func (i *Incident) evaluateRules(eventID int64, causedBy types.Int) (types.Int, error) { +func (i *Incident) evaluateRules(tx *sqlx.Tx, eventID int64, causedBy types.Int) (types.Int, error) { if i.Rules == nil { i.Rules = make(map[int64]struct{}) } @@ -269,7 +290,7 @@ func (i *Incident) evaluateRules(eventID int64, causedBy types.Int) (types.Int, i.Rules[r.ID] = struct{}{} i.logger.Infof("Rule %q matches", r.Name) - err := i.AddRuleMatched(r) + err := i.AddRuleMatched(tx, r) if err != nil { i.logger.Errorw("Failed to upsert incident rule", zap.String("rule", r.Name), zap.Error(err)) @@ -283,7 +304,7 @@ func (i *Incident) evaluateRules(eventID int64, causedBy types.Int) (types.Int, Type: RuleMatched, CausedByIncidentHistoryID: causedBy, } - insertedID, err := i.AddHistory(history, true) + insertedID, err := i.AddHistory(tx, history, true) if err != nil { i.logger.Errorw("Failed to insert rule matched incident history", zap.String("rule", r.Name), zap.Error(err)) @@ -349,7 +370,7 @@ func (i *Incident) evaluateEscalations() { // notifyContacts evaluates the incident.EscalationState and checks if escalations need to be triggered // as well as builds the incident recipients along with their channel types and sends notifications based on that. // Returns error on database failure. -func (i *Incident) notifyContacts(ev *event.Event, causedBy types.Int) error { +func (i *Incident) notifyContacts(tx *sqlx.Tx, ev *event.Event, causedBy types.Int) error { managed := i.HasManager() contactChannels := make(map[*recipient.Contact]map[string]struct{}) @@ -372,7 +393,7 @@ func (i *Incident) notifyContacts(ev *event.Event, causedBy types.Int) error { r := i.runtimeConfig.Rules[escalation.RuleID] i.logger.Infof("Rule %q reached escalation %q", r.Name, escalation.DisplayName()) - err := i.AddEscalationTriggered(state) + err := i.AddEscalationTriggered(tx, state) if err != nil { i.logger.Errorw( "Failed to upsert escalation state", zap.String("rule", r.Name), @@ -390,7 +411,7 @@ func (i *Incident) notifyContacts(ev *event.Event, causedBy types.Int) error { Type: EscalationTriggered, CausedByIncidentHistoryID: causedBy, } - causedBy, err = i.AddHistory(history, true) + causedBy, err = i.AddHistory(tx, history, true) if err != nil { i.logger.Errorw( "Failed to insert escalation triggered incident history", zap.String("rule", r.Name), @@ -400,7 +421,7 @@ func (i *Incident) notifyContacts(ev *event.Event, causedBy types.Int) error { return errors.New("failed to insert escalation triggered incident history") } - err = i.AddRecipient(escalation, ev.ID) + err = i.AddRecipient(tx, escalation, ev.ID) if err != nil { return err } @@ -460,7 +481,7 @@ func (i *Incident) notifyContacts(ev *event.Event, causedBy types.Int) error { hr.ChannelType = utils.ToDBString(chType) - _, err := i.AddHistory(hr, false) + _, err := i.AddHistory(tx, hr, false) if err != nil { i.logger.Errorw( "Failed to insert contact notified incident history", zap.String("contact", contact.String()), @@ -495,10 +516,10 @@ func (i *Incident) notifyContacts(ev *event.Event, causedBy types.Int) error { return nil } -// processAcknowledgmentEvent processes the given ack event. +// processAcknowledgementEvent processes the given ack event. // Promotes the ack author to incident.RoleManager if it's not already the case and generates a history entry. // Returns error on database failure. -func (i *Incident) processAcknowledgementEvent(ev event.Event) error { +func (i *Incident) processAcknowledgementEvent(tx *sqlx.Tx, ev event.Event) error { contact := i.runtimeConfig.GetContact(ev.Username) if contact == nil { i.logger.Warnw("Ignoring acknowledgement event from an unknown author", zap.String("author", ev.Username)) @@ -533,7 +554,7 @@ func (i *Incident) processAcknowledgementEvent(ev event.Event) error { Message: utils.ToDBString(ev.Message), } - _, err := i.AddHistory(hr, false) + _, err := i.AddHistory(tx, hr, false) if err != nil { i.logger.Errorw( "Failed to add recipient role changed history", zap.String("recipient", contact.String()), zap.Error(err), @@ -545,7 +566,7 @@ func (i *Incident) processAcknowledgementEvent(ev event.Event) error { cr := &ContactRow{IncidentID: hr.IncidentID, Key: recipientKey, Role: newRole} stmt, _ := i.db.BuildUpsertStmt(cr) - _, err = i.db.NamedExec(stmt, cr) + _, err = tx.NamedExec(stmt, cr) if err != nil { i.logger.Errorw( "Failed to upsert incident contact", zap.String("contact", contact.String()), zap.Error(err), diff --git a/internal/incident/sync.go b/internal/incident/sync.go index 3de45150f..09126d21f 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -7,6 +7,7 @@ import ( "github.com/icinga/icinga-notifications/internal/rule" "github.com/icinga/icinga-notifications/internal/utils" "github.com/icinga/icingadb/pkg/types" + "github.com/jmoiron/sqlx" "go.uber.org/zap" "time" ) @@ -14,7 +15,7 @@ import ( // Sync initiates an *incident.IncidentRow from the current incident state and syncs it with the database. // Before syncing any incident related database entries, this method should be called at least once. // Returns an error on db failure. -func (i *Incident) Sync() error { +func (i *Incident) Sync(tx *sqlx.Tx) error { incidentRow := &IncidentRow{ ID: i.incidentRowID, ObjectID: i.Object.ID, @@ -23,7 +24,7 @@ func (i *Incident) Sync() error { Severity: i.Severity(), } - err := incidentRow.Sync(i.db, i.incidentRowID != 0) + err := incidentRow.Sync(tx, i.db, i.incidentRowID != 0) if err != nil { return err } @@ -33,19 +34,19 @@ func (i *Incident) Sync() error { return nil } -func (i *Incident) AddHistory(historyRow *HistoryRow, fetchId bool) (types.Int, error) { +func (i *Incident) AddHistory(tx *sqlx.Tx, historyRow *HistoryRow, fetchId bool) (types.Int, error) { historyRow.IncidentID = i.incidentRowID stmt := utils.BuildInsertStmtWithout(i.db, historyRow, "id") if fetchId { - historyId, err := utils.InsertAndFetchId(i.db, stmt, historyRow) + historyId, err := utils.InsertAndFetchId(tx, stmt, historyRow) if err != nil { return types.Int{}, err } return utils.ToDBInt(historyId), nil } else { - _, err := i.db.NamedExec(stmt, historyRow) + _, err := tx.NamedExec(stmt, historyRow) if err != nil { return types.Int{}, err } @@ -54,27 +55,27 @@ func (i *Incident) AddHistory(historyRow *HistoryRow, fetchId bool) (types.Int, return types.Int{}, nil } -func (i *Incident) AddEscalationTriggered(state *EscalationState) error { +func (i *Incident) AddEscalationTriggered(tx *sqlx.Tx, state *EscalationState) error { state.IncidentID = i.incidentRowID stmt, _ := i.db.BuildUpsertStmt(state) - _, err := i.db.NamedExec(stmt, state) + _, err := tx.NamedExec(stmt, state) return err } // AddEvent Inserts incident history record to the database and returns an error on db failure. -func (i *Incident) AddEvent(ev *event.Event) error { +func (i *Incident) AddEvent(tx *sqlx.Tx, ev *event.Event) error { ie := &EventRow{IncidentID: i.incidentRowID, EventID: ev.ID} stmt, _ := i.db.BuildInsertStmt(ie) - _, err := i.db.NamedExec(stmt, ie) + _, err := tx.NamedExec(stmt, ie) return err } // AddRecipient adds recipient from the given *rule.Escalation to this incident. // Syncs also all the recipients with the database and returns an error on db failure. -func (i *Incident) AddRecipient(escalation *rule.Escalation, eventId int64) error { +func (i *Incident) AddRecipient(tx *sqlx.Tx, escalation *rule.Escalation, eventId int64) error { newRole := RoleRecipient if i.HasManager() { newRole = RoleSubscriber @@ -107,7 +108,7 @@ func (i *Incident) AddRecipient(escalation *rule.Escalation, eventId int64) erro OldRecipientRole: oldRole, } - _, err := i.AddHistory(hr, false) + _, err := i.AddHistory(tx, hr, false) if err != nil { i.logger.Errorw( "Failed to insert recipient role changed incident history", zap.String("escalation", escalation.DisplayName()), @@ -121,7 +122,7 @@ func (i *Incident) AddRecipient(escalation *rule.Escalation, eventId int64) erro } stmt, _ := i.db.BuildUpsertStmt(cr) - _, err := i.db.NamedExec(stmt, cr) + _, err := tx.NamedExec(stmt, cr) if err != nil { i.logger.Errorw( "Failed to upsert incident recipient", zap.String("escalation", escalation.DisplayName()), @@ -137,15 +138,15 @@ func (i *Incident) AddRecipient(escalation *rule.Escalation, eventId int64) erro // AddRuleMatched syncs the given *rule.Rule to the database. // Returns an error on database failure. -func (i *Incident) AddRuleMatched(r *rule.Rule) error { +func (i *Incident) AddRuleMatched(tx *sqlx.Tx, r *rule.Rule) error { rr := &RuleRow{IncidentID: i.incidentRowID, RuleID: r.ID} stmt, _ := i.db.BuildUpsertStmt(rr) - _, err := i.db.NamedExec(stmt, rr) + _, err := tx.NamedExec(stmt, rr) return err } -func (i *Incident) AddSourceSeverity(severity event.Severity, sourceID int64) error { +func (i *Incident) AddSourceSeverity(tx *sqlx.Tx, severity event.Severity, sourceID int64) error { i.SeverityBySource[sourceID] = severity sourceSeverity := &SourceSeverity{ @@ -155,7 +156,7 @@ func (i *Incident) AddSourceSeverity(severity event.Severity, sourceID int64) er } stmt, _ := i.db.BuildUpsertStmt(sourceSeverity) - _, err := i.db.NamedExec(stmt, sourceSeverity) + _, err := tx.NamedExec(stmt, sourceSeverity) return err } diff --git a/internal/listener/listener.go b/internal/listener/listener.go index f29307bbf..9936e90b4 100644 --- a/internal/listener/listener.go +++ b/internal/listener/listener.go @@ -1,6 +1,7 @@ package listener import ( + "context" "crypto/subtle" "encoding/json" "fmt" @@ -8,7 +9,6 @@ import ( "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/incident" "github.com/icinga/icinga-notifications/internal/object" - "github.com/icinga/icinga-notifications/internal/utils" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" "go.uber.org/zap" @@ -86,16 +86,17 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { } } - obj, err := object.FromTags(l.db, ev.Tags) + tx, err := l.db.BeginTxx(context.TODO(), nil) if err != nil { - l.logger.Errorln(err) + l.logger.Errorw("Can't start a db transaction", zap.Error(err)) w.WriteHeader(http.StatusInternalServerError) - _, _ = fmt.Fprintln(w, err.Error()) + _, _ = fmt.Fprintln(w, "can't start a db transaction") return } + defer func() { _ = tx.Rollback() }() - err = obj.UpdateMetadata(ev.SourceId, ev.Name, utils.ToDBString(ev.URL), ev.ExtraTags) + obj, err := object.FromTags(l.db, tx, ev.Tags) if err != nil { l.logger.Errorln(err) @@ -104,14 +105,6 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { return } - if err = ev.Sync(l.db, obj.ID); err != nil { - l.logger.Errorln(err) - - w.WriteHeader(http.StatusInternalServerError) - _, _ = fmt.Fprintln(w, err.Error()) - return - } - createIncident := ev.Severity != event.SeverityNone && ev.Severity != event.SeverityOK currentIncident, created, err := incident.GetCurrent(l.db, obj, l.logs.GetChildLogger("incident"), l.runtimeConfig, l.configFile, createIncident) if err != nil { @@ -144,12 +137,23 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { l.logger.Infof("Processing event") - if err := currentIncident.ProcessEvent(ev, created); err != nil { + if err := currentIncident.ProcessEvent(tx, ev, created); err != nil { w.WriteHeader(http.StatusInternalServerError) _, _ = fmt.Fprintln(w, err) return } + if err = tx.Commit(); err != nil { + l.logger.Errorw( + "Can't commit db transaction", zap.String("object", obj.DisplayName()), + zap.String("incident", currentIncident.String()), zap.Error(err), + ) + + w.WriteHeader(http.StatusInternalServerError) + _, _ = fmt.Fprintln(w, "can't commit db transaction") + return + } + w.WriteHeader(http.StatusOK) _, _ = fmt.Fprintln(w, "event processed successfully") _, _ = fmt.Fprintln(w) diff --git a/internal/object/object.go b/internal/object/object.go index c33535f76..bea7b453c 100644 --- a/internal/object/object.go +++ b/internal/object/object.go @@ -2,7 +2,6 @@ package object import ( "bytes" - "context" "crypto/sha256" "encoding/hex" "encoding/json" @@ -10,6 +9,7 @@ import ( "github.com/icinga/icinga-notifications/internal/utils" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/types" + "github.com/jmoiron/sqlx" "regexp" "sort" "strings" @@ -31,7 +31,7 @@ var ( cacheMu sync.Mutex ) -func FromTags(db *icingadb.DB, tags map[string]string) (*Object, error) { +func FromTags(db *icingadb.DB, tx *sqlx.Tx, tags map[string]string) (*Object, error) { id := ID(tags) cacheMu.Lock() @@ -55,7 +55,7 @@ func FromTags(db *icingadb.DB, tags map[string]string) (*Object, error) { dbObj.Service = utils.ToDBString(service) } - _, err := object.db.NamedExec(stmt, dbObj) + _, err := tx.NamedExec(stmt, dbObj) if err != nil { return nil, fmt.Errorf("failed to insert object: %s", err) } @@ -104,7 +104,7 @@ func (o *Object) String() string { return b.String() } -func (o *Object) UpdateMetadata(source int64, name string, url types.String, extraTags map[string]string) error { +func (o *Object) UpdateMetadata(tx *sqlx.Tx, source int64, name string, url types.String, extraTags map[string]string) error { o.mu.Lock() defer o.mu.Unlock() @@ -117,35 +117,25 @@ func (o *Object) UpdateMetadata(source int64, name string, url types.String, ext } stmt, _ := o.db.BuildUpsertStmt(&SourceMetadata{}) - _, err := o.db.NamedExec(stmt, sourceMetadata) + _, err := tx.NamedExec(stmt, sourceMetadata) if err != nil { - return fmt.Errorf("failed to upsert object metadata: %s", err) + return err } - tx, err := o.db.BeginTxx(context.TODO(), nil) - if err != nil { - return fmt.Errorf("failed to start transaction for object extra tags: %s", err) - } - defer tx.Rollback() - extraTag := &ExtraTagRow{ObjectId: o.ID, SourceId: source} _, err = tx.NamedExec(`DELETE FROM "object_extra_tag" WHERE "object_id" = :object_id AND "source_id" = :source_id`, extraTag) if err != nil { - return fmt.Errorf("failed to delete object extra tags: %s", err) + return err } if len(extraTags) > 0 { stmt, _ = o.db.BuildInsertStmt(extraTag) _, err = tx.NamedExec(stmt, sourceMetadata.mapToExtraTags()) if err != nil { - return fmt.Errorf("failed to insert object extra tags: %s", err) + return err } } - if err = tx.Commit(); err != nil { - return fmt.Errorf("failed to commit object extrag tags transaction: %s", err) - } - if o.Metadata == nil { o.Metadata = make(map[int64]*SourceMetadata) } diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 783bf8b7b..6b3a73a32 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -7,6 +7,7 @@ import ( "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/types" "github.com/icinga/icingadb/pkg/utils" + "github.com/jmoiron/sqlx" "strings" ) @@ -29,10 +30,10 @@ func BuildInsertStmtWithout(db *icingadb.DB, into interface{}, withoutColumn str } // InsertAndFetchId executes the given query and fetches the last inserted ID. -func InsertAndFetchId(db *icingadb.DB, stmt string, args any) (int64, error) { +func InsertAndFetchId(tx *sqlx.Tx, stmt string, args any) (int64, error) { var lastInsertId int64 - if db.DriverName() == driver.PostgreSQL { - preparedStmt, err := db.PrepareNamed(stmt + " RETURNING id") + if tx.DriverName() == driver.PostgreSQL { + preparedStmt, err := tx.PrepareNamed(stmt + " RETURNING id") if err != nil { return 0, err } @@ -43,7 +44,7 @@ func InsertAndFetchId(db *icingadb.DB, stmt string, args any) (int64, error) { return 0, fmt.Errorf("failed to insert entry for type %T: %s", args, err) } } else { - result, err := db.NamedExec(stmt, args) + result, err := tx.NamedExec(stmt, args) if err != nil { return 0, fmt.Errorf("failed to insert entry for type %T: %s", args, err) }