Skip to content

Commit

Permalink
Pushback i lesing av hendelser fra fpsak
Browse files Browse the repository at this point in the history
  • Loading branch information
palfi committed Aug 8, 2023
1 parent 2b32fb8 commit 9a508b0
Showing 1 changed file with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class BehandlingHendelseConsumer implements LiveAndReadinessAware, Contro
private static final Environment ENV = Environment.current();

private static final String PROD_APP_ID = "fplos-behandling"; // Hold konstant pga offset commit !!
private static final int HANDLE_MESSAGE_INTERVAL_MILLIS = 20;

private String topicName;
private KafkaStreams stream;
Expand All @@ -42,11 +43,23 @@ public BehandlingHendelseConsumer(@KonfigVerdi(value = "kafka.behandlinghendelse
final Consumed<String, String> consumed = Consumed.with(Topology.AutoOffsetReset.EARLIEST);

final StreamsBuilder builder = new StreamsBuilder();
builder.stream(topicName, consumed).foreach(behandlingHendelseHåndterer::handleMessage);
builder.stream(topicName, consumed).foreach((key, payload) -> {
behandlingHendelseHåndterer.handleMessage(key, payload);
sleep();
});

this.stream = new KafkaStreams(builder.build(), KafkaProperties.forStreamsStringValue(getApplicationId()));
}

private static void sleep() {
try {
Thread.sleep(HANDLE_MESSAGE_INTERVAL_MILLIS);
} catch (InterruptedException e) {
LOG.warn("Interrupt!", e);
Thread.currentThread().interrupt();
}
}

@Override
public void start() {
addShutdownHooks();
Expand Down

0 comments on commit 9a508b0

Please sign in to comment.