This is a wrapper for amqp.node with support for node-amqp API options.
See https://github.com/postwait/node-amqp for details.
See also http://www.squaremobius.net/amqp.node/doc/channel_api.html
To install in your project, run the following command:
npm install --save git://github.com/tagular/node-rabbit-wrap.git
var rabbit = require('rabbit-wrapper');
var connectOpts = {};
var connection = rabbit('amqp://localhost:5672', connectOpts).connect();
//publish a message
connection.exchange('this.is.my.exchange', {type: 'direct', autoDelete: true})
  .send('this.is.my.routing.key', {my: 'message', goes: 'here'});
//consume messages
connection.exchange('this.is.my.exchange', {type: 'direct', autoDelete: true})
  .queue('this.is.a.queue')
  //bind queue to routing key
  .bindQueue('this.is.my.exchange', 'this.is.my.routing.key')
  .listen({ack: true}, function (msg, ack, headers, fields) {
    //doSomethingWithMessage is a placeholder for your own handler
    doSomethingWithMessage(msg);
    //call `ack` only once per message, in one of the following forms
    //want to acknowledge the message? just call `ack`
    ack();
    //rejecting the message? just pass false to `ack`
    //ack(false);
    //want to reject and requeue? pass true as the second param to `ack`
    //ack(false, true);
  });
You must pass this an amqp URI; documentation for the URI format is available on the rabbitmq website. Your URIs will look like, e.g., amqp://my.rabbit.server.com:5672.
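To illustrate the URI shape, the sketch below uses the `URL` class built into Node to pull the host and port out of an amqp URI before it is handed to the wrapper. The helper name `describeAmqpUri` is hypothetical and not part of this library; it only inspects the string and never opens a connection.

```javascript
// Hypothetical helper (not part of the wrapper): inspects an amqp URI
// with the URL class that ships with Node.
function describeAmqpUri(uri) {
  const parsed = new URL(uri); // throws a TypeError on malformed input
  if (parsed.protocol !== 'amqp:' && parsed.protocol !== 'amqps:') {
    throw new Error('expected an amqp:// or amqps:// URI, got ' + parsed.protocol);
  }
  return {
    host: parsed.hostname,
    // An empty port string means the URI did not specify a port.
    port: parsed.port === '' ? null : Number(parsed.port)
  };
}

console.log(describeAmqpUri('amqp://my.rabbit.server.com:5672'));
```

A check like this can catch a mistyped scheme early, before any connection attempt is made.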
- `heartbeat` (default `0`) - interval of the connection heartbeat in seconds
- `noDelay` (default `false`) - if true, turns on TCP_NODELAY (i.e. disables Nagle's algorithm) for the underlying socket
- `reconnect` (default `true`) - if true, the wrapper will attempt to reconnect to rabbit on connection failures (i.e. connection error events)
For more options, see the amqp.node api docs.
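The documented defaults can be written out as a plain object. The sketch below only illustrates how those defaults would combine with caller-supplied values; `applyConnectDefaults` is a hypothetical helper, and the wrapper's actual merging logic may differ.

```javascript
// Defaults as documented for the connection options.
const CONNECT_DEFAULTS = { heartbeat: 0, noDelay: false, reconnect: true };

// Hypothetical helper: caller-supplied values override the defaults.
function applyConnectDefaults(options) {
  return Object.assign({}, CONNECT_DEFAULTS, options);
}

// Ask for a 30 second heartbeat and keep everything else at its default.
const connectOpts = applyConnectDefaults({ heartbeat: 30 });
console.log(connectOpts);
```

An object like `connectOpts` is what gets passed as the second argument in `rabbit('amqp://localhost:5672', connectOpts)`.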
Opens the connection and calls the callback provided, if any.
Declares a new exchange. Calls your `cb` when the operation is complete (or has failed!).
Returns a new `Exchange` object.
- `type` (required) [`String`] - sets your exchange type (i.e. direct, topic, fanout, or headers)
- `confirm` (default `false`) [`Boolean`] - whether to open the exchange on a confirming channel, which will cause rabbit to confirm publishes
- `durable` (default `true`) [`Boolean`] - whether the exchange will survive a rabbit restart
- `autoDelete` (default `false`) [`Boolean`] - whether rabbit will destroy the exchange after its number of bindings reaches zero
For more options, see the amqp.node api docs.
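Because `type` is the only required exchange option, it can be worth checking it before declaring the exchange. The following is a minimal sketch under that assumption; `buildExchangeOptions` is a hypothetical helper, not something the wrapper exports, and the defaults are copied from the option list for this call.

```javascript
const EXCHANGE_TYPES = ['direct', 'topic', 'fanout', 'headers'];
const EXCHANGE_DEFAULTS = { confirm: false, durable: true, autoDelete: false };

// Hypothetical helper: rejects a missing or unknown type, then fills in
// the documented defaults for anything the caller left out.
function buildExchangeOptions(options) {
  if (!options || EXCHANGE_TYPES.indexOf(options.type) === -1) {
    throw new Error('exchange type must be one of: ' + EXCHANGE_TYPES.join(', '));
  }
  return Object.assign({}, EXCHANGE_DEFAULTS, options);
}

console.log(buildExchangeOptions({ type: 'direct', autoDelete: true }));
```

The returned object has the same shape as the options passed to `connection.exchange(...)` in the example at the top.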
Declares a queue.
Returns a new `Queue` object.
- `exclusive` (default `false`) [`Boolean`] - if true, makes the queue available only to the connection that created it
- `durable` (default `true`) [`Boolean`] - whether the queue will survive a rabbit restart
- `autoDelete` (default `false`) [`Boolean`] - whether rabbit will destroy the queue after its number of consumers reaches zero
- `arguments` [`Object`] - additional arguments (e.g. `x-expires`) that are specific to rabbitmq
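As an illustration, a queue options object that spells out the documented defaults and adds one rabbitmq-specific argument might look like the sketch below. `x-expires` is rabbitmq's queue expiry argument and takes a value in milliseconds; the 60 second value is an arbitrary example.

```javascript
// Example queue options; every value other than `arguments` repeats the
// documented default and is written out only for clarity.
const queueOptions = {
  exclusive: false,
  durable: true,
  autoDelete: false,
  arguments: {
    'x-expires': 60000 // rabbitmq deletes the queue after 60s without use
  }
};

console.log(queueOptions);
```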
Binds a queue to the given exchange, with the routing key specified in the `binding` param.
Adds a consumer for the queue. The `listener` argument is the function that will be called on new messages. The `cb` param is an optional callback that will run after the underlying `consume` operation has completed.
- `ack` [`Boolean`] - Sets whether your listener must `ack` or `nack` messages
- `consumerTag` [`String`] - Sets a custom consumer tag, which can be helpful for identifying consumers in rabbitMQ's management API
- `priority` [`Number`] - Higher priority consumers will receive messages before lower priority consumers
- `arguments` [`Object`] - Arbitrary arguments (see the rabbitmq documentation)
This parameter must be a function, and it will be given four parameters.
queue.listen({ack: true}, function (msg, ack, headers, fields) {
/* do stuff! */
})
- `msg` [`Object`, `Buffer`] - If the content type of the message is `application/json`, this will be the decoded JSON message as whatever type it should be. If the content type is anything else, this will be a `Buffer` that you can use for whatever purpose necessary
- `ack` [`Function`]
  - `ack()` - acknowledges message
  - `ack(false)` - rejects message
  - `ack(false, true)` - rejects and requeues message
- `headers` [`Object`] - Message headers
- `fields` [`Object`] - Message fields, which have protocol info like the deliveryTag, exchange, and routing key
For more options, see the amqp.node api docs.
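The three `ack` forms fit naturally into a try/catch. The sketch below is one possible pattern, not the wrapper's prescribed usage: `makeListener` and the `retryable` error flag are hypothetical names, and `fakeAck` is a stand-in that records calls so the example runs without a broker.

```javascript
// Hypothetical factory: wraps your own `work` function in a listener that
// settles each message exactly once.
function makeListener(work) {
  return function listener(msg, ack, headers, fields) {
    try {
      // Non-JSON messages arrive as a Buffer; decode them as text here.
      const payload = Buffer.isBuffer(msg) ? msg.toString('utf8') : msg;
      work(payload, headers, fields);
      ack(); // success: acknowledge
    } catch (err) {
      if (err.retryable) {
        ack(false, true); // reject and requeue
      } else {
        ack(false); // reject without requeueing
      }
    }
  };
}

// Stand-in for the wrapper's ack function: records the arguments it gets.
const calls = [];
function fakeAck() {
  calls.push(Array.prototype.slice.call(arguments));
}

const listener = makeListener(function (payload) {
  if (payload.fail) {
    const err = new Error('could not process message');
    err.retryable = payload.retry === true;
    throw err;
  }
});

listener({ ok: true }, fakeAck, {}, {});               // records ack()
listener({ fail: true, retry: true }, fakeAck, {}, {}); // records ack(false, true)
listener({ fail: true }, fakeAck, {}, {});              // records ack(false)
console.log(calls);
```

With the wrapper, a listener built this way would be passed as the second argument to `queue.listen({ack: true}, listener)`.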
$ make test