Skip to content

Commit

Permalink
better defaults and starting to carve out types
Browse files Browse the repository at this point in the history
  • Loading branch information
mosajjal committed Nov 13, 2021
1 parent aa3d4fb commit 28b30ad
Show file tree
Hide file tree
Showing 14 changed files with 84 additions and 64 deletions.
7 changes: 4 additions & 3 deletions src/capture_dnstap.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

"github.com/mosajjal/dnsmonster/types"
log "github.com/sirupsen/logrus"

dnstap "github.com/dnstap/golang-dnstap"
Expand Down Expand Up @@ -59,13 +60,13 @@ func handleDNSTapInterrupt(done chan bool) {
}()
}

func dnsTapMsgToDNSResult(msg []byte) DNSResult {
func dnsTapMsgToDNSResult(msg []byte) types.DNSResult {
dnstapObject := &dnstap.Dnstap{}

proto.Unmarshal(msg, dnstapObject)

// var myDNSrow DNSRow
var myDNSResult DNSResult
var myDNSResult types.DNSResult

if dnstapObject.Message.GetQueryMessage() != nil {
myDNSResult.DNS.Unpack(dnstapObject.Message.GetQueryMessage())
Expand All @@ -83,7 +84,7 @@ func dnsTapMsgToDNSResult(msg []byte) DNSResult {
return myDNSResult
}

func startDNSTap(resultChannel chan DNSResult) {
func startDNSTap(resultChannel chan types.DNSResult) {
log.Info("Starting DNStap capture")
input := parseDnstapSocket(captureOptions.DnstapSocket, captureOptions.DnstapPermission)

Expand Down
3 changes: 2 additions & 1 deletion src/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"sync"
"time"

"github.com/mosajjal/dnsmonster/types"
log "github.com/sirupsen/logrus"
)

func dispatchOutput(resultChannel chan DNSResult, exiting chan bool, wg *sync.WaitGroup) {
func dispatchOutput(resultChannel chan types.DNSResult, exiting chan bool, wg *sync.WaitGroup) {
wg.Add(1)
defer wg.Done()

Expand Down
5 changes: 2 additions & 3 deletions src/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ require (
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0 // indirect
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 // indirect
golang.org/x/text v0.3.5 // indirect
golang.org/x/sys v0.0.0-20211112193437-faf0a1b62c6b // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
15 changes: 7 additions & 8 deletions src/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d h1:20cMwl2fHAzkJMEA+8J4JgqBQcQGzbisXo31MIeenXI=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -94,14 +94,13 @@ golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20211112193437-faf0a1b62c6b h1:uo+9AuR+gDt/gdj+1BaLhdOHsaGI6YU6585IiDcLrFE=
golang.org/x/sys v0.0.0-20211112193437-faf0a1b62c6b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
Expand Down
17 changes: 9 additions & 8 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"
"time"

"github.com/mosajjal/dnsmonster/types"
log "github.com/sirupsen/logrus"
)

Expand All @@ -31,14 +32,14 @@ var allowDomainMap = make(map[string]bool)
var skipDomainMapBool = false
var allowDomainMapBool = false

var clickhouseResultChannel = make(chan DNSResult, generalOptions.ResultChannelSize)
var kafkaResultChannel = make(chan DNSResult, generalOptions.ResultChannelSize)
var elasticResultChannel = make(chan DNSResult, generalOptions.ResultChannelSize)
var splunkResultChannel = make(chan DNSResult, generalOptions.ResultChannelSize)
var stdoutResultChannel = make(chan DNSResult, generalOptions.ResultChannelSize)
var fileResultChannel = make(chan DNSResult, generalOptions.ResultChannelSize)
var syslogResultChannel = make(chan DNSResult, generalOptions.ResultChannelSize)
var resultChannel = make(chan DNSResult, generalOptions.ResultChannelSize)
var clickhouseResultChannel = make(chan types.DNSResult, generalOptions.ResultChannelSize)
var kafkaResultChannel = make(chan types.DNSResult, generalOptions.ResultChannelSize)
var elasticResultChannel = make(chan types.DNSResult, generalOptions.ResultChannelSize)
var splunkResultChannel = make(chan types.DNSResult, generalOptions.ResultChannelSize)
var stdoutResultChannel = make(chan types.DNSResult, generalOptions.ResultChannelSize)
var fileResultChannel = make(chan types.DNSResult, generalOptions.ResultChannelSize)
var syslogResultChannel = make(chan types.DNSResult, generalOptions.ResultChannelSize)
var resultChannel = make(chan types.DNSResult, generalOptions.ResultChannelSize)

func main() {
flagsProcess()
Expand Down
4 changes: 2 additions & 2 deletions src/newflags.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ var captureOptions struct {
Port uint `long:"port" env:"DNSMONSTER_PORT" default:"53" description:"Port selected to filter packets"`
SampleRatio string `long:"sampleRatio" env:"DNSMONSTER_SAMPLERATIO" default:"1:1" description:"Capture Sampling by a:b. eg sampleRatio of 1:100 will process 1 percent of the incoming packets"`
DnstapPermission string `long:"dnstapPermission" env:"DNSMONSTER_DNSTAPPERMISSION" default:"755" description:"Set the dnstap socket permission, only applicable when unix:// is used"`
PacketHandlerCount uint `long:"packetHandlerCount" env:"DNSMONSTER_PACKETHANDLERCOUNT" default:"1" description:"Number of routines used to handle received packets"`
PacketChannelSize uint `long:"packetChannelSize" env:"DNSMONSTER_PACKETCHANNELSIZE" default:"100000" description:"Size of the packet handler channel"`
PacketHandlerCount uint `long:"packetHandlerCount" env:"DNSMONSTER_PACKETHANDLERCOUNT" default:"2" description:"Number of routines used to handle received packets"`
PacketChannelSize uint `long:"packetChannelSize" env:"DNSMONSTER_PACKETCHANNELSIZE" default:"1000" description:"Size of the packet handler channel"`
AfpacketBuffersizeMb uint `long:"afpacketBuffersizeMb" env:"DNSMONSTER_AFPACKETBUFFERSIZEMB" default:"64" description:"Afpacket Buffersize in MB"`
Filter string `long:"filter" env:"DNSMONSTER_FILTER" default:"((ip and (ip[9] == 6 or ip[9] == 17)) or (ip6 and (ip6[6] == 17 or ip6[6] == 6 or ip6[6] == 44)))" description:"BPF filter applied to the packet stream. If port is selected, the packets will not be defragged."`
UseAfpacket bool `long:"useAfpacket" env:"DNSMONSTER_USEAFPACKET" description:"Use AFPacket for live captures. Supported on Linux 3.0+ only"`
Expand Down
7 changes: 4 additions & 3 deletions src/output_clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"time"

"github.com/mosajjal/dnsmonster/types"
"github.com/rogpeppe/fastuuid"
log "github.com/sirupsen/logrus"

Expand Down Expand Up @@ -63,7 +64,7 @@ func clickhouseOutput(chConfig clickHouseConfig) {
defer chConfig.general.wg.Done()

connect := connectClickhouseRetry(chConfig)
batch := make([]DNSResult, 0, chConfig.clickhouseBatchSize)
batch := make([]types.DNSResult, 0, chConfig.clickhouseBatchSize)

ticker := time.Tick(chConfig.clickhouseDelay)
printStatsTicker := time.Tick(chConfig.general.printStatsDelay)
Expand All @@ -78,7 +79,7 @@ func clickhouseOutput(chConfig clickHouseConfig) {
log.Info(err)
connect = connectClickhouseRetry(chConfig)
} else {
batch = make([]DNSResult, 0, chConfig.clickhouseBatchSize)
batch = make([]types.DNSResult, 0, chConfig.clickhouseBatchSize)
}
case <-chConfig.general.exiting:
return
Expand All @@ -88,7 +89,7 @@ func clickhouseOutput(chConfig clickHouseConfig) {
}
}

func clickhouseSendData(connect clickhouse.Clickhouse, batch []DNSResult, chConfig clickHouseConfig) error {
func clickhouseSendData(connect clickhouse.Clickhouse, batch []types.DNSResult, chConfig clickHouseConfig) error {
if len(batch) == 0 {
return nil
}
Expand Down
7 changes: 4 additions & 3 deletions src/output_elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"

"github.com/mosajjal/dnsmonster/types"
log "github.com/sirupsen/logrus"

"github.com/olivere/elastic"
Expand Down Expand Up @@ -64,7 +65,7 @@ func elasticOutput(esConfig elasticConfig) {
defer esConfig.general.wg.Done()

client := connectelasticRetry(esConfig)
batch := make([]DNSResult, 0, esConfig.elasticBatchSize)
batch := make([]types.DNSResult, 0, esConfig.elasticBatchSize)

ticker := time.Tick(esConfig.elasticBatchDelay)
printStatsTicker := time.Tick(esConfig.general.printStatsDelay)
Expand Down Expand Up @@ -94,7 +95,7 @@ func elasticOutput(esConfig elasticConfig) {
log.Info(err)
client = connectelasticRetry(esConfig)
} else {
batch = make([]DNSResult, 0, esConfig.elasticBatchSize)
batch = make([]types.DNSResult, 0, esConfig.elasticBatchSize)
}
case <-esConfig.general.exiting:
return
Expand All @@ -104,7 +105,7 @@ func elasticOutput(esConfig elasticConfig) {
}
}

func elasticSendData(client *elastic.Client, batch []DNSResult, esConfig elasticConfig) error {
func elasticSendData(client *elastic.Client, batch []types.DNSResult, esConfig elasticConfig) error {
for i := range batch {
for _, dnsQuery := range batch[i].DNS.Question {
if checkIfWeSkip(esConfig.elasticOutputType, dnsQuery.Name) {
Expand Down
7 changes: 4 additions & 3 deletions src/output_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"

"github.com/mosajjal/dnsmonster/types"
log "github.com/sirupsen/logrus"

"github.com/rogpeppe/fastuuid"
Expand Down Expand Up @@ -54,7 +55,7 @@ func kafkaOutput(kafConfig kafkaConfig) {
defer kafConfig.general.wg.Done()

connect := connectKafkaRetry(kafConfig)
batch := make([]DNSResult, 0, kafConfig.kafkaBatchSize)
batch := make([]types.DNSResult, 0, kafConfig.kafkaBatchSize)

ticker := time.Tick(kafConfig.kafkaBatchDelay)
printStatsTicker := time.Tick(kafConfig.general.printStatsDelay)
Expand All @@ -70,7 +71,7 @@ func kafkaOutput(kafConfig kafkaConfig) {
log.Info(err)
connect = connectKafkaRetry(kafConfig)
} else {
batch = make([]DNSResult, 0, kafConfig.kafkaBatchDelay)
batch = make([]types.DNSResult, 0, kafConfig.kafkaBatchDelay)
}
case <-kafConfig.general.exiting:
return
Expand All @@ -80,7 +81,7 @@ func kafkaOutput(kafConfig kafkaConfig) {
}
}

func kafkaSendData(connect *kafka.Conn, batch []DNSResult, kafConfig kafkaConfig) error {
func kafkaSendData(connect *kafka.Conn, batch []types.DNSResult, kafConfig kafkaConfig) error {
var msg []kafka.Message
for i := range batch {
for _, dnsQuery := range batch[i].DNS.Question {
Expand Down
7 changes: 4 additions & 3 deletions src/output_splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
log "github.com/sirupsen/logrus"

"github.com/mosajjal/Go-Splunk-HTTP/splunk/v2"
"github.com/mosajjal/dnsmonster/types"
)

var splunkStats = outputStats{"splunk", 0, 0}
Expand Down Expand Up @@ -102,7 +103,7 @@ func splunkOutput(spConfig splunkConfig) {
log.Infof("Connecting to Splunk endpoints")
connectMultiSplunkRetry(spConfig)

batch := make([]DNSResult, 0, spConfig.splunkBatchSize)
batch := make([]types.DNSResult, 0, spConfig.splunkBatchSize)
rand.Seed(time.Now().Unix())
ticker := time.Tick(spConfig.splunkBatchDelay)
printStatsTicker := time.Tick(spConfig.general.printStatsDelay)
Expand All @@ -124,7 +125,7 @@ func splunkOutput(spConfig splunkConfig) {
splunkConnectionList[healthyId] = conn
splunkStats.Skipped += len(batch)
} else {
batch = make([]DNSResult, 0, spConfig.splunkBatchSize)
batch = make([]types.DNSResult, 0, spConfig.splunkBatchSize)
}
} else {
log.Warn("Splunk Connection not found")
Expand All @@ -138,7 +139,7 @@ func splunkOutput(spConfig splunkConfig) {
}
}

func splunkSendData(client *splunk.Client, batch []DNSResult, spConfig splunkConfig) error {
func splunkSendData(client *splunk.Client, batch []types.DNSResult, spConfig splunkConfig) error {
var events []*splunk.Event
for i := range batch {
for _, dnsQuery := range batch[i].DNS.Question {
Expand Down
12 changes: 9 additions & 3 deletions src/output_stdout_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ import (
var stdoutstats = outputStats{"Stdout", 0, 0}
var fileoutstats = outputStats{"File", 0, 0}

func stdoutOutput(stdConfig stdoutConfig) {
stdConfig.general.wg.Add(1)
defer stdConfig.general.wg.Done()
func stdoutOutputWorker(stdConfig stdoutConfig) {
printStatsTicker := time.Tick(stdConfig.general.printStatsDelay)

for {
Expand All @@ -39,6 +37,14 @@ func stdoutOutput(stdConfig stdoutConfig) {
}
}

func stdoutOutput(stdConfig stdoutConfig) {
stdConfig.general.wg.Add(1)
defer stdConfig.general.wg.Done()
for i := 0; i < 8; i++ {
go stdoutOutputWorker(stdConfig)
}
}

func fileOutput(fConfig fileConfig) {
fConfig.general.wg.Add(1)
defer fConfig.general.wg.Done()
Expand Down
5 changes: 3 additions & 2 deletions src/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
mkdns "github.com/miekg/dns"
"github.com/mosajjal/dnsmonster/types"
)

func (encoder *packetEncoder) processTransport(foundLayerTypes *[]gopacket.LayerType, udp *layers.UDP, tcp *layers.TCP, flow gopacket.Flow, timestamp time.Time, IPVersion uint8, SrcIP, DstIP net.IP) {
Expand All @@ -25,7 +26,7 @@ func (encoder *packetEncoder) processTransport(foundLayerTypes *[]gopacket.Layer
MaskSize = generalOptions.MaskSize6
BitSize = 8 * net.IPv6len
}
encoder.resultChannel <- DNSResult{timestamp, msg, IPVersion, SrcIP.Mask(net.CIDRMask(MaskSize, BitSize)), DstIP.Mask(net.CIDRMask(MaskSize, BitSize)), "udp", uint16(len(udp.Payload))}
encoder.resultChannel <- types.DNSResult{timestamp, msg, IPVersion, SrcIP.Mask(net.CIDRMask(MaskSize, BitSize)), DstIP.Mask(net.CIDRMask(MaskSize, BitSize)), "udp", uint16(len(udp.Payload))}
}
}
case layers.LayerTypeTCP:
Expand Down Expand Up @@ -87,7 +88,7 @@ func (encoder *packetEncoder) run() {
MaskSize = generalOptions.MaskSize6
BitSize = 8 * net.IPv6len
}
encoder.resultChannel <- DNSResult{data.timestamp, msg, data.IPVersion, data.SrcIP.Mask(net.CIDRMask(MaskSize, BitSize)), data.DstIP.Mask(net.CIDRMask(MaskSize, BitSize)), "tcp", uint16(len(data.data))}
encoder.resultChannel <- types.DNSResult{data.timestamp, msg, data.IPVersion, data.SrcIP.Mask(net.CIDRMask(MaskSize, BitSize)), data.DstIP.Mask(net.CIDRMask(MaskSize, BitSize)), "tcp", uint16(len(data.data))}
}
case packet := <-encoder.ip4DefrggerReturn:
// Packet was defragged, parse the remaining data
Expand Down
Loading

0 comments on commit 28b30ad

Please sign in to comment.