diff --git a/src/Messenger/KafkaTransportFactory.php b/src/Messenger/KafkaTransportFactory.php index 420118b..b9a4217 100644 --- a/src/Messenger/KafkaTransportFactory.php +++ b/src/Messenger/KafkaTransportFactory.php @@ -56,16 +56,19 @@ public function supports(string $dsn, array $options): bool public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface { - $conf = new KafkaConf(); + $senderConf = new KafkaConf(); + $receiverConf = new KafkaConf(); // Set a rebalance callback to log partition assignments (optional) - $conf->setRebalanceCb($this->createRebalanceCb($this->logger)); + $receiverConf->setRebalanceCb($this->createRebalanceCb($this->logger)); $brokers = $this->stripProtocol($dsn); - $conf->set('metadata.broker.list', implode(',', $brokers)); + $senderConf->set('metadata.broker.list', implode(',', $brokers)); + $receiverConf->set('metadata.broker.list', implode(',', $brokers)); foreach (array_merge($options['topic_conf'] ?? [], $options['kafka_conf'] ?? []) as $option => $value) { - $conf->set($option, $value); + $senderConf->set($option, $value); + $receiverConf->set($option, $value); } return new KafkaTransport( @@ -73,13 +76,13 @@ public function createTransport(string $dsn, array $options, SerializerInterface $serializer, $this->kafkaFactory, new KafkaSenderProperties( - $conf, + $senderConf, $options['topic']['name'], $options['flushTimeout'] ?? 10000, $options['flushRetries'] ?? 0 ), new KafkaReceiverProperties( - $conf, + $receiverConf, $options['topic']['name'], $options['receiveTimeout'] ?? 10000, $options['commitAsync'] ?? false