Skip to content

Commit

Permalink
Fix event processing
Browse files Browse the repository at this point in the history
  • Loading branch information
ImoutoChan committed Oct 1, 2023
1 parent 0f77baa commit 109d7ed
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 48 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using ImoutoRebirth.Common.Cqrs.Events;
using ImoutoRebirth.Common.Domain;
using MediatR;

namespace ImoutoRebirth.Common.Cqrs.Behaviors;

internal class EventProcessingBehavior<TRequest, TResponse>
: IPipelineBehavior<TRequest, TResponse>
where TRequest : notnull
{
private readonly IEventPublisher _eventPublisher;
private readonly IEventStorage _eventStorage;

public EventProcessingBehavior(IEventStorage eventStorage, IEventPublisher eventPublisher)
{
_eventPublisher = eventPublisher;
_eventStorage = eventStorage;
}

public async Task<TResponse> Handle(
TRequest request,
RequestHandlerDelegate<TResponse> next,
CancellationToken ct)
{
var mark = Guid.NewGuid();
_eventStorage.Mark(mark);

var response = await next();

var events = _eventStorage.GetAll(mark);
await InvokeEventsAsync(events, ct);

return response;
}

private async Task InvokeEventsAsync(IReadOnlyCollection<IDomainEvent> events, CancellationToken ct)
{
foreach (var domainEvent in events)
{
await _eventPublisher.Publish(domainEvent, ct);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using ImoutoRebirth.Common.Cqrs.Events;
using ImoutoRebirth.Common.Domain;
using ImoutoRebirth.Common.Domain;
using MediatR;

namespace ImoutoRebirth.Common.Cqrs.Behaviors;
Expand All @@ -8,18 +7,8 @@ public class TransactionBehavior<TRequest, TResponse> : IPipelineBehavior<TReque
where TRequest : notnull
{
private readonly IUnitOfWork _unitOfWork;
private readonly IEventStorage _eventStorage;
private readonly IEventPublisher _eventPublisher;

public TransactionBehavior(
IUnitOfWork unitOfWork,
IEventStorage eventStorage,
IEventPublisher eventPublisher)
{
_unitOfWork = unitOfWork;
_eventStorage = eventStorage;
_eventPublisher = eventPublisher;
}
public TransactionBehavior(IUnitOfWork unitOfWork) => _unitOfWork = unitOfWork;

public async Task<TResponse> Handle(
TRequest request,
Expand All @@ -28,19 +17,12 @@ public async Task<TResponse> Handle(
{
var isolationLevel = typeof(TRequest).GetIsolationLevel();

TResponse response;
using (var transaction = await _unitOfWork.CreateTransactionAsync(isolationLevel))
{
response = await next();
using var transaction = await _unitOfWork.CreateTransactionAsync(isolationLevel);

await _unitOfWork.SaveEntitiesAsync(cancellationToken);
await transaction.CommitAsync();
}
var response = await next();

foreach (var domainEvent in _eventStorage.GetAll())
{
await _eventPublisher.Publish(domainEvent, cancellationToken);
}
await _unitOfWork.SaveEntitiesAsync(cancellationToken);
await transaction.CommitAsync();

return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public static class ServiceCollectionExtensions
public static IServiceCollection AddTransactionBehavior(this IServiceCollection services)
{
services.AddTransient(typeof(IPipelineBehavior<,>), typeof(TransactionBehavior<,>));
services.AddTransient(typeof(IPipelineBehavior<,>), typeof(EventProcessingBehavior<,>));
services.AddTransient<IEventPublisher, EventPublisher>();

return services;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,26 @@

public class EventStorage : IEventStorage
{
private readonly List<IDomainEvent> _events = new List<IDomainEvent>();
private readonly List<IDomainEvent> _events = new();
private Guid? _marked;

public void Add(IDomainEvent domainEvent) => _events.Add(domainEvent);

public void AddRange(IEnumerable<IDomainEvent> domainEvents)
=> _events.AddRange(domainEvents);

public IReadOnlyCollection<IDomainEvent> GetAll(Guid mark)
{
foreach (var domainEvent in domainEvents)
if (mark != _marked)
{
_events.Add(domainEvent);
return Array.Empty<IDomainEvent>();
}

var events = _events.ToList();
_events.Clear();
_marked = null;
return events;
}

public IReadOnlyCollection<IDomainEvent> GetAll() => _events;
}
public void Mark(Guid mark) => _marked ??= mark;
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@ public interface IEventStorage

void AddRange(IEnumerable<IDomainEvent> domainEvents);

IReadOnlyCollection<IDomainEvent> GetAll();
}
IReadOnlyCollection<IDomainEvent> GetAll(Guid mark);

void Mark(Guid mark);
}
Original file line number Diff line number Diff line change
@@ -1,36 +1,36 @@
using ImoutoRebirth.Common.Cqrs.Events;
using ImoutoRebirth.Common.Cqrs.Abstract;
using ImoutoRebirth.Common.Domain;
using ImoutoRebirth.Meido.Application.Infrastructure;
using ImoutoRebirth.Meido.Domain.SourceActualizingStateAggregate;
using ImoutoRebirth.Meido.Domain;
using NodaTime;

namespace ImoutoRebirth.Meido.Application.ParsingStatusSlice;
namespace ImoutoRebirth.Meido.Application.ParsingStatusSlice.Commands;

internal class NotesUpdatedDomainEventHandler : DomainEventNotificationHandler<PostsUpdatedDomainEvent>
public record RequestMetadataUpdateCommand(IReadOnlyCollection<int> PostIds, MetadataSource Source) : ICommand;

internal class RequestMetadataUpdateCommandHandler : ICommandHandler<RequestMetadataUpdateCommand>
{
private readonly IClock _clock;
private readonly IParsingStatusRepository _parsingStatusRepository;
private readonly IEventStorage _eventStorage;

public NotesUpdatedDomainEventHandler(
IClock clock,
public RequestMetadataUpdateCommandHandler(
IParsingStatusRepository parsingStatusRepository,
IEventStorage eventStorage)
IEventStorage eventStorage,
IClock clock)
{
_clock = clock;
_parsingStatusRepository = parsingStatusRepository;
_eventStorage = eventStorage;
_clock = clock;
}

protected override async Task Handle(PostsUpdatedDomainEvent domainEvent, CancellationToken ct)
public async Task Handle(RequestMetadataUpdateCommand command, CancellationToken ct)
{
var (sourceActualizingState, postsIdsWithUpdatedNotes) = domainEvent;
var (postIds, source) = command;

var now = _clock.GetCurrentInstant();

var posts = await _parsingStatusRepository.GetBySourcePostIds(
postsIdsWithUpdatedNotes,
sourceActualizingState.Source);
var posts = await _parsingStatusRepository.GetBySourcePostIds(postIds, source);

foreach (var parsingStatus in posts)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using ImoutoRebirth.Common.Cqrs.Events;
using ImoutoRebirth.Meido.Application.ParsingStatusSlice.Commands;
using ImoutoRebirth.Meido.Domain.SourceActualizingStateAggregate;
using MediatR;

namespace ImoutoRebirth.Meido.Application.ParsingStatusSlice;

internal class PostsUpdatedDomainEventHandler : DomainEventNotificationHandler<PostsUpdatedDomainEvent>
{
private readonly IMediator _mediator;

public PostsUpdatedDomainEventHandler(IMediator mediator) => _mediator = mediator;

protected override async Task Handle(PostsUpdatedDomainEvent domainEvent, CancellationToken ct)
{
var (sourceActualizingState, postsIdsWithUpdatedNotes) = domainEvent;

await _mediator.Send(
new RequestMetadataUpdateCommand(postsIdsWithUpdatedNotes, sourceActualizingState.Source),
ct);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ internal class UpdateRequestedDomainEventHandler : DomainEventNotificationHandle
public UpdateRequestedDomainEventHandler(ISourceMetadataRequester metadataRequester)
=> _metadataRequester = metadataRequester;

protected override Task Handle(UpdateRequested domainEvent, CancellationToken cancellationToken)
=> _metadataRequester.Request(domainEvent.Entity.Source, domainEvent.Entity.FileId, domainEvent.Entity.Md5);
protected override async Task Handle(UpdateRequested domainEvent, CancellationToken cancellationToken)
=> await _metadataRequester.Request(domainEvent.Entity.Source, domainEvent.Entity.FileId, domainEvent.Entity.Md5);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ internal class SourceMetadataRequester : ISourceMetadataRequester

public SourceMetadataRequester(IEnumerable<IMetadataRequester> requesters) => _requesters = requesters;

public Task Request(MetadataSource source, Guid fileId, string md5) => Get(source).SendRequestCommand(fileId, md5);
public Task Request(MetadataSource source, Guid fileId, string md5)
=> Get(source).SendRequestCommand(fileId, md5);

private IMetadataRequester Get(MetadataSource source)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public async Task Consume(ConsumeContext<ITagsUpdatedCommand> context)

var command = new MarkTagsUpdatedCommand(
(MetadataSource)context.Message.SourceId,
context.Message.PostIds,
context.Message.PostIds.Distinct().ToList(),
context.Message.LastHistoryId);

await _mediator.Send(command);
Expand Down

0 comments on commit 109d7ed

Please sign in to comment.