Skip to content

Commit

Permalink
Detect result change via database lookup (#488)
Browse files Browse the repository at this point in the history
  • Loading branch information
ushis authored Nov 17, 2023
1 parent c59ca12 commit f595557
Show file tree
Hide file tree
Showing 15 changed files with 195 additions and 68 deletions.
8 changes: 1 addition & 7 deletions check/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ func (r Runner) run(ctx context.Context, events chan<- result.Event) {
case <-ctx.Done():
return
}
statusHistory := result.StatusHistory(result.StatusOK)
checkInterval := uint64(interval / time.Second)

ticker := time.NewTicker(interval)
defer ticker.Stop()

Expand All @@ -64,16 +61,13 @@ func (r Runner) run(ctx context.Context, events chan<- result.Event) {
NodeName: r.node.Name,
CheckId: r.id,
CheckDescription: r.description,
CheckInterval: checkInterval,
StatusHistory: statusHistory,
CheckInterval: uint64(interval / time.Second),
Id: uuid.New().String(),
Status: res.Status,
Message: res.Message,
Timestamp: time.Now(),
ExpiresAt: time.Now().Add(3 * interval),
}
statusHistory.Append(res.Status)

select {
case events <- event:
case <-ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (l modConfLoader) loadFilter(conf *filterConfig, path string, meta toml.Met
if err != nil {
return nil, err
}
return fn(func(cfg interface{}) error {
return fn(l.db, func(cfg interface{}) error {
return meta.PrimitiveDecode(conf.Config, cfg)
})
}
Expand Down
3 changes: 2 additions & 1 deletion db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ type Database interface {
Close() error
InsertEvent(e result.Event) (bool, error)
GetEvents() ([]result.Event, error)
GetEventsByNode(name string) ([]result.Event, error)
GetEventsByNode(nodeName string) ([]result.Event, error)
GetEventsByCheck(nodeName, checkId string) ([]result.Event, error)
}
13 changes: 10 additions & 3 deletions db/filesystem/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,21 @@ func (db *Database) GetEvents() ([]result.Event, error) {
return db.db.GetEvents()
}

func (db *Database) GetEventsByNode(name string) ([]result.Event, error) {
return db.db.GetEventsByNode(name)
func (db *Database) GetEventsByNode(nodeName string) ([]result.Event, error) {
return db.db.GetEventsByNode(nodeName)
}

func (db *Database) GetEventsByCheck(nodeName, checkId string) ([]result.Event, error) {
return db.db.GetEventsByCheck(nodeName, checkId)
}

func (db *Database) autoVacuum(ctx context.Context) {
ticker := time.NewTicker(db.vacuumInterval)
defer ticker.Stop()

for {
select {
case <-time.After(db.vacuumInterval):
case <-ticker.C:
case <-ctx.Done():
return
}
Expand Down
104 changes: 85 additions & 19 deletions db/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ func init() {

type Database struct {
*sync.RWMutex
db map[string]map[string]result.Event
db map[string]map[string]*cappedList[result.Event]
}

func New(_ func(interface{}) error) (db.Database, error) {
return Database{&sync.RWMutex{}, make(map[string]map[string]result.Event)}, nil
return Database{&sync.RWMutex{}, make(map[string]map[string]*cappedList[result.Event])}, nil
}

func (db Database) Close() error {
Expand All @@ -32,51 +32,117 @@ func (db Database) InsertEvent(e result.Event) (bool, error) {
checks, ok := db.db[e.NodeName]

if !ok {
db.db[e.NodeName] = map[string]result.Event{e.CheckId: e}
return true, nil
checks = map[string]*cappedList[result.Event]{}
db.db[e.NodeName] = checks
}
prevE, ok := checks[e.CheckId]
events, ok := checks[e.CheckId]

if !ok || prevE.Id != e.Id {
checks[e.CheckId] = e
return true, nil
if !ok {
events = &cappedList[result.Event]{}
checks[e.CheckId] = events
}
for _, prevEvent := range events.slice() {
if prevEvent.Id == e.Id {
return false, nil
}
}
return false, nil
events.push(e)
return true, nil
}

func (db Database) GetEvents() ([]result.Event, error) {
db.RLock()
defer db.RUnlock()

now := time.Now()
events := []result.Event{}
result := []result.Event{}

for _, checks := range db.db {
for _, event := range checks {
for _, events := range checks {
for _, event := range events.slice() {
if event.ExpiresAt.After(now) {
result = append(result, event)
}
}
}
}
return result, nil
}

func (db Database) GetEventsByNode(nodeName string) ([]result.Event, error) {
db.RLock()
defer db.RUnlock()

checks, ok := db.db[nodeName]

if !ok {
return []result.Event{}, nil
}
now := time.Now()
result := []result.Event{}

for _, events := range checks {
for _, event := range events.slice() {
if event.ExpiresAt.After(now) {
events = append(events, event)
result = append(result, event)
}
}
}
return events, nil
return result, nil
}

func (db Database) GetEventsByNode(name string) ([]result.Event, error) {
func (db Database) GetEventsByCheck(nodeName, checkId string) ([]result.Event, error) {
db.RLock()
defer db.RUnlock()

checks, ok := db.db[name]
checks, ok := db.db[nodeName]

if !ok {
return []result.Event{}, nil
}
events, ok := checks[checkId]

if !ok {
return []result.Event{}, nil
}
now := time.Now()
events := []result.Event{}
result := []result.Event{}

for _, event := range checks {
for _, event := range events.slice() {
if event.ExpiresAt.After(now) {
events = append(events, event)
result = append(result, event)
}
}
return events, nil
return result, nil
}

const cappedListBufLen = 64
const cappedListCap = 6

type cappedList[T any] struct {
buffer [cappedListBufLen]T
offset int
length int
}

func (l *cappedList[T]) push(v T) {
index := l.offset + l.length

if index >= len(l.buffer) {
copy(l.buffer[:], l.buffer[l.offset+1:])
l.offset = 0
l.length -= 1
index = l.length
}
l.buffer[index] = v

if l.length < cappedListCap {
l.length += 1
} else {
l.offset += 1
}
}

func (l *cappedList[T]) slice() []T {
return l.buffer[l.offset : l.offset+l.length]
}
52 changes: 41 additions & 11 deletions db/redis/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ func init() {

// Database Layout
//
// nodes set<nodeName>
// nodes:<nodename>:events hash<checkId, eventId>
// events:<eventId> event
//
// nodes set<nodeName>
// nodes:<nodename>:checks set<checkId>
// nodes:<nodename>:checks:<checkId>:events list<eventId>
// events:<eventId> event
func New(configure func(interface{}) error) (db.Database, error) {
conf := Config{}

Expand Down Expand Up @@ -62,18 +62,27 @@ func (db Database) InsertEvent(e result.Event) (bool, error) {
if ok, err := db.rdb.SetNX(db.rdb.Context(), key, val, ttl).Result(); !ok || err != nil {
return ok, err
}
key = mkkey("nodes", e.NodeName, "events")
key = mkkey("nodes", e.NodeName, "checks", e.CheckId, "events")

if err := db.rdb.RPush(db.rdb.Context(), key, e.Id).Err(); err != nil {
return true, err
}
if err := db.rdb.LTrim(db.rdb.Context(), key, -6, -1).Err(); err != nil {
return true, err
}
key = mkkey("nodes", e.NodeName, "checks")

if err := db.rdb.HSet(db.rdb.Context(), key, e.CheckId, e.Id).Err(); err != nil {
if err := db.rdb.SAdd(db.rdb.Context(), key, e.CheckId).Err(); err != nil {
return true, err
}
key = mkkey("nodes")

return true, db.rdb.SAdd(db.rdb.Context(), key, e.NodeName).Err()
}

func (db Database) GetEventsByNode(name string) ([]result.Event, error) {
ids, err := db.rdb.HVals(db.rdb.Context(), mkkey("nodes", name, "events")).Result()
func (db Database) GetEventsByCheck(nodeName, checkId string) ([]result.Event, error) {
key := mkkey("nodes", nodeName, "checks", checkId, "events")
ids, err := db.rdb.LRange(db.rdb.Context(), key, 0, -1).Result()

if err != nil {
return nil, err
Expand Down Expand Up @@ -108,16 +117,37 @@ func (db Database) GetEventsByNode(name string) ([]result.Event, error) {
return events[:i], nil
}

func (db Database) GetEventsByNode(nodeName string) ([]result.Event, error) {
key := mkkey("nodes", nodeName, "checks")
checkIds, err := db.rdb.SMembers(db.rdb.Context(), key).Result()

if err != nil {
return nil, err
}
events := []result.Event{}

for _, checkId := range checkIds {
checkEvents, err := db.GetEventsByCheck(nodeName, checkId)

if err != nil {
return nil, err
}
events = append(events, checkEvents...)
}
return events, nil
}

func (db Database) GetEvents() ([]result.Event, error) {
nodes, err := db.rdb.SMembers(db.rdb.Context(), mkkey("nodes")).Result()
key := mkkey("nodes")
nodeNames, err := db.rdb.SMembers(db.rdb.Context(), key).Result()

if err != nil {
return nil, err
}
events := []result.Event{}

for _, node := range nodes {
nodeEvents, err := db.GetEventsByNode(node)
for _, nodeName := range nodeNames {
nodeEvents, err := db.GetEventsByNode(nodeName)

if err != nil {
return nil, err
Expand Down
5 changes: 3 additions & 2 deletions filter/office-hours/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"strconv"
"time"

"github.com/ushis/gesundheit/result"
"github.com/ushis/gesundheit/db"
"github.com/ushis/gesundheit/filter"
"github.com/ushis/gesundheit/result"
)

type Filter struct {
Expand All @@ -18,7 +19,7 @@ func init() {
filter.Register("office-hours", New)
}

func New(configure func(interface{}) error) (filter.Filter, error) {
func New(_ db.Database, configure func(interface{}) error) (filter.Filter, error) {
f := Filter{}

if err := configure(&f); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion filter/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package filter

import (
"errors"

"github.com/ushis/gesundheit/db"
)

type FilterFunc func(func(interface{}) error) (Filter, error)
type FilterFunc func(db.Database, func(interface{}) error) (Filter, error)

type Registry map[string]FilterFunc

Expand Down
33 changes: 29 additions & 4 deletions filter/result-change/filter.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,45 @@
package resultchange

import (
"github.com/ushis/gesundheit/db"
"github.com/ushis/gesundheit/filter"
"github.com/ushis/gesundheit/result"
)

type Filter struct{}
type Filter struct {
db db.Database
}

func init() {
filter.Register("result-change", New)
}

func New(_ func(interface{}) error) (filter.Filter, error) {
return Filter{}, nil
func New(db db.Database, _ func(interface{}) error) (filter.Filter, error) {
return Filter{db}, nil
}

func (f Filter) Filter(e result.Event) (result.Event, bool) {
return e, e.Status != e.StatusHistory.Last()
events, err := f.db.GetEventsByCheck(e.NodeName, e.CheckId)

if err != nil || len(events) == 0 {
return e, e.Status == result.StatusFail
}
index := indexOfEvent(events, e)

if index == 0 {
return e, e.Status == result.StatusFail
}
if index < 0 {
index = len(events)
}
return e, e.Status != events[index-1].Status
}

func indexOfEvent(events []result.Event, e result.Event) int {
for i, event := range events {
if event.Id == e.Id {
return i
}
}
return -1
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/gobwas/ws v1.1.0
github.com/google/uuid v1.3.0
github.com/itchyny/gojq v0.12.7
github.com/streadway/amqp v1.0.0
github.com/rabbitmq/amqp091-go v1.9.0
golang.org/x/crypto v0.14.0
)

Expand Down
Loading

0 comments on commit f595557

Please sign in to comment.