Skip to content

Commit

Permalink
Refactor the RabbitMQClient hierarchy to make the worker and publishe…
Browse files Browse the repository at this point in the history
…r subclasses of it.

Push up the publisher's logic to reconnect to RabbitMQClient
  • Loading branch information
jvanecek committed Jul 27, 2024
1 parent d75c19a commit 4169541
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 193 deletions.
110 changes: 48 additions & 62 deletions source/Ansible-RabbitMQ/RabbitMQClient.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ Class {
#superclass : 'Object',
#instVars : [
'connection',
'builder'
'builder',
'channel',
'options'
],
#category : 'Ansible-RabbitMQ',
#package : 'Ansible-RabbitMQ'
Expand All @@ -23,15 +25,21 @@ RabbitMQClient class >> isAbstract [
]

{ #category : 'private - configuring' }
RabbitMQClient >> configureConnection: builder [
RabbitMQClient >> connectivityErrors [

self subclassResponsibility
^ NetworkError , ConnectionClosed , AmqpDisconnectedError
]

{ #category : 'private - configuring' }
RabbitMQClient >> connectivityErrors [
{ #category : 'private' }
RabbitMQClient >> ensureChannelOpen [

^ NetworkError , ConnectionClosed , AmqpDisconnectedError
| reconnect |

reconnect := [
self ensureConnectedAndOpen.
channel := connection createChannel
].
channel ifNil: reconnect ifNotNil: [ channel whenOpenDo: [ ] whenClosedDo: reconnect ]
]

{ #category : 'private' }
Expand All @@ -57,96 +65,74 @@ RabbitMQClient >> ensureConnectedAndOpen [
]

{ #category : 'initialization' }
RabbitMQClient >> initializeConnection [

builder := AmqpConnectionBuilder usingAMQP091Protocol.
self configureConnection: builder.
connection := builder build
RabbitMQClient >> initializeConfiguredBy: anOptionsDictionary [

options := anOptionsDictionary.
builder := AmqpConnectionBuilder usingAMQP091Protocol.
builder hostname: ( options at: #hostname ).
builder portNumber: ( options at: #port ifAbsent: [ 5672 ] ).
builder username: ( options at: #username ifAbsent: [ 'guest' ] ).
builder password: ( options at: #password ifAbsent: [ 'guest' ] )
]

{ #category : 'private - connecting' }
RabbitMQClient >> logFailedConnectionAttempt: attemptNumber dueTo: error [

LogRecord emitError: ( 'Attempt #<1p>/<2p> to connect to RabbitMQ failed: <3s>'
expandMacrosWith: attemptNumber
with: self retryCount + 1
with: self retryCount
with: error messageText )
]

{ #category : 'private - connecting' }
RabbitMQClient >> openConnection [

self withSuccessfulConnectionDo: [ :succesfulConnection |
LogRecord emitInfo: 'Connecting to RabbitMQ' during: [
self
try: [ succesfulConnection open ]
onConnectivityErrorDo: [ :attemptNumber :error |
self logFailedConnectionAttempt: attemptNumber dueTo: error ]
]
]
]

{ #category : 'private - configuring' }
RabbitMQClient >> options [

^ Dictionary new
^ options
]

{ #category : 'private - configuring' }
RabbitMQClient >> retryCount [

^ self options at: #retryCount ifAbsent: [ 2 ]
^ self options at: #maximumConnectionAttemps ifAbsent: [ 3 ]
]

{ #category : 'controlling' }
RabbitMQClient >> start [
{ #category : 'private - testing' }
RabbitMQClient >> shouldLogDebuggingInfo [

self openConnection.
connection
whenConnected: [
LogRecord emitInfo: 'Connected to RabbitMQ'.
self startProcessing
]
whenNot: [ :error |
LogRecord emitError:
( 'Cannot connect to RabbitMQ, <1s>' expandMacrosWith: error messageText ).
AmqpDisconnectedError signal: error messageText
]
^options at: #enableDebuggingLogs ifAbsent: [false]
]

{ #category : 'private' }
RabbitMQClient >> startProcessing [
{ #category : 'controlling' }
RabbitMQClient >> start [

self subclassResponsibility
self ensureChannelOpen
]

{ #category : 'controlling' }
RabbitMQClient >> stop [

connection
whenConnected: [ connection close ]
whenNot: [ LogRecord emitWarning: 'RabbitMQ connection was already closed.' ]
ifNil: []
ifNotNil: [
connection
whenConnected: [connection close]
whenNot: [LogRecord emitWarning: 'RabbitMQ connection was already closed.']]
]

{ #category : 'private - connecting' }
RabbitMQClient >> try: aBlock onConnectivityErrorDo: failBlock [
{ #category : 'private - accessing' }
RabbitMQClient >> timeframeBetweenAttempts [

Retry value: aBlock configuredBy: [ :retry |
retry
upTo: self retryCount;
on: self connectivityErrors evaluating: failBlock.
self options at: #retry ifPresent: [ :action | action value: retry ]
]
^Duration milliSeconds: (options at: #timeSlotBetweenConnectionRetriesInMs ifAbsent: [300])
]

{ #category : 'private - connecting' }
RabbitMQClient >> withSuccessfulConnectionDo: aBlock [

self
try: [ connection whenConnected: [ aBlock value: connection ] whenNot: [ :error | error signal ] ]
onConnectivityErrorDo: [ :attemptNumber :error |
self logFailedConnectionAttempt: attemptNumber dueTo: error.
LogRecord emitWarning: 'Reconnecting socket to RabbitMQ'.
connection initializeSocketConnection
]
RabbitMQClient >> try: aBlock onConnectivityErrorDo: failBlock [

Retry value: aBlock configuredBy: [ :retry |
retry
upTo: self retryCount;
backoffExponentiallyWithTimeSlot: self timeframeBetweenAttempts;
on: self connectivityErrors evaluating: failBlock.
self options at: #retry ifPresent: [ :action | action value: retry ]
]
]
109 changes: 1 addition & 108 deletions source/Ansible-RabbitMQ/RabbitMQPublisher.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,7 @@ as the producers and consumers can operate independently.
"
Class {
#name : 'RabbitMQPublisher',
#superclass : 'Object',
#instVars : [
'options',
'builder',
'channel',
'connection'
],
#superclass : 'RabbitMQClient',
#category : 'Ansible-RabbitMQ',
#package : 'Ansible-RabbitMQ'
}
Expand All @@ -35,66 +29,6 @@ RabbitMQPublisher >> channel [
^ channel
]

{ #category : 'private - accessing' }
RabbitMQPublisher >> connectivityErrors [

^ NetworkError , ConnectionClosed , AmqpDisconnectedError
]

{ #category : 'private - connecting' }
RabbitMQPublisher >> ensureChannelOpen [

| reconnect |

reconnect := [
self ensureConnectedAndOpen.
channel := connection createChannel
].
channel ifNil: reconnect ifNotNil: [ channel whenOpenDo: [ ] whenClosedDo: reconnect ]
]

{ #category : 'private - connecting' }
RabbitMQPublisher >> ensureConnectedAndOpen [

| createConnection |

createConnection := [
self
try: [
connection := builder build.
connection open]
onConnectivityErrorDo: [:attemptNumber :error |
self logFailedConnectionAttempt: attemptNumber dueTo: error]].

connection
ifNil: createConnection
ifNotNil: [connection whenConnected: [] whenNot: createConnection].

connection
whenOpen: []
whenNot: [connection closeReason ifNil: [connection open] ifNotNil: createConnection]
]

{ #category : 'initialization' }
RabbitMQPublisher >> initializeConfiguredBy: anOptionsDictionary [

options := anOptionsDictionary.
builder := AmqpConnectionBuilder usingAMQP091Protocol.
builder hostname: (options at: #hostname).
builder portNumber: (options at: #port ifAbsent: [5672]).
builder username: (options at: #username ifAbsent: ['guest']).
builder password: (options at: #password ifAbsent: ['guest'])
]

{ #category : 'private - logging' }
RabbitMQPublisher >> logFailedConnectionAttempt: anAttemptNumber dueTo: anError [

LogRecord emitError: ( 'Attempt #<1p>/<2p> to connect to RabbitMQ failed: <3s>'
expandMacrosWith: anAttemptNumber
with: self retryCount
with: anError messageText )
]

{ #category : 'publishing' }
RabbitMQPublisher >> publish: aMessageCollection onQueueNamed: aQueueName [

Expand Down Expand Up @@ -122,49 +56,8 @@ RabbitMQPublisher >> publishOnly: aMessage onQueueNamed: aQueueName [
properties: (connection protocolClass basicPropertiesClass new deliveryMode: 2)
]

{ #category : 'private - accessing' }
RabbitMQPublisher >> retryCount [

^options at: #maximumConnectionAttemps ifAbsent: [3]
]

{ #category : 'private - testing' }
RabbitMQPublisher >> shouldLogDebuggingInfo [

^options at: #enableDebuggingLogs ifAbsent: [false]
]

{ #category : 'connecting' }
RabbitMQPublisher >> start [

self ensureChannelOpen
]

{ #category : 'connecting' }
RabbitMQPublisher >> stop [

connection
ifNil: []
ifNotNil: [
connection
whenConnected: [connection close]
whenNot: [LogRecord emitWarning: 'RabbitMQ connection was already closed.']]
]

{ #category : 'private - accessing' }
RabbitMQPublisher >> timeframeBetweenAttempts [

^Duration milliSeconds: (options at: #timeSlotBetweenConnectionRetriesInMs ifAbsent: [300])
]

{ #category : 'private - connecting' }
RabbitMQPublisher >> try: aBlock onConnectivityErrorDo: failBlock [

Retry
value: aBlock
configuredBy: [:retry |
retry
upTo: self retryCount;
backoffExponentiallyWithTimeSlot: self timeframeBetweenAttempts;
on: self connectivityErrors evaluating: failBlock]
]
49 changes: 26 additions & 23 deletions source/Ansible-RabbitMQ/RabbitMQWorker.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ task during a short HTTP request window.
Class {
#name : 'RabbitMQWorker',
#superclass : 'RabbitMQClient',
#instVars : [
'channel'
],
#category : 'Ansible-RabbitMQ',
#package : 'Ansible-RabbitMQ'
}
Expand All @@ -29,29 +26,16 @@ RabbitMQWorker class >> isAbstract [
]

{ #category : 'private' }
RabbitMQWorker >> configureChannel [

channel queueDeclare: self queueName durable: true.
channel prefetchCount: 1.
channel consumeFrom: self queueName applying: [ :message |
self process: message body.
channel basicAck: message method deliveryTag
]
RabbitMQWorker >> declareQueueInChannel [

channel queueDeclare: self queueName durable: true
]

{ #category : 'private' }
RabbitMQWorker >> ensureChannelOpen [

| reconnect |

reconnect := [
self ensureConnectedAndOpen.
channel := connection createChannel
].

channel ifNil: reconnect ifNotNil: [ channel whenOpenDo: [ ] whenClosedDo: reconnect ].

self configureChannel
super ensureChannelOpen.
self declareQueueInChannel
]

{ #category : 'private' }
Expand All @@ -75,15 +59,34 @@ RabbitMQWorker >> queueName [
^ self subclassResponsibility
]

{ #category : 'controlling' }
RabbitMQWorker >> start [

super start.
self startProcessing
]

{ #category : 'private' }
RabbitMQWorker >> startConsumingFromQueue [

self ensureChannelOpen.

channel prefetchCount: 1.
channel consumeFrom: self queueName applying: [ :message |
self process: message body.
channel basicAck: message method deliveryTag
]
]

{ #category : 'private' }
RabbitMQWorker >> startProcessing [

self ensureChannelOpen.
self startConsumingFromQueue.

[[ connection waitForEvent ]
on: self connectivityErrors
do: [ :error |
self
logDisconnectionDueTo: error;
ensureChannelOpen]] repeat
startConsumingFromQueue]] repeat
]

0 comments on commit 4169541

Please sign in to comment.