Skip to content

Commit

Permalink
Load all open incidents from DB on daemon startup
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed May 23, 2023
1 parent c48835b commit 58abc76
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 19 deletions.
6 changes: 6 additions & 0 deletions cmd/icinga-notifications-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"github.com/icinga/icinga-notifications/internal/config"
"github.com/icinga/icinga-notifications/internal/incident"
"github.com/icinga/icinga-notifications/internal/listener"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/utils"
Expand Down Expand Up @@ -62,6 +63,11 @@ func main() {

go runtimeConfig.PeriodicUpdates(context.TODO(), 1*time.Second)

err = incident.LoadOpenIncidents(context.TODO(), db, logs.GetChildLogger("incident"), runtimeConfig, conf.Icingaweb2URL)
if err != nil {
logger.Fatalw("Can't load incidents from database", zap.Error(err))
}

if err := listener.NewListener(db, conf, runtimeConfig, logs).Run(); err != nil {
panic(err)
}
Expand Down
23 changes: 23 additions & 0 deletions internal/incident/incident.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,29 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx,
return nil
}

// ReloadRecipients reloads the current incident recipients from the database.
// Returns error on database failure.
func (i *Incident) ReloadRecipients(ctx context.Context, tx *sqlx.Tx) error {
i.Lock()
defer i.Unlock()

contact := &ContactRow{}
var contacts []*ContactRow
err := tx.SelectContext(ctx, &contacts, tx.Rebind(i.db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.ID())
if err != nil {
return fmt.Errorf("failed to fetch incident recipients: %w", err)
}

recipients := make(map[recipient.Key]*RecipientState)
for _, contact := range contacts {
recipients[contact.Key] = &RecipientState{Role: contact.Role}
}

i.Recipients = recipients

return nil
}

type EscalationState struct {
IncidentID int64 `db:"incident_id"`
RuleEscalationID int64 `db:"rule_escalation_id"`
Expand Down
25 changes: 7 additions & 18 deletions internal/incident/incidents.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/logging"
"github.com/jmoiron/sqlx"
"golang.org/x/sync/errgroup"
"sync"
)
Expand All @@ -20,7 +21,7 @@ var (
)

func GetCurrent(
ctx context.Context, db *icingadb.DB, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, configFile *config.ConfigFile, create bool,
ctx context.Context, db *icingadb.DB, tx *sqlx.Tx, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, configFile *config.ConfigFile, create bool,
) (*Incident, bool, error) {
currentIncidentsMu.Lock()
defer currentIncidentsMu.Unlock()
Expand All @@ -35,7 +36,7 @@ func GetCurrent(
incident.EscalationState = make(map[escalationID]*EscalationState)
incident.Recipients = make(map[recipient.Key]*RecipientState)

err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir)
err := tx.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir)
if err != nil && err != sql.ErrNoRows {
return nil, false, fmt.Errorf("incident query failed with: %w", err)
} else if err == nil {
Expand All @@ -46,7 +47,7 @@ func GetCurrent(
g.Go(func() error {
sourceSeverity := &SourceSeverity{IncidentID: ir.ID}
var sources []SourceSeverity
err := db.SelectContext(
err := tx.SelectContext(
childCtx, &sources,
db.Rebind(db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`),
ir.ID, event.SeverityOK,
Expand All @@ -65,7 +66,7 @@ func GetCurrent(
g.Go(func() error {
state := &EscalationState{}
var states []*EscalationState
err = db.SelectContext(childCtx, &states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID)
err = tx.SelectContext(childCtx, &states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID)
if err != nil {
return fmt.Errorf("failed to fetch incident rule escalation state: %w", err)
}
Expand Down Expand Up @@ -95,22 +96,10 @@ func GetCurrent(
}

if !created && currentIncident != nil {
currentIncident.Lock()
defer currentIncident.Unlock()

contact := &ContactRow{}
var contacts []*ContactRow
err := db.SelectContext(ctx, &contacts, db.Rebind(db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), currentIncident.ID())
err := currentIncident.ReloadRecipients(ctx, tx)
if err != nil {
return nil, false, fmt.Errorf("failed to fetch incident recipients: %w", err)
}

recipients := make(map[recipient.Key]*RecipientState)
for _, contact := range contacts {
recipients[contact.Key] = &RecipientState{Role: contact.Role}
return nil, false, err
}

currentIncident.Recipients = recipients
}

return currentIncident, created, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) {
}

createIncident := ev.Severity != event.SeverityNone && ev.Severity != event.SeverityOK
currentIncident, created, err := incident.GetCurrent(ctx, l.db, obj, l.logs.GetChildLogger("incident"), l.runtimeConfig, l.configFile, createIncident)
currentIncident, created, err := incident.GetCurrent(ctx, l.db, tx, obj, l.logs.GetChildLogger("incident"), l.runtimeConfig, l.configFile, createIncident)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = fmt.Fprintln(w, err)
Expand Down
50 changes: 50 additions & 0 deletions internal/object/db_types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package object

import (
"context"
"fmt"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/types"
"github.com/jmoiron/sqlx"
)

// ExtraTagRow represents a single database object extra tag like `hostgroup/foo: null`.
Expand All @@ -27,3 +31,49 @@ type ObjectRow struct {
func (d *ObjectRow) TableName() string {
return "object"
}

func FromDB(ctx context.Context, db *icingadb.DB, tx *sqlx.Tx, id types.Binary) (*Object, error) {
objectRow := &ObjectRow{ID: id}
err := tx.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(objectRow, objectRow)+` WHERE "id" = ?`), objectRow.ID).StructScan(objectRow)
if err != nil {
return nil, fmt.Errorf("failed to fetch object: %w", err)
}

tags := map[string]string{"host": objectRow.Host}
if objectRow.Service.Valid {
tags["service"] = objectRow.Service.String
}

metadata := make(map[int64]*SourceMetadata)
var sourceMetas []*SourceMetadata
err = tx.SelectContext(ctx, &sourceMetas, db.Rebind(db.BuildSelectStmt(&SourceMetadata{}, &SourceMetadata{})+` WHERE "object_id" = ?`), id)
if err != nil {
return nil, fmt.Errorf("failed to fetch source object: %w", err)
}

var extraTags []*ExtraTagRow
err = tx.SelectContext(
ctx, &extraTags,
db.Rebind(db.BuildSelectStmt(&ExtraTagRow{}, &ExtraTagRow{})+` WHERE "object_id" = ?`), id,
)
if err != nil {
return nil, fmt.Errorf("failed to fetch object extra tags: %w", err)
}

for _, sourceMeta := range sourceMetas {
sourceMeta.ExtraTags = map[string]string{}
metadata[sourceMeta.SourceId] = sourceMeta
}

for _, extraTag := range extraTags {
sourceMeta, ok := metadata[extraTag.SourceId]
if ok {
sourceMeta.ExtraTags[extraTag.Tag] = extraTag.Value
}
}

obj := &Object{db: db, ID: id, Tags: tags, Metadata: metadata}
obj.UpdateCache()

return obj, nil
}

0 comments on commit 58abc76

Please sign in to comment.