Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

init rabbit mq #730

Merged
merged 1 commit into from
Jan 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/applications/mixcore/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Mix.Lib.Middlewares;
using Mix.Shared.Services;
using Mix.Shared.Models.Configurations;
using Mix.Queue.Extensions;
var builder = MixCmsHelper.CreateWebApplicationBuilder(args);

if (builder.Environment.IsDevelopment())
Expand All @@ -25,7 +26,9 @@
var globalConfig = builder.Configuration.GetSection(MixAppSettingsSection.GlobalSettings)
.Get<GlobalSettingsModel>();
builder.Services.AddEndpointsApiExplorer();
builder.AddMixQueue();
builder.Services.AddMixServices(Assembly.GetExecutingAssembly(), builder.Configuration);

builder.Services.ApplyMigrations(globalConfig);
builder.Services.AddMixCors();
builder.Services.AddScoped<MixNavigationService>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
},
"Mix": {
"ProjectId": ""
},
"RabitMqQueueSetting": {
"HostName": "localhost",
"UserName": "",
"Password": "",
"VHost": null,
"Port": 5672
}
}
}
2 changes: 1 addition & 1 deletion src/platform/mix.constant/Enums/MixQueueProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
public enum MixQueueProvider
{
GOOGLE,
KAFKA,
RABITMQ,
AWS,
AZURE,
MIX
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Mq.Lib.Models;
using Mix.Queue.Engines;

Expand All @@ -13,8 +14,9 @@ public MixBackgroundTaskPublisher(
IMemoryQueueService<MessageQueueModel> queueService,
IConfiguration configuration,
MixEndpointService mixEndpointService,
ILogger<MixBackgroundTaskPublisher> logger)
: base(TopicId, queueService, configuration, mixEndpointService, logger)
ILogger<MixBackgroundTaskPublisher> logger,
IPooledObjectPolicy<RabbitMQ.Client.IModel> rabbitMqObjectPolicy = null)
: base(TopicId, queueService, configuration, mixEndpointService, logger, rabbitMqObjectPolicy)
{
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/platform/mix.library/Publishers/MixDbCommandPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Mq.Lib.Models;
using Mix.Queue.Engines;

Expand All @@ -13,8 +14,9 @@ public MixDbCommandPublisher(
IMemoryQueueService<MessageQueueModel> queueService,
IConfiguration configuration,
MixEndpointService mixEndpointService,
ILogger<MixDbCommandPublisher> logger)
: base(TopicId, queueService, configuration, mixEndpointService, logger)
ILogger<MixDbCommandPublisher> logger,
IPooledObjectPolicy<RabbitMQ.Client.IModel> rabbitMqObjectPolicy = null)
: base(TopicId, queueService, configuration, mixEndpointService, logger, rabbitMqObjectPolicy)
{
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/platform/mix.library/Publishers/MixPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Mq.Lib.Models;
using Mix.Queue.Engines;
using Mix.Queue.Engines.MixQueue;
using Mix.RepoDb.Publishers;
using RabbitMQ.Client;

namespace Mix.Lib.Publishers
{
Expand All @@ -16,8 +18,9 @@ public MixPublisher(
IMemoryQueueService<MessageQueueModel> queueService,
IConfiguration configuration, IWebHostEnvironment environment,
MixEndpointService mixEndpointService,
ILogger<MixRepoDbPublisher> logger)
: base(topicId, queueService, configuration, mixEndpointService, logger)
ILogger<MixRepoDbPublisher> logger,
IPooledObjectPolicy<IModel> rabbitMqObjectPolicy = null)
: base(topicId, queueService, configuration, mixEndpointService, logger, rabbitMqObjectPolicy)
{
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Mq.Lib.Models;
using Mix.Queue.Engines;
using Mix.Queue.Engines.MixQueue;
Expand All @@ -15,8 +16,9 @@ public MixViewModelChangedPublisher(
IMemoryQueueService<MessageQueueModel> queueService,
IConfiguration configuration,
MixEndpointService mixEndpointService,
ILogger<MixViewModelChangedPublisher> logger)
: base(TopicId, queueService, configuration, mixEndpointService, logger)
ILogger<MixViewModelChangedPublisher> logger,
IPooledObjectPolicy<RabbitMQ.Client.IModel> rabbitMqObjectPolicy = null)
: base(TopicId, queueService, configuration, mixEndpointService, logger, rabbitMqObjectPolicy)
{
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Communicator.Models;
using Mix.Communicator.Services;
using Mix.Database.Entities.Account;
Expand Down Expand Up @@ -35,8 +36,9 @@ public MixBackgroundTaskSubscriber(
IPortalHubClientService portalHub,
MixDbEventService mixDbEventService,
IMemoryQueueService<MessageQueueModel> queueService,
ILogger<MixBackgroundTaskSubscriber> logger)
: base(TopicId, nameof(MixBackgroundTaskSubscriber), 20, serviceProvider, configuration, queueService, logger)
ILogger<MixBackgroundTaskSubscriber> logger,
IPooledObjectPolicy<RabbitMQ.Client.IModel> rabbitMqObjectPolicy = null)
: base(TopicId, nameof(MixBackgroundTaskSubscriber), 20, serviceProvider, configuration, queueService, logger, rabbitMqObjectPolicy)
{
PortalHub = portalHub;
MixDbEventService = mixDbEventService;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Communicator.Models;
using Mix.Communicator.Services;
using Mix.Database.Entities.MixDb;
Expand All @@ -26,8 +27,9 @@ public MixDbCommandSubscriber(
IServiceProvider serviceProvider,
IConfiguration configuration,
IMemoryQueueService<MessageQueueModel> queueService,
ILogger<MixDbCommandSubscriber> logger)
: base(TopicId, nameof(MixDbCommandSubscriber), 20, serviceProvider, configuration, queueService, logger)
ILogger<MixDbCommandSubscriber> logger,
IPooledObjectPolicy<RabbitMQ.Client.IModel> rabbitMqObjectPolicy = null)
: base(TopicId, nameof(MixDbCommandSubscriber), 20, serviceProvider, configuration, queueService, logger, rabbitMqObjectPolicy)
{
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Heart.Model;
using Mix.Lib.Interfaces;
using Mix.Lib.Subscribers.Handlers.MixViewModelChangedHandlers;
Expand All @@ -20,8 +21,9 @@ public MixViewModelChangedSubscriber(
IConfiguration configuration,
IMixTenantService mixTenantService,
IMemoryQueueService<MessageQueueModel> queueService,
ILogger<MixViewModelChangedSubscriber> logger)
: base(TopicId, nameof(MixDbCommandSubscriber), 20, serviceProvider, configuration, queueService, logger)
ILogger<MixViewModelChangedSubscriber> logger,
IPooledObjectPolicy<RabbitMQ.Client.IModel> rabbitMqObjectPolicy = null)
: base(TopicId, nameof(MixDbCommandSubscriber), 20, serviceProvider, configuration, queueService, logger, rabbitMqObjectPolicy)
{
_mixTenantService = mixTenantService;
}
Expand Down
6 changes: 4 additions & 2 deletions src/platform/mix.log/Publishers/MixLogPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Constant.Constants;
using Mix.Mq.Lib.Models;
using Mix.Queue.Engines;
Expand All @@ -17,8 +18,9 @@ public MixLogPublisher(
IMemoryQueueService<MessageQueueModel> queueService,
IConfiguration configuration,
MixEndpointService mixEndpointService,
ILogger<MixLogPublisher> logger)
: base(TopicId, queueService, configuration, mixEndpointService, logger)
ILogger<MixLogPublisher> logger,
IPooledObjectPolicy<RabbitMQ.Client.IModel>? rabbitMqObjectPolicy = null)
: base(TopicId, queueService, configuration, mixEndpointService, logger, rabbitMqObjectPolicy)
{
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/platform/mix.log/Subscribers/MixLogSubscriber.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Constant.Constants;
using Mix.Constant.Enums;
using Mix.Database.Entities.Queue;
Expand Down Expand Up @@ -42,8 +43,9 @@ public MixLogSubscriber(
IMemoryQueueService<MessageQueueModel> queueService,
IMixQueueLog queueMessageLogService,
IAuditLogService auditLogService,
ILogger<MixLogSubscriber> logger)
: base(TopicId, nameof(MixLogSubscriber), 20, serviceProvider, configuration, queueService, logger)
ILogger<MixLogSubscriber> logger,
IPooledObjectPolicy<RabbitMQ.Client.IModel> rabbitMqObjectPolicy = null)
: base(TopicId, nameof(MixLogSubscriber), 20, serviceProvider, configuration, queueService, logger, rabbitMqObjectPolicy)
{
_queueMessageLogService = queueMessageLogService;
_portalHub = portalHub;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ internal class AzureQueuePublisher<T> : IQueuePublisher<T>
private static ServiceBusClient _client;
private readonly AzureQueueSetting _queueSetting;

public AzureQueuePublisher(QueueSetting queueSetting, string topicName)
public AzureQueuePublisher(IQueueSetting queueSetting, string topicName)
{
_queueSetting = queueSetting as AzureQueueSetting;
InitializeQueue(topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ internal class AzureQueueSubscriber<T> : IQueueSubscriber
private readonly AzureQueueSetting _queueSetting;
private readonly Func<T, Task> _messageHandler;
public AzureQueueSubscriber(
QueueSetting queueSetting,
IQueueSetting queueSetting,
string topicId,
string subscriptionId,
Func<T, Task> messageHandler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ internal class GoogleQueuePublisher<T> : IQueuePublisher<T>
private PublisherClient _publisher;
private readonly GoogleQueueSetting _queueSetting;

public GoogleQueuePublisher(QueueSetting queueSetting, string topicName)
public GoogleQueuePublisher(IQueueSetting queueSetting, string topicName)
{
_queueSetting = queueSetting as GoogleQueueSetting;
InitializeQueue(topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ internal class GoogleQueueSubscriber<T> : IQueueSubscriber
private readonly Func<T, Task> _messageHandler;

public GoogleQueueSubscriber(
QueueSetting queueSetting,
IQueueSetting queueSetting,
string topicId,
string subscriptionId,
Func<T, Task> messageHandler)
Expand Down
2 changes: 1 addition & 1 deletion src/platform/mix.queue/Engines/Mix/MixQueuePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class MixQueuePublisher<T> : IQueuePublisher<T>
private readonly MixEndpointService _mixEndpointService;
private GrpcChannelModel<MixMq.MixMqClient> _mixMqPublisher;

public MixQueuePublisher(QueueSetting queueSetting, string topicName, MixEndpointService mixEndpointService)
public MixQueuePublisher(IQueueSetting queueSetting, string topicName, MixEndpointService mixEndpointService)
{
_topicId = topicName;
_mixEndpointService = mixEndpointService;
Expand Down
2 changes: 1 addition & 1 deletion src/platform/mix.queue/Engines/Mix/MixQueueSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
public string SubscriptionId { get; set; }
public bool IsProcessing { get; private set; }
private readonly string _subscriptionId;
private MixTopicModel<T> _topic;

Check warning on line 23 in src/platform/mix.queue/Engines/Mix/MixQueueSubscriber.cs

View workflow job for this annotation

GitHub Actions / build

The field 'MixQueueSubscriber<T>._topic' is never used
private readonly MixQueueSetting _queueSetting;
private readonly Func<T, Task> _messageHandler;
private readonly IMemoryQueueService<MessageQueueModel> _memQueues;
Expand All @@ -28,9 +28,9 @@
private GrpcChannelModel<MixMq.MixMqClient> _mixMqSubscriber;
private SubscribeRequest _subscribeRequest;
private AsyncServerStreamingCall<SubscribeReply> _call;
private CancellationToken _startCancellationToken;

Check warning on line 31 in src/platform/mix.queue/Engines/Mix/MixQueueSubscriber.cs

View workflow job for this annotation

GitHub Actions / build

The field 'MixQueueSubscriber<T>._startCancellationToken' is never used
public MixQueueSubscriber(
QueueSetting queueSetting,
IQueueSetting queueSetting,
string topicId,
string subscriptionId,
Func<T, Task> messageHandler,
Expand Down
21 changes: 16 additions & 5 deletions src/platform/mix.queue/Engines/PublisherBase.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Mix.Heart.Exceptions;
using Mix.Mq.Lib.Models;
using Mix.Queue.Engines.MixQueue;
using Mix.Queue.Interfaces;
using Mix.Queue.Models.QueueSetting;
using Mix.Shared.Services;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
Expand All @@ -25,22 +27,25 @@ public abstract class PublisherBase : BackgroundService
private readonly string _topicId;
private MixQueueProvider _provider;
protected ILogger<PublisherBase> _logger;
private readonly IPooledObjectPolicy<IModel> _rabbitMqObjectPolicy;
protected PublisherBase(
string topicId,
IMemoryQueueService<MessageQueueModel> queueService,
IConfiguration configuration,
MixEndpointService mixEndpointService,
ILogger<PublisherBase> logger)
ILogger<PublisherBase> logger,
IPooledObjectPolicy<IModel> rabbitMqObjectPolicy)
{
_queueService = queueService;
_configuration = configuration;
_topicId = topicId;
_mixEndpointService = mixEndpointService;
_logger = logger;
_rabbitMqObjectPolicy = rabbitMqObjectPolicy;
}

private List<IQueuePublisher<MessageQueueModel>> CreatePublisher(
string topicName)
string topicId)
{
try
{
Expand All @@ -62,7 +67,7 @@ private List<IQueuePublisher<MessageQueueModel>> CreatePublisher(

queuePublishers.Add(
QueueEngineFactory.CreatePublisher<MessageQueueModel>(
_provider, azureSetting, topicName, _mixEndpointService));
_provider, azureSetting, topicId, _mixEndpointService));
break;
case MixQueueProvider.GOOGLE:
var googleSettingPath = _configuration.GetSection("MessageQueueSetting:GoogleQueueSetting");
Expand All @@ -72,16 +77,22 @@ private List<IQueuePublisher<MessageQueueModel>> CreatePublisher(

queuePublishers.Add(
QueueEngineFactory.CreatePublisher<MessageQueueModel>(
_provider, googleSetting, topicName, _mixEndpointService));
_provider, googleSetting, topicId, _mixEndpointService));
break;

case MixQueueProvider.RABITMQ:
queuePublishers.Add(
QueueEngineFactory.CreateRabbitMqPublisher<MessageQueueModel>(_rabbitMqObjectPolicy, topicId));
break;

case MixQueueProvider.MIX:
if (_mixEndpointService.MixMq != null)
{
var mixSettingPath = _configuration.GetSection("MessageQueueSetting:Mix");
var mixSetting = new MixQueueSetting();
mixSettingPath.Bind(mixSetting);
queuePublishers.Add(
QueueEngineFactory.CreatePublisher<MessageQueueModel>(_provider, mixSetting, topicName, _mixEndpointService));
QueueEngineFactory.CreatePublisher<MessageQueueModel>(_provider, mixSetting, topicId, _mixEndpointService));
}
break;
}
Expand Down
Loading
Loading