Skip to content

Commit

Permalink
Enable Kafka SSL (#27)
Browse files Browse the repository at this point in the history
* add Kafka SSL support

* cleanup; use release version of librdkafka no RC

* refresh go.sum
  • Loading branch information
jarett-cyxtera authored and palmerabollo committed Oct 24, 2019
1 parent 986d7f8 commit c8c6ead
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 20 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.11-alpine as build
FROM golang:1.13.3-alpine3.10 as build

RUN apk add --no-cache alpine-sdk librdkafka librdkafka-dev

Expand All @@ -7,7 +7,7 @@ ADD . /src/prometheus-kafka-adapter

RUN go build -o /prometheus-kafka-adapter

FROM alpine:3.8
FROM alpine:3.10

RUN apk add --no-cache librdkafka

Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ Prometheus-kafka-adapter listens for metrics coming from Prometheus and sends th
- `LOG_LEVEL`: defines log level for [`logrus`](https://github.com/sirupsen/logrus), can be `debug`, `info`, `warn`, `error`, `fatal` or `panic`, defaults to `info`.
- `GIN_MODE`: manage [gin](https://github.com/gin-gonic/gin) debug logging, can be `debug` or `release`.

To connect to Kafka over SSL define the following additonal environment variables:

- `KAFKA_TLS_CLIENT_CERT_FILE`: Kafka SSL client certificate file, defaults to `""`
- `KAFKA_TLS_CLIENT_KEY_FILE`: Kafka SSL client certificate key file, defaults to `""`
- `KAFKA_TLS_CLIENT_KEY_PASS`: Kafka SSL client certificate key password (optional), defaults to `""`
- `KAFKA_TLS_CA_CERT_FILE`: Kafka SSL broker CA certificate file, defaults to `""`

### prometheus

Prometheus needs to have a `remote_write` url configured, pointing to the '/receive' endpoint of the host and port where the prometheus-kafka-adapter service is running. For example:
Expand Down
27 changes: 24 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,14 @@ var (
Topic: &kafkaTopic,
Partition: kafka.PartitionAny,
}
kafkaCompression = "none"
kafkaBatchNumMessages = "10000"
serializer Serializer
kafkaCompression = "none"
kafkaBatchNumMessages = "10000"
kafkaSslClientCertFile = ""
kafkaSslClientKeyFile = ""
kafkaSslClientKeyPass = ""
kafkaSslCACertFile = ""
kafkaSslValidation = true
serializer Serializer
)

func init() {
Expand Down Expand Up @@ -74,6 +79,22 @@ func init() {
kafkaBatchNumMessages = value
}

if value := os.Getenv("KAFKA_SSL_CLIENT_CERT_FILE"); value != "" {
kafkaSslClientCertFile = value
}

if value := os.Getenv("KAFKA_SSL_CLIENT_KEY_FILE"); value != "" {
kafkaSslClientKeyFile = value
}

if value := os.Getenv("KAFKA_SSL_CLIENT_KEY_PASS"); value != "" {
kafkaSslClientKeyPass = value
}

if value := os.Getenv("KAFKA_SSL_CA_CERT_FILE"); value != "" {
kafkaSslCACertFile = value
}

var err error
serializer, err = parseSerializationFormat(os.Getenv("SERIALIZATION_FORMAT"))
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ module github.com/Telefonica/prometheus-kafka-adapter
require (
github.com/actgardner/gogen-avro v5.1.0+incompatible // indirect
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
github.com/confluentinc/confluent-kafka-go v0.11.4
github.com/confluentinc/confluent-kafka-go v1.0.0
github.com/containous/traefik v1.7.1
github.com/edenhill/librdkafka v1.0.1
github.com/fatih/structs v1.1.0
github.com/gin-contrib/sse v0.0.0-20170109093832-22d885f9ecc7 // indirect
github.com/gin-gonic/contrib v0.0.0-20180614032058-39cfb9727134
Expand All @@ -29,3 +30,5 @@ require (
gopkg.in/go-playground/validator.v8 v8.18.2 // indirect
gopkg.in/yaml.v2 v2.2.1 // indirect
)

go 1.13
10 changes: 3 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/actgardner/gogen-avro v5.1.0+incompatible h1:FifTNNceWAXLIgeLiLaFzLcJ9NyBqh59g113kgOmqvo=
github.com/actgardner/gogen-avro v5.1.0+incompatible/go.mod h1:N2PzqZtS+5w9xxGp2daeykhWdTL0lBiRhbbvkVj4Yd8=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/confluentinc/confluent-kafka-go v0.11.4 h1:uH5doflVcMn+2G/ECv0wxpgmVkvEpTwYFW57V2iLqHo=
github.com/confluentinc/confluent-kafka-go v0.11.4/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/confluentinc/confluent-kafka-go v1.0.0 h1:y+G9NTXsvoelf1cRzjtLKOZsPqh71noS4+t+e+eINIk=
github.com/confluentinc/confluent-kafka-go v1.0.0/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/containous/traefik v1.7.1 h1:8fZ0MIRANiu39sBo/sIVy1EV1hRKcdz1Nc1QQjpL5zM=
github.com/containous/traefik v1.7.1/go.mod h1:epDRqge3JzKOhlSWzOpNYEEKXmM6yfN5tPzDGKk3ljo=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
github.com/edenhill/librdkafka v1.0.1/go.mod h1:F8PuMcF+v4otvUmH+SaJzju25e4o1IZunsVnaYzfIYo=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/gin-contrib/sse v0.0.0-20170109093832-22d885f9ecc7 h1:AzN37oI0cOS+cougNAV9szl6CVoj2RYwzS3DpUQNtlY=
github.com/gin-contrib/sse v0.0.0-20170109093832-22d885f9ecc7/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s=
Expand All @@ -37,7 +35,6 @@ github.com/mattn/go-isatty v0.0.4 h1:bnP0vzxcAdeI1zdubAl5PjU6zsERjGZb7raWodagDYs
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.8.0 h1:1921Yw9Gc3iSc4VQh3PIoOqgPCZS7G/4xQNVUp8Mda8=
github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
Expand All @@ -51,7 +48,6 @@ github.com/prometheus/prometheus v2.4.2+incompatible h1:IpbpeZAXsg39pqRThfPHoNRY
github.com/prometheus/prometheus v2.4.2+incompatible/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s=
github.com/sirupsen/logrus v1.1.0 h1:65VZabgUiV9ktjGM5nTq0+YurgTyX+YI2lSSfDjI+qU=
github.com/sirupsen/logrus v1.1.0/go.mod h1:zrgwTnHtNr00buQ1vSptGe8m1f/BbgsPukg8qsT7A+A=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/ugorji/go/codec v0.0.0-20180927125128-99ea80c8b19a h1:BgdofUvNP/srMxiUUpGyZm+WjX/qXpMXdl3edRf1Ta0=
github.com/ugorji/go/codec v0.0.0-20180927125128-99ea80c8b19a/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
Expand Down
24 changes: 17 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,23 @@ import (
func main() {
log.Info("creating kafka producer")

producer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": kafkaBrokerList,
"compression.codec": kafkaCompression,
"batch.num.messages": kafkaBatchNumMessages,
"go.batch.producer": true, // Enable batch producer (for increased performance).
"go.delivery.reports": false, // per-message delivery reports to the Events() channel
})
kafkaConfig := kafka.ConfigMap{
"bootstrap.servers": kafkaBrokerList,
"compression.codec": kafkaCompression,
"batch.num.messages": kafkaBatchNumMessages,
"go.batch.producer": true, // Enable batch producer (for increased performance).
"go.delivery.reports": false, // per-message delivery reports to the Events() channel
"ssl.ca.location": kafkaSslCACertFile, // CA certificate file for verifying the broker's certificate.
"ssl.certificate.location": kafkaSslClientCertFile, // Client's certificate
"ssl.key.location": kafkaSslClientKeyFile, // Client's key
"ssl.key.password": kafkaSslClientKeyPass, // Key password, if any.
}

if kafkaSslClientCertFile != "" && kafkaSslClientKeyFile != "" && kafkaSslCACertFile != "" {
kafkaConfig["security.protocol"] = "ssl"
}

producer, err := kafka.NewProducer(&kafkaConfig)

if err != nil {
logrus.WithError(err).Fatal("couldn't create kafka producer")
Expand Down

0 comments on commit c8c6ead

Please sign in to comment.