Skip to content

Commit

Permalink
Merge pull request #4525 from navikt/feature/consumer-group
Browse files Browse the repository at this point in the history
Feature/consumer group
  • Loading branch information
sondrele authored Oct 24, 2024
2 parents f902dd4 + c404e39 commit 4267316
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package no.nav.mulighetsrommet.kafka

import kotlinx.coroutines.runBlocking
import net.javacrumbs.shedlock.provider.jdbc.JdbcLockProvider
import no.nav.common.kafka.consumer.KafkaConsumerClient
import no.nav.common.kafka.consumer.feilhandtering.KafkaConsumerRecordProcessor
import no.nav.common.kafka.consumer.feilhandtering.util.KafkaConsumerRecordProcessorBuilder
import no.nav.common.kafka.consumer.util.ConsumerUtils.findConsumerConfigsWithStoreOnFailure
import no.nav.common.kafka.consumer.util.KafkaConsumerClientBuilder
import no.nav.common.kafka.consumer.util.KafkaConsumerClientBuilder.TopicConfig
import no.nav.mulighetsrommet.database.Database
import no.nav.mulighetsrommet.metrics.Metrikker
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.slf4j.LoggerFactory
import java.util.*
import java.util.function.Consumer

class KafkaConsumerOrchestrator(
config: Config = Config(),
Expand All @@ -17,7 +22,7 @@ class KafkaConsumerOrchestrator(
consumers: List<KafkaTopicConsumer<*, *>>,
) {
private val logger = LoggerFactory.getLogger(javaClass)
private val consumerClients: Map<String, KafkaConsumerClient>
private val consumerClients: Map<String, Consumer>
private val consumerRecordProcessor: KafkaConsumerRecordProcessor
private val topicPoller: Poller
private val topicRepository = TopicRepository(db)
Expand All @@ -30,28 +35,30 @@ class KafkaConsumerOrchestrator(
val topicStatePollDelay: Long = 10_000,
)

private data class Consumer(
val topicConfig: TopicConfig<*, *>,
val client: KafkaConsumerClient,
)

init {
logger.info("Initializing Kafka consumer clients")

validateConsumers(consumers)

resetTopics(consumers)

val consumerTopicsConfig = consumers.map { consumer ->
consumer.toTopicConfig(kafkaConsumerRepository)
}
consumerClients = consumerTopicsConfig.associate {
val client = KafkaConsumerClientBuilder.builder()
.withProperties(consumerPreset)
.withTopicConfig(it)
.build()
it.consumerConfig.topic to client
consumerClients = consumers.associate { consumer ->
val topicConfig = toTopicConfig(consumer)
val client = toKafkaConsumerClient(consumer, consumerPreset, topicConfig)
consumer.config.id to Consumer(topicConfig, client)
}

val lockProvider = JdbcLockProvider(db.getDatasource())
val topicConfigs = consumerClients.map { it.value.topicConfig }
consumerRecordProcessor = KafkaConsumerRecordProcessorBuilder
.builder()
.withLockProvider(lockProvider)
.withLockProvider(JdbcLockProvider(db.getDatasource()))
.withKafkaConsumerRepository(kafkaConsumerRepository)
.withConsumerConfigs(findConsumerConfigsWithStoreOnFailure(consumerTopicsConfig))
.withConsumerConfigs(findConsumerConfigsWithStoreOnFailure(topicConfigs))
.build()

topicPoller = Poller(config.topicStatePollDelay) {
Expand All @@ -76,7 +83,7 @@ class KafkaConsumerOrchestrator(
}

fun getConsumers(): List<KafkaConsumerClient> {
return consumerClients.toList().map { it.second }
return consumerClients.map { it.value.client }
}

fun updateRunningTopics(topics: List<Topic>): List<Topic> {
Expand All @@ -89,9 +96,48 @@ class KafkaConsumerOrchestrator(
topicPoller.stop()
}

private fun <K, V> toTopicConfig(consumer: KafkaTopicConsumer<K, V>): TopicConfig<K, V> {
return TopicConfig<K, V>()
.withMetrics(Metrikker.appMicrometerRegistry)
.withLogging()
.withStoreOnFailure(kafkaConsumerRepository)
.withConsumerConfig(
consumer.config.topic,
consumer.keyDeserializer,
consumer.valueDeserializer,
Consumer { event ->
runBlocking {
consumer.consume(event.key(), event.value())
}
},
)
}

private fun toKafkaConsumerClient(
consumer: KafkaTopicConsumer<*, *>,
consumerPreset: Properties,
topicConfig: TopicConfig<out Any?, out Any?>,
): KafkaConsumerClient {
fun withConsumerGroupId(p: Properties, consumerGroupId: String): Properties {
val p2 = Properties()
p2.putAll(p)
p2[ConsumerConfig.GROUP_ID_CONFIG] = consumerGroupId
return p2
}

val consumerClientPreset = consumer.config.consumerGroupId
?.let { withConsumerGroupId(consumerPreset, it) }
?: consumerPreset

return KafkaConsumerClientBuilder.builder()
.withProperties(consumerClientPreset)
.withTopicConfig(topicConfig)
.build()
}

private fun updateClientRunningState() {
getTopics().forEach {
val client = consumerClients[it.topic]
val client = consumerClients[it.id]?.client
if (client != null) {
if (client.isRunning && !it.running) {
client.stop()
Expand All @@ -110,9 +156,9 @@ class KafkaConsumerOrchestrator(
val topics = consumers.map { consumer ->
val (id, topic, initialRunningState) = consumer.config

val running = currentTopics
.firstOrNull { it.id == id }
.let { it?.running ?: initialRunningState }
val running = currentTopics.firstOrNull { it.id == id }
?.running
?: initialRunningState

Topic(id = id, topic = topic, type = TopicType.CONSUMER, running = running)
}
Expand All @@ -122,3 +168,9 @@ class KafkaConsumerOrchestrator(
private fun getUpdatedTopicsOnly(updated: List<Topic>, current: List<Topic>) =
updated.filter { x -> current.any { y -> y.id == x.id && y.running != x.running } }
}

private fun validateConsumers(consumers: List<KafkaTopicConsumer<*, *>>) {
require(consumers.distinctBy { it.config.id }.size == consumers.size) {
"Each consumer must have a unique 'id'. At least two consumers share the same 'id'."
}
}
Original file line number Diff line number Diff line change
@@ -1,39 +1,19 @@
package no.nav.mulighetsrommet.kafka

import kotlinx.coroutines.runBlocking
import no.nav.common.kafka.consumer.util.KafkaConsumerClientBuilder.TopicConfig
import no.nav.mulighetsrommet.metrics.Metrikker
import org.apache.kafka.common.serialization.Deserializer
import java.util.function.Consumer

abstract class KafkaTopicConsumer<K, V>(
val config: Config,
private val keyDeserializer: Deserializer<K>,
private val valueDeserializer: Deserializer<V>,
val keyDeserializer: Deserializer<K>,
val valueDeserializer: Deserializer<V>,
) {

data class Config(
val id: String,
val topic: String,
val initialRunningState: Boolean = false,
val consumerGroupId: String? = null,
)

internal fun toTopicConfig(kafkaConsumerRepository: KafkaConsumerRepositoryImpl): TopicConfig<K, V> {
return TopicConfig<K, V>()
.withMetrics(Metrikker.appMicrometerRegistry)
.withLogging()
.withStoreOnFailure(kafkaConsumerRepository)
.withConsumerConfig(
config.topic,
keyDeserializer,
valueDeserializer,
Consumer { event ->
runBlocking {
consume(event.key(), event.value())
}
},
)
}

abstract suspend fun consume(key: K, message: V)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ class Poller(private val delay: Long, val block: () -> Unit) {
private val scope = CoroutineScope(Dispatchers.Default)

fun start() {
block()
scope.launch {
while (scope.isActive) {
block()
delay(delay)
block()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class KafkaConsumerOrchestratorTest : FunSpec({
}

test("should store topics based on provided consumers during setup") {
val consumer = TestConsumer("foo")
val consumer = TestConsumer(id = "1", topic = "foo")

val orchestrator = KafkaConsumerOrchestrator(
KafkaConsumerOrchestrator.Config(topicStatePollDelay = Long.MAX_VALUE),
Expand All @@ -63,7 +63,7 @@ class KafkaConsumerOrchestratorTest : FunSpec({

orchestrator.getTopics() shouldContainExactly listOf(
Topic(
id = "foo",
id = "1",
topic = "foo",
type = TopicType.CONSUMER,
running = true,
Expand All @@ -72,7 +72,7 @@ class KafkaConsumerOrchestratorTest : FunSpec({
}

test("should update the consumer running state based on the topic configuration") {
val consumer = TestConsumer("foo")
val consumer = TestConsumer(id = "1", topic = "foo")

val orchestrator = KafkaConsumerOrchestrator(
KafkaConsumerOrchestrator.Config(topicStatePollDelay = 10),
Expand Down Expand Up @@ -101,7 +101,7 @@ class KafkaConsumerOrchestratorTest : FunSpec({
producer.send(ProducerRecord(topic, "key2", null))
producer.close()

val consumer = spyk(TestConsumer(topic))
val consumer = spyk(TestConsumer(id = "1", topic))

KafkaConsumerOrchestrator(
KafkaConsumerOrchestrator.Config(topicStatePollDelay = Long.MAX_VALUE),
Expand All @@ -119,6 +119,31 @@ class KafkaConsumerOrchestratorTest : FunSpec({
}
}

test("multiple consumers should be able to process events from the same topic") {
val topic = uniqueTopicName()

val producer = kafka.stringStringProducer()
producer.send(ProducerRecord(topic, "key1", "true"))
producer.close()

val consumer1 = spyk(TestConsumer("1", topic, "group-1"))
val consumer2 = spyk(TestConsumer("2", topic, "group-2"))

KafkaConsumerOrchestrator(
KafkaConsumerOrchestrator.Config(topicStatePollDelay = Long.MAX_VALUE),
kafka.getConsumerProperties(),
database.db,
listOf(consumer1, consumer2),
)

eventually(5.seconds) {
coVerify(exactly = 1) {
consumer1.consume("key1", "true")
consumer2.consume("key1", "true")
}
}
}

test("consumer should process json events from topic") {
val topic = uniqueTopicName()

Expand Down Expand Up @@ -151,7 +176,7 @@ class KafkaConsumerOrchestratorTest : FunSpec({
producer.send(ProducerRecord(topic, "false"))
producer.close()

val consumer = spyk(TestConsumer(topic))
val consumer = spyk(TestConsumer(id = "1", topic))

KafkaConsumerOrchestrator(
KafkaConsumerOrchestrator.Config(topicStatePollDelay = Long.MAX_VALUE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import kotlinx.serialization.json.jsonPrimitive
import no.nav.common.kafka.consumer.util.deserializer.Deserializers.stringDeserializer
import no.nav.mulighetsrommet.kafka.serialization.JsonElementDeserializer

class TestConsumer(name: String) : KafkaTopicConsumer<String?, String?>(
Config(name, name, true),
class TestConsumer(id: String, topic: String, group: String? = null) : KafkaTopicConsumer<String?, String?>(
Config(id, topic, true, group),
stringDeserializer(),
stringDeserializer(),
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ data class AdGruppeNavAnsattRolleMapping(
data class KafkaConfig(
val brokerUrl: String? = null,
val producerId: String,
val consumerGroupId: String,
val defaultConsumerGroupId: String,
val producers: KafkaProducers,
val consumers: KafkaConsumers,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ private fun kafka(appConfig: AppConfig) = module {
single { SisteTiltaksgjennomforingerV1KafkaProducer(producerClient, config.producers.tiltaksgjennomforinger) }
single { SisteTiltakstyperV2KafkaProducer(producerClient, config.producers.tiltakstyper) }

val properties = when (NaisEnv.current()) {
val consumerPreset = when (NaisEnv.current()) {
NaisEnv.Local -> KafkaPropertiesBuilder.consumerBuilder()
.withBaseProperties()
.withConsumerGroupId(config.consumerGroupId)
.withConsumerGroupId(config.defaultConsumerGroupId)
.withBrokerUrl(config.brokerUrl)
.withDeserializers(ByteArrayDeserializer::class.java, ByteArrayDeserializer::class.java)
.build()

else -> KafkaPropertiesPreset.aivenDefaultConsumerProperties(config.consumerGroupId)
else -> KafkaPropertiesPreset.aivenDefaultConsumerProperties(config.defaultConsumerGroupId)
}

single {
Expand All @@ -156,7 +156,7 @@ private fun kafka(appConfig: AppConfig) = module {
),
)
KafkaConsumerOrchestrator(
consumerPreset = properties,
consumerPreset = consumerPreset,
db = get(),
consumers = consumers,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ app:

kafka:
producerId: mulighetsrommet-api-kafka-producer.v1
consumerGroupId: mulighetsrommet-api-kafka-consumer.v1
defaultConsumerGroupId: mulighetsrommet-api-kafka-consumer.v1
producers:
arenaMigreringTiltaksgjennomforinger:
topic: team-mulighetsrommet.arena-migrering-tiltaksgjennomforinger-v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ app:
kafka:
brokerUrl: localhost:29092
producerId: mulighetsrommet-api-kafka-producer.v1
consumerGroupId: mulighetsrommet-api-kafka-consumer.v1
defaultConsumerGroupId: mulighetsrommet-api-kafka-consumer.v1
producers:
arenaMigreringTiltaksgjennomforinger:
topic: arena-migrering-tiltaksgjennomforinger-v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ app:

kafka:
producerId: mulighetsrommet-api-kafka-producer.v1
consumerGroupId: mulighetsrommet-api-kafka-consumer.v1
defaultConsumerGroupId: mulighetsrommet-api-kafka-consumer.v1
producers:
arenaMigreringTiltaksgjennomforinger:
topic: team-mulighetsrommet.arena-migrering-tiltaksgjennomforinger-v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ fun createKafkaConfig(): KafkaConfig = KafkaConfig(
topic = "arena-migrering-tiltaksgjennomforinger-v1",
),
),
consumerGroupId = "mulighetsrommet-api-consumer",
defaultConsumerGroupId = "mulighetsrommet-api-consumer",
consumers = KafkaConsumers(
tiltaksgjennomforingerV1 = KafkaTopicConsumer.Config(
id = "siste-tiltaksgjennomforinger",
Expand Down

0 comments on commit 4267316

Please sign in to comment.