Skip to content

Commit

Permalink
first step at moving away from using the standard log library
Browse files Browse the repository at this point in the history
  • Loading branch information
jrmolin committed Jan 8, 2025
1 parent bde07fa commit 4c1dd9a
Show file tree
Hide file tree
Showing 16 changed files with 81 additions and 100 deletions.
10 changes: 5 additions & 5 deletions x-pack/filebeat/input/netflow/decoder/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
package config

import (
"io"
"time"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields"
"github.com/elastic/elastic-agent-libs/logp"
)

type ActiveSessionsMetric interface {
Expand All @@ -19,7 +19,7 @@ type ActiveSessionsMetric interface {
// Config stores the configuration used by the NetFlow Collector.
type Config struct {
protocols []string
logOutput io.Writer
logOutput *logp.Logger
expiration time.Duration
detectReset bool
fields fields.FieldDict
Expand All @@ -30,7 +30,7 @@ type Config struct {

var defaultCfg = Config{
protocols: []string{},
logOutput: io.Discard,
logOutput: logp.L().Named("netflow"),
expiration: time.Hour,
detectReset: true,
sharedTemplates: false,
Expand All @@ -53,7 +53,7 @@ func (c *Config) WithProtocols(protos ...string) *Config {
}

// WithLogOutput sets the output io.Writer for logging.
func (c *Config) WithLogOutput(output io.Writer) *Config {
func (c *Config) WithLogOutput(output *logp.Logger) *Config {
c.logOutput = output
return c
}
Expand Down Expand Up @@ -121,7 +121,7 @@ func (c *Config) Protocols() []string {
}

// LogOutput returns the io.Writer where logs are to be written.
func (c *Config) LogOutput() io.Writer {
func (c *Config) LogOutput() *logp.Logger {
return c.logOutput
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,51 +8,53 @@ import (
"bytes"
"encoding/json"
"fmt"
"log"
"net"
"os"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder"
)

func main() {
logger := logp.L().Named("netflow")

decoder, err := decoder.NewDecoder(decoder.NewConfig().
WithLogOutput(os.Stderr).
WithLogOutput(logger).
WithProtocols("v1", "v5", "v9", "ipfix"))
if err != nil {
log.Fatal("Failed creating decoder:", err)
logger.Fatal("Failed creating decoder:", err)
}

addr, err := net.ResolveUDPAddr("udp", ":2055")
if err != nil {
log.Fatal("Failed to resolve address:", err)
logger.Fatal("Failed to resolve address:", err)
}

server, err := net.ListenUDP("udp", addr)
if err != nil {
log.Fatalf("Failed to listen on %v: %v", addr, err)
logger.Fatalf("Failed to listen on %v: %v", addr, err)
}
defer server.Close()

if err = server.SetReadBuffer(1 << 16); err != nil {
log.Fatalf("Failed to set read buffer size for socket: %v", err)
logger.Fatalf("Failed to set read buffer size for socket: %v", err)
}

log.Println("Listening on ", server.LocalAddr())
logger.Debug("Listening on ", server.LocalAddr())
buf := make([]byte, 8192)
decBuf := new(bytes.Buffer)
for {
size, remote, err := server.ReadFromUDP(buf)
if err != nil {
log.Println("Error reading from socket:", err)
logger.Debug("Error reading from socket:", err)
continue
}

decBuf.Reset()
decBuf.Write(buf[:size])
records, err := decoder.Read(decBuf, remote)
if err != nil {
log.Printf("warn: Failed reading records from %v: %v\n", remote, err)
logger.Debugf("warn: Failed reading records from %v: %v\n", remote, err)
}

for _, r := range records {
Expand All @@ -63,7 +65,7 @@ func main() {
"data": r.Fields,
})
if err != nil {
log.Fatal(err)
logger.Fatal(err)
}
fmt.Println(string(evt))
}
Expand Down
4 changes: 1 addition & 3 deletions x-pack/filebeat/input/netflow/decoder/ipfix/ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
package ipfix

import (
"log"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol"
v9 "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/v9"
Expand All @@ -29,7 +27,7 @@ func init() {
}

func New(config config.Config) protocol.Protocol {
logger := log.New(config.LogOutput(), LogPrefix, 0)
logger := config.LogOutput().Named(LogPrefix)
decoder := DecoderIPFIX{
DecoderV9: v9.DecoderV9{Logger: logger, Fields: config.Fields()},
}
Expand Down
8 changes: 3 additions & 5 deletions x-pack/filebeat/input/netflow/decoder/test/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,16 @@ import (
"strconv"
"testing"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
)

type TestLogWriter struct {
testing.TB
}

func (t TestLogWriter) Write(buf []byte) (int, error) {
t.Log(string(buf))
return len(buf), nil
log *logp.Logger
}

func MakeAddress(t testing.TB, ipPortPair string) net.Addr {
Expand Down
10 changes: 5 additions & 5 deletions x-pack/filebeat/input/netflow/decoder/v1/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"encoding/binary"
"fmt"
"io"
"log"
"net"
"time"

Expand All @@ -18,6 +17,7 @@ import (
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template"
"github.com/elastic/elastic-agent-libs/logp"
)

const (
Expand Down Expand Up @@ -52,7 +52,7 @@ var templateV1 = template.Template{
type ReadHeaderFn func(*bytes.Buffer, net.Addr) (int, time.Time, record.Map, error)

type NetflowProtocol struct {
logger *log.Logger
logger *logp.Logger
flowTemplate *template.Template
version uint16
readHeader ReadHeaderFn
Expand All @@ -63,10 +63,10 @@ func init() {
}

func New(config config.Config) protocol.Protocol {
return NewProtocol(ProtocolID, &templateV1, readV1Header, log.New(config.LogOutput(), LogPrefix, 0))
return NewProtocol(ProtocolID, &templateV1, readV1Header, logp.L().Named(LogPrefix))
}

func NewProtocol(version uint16, template *template.Template, readHeader ReadHeaderFn, logger *log.Logger) protocol.Protocol {
func NewProtocol(version uint16, template *template.Template, readHeader ReadHeaderFn, logger *logp.Logger) protocol.Protocol {
return &NetflowProtocol{
logger: logger,
flowTemplate: template,
Expand All @@ -90,7 +90,7 @@ func (NetflowProtocol) Stop() error {
func (p *NetflowProtocol) OnPacket(buf *bytes.Buffer, source net.Addr) (flows []record.Record, err error) {
numFlows, timestamp, metadata, err := p.readHeader(buf, source)
if err != nil {
p.logger.Printf("Failed parsing packet: %v", err)
p.logger.Debugf("Failed parsing packet: %v", err)
return nil, fmt.Errorf("error reading netflow header: %w", err)
}
flows, err = p.flowTemplate.Apply(buf, numFlows)
Expand Down
3 changes: 1 addition & 2 deletions x-pack/filebeat/input/netflow/decoder/v5/v5.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"bytes"
"encoding/binary"
"io"
"log"
"net"
"time"

Expand Down Expand Up @@ -58,7 +57,7 @@ func init() {
}

func New(config config.Config) protocol.Protocol {
return v1.NewProtocol(ProtocolID, &templateV5, ReadV5Header, log.New(config.LogOutput(), LogPrefix, 0))
return v1.NewProtocol(ProtocolID, &templateV5, ReadV5Header, config.LogOutput().Named(LogPrefix))
}

type PacketHeader struct {
Expand Down
4 changes: 1 addition & 3 deletions x-pack/filebeat/input/netflow/decoder/v6/v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
package v6

import (
"log"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol"
Expand Down Expand Up @@ -53,5 +51,5 @@ func init() {
}

func New(config config.Config) protocol.Protocol {
return v1.NewProtocol(ProtocolID, &templateV6, v5.ReadV5Header, log.New(config.LogOutput(), LogPrefix, 0))
return v1.NewProtocol(ProtocolID, &templateV6, v5.ReadV5Header, config.LogOutput().Named(LogPrefix))
}
3 changes: 1 addition & 2 deletions x-pack/filebeat/input/netflow/decoder/v7/v7.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"bytes"
"encoding/binary"
"io"
"log"
"net"
"time"

Expand Down Expand Up @@ -59,7 +58,7 @@ func init() {
}

func New(config config.Config) protocol.Protocol {
return v1.NewProtocol(ProtocolID, &v7template, ReadV7Header, log.New(config.LogOutput(), LogPrefix, 0))
return v1.NewProtocol(ProtocolID, &v7template, ReadV7Header, config.LogOutput().Named(LogPrefix))
}

type PacketHeader struct {
Expand Down
10 changes: 5 additions & 5 deletions x-pack/filebeat/input/netflow/decoder/v8/v8.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"encoding/binary"
"fmt"
"io"
"log"
"net"
"time"

Expand All @@ -18,6 +17,7 @@ import (
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template"
"github.com/elastic/elastic-agent-libs/logp"
)

const (
Expand Down Expand Up @@ -300,7 +300,7 @@ var templates = map[AggType]*template.Template{
}

type NetflowV8Protocol struct {
logger *log.Logger
logger *logp.Logger
}

func init() {
Expand All @@ -309,7 +309,7 @@ func init() {

func New(config config.Config) protocol.Protocol {
return &NetflowV8Protocol{
logger: log.New(config.LogOutput(), LogPrefix, 0),
logger: logp.L().Named(LogPrefix),
}
}

Expand All @@ -320,12 +320,12 @@ func (NetflowV8Protocol) Version() uint16 {
func (p *NetflowV8Protocol) OnPacket(buf *bytes.Buffer, source net.Addr) (flows []record.Record, err error) {
header, err := ReadPacketHeader(buf)
if err != nil {
p.logger.Printf("Failed parsing packet: %v", err)
p.logger.Debugf("Failed parsing packet: %v", err)
return nil, fmt.Errorf("error reading V8 header: %w", err)
}
template, found := templates[header.Aggregation]
if !found {
p.logger.Printf("Packet from %s uses an unknown V8 aggregation: %d", source, header.Aggregation)
p.logger.Debugf("Packet from %s uses an unknown V8 aggregation: %d", source, header.Aggregation)
return nil, fmt.Errorf("unsupported V8 aggregation: %d", header.Aggregation)
}
metadata := header.GetMetadata(source)
Expand Down
13 changes: 7 additions & 6 deletions x-pack/filebeat/input/netflow/decoder/v9/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
"errors"
"fmt"
"io"
"log"
"net"
"time"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template"
Expand All @@ -29,18 +30,18 @@ type Decoder interface {
ReadSetHeader(*bytes.Buffer) (SetHeader, error)
ReadTemplateSet(setID uint16, buf *bytes.Buffer) ([]*template.Template, error)
ReadFieldDefinition(*bytes.Buffer) (field fields.Key, length uint16, err error)
GetLogger() *log.Logger
GetLogger() *logp.Logger
GetFields() fields.FieldDict
}

type DecoderV9 struct {
Logger *log.Logger
Logger *logp.Logger
Fields fields.FieldDict
}

var _ Decoder = (*DecoderV9)(nil)

func (d DecoderV9) GetLogger() *log.Logger {
func (d DecoderV9) GetLogger() *logp.Logger {
return d.Logger
}

Expand Down Expand Up @@ -124,10 +125,10 @@ func ReadFields(d Decoder, buf *bytes.Buffer, count int) (record template.Templa
if length == template.VariableLength || min <= field.Length && field.Length <= max {
field.Info = fieldInfo
} else if logger != nil {
logger.Printf("Size of field %s in template is out of bounds (size=%d, min=%d, max=%d)", fieldInfo.Name, field.Length, min, max)
logger.Debugf("Size of field %s in template is out of bounds (size=%d, min=%d, max=%d)", fieldInfo.Name, field.Length, min, max)
}
} else if logger != nil {
logger.Printf("Field %v in template not found", key)
logger.Debugf("Field %v in template not found", key)
}
record.Fields[i] = field
}
Expand Down
Loading

0 comments on commit 4c1dd9a

Please sign in to comment.