Ensemble adapter for RabbitMQ
- Install Java 1.8.
- Get the JAR: build RabbitMQ-Ensemble-javaapi or download the latest jar.
- Create a Java Gateway (or use an existing one). To create one, go to SMP > System Administration > Configuration > Connectivity > Object Gateways. Remember the `Port` value.
- Start the Java Gateway.
- In a terminal, in the target namespace, execute:

      Set class = "isc.rabbitmq.API"
      Set classPath = ##class(%ListOfDataTypes).%New()
      Do classPath.Insert(PATH-TO-JAR)
      Set gateway = ##class(%Net.Remote.Gateway).%New()
      Set sc = gateway.%Connect("localhost", PORT, $Namespace, 2, classPath)
      Set sc = gateway.%Import(class)
      Write ##class(%Dictionary.CompiledClass).%ExistsId(class)

- The `isc.rabbitmq.API` class should now be present and compiled in the namespace.
- Import this project and compile.
- For samples, refer to the test production `isc.rabbitMQ.Production`.

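If the import step does not produce the class, it helps to look at the status values instead of overwriting them. The following is a minimal terminal sketch, assuming the same `gateway`, `classPath`, and `class` variables as in the installation commands; `PORT` remains a placeholder for your gateway port. Macros such as `$$$ISERR` may not be available in the terminal, so the sketch uses `$System.Status` methods instead.

```objectscript
// Connect and print the error text if the connection failed
Set sc = gateway.%Connect("localhost", PORT, $Namespace, 2, classPath)
Write:$System.Status.IsError(sc) $System.Status.GetErrorText(sc)

// Import the Java class and print the error text if the import failed
Set sc = gateway.%Import(class)
Write:$System.Status.IsError(sc) $System.Status.GetErrorText(sc)
```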
Development is done on the Cache-Tort-Git UDL fork.

The `isc.rabbitmq` package provides all the traditional Interoperability components: Inbound and Outbound Adapters and an Operation and Service.

RabbitMQ connectivity requires a Java Gateway. It can be run in three different ways:
- As an Interoperability Service
- As an Object Gateway from SMP
- From the OS shell (bash)

For an Interoperability production, we recommend running the Java Gateway as an Interoperability Service. To do that, add a new `EnsLib.JavaGateway.Service` service to your production. In the RabbitMQ production elements, select the Java Gateway Service name as the value of the `JGService` setting.

The ClassPath for all these elements must contain the paths to the amqp jar and the RabbitMQ-Ensemble-javaapi jar.

Check `isc.rabbitmq.Utils` for sample code. The main class is `isc.rabbitmq.API`. It has the following methods:

Method | Arguments | Returns | Description |
---|---|---|---|
%OnNew | gateway, host, port, user, pass, virtualHost, queue, durable, exchange | api | Creates new connection to RabbitMQ |
sendMessage | msg, correlationId, messageId | null | Sends message to default queue (as specified in %OnNew) |
sendMessageToQueue | queue, msg, correlationId, messageId | null | Sends message to the specified queue |
readMessageString | - | props | Reads message from default queue. Returns list of message properties (including message text) |
readMessageStream | props | msg | Reads message from default queue. Returns message text as a stream and props is populated with a list of message properties |
close | - | - | Closes the connection |

The arguments and return values used by these methods are:

Argument | Java type | InterSystems type | Sample value | Required | Description |
---|---|---|---|---|---|
gateway | - | %Net.Remote.Gateway | - | Yes | Connection to Java Gateway |
host | String | %String | localhost | Yes | Address of the RabbitMQ server or an amqp:// connection string |
port | int | %Integer | -1 | Yes | RabbitMQ listener port |
user | String | %String | guest | Yes | Username |
pass | String | %String | guest | Yes | Password |
virtualHost | String | %String | / | Yes | Virtual host |
queue | String | %String | Test | Yes | Queue name |
durable | int | %Integer | 1 | Required only if you want to create new queue | The queue will survive a server restart |
exchange | String | %String | Test | No | Exchange name. Provide only for sending messages. Modifies the meaning of queue property (if exchange is present, then queue means routing key) |
msg | byte[] | %GlobalBinaryStream | Text | Yes | Message body |
correlationId | String | %String | CorrelationId | Required only with messageId | Correlation identifier |
messageId | String | %String | MessageId | Required only with correlationId | Message identifier |
props | String[] | %ListOfDataTypes | - | Yes | List of message properties |
api | isc.rabbitmq.API | isc.rabbitmq.API | - | - | API object |

First you need a Java Gateway object (hereinafter `gateway`). For example, you can get it using this method:

/// Get JGW object
ClassMethod Connect(gatewayName As %String, path As %String, Output sc As %Status) As %Net.Remote.Gateway
{
Set gateway = ""
Set sc = ##class(%Net.Remote.Service).OpenGateway(gatewayName, .gatewayConfig)
Quit:$$$ISERR(sc) gateway
Set sc = ##class(%Net.Remote.Service).ConnectGateway(gatewayConfig, .gateway, path, $$$YES)
Quit gateway
}

Where:
- `gatewayName` is the name of the Java Gateway (it is started automatically)
- `path` is the path to the JAR

Now that the Java Gateway connection is established, we can connect to RabbitMQ:

ClassMethod GetAPI(gateway As %Net.Remote.Gateway) As isc.rabbitmq.API
{
Set host = "localhost"
Set port = -1
Set user = "guest"
Set pass = "guest"
Set virtualHost = "/"
Set queue = "Test"
Set durable = $$$YES
Set api = ##class(isc.rabbitmq.API).%New(gateway, host, port, user, pass, virtualHost, queue, durable)
Quit api
}

All parameters are described in the table above. Note that the `durable` argument is used only when a new queue is created. If the queue already exists, you must still provide it (0 or 1), but it is ignored.
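
The two methods above can be combined into a single round trip. This is a minimal sketch, assuming `Connect` and `GetAPI` are defined in the same class as this method; the method name `Test` is hypothetical and is not part of the project.

```objectscript
/// Hypothetical helper: connect, obtain the API object, then clean up
ClassMethod Test(gatewayName As %String, path As %String) As %Status
{
    // Open the Java Gateway connection using the Connect() method shown above
    Set gateway = ..Connect(gatewayName, path, .sc)
    Quit:$$$ISERR(sc) sc

    // Connect to RabbitMQ using the GetAPI() method shown above
    Set api = ..GetAPI(gateway)

    // ... send or read messages here ...

    // Close the RabbitMQ connection, then release the gateway connection
    Do api.close()
    Set sc = gateway.%Disconnect()
    Quit sc
}
```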

Assuming you already have an `api` object, messages can be sent in one of two ways:
- to the default queue
- to a specified queue

The default queue is the queue specified when the `api` object was created. To send a message, call `sendMessage` or `sendMessageId`:

#Dim api As isc.rabbitmq.API
#Dim msg As %GlobalBinaryStream
Do api.sendMessage(msg)
Do api.sendMessageId(msg, "correlationId", "messageId " _ $zdt($zts,3,1,3))

Where `msg` is the message body. You can either provide both `messageId` and `correlationId` or neither of them.
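
The snippet above declares `msg` but does not create it. One way to build the body is sketched below, assuming `api` was obtained as described earlier; the message text is just an illustration.

```objectscript
// Build the message body as a binary stream
Set msg = ##class(%GlobalBinaryStream).%New()
Do msg.Write("Hello from InterSystems")

// Send it to the default queue without identifiers
Do api.sendMessage(msg)
```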

To send a message to a specified queue, do the same as above, but call the `sendMessageToQueue` or `sendMessageToQueueId` method and pass the queue name as the first argument.
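
For illustration, the calls might look like this. It is a sketch that assumes `api` and `msg` already exist; the queue name `OtherQueue` is hypothetical.

```objectscript
// Send to a specific queue without identifiers
Do api.sendMessageToQueue("OtherQueue", msg)

// Send to a specific queue with both identifiers
Do api.sendMessageToQueueId("OtherQueue", msg, "correlationId", "messageId")
```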

Messages are always read from the default queue (specified when the `api` object was created). The message body can be received as text or as a stream.

To read the body as a stream, first prepare `props` to pass by reference into Java, and then call `readMessageStream`:

#Dim api As isc.rabbitmq.API
#Dim msg As %GlobalBinaryStream
#Dim props As %ListOfDataTypes
Set props = ##class(%ListOfDataTypes).%New()
For i=1:1:15 Do props.Insert("")
Set msg = api.readMessageStream(.props)

`props` is filled with the message metadata, and `msg` is a stream containing the message body.
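
After a stream read, the results can be inspected as sketched below. This assumes `props` and `msg` come from the `readMessageStream` call above and that the positions match the table of message properties.

```objectscript
// Positions follow the table of message properties
Set length = props.GetAt(1)
Set remaining = props.GetAt(2)
Write "Message length: ", length, ", messages remaining: ", remaining, !

// Print the body: OutputToDevice() writes the whole stream to the current device
Do msg.Rewind()
Do msg.OutputToDevice()
```
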
To read the body as a string, call `readMessageString`:

#Dim api As isc.rabbitmq.API
#Dim props As %ListOfDataTypes
Set props = api.readMessageString()

`props` is filled with the message metadata and the message body. Note that in this case you don't need to initialize it on the InterSystems side before calling RabbitMQ.

The elements of `props` are listed below. All of them except the first and second are optional. Conversion from the list into `RabbitMQ.Message` is available in the `ListToMessage` method of the `RabbitMQ.InboundAdapter` class.

Position | Name | Description |
---|---|---|
1 | Message Length | Length of the current message |
2 | Message Count | Number of remaining messages |
3 | ContentType | |
4 | ContentEncoding | |
5 | CorrelationId | |
6 | ReplyTo | |
7 | Expiration | |
8 | MessageId | |
9 | Type | |
10 | UserId | |
11 | AppId | |
12 | ClusterId | |
13 | DeliveryMode | |
14 | Priority | |
15 | Timestamp | |
16 | Message body | If the message is read as a string, this element contains it |
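
Individual properties can be picked out of the list by position. The following is a minimal sketch, assuming `api` already exists and a message is available in the default queue; the variable names are illustrative.

```objectscript
// Read the next message as a string; props carries metadata and body
Set props = api.readMessageString()

// Positions are taken from the table above
Set correlationId = props.GetAt(5)
Set messageId = props.GetAt(8)
Set body = props.GetAt(16)

Write "CorrelationId: ", correlationId, !
Write "MessageId: ", messageId, !
Write "Body: ", body, !
```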