diff --git a/CHANGELOG b/CHANGELOG index 5939d358..202b4c9e 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,6 @@ +- 2021-05-15 + * Add possibility to use multiple RabbitMQ hosts + - 2017-01-22 * Add `graceful_max_execution_timeout` diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index f42a9aff..a1c2ce20 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -72,6 +72,22 @@ protected function addConnections(ArrayNodeDefinition $node) ->scalarNode('user')->defaultValue('guest')->end() ->scalarNode('password')->defaultValue('guest')->end() ->scalarNode('vhost')->defaultValue('/')->end() + ->arrayNode('hosts') + ->info('connection_timeout, read_write_timeout, use_socket, ssl_context, keepalive, + heartbeat and connection_parameters_provider should be specified globally when + you are using multiple hosts') + ->canBeUnset() + ->prototype('array') + ->children() + ->scalarNode('url')->defaultValue('')->end() + ->scalarNode('host')->defaultValue('localhost')->end() + ->scalarNode('port')->defaultValue(5672)->end() + ->scalarNode('user')->defaultValue('guest')->end() + ->scalarNode('password')->defaultValue('guest')->end() + ->scalarNode('vhost')->defaultValue('/')->end() + ->end() + ->end() + ->end() ->booleanNode('lazy')->defaultFalse()->end() ->scalarNode('connection_timeout')->defaultValue(3)->end() ->scalarNode('read_write_timeout')->defaultValue(3)->end() diff --git a/README.md b/README.md index 9a2f310f..b06f068e 100644 --- a/README.md +++ b/README.md @@ -195,6 +195,37 @@ It's a good idea to set the ```read_write_timeout``` to 2x the heartbeat so your Please bear in mind, that you can expect problems, if your tasks are generally running longer than the heartbeat period, to which there are no good solutions ([link](https://github.com/php-amqplib/RabbitMqBundle/issues/301)). Consider using either a big value for the heartbeat or leave the heartbeat disabled in favour of the tcp's `keepalive` (both on the client and server side) and the `graceful_max_execution_timeout` feature. +### Multiple Hosts ### + +You can provide multiple hosts for a connection. This will allow you to use RabbitMQ cluster with multiple nodes. + +```yaml + old_sound_rabbit_mq: + connections: + default: + hosts: + - host: host1 + port: 3672 + user: user1 + password: password1 + vhost: vhost1 + - url: 'amqp://guest:password@localhost:5672/vhost' + connection_timeout: 3 + read_write_timeout: 3 +``` + +Pay attention that you can not specify +```yaml + connection_timeout + read_write_timeout + use_socket + ssl_context + keepalive + heartbeat + connection_parameters_provider +``` +parameters to each host separately. + ### Dynamic Connection Parameters ### Sometimes your connection information may need to be dynamic. Dynamic connection parameters allow you to supply or diff --git a/RabbitMq/AMQPConnectionFactory.php b/RabbitMq/AMQPConnectionFactory.php index 3a2b5439..16a14542 100644 --- a/RabbitMq/AMQPConnectionFactory.php +++ b/RabbitMq/AMQPConnectionFactory.php @@ -4,6 +4,7 @@ use OldSound\RabbitMqBundle\Provider\ConnectionParametersProviderInterface; use PhpAmqpLib\Connection\AbstractConnection; +use PhpAmqpLib\Connection\AMQPSocketConnection; use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException; class AMQPConnectionFactory @@ -24,6 +25,7 @@ class AMQPConnectionFactory 'ssl_context' => null, 'keepalive' => false, 'heartbeat' => 0, + 'hosts' => [] ); /** @@ -43,6 +45,15 @@ public function __construct( $this->class = $class; $this->parameters = array_merge($this->parameters, $parameters); $this->parameters = $this->parseUrl($this->parameters); + + foreach ($this->parameters['hosts'] as $key => $hostParameters) { + if (!isset($hostParameters['url'])) { + continue; + } + + $this->parameters['hosts'][$key] = $this->parseUrl($hostParameters); + } + if (is_array($this->parameters['ssl_context'])) { $this->parameters['ssl_context'] = ! empty($this->parameters['ssl_context']) ? stream_context_create(array('ssl' => $this->parameters['ssl_context'])) @@ -57,50 +68,26 @@ public function __construct( * Creates the appropriate connection using current parameters. * * @return AbstractConnection + * @throws \Exception */ public function createConnection() { - $ref = new \ReflectionClass($this->class); - if (isset($this->parameters['constructor_args']) && is_array($this->parameters['constructor_args'])) { - return $ref->newInstanceArgs($this->parameters['constructor_args']); + $constructorArgs = array_values($this->parameters['constructor_args']); + return new $this->class(...$constructorArgs); } - if ($this->class == 'PhpAmqpLib\Connection\AMQPSocketConnection' || is_subclass_of($this->class, 'PhpAmqpLib\Connection\AMQPSocketConnection')) { - return $ref->newInstanceArgs([ - $this->parameters['host'], - $this->parameters['port'], - $this->parameters['user'], - $this->parameters['password'], - $this->parameters['vhost'], - false, // insist - 'AMQPLAIN', // login_method - null, // login_response - 'en_US', // locale - isset($this->parameters['read_timeout']) ? $this->parameters['read_timeout'] : $this->parameters['read_write_timeout'], - $this->parameters['keepalive'], - isset($this->parameters['write_timeout']) ? $this->parameters['write_timeout'] : $this->parameters['read_write_timeout'], - $this->parameters['heartbeat'] - ] - ); - } else { - return $ref->newInstanceArgs([ - $this->parameters['host'], - $this->parameters['port'], - $this->parameters['user'], - $this->parameters['password'], - $this->parameters['vhost'], - false, // insist - 'AMQPLAIN', // login_method - null, // login_response - 'en_US', // locale - $this->parameters['connection_timeout'], - $this->parameters['read_write_timeout'], - $this->parameters['ssl_context'], - $this->parameters['keepalive'], - $this->parameters['heartbeat'] - ]); + $hosts = $this->parameters['hosts'] ?: [$this->parameters]; + $options = $this->parameters; + unset($options['hosts']); + + if ($this->class == AMQPSocketConnection::class || is_subclass_of($this->class, AMQPSocketConnection::class)) { + $options['read_timeout'] = $options['read_timeout'] ?? $this->parameters['read_write_timeout']; + $options['write_timeout'] = $options['write_timeout'] ?? $this->parameters['read_write_timeout']; } + + // No need to unpack options, they will be handled inside connection classes + return $this->class::create_connection($hosts, $options); } /** diff --git a/Tests/DependencyInjection/Fixtures/test.yml b/Tests/DependencyInjection/Fixtures/test.yml index adef3b89..0c0e6119 100644 --- a/Tests/DependencyInjection/Fixtures/test.yml +++ b/Tests/DependencyInjection/Fixtures/test.yml @@ -45,6 +45,15 @@ old_sound_rabbit_mq: lazy: true use_socket: true + cluster_connection: + hosts: + - host: cluster_host + port: 111 + user: cluster_user + password: cluster_password + vhost: /cluster + - url: amqp://cluster_url_host:cluster_url_pass@host:10000/cluster_url_vhost + default: default2: foo_default: diff --git a/Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php b/Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php index a2500dc9..e6f151bc 100644 --- a/Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php +++ b/Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php @@ -36,6 +36,7 @@ public function testFooConnectionDefinition() 'heartbeat' => 0, 'use_socket' => false, 'url' => '', + 'hosts' => [], ), $factory->getArgument(1)); $this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass()); } @@ -65,6 +66,7 @@ public function testSslConnectionDefinition() 'heartbeat' => 0, 'use_socket' => false, 'url' => '', + 'hosts' => [], ), $factory->getArgument(1)); $this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass()); } @@ -92,6 +94,7 @@ public function testLazyConnectionDefinition() 'heartbeat' => 0, 'use_socket' => false, 'url' => '', + 'hosts' => [], ), $factory->getArgument(1)); $this->assertEquals('%old_sound_rabbit_mq.lazy.connection.class%', $definition->getClass()); } @@ -119,6 +122,7 @@ public function testDefaultConnectionDefinition() 'heartbeat' => 0, 'use_socket' => false, 'url' => '', + 'hosts' => [], ), $factory->getArgument(1)); $this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass()); } @@ -141,6 +145,51 @@ public function testLazySocketConnectionDefinition() $this->assertEquals('%old_sound_rabbit_mq.lazy.socket_connection.class%', $definiton->getClass()); } + public function testClusterConnectionDefinition() + { + $container = $this->getContainer('test.yml'); + + $this->assertTrue($container->has('old_sound_rabbit_mq.connection.cluster_connection')); + $definition = $container->getDefinition('old_sound_rabbit_mq.connection.cluster_connection'); + $this->assertTrue($container->has('old_sound_rabbit_mq.connection_factory.cluster_connection')); + $factory = $container->getDefinition('old_sound_rabbit_mq.connection_factory.cluster_connection'); + $this->assertEquals(['old_sound_rabbit_mq.connection_factory.cluster_connection', 'createConnection'], $definition->getFactory()); + $this->assertEquals([ + 'hosts' => [ + [ + 'host' => 'cluster_host', + 'port' => 111, + 'user' => 'cluster_user', + 'password' => 'cluster_password', + 'vhost' => '/cluster', + 'url' => '' + ], + [ + 'host' => 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + 'url' => 'amqp://cluster_url_host:cluster_url_pass@host:10000/cluster_url_vhost' + ] + ], + 'host' => 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + 'lazy' => false, + 'connection_timeout' => 3, + 'read_write_timeout' => 3, + 'ssl_context' => array(), + 'keepalive' => false, + 'heartbeat' => 0, + 'use_socket' => false, + 'url' => '', + ], $factory->getArgument(1)); + $this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass()); + } + public function testFooBinding() { $container = $this->getContainer('test.yml'); diff --git a/Tests/RabbitMq/AMQPConnectionFactoryTest.php b/Tests/RabbitMq/AMQPConnectionFactoryTest.php index d82d9c31..3e3f724e 100644 --- a/Tests/RabbitMq/AMQPConnectionFactoryTest.php +++ b/Tests/RabbitMq/AMQPConnectionFactoryTest.php @@ -6,6 +6,7 @@ use OldSound\RabbitMqBundle\RabbitMq\AMQPConnectionFactory; use OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection; use OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPSocketConnection; +use OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPSSLConnection; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; @@ -14,14 +15,14 @@ class AMQPConnectionFactoryTest extends TestCase public function testDefaultValues() { $factory = new AMQPConnectionFactory( - 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', - array() + AMQPConnection::class, + [] ); /** @var AMQPConnection $instance */ $instance = $factory->createConnection(); - $this->assertInstanceOf('OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', $instance); - $this->assertEquals(array( + $this->assertInstanceOf(AMQPConnection::class, $instance); + $this->assertEquals([ 'localhost', // host 5672, // port 'guest', // user @@ -36,20 +37,20 @@ public function testDefaultValues() null, // context false, // keepalive 0, // heartbeat - ), $instance->constructParams); + ], $instance->constructParams); } public function testSocketConnection() { $factory = new AMQPConnectionFactory( - 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPSocketConnection', - array() + AMQPSocketConnection::class, + [] ); /** @var AMQPSocketConnection $instance */ $instance = $factory->createConnection(); - $this->assertInstanceOf('PhpAmqpLib\Connection\AMQPSocketConnection', $instance); - $this->assertEquals(array( + $this->assertInstanceOf(AMQPSocketConnection::class, $instance); + $this->assertEquals([ 'localhost', // host 5672, // port 'guest', // user @@ -63,23 +64,23 @@ public function testSocketConnection() false, // keepalive 3, // write_timeout 0, // heartbeat - ), $instance->constructParams); + ], $instance->constructParams); } public function testSocketConnectionWithParams() { $factory = new AMQPConnectionFactory( - 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPSocketConnection', - array( + AMQPSocketConnection::class, + [ 'read_timeout' => 31, 'write_timeout' => 32, - ) + ] ); /** @var AMQPSocketConnection $instance */ $instance = $factory->createConnection(); - $this->assertInstanceOf('PhpAmqpLib\Connection\AMQPSocketConnection', $instance); - $this->assertEquals(array( + $this->assertInstanceOf(AMQPSocketConnection::class, $instance); + $this->assertEquals([ 'localhost', // host 5672, // port 'guest', // user @@ -93,26 +94,26 @@ public function testSocketConnectionWithParams() false, // keepalive 32, // write_timeout 0, // heartbeat - ), $instance->constructParams); + ], $instance->constructParams); } public function testStandardConnectionParameters() { $factory = new AMQPConnectionFactory( - 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', - array( + AMQPConnection::class, + [ 'host' => 'foo_host', 'port' => 123, 'user' => 'foo_user', 'password' => 'foo_password', 'vhost' => '/vhost', - ) + ] ); /** @var AMQPConnection $instance */ $instance = $factory->createConnection(); - $this->assertInstanceOf('OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', $instance); - $this->assertEquals(array( + $this->assertInstanceOf(AMQPConnection::class, $instance); + $this->assertEquals([ 'foo_host', // host 123, // port 'foo_user', // user @@ -127,27 +128,27 @@ public function testStandardConnectionParameters() null, // context false, // keepalive 0, // heartbeat - ), $instance->constructParams); + ], $instance->constructParams); } public function testSetConnectionParametersWithUrl() { $factory = new AMQPConnectionFactory( - 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', - array( + AMQPConnection::class, + [ 'url' => 'amqp://bar_user:bar_password@bar_host:321/whost?keepalive=1&connection_timeout=6&read_write_timeout=6', 'host' => 'foo_host', 'port' => 123, 'user' => 'foo_user', 'password' => 'foo_password', 'vhost' => '/vhost', - ) + ] ); /** @var AMQPConnection $instance */ $instance = $factory->createConnection(); - $this->assertInstanceOf('OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', $instance); - $this->assertEquals(array( + $this->assertInstanceOf(AMQPConnection::class, $instance); + $this->assertEquals([ 'bar_host', // host 321, // port 'bar_user', // user @@ -162,22 +163,22 @@ public function testSetConnectionParametersWithUrl() null, // context true, // keepalive 0, // heartbeat - ), $instance->constructParams); + ], $instance->constructParams); } public function testSetConnectionParametersWithUrlEncoded() { $factory = new AMQPConnectionFactory( - 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', - array( + AMQPConnection::class, + [ 'url' => 'amqp://user%61:%61pass@ho%61st:10000/v%2fhost?keepalive=1&connection_timeout=6&read_write_timeout=6', - ) + ] ); /** @var AMQPConnection $instance */ $instance = $factory->createConnection(); - $this->assertInstanceOf('OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', $instance); - $this->assertEquals(array( + $this->assertInstanceOf(AMQPConnection::class, $instance); + $this->assertEquals([ 'hoast', // host 10000, // port 'usera', // user @@ -192,22 +193,22 @@ public function testSetConnectionParametersWithUrlEncoded() null, // context true, // keepalive 0, // heartbeat - ), $instance->constructParams); + ], $instance->constructParams); } public function testSetConnectionParametersWithUrlWithoutVhost() { $factory = new AMQPConnectionFactory( - 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', - array( + AMQPConnection::class, + [ 'url' => 'amqp://user:pass@host:321/?keepalive=1&connection_timeout=6&read_write_timeout=6', - ) + ] ); /** @var AMQPConnection $instance */ $instance = $factory->createConnection(); - $this->assertInstanceOf('OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', $instance); - $this->assertEquals(array( + $this->assertInstanceOf(AMQPConnection::class, $instance); + $this->assertEquals([ 'host', // host 321, // port 'user', // user @@ -222,52 +223,390 @@ public function testSetConnectionParametersWithUrlWithoutVhost() null, // context true, // keepalive 0, // heartbeat - ), $instance->constructParams); + ], $instance->constructParams); } public function testSSLConnectionParameters() { $factory = new AMQPConnectionFactory( - 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', - array( + AMQPSSLConnection::class, + [ 'host' => 'ssl_host', 'port' => 123, 'user' => 'ssl_user', 'password' => 'ssl_password', 'vhost' => '/ssl', - 'ssl_context' => array( + 'ssl_context' => [ 'verify_peer' => false, - ), - ) + ], + ] ); - /** @var AMQPConnection $instance */ + /** @var AMQPSSLConnection $instance */ $instance = $factory->createConnection(); - $this->assertInstanceOf('OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', $instance); - $this->assertArrayHasKey(11, $instance->constructParams); - $context = $instance->constructParams[11]; + $this->assertInstanceOf(AMQPSSLConnection::class, $instance); + $this->assertArrayHasKey(6, $instance->constructParams); + $options = $instance->constructParams[6]; + $this->assertArrayHasKey('ssl_context', $options); + $context = $options['ssl_context']; // unset to check whole array at once later - $instance->constructParams[11] = null; + $instance->constructParams[6]['ssl_context'] = null; $this->assertIsResource($context); $this->assertEquals('stream-context', get_resource_type($context)); $options = stream_context_get_options($context); - $this->assertEquals(array('ssl' => array('verify_peer' => false)), $options); - $this->assertEquals(array( - 'ssl_host', // host - 123, // port - 'ssl_user', // user + $this->assertEquals(['ssl' => ['verify_peer' => false]], $options); + $this->assertEquals([ + 'ssl_host', // host + 123 , // port + 'ssl_user', // user 'ssl_password', // password - '/ssl', // vhost + '/ssl', // vhost, + [], // ssl_options + [ // options + 'url' => '', + 'host' => 'ssl_host', + 'port' => 123, + 'user' => 'ssl_user', + 'password' => 'ssl_password', + 'vhost' => '/ssl', + 'connection_timeout' => 3, + 'read_write_timeout' => 3, + 'ssl_context' => null, // context checked earlier + 'keepalive' => false, + 'heartbeat' => 0, + ] + ], $instance->constructParams); + } + + public function testClusterConnectionParametersWithoutRootConnectionKeys() + { + $factory = new AMQPConnectionFactory( + AMQPConnection::class, + [ + 'hosts' => [ + [ + 'host' => 'cluster_host', + 'port' => 123, + 'user' => 'cluster_user', + 'password' => 'cluster_password', + 'vhost' => '/cluster_vhost', + ], + [ + 'url' => 'amqp://user:pass@host:321/vhost', + ] + ] + ] + ); + + /** @var AMQPConnection $instance */ + $instance = $factory->createConnection(); + $this->assertInstanceOf(AMQPConnection::class, $instance); + $this->assertEquals([ + 'cluster_host', // host + 123, // port + 'cluster_user', // user + 'cluster_password', // password + '/cluster_vhost', // vhost + false, // insist + "AMQPLAIN", // login method + null, // login response + "en_US", // locale + 3, // connection timeout + 3, // read write timeout + null, // context + false, // keepalive + 0, // heartbeat + ], $instance->constructParams); + + $this->assertEquals( + [ + [ + [ + 'host' => 'cluster_host', + 'port' => 123, + 'user' => 'cluster_user', + 'password' => 'cluster_password', + 'vhost' => '/cluster_vhost', + ], + [ + 'host' => 'host', + 'port' => 321, + 'user' => 'user', + 'password' => 'pass', + 'vhost' => 'vhost', + ] + ], + [ + 'url' => '', + 'host' => 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + 'connection_timeout' => 3, + 'read_write_timeout' => 3, + 'ssl_context' => null, + 'keepalive' => false, + 'heartbeat' => 0 + ] + ], + $instance::$createConnectionParams + ); + } + + public function testClusterConnectionParametersWithRootConnectionKeys() + { + $factory = new AMQPConnectionFactory( + AMQPConnection::class, + [ + 'host' => 'host', + 'port' => 123, + 'user' => 'user', + 'password' => 'password', + 'vhost' => '/vhost', + 'hosts' => [ + [ + 'host' => 'cluster_host', + 'port' => 123, + 'user' => 'cluster_user', + 'password' => 'cluster_password', + 'vhost' => '/vhost', + ], + ] + ] + ); + + /** @var AMQPConnection $instance */ + $instance = $factory->createConnection(); + $this->assertInstanceOf(AMQPConnection::class, $instance); + $this->assertEquals([ + 'cluster_host', // host + 123, // port + 'cluster_user', // user + 'cluster_password', // password + '/vhost', // vhost false, // insist "AMQPLAIN", // login method null, // login response "en_US", // locale 3, // connection timeout 3, // read write timeout - null, // context checked earlier + null, // context false, // keepalive 0, // heartbeat - ), $instance->constructParams); + ], $instance->constructParams); + + $this->assertEquals( + [ + [ + [ + 'host' => 'cluster_host', + 'port' => 123, + 'user' => 'cluster_user', + 'password' => 'cluster_password', + 'vhost' => '/vhost', + ] + ], + [ + 'url' => '', + 'host' => 'host', + 'port' => 123, + 'user' => 'user', + 'password' => 'password', + 'vhost' => '/vhost', + 'connection_timeout' => 3, + 'read_write_timeout' => 3, + 'ssl_context' => null, + 'keepalive' => false, + 'heartbeat' => 0 + ] + ], + $instance::$createConnectionParams + ); + } + + public function testSSLClusterConnectionParameters() + { + $factory = new AMQPConnectionFactory( + AMQPSSLConnection::class, + [ + 'hosts' => [ + [ + 'host' => 'ssl_cluster_host', + 'port' => 123, + 'user' => 'ssl_cluster_user', + 'password' => 'ssl_cluster_password', + 'vhost' => '/ssl_cluster_vhost', + ], + [ + 'url' => 'amqp://user:pass@host:321/vhost', + ] + ], + 'ssl_context' => [ + 'verify_peer' => false, + ], + ] + ); + + /** @var AMQPSSLConnection $instance */ + $instance = $factory->createConnection(); + $this->assertInstanceOf(AMQPSSLConnection::class, $instance); + + $this->assertArrayHasKey(6, $instance->constructParams); + $options = $instance->constructParams[6]; + $this->assertArrayHasKey('ssl_context', $options); + $context = $options['ssl_context']; + // unset to check whole array at once later + $instance->constructParams[6]['ssl_context'] = null; + $this->assertIsResource($context); + $this->assertEquals('stream-context', get_resource_type($context)); + $options = stream_context_get_options($context); + $this->assertEquals(['ssl' => ['verify_peer' => false]], $options); + + $this->assertArrayHasKey(1, $instance::$createConnectionParams); + $createConnectionOptions = $instance::$createConnectionParams[1]; + $this->assertArrayHasKey('ssl_context', $createConnectionOptions); + $createConnectionContext = $createConnectionOptions['ssl_context']; + // unset to check whole array at once later + $instance::$createConnectionParams[1]['ssl_context'] = null; + $this->assertIsResource($createConnectionContext); + $this->assertEquals('stream-context', get_resource_type($createConnectionContext)); + $createConnectionOptions = stream_context_get_options($createConnectionContext); + $this->assertEquals(['ssl' => ['verify_peer' => false]], $createConnectionOptions); + + $this->assertEquals([ + 'ssl_cluster_host', // host + 123 , // port + 'ssl_cluster_user', // user + 'ssl_cluster_password', // password + '/ssl_cluster_vhost', // vhost, + [], // ssl_options + [ // options + 'url' => '', + 'host' => 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + 'connection_timeout' => 3, + 'read_write_timeout' => 3, + 'ssl_context' => null, + 'keepalive' => false, + 'heartbeat' => 0, + ] + ], $instance->constructParams); + + $this->assertEquals( + [ + [ + [ + 'host' => 'ssl_cluster_host', + 'port' => 123, + 'user' => 'ssl_cluster_user', + 'password' => 'ssl_cluster_password', + 'vhost' => '/ssl_cluster_vhost', + ], + [ + 'host' => 'host', + 'port' => 321, + 'user' => 'user', + 'password' => 'pass', + 'vhost' => 'vhost', + ] + ], + [ + 'url' => '', + 'host' => 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + 'connection_timeout' => 3, + 'read_write_timeout' => 3, + 'ssl_context' => null, // context checked earlier + 'keepalive' => false, + 'heartbeat' => 0 + ] + ], + $instance::$createConnectionParams + ); + } + + public function testSocketClusterConnectionParameters() + { + $factory = new AMQPConnectionFactory( + AMQPSocketConnection::class, + [ + 'hosts' => [ + [ + 'host' => 'cluster_host', + 'port' => 123, + 'user' => 'cluster_user', + 'password' => 'cluster_password', + 'vhost' => '/cluster_vhost', + ], + [ + 'url' => 'amqp://user:pass@host:321/vhost', + ] + ] + ] + ); + + /** @var AMQPSocketConnection $instance */ + $instance = $factory->createConnection(); + $this->assertInstanceOf(AMQPSocketConnection::class, $instance); + $this->assertEquals([ + 'cluster_host', // host + 123, // port + 'cluster_user', // user + 'cluster_password', // password + '/cluster_vhost', // vhost + false, // insist + "AMQPLAIN", // login method + null, // login response + "en_US", // locale + 3, // read_timeout + false, // keepalive + 3, // write_timeout + 0, // heartbeat + ], $instance->constructParams); + + $this->assertEquals( + [ + [ + [ + 'host' => 'cluster_host', + 'port' => 123, + 'user' => 'cluster_user', + 'password' => 'cluster_password', + 'vhost' => '/cluster_vhost', + ], + [ + 'host' => 'host', + 'port' => 321, + 'user' => 'user', + 'password' => 'pass', + 'vhost' => 'vhost', + ] + ], + [ + 'url' => '', + 'host' => 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + 'ssl_context' => null, + 'keepalive' => false, + 'heartbeat' => 0, + 'connection_timeout' => 3, + 'read_write_timeout' => 3, + 'read_timeout' => 3, + 'write_timeout' => 3 + ] + ], + $instance::$createConnectionParams + ); } public function testConnectionsParametersProviderWithConstructorArgs() @@ -276,20 +615,20 @@ public function testConnectionsParametersProviderWithConstructorArgs() $connectionParametersProvider->expects($this->once()) ->method('getConnectionParameters') ->will($this->returnValue( - array( - 'constructor_args' => array(1,2,3,4) - ) + [ + 'constructor_args' => [1,2,3,4] + ] )); $factory = new AMQPConnectionFactory( - 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', - array(), + AMQPConnection::class, + [], $connectionParametersProvider ); /** @var AMQPConnection $instance */ $instance = $factory->createConnection(); - $this->assertInstanceOf('OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', $instance); - $this->assertEquals(array(1,2,3,4), $instance->constructParams); + $this->assertInstanceOf(AMQPConnection::class, $instance); + $this->assertEquals([1,2,3,4], $instance->constructParams); } public function testConnectionsParametersProvider() @@ -298,24 +637,24 @@ public function testConnectionsParametersProvider() $connectionParametersProvider->expects($this->once()) ->method('getConnectionParameters') ->will($this->returnValue( - array( + [ 'host' => '1.2.3.4', 'port' => 5678, 'user' => 'admin', 'password' => 'admin', 'vhost' => 'foo', - ) + ] )); $factory = new AMQPConnectionFactory( - 'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', - array(), + AMQPConnection::class, + [], $connectionParametersProvider ); /** @var AMQPConnection $instance */ $instance = $factory->createConnection(); - $this->assertInstanceOf('OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', $instance); - $this->assertEquals(array( + $this->assertInstanceOf(AMQPConnection::class, $instance); + $this->assertEquals([ '1.2.3.4', // host 5678, // port 'admin', // user @@ -330,7 +669,7 @@ public function testConnectionsParametersProvider() null, // context false, // keepalive 0, // heartbeat - ), $instance->constructParams); + ], $instance->constructParams); } /** @@ -340,7 +679,7 @@ public function testConnectionsParametersProvider() */ private function prepareConnectionParametersProvider() { - return $this->getMockBuilder('OldSound\RabbitMqBundle\Provider\ConnectionParametersProviderInterface') + return $this->getMockBuilder(ConnectionParametersProviderInterface::class) ->getMock(); } } diff --git a/Tests/RabbitMq/Fixtures/AMQPConnection.php b/Tests/RabbitMq/Fixtures/AMQPConnection.php index 073dea48..8be87581 100644 --- a/Tests/RabbitMq/Fixtures/AMQPConnection.php +++ b/Tests/RabbitMq/Fixtures/AMQPConnection.php @@ -2,13 +2,25 @@ namespace OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures; -class AMQPConnection +use PhpAmqpLib\Connection\AMQPStreamConnection; + +class AMQPConnection extends AMQPStreamConnection { public $constructParams; + public static $createConnectionParams; + public function __construct() { // save params for direct access in tests $this->constructParams = func_get_args(); } + + public static function create_connection($hosts, $options = array()) + { + // save params for direct access in tests + self::$createConnectionParams = func_get_args(); + + return parent::create_connection($hosts, $options); + } } diff --git a/Tests/RabbitMq/Fixtures/AMQPSSLConnection.php b/Tests/RabbitMq/Fixtures/AMQPSSLConnection.php new file mode 100644 index 00000000..1825fce2 --- /dev/null +++ b/Tests/RabbitMq/Fixtures/AMQPSSLConnection.php @@ -0,0 +1,26 @@ +constructParams = func_get_args(); + } + + public static function create_connection($hosts, $options = array()) + { + // save params for direct access in tests + self::$createConnectionParams = func_get_args(); + + return parent::create_connection($hosts, $options); + } +} diff --git a/Tests/RabbitMq/Fixtures/AMQPSocketConnection.php b/Tests/RabbitMq/Fixtures/AMQPSocketConnection.php index 48241bff..c56cffe8 100644 --- a/Tests/RabbitMq/Fixtures/AMQPSocketConnection.php +++ b/Tests/RabbitMq/Fixtures/AMQPSocketConnection.php @@ -8,9 +8,19 @@ class AMQPSocketConnection extends BaseAMQPSocketConnection { public $constructParams; + public static $createConnectionParams; + public function __construct() { // save params for direct access in tests $this->constructParams = func_get_args(); } + + public static function create_connection($hosts, $options = array()) + { + // save params for direct access in tests + self::$createConnectionParams = func_get_args(); + + return parent::create_connection($hosts, $options); + } } diff --git a/Tests/RabbitMq/RpcClientTest.php b/Tests/RabbitMq/RpcClientTest.php index 2428b610..0052b24a 100644 --- a/Tests/RabbitMq/RpcClientTest.php +++ b/Tests/RabbitMq/RpcClientTest.php @@ -2,6 +2,7 @@ namespace OldSound\RabbitMqBundle\Tests\RabbitMq; +use InvalidArgumentException; use OldSound\RabbitMqBundle\RabbitMq\RpcClient; use PhpAmqpLib\Exception\AMQPTimeoutException; use PhpAmqpLib\Message\AMQPMessage; @@ -64,7 +65,7 @@ public function testInvalidParameterOnNotify() ->disableOriginalConstructor() ->getMock(); - $this->expectException('\InvalidArgumentException'); + $this->expectException(InvalidArgumentException::class); $client->notify('not a callable'); } @@ -84,7 +85,7 @@ public function testChannelCancelOnGetRepliesException() ->method('wait') ->willThrowException(new AMQPTimeoutException()); - $this->expectException('\PhpAmqpLib\Exception\AMQPTimeoutException'); + $this->expectException(AMQPTimeoutException::class); $channel->expects($this->once()) ->method('basic_cancel');