Skip to content

Commit

Permalink
Load all open incidents from DB on daemon startup
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed May 22, 2023
1 parent 2d04bb8 commit 59919d6
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 20 deletions.
6 changes: 6 additions & 0 deletions cmd/icinga-notifications-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"github.com/icinga/icinga-notifications/internal/config"
"github.com/icinga/icinga-notifications/internal/incident"
"github.com/icinga/icinga-notifications/internal/listener"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/utils"
Expand Down Expand Up @@ -62,6 +63,11 @@ func main() {

go runtimeConfig.PeriodicUpdates(context.TODO(), 1*time.Second)

err = incident.LoadOpenIncidents(context.TODO(), db, logs.GetChildLogger("incident"), runtimeConfig, conf.Icingaweb2URL)
if err != nil {
logger.Fatalw("Can't load incidents from database", zap.Error(err))
}

if err := listener.NewListener(db, conf, runtimeConfig, logs).Run(); err != nil {
panic(err)
}
Expand Down
23 changes: 23 additions & 0 deletions internal/incident/incident.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,29 @@ func (i *Incident) processAcknowledgmentEvent(ctx context.Context, tx *sqlx.Tx,
return nil
}

// ReloadRecipients replaces the in-memory recipients of this incident with the contact rows
// currently stored in the database for it. The query is executed within the given transaction
// and the incident stays locked for the duration of the call.
//
// On a database failure a wrapped error is returned and the previously known recipients are kept.
func (i *Incident) ReloadRecipients(ctx context.Context, tx *sqlx.Tx) error {
	i.Lock()
	defer i.Unlock()

	var rows []*ContactRow
	stmt := tx.Rebind(i.db.BuildSelectStmt(&ContactRow{}, &ContactRow{}) + ` WHERE "incident_id" = ?`)
	if err := tx.SelectContext(ctx, &rows, stmt, i.ID()); err != nil {
		return fmt.Errorf("failed to fetch incident recipients: %w", err)
	}

	// Build the new set completely before swapping it in, so a failure above never leaves a partial map behind.
	reloaded := make(map[recipient.Key]*RecipientState)
	for _, row := range rows {
		reloaded[row.Key] = &RecipientState{Role: row.Role}
	}
	i.Recipients = reloaded

	return nil
}

type EscalationState struct {
IncidentID int64 `db:"incident_id"`
RuleEscalationID int64 `db:"rule_escalation_id"`
Expand Down
144 changes: 125 additions & 19 deletions internal/incident/incidents.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/logging"
"github.com/jmoiron/sqlx"
"golang.org/x/sync/errgroup"
"sync"
)
Expand All @@ -19,7 +20,124 @@ var (
currentIncidentsMu sync.Mutex
)

func GetCurrent(ctx context.Context, db *icingadb.DB, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, create bool) (*Incident, bool, error) {
// LoadOpenIncidents restores all incidents that have not recovered yet ("recovered_at" IS NULL) from the
// database and installs them as the package-level set of current incidents.
//
// For every open incident row the related object is loaded via object.FromDB. Afterwards the incident's
// non-OK source severities, its escalation states and its recipients are fetched concurrently, and its
// rules and escalations are re-evaluated. The current incidents are only replaced once all workers have
// finished successfully and the transaction has been committed, so they stay untouched on failure.
//
// Returns an error if starting/committing the transaction, any query, or the rule/escalation
// evaluation fails.
func LoadOpenIncidents(
	ctx context.Context, db *icingadb.DB, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, icingaweb2Url string,
) error {
	tx, err := db.BeginTxx(ctx, nil)
	if err != nil {
		return fmt.Errorf("failed to start transaction: %w", err)
	}
	// Rollback after a successful Commit only reports that the transaction is already done,
	// hence its error is ignored deliberately.
	defer func() { _ = tx.Rollback() }()

	var incidentRows []*IncidentRow
	err = tx.SelectContext(ctx, &incidentRows, db.BuildSelectStmt(&IncidentRow{}, &IncidentRow{})+` WHERE "recovered_at" IS NULL`)
	if err != nil {
		return fmt.Errorf("failed to fetch open incidents: %w", err)
	}

	// The workers get their own cancellable context, so they can be stopped if the loop below bails out early.
	workerCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	incidents := make(map[*object.Object]*Incident, len(incidentRows))
	g, childCtx := errgroup.WithContext(workerCtx)
	for _, incidentRow := range incidentRows {
		// Copy the ID into a per-iteration variable: the closures below run concurrently and, before
		// Go 1.22, the loop variable itself is shared between all iterations.
		incidentID := incidentRow.ID

		incident := &Incident{
			db:               db,
			logger:           logger,
			runtimeConfig:    runtimeConfig,
			StartedAt:        incidentRow.StartedAt.Time(),
			incidentRowID:    incidentID,
			Icingaweb2Url:    icingaweb2Url,
			SeverityBySource: map[int64]event.Severity{},
			EscalationState:  map[escalationID]*EscalationState{},
			Rules:            map[ruleID]struct{}{},
			Recipients:       map[recipient.Key]*RecipientState{},
		}

		obj, err := object.FromDB(ctx, db, tx, incidentRow.ObjectID)
		if err != nil {
			// Stop and wait for the workers that were already started; otherwise they would keep
			// using tx while the deferred rollback is invalidating it.
			cancel()
			_ = g.Wait()

			return err
		}

		incident.Object = obj

		// Every worker uses its own local error variable. Sharing a single one between goroutines is a data race.
		g.Go(func() error {
			sourceSeverity := &SourceSeverity{IncidentID: incidentID}
			var sources []SourceSeverity
			err := db.SelectContext(
				childCtx, &sources,
				db.Rebind(db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`),
				incidentID, event.SeverityOK,
			)
			if err != nil {
				return fmt.Errorf("failed to fetch incident sources Severity: %w", err)
			}

			for _, source := range sources {
				incident.SeverityBySource[source.SourceID] = source.Severity
			}

			return childCtx.Err()
		})

		g.Go(func() error {
			state := &EscalationState{}
			var states []*EscalationState
			err := db.SelectContext(childCtx, &states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), incidentID)
			if err != nil {
				return fmt.Errorf("failed to fetch incident rule escalation state: %w", err)
			}

			for _, state := range states {
				incident.EscalationState[state.RuleEscalationID] = state
			}

			return childCtx.Err()
		})

		g.Go(func() error {
			return incident.ReloadRecipients(childCtx, tx)
		})

		g.Go(func() error {
			// Lock order: incident first, then the runtime config. Released in reverse order.
			incident.Lock()
			defer incident.Unlock()
			incident.runtimeConfig.RLock()
			defer incident.runtimeConfig.RUnlock()

			if err := incident.evaluateRules(childCtx, tx, 0); err != nil {
				return err
			}

			if err := incident.evaluateEscalations(); err != nil {
				return err
			}

			return childCtx.Err()
		})

		incidents[obj] = incident
	}

	if err = g.Wait(); err != nil {
		return err
	}
	if err = tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit transaction: %w", err)
	}

	currentIncidentsMu.Lock()
	defer currentIncidentsMu.Unlock()

	currentIncidents = incidents

	return nil
}

func GetCurrent(
ctx context.Context, db *icingadb.DB, tx *sqlx.Tx, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, create bool,
) (*Incident, bool, error) {
currentIncidentsMu.Lock()
defer currentIncidentsMu.Unlock()

Expand All @@ -33,7 +151,7 @@ func GetCurrent(ctx context.Context, db *icingadb.DB, obj *object.Object, logger
incident.EscalationState = make(map[escalationID]*EscalationState)
incident.Recipients = make(map[recipient.Key]*RecipientState)

err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir)
err := tx.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir)
if err != nil && err != sql.ErrNoRows {
return nil, false, fmt.Errorf("incident query failed with: %w", err)
} else if err == nil {
Expand All @@ -44,9 +162,9 @@ func GetCurrent(ctx context.Context, db *icingadb.DB, obj *object.Object, logger
g.Go(func() error {
sourceSeverity := &SourceSeverity{IncidentID: ir.ID}
var sources []SourceSeverity
err := db.SelectContext(
err := tx.SelectContext(
childCtx, &sources,
db.Rebind(db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`),
tx.Rebind(db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`),
ir.ID, event.SeverityOK,
)
if err != nil {
Expand All @@ -63,7 +181,7 @@ func GetCurrent(ctx context.Context, db *icingadb.DB, obj *object.Object, logger
g.Go(func() error {
state := &EscalationState{}
var states []*EscalationState
err = db.SelectContext(childCtx, &states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID)
err = tx.SelectContext(childCtx, &states, tx.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID)
if err != nil {
return fmt.Errorf("failed to fetch incident rule escalation state: %w", err)
}
Expand Down Expand Up @@ -93,22 +211,10 @@ func GetCurrent(ctx context.Context, db *icingadb.DB, obj *object.Object, logger
}

if !created && currentIncident != nil {
currentIncident.Lock()
defer currentIncident.Unlock()

contact := &ContactRow{}
var contacts []*ContactRow
err := db.SelectContext(ctx, &contacts, db.Rebind(db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), currentIncident.ID())
err := currentIncident.ReloadRecipients(ctx, tx)
if err != nil {
return nil, false, fmt.Errorf("failed to fetch incident recipients: %w", err)
return nil, false, err
}

recipients := make(map[recipient.Key]*RecipientState)
for _, contact := range contacts {
recipients[contact.Key] = &RecipientState{Role: contact.Role}
}

currentIncident.Recipients = recipients
}

return currentIncident, created, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) {
}

createIncident := ev.Severity != event.SeverityNone && ev.Severity != event.SeverityOK
currentIncident, created, err := incident.GetCurrent(ctx, l.db, obj, l.logs.GetChildLogger("incident"), l.runtimeConfig, createIncident)
currentIncident, created, err := incident.GetCurrent(ctx, l.db, tx, obj, l.logs.GetChildLogger("incident"), l.runtimeConfig, createIncident)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = fmt.Fprintln(w, err)
Expand Down
50 changes: 50 additions & 0 deletions internal/object/db_types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package object

import (
"context"
"fmt"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/types"
"github.com/jmoiron/sqlx"
)

// ExtraTagRow represents a single database object extra tag like `hostgroup/foo: null`.
Expand All @@ -27,3 +31,49 @@ type ObjectRow struct {
func (d *ObjectRow) TableName() string {
	// Fixed table name for ObjectRow; presumably consumed by the SQL statement builders — confirm against callers.
	return "object"
}

// FromDB restores the object with the given id from the database, running all queries within tx.
//
// Besides the object row itself, the per-source metadata and the extra tags of the object are fetched.
// Extra tags are attached to the metadata entry of their source; tags whose source has no metadata
// entry are dropped. The resulting object carries the "host" tag and, if the row has a service, the
// "service" tag. UpdateCache is called on the object before it is returned.
//
// Returns a wrapped error if any of the three queries fails.
func FromDB(ctx context.Context, db *icingadb.DB, tx *sqlx.Tx, id types.Binary) (*Object, error) {
	row := &ObjectRow{ID: id}
	objectQuery := db.Rebind(db.BuildSelectStmt(row, row) + ` WHERE "id" = ?`)
	if err := tx.QueryRowxContext(ctx, objectQuery, row.ID).StructScan(row); err != nil {
		return nil, fmt.Errorf("failed to fetch object: %w", err)
	}

	var sourceMetas []*SourceMetadata
	metaQuery := db.Rebind(db.BuildSelectStmt(&SourceMetadata{}, &SourceMetadata{}) + ` WHERE "object_id" = ?`)
	if err := tx.SelectContext(ctx, &sourceMetas, metaQuery, id); err != nil {
		return nil, fmt.Errorf("failed to fetch source object: %w", err)
	}

	var extraTags []*ExtraTagRow
	tagQuery := db.Rebind(db.BuildSelectStmt(&ExtraTagRow{}, &ExtraTagRow{}) + ` WHERE "object_id" = ?`)
	if err := tx.SelectContext(ctx, &extraTags, tagQuery, id); err != nil {
		return nil, fmt.Errorf("failed to fetch object extra tags: %w", err)
	}

	// Index the metadata by source ID first, so the extra tags can be attached to their source below.
	metadata := make(map[int64]*SourceMetadata)
	for _, meta := range sourceMetas {
		meta.ExtraTags = map[string]string{}
		metadata[meta.SourceId] = meta
	}

	for _, tag := range extraTags {
		if meta, known := metadata[tag.SourceId]; known {
			meta.ExtraTags[tag.Tag] = tag.Value
		}
	}

	tags := map[string]string{"host": row.Host}
	if row.Service.Valid {
		tags["service"] = row.Service.String
	}

	obj := &Object{db: db, ID: id, Tags: tags, Metadata: metadata}
	obj.UpdateCache()

	return obj, nil
}

0 comments on commit 59919d6

Please sign in to comment.