From 578c25b460abcc340f8a282459282fab3c2d83e3 Mon Sep 17 00:00:00 2001 From: Grover <135006+cdmdotnet@users.noreply.github.com> Date: Sat, 19 Oct 2024 18:53:56 +1300 Subject: [PATCH] Added snapshot support to sagas via a SnapshotSagaRepository Switched to using threadsafe collections in a threaded operation --- .../Cqrs.Azure.Storage/BlobStorageStore.cs | 46 +-- .../Cqrs.Azure.Storage.csproj | 2 +- .../Events/BlobStorageEventStore.cs | 20 +- .../Events/BlobStorageSnapshotStore.cs | 6 +- .../Events/TableStorageEventStore.cs | 6 +- .../Events/TableStorageSnapshotStore.cs | 6 +- .../BlobStorageSnapshotSagaRepositoryTests.cs | 246 ++++++++++++++++ ...Cqrs.Azure.Storage.Test.Integration.csproj | 1 + .../TestSnapshotSaga.cs | 61 ++++ Framework/Cqrs.Tests/Substitutes/TestSaga.cs | 1 - .../Substitutes/TestSagaUnitOfWork.cs | 8 +- Framework/Cqrs/Cqrs.csproj | 1 + Framework/Cqrs/Domain/AggregateRepository.cs | 6 +- Framework/Cqrs/Domain/IAggregateRepository.cs | 6 +- Framework/Cqrs/Domain/ISagaDescriptor.cs | 3 + Framework/Cqrs/Domain/ISagaRepository.cs | 6 +- Framework/Cqrs/Domain/ISagaUnitOfWork.cs | 8 +- .../Cqrs/Domain/ISnapshotSagaRepository.cs | 21 ++ Framework/Cqrs/Domain/IUnitOfWork.cs | 6 +- Framework/Cqrs/Domain/SagaDescriptor.cs | 3 + Framework/Cqrs/Domain/SagaUnitOfWork.cs | 31 +- .../Cqrs/Snapshots/DefaultSnapshotStrategy.cs | 25 +- Framework/Cqrs/Snapshots/ISnapshotStore.cs | 6 +- Framework/Cqrs/Snapshots/ISnapshotStrategy.cs | 8 + .../Cqrs/Snapshots/SnapshotAggregateRoot.cs | 11 + .../Cqrs/Snapshots/SnapshotRepository.cs | 20 +- Framework/Cqrs/Snapshots/SnapshotSaga.cs | 82 ++++++ .../Cqrs/Snapshots/SnapshotSagaRepository.cs | 278 ++++++++++++++++++ Framework/Cqrs/Snapshots/SnapshotStore.cs | 6 +- 29 files changed, 872 insertions(+), 58 deletions(-) create mode 100644 Framework/Azure/Tests/Cqrs.Azure.Storage.Test.Integration/BlobStorageSnapshotSagaRepositoryTests.cs create mode 100644 Framework/Azure/Tests/Cqrs.Azure.Storage.Test.Integration/TestSnapshotSaga.cs create mode 100644 Framework/Cqrs/Domain/ISnapshotSagaRepository.cs create mode 100644 Framework/Cqrs/Snapshots/SnapshotSaga.cs create mode 100644 Framework/Cqrs/Snapshots/SnapshotSagaRepository.cs diff --git a/Framework/Azure/Cqrs.Azure.Storage/BlobStorageStore.cs b/Framework/Azure/Cqrs.Azure.Storage/BlobStorageStore.cs index ab4dcba7c4..5cc39ddb9f 100644 --- a/Framework/Azure/Cqrs.Azure.Storage/BlobStorageStore.cs +++ b/Framework/Azure/Cqrs.Azure.Storage/BlobStorageStore.cs @@ -7,6 +7,7 @@ #endregion using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; @@ -210,11 +211,11 @@ public override IQueryProvider Provider /// protected virtual #if NET472 - void + void SaveData #else - async Task + async Task SaveDataAsync #endif - AsyncSaveData(TData data, Func function, Func customFilenameFunction = null) + (TData data, Func function, Func customFilenameFunction = null) { IList persistTasks = new List(); foreach ((BlobServiceClient Client, BlobContainerClient Container) tuple in WritableCollection) @@ -276,10 +277,10 @@ async Task AddAsync (TData data) { #if NET472 + SaveData #else - await + await SaveDataAsync #endif - AsyncSaveData ( data, #if NET472 @@ -321,10 +322,10 @@ async Task DestroyAsync (TData data) { #if NET472 + SaveData #else - await + await SaveDataAsync #endif - AsyncSaveData ( data, #if NET472 @@ -428,8 +429,8 @@ async Task> OpenStreamsForReadingAsync #endif (Func predicate = null, string blobPrefix = null, string folderName = null) { - IList results = null; - for(int i = 0; i < 10; i++) + ConcurrentQueue results = null; + for(int i = 0; i < 100; i++) { AsyncPageable blobs; if (!string.IsNullOrWhiteSpace(folderName)) @@ -454,12 +455,12 @@ async Task> OpenStreamsForReadingAsync sourceQuery = query.Values; IList source = sourceQuery.ToList(); - results = new List(); - IList downloadTasks = new List(); + results = new ConcurrentQueue(); + var downloadTasks = new ConcurrentQueue(); foreach (BlobItem x in source) { #if NET472 - downloadTasks.Add + downloadTasks.Enqueue ( Task.Run(async () => { @@ -467,7 +468,7 @@ async Task> OpenStreamsForReadingAsync BlobClient blobClient = ReadableSource.GetBlobClient(x.Name); BlobDownloadResult downloadResult = await blobClient.DownloadContentAsync(); BinaryData as1 = downloadResult.Content; - results.Add(as1.ToStream()); + results.Enqueue(as1.ToStream()); #if NET472 }) ); @@ -485,14 +486,19 @@ async Task> OpenStreamsForReadingAsync if (!hasFinished) { Logger.LogError("Loading streams faulted."); - throw new Exception("Did not read all blobs."); + // Now we just go round again + // throw new Exception("Did not read all blobs."); } + else #endif + { + // We discovered that sometimes getting blobs can return null streams... not helpful. Seems to be a race condition + // Turns out this was probably due to using a non threadsafe collection... so the below might not be needed anymore + if (results.Count == source.Count && !results.Any(x => x == null)) + break; + } - // We discovered that sometimes getting blobs can return null streams... not helpful. Seems to be a race condition - if (results.Count == source.Count && !results.Any(x => x == null)) - break; - Thread.Sleep(150); + Thread.Sleep(250); results = null; } if (results == null) @@ -544,7 +550,7 @@ IEnumerable GetByFolder #else async Task> GetByFolderAsync #endif - (string folderName) + (string folderName, Func predicate = null) { string folder = new Uri(string.Format(folderName.StartsWith("..\\") ? "http://l/2/{0}" : "http://l/{0}", folderName)).AbsolutePath.Substring(1); return @@ -554,7 +560,7 @@ async Task> GetByFolderAsync #else await OpenStreamsForReadingAsync #endif - (folderName: folder) + (folderName: folder, predicate: predicate) ) .Select(Deserialise); } diff --git a/Framework/Azure/Cqrs.Azure.Storage/Cqrs.Azure.Storage.csproj b/Framework/Azure/Cqrs.Azure.Storage/Cqrs.Azure.Storage.csproj index 8af4be0bd8..0842cb829c 100644 --- a/Framework/Azure/Cqrs.Azure.Storage/Cqrs.Azure.Storage.csproj +++ b/Framework/Azure/Cqrs.Azure.Storage/Cqrs.Azure.Storage.csproj @@ -32,7 +32,7 @@ Cqrs.Azure.Storage.xml Apache-2.0 True - 8 + 12.0 diff --git a/Framework/Azure/Cqrs.Azure.Storage/Events/BlobStorageEventStore.cs b/Framework/Azure/Cqrs.Azure.Storage/Events/BlobStorageEventStore.cs index 9778c0284b..9a9a4e75d3 100644 --- a/Framework/Azure/Cqrs.Azure.Storage/Events/BlobStorageEventStore.cs +++ b/Framework/Azure/Cqrs.Azure.Storage/Events/BlobStorageEventStore.cs @@ -65,11 +65,23 @@ async Task>> GetAsync { string streamName = string.Format(CqrsEventStoreStreamNamePattern, aggregateRootType.FullName, aggregateId); + Func predicate = null; + if (fromVersion > 0) + { + predicate = blob => + { + string[] parts = blob.Name.Remove(0, streamName.Length).Split(["/"], StringSplitOptions.RemoveEmptyEntries); + // We're checking first if this isn't numeric... just return true, otherwise if the version matches return true. + // This is because while we should trust file name conventions... poorly dropped manual files shouldn't break things. + return !int.TryParse(parts[0], out int v) || v > fromVersion; + }; + } + IEnumerable query = #if NET472 - BlobStorageStore.GetByFolder(streamName) + BlobStorageStore.GetByFolder(streamName, predicate) #else - (await BlobStorageStore.GetByFolderAsync(streamName)) + (await BlobStorageStore.GetByFolderAsync(streamName, predicate)) #endif .Where(eventData => eventData.AggregateId == streamName && eventData.Version > fromVersion) .OrderByDescending(eventData => eventData.Version); @@ -306,10 +318,10 @@ async Task AddToCorrelationFolderAsync (EventData data) { #if NET472 + SaveData #else - await + await SaveDataAsync #endif - AsyncSaveData ( data, #if NET472 diff --git a/Framework/Azure/Cqrs.Azure.Storage/Events/BlobStorageSnapshotStore.cs b/Framework/Azure/Cqrs.Azure.Storage/Events/BlobStorageSnapshotStore.cs index 84d8ad3397..999d0e47e4 100644 --- a/Framework/Azure/Cqrs.Azure.Storage/Events/BlobStorageSnapshotStore.cs +++ b/Framework/Azure/Cqrs.Azure.Storage/Events/BlobStorageSnapshotStore.cs @@ -9,12 +9,16 @@ using System; using System.IO; using System.Linq; -using System.Threading.Tasks; using Chinchilla.Logging; using Cqrs.Configuration; using Cqrs.Events; using Cqrs.Snapshots; +#if NET472 +#else +using System.Threading.Tasks; +#endif + namespace Cqrs.Azure.Storage.Events { /// diff --git a/Framework/Azure/Cqrs.Azure.Storage/Events/TableStorageEventStore.cs b/Framework/Azure/Cqrs.Azure.Storage/Events/TableStorageEventStore.cs index 0cdaef9da4..50f7e8f6ac 100644 --- a/Framework/Azure/Cqrs.Azure.Storage/Events/TableStorageEventStore.cs +++ b/Framework/Azure/Cqrs.Azure.Storage/Events/TableStorageEventStore.cs @@ -9,13 +9,17 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Threading.Tasks; using Azure.Data.Tables; using Chinchilla.Logging; using Cqrs.Domain; using Cqrs.Events; using Cqrs.Messages; +#if NET472 +#else +using System.Threading.Tasks; +#endif + namespace Cqrs.Azure.Storage.Events { /// diff --git a/Framework/Azure/Cqrs.Azure.Storage/Events/TableStorageSnapshotStore.cs b/Framework/Azure/Cqrs.Azure.Storage/Events/TableStorageSnapshotStore.cs index 78b27619ae..90cc3a7c5d 100644 --- a/Framework/Azure/Cqrs.Azure.Storage/Events/TableStorageSnapshotStore.cs +++ b/Framework/Azure/Cqrs.Azure.Storage/Events/TableStorageSnapshotStore.cs @@ -8,7 +8,6 @@ using System; using System.Linq; -using System.Threading.Tasks; using Azure.Data.Tables; using Chinchilla.Logging; using Cqrs.Configuration; @@ -16,6 +15,11 @@ using Cqrs.Events; using Cqrs.Snapshots; +#if NET472 +#else +using System.Threading.Tasks; +#endif + namespace Cqrs.Azure.Storage.Events { /// diff --git a/Framework/Azure/Tests/Cqrs.Azure.Storage.Test.Integration/BlobStorageSnapshotSagaRepositoryTests.cs b/Framework/Azure/Tests/Cqrs.Azure.Storage.Test.Integration/BlobStorageSnapshotSagaRepositoryTests.cs new file mode 100644 index 0000000000..7bb3660849 --- /dev/null +++ b/Framework/Azure/Tests/Cqrs.Azure.Storage.Test.Integration/BlobStorageSnapshotSagaRepositoryTests.cs @@ -0,0 +1,246 @@ +#region Copyright +// // ----------------------------------------------------------------------- +// // +// // Copyright Chinchilla Software Limited. All rights reserved. +// // +// // ----------------------------------------------------------------------- +#endregion + +using System; +using System.Collections.Generic; + +using Chinchilla.Logging; +using Chinchilla.Logging.Configuration; +using Chinchilla.StateManagement.Threaded; +using Cqrs.Azure.ServiceBus.Tests.Unit; +using Cqrs.Azure.Storage.Events; +using Cqrs.Commands; +using Cqrs.Configuration; +using Cqrs.Domain; +using Cqrs.Domain.Factories; +using Cqrs.Events; +using Cqrs.Snapshots; +using Moq; +using NUnit.Framework; + +using TestClass = NUnit.Framework.TestFixtureAttribute; +using TestMethod = NUnit.Framework.TestAttribute; +using TestInitialize = NUnit.Framework.SetUpAttribute; +using TestCleanup = NUnit.Framework.TearDownAttribute; +using TestContext = System.Object; + + + + + +#if NET472 +#else +using System.Threading.Tasks; +#endif +#if NET472_OR_GREATER +#else +using Cqrs.Azure.ConfigurationManager; +using Microsoft.Extensions.Configuration; +#endif + +namespace Cqrs.Azure.Storage.Test.Integration +{ + /// + /// A series of tests on the class + /// + [TestClass] + public class BlobStorageSnapshotSagaRepositoryTests + { + /// + /// Tests the method + /// Passing a valid test + /// Expecting the test is able to be read. + /// + [TestMethod] + public virtual +#if NET472 + void +#else + async Task +#endif + Get_ValidEvent_EventCanBeRetreived() + { + // Arrange + IConfigurationManager configurationManager; +#if NET472_OR_GREATER + configurationManager = new Configuration.ConfigurationManager(); +#else + IConfigurationRoot config = new ConfigurationBuilder() + .AddJsonFile("cqrs.json", optional: true, reloadOnChange: true) + .AddEnvironmentVariables() + .Build(); + + configurationManager = new CloudConfigurationManager(config); + DependencyResolver.ConfigurationManager = configurationManager; +#endif + + var mockRepository = new MockRepository(MockBehavior.Strict); + + var mockEventPublisher = mockRepository.Create< +#if NET472 + IEventPublisher +#else + IAsyncEventPublisher +#endif + >(); + mockEventPublisher + .Setup(x => x. +#if NET472 + Publish +#else + PublishAsync +#endif + (It.Is>(y => true))) +#if NET472 +#else + .Returns(Task.CompletedTask) +#endif + ; + mockEventPublisher + .Setup(x => x. +#if NET472 + Publish +#else + PublishAsync +#endif + (It.Is>>(y => true))) +#if NET472 +#else + .Returns(Task.CompletedTask) +#endif + ; + var mockCommandPublisher = mockRepository.Create < +#if NET472 + ICommandPublisher +#else + IAsyncCommandPublisher +#endif + >(); + mockCommandPublisher + .Setup(x => x. +#if NET472 + Publish +#else + PublishAsync +#endif + (It.Is>(y => true))) +#if NET472 +#else + .Returns(Task.CompletedTask) +#endif + ; + mockCommandPublisher + .Setup(x => x. +#if NET472 + Publish +#else + PublishAsync +#endif + (It.Is>>(y => true))) +#if NET472 +#else + .Returns(Task.CompletedTask) +#endif + ; + + Mock mockDependencyResolver = mockRepository.Create(); + mockDependencyResolver + .Setup(x => x.Resolve()) + .Returns(configurationManager); + mockDependencyResolver + .Setup(x => x.Resolve< +#if NET472 + ICommandPublisher +#else + IAsyncCommandPublisher +#endif + >()) + .Returns(mockCommandPublisher.Object); + mockDependencyResolver + .Setup(x => x.Resolve< +#if NET472 + IEventPublisher +#else + IAsyncEventPublisher +#endif + >()) + .Returns(mockEventPublisher.Object); + + var correlationIdHelper = new CorrelationIdHelper(new ContextItemCollectionFactory()); + correlationIdHelper.SetCorrelationId(Guid.NewGuid()); + var logger = new ConsoleLogger(new LoggerSettingsConfigurationSection(), correlationIdHelper); + var snapshotStore = new BlobStorageSnapshotStore(configurationManager, new SnapshotDeserialiser(), logger, correlationIdHelper, new DefaultSnapshotBuilder(), new BlobStorageSnapshotStoreConnectionStringFactory(configurationManager, logger)); + IAggregateFactory aggregateFactory = new AggregateFactory(mockDependencyResolver.Object, logger); + var eventStore = new BlobStorageEventStore(new DefaultEventBuilder(), new EventDeserialiser(), logger, new BlobStorageEventStoreConnectionStringFactory(configurationManager, logger)); + var sagaRepository = new SagaRepository(aggregateFactory, eventStore, mockEventPublisher.Object, mockCommandPublisher.Object, correlationIdHelper); + var snapshotRepository = new SnapshotSagaRepository(snapshotStore, new DefaultSnapshotStrategy(), sagaRepository, eventStore, aggregateFactory); + + var id1 = Guid.NewGuid(); + var id2 = Guid.NewGuid(); + var saga1 = aggregateFactory.Create(id1); + var saga2 = aggregateFactory.Create(id2); + + var unitOfWork = new SagaUnitOfWork(snapshotRepository, sagaRepository); + + // Act + for (int i = 0; i < 40; i++) + { +#if NET472 + unitOfWork.Add +#else + await unitOfWork.AddAsync +#endif + (saga1, true); +#if NET472 + unitOfWork.Add +#else + await unitOfWork.AddAsync +#endif + (saga2, true); + + var @event = new TestEvent + { + Rsn = i % 2 == 1 ? id1 : id2, + Id = i % 2 == 1 ? id1 : id2, + CorrelationId = correlationIdHelper.GetCorrelationId(), + Frameworks = new List { $"Test {i}" }, + TimeStamp = DateTimeOffset.UtcNow + }; + ( + i % 2 == 1 + ? saga1 + : saga2 + ).Handle(@event); + +#if NET472 + unitOfWork.Commit(); +#else + await unitOfWork.CommitAsync(); +#endif + } + + // Assert + TestSnapshotSaga _saga1 = +#if NET472 + unitOfWork.Get +#else + await unitOfWork.GetAsync +#endif + (id1, useSnapshots: true); + TestSnapshotSaga _saga2 = +#if NET472 + unitOfWork.Get +#else + await unitOfWork.GetAsync +#endif + (id2, useSnapshots: true); + Assert.AreEqual(20, _saga1.EventCount); + Assert.AreEqual(20, _saga2.EventCount); + } + } +} \ No newline at end of file diff --git a/Framework/Azure/Tests/Cqrs.Azure.Storage.Test.Integration/Cqrs.Azure.Storage.Test.Integration.csproj b/Framework/Azure/Tests/Cqrs.Azure.Storage.Test.Integration/Cqrs.Azure.Storage.Test.Integration.csproj index 783662c39d..cba5bb97a3 100644 --- a/Framework/Azure/Tests/Cqrs.Azure.Storage.Test.Integration/Cqrs.Azure.Storage.Test.Integration.csproj +++ b/Framework/Azure/Tests/Cqrs.Azure.Storage.Test.Integration/Cqrs.Azure.Storage.Test.Integration.csproj @@ -21,6 +21,7 @@ + diff --git a/Framework/Azure/Tests/Cqrs.Azure.Storage.Test.Integration/TestSnapshotSaga.cs b/Framework/Azure/Tests/Cqrs.Azure.Storage.Test.Integration/TestSnapshotSaga.cs new file mode 100644 index 0000000000..90694e6f19 --- /dev/null +++ b/Framework/Azure/Tests/Cqrs.Azure.Storage.Test.Integration/TestSnapshotSaga.cs @@ -0,0 +1,61 @@ +using Chinchilla.Logging; +using Cqrs.Azure.ServiceBus.Tests.Unit; +using Cqrs.Configuration; +using Cqrs.Events; +using Cqrs.Snapshots; +using System; + +namespace Cqrs.Azure.Storage.Test.Integration +{ + public class TestSnapshotSaga : SnapshotSaga + { + private TestSnapshotSaga(IDependencyResolver dependencyResolver, ILogger logger) + : base(dependencyResolver, logger) + { + } + + private TestSnapshotSaga(IDependencyResolver dependencyResolver, ILogger logger, Guid id) + : base(dependencyResolver, logger) + { + Id = id; + } + + public int EventCount { get; private set; } + + #region Implementation of IMessageHandler + + public void Handle(TestEvent message) + { + ApplyChange(message); + } + + public void Apply(TestEvent e) + { + EventCount++; + } + + protected override void SetId(ISagaEvent sagaEvent) + { + // We set Id as the eventstore is using that and not an IEventWithIdentity + sagaEvent.Id = Rsn; + sagaEvent.Rsn = Rsn; + } + + protected override TestSagaSnapshot CreateSnapshot() + { + return new TestSagaSnapshot { EventCount = EventCount }; + } + + protected override void RestoreFromSnapshot(TestSagaSnapshot snapshot) + { + EventCount = snapshot.EventCount; + } + + #endregion + } + + public class TestSagaSnapshot : Snapshot + { + public int EventCount { get; set; } + } +} diff --git a/Framework/Cqrs.Tests/Substitutes/TestSaga.cs b/Framework/Cqrs.Tests/Substitutes/TestSaga.cs index 3b02b44363..b69c3fa4eb 100644 --- a/Framework/Cqrs.Tests/Substitutes/TestSaga.cs +++ b/Framework/Cqrs.Tests/Substitutes/TestSaga.cs @@ -75,7 +75,6 @@ public void Handle(TestAggregateDidSomethingElse2 message) } public class TestSaga : Saga - { public int DidSomethingCount; diff --git a/Framework/Cqrs.Tests/Substitutes/TestSagaUnitOfWork.cs b/Framework/Cqrs.Tests/Substitutes/TestSagaUnitOfWork.cs index 616e1524db..7d3d5e80ef 100644 --- a/Framework/Cqrs.Tests/Substitutes/TestSagaUnitOfWork.cs +++ b/Framework/Cqrs.Tests/Substitutes/TestSagaUnitOfWork.cs @@ -25,7 +25,7 @@ static TestSagaUnitOfWork() /// /// Add an item into the ready to be committed. /// - public void Add(TSaga saga) + public void Add(TSaga saga, bool useSnapshots = false) where TSaga : ISaga { Tuple, int, int, int> testSaga; @@ -39,7 +39,7 @@ public void Add(TSaga saga) /// /// Get an item from the if it has already been loaded. /// - public TSaga Get(Guid id, int? expectedVersion = null) + public TSaga Get(Guid id, int? expectedVersion = null, bool useSnapshots = false) where TSaga : ISaga { Tuple, int, int, int> testSaga = TestGuidSagasWithAddCountGetCountAndCommitCount[id]; @@ -75,7 +75,7 @@ public void Commit() /// /// Add an item into the ready to be committed. /// - void ISagaUnitOfWork.Add(TSaga saga) + void ISagaUnitOfWork.Add(TSaga saga, bool useSnapshots) { Tuple, int, int, int> testSaga; if (!TestSagasWithAddCountGetCountAndCommitCount.TryGetValue(saga.Id, out testSaga)) @@ -88,7 +88,7 @@ void ISagaUnitOfWork.Add(TSaga saga) /// /// Get an item from the if it has already been loaded. /// - TSaga ISagaUnitOfWork.Get(Guid id, int? expectedVersion) + TSaga ISagaUnitOfWork.Get(Guid id, int? expectedVersion, bool useSnapshots) { Tuple, int, int, int> testSaga = TestSagasWithAddCountGetCountAndCommitCount[id]; testSaga = new Tuple, int, int, int>(testSaga.Item1, testSaga.Item2, testSaga.Item3 + 1, testSaga.Item4); diff --git a/Framework/Cqrs/Cqrs.csproj b/Framework/Cqrs/Cqrs.csproj index f46805fed7..d276f3d728 100644 --- a/Framework/Cqrs/Cqrs.csproj +++ b/Framework/Cqrs/Cqrs.csproj @@ -20,6 +20,7 @@ * Added the ability to delay sending message publishing. * Introduced the IMessageWithOrderingKey interface to aid in applying message process ordering if supported. * Added the ability to publish non-saga events from a saga. + * Added snapshot support to sagas via a SnapshotSagaRepository Version 5.0 diff --git a/Framework/Cqrs/Domain/AggregateRepository.cs b/Framework/Cqrs/Domain/AggregateRepository.cs index dd8cb6d1c5..46cb4b527e 100644 --- a/Framework/Cqrs/Domain/AggregateRepository.cs +++ b/Framework/Cqrs/Domain/AggregateRepository.cs @@ -9,13 +9,17 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Threading.Tasks; using Chinchilla.Logging; using Cqrs.Configuration; using Cqrs.Domain.Exceptions; using Cqrs.Domain.Factories; using Cqrs.Events; +#if NET472 +#else +using System.Threading.Tasks; +#endif + namespace Cqrs.Domain { /// diff --git a/Framework/Cqrs/Domain/IAggregateRepository.cs b/Framework/Cqrs/Domain/IAggregateRepository.cs index 233068b1fa..68839c1f98 100644 --- a/Framework/Cqrs/Domain/IAggregateRepository.cs +++ b/Framework/Cqrs/Domain/IAggregateRepository.cs @@ -8,9 +8,13 @@ using System; using System.Collections.Generic; -using System.Threading.Tasks; using Cqrs.Events; +#if NET472 +#else +using System.Threading.Tasks; +#endif + namespace Cqrs.Domain { /// diff --git a/Framework/Cqrs/Domain/ISagaDescriptor.cs b/Framework/Cqrs/Domain/ISagaDescriptor.cs index d341aed634..e2ced204d1 100644 --- a/Framework/Cqrs/Domain/ISagaDescriptor.cs +++ b/Framework/Cqrs/Domain/ISagaDescriptor.cs @@ -14,6 +14,9 @@ internal interface ISagaDescriptor { [DataMember] int Version { get; set; } + + [DataMember] + bool UseSnapshots { get; set; } } internal interface ISagaDescriptor : ISagaDescriptor diff --git a/Framework/Cqrs/Domain/ISagaRepository.cs b/Framework/Cqrs/Domain/ISagaRepository.cs index 68d17cb2bf..45ea3361bd 100644 --- a/Framework/Cqrs/Domain/ISagaRepository.cs +++ b/Framework/Cqrs/Domain/ISagaRepository.cs @@ -8,9 +8,13 @@ using System; using System.Collections.Generic; -using System.Threading.Tasks; using Cqrs.Events; +#if NET472 +#else +using System.Threading.Tasks; +#endif + namespace Cqrs.Domain { /// diff --git a/Framework/Cqrs/Domain/ISagaUnitOfWork.cs b/Framework/Cqrs/Domain/ISagaUnitOfWork.cs index e6c62230a0..d28487bc8c 100644 --- a/Framework/Cqrs/Domain/ISagaUnitOfWork.cs +++ b/Framework/Cqrs/Domain/ISagaUnitOfWork.cs @@ -7,7 +7,11 @@ #endregion using System; + +#if NET472 +#else using System.Threading.Tasks; +#endif namespace Cqrs.Domain { @@ -24,7 +28,7 @@ void Add #else Task AddAsync #endif - (TSaga saga) + (TSaga saga, bool useSnapshots = false) where TSaga : ISaga; /// @@ -35,7 +39,7 @@ TSaga Get #else Task GetAsync #endif - (Guid id, int? expectedVersion = null) + (Guid id, int? expectedVersion = null, bool useSnapshots = false) where TSaga : ISaga; /// diff --git a/Framework/Cqrs/Domain/ISnapshotSagaRepository.cs b/Framework/Cqrs/Domain/ISnapshotSagaRepository.cs new file mode 100644 index 0000000000..3b5e860b47 --- /dev/null +++ b/Framework/Cqrs/Domain/ISnapshotSagaRepository.cs @@ -0,0 +1,21 @@ +#region Copyright +// // ----------------------------------------------------------------------- +// // +// // Copyright Chinchilla Software Limited. All rights reserved. +// // +// // ----------------------------------------------------------------------- +#endregion + +using System; + +namespace Cqrs.Domain +{ + /// + /// Provides basic snapshot repository methods for operations with instances of . + /// + /// The of authentication token. + public interface ISnapshotSagaRepository + : ISagaRepository + { + } +} \ No newline at end of file diff --git a/Framework/Cqrs/Domain/IUnitOfWork.cs b/Framework/Cqrs/Domain/IUnitOfWork.cs index 2b9a73852d..1b19320183 100644 --- a/Framework/Cqrs/Domain/IUnitOfWork.cs +++ b/Framework/Cqrs/Domain/IUnitOfWork.cs @@ -7,9 +7,13 @@ #endregion using System; -using System.Threading.Tasks; using Cqrs.Events; +#if NET472 +#else +using System.Threading.Tasks; +#endif + namespace Cqrs.Domain { /// diff --git a/Framework/Cqrs/Domain/SagaDescriptor.cs b/Framework/Cqrs/Domain/SagaDescriptor.cs index 3ab2a3e364..f6036158fb 100644 --- a/Framework/Cqrs/Domain/SagaDescriptor.cs +++ b/Framework/Cqrs/Domain/SagaDescriptor.cs @@ -23,5 +23,8 @@ ISaga ISagaDescriptor.Saga [DataMember] public int Version { get; set; } + + [DataMember] + public bool UseSnapshots { get; set; } } } \ No newline at end of file diff --git a/Framework/Cqrs/Domain/SagaUnitOfWork.cs b/Framework/Cqrs/Domain/SagaUnitOfWork.cs index dc7d16c866..840adeea50 100644 --- a/Framework/Cqrs/Domain/SagaUnitOfWork.cs +++ b/Framework/Cqrs/Domain/SagaUnitOfWork.cs @@ -28,8 +28,22 @@ public class SagaUnitOfWork : ISagaUnitOfWork Repository { get; set; } + private ISnapshotSagaRepository SnapshotRepository { get; set; } + private Dictionary> TrackedSagas { get; set; } + /// + /// Instantiates a new instance of + /// + public SagaUnitOfWork(ISnapshotSagaRepository snapshotRepository, ISagaRepository repository) + : this(repository) + { + if (snapshotRepository == null) + throw new ArgumentNullException("snapshotRepository"); + + SnapshotRepository = snapshotRepository; + } + /// /// Instantiates a new instance of /// @@ -51,7 +65,7 @@ void Add #else async Task AddAsync #endif - (TSaga saga) + (TSaga saga, bool useSnapshots = false) where TSaga : ISaga { if (!IsTracked(saga.Id)) @@ -59,7 +73,8 @@ async Task AddAsync var sagaDescriptor = new SagaDescriptor { Saga = saga, - Version = saga.Version + Version = saga.Version, + UseSnapshots = useSnapshots }; TrackedSagas.Add(saga.Id, sagaDescriptor); } @@ -80,7 +95,7 @@ TSaga Get #else async Task GetAsync #endif - (Guid id, int? expectedVersion = null) + (Guid id, int? expectedVersion = null, bool useSnapshots = false) where TSaga : ISaga { if(IsTracked(id)) @@ -93,9 +108,9 @@ async Task GetAsync var saga = #if NET40 - Repository.Get + (useSnapshots ? SnapshotRepository : Repository).Get #else - await Repository.GetAsync + await (useSnapshots ? SnapshotRepository : Repository).GetAsync #endif (id); if (expectedVersion != null && saga.Version != expectedVersion) @@ -105,7 +120,7 @@ await Repository.GetAsync #else await AddAsync #endif - (saga); + (saga, useSnapshots); return saga; } @@ -131,9 +146,9 @@ async Task CommitAsync foreach (ISagaDescriptor descriptor in TrackedSagas.Values) { #if NET40 - Repository.Save + (descriptor.UseSnapshots ? SnapshotRepository : Repository).Save #else - await Repository.SaveAsync + await (descriptor.UseSnapshots ? SnapshotRepository : Repository).SaveAsync #endif (descriptor.Saga, descriptor.Version); } diff --git a/Framework/Cqrs/Snapshots/DefaultSnapshotStrategy.cs b/Framework/Cqrs/Snapshots/DefaultSnapshotStrategy.cs index ed596c2283..9a89449a23 100644 --- a/Framework/Cqrs/Snapshots/DefaultSnapshotStrategy.cs +++ b/Framework/Cqrs/Snapshots/DefaultSnapshotStrategy.cs @@ -32,7 +32,7 @@ public virtual bool IsSnapshotable(Type aggregateType) { if (aggregateType.BaseType == null) return false; - if (aggregateType.BaseType.IsGenericType && aggregateType.BaseType.GetGenericTypeDefinition() == typeof(SnapshotAggregateRoot<,>)) + if (aggregateType.BaseType.IsGenericType && new [] { typeof(SnapshotAggregateRoot<,>), typeof(SnapshotSaga<,>) }.Contains(aggregateType.BaseType.GetGenericTypeDefinition())) return true; return IsSnapshotable(aggregateType.BaseType); } @@ -60,6 +60,29 @@ public virtual bool ShouldMakeSnapShot(IAggregateRoot aggr return false; } + /// + /// Checks and if it is, also checks if the calculated version number would be exactly dividable by . + /// + /// The to check. + /// A collection of uncommited changes to assess. If null the aggregate will be asked to provide them. + public bool ShouldMakeSnapShot(ISaga saga, IEnumerable> uncommittedChanges = null) + { + if (!IsSnapshotable(saga.GetType())) + return false; + + // The reason this isn't as simple as `(aggregate.Version + aggregate.GetUncommittedChanges().Count()) % SnapshotInterval` is + // because if there are enough uncommited events that this would result in a snapshot, plus some left over, the final state is what will be generated + // when the snapshot is taken, thus this is a faster and more accurate assessment. + int limit = (uncommittedChanges ?? saga.GetUncommittedChanges()).Count(); + int i = saga.Version - limit; + int snapshotInterval = GetSnapshotInterval(); + + for (int j = 0; j < limit; j++) + if (++i % snapshotInterval == 0 && i != 0) + return true; + return false; + } + /// /// Returns the value of . /// diff --git a/Framework/Cqrs/Snapshots/ISnapshotStore.cs b/Framework/Cqrs/Snapshots/ISnapshotStore.cs index 2147c91658..dbeefa404a 100644 --- a/Framework/Cqrs/Snapshots/ISnapshotStore.cs +++ b/Framework/Cqrs/Snapshots/ISnapshotStore.cs @@ -7,9 +7,13 @@ #endregion using System; -using System.Threading.Tasks; using Cqrs.Domain; +#if NET472 +#else +using System.Threading.Tasks; +#endif + namespace Cqrs.Snapshots { /// diff --git a/Framework/Cqrs/Snapshots/ISnapshotStrategy.cs b/Framework/Cqrs/Snapshots/ISnapshotStrategy.cs index 8cfb6e0f27..4a2ce90474 100644 --- a/Framework/Cqrs/Snapshots/ISnapshotStrategy.cs +++ b/Framework/Cqrs/Snapshots/ISnapshotStrategy.cs @@ -27,6 +27,14 @@ public interface ISnapshotStrategy /// A collection of uncommited changes to assess. If null the aggregate will be asked to provide them. bool ShouldMakeSnapShot(IAggregateRoot aggregate, IEnumerable> uncommittedChanges = null); + /// + /// Indicates if the provided should have a made. + /// This does NOT indicate if the provided can have a made or not. + /// + /// The to check. + /// A collection of uncommited changes to assess. If null the aggregate will be asked to provide them. + bool ShouldMakeSnapShot(ISaga saga, IEnumerable> uncommittedChanges = null); + /// /// Indicates if the provided can have a made or not. /// diff --git a/Framework/Cqrs/Snapshots/SnapshotAggregateRoot.cs b/Framework/Cqrs/Snapshots/SnapshotAggregateRoot.cs index 3a597acaab..c412d013d7 100644 --- a/Framework/Cqrs/Snapshots/SnapshotAggregateRoot.cs +++ b/Framework/Cqrs/Snapshots/SnapshotAggregateRoot.cs @@ -6,7 +6,10 @@ // // ----------------------------------------------------------------------- #endregion +using Chinchilla.Logging; +using Cqrs.Configuration; using Cqrs.Domain; +using System; namespace Cqrs.Snapshots { @@ -19,6 +22,14 @@ public abstract class SnapshotAggregateRoot : AggregateRoot where TSnapshot : Snapshot { + /// + /// A constructor for the + /// + protected SnapshotAggregateRoot() + : base() + { + } + /// /// Calls and applies the of this instance to the result. /// diff --git a/Framework/Cqrs/Snapshots/SnapshotRepository.cs b/Framework/Cqrs/Snapshots/SnapshotRepository.cs index dedc23c22d..099596eb9a 100644 --- a/Framework/Cqrs/Snapshots/SnapshotRepository.cs +++ b/Framework/Cqrs/Snapshots/SnapshotRepository.cs @@ -9,12 +9,16 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Threading.Tasks; using Cqrs.Domain; using Cqrs.Domain.Factories; using Cqrs.Events; using Cqrs.Infrastructure; +#if NET472 +#else +using System.Threading.Tasks; +#endif + namespace Cqrs.Snapshots { /// @@ -86,7 +90,7 @@ async Task SaveAsync (TAggregateRoot aggregate, int? expectedVersion = null) where TAggregateRoot : IAggregateRoot { - // We need to grab these first as the changes will have been commitedd already by the time we go to make the snapshot. + // We need to grab these first as the changes will have been commited already by the time we go to make the snapshot. IEnumerable> uncommittedChanges = aggregate.GetUncommittedChanges(); // Save the evets first then snapshot the system. #if NET40 @@ -265,7 +269,7 @@ await SnapshotStore.GetAsync #if NET40 /// - /// Calls on + /// Calls on /// If the is snapshot-able is called /// The is calculated, finally is called on . /// @@ -273,7 +277,7 @@ await SnapshotStore.GetAsync /// A collection of uncommited changes to assess. If null the aggregate will be asked to provide them. #else /// - /// Calls on + /// Calls on /// If the is snapshot-able is called /// The is calculated, finally is called on . /// @@ -291,16 +295,16 @@ async Task TryMakeSnapshotAsync if (!SnapshotStrategy.ShouldMakeSnapShot(aggregate, uncommittedChanges)) return; dynamic snapshot = aggregate.AsDynamic().GetSnapshot().RealObject; - var rsnapshot = snapshot as Snapshot; - if (rsnapshot != null) + var rSnapshot = snapshot as Snapshot; + if (rSnapshot != null) { - rsnapshot.Version = aggregate.Version; + rSnapshot.Version = aggregate.Version; #if NET40 SnapshotStore.Save #else await SnapshotStore.SaveAsync #endif - (rsnapshot); + (rSnapshot); } else { diff --git a/Framework/Cqrs/Snapshots/SnapshotSaga.cs b/Framework/Cqrs/Snapshots/SnapshotSaga.cs new file mode 100644 index 0000000000..72e42582f4 --- /dev/null +++ b/Framework/Cqrs/Snapshots/SnapshotSaga.cs @@ -0,0 +1,82 @@ +#region Copyright +// // ----------------------------------------------------------------------- +// // +// // Copyright Chinchilla Software Limited. All rights reserved. +// // +// // ----------------------------------------------------------------------- +#endregion + +using Chinchilla.Logging; +using Cqrs.Configuration; +using Cqrs.Domain; +using System; + +namespace Cqrs.Snapshots +{ + /// + /// An that supports snapshots for optimised rehydration. + /// + public abstract class SnapshotSaga + : Saga + where TSnapshot : Snapshot + { + /// + /// A constructor for the + /// + protected SnapshotSaga() + :base() + { + } + + /// + /// A constructor for the + /// + protected SnapshotSaga(IDependencyResolver dependencyResolver, ILogger logger) + : base(dependencyResolver, logger) + { + } + + /// + /// A constructor for the + /// + protected SnapshotSaga(IDependencyResolver dependencyResolver, ILogger logger, Guid rsn) + : base(dependencyResolver, logger, rsn) + { + } + + /// + /// Calls and applies the of this instance to the result. + /// + public virtual TSnapshot GetSnapshot() + { + TSnapshot snapshot = CreateSnapshot(); + snapshot.Id = Id; + // Version is handled by the repository + return snapshot; + } + + /// + /// Sets the of this instance from the provided , + /// sets the of this instance from the provided , + /// then calls + /// + /// The to rehydrate this instance from. + public virtual void Restore(TSnapshot snapshot) + { + Id = snapshot.Id; + Version = snapshot.Version; + RestoreFromSnapshot(snapshot); + } + + /// + /// Create a of the current state of this instance. + /// + protected abstract TSnapshot CreateSnapshot(); + + /// + /// Rehydrate this instance from the provided . + /// + /// The to rehydrate this instance from. + protected abstract void RestoreFromSnapshot(TSnapshot snapshot); + } +} \ No newline at end of file diff --git a/Framework/Cqrs/Snapshots/SnapshotSagaRepository.cs b/Framework/Cqrs/Snapshots/SnapshotSagaRepository.cs new file mode 100644 index 0000000000..5fe3272759 --- /dev/null +++ b/Framework/Cqrs/Snapshots/SnapshotSagaRepository.cs @@ -0,0 +1,278 @@ +#region Copyright +// // ----------------------------------------------------------------------- +// // +// // Copyright Chinchilla Software Limited. All rights reserved. +// // +// // ----------------------------------------------------------------------- +#endregion + +using System; +using System.Collections.Generic; +using System.Linq; +using Cqrs.Domain; +using Cqrs.Domain.Factories; +using Cqrs.Events; +using Cqrs.Infrastructure; + +#if NET472 +#else +using System.Threading.Tasks; +#endif + +namespace Cqrs.Snapshots +{ + /// + /// Provides basic repository methods for operations with instances of + /// utilising snapshots for optimised rehydration. + /// + /// The of authentication token. + public class SnapshotSagaRepository + : ISnapshotSagaRepository + { + /// + /// Gets or sets the . + /// + protected ISnapshotStore SnapshotStore { get; private set; } + + /// + /// Gets or sets the . + /// + protected ISnapshotStrategy SnapshotStrategy { get; private set; } + + /// + /// Gets or sets the . + /// + protected ISagaRepository Repository { get; private set; } + + /// + /// Gets or sets the . + /// + protected IEventStore EventStore { get; private set; } + + /// + /// Gets or sets the . + /// + protected IAggregateFactory SagaFactory { get; private set; } + + /// + /// Instantiates a new instance of . + /// + public SnapshotSagaRepository(ISnapshotStore snapshotStore, ISnapshotStrategy snapshotStrategy, ISagaRepository repository, IEventStore eventStore, IAggregateFactory sagaFactory) + { + SnapshotStore = snapshotStore; + SnapshotStrategy = snapshotStrategy; + Repository = repository; + EventStore = eventStore; + SagaFactory = sagaFactory; + } + +#if NET40 + /// + /// Calls then ISagaRepository{TAuthenticationToken}.Save on . + /// + /// The of the . + /// The to save and persist. + /// The version number the is expected to be at. +#else + /// + /// Calls then ISagaRepository{TAuthenticationToken}.Save on . + /// + /// The of the . + /// The to save and persist. + /// The version number the is expected to be at. +#endif + public virtual +#if NET40 + void Save +#else + async Task SaveAsync +#endif + (TSaga saga, int? expectedVersion = null) + where TSaga : ISaga + { + // We need to grab these first as the changes will have been commited already by the time we go to make the snapshot. + IEnumerable> uncommittedChanges = saga.GetUncommittedChanges(); + // Save the evets first then snapshot the system. +#if NET40 + Repository.Save +#else + await Repository.SaveAsync +#endif + (saga, expectedVersion); + +#if NET40 + TryMakeSnapshot +#else + await TryMakeSnapshotAsync +#endif + (saga, uncommittedChanges); + } + +#if NET40 + /// + /// Retrieves an of type , + /// First using , otherwise via ISagaRepository{TAuthenticationToken}.Get on + /// Then does rehydration. + /// + /// The of the . + /// The identifier of the to retrieve. + /// + /// A collection of to replay on the retrieved . + /// If null, the will be used to retrieve a list of for you. + /// +#else + /// + /// Retrieves an of type , + /// First using , otherwise via ISagaRepository{TAuthenticationToken}.Get on + /// Then does rehydration. + /// + /// The of the . + /// The identifier of the to retrieve. + /// + /// A collection of to replay on the retrieved . + /// If null, the will be used to retrieve a list of for you. + /// +#endif + public virtual +#if NET40 + TSaga Get +#else + async Task GetAsync +#endif + (Guid sagaId, IList> events = null) + where TSaga : ISaga + { + var saga = SagaFactory.Create(); + int snapshotVersion = +#if NET40 + TryRestoreSagaFromSnapshot +#else + await TryRestoreSagaFromSnapshotAsync +#endif + (sagaId, saga); + if (snapshotVersion == -1) + { + return +#if NET40 + Repository.Get +#else + await Repository.GetAsync +#endif + (sagaId); + } + IEnumerable> theseEvents = events ?? ( +#if NET40 + EventStore.Get +#else + await EventStore.GetAsync +#endif + (sagaId, false, snapshotVersion) + ).Where(desc => desc.Version > snapshotVersion) + .Cast>().ToList(); + saga.LoadFromHistory(theseEvents); + + return saga; + } + +#if NET40 + /// + /// Calls on + /// If the is snapshot-able is called on . + /// The Restore method is then called on + /// + /// The of the . + /// The identifier of the to restore, since the may be completely uninitialised. + /// The + /// -1 if no restoration was made, otherwise version number the was rehydrated to. + /// There may be more events after the snapshot that still need to rehydrated into the after restoration. +#else + /// + /// Calls on + /// If the is snapshot-able is called on . + /// The Restore method is then called on + /// + /// The of the . + /// The identifier of the to restore, since the may be completely uninitialised. + /// The + /// -1 if no restoration was made, otherwise version number the was rehydrated to. + /// There may be more events after the snapshot that still need to rehydrated into the after restoration. +#endif + protected virtual +#if NET40 + int TryRestoreSagaFromSnapshot +#else + async Task TryRestoreSagaFromSnapshotAsync +#endif + (Guid id, TSaga saga) + { + int version = -1; + if (SnapshotStrategy.IsSnapshotable(typeof(TSaga))) + { + Snapshot snapshot = +#if NET40 + SnapshotStore.Get +#else + await SnapshotStore.GetAsync +#endif + (id); + if (snapshot != null) + { + saga.AsDynamic().Restore(snapshot); + version = snapshot.Version; + } + } + return version; + } + +#if NET40 + /// + /// Calls on + /// If the is snapshot-able is called + /// The is calculated, finally is called on . + /// + /// The to try and snapshot. + /// A collection of uncommited changes to assess. If null the saga will be asked to provide them. +#else + /// + /// Calls on + /// If the is snapshot-able is called + /// The is calculated, finally is called on . + /// + /// The to try and snapshot. + /// A collection of uncommited changes to assess. If null the saga will be asked to provide them. +#endif + protected virtual +#if NET40 + void TryMakeSnapshot +#else + async Task TryMakeSnapshotAsync +#endif + (ISaga saga, IEnumerable> uncommittedChanges) + { + if (!SnapshotStrategy.ShouldMakeSnapShot(saga, uncommittedChanges)) + return; + dynamic snapshot = saga.AsDynamic().GetSnapshot().RealObject; + var rSnapshot = snapshot as Snapshot; + if (rSnapshot != null) + { + rSnapshot.Version = saga.Version; +#if NET40 + SnapshotStore.Save +#else + await SnapshotStore.SaveAsync +#endif + (rSnapshot); + } + else + { + snapshot.Version = saga.Version; +#if NET40 + SnapshotStore.Save +#else + await SnapshotStore.SaveAsync +#endif + (snapshot); + } + } + } +} \ No newline at end of file diff --git a/Framework/Cqrs/Snapshots/SnapshotStore.cs b/Framework/Cqrs/Snapshots/SnapshotStore.cs index 9d47aa3225..5014bfe8ce 100644 --- a/Framework/Cqrs/Snapshots/SnapshotStore.cs +++ b/Framework/Cqrs/Snapshots/SnapshotStore.cs @@ -7,12 +7,16 @@ #endregion using System; -using System.Threading.Tasks; using Chinchilla.Logging; using Cqrs.Configuration; using Cqrs.Domain; using Cqrs.Events; +#if NET472 +#else +using System.Threading.Tasks; +#endif + namespace Cqrs.Snapshots { ///