Skip to content

Commit

Permalink
introduce long running handlers (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
ushis authored Mar 9, 2022
1 parent 873a649 commit a5c088e
Show file tree
Hide file tree
Showing 15 changed files with 233 additions and 150 deletions.
5 changes: 4 additions & 1 deletion check/exec/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package filemtime

import (
"os/exec"
"syscall"

"github.com/ushis/gesundheit/check"
"github.com/ushis/gesundheit/result"
Expand All @@ -26,7 +27,9 @@ func New(_ check.Database, configure func(interface{}) error) (check.Check, erro
}

func (c Check) Exec() result.Result {
out, err := exec.Command(c.Command, c.Args...).CombinedOutput()
cmd := exec.Command(c.Command, c.Args...)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0}
out, err := cmd.CombinedOutput()

if err == nil {
return result.OK(string(out))
Expand Down
8 changes: 4 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func loadConf(path string) (config, db.Database, *http.Server, error) {
var httpServer *http.Server

if conf.Http.Enabled {
httpServer = http.New(conf.Http.Listen, db)
httpServer = http.New(db, conf.Http.Listen)
}
if len(meta.Undecoded()) > 0 {
return conf, nil, nil, fmt.Errorf("failed to load config: %s: unknown field %s", path, meta.Undecoded()[0])
Expand Down Expand Up @@ -181,7 +181,7 @@ func (l modConfLoader) loadCheck(conf *checkConfig, path string, meta toml.MetaD
if err != nil {
return fmt.Errorf("failed to load check config: %s: %s", path, err.Error())
}
l.hub.registerCheckRunner(check.NewRunner(l.node, filepath.Base(path), conf.Description, interval, chk))
l.hub.registerProducer(check.NewRunner(l.node, filepath.Base(path), conf.Description, interval, chk))

return nil
}
Expand All @@ -208,7 +208,7 @@ func (l modConfLoader) loadHandler(conf *handlerConfig, path string, meta toml.M
}
filters = append(filters, f)
}
l.hub.registerHandler(filter.Handler(hdl, filters))
l.hub.registerConsumer(filter.Handler(hdl, filters))

if len(meta.Undecoded()) > 0 {
return fmt.Errorf("failed to load handler config: %s: unknown field %s", path, meta.Undecoded()[0])
Expand Down Expand Up @@ -242,7 +242,7 @@ func (l modConfLoader) loadInput(conf *inputConfig, path string, meta toml.MetaD
if len(meta.Undecoded()) > 0 {
return fmt.Errorf("failed to load input config: %s: unknown field %s", path, meta.Undecoded()[0])
}
l.hub.registerInputRunner(input.NewRunner(in))
l.hub.registerProducer(in)

return nil
}
31 changes: 0 additions & 31 deletions filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,8 @@ package filter

import (
"github.com/ushis/gesundheit/result"
"github.com/ushis/gesundheit/handler"
)

type Filter interface {
Filter(result.Event) (result.Event, bool)
}

type filterHandler struct {
handler handler.Handler
filters []Filter
}

func (h filterHandler) Handle(e result.Event) error {
if e, ok := h.filter(e); ok {
return h.handler.Handle(e)
}
return nil
}

func (h filterHandler) filter(e result.Event) (result.Event, bool) {
ok := true

for _, filter := range h.filters {
if e, ok = filter.Filter(e); !ok {
return e, ok
}
}
return e, ok
}

func Handler(handler handler.Handler, filters []Filter) handler.Handler {
if len(filters) == 0 {
return handler
}
return filterHandler{handler, filters}
}
56 changes: 56 additions & 0 deletions filter/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package filter

import (
"sync"

"github.com/ushis/gesundheit/handler"
"github.com/ushis/gesundheit/result"
)

func Handler(handler handler.Handler, filters []Filter) handler.Handler {
if len(filters) == 0 {
return handler
}
return filterHandler{handler, filters}
}

type filterHandler struct {
handler handler.Handler
filters []Filter
}

func (h filterHandler) Run(wg *sync.WaitGroup) (chan<- result.Event, error) {
out, err := h.handler.Run(wg)

if err != nil {
return nil, err
}
in := make(chan result.Event)
wg.Add(1)

go func() {
h.run(out, in)
wg.Done()
}()

return in, nil
}

func (h filterHandler) run(out chan<- result.Event, in <-chan result.Event) {
for e := range in {
if e, ok := h.filter(e); ok {
out <- e
}
}
}

func (h filterHandler) filter(e result.Event) (result.Event, bool) {
ok := true

for _, filter := range h.filters {
if e, ok = filter.Filter(e); !ok {
return e, ok
}
}
return e, ok
}
6 changes: 3 additions & 3 deletions handler/gotify/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"net/http"
"net/url"

"github.com/ushis/gesundheit/result"
"github.com/ushis/gesundheit/handler"
"github.com/ushis/gesundheit/result"
)

type Handler struct {
Expand All @@ -19,10 +19,10 @@ type Handler struct {
}

func init() {
handler.Register("gotify", New)
handler.RegisterSimple("gotify", New)
}

func New(configure func(interface{}) error) (handler.Handler, error) {
func New(configure func(interface{}) error) (handler.Simple, error) {
h := Handler{Priority: 4}

if err := configure(&h); err != nil {
Expand Down
8 changes: 6 additions & 2 deletions handler/handler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package handler

import "github.com/ushis/gesundheit/result"
import (
"sync"

"github.com/ushis/gesundheit/result"
)

type Handler interface {
Handle(result.Event) error
Run(*sync.WaitGroup) (chan<- result.Event, error)
}
4 changes: 2 additions & 2 deletions handler/log/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
type Handler struct{}

func init() {
handler.Register("log", New)
handler.RegisterSimple("log", New)
}

func New(_ func(interface{}) error) (handler.Handler, error) {
func New(_ func(interface{}) error) (handler.Simple, error) {
return Handler{}, nil
}

Expand Down
13 changes: 13 additions & 0 deletions handler/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import "errors"

type HandlerFunc func(func(interface{}) error) (Handler, error)

type SimpleFunc func(func(interface{}) error) (Simple, error)

type Registry map[string]HandlerFunc

func (r Registry) Register(name string, fn HandlerFunc) {
Expand All @@ -26,6 +28,17 @@ func Register(name string, fn HandlerFunc) {
defaultRegistry.Register(name, fn)
}

func RegisterSimple(name string, fn SimpleFunc) {
defaultRegistry.Register(name, wrapSimpleFunc(fn))
}

func wrapSimpleFunc(fn SimpleFunc) HandlerFunc {
return func(configure func(interface{}) error) (Handler, error) {
simple, err := fn(configure)
return simpleWrapper{simple}, err
}
}

func Get(name string) (HandlerFunc, error) {
return defaultRegistry.Get(name)
}
6 changes: 3 additions & 3 deletions handler/remote/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"encoding/json"
"net"

"github.com/ushis/gesundheit/result"
"github.com/ushis/gesundheit/crypto"
"github.com/ushis/gesundheit/handler"
"github.com/ushis/gesundheit/result"
)

func init() {
handler.Register("remote", New)
handler.RegisterSimple("remote", New)
}

type Handler struct {
Expand All @@ -25,7 +25,7 @@ type Config struct {
Address string
}

func New(configure func(interface{}) error) (handler.Handler, error) {
func New(configure func(interface{}) error) (handler.Simple, error) {
conf := Config{}

if err := configure(&conf); err != nil {
Expand Down
36 changes: 36 additions & 0 deletions handler/simple.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package handler

import (
"log"
"sync"

"github.com/ushis/gesundheit/result"
)

type Simple interface {
Handle(result.Event) error
}

type simpleWrapper struct {
Simple
}

func (w simpleWrapper) Run(wg *sync.WaitGroup) (chan<- result.Event, error) {
chn := make(chan result.Event)
wg.Add(1)

go func() {
w.run(chn)
wg.Done()
}()

return chn, nil
}

func (w simpleWrapper) run(chn <-chan result.Event) {
for e := range chn {
if err := w.Handle(e); err != nil {
log.Println(e)
}
}
}
70 changes: 33 additions & 37 deletions http/server.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package http

import (
"context"
"embed"
"encoding/json"
"io/fs"
Expand Down Expand Up @@ -29,66 +28,63 @@ func init() {
}

type Server struct {
Listen string
db db.Database
sockets *sockPool
db db.Database
listen string
}

type Config struct {
Listen string
func New(db db.Database, listen string) *Server {
return &Server{db: db, listen: listen}
}

func New(listen string, db db.Database) *Server {
return &Server{Listen: listen, db: db, sockets: newSockPool()}
}

func (s *Server) Run(ctx context.Context, wg *sync.WaitGroup) error {
l, err := net.Listen("tcp", s.Listen)
func (s *Server) Run(wg *sync.WaitGroup) (chan<- result.Event, error) {
l, err := net.Listen("tcp", s.listen)

if err != nil {
return err
return nil, err
}
socks := newSockPool()
chn := make(chan result.Event)
wg.Add(2)

go func() {
s.run(l)
s.sockets.closeAll()
s.serve(l, socks)
socks.closeAll()
wg.Done()
}()

go func() {
<-ctx.Done()
s.run(socks, chn)
l.Close()
wg.Done()
}()

return nil
return chn, nil
}

func (s *Server) Handle(e result.Event) error {
s.sockets.broadcast(e)
return nil
func (s *Server) run(socks *sockPool, chn <-chan result.Event) {
for e := range chn {
socks.broadcast(e)
}
}

func (s *Server) run(l net.Listener) {
func (s *Server) serve(l net.Listener, socks *sockPool) {
mux := http.NewServeMux()
mux.Handle("/", http.FileServer(http.FS(uiFS)))
mux.HandleFunc("/api/events", s.serveEvents)
mux.HandleFunc("/api/events/socket", s.serveEventsSocket)
http.Serve(l, mux)
}

func (s *Server) serveEvents(w http.ResponseWriter, r *http.Request) {
if events, err := s.db.GetEvents(); err != nil {
w.WriteHeader(http.StatusInternalServerError)
} else {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
json.NewEncoder(w).Encode(events)
}
}
mux.HandleFunc("/api/events", func(w http.ResponseWriter, r *http.Request) {
if events, err := s.db.GetEvents(); err != nil {
w.WriteHeader(http.StatusInternalServerError)
} else {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
json.NewEncoder(w).Encode(events)
}
})

mux.HandleFunc("/api/events/socket", func(w http.ResponseWriter, r *http.Request) {
if conn, _, _, err := ws.UpgradeHTTP(r, w); err == nil {
socks.serve(conn)
}
})

func (s *Server) serveEventsSocket(w http.ResponseWriter, r *http.Request) {
if conn, _, _, err := ws.UpgradeHTTP(r, w); err == nil {
s.sockets.serve(conn)
}
http.Serve(l, mux)
}
Loading

0 comments on commit a5c088e

Please sign in to comment.