Skip to content

Commit

Permalink
logging cleanup and code re-org. experiment syslog added
Browse files Browse the repository at this point in the history
  • Loading branch information
davidcoles committed Mar 28, 2024
1 parent df8eb4b commit c8a37b4
Show file tree
Hide file tree
Showing 9 changed files with 450 additions and 246 deletions.
221 changes: 190 additions & 31 deletions cmd/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,51 +20,31 @@ package main

import (
"errors"
"fmt"
"log"
"net"
"net/netip"
"time"

"github.com/davidcoles/cue"
"github.com/davidcoles/cue/mon"
"github.com/davidcoles/xvs"
)

type Client = *xvs.Client

type tuple struct {
addr netip.Addr
port uint16
prot uint8
}

type Balancer struct {
Client *xvs.Client
ProbeFunc func(vip, rip, nat netip.Addr, check cue.Check) (bool, string) // see Probe() below
}

// Only needed if you need to override the built in monitoring health checking mechanism
// Here we use it to run checks against the NAT address in the network namespace.
// ProbeFunc method in Balancer not needed if you don't require this functionality
func (b *Balancer) Probe(vip netip.Addr, rip netip.Addr, check cue.Check) (bool, string) {

f := b.ProbeFunc

if f == nil {
return false, "No probe function defined"
}

nat, ok := b.Client.NATAddress(vip, rip)

if !ok {
return false, "No NAT destination defined for " + vip.String() + "/" + rip.String()
}

return f(vip, rip, nat, check)
NetNS *nns
Logger *logger
Client *xvs.Client
}

// interface method called by the director when the load balancer needs to be reconfigured
func (b *Balancer) Configure(services []cue.Service) error {

target := map[tuple]cue.Service{}

for _, s := range services {
target[tuple{addr: s.Address, port: s.Port, prot: s.Protocol}] = s
target[tuple{Address: s.Address, Port: s.Port, Protocol: s.Protocol}] = s

for _, d := range s.Destinations {
if s.Port != d.Port {
Expand All @@ -75,7 +55,7 @@ func (b *Balancer) Configure(services []cue.Service) error {

svcs, _ := b.Client.Services()
for _, s := range svcs {
key := tuple{addr: s.Service.Address, port: s.Service.Port, prot: s.Service.Protocol}
key := tuple{Address: s.Service.Address, Port: s.Service.Port, Protocol: s.Service.Protocol}
if _, wanted := target[key]; !wanted {
b.Client.RemoveService(s.Service)
}
Expand All @@ -100,3 +80,182 @@ func (b *Balancer) Configure(services []cue.Service) error {

return nil
}

// interface method alled by mon when a destination's heatlh status transistions up or down
func (b *Balancer) Notify(instance mon.Instance, state bool) {
if logger := b.Logger; logger != nil {
logger.NOTICE("notify", notifyLog(instance, state))
}
}

// interface method called by mon when a destination needs to be probed - find the NAT address and probe that via the netns
func (b *Balancer) Probe(_ *mon.Mon, instance mon.Instance, check mon.Check) (ok bool, diagnostic string) {

vip := instance.Service.Address
rip := instance.Destination.Address
nat, ok := b.Client.NATAddress(vip, rip)

if !ok {
diagnostic = "No NAT destination defined for " + vip.String() + "/" + rip.String()
} else {
ok, diagnostic = b.NetNS.Probe(nat, check)
}

if b.Logger != nil {
b.Logger.DEBUG("probe", probeLog(instance, nat, check, ok, diagnostic))
}

return ok, diagnostic
}

func (b *Balancer) Multicast(multicast string) {
go b.multicast_send(multicast)
go b.multicast_recv(multicast)
}

const maxDatagramSize = 1500

func (b *Balancer) multicast_send(address string) {

addr, err := net.ResolveUDPAddr("udp", address)

if err != nil {
log.Fatal(err)
}

conn, err := net.DialUDP("udp", nil, addr)

if err != nil {
log.Fatal(err)
}

conn.SetWriteBuffer(maxDatagramSize * 100)

ticker := time.NewTicker(time.Millisecond * 10)

var buff [maxDatagramSize]byte

for {
select {
case <-ticker.C:
n := 0

read_flow:
f := b.Client.ReadFlow()
if len(f) > 0 {
buff[n] = uint8(len(f))

copy(buff[n+1:], f[:])
n += 1 + len(f)
if n < maxDatagramSize-100 {
goto read_flow
}
}

if n > 0 {
conn.Write(buff[:n])
}
}
}
}

func (b *Balancer) multicast_recv(address string) {
udp, err := net.ResolveUDPAddr("udp", address)

if err != nil {
log.Fatal(err)
}

s := []string{`|`, `/`, `-`, `\`}
var x int

conn, err := net.ListenMulticastUDP("udp", nil, udp)

conn.SetReadBuffer(maxDatagramSize * 1000)

buff := make([]byte, maxDatagramSize)

for {
nread, _, err := conn.ReadFromUDP(buff)
fmt.Print(s[x%4] + "\b")
x++
if err == nil {
for n := 0; n+1 < nread; {
l := int(buff[n])
o := n + 1
n = o + l
if l > 0 && n <= nread {
b.Client.WriteFlow(buff[o:n])
}
}
}
}
}

func probeLog(instance mon.Instance, addr netip.Addr, check mon.Check, status bool, reason string) map[string]any {

kv := map[string]any{
"reason": reason,
"status": updown(status),
"proto": proto(instance.Service.Protocol),
"saddr": instance.Service.Address.String(),
"sport": instance.Service.Port,
"daddr": instance.Destination.Address.String(),
"dport": instance.Destination.Port,
"probe": check.Type,
"pport": check.Port,
"paddr": addr,
}

switch check.Type {
case "dns":
if check.Method {
kv["method"] = "tcp"
} else {
kv["method"] = "udp"
}
case "http":
fallthrough
case "https":
if check.Method {
kv["method"] = "HEAD"
} else {
kv["method"] = "GET"
}

if check.Host != "" {
kv["host"] = check.Host
}

if check.Path != "" {
kv["path"] = check.Path
}

if len(check.Expect) > 0 {
kv["expect"] = fmt.Sprintf("%v", check.Expect)
}
}

return kv
}

func notifyLog(instance mon.Instance, status bool) map[string]any {
return map[string]any{
"status": updown(status),
"proto": proto(instance.Service.Protocol),
"saddr": instance.Service.Address.String(),
"sport": instance.Service.Port,
"daddr": instance.Destination.Address.String(),
"dport": instance.Destination.Port,
}
}

func proto(p uint8) string {
switch p {
case TCP:
return "tcp"
case UDP:
return "udp"
}
return fmt.Sprintf("%d", p)
}
21 changes: 11 additions & 10 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ func Load(file string) (*Config, error) {
type ipport = IPPort

type IPPort struct {
Addr netip.Addr
Port uint16
Address netip.Addr
Port uint16
}

func (i *ipport) MarshalJSON() ([]byte, error) {
Expand All @@ -159,7 +159,7 @@ func (i *ipport) UnmarshalJSON(data []byte) error {
}

func (i ipport) MarshalText() ([]byte, error) {
return []byte(fmt.Sprintf("%s:%d", i.Addr, i.Port)), nil
return []byte(fmt.Sprintf("%s:%d", i.Address, i.Port)), nil
}

func (i *ipport) UnmarshalText(data []byte) error {
Expand All @@ -182,7 +182,7 @@ func (i *ipport) UnmarshalText(data []byte) error {
return errors.New("Badly formed ip:port - IP: " + m[1])
}

i.Addr = ip
i.Address = ip

if m[3] != "" {

Expand All @@ -203,14 +203,15 @@ func (i *ipport) UnmarshalText(data []byte) error {

/**********************************************************************/

type tuple = Tuple
type Tuple struct {
Addr netip.Addr
Address netip.Addr
Port uint16
Protocol uint8
}

func (i *Tuple) Compare(j *Tuple) (r int) {
if r = i.Addr.Compare(j.Addr); r != 0 {
if r = i.Address.Compare(j.Address); r != 0 {
return r
}

Expand Down Expand Up @@ -267,7 +268,7 @@ func (t Tuple) MarshalText() ([]byte, error) {
return nil, errors.New("Invalid protocol")
}

return []byte(fmt.Sprintf("%s:%d:%s", t.Addr, t.Port, p)), nil
return []byte(fmt.Sprintf("%s:%d:%s", t.Address, t.Port, p)), nil
}

func (t *Tuple) UnmarshalText(data []byte) error {
Expand All @@ -292,7 +293,7 @@ func (t *Tuple) UnmarshalText(data []byte) error {
return errors.New("Badly formed ip:port:protocol - IP " + text)
}

t.Addr = ip
t.Address = ip

port, err := strconv.Atoi(m[2])
if err != nil {
Expand Down Expand Up @@ -359,7 +360,7 @@ func (c *Config) parse() []cue.Service {
for ipp, svc := range c.Services {

service := cue.Service{
Address: ipp.Addr,
Address: ipp.Address,
Port: ipp.Port,
Protocol: ipp.Protocol,
Required: svc.Need,
Expand All @@ -369,7 +370,7 @@ func (c *Config) parse() []cue.Service {
for ap, dst := range svc.Destinations {

destination := cue.Destination{
Address: ap.Addr,
Address: ap.Address,
Port: ap.Port,
Weight: dst.Weight,
Disabled: dst.Disabled,
Expand Down
4 changes: 4 additions & 0 deletions cmd/config.pl
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@
$json->{'logging'}->{'alert'}+=0;
}

if(defined $json->{'logging'}) {
$json->{'logging'}->{'syslog'} = jsonbool($json->{'logging'}->{'syslog'});
}

if(defined $json->{'defcon'}) {
$json->{'defcon'}+=0;
given($json->{'defcon'}) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module vc5
go 1.20

require (
github.com/davidcoles/cue v0.0.4
github.com/davidcoles/cue v0.0.7
github.com/davidcoles/xvs v0.1.9
github.com/elastic/go-elasticsearch/v7 v7.17.10
)
Expand Down
4 changes: 2 additions & 2 deletions cmd/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/davidcoles/cue v0.0.4 h1:DnDmr5PuKCypMk7e7pY+Nd0upxCz/Jjixp07/g7ulpg=
github.com/davidcoles/cue v0.0.4/go.mod h1:uUTkCvlkD+nqbmP9XyA63BGMW3g3JJRYC0q8vJKSXMc=
github.com/davidcoles/cue v0.0.7 h1:4flG/3DUEY3lx1yjDbxJglfKi///rG9MNcHznd3ZaH4=
github.com/davidcoles/cue v0.0.7/go.mod h1:uUTkCvlkD+nqbmP9XyA63BGMW3g3JJRYC0q8vJKSXMc=
github.com/davidcoles/xvs v0.1.9 h1:fWDumZAuT7oikBJ2KVBEbWHiFPJQFtGf9ry+6Osl0Z8=
github.com/davidcoles/xvs v0.1.9/go.mod h1:xSFQ+/pDWhsMyH0DuAJr92ujAdPnRMcqqBbY+BRkRoE=
github.com/elastic/go-elasticsearch/v7 v7.17.10 h1:TCQ8i4PmIJuBunvBS6bwT2ybzVFxxUhhltAs3Gyu1yo=
Expand Down
Loading

0 comments on commit c8a37b4

Please sign in to comment.