diff --git a/integration-tests/enrich-kafka/config/enrich-kafka.hocon b/integration-tests/enrich-kafka/config/enrich-kafka.hocon
index 2ac939210..ef9150ff8 100644
--- a/integration-tests/enrich-kafka/config/enrich-kafka.hocon
+++ b/integration-tests/enrich-kafka/config/enrich-kafka.hocon
@@ -10,6 +10,8 @@
     "consumerConf": {
       "enable.auto.commit": "false"
       "auto.offset.reset" : "earliest"
+      "security.protocol": "PLAINTEXT"
+      "sasl.mechanism": "GSSAPI"
     }
   }
@@ -20,12 +22,22 @@
       "bootstrapServers": "broker:29092"
       "partitionKey": "app_id"
       "headers": ["app_id"]
+      "producerConf": {
+        "acks": "all"
+        "security.protocol": "PLAINTEXT"
+        "sasl.mechanism": "GSSAPI"
+      }
     }

     "bad": {
       "type": "Kafka"
       "topicName": "it-enrich-kinesis-bad"
       "bootstrapServers": "broker:29092"
+      "producerConf": {
+        "acks": "all"
+        "security.protocol": "PLAINTEXT"
+        "sasl.mechanism": "GSSAPI"
+      }
     }
   }
diff --git a/modules/kafka/src/it/scala/com/snowplowanalytics/snowplow/enrich/kafka/test/EnrichKafkaSpec.scala b/modules/kafka/src/it/scala/com/snowplowanalytics/snowplow/enrich/kafka/test/EnrichKafkaSpec.scala
index 898d161fd..ea30c37c6 100644
--- a/modules/kafka/src/it/scala/com/snowplowanalytics/snowplow/enrich/kafka/test/EnrichKafkaSpec.scala
+++ b/modules/kafka/src/it/scala/com/snowplowanalytics/snowplow/enrich/kafka/test/EnrichKafkaSpec.scala
@@ -32,7 +32,7 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output.{Kafka

 import com.snowplowanalytics.snowplow.enrich.common.fs2.test.CollectorPayloadGen

-import com.snowplowanalytics.snowplow.enrich.kafka.{Sink, Source}
+import com.snowplowanalytics.snowplow.enrich.kafka._

 class EnrichKafkaSpec extends Specification with CatsEffect {

@@ -58,16 +58,20 @@ class EnrichKafkaSpec extends Specification with CatsEffect {
     "group.id" -> "it-enrich",
     "auto.offset.reset" -> "earliest",
     "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
-    "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+    "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
+    "security.protocol" -> "PLAINTEXT",
+    "sasl.mechanism" -> "GSSAPI"
   )

   val producerConf: Map[String, String] = Map(
-    "acks" -> "all"
+    "acks" -> "all",
+    "security.protocol" -> "PLAINTEXT",
+    "sasl.mechanism" -> "GSSAPI"
   )

   def run(): IO[Aggregates] = {

-    val resources = Sink.init[IO](OutKafka(collectorPayloadsStream, bootstrapServers, "", Set.empty, producerConf))
+    val resources = Sink.init[IO](OutKafka(collectorPayloadsStream, bootstrapServers, "", Set.empty, producerConf), classOf[SourceAuthHandler].getName)

     resources.use { sink =>
       val generate =
@@ -79,10 +83,10 @@ class EnrichKafkaSpec extends Specification with CatsEffect {
         consumeGood(refGood).merge(consumeBad(refBad))

     def consumeGood(ref: Ref[IO, AggregateGood]): Stream[IO, Unit] =
-      Source.init[IO](InKafka(enrichedStream, bootstrapServers, consumerConf)).map(_.record.value).evalMap(aggregateGood(_, ref))
+      Source.init[IO](InKafka(enrichedStream, bootstrapServers, consumerConf), classOf[GoodSinkAuthHandler].getName).map(_.record.value).evalMap(aggregateGood(_, ref))

     def consumeBad(ref: Ref[IO, AggregateBad]): Stream[IO, Unit] =
-      Source.init[IO](InKafka(badRowsStream, bootstrapServers, consumerConf)).map(_.record.value).evalMap(aggregateBad(_, ref))
+      Source.init[IO](InKafka(badRowsStream, bootstrapServers, consumerConf), classOf[BadSinkAuthHandler].getName).map(_.record.value).evalMap(aggregateBad(_, ref))

     def aggregateGood(r: Array[Byte], ref: Ref[IO, AggregateGood]): IO[Unit] =
       for {
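
Note on the integration-test settings above: the test broker is reached over PLAINTEXT, so the sasl.* entries added to consumerConf/producerConf, and the callback-handler class that Sink.init and Source.init now receive, are effectively inert; they exercise the new configuration passthrough without requiring a SASL-enabled broker. A minimal sketch of what reaches the enriched-stream consumer in this test (it reuses the spec's consumerConf value; the val name is illustrative):

    // Injected by Source.init before the user-provided map; with
    // security.protocol = PLAINTEXT no SASL handshake takes place, so the
    // sasl.* entries and the handler class are ignored by the client.
    val effectiveConsumerProps: Map[String, String] =
      consumerConf + ("sasl.login.callback.handler.class" -> classOf[GoodSinkAuthHandler].getName)
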
diff --git a/modules/kafka/src/main/resources/application.conf b/modules/kafka/src/main/resources/application.conf
index 4efa7e45f..5c5897eda 100644
--- a/modules/kafka/src/main/resources/application.conf
+++ b/modules/kafka/src/main/resources/application.conf
@@ -5,6 +5,9 @@
       "enable.auto.commit": "false"
       "auto.offset.reset" : "earliest"
       "group.id": "enrich"
+      "security.protocol": "SASL_SSL"
+      "sasl.mechanism": "OAUTHBEARER"
+      "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
     }
   }

@@ -13,6 +16,9 @@
       "type": "Kafka"
       "producerConf": {
         "acks": "all"
+        "security.protocol": "SASL_SSL"
+        "sasl.mechanism": "OAUTHBEARER"
+        "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
       }
       "partitionKey": ""
       "headers": []
@@ -24,6 +30,9 @@
       "bootstrapServers": ""
       "producerConf": {
         "acks": "all"
+        "security.protocol": "SASL_SSL"
+        "sasl.mechanism": "OAUTHBEARER"
+        "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
       }
       "partitionKey": ""
       "headers": []
@@ -33,6 +42,9 @@
       "type": "Kafka"
       "producerConf": {
         "acks": "all"
+        "security.protocol": "SASL_SSL"
+        "sasl.mechanism": "OAUTHBEARER"
+        "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
       }
       "partitionKey": ""
       "headers": []
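
With these defaults every consumer and producer is set up for OAuth over TLS: the stock OAuthBearerLoginModule named in sasl.jaas.config obtains its token from whatever sasl.login.callback.handler.class the client is given, and that property is injected in code by the Source/Sink changes further down, using the Azure handler classes added below. A sketch, in the same Map style the ConfigSpec tests use, of the security-related properties the good-events producer ends up with under these defaults (the last entry is injected from code, not read from this file; the val name is illustrative):

    // The first three entries come from application.conf; the handler class is set
    // by Sink.initAttributed (wired to GoodSinkAuthHandler in Main.scala).
    val defaultGoodProducerSecurity: Map[String, String] = Map(
      "security.protocol" -> "SASL_SSL",
      "sasl.mechanism" -> "OAUTHBEARER",
      "sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;",
      "sasl.login.callback.handler.class" -> "com.snowplowanalytics.snowplow.enrich.kafka.GoodSinkAuthHandler"
    )
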
diff --git a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/AzureAuthenticationCallbackHandler.scala b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/AzureAuthenticationCallbackHandler.scala
new file mode 100644
index 000000000..34bc3eb8c
--- /dev/null
+++ b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/AzureAuthenticationCallbackHandler.scala
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2023-present Snowplow Analytics Ltd.
+ * All rights reserved.
+ *
+ * This software is made available by Snowplow Analytics, Ltd.,
+ * under the terms of the Snowplow Limited Use License Agreement, Version 1.0
+ * located at https://docs.snowplow.io/limited-use-license-1.0
+ * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
+ * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
+ */
+package com.snowplowanalytics.snowplow.enrich.kafka
+
+import java.net.URI
+import java.{lang, util}
+
+import com.nimbusds.jwt.JWTParser
+
+import javax.security.auth.callback.Callback
+import javax.security.auth.callback.UnsupportedCallbackException
+import javax.security.auth.login.AppConfigurationEntry
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback
+
+import com.azure.identity.DefaultAzureCredentialBuilder
+import com.azure.core.credential.TokenRequestContext
+
+// The source and each sink need their own callback handler instance because they
+// authenticate with different tokens. However, Kafka is only given a class name and
+// instantiates the handler itself, so passing the same class name for the source and
+// all sinks would make Kafka create and share a single instance. To get separate
+// instances, we define one subclass per source/sink and pass the respective class
+// name to each of their properties, so every source and sink ends up with its own
+// callback handler instance.
+
+class SourceAuthHandler extends AzureAuthenticationCallbackHandler
+
+class GoodSinkAuthHandler extends AzureAuthenticationCallbackHandler
+
+class BadSinkAuthHandler extends AzureAuthenticationCallbackHandler
+
+class PiiSinkAuthHandler extends AzureAuthenticationCallbackHandler
+
+class AzureAuthenticationCallbackHandler extends AuthenticateCallbackHandler {
+
+  val credentials = new DefaultAzureCredentialBuilder().build()
+
+  var sbUri: String = ""
+
+  override def configure(
+    configs: util.Map[String, _],
+    saslMechanism: String,
+    jaasConfigEntries: util.List[AppConfigurationEntry]
+  ): Unit = {
+    val bootstrapServer =
+      configs
+        .get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+        .toString
+        .replaceAll("\\[|\\]", "")
+        .split(",")
+        .toList
+        .headOption match {
+        case Some(s) => s
+        case None => throw new Exception("Empty bootstrap servers list")
+      }
+    val uri = URI.create("https://" + bootstrapServer)
+    // Workload identity works with '.default' scope
+    this.sbUri = s"${uri.getScheme}://${uri.getHost}/.default"
+  }
+
+  override def handle(callbacks: Array[Callback]): Unit =
+    callbacks.foreach {
+      case callback: OAuthBearerTokenCallback =>
+        val token = getOAuthBearerToken()
+        callback.token(token)
+      case callback => throw new UnsupportedCallbackException(callback)
+    }
+
+  def getOAuthBearerToken(): OAuthBearerToken = {
+    val reqContext = new TokenRequestContext()
+    reqContext.addScopes(sbUri)
+    val accessToken = credentials.getTokenSync(reqContext).getToken
+    val jwt = JWTParser.parse(accessToken)
+    val claims = jwt.getJWTClaimsSet
+
+    new OAuthBearerToken {
+      override def value(): String = accessToken
+
+      override def lifetimeMs(): Long = claims.getExpirationTime.getTime
+
+      override def scope(): util.Set[String] = null
+
+      override def principalName(): String = null
+
+      override def startTimeMs(): lang.Long = null
+    }
+  }
+
+  override def close(): Unit = ()
+}
diff --git a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala
index bb24e6589..0a9ef88e0 100644
--- a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala
+++ b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala
@@ -57,10 +57,10 @@ object Main extends IOApp {
       BuildInfo.version,
       BuildInfo.description,
       cliConfig => IO.pure(cliConfig),
-      (input, _) => Source.init[IO](input),
-      out => Sink.initAttributed(out),
-      out => Sink.initAttributed(out),
-      out => Sink.init(out),
+      (input, _) => Source.init[IO](input, classOf[SourceAuthHandler].getName),
+      out => Sink.initAttributed(out, classOf[GoodSinkAuthHandler].getName),
+      out => Sink.initAttributed(out, classOf[PiiSinkAuthHandler].getName),
+      out => Sink.init(out, classOf[BadSinkAuthHandler].getName),
       checkpoint,
       createBlobStorageClient,
       _.record.value,
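
A worked example of the scope derivation performed in AzureAuthenticationCallbackHandler.configure above, for a hypothetical Event Hubs namespace (the namespace name is illustrative):

    // bootstrap.servers = "mynamespace.servicebus.windows.net:9093" (illustrative)
    val bootstrapServer = "mynamespace.servicebus.windows.net:9093"
    val uri             = java.net.URI.create("https://" + bootstrapServer)
    // URI.getHost drops the port, so the token is requested for the scope
    // "https://mynamespace.servicebus.windows.net/.default"
    val scope = s"${uri.getScheme}://${uri.getHost}/.default"
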
diff --git a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala
index f8d3c3d4d..1a8ecbad1 100644
--- a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala
+++ b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala
@@ -26,18 +26,20 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output

 object Sink {

   def init[F[_]: Async: Parallel](
-    output: Output
+    output: Output,
+    authCallbackClass: String
   ): Resource[F, ByteSink[F]] =
     for {
-      sink <- initAttributed(output)
+      sink <- initAttributed(output, authCallbackClass)
     } yield (records: List[Array[Byte]]) => sink(records.map(AttributedData(_, UUID.randomUUID().toString, Map.empty)))

   def initAttributed[F[_]: Async: Parallel](
-    output: Output
+    output: Output,
+    authCallbackClass: String
   ): Resource[F, AttributedByteSink[F]] =
     output match {
       case k: Output.Kafka =>
-        mkProducer(k).map { producer => records =>
+        mkProducer(k, authCallbackClass).map { producer => records =>
           records.parTraverse_ { record =>
             producer
               .produceOne_(toProducerRecord(k.topicName, record))
@@ -49,11 +51,14 @@ object Sink {
   }

   private def mkProducer[F[_]: Async](
-    output: Output.Kafka
+    output: Output.Kafka,
+    authCallbackClass: String
   ): Resource[F, KafkaProducer[F, String, Array[Byte]]] = {
     val producerSettings =
       ProducerSettings[F, String, Array[Byte]]
         .withBootstrapServers(output.bootstrapServers)
+        // set before user-provided config to make it possible to override it via config
+        .withProperty("sasl.login.callback.handler.class", authCallbackClass)
         .withProperties(output.producerConf)
         .withProperties(
           ("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"),
diff --git a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Source.scala b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Source.scala
index 3bdf53c01..c236402be 100644
--- a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Source.scala
+++ b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Source.scala
@@ -21,19 +21,23 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Input

 object Source {

   def init[F[_]: Async](
-    input: Input
+    input: Input,
+    authCallbackClass: String
   ): Stream[F, CommittableConsumerRecord[F, String, Array[Byte]]] =
     input match {
-      case k: Input.Kafka => kafka(k)
+      case k: Input.Kafka => kafka(k, authCallbackClass)
       case i => Stream.raiseError[F](new IllegalArgumentException(s"Input $i is not Kafka"))
     }

   def kafka[F[_]: Async](
-    input: Input.Kafka
+    input: Input.Kafka,
+    authCallbackClass: String
   ): Stream[F, CommittableConsumerRecord[F, String, Array[Byte]]] = {
     val consumerSettings =
       ConsumerSettings[F, String, Array[Byte]]
         .withBootstrapServers(input.bootstrapServers)
+        // set before user-provided config to make it possible to override it via config
+        .withProperty("sasl.login.callback.handler.class", authCallbackClass)
         .withProperties(input.consumerConf)
         .withEnableAutoCommit(false) // prevent enabling auto-commits by setting this after user-provided config
         .withProperties(
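
In both Sink and Source the handler property is applied before the user-provided map, and later withProperty/withProperties calls in fs2-kafka win, so a deployment can still override the injected class (or switch SASL off entirely) from its configuration file. A minimal sketch of that ordering; the bootstrap address and com.example.CustomHandler are illustrative stand-ins for values that would come from the config:

    import cats.effect.IO
    import fs2.kafka.ConsumerSettings

    // Properties applied later override earlier ones, so an entry in
    // input.consumerConf replaces the class injected by Source.init.
    val settings =
      ConsumerSettings[IO, String, Array[Byte]]
        .withBootstrapServers("localhost:9092")
        .withProperty("sasl.login.callback.handler.class", classOf[SourceAuthHandler].getName)
        .withProperties(Map("sasl.login.callback.handler.class" -> "com.example.CustomHandler"))
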
"localhost:9092", "app_id", Set("app_id"), - Map("acks" -> "all") + Map( + "acks" -> "all", + "security.protocol" -> "SASL_SSL", + "sasl.mechanism" -> "OAUTHBEARER", + "sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;" + ) ), Some( io.Output.Kafka( @@ -63,7 +71,12 @@ class ConfigSpec extends Specification with CatsEffect { "localhost:9092", "app_id", Set("app_id"), - Map("acks" -> "all") + Map( + "acks" -> "all", + "security.protocol" -> "SASL_SSL", + "sasl.mechanism" -> "OAUTHBEARER", + "sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;" + ) ) ), io.Output.Kafka( @@ -71,7 +84,12 @@ class ConfigSpec extends Specification with CatsEffect { "localhost:9092", "", Set(), - Map("acks" -> "all") + Map( + "acks" -> "all", + "security.protocol" -> "SASL_SSL", + "sasl.mechanism" -> "OAUTHBEARER", + "sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;" + ) ) ), io.Concurrency(256, 1), @@ -151,7 +169,10 @@ class ConfigSpec extends Specification with CatsEffect { Map( "auto.offset.reset" -> "earliest", "enable.auto.commit" -> "false", - "group.id" -> "enrich" + "group.id" -> "enrich", + "security.protocol" -> "SASL_SSL", + "sasl.mechanism" -> "OAUTHBEARER", + "sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;" ) ), io.Outputs( @@ -160,7 +181,12 @@ class ConfigSpec extends Specification with CatsEffect { "localhost:9092", "", Set(), - Map("acks" -> "all") + Map( + "acks" -> "all", + "security.protocol" -> "SASL_SSL", + "sasl.mechanism" -> "OAUTHBEARER", + "sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;" + ) ), None, io.Output.Kafka( @@ -168,7 +194,12 @@ class ConfigSpec extends Specification with CatsEffect { "localhost:9092", "", Set(), - Map("acks" -> "all") + Map( + "acks" -> "all", + "security.protocol" -> "SASL_SSL", + "sasl.mechanism" -> "OAUTHBEARER", + "sasl.jaas.config" -> "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;" + ) ) ), io.Concurrency(256, 1),