diff --git a/cmd/icinga-notifications-daemon/main.go b/cmd/icinga-notifications-daemon/main.go
index 993ece2d0..60e983296 100644
--- a/cmd/icinga-notifications-daemon/main.go
+++ b/cmd/icinga-notifications-daemon/main.go
@@ -5,6 +5,7 @@ import (
 	"flag"
 	"fmt"
 	"github.com/icinga/icinga-notifications/internal/config"
+	"github.com/icinga/icinga-notifications/internal/incident"
 	"github.com/icinga/icinga-notifications/internal/listener"
 	"github.com/icinga/icingadb/pkg/logging"
 	"github.com/icinga/icingadb/pkg/utils"
@@ -62,6 +63,11 @@ func main() {
 
 	go runtimeConfig.PeriodicUpdates(context.TODO(), 1*time.Second)
 
+	err = incident.LoadOpenIncidents(context.TODO(), db, logs.GetChildLogger("incident"), runtimeConfig, conf)
+	if err != nil {
+		logger.Fatalw("Can't load incidents from database", zap.Error(err))
+	}
+
 	if err := listener.NewListener(db, conf, runtimeConfig, logs).Run(); err != nil {
 		panic(err)
 	}
diff --git a/internal/incident/incident.go b/internal/incident/incident.go
index 60dd43760..71248d000 100644
--- a/internal/incident/incident.go
+++ b/internal/incident/incident.go
@@ -595,6 +595,73 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx,
 	return nil
 }
 
+// ReloadRecipients reloads the current incident recipients from the database.
+// Returns error on database failure.
+func (i *Incident) ReloadRecipients(ctx context.Context) error {
+	contact := &ContactRow{}
+	var contacts []*ContactRow
+	err := i.db.SelectContext(ctx, &contacts, i.db.Rebind(i.db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.ID())
+	if err != nil {
+		i.logger.Errorw(
+			"failed to reload incident recipients", zap.String("object", i.ObjectDisplayName()),
+			zap.String("incident", i.String()), zap.Error(err),
+		)
+
+		return errors.New("failed to reload incident recipients")
+	}
+
+	recipients := make(map[recipient.Key]*RecipientState)
+	for _, contact := range contacts {
+		recipients[contact.Key] = &RecipientState{Role: contact.Role}
+	}
+
+	i.Recipients = recipients
+
+	return nil
+}
+
+// LoadSourceSeverities loads all non-OK source severities from database.
+// Returns error on database failure.
+func (i *Incident) LoadSourceSeverities(ctx context.Context) error {
+	sourceSeverity := &SourceSeverity{IncidentID: i.ID()}
+	var sources []SourceSeverity
+	err := i.db.SelectContext(
+		ctx, &sources,
+		i.db.Rebind(i.db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`),
+		i.ID(), event.SeverityOK,
+	)
+	if err != nil {
+		i.logger.Errorw("Failed to load incident source severities from database", zap.Error(err))
+
+		return errors.New("failed to load incident source severities")
+	}
+
+	for _, source := range sources {
+		i.SeverityBySource[source.SourceID] = source.Severity
+	}
+
+	return nil
+}
+
+// LoadEscalationsState loads all rule escalation states of this incident from the database.
+// Returns error on database failure.
+func (i *Incident) LoadEscalationsState(ctx context.Context) error {
+	state := &EscalationState{}
+	var states []*EscalationState
+	err := i.db.SelectContext(ctx, &states, i.db.Rebind(i.db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), i.ID())
+	if err != nil {
+		i.logger.Errorw("Failed to load incident rule escalation states", zap.Error(err))
+
+		return errors.New("failed to load incident rule escalation states")
+	}
+
+	for _, state := range states {
+		i.EscalationState[state.RuleEscalationID] = state
+	}
+
+	return nil
+}
+
 type EscalationState struct {
 	IncidentID       int64 `db:"incident_id"`
 	RuleEscalationID int64 `db:"rule_escalation_id"`
diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go
index f2c32f172..75dcb6fed 100644
--- a/internal/incident/incidents.go
+++ b/internal/incident/incidents.go
@@ -10,6 +10,7 @@ import (
 	"github.com/icinga/icinga-notifications/internal/recipient"
 	"github.com/icinga/icingadb/pkg/icingadb"
 	"github.com/icinga/icingadb/pkg/logging"
+	"github.com/icinga/icingadb/pkg/types"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 	"sync"
@@ -20,6 +21,106 @@ var (
 	currentIncidentsMu sync.Mutex
 )
 
+// LoadOpenIncidents loads all incidents without a recovery time from the database and restores their state.
+//
+// For each of them the object, source severities, escalation states and recipients are loaded, then its rules and
+// escalations are re-evaluated within a transaction. The package-level set of current incidents is only replaced
+// after all incidents were restored successfully.
+// Returns the first error encountered while doing so.
+func LoadOpenIncidents(
+	ctx context.Context, db *icingadb.DB, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, configFile *config.ConfigFile,
+) error {
+	var incidentRows []*IncidentRow
+	err := db.SelectContext(ctx, &incidentRows, db.BuildSelectStmt(&IncidentRow{}, &IncidentRow{})+` WHERE "recovered_at" IS NULL`)
+	if err != nil {
+		logger.Errorw("failed to load active incidents from database", zap.Error(err))
+
+		return errors.New("failed to fetch open incidents")
+	}
+
+	// Cancel everything still running on return, so no goroutine started below outlives this function.
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	incidents := make(map[*object.Object]*Incident, len(incidentRows))
+	g, childCtx := errgroup.WithContext(ctx)
+	for _, incidentRow := range incidentRows {
+		incident := &Incident{
+			db:               db,
+			runtimeConfig:    runtimeConfig,
+			StartedAt:        incidentRow.StartedAt.Time(),
+			incidentRowID:    incidentRow.ID,
+			configFile:       configFile,
+			SeverityBySource: map[int64]event.Severity{},
+			EscalationState:  map[escalationID]*EscalationState{},
+			Rules:            map[ruleID]struct{}{},
+			Recipients:       map[recipient.Key]*RecipientState{},
+		}
+
+		obj, err := object.LoadFromDB(ctx, db, incidentRow.ObjectID)
+		if err != nil {
+			// Stop and wait for the goroutines of the previous incidents instead of leaking them. Their errors are
+			// deliberately dropped, the failed object load is the error to report.
+			cancel()
+			_ = g.Wait()
+
+			return err
+		}
+
+		incident.Object = obj
+		incident.logger = logger.With(zap.String("object", obj.DisplayName()), zap.String("incident", incident.String()))
+
+		// Restore the state sequentially in a single goroutine per incident. evaluateRules and evaluateEscalations
+		// (defined outside of this patch) are assumed to work on the state loaded right before, so they must neither
+		// run before nor concurrently with the loaders.
+		g.Go(func() error {
+			if err := incident.LoadSourceSeverities(childCtx); err != nil {
+				return err
+			}
+
+			if err := incident.LoadEscalationsState(childCtx); err != nil {
+				return err
+			}
+
+			if err := incident.ReloadRecipients(childCtx); err != nil {
+				return err
+			}
+
+			tx, err := db.BeginTxx(childCtx, nil)
+			if err != nil {
+				return err
+			}
+			// The rollback error is deliberately ignored: after a successful commit there is nothing left to undo.
+			defer func() { _ = tx.Rollback() }()
+
+			_, err = incident.evaluateRules(childCtx, tx, 0, types.Int{})
+			if err != nil {
+				return err
+			}
+
+			incident.evaluateEscalations()
+			if err = tx.Commit(); err != nil {
+				return err
+			}
+
+			return childCtx.Err()
+		})
+
+		incidents[obj] = incident
+	}
+
+	if err = g.Wait(); err != nil {
+		return err
+	}
+
+	currentIncidentsMu.Lock()
+	defer currentIncidentsMu.Unlock()
+
+	currentIncidents = incidents
+
+	return nil
+}
+
 func GetCurrent(
 	ctx context.Context, db *icingadb.DB, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig,
 	configFile *config.ConfigFile, create bool,
@@ -55,41 +156,10 @@ func GetCurrent(
 
 	g, childCtx := errgroup.WithContext(ctx)
 	g.Go(func() error {
-		sourceSeverity := &SourceSeverity{IncidentID: ir.ID}
-		var sources []SourceSeverity
-		err := db.SelectContext(
-			childCtx, &sources,
-			db.Rebind(db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`),
-			ir.ID, event.SeverityOK,
-		)
-		if err != nil {
-			incident.logger.Errorw("Failed to load incident source severities from database", zap.Error(err))
-
-			return errors.New("failed to load incident source severities")
-		}
-
-		for _, source := range sources {
-			incident.SeverityBySource[source.SourceID] = source.Severity
-		}
-
-		return childCtx.Err()
+		return incident.LoadSourceSeverities(childCtx)
 	})
-
 	g.Go(func() error {
-		state := &EscalationState{}
-		var states []*EscalationState
-		err = db.SelectContext(childCtx, &states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID)
-		if err != nil {
-			incident.logger.Errorw("Failed to load incident rule escalation states", zap.Error(err))
-
-			return errors.New("failed to load incident rule escalation states")
-		}
-
-		for _, state := range states {
-			incident.EscalationState[state.RuleEscalationID] = state
-		}
-
-		return childCtx.Err()
+		return incident.LoadEscalationsState(childCtx)
 	})
 
 	if err := g.Wait(); err != nil {
@@ -113,21 +183,12 @@ func GetCurrent(
 		currentIncident.Lock()
 		defer currentIncident.Unlock()
 
-		contact := &ContactRow{}
-		var contacts []*ContactRow
-		err := db.SelectContext(ctx, &contacts, db.Rebind(db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), currentIncident.ID())
+		err := currentIncident.ReloadRecipients(ctx)
 		if err != nil {
 			currentIncident.logger.Errorw("Failed to reload incident recipients", zap.Error(err))
 
 			return nil, false, errors.New("failed to load incident recipients")
 		}
-
-		recipients := make(map[recipient.Key]*RecipientState)
-		for _, contact := range contacts {
-			recipients[contact.Key] = &RecipientState{Role: contact.Role}
-		}
-
-		currentIncident.Recipients = recipients
 	}
 
 	return currentIncident, created, nil
diff --git a/internal/object/db_types.go b/internal/object/db_types.go
index c4a4d6e40..44ecca2e6 100644
--- a/internal/object/db_types.go
+++ b/internal/object/db_types.go
@@ -1,6 +1,9 @@
 package object
 
 import (
+	"context"
+	"fmt"
+	"github.com/icinga/icingadb/pkg/icingadb"
 	"github.com/icinga/icingadb/pkg/types"
 )
 
@@ -27,3 +30,51 @@ type ObjectRow struct {
 func (d *ObjectRow) TableName() string {
 	return "object"
 }
+
+// LoadFromDB loads the object with the given id, its source metadata and its extra tags from the database.
+// Returns error on database failure, including the case that no object row matches the given id.
+func LoadFromDB(ctx context.Context, db *icingadb.DB, id types.Binary) (*Object, error) {
+	objectRow := &ObjectRow{ID: id}
+	err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(objectRow, objectRow)+` WHERE "id" = ?`), objectRow.ID).StructScan(objectRow)
+	if err != nil {
+		return nil, fmt.Errorf("failed to fetch object: %w", err)
+	}
+
+	tags := map[string]string{"host": objectRow.Host}
+	if objectRow.Service.Valid {
+		tags["service"] = objectRow.Service.String
+	}
+
+	metadata := make(map[int64]*SourceMetadata)
+	var sourceMetas []*SourceMetadata
+	err = db.SelectContext(ctx, &sourceMetas, db.Rebind(db.BuildSelectStmt(&SourceMetadata{}, &SourceMetadata{})+` WHERE "object_id" = ?`), id)
+	if err != nil {
+		return nil, fmt.Errorf("failed to fetch source object: %w", err)
+	}
+
+	var extraTags []*ExtraTagRow
+	err = db.SelectContext(
+		ctx, &extraTags,
+		db.Rebind(db.BuildSelectStmt(&ExtraTagRow{}, &ExtraTagRow{})+` WHERE "object_id" = ?`), id,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to fetch object extra tags: %w", err)
+	}
+
+	for _, sourceMeta := range sourceMetas {
+		sourceMeta.ExtraTags = map[string]string{}
+		metadata[sourceMeta.SourceId] = sourceMeta
+	}
+
+	for _, extraTag := range extraTags {
+		sourceMeta, ok := metadata[extraTag.SourceId]
+		if ok {
+			sourceMeta.ExtraTags[extraTag.Tag] = extraTag.Value
+		}
+	}
+
+	obj := &Object{db: db, ID: id, Tags: tags, Metadata: metadata}
+	obj.UpdateCache()
+
+	return obj, nil
+}