diff --git a/src/Nethereum.BlockchainProcessing/LogProcessing/ILogProcessStrategy.cs b/src/Nethereum.BlockchainProcessing/LogProcessing/ILogProcessStrategy.cs new file mode 100644 index 000000000..f699b5605 --- /dev/null +++ b/src/Nethereum.BlockchainProcessing/LogProcessing/ILogProcessStrategy.cs @@ -0,0 +1,12 @@ +using Nethereum.BlockchainProcessing.Processor; +using Nethereum.RPC.Eth.DTOs; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Nethereum.BlockchainProcessing.LogProcessing +{ + public interface ILogProcessStrategy + { + Task ProcessLogs(FilterLog[] logs, IEnumerable> logProcessors); + } +} \ No newline at end of file diff --git a/src/Nethereum.BlockchainProcessing/LogProcessing/LogOrchestrator.cs b/src/Nethereum.BlockchainProcessing/LogProcessing/LogOrchestrator.cs index dcb6597a4..1f3b02921 100644 --- a/src/Nethereum.BlockchainProcessing/LogProcessing/LogOrchestrator.cs +++ b/src/Nethereum.BlockchainProcessing/LogProcessing/LogOrchestrator.cs @@ -14,6 +14,37 @@ namespace Nethereum.BlockchainProcessing.LogProcessing { + public class LogProcessParallelStrategy : ILogProcessStrategy + { + public int Partitions { get; set; } = 2; + public async Task ProcessLogs(FilterLog[] logs, IEnumerable> logProcessors) + { + await logProcessors.ForEachAsync(async logProcessor => + { + foreach (var log in logs) + { + await logProcessor.ExecuteAsync(log).ConfigureAwait(false); + } + }, Partitions); + } + + } + + public class LogProcessSequentialStrategy : ILogProcessStrategy + { + public async Task ProcessLogs(FilterLog[] logs, IEnumerable> logProcessors) + { + foreach (var logProcessor in logProcessors) + { + foreach (var log in logs) + { + await logProcessor.ExecuteAsync(log).ConfigureAwait(false); + } + } + } + } + + public class LogOrchestrator : IBlockchainProcessingOrchestrator { public const int MaxGetLogsRetries = 10; @@ -22,6 +53,7 @@ public class LogOrchestrator : IBlockchainProcessingOrchestrator private readonly IEnumerable> _logProcessors; private NewFilterInput _filterInput; private BlockRangeRequestStrategy _blockRangeRequestStrategy; + public ILogProcessStrategy LogProcessStrategy { get; set; } = new LogProcessParallelStrategy(); protected IEthApiContractService EthApi { get; set; } @@ -77,14 +109,19 @@ public async Task ProcessAsync(BigInteger fromNumber, Big } private async Task InvokeLogProcessorsAsync(FilterLog[] logs) + { + await NewMethod(logs); + } + + private async Task NewMethod(FilterLog[] logs) { await _logProcessors.ForEachAsync(async logProcessor => - { - foreach (var log in logs) - { - await logProcessor.ExecuteAsync(log).ConfigureAwait(false); - } - }); + { + foreach (var log in logs) + { + await logProcessor.ExecuteAsync(log).ConfigureAwait(false); + } + }); } struct GetLogsResponse diff --git a/src/Nethereum.Util/TaskExtensions.cs b/src/Nethereum.Util/TaskExtensions.cs index db6ab7d36..bb7aa591f 100644 --- a/src/Nethereum.Util/TaskExtensions.cs +++ b/src/Nethereum.Util/TaskExtensions.cs @@ -26,9 +26,13 @@ public static Task ForEachAsync(this IEnumerable source, int degreeOfParal await action(partition.Current); }))); } - public static Task ForEachAsync(this IEnumerable source, Func action) + public static Task ForEachAsync(this IEnumerable source, Func action, int? partitions = null) { - return Task.WhenAll(Partitioner.Create(source).GetPartitions(Environment.ProcessorCount).Select(partition => Task.Run(async () => + if(partitions == null) + { + partitions = Environment.ProcessorCount; + } + return Task.WhenAll(Partitioner.Create(source).GetPartitions(partitions.Value).Select(partition => Task.Run(async () => { using (partition) while (partition.MoveNext())