From 32ea845b95854067d1d16caf0513a713a7a1df27 Mon Sep 17 00:00:00 2001 From: Ali Mosajjal Date: Sat, 9 Dec 2023 14:30:43 +1300 Subject: [PATCH] kafka sasl plaintext auth. closes #83 --- internal/output/kafka.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/internal/output/kafka.go b/internal/output/kafka.go index b0f82a2..81d5082 100644 --- a/internal/output/kafka.go +++ b/internal/output/kafka.go @@ -20,8 +20,9 @@ import ( "crypto/tls" "crypto/x509" "errors" - "io/ioutil" "net" + "os" + "strings" "time" "github.com/mosajjal/dnsmonster/internal/util" @@ -30,11 +31,13 @@ import ( "github.com/rogpeppe/fastuuid" "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl/plain" ) type kafkaConfig struct { KafkaOutputType uint `long:"kafkaoutputtype" ini-name:"kafkaoutputtype" env:"DNSMONSTER_KAFKAOUTPUTTYPE" default:"0" description:"What should be written to kafka. options:\n;\t0: Disable Output\n;\t1: Enable Output without any filters\n;\t2: Enable Output and apply skipdomains logic\n;\t3: Enable Output and apply allowdomains logic\n;\t4: Enable Output and apply both skip and allow domains logic" choice:"0" choice:"1" choice:"2" choice:"3" choice:"4"` KafkaOutputBroker []string `long:"kafkaoutputbroker" ini-name:"kafkaoutputbroker" env:"DNSMONSTER_KAFKAOUTPUTBROKER" default:"" description:"kafka broker address(es), example: 127.0.0.1:9092. Used if kafkaOutputType is not none"` + kafkaSASLCredentials string `long:"kafkasaslcredentials" ini-name:"kafkasaslcredentials" env:"DNSMONSTER_KAFKASASLCREDENTIALS" default:"" description:"kafka SASL credentials, example: username:password. only plain SASL is supported"` KafkaOutputTopic string `long:"kafkaoutputtopic" ini-name:"kafkaoutputtopic" env:"DNSMONSTER_KAFKAOUTPUTTOPIC" default:"dnsmonster" description:"Kafka topic for logging"` KafkaBatchSize uint `long:"kafkabatchsize" ini-name:"kafkabatchsize" env:"DNSMONSTER_KAFKABATCHSIZE" default:"1000" description:"Minimum capacity of the cache array used to send data to Kafka"` KafkaOutputFormat string `long:"kafkaoutputformat" ini-name:"kafkaoutputformat" env:"DNSMONSTER_KAFKAOUTPUTFORMAT" default:"json" description:"Output format. options:json, gob. " choice:"json" choice:"gob"` @@ -99,7 +102,7 @@ func (kafConfig kafkaConfig) getWriter() *kafka.Writer { tlsConfig := &tls.Config{} if kafConfig.KafkaCACertificatePath != "" { - caCert, err := ioutil.ReadFile(kafConfig.KafkaCACertificatePath) + caCert, err := os.ReadFile(kafConfig.KafkaCACertificatePath) if err != nil { log.Fatalf("Could not read kafka CA certificate: %v", err) } @@ -122,6 +125,17 @@ func (kafConfig kafkaConfig) getWriter() *kafka.Writer { transport.TLS = tlsConfig } + if kafConfig.kafkaSASLCredentials != "" { + creds := strings.Split(kafConfig.kafkaSASLCredentials, ":") + if len(creds) != 2 { + log.Errorf("Could not parse kafka SASL credentials: %v", kafConfig.kafkaSASLCredentials) + } + mechanism := plain.Mechanism{ + Username: creds[0], + Password: creds[1], + } + transport.SASL = mechanism + } kWriter := &kafka.Writer{ Addr: kafka.TCP(kafConfig.KafkaOutputBroker...), Async: true,