diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSender.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSender.java index fd62a9142c..457b372a34 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSender.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSender.java @@ -6,6 +6,7 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InterruptException; @@ -117,7 +118,8 @@ private Supplier produceMetadataSupplier(RecordMetadata recordM .filter(p -> p.partition() == recordMetadata.partition()) .findFirst(); - return partitionInfo.map(partition -> partition.leader().host()) + return partitionInfo.flatMap(partition -> Optional.ofNullable(partition.leader())) + .map(Node::host) .map(ProduceMetadata::new) .orElse(ProduceMetadata.empty()); } catch (InterruptException e) {