From 59919d6f89a34bd7375dda7dae107daf756d88e2 Mon Sep 17 00:00:00 2001
From: Yonas Habteab
Date: Thu, 11 May 2023 17:08:39 +0200
Subject: [PATCH] Load all open incidents from DB on daemon startup

The state of every open incident is restored sequentially through a single
transaction. A transaction is bound to one database connection, so it is not
shared between goroutines, and rules/escalations are evaluated only after the
persisted state of the incident has been loaded.

---
 cmd/icinga-notifications-daemon/main.go |   6 +
 internal/incident/incident.go           |  23 ++++
 internal/incident/incidents.go          | 142 ++++++++++++++++++++----
 internal/listener/listener.go           |   2 +-
 internal/object/db_types.go             |  53 +++++++++
 5 files changed, 206 insertions(+), 20 deletions(-)

diff --git a/cmd/icinga-notifications-daemon/main.go b/cmd/icinga-notifications-daemon/main.go
index 993ece2d0..2df30ff70 100644
--- a/cmd/icinga-notifications-daemon/main.go
+++ b/cmd/icinga-notifications-daemon/main.go
@@ -5,6 +5,7 @@ import (
 	"flag"
 	"fmt"
 	"github.com/icinga/icinga-notifications/internal/config"
+	"github.com/icinga/icinga-notifications/internal/incident"
 	"github.com/icinga/icinga-notifications/internal/listener"
 	"github.com/icinga/icingadb/pkg/logging"
 	"github.com/icinga/icingadb/pkg/utils"
@@ -62,6 +63,11 @@ func main() {
 
 	go runtimeConfig.PeriodicUpdates(context.TODO(), 1*time.Second)
 
+	err = incident.LoadOpenIncidents(context.TODO(), db, logs.GetChildLogger("incident"), runtimeConfig, conf.Icingaweb2URL)
+	if err != nil {
+		logger.Fatalw("Can't load incidents from database", zap.Error(err))
+	}
+
 	if err := listener.NewListener(db, conf, runtimeConfig, logs).Run(); err != nil {
 		panic(err)
 	}
diff --git a/internal/incident/incident.go b/internal/incident/incident.go
index c98271815..f1c952ca3 100644
--- a/internal/incident/incident.go
+++ b/internal/incident/incident.go
@@ -571,6 +571,29 @@ func (i *Incident) processAcknowledgmentEvent(ctx context.Context, tx *sqlx.Tx,
 	return nil
 }
 
+// ReloadRecipients reloads the current incident recipients from the database.
+// Returns error on database failure.
+func (i *Incident) ReloadRecipients(ctx context.Context, tx *sqlx.Tx) error {
+	i.Lock()
+	defer i.Unlock()
+
+	contact := &ContactRow{}
+	var contacts []*ContactRow
+	err := tx.SelectContext(ctx, &contacts, tx.Rebind(i.db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.ID())
+	if err != nil {
+		return fmt.Errorf("failed to fetch incident recipients: %w", err)
+	}
+
+	recipients := make(map[recipient.Key]*RecipientState)
+	for _, contact := range contacts {
+		recipients[contact.Key] = &RecipientState{Role: contact.Role}
+	}
+
+	i.Recipients = recipients
+
+	return nil
+}
+
 type EscalationState struct {
 	IncidentID       int64 `db:"incident_id"`
 	RuleEscalationID int64 `db:"rule_escalation_id"`
diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go
index eb0e548dc..5401fd18b 100644
--- a/internal/incident/incidents.go
+++ b/internal/incident/incidents.go
@@ -10,6 +10,7 @@ import (
 	"github.com/icinga/icinga-notifications/internal/recipient"
 	"github.com/icinga/icingadb/pkg/icingadb"
 	"github.com/icinga/icingadb/pkg/logging"
+	"github.com/jmoiron/sqlx"
 	"golang.org/x/sync/errgroup"
 	"sync"
 )
@@ -19,7 +20,122 @@ var (
 	currentIncidentsMu sync.Mutex
 )
 
-func GetCurrent(ctx context.Context, db *icingadb.DB, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, create bool) (*Incident, bool, error) {
+// LoadOpenIncidents loads all incidents that have not recovered yet from the database and makes
+// them the current incidents, replacing whatever was held in memory before.
+//
+// All rows are read through one transaction and processed one incident at a time: a transaction is
+// bound to a single database connection, so it must not be shared between concurrently running
+// goroutines, and rules/escalations may only be evaluated once the persisted state is fully loaded.
+//
+// Returns an error if a query or the evaluation fails; the current incidents are not replaced then.
+func LoadOpenIncidents(
+	ctx context.Context, db *icingadb.DB, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, icingaweb2Url string,
+) error {
+	tx, err := db.BeginTxx(ctx, nil)
+	if err != nil {
+		return err
+	}
+	defer func() { _ = tx.Rollback() }()
+
+	var incidentRows []*IncidentRow
+	err = tx.SelectContext(ctx, &incidentRows, db.BuildSelectStmt(&IncidentRow{}, &IncidentRow{})+` WHERE "recovered_at" IS NULL`)
+	if err != nil {
+		return fmt.Errorf("failed to fetch open incidents: %w", err)
+	}
+
+	incidents := make(map[*object.Object]*Incident, len(incidentRows))
+	for _, incidentRow := range incidentRows {
+		incident := &Incident{
+			db:               db,
+			logger:           logger,
+			runtimeConfig:    runtimeConfig,
+			StartedAt:        incidentRow.StartedAt.Time(),
+			incidentRowID:    incidentRow.ID,
+			Icingaweb2Url:    icingaweb2Url,
+			SeverityBySource: map[int64]event.Severity{},
+			EscalationState:  map[escalationID]*EscalationState{},
+			Rules:            map[ruleID]struct{}{},
+			Recipients:       map[recipient.Key]*RecipientState{},
+		}
+
+		obj, err := object.FromDB(ctx, db, tx, incidentRow.ObjectID)
+		if err != nil {
+			return err
+		}
+
+		incident.Object = obj
+
+		if err = incident.restoreState(ctx, tx); err != nil {
+			return err
+		}
+
+		incidents[obj] = incident
+	}
+
+	if err = tx.Commit(); err != nil {
+		return err
+	}
+
+	currentIncidentsMu.Lock()
+	defer currentIncidentsMu.Unlock()
+
+	currentIncidents = incidents
+
+	return nil
+}
+
+// restoreState fills the incident's source severities, escalation states and recipients from the
+// database and afterwards evaluates its rules and escalations against that loaded state.
+//
+// The maps are written without holding the incident lock, so this must only be called while the
+// incident is not yet reachable by other goroutines. Returns an error on database or evaluation failure.
+func (i *Incident) restoreState(ctx context.Context, tx *sqlx.Tx) error {
+	sourceSeverity := &SourceSeverity{IncidentID: i.incidentRowID}
+	var sources []SourceSeverity
+	err := tx.SelectContext(
+		ctx, &sources,
+		tx.Rebind(i.db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`),
+		i.incidentRowID, event.SeverityOK,
+	)
+	if err != nil {
+		return fmt.Errorf("failed to fetch incident sources Severity: %w", err)
+	}
+
+	for _, source := range sources {
+		i.SeverityBySource[source.SourceID] = source.Severity
+	}
+
+	state := &EscalationState{}
+	var states []*EscalationState
+	err = tx.SelectContext(ctx, &states, tx.Rebind(i.db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), i.incidentRowID)
+	if err != nil {
+		return fmt.Errorf("failed to fetch incident rule escalation state: %w", err)
+	}
+
+	for _, state := range states {
+		i.EscalationState[state.RuleEscalationID] = state
+	}
+
+	// ReloadRecipients takes the incident lock itself, so it has to run before we lock below.
+	if err = i.ReloadRecipients(ctx, tx); err != nil {
+		return err
+	}
+
+	i.Lock()
+	defer i.Unlock()
+	i.runtimeConfig.RLock()
+	defer i.runtimeConfig.RUnlock()
+
+	if err = i.evaluateRules(ctx, tx, 0); err != nil {
+		return err
+	}
+
+	return i.evaluateEscalations()
+}
+
+func GetCurrent(
+	ctx context.Context, db *icingadb.DB, tx *sqlx.Tx, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, create bool,
+) (*Incident, bool, error) {
 	currentIncidentsMu.Lock()
 	defer currentIncidentsMu.Unlock()
 
@@ -33,7 +149,7 @@ func GetCurrent(ctx context.Context, db *icingadb.DB, obj *object.Object, logger
 		incident.EscalationState = make(map[escalationID]*EscalationState)
 		incident.Recipients = make(map[recipient.Key]*RecipientState)
 
-		err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir)
+		err := tx.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir)
 		if err != nil && err != sql.ErrNoRows {
 			return nil, false, fmt.Errorf("incident query failed with: %w", err)
 		} else if err == nil {
@@ -44,9 +160,9 @@ func GetCurrent(ctx context.Context, db *icingadb.DB, obj *object.Object, logger
 			g.Go(func() error {
 				sourceSeverity := &SourceSeverity{IncidentID: ir.ID}
 				var sources []SourceSeverity
-				err := db.SelectContext(
+				err := tx.SelectContext(
 					childCtx, &sources,
-					db.Rebind(db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`),
+					tx.Rebind(db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`),
 					ir.ID, event.SeverityOK,
 				)
 				if err != nil {
@@ -63,7 +179,7 @@ func GetCurrent(ctx context.Context, db *icingadb.DB, obj *object.Object, logger
 			g.Go(func() error {
 				state := &EscalationState{}
 				var states []*EscalationState
-				err = db.SelectContext(childCtx, &states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID)
+				err = tx.SelectContext(childCtx, &states, tx.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID)
 				if err != nil {
 					return fmt.Errorf("failed to fetch incident rule escalation state: %w", err)
 				}
@@ -93,22 +209,10 @@ func GetCurrent(ctx context.Context, db *icingadb.DB, obj *object.Object, logger
 	}
 
 	if !created && currentIncident != nil {
-		currentIncident.Lock()
-		defer currentIncident.Unlock()
-
-		contact := &ContactRow{}
-		var contacts []*ContactRow
-		err := db.SelectContext(ctx, &contacts, db.Rebind(db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), currentIncident.ID())
+		err := currentIncident.ReloadRecipients(ctx, tx)
 		if err != nil {
-			return nil, false, fmt.Errorf("failed to fetch incident recipients: %w", err)
+			return nil, false, err
 		}
-
-		recipients := make(map[recipient.Key]*RecipientState)
-		for _, contact := range contacts {
-			recipients[contact.Key] = &RecipientState{Role: contact.Role}
-		}
-
-		currentIncident.Recipients = recipients
 	}
 
 	return currentIncident, created, nil
diff --git a/internal/listener/listener.go b/internal/listener/listener.go
index f042b2b5e..8df6bc6be 100644
--- a/internal/listener/listener.go
+++ b/internal/listener/listener.go
@@ -106,7 +106,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) {
 	}
 
 	createIncident := ev.Severity != event.SeverityNone && ev.Severity != event.SeverityOK
-	currentIncident, created, err := incident.GetCurrent(ctx, l.db, obj, l.logs.GetChildLogger("incident"), l.runtimeConfig, createIncident)
+	currentIncident, created, err := incident.GetCurrent(ctx, l.db, tx, obj, l.logs.GetChildLogger("incident"), l.runtimeConfig, createIncident)
 	if err != nil {
 		w.WriteHeader(http.StatusInternalServerError)
 		_, _ = fmt.Fprintln(w, err)
diff --git a/internal/object/db_types.go b/internal/object/db_types.go
index c4a4d6e40..8a5e7516d 100644
--- a/internal/object/db_types.go
+++ b/internal/object/db_types.go
@@ -1,7 +1,11 @@
 package object
 
 import (
+	"context"
+	"fmt"
+	"github.com/icinga/icingadb/pkg/icingadb"
 	"github.com/icinga/icingadb/pkg/types"
+	"github.com/jmoiron/sqlx"
 )
 
 // ExtraTagRow represents a single database object extra tag like `hostgroup/foo: null`.
@@ -27,3 +31,52 @@ type ObjectRow struct {
 func (d *ObjectRow) TableName() string {
 	return "object"
 }
+
+// FromDB loads the object with the given id, together with its per-source metadata and the extra
+// tags of those sources, through tx and calls UpdateCache on the resulting object.
+// Returns an error if any of the queries fails, including when no object row with that id exists.
+func FromDB(ctx context.Context, db *icingadb.DB, tx *sqlx.Tx, id types.Binary) (*Object, error) {
+	objectRow := &ObjectRow{ID: id}
+	err := tx.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(objectRow, objectRow)+` WHERE "id" = ?`), objectRow.ID).StructScan(objectRow)
+	if err != nil {
+		return nil, fmt.Errorf("failed to fetch object: %w", err)
+	}
+
+	tags := map[string]string{"host": objectRow.Host}
+	if objectRow.Service.Valid {
+		tags["service"] = objectRow.Service.String
+	}
+
+	metadata := make(map[int64]*SourceMetadata)
+	var sourceMetas []*SourceMetadata
+	err = tx.SelectContext(ctx, &sourceMetas, db.Rebind(db.BuildSelectStmt(&SourceMetadata{}, &SourceMetadata{})+` WHERE "object_id" = ?`), id)
+	if err != nil {
+		return nil, fmt.Errorf("failed to fetch source object: %w", err)
+	}
+
+	var extraTags []*ExtraTagRow
+	err = tx.SelectContext(
+		ctx, &extraTags,
+		db.Rebind(db.BuildSelectStmt(&ExtraTagRow{}, &ExtraTagRow{})+` WHERE "object_id" = ?`), id,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to fetch object extra tags: %w", err)
+	}
+
+	for _, sourceMeta := range sourceMetas {
+		sourceMeta.ExtraTags = map[string]string{}
+		metadata[sourceMeta.SourceId] = sourceMeta
+	}
+
+	for _, extraTag := range extraTags {
+		sourceMeta, ok := metadata[extraTag.SourceId]
+		if ok {
+			sourceMeta.ExtraTags[extraTag.Tag] = extraTag.Value
+		}
+	}
+
+	obj := &Object{db: db, ID: id, Tags: tags, Metadata: metadata}
+	obj.UpdateCache()
+
+	return obj, nil
+}