From 58abc764589048ec99f2063d3dde079f9afa0172 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Thu, 11 May 2023 17:08:39 +0200 Subject: [PATCH] Load all open incidents from DB on daemon startup --- cmd/icinga-notifications-daemon/main.go | 6 +++ internal/incident/incident.go | 23 ++++++++++++ internal/incident/incidents.go | 25 ++++--------- internal/listener/listener.go | 2 +- internal/object/db_types.go | 50 +++++++++++++++++++++++++ 5 files changed, 87 insertions(+), 19 deletions(-) diff --git a/cmd/icinga-notifications-daemon/main.go b/cmd/icinga-notifications-daemon/main.go index 993ece2d0..2df30ff70 100644 --- a/cmd/icinga-notifications-daemon/main.go +++ b/cmd/icinga-notifications-daemon/main.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "github.com/icinga/icinga-notifications/internal/config" + "github.com/icinga/icinga-notifications/internal/incident" "github.com/icinga/icinga-notifications/internal/listener" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" @@ -62,6 +63,11 @@ func main() { go runtimeConfig.PeriodicUpdates(context.TODO(), 1*time.Second) + err = incident.LoadOpenIncidents(context.TODO(), db, logs.GetChildLogger("incident"), runtimeConfig, conf.Icingaweb2URL) + if err != nil { + logger.Fatalw("Can't load incidents from database", zap.Error(err)) + } + if err := listener.NewListener(db, conf, runtimeConfig, logs).Run(); err != nil { panic(err) } diff --git a/internal/incident/incident.go b/internal/incident/incident.go index a1f66cf50..857e61b9d 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -571,6 +571,29 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, return nil } +// ReloadRecipients reloads the current incident recipients from the database. +// Returns error on database failure. +func (i *Incident) ReloadRecipients(ctx context.Context, tx *sqlx.Tx) error { + i.Lock() + defer i.Unlock() + + contact := &ContactRow{} + var contacts []*ContactRow + err := tx.SelectContext(ctx, &contacts, tx.Rebind(i.db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.ID()) + if err != nil { + return fmt.Errorf("failed to fetch incident recipients: %w", err) + } + + recipients := make(map[recipient.Key]*RecipientState) + for _, contact := range contacts { + recipients[contact.Key] = &RecipientState{Role: contact.Role} + } + + i.Recipients = recipients + + return nil +} + type EscalationState struct { IncidentID int64 `db:"incident_id"` RuleEscalationID int64 `db:"rule_escalation_id"` diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index 4c11406d7..c7bbfce32 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -10,6 +10,7 @@ import ( "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" + "github.com/jmoiron/sqlx" "golang.org/x/sync/errgroup" "sync" ) @@ -20,7 +21,7 @@ var ( ) func GetCurrent( - ctx context.Context, db *icingadb.DB, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, configFile *config.ConfigFile, create bool, + ctx context.Context, db *icingadb.DB, tx *sqlx.Tx, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, configFile *config.ConfigFile, create bool, ) (*Incident, bool, error) { currentIncidentsMu.Lock() defer currentIncidentsMu.Unlock() @@ -35,7 +36,7 @@ func GetCurrent( incident.EscalationState = make(map[escalationID]*EscalationState) incident.Recipients = make(map[recipient.Key]*RecipientState) - err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir) + err := tx.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir) if err != nil && err != sql.ErrNoRows { return nil, false, fmt.Errorf("incident query failed with: %w", err) } else if err == nil { @@ -46,7 +47,7 @@ func GetCurrent( g.Go(func() error { sourceSeverity := &SourceSeverity{IncidentID: ir.ID} var sources []SourceSeverity - err := db.SelectContext( + err := tx.SelectContext( childCtx, &sources, db.Rebind(db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`), ir.ID, event.SeverityOK, @@ -65,7 +66,7 @@ func GetCurrent( g.Go(func() error { state := &EscalationState{} var states []*EscalationState - err = db.SelectContext(childCtx, &states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID) + err = tx.SelectContext(childCtx, &states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID) if err != nil { return fmt.Errorf("failed to fetch incident rule escalation state: %w", err) } @@ -95,22 +96,10 @@ func GetCurrent( } if !created && currentIncident != nil { - currentIncident.Lock() - defer currentIncident.Unlock() - - contact := &ContactRow{} - var contacts []*ContactRow - err := db.SelectContext(ctx, &contacts, db.Rebind(db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), currentIncident.ID()) + err := currentIncident.ReloadRecipients(ctx, tx) if err != nil { - return nil, false, fmt.Errorf("failed to fetch incident recipients: %w", err) - } - - recipients := make(map[recipient.Key]*RecipientState) - for _, contact := range contacts { - recipients[contact.Key] = &RecipientState{Role: contact.Role} + return nil, false, err } - - currentIncident.Recipients = recipients } return currentIncident, created, nil diff --git a/internal/listener/listener.go b/internal/listener/listener.go index 2bef8fd9f..c5226cbc8 100644 --- a/internal/listener/listener.go +++ b/internal/listener/listener.go @@ -106,7 +106,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { } createIncident := ev.Severity != event.SeverityNone && ev.Severity != event.SeverityOK - currentIncident, created, err := incident.GetCurrent(ctx, l.db, obj, l.logs.GetChildLogger("incident"), l.runtimeConfig, l.configFile, createIncident) + currentIncident, created, err := incident.GetCurrent(ctx, l.db, tx, obj, l.logs.GetChildLogger("incident"), l.runtimeConfig, l.configFile, createIncident) if err != nil { w.WriteHeader(http.StatusInternalServerError) _, _ = fmt.Fprintln(w, err) diff --git a/internal/object/db_types.go b/internal/object/db_types.go index c4a4d6e40..8a5e7516d 100644 --- a/internal/object/db_types.go +++ b/internal/object/db_types.go @@ -1,7 +1,11 @@ package object import ( + "context" + "fmt" + "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/types" + "github.com/jmoiron/sqlx" ) // ExtraTagRow represents a single database object extra tag like `hostgroup/foo: null`. @@ -27,3 +31,49 @@ type ObjectRow struct { func (d *ObjectRow) TableName() string { return "object" } + +func FromDB(ctx context.Context, db *icingadb.DB, tx *sqlx.Tx, id types.Binary) (*Object, error) { + objectRow := &ObjectRow{ID: id} + err := tx.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(objectRow, objectRow)+` WHERE "id" = ?`), objectRow.ID).StructScan(objectRow) + if err != nil { + return nil, fmt.Errorf("failed to fetch object: %w", err) + } + + tags := map[string]string{"host": objectRow.Host} + if objectRow.Service.Valid { + tags["service"] = objectRow.Service.String + } + + metadata := make(map[int64]*SourceMetadata) + var sourceMetas []*SourceMetadata + err = tx.SelectContext(ctx, &sourceMetas, db.Rebind(db.BuildSelectStmt(&SourceMetadata{}, &SourceMetadata{})+` WHERE "object_id" = ?`), id) + if err != nil { + return nil, fmt.Errorf("failed to fetch source object: %w", err) + } + + var extraTags []*ExtraTagRow + err = tx.SelectContext( + ctx, &extraTags, + db.Rebind(db.BuildSelectStmt(&ExtraTagRow{}, &ExtraTagRow{})+` WHERE "object_id" = ?`), id, + ) + if err != nil { + return nil, fmt.Errorf("failed to fetch object extra tags: %w", err) + } + + for _, sourceMeta := range sourceMetas { + sourceMeta.ExtraTags = map[string]string{} + metadata[sourceMeta.SourceId] = sourceMeta + } + + for _, extraTag := range extraTags { + sourceMeta, ok := metadata[extraTag.SourceId] + if ok { + sourceMeta.ExtraTags[extraTag.Tag] = extraTag.Value + } + } + + obj := &Object{db: db, ID: id, Tags: tags, Metadata: metadata} + obj.UpdateCache() + + return obj, nil +}