Skip to content

Commit

Permalink
dedup events (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
ushis authored Mar 8, 2022
1 parent 4032a6c commit 1071516
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 13 deletions.
2 changes: 1 addition & 1 deletion db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

type Database interface {
Close() error
Handle(e result.Event) error
InsertEvent(e result.Event) (bool, error)
GetEvents() ([]result.Event, error)
GetEventsByNode(name string) ([]result.Event, error)
GetLatestEventByNode(name string) (event result.Event, ok bool, err error)
Expand Down
17 changes: 12 additions & 5 deletions db/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,23 @@ func (db Database) Close() error {
return nil
}

func (db Database) Handle(e result.Event) error {
func (db Database) InsertEvent(e result.Event) (bool, error) {
db.Lock()
defer db.Unlock()

if checks, ok := db.db[e.NodeName]; ok {
checks[e.CheckId] = e
} else {
checks, ok := db.db[e.NodeName]

if !ok {
db.db[e.NodeName] = map[string]result.Event{e.CheckId: e}
return true, nil
}
return nil
prevE, ok := checks[e.CheckId]

if !ok || prevE.Id != e.Id {
checks[e.CheckId] = e
return true, nil
}
return false, nil
}

func (db Database) GetEvents() ([]result.Event, error) {
Expand Down
10 changes: 6 additions & 4 deletions db/redis/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,25 @@ func (db Database) Close() error {
// vals: event, ttl
//
const scriptStoreEvent = `
if redis.call('exists', 'events:' .. KEYS[3]) > 0 then return 0 end
redis.call('set', 'events:' .. KEYS[3], ARGV[1], 'ex', ARGV[2])
redis.call('hset', 'nodes:' .. KEYS[1] .. ':events', KEYS[2], KEYS[3])
redis.call('set', 'nodes:' .. KEYS[1] .. ':events:latest', KEYS[3])
redis.call('sadd', 'nodes', KEYS[1])
return true
return 1
`

func (db Database) Handle(e result.Event) error {
func (db Database) InsertEvent(e result.Event) (bool, error) {
val, err := json.Marshal(e)

if err != nil {
return err
return false, err
}
ttl := 2 * e.CheckInterval
keys := []string{e.NodeName, e.CheckId, e.Id}
vals := []interface{}{val, ttl}
return db.rdb.Eval(db.rdb.Context(), scriptStoreEvent, keys, vals...).Err()
res, err := db.rdb.Eval(db.rdb.Context(), scriptStoreEvent, keys, vals...).Int()
return res == 1, err
}

// retreive events
Expand Down
10 changes: 9 additions & 1 deletion hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"sync"

"github.com/ushis/gesundheit/check"
"github.com/ushis/gesundheit/result"
"github.com/ushis/gesundheit/db"
"github.com/ushis/gesundheit/handler"
"github.com/ushis/gesundheit/input"
"github.com/ushis/gesundheit/result"
)

type hub struct {
db db.Database
checkRunners []check.Runner
inputRunners []input.Runner
handlers []handler.Handler
Expand Down Expand Up @@ -74,6 +76,12 @@ func (h *hub) runRunners(ctx context.Context, wg *sync.WaitGroup, events chan<-
}

func (h *hub) dispatch(e result.Event) {
if ok, err := h.db.InsertEvent(e); err != nil {
log.Println(err)
} else if !ok {
return
}

for _, r := range h.handlers {
if err := r.Handle(e); err != nil {
log.Println(err)
Expand Down
3 changes: 1 addition & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ func cmdServe(args []string) {
if conf.Log.Timestamps {
log.SetFlags(log.Ldate | log.Ltime)
}
h := &hub{}
h.registerHandler(db)
h := &hub{db: db}

confDir := filepath.Dir(confPath)
modConfs := filepath.Join(confDir, conf.Modules.Config)
Expand Down

0 comments on commit 1071516

Please sign in to comment.