Skip to content

Commit

Permalink
added multi HEC recievers - Testing
Browse files Browse the repository at this point in the history
  • Loading branch information
mosajjal committed May 24, 2021
1 parent 51fda5d commit 42c08cd
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 8 deletions.
17 changes: 15 additions & 2 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ import (

const VERSION = "v0.8.3.2"

type splunkOutputEndpointList []string

func (i *splunkOutputEndpointList) String() string {
return strings.Join(*i, " ")
}
func (i *splunkOutputEndpointList) Set(value string) error {
*i = append(*i, value)
return nil
}

var splunkOutputEndpoints splunkOutputEndpointList

var fs = flag.NewFlagSetWithEnvPrefix(os.Args[0], "DNSMONSTER", 0)
var devName = fs.String("devName", "", "Device used to capture")
var pcapFile = fs.String("pcapFile", "", "Pcap filename to run")
Expand Down Expand Up @@ -79,7 +91,6 @@ var elasticOutputIndex = fs.String("elasticOutputIndex", "default", "elastic ind
var elasticBatchSize = fs.Uint("elasticBatchSize", 1000, "Send data to Elastic in batch sizes")
var elasticBatchDelay = fs.Duration("elasticBatchDelay", 1*time.Second, "Interval between sending results to Elastic if Batch size is not filled")
var splunkOutputType = fs.Uint("splunkOutputType", 0, "What should be written to HEC. options: 0: none, 1: all, 2: apply skipdomains logic, 3: apply allowdomains logic, 4: apply both skip and allow domains logic")
var splunkOutputEndpoint = fs.String("splunkOutputEndpoint", "", "HEC endpoint address, example: http://127.0.0.1:8088. Used if splunkOutputType is not none")
var splunkOutputToken = fs.String("splunkOutputToken", "00000000-0000-0000-0000-000000000000", "Splunk HEC Token")
var splunkOutputIndex = fs.String("splunkOutputIndex", "temp", "Splunk Output Index")
var splunkOutputSource = fs.String("splunkOutputSource", "dnsmonster", "Splunk Output Source")
Expand All @@ -104,7 +115,9 @@ var skipDomainMapBool = false
var allowDomainMapBool = false

func checkFlags() {
fs.Var(&splunkOutputEndpoints, "splunkOutputEndpoint", "HEC endpoint address, example: http://127.0.0.1:8088. Used if splunkOutputType is not none")
err := fs.Parse(os.Args[1:])
log.Println(splunkOutputEndpoints)
errorHandler(err)

if *version {
Expand Down Expand Up @@ -338,7 +351,7 @@ func main() {
go elasticOutput(elasticResultChannel, exiting, &wg, *elasticOutputEndpoint, *elasticOutputIndex, *elasticBatchSize, *elasticBatchDelay, *packetLimit)
}
if *splunkOutputType > 0 {
go splunkOutput(splunkResultChannel, exiting, &wg, *splunkOutputEndpoint, *splunkOutputToken, *splunkOutputIndex, *splunkBatchSize, *splunkBatchDelay, *packetLimit)
go splunkOutput(splunkResultChannel, exiting, &wg, splunkOutputEndpoints, *splunkOutputToken, *splunkOutputIndex, *splunkBatchSize, *splunkBatchDelay, *packetLimit)
}
if *memprofile != "" {
go func() {
Expand Down
29 changes: 23 additions & 6 deletions src/output_splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"encoding/json"
"fmt"
"log"
"math/rand"
"net/http"
"strings"
"sync"
"time"

Expand All @@ -14,7 +16,15 @@ import (

var splunkStats = outputStats{"splunk", 0, 0}

func connecSplunkRetry(exiting chan bool, splunkEndpoint string, splunkHecToken string, skipTlsVerification bool) *splunk.Client {
func connectMultiSplunkRetry(exiting chan bool, splunkEndpoints []string, splunkHecToken string, skipTlsVerification bool) []*splunk.Client {
var outputs []*splunk.Client
for _, splunkEndpoint := range splunkEndpoints {
outputs = append(outputs, connectSplunkRetry(exiting, splunkEndpoint, splunkHecToken, skipTlsVerification))
}
return outputs
}

func connectSplunkRetry(exiting chan bool, splunkEndpoint string, splunkHecToken string, skipTlsVerification bool) *splunk.Client {
tick := time.NewTicker(5 * time.Second)
// don't retry connection if we're doing dry run
if *splunkOutputType == 0 {
Expand All @@ -41,9 +51,15 @@ func connectSplunk(exiting chan bool, splunkEndpoint string, splunkHecToken stri

tr := &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: skipTlsVerification}}
httpClient := &http.Client{Timeout: time.Second * 20, Transport: tr}

splunkURL := splunkEndpoint
if !strings.HasSuffix(splunkEndpoint, "/services/collector") {
splunkURL = fmt.Sprintf("%s/services/collector", splunkEndpoint)
}

client := splunk.NewClient(
httpClient,
fmt.Sprintf("%s/services/collector", splunkEndpoint),
splunkURL,
splunkHecToken,
"",
"",
Expand All @@ -53,14 +69,14 @@ func connectSplunk(exiting chan bool, splunkEndpoint string, splunkHecToken stri
return client, err
}

func splunkOutput(resultChannel chan DNSResult, exiting chan bool, wg *sync.WaitGroup, splunkEndpoint string, splunkHecToken string, splunkIndex string, splunkBatchSize uint, batchDelay time.Duration, limit int) {
func splunkOutput(resultChannel chan DNSResult, exiting chan bool, wg *sync.WaitGroup, splunkEndpoints []string, splunkHecToken string, splunkIndex string, splunkBatchSize uint, batchDelay time.Duration, limit int) {
wg.Add(1)
defer wg.Done()

client := connecSplunkRetry(exiting, splunkEndpoint, splunkHecToken, *skipTlsVerification)
clients := connectMultiSplunkRetry(exiting, splunkEndpoints, splunkHecToken, *skipTlsVerification)

batch := make([]DNSResult, 0, splunkBatchSize)

rand.Seed(time.Now().Unix())
ticker := time.Tick(batchDelay)
printStatsTicker := time.Tick(*printStatsDelay)

Expand All @@ -71,9 +87,10 @@ func splunkOutput(resultChannel chan DNSResult, exiting chan bool, wg *sync.Wait
batch = append(batch, data)
}
case <-ticker:
client := clients[rand.Intn(len(clients))]
if err := splunkSendData(client, splunkIndex, *splunkOutputSource, *splunkOutputSourceType, batch); err != nil {
log.Println(err)
client = connecSplunkRetry(exiting, splunkEndpoint, splunkHecToken, *skipTlsVerification)
client = connectSplunkRetry(exiting, client.URL, splunkHecToken, *skipTlsVerification)
} else {
batch = make([]DNSResult, 0, splunkBatchSize)
}
Expand Down

0 comments on commit 42c08cd

Please sign in to comment.