Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/consumer group #4525

Merged
merged 3 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package no.nav.mulighetsrommet.kafka

import kotlinx.coroutines.runBlocking
import net.javacrumbs.shedlock.provider.jdbc.JdbcLockProvider
import no.nav.common.kafka.consumer.KafkaConsumerClient
import no.nav.common.kafka.consumer.feilhandtering.KafkaConsumerRecordProcessor
import no.nav.common.kafka.consumer.feilhandtering.util.KafkaConsumerRecordProcessorBuilder
import no.nav.common.kafka.consumer.util.ConsumerUtils.findConsumerConfigsWithStoreOnFailure
import no.nav.common.kafka.consumer.util.KafkaConsumerClientBuilder
import no.nav.common.kafka.consumer.util.KafkaConsumerClientBuilder.TopicConfig
import no.nav.mulighetsrommet.database.Database
import no.nav.mulighetsrommet.metrics.Metrikker
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.slf4j.LoggerFactory
import java.util.*
import java.util.function.Consumer

class KafkaConsumerOrchestrator(
config: Config = Config(),
Expand All @@ -17,7 +22,7 @@ class KafkaConsumerOrchestrator(
consumers: List<KafkaTopicConsumer<*, *>>,
) {
private val logger = LoggerFactory.getLogger(javaClass)
private val consumerClients: Map<String, KafkaConsumerClient>
private val consumerClients: Map<String, Consumer>
private val consumerRecordProcessor: KafkaConsumerRecordProcessor
private val topicPoller: Poller
private val topicRepository = TopicRepository(db)
Expand All @@ -30,28 +35,30 @@ class KafkaConsumerOrchestrator(
val topicStatePollDelay: Long = 10_000,
)

private data class Consumer(
val topicConfig: TopicConfig<*, *>,
val client: KafkaConsumerClient,
)

init {
logger.info("Initializing Kafka consumer clients")

validateConsumers(consumers)

resetTopics(consumers)

val consumerTopicsConfig = consumers.map { consumer ->
consumer.toTopicConfig(kafkaConsumerRepository)
}
consumerClients = consumerTopicsConfig.associate {
val client = KafkaConsumerClientBuilder.builder()
.withProperties(consumerPreset)
.withTopicConfig(it)
.build()
it.consumerConfig.topic to client
consumerClients = consumers.associate { consumer ->
val topicConfig = toTopicConfig(consumer)
val client = toKafkaConsumerClient(consumer, consumerPreset, topicConfig)
consumer.config.id to Consumer(topicConfig, client)
}

val lockProvider = JdbcLockProvider(db.getDatasource())
val topicConfigs = consumerClients.map { it.value.topicConfig }
consumerRecordProcessor = KafkaConsumerRecordProcessorBuilder
.builder()
.withLockProvider(lockProvider)
.withLockProvider(JdbcLockProvider(db.getDatasource()))
.withKafkaConsumerRepository(kafkaConsumerRepository)
.withConsumerConfigs(findConsumerConfigsWithStoreOnFailure(consumerTopicsConfig))
.withConsumerConfigs(findConsumerConfigsWithStoreOnFailure(topicConfigs))
.build()

topicPoller = Poller(config.topicStatePollDelay) {
Expand All @@ -76,7 +83,7 @@ class KafkaConsumerOrchestrator(
}

fun getConsumers(): List<KafkaConsumerClient> {
return consumerClients.toList().map { it.second }
return consumerClients.map { it.value.client }
}

fun updateRunningTopics(topics: List<Topic>): List<Topic> {
Expand All @@ -89,9 +96,48 @@ class KafkaConsumerOrchestrator(
topicPoller.stop()
}

private fun <K, V> toTopicConfig(consumer: KafkaTopicConsumer<K, V>): TopicConfig<K, V> {
return TopicConfig<K, V>()
.withMetrics(Metrikker.appMicrometerRegistry)
.withLogging()
.withStoreOnFailure(kafkaConsumerRepository)
.withConsumerConfig(
consumer.config.topic,
consumer.keyDeserializer,
consumer.valueDeserializer,
Consumer { event ->
runBlocking {
consumer.consume(event.key(), event.value())
}
},
)
}

private fun toKafkaConsumerClient(
consumer: KafkaTopicConsumer<*, *>,
consumerPreset: Properties,
topicConfig: TopicConfig<out Any?, out Any?>,
): KafkaConsumerClient {
fun withConsumerGroupId(p: Properties, consumerGroupId: String): Properties {
val p2 = Properties()
p2.putAll(p)
p2[ConsumerConfig.GROUP_ID_CONFIG] = consumerGroupId
return p2
}

val consumerClientPreset = consumer.config.consumerGroupId
?.let { withConsumerGroupId(consumerPreset, it) }
?: consumerPreset

return KafkaConsumerClientBuilder.builder()
.withProperties(consumerClientPreset)
.withTopicConfig(topicConfig)
.build()
}

private fun updateClientRunningState() {
getTopics().forEach {
val client = consumerClients[it.topic]
val client = consumerClients[it.id]?.client
if (client != null) {
if (client.isRunning && !it.running) {
client.stop()
Expand All @@ -110,9 +156,9 @@ class KafkaConsumerOrchestrator(
val topics = consumers.map { consumer ->
val (id, topic, initialRunningState) = consumer.config

val running = currentTopics
.firstOrNull { it.id == id }
.let { it?.running ?: initialRunningState }
val running = currentTopics.firstOrNull { it.id == id }
?.running
?: initialRunningState

Topic(id = id, topic = topic, type = TopicType.CONSUMER, running = running)
}
Expand All @@ -122,3 +168,9 @@ class KafkaConsumerOrchestrator(
private fun getUpdatedTopicsOnly(updated: List<Topic>, current: List<Topic>) =
updated.filter { x -> current.any { y -> y.id == x.id && y.running != x.running } }
}

private fun validateConsumers(consumers: List<KafkaTopicConsumer<*, *>>) {
require(consumers.distinctBy { it.config.id }.size == consumers.size) {
"Each consumer must have a unique 'id'. At least two consumers share the same 'id'."
}
}
Original file line number Diff line number Diff line change
@@ -1,39 +1,19 @@
package no.nav.mulighetsrommet.kafka

import kotlinx.coroutines.runBlocking
import no.nav.common.kafka.consumer.util.KafkaConsumerClientBuilder.TopicConfig
import no.nav.mulighetsrommet.metrics.Metrikker
import org.apache.kafka.common.serialization.Deserializer
import java.util.function.Consumer

abstract class KafkaTopicConsumer<K, V>(
val config: Config,
private val keyDeserializer: Deserializer<K>,
private val valueDeserializer: Deserializer<V>,
val keyDeserializer: Deserializer<K>,
val valueDeserializer: Deserializer<V>,
) {

data class Config(
val id: String,
val topic: String,
val initialRunningState: Boolean = false,
val consumerGroupId: String? = null,
)

internal fun toTopicConfig(kafkaConsumerRepository: KafkaConsumerRepositoryImpl): TopicConfig<K, V> {
return TopicConfig<K, V>()
.withMetrics(Metrikker.appMicrometerRegistry)
.withLogging()
.withStoreOnFailure(kafkaConsumerRepository)
.withConsumerConfig(
config.topic,
keyDeserializer,
valueDeserializer,
Consumer { event ->
runBlocking {
consume(event.key(), event.value())
}
},
)
}

abstract suspend fun consume(key: K, message: V)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ class Poller(private val delay: Long, val block: () -> Unit) {
private val scope = CoroutineScope(Dispatchers.Default)

fun start() {
block()
scope.launch {
while (scope.isActive) {
block()
delay(delay)
block()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class KafkaConsumerOrchestratorTest : FunSpec({
}

test("should store topics based on provided consumers during setup") {
val consumer = TestConsumer("foo")
val consumer = TestConsumer(id = "1", topic = "foo")

val orchestrator = KafkaConsumerOrchestrator(
KafkaConsumerOrchestrator.Config(topicStatePollDelay = Long.MAX_VALUE),
Expand All @@ -63,7 +63,7 @@ class KafkaConsumerOrchestratorTest : FunSpec({

orchestrator.getTopics() shouldContainExactly listOf(
Topic(
id = "foo",
id = "1",
topic = "foo",
type = TopicType.CONSUMER,
running = true,
Expand All @@ -72,7 +72,7 @@ class KafkaConsumerOrchestratorTest : FunSpec({
}

test("should update the consumer running state based on the topic configuration") {
val consumer = TestConsumer("foo")
val consumer = TestConsumer(id = "1", topic = "foo")

val orchestrator = KafkaConsumerOrchestrator(
KafkaConsumerOrchestrator.Config(topicStatePollDelay = 10),
Expand Down Expand Up @@ -101,7 +101,7 @@ class KafkaConsumerOrchestratorTest : FunSpec({
producer.send(ProducerRecord(topic, "key2", null))
producer.close()

val consumer = spyk(TestConsumer(topic))
val consumer = spyk(TestConsumer(id = "1", topic))

KafkaConsumerOrchestrator(
KafkaConsumerOrchestrator.Config(topicStatePollDelay = Long.MAX_VALUE),
Expand All @@ -119,6 +119,31 @@ class KafkaConsumerOrchestratorTest : FunSpec({
}
}

test("multiple consumers should be able to process events from the same topic") {
val topic = uniqueTopicName()

val producer = kafka.stringStringProducer()
producer.send(ProducerRecord(topic, "key1", "true"))
producer.close()

val consumer1 = spyk(TestConsumer("1", topic, "group-1"))
val consumer2 = spyk(TestConsumer("2", topic, "group-2"))

KafkaConsumerOrchestrator(
KafkaConsumerOrchestrator.Config(topicStatePollDelay = Long.MAX_VALUE),
kafka.getConsumerProperties(),
database.db,
listOf(consumer1, consumer2),
)

eventually(5.seconds) {
coVerify(exactly = 1) {
consumer1.consume("key1", "true")
consumer2.consume("key1", "true")
}
}
}

test("consumer should process json events from topic") {
val topic = uniqueTopicName()

Expand Down Expand Up @@ -151,7 +176,7 @@ class KafkaConsumerOrchestratorTest : FunSpec({
producer.send(ProducerRecord(topic, "false"))
producer.close()

val consumer = spyk(TestConsumer(topic))
val consumer = spyk(TestConsumer(id = "1", topic))

KafkaConsumerOrchestrator(
KafkaConsumerOrchestrator.Config(topicStatePollDelay = Long.MAX_VALUE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import kotlinx.serialization.json.jsonPrimitive
import no.nav.common.kafka.consumer.util.deserializer.Deserializers.stringDeserializer
import no.nav.mulighetsrommet.kafka.serialization.JsonElementDeserializer

class TestConsumer(name: String) : KafkaTopicConsumer<String?, String?>(
Config(name, name, true),
class TestConsumer(id: String, topic: String, group: String? = null) : KafkaTopicConsumer<String?, String?>(
Config(id, topic, true, group),
stringDeserializer(),
stringDeserializer(),
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ data class AdGruppeNavAnsattRolleMapping(
data class KafkaConfig(
val brokerUrl: String? = null,
val producerId: String,
val consumerGroupId: String,
val defaultConsumerGroupId: String,
val producers: KafkaProducers,
val consumers: KafkaConsumers,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ private fun kafka(appConfig: AppConfig) = module {
single { SisteTiltaksgjennomforingerV1KafkaProducer(producerClient, config.producers.tiltaksgjennomforinger) }
single { SisteTiltakstyperV2KafkaProducer(producerClient, config.producers.tiltakstyper) }

val properties = when (NaisEnv.current()) {
val consumerPreset = when (NaisEnv.current()) {
NaisEnv.Local -> KafkaPropertiesBuilder.consumerBuilder()
.withBaseProperties()
.withConsumerGroupId(config.consumerGroupId)
.withConsumerGroupId(config.defaultConsumerGroupId)
.withBrokerUrl(config.brokerUrl)
.withDeserializers(ByteArrayDeserializer::class.java, ByteArrayDeserializer::class.java)
.build()

else -> KafkaPropertiesPreset.aivenDefaultConsumerProperties(config.consumerGroupId)
else -> KafkaPropertiesPreset.aivenDefaultConsumerProperties(config.defaultConsumerGroupId)
}

single {
Expand All @@ -156,7 +156,7 @@ private fun kafka(appConfig: AppConfig) = module {
),
)
KafkaConsumerOrchestrator(
consumerPreset = properties,
consumerPreset = consumerPreset,
db = get(),
consumers = consumers,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ app:

kafka:
producerId: mulighetsrommet-api-kafka-producer.v1
consumerGroupId: mulighetsrommet-api-kafka-consumer.v1
defaultConsumerGroupId: mulighetsrommet-api-kafka-consumer.v1
producers:
arenaMigreringTiltaksgjennomforinger:
topic: team-mulighetsrommet.arena-migrering-tiltaksgjennomforinger-v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ app:
kafka:
brokerUrl: localhost:29092
producerId: mulighetsrommet-api-kafka-producer.v1
consumerGroupId: mulighetsrommet-api-kafka-consumer.v1
defaultConsumerGroupId: mulighetsrommet-api-kafka-consumer.v1
producers:
arenaMigreringTiltaksgjennomforinger:
topic: arena-migrering-tiltaksgjennomforinger-v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ app:

kafka:
producerId: mulighetsrommet-api-kafka-producer.v1
consumerGroupId: mulighetsrommet-api-kafka-consumer.v1
defaultConsumerGroupId: mulighetsrommet-api-kafka-consumer.v1
producers:
arenaMigreringTiltaksgjennomforinger:
topic: team-mulighetsrommet.arena-migrering-tiltaksgjennomforinger-v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ fun createKafkaConfig(): KafkaConfig = KafkaConfig(
topic = "arena-migrering-tiltaksgjennomforinger-v1",
),
),
consumerGroupId = "mulighetsrommet-api-consumer",
defaultConsumerGroupId = "mulighetsrommet-api-consumer",
consumers = KafkaConsumers(
tiltaksgjennomforingerV1 = KafkaTopicConsumer.Config(
id = "siste-tiltaksgjennomforinger",
Expand Down