Skip to content

Commit

Permalink
add kafka-client-id (#139)
Browse files Browse the repository at this point in the history
* add kafka-client-id

* use uniqe client id
  • Loading branch information
andreasDev authored Jul 20, 2023
1 parent f5333be commit 80dcb3b
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 15 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ val ktorVersion = "2.3.2"
val logbackVersion = "1.4.8"
val logstashEncoderVersion = "7.4"
val prometheusVersion = "0.16.0"
val smCommonVersion = "1.0.10"
val smCommonVersion = "1.0.11"
val sykmeldingVersion = "1.0.4"
val fellesformatVersion = "1.0.4"
val kithHodemeldingVersion = "1.0.4"
Expand Down
6 changes: 5 additions & 1 deletion naiserator-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,8 @@ spec:
- name: OPPGAVEBEHANDLING_URL
value: https://oppgave-q1.dev-fss-pub.nais.io/api/v1/oppgaver
- name: OPPGAVE_HENDELSE_TOPIC
value: oppgavehandtering.oppgavehendelse-q1-v1
value: oppgavehandtering.oppgavehendelse-q1-v1
- name: KAFKA_CLIENT_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
6 changes: 5 additions & 1 deletion naiserator-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,8 @@ spec:
- name: OPPGAVEBEHANDLING_URL
value: https://oppgave.prod-fss-pub.nais.io/api/v1/oppgaver
- name: OPPGAVE_HENDELSE_TOPIC
value: oppgavehandtering.oppgavehendelse-v1
value: oppgavehandtering.oppgavehendelse-v1
- name: KAFKA_CLIENT_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
4 changes: 2 additions & 2 deletions src/main/kotlin/no/nav/syfo/clients/KafkaConsumers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class KafkaOppgaveDeserializer : Deserializer<OppgaveKafkaAivenRecord> {
}
class KafkaConsumers(env: Environment) {
val kafkaAivenConsumerManuellOppgave = KafkaConsumer<String, String>(
KafkaUtils.getAivenKafkaConfig().also {
KafkaUtils.getAivenKafkaConfig("manuell-oppgave-consumer").also {
it.let {
it[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = "1"
it[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "none"
Expand All @@ -36,7 +36,7 @@ class KafkaConsumers(env: Environment) {
),
)
val oppgaveHendelseConsumer = KafkaConsumer(
KafkaUtils.getAivenKafkaConfig()
KafkaUtils.getAivenKafkaConfig("oppgavehendelse-consumer")
.also {
it[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
it[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 50
Expand Down
17 changes: 7 additions & 10 deletions src/main/kotlin/no/nav/syfo/clients/KafkaProducers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,30 @@ import no.nav.syfo.model.ReceivedSykmelding
import no.nav.syfo.oppgave.model.OpprettOppgaveKafkaMessage
import no.nav.syfo.util.JacksonKafkaSerializer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import java.util.Properties

class KafkaProducers(private val env: Environment) {
private val producerPropertiesAiven = KafkaUtils.getAivenKafkaConfig()
.toProducerConfig(env.applicationName, valueSerializer = JacksonKafkaSerializer::class)

init {
producerPropertiesAiven[ProducerConfig.RETRIES_CONFIG] = 100
fun getKafkaProducerConfig(clientId: String): Properties {
return KafkaUtils.getAivenKafkaConfig(clientId)
.toProducerConfig(env.applicationName, valueSerializer = JacksonKafkaSerializer::class)
}

val kafkaApprecProducer = KafkaApprecProducer()
val kafkaRecievedSykmeldingProducer = KafkaRecievedSykmeldingProducer()
val kafkaProduceTaskProducer = KafkaProduceTaskProducer()

inner class KafkaApprecProducer {
val producer = KafkaProducer<String, Apprec>(producerPropertiesAiven)
val producer = KafkaProducer<String, Apprec>(getKafkaProducerConfig("apprec-producer"))
val apprecTopic = env.apprecTopic
}

inner class KafkaRecievedSykmeldingProducer {
val producer = KafkaProducer<String, ReceivedSykmelding>(producerPropertiesAiven)
val producer = KafkaProducer<String, ReceivedSykmelding>(getKafkaProducerConfig("sykmelding-producer"))
val okSykmeldingTopic = env.okSykmeldingTopic
}

inner class KafkaProduceTaskProducer {
// Sender til syfosmoppgave
val producer = KafkaProducer<String, OpprettOppgaveKafkaMessage>(producerPropertiesAiven)
val producer = KafkaProducer<String, OpprettOppgaveKafkaMessage>(getKafkaProducerConfig("oppgave-producer"))
val topic = env.produserOppgaveTopic
}
}

0 comments on commit 80dcb3b

Please sign in to comment.