Skip to content

Commit

Permalink
Added snapshot support to sagas via a SnapshotSagaRepository
Browse files Browse the repository at this point in the history
Switched to using threadsafe collections in a threaded operation
  • Loading branch information
cdmdotnet committed Oct 19, 2024
1 parent 1d052c4 commit 578c25b
Show file tree
Hide file tree
Showing 29 changed files with 872 additions and 58 deletions.
46 changes: 26 additions & 20 deletions Framework/Azure/Cqrs.Azure.Storage/BlobStorageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#endregion

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
Expand Down Expand Up @@ -210,11 +211,11 @@ public override IQueryProvider Provider
/// </summary>
protected virtual
#if NET472
void
void SaveData
#else
async Task
async Task SaveDataAsync
#endif
AsyncSaveData<TResult>(TData data, Func<TData, BlobClient, TResult> function, Func<TData, string> customFilenameFunction = null)
<TResult>(TData data, Func<TData, BlobClient, TResult> function, Func<TData, string> customFilenameFunction = null)
{
IList<Task> persistTasks = new List<Task>();
foreach ((BlobServiceClient Client, BlobContainerClient Container) tuple in WritableCollection)
Expand Down Expand Up @@ -276,10 +277,10 @@ async Task AddAsync
(TData data)
{
#if NET472
SaveData
#else
await
await SaveDataAsync
#endif
AsyncSaveData
(
data,
#if NET472
Expand Down Expand Up @@ -321,10 +322,10 @@ async Task DestroyAsync
(TData data)
{
#if NET472
SaveData
#else
await
await SaveDataAsync
#endif
AsyncSaveData
(
data,
#if NET472
Expand Down Expand Up @@ -428,8 +429,8 @@ async Task<IEnumerable<Stream>> OpenStreamsForReadingAsync
#endif
(Func<BlobItem, bool> predicate = null, string blobPrefix = null, string folderName = null)
{
IList<Stream> results = null;
for(int i = 0; i < 10; i++)
ConcurrentQueue<Stream> results = null;
for(int i = 0; i < 100; i++)
{
AsyncPageable<BlobItem> blobs;
if (!string.IsNullOrWhiteSpace(folderName))
Expand All @@ -454,20 +455,20 @@ async Task<IEnumerable<Stream>> OpenStreamsForReadingAsync
sourceQuery = query.Values;
IList<BlobItem> source = sourceQuery.ToList();

results = new List<Stream>();
IList<Task> downloadTasks = new List<Task>();
results = new ConcurrentQueue<Stream>();
var downloadTasks = new ConcurrentQueue<Task>();
foreach (BlobItem x in source)
{
#if NET472
downloadTasks.Add
downloadTasks.Enqueue
(
Task.Run(async () =>
{
#endif
BlobClient blobClient = ReadableSource.GetBlobClient(x.Name);
BlobDownloadResult downloadResult = await blobClient.DownloadContentAsync();
BinaryData as1 = downloadResult.Content;
results.Add(as1.ToStream());
results.Enqueue(as1.ToStream());
#if NET472
})
);
Expand All @@ -485,14 +486,19 @@ async Task<IEnumerable<Stream>> OpenStreamsForReadingAsync
if (!hasFinished)
{
Logger.LogError("Loading streams faulted.");
throw new Exception("Did not read all blobs.");
// Now we just go round again
// throw new Exception("Did not read all blobs.");
}
else
#endif
{
// We discovered that sometimes getting blobs can return null streams... not helpful. Seems to be a race condition
// Turns out this was probably due to using a non threadsafe collection... so the below might not be needed anymore
if (results.Count == source.Count && !results.Any(x => x == null))
break;
}

// We discovered that sometimes getting blobs can return null streams... not helpful. Seems to be a race condition
if (results.Count == source.Count && !results.Any(x => x == null))
break;
Thread.Sleep(150);
Thread.Sleep(250);
results = null;
}
if (results == null)
Expand Down Expand Up @@ -544,7 +550,7 @@ IEnumerable<TData> GetByFolder
#else
async Task<IEnumerable<TData>> GetByFolderAsync
#endif
(string folderName)
(string folderName, Func<BlobItem, bool> predicate = null)
{
string folder = new Uri(string.Format(folderName.StartsWith("..\\") ? "http://l/2/{0}" : "http://l/{0}", folderName)).AbsolutePath.Substring(1);
return
Expand All @@ -554,7 +560,7 @@ async Task<IEnumerable<TData>> GetByFolderAsync
#else
await OpenStreamsForReadingAsync
#endif
(folderName: folder)
(folderName: folder, predicate: predicate)
)
.Select(Deserialise);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
<DocumentationFile>Cqrs.Azure.Storage.xml</DocumentationFile>
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
<TreatWarningsAsErrors>True</TreatWarningsAsErrors>
<LangVersion>8</LangVersion>
<LangVersion>12.0</LangVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
20 changes: 16 additions & 4 deletions Framework/Azure/Cqrs.Azure.Storage/Events/BlobStorageEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,23 @@ async Task<IEnumerable<IEvent<TAuthenticationToken>>> GetAsync
{
string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId);

Func<BlobItem, bool> predicate = null;
if (fromVersion > 0)
{
predicate = blob =>
{
string[] parts = blob.Name.Remove(0, streamName.Length).Split(["/"], StringSplitOptions.RemoveEmptyEntries);
// We're checking first if this isn't numeric... just return true, otherwise if the version matches return true.
// This is because while we should trust file name conventions... poorly dropped manual files shouldn't break things.
return !int.TryParse(parts[0], out int v) || v > fromVersion;
};
}

IEnumerable<EventData> query =
#if NET472
BlobStorageStore.GetByFolder(streamName)
BlobStorageStore.GetByFolder(streamName, predicate)
#else
(await BlobStorageStore.GetByFolderAsync(streamName))
(await BlobStorageStore.GetByFolderAsync(streamName, predicate))
#endif
.Where(eventData => eventData.AggregateId == streamName && eventData.Version > fromVersion)
.OrderByDescending(eventData => eventData.Version);
Expand Down Expand Up @@ -306,10 +318,10 @@ async Task AddToCorrelationFolderAsync
(EventData data)
{
#if NET472
SaveData
#else
await
await SaveDataAsync
#endif
AsyncSaveData
(
data,
#if NET472
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Chinchilla.Logging;
using Cqrs.Configuration;
using Cqrs.Events;
using Cqrs.Snapshots;

#if NET472
#else
using System.Threading.Tasks;
#endif

namespace Cqrs.Azure.Storage.Events
{
/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.Data.Tables;
using Chinchilla.Logging;
using Cqrs.Domain;
using Cqrs.Events;
using Cqrs.Messages;

#if NET472
#else
using System.Threading.Tasks;
#endif

namespace Cqrs.Azure.Storage.Events
{
/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@

using System;
using System.Linq;
using System.Threading.Tasks;
using Azure.Data.Tables;
using Chinchilla.Logging;
using Cqrs.Configuration;
using Cqrs.Domain;
using Cqrs.Events;
using Cqrs.Snapshots;

#if NET472
#else
using System.Threading.Tasks;
#endif

namespace Cqrs.Azure.Storage.Events
{
/// <summary>
Expand Down
Loading

0 comments on commit 578c25b

Please sign in to comment.