Skip to content

Commit

Permalink
new system for worker process
Browse files Browse the repository at this point in the history
  • Loading branch information
mosajjal committed Nov 14, 2021
1 parent 28b30ad commit f01d247
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 68 deletions.
11 changes: 4 additions & 7 deletions src/capture_pcap.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"sync"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -73,7 +72,6 @@ func newDNSCapturer(options CaptureOptions) DNSCapturer {

go ipv4Defragger(ip4DefraggerChannel, ip4DefraggerReturn, options.GcTime, options.Done)
go ipv6Defragger(ip6DefraggerChannel, ip6DefraggerReturn, options.GcTime, options.Done)

encoder := packetEncoder{
options.Port,
processingChannel,
Expand All @@ -84,15 +82,14 @@ func newDNSCapturer(options CaptureOptions) DNSCapturer {
tcpChannels,
tcpReturnChannel,
options.ResultChannel,
options.PacketHandlerCount,
options.Done,
options.NoEthernetframe,
}
go encoder.run()
options.Wg.Add(1)
// todo: use the global wg for this
var wg sync.WaitGroup
for i := uint(0); i < options.PacketHandlerCount; i++ {
wg.Add(1)
go encoder.run()
}

return DNSCapturer{options, processingChannel}
}

Expand Down
1 change: 1 addition & 0 deletions src/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/mosajjal/Go-Splunk-HTTP/splunk/v2 v2.0.7
github.com/olivere/elastic v6.2.35+incompatible
github.com/pkg/errors v0.9.1 // indirect
github.com/pkg/profile v1.6.0
github.com/rogpeppe/fastuuid v1.2.0
github.com/segmentio/kafka-go v0.4.8
github.com/sirupsen/logrus v1.8.1
Expand Down
2 changes: 2 additions & 0 deletions src/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.6.0 h1:hUDfIISABYI59DyeB3OTay/HxSRwTQ8rB/H83k6r5dM=
github.com/pkg/profile v1.6.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/fastuuid v1.2.0 h1:Ppwyp6VYCF1nvBTXL3trRso7mXMlRrw9ooo375wvi2s=
Expand Down
10 changes: 4 additions & 6 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/mosajjal/dnsmonster/types"
"github.com/pkg/profile"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -46,12 +47,8 @@ func main() {
checkFlags()
runtime.GOMAXPROCS(generalOptions.Gomaxprocs)
if generalOptions.Cpuprofile != "" {
log.Warn("Writing CPU profile")
f, err := os.Create(generalOptions.Cpuprofile)
errorHandler(err)
err = pprof.StartCPUProfile(f)
errorHandler(err)
defer pprof.StopCPUProfile()

defer profile.Start(profile.CPUProfile).Stop()
}

// load the skipDomainFile if exists
Expand Down Expand Up @@ -110,6 +107,7 @@ func main() {
generalOptions.TcpResultChannelSize,
generalOptions.DefraggerChannelSize,
generalOptions.DefraggerChannelReturnSize,
&wg,
exiting,
captureOptions.NoEthernetframe,
})
Expand Down
82 changes: 82 additions & 0 deletions src/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package main

import (
"encoding/hex"
"testing"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/mosajjal/dnsmonster/types"
)

var packetSampleHex = "48df377637b8e48184e3747986dd600cbde600f611362600900053045b0000000000000000012403980000110040000000000000000100359b5800f6cc57f91384000001000200040001096b656570616c69766508676f74696e64657203636f6d0000010001c00c0005000100000001000c096170692d756531617ac016c0340005000100000001000e03617069016305756531617ac016c016000200010002a3000017076e732d3131313509617773646e732d3131036f726700c016000200010002a3000019076e732d3139393009617773646e732d353602636f02756b00c016000200010002a3000013066e732d33333509617773646e732d3431c01fc016000200010002a3000016066e732d37353109617773646e732d3239036e6574000000291000000000000000"
var packetSampleBytes, _ = hex.DecodeString(packetSampleHex)
var packetSample = gopacket.NewPacket(packetSampleBytes, layers.LayerTypeEthernet, gopacket.Default)

const (
TCPResultChannelSize = 60000
PacketChannelSize = 60000
IPDefraggerChannelSize = 60000
IPDefraggerReturnChannelSize = 60000
TCPAssemblyChannelSize = 60000
ResultChannelSize = 60000
NoEthernetframe = false
)

var testReturnChannel = make(chan tcpData, TCPResultChannelSize)
var testInputChannel = make(chan gopacket.Packet, PacketChannelSize)
var testIp4DefraggerChannel = make(chan ipv4ToDefrag, IPDefraggerChannelSize)
var testIp6DefraggerChannel = make(chan ipv6FragmentInfo, IPDefraggerChannelSize)
var testIp4DefraggerReturn = make(chan ipv4Defragged, IPDefraggerReturnChannelSize)
var testIp6DefraggerReturn = make(chan ipv6Defragged, IPDefraggerReturnChannelSize)
var testResultChannel = make(chan types.DNSResult, ResultChannelSize)
var testDoneChannel = make(chan bool)
var testTcpChannels []chan tcpPacket

var cnt = 0

func DummySink() {
for {
select {
case <-testResultChannel:
cnt++
if cnt%1000000 == 0 {
println(cnt)
}
}
}
}

var encoderList []packetEncoder

func benchmarkUdpPacketProcessingWorker(i uint, b *testing.B) {
e := packetEncoder{
53,
testInputChannel,
testIp4DefraggerChannel,
testIp6DefraggerChannel,
testIp4DefraggerReturn,
testIp6DefraggerReturn,
testTcpChannels,
testReturnChannel,
testResultChannel,
i,
testDoneChannel,
false,
}
encoderList = append(encoderList, e)
go e.run()
testTcpChannels = append(testTcpChannels, make(chan tcpPacket, TCPAssemblyChannelSize))
go DummySink()

for n := 0; n < b.N; n++ {
testInputChannel <- packetSample
}
}

func BenchmarkUdpPacketProcessingWorker1(b *testing.B) { benchmarkUdpPacketProcessingWorker(1, b) }
func BenchmarkUdpPacketProcessingWorker2(b *testing.B) { benchmarkUdpPacketProcessingWorker(2, b) }
func BenchmarkUdpPacketProcessingWorker4(b *testing.B) { benchmarkUdpPacketProcessingWorker(4, b) }
func BenchmarkUdpPacketProcessingWorker6(b *testing.B) { benchmarkUdpPacketProcessingWorker(6, b) }
func BenchmarkUdpPacketProcessingWorker8(b *testing.B) { benchmarkUdpPacketProcessingWorker(8, b) }
func BenchmarkUdpPacketProcessingWorker16(b *testing.B) { benchmarkUdpPacketProcessingWorker(16, b) }
8 changes: 4 additions & 4 deletions src/newflags.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ var generalOptions struct {
MaskSize4 int `long:"maskSize4" env:"DNSMONSTER_MASKSIZE4" default:"32" description:"Mask IPv4s by bits. 32 means all the bits of IP is saved in DB"`
MaskSize6 int `long:"maskSize6" env:"DNSMONSTER_MASKSIZE6" default:"128" description:"Mask IPv6s by bits. 32 means all the bits of IP is saved in DB"`
ServerName string `long:"serverName" env:"DNSMONSTER_SERVERNAME" default:"default" description:"Name of the server used to index the metrics."`
TcpAssemblyChannelSize uint `long:"tcpAssemblyChannelSize" env:"DNSMONSTER_TCPASSEMBLYCHANNELSIZE" default:"1000" description:"Size of the tcp assembler"`
TcpResultChannelSize uint `long:"tcpResultChannelSize" env:"DNSMONSTER_TCPRESULTCHANNELSIZE" default:"1000" description:"Size of the tcp result channel"`
TcpAssemblyChannelSize uint `long:"tcpAssemblyChannelSize" env:"DNSMONSTER_TCPASSEMBLYCHANNELSIZE" default:"10000" description:"Size of the tcp assembler"`
TcpResultChannelSize uint `long:"tcpResultChannelSize" env:"DNSMONSTER_TCPRESULTCHANNELSIZE" default:"10000" description:"Size of the tcp result channel"`
TcpHandlerCount uint `long:"tcpHandlerCount" env:"DNSMONSTER_TCPHANDLERCOUNT" default:"1" description:"Number of routines used to handle tcp assembly"`
ResultChannelSize uint `long:"resultChannelSize" env:"DNSMONSTER_RESULTCHANNELSIZE" default:"100000" description:"Size of the result processor channel size"`
LogLevel uint `long:"logLevel" env:"DNSMONSTER_LOGLEVEL" default:"3" description:"Set debug Log level, 0:PANIC, 1:ERROR, 2:WARN, 3:INFO, 4:DEBUG" choice:"0" choice:"1" choice:"2" choice:"3" choice:"4"`
DefraggerChannelSize uint `long:"defraggerChannelSize" env:"DNSMONSTER_DEFRAGGERCHANNELSIZE" default:"500" description:"Size of the channel to send packets to be defragged"`
DefraggerChannelReturnSize uint `long:"defraggerChannelReturnSize" env:"DNSMONSTER_DEFRAGGERCHANNELRETURNSIZE" default:"500" description:"Size of the channel where the defragged packets are returned"`
DefraggerChannelSize uint `long:"defraggerChannelSize" env:"DNSMONSTER_DEFRAGGERCHANNELSIZE" default:"10000" description:"Size of the channel to send packets to be defragged"`
DefraggerChannelReturnSize uint `long:"defraggerChannelReturnSize" env:"DNSMONSTER_DEFRAGGERCHANNELRETURNSIZE" default:"10000" description:"Size of the channel where the defragged packets are returned"`
Cpuprofile string `long:"cpuprofile" env:"DNSMONSTER_CPUPROFILE" default:"" description:"write cpu profile to file"`
Memprofile string `long:"memprofile" env:"DNSMONSTER_MEMPROFILE" default:"" description:"write memory profile to file"`
Gomaxprocs int `long:"gomaxprocs" env:"DNSMONSTER_GOMAXPROCS" default:"-1" description:"GOMAXPROCS variable"`
Expand Down
6 changes: 3 additions & 3 deletions src/output_stdout_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ func stdoutOutputWorker(stdConfig stdoutConfig) {
}

func stdoutOutput(stdConfig stdoutConfig) {
stdConfig.general.wg.Add(1)
defer stdConfig.general.wg.Done()
for i := 0; i < 8; i++ {
for i := 0; i < 1; i++ {
stdConfig.general.wg.Add(1)
defer stdConfig.general.wg.Done()
go stdoutOutputWorker(stdConfig)
}
}
Expand Down
111 changes: 63 additions & 48 deletions src/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ func (encoder *packetEncoder) processTransport(foundLayerTypes *[]gopacket.Layer

}

func (encoder *packetEncoder) run() {
var detectIP DetectIP
func (encoder *packetEncoder) inputHandlerWorker(p chan gopacket.Packet) {
var ethLayer layers.Ethernet
var ip4 layers.IPv4
var ip6 layers.IPv6
Expand All @@ -61,13 +60,61 @@ func (encoder *packetEncoder) run() {
&udp,
&tcp,
}
// Use the IP Family detector when no ethernet frame is present.
if encoder.NoEthernetframe {
decodeLayers[0] = &detectIP
startLayer = LayerTypeDetectIP
parser := gopacket.NewDecodingLayerParser(startLayer, decodeLayers...)
foundLayerTypes := []gopacket.LayerType{}
for {
select {
case packet := <-p:
timestamp := packet.Metadata().Timestamp
if timestamp.IsZero() {
timestamp = time.Now()
}
_ = parser.DecodeLayers(packet.Data(), &foundLayerTypes)
// first parse the ip layer, so we can find fragmented packets
for _, layerType := range foundLayerTypes {
switch layerType {
case layers.LayerTypeIPv4:
// Check for fragmentation
if ip4.Flags&layers.IPv4DontFragment == 0 && (ip4.Flags&layers.IPv4MoreFragments != 0 || ip4.FragOffset != 0) {
// Packet is fragmented, send it to the defragger
encoder.ip4Defrgger <- ipv4ToDefrag{
ip4,
timestamp,
}
break
}
// log.Infof("packet %v coming to %p\n", timestamp, &encoder)
encoder.processTransport(&foundLayerTypes, &udp, &tcp, ip4.NetworkFlow(), timestamp, 4, ip4.SrcIP, ip4.DstIP)
continue
case layers.LayerTypeIPv6:
// Store the packet metadata
if ip6.NextHeader == layers.IPProtocolIPv6Fragment {
// TODO: Move the parsing to DecodingLayer when gopacket support it
if frag := packet.Layer(layers.LayerTypeIPv6Fragment).(*layers.IPv6Fragment); frag != nil {
encoder.ip6Defrgger <- ipv6FragmentInfo{
ip6,
*frag,
timestamp,
}
}
} else {
encoder.processTransport(&foundLayerTypes, &udp, &tcp, ip6.NetworkFlow(), timestamp, 6, ip6.SrcIP, ip6.DstIP)
}
}
}
break
}
}

parser := gopacket.NewDecodingLayerParser(startLayer, decodeLayers...)
}

func (encoder *packetEncoder) run() {

var ip4 layers.IPv4

var udp layers.UDP
var tcp layers.TCP

parserOnlyUDP := gopacket.NewDecodingLayerParser(
layers.LayerTypeUDP,
&udp,
Expand All @@ -77,6 +124,13 @@ func (encoder *packetEncoder) run() {
&tcp,
)
foundLayerTypes := []gopacket.LayerType{}

var handlerChanList []chan gopacket.Packet
for i := 0; i < int(encoder.handlerCount); i++ {
handlerChanList = append(handlerChanList, make(chan gopacket.Packet, 10000)) //todo: parameter for size of this channel needs to be defined
go encoder.inputHandlerWorker(handlerChanList[i])
}

for {
select {
case data := <-encoder.tcpReturnChannel:
Expand Down Expand Up @@ -113,48 +167,9 @@ func (encoder *packetEncoder) run() {
}
encoder.processTransport(&foundLayerTypes, &udp, &tcp, packet.ip.NetworkFlow(), packet.timestamp, 6, packet.ip.SrcIP, packet.ip.DstIP)
case packet := <-encoder.input:
{
timestamp := packet.Metadata().Timestamp
if timestamp.IsZero() {
timestamp = time.Now()
}
_ = parser.DecodeLayers(packet.Data(), &foundLayerTypes)
// first parse the ip layer, so we can find fragmented packets
for _, layerType := range foundLayerTypes {
switch layerType {
case layers.LayerTypeIPv4:
// Check for fragmentation
if ip4.Flags&layers.IPv4DontFragment == 0 && (ip4.Flags&layers.IPv4MoreFragments != 0 || ip4.FragOffset != 0) {
// Packet is fragmented, send it to the defragger
encoder.ip4Defrgger <- ipv4ToDefrag{
ip4,
timestamp,
}
break
}
// log.Infof("packet %v coming to %p\n", timestamp, &encoder)
encoder.processTransport(&foundLayerTypes, &udp, &tcp, ip4.NetworkFlow(), timestamp, 4, ip4.SrcIP, ip4.DstIP)
break
case layers.LayerTypeIPv6:
// Store the packet metadata
if ip6.NextHeader == layers.IPProtocolIPv6Fragment {
// TODO: Move the parsing to DecodingLayer when gopacket support it
if frag := packet.Layer(layers.LayerTypeIPv6Fragment).(*layers.IPv6Fragment); frag != nil {
encoder.ip6Defrgger <- ipv6FragmentInfo{
ip6,
*frag,
timestamp,
}
}
} else {
encoder.processTransport(&foundLayerTypes, &udp, &tcp, ip6.NetworkFlow(), timestamp, 6, ip6.SrcIP, ip6.DstIP)
}
}
}
break
}
handlerChanList[packet.TransportLayer().TransportFlow().FastHash()%uint64(encoder.handlerCount)] <- packet
case <-encoder.done:
break
continue
}
}
}
2 changes: 2 additions & 0 deletions src/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type packetEncoder struct {
tcpAssembly []chan tcpPacket
tcpReturnChannel <-chan tcpData
resultChannel chan<- types.DNSResult
handlerCount uint
done chan bool
NoEthernetframe bool
}
Expand All @@ -118,6 +119,7 @@ type CaptureOptions struct {
TCPResultChannelSize uint
IPDefraggerChannelSize uint
IPDefraggerReturnChannelSize uint
Wg *sync.WaitGroup
Done chan bool
NoEthernetframe bool
}
Expand Down

0 comments on commit f01d247

Please sign in to comment.