Skip to content

Commit

Permalink
Merge pull request #4 from byerlikaya/refactor-and-improvements
Browse files Browse the repository at this point in the history
Refactor and improvements
  • Loading branch information
byerlikaya authored Jun 27, 2024
2 parents ed67619 + f243d6d commit d170dca
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 23 deletions.
4 changes: 2 additions & 2 deletions sample/Consumer.Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

return;

Task BasicConsumerReceived(object sender, BasicDeliverEventArgs @event)
async Task BasicConsumerReceived(object sender, BasicDeliverEventArgs @event)
{
var message = Encoding.UTF8.GetString(@event.Body.ToArray());

Console.WriteLine($" [>] Received {message}");

channel?.BasicAck(@event.DeliveryTag, false);

return Task.CompletedTask;
await Task.Yield();
}
11 changes: 9 additions & 2 deletions sample/Producer.Sample.Api/Controllers/ProducerController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ namespace Producer.Sample.Api.Controllers;
// ReSharper disable once HollowTypeName
public class ProducerController(IMessageProducer messageProducer) : ControllerBase
{
[HttpPost("/sendMessage")]
public async Task<IActionResult> SendMessage(string message)
[HttpPost("/sendMessageMultiple")]
public async Task<IActionResult> SendMessageMultiple(string message)
{
await Parallel.ForAsync(0, 10000, (x, _) =>
{
Expand All @@ -15,4 +15,11 @@ await Parallel.ForAsync(0, 10000, (x, _) =>

return Ok();
}

[HttpPost("/sendMessage")]
public Task<IActionResult> SendMessage(string message)
{
messageProducer.SendMessage("Test_Queue", "Test_Routing_Key", $"{message}");
return Task.FromResult<IActionResult>(Ok());
}
}
2 changes: 1 addition & 1 deletion src/Basic.RabbitMQ/Basic.RabbitMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<PackageLicenseFile>LICENSE</PackageLicenseFile>
<Description>.NET Core library that simplifies RabbitMQ usage and works with the Direct Exchange type.</Description>
<PackageId>Basic.RabbitMQ</PackageId>
<Version>2.0.0</Version>
<Version>2.0.0.1</Version>
<Product>Basic.RabbitMQ</Product>
<LangVersion>preview</LangVersion>
</PropertyGroup>
Expand Down
12 changes: 8 additions & 4 deletions src/Basic.RabbitMQ/Extensions/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,36 @@ private static void CreateServices(
VirtualHost = messageBrokerOptions.VirtualHost,
DispatchConsumersAsync = true,
AutomaticRecoveryEnabled = true,
NetworkRecoveryInterval = TimeSpan.FromSeconds(30)
NetworkRecoveryInterval = TimeSpan.FromSeconds(30),
ClientProvidedName = "Basic.RabbitMQ"
});

services.AddSingleton<RabbitMqClientService>();
services.AddSingleton<IMessageConsumer, MessageConsumer>();

CreateConsumerService(services, messageConsumerServiceLifetime);
CreateConsumerAndProducerService(services, messageConsumerServiceLifetime);
}

private static void CreateConsumerService(
private static void CreateConsumerAndProducerService(
IServiceCollection services,
ServiceLifetime messageConsumerServiceLifetime)
{
switch (messageConsumerServiceLifetime)
{
case ServiceLifetime.Singleton:
services.AddSingleton<IMessageProducer, MessageProducer>();
services.AddSingleton<IMessageConsumer, MessageConsumer>();
break;
case ServiceLifetime.Scoped:
services.AddScoped<IMessageProducer, MessageProducer>();
services.AddScoped<IMessageConsumer, MessageConsumer>();
break;
case ServiceLifetime.Transient:
services.AddTransient<IMessageProducer, MessageProducer>();
services.AddTransient<IMessageConsumer, MessageConsumer>();
break;
default:
services.AddSingleton<IMessageProducer, MessageProducer>();
services.AddSingleton<IMessageConsumer, MessageConsumer>();
break;
}
}
Expand Down
13 changes: 9 additions & 4 deletions src/Basic.RabbitMQ/MessageConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
namespace Basic.RabbitMQ;

public class MessageConsumer(RabbitMqClientService rabbitMqClientService) : IMessageConsumer
public class MessageConsumer(
RabbitMqClientService rabbitMqClientService,
ConnectionFactory connectionFactory) : IMessageConsumer, IDisposable
{
private IConnection _connection;

public IModel Channel(string queueName, string routingKey, ushort prefetchCount = 1)
{
var channel = rabbitMqClientService.Connect(queueName);
_connection ??= connectionFactory.CreateConnection();

var channel = rabbitMqClientService.Connect(_connection, queueName);
channel.QueueBind(
queue: queueName,
exchange: rabbitMqClientService.BrokerOptions.ExchangeName,
routingKey: routingKey,
arguments: null);

channel.BasicQos(0, prefetchCount, false);

return channel;
}

Expand All @@ -28,4 +31,6 @@ public AsyncEventingBasicConsumer GetConsumer(IModel channel)

return consumer;
}

public void Dispose() => _connection?.Dispose();
}
11 changes: 1 addition & 10 deletions src/Basic.RabbitMQ/Services/RabbitMQClientService.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,9 @@
namespace Basic.RabbitMQ.Services;

public class RabbitMqClientService(IConfiguration configuration, ConnectionFactory connectionFactory)
public class RabbitMqClientService(IConfiguration configuration)
{
public readonly MessageBrokerOptions BrokerOptions = configuration.GetSection(nameof(MessageBrokerOptions)).Get<MessageBrokerOptions>();

private IConnection _connection;

public IModel Connect(string queueName)
{
if (_connection is not { IsOpen: true })
_connection = connectionFactory.CreateConnection();
return CreateChannel(_connection, queueName);
}

public IModel Connect(
IConnection connection,
string queueName) => CreateChannel(connection, queueName);
Expand Down

0 comments on commit d170dca

Please sign in to comment.