From f8189fe31c9058e7bba4cfe0ed563dd7c34fe3d3 Mon Sep 17 00:00:00 2001 From: Magnus Evensberget Date: Thu, 3 Jun 2021 14:30:43 +0200 Subject: [PATCH] more... --- .../config/kafka/KafkaConsumerWrapper.kt | 70 ++++++++++--------- 1 file changed, 36 insertions(+), 34 deletions(-) diff --git a/src/main/kotlin/no/nav/fo/veilarbvarsel/config/kafka/KafkaConsumerWrapper.kt b/src/main/kotlin/no/nav/fo/veilarbvarsel/config/kafka/KafkaConsumerWrapper.kt index 507a291..2547468 100644 --- a/src/main/kotlin/no/nav/fo/veilarbvarsel/config/kafka/KafkaConsumerWrapper.kt +++ b/src/main/kotlin/no/nav/fo/veilarbvarsel/config/kafka/KafkaConsumerWrapper.kt @@ -10,11 +10,13 @@ import no.nav.fo.veilarbvarsel.config.KafkaEnvironment import no.nav.fo.veilarbvarsel.config.system.features.ClosableJob import no.nav.fo.veilarbvarsel.varsel.Test import no.nav.fo.veilarbvarsel.varsel.VarselEvent +import org.apache.kafka.clients.consumer.KafkaConsumer import org.slf4j.LoggerFactory +import java.time.Duration import java.util.* abstract class KafkaConsumerWrapper( - env: KafkaEnvironment, + val env: KafkaEnvironment, systemUser: String, private val topics: String ) : ClosableJob { @@ -25,19 +27,19 @@ abstract class KafkaConsumerWrapper( private val props = Properties() - val consumerClient: KafkaConsumerClient + //val consumerClient: KafkaConsumerClient private var shutdown = false private var running = false init { - val credentials = getCredentials("service_user") - - consumerClient = KafkaConsumerClientBuilder.builder() - .withProps(onPremDefaultConsumerProperties(CONSUMER_GROUP_ID, env.bootstrapServers, credentials)) - .withConsumers(topicConsumers()) - .build() +// val credentials = getCredentials("service_user") +// +// consumerClient = KafkaConsumerClientBuilder.builder() +// .withProps(onPremDefaultConsumerProperties(CONSUMER_GROUP_ID, env.bootstrapServers, credentials)) +// .withConsumers(topicConsumers()) +// .build() // props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = env.bootstrapServers @@ -64,39 +66,39 @@ abstract class KafkaConsumerWrapper( override fun run() { logger.info("Starting Kafka Consumer on topics $topics") - consumerClient.start() + //consumerClient.start() -// running = true -// -// val consumer = KafkaConsumer(props).apply { -// subscribe(topics) -// } -// -// consumer.use { -// while (!shutdown) { -// val records = consumer.poll(Duration.ofMillis(5000)) -// -// logger.info("Getting records from $topics. size: ${records.count()}") -// -// records.iterator().forEach { -// handle(it.value()) -// } -// } -// } -// -// consumer.close() -// running = false + running = true + val credentials = getCredentials("service_user") + val consumer = KafkaConsumer(onPremDefaultConsumerProperties(CONSUMER_GROUP_ID, env.bootstrapServers, credentials)).apply { + subscribe(listOf(topics)) + } + + consumer.use { + while (!shutdown) { + val records = consumer.poll(Duration.ofMillis(5000)) + + logger.info("Getting records from $topics. size: ${records.count()}") + + records.iterator().forEach { + handle(it.value()) + } + } + } + + consumer.close() + running = false } override fun close() { logger.info("Closing Kafka Consumer on topics $topics...") - consumerClient.stop() + //consumerClient.stop() -// shutdown = true -// while (running) { -// Thread.sleep(100) -// } + shutdown = true + while (running) { + Thread.sleep(100) + } logger.info("Kafka Consumer on topics $topics closed!") }