Skip to content

Commit

Permalink
Adding number of partitions to the processor and LogOrchestrator stra…
Browse files Browse the repository at this point in the history
…tegies to enable switch between different ways to handle the execution
  • Loading branch information
juanfranblanco committed Mar 28, 2024
1 parent 592b12b commit 6699ae6
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using Nethereum.BlockchainProcessing.Processor;
using Nethereum.RPC.Eth.DTOs;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Nethereum.BlockchainProcessing.LogProcessing
{
public interface ILogProcessStrategy
{
Task ProcessLogs(FilterLog[] logs, IEnumerable<ProcessorHandler<FilterLog>> logProcessors);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,37 @@

namespace Nethereum.BlockchainProcessing.LogProcessing
{
public class LogProcessParallelStrategy : ILogProcessStrategy
{
public int Partitions { get; set; } = 2;
public async Task ProcessLogs(FilterLog[] logs, IEnumerable<ProcessorHandler<FilterLog>> logProcessors)
{
await logProcessors.ForEachAsync(async logProcessor =>
{
foreach (var log in logs)
{
await logProcessor.ExecuteAsync(log).ConfigureAwait(false);
}
}, Partitions);
}

}

public class LogProcessSequentialStrategy : ILogProcessStrategy
{
public async Task ProcessLogs(FilterLog[] logs, IEnumerable<ProcessorHandler<FilterLog>> logProcessors)
{
foreach (var logProcessor in logProcessors)
{
foreach (var log in logs)
{
await logProcessor.ExecuteAsync(log).ConfigureAwait(false);
}
}
}
}


public class LogOrchestrator : IBlockchainProcessingOrchestrator
{
public const int MaxGetLogsRetries = 10;
Expand All @@ -22,6 +53,7 @@ public class LogOrchestrator : IBlockchainProcessingOrchestrator
private readonly IEnumerable<ProcessorHandler<FilterLog>> _logProcessors;
private NewFilterInput _filterInput;
private BlockRangeRequestStrategy _blockRangeRequestStrategy;
public ILogProcessStrategy LogProcessStrategy { get; set; } = new LogProcessParallelStrategy();

protected IEthApiContractService EthApi { get; set; }

Expand Down Expand Up @@ -77,14 +109,19 @@ public async Task<OrchestrationProgress> ProcessAsync(BigInteger fromNumber, Big
}

private async Task InvokeLogProcessorsAsync(FilterLog[] logs)
{
await NewMethod(logs);
}

private async Task NewMethod(FilterLog[] logs)
{
await _logProcessors.ForEachAsync(async logProcessor =>
{
foreach (var log in logs)
{
await logProcessor.ExecuteAsync(log).ConfigureAwait(false);
}
});
{
foreach (var log in logs)
{
await logProcessor.ExecuteAsync(log).ConfigureAwait(false);
}
});
}

struct GetLogsResponse
Expand Down
8 changes: 6 additions & 2 deletions src/Nethereum.Util/TaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@ public static Task ForEachAsync<T>(this IEnumerable<T> source, int degreeOfParal
await action(partition.Current);
})));
}
public static Task ForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> action)
public static Task ForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> action, int? partitions = null)
{
return Task.WhenAll(Partitioner.Create(source).GetPartitions(Environment.ProcessorCount).Select(partition => Task.Run(async () =>
if(partitions == null)
{
partitions = Environment.ProcessorCount;
}
return Task.WhenAll(Partitioner.Create(source).GetPartitions(partitions.Value).Select(partition => Task.Run(async () =>
{
using (partition)
while (partition.MoveNext())
Expand Down

0 comments on commit 6699ae6

Please sign in to comment.