diff --git a/domene/src/main/java/no/nav/foreldrepenger/los/hendelse/behandlinghendelse/BehandlingHendelseConsumer.java b/domene/src/main/java/no/nav/foreldrepenger/los/hendelse/behandlinghendelse/BehandlingHendelseConsumer.java index ef1a8de12..936ab931b 100644 --- a/domene/src/main/java/no/nav/foreldrepenger/los/hendelse/behandlinghendelse/BehandlingHendelseConsumer.java +++ b/domene/src/main/java/no/nav/foreldrepenger/los/hendelse/behandlinghendelse/BehandlingHendelseConsumer.java @@ -27,6 +27,7 @@ public class BehandlingHendelseConsumer implements LiveAndReadinessAware, Contro private static final Environment ENV = Environment.current(); private static final String PROD_APP_ID = "fplos-behandling"; // Hold konstant pga offset commit !! + private static final int HANDLE_MESSAGE_INTERVAL_MILLIS = 20; private String topicName; private KafkaStreams stream; @@ -42,11 +43,23 @@ public BehandlingHendelseConsumer(@KonfigVerdi(value = "kafka.behandlinghendelse final Consumed consumed = Consumed.with(Topology.AutoOffsetReset.EARLIEST); final StreamsBuilder builder = new StreamsBuilder(); - builder.stream(topicName, consumed).foreach(behandlingHendelseHåndterer::handleMessage); + builder.stream(topicName, consumed).foreach((key, payload) -> { + behandlingHendelseHåndterer.handleMessage(key, payload); + sleep(); + }); this.stream = new KafkaStreams(builder.build(), KafkaProperties.forStreamsStringValue(getApplicationId())); } + private static void sleep() { + try { + Thread.sleep(HANDLE_MESSAGE_INTERVAL_MILLIS); + } catch (InterruptedException e) { + LOG.warn("Interrupt!", e); + Thread.currentThread().interrupt(); + } + } + @Override public void start() { addShutdownHooks();