Skip to content

Commit

Permalink
kafka sasl plaintext auth. closes #83
Browse files Browse the repository at this point in the history
  • Loading branch information
mosajjal committed Dec 9, 2023
1 parent 6a526f5 commit 32ea845
Showing 1 changed file with 16 additions and 2 deletions.
18 changes: 16 additions & 2 deletions internal/output/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (
"crypto/tls"
"crypto/x509"
"errors"
"io/ioutil"
"net"
"os"
"strings"
"time"

"github.com/mosajjal/dnsmonster/internal/util"
Expand All @@ -30,11 +31,13 @@ import (

"github.com/rogpeppe/fastuuid"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
)

type kafkaConfig struct {
KafkaOutputType uint `long:"kafkaoutputtype" ini-name:"kafkaoutputtype" env:"DNSMONSTER_KAFKAOUTPUTTYPE" default:"0" description:"What should be written to kafka. options:\n;\t0: Disable Output\n;\t1: Enable Output without any filters\n;\t2: Enable Output and apply skipdomains logic\n;\t3: Enable Output and apply allowdomains logic\n;\t4: Enable Output and apply both skip and allow domains logic" choice:"0" choice:"1" choice:"2" choice:"3" choice:"4"`
KafkaOutputBroker []string `long:"kafkaoutputbroker" ini-name:"kafkaoutputbroker" env:"DNSMONSTER_KAFKAOUTPUTBROKER" default:"" description:"kafka broker address(es), example: 127.0.0.1:9092. Used if kafkaOutputType is not none"`
kafkaSASLCredentials string `long:"kafkasaslcredentials" ini-name:"kafkasaslcredentials" env:"DNSMONSTER_KAFKASASLCREDENTIALS" default:"" description:"kafka SASL credentials, example: username:password. only plain SASL is supported"`
KafkaOutputTopic string `long:"kafkaoutputtopic" ini-name:"kafkaoutputtopic" env:"DNSMONSTER_KAFKAOUTPUTTOPIC" default:"dnsmonster" description:"Kafka topic for logging"`
KafkaBatchSize uint `long:"kafkabatchsize" ini-name:"kafkabatchsize" env:"DNSMONSTER_KAFKABATCHSIZE" default:"1000" description:"Minimum capacity of the cache array used to send data to Kafka"`
KafkaOutputFormat string `long:"kafkaoutputformat" ini-name:"kafkaoutputformat" env:"DNSMONSTER_KAFKAOUTPUTFORMAT" default:"json" description:"Output format. options:json, gob. " choice:"json" choice:"gob"`
Expand Down Expand Up @@ -99,7 +102,7 @@ func (kafConfig kafkaConfig) getWriter() *kafka.Writer {
tlsConfig := &tls.Config{}

if kafConfig.KafkaCACertificatePath != "" {
caCert, err := ioutil.ReadFile(kafConfig.KafkaCACertificatePath)
caCert, err := os.ReadFile(kafConfig.KafkaCACertificatePath)
if err != nil {
log.Fatalf("Could not read kafka CA certificate: %v", err)
}
Expand All @@ -122,6 +125,17 @@ func (kafConfig kafkaConfig) getWriter() *kafka.Writer {
transport.TLS = tlsConfig
}

if kafConfig.kafkaSASLCredentials != "" {
creds := strings.Split(kafConfig.kafkaSASLCredentials, ":")
if len(creds) != 2 {
log.Errorf("Could not parse kafka SASL credentials: %v", kafConfig.kafkaSASLCredentials)
}
mechanism := plain.Mechanism{
Username: creds[0],
Password: creds[1],
}
transport.SASL = mechanism
}
kWriter := &kafka.Writer{
Addr: kafka.TCP(kafConfig.KafkaOutputBroker...),
Async: true,
Expand Down

0 comments on commit 32ea845

Please sign in to comment.