Skip to content

Commit

Permalink
fix metadata NPE for unknown leader
Browse files Browse the repository at this point in the history
  • Loading branch information
moscicky committed Sep 19, 2024
1 parent fa11d18 commit a8b006e
Showing 1 changed file with 3 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
Expand Down Expand Up @@ -117,7 +118,8 @@ private Supplier<ProduceMetadata> produceMetadataSupplier(RecordMetadata recordM
.filter(p -> p.partition() == recordMetadata.partition())
.findFirst();

return partitionInfo.map(partition -> partition.leader().host())
return partitionInfo.flatMap(partition -> Optional.ofNullable(partition.leader()))
.map(Node::host)
.map(ProduceMetadata::new)
.orElse(ProduceMetadata.empty());
} catch (InterruptException e) {
Expand Down

0 comments on commit a8b006e

Please sign in to comment.