Skip to content

Commit

Permalink
Adding test API
Browse files Browse the repository at this point in the history
  • Loading branch information
magnusev committed Jun 4, 2021
1 parent 029e76d commit 87f8904
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package no.nav.fo.veilarbvarsel.config.kafka

import no.nav.common.kafka.consumer.KafkaConsumerClient
import no.nav.common.kafka.consumer.TopicConsumer
import no.nav.common.kafka.consumer.util.ConsumerUtils.jsonConsumer
import no.nav.common.kafka.consumer.util.KafkaConsumerClientBuilder
import no.nav.common.kafka.util.KafkaPropertiesPreset.onPremDefaultConsumerProperties
import no.nav.common.utils.NaisUtils.getCredentials
import no.nav.fo.veilarbvarsel.config.KafkaEnvironment
Expand All @@ -27,28 +25,19 @@ abstract class KafkaConsumerWrapper<K, V>(

private val props = Properties()

val consumerClient: KafkaConsumerClient<String, String>
//val consumerClient: KafkaConsumerClient<String, String>

private var shutdown = false
private var running = false

init {

val credentials = getCredentials("service_user")

consumerClient = KafkaConsumerClientBuilder.builder<String, String>()
.withProps(onPremDefaultConsumerProperties(CONSUMER_GROUP_ID, env.bootstrapServers, credentials))
.withConsumers(topicConsumers())
.build()
//val credentials = getCredentials("service_user")


// props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = env.bootstrapServers
// props["group.id"] = systemUser
// props["key.deserializer"] = StringDeserializer::class.java
// props["value.deserializer"] = KafkaEventDeserializer::class.java
// props["max.poll.records"] = 1
// props["max.partition.fetch.bytes"] = 1048576 / 2
// props["auto.offset.reset"] = "earliest"
// consumerClient = KafkaConsumerClientBuilder.builder<String, String>()
// .withProps(onPremDefaultConsumerProperties(CONSUMER_GROUP_ID, env.bootstrapServers, credentials))
// .withConsumers(topicConsumers())
// .build()
}

abstract fun handle(data: V)
Expand All @@ -66,46 +55,52 @@ abstract class KafkaConsumerWrapper<K, V>(
override fun run() {
logger.info("Starting Kafka Consumer on topics $topics")

consumerClient.start()
//consumerClient.start()

running = true
// val credentials = getCredentials("service_user")
// val consumer = KafkaConsumer<K, V>(onPremDefaultConsumerProperties(CONSUMER_GROUP_ID, env.bootstrapServers, credentials)).apply {
// subscribe(listOf(topics))
// }
//
// consumer.use {
// try {
// while (!shutdown) {
// val records = consumer.poll(Duration.ofMillis(5000))
//
// logger.info("Getting records from $topics. size: ${records.count()}")
//
// records.iterator().forEach {
// handle(it.value())
// }
// }
// } catch (e: Exception) {
// logger.error("Got exception", e)
// }
//
// logger.info("Outside while loop?")
// }
//
// logger.info("End of run. $shutdown")
// consumer.close()
val credentials = getCredentials("service_user")
val consumer = KafkaConsumer<K, V>(
onPremDefaultConsumerProperties(
CONSUMER_GROUP_ID,
env.bootstrapServers,
credentials
)
).apply {
subscribe(listOf(topics))
}

consumer.use {
try {
while (!shutdown) {
val records = consumer.poll(Duration.ofMillis(5000))

logger.info("Getting records from $topics. size: ${records.count()}")

records.iterator().forEach {
handle(it.value())
}
}
} catch (e: Exception) {
logger.error("Got exception", e)
}

logger.info("Outside while loop?")
}

logger.info("End of run. $shutdown")
consumer.close()
running = false
}

override fun close() {
logger.info("Closing Kafka Consumer on topics $topics...")

consumerClient.stop()
//consumerClient.stop()

// shutdown = true
// while (running) {
// Thread.sleep(100)
// }
shutdown = true
while (running) {
Thread.sleep(100)
}

logger.info("Kafka Consumer on topics $topics closed!")
}
Expand Down
8 changes: 4 additions & 4 deletions src/main/kotlin/no/nav/fo/veilarbvarsel/main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ fun Application.mainModule(appContext: ApplicationContext = ApplicationContext()
}
}

// install(BackgroundJob.BackgroundJobFeature("Events Consumer")) {
// job = appContext.eventConsumer
// }
install(BackgroundJob.BackgroundJobFeature("Events Consumer")) {
job = appContext.eventConsumer
}

appContext.eventConsumer.run()
//appContext.eventConsumer.run()
}

0 comments on commit 87f8904

Please sign in to comment.