Skip to content
This repository has been archived by the owner on Jul 17, 2024. It is now read-only.

Commit

Permalink
switch to high level kafka consumer, manually commit offsets & fix tr…
Browse files Browse the repository at this point in the history
…avis builds (#1)
  • Loading branch information
matthewgoslett authored Sep 7, 2016
1 parent 38c7e6c commit f91d8e9
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 295 deletions.
20 changes: 18 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,27 @@
language: php

sudo: required

php:
- 5.6
- 7.0
- 7.1
- hhvm
- nightly

before_script:
- composer install
install:
- ./travis-install.sh
- travis_retry composer install --no-interaction

matrix:
fast_finish: true
allow_failures:
- php: 7.1
- php: hhvm
- php: nightly

notifications:
slack:
secure: LCF/2QlcsU0V5HmR5gJx1/SAmoZQ39zxG3jrRVOFE6itPk4au8Aal6b1l6HSlhLYhzyv84pmobMhy/Cjm6lePZyE9QalMcsRzLBQ1oZzF7fVB+ypO3W7dG6V0CnGGtkGO4MsSTwMnQ9X/GEWxNBehcmo0kRnvQPGWtEFSHbAqV+86yq/lpfBW3sXuv3TV6mtCwzfaTkermlMC63i6p3rXwwLgf19kAJ//Gp7d8/eVnrq+CbyGOD6+pAHbCFWxEHr2o6P1SMS8mnPRsgBQ+qCNICWRrmb+8gOUUS5JgnPJSAWLI/n0Q+n8CkJTjIfRK+N352n+CitCRcq+76alT05ogW8CY4mJDp2Qn0nEs5h+6NGGIcwveF3kcXmBpVDz2N2J5zpjzk4mybXX8gilxJ1WnPVGRD0cpJSazmpaNV6Y7lRMM1LqvIiY1LNzFQdp1CJjb0n6MQPaSnUF1w/e2k/UfLh1ZGTDy1US0p7RtY1RibbEFVUbtneEENRTYWoVQt+coBF+DFdsDYX/HoEuTkWQeKGmHBrtSX3UEz7v50QkQkr4jvk151JFi7fqJZLazOTbP59g0WSEggTXfauVw/S14S1Ir2+MA/glAYZXzFneKyM9fKiVbxMFrpJNwyurM5ODE72iDKJ5ejkYZLe9EpTRYbW/1p5/1yOK+s49cqGP2w=
on_success: change
on_failure: always
11 changes: 10 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,17 @@ RUN apt-get update \
&& DEBIAN_FRONTEND=noninteractive apt-get install -y \
git \
zlib1g-dev \
librdkafka-dev \
unzip \
python \
&& ( \
cd /tmp \
&& mkdir librdkafka \
&& cd librdkafka \
&& git clone https://github.com/edenhill/librdkafka.git . \
&& ./configure \
&& make \
&& make install \
) \
&& rm -r /var/lib/apt/lists/*

# PHP Extensions
Expand Down
26 changes: 13 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,24 @@ A Kafka adapter for the [php-pubsub](https://github.com/Superbalist/php-pubsub)
## Usage

```php
// use this topic config for both the producer and consumer
$topicConfig = new \RdKafka\TopicConf();
$topicConfig->set('auto.offset.reset', 'smallest');
$topicConfig->set('auto.commit.interval.ms', 300);
// create consumer
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');
$conf = new \RdKafka\Conf();
$conf->set('group.id', 'php-pubsub');
$conf->set('metadata.broker.list', '127.0.0.1');
$conf->set('enable.auto.commit', 'false');
$conf->set('offset.store.method', 'broker');
$conf->setDefaultTopicConf($topicConf);
$consumer = new \RdKafka\KafkaConsumer($conf);
// create producer
$producer = new \RdKafka\Producer();
$producer->addBrokers('127.0.0.1');
// create consumer
// see https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.examples-high-level-consumer.html
$config = new \RdKafka\Conf();
$config->set('group.id', 'php-pubsub');
$consumer = new \RdKafka\Consumer($config);
$consumer->addBrokers('127.0.0.1');
$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer, $topicConfig);
$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer);
// consume messages
// note: this is a blocking call
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ services:
- ./src:/opt/php-pubsub/src
- ./examples:/opt/php-pubsub/examples
kafka:
image: spotify/kafka
image: flozano/kafka
environment:
- ADVERTISED_HOST=HOSTIP
- ADVERTISED_PORT=9092
Expand Down
26 changes: 13 additions & 13 deletions examples/KafkaConsumerExample.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,24 @@

include __DIR__ . '/../vendor/autoload.php';

// use this topic config for both the producer and consumer
$topicConfig = new \RdKafka\TopicConf();
$topicConfig->set('auto.offset.reset', 'smallest');
$topicConfig->set('auto.commit.interval.ms', 300);
// create consumer
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$conf = new \RdKafka\Conf();
$conf->set('group.id', 'php-pubsub');
$conf->set('metadata.broker.list', 'kafka');
$conf->set('enable.auto.commit', 'false');
$conf->set('offset.store.method', 'broker');
$conf->setDefaultTopicConf($topicConf);

$consumer = new \RdKafka\KafkaConsumer($conf);

// create producer
$producer = new \RdKafka\Producer();
$producer->addBrokers('kafka');

// create consumer
// see https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.examples-high-level-consumer.html
$config = new \RdKafka\Conf();
$config->set('group.id', 'php-pubsub');

$consumer = new \RdKafka\Consumer($config);
$consumer->addBrokers('kafka');

$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer, $topicConfig);
$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer);

$adapter->subscribe('my_channel', function ($message) {
var_dump($message);
Expand Down
26 changes: 13 additions & 13 deletions examples/KafkaPublishExample.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,24 @@

include __DIR__ . '/../vendor/autoload.php';

// use this topic config for both the producer and consumer
$topicConfig = new \RdKafka\TopicConf();
$topicConfig->set('auto.offset.reset', 'smallest');
$topicConfig->set('auto.commit.interval.ms', 300);
// create consumer
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$conf = new \RdKafka\Conf();
$conf->set('group.id', 'php-pubsub');
$conf->set('metadata.broker.list', 'kafka');
$conf->set('enable.auto.commit', 'false');
$conf->set('offset.store.method', 'broker');
$conf->setDefaultTopicConf($topicConf);

$consumer = new \RdKafka\KafkaConsumer($conf);

// create producer
$producer = new \RdKafka\Producer();
$producer->addBrokers('kafka');

// create consumer
// see https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka.examples-high-level-consumer.html
$config = new \RdKafka\Conf();
$config->set('group.id', 'php-pubsub');

$consumer = new \RdKafka\Consumer($config);
$consumer->addBrokers('kafka');

$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer, $topicConfig);
$adapter = new \Superbalist\PubSub\Kafka\KafkaPubSubAdapter($producer, $consumer);

$adapter->publish('my_channel', 'HELLO WORLD');
$adapter->publish('my_channel', json_encode(['hello' => 'world']));
Expand Down
75 changes: 12 additions & 63 deletions src/KafkaPubSubAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,18 @@ class KafkaPubSubAdapter implements PubSubAdapterInterface
protected $producer;

/**
* @var \RdKafka\Consumer
* @var \RdKafka\KafkaConsumer
*/
protected $consumer;

/**
* @var \RdKafka\TopicConf
*/
protected $topicConfig;

/**
* @var mixed
*/
protected $consumerOffset;

/**
* @param \RdKafka\Producer $producer
* @param \RdKafka\Consumer $consumer
* @param \RdKafka\TopicConf $topicConfig
* @param mixed $consumerOffset The offset at which to start consumption
* (RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED)
* @param \RdKafka\KafkaConsumer $consumer
*/
public function __construct(
\RdKafka\Producer $producer,
\RdKafka\Consumer $consumer,
\RdKafka\TopicConf $topicConfig,
$consumerOffset = RD_KAFKA_OFFSET_END
) {
public function __construct(\RdKafka\Producer $producer, \RdKafka\KafkaConsumer $consumer)
{
$this->producer = $producer;
$this->consumer = $consumer;
$this->topicConfig = $topicConfig;
$this->consumerOffset = $consumerOffset;
}

/**
Expand All @@ -59,45 +40,13 @@ public function getProducer()
/**
* Return the Kafka consumer.
*
* @return \RdKafka\Consumer
* @return \RdKafka\KafkaConsumer
*/
public function getConsumer()
{
return $this->consumer;
}

/**
* Return the Kafka TopicConfig.
*
* @return \RdKafka\TopicConf
*/
public function getTopicConfig()
{
return $this->topicConfig;
}

/**
* Return the Kafka consumer offset at which `subscribe()` calls begin consumption.
*
* @return mixed
*/
public function getConsumerOffset()
{
return $this->consumerOffset;
}

/**
* Set the Kafka consumer offset at which `subscribe()` calls begin consumption.
*
* This can be one of `RD_KAFKA_OFFSET_BEGINNING`, `RD_KAFKA_OFFSET_END` or `RD_KAFKA_OFFSET_STORED`
*
* @param mixed $consumerOffset
*/
public function setConsumerOffset($consumerOffset)
{
$this->consumerOffset = $consumerOffset;
}

/**
* Subscribe a handler to a channel.
*
Expand All @@ -107,14 +56,12 @@ public function setConsumerOffset($consumerOffset)
*/
public function subscribe($channel, callable $handler)
{
$topic = $this->consumer->newTopic($channel, $this->topicConfig);

$topic->consumeStart(0, $this->consumerOffset);
$this->consumer->subscribe([$channel]);

$isSubscriptionLoopActive = true;

while ($isSubscriptionLoopActive) {
$message = $topic->consume(0, 1000);
$message = $this->consumer->consume(300);

if ($message === null) {
continue;
Expand All @@ -126,10 +73,12 @@ public function subscribe($channel, callable $handler)

if ($payload === 'unsubscribe') {
$isSubscriptionLoopActive = false;
break;
} else {
call_user_func($handler, $payload);
}

call_user_func($handler, $payload);
$this->consumer->commitAsync($message);

break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
case RD_KAFKA_RESP_ERR__TIMED_OUT:
Expand All @@ -148,7 +97,7 @@ public function subscribe($channel, callable $handler)
*/
public function publish($channel, $message)
{
$topic = $this->producer->newTopic($channel, $this->topicConfig);
$topic = $this->producer->newTopic($channel);
$topic->produce(RD_KAFKA_PARTITION_UA, 0, Utils::serializeMessage($message));
}
}
Loading

0 comments on commit f91d8e9

Please sign in to comment.