GenRMQ is a set of behaviours meant to be used to create RabbitMQ consumers and publishers.
Internally it is using the AMQP elixir RabbitMQ client. The idea is to reduce boilerplate consumer / publisher code, which usually includes:
- creating connection / channel and keeping it in a state
- creating and binding queue
- handling reconnections / consumer cancellations
GenRMQ provides the following functionality:
GenRMQ.Consumer
- a behaviour for implementing RabbitMQ consumers (example)GenRMQ.Publisher
- a behaviour for implementing RabbitMQ publishers (example)GenRMQ.Processor
- a behaviour for implementing RabbitMQ message processors (this is useful to separate out business logic from your consumer) (example)GenRMQ.Consumer.Telemetry
- telemetry events emitted by a GenRMQ consumerGenRMQ.Publisher.Telemetry
- telemetry events emitted by a GenRMQ publisherGenRMQ.RabbitCase
- test utilities for RabbitMQ (example)
def deps do
[{:gen_rmq, "~> 3.0.0"}]
end
Version 3.0.0
has been released. Please check how to migrate to gen_rmq 3.0.0
.
More thorough examples for using GenRMQ.Consumer
, GenRMQ.Publisher
, and GenRMQ.Processor
can be found under documentation.
defmodule Consumer do
@behaviour GenRMQ.Consumer
@impl GenRMQ.Consumer
def init() do
[
queue: "gen_rmq_in_queue",
exchange: "gen_rmq_exchange",
routing_key: "#",
prefetch_count: "10",
connection: "amqp://guest:guest@localhost:5672",
retry_delay_function: fn attempt -> :timer.sleep(2000 * attempt) end
]
end
@impl GenRMQ.Consumer
def consumer_tag() do
"test_tag"
end
@impl GenRMQ.Consumer
def handle_message(message) do
...
end
@impl GenRMQ.Consumer
def handle_error(message, _reason) do
GenRMQ.Consumer.reject(message, true)
end
end
GenRMQ.Consumer.start_link(Consumer, name: Consumer)
This will result in:
- durable
gen_rmq_exchange.deadletter
exchange created or redeclared - durable
gen_rmq_in_queue_error
queue created or redeclared. It will be bound togen_rmq_exchange.deadletter
- durable topic
gen_rmq_exchange
exchange created or redeclared - durable
gen_rmq_in_queue
queue created or redeclared. It will be bound togen_rmq_exchange
exchange and has a deadletter exchange set togen_rmq_exchange.deadletter
- every
handle_message
callback will be executed in a separate supervised Task. This can be disabled by settingconcurrency: false
ininit
callback - on failed rabbitmq connection it will wait for a bit and then reconnect
There are many options to control the consumer setup details, please check the c:GenRMQ.Consumer.init/0
docs for all available settings.
defmodule Publisher do
@behaviour GenRMQ.Publisher
def init() do
[
exchange: "gen_rmq_exchange",
connection: "amqp://guest:guest@localhost:5672"
]
end
end
GenRMQ.Publisher.start_link(Publisher, name: Publisher)
GenRMQ.Publisher.publish(Publisher, Jason.encode!(%{msg: "msg"}))
- Basic consumer setup
- Consumer with custom deadletter configuration
- Consumer with custom exchange type
- Consumer with custom queue configuration
- Consumer without deadletter configuration
- Consumer with quorum queues
You need docker-compose installed.
$ make test
Please see our Contribution Guidelines.
Are you using GenRMQ in Production? Please let us know, we are curious to learn about your experiences!
- Mateusz (@mkorszun)
The maintainers are responsible for the general project oversight, and empowering further trusted committers (see below).
The maintainers are the ones that create new releases of GenRMQ.
- Joel (@vorce)
- Sebastian (@spier)
- @Shemeikka
- Alexander (@akoutmos)
Trusted Committers are members of our community who we have explicitly added to our GitHub repository. Trusted Committers have elevated rights, allowing them to send in changes directly to branches and to approve Pull Requests. For details see TRUSTED-COMMITTERS.md.
Note: Maintainers and Trusted Committers are listed in .github/CODEOWNERS in order to automatically assign PR reviews to them.
The MIT License.
Copyright (c) Meltwater Inc. underthehood.meltwater.com