Skip to content

Commit

Permalink
Completed the ETL.
Browse files Browse the repository at this point in the history
  • Loading branch information
Utar94 committed Apr 21, 2024
1 parent 668188f commit 2be9ecf
Show file tree
Hide file tree
Showing 19 changed files with 242 additions and 100 deletions.
44 changes: 44 additions & 0 deletions backend/tools/Faktur.ETL.Worker/AggregateExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using Faktur.Contracts.Receipts;
using Faktur.Domain.Receipts;
using Faktur.Domain.Receipts.Events;
using Logitar.EventSourcing;
using Logitar.Portal.Contracts;

namespace Faktur.ETL.Worker;

internal static class AggregateExtensions
{
public static void SetDates(this AggregateRoot aggregate, Aggregate model)
{
foreach (DomainEvent change in aggregate.Changes)
{
if (change.Version > 1)
{
change.OccurredOn = model.UpdatedOn;
}
else
{
change.OccurredOn = model.CreatedOn;
}
}
}

public static void SetDates(this ReceiptAggregate aggregate, Receipt model)
{
foreach (DomainEvent change in aggregate.Changes)
{
if (change is ReceiptCategorizedEvent categorized && model.ProcessedOn.HasValue)
{
categorized.OccurredOn = model.ProcessedOn.Value == default ? model.UpdatedOn : model.ProcessedOn.Value;
}
else if (change.Version > 1)
{
change.OccurredOn = model.UpdatedOn;
}
else
{
change.OccurredOn = model.CreatedOn;
}
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using MediatR;
using Logitar.Portal.Contracts.Actors;
using MediatR;

namespace Faktur.ETL.Worker.Commands;

internal record ExtractDataCommand : IRequest<ExtractedData>;
internal record ExtractDataCommand(Actor Actor) : IRequest<ExtractedData>;
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using Faktur.Contracts.Products;
using Faktur.Contracts.Receipts;
using Faktur.Contracts.Stores;
using Logitar.Portal.Contracts.Actors;
using MediatR;

namespace Faktur.ETL.Worker.Commands;
Expand All @@ -19,10 +18,9 @@ public ExtractDataCommandHandler(ILogger<ExtractDataCommandHandler> logger, ISen
_sender = sender;
}

public async Task<ExtractedData> Handle(ExtractDataCommand _, CancellationToken cancellationToken)
public async Task<ExtractedData> Handle(ExtractDataCommand command, CancellationToken cancellationToken)
{
IEnumerable<Actor> actors = await _sender.Send(new ExtractActorsCommand(), cancellationToken);
Mapper mapper = new(actors);
Mapper mapper = new(command.Actor);

IEnumerable<Article> articles = await _sender.Send(new ExtractArticlesCommand(mapper), cancellationToken);
_logger.LogInformation("Extracted {Count} articles.", articles.Count());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public async Task<int> Handle(ImportArticlesCommand command, CancellationToken c

if (existingArticle.HasChanges)
{
existingArticle.SetDates(article);
count++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public async Task<int> Handle(ImportBannersCommand command, CancellationToken ca

if (existingBanner.HasChanges)
{
existingBanner.SetDates(banner);
count++;
}
}
Expand Down
5 changes: 3 additions & 2 deletions backend/tools/Faktur.ETL.Worker/Commands/ImportDataCommand.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using MediatR;
using Logitar.Portal.Contracts.Actors;
using MediatR;

namespace Faktur.ETL.Worker.Commands;

internal record ImportDataCommand(ExtractedData ExtractedData) : IRequest;
internal record ImportDataCommand(Actor Actor, ExtractedData ExtractedData) : IRequest;
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ public async Task Handle(ImportDataCommand command, CancellationToken cancellati
{
ExtractedData extractedData = command.ExtractedData;

int taxes = await _sender.Send(new ImportTaxesCommand(command.Actor), cancellationToken);
_logger.LogInformation("Saved {Count} taxes.", taxes);

int articles = await _sender.Send(new ImportArticlesCommand(extractedData.Articles), cancellationToken);
_logger.LogInformation("Saved {Count} articles.", articles);

Expand All @@ -29,6 +32,7 @@ public async Task Handle(ImportDataCommand command, CancellationToken cancellati
int products = await _sender.Send(new ImportProductsCommand(extractedData.Products), cancellationToken);
_logger.LogInformation("Saved {Count} products.", products);

// TODO(fpion): import receipts
int receipts = await _sender.Send(new ImportReceiptsCommand(extractedData.Receipts), cancellationToken);
_logger.LogInformation("Saved {Count} receipts.", receipts);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public async Task<int> Handle(ImportProductsCommand command, CancellationToken c

if (existingProduct.HasChanges)
{
existingProduct.SetDates(product);
count++;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
using Faktur.Contracts.Receipts;
using MediatR;

namespace Faktur.ETL.Worker.Commands;

internal record ImportReceiptsCommand(IEnumerable<Receipt> Receipts) : IRequest<int>;
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
using Faktur.Contracts.Receipts;
using Faktur.Domain.Articles;
using Faktur.Domain.Products;
using Faktur.Domain.Receipts;
using Faktur.Domain.Stores;
using Faktur.Domain.Taxes;
using Logitar.EventSourcing;
using Logitar.Identity.Domain.Shared;
using MediatR;

namespace Faktur.ETL.Worker.Commands;

internal class ImportReceiptsCommandHandler : IRequestHandler<ImportReceiptsCommand, int>
{
private readonly IReceiptRepository _receiptRepository;
private readonly ITaxRepository _taxRepository;

public ImportReceiptsCommandHandler(IReceiptRepository receiptRepository, ITaxRepository taxRepository)
{
_receiptRepository = receiptRepository;
_taxRepository = taxRepository;
}

public async Task<int> Handle(ImportReceiptsCommand command, CancellationToken cancellationToken)
{
int errors = 0;

IEnumerable<TaxAggregate> taxes = await _taxRepository.LoadAsync(cancellationToken);

Dictionary<Guid, ReceiptAggregate> receipts = (await _receiptRepository.LoadAsync(cancellationToken))
.ToDictionary(x => x.Id.ToGuid(), x => x);
int count = 0;
foreach (Receipt receipt in command.Receipts)
{
ReceiptId id = new(receipt.Id);

NumberUnit? number = NumberUnit.TryCreate(receipt.Number);
if (receipts.TryGetValue(receipt.Id, out ReceiptAggregate? existingReceipt))
{
existingReceipt.IssuedOn = receipt.IssuedOn;
existingReceipt.Number = number;

ActorId updatedBy = new(receipt.UpdatedBy.Id);
existingReceipt.Update(updatedBy);
}
else
{
StoreId storeId = new(receipt.Store.Id);
StoreAggregate store = new(storeId.AggregateId);

List<ReceiptItemUnit> items = new(capacity: receipt.Items.Count);
Dictionary<ushort, CategoryUnit?> itemCategories = new(capacity: receipt.Items.Count);
foreach (ReceiptItem item in receipt.Items)
{
if (item.Quantity < 1 || item.UnitPrice < 1 || item.Price < 1)
{
errors++;
continue;
}

NumberUnit? departmentNumber = null;
DepartmentUnit? department = null;
if (item.Department != null)
{
departmentNumber = new NumberUnit(item.Department.Number);
department = new DepartmentUnit(new DisplayNameUnit(item.Department.DisplayName), DescriptionUnit.TryCreate(item.Department.Description));
}

items.Add(new ReceiptItemUnit(GtinUnit.TryCreate(item.Gtin), SkuUnit.TryCreate(item.Sku), new DisplayNameUnit(item.Label), FlagsUnit.TryCreate(item.Flags),
item.Quantity, item.UnitPrice, item.Price, departmentNumber, department));

CategoryUnit? category = null;
if (item.Category != null && item.Category != "Shared")
{
category = new CategoryUnit(item.Category);
}
itemCategories[item.Number] = category;
}

ActorId createdBy = new(receipt.CreatedBy.Id);
existingReceipt = ReceiptAggregate.Import(store, receipt.IssuedOn, number, items, taxes, createdBy, id);
receipts[receipt.Id] = existingReceipt;

if (receipt.HasBeenProcessed)
{
ActorId processedBy = receipt.ProcessedBy == null ? createdBy : new(receipt.ProcessedBy.Id);
existingReceipt.Categorize(itemCategories, processedBy);
}
}

if (existingReceipt.HasChanges)
{
existingReceipt.SetDates(receipt);
count++;
}
}

await _receiptRepository.SaveAsync(receipts.Values, cancellationToken);

return count;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public async Task<int> Handle(ImportStoresCommand command, CancellationToken can

if (existingStore.HasChanges)
{
existingStore.SetDates(store);
count++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

namespace Faktur.ETL.Worker.Commands;

internal record ExtractActorsCommand : IRequest<IEnumerable<Actor>>;
internal record ImportTaxesCommand(Actor Actor) : IRequest<int>;
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using Faktur.Domain.Products;
using Faktur.Domain.Taxes;
using Logitar.EventSourcing;
using MediatR;

namespace Faktur.ETL.Worker.Commands;

internal class ImportTaxesCommandHandler : IRequestHandler<ImportTaxesCommand, int>
{
private readonly ITaxRepository _taxRepository;

public ImportTaxesCommandHandler(ITaxRepository taxRepository)
{
_taxRepository = taxRepository;
}

public async Task<int> Handle(ImportTaxesCommand command, CancellationToken cancellationToken)
{
ActorId actorId = new(command.Actor.Id);

Dictionary<TaxCodeUnit, TaxAggregate> taxes = (await _taxRepository.LoadAsync(cancellationToken))
.ToDictionary(x => x.Code, x => x);

int count = 0;

TaxCodeUnit gstCode = new("GST");
if (!taxes.TryGetValue(gstCode, out TaxAggregate? gst))
{
gst = new(gstCode, rate: 0.05, actorId);
taxes[gstCode] = gst;
}
gst.Flags = new FlagsUnit("F");
gst.Update(actorId);
if (gst.HasChanges)
{
count++;
}

TaxCodeUnit qstCode = new("QST");
if (!taxes.TryGetValue(qstCode, out TaxAggregate? qst))
{
qst = new(qstCode, rate: 0.09975, actorId);
taxes[qstCode] = qst;
}
qst.Flags = new FlagsUnit("P");
qst.Update(actorId);
if (gst.HasChanges)
{
count++;
}

await _taxRepository.SaveAsync(taxes.Values, cancellationToken);

return count;
}
}

This file was deleted.

1 change: 0 additions & 1 deletion backend/tools/Faktur.ETL.Worker/Entities/LegacyContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ public LegacyContext(DbContextOptions<LegacyContext> options) : base(options)
public DbSet<ReceiptTaxEntity> ReceiptTaxes { get; private set; }
public DbSet<ReceiptEntity> Receipts { get; private set; }
public DbSet<StoreEntity> Stores { get; private set; }
public DbSet<UserEntity> Users { get; private set; }

protected override void OnModelCreating(ModelBuilder modelBuilder)
{
Expand Down
15 changes: 0 additions & 15 deletions backend/tools/Faktur.ETL.Worker/Entities/UserEntity.cs

This file was deleted.

Loading

0 comments on commit 2be9ecf

Please sign in to comment.