The API is inspired by the amusing amqplib API reference.
- API reference
new Broker([owner])
broker.subscribe(exchangeName, pattern, queueName, onMessage[, options])
broker.subscribeTmp(exchangeName, pattern, onMessage[, options])
broker.subscribeOnce(exchangeName, pattern, onMessage[, options])
broker.unsubscribe(queueName, onMessage)
broker.publish(exchangeName, routingKey[, content, options])
broker.close()
broker.assertExchange(exchangeName[, type = topic, options])
broker.deleteExchange(exchangeName[, ifUnused])
broker.bindExchange(source, destination[, pattern, args])
broker.unbindExchange(source, destination[, pattern])
broker.assertQueue(queueName[, options])
broker.bindQueue(queueName, exchangeName, pattern[, options])
broker.unbindQueue(queueName, exchangeName, pattern)
broker.consume(queueName, onMessage[, options])
broker.cancel(consumerTag[, requeue = true])
broker.createQueue(queueName[, options])
broker.deleteQueue(queueName[, {ifUnused, ifEmpty}])
broker.getExchange(exchangeName)
broker.getQueue(queueName)
broker.getConsumers()
broker.getConsumer(consumerTag)
broker.getState([onlyWithContent])
broker.recover([state])
broker.purgeQueue(queueName)
broker.sendToQueue(queueName, content[, options])
broker.stop()
broker.get(queueName[, options])
broker.ack(message[, allUpTo])
broker.ackAll()
broker.nack(message[, allUpTo, requeue])
broker.nackAll([requeue])
broker.reject(message[, requeue])
broker.createShovel(name, source, destination[, options])
broker.getShovel(name)
broker.closeShovel(name)
broker.on(eventName, callback[, options])
broker.off(eventName, callbackOrObject)
broker.prefetch(count)
broker.reset()
- Exchange
exchange.bindQueue(queue, pattern[, bindOptions])
exchange.close()
exchange.emit(eventName[, content])
exchange.getBinding(queueName, pattern)
exchange.getState()
exchange.on(pattern, handler[, consumeOptions])
exchange.off(pattern, handlerOrObject)
exchange.publish(routingKey[, content, properties])
exchange.recover([state, getQueue])
exchange.stop()
exchange.unbindQueue(queue, pattern)
exchange.unbindQueueByName(queueName)
exchange.closeBinding(binding)
- Binding
- Queue
queue.ack(message[, allUpTo])
queue.ackAll()
queue.assertConsumer(onMessage[, consumeOptions, owner])
queue.cancel(consumerTag[, requeue = true])
queue.close()
queue.consume(onMessage[, options, owner])
queue.delete([deleteOptions])
queue.dismiss(onMessage[, requeue = true])
queue.get([consumeOptions])
queue.getState()
queue.nack(message[, allUpTo, requeue = true])
queue.nackAll([requeue = true])
queue.off(eventName, handler)
queue.on(eventName, handler)
queue.peek([ignoreDelivered])
queue.purge()
queue.queueMessage(fields[, content, properties])
queue.recover([state])
queue.reject(message[, requeue = true])
queue.stop()
queue.unbindConsumer(consumer[, requeue = true])
- Consumer
- Message
- SmqpError
getRoutingKeyPattern(pattern)
- Message eviction
Start new broker owned by optional owner.
Properties:
- exchangeCount: number of exchanges
- queueCount: number of queues
- consumerCount: number of consumers
Asserts an exchange and a named queue, and returns a consumer for the named queue. The consumer is asserted into existence as well, i.e. message callback and options are matched.
To make sure the exchange and/or queue has the desired behaviour, please use assertExchange() and assertQueue().
- exchangeName: exchange name
- pattern: queue binding pattern
- queueName: queue name
- onMessage: message callback
- options:
  - autoDelete: boolean, defaults to true, exchange will be deleted when all bindings are removed; the queue will be removed when all consumers are down
  - consumerTag: unique consumer tag
  - deadLetterExchange: string, name of dead letter exchange. Will be asserted as topic exchange
  - durable: boolean, defaults to true, makes exchange and queue durable, i.e. will be returned when getting state
  - exclusive: boolean, queue is exclusively consumed
  - noAck: boolean, set to true if there is no need to acknowledge message
  - prefetch: integer, defaults to 1, number of messages to consume at a time
  - priority: integer, defaults to 0, higher value gets messages first
The message callback signature:
import { Broker } from 'smqp';
const owner = { name: 'me' };
const broker = Broker(owner);
broker.subscribe('events', '#', 'event-queue', onMessage);
broker.publish('events', 'start', { arg: 1 });
function onMessage(routingKey, message, brokerOwner) {
  console.log('received:', routingKey);
  console.log('with message:', message);
  console.log('owned by:', brokerOwner.name);
  message.ack();
}
Asserts exchange and creates a temporary queue with random name, i.e. not durable, and returns a new consumer.
- exchangeName: exchange name
- pattern: queue binding pattern
- onMessage: message callback
- options:
  - autoDelete: boolean, defaults to true, exchange will be deleted when all bindings are removed; the queue will be removed when all consumers are down
  - consumerTag: unique consumer tag
  - deadLetterExchange: string, name of dead letter exchange. Will be asserted as topic exchange
  - durable: set to false with no option to override
  - noAck: boolean, set to true if there is no need to acknowledge message
  - prefetch: integer, defaults to 1, number of messages to consume at a time
  - priority: integer, defaults to 0, higher value gets messages first
Same as subscribeTmp, but the consumer is closed immediately when the first message arrives.
- exchangeName: exchange name
- pattern: queue binding pattern
- onMessage: message callback
- options:
  - consumerTag: unique consumer tag
  - priority: integer, defaults to 0, higher value gets messages first
Oh, btw, option noAck will be set to true, so there is no need to ack the message in the message callback.
Remove consumer with message callback from queue.
Publish message to exchange.
Arguments:
- exchangeName: exchange name
- routingKey: routing key
- content: message content
- options: optional message options
  - mandatory: boolean indicating if message is mandatory. Value true emits return if not routed to any queue
  - persistent: boolean indicating if message is persistent, defaults to undef (true). Value false ignores the message when queue is recovered from state
  - expiration: integer, expire message after milliseconds, see Message Eviction
  - confirm: boolean, confirm message delivered, emits message.nack, message.ack, or message.undelivered on broker
Close exchanges, queues, and all consumers
Creates exchange with name.
- type: type of exchange, must be one of topic or direct, defaults to topic.
- options:
  - durable: boolean, defaults to true, makes exchange durable, i.e. will be returned when getting state
  - autoDelete: boolean, defaults to true, the exchange will be removed when all bindings are gone
Returns Exchange.
Delete exchange by name
Shovel messages between exchanges aka e2e binding.
Arguments:
- source: source exchange name
- destination: destination exchange name
- pattern: optional binding pattern, defaults to all (#)
- args: optional options object
  - priority: optional binding priority
  - cloneMessage: clone message function called with shoveled message
Returns:
- name: name of e2e binding
- source: source exchange name
- destination: destination exchange name
- pattern: pattern
- queue: name of source e2e queue
- consumerTag: consumer tag for temporary source e2e queue
- on(eventName, handler): listen for shovel events, returns event consumer
- close(): close e2e binding
Close e2e binding.
Arguments:
- source: source exchange name
- destination: destination exchange name
- pattern: optional binding pattern, defaults to all (#)
Assert a queue into existence.
- options: optional queue options
  - durable: boolean, defaults to true, makes queue durable, i.e. will be returned when getting state
  - autoDelete: boolean, defaults to true, the queue will be removed when all consumers are down
  - deadLetterExchange: string, name of dead letter exchange. Will be asserted as topic exchange if non-existing
  - messageTtl: integer, expire message after milliseconds, see Message Eviction
Bind queue to exchange with routing key pattern.
- queueName: queue name
- exchangeName: exchange name
- pattern: queue binding pattern
- options: binding options
  - priority: integer, defaults to 0, higher value gets messages first
Returns Binding.
Unbind queue from exchange where the binding matches the routing key pattern.
- queueName: queue name
- exchangeName: exchange name
- pattern: queue binding pattern
Consume queue. Returns a consumer. If the message callback is already used for consumption, the existing consumer will be returned.
- queueName: queue name
- onMessage: message callback
- options: optional consume options
  - consumerTag: optional consumer tag, one will be generated for you if you don't supply one; if you do supply one it must be unique
  - exclusive: boolean, consume queue exclusively, defaults to false
  - noAck: boolean, defaults to false
  - prefetch: integer, defaults to 1, number of messages to consume at a time
  - priority: integer, defaults to 0, higher value gets messages first
Returns consumer.
Cancel consumption by consumer tag.
- consumerTag: consumer tag
- requeue: optional boolean to requeue messages consumed by consumer
Returns true if consumer tag was found, and consequently false if not.
Create queue with name. Throws if queue already exists.
Delete queue by name.
- options
  - ifUnused: delete if no consumers, defaults to false
  - ifEmpty: delete if no messages, defaults to false
Get exchange by name.
Get queue by name. Returns existing queue or nothing
Returns a list of consumer properties, i.e. queue name, consumer tag, and options.
Get consumer by consumer tag. Returns existing consumer or nothing
Return serializable object containing durable exchanges, bindings, and durable queues with messages.
- onlyWithContent: boolean indicating that only exchanges and queues with undelivered or queued messages will be returned
Recovers exchanges, bindings, and queues with messages. A state may be passed, preferably from getState().
Purge queue by name if found. Removes all non consumed messages.
Send message directly to queue, bypassing routing key patterns etc.
No more messages through this broker, i.e. publish will be ignored. Use broker.recover() to resume.
Get message from queue. Returns false if there are no messages to be retrieved. Returns undefined if the queue is not found.
Arguments:
- queueName: name of queue
- options: optional object with options
  - noAck: optional boolean, defaults to false
Ack consumed message.
- allUpTo: optional boolean, ack all outstanding messages on owning queue
Acknowledge all outstanding messages.
Nack consumed message.
- allUpTo: optional boolean, nack all outstanding messages on owning queue
- requeue: optional boolean, requeue messages, defaults to true
Nack all outstanding messages.
Same as broker.nack(message, false, requeue)
Shovel messages from exchange to another broker exchange.
NB! Shovels are not recovered, the source exchange and queue may be recoverable depending on how they were created. Messages are ignored if the destination exchange lacks bound queues, to save cpu etc.
Arguments:
- name: mandatory name of shovel
- source: source options
  - exchange: source exchange name
  - pattern: optional binding pattern, defaults to all (#)
  - queue: optional queue name, defaults to temporary autodeleted queue
  - priority: optional binding priority
  - consumerTag: optional consumer tag, defaults to composed consumer tag
- destination: destination broker options
  - broker: destination broker instance
  - exchange: destination exchange name, must be asserted into existence before shovel is created
  - exchangeKey: optional destination exchange key, defaults to original message's routing key
  - publishProperties: optional object with message properties to overwrite when shovelling messages, applied after options.cloneMessage function
- options: optional options object
  - cloneMessage(message) => message: clone message function called with shoveled message, must return new message, although fields are ignored completely. Known to be used to clone the message content to make sure no references to the old message are traversed.
Returns Shovel:
- name: name of shovel
- source: input source options
- destination: input destination broker options
- queue: name of queue, added if not provided when creating shovel
- consumerTag: consumer tag for source shovel queue
- on(eventName, handler): listen for shovel events, returns event consumer
- close(): close shovel and cancel source consumer tag
Shovel is closed if either source- or destination exchange is closed, or source consumer is canceled.
Get shovel by name.
Close shovel by name.
Listen for events from Broker.
Arguments:
- eventName: name of event or a "routingKey" pattern
- callback: event callback
- options: optional consume options
  - consumerTag: optional event consumer tag
Returns consumer - that can be canceled.
Callback is called with the event and the name of the event, in the same object.
import { Broker } from 'smqp';
const broker = new Broker();
broker.on(
  'message.*',
  (event) => {
    console.log(event.name, 'fired');
  },
  { consumerTag: 'my-event-consumertag' },
);
Turn off event listener(s) associated with event callback.
Arguments:
- eventName: name of event
- callbackOrObject: event callback function to off or object with basically one property:
  - consumerTag: optional event consumer tag to off
import { Broker } from 'smqp';
const broker = new Broker();
broker.assertExchange('event', 'topic');
broker.on('return', onMessageEvent, { consumerTag: 'my-event-consumertag' });
function onMessageEvent(event) {
  console.log(event.name, 'fired');
}
broker.publish('event', 'error.1', 'message', { mandatory: true });
/* later */
broker.off('return', onMessageEvent);
broker.publish('event', 'error.2', 'message', { mandatory: true });
/* or */
broker.off('return', { consumerTag: 'my-event-consumertag' });
Noop, only placeholder.
Reset everything. Deletes exchanges, queues, consumers, and bindings.
Exchange
Properties:
- name: exchange name
- type: exchange type, topic or direct
- options: exchange options
- bindingCount: getter for number of bindings
- bindings: getter for list of bindings
- stopped: boolean for if the exchange is stopped
Bind queue to exchange.
Arguments:
- queue: queue instance
- pattern: binding pattern
- bindOptions: optional binding options
  - priority: defaults to 0
Close exchange and all bindings.
Get binding to queue by name and with pattern.
Get recoverable exchange state.
Listen for exchange events.
Arguments:
- pattern: event pattern
- handler: event handler function
- consumeOptions: optional consume options
  - consumerTag: optional event consumer tag
Stop consuming events from exchange.
- pattern: event pattern
- handlerOrObject: handler function to off or object with basically one property:
  - consumerTag: optional event consumer tag to off
Publish message on exchange.
Recover exchange.
- state: optional object with exchange state, preferably from exchange.getState(). NB! state name and type is ignored
- getQueue: mandatory function if state.binding is passed, to recover bindings a queue is required, this function should return such by name
Unbind queue from exchange.
Arguments:
- queue: queue instance
- pattern: binding pattern
Remove all bindings to queue by queue name.
Close binding.
Arguments:
- binding: Binding instance
Exchange to queue binding
Properties:
- id: exchange binding id
- options: binding options
- pattern: binding pattern
- exchange: exchange instance
- queue: queue instance
Test routing key against binding pattern
Close binding
Queue
Properties:
- name: queue name
- options: queue options
- messages: actual messages array, probably a good idea to not mess with, but it's there
- messageCount: message count
- consumerCount: consumer count
- stopped: is stopped
- exclusive: is exclusively consumed
- maxLength: get or set max length of queue
- capacity: maxLength - messageCount
- messageTtl: expire messages after milliseconds, see Message Eviction
Ack message.
Ack all outstanding messages.
Upsert consumer.
Cancel consumer with tag
- consumerTag: consumer tag
- requeue: optional boolean to requeue messages consumed by consumer
Returns true if consumer tag was found, and consequently false if not.
Closes queue consumers and requeues outstanding messages.
Consume queue messages.
- onMessage(routingKey, message, owner): message callback
  - routingKey: message routing key
  - message: the message
  - owner: optional owner passed in signature
- options: optional consume options, see broker.consume
- owner: optional owner to be passed to message callback, mainly for internal use when consuming by broker but feel free to pass anything here
Returns consumer.
Delete queue.
Arguments:
- deleteOptions: object with options
  - ifUnused: boolean, delete if unused
  - ifEmpty: boolean, delete if empty
Returns:
- messageCount: number of messages deleted
Dismiss first consumer with matching onMessage handler.
- onMessage: message handler function
- requeue: optional boolean to requeue messages consumed by consumer
Same as broker.get but you don't have to supply a queue name.
Get queue state.
Will throw a TypeError if messages contain circular JSON. The error will be decorated with code EQUEUE_STATE and the name of the queue as queue.
Stop listening for events from queue.
Listen for events from queue.
Events:
- queue.consumer.cancel: consumer was cancelled
- queue.consume: consumer was added
- queue.dead-letter: message was dead-lettered, sends deadLetterExchange name and message
- queue.delete: queue was deleted
- queue.depleted: queue is depleted
- queue.message: message was queued
- queue.ready: queue is ready to receive new messages
- queue.saturated: queue is saturated, i.e. max capacity was reached
Peek into queue.
- ignoreDelivered: ignore if message was delivered or not
Removes all non consumed messages from queue.
Queue message.
- fields: object with fields, proposal:
  - exchangeName: exchange name
  - routingKey: routing key
- content: message content
- properties: message properties, basic properties are:
  - persistent: boolean indicating if message is persistent, defaults to undef (true). Value false ignores the message when queue is recovered from state
Unbind consumer instance.
- consumer: consumer instance
- requeue: optional boolean to requeue messages consumed by consumer
Queue consumer
Properties:
- options: returns passed options
- capacity: consumer message capacity
- consumerTag: consumer tag
- messageCount: current amount of messages handled by consumer
- onMessage: message callback
- queueName: consuming queue with name
- ready: boolean indicating if the consumer is ready for messages
- stopped: is the consumer stopped
Ack all messages currently held by consumer
Nack all messages currently held by consumer
Cancel consumption and unsubscribe from queue
- requeue: optional boolean to requeue messages consumed by consumer
What it is all about - convey messages.
Properties:
- fields: message fields
  - routingKey: routing key if any
  - redelivered: message is redelivered
  - exchange: published through exchange
  - consumerTag: consumer tag when consumed
- content: message content
- properties: message properties, any number of properties can be set, known:
  - messageId: unique message id
  - persistent: persist message, if unset queue option durable prevails
  - timestamp: Date.now()
  - expiration: expire message after milliseconds
- get pending(): boolean indicating that the message is awaiting ack (true) or is acked/nacked (false)
Acknowledge message
- allUpTo: boolean, consider all messages prior to this one to be acknowledged as well
Reject message.
- allUpTo: optional boolean, consider all messages prior to this one to be rejected as well
- requeue: optional boolean, requeue messages, defaults to true
NB! Beware of the requeue argument since the message will immediately be returned to queue and consumed, ergo an infinite loop and maximum call stack size exceeded error. Unless! some precautions are taken.
Same as nack(false, requeue)
- requeue: optional boolean, requeue message, defaults to true
throw SmqpError(message, code)
Inherited from Error, it is thrown when package specific errors occur.
- ERR_SMQP_CONSUMER_TAG_CONFLICT: consumer tag is already taken, must be unique within the broker
- ERR_SMQP_EXCHANGE_TYPE_MISMATCH: asserting an exchange with different type than existing exchange is not allowed
- ERR_SMQP_EXCLUSIVE_CONFLICT: consuming a queue that is exclusively consumed by someone else is not allowed
- ERR_SMQP_EXCLUSIVE_NOT_ALLOWED: attempting to exclusively consume a queue that already has consumers is not allowed
- ERR_SMQP_QUEUE_DURABLE_MISMATCH: asserting a queue that has different durable option than existing queue is not allowed
- ERR_SMQP_QUEUE_NAME_CONFLICT: creating a queue with the same name as existing queue throws this code
- ERR_SMQP_QUEUE_NOT_FOUND: attempting to send a message to or consume a non-existing queue - KABLAM!
- ERR_SMQP_SHOVEL_DESTINATION_EXCHANGE_NOT_FOUND: shovel destination exchange was not found
- ERR_SMQP_SHOVEL_NAME_CONFLICT: a shovel with the same name already exists, suffix something, e.g. _new or come up with another name
- ERR_SMQP_SHOVEL_SOURCE_EXCHANGE_NOT_FOUND: shovel source exchange was not found, a bit self-explanatory
- EQUEUE_STATE: legacy code and actually a TypeError, will pop if queue messages has circular JSON when getting state. The queue culprit name is added to error as property err.queue
Get routing key pattern tester. Test routing key pattern against routing key.
import { getRoutingKeyPattern } from 'smqp';
const pattern = getRoutingKeyPattern('activity.*');
console.log(pattern.test('activity.start')); // true
console.log(pattern.test('activity.execution.completed')); // false
About message eviction: There are no timeouts that will automatically evict expired messages. Expired messages will simply not be returned in the message callback when the queue is consumed. Use a dead letter exchange to pick them up.