Skip to content

Commit

Permalink
Adding test API
Browse files Browse the repository at this point in the history
  • Loading branch information
magnusev committed Jun 4, 2021
1 parent 87f8904 commit 8204fae
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ class ApplicationContext {
// topic = environment.kafkaTopics.varselKvitteringOutgoing
// )

val varselProducer = VarselEventProducer(
env = environment.kafka,
topic = environment.kafkaTopics.varselIncoming
)
// val varselProducer = VarselEventProducer(
// env = environment.kafka,
// topic = environment.kafkaTopics.varselIncoming
// )


// val beskjedProducer = BrukernotifikasjonBeskjedProducer(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package no.nav.fo.veilarbvarsel.config.kafka

import no.nav.common.kafka.consumer.KafkaConsumerClient
import no.nav.common.kafka.consumer.TopicConsumer
import no.nav.common.kafka.consumer.util.ConsumerUtils.jsonConsumer
import no.nav.common.kafka.consumer.util.KafkaConsumerClientBuilder
import no.nav.common.kafka.util.KafkaPropertiesPreset.onPremDefaultConsumerProperties
import no.nav.common.utils.NaisUtils.getCredentials
import no.nav.fo.veilarbvarsel.config.KafkaEnvironment
import no.nav.fo.veilarbvarsel.config.system.features.ClosableJob
import no.nav.fo.veilarbvarsel.varsel.Test
import no.nav.fo.veilarbvarsel.varsel.VarselEvent
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.*

abstract class KafkaConsumerWrapper<K, V>(
Expand All @@ -25,19 +25,19 @@ abstract class KafkaConsumerWrapper<K, V>(

private val props = Properties()

//val consumerClient: KafkaConsumerClient<String, String>
val consumerClient: KafkaConsumerClient<String, String>

private var shutdown = false
private var running = false

init {

//val credentials = getCredentials("service_user")
val credentials = getCredentials("service_user")

// consumerClient = KafkaConsumerClientBuilder.builder<String, String>()
// .withProps(onPremDefaultConsumerProperties(CONSUMER_GROUP_ID, env.bootstrapServers, credentials))
// .withConsumers(topicConsumers())
// .build()
consumerClient = KafkaConsumerClientBuilder.builder<String, String>()
.withProps(onPremDefaultConsumerProperties(CONSUMER_GROUP_ID, env.bootstrapServers, credentials))
.withConsumers(topicConsumers())
.build()
}

abstract fun handle(data: V)
Expand All @@ -55,52 +55,52 @@ abstract class KafkaConsumerWrapper<K, V>(
override fun run() {
logger.info("Starting Kafka Consumer on topics $topics")

//consumerClient.start()

running = true
val credentials = getCredentials("service_user")
val consumer = KafkaConsumer<K, V>(
onPremDefaultConsumerProperties(
CONSUMER_GROUP_ID,
env.bootstrapServers,
credentials
)
).apply {
subscribe(listOf(topics))
}

consumer.use {
try {
while (!shutdown) {
val records = consumer.poll(Duration.ofMillis(5000))

logger.info("Getting records from $topics. size: ${records.count()}")

records.iterator().forEach {
handle(it.value())
}
}
} catch (e: Exception) {
logger.error("Got exception", e)
}

logger.info("Outside while loop?")
}
consumerClient.start()

// running = true
// val credentials = getCredentials("service_user")
// val consumer = KafkaConsumer<K, V>(
// onPremDefaultConsumerProperties(
// CONSUMER_GROUP_ID,
// env.bootstrapServers,
// credentials
// )
// ).apply {
// subscribe(listOf(topics))
// }
//
// consumer.use {
// try {
// while (!shutdown) {
// val records = consumer.poll(Duration.ofMillis(5000))
//
// logger.info("Getting records from $topics. size: ${records.count()}")
//
// records.iterator().forEach {
// handle(it.value())
// }
// }
// } catch (e: Exception) {
// logger.error("Got exception", e)
// }
//
// logger.info("Outside while loop?")
// }

logger.info("End of run. $shutdown")
consumer.close()
//consumer.close()
running = false
}

override fun close() {
logger.info("Closing Kafka Consumer on topics $topics...")

//consumerClient.stop()
consumerClient.stop()

shutdown = true
while (running) {
Thread.sleep(100)
}
// while (running) {
// Thread.sleep(100)
// }

logger.info("Kafka Consumer on topics $topics closed!")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ fun Application.healthModule(appContext: ApplicationContext) {
application.log.debug(it.buildText())
}
healthApi(appContext.metrics)
varselApi(appContext.varselProducer)
//varselApi(appContext.varselProducer)
varselApi()
}
}
8 changes: 4 additions & 4 deletions src/main/kotlin/no/nav/fo/veilarbvarsel/main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ fun Application.mainModule(appContext: ApplicationContext = ApplicationContext()
}
}

install(BackgroundJob.BackgroundJobFeature("Events Consumer")) {
job = appContext.eventConsumer
}
// install(BackgroundJob.BackgroundJobFeature("Events Consumer")) {
// job = appContext.eventConsumer
// }

//appContext.eventConsumer.run()
appContext.eventConsumer.run()
}
62 changes: 31 additions & 31 deletions src/main/kotlin/no/nav/fo/veilarbvarsel/varsel/VarselApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ import io.ktor.request.*
import io.ktor.response.*
import io.ktor.routing.*
import org.slf4j.LoggerFactory
import java.time.LocalDateTime
import java.util.*

fun Route.varselApi(
varselEventProducer: VarselEventProducer
//varselEventProducer: VarselEventProducer
) {

val logger = LoggerFactory.getLogger(javaClass)
Expand All @@ -19,41 +17,43 @@ fun Route.varselApi(

post {
val varsel = call.receive<Varsel>()

varselEventProducer.send(
CreateVarselVarselEvent(
UUID.randomUUID(),
LocalDateTime.now(),
varsel.system,
varsel.id,
varsel.type,
varsel.fodselsnummer,
varsel.groupId,
varsel.message,
varsel.link.toString(),
varsel.sikkerhetsnivaa,
varsel.visibleUntil,
varsel.externalVarsling
)
)
println("Here 4")

// varselEventProducer.send(
// CreateVarselVarselEvent(
// UUID.randomUUID(),
// LocalDateTime.now(),
// varsel.system,
// varsel.id,
// varsel.type,
// varsel.fodselsnummer,
// varsel.groupId,
// varsel.message,
// varsel.link.toString(),
// varsel.sikkerhetsnivaa,
// varsel.visibleUntil,
// varsel.externalVarsling
// )
// )

call.respond(HttpStatusCode.Created)

}

post("/done") {
val done = call.receive<Done>()

varselEventProducer.send(
DoneVarselEvent(
UUID.randomUUID(),
LocalDateTime.now(),
done.system,
done.id,
done.fodselsnummer,
done.groupId
)
)
println("Here 5")

// varselEventProducer.send(
// DoneVarselEvent(
// UUID.randomUUID(),
// LocalDateTime.now(),
// done.system,
// done.id,
// done.fodselsnummer,
// done.groupId
// )
// )

call.respond(HttpStatusCode.Created)
}
Expand Down

0 comments on commit 8204fae

Please sign in to comment.