Skip to content

Commit

Permalink
Load all open incidents from DB on daemon startup
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed May 15, 2023
1 parent 93facc0 commit a40c7cd
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 58 deletions.
6 changes: 6 additions & 0 deletions cmd/noma-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/utils"
"github.com/icinga/noma/internal/config"
"github.com/icinga/noma/internal/incident"
"github.com/icinga/noma/internal/listener"
"go.uber.org/zap"
"os"
Expand Down Expand Up @@ -62,6 +63,11 @@ func main() {

go runtimeConfig.PeriodicUpdates(context.TODO(), 1*time.Second)

err = incident.LoadOpenIncidents(db, logs.GetChildLogger("incident"), runtimeConfig, conf.Icingaweb2URL)
if err != nil {
logger.Fatalw("Can't load incidents from database", zap.Error(err))
}

if err := listener.NewListener(db, conf, runtimeConfig, logs).Run(); err != nil {
panic(err)
}
Expand Down
23 changes: 23 additions & 0 deletions internal/incident/incident.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,29 @@ func (i *Incident) processAcknowledgmentEvent(ev event.Event) error {
return nil
}

// ReloadRecipients replaces the in-memory recipients of this incident with the
// contact rows currently stored in the database for it.
//
// The incident stays locked for the whole call. If the query fails, the wrapped
// error is returned and the previously known recipients are left untouched.
func (i *Incident) ReloadRecipients() error {
	i.Lock()
	defer i.Unlock()

	var rows []*ContactRow
	subject := &ContactRow{}
	query := i.db.Rebind(i.db.BuildSelectStmt(subject, subject) + ` WHERE "incident_id" = ?`)
	if err := i.db.Select(&rows, query, i.ID()); err != nil {
		return fmt.Errorf("failed to fetch incident recipients: %w", err)
	}

	// Build the new map completely before swapping it in, so the incident never
	// exposes a partially filled recipient set.
	reloaded := make(map[recipient.Key]*RecipientState)
	for idx := range rows {
		reloaded[rows[idx].Key] = &RecipientState{Role: rows[idx].Role}
	}
	i.Recipients = reloaded

	return nil
}

type EscalationState struct {
IncidentID int64 `db:"incident_id"`
RuleEscalationID int64 `db:"rule_escalation_id"`
Expand Down
154 changes: 96 additions & 58 deletions internal/incident/incidents.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package incident

import (
"database/sql"
"fmt"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/logging"
Expand All @@ -18,83 +17,122 @@ var (
currentIncidentsMu sync.Mutex
)

func GetCurrent(db *icingadb.DB, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, create bool) (*Incident, bool, error) {
func LoadOpenIncidents(db *icingadb.DB, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, icingaweb2Url string) error {
currentIncidentsMu.Lock()
defer currentIncidentsMu.Unlock()

created := false
currentIncident := currentIncidents[obj]
var incidentRows []*IncidentRow
err := db.Select(&incidentRows, db.BuildSelectStmt(&IncidentRow{}, &IncidentRow{})+` WHERE "recovered_at" IS NULL`)
if err != nil {
return fmt.Errorf("failed to fetch open incidents: %w", err)
}

if currentIncident == nil {
ir := &IncidentRow{}
incident := &Incident{Object: obj, db: db, logger: logger, runtimeConfig: runtimeConfig}
incident.SeverityBySource = make(map[int64]event.Severity)
incident.EscalationState = make(map[escalationID]*EscalationState)
incident.Recipients = make(map[recipient.Key]*RecipientState)

err := db.QueryRowx(db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir)
if err != nil && err != sql.ErrNoRows {
return nil, false, fmt.Errorf("incident query failed with: %w", err)
} else if err == nil {
incident.incidentRowID = ir.ID
incident.StartedAt = ir.StartedAt.Time()

sourceSeverity := &SourceSeverity{IncidentID: ir.ID}
var sources []SourceSeverity
err := db.Select(
&sources,
db.Rebind(db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`),
ir.ID, event.SeverityOK,
)
if err != nil {
return nil, false, fmt.Errorf("failed to fetch incident sources Severity: %w", err)
}
incidents := make(map[*object.Object]*Incident)
for _, incidentRow := range incidentRows {
incident := &Incident{
db: db,
logger: logger,
runtimeConfig: runtimeConfig,
StartedAt: incidentRow.StartedAt.Time(),
incidentRowID: incidentRow.ID,
Icingaweb2Url: icingaweb2Url,
SeverityBySource: map[int64]event.Severity{},
EscalationState: map[escalationID]*EscalationState{},
Rules: map[ruleID]struct{}{},
Recipients: map[recipient.Key]*RecipientState{},
}

for _, source := range sources {
incident.SeverityBySource[source.SourceID] = source.Severity
}
obj, err := object.FromDB(db, incidentRow.ObjectID)
if err != nil {
return err
}

incident.Object = obj

sourceSeverity := &SourceSeverity{IncidentID: incidentRow.ID}
var sources []SourceSeverity
err = db.Select(
&sources,
db.Rebind(db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`),
incidentRow.ID, event.SeverityOK,
)
if err != nil {
return fmt.Errorf("failed to fetch incident sources Severity: %w", err)
}

for _, source := range sources {
incident.SeverityBySource[source.SourceID] = source.Severity
}

state := &EscalationState{}
var states []*EscalationState
err = db.Select(&states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), incidentRow.ID)
if err != nil {
return fmt.Errorf("failed to fetch incident rule escalation state: %w", err)
}

for _, state := range states {
incident.EscalationState[state.RuleEscalationID] = state
}

if err := incident.ReloadRecipients(); err != nil {
return err
}

evaluateIncidentRules := func() error {
incident.Lock()
incident.runtimeConfig.RLock()
defer incident.Unlock()
defer incident.runtimeConfig.RUnlock()

state := &EscalationState{}
var states []*EscalationState
err = db.Select(&states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID)
err = incident.evaluateRules(0)
if err != nil {
return nil, false, fmt.Errorf("failed to fetch incident rule escalation state: %w", err)
return err
}

for _, state := range states {
incident.EscalationState[state.RuleEscalationID] = state
err = incident.evaluateEscalations()
if err != nil {
return err
}

currentIncident = incident
return nil
}

if create && currentIncident == nil {
created = true
currentIncident = incident
err = evaluateIncidentRules()
if err != nil {
return err
}

if currentIncident != nil {
currentIncidents[obj] = currentIncident
}
incidents[obj] = incident
}

if !created && currentIncident != nil {
currentIncident.Lock()
defer currentIncident.Unlock()
currentIncidents = incidents

contact := &ContactRow{}
var contacts []*ContactRow
err := db.Select(&contacts, db.Rebind(db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), currentIncident.ID())
if err != nil {
return nil, false, fmt.Errorf("failed to fetch incident recipients: %w", err)
}
return nil
}

recipients := make(map[recipient.Key]*RecipientState)
for _, contact := range contacts {
recipients[contact.Key] = &RecipientState{Role: contact.Role}
}
func GetCurrent(db *icingadb.DB, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, create bool) (*Incident, bool, error) {
currentIncidentsMu.Lock()
defer currentIncidentsMu.Unlock()

created := false
currentIncident := currentIncidents[obj]

if create && currentIncident == nil {
created = true
currentIncident = &Incident{Object: obj, db: db, logger: logger, runtimeConfig: runtimeConfig}
currentIncident.SeverityBySource = make(map[int64]event.Severity)
currentIncident.EscalationState = make(map[escalationID]*EscalationState)
currentIncident.Recipients = make(map[recipient.Key]*RecipientState)

currentIncident.Recipients = recipients
currentIncidents[obj] = currentIncident
}

if !created && currentIncident != nil {
err := currentIncident.ReloadRecipients()
if err != nil {
return nil, false, err
}
}

return currentIncident, created, nil
Expand Down
56 changes: 56 additions & 0 deletions internal/object/db_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package object

import (
"fmt"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/types"
)

Expand All @@ -27,3 +29,57 @@ type ObjectRow struct {
// TableName returns the name of the database table ObjectRow is mapped to.
func (d *ObjectRow) TableName() string {
	const table = "object"

	return table
}

// FromDB loads the object with the given id from the database, along with the
// source metadata rows referencing it and the extra tags stored per source,
// and stores the result in the package-level cache under id.String().
//
// The returned object's tags always contain "host"; "service" is only set when
// the object row carries a valid service. Any error from one of the database
// queries is wrapped and returned, in which case nothing is cached.
func FromDB(db *icingadb.DB, id types.Binary) (*Object, error) {
	row := &ObjectRow{ID: id}
	objectQuery := db.Rebind(db.BuildSelectStmt(row, row) + ` WHERE "id" = ?`)
	if err := db.QueryRowx(objectQuery, row.ID).StructScan(row); err != nil {
		return nil, fmt.Errorf("failed to fetch object: %w", err)
	}

	tags := map[string]string{"host": row.Host}
	if row.Service.Valid {
		tags["service"] = row.Service.String
	}

	var metaRows []*SourceMetadata
	metaQuery := db.Rebind(db.BuildSelectStmt(&SourceMetadata{}, &SourceMetadata{}) + ` WHERE "object_id" = ?`)
	if err := db.Select(&metaRows, metaQuery, id); err != nil {
		return nil, fmt.Errorf("failed to fetch source object: %w", err)
	}

	metadata := make(map[int64]*SourceMetadata)
	for _, metaRow := range metaRows {
		// Extra tags are stored per (object, source) pair, hence one query per source.
		var tagRows []*ExtraTagRow
		tagQuery := db.Rebind(
			db.BuildSelectStmt(&ExtraTagRow{}, &ExtraTagRow{}) + ` WHERE "object_id" = ? AND "source_id" = ?`,
		)
		if err := db.Select(&tagRows, tagQuery, id, metaRow.SourceId); err != nil {
			return nil, fmt.Errorf("failed to fetch object extra tags: %w", err)
		}

		extraTags := make(map[string]string)
		for _, tagRow := range tagRows {
			extraTags[tagRow.Tag] = tagRow.Value
		}

		metadata[metaRow.SourceId] = &SourceMetadata{
			ObjectId:  id,
			SourceId:  metaRow.SourceId,
			Name:      metaRow.Name,
			URL:       metaRow.URL,
			ExtraTags: extraTags,
		}
	}

	obj := &Object{db: db, ID: id, Tags: tags, Metadata: metadata}

	// Only touch the shared cache once the object has been assembled completely.
	cacheMu.Lock()
	defer cacheMu.Unlock()
	cache[id.String()] = obj

	return obj, nil
}

0 comments on commit a40c7cd

Please sign in to comment.