Skip to content
This repository has been archived by the owner on Nov 13, 2024. It is now read-only.

Commit

Permalink
more...
Browse files Browse the repository at this point in the history
  • Loading branch information
magnusev committed Jun 3, 2021
1 parent d2fc236 commit f8189fe
Showing 1 changed file with 36 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import no.nav.fo.veilarbvarsel.config.KafkaEnvironment
import no.nav.fo.veilarbvarsel.config.system.features.ClosableJob
import no.nav.fo.veilarbvarsel.varsel.Test
import no.nav.fo.veilarbvarsel.varsel.VarselEvent
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.*

abstract class KafkaConsumerWrapper<K, V>(
env: KafkaEnvironment,
val env: KafkaEnvironment,
systemUser: String,
private val topics: String
) : ClosableJob {
Expand All @@ -25,19 +27,19 @@ abstract class KafkaConsumerWrapper<K, V>(

private val props = Properties()

val consumerClient: KafkaConsumerClient<String, String>
//val consumerClient: KafkaConsumerClient<String, String>

private var shutdown = false
private var running = false

init {

val credentials = getCredentials("service_user")

consumerClient = KafkaConsumerClientBuilder.builder<String, String>()
.withProps(onPremDefaultConsumerProperties(CONSUMER_GROUP_ID, env.bootstrapServers, credentials))
.withConsumers(topicConsumers())
.build()
// val credentials = getCredentials("service_user")
//
// consumerClient = KafkaConsumerClientBuilder.builder<String, String>()
// .withProps(onPremDefaultConsumerProperties(CONSUMER_GROUP_ID, env.bootstrapServers, credentials))
// .withConsumers(topicConsumers())
// .build()


// props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = env.bootstrapServers
Expand All @@ -64,39 +66,39 @@ abstract class KafkaConsumerWrapper<K, V>(
override fun run() {
logger.info("Starting Kafka Consumer on topics $topics")

consumerClient.start()
//consumerClient.start()

// running = true
//
// val consumer = KafkaConsumer<K, V>(props).apply {
// subscribe(topics)
// }
//
// consumer.use {
// while (!shutdown) {
// val records = consumer.poll(Duration.ofMillis(5000))
//
// logger.info("Getting records from $topics. size: ${records.count()}")
//
// records.iterator().forEach {
// handle(it.value())
// }
// }
// }
//
// consumer.close()
// running = false
running = true
val credentials = getCredentials("service_user")
val consumer = KafkaConsumer<K, V>(onPremDefaultConsumerProperties(CONSUMER_GROUP_ID, env.bootstrapServers, credentials)).apply {
subscribe(listOf(topics))
}

consumer.use {
while (!shutdown) {
val records = consumer.poll(Duration.ofMillis(5000))

logger.info("Getting records from $topics. size: ${records.count()}")

records.iterator().forEach {
handle(it.value())
}
}
}

consumer.close()
running = false
}

override fun close() {
logger.info("Closing Kafka Consumer on topics $topics...")

consumerClient.stop()
//consumerClient.stop()

// shutdown = true
// while (running) {
// Thread.sleep(100)
// }
shutdown = true
while (running) {
Thread.sleep(100)
}

logger.info("Kafka Consumer on topics $topics closed!")
}
Expand Down

0 comments on commit f8189fe

Please sign in to comment.