Skip to content

Commit

Permalink
Merge pull request #27 from KonstantinCodes/feature/refactoring
Browse files Browse the repository at this point in the history
Refactoring
  • Loading branch information
KonstantinCodes committed Nov 30, 2020
2 parents 911a7b1 + 6aa52f3 commit 09ecc6d
Show file tree
Hide file tree
Showing 17 changed files with 519 additions and 170 deletions.
37 changes: 33 additions & 4 deletions .github/workflows/php.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,45 @@ jobs:

runs-on: ubuntu-latest

services:
kafka:
image: lensesio/fast-data-dev
env:
SAMPLEDATA: 0
RUNTESTS: 0
FORWARDLOGS: 0
ports:
- 9092:9092
- 8081:8081
- 8082:8082
- 8083:8083
- 2181:2181

steps:
- uses: actions/checkout@v2

- name: Validate composer.json and composer.lock
run: composer validate

- name: Install rdkafka
run: sudo apt-get update && sudo apt-get install librdkafka1 librdkafka-dev php-pear php-common && sudo pecl install rdkafka
run:
wget -O confluent_archive.key https://packages.confluent.io/deb/6.0/archive.key &&
sudo apt-key add confluent_archive.key &&
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/6.0 stable main" &&
sudo apt update && sudo apt install netcat librdkafka1 librdkafka-dev kafkacat

- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: '7.4'
coverage: xdebug
tools: pecl, phpunit
extensions: rdkafka
ini-values: post_max_size=256M, short_open_tag=On

- name: check if rdkafka is there
run: php -v && php --ri rdkafka

- name: Cache Composer packages
id: composer-cache
uses: actions/cache@v2
Expand All @@ -39,8 +61,15 @@ jobs:
- name: Install dependencies
if: steps.composer-cache.outputs.cache-hit != 'true'
run: composer install --prefer-dist --no-progress --no-suggest
run: composer update --prefer-dist --no-progress

- name: Wait for Kafka
run: .github/workflows/wait-for-kafka.sh


#- name: try to produce
# run: echo "test" > testfile.json && kafkacat -b 0.0.0.0:9092 -t kafkacat_test -P testfile.json

# Docs: https://getcomposer.org/doc/articles/scripts.md
- name: Run test suite
run: ./vendor/bin/simple-phpunit --log-junit results/tests/junit.xml --coverage-html results/coverage-report --coverage-clover clover.xml
#- name: Run test suite
# run: phpunit --log-junit results/tests/junit.xml --coverage-html results/coverage-report --coverage-clover clover.xml
11 changes: 11 additions & 0 deletions .github/workflows/wait-for-kafka.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#! /bin/bash
end=$((SECONDS+60))

while [ $SECONDS -lt $end ]; do
if echo dump | nc localhost 2181 | grep broker ; then
exit 0
else
echo "Kafka didn't start in time"
fi
sleep 1
done
8 changes: 5 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
"keywords": ["kafka", "symfony", "messenger", "transport", "queue", "bundle"],
"license": "MIT",
"require": {
"php": "^7.1.3",
"php": "^7.1.3|8.*",
"ext-json": "*",
"symfony/config": "^3.0||^4.0||^5.0",
"symfony/dependency-injection": "^3.4.26||^4.1.12|^5.0",
"symfony/http-kernel": "^3.0||^4.0||^5.0",
"symfony/messenger": "^4.4||^5.0",
"psr/http-client": "^1.0",
"psr/http-factory": "^1.0",
"psr/http-message": "^1.0"
"psr/http-message": "^1.0",
"psr/log": "^1.1"
},
"require-dev": {
"friendsofphp/php-cs-fixer": "^2.16",
Expand All @@ -22,7 +23,8 @@
"symfony/framework-bundle": "^5.0",
"symfony/serializer": "^5.0",
"symfony/property-access": "^5.0",
"phpstan/phpstan": "^0.12.52"
"phpstan/phpstan": "^0.12.52",
"nyholm/psr7": "^1.3"
},
"suggest": {
"ext-rdkafka": "^4.0; Needed to support Kafka connectivity",
Expand Down
4 changes: 2 additions & 2 deletions phpunit.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@

<testsuites>
<testsuite name="Symfony Messenger Kafka Transport Test Suite">
<directory>./Tests/</directory>
<directory>./tests/</directory>
</testsuite>
</testsuites>

<filter>
<whitelist>
<directory>./</directory>
<exclude>
<directory>./Tests</directory>
<directory>./tests</directory>
<directory>./vendor</directory>
</exclude>
</whitelist>
Expand Down
136 changes: 136 additions & 0 deletions src/Messenger/KafkaReceiver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
<?php

declare(strict_types=1);

namespace Koco\Kafka\Messenger;

use Koco\Kafka\RdKafka\RdKafkaFactory;
use Psr\Log\LoggerInterface;
use RdKafka\KafkaConsumer;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class KafkaReceiver implements ReceiverInterface
{
/** @var LoggerInterface */
private $logger;

/** @var SerializerInterface */
private $serializer;

/** @var RdKafkaFactory */
private $rdKafkaFactory;

/** @var KafkaReceiverProperties */
private $properties;

/** @var KafkaConsumer */
private $consumer;

/** @var bool */
private $subscribed;

public function __construct(
LoggerInterface $logger,
SerializerInterface $serializer,
RdKafkaFactory $rdKafkaFactory,
KafkaReceiverProperties $properties
) {
$this->logger = $logger;
$this->serializer = $serializer;
$this->rdKafkaFactory = $rdKafkaFactory;
$this->properties = $properties;

$this->subscribed = false;
}

public function get(): iterable
{
$message = $this->getSubscribedConsumer()->consume($this->properties->getReceiveTimeoutMs());

switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$this->logger->info(sprintf(
'Kafka: Message %s %s %s received ',
$message->topic_name,
$message->partition,
$message->offset
));

$envelope = $this->serializer->decode([
'body' => $message->payload,
'headers' => $message->headers,
]);

return [$envelope->with(new KafkaMessageStamp($message))];
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
$this->logger->info('Kafka: Partition EOF reached. Waiting for next message ...');
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
$this->logger->debug('Kafka: Consumer timeout.');
break;
case RD_KAFKA_RESP_ERR__TRANSPORT:
$this->logger->debug('Kafka: Broker transport failure.');
break;
default:
throw new TransportException($message->errstr(), $message->err);
}

return [];
}

public function ack(Envelope $envelope): void
{
$consumer = $this->getConsumer();

/** @var KafkaMessageStamp $transportStamp */
$transportStamp = $envelope->last(KafkaMessageStamp::class);
$message = $transportStamp->getMessage();

if ($this->properties->isCommitAsync()) {
$consumer->commitAsync($message);

$this->logger->info(sprintf(
'Offset topic=%s partition=%s offset=%s to be committed asynchronously.',
$message->topic_name,
$message->partition,
$message->offset
));
} else {
$consumer->commit($message);

$this->logger->info(sprintf(
'Offset topic=%s partition=%s offset=%s successfully committed.',
$message->topic_name,
$message->partition,
$message->offset
));
}
}

public function reject(Envelope $envelope): void
{
// Do nothing. auto commit should be set to false!
}

private function getSubscribedConsumer(): KafkaConsumer
{
$consumer = $this->getConsumer();

if (false === $this->subscribed) {
$this->logger->info('Partition assignment...');
$consumer->subscribe([$this->properties->getTopicName()]);

$this->subscribed = true;
}

return $consumer;
}

private function getConsumer(): KafkaConsumer
{
return $this->consumer ?? $this->consumer = $this->rdKafkaFactory->createConsumer($this->properties->getKafkaConf());
}
}
54 changes: 54 additions & 0 deletions src/Messenger/KafkaReceiverProperties.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<?php

declare(strict_types=1);

namespace Koco\Kafka\Messenger;

use RdKafka\Conf as KafkaConf;

final class KafkaReceiverProperties
{
/** @var KafkaConf */
private $kafkaConf;

/** @var string */
private $topicName;

/** @var int */
private $receiveTimeoutMs;

/** @var bool */
private $commitAsync;

public function __construct(
KafkaConf $kafkaConf,
string $topicName,
int $receiveTimeoutMs,
bool $commitAsync
) {
$this->kafkaConf = $kafkaConf;
$this->topicName = $topicName;
$this->receiveTimeoutMs = $receiveTimeoutMs;
$this->commitAsync = $commitAsync;
}

public function getKafkaConf(): KafkaConf
{
return $this->kafkaConf;
}

public function getTopicName(): string
{
return $this->topicName;
}

public function getReceiveTimeoutMs(): int
{
return $this->receiveTimeoutMs;
}

public function isCommitAsync(): bool
{
return $this->commitAsync;
}
}
79 changes: 79 additions & 0 deletions src/Messenger/KafkaSender.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
<?php

declare(strict_types=1);

namespace Koco\Kafka\Messenger;

use Koco\Kafka\RdKafka\RdKafkaFactory;
use Psr\Log\LoggerInterface;
use RdKafka\Producer as KafkaProducer;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class KafkaSender implements SenderInterface
{
/** @var LoggerInterface */
private $logger;

/** @var SerializerInterface */
private $serializer;

/** @var RdKafkaFactory */
private $rdKafkaFactory;

/** @var KafkaSenderProperties */
private $properties;

/** @var KafkaProducer */
private $producer;

public function __construct(
LoggerInterface $logger,
SerializerInterface $serializer,
RdKafkaFactory $rdKafkaFactory,
KafkaSenderProperties $properties
) {
$this->logger = $logger;
$this->serializer = $serializer;
$this->rdKafkaFactory = $rdKafkaFactory;
$this->properties = $properties;
}

public function send(Envelope $envelope): Envelope
{
$producer = $this->getProducer();
$topic = $producer->newTopic($this->properties->getTopicName());

$payload = $this->serializer->encode($envelope);

$topic->producev(
RD_KAFKA_PARTITION_UA,
0,
$payload['body'],
$payload['key'] ?? null,
$payload['headers'] ?? null,
$payload['timestamp_ms'] ?? null
);

for ($flushRetries = 0; $flushRetries < $this->properties->getFlushRetries() + 1; ++$flushRetries) {
$code = $producer->flush($this->properties->getFlushTimeoutMs());
if ($code === RD_KAFKA_RESP_ERR_NO_ERROR) {
$this->logger->info(sprintf('Kafka message sent%s', \array_key_exists('key', $payload) ? ' with key ' . $payload['key'] : ''));
break;
}
}

if ($code !== RD_KAFKA_RESP_ERR_NO_ERROR) {
throw new TransportException('Kafka producer response error: ' . $code, $code);
}

return $envelope;
}

private function getProducer(): KafkaProducer
{
return $this->producer ?? $this->producer = $this->rdKafkaFactory->createProducer($this->properties->getKafkaConf());
}
}
Loading

0 comments on commit 09ecc6d

Please sign in to comment.