Skip to content

Commit

Permalink
Added kafka-consumer for oppgavehendelse, and updating db
Browse files Browse the repository at this point in the history
  • Loading branch information
andreasDev committed Jul 18, 2023
1 parent 8160c81 commit 2a971f1
Show file tree
Hide file tree
Showing 14 changed files with 268 additions and 156 deletions.
2 changes: 2 additions & 0 deletions naiserator-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,5 @@ spec:
value: api://dev-fss.oppgavehandtering.oppgave-q1/.default
- name: OPPGAVEBEHANDLING_URL
value: https://oppgave-q1.dev-fss-pub.nais.io/api/v1/oppgaver
- name: OPPGAVE_HENDELSE_TOPIC
value: oppgavehandtering.oppgavehendelse-q1-v1
2 changes: 2 additions & 0 deletions naiserator-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,5 @@ spec:
value: api://prod-fss.oppgavehandtering.oppgave/.default
- name: OPPGAVEBEHANDLING_URL
value: https://oppgave.prod-fss-pub.nais.io/api/v1/oppgaver
- name: OPPGAVE_HENDELSE_TOPIC
value: oppgavehandtering.oppgavehendelse-v1
18 changes: 13 additions & 5 deletions src/main/kotlin/no/nav/syfo/Bootstrap.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import no.nav.syfo.clients.HttpClients
import no.nav.syfo.clients.KafkaConsumers
import no.nav.syfo.clients.KafkaProducers
import no.nav.syfo.db.Database
import no.nav.syfo.oppgave.kafka.OppgaveHendelseConsumer
import no.nav.syfo.oppgave.service.OppgaveService
import no.nav.syfo.persistering.MottattSykmeldingService
import no.nav.syfo.service.ManuellOppgaveService
Expand Down Expand Up @@ -91,12 +92,19 @@ fun main() {
manuellOppgaveService = manuellOppgaveService,
)

GlobalScope.launch {
applicationState.ready = true
val oppgaveHendelseConsumer = OppgaveHendelseConsumer(
kafkaConsumer = kafkaConsumers.oppgaveHendelseConsumer,
topic = env.oppgaveHendelseTopic,
applicationState,
database,
)
applicationState.ready = true

createListener(applicationState) {
mottattSykmeldingService.startAivenConsumer()
}
createListener(applicationState) {
mottattSykmeldingService.startAivenConsumer()
}
createListener(applicationState) {
oppgaveHendelseConsumer.start()
}

ApplicationServer(applicationEngine, applicationState).start()
Expand Down
1 change: 1 addition & 0 deletions src/main/kotlin/no/nav/syfo/Environment.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ data class Environment(
val dbPort: String = getEnvVar("DB_PORT"),
val dbName: String = getEnvVar("DB_DATABASE"),
val cluster: String = getEnvVar("NAIS_CLUSTER_NAME"),
val oppgaveHendelseTopic: String = getEnvVar("OPPGAVE_HENDELSE_TOPIC"),
)

fun getEnvVar(varName: String, defaultValue: String? = null) =
Expand Down
2 changes: 2 additions & 0 deletions src/main/kotlin/no/nav/syfo/aksessering/UlosteOppgave.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package no.nav.syfo.aksessering

import no.nav.syfo.model.ManuellOppgaveStatus
import java.time.LocalDateTime

data class UlosteOppgave(
val oppgaveId: Int,
val mottattDato: LocalDateTime,
val status: ManuellOppgaveStatus? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,19 @@ import no.nav.syfo.objectMapper
import java.sql.ResultSet
import java.time.LocalDateTime

fun DatabaseInterface.finnesOppgave(oppgaveId: Int) =
connection.use { connection ->
connection.prepareStatement(
"""
suspend fun DatabaseInterface.finnesOppgave(oppgaveId: Int) =
withContext(Dispatchers.IO) {
connection.use { connection ->
connection.prepareStatement(
"""
SELECT true
FROM MANUELLOPPGAVE
WHERE oppgaveid=?;
""",
).use {
it.setInt(1, oppgaveId)
it.executeQuery().next()
).use {
it.setInt(1, oppgaveId)
it.executeQuery().next()
}
}
}

Expand All @@ -44,7 +46,7 @@ suspend fun DatabaseInterface.finnesSykmelding(id: String) =
}
}

fun DatabaseInterface.erApprecSendt(oppgaveId: Int) =
suspend fun DatabaseInterface.erApprecSendt(oppgaveId: Int) = withContext(Dispatchers.IO) {
connection.use { connection ->
connection.prepareStatement(
"""
Expand All @@ -59,20 +61,23 @@ fun DatabaseInterface.erApprecSendt(oppgaveId: Int) =
it.executeQuery().next()
}
}
}

fun DatabaseInterface.hentManuellOppgaver(oppgaveId: Int): ManuellOppgaveDTO? =
connection.use { connection ->
connection.prepareStatement(
"""
suspend fun DatabaseInterface.hentManuellOppgaver(oppgaveId: Int): ManuellOppgaveDTO? =
withContext(Dispatchers.IO) {
connection.use { connection ->
connection.prepareStatement(
"""
SELECT oppgaveid,receivedsykmelding,validationresult
FROM MANUELLOPPGAVE
WHERE oppgaveid=?
AND ferdigstilt=?;
""",
).use {
it.setInt(1, oppgaveId)
it.setBoolean(2, false)
it.executeQuery().toList { toManuellOppgaveDTO() }.firstOrNull()
).use {
it.setInt(1, oppgaveId)
it.setBoolean(2, false)
it.executeQuery().toList { toManuellOppgaveDTO() }.firstOrNull()
}
}
}

Expand All @@ -87,41 +92,47 @@ fun ResultSet.toManuellOppgaveDTO(): ManuellOppgaveDTO {
)
}

fun DatabaseInterface.hentKomplettManuellOppgave(oppgaveId: Int): List<ManuellOppgaveKomplett> =
connection.use { connection ->
connection.prepareStatement(
"""
suspend fun DatabaseInterface.hentKomplettManuellOppgave(oppgaveId: Int): List<ManuellOppgaveKomplett> =
withContext(Dispatchers.IO) {
connection.use { connection ->
connection.prepareStatement(
"""
SELECT receivedsykmelding,validationresult,apprec,oppgaveid,ferdigstilt,sendt_apprec,opprinnelig_validationresult
FROM MANUELLOPPGAVE
WHERE oppgaveid=?;
""",
).use {
it.setInt(1, oppgaveId)
it.executeQuery().toList { toManuellOppgave() }
).use {
it.setInt(1, oppgaveId)
it.executeQuery().toList { toManuellOppgave() }
}
}
}

fun DatabaseInterface.hentManuellOppgaveForSykmeldingId(sykmeldingId: String): ManuellOppgaveKomplett? =
connection.use { connection ->
connection.prepareStatement(
"""
suspend fun DatabaseInterface.hentManuellOppgaveForSykmeldingId(sykmeldingId: String): ManuellOppgaveKomplett? =
withContext(Dispatchers.IO) {
connection.use { connection ->
connection.prepareStatement(
"""
SELECT receivedsykmelding,validationresult,apprec,oppgaveid,ferdigstilt,sendt_apprec,opprinnelig_validationresult
FROM MANUELLOPPGAVE
WHERE receivedsykmelding->'sykmelding'->>'id' = ?;
""",
).use {
it.setString(1, sykmeldingId)
it.executeQuery().toList { toManuellOppgave() }.firstOrNull()
).use {
it.setString(1, sykmeldingId)
it.executeQuery().toList { toManuellOppgave() }.firstOrNull()
}
}
}
fun DatabaseInterface.getUlosteOppgaver(): List<UlosteOppgave> =
connection.use { connection ->
connection.prepareStatement(
"""select receivedsykmelding->>'mottattDato' as dato, oppgaveId FROM MANUELLOPPGAVE
suspend fun DatabaseInterface.getUlosteOppgaver(): List<UlosteOppgave> =
withContext(Dispatchers.IO) {
connection.use { connection ->
connection.prepareStatement(
"""select receivedsykmelding->>'mottattDato' as dato, oppgaveId FROM MANUELLOPPGAVE
WHERE ferdigstilt is not true
""",
).use {
it.executeQuery().toList { toUlostOppgave() }
).use {
it.executeQuery().toList { toUlostOppgave() }
}
}
}
fun ResultSet.toUlostOppgave(): UlosteOppgave =
Expand Down
49 changes: 39 additions & 10 deletions src/main/kotlin/no/nav/syfo/clients/KafkaConsumers.kt
Original file line number Diff line number Diff line change
@@ -1,22 +1,51 @@
package no.nav.syfo.clients

import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import no.nav.syfo.Environment
import no.nav.syfo.kafka.aiven.KafkaUtils
import no.nav.syfo.kafka.toConsumerConfig
import no.nav.syfo.oppgave.kafka.OppgaveKafkaAivenRecord
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.StringDeserializer

class KafkaOppgaveDeserializer : Deserializer<OppgaveKafkaAivenRecord> {
private val objectMapper = ObjectMapper()
.registerModule(JavaTimeModule())
.registerKotlinModule()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

override fun deserialize(topic: String, bytes: ByteArray): OppgaveKafkaAivenRecord {
return objectMapper.readValue(bytes, OppgaveKafkaAivenRecord::class.java)
}
}
class KafkaConsumers(env: Environment) {
private val consumerPropertiesAiven = KafkaUtils.getAivenKafkaConfig().also {
it.let {
it[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = "1"
it[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "none"
}
}.toConsumerConfig(
"${env.applicationName}-consumer",
valueDeserializer = StringDeserializer::class,
val kafkaAivenConsumerManuellOppgave = KafkaConsumer<String, String>(
KafkaUtils.getAivenKafkaConfig().also {
it.let {
it[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = "1"
it[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "none"
}
}.toConsumerConfig(
"${env.applicationName}-consumer",
valueDeserializer = StringDeserializer::class,
),
)
val oppgaveHendelseConsumer = KafkaConsumer(
KafkaUtils.getAivenKafkaConfig()
.also {
it[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
it[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 50
}
.toConsumerConfig(
"${env.applicationName}-consumer",
KafkaOppgaveDeserializer::class,
),
StringDeserializer(),
KafkaOppgaveDeserializer(),
)

val kafkaAivenConsumerManuellOppgave = KafkaConsumer<String, String>(consumerPropertiesAiven)
}
6 changes: 6 additions & 0 deletions src/main/kotlin/no/nav/syfo/model/ManuellOppgave.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,9 @@ fun Apprec.toPGObject() = PGobject().also {
it.type = "json"
it.value = objectMapper.writeValueAsString(this)
}

enum class ManuellOppgaveStatus {
APEN,
FERDIGSTILT,
FEILREGISTRERT,
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,31 @@
package no.nav.syfo.oppgave.kafka

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
import no.nav.syfo.aksessering.db.finnesOppgave
import no.nav.syfo.application.ApplicationState
import no.nav.syfo.log
import no.nav.syfo.db.DatabaseInterface
import no.nav.syfo.persistering.db.oppdaterOppgaveHendelse
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.errors.AuthorizationException
import org.apache.kafka.common.errors.ClusterAuthorizationException
import org.slf4j.LoggerFactory
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration

class OppgaveHendelseConsumer(
private val kafkaConsumer: KafkaConsumer<String, OppgaveKafkaAivenRecord>,
private val topic: String,
private val applicationState: ApplicationState,
private val database: DatabaseInterface,
) {

companion object {
private val DELAY_ON_ERROR_SECONDS = 60.seconds
private val POLL_TIME_DURATION = 10.seconds
private val log = LoggerFactory.getLogger(OppgaveHendelseConsumer::class.java)
}

suspend fun start() {
Expand All @@ -25,26 +34,44 @@ class OppgaveHendelseConsumer(
kafkaConsumer.subscribe(listOf(topic))
consumeMessages()
} catch (ex: Exception) {
when (ex) {
is AuthorizationException, is ClusterAuthorizationException -> {
throw ex
}

else -> {
log.error("Aiven: Caught exception, unsubscribing and retrying", ex)
kafkaConsumer.unsubscribe()
delay(DELAY_ON_ERROR_SECONDS)
}
}
handleException(ex)
}
}
}

suspend fun consumeMessages() {
private suspend fun consumeMessages() {
while (applicationState.ready) {
val records = kafkaConsumer.poll(POLL_TIME_DURATION.toJavaDuration())
records.forEach {
val oppgaveHendlese = it.value()
val records = withContext(Dispatchers.IO) {
kafkaConsumer.poll(POLL_TIME_DURATION.toJavaDuration())
}
records.forEach { record ->
processRecord(record)
}
}
}

private suspend fun processRecord(record: ConsumerRecord<String, OppgaveKafkaAivenRecord>) {
val oppgaveHendlese = record.value()
val oppgaveStatus = oppgaveHendlese.hendelse.hendelsestype.manuellOppgaveStatus()
val timestamp = oppgaveHendlese.hendelse.tidspunkt
val oppgaveId = oppgaveHendlese.oppgave.oppgaveId.toInt()
if (database.finnesOppgave(oppgaveId)) {
log.info("Oppdaterer oppgave for oppgaveId: {} til {}", oppgaveId, oppgaveStatus)
database.oppdaterOppgaveHendelse(
oppgaveId = oppgaveId,
status = oppgaveStatus,
statusTimestamp = timestamp,
)
}
}

private suspend fun handleException(exception: Exception) {
when (exception) {
is AuthorizationException, is ClusterAuthorizationException -> throw exception
else -> {
log.error("Aiven: Caught exception, unsubscribing and retrying", exception)
kafkaConsumer.unsubscribe()
delay(DELAY_ON_ERROR_SECONDS)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package no.nav.syfo.oppgave.kafka

import no.nav.syfo.model.ManuellOppgaveStatus
import java.time.LocalDateTime

data class OppgaveKafkaAivenRecord(
Expand Down Expand Up @@ -43,3 +44,12 @@ enum class Hendelsestype {
OPPGAVE_FERDIGSTILT,
OPPGAVE_FEILREGISTRERT,
}

fun Hendelsestype.manuellOppgaveStatus(): ManuellOppgaveStatus {
return when (this) {
Hendelsestype.OPPGAVE_OPPRETTET -> ManuellOppgaveStatus.APEN
Hendelsestype.OPPGAVE_ENDRET -> ManuellOppgaveStatus.APEN
Hendelsestype.OPPGAVE_FERDIGSTILT -> ManuellOppgaveStatus.FERDIGSTILT
Hendelsestype.OPPGAVE_FEILREGISTRERT -> ManuellOppgaveStatus.FEILREGISTRERT
}
}
Loading

0 comments on commit 2a971f1

Please sign in to comment.