Skip to content

Commit

Permalink
Fixed for udp logger
Browse files Browse the repository at this point in the history
  • Loading branch information
leighmacdonald committed Apr 15, 2024
1 parent 3899c5e commit a1b5142
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 52 deletions.
4 changes: 2 additions & 2 deletions internal/demo/demo_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ func (d demoUsecase) DropDemo(ctx context.Context, demoFile *domain.DemoFile) er
}
}

slog.Debug("Demo expired and removed",
slog.String("bucket", conf.S3.BucketDemo), slog.String("name", demoFile.Title))
// slog.Debug("Demo expired and removed",
// slog.String("bucket", conf.S3.BucketDemo), slog.String("name", demoFile.Title))

return nil
}
15 changes: 9 additions & 6 deletions internal/state/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,15 @@ func (c *Collector) startStatus(ctx context.Context) {

waitGroup.Wait()

slog.Debug("RCON update cycle complete",
slog.Int("success", int(successful.Load())),
slog.Int("existing", int(existing.Load())),
slog.Int("fail", len(configs)-int(successful.Load())),
slog.Duration("duration", time.Since(startTIme)))

fail := len(configs) - int(successful.Load())

if fail > 0 {
slog.Debug("RCON update cycle complete",
slog.Int("success", int(successful.Load())),
slog.Int("existing", int(existing.Load())),
slog.Int("fail", fail),
slog.Duration("duration", time.Since(startTIme)))
}
case <-ctx.Done():
return
}
Expand Down
4 changes: 2 additions & 2 deletions internal/state/state_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ func (s *stateUsecase) Start(ctx context.Context) error {

s.logListener = logSrc

s.stateRepository.Start(ctx)
go s.stateRepository.Start(ctx)

s.updateSrcdsLogSecrets(ctx)

go s.logReader(ctx, false)
go s.logReader(ctx, s.configUsecase.Config().Debug.WriteUnhandledLogEvents)

// TODO run on server Config changes
s.updateSrcdsLogSecrets(ctx)
Expand Down
92 changes: 50 additions & 42 deletions pkg/logparse/udp_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/leighmacdonald/gbans/pkg/log"
"go.uber.org/atomic"
)

type srcdsPacket byte
Expand Down Expand Up @@ -93,7 +92,6 @@ func (remoteSrc *UDPLogListener) Start(ctx context.Context) {
slog.String("listen_addr", fmt.Sprintf("%s/udp", remoteSrc.udpAddr.String())))

var (
running = atomic.NewBool(true)
count = uint64(0)
insecureCount = uint64(0)
errCount = uint64(0)
Expand All @@ -103,60 +101,65 @@ func (remoteSrc *UDPLogListener) Start(ctx context.Context) {
go func() {
startTime := time.Now()

for running.Load() {
buffer := make([]byte, 1024)
for {
select {
case <-ctx.Done():
return
default:
buffer := make([]byte, 1024)

readLen, _, errReadUDP := connection.ReadFromUDP(buffer)
if errReadUDP != nil {
slog.Warn("UDP log read error", log.ErrAttr(errReadUDP))
readLen, _, errReadUDP := connection.ReadFromUDP(buffer)
if errReadUDP != nil {
slog.Warn("UDP log read error", log.ErrAttr(errReadUDP))

continue
}

switch srcdsPacket(buffer[4]) {
case s2aLogString:
if insecureCount%10000 == 0 {
slog.Error("Using unsupported log packet type 0x52",
slog.Int64("count", int64(insecureCount+1)))
continue
}

insecureCount++
errCount++
case s2aLogString2:
line := string(buffer)

idx := strings.Index(line, "L ")
if idx == -1 {
slog.Warn("Received malformed log message: Failed to find marker")
switch srcdsPacket(buffer[4]) {
case s2aLogString:
if insecureCount%10000 == 0 {
slog.Error("Using unsupported log packet type 0x52",
slog.Int64("count", int64(insecureCount+1)))
}

insecureCount++
errCount++
case s2aLogString2:
line := string(buffer)

continue
}
idx := strings.Index(line, "L ")
if idx == -1 {
slog.Warn("Received malformed log message: Failed to find marker")

secret, errConv := strconv.ParseInt(line[5:idx], 10, 32)
if errConv != nil {
slog.Error("Received malformed log message: Failed to parse secret",
log.ErrAttr(errConv))
errCount++

errCount++
continue
}

continue
}
secret, errConv := strconv.ParseInt(line[5:idx], 10, 32)
if errConv != nil {
slog.Error("Received malformed log message: Failed to parse secret",
log.ErrAttr(errConv))

errCount++

continue
}

msgIngressChan <- newMsg{source: secret, body: line[idx : readLen-2]}
msgIngressChan <- newMsg{source: secret, body: line[idx : readLen-2]}

count++
count++

if count%10000 == 0 {
rate := float64(count) / time.Since(startTime).Seconds()
if count%10000 == 0 {
rate := float64(count) / time.Since(startTime).Seconds()

slog.Debug("UDP SRCDS Logger Packets",
slog.Uint64("count", count),
slog.Float64("messages/sec", rate),
slog.Uint64("errors", errCount))
slog.Debug("UDP SRCDS Logger Packets",
slog.Uint64("count", count),
slog.Float64("messages/sec", rate),
slog.Uint64("errors", errCount))

startTime = time.Now()
startTime = time.Now()
}
}
}
}
Expand All @@ -169,7 +172,7 @@ func (remoteSrc *UDPLogListener) Start(ctx context.Context) {
for {
select {
case <-ctx.Done():
running.Store(false)
return
case logPayload := <-msgIngressChan:
remoteSrc.RLock()
server, found := remoteSrc.secretMap[int(logPayload.source)]
Expand Down Expand Up @@ -214,6 +217,11 @@ func logToServerEvent(parser *LogParser, serverID int, serverName string, msg st
ServerID: serverID,
ServerName: serverName,
}

if strings.Contains("vote", strings.ToLower(msg)) {
slog.Info(msg)
}

parseResult, errParse := parser.Parse(msg)

if errParse != nil {
Expand Down

0 comments on commit a1b5142

Please sign in to comment.