Skip to content

Commit

Permalink
Merge pull request #637 from Torondor27/cluster_usage_possibility
Browse files Browse the repository at this point in the history
Cluster usage possibility fixes #637
  • Loading branch information
mihaileu authored May 18, 2021
2 parents ece95b9 + 8dc690e commit 9e5e0ad
Show file tree
Hide file tree
Showing 11 changed files with 596 additions and 113 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
- 2021-05-15
* Add possibility to use multiple RabbitMQ hosts

- 2017-01-22
* Add `graceful_max_execution_timeout`

Expand Down
16 changes: 16 additions & 0 deletions DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,22 @@ protected function addConnections(ArrayNodeDefinition $node)
->scalarNode('user')->defaultValue('guest')->end()
->scalarNode('password')->defaultValue('guest')->end()
->scalarNode('vhost')->defaultValue('/')->end()
->arrayNode('hosts')
->info('connection_timeout, read_write_timeout, use_socket, ssl_context, keepalive,
heartbeat and connection_parameters_provider should be specified globally when
you are using multiple hosts')
->canBeUnset()
->prototype('array')
->children()
->scalarNode('url')->defaultValue('')->end()
->scalarNode('host')->defaultValue('localhost')->end()
->scalarNode('port')->defaultValue(5672)->end()
->scalarNode('user')->defaultValue('guest')->end()
->scalarNode('password')->defaultValue('guest')->end()
->scalarNode('vhost')->defaultValue('/')->end()
->end()
->end()
->end()
->booleanNode('lazy')->defaultFalse()->end()
->scalarNode('connection_timeout')->defaultValue(3)->end()
->scalarNode('read_write_timeout')->defaultValue(3)->end()
Expand Down
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,37 @@ It's a good idea to set the ```read_write_timeout``` to 2x the heartbeat so your
Please bear in mind, that you can expect problems, if your tasks are generally running longer than the heartbeat period, to which there are no good solutions ([link](https://github.com/php-amqplib/RabbitMqBundle/issues/301)).
Consider using either a big value for the heartbeat or leave the heartbeat disabled in favour of the tcp's `keepalive` (both on the client and server side) and the `graceful_max_execution_timeout` feature.

### Multiple Hosts ###

You can provide multiple hosts for a connection. This will allow you to use RabbitMQ cluster with multiple nodes.

```yaml
old_sound_rabbit_mq:
connections:
default:
hosts:
- host: host1
port: 3672
user: user1
password: password1
vhost: vhost1
- url: 'amqp://guest:password@localhost:5672/vhost'
connection_timeout: 3
read_write_timeout: 3
```

Pay attention that you can not specify
```yaml
connection_timeout
read_write_timeout
use_socket
ssl_context
keepalive
heartbeat
connection_parameters_provider
```
parameters to each host separately.

### Dynamic Connection Parameters ###

Sometimes your connection information may need to be dynamic. Dynamic connection parameters allow you to supply or
Expand Down
61 changes: 24 additions & 37 deletions RabbitMq/AMQPConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use OldSound\RabbitMqBundle\Provider\ConnectionParametersProviderInterface;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Connection\AMQPSocketConnection;
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;

class AMQPConnectionFactory
Expand All @@ -24,6 +25,7 @@ class AMQPConnectionFactory
'ssl_context' => null,
'keepalive' => false,
'heartbeat' => 0,
'hosts' => []
);

/**
Expand All @@ -43,6 +45,15 @@ public function __construct(
$this->class = $class;
$this->parameters = array_merge($this->parameters, $parameters);
$this->parameters = $this->parseUrl($this->parameters);

foreach ($this->parameters['hosts'] as $key => $hostParameters) {
if (!isset($hostParameters['url'])) {
continue;
}

$this->parameters['hosts'][$key] = $this->parseUrl($hostParameters);
}

if (is_array($this->parameters['ssl_context'])) {
$this->parameters['ssl_context'] = ! empty($this->parameters['ssl_context'])
? stream_context_create(array('ssl' => $this->parameters['ssl_context']))
Expand All @@ -57,50 +68,26 @@ public function __construct(
* Creates the appropriate connection using current parameters.
*
* @return AbstractConnection
* @throws \Exception
*/
public function createConnection()
{
$ref = new \ReflectionClass($this->class);

if (isset($this->parameters['constructor_args']) && is_array($this->parameters['constructor_args'])) {
return $ref->newInstanceArgs($this->parameters['constructor_args']);
$constructorArgs = array_values($this->parameters['constructor_args']);
return new $this->class(...$constructorArgs);
}

if ($this->class == 'PhpAmqpLib\Connection\AMQPSocketConnection' || is_subclass_of($this->class, 'PhpAmqpLib\Connection\AMQPSocketConnection')) {
return $ref->newInstanceArgs([
$this->parameters['host'],
$this->parameters['port'],
$this->parameters['user'],
$this->parameters['password'],
$this->parameters['vhost'],
false, // insist
'AMQPLAIN', // login_method
null, // login_response
'en_US', // locale
isset($this->parameters['read_timeout']) ? $this->parameters['read_timeout'] : $this->parameters['read_write_timeout'],
$this->parameters['keepalive'],
isset($this->parameters['write_timeout']) ? $this->parameters['write_timeout'] : $this->parameters['read_write_timeout'],
$this->parameters['heartbeat']
]
);
} else {
return $ref->newInstanceArgs([
$this->parameters['host'],
$this->parameters['port'],
$this->parameters['user'],
$this->parameters['password'],
$this->parameters['vhost'],
false, // insist
'AMQPLAIN', // login_method
null, // login_response
'en_US', // locale
$this->parameters['connection_timeout'],
$this->parameters['read_write_timeout'],
$this->parameters['ssl_context'],
$this->parameters['keepalive'],
$this->parameters['heartbeat']
]);
$hosts = $this->parameters['hosts'] ?: [$this->parameters];
$options = $this->parameters;
unset($options['hosts']);

if ($this->class == AMQPSocketConnection::class || is_subclass_of($this->class, AMQPSocketConnection::class)) {
$options['read_timeout'] = $options['read_timeout'] ?? $this->parameters['read_write_timeout'];
$options['write_timeout'] = $options['write_timeout'] ?? $this->parameters['read_write_timeout'];
}

// No need to unpack options, they will be handled inside connection classes
return $this->class::create_connection($hosts, $options);
}

/**
Expand Down
9 changes: 9 additions & 0 deletions Tests/DependencyInjection/Fixtures/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ old_sound_rabbit_mq:
lazy: true
use_socket: true

cluster_connection:
hosts:
- host: cluster_host
port: 111
user: cluster_user
password: cluster_password
vhost: /cluster
- url: amqp://cluster_url_host:cluster_url_pass@host:10000/cluster_url_vhost

default:
default2:
foo_default:
Expand Down
49 changes: 49 additions & 0 deletions Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public function testFooConnectionDefinition()
'heartbeat' => 0,
'use_socket' => false,
'url' => '',
'hosts' => [],
), $factory->getArgument(1));
$this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass());
}
Expand Down Expand Up @@ -65,6 +66,7 @@ public function testSslConnectionDefinition()
'heartbeat' => 0,
'use_socket' => false,
'url' => '',
'hosts' => [],
), $factory->getArgument(1));
$this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass());
}
Expand Down Expand Up @@ -92,6 +94,7 @@ public function testLazyConnectionDefinition()
'heartbeat' => 0,
'use_socket' => false,
'url' => '',
'hosts' => [],
), $factory->getArgument(1));
$this->assertEquals('%old_sound_rabbit_mq.lazy.connection.class%', $definition->getClass());
}
Expand Down Expand Up @@ -119,6 +122,7 @@ public function testDefaultConnectionDefinition()
'heartbeat' => 0,
'use_socket' => false,
'url' => '',
'hosts' => [],
), $factory->getArgument(1));
$this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass());
}
Expand All @@ -141,6 +145,51 @@ public function testLazySocketConnectionDefinition()
$this->assertEquals('%old_sound_rabbit_mq.lazy.socket_connection.class%', $definiton->getClass());
}

public function testClusterConnectionDefinition()
{
$container = $this->getContainer('test.yml');

$this->assertTrue($container->has('old_sound_rabbit_mq.connection.cluster_connection'));
$definition = $container->getDefinition('old_sound_rabbit_mq.connection.cluster_connection');
$this->assertTrue($container->has('old_sound_rabbit_mq.connection_factory.cluster_connection'));
$factory = $container->getDefinition('old_sound_rabbit_mq.connection_factory.cluster_connection');
$this->assertEquals(['old_sound_rabbit_mq.connection_factory.cluster_connection', 'createConnection'], $definition->getFactory());
$this->assertEquals([
'hosts' => [
[
'host' => 'cluster_host',
'port' => 111,
'user' => 'cluster_user',
'password' => 'cluster_password',
'vhost' => '/cluster',
'url' => ''
],
[
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
'url' => 'amqp://cluster_url_host:cluster_url_pass@host:10000/cluster_url_vhost'
]
],
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
'lazy' => false,
'connection_timeout' => 3,
'read_write_timeout' => 3,
'ssl_context' => array(),
'keepalive' => false,
'heartbeat' => 0,
'use_socket' => false,
'url' => '',
], $factory->getArgument(1));
$this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass());
}

public function testFooBinding()
{
$container = $this->getContainer('test.yml');
Expand Down
Loading

0 comments on commit 9e5e0ad

Please sign in to comment.