Skip to content

Commit

Permalink
Load all open incidents from DB on daemon startup
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed May 26, 2023
1 parent 24b6dd0 commit 0030bfd
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 43 deletions.
6 changes: 6 additions & 0 deletions cmd/icinga-notifications-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"github.com/icinga/icinga-notifications/internal/config"
"github.com/icinga/icinga-notifications/internal/incident"
"github.com/icinga/icinga-notifications/internal/listener"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/utils"
Expand Down Expand Up @@ -62,6 +63,11 @@ func main() {

go runtimeConfig.PeriodicUpdates(context.TODO(), 1*time.Second)

err = incident.LoadOpenIncidents(context.TODO(), db, logs.GetChildLogger("incident"), runtimeConfig, conf)
if err != nil {
logger.Fatalw("Can't load incidents from database", zap.Error(err))
}

if err := listener.NewListener(db, conf, runtimeConfig, logs).Run(); err != nil {
panic(err)
}
Expand Down
65 changes: 65 additions & 0 deletions internal/incident/incident.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,71 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx,
return nil
}

// ReloadRecipients replaces the in-memory recipients of this incident with the
// contact rows currently stored in the database for its ID.
//
// Every fetched row becomes one entry keyed by the row's recipient key; only the
// role is carried over into the new RecipientState. The previous Recipients map is
// discarded as a whole, and only after the query has succeeded.
//
// On a database failure the cause is logged and a generic error is returned.
func (i *Incident) ReloadRecipients(ctx context.Context) error {
	model := &ContactRow{}
	query := i.db.Rebind(i.db.BuildSelectStmt(model, model) + ` WHERE "incident_id" = ?`)

	var rows []*ContactRow
	if err := i.db.SelectContext(ctx, &rows, query, i.ID()); err != nil {
		i.logger.Errorw(
			"failed to reload incident recipients", zap.String("object", i.ObjectDisplayName()),
			zap.String("incident", i.String()), zap.Error(err),
		)

		return errors.New("failed to reload incident recipients")
	}

	// Build the replacement map first and swap it in with a single assignment.
	reloaded := make(map[recipient.Key]*RecipientState)
	for _, row := range rows {
		reloaded[row.Key] = &RecipientState{Role: row.Role}
	}
	i.Recipients = reloaded

	return nil
}

// LoadSourceSeverities fetches the source severities of this incident from the
// database, skipping those equal to event.SeverityOK, and records each of them in
// SeverityBySource under its source ID.
//
// Existing SeverityBySource entries are kept unless overwritten by a fetched row.
// On a database failure the cause is logged and a generic error is returned.
func (i *Incident) LoadSourceSeverities(ctx context.Context) error {
	model := &SourceSeverity{IncidentID: i.ID()}
	query := i.db.Rebind(
		i.db.BuildSelectStmt(model, model) + ` WHERE "incident_id" = ? AND "severity" != ?`,
	)

	var rows []SourceSeverity
	if err := i.db.SelectContext(ctx, &rows, query, i.ID(), event.SeverityOK); err != nil {
		i.logger.Errorw("Failed to load incident source severities from database", zap.Error(err))

		return errors.New("failed to load incident source severities")
	}

	for idx := range rows {
		i.SeverityBySource[rows[idx].SourceID] = rows[idx].Severity
	}

	return nil
}

// LoadEscalationsState fetches all escalation state rows of this incident from the
// database and stores each of them in EscalationState under its rule escalation ID.
//
// Existing EscalationState entries are kept unless overwritten by a fetched row.
// On a database failure the cause is logged and a generic error is returned.
func (i *Incident) LoadEscalationsState(ctx context.Context) error {
	model := &EscalationState{}
	query := i.db.Rebind(i.db.BuildSelectStmt(model, model) + ` WHERE "incident_id" = ?`)

	var rows []*EscalationState
	if err := i.db.SelectContext(ctx, &rows, query, i.ID()); err != nil {
		i.logger.Errorw("Failed to load incident rule escalation states", zap.Error(err))

		return errors.New("failed to load incident rule escalation states")
	}

	for _, row := range rows {
		i.EscalationState[row.RuleEscalationID] = row
	}

	return nil
}

type EscalationState struct {
IncidentID int64 `db:"incident_id"`
RuleEscalationID int64 `db:"rule_escalation_id"`
Expand Down
127 changes: 84 additions & 43 deletions internal/incident/incidents.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/types"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"sync"
Expand All @@ -20,6 +21,86 @@ var (
currentIncidentsMu sync.Mutex
)

// LoadOpenIncidents loads every incident whose "recovered_at" column is NULL from
// the database and installs the result as the package-level set of current
// incidents.
//
// For each incident row the related object is loaded first. The incident's source
// severities, escalation states and recipients are then restored and its rules and
// escalations re-evaluated inside a transaction. All of that happens in exactly one
// goroutine per incident: different incidents are restored concurrently, but the
// state of a single incident is never touched by two goroutines at once.
//
// Returns an error as soon as any step fails or ctx is cancelled. In that case all
// already started goroutines are cancelled and waited for, and the set of current
// incidents is left untouched.
func LoadOpenIncidents(
	ctx context.Context, db *icingadb.DB, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, configFile *config.ConfigFile,
) error {
	var incidentRows []*IncidentRow
	err := db.SelectContext(ctx, &incidentRows, db.BuildSelectStmt(&IncidentRow{}, &IncidentRow{})+` WHERE "recovered_at" IS NULL`)
	if err != nil {
		logger.Errorw("failed to load active incidents from database", zap.Error(err))

		return errors.New("failed to fetch open incidents")
	}

	incidents := make(map[*object.Object]*Incident, len(incidentRows))
	g, childCtx := errgroup.WithContext(ctx)
	for _, incidentRow := range incidentRows {
		// Declared inside the loop body, so every closure below captures its own incident.
		incident := &Incident{
			db:               db,
			runtimeConfig:    runtimeConfig,
			StartedAt:        incidentRow.StartedAt.Time(),
			incidentRowID:    incidentRow.ID,
			configFile:       configFile,
			SeverityBySource: map[int64]event.Severity{},
			EscalationState:  map[escalationID]*EscalationState{},
			Rules:            map[ruleID]struct{}{},
			Recipients:       map[recipient.Key]*RecipientState{},
		}

		// Use the group's context so that loading stops as soon as one of the
		// restore goroutines has failed.
		obj, err := object.LoadFromDB(childCtx, db, incidentRow.ObjectID)
		if err != nil {
			// Hand the error to the group instead of returning right away: this
			// cancels childCtx for the goroutines that are already running, and the
			// g.Wait() below makes sure none of them outlives this function.
			g.Go(func() error { return err })

			break
		}

		incident.Object = obj
		incident.logger = logger.With(zap.String("object", obj.DisplayName()), zap.String("incident", incident.String()))

		g.Go(func() error {
			// The loaders write to the incident's maps and the evaluation below works
			// on the same incident (presumably reading those maps - verify against
			// evaluateRules/evaluateEscalations), so run the steps one after another
			// instead of in separate goroutines.
			if err := incident.LoadSourceSeverities(childCtx); err != nil {
				return err
			}

			if err := incident.LoadEscalationsState(childCtx); err != nil {
				return err
			}

			if err := incident.ReloadRecipients(childCtx); err != nil {
				return err
			}

			tx, err := db.BeginTxx(childCtx, nil)
			if err != nil {
				return err
			}
			// Deliberately ignored: after a successful Commit the rollback is a no-op
			// that merely reports the transaction as already done.
			defer func() { _ = tx.Rollback() }()

			if _, err = incident.evaluateRules(childCtx, tx, 0, types.Int{}); err != nil {
				return err
			}

			incident.evaluateEscalations()
			if err = tx.Commit(); err != nil {
				return err
			}

			return childCtx.Err()
		})

		incidents[obj] = incident
	}

	if err = g.Wait(); err != nil {
		return err
	}

	currentIncidentsMu.Lock()
	defer currentIncidentsMu.Unlock()

	currentIncidents = incidents

	return nil
}

func GetCurrent(
ctx context.Context, db *icingadb.DB, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig,
configFile *config.ConfigFile, create bool,
Expand Down Expand Up @@ -55,41 +136,10 @@ func GetCurrent(

g, childCtx := errgroup.WithContext(ctx)
g.Go(func() error {
sourceSeverity := &SourceSeverity{IncidentID: ir.ID}
var sources []SourceSeverity
err := db.SelectContext(
childCtx, &sources,
db.Rebind(db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`),
ir.ID, event.SeverityOK,
)
if err != nil {
incident.logger.Errorw("Failed to load incident source severities from database", zap.Error(err))

return errors.New("failed to load incident source severities")
}

for _, source := range sources {
incident.SeverityBySource[source.SourceID] = source.Severity
}

return childCtx.Err()
return incident.LoadSourceSeverities(childCtx)
})

g.Go(func() error {
state := &EscalationState{}
var states []*EscalationState
err = db.SelectContext(childCtx, &states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID)
if err != nil {
incident.logger.Errorw("Failed to load incident rule escalation states", zap.Error(err))

return errors.New("failed to load incident rule escalation states")
}

for _, state := range states {
incident.EscalationState[state.RuleEscalationID] = state
}

return childCtx.Err()
return incident.LoadEscalationsState(childCtx)
})

if err := g.Wait(); err != nil {
Expand All @@ -113,21 +163,12 @@ func GetCurrent(
currentIncident.Lock()
defer currentIncident.Unlock()

contact := &ContactRow{}
var contacts []*ContactRow
err := db.SelectContext(ctx, &contacts, db.Rebind(db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), currentIncident.ID())
err := currentIncident.ReloadRecipients(ctx)
if err != nil {
currentIncident.logger.Errorw("Failed to reload incident recipients", zap.Error(err))

return nil, false, errors.New("failed to load incident recipients")
}

recipients := make(map[recipient.Key]*RecipientState)
for _, contact := range contacts {
recipients[contact.Key] = &RecipientState{Role: contact.Role}
}

currentIncident.Recipients = recipients
}

return currentIncident, created, nil
Expand Down
49 changes: 49 additions & 0 deletions internal/object/db_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package object

import (
"context"
"fmt"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/types"
)

Expand All @@ -27,3 +30,49 @@ type ObjectRow struct {
func (d *ObjectRow) TableName() string {
return "object"
}

// LoadFromDB builds an Object from the database rows that belong to the given id.
//
// Three queries are issued in order: the object row itself, all source metadata
// rows of the object and all extra tag rows of the object. The object's tags always
// contain "host" and additionally "service" when the service column is valid. Every
// source metadata row gets a fresh ExtraTags map, which is then filled from the
// extra tag rows having the same source ID; extra tags without a matching source
// metadata row are dropped.
//
// Returns the assembled object after calling UpdateCache on it, or a wrapped error
// if any of the queries fails.
func LoadFromDB(ctx context.Context, db *icingadb.DB, id types.Binary) (*Object, error) {
	row := &ObjectRow{ID: id}
	objectQuery := db.Rebind(db.BuildSelectStmt(row, row) + ` WHERE "id" = ?`)
	if err := db.QueryRowxContext(ctx, objectQuery, row.ID).StructScan(row); err != nil {
		return nil, fmt.Errorf("failed to fetch object: %w", err)
	}

	var metaRows []*SourceMetadata
	metaQuery := db.Rebind(db.BuildSelectStmt(&SourceMetadata{}, &SourceMetadata{}) + ` WHERE "object_id" = ?`)
	if err := db.SelectContext(ctx, &metaRows, metaQuery, id); err != nil {
		return nil, fmt.Errorf("failed to fetch source object: %w", err)
	}

	var tagRows []*ExtraTagRow
	tagQuery := db.Rebind(db.BuildSelectStmt(&ExtraTagRow{}, &ExtraTagRow{}) + ` WHERE "object_id" = ?`)
	if err := db.SelectContext(ctx, &tagRows, tagQuery, id); err != nil {
		return nil, fmt.Errorf("failed to fetch object extra tags: %w", err)
	}

	// Index the metadata by source ID so the extra tags can be attached to it.
	metadata := make(map[int64]*SourceMetadata)
	for _, meta := range metaRows {
		meta.ExtraTags = map[string]string{}
		metadata[meta.SourceId] = meta
	}

	for _, tagRow := range tagRows {
		if meta, found := metadata[tagRow.SourceId]; found {
			meta.ExtraTags[tagRow.Tag] = tagRow.Value
		}
	}

	tags := map[string]string{"host": row.Host}
	if row.Service.Valid {
		tags["service"] = row.Service.String
	}

	loaded := &Object{db: db, ID: id, Tags: tags, Metadata: metadata}
	loaded.UpdateCache()

	return loaded, nil
}

0 comments on commit 0030bfd

Please sign in to comment.