Skip to content

Commit

Permalink
use uniqe client id
Browse files Browse the repository at this point in the history
  • Loading branch information
andreasDev committed Jul 20, 2023
1 parent 0ec7216 commit e0118e7
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 13 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ val ktorVersion = "2.3.2"
val logbackVersion = "1.4.8"
val logstashEncoderVersion = "7.4"
val prometheusVersion = "0.16.0"
val smCommonVersion = "1.0.10"
val smCommonVersion = "1.0.11"
val sykmeldingVersion = "1.0.4"
val fellesformatVersion = "1.0.4"
val kithHodemeldingVersion = "1.0.4"
Expand Down
4 changes: 2 additions & 2 deletions src/main/kotlin/no/nav/syfo/clients/KafkaConsumers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class KafkaOppgaveDeserializer : Deserializer<OppgaveKafkaAivenRecord> {
}
class KafkaConsumers(env: Environment) {
val kafkaAivenConsumerManuellOppgave = KafkaConsumer<String, String>(
KafkaUtils.getAivenKafkaConfig().also {
KafkaUtils.getAivenKafkaConfig("manuell-oppgave-consumer").also {
it.let {
it[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = "1"
it[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "none"
Expand All @@ -36,7 +36,7 @@ class KafkaConsumers(env: Environment) {
),
)
val oppgaveHendelseConsumer = KafkaConsumer(
KafkaUtils.getAivenKafkaConfig()
KafkaUtils.getAivenKafkaConfig("oppgavehendelse-consumer")
.also {
it[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
it[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 50
Expand Down
17 changes: 7 additions & 10 deletions src/main/kotlin/no/nav/syfo/clients/KafkaProducers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,30 @@ import no.nav.syfo.model.ReceivedSykmelding
import no.nav.syfo.oppgave.model.OpprettOppgaveKafkaMessage
import no.nav.syfo.util.JacksonKafkaSerializer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import java.util.Properties

class KafkaProducers(private val env: Environment) {
private val producerPropertiesAiven = KafkaUtils.getAivenKafkaConfig()
.toProducerConfig(env.applicationName, valueSerializer = JacksonKafkaSerializer::class)

init {
producerPropertiesAiven[ProducerConfig.RETRIES_CONFIG] = 100
fun getKafkaProducerConfig(clientId: String): Properties {
return KafkaUtils.getAivenKafkaConfig(clientId)
.toProducerConfig(env.applicationName, valueSerializer = JacksonKafkaSerializer::class)
}

val kafkaApprecProducer = KafkaApprecProducer()
val kafkaRecievedSykmeldingProducer = KafkaRecievedSykmeldingProducer()
val kafkaProduceTaskProducer = KafkaProduceTaskProducer()

inner class KafkaApprecProducer {
val producer = KafkaProducer<String, Apprec>(producerPropertiesAiven)
val producer = KafkaProducer<String, Apprec>(getKafkaProducerConfig("apprec-producer"))
val apprecTopic = env.apprecTopic
}

inner class KafkaRecievedSykmeldingProducer {
val producer = KafkaProducer<String, ReceivedSykmelding>(producerPropertiesAiven)
val producer = KafkaProducer<String, ReceivedSykmelding>(getKafkaProducerConfig("sykmelding-producer"))
val okSykmeldingTopic = env.okSykmeldingTopic
}

inner class KafkaProduceTaskProducer {
// Sender til syfosmoppgave
val producer = KafkaProducer<String, OpprettOppgaveKafkaMessage>(producerPropertiesAiven)
val producer = KafkaProducer<String, OpprettOppgaveKafkaMessage>(getKafkaProducerConfig("oppgave-producer"))
val topic = env.produserOppgaveTopic
}
}

0 comments on commit e0118e7

Please sign in to comment.