diff --git a/source/Ansible-RabbitMQ-Tests/RabbitMQTextReverser.class.st b/source/Ansible-RabbitMQ-Tests/RabbitMQTextReverser.class.st index 1e814075..c015fab5 100644 --- a/source/Ansible-RabbitMQ-Tests/RabbitMQTextReverser.class.st +++ b/source/Ansible-RabbitMQ-Tests/RabbitMQTextReverser.class.st @@ -20,17 +20,13 @@ RabbitMQTextReverser >> channel [ ^ channel ] -{ #category : 'private' } -RabbitMQTextReverser >> configureConnection: builder [ - - -] - { #category : 'initialization' } RabbitMQTextReverser >> initializeWorkingWith: aTestCase [ - testCase := aTestCase. - self initializeConnection + testCase := aTestCase. + self initializeConfiguredBy: ( Dictionary new + at: #hostname put: 'localhost'; + yourself ) ] { #category : 'private' } diff --git a/source/Ansible-RabbitMQ/RabbitMQClient.class.st b/source/Ansible-RabbitMQ/RabbitMQClient.class.st index 1b484f08..24ba42ed 100644 --- a/source/Ansible-RabbitMQ/RabbitMQClient.class.st +++ b/source/Ansible-RabbitMQ/RabbitMQClient.class.st @@ -9,7 +9,9 @@ Class { #superclass : 'Object', #instVars : [ 'connection', - 'builder' + 'builder', + 'channel', + 'options' ], #category : 'Ansible-RabbitMQ', #package : 'Ansible-RabbitMQ' @@ -23,15 +25,21 @@ RabbitMQClient class >> isAbstract [ ] { #category : 'private - configuring' } -RabbitMQClient >> configureConnection: builder [ +RabbitMQClient >> connectivityErrors [ - self subclassResponsibility + ^ NetworkError , ConnectionClosed , AmqpDisconnectedError ] -{ #category : 'private - configuring' } -RabbitMQClient >> connectivityErrors [ +{ #category : 'private' } +RabbitMQClient >> ensureChannelOpen [ - ^ NetworkError , ConnectionClosed , AmqpDisconnectedError + | reconnect | + + reconnect := [ + self ensureConnectedAndOpen. + channel := connection createChannel + ]. + channel ifNil: reconnect ifNotNil: [ channel whenOpenDo: [ ] whenClosedDo: reconnect ] ] { #category : 'private' } @@ -57,11 +65,14 @@ RabbitMQClient >> ensureConnectedAndOpen [ ] { #category : 'initialization' } -RabbitMQClient >> initializeConnection [ - - builder := AmqpConnectionBuilder usingAMQP091Protocol. - self configureConnection: builder. - connection := builder build +RabbitMQClient >> initializeConfiguredBy: anOptionsDictionary [ + + options := anOptionsDictionary. + builder := AmqpConnectionBuilder usingAMQP091Protocol. + builder hostname: ( options at: #hostname ). + builder portNumber: ( options at: #port ifAbsent: [ 5672 ] ). + builder username: ( options at: #username ifAbsent: [ 'guest' ] ). + builder password: ( options at: #password ifAbsent: [ 'guest' ] ) ] { #category : 'private - connecting' } @@ -69,84 +80,53 @@ RabbitMQClient >> logFailedConnectionAttempt: attemptNumber dueTo: error [ LogRecord emitError: ( 'Attempt #<1p>/<2p> to connect to RabbitMQ failed: <3s>' expandMacrosWith: attemptNumber - with: self retryCount + 1 + with: self retryCount with: error messageText ) ] -{ #category : 'private - connecting' } -RabbitMQClient >> openConnection [ - - self withSuccessfulConnectionDo: [ :succesfulConnection | - LogRecord emitInfo: 'Connecting to RabbitMQ' during: [ - self - try: [ succesfulConnection open ] - onConnectivityErrorDo: [ :attemptNumber :error | - self logFailedConnectionAttempt: attemptNumber dueTo: error ] - ] - ] -] - { #category : 'private - configuring' } -RabbitMQClient >> options [ +RabbitMQClient >> retryCount [ - ^ Dictionary new + ^ options at: #maximumConnectionAttemps ifAbsent: [ 3 ] ] -{ #category : 'private - configuring' } -RabbitMQClient >> retryCount [ +{ #category : 'private - testing' } +RabbitMQClient >> shouldLogDebuggingInfo [ - ^ self options at: #retryCount ifAbsent: [ 2 ] + ^ options at: #enableDebuggingLogs ifAbsent: [ false ] ] { #category : 'controlling' } RabbitMQClient >> start [ - self openConnection. - connection - whenConnected: [ - LogRecord emitInfo: 'Connected to RabbitMQ'. - self startProcessing - ] - whenNot: [ :error | - LogRecord emitError: - ( 'Cannot connect to RabbitMQ, <1s>' expandMacrosWith: error messageText ). - AmqpDisconnectedError signal: error messageText - ] -] - -{ #category : 'private' } -RabbitMQClient >> startProcessing [ - - self subclassResponsibility + self ensureChannelOpen ] { #category : 'controlling' } RabbitMQClient >> stop [ connection - whenConnected: [ connection close ] - whenNot: [ LogRecord emitWarning: 'RabbitMQ connection was already closed.' ] + ifNil: [] + ifNotNil: [ + connection + whenConnected: [connection close] + whenNot: [LogRecord emitWarning: 'RabbitMQ connection was already closed.']] ] -{ #category : 'private - connecting' } -RabbitMQClient >> try: aBlock onConnectivityErrorDo: failBlock [ +{ #category : 'private - accessing' } +RabbitMQClient >> timeframeBetweenAttempts [ - Retry value: aBlock configuredBy: [ :retry | - retry - upTo: self retryCount; - on: self connectivityErrors evaluating: failBlock. - self options at: #retry ifPresent: [ :action | action value: retry ] - ] + ^ Duration milliSeconds: ( options at: #timeSlotBetweenConnectionRetriesInMs ifAbsent: [ 300 ] ) ] { #category : 'private - connecting' } -RabbitMQClient >> withSuccessfulConnectionDo: aBlock [ - - self - try: [ connection whenConnected: [ aBlock value: connection ] whenNot: [ :error | error signal ] ] - onConnectivityErrorDo: [ :attemptNumber :error | - self logFailedConnectionAttempt: attemptNumber dueTo: error. - LogRecord emitWarning: 'Reconnecting socket to RabbitMQ'. - connection initializeSocketConnection - ] +RabbitMQClient >> try: aBlock onConnectivityErrorDo: failBlock [ + + Retry value: aBlock configuredBy: [ :retry | + retry + upTo: self retryCount; + backoffExponentiallyWithTimeSlot: self timeframeBetweenAttempts; + on: self connectivityErrors evaluating: failBlock. + options at: #retry ifPresent: [ :action | action value: retry ] + ] ] diff --git a/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st b/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st index 2c01d3ca..a48980f2 100644 --- a/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st +++ b/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st @@ -8,13 +8,7 @@ as the producers and consumers can operate independently. " Class { #name : 'RabbitMQPublisher', - #superclass : 'Object', - #instVars : [ - 'options', - 'builder', - 'channel', - 'connection' - ], + #superclass : 'RabbitMQClient', #category : 'Ansible-RabbitMQ', #package : 'Ansible-RabbitMQ' } @@ -29,72 +23,6 @@ RabbitMQPublisher class >> configuredBy: aConfigurationAction [ ^ self new initializeConfiguredBy: options ] -{ #category : 'private - accessing' } -RabbitMQPublisher >> channel [ - - ^ channel -] - -{ #category : 'private - accessing' } -RabbitMQPublisher >> connectivityErrors [ - - ^ NetworkError , ConnectionClosed , AmqpDisconnectedError -] - -{ #category : 'private - connecting' } -RabbitMQPublisher >> ensureChannelOpen [ - - | reconnect | - - reconnect := [ - self ensureConnectedAndOpen. - channel := connection createChannel - ]. - channel ifNil: reconnect ifNotNil: [ channel whenOpenDo: [ ] whenClosedDo: reconnect ] -] - -{ #category : 'private - connecting' } -RabbitMQPublisher >> ensureConnectedAndOpen [ - - | createConnection | - - createConnection := [ - self - try: [ - connection := builder build. - connection open] - onConnectivityErrorDo: [:attemptNumber :error | - self logFailedConnectionAttempt: attemptNumber dueTo: error]]. - - connection - ifNil: createConnection - ifNotNil: [connection whenConnected: [] whenNot: createConnection]. - - connection - whenOpen: [] - whenNot: [connection closeReason ifNil: [connection open] ifNotNil: createConnection] -] - -{ #category : 'initialization' } -RabbitMQPublisher >> initializeConfiguredBy: anOptionsDictionary [ - - options := anOptionsDictionary. - builder := AmqpConnectionBuilder usingAMQP091Protocol. - builder hostname: (options at: #hostname). - builder portNumber: (options at: #port ifAbsent: [5672]). - builder username: (options at: #username ifAbsent: ['guest']). - builder password: (options at: #password ifAbsent: ['guest']) -] - -{ #category : 'private - logging' } -RabbitMQPublisher >> logFailedConnectionAttempt: anAttemptNumber dueTo: anError [ - - LogRecord emitError: ( 'Attempt #<1p>/<2p> to connect to RabbitMQ failed: <3s>' - expandMacrosWith: anAttemptNumber - with: self retryCount - with: anError messageText ) -] - { #category : 'publishing' } RabbitMQPublisher >> publish: aMessageCollection onQueueNamed: aQueueName [ @@ -122,49 +50,8 @@ RabbitMQPublisher >> publishOnly: aMessage onQueueNamed: aQueueName [ properties: (connection protocolClass basicPropertiesClass new deliveryMode: 2) ] -{ #category : 'private - accessing' } -RabbitMQPublisher >> retryCount [ - - ^options at: #maximumConnectionAttemps ifAbsent: [3] -] - -{ #category : 'private - testing' } -RabbitMQPublisher >> shouldLogDebuggingInfo [ - - ^options at: #enableDebuggingLogs ifAbsent: [false] -] - { #category : 'connecting' } RabbitMQPublisher >> start [ self ensureChannelOpen ] - -{ #category : 'connecting' } -RabbitMQPublisher >> stop [ - - connection - ifNil: [] - ifNotNil: [ - connection - whenConnected: [connection close] - whenNot: [LogRecord emitWarning: 'RabbitMQ connection was already closed.']] -] - -{ #category : 'private - accessing' } -RabbitMQPublisher >> timeframeBetweenAttempts [ - - ^Duration milliSeconds: (options at: #timeSlotBetweenConnectionRetriesInMs ifAbsent: [300]) -] - -{ #category : 'private - connecting' } -RabbitMQPublisher >> try: aBlock onConnectivityErrorDo: failBlock [ - - Retry - value: aBlock - configuredBy: [:retry | - retry - upTo: self retryCount; - backoffExponentiallyWithTimeSlot: self timeframeBetweenAttempts; - on: self connectivityErrors evaluating: failBlock] -] diff --git a/source/Ansible-RabbitMQ/RabbitMQWorker.class.st b/source/Ansible-RabbitMQ/RabbitMQWorker.class.st index 881037af..9a06bb66 100644 --- a/source/Ansible-RabbitMQ/RabbitMQWorker.class.st +++ b/source/Ansible-RabbitMQ/RabbitMQWorker.class.st @@ -14,9 +14,6 @@ task during a short HTTP request window. Class { #name : 'RabbitMQWorker', #superclass : 'RabbitMQClient', - #instVars : [ - 'channel' - ], #category : 'Ansible-RabbitMQ', #package : 'Ansible-RabbitMQ' } @@ -29,29 +26,16 @@ RabbitMQWorker class >> isAbstract [ ] { #category : 'private' } -RabbitMQWorker >> configureChannel [ - - channel queueDeclare: self queueName durable: true. - channel prefetchCount: 1. - channel consumeFrom: self queueName applying: [ :message | - self process: message body. - channel basicAck: message method deliveryTag - ] +RabbitMQWorker >> declareQueueInChannel [ + + channel queueDeclare: self queueName durable: true ] { #category : 'private' } RabbitMQWorker >> ensureChannelOpen [ - | reconnect | - - reconnect := [ - self ensureConnectedAndOpen. - channel := connection createChannel - ]. - - channel ifNil: reconnect ifNotNil: [ channel whenOpenDo: [ ] whenClosedDo: reconnect ]. - - self configureChannel + super ensureChannelOpen. + self declareQueueInChannel ] { #category : 'private' } @@ -75,15 +59,37 @@ RabbitMQWorker >> queueName [ ^ self subclassResponsibility ] +{ #category : 'controlling' } +RabbitMQWorker >> start [ + + super start. + self startProcessing +] + { #category : 'private' } -RabbitMQWorker >> startProcessing [ +RabbitMQWorker >> startConsumingFromQueue [ + + self ensureChannelOpen. + + channel prefetchCount: 1. + channel consumeFrom: self queueName applying: [ :message | + self process: message body. + channel basicAck: message method deliveryTag + ] +] - self ensureChannelOpen. +{ #category : 'private' } +RabbitMQWorker >> startProcessing [ - [[ connection waitForEvent ] - on: self connectivityErrors - do: [ :error | - self - logDisconnectionDueTo: error; - ensureChannelOpen]] repeat + self startConsumingFromQueue. + + [ + [ connection waitForEvent ] + on: self connectivityErrors + do: [ :error | + self + logDisconnectionDueTo: error; + startConsumingFromQueue + ] + ] repeat ]