Skip to content

Commit

Permalink
Refactor the RabbitMQClient hierarchy to make the worker and publishe…
Browse files Browse the repository at this point in the history
…r subclasses of it.

Push up the publisher's logic to reconnect to RabbitMQClient
  • Loading branch information
jvanecek committed Jul 27, 2024
1 parent 9cf586d commit f12e539
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 217 deletions.
12 changes: 4 additions & 8 deletions source/Ansible-RabbitMQ-Tests/RabbitMQTextReverser.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,13 @@ RabbitMQTextReverser >> channel [
^ channel
]

{ #category : 'private' }
RabbitMQTextReverser >> configureConnection: builder [


]

{ #category : 'initialization' }
RabbitMQTextReverser >> initializeWorkingWith: aTestCase [

testCase := aTestCase.
self initializeConnection
testCase := aTestCase.
self initializeConfiguredBy: ( Dictionary new
at: #hostname put: 'localhost';
yourself )
]

{ #category : 'private' }
Expand Down
112 changes: 46 additions & 66 deletions source/Ansible-RabbitMQ/RabbitMQClient.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ Class {
#superclass : 'Object',
#instVars : [
'connection',
'builder'
'builder',
'channel',
'options'
],
#category : 'Ansible-RabbitMQ',
#package : 'Ansible-RabbitMQ'
Expand All @@ -23,15 +25,21 @@ RabbitMQClient class >> isAbstract [
]

{ #category : 'private - configuring' }
RabbitMQClient >> configureConnection: builder [
RabbitMQClient >> connectivityErrors [

self subclassResponsibility
^ NetworkError , ConnectionClosed , AmqpDisconnectedError
]

{ #category : 'private - configuring' }
RabbitMQClient >> connectivityErrors [
{ #category : 'private' }
RabbitMQClient >> ensureChannelOpen [

^ NetworkError , ConnectionClosed , AmqpDisconnectedError
| reconnect |

reconnect := [
self ensureConnectedAndOpen.
channel := connection createChannel
].
channel ifNil: reconnect ifNotNil: [ channel whenOpenDo: [ ] whenClosedDo: reconnect ]
]

{ #category : 'private' }
Expand All @@ -57,96 +65,68 @@ RabbitMQClient >> ensureConnectedAndOpen [
]

{ #category : 'initialization' }
RabbitMQClient >> initializeConnection [

builder := AmqpConnectionBuilder usingAMQP091Protocol.
self configureConnection: builder.
connection := builder build
RabbitMQClient >> initializeConfiguredBy: anOptionsDictionary [

options := anOptionsDictionary.
builder := AmqpConnectionBuilder usingAMQP091Protocol.
builder hostname: ( options at: #hostname ).
builder portNumber: ( options at: #port ifAbsent: [ 5672 ] ).
builder username: ( options at: #username ifAbsent: [ 'guest' ] ).
builder password: ( options at: #password ifAbsent: [ 'guest' ] )
]

{ #category : 'private - connecting' }
RabbitMQClient >> logFailedConnectionAttempt: attemptNumber dueTo: error [

LogRecord emitError: ( 'Attempt #<1p>/<2p> to connect to RabbitMQ failed: <3s>'
expandMacrosWith: attemptNumber
with: self retryCount + 1
with: self retryCount
with: error messageText )
]

{ #category : 'private - connecting' }
RabbitMQClient >> openConnection [

self withSuccessfulConnectionDo: [ :succesfulConnection |
LogRecord emitInfo: 'Connecting to RabbitMQ' during: [
self
try: [ succesfulConnection open ]
onConnectivityErrorDo: [ :attemptNumber :error |
self logFailedConnectionAttempt: attemptNumber dueTo: error ]
]
]
]

{ #category : 'private - configuring' }
RabbitMQClient >> options [
RabbitMQClient >> retryCount [

^ Dictionary new
^ options at: #maximumConnectionAttemps ifAbsent: [ 3 ]
]

{ #category : 'private - configuring' }
RabbitMQClient >> retryCount [
{ #category : 'private - testing' }
RabbitMQClient >> shouldLogDebuggingInfo [

^ self options at: #retryCount ifAbsent: [ 2 ]
^ options at: #enableDebuggingLogs ifAbsent: [ false ]
]

{ #category : 'controlling' }
RabbitMQClient >> start [

self openConnection.
connection
whenConnected: [
LogRecord emitInfo: 'Connected to RabbitMQ'.
self startProcessing
]
whenNot: [ :error |
LogRecord emitError:
( 'Cannot connect to RabbitMQ, <1s>' expandMacrosWith: error messageText ).
AmqpDisconnectedError signal: error messageText
]
]

{ #category : 'private' }
RabbitMQClient >> startProcessing [

self subclassResponsibility
self ensureChannelOpen
]

{ #category : 'controlling' }
RabbitMQClient >> stop [

connection
whenConnected: [ connection close ]
whenNot: [ LogRecord emitWarning: 'RabbitMQ connection was already closed.' ]
ifNil: []
ifNotNil: [
connection
whenConnected: [connection close]
whenNot: [LogRecord emitWarning: 'RabbitMQ connection was already closed.']]
]

{ #category : 'private - connecting' }
RabbitMQClient >> try: aBlock onConnectivityErrorDo: failBlock [
{ #category : 'private - accessing' }
RabbitMQClient >> timeframeBetweenAttempts [

Retry value: aBlock configuredBy: [ :retry |
retry
upTo: self retryCount;
on: self connectivityErrors evaluating: failBlock.
self options at: #retry ifPresent: [ :action | action value: retry ]
]
^ Duration milliSeconds: ( options at: #timeSlotBetweenConnectionRetriesInMs ifAbsent: [ 300 ] )
]

{ #category : 'private - connecting' }
RabbitMQClient >> withSuccessfulConnectionDo: aBlock [

self
try: [ connection whenConnected: [ aBlock value: connection ] whenNot: [ :error | error signal ] ]
onConnectivityErrorDo: [ :attemptNumber :error |
self logFailedConnectionAttempt: attemptNumber dueTo: error.
LogRecord emitWarning: 'Reconnecting socket to RabbitMQ'.
connection initializeSocketConnection
]
RabbitMQClient >> try: aBlock onConnectivityErrorDo: failBlock [

Retry value: aBlock configuredBy: [ :retry |
retry
upTo: self retryCount;
backoffExponentiallyWithTimeSlot: self timeframeBetweenAttempts;
on: self connectivityErrors evaluating: failBlock.
options at: #retry ifPresent: [ :action | action value: retry ]
]
]
115 changes: 1 addition & 114 deletions source/Ansible-RabbitMQ/RabbitMQPublisher.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,7 @@ as the producers and consumers can operate independently.
"
Class {
#name : 'RabbitMQPublisher',
#superclass : 'Object',
#instVars : [
'options',
'builder',
'channel',
'connection'
],
#superclass : 'RabbitMQClient',
#category : 'Ansible-RabbitMQ',
#package : 'Ansible-RabbitMQ'
}
Expand All @@ -29,72 +23,6 @@ RabbitMQPublisher class >> configuredBy: aConfigurationAction [
^ self new initializeConfiguredBy: options
]

{ #category : 'private - accessing' }
RabbitMQPublisher >> channel [

^ channel
]

{ #category : 'private - accessing' }
RabbitMQPublisher >> connectivityErrors [

^ NetworkError , ConnectionClosed , AmqpDisconnectedError
]

{ #category : 'private - connecting' }
RabbitMQPublisher >> ensureChannelOpen [

| reconnect |

reconnect := [
self ensureConnectedAndOpen.
channel := connection createChannel
].
channel ifNil: reconnect ifNotNil: [ channel whenOpenDo: [ ] whenClosedDo: reconnect ]
]

{ #category : 'private - connecting' }
RabbitMQPublisher >> ensureConnectedAndOpen [

| createConnection |

createConnection := [
self
try: [
connection := builder build.
connection open]
onConnectivityErrorDo: [:attemptNumber :error |
self logFailedConnectionAttempt: attemptNumber dueTo: error]].

connection
ifNil: createConnection
ifNotNil: [connection whenConnected: [] whenNot: createConnection].

connection
whenOpen: []
whenNot: [connection closeReason ifNil: [connection open] ifNotNil: createConnection]
]

{ #category : 'initialization' }
RabbitMQPublisher >> initializeConfiguredBy: anOptionsDictionary [

options := anOptionsDictionary.
builder := AmqpConnectionBuilder usingAMQP091Protocol.
builder hostname: (options at: #hostname).
builder portNumber: (options at: #port ifAbsent: [5672]).
builder username: (options at: #username ifAbsent: ['guest']).
builder password: (options at: #password ifAbsent: ['guest'])
]

{ #category : 'private - logging' }
RabbitMQPublisher >> logFailedConnectionAttempt: anAttemptNumber dueTo: anError [

LogRecord emitError: ( 'Attempt #<1p>/<2p> to connect to RabbitMQ failed: <3s>'
expandMacrosWith: anAttemptNumber
with: self retryCount
with: anError messageText )
]

{ #category : 'publishing' }
RabbitMQPublisher >> publish: aMessageCollection onQueueNamed: aQueueName [

Expand Down Expand Up @@ -122,49 +50,8 @@ RabbitMQPublisher >> publishOnly: aMessage onQueueNamed: aQueueName [
properties: (connection protocolClass basicPropertiesClass new deliveryMode: 2)
]

{ #category : 'private - accessing' }
RabbitMQPublisher >> retryCount [

^options at: #maximumConnectionAttemps ifAbsent: [3]
]

{ #category : 'private - testing' }
RabbitMQPublisher >> shouldLogDebuggingInfo [

^options at: #enableDebuggingLogs ifAbsent: [false]
]

{ #category : 'connecting' }
RabbitMQPublisher >> start [

self ensureChannelOpen
]

{ #category : 'connecting' }
RabbitMQPublisher >> stop [

connection
ifNil: []
ifNotNil: [
connection
whenConnected: [connection close]
whenNot: [LogRecord emitWarning: 'RabbitMQ connection was already closed.']]
]

{ #category : 'private - accessing' }
RabbitMQPublisher >> timeframeBetweenAttempts [

^Duration milliSeconds: (options at: #timeSlotBetweenConnectionRetriesInMs ifAbsent: [300])
]

{ #category : 'private - connecting' }
RabbitMQPublisher >> try: aBlock onConnectivityErrorDo: failBlock [

Retry
value: aBlock
configuredBy: [:retry |
retry
upTo: self retryCount;
backoffExponentiallyWithTimeSlot: self timeframeBetweenAttempts;
on: self connectivityErrors evaluating: failBlock]
]
Loading

0 comments on commit f12e539

Please sign in to comment.