Skip to content


Repository files navigation


License: MIT Nuget Nuget Build Status Quality Gate Status CodeQL Compile & Test .NET

Simple implementation of RabbitMQ consumer and sender.


  • Sender implementation
  • Multiple consumer instances supported
  • Multiple processing options for received messages
  • Random expiration for messages sent to an holding queue (depending on the processing option)
  • TLS connection support
  • Limiter for message processing
  • Message properties for more advanced scenarios such as queues with support for priority messages, messages Headers, etc.

More details available on the project website.


Download the package from NuGet:

Install-Package JN.RabbitMQClient -Version [version number]


First, you must create the RabbitMqConsumerService and then define delegates for ReceiveMessage, ShutdownConsumer and ReceiveMessageError. The service will start the required number of consumers when StartConsumers is called.

To use a retry queue, the method StartConsumers should be called with a RetryQueueDetails object.

Message processing instructions

OK - message is considered as successfully processed

RequeueMessageWithDelay - message is removed from the queue, but sent to a holding queue for later processing (typically with a dead letter configuration)

IgnoreMessage - message is removed from the queue and ignored

IgnoreMessageWithRequeue - message is rejected and sent back to the queue


Example for consumer and sender services:

    class Program
        static void Main(string[] args)
            Console.WriteLine("Hello World!");

            // consumer

            // you can also use extensions AddConsumersService() and AddSenderService() (in namespace JN.RabbitMQClient.Extensions)

            var consumerService = new RabbitMqConsumerService(GetBrokerConfigConsumers());

            consumerService.ReceiveMessage += ReceiveMessage;
            consumerService.ShutdownConsumer += ShutdownConsumer;
            consumerService.ReceiveMessageError += ReceiveMessageError;
            consumerService.MaxChannelsPerConnection = 5;
            consumerService.ConsumersPrefetch = 2;
            consumerService.ServiceDescription = "test consumer service";

            consumerService.StartConsumers("my consumer");
            // sender

            var senderService = new RabbitMqSenderService(GetBrokerConfigSender());

            IMessageProperties properties = new MessageProperties { Priority = 3 };

            senderService.Send("my message", properties);

            Console.WriteLine("Press any key to exit...");


        private static IBrokerConfigSender GetBrokerConfigSender()
            IBrokerConfigSender configSender = new BrokerConfigSender
                Username = "test",
                Password = "123",
                Host = hostName,
                VirtualHost = "MyVirtualHost",
                RoutingKeyOrQueueName = "MyTestQueue",
                KeepConnectionOpen = true
            return configSender;

        private static IBrokerConfigConsumers GetBrokerConfigConsumers()
            IBrokerConfigConsumers configConsumers = new BrokerConfigConsumers
                Username = "test",
                Password = "123",
                Host = hostName,
                VirtualHost = "MyVirtualHost",
                RoutingKeyOrQueueName = "MyTestQueue",
                ShuffleHostList = false,
                Port = 0,
                TotalInstances = 4
            return configConsumers;

        private static async Task ReceiveMessageError(string routingKeyOrQueueName, string consumerTag, string exchange, string message, string errorMessage)
            await Console.Out.WriteLineAsync($"Error: '{consumerTag}' | Queued message: {message} | Error message: {errorMessage}").ConfigureAwait(false);

        private static async Task ShutdownConsumer(string consumerTag, ushort errorCode, string shutdownInitiator, string errorMessage)
            await Console.Out.WriteLineAsync($"Shutdown '{consumerTag}' | {errorCode} | {shutdownInitiator} | {errorMessage}").ConfigureAwait(false);

        private static async Task<MessageProcessInstruction> ReceiveMessage(string routingKeyOrQueueName, string consumerTag, long firstErrorTimestamp, string exchange, string message, string additionalInfo, IMessageProperties properties)
            var priorityReceived = properties.Priority;

            var newPriority = (byte)(priorityReceived <= 3 ? 5 : priorityReceived);

            await Console.Out.WriteLineAsync($"Message received by '{consumerTag}' from queue '{routingKeyOrQueueName}': {message}; Priority received: {properties.Priority} ").ConfigureAwait(false);
            return new MessageProcessInstruction
                Value = Constants.MessageProcessInstruction.OK,
                Priority = newPriority,
                AdditionalInfo = "id: 123"