Skip to content

Commit

Permalink
minor changes to hash calc
Browse files Browse the repository at this point in the history
  • Loading branch information
mosajjal committed Nov 15, 2021
1 parent 8164d61 commit a780d28
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 49 deletions.
30 changes: 16 additions & 14 deletions src/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package main

import (
"encoding/hex"
"fmt"
"os"
"runtime/trace"
"testing"

"github.com/google/gopacket"
Expand Down Expand Up @@ -40,16 +43,15 @@ func DummySink() {
select {
case <-testResultChannel:
cnt++
if cnt%1000000 == 0 {
println(cnt)
}
}
}
}

var encoderList []packetEncoder

func benchmarkUdpPacketProcessingWorker(i uint, b *testing.B) {
func benchmarkUdpPacketProcessingWorker(workers uint, b *testing.B) {
fmt.Println("Benchmarking UDP Packet Processing Worker")
file, _ := os.Create("trace.out")
trace.Start(file)
defer trace.Stop()
e := packetEncoder{
53,
testInputChannel,
Expand All @@ -60,11 +62,10 @@ func benchmarkUdpPacketProcessingWorker(i uint, b *testing.B) {
testTcpChannels,
testReturnChannel,
testResultChannel,
i,
workers,
testDoneChannel,
false,
}
encoderList = append(encoderList, e)
go e.run()
testTcpChannels = append(testTcpChannels, make(chan tcpPacket, TCPAssemblyChannelSize))
go DummySink()
Expand All @@ -74,9 +75,10 @@ func benchmarkUdpPacketProcessingWorker(i uint, b *testing.B) {
}
}

func BenchmarkUdpPacketProcessingWorker1(b *testing.B) { benchmarkUdpPacketProcessingWorker(1, b) }
func BenchmarkUdpPacketProcessingWorker2(b *testing.B) { benchmarkUdpPacketProcessingWorker(2, b) }
func BenchmarkUdpPacketProcessingWorker4(b *testing.B) { benchmarkUdpPacketProcessingWorker(4, b) }
func BenchmarkUdpPacketProcessingWorker6(b *testing.B) { benchmarkUdpPacketProcessingWorker(6, b) }
func BenchmarkUdpPacketProcessingWorker8(b *testing.B) { benchmarkUdpPacketProcessingWorker(8, b) }
func BenchmarkUdpPacketProcessingWorker16(b *testing.B) { benchmarkUdpPacketProcessingWorker(16, b) }
func BenchmarkUdpPacketProcessingWorker1(b *testing.B) { benchmarkUdpPacketProcessingWorker(1, b) }

// func BenchmarkUdpPacketProcessingWorker2(b *testing.B) { benchmarkUdpPacketProcessingWorker(2, b) }
// func BenchmarkUdpPacketProcessingWorker4(b *testing.B) { benchmarkUdpPacketProcessingWorker(4, b) }
// func BenchmarkUdpPacketProcessingWorker6(b *testing.B) { benchmarkUdpPacketProcessingWorker(6, b) }
// func BenchmarkUdpPacketProcessingWorker8(b *testing.B) { benchmarkUdpPacketProcessingWorker(8, b) }
// func BenchmarkUdpPacketProcessingWorker16(b *testing.B) { benchmarkUdpPacketProcessingWorker(16, b) }
68 changes: 33 additions & 35 deletions src/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/google/gopacket/layers"
mkdns "github.com/miekg/dns"
"github.com/mosajjal/dnsmonster/types"
log "github.com/sirupsen/logrus"
)

func (encoder *packetEncoder) processTransport(foundLayerTypes *[]gopacket.LayerType, udp *layers.UDP, tcp *layers.TCP, flow gopacket.Flow, timestamp time.Time, IPVersion uint8, SrcIP, DstIP net.IP) {
Expand Down Expand Up @@ -62,48 +63,44 @@ func (encoder *packetEncoder) inputHandlerWorker(p chan gopacket.Packet) {
}
parser := gopacket.NewDecodingLayerParser(startLayer, decodeLayers...)
foundLayerTypes := []gopacket.LayerType{}
for {
select {
case packet := <-p:
timestamp := packet.Metadata().Timestamp
if timestamp.IsZero() {
timestamp = time.Now()
}
_ = parser.DecodeLayers(packet.Data(), &foundLayerTypes)
// first parse the ip layer, so we can find fragmented packets
for _, layerType := range foundLayerTypes {
switch layerType {
case layers.LayerTypeIPv4:
// Check for fragmentation
if ip4.Flags&layers.IPv4DontFragment == 0 && (ip4.Flags&layers.IPv4MoreFragments != 0 || ip4.FragOffset != 0) {
// Packet is fragmented, send it to the defragger
encoder.ip4Defrgger <- ipv4ToDefrag{
ip4,
timestamp,
}
break
for packet := range p {
timestamp := packet.Metadata().Timestamp
if timestamp.IsZero() {
timestamp = time.Now()
}
_ = parser.DecodeLayers(packet.Data(), &foundLayerTypes)
// first parse the ip layer, so we can find fragmented packets
for _, layerType := range foundLayerTypes {
switch layerType {
case layers.LayerTypeIPv4:
// Check for fragmentation
if ip4.Flags&layers.IPv4DontFragment == 0 && (ip4.Flags&layers.IPv4MoreFragments != 0 || ip4.FragOffset != 0) {
// Packet is fragmented, send it to the defragger
encoder.ip4Defrgger <- ipv4ToDefrag{
ip4,
timestamp,
}
} else {
// log.Infof("packet %v coming to %p\n", timestamp, &encoder)
encoder.processTransport(&foundLayerTypes, &udp, &tcp, ip4.NetworkFlow(), timestamp, 4, ip4.SrcIP, ip4.DstIP)
continue
case layers.LayerTypeIPv6:
// Store the packet metadata
if ip6.NextHeader == layers.IPProtocolIPv6Fragment {
// TODO: Move the parsing to DecodingLayer when gopacket support it
if frag := packet.Layer(layers.LayerTypeIPv6Fragment).(*layers.IPv6Fragment); frag != nil {
encoder.ip6Defrgger <- ipv6FragmentInfo{
ip6,
*frag,
timestamp,
}
}
case layers.LayerTypeIPv6:
// Store the packet metadata
if ip6.NextHeader == layers.IPProtocolIPv6Fragment {
// TODO: Move the parsing to DecodingLayer when gopacket support it
if frag := packet.Layer(layers.LayerTypeIPv6Fragment).(*layers.IPv6Fragment); frag != nil {
encoder.ip6Defrgger <- ipv6FragmentInfo{
ip6,
*frag,
timestamp,
}
} else {
encoder.processTransport(&foundLayerTypes, &udp, &tcp, ip6.NetworkFlow(), timestamp, 6, ip6.SrcIP, ip6.DstIP)
}
} else {
encoder.processTransport(&foundLayerTypes, &udp, &tcp, ip6.NetworkFlow(), timestamp, 6, ip6.SrcIP, ip6.DstIP)
}
}
break
}

}

}
Expand All @@ -127,6 +124,7 @@ func (encoder *packetEncoder) run() {

var handlerChanList []chan gopacket.Packet
for i := 0; i < int(encoder.handlerCount); i++ {
log.Infof("Creating handler #%d\n", i)
handlerChanList = append(handlerChanList, make(chan gopacket.Packet, 10000)) //todo: parameter for size of this channel needs to be defined
go encoder.inputHandlerWorker(handlerChanList[i])
}
Expand Down Expand Up @@ -167,7 +165,7 @@ func (encoder *packetEncoder) run() {
}
encoder.processTransport(&foundLayerTypes, &udp, &tcp, packet.ip.NetworkFlow(), packet.timestamp, 6, packet.ip.SrcIP, packet.ip.DstIP)
case packet := <-encoder.input:
handlerChanList[packet.TransportLayer().TransportFlow().FastHash()%uint64(encoder.handlerCount)] <- packet
handlerChanList[packet.NetworkLayer().NetworkFlow().FastHash()%uint64(encoder.handlerCount)] <- packet
case <-encoder.done:
continue
}
Expand Down

0 comments on commit a780d28

Please sign in to comment.