Skip to content

Commit

Permalink
Updatestatus (#137)
Browse files Browse the repository at this point in the history
* start leader elector service

* update timetype

* skip 404 oppgaver

* update status to deleted if it is not found

* Use another leader elector method

* update error handling

* Update build.gradle.kts

* Log warn instead of error in LeaderElector
  • Loading branch information
andreasDev authored Jul 19, 2023
1 parent 738cfb0 commit ac53b1b
Show file tree
Hide file tree
Showing 21 changed files with 406 additions and 8 deletions.
1 change: 0 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ val kafkaVersion = "3.5.0"
val commonsCodecVersion = "1.16.0"
val logbacksyslog4jVersion = "1.0.0"


plugins {
kotlin("jvm") version "1.9.0"
id("com.github.johnrengelman.shadow") version "8.1.1"
Expand Down
1 change: 1 addition & 0 deletions naiserator-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ metadata:
labels:
team: teamsykmelding
spec:
leaderElection: true
gcp:
sqlInstances:
- name: syfosmmanuell-backend-instance
Expand Down
1 change: 1 addition & 0 deletions naiserator-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ metadata:
labels:
team: teamsykmelding
spec:
leaderElection: true
gcp:
sqlInstances:
- name: syfosmmanuell-backend-instance
Expand Down
14 changes: 12 additions & 2 deletions src/main/kotlin/no/nav/syfo/Bootstrap.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import no.nav.syfo.clients.HttpClients
import no.nav.syfo.clients.KafkaConsumers
import no.nav.syfo.clients.KafkaProducers
import no.nav.syfo.db.Database
import no.nav.syfo.elector.LeaderElector
import no.nav.syfo.elector.LeadershipHandling
import no.nav.syfo.oppgave.kafka.OppgaveHendelseConsumer
import no.nav.syfo.oppgave.service.OppgaveService
import no.nav.syfo.oppgave.service.UpdateStatusService
import no.nav.syfo.persistering.MottattSykmeldingService
import no.nav.syfo.service.ManuellOppgaveService
import no.nav.syfo.util.TrackableException
Expand Down Expand Up @@ -106,8 +109,15 @@ fun main() {
createListener(applicationState) {
oppgaveHendelseConsumer.start()
}

ApplicationServer(applicationEngine, applicationState).start()
val leaderElectorService = LeadershipHandling(
updateService = UpdateStatusService(
database = database,
oppgaveClient = httpClients.oppgaveClient,
),
LeaderElector(httpClient = httpClients.httpClient, env.electorPath),
)
leaderElectorService.start()
ApplicationServer(applicationEngine, applicationState, leaderElectorService).start()
}

@DelicateCoroutinesApi
Expand Down
1 change: 1 addition & 0 deletions src/main/kotlin/no/nav/syfo/Environment.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ data class Environment(
val dbName: String = getEnvVar("DB_DATABASE"),
val cluster: String = getEnvVar("NAIS_CLUSTER_NAME"),
val oppgaveHendelseTopic: String = getEnvVar("OPPGAVE_HENDELSE_TOPIC"),
val electorPath: String = getEnvVar("ELECTOR_PATH"),
)

fun getEnvVar(varName: String, defaultValue: String? = null) =
Expand Down
3 changes: 3 additions & 0 deletions src/main/kotlin/no/nav/syfo/application/ApplicationServer.kt
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package no.nav.syfo.application

import io.ktor.server.engine.ApplicationEngine
import no.nav.syfo.elector.LeadershipHandling
import java.util.concurrent.TimeUnit

class ApplicationServer(
private val applicationServer: ApplicationEngine,
private val applicationState: ApplicationState,
private val leaderElectorService: LeadershipHandling,
) {

init {
Runtime.getRuntime().addShutdownHook(
Thread {
this.applicationState.ready = false
leaderElectorService.stop()
this.applicationServer.stop(TimeUnit.SECONDS.toMillis(10), TimeUnit.SECONDS.toMillis(10))
},
)
Expand Down
2 changes: 1 addition & 1 deletion src/main/kotlin/no/nav/syfo/clients/HttpClients.kt
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class HttpClients(env: Environment) {
}
}

private val httpClient = HttpClient(Apache, config)
val httpClient = HttpClient(Apache, config)

private val azureAdV2Client = AzureAdV2Client(
azureAppClientId = env.azureAppClientId,
Expand Down
35 changes: 35 additions & 0 deletions src/main/kotlin/no/nav/syfo/elector/LeaderElector.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package no.nav.syfo.elector

import io.ktor.client.HttpClient
import io.ktor.client.call.body
import io.ktor.client.request.get
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import no.nav.syfo.log
import java.net.InetAddress

class LeaderElector(
private val httpClient: HttpClient,
private val electorPath: String,
) {
suspend fun isLeader(): Boolean {
val hostname: String = withContext(Dispatchers.IO) { InetAddress.getLocalHost() }.hostName

return try {
val leader = httpClient.get(getHttpPath(electorPath)).body<Leader>()
leader.name == hostname
} catch (e: Exception) {
val message = "Kall mot elector feiler"
log.warn(message)
false
}
}

private fun getHttpPath(url: String): String =
when (url.startsWith("http://")) {
true -> url
else -> "http://$url"
}

private data class Leader(val name: String)
}
78 changes: 78 additions & 0 deletions src/main/kotlin/no/nav/syfo/elector/LeadershipHandling.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package no.nav.syfo.elector

import io.ktor.utils.io.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import no.nav.syfo.service.UpdateService
import org.slf4j.LoggerFactory
import kotlin.time.Duration.Companion.seconds

class LeadershipHandling(
private val updateService: UpdateService,
private val leaderElector: LeaderElector,
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO),
) {
private val logger = LoggerFactory.getLogger(LeadershipHandling::class.java)
private var isLeaderPreviously = false

fun start() {
logger.info("Starting LeaderElectorService")
scope.launch {
while (isActive) {
try {
handleLeadership()
delay(5.seconds)
} catch (ex: Exception) {
when (ex) {
is CancellationException -> {
logger.warn("Job was cancelled, message: ${ex.message}")
throw ex
}
else -> {
logger.error("Error occurred in leadership handling loop delaying for 10 seconds", ex)
delay(10.seconds)
}
}
}
}

if (isLeaderPreviously) {
logger.info("Not active, stopping")
updateService.stop()
}
}
}

private suspend fun handleLeadership() {
var isLeaderNow = leaderElector.isLeader()

if (isLeaderNow && !isLeaderPreviously) {
logger.info("Is leader, delay for 10 seconds")
delay(10.seconds)
isLeaderNow = leaderElector.isLeader()

if (isLeaderNow) {
logger.info("Is still leader, starting service")
updateService.start()
} else {
logger.info("Is not leader after delay")
}
}

if (!isLeaderNow && isLeaderPreviously) {
logger.info("Is not leader, stopping")
updateService.stop()
}

isLeaderPreviously = isLeaderNow
}

fun stop() {
logger.info("Shutting down LeadershipHandling")
scope.cancel()
}
}
1 change: 1 addition & 0 deletions src/main/kotlin/no/nav/syfo/model/ManuellOppgave.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ enum class ManuellOppgaveStatus {
APEN,
FERDIGSTILT,
FEILREGISTRERT,
DELETED,
}
2 changes: 2 additions & 0 deletions src/main/kotlin/no/nav/syfo/oppgave/Oppgave.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package no.nav.syfo.oppgave

import java.time.LocalDate
import java.time.ZonedDateTime

data class OpprettOppgave(
val tildeltEnhetsnr: String? = null,
Expand Down Expand Up @@ -34,6 +35,7 @@ data class OpprettOppgaveResponse(
val status: String? = null,
val tildeltEnhetsnr: String? = null,
val mappeId: Int? = null,
val endretTidspunkt: ZonedDateTime? = null,
)

enum class OppgaveStatus(val status: String) {
Expand Down
5 changes: 3 additions & 2 deletions src/main/kotlin/no/nav/syfo/oppgave/client/OppgaveClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,17 @@ class OppgaveClient(
}
}

suspend fun hentOppgave(oppgaveId: Int, msgId: String): OpprettOppgaveResponse {
suspend fun hentOppgave(oppgaveId: Int, msgId: String): OpprettOppgaveResponse? {
val response = httpClient.get("$url/$oppgaveId") {
contentType(ContentType.Application.Json)
val token = azureAdV2Client.getAccessToken(scope)
header("Authorization", "Bearer $token")
header("X-Correlation-ID", msgId)
}
if (response.status == HttpStatusCode.OK) {
log.info("Hentet oppgave med id $oppgaveId")
return response.body<OpprettOppgaveResponse>()
} else if (response.status == HttpStatusCode.NotFound) {
return null
} else {
log.error("Noe gikk galt ved henting av oppgave med id $oppgaveId: ${response.status}")
throw RuntimeException("Noe gikk galt ved henting av oppgave med id $oppgaveId: ${response.status}")
Expand Down
3 changes: 3 additions & 0 deletions src/main/kotlin/no/nav/syfo/oppgave/service/OppgaveService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class OppgaveService(

suspend fun ferdigstillOppgave(manuellOppgave: ManuellOppgaveKomplett, loggingMeta: LoggingMeta, enhet: String?, veileder: String?) {
val oppgave = oppgaveClient.hentOppgave(manuellOppgave.oppgaveid, manuellOppgave.receivedSykmelding.msgId)
requireNotNull(oppgave) {
throw RuntimeException("Could not find oppgave for ${manuellOppgave.oppgaveid}")
}
val oppgaveVersjon = oppgave.versjon

if (oppgave.status != OppgaveStatus.FERDIGSTILT.name) {
Expand Down
92 changes: 92 additions & 0 deletions src/main/kotlin/no/nav/syfo/oppgave/service/UpdateStatusService.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package no.nav.syfo.oppgave.service

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import no.nav.syfo.db.DatabaseInterface
import no.nav.syfo.model.ManuellOppgaveStatus
import no.nav.syfo.oppgave.client.OppgaveClient
import no.nav.syfo.persistering.db.getOppgaveWithNullStatus
import no.nav.syfo.persistering.db.oppdaterOppgaveHendelse
import no.nav.syfo.service.UpdateService
import org.slf4j.LoggerFactory
import java.time.LocalDateTime
import kotlin.time.Duration.Companion.seconds

class UpdateStatusService(
private val database: DatabaseInterface,
private val oppgaveClient: OppgaveClient,
) : UpdateService {

private var updateJob: Job? = null
private val limit = 10
private val logger = LoggerFactory.getLogger(UpdateStatusService::class.java)

companion object {
private val statusMap = mapOf(
"FERDIGSTILT" to ManuellOppgaveStatus.FERDIGSTILT,
"FEILREGISTRERT" to ManuellOppgaveStatus.FEILREGISTRERT,
null to ManuellOppgaveStatus.DELETED,
)
}

override suspend fun start() = coroutineScope {
if (updateJob?.isActive != true) {
updateJob = launch(Dispatchers.IO) {
while (isActive) {
try {
val oppgaveList = database.getOppgaveWithNullStatus(limit)

if (oppgaveList.isEmpty()) break

val jobs = oppgaveList.map { (oppgaveId, id) ->
launch(Dispatchers.IO) {
processOppgave(oppgaveId, id)
}
}
jobs.joinAll()
} catch (ex: Exception) {
when (ex) {
is CancellationException -> {
logger.warn("Job was cancelled, message: ${ex.message}")
throw ex
}
else -> {
logger.error("Caught unexpected delaying for 10s $ex")
delay(10.seconds)
}
}
}
}
}
}
}

private suspend fun processOppgave(oppgaveId: Int, id: String) {
try {
val oppgave = oppgaveClient.hentOppgave(oppgaveId, id)
if (oppgave == null) {
logger.warn("Could not find oppgave for oppgaveId $oppgaveId")
}

database.oppdaterOppgaveHendelse(
oppgaveId = oppgaveId,
status = statusMap[oppgave?.status] ?: ManuellOppgaveStatus.APEN,
statusTimestamp = oppgave?.endretTidspunkt?.toLocalDateTime() ?: LocalDateTime.now(),
)
} catch (ex: Exception) {
logger.error("Caught $ex for oppgaveId $oppgaveId")
}
}

override suspend fun stop() {
updateJob?.cancelAndJoin()
updateJob = null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,28 @@ suspend fun DatabaseInterface.oppdaterOppgaveHendelse(oppgaveId: Int, status: Ma
}
}
}

suspend fun DatabaseInterface.getOppgaveWithNullStatus(limit: Int): List<Pair<Int, String>> =
withContext(Dispatchers.IO) {
connection.use { conn ->
conn.prepareStatement(
"""
SELECT oppgaveId, id
FROM MANUELLOPPGAVE
WHERE status IS NULL
LIMIT ?;
""",
).use { stmt ->
stmt.setInt(1, limit)
stmt.executeQuery().use { rs ->
generateSequence {
if (rs.next()) {
rs.getInt("oppgaveId") to rs.getString("id")
} else {
null
}
}.toList()
}
}
}
}
6 changes: 6 additions & 0 deletions src/main/kotlin/no/nav/syfo/service/UpdateService.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package no.nav.syfo.service

interface UpdateService {
suspend fun start()
suspend fun stop()
}
1 change: 1 addition & 0 deletions src/test/kotlin/no/nav/syfo/api/AuthenticateTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class AuthenticateTest : FunSpec({
dbPort = "",
cluster = "dev-gcp",
oppgaveHendelseTopic = "oppgavehendlese",
electorPath = "Electorpath",
)
with(TestApplicationEngine()) {
start()
Expand Down
Loading

0 comments on commit ac53b1b

Please sign in to comment.