From 109d7edc1e92c7ab6d5c5863056d562f79d7c670 Mon Sep 17 00:00:00 2001 From: ImoutoChan Date: Mon, 2 Oct 2023 01:41:42 +0400 Subject: [PATCH] Fix event processing --- .../Behaviors/EventProcessingBehavior.cs | 43 +++++++++++++++++++ .../Behaviors/TransactionBehavior.cs | 30 +++---------- .../ServiceCollectionExtensions.cs | 1 + .../EventStorage.cs | 19 +++++--- .../IEventStorage.cs | 6 ++- .../RequestMetadataUpdateCommand.cs} | 26 +++++------ .../PostsUpdatedDomainEventHandler.cs | 22 ++++++++++ .../UpdateRequestedDomainEventHandler.cs | 4 +- .../SourceMetadataRequester.cs | 3 +- .../Consumers/TagsUpdatedCommandConsumer.cs | 2 +- 10 files changed, 108 insertions(+), 48 deletions(-) create mode 100644 Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Cqrs/Behaviors/EventProcessingBehavior.cs rename Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.Application/ParsingStatusSlice/{NotesUpdatedDomainEventHandler.cs => Commands/RequestMetadataUpdateCommand.cs} (54%) create mode 100644 Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.Application/ParsingStatusSlice/PostsUpdatedDomainEventHandler.cs diff --git a/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Cqrs/Behaviors/EventProcessingBehavior.cs b/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Cqrs/Behaviors/EventProcessingBehavior.cs new file mode 100644 index 00000000..1a27ec97 --- /dev/null +++ b/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Cqrs/Behaviors/EventProcessingBehavior.cs @@ -0,0 +1,43 @@ +using ImoutoRebirth.Common.Cqrs.Events; +using ImoutoRebirth.Common.Domain; +using MediatR; + +namespace ImoutoRebirth.Common.Cqrs.Behaviors; + +internal class EventProcessingBehavior + : IPipelineBehavior + where TRequest : notnull +{ + private readonly IEventPublisher _eventPublisher; + private readonly IEventStorage _eventStorage; + + public EventProcessingBehavior(IEventStorage eventStorage, IEventPublisher eventPublisher) + { + _eventPublisher = eventPublisher; + _eventStorage = eventStorage; + } + + public async Task Handle( + TRequest request, + RequestHandlerDelegate next, + CancellationToken ct) + { + var mark = Guid.NewGuid(); + _eventStorage.Mark(mark); + + var response = await next(); + + var events = _eventStorage.GetAll(mark); + await InvokeEventsAsync(events, ct); + + return response; + } + + private async Task InvokeEventsAsync(IReadOnlyCollection events, CancellationToken ct) + { + foreach (var domainEvent in events) + { + await _eventPublisher.Publish(domainEvent, ct); + } + } +} diff --git a/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Cqrs/Behaviors/TransactionBehavior.cs b/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Cqrs/Behaviors/TransactionBehavior.cs index a7e44f3a..ea8d1b67 100644 --- a/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Cqrs/Behaviors/TransactionBehavior.cs +++ b/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Cqrs/Behaviors/TransactionBehavior.cs @@ -1,5 +1,4 @@ -using ImoutoRebirth.Common.Cqrs.Events; -using ImoutoRebirth.Common.Domain; +using ImoutoRebirth.Common.Domain; using MediatR; namespace ImoutoRebirth.Common.Cqrs.Behaviors; @@ -8,18 +7,8 @@ public class TransactionBehavior : IPipelineBehavior _unitOfWork = unitOfWork; public async Task Handle( TRequest request, @@ -28,19 +17,12 @@ public async Task Handle( { var isolationLevel = typeof(TRequest).GetIsolationLevel(); - TResponse response; - using (var transaction = await _unitOfWork.CreateTransactionAsync(isolationLevel)) - { - response = await next(); + using var transaction = await _unitOfWork.CreateTransactionAsync(isolationLevel); - await _unitOfWork.SaveEntitiesAsync(cancellationToken); - await transaction.CommitAsync(); - } + var response = await next(); - foreach (var domainEvent in _eventStorage.GetAll()) - { - await _eventPublisher.Publish(domainEvent, cancellationToken); - } + await _unitOfWork.SaveEntitiesAsync(cancellationToken); + await transaction.CommitAsync(); return response; } diff --git a/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Cqrs/ServiceCollectionExtensions.cs b/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Cqrs/ServiceCollectionExtensions.cs index aeb8a85f..44385acd 100644 --- a/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Cqrs/ServiceCollectionExtensions.cs +++ b/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Cqrs/ServiceCollectionExtensions.cs @@ -10,6 +10,7 @@ public static class ServiceCollectionExtensions public static IServiceCollection AddTransactionBehavior(this IServiceCollection services) { services.AddTransient(typeof(IPipelineBehavior<,>), typeof(TransactionBehavior<,>)); + services.AddTransient(typeof(IPipelineBehavior<,>), typeof(EventProcessingBehavior<,>)); services.AddTransient(); return services; diff --git a/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Domain/EventStorage.cs b/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Domain/EventStorage.cs index ebb40f23..e01ec846 100644 --- a/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Domain/EventStorage.cs +++ b/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Domain/EventStorage.cs @@ -2,17 +2,26 @@ public class EventStorage : IEventStorage { - private readonly List _events = new List(); + private readonly List _events = new(); + private Guid? _marked; public void Add(IDomainEvent domainEvent) => _events.Add(domainEvent); public void AddRange(IEnumerable domainEvents) + => _events.AddRange(domainEvents); + + public IReadOnlyCollection GetAll(Guid mark) { - foreach (var domainEvent in domainEvents) + if (mark != _marked) { - _events.Add(domainEvent); + return Array.Empty(); } + + var events = _events.ToList(); + _events.Clear(); + _marked = null; + return events; } - public IReadOnlyCollection GetAll() => _events; -} \ No newline at end of file + public void Mark(Guid mark) => _marked ??= mark; +} diff --git a/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Domain/IEventStorage.cs b/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Domain/IEventStorage.cs index cb3c0407..89e5a41c 100644 --- a/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Domain/IEventStorage.cs +++ b/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.Domain/IEventStorage.cs @@ -6,5 +6,7 @@ public interface IEventStorage void AddRange(IEnumerable domainEvents); - IReadOnlyCollection GetAll(); -} \ No newline at end of file + IReadOnlyCollection GetAll(Guid mark); + + void Mark(Guid mark); +} diff --git a/Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.Application/ParsingStatusSlice/NotesUpdatedDomainEventHandler.cs b/Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.Application/ParsingStatusSlice/Commands/RequestMetadataUpdateCommand.cs similarity index 54% rename from Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.Application/ParsingStatusSlice/NotesUpdatedDomainEventHandler.cs rename to Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.Application/ParsingStatusSlice/Commands/RequestMetadataUpdateCommand.cs index 2fc3d41a..46018e94 100644 --- a/Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.Application/ParsingStatusSlice/NotesUpdatedDomainEventHandler.cs +++ b/Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.Application/ParsingStatusSlice/Commands/RequestMetadataUpdateCommand.cs @@ -1,36 +1,36 @@ -using ImoutoRebirth.Common.Cqrs.Events; +using ImoutoRebirth.Common.Cqrs.Abstract; using ImoutoRebirth.Common.Domain; using ImoutoRebirth.Meido.Application.Infrastructure; -using ImoutoRebirth.Meido.Domain.SourceActualizingStateAggregate; +using ImoutoRebirth.Meido.Domain; using NodaTime; -namespace ImoutoRebirth.Meido.Application.ParsingStatusSlice; +namespace ImoutoRebirth.Meido.Application.ParsingStatusSlice.Commands; -internal class NotesUpdatedDomainEventHandler : DomainEventNotificationHandler +public record RequestMetadataUpdateCommand(IReadOnlyCollection PostIds, MetadataSource Source) : ICommand; + +internal class RequestMetadataUpdateCommandHandler : ICommandHandler { private readonly IClock _clock; private readonly IParsingStatusRepository _parsingStatusRepository; private readonly IEventStorage _eventStorage; - public NotesUpdatedDomainEventHandler( - IClock clock, + public RequestMetadataUpdateCommandHandler( IParsingStatusRepository parsingStatusRepository, - IEventStorage eventStorage) + IEventStorage eventStorage, + IClock clock) { - _clock = clock; _parsingStatusRepository = parsingStatusRepository; _eventStorage = eventStorage; + _clock = clock; } - protected override async Task Handle(PostsUpdatedDomainEvent domainEvent, CancellationToken ct) + public async Task Handle(RequestMetadataUpdateCommand command, CancellationToken ct) { - var (sourceActualizingState, postsIdsWithUpdatedNotes) = domainEvent; + var (postIds, source) = command; var now = _clock.GetCurrentInstant(); - var posts = await _parsingStatusRepository.GetBySourcePostIds( - postsIdsWithUpdatedNotes, - sourceActualizingState.Source); + var posts = await _parsingStatusRepository.GetBySourcePostIds(postIds, source); foreach (var parsingStatus in posts) { diff --git a/Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.Application/ParsingStatusSlice/PostsUpdatedDomainEventHandler.cs b/Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.Application/ParsingStatusSlice/PostsUpdatedDomainEventHandler.cs new file mode 100644 index 00000000..f711d74a --- /dev/null +++ b/Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.Application/ParsingStatusSlice/PostsUpdatedDomainEventHandler.cs @@ -0,0 +1,22 @@ +using ImoutoRebirth.Common.Cqrs.Events; +using ImoutoRebirth.Meido.Application.ParsingStatusSlice.Commands; +using ImoutoRebirth.Meido.Domain.SourceActualizingStateAggregate; +using MediatR; + +namespace ImoutoRebirth.Meido.Application.ParsingStatusSlice; + +internal class PostsUpdatedDomainEventHandler : DomainEventNotificationHandler +{ + private readonly IMediator _mediator; + + public PostsUpdatedDomainEventHandler(IMediator mediator) => _mediator = mediator; + + protected override async Task Handle(PostsUpdatedDomainEvent domainEvent, CancellationToken ct) + { + var (sourceActualizingState, postsIdsWithUpdatedNotes) = domainEvent; + + await _mediator.Send( + new RequestMetadataUpdateCommand(postsIdsWithUpdatedNotes, sourceActualizingState.Source), + ct); + } +} diff --git a/Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.Application/ParsingStatusSlice/UpdateRequestedDomainEventHandler.cs b/Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.Application/ParsingStatusSlice/UpdateRequestedDomainEventHandler.cs index 15775748..063b06ab 100644 --- a/Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.Application/ParsingStatusSlice/UpdateRequestedDomainEventHandler.cs +++ b/Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.Application/ParsingStatusSlice/UpdateRequestedDomainEventHandler.cs @@ -11,6 +11,6 @@ internal class UpdateRequestedDomainEventHandler : DomainEventNotificationHandle public UpdateRequestedDomainEventHandler(ISourceMetadataRequester metadataRequester) => _metadataRequester = metadataRequester; - protected override Task Handle(UpdateRequested domainEvent, CancellationToken cancellationToken) - => _metadataRequester.Request(domainEvent.Entity.Source, domainEvent.Entity.FileId, domainEvent.Entity.Md5); + protected override async Task Handle(UpdateRequested domainEvent, CancellationToken cancellationToken) + => await _metadataRequester.Request(domainEvent.Entity.Source, domainEvent.Entity.FileId, domainEvent.Entity.Md5); } diff --git a/Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.Infrastructure/MetadataRequest/SourceMetadataRequester.cs b/Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.Infrastructure/MetadataRequest/SourceMetadataRequester.cs index 68fef7df..9695db7a 100644 --- a/Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.Infrastructure/MetadataRequest/SourceMetadataRequester.cs +++ b/Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.Infrastructure/MetadataRequest/SourceMetadataRequester.cs @@ -9,7 +9,8 @@ internal class SourceMetadataRequester : ISourceMetadataRequester public SourceMetadataRequester(IEnumerable requesters) => _requesters = requesters; - public Task Request(MetadataSource source, Guid fileId, string md5) => Get(source).SendRequestCommand(fileId, md5); + public Task Request(MetadataSource source, Guid fileId, string md5) + => Get(source).SendRequestCommand(fileId, md5); private IMetadataRequester Get(MetadataSource source) { diff --git a/Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.UI/Consumers/TagsUpdatedCommandConsumer.cs b/Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.UI/Consumers/TagsUpdatedCommandConsumer.cs index 59833a64..1ceebb59 100644 --- a/Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.UI/Consumers/TagsUpdatedCommandConsumer.cs +++ b/Source/ImoutoRebirth.Meido/ImoutoRebirth.Meido.UI/Consumers/TagsUpdatedCommandConsumer.cs @@ -28,7 +28,7 @@ public async Task Consume(ConsumeContext context) var command = new MarkTagsUpdatedCommand( (MetadataSource)context.Message.SourceId, - context.Message.PostIds, + context.Message.PostIds.Distinct().ToList(), context.Message.LastHistoryId); await _mediator.Send(command);