Skip to content

Commit

Permalink
Rename publisher protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
jvanecek committed Sep 4, 2024
1 parent 0892135 commit ff411a1
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 31 deletions.
32 changes: 16 additions & 16 deletions source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ RabbitMQClientTest >> assertQueueStatusAfterPublishing: messagesToSend on: queue

queue := publisher channel declareQueueApplying: [ :builder | builder name: queueName ].

publisher publish: messagesToSend toQueue: queueName.
publisher publish: messagesToSend to: queueName.

self wait.

Expand Down Expand Up @@ -294,7 +294,7 @@ RabbitMQClientTest >> testDebuggingLogsEnabled [
self
runMemoryLoggerDuring: [
anotherPublisher start.
anotherPublisher publish: 'Hello!' toQueue: self queueName.
anotherPublisher publish: 'Hello!' to: self queueName.
anotherPublisher stop
]
assertingLogRecordsMatchRegexes:
Expand Down Expand Up @@ -330,7 +330,7 @@ RabbitMQClientTest >> testDebuggingLogsTurnedOff [
self
runMemoryLoggerDuring: [
anotherPublisher start.
anotherPublisher publish: 'Hello!' toQueue: self queueName.
anotherPublisher publish: 'Hello!' to: self queueName.
anotherPublisher stop
]
assertingLogRecordsMatchRegexes:
Expand All @@ -356,8 +356,8 @@ RabbitMQClientTest >> testPublishingMessageWhenClientUnexpectedlyClosesConnectio

self resumeWorkerDuring: [
publisher
publish: 'Hello' toQueue: self queueName;
publish: 'World' toQueue: self queueName.
publish: 'Hello' to: self queueName;
publish: 'World' to: self queueName.

self wait.

Expand All @@ -371,7 +371,7 @@ RabbitMQClientTest >> testPublishingMessageWhenClientUnexpectedlyClosesConnectio
self
closeAllConnectionsFromTheClientSide;
waitUntilAllRabbitMQConnectionsClose.
publisher publish: 'Test connection restored' toQueue: self queueName
publisher publish: 'Test connection restored' to: self queueName
]
assertingLogRecordsMatchRegexes:
{ '\[INFO\] AMQP connection localhost\:(\d+)->localhost\:5672 closed due to (Connection close while waiting for data.|primitive #primSocketSendDone\: in Socket failed|primitive #primSocketReceiveDataAvailable\: in Socket failed)' .
Expand All @@ -391,8 +391,8 @@ RabbitMQClientTest >> testPublishingMessageWhenConnectionIsTemporallyLost [

self resumeWorkerDuring: [
publisher
publish: 'Hello' toQueue: self queueName;
publish: 'World' toQueue: self queueName.
publish: 'Hello' to: self queueName;
publish: 'World' to: self queueName.

self wait.

Expand All @@ -406,7 +406,7 @@ RabbitMQClientTest >> testPublishingMessageWhenConnectionIsTemporallyLost [
self
closeAllConnectionsFromTheBrokerSide;
waitUntilAllRabbitMQConnectionsClose.
publisher publish: 'Test connection restored' toQueue: self queueName
publisher publish: 'Test connection restored' to: self queueName
]
assertingLogRecordsMatchRegexes:
{ '\[INFO\] AMQP connection localhost\:(\d+)->localhost\:5672 closed due to CONNECTION_FORCED - CloseConnectionsTest' .
Expand All @@ -425,7 +425,7 @@ RabbitMQClientTest >> testPublishingMessageWhenConnectionIsTemporallyLost [
{ #category : 'tests' }
RabbitMQClientTest >> testPublishingMessages [

self resumeWorkerDuring: [ publisher publishAll: #( 'Hello' 'World' ) toQueue: self queueName ].
self resumeWorkerDuring: [ publisher publishAll: #( 'Hello' 'World' ) to: self queueName ].

self
assert: reversedTexts size equals: 2;
Expand All @@ -436,7 +436,7 @@ RabbitMQClientTest >> testPublishingMessages [
{ #category : 'tests' }
RabbitMQClientTest >> testPublishingOneMessage [

self resumeWorkerDuring: [ publisher publish: 'Hello' toQueue: self queueName ].
self resumeWorkerDuring: [ publisher publish: 'Hello' to: self queueName ].

self
withTheOnlyOneIn: reversedTexts
Expand All @@ -446,25 +446,25 @@ RabbitMQClientTest >> testPublishingOneMessage [
{ #category : 'tests' }
RabbitMQClientTest >> testPublishingToFanoutExchange [

| routingKey firstWorkerMessages secondWorkerMessages |
routingKey := self queueName.
| route firstWorkerMessages secondWorkerMessages |
route := self queueName.
firstWorkerMessages := OrderedCollection new.
secondWorkerMessages := OrderedCollection new.

self
runWorkerNamed: 'Reverser'
consumingFrom: 'reverser-queue'
bindedTo: 'amq.fanout'
routedBy: routingKey
routedBy: route
doing: [ :message | firstWorkerMessages add: message utf8Decoded reversed ]
during: [
self
runWorkerNamed: 'Appender'
consumingFrom: 'appender-queue'
bindedTo: 'amq.fanout'
routedBy: routingKey
routedBy: route
doing: [ :message | secondWorkerMessages add: message utf8Decoded ]
during: [ publisher publish: 'Hello' toExchange: 'amq.fanout' usingRoutingKey: routingKey ]
during: [ publisher publish: 'Hello' to: route through: 'amq.fanout' ]
].

self
Expand Down
24 changes: 12 additions & 12 deletions source/Ansible-RabbitMQ/RabbitMQPublisher.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,23 @@ RabbitMQPublisher >> onPublicationConfirmationDo: anAckBlock onRejectionDo: aNac
]

{ #category : 'publishing' }
RabbitMQPublisher >> publish: aMessage toExchange: anExchangeName usingRoutingKey: aRoutingKey [
RabbitMQPublisher >> publish: aMessage to: aQueueName [

self publish: aMessage to: aQueueName through: self directExchange
]

{ #category : 'publishing' }
RabbitMQPublisher >> publish: aMessage to: aRoutingKey through: anExchangeName [

self
publish: aMessage
toExchange: anExchangeName
usingRoutingKey: aRoutingKey
to: aRoutingKey
through: anExchangeName
configuredWith: [ :properties | properties bePersistent ]
]

{ #category : 'publishing' }
RabbitMQPublisher >> publish: aMessage toExchange: anExchangeName usingRoutingKey: aRoutingKey configuredWith: aConfigurationBlock [
RabbitMQPublisher >> publish: aMessage to: aRoutingKey through: anExchangeName configuredWith: aConfigurationBlock [

| properties tryToPublishMessage |
properties := connection protocolClass basicPropertiesClass new.
Expand All @@ -105,15 +111,9 @@ RabbitMQPublisher >> publish: aMessage toExchange: anExchangeName usingRoutingKe
]

{ #category : 'publishing' }
RabbitMQPublisher >> publish: aMessage toQueue: aQueueName [

self publish: aMessage toExchange: self directExchange usingRoutingKey: aQueueName
]

{ #category : 'publishing' }
RabbitMQPublisher >> publishAll: aMessageCollection toQueue: aQueueName [
RabbitMQPublisher >> publishAll: aMessageCollection to: aQueueName [

aMessageCollection do: [:message | self publish: message toQueue: aQueueName]
aMessageCollection do: [:message | self publish: message to: aQueueName]
]

{ #category : 'connecting' }
Expand Down
6 changes: 3 additions & 3 deletions source/Ansible-Tests/AMQPTest.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ AMQPTest >> publish: aMessageCollection with: aHeadersDictionary onHeadersExchan
]

{ #category : 'tests - support' }
AMQPTest >> publishAll: aMessageCollection toQueue: aQueueName [
AMQPTest >> publishAll: aMessageCollection to: aQueueName [

self
withLocalhostConnectionDo: [ :connection |
Expand Down Expand Up @@ -311,7 +311,7 @@ AMQPTest >> testBasicConsume [

| channel queue |

self publishAll: #('Do it!') toQueue: 'tasks'.
self publishAll: #('Do it!') to: 'tasks'.

self
withLocalhostConnectionDo: [ :connection |
Expand Down Expand Up @@ -352,7 +352,7 @@ AMQPTest >> testBasicConsumeWithMultipleWorkers [

self
publishAll: #('Do it!' 'Do it.!' 'Do it..!' 'Do it...!' 'Do it....!' 'Do it.....!')
toQueue: 'tasks'.
to: 'tasks'.

firstWorker := self
spawnWorkerNamed: 'first_worker'
Expand Down

0 comments on commit ff411a1

Please sign in to comment.