diff --git a/go/cron/aggregator.go b/go/cron/aggregator.go index 7cdf33d0bdd..f585c27dd1b 100644 --- a/go/cron/aggregator.go +++ b/go/cron/aggregator.go @@ -56,8 +56,12 @@ type Aggregator struct { db *sql.DB } +func emptyMac(mac string) bool { + return mac == "00:00:00:00:00:00" || mac == "" +} + func updateMacs(ctx context.Context, f *PfFlow, stmt *sql.Stmt) { - if f.SrcMac != "00:00:00:00:00:00" && f.DstMac != "00:00:00:00:00:00" { + if !emptyMac(f.SrcMac) && !emptyMac(f.DstMac) { return } @@ -67,11 +71,11 @@ func updateMacs(ctx context.Context, f *PfFlow, stmt *sql.Stmt) { log.LogErrorf(ctx, "updateMacs Database Error: %s", err.Error()) } - if f.SrcMac == "00:00:00:00:00:00" { + if emptyMac(f.SrcMac) { f.SrcMac = srcMac } - if f.DstMac == "00:00:00:00:00:00" { + if emptyMac(f.DstMac) { f.DstMac = dstMac } } @@ -429,16 +433,16 @@ loop: for _, pfflows := range pfflowsArray { log.LogInfof(ctx, "Received %d flows of FlowType %s", len(*pfflows.Flows), flowType(pfflows.Header.FlowType)) for _, f := range *pfflows.Flows { + if stmt != nil { + updateMacs(ctx, &f, stmt) + } + key := f.Key(&pfflows.Header) val := a.events[key] if a.Heuristics > 0 { f.Heuristics() } - if stmt != nil { - updateMacs(ctx, &f, stmt) - } - a.events[key] = append(val, f) } } diff --git a/go/cron/aggregator_test.go b/go/cron/aggregator_test.go index 0368b9d2862..27982d6478b 100644 --- a/go/cron/aggregator_test.go +++ b/go/cron/aggregator_test.go @@ -17,6 +17,8 @@ func TestAggregator(t *testing.T) { }, Flows: &[]PfFlow{ { + SrcMac: "00:11:22:33:44:55", + DstMac: "00:11:22:33:44:56", SrcIp: netip.AddrFrom4([4]byte{1, 1, 1, 2}), DstIp: netip.AddrFrom4([4]byte{1, 1, 1, 1}), SrcPort: 80, @@ -26,6 +28,8 @@ func TestAggregator(t *testing.T) { ConnectionCount: 2, }, { + SrcMac: "00:11:22:33:44:56", + DstMac: "00:11:22:33:44:55", SrcIp: netip.AddrFrom4([4]byte{1, 1, 1, 1}), DstIp: netip.AddrFrom4([4]byte{1, 1, 1, 2}), SrcPort: 1024, @@ -35,6 +39,8 @@ func TestAggregator(t *testing.T) { ConnectionCount: 2, }, { + SrcMac: "00:11:22:33:44:55", + DstMac: "00:11:22:33:44:56", SrcIp: netip.AddrFrom4([4]byte{1, 1, 1, 2}), DstIp: netip.AddrFrom4([4]byte{1, 1, 1, 1}), SrcPort: 80, @@ -44,6 +50,8 @@ func TestAggregator(t *testing.T) { ConnectionCount: 2, }, { + SrcMac: "00:11:22:33:44:56", + DstMac: "00:11:22:33:44:55", SrcIp: netip.AddrFrom4([4]byte{1, 1, 1, 1}), DstIp: netip.AddrFrom4([4]byte{1, 1, 1, 2}), SrcPort: 1025, @@ -91,6 +99,8 @@ func TestAggregator(t *testing.T) { { Flows: &[]PfFlow{ { + SrcMac: "00:11:22:33:44:55", + DstMac: "00:11:22:33:44:56", SrcIp: netip.AddrFrom4([4]byte{1, 1, 1, 1}), DstIp: netip.AddrFrom4([4]byte{1, 1, 1, 2}), SrcPort: 1024, @@ -99,6 +109,8 @@ func TestAggregator(t *testing.T) { BiFlow: 0, }, { + SrcMac: "00:11:22:33:44:55", + DstMac: "00:11:22:33:44:56", SrcIp: netip.AddrFrom4([4]byte{1, 1, 1, 1}), DstIp: netip.AddrFrom4([4]byte{1, 1, 1, 2}), SrcPort: 1025, @@ -107,6 +119,8 @@ func TestAggregator(t *testing.T) { BiFlow: 0, }, { + SrcMac: "00:11:22:33:44:55", + DstMac: "00:11:22:33:44:56", SrcIp: netip.AddrFrom4([4]byte{1, 1, 1, 1}), DstIp: netip.AddrFrom4([4]byte{1, 1, 1, 2}), SrcPort: 1024,