From 4c1dd9a7af6e0f314aca5ef0d9ef8f31411aff39 Mon Sep 17 00:00:00 2001 From: Jonathan Molinatto Date: Wed, 8 Jan 2025 14:35:23 -0500 Subject: [PATCH] first step at moving away from using the standard log library --- .../input/netflow/decoder/config/config.go | 10 ++++---- .../decoder/examples/go-netflow-example.go | 24 ++++++++++--------- .../input/netflow/decoder/ipfix/ipfix.go | 4 +--- .../input/netflow/decoder/test/helper.go | 8 +++---- .../filebeat/input/netflow/decoder/v1/v1.go | 10 ++++---- .../filebeat/input/netflow/decoder/v5/v5.go | 3 +-- .../filebeat/input/netflow/decoder/v6/v6.go | 4 +--- .../filebeat/input/netflow/decoder/v7/v7.go | 3 +-- .../filebeat/input/netflow/decoder/v8/v8.go | 10 ++++---- .../input/netflow/decoder/v9/decoder.go | 13 +++++----- .../input/netflow/decoder/v9/session.go | 17 ++++++------- .../input/netflow/decoder/v9/session_test.go | 18 +++++++------- .../filebeat/input/netflow/decoder/v9/v9.go | 23 +++++++++--------- .../input/netflow/decoder/v9/v9_test.go | 8 ++++--- x-pack/filebeat/input/netflow/input.go | 22 +---------------- x-pack/filebeat/input/netflow/netflow_test.go | 4 ++-- 16 files changed, 81 insertions(+), 100 deletions(-) diff --git a/x-pack/filebeat/input/netflow/decoder/config/config.go b/x-pack/filebeat/input/netflow/decoder/config/config.go index 84f1e050c1fe..e28f918a3987 100644 --- a/x-pack/filebeat/input/netflow/decoder/config/config.go +++ b/x-pack/filebeat/input/netflow/decoder/config/config.go @@ -5,10 +5,10 @@ package config import ( - "io" "time" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields" + "github.com/elastic/elastic-agent-libs/logp" ) type ActiveSessionsMetric interface { @@ -19,7 +19,7 @@ type ActiveSessionsMetric interface { // Config stores the configuration used by the NetFlow Collector. type Config struct { protocols []string - logOutput io.Writer + logOutput *logp.Logger expiration time.Duration detectReset bool fields fields.FieldDict @@ -30,7 +30,7 @@ type Config struct { var defaultCfg = Config{ protocols: []string{}, - logOutput: io.Discard, + logOutput: logp.L().Named("netflow"), expiration: time.Hour, detectReset: true, sharedTemplates: false, @@ -53,7 +53,7 @@ func (c *Config) WithProtocols(protos ...string) *Config { } // WithLogOutput sets the output io.Writer for logging. -func (c *Config) WithLogOutput(output io.Writer) *Config { +func (c *Config) WithLogOutput(output *logp.Logger) *Config { c.logOutput = output return c } @@ -121,7 +121,7 @@ func (c *Config) Protocols() []string { } // LogOutput returns the io.Writer where logs are to be written. -func (c *Config) LogOutput() io.Writer { +func (c *Config) LogOutput() *logp.Logger { return c.logOutput } diff --git a/x-pack/filebeat/input/netflow/decoder/examples/go-netflow-example.go b/x-pack/filebeat/input/netflow/decoder/examples/go-netflow-example.go index c86e97e5d460..59d1ee8f0b04 100644 --- a/x-pack/filebeat/input/netflow/decoder/examples/go-netflow-example.go +++ b/x-pack/filebeat/input/netflow/decoder/examples/go-netflow-example.go @@ -8,43 +8,45 @@ import ( "bytes" "encoding/json" "fmt" - "log" "net" - "os" + + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder" ) func main() { + logger := logp.L().Named("netflow") + decoder, err := decoder.NewDecoder(decoder.NewConfig(). - WithLogOutput(os.Stderr). + WithLogOutput(logger). WithProtocols("v1", "v5", "v9", "ipfix")) if err != nil { - log.Fatal("Failed creating decoder:", err) + logger.Fatal("Failed creating decoder:", err) } addr, err := net.ResolveUDPAddr("udp", ":2055") if err != nil { - log.Fatal("Failed to resolve address:", err) + logger.Fatal("Failed to resolve address:", err) } server, err := net.ListenUDP("udp", addr) if err != nil { - log.Fatalf("Failed to listen on %v: %v", addr, err) + logger.Fatalf("Failed to listen on %v: %v", addr, err) } defer server.Close() if err = server.SetReadBuffer(1 << 16); err != nil { - log.Fatalf("Failed to set read buffer size for socket: %v", err) + logger.Fatalf("Failed to set read buffer size for socket: %v", err) } - log.Println("Listening on ", server.LocalAddr()) + logger.Debug("Listening on ", server.LocalAddr()) buf := make([]byte, 8192) decBuf := new(bytes.Buffer) for { size, remote, err := server.ReadFromUDP(buf) if err != nil { - log.Println("Error reading from socket:", err) + logger.Debug("Error reading from socket:", err) continue } @@ -52,7 +54,7 @@ func main() { decBuf.Write(buf[:size]) records, err := decoder.Read(decBuf, remote) if err != nil { - log.Printf("warn: Failed reading records from %v: %v\n", remote, err) + logger.Debugf("warn: Failed reading records from %v: %v\n", remote, err) } for _, r := range records { @@ -63,7 +65,7 @@ func main() { "data": r.Fields, }) if err != nil { - log.Fatal(err) + logger.Fatal(err) } fmt.Println(string(evt)) } diff --git a/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix.go b/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix.go index b8799c2d3919..c3f3b0669b81 100644 --- a/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix.go +++ b/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix.go @@ -5,8 +5,6 @@ package ipfix import ( - "log" - "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol" v9 "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/v9" @@ -29,7 +27,7 @@ func init() { } func New(config config.Config) protocol.Protocol { - logger := log.New(config.LogOutput(), LogPrefix, 0) + logger := config.LogOutput().Named(LogPrefix) decoder := DecoderIPFIX{ DecoderV9: v9.DecoderV9{Logger: logger, Fields: config.Fields()}, } diff --git a/x-pack/filebeat/input/netflow/decoder/test/helper.go b/x-pack/filebeat/input/netflow/decoder/test/helper.go index f62d03fa87ad..1e2bbbbeb83b 100644 --- a/x-pack/filebeat/input/netflow/decoder/test/helper.go +++ b/x-pack/filebeat/input/netflow/decoder/test/helper.go @@ -11,6 +11,8 @@ import ( "strconv" "testing" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/stretchr/testify/assert" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record" @@ -18,11 +20,7 @@ import ( type TestLogWriter struct { testing.TB -} - -func (t TestLogWriter) Write(buf []byte) (int, error) { - t.Log(string(buf)) - return len(buf), nil + log *logp.Logger } func MakeAddress(t testing.TB, ipPortPair string) net.Addr { diff --git a/x-pack/filebeat/input/netflow/decoder/v1/v1.go b/x-pack/filebeat/input/netflow/decoder/v1/v1.go index e023341c4ad4..1e23e35dbca5 100644 --- a/x-pack/filebeat/input/netflow/decoder/v1/v1.go +++ b/x-pack/filebeat/input/netflow/decoder/v1/v1.go @@ -9,7 +9,6 @@ import ( "encoding/binary" "fmt" "io" - "log" "net" "time" @@ -18,6 +17,7 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template" + "github.com/elastic/elastic-agent-libs/logp" ) const ( @@ -52,7 +52,7 @@ var templateV1 = template.Template{ type ReadHeaderFn func(*bytes.Buffer, net.Addr) (int, time.Time, record.Map, error) type NetflowProtocol struct { - logger *log.Logger + logger *logp.Logger flowTemplate *template.Template version uint16 readHeader ReadHeaderFn @@ -63,10 +63,10 @@ func init() { } func New(config config.Config) protocol.Protocol { - return NewProtocol(ProtocolID, &templateV1, readV1Header, log.New(config.LogOutput(), LogPrefix, 0)) + return NewProtocol(ProtocolID, &templateV1, readV1Header, logp.L().Named(LogPrefix)) } -func NewProtocol(version uint16, template *template.Template, readHeader ReadHeaderFn, logger *log.Logger) protocol.Protocol { +func NewProtocol(version uint16, template *template.Template, readHeader ReadHeaderFn, logger *logp.Logger) protocol.Protocol { return &NetflowProtocol{ logger: logger, flowTemplate: template, @@ -90,7 +90,7 @@ func (NetflowProtocol) Stop() error { func (p *NetflowProtocol) OnPacket(buf *bytes.Buffer, source net.Addr) (flows []record.Record, err error) { numFlows, timestamp, metadata, err := p.readHeader(buf, source) if err != nil { - p.logger.Printf("Failed parsing packet: %v", err) + p.logger.Debugf("Failed parsing packet: %v", err) return nil, fmt.Errorf("error reading netflow header: %w", err) } flows, err = p.flowTemplate.Apply(buf, numFlows) diff --git a/x-pack/filebeat/input/netflow/decoder/v5/v5.go b/x-pack/filebeat/input/netflow/decoder/v5/v5.go index 74d4adbb70e0..53141a321355 100644 --- a/x-pack/filebeat/input/netflow/decoder/v5/v5.go +++ b/x-pack/filebeat/input/netflow/decoder/v5/v5.go @@ -8,7 +8,6 @@ import ( "bytes" "encoding/binary" "io" - "log" "net" "time" @@ -58,7 +57,7 @@ func init() { } func New(config config.Config) protocol.Protocol { - return v1.NewProtocol(ProtocolID, &templateV5, ReadV5Header, log.New(config.LogOutput(), LogPrefix, 0)) + return v1.NewProtocol(ProtocolID, &templateV5, ReadV5Header, config.LogOutput().Named(LogPrefix)) } type PacketHeader struct { diff --git a/x-pack/filebeat/input/netflow/decoder/v6/v6.go b/x-pack/filebeat/input/netflow/decoder/v6/v6.go index a5d1bc339e97..ce7d07bf193d 100644 --- a/x-pack/filebeat/input/netflow/decoder/v6/v6.go +++ b/x-pack/filebeat/input/netflow/decoder/v6/v6.go @@ -5,8 +5,6 @@ package v6 import ( - "log" - "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol" @@ -53,5 +51,5 @@ func init() { } func New(config config.Config) protocol.Protocol { - return v1.NewProtocol(ProtocolID, &templateV6, v5.ReadV5Header, log.New(config.LogOutput(), LogPrefix, 0)) + return v1.NewProtocol(ProtocolID, &templateV6, v5.ReadV5Header, config.LogOutput().Named(LogPrefix)) } diff --git a/x-pack/filebeat/input/netflow/decoder/v7/v7.go b/x-pack/filebeat/input/netflow/decoder/v7/v7.go index 62cbdc56a065..0c990f57123d 100644 --- a/x-pack/filebeat/input/netflow/decoder/v7/v7.go +++ b/x-pack/filebeat/input/netflow/decoder/v7/v7.go @@ -8,7 +8,6 @@ import ( "bytes" "encoding/binary" "io" - "log" "net" "time" @@ -59,7 +58,7 @@ func init() { } func New(config config.Config) protocol.Protocol { - return v1.NewProtocol(ProtocolID, &v7template, ReadV7Header, log.New(config.LogOutput(), LogPrefix, 0)) + return v1.NewProtocol(ProtocolID, &v7template, ReadV7Header, config.LogOutput().Named(LogPrefix)) } type PacketHeader struct { diff --git a/x-pack/filebeat/input/netflow/decoder/v8/v8.go b/x-pack/filebeat/input/netflow/decoder/v8/v8.go index 9fa88ea1c686..f1ca1655679e 100644 --- a/x-pack/filebeat/input/netflow/decoder/v8/v8.go +++ b/x-pack/filebeat/input/netflow/decoder/v8/v8.go @@ -9,7 +9,6 @@ import ( "encoding/binary" "fmt" "io" - "log" "net" "time" @@ -18,6 +17,7 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template" + "github.com/elastic/elastic-agent-libs/logp" ) const ( @@ -300,7 +300,7 @@ var templates = map[AggType]*template.Template{ } type NetflowV8Protocol struct { - logger *log.Logger + logger *logp.Logger } func init() { @@ -309,7 +309,7 @@ func init() { func New(config config.Config) protocol.Protocol { return &NetflowV8Protocol{ - logger: log.New(config.LogOutput(), LogPrefix, 0), + logger: logp.L().Named(LogPrefix), } } @@ -320,12 +320,12 @@ func (NetflowV8Protocol) Version() uint16 { func (p *NetflowV8Protocol) OnPacket(buf *bytes.Buffer, source net.Addr) (flows []record.Record, err error) { header, err := ReadPacketHeader(buf) if err != nil { - p.logger.Printf("Failed parsing packet: %v", err) + p.logger.Debugf("Failed parsing packet: %v", err) return nil, fmt.Errorf("error reading V8 header: %w", err) } template, found := templates[header.Aggregation] if !found { - p.logger.Printf("Packet from %s uses an unknown V8 aggregation: %d", source, header.Aggregation) + p.logger.Debugf("Packet from %s uses an unknown V8 aggregation: %d", source, header.Aggregation) return nil, fmt.Errorf("unsupported V8 aggregation: %d", header.Aggregation) } metadata := header.GetMetadata(source) diff --git a/x-pack/filebeat/input/netflow/decoder/v9/decoder.go b/x-pack/filebeat/input/netflow/decoder/v9/decoder.go index bd34b424d2f3..d283bab506be 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/decoder.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/decoder.go @@ -10,10 +10,11 @@ import ( "errors" "fmt" "io" - "log" "net" "time" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template" @@ -29,18 +30,18 @@ type Decoder interface { ReadSetHeader(*bytes.Buffer) (SetHeader, error) ReadTemplateSet(setID uint16, buf *bytes.Buffer) ([]*template.Template, error) ReadFieldDefinition(*bytes.Buffer) (field fields.Key, length uint16, err error) - GetLogger() *log.Logger + GetLogger() *logp.Logger GetFields() fields.FieldDict } type DecoderV9 struct { - Logger *log.Logger + Logger *logp.Logger Fields fields.FieldDict } var _ Decoder = (*DecoderV9)(nil) -func (d DecoderV9) GetLogger() *log.Logger { +func (d DecoderV9) GetLogger() *logp.Logger { return d.Logger } @@ -124,10 +125,10 @@ func ReadFields(d Decoder, buf *bytes.Buffer, count int) (record template.Templa if length == template.VariableLength || min <= field.Length && field.Length <= max { field.Info = fieldInfo } else if logger != nil { - logger.Printf("Size of field %s in template is out of bounds (size=%d, min=%d, max=%d)", fieldInfo.Name, field.Length, min, max) + logger.Debugf("Size of field %s in template is out of bounds (size=%d, min=%d, max=%d)", fieldInfo.Name, field.Length, min, max) } } else if logger != nil { - logger.Printf("Field %v in template not found", key) + logger.Debugf("Field %v in template not found", key) } record.Fields[i] = field } diff --git a/x-pack/filebeat/input/netflow/decoder/v9/session.go b/x-pack/filebeat/input/netflow/decoder/v9/session.go index 492576f6b962..7675a1586513 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/session.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/session.go @@ -5,11 +5,12 @@ package v9 import ( - "log" "net" "sync" "time" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/atomic" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template" @@ -45,12 +46,12 @@ type SessionState struct { mutex sync.RWMutex Templates map[TemplateKey]*TemplateWrapper lastSequence uint32 - logger *log.Logger + logger *logp.Logger Delete atomic.Bool } // NewSession creates a new session. -func NewSession(logger *log.Logger) *SessionState { +func NewSession(logger *logp.Logger) *SessionState { return &SessionState{ logger: logger, Templates: make(map[TemplateKey]*TemplateWrapper), @@ -59,7 +60,7 @@ func NewSession(logger *log.Logger) *SessionState { // AddTemplate adds the passed template. func (s *SessionState) AddTemplate(t *template.Template) { - s.logger.Printf("state %p addTemplate %d %p", s, t.ID, t) + s.logger.Debugf("state %p addTemplate %d %p", s, t.ID, t) s.mutex.Lock() defer s.mutex.Unlock() s.Templates[TemplateKey(t.ID)] = &TemplateWrapper{Template: t} @@ -94,7 +95,7 @@ func (s *SessionState) ExpireTemplates() (alive int, removed int) { total = len(s.Templates) for _, id := range toDelete { if template, found := s.Templates[id]; found && template.Delete.Load() { - s.logger.Printf("expired template %v", id) + s.logger.Debugf("expired template %v", id) delete(s.Templates, id) removed++ } @@ -125,12 +126,12 @@ func isValidSequence(current, next uint32) bool { type SessionMap struct { mutex sync.RWMutex Sessions map[SessionKey]*SessionState - logger *log.Logger + logger *logp.Logger metric config.ActiveSessionsMetric } // NewSessionMap returns a new SessionMap. -func NewSessionMap(logger *log.Logger, metric config.ActiveSessionsMetric) SessionMap { +func NewSessionMap(logger *logp.Logger, metric config.ActiveSessionsMetric) SessionMap { return SessionMap{ logger: logger, Sessions: make(map[SessionKey]*SessionState), @@ -216,7 +217,7 @@ func (m *SessionMap) CleanupLoop(interval time.Duration, done <-chan struct{}) { case <-t.C: aliveS, removedS, aliveT, removedT := m.cleanup() if removedS > 0 || removedT > 0 { - m.logger.Printf("Expired %d sessions (%d remain) / %d templates (%d remain)", removedS, aliveS, removedT, aliveT) + m.logger.Debugf("Expired %d sessions (%d remain) / %d templates (%d remain)", removedS, aliveS, removedT, aliveT) } } } diff --git a/x-pack/filebeat/input/netflow/decoder/v9/session_test.go b/x-pack/filebeat/input/netflow/decoder/v9/session_test.go index 8c10b2b98e94..c83d80384e1b 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/session_test.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/session_test.go @@ -5,26 +5,28 @@ package v9 import ( - "io" - "log" "math" "sync" "testing" "time" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/stretchr/testify/assert" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test" ) -var logger = log.New(io.Discard, "", 0) +var logerr = logp.DevelopmentSetup(logp.ToDiscardOutput()) +var glogger = logp.NewLogger("") func makeSessionKey(t testing.TB, ipPortPair string, domain uint32) SessionKey { return MakeSessionKey(test.MakeAddress(t, ipPortPair), domain, false) } func TestSessionMap_GetOrCreate(t *testing.T) { + var logger = glogger.Named("session_map") t.Run("consistent behavior", func(t *testing.T) { sm := NewSessionMap(logger, nil) @@ -101,7 +103,7 @@ func testTemplate(id uint16) *template.Template { } func TestSessionState(t *testing.T) { - logger := log.New(io.Discard, "", 0) + var logger = glogger.Named("session_state") t.Run("create and get", func(t *testing.T) { s := NewSession(logger) t1 := testTemplate(1) @@ -133,7 +135,7 @@ func TestSessionState(t *testing.T) { } func TestSessionMap_Cleanup(t *testing.T) { - sm := NewSessionMap(logger, nil) + sm := NewSessionMap(glogger, nil) // Session is created k1 := makeSessionKey(t, "127.0.0.1:1234", 1) @@ -180,7 +182,7 @@ func TestSessionMap_Cleanup(t *testing.T) { func TestSessionMap_CleanupLoop(t *testing.T) { timeout := time.Millisecond * 100 - sm := NewSessionMap(log.New(io.Discard, "", 0), nil) + sm := NewSessionMap(glogger.Named(""), nil) key := makeSessionKey(t, "127.0.0.1:1", 42) s := sm.GetOrCreate(key) @@ -201,7 +203,7 @@ func TestSessionMap_CleanupLoop(t *testing.T) { } func TestTemplateExpiration(t *testing.T) { - s := NewSession(logger) + s := NewSession(glogger) assert.Nil(t, s.GetTemplate(256)) assert.Nil(t, s.GetTemplate(257)) s.AddTemplate(testTemplate(256)) @@ -263,7 +265,7 @@ func TestSessionCheckReset(t *testing.T) { }, } { t.Run(testCase.title, func(t *testing.T) { - s := NewSession(logger) + s := NewSession(glogger) s.lastSequence = testCase.current prev, isReset := s.CheckReset(testCase.next) assert.Equal(t, prev, testCase.current) diff --git a/x-pack/filebeat/input/netflow/decoder/v9/v9.go b/x-pack/filebeat/input/netflow/decoder/v9/v9.go index 2fafe452c623..a7c58dfb8d55 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/v9.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/v9.go @@ -8,10 +8,11 @@ import ( "bytes" "context" "fmt" - "log" "net" "time" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record" @@ -30,7 +31,7 @@ type NetflowV9Protocol struct { ctx context.Context cancel context.CancelFunc decoder Decoder - logger *log.Logger + logger *logp.Logger Session SessionMap timeout time.Duration cache *pendingTemplatesCache @@ -43,11 +44,11 @@ func init() { } func New(config config.Config) protocol.Protocol { - logger := log.New(config.LogOutput(), LogPrefix, 0) + logger := config.LogOutput().Named(LogPrefix) return NewProtocolWithDecoder(DecoderV9{Logger: logger, Fields: config.Fields()}, config, logger) } -func NewProtocolWithDecoder(decoder Decoder, config config.Config, logger *log.Logger) *NetflowV9Protocol { +func NewProtocolWithDecoder(decoder Decoder, config config.Config, logger *logp.Logger) *NetflowV9Protocol { ctx, cancel := context.WithCancel(context.Background()) pd := &NetflowV9Protocol{ ctx: ctx, @@ -93,7 +94,7 @@ func (p *NetflowV9Protocol) Stop() error { func (p *NetflowV9Protocol) OnPacket(buf *bytes.Buffer, source net.Addr) (flows []record.Record, err error) { header, payload, numFlowSets, err := p.decoder.ReadPacketHeader(buf) if err != nil { - p.logger.Printf("Unable to read V9 header: %v", err) + p.logger.Debugf("Unable to read V9 header: %v", err) return nil, fmt.Errorf("error reading header: %w", err) } buf = payload @@ -103,10 +104,10 @@ func (p *NetflowV9Protocol) OnPacket(buf *bytes.Buffer, source net.Addr) (flows session := p.Session.GetOrCreate(sessionKey) remote := source.String() - p.logger.Printf("Packet from:%s src:%d seq:%d", remote, header.SourceID, header.SequenceNo) + p.logger.Debugf("Packet from:%s src:%d seq:%d", remote, header.SourceID, header.SequenceNo) if p.detectReset { if prev, reset := session.CheckReset(header.SequenceNo); reset { - p.logger.Printf("Session %s reset (sequence=%d last=%d)", remote, header.SequenceNo, prev) + p.logger.Debugf("Session %s reset (sequence=%d last=%d)", remote, header.SequenceNo, prev) } } @@ -116,15 +117,15 @@ func (p *NetflowV9Protocol) OnPacket(buf *bytes.Buffer, source net.Addr) (flows break } if buf.Len() < set.BodyLength() { - p.logger.Printf("FlowSet ID %+v overflows packet from %s", set, source) + p.logger.Debugf("FlowSet ID %+v overflows packet from %s", set, source) break } body := bytes.NewBuffer(buf.Next(set.BodyLength())) - p.logger.Printf("FlowSet ID %d length %d", set.SetID, set.BodyLength()) + p.logger.Debugf("FlowSet ID %d length %d", set.SetID, set.BodyLength()) f, err := p.parseSet(set.SetID, sessionKey, session, body) if err != nil { - p.logger.Printf("Error parsing set %d: %v", set.SetID, err) + p.logger.Debugf("Error parsing set %d: %v", set.SetID, err) return nil, fmt.Errorf("error parsing set: %w", err) } flows = append(flows, f...) @@ -151,7 +152,7 @@ func (p *NetflowV9Protocol) parseSet( if p.cache != nil { p.cache.Add(key, buf) } else { - p.logger.Printf("No template for ID %d", setID) + p.logger.Debugf("No template for ID %d", setID) } return nil, nil } diff --git a/x-pack/filebeat/input/netflow/decoder/v9/v9_test.go b/x-pack/filebeat/input/netflow/decoder/v9/v9_test.go index a98f6150f6aa..4a720e2aff35 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/v9_test.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/v9_test.go @@ -10,6 +10,8 @@ import ( "github.com/stretchr/testify/assert" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test" @@ -162,7 +164,7 @@ func TestSessionReset(t *testing.T) { } t.Run("Reset disabled", func(t *testing.T) { cfg := config.Defaults() - cfg.WithSequenceResetEnabled(false).WithLogOutput(test.TestLogWriter{TB: t}) + cfg.WithSequenceResetEnabled(false).WithLogOutput(logp.NewLogger("v9_test")) proto := New(cfg) flows, err := proto.OnPacket(test.MakePacket(templatePacket), addr) assert.NoError(t, err) @@ -173,7 +175,7 @@ func TestSessionReset(t *testing.T) { }) t.Run("Reset enabled", func(t *testing.T) { cfg := config.Defaults() - cfg.WithSequenceResetEnabled(true).WithLogOutput(test.TestLogWriter{TB: t}) + cfg.WithSequenceResetEnabled(true).WithLogOutput(logp.NewLogger("v9_test")) proto := New(cfg) flows, err := proto.OnPacket(test.MakePacket(templatePacket), addr) assert.NoError(t, err) @@ -193,7 +195,7 @@ func TestSessionReset(t *testing.T) { return test.MakePacket(tmp) } cfg := config.Defaults() - cfg.WithSequenceResetEnabled(true).WithLogOutput(test.TestLogWriter{TB: t}) + cfg.WithSequenceResetEnabled(true).WithLogOutput(logp.NewLogger("v9_test")) proto := New(cfg) flows, err := proto.OnPacket(mkPack(templatePacket, 1, 1000), addr) assert.NoError(t, err) diff --git a/x-pack/filebeat/input/netflow/input.go b/x-pack/filebeat/input/netflow/input.go index bb4046b74a91..5cee693b3c61 100644 --- a/x-pack/filebeat/input/netflow/input.go +++ b/x-pack/filebeat/input/netflow/input.go @@ -132,7 +132,7 @@ func (n *netflowInput) Run(env v2.Context, connector beat.PipelineConnector) err n.decoder, err = decoder.NewDecoder(decoder.NewConfig(). WithProtocols(n.cfg.Protocols...). WithExpiration(n.cfg.ExpirationTimeout). - WithLogOutput(&logDebugWrapper{Logger: n.logger}). + WithLogOutput(n.logger). WithCustomFields(n.customFields...). WithSequenceResetEnabled(n.cfg.DetectSequenceReset). WithSharedTemplates(n.cfg.ShareTemplates). @@ -236,26 +236,6 @@ func (n *netflowInput) Run(env v2.Context, connector beat.PipelineConnector) err return nil } -// An adapter so that logp.Logger can be used as a log.Logger. -type logDebugWrapper struct { - sync.Mutex - Logger *logp.Logger - buf []byte -} - -// Write writes messages to the log. -func (w *logDebugWrapper) Write(p []byte) (n int, err error) { - w.Lock() - defer w.Unlock() - n = len(p) - w.buf = append(w.buf, p...) - for endl := bytes.IndexByte(w.buf, '\n'); endl != -1; endl = bytes.IndexByte(w.buf, '\n') { - w.Logger.Debug(string(w.buf[:endl])) - w.buf = w.buf[endl+1:] - } - return n, nil -} - // stop stops the netflow input func (n *netflowInput) stop() { n.mtx.Lock() diff --git a/x-pack/filebeat/input/netflow/netflow_test.go b/x-pack/filebeat/input/netflow/netflow_test.go index 127445af1b85..dd4571d291d6 100644 --- a/x-pack/filebeat/input/netflow/netflow_test.go +++ b/x-pack/filebeat/input/netflow/netflow_test.go @@ -290,7 +290,7 @@ func getFlowsFromDat(t testing.TB, name string, testCase TestCase) TestResult { WithProtocols(protocol.Registry.All()...). WithSequenceResetEnabled(false). WithExpiration(0). - WithLogOutput(test.TestLogWriter{TB: t}) + WithLogOutput(logp.NewLogger("netflow_test")) for _, fieldFile := range testCase.Fields { fields, err := LoadFieldDefinitionsFromFile(filepath.Join(fieldsDir, fieldFile)) @@ -352,7 +352,7 @@ func getFlowsFromPCAP(t testing.TB, name, pcapFile string) TestResult { WithSequenceResetEnabled(false). WithExpiration(0). WithCache(strings.HasSuffix(pcapFile, ".reversed.pcap")). - WithLogOutput(test.TestLogWriter{TB: t}) + WithLogOutput(logp.NewLogger("netflow_test")) decoder, err := decoder.NewDecoder(config) if !assert.NoError(t, err) {