From d3b2188c9712b6de74b96b8c4652d9579f09105c Mon Sep 17 00:00:00 2001 From: Agustin Salvidio Date: Mon, 5 Aug 2024 20:56:51 -0300 Subject: [PATCH] :wrench: Add extra client properties to AmqpConnection --- .../AmqpConnection.class.st | 64 ++++++++++++------- .../AmqpConnectionBuilder.class.st | 13 +++- .../Ansible-RabbitMQ/RabbitMQClient.class.st | 20 ++++-- .../RabbitMQPublisher.class.st | 26 ++++++-- 4 files changed, 84 insertions(+), 39 deletions(-) diff --git a/source/Ansible-Protocol-Core/AmqpConnection.class.st b/source/Ansible-Protocol-Core/AmqpConnection.class.st index 6e6dfc9b..761c6781 100644 --- a/source/Ansible-Protocol-Core/AmqpConnection.class.st +++ b/source/Ansible-Protocol-Core/AmqpConnection.class.st @@ -15,21 +15,28 @@ Class { 'heartbeatSender', 'hostname', 'socketConnectionStatus', - 'portNumber' + 'portNumber', + 'extraClientProperties' ], #category : 'Ansible-Protocol-Core', #package : 'Ansible-Protocol-Core' } { #category : 'instance creation' } -AmqpConnection class >> to: aHostname over: aPort using: aProtocolVersion with: connectionCredentials parameterizedBy: connectionParameters [ +AmqpConnection class >> to: aHostname + over: aPort + using: aProtocolVersion + with: aConnectionCredentialCollection + parameterizedBy: aConnectionParameterCollection + extraProperties: aClientPropertyCollection [ ^ self new initializeTo: aHostname over: aPort using: aProtocolVersion - with: connectionCredentials - parameterizedBy: connectionParameters + with: aConnectionCredentialCollection + parameterizedBy: aConnectionParameterCollection + extraProperties: aClientPropertyCollection ] { #category : 'connection-handling' } @@ -187,15 +194,23 @@ AmqpConnection >> initializeSocketConnection [ ] { #category : 'initialization' } -AmqpConnection >> initializeTo: aHostname over: aPort using: aProtocolVersion with: connectionCredentials parameterizedBy: connectionParameters [ +AmqpConnection >> initializeTo: aHostname + over: aPort + using: aProtocolVersion + with: aConnectionCredentialCollection + parameterizedBy: aConnectionParameterCollection + extraProperties: aClientPropertyCollection [ protocolClass := aProtocolVersion. hostname := aHostname. portNumber := aPort. - credentials := connectionCredentials. - parameters := connectionParameters. - self initializeSocketConnection. - self initializeHeartbeatSender + credentials := aConnectionCredentialCollection. + parameters := aConnectionParameterCollection. + extraClientProperties := aClientPropertyCollection. + + self + initializeSocketConnection; + initializeHeartbeatSender ] { #category : 'private-opening' } @@ -439,20 +454,23 @@ AmqpConnection >> socketConnectionStatus [ AmqpConnection >> startConnection [ self withNextFrameDo: [ :nextFrame | - | response | - - response := credentials responseFor: nextFrame method. - response ifNil: [ - AmqpDisconnectedError signal: 'No acceptable SASL mechanism for the given credentials' ]. - self - sendMethod: ( self protocolClass connectionStartOkMethod new - clientProperties: ( Dictionary new - at: 'product' put: 'RabbitMQ Smalltalk'; - yourself ); - mechanism: response key; - response: response value ) - onChannel: 0. - credentials := nil + | response clientProperties | + + response := credentials responseFor: nextFrame method. + response ifNil: [ + AmqpDisconnectedError signal: 'No acceptable SASL mechanism for the given credentials' ]. + clientProperties := Dictionary new + at: 'product' put: 'RabbitMQ Smalltalk'; + yourself. + extraClientProperties keysAndValuesDo: [ :key :value | clientProperties at: key put: value ]. + + self + sendMethod: ( self protocolClass connectionStartOkMethod new + clientProperties: clientProperties; + mechanism: response key; + response: response value ) + onChannel: 0. + credentials := nil ] ] diff --git a/source/Ansible-Protocol-Core/AmqpConnectionBuilder.class.st b/source/Ansible-Protocol-Core/AmqpConnectionBuilder.class.st index 8cbf420e..86878a49 100644 --- a/source/Ansible-Protocol-Core/AmqpConnectionBuilder.class.st +++ b/source/Ansible-Protocol-Core/AmqpConnectionBuilder.class.st @@ -7,7 +7,8 @@ Class { 'username', 'password', 'portNumber', - 'protocolClass' + 'protocolClass', + 'clientProperties' ], #category : 'Ansible-Protocol-Core', #package : 'Ansible-Protocol-Core' @@ -19,10 +20,16 @@ AmqpConnectionBuilder class >> forProtocol: aProtocolClass [ ^self new initializeForProtocol: aProtocolClass ] +{ #category : 'initialization' } +AmqpConnectionBuilder >> atClientProperty: aName put: aValue [ + + clientProperties at: aName put: aValue +] + { #category : 'building' } AmqpConnectionBuilder >> build [ - protocolClass ifNil: [ Error signal: 'Protocol must be configured' ]. + protocolClass ifNil: [ Error signal: 'Protocol must be configured' ]. ^ AmqpConnection to: hostname @@ -30,6 +37,7 @@ AmqpConnectionBuilder >> build [ using: protocolClass with: self credentials parameterizedBy: parameters + extraProperties: clientProperties ] { #category : 'private-accessing' } @@ -60,6 +68,7 @@ AmqpConnectionBuilder >> initialize [ username: 'guest'; password: 'guest'; hostname: 'localhost'. + clientProperties := Dictionary new. parameters := AmqpConnectionParameters new channelMax: 0; frameMax: 131072; diff --git a/source/Ansible-RabbitMQ/RabbitMQClient.class.st b/source/Ansible-RabbitMQ/RabbitMQClient.class.st index c9a2db26..3563cafb 100644 --- a/source/Ansible-RabbitMQ/RabbitMQClient.class.st +++ b/source/Ansible-RabbitMQ/RabbitMQClient.class.st @@ -38,13 +38,19 @@ RabbitMQClient >> connectivityErrors [ { #category : 'initialization' } RabbitMQClient >> createAMQPConnection [ - | builder | - builder := AmqpConnectionBuilder usingAMQP091Protocol. - builder hostname: ( options at: #hostname ifAbsent: [ 'localhost' ] ). - builder portNumber: ( options at: #port ifAbsent: [ 5672 ] ). - builder username: ( options at: #username ifAbsent: [ 'guest' ] ). - builder password: ( options at: #password ifAbsent: [ 'guest' ] ). - ^ builder build + | builder | + + builder := AmqpConnectionBuilder usingAMQP091Protocol. + builder hostname: ( options at: #hostname ifAbsent: [ 'localhost' ] ). + builder portNumber: ( options at: #port ifAbsent: [ 5672 ] ). + builder username: ( options at: #username ifAbsent: [ 'guest' ] ). + builder password: ( options at: #password ifAbsent: [ 'guest' ] ). + options at: 'extraClientProperties' ifPresent: [ :extraProperties | + extraProperties keysAndValuesDo: [ :propertyName :propertyValue | + builder atClientProperty: propertyName put: propertyValue ] + ]. + + ^ builder build ] { #category : 'private' } diff --git a/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st b/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st index 13d54727..b2f1a3d2 100644 --- a/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st +++ b/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st @@ -57,13 +57,17 @@ RabbitMQPublisher >> publish: aMessageCollection onQueueNamed: aQueueName [ { #category : 'publishing' } RabbitMQPublisher >> publishOnly: aMessage onQueueNamed: aQueueName [ - self ensureChannelOpen. + | tryToPublishMessage | - channel - basicPublish: aMessage utf8Encoded - exchange: '' - routingKey: aQueueName - properties: ( connection protocolClass basicPropertiesClass new deliveryMode: 2 ). + self ensureChannelOpen. + tryToPublishMessage := [ + channel + basicPublish: aMessage utf8Encoded + exchange: '' + routingKey: aQueueName + properties: + ( connection protocolClass basicPropertiesClass new deliveryMode: 2 ) + ]. self shouldLogDebuggingInfo then: [ LogRecord emitStructuredDebuggingInfo: 'RabbitMQ message published' with: [ :data | @@ -72,7 +76,15 @@ RabbitMQPublisher >> publishOnly: aMessage onQueueNamed: aQueueName [ at: #routingKey put: aQueueName; at: #connectionDescription put: connection connectionPairsDescription ] - ] + ]. + + tryToPublishMessage + on: self connectivityErrors + do: [ :signal | + connection hardCloseDescribedWith: signal messageText. + self ensureChannelOpen. + signal return: tryToPublishMessage value + ] ] { #category : 'connecting' }