ScatterGather - .NET library to implement the scatter-gather pattern using a database to store distributed progress state
NuGet package | Link |
---|---|
ScatterGather | |
ScatterGather.DynamoDB | |
ScatterGather.MongoDB |
Scatter-gather is an enteprise integration pattern where a single big operation is split into a number of sub-operations, usually performed by separate workers, then some other operation must be carried on after all sub-operations have finished.
This library provides means to keep track of how many scattered sub-operations have been completed, that is gathered, using a database (currently Amazon DynamoDB and MongoDB are supported) to store that state and a gateway to manage it.
A typical scenario involves a component that is asked to process a large operation, and a set of workers this component wants to delegate parts of that large operation. For example, the first component may want to send messages to workers using a message queue, keeping processing of parts asynchronous.
Once a worker completes processing of a part, it may send a message back to the first component, or even to a third one, which act as an aggregator for the processed parts. When all parts have been processed, the large operation may move forward.
The first component performs a scatter of the large operation, while each worker perform a gather.
When a worker performs a gather, and the scatter-gather gateway provided by this libary notices it was the last part left, it calls a callback function in the context of that worker, and only that worker.
The scatter-gather pattern may be implemented on AWS using the Map state of Step functions. Using Map, you can split an operation across multiple workers. If you need to process a large number of sub-operations, you can run a so-called Distributed Map state, which takes its input from a file saved in an S3 bucket, containing the list the parts to scatter.
The Map state of Step functions will wait for all workers to complete before moving forward. In case of errors, the Map state will produce reports containing the failed or pending parts (saving them into an S3 bucket, in case of a Distributed Map state), which you can use to fabricate a new input to resume the failed state machine.
As an alternative, you might want to use the scatter-gather gateway provided by this library in the following cases:
- you want to decouple the scattering component and workers using a message queue, so that processing is asynchronous
- in case of errors, you want to take advantage of dead-letter queues so that you can restart a failing scatter-gather operation for the failed parts, using the same mechanics of non-scatter-gather operations
- you want more control over the progress state of the scatter-gather operation
- you don't want to create a file on S3 to list the scattered parts for a Distributed Map state
- you don't want to parse the result files saved by the Distributed Map state to S3 to know which parts failed and which parts did not even start
The IScatterGatherGateway
interface in the ScatterGather
namespace is the entry point for functionality of the scatter-gather gateway provided by this library. Two implementations are provided, one using Amazon DynamoDB and MongoDB.
Each scatter-gather request must be identified by a unique ScatterRequestId
generated by the application. Each scattered part must be identified by a ScatterPartId
that must be unique in that scatter-gather operation. Both types are just simple wrappers for a string, to make these values type safe.
Both the scattering component and gathering workers must create an IScatterGatherGateway
using one of the provided constructors.
Typically, the scattering component creates a unique ScatterRequestId
and executes a BeginScatter
, Scatter
, EndScatter
sequence, maybe calling Scatter
multiple times to add more parts to the scatter-gather operation (that is, the scatter-gather gateway is "stream friendly"). Each worker calls Gather
to mark each part as completed.
Performance-wise, the run time of all methods of IScatterGatherGateway
is proportional to the number of elements passed to that method, but is irrespective of the number of elements in the whole scatter-gather operation.
Task BeginScatter(ScatterRequestId requestId, string context);
Task Scatter (ScatterRequestId requestId, IEnumerable<ScatterPartId> partIds, Func<Task> callback);
Task<T> Scatter<T> (ScatterRequestId requestId, IEnumerable<ScatterPartId> partIds, Func<Task<T>> callback);
Task EndScatter (ScatterRequestId requestId, Func<string, Task> handleCompletion);
BeginScatter
initializes a new scatter-gather request identified by requestId
and accepts an arbitrary context
string that is associated with that request. This context string can contain any text meaningful to the application, perhaps even some JSON-encoded data, and it will be passed to the completion handler function. The size of the context string is limited by the size of the underlying storage, that is less than 400 kB for DynamoDB and less than 16 megabytes for MongoDB.
Scatter
tells the scatter-gather gateway that there are one or more new parts that are about to be scattered for the request identified by requestId
. Part identifiers are listed in partIds
. The callback
function must be specified to execute specific action on each scattered part, for example to send a message to a worker.
Scatter
has two overloads: one accepting a callback function returning a Task<T>
, where the resulting T
is returned by Scatter
itself, or one returning a Task
, where Scatter
itself returns nothing.
EndScatter
signals that all parts have been scattered, thus it is now possible to expect completion of the whole scatter-gather operation. The EndScatter
calls the completion handler function in case all parts, if any, have been gathered so fast that the scatter-gather operation is already completed.
Task Gather(ScatterRequestId requestId, IReadOnlyCollection<ScatterPartId> partIds, Func<string, Task> handleCompletion)
A worker calls Gather
after processing its parts (after is important for idempotency), to mark those parts as complete. The gathered parts (maybe just one) are identified by partIds
within requestId
. The handleCompletion
function will be executed if the scatter-gather gateway notices that that was the last part to be gathered.
Note that only one worker will be able to call the completion handler function, because the scatter-gather gateway treats it as a critical section. Also note that, in case of errors during the completion handler function, restarting the worker that was processing the last Gather
will restart the completion handler function (that is, the critical section is re-entrant).
This library provides two implementations of the IScatterGatherGateway
interface, allowing to choose between Amazon DynamoDB and MongoDB as storage to persist the progress state of a scatter-gather operation among distributed workers.
The scatter-gather gateway for DynamoDB is provided by the ScatterGather.DynamoDB
library.
The scatter-gather gateway needs a pair of master-detail DynamoDB tables to keep its state: one to store current scatter requests and one to list scattered parts.
The scatter-gather gateway will try to create those tables if they don't already exist, otherwise it will use the ones already present. Considering the typical usage patter of the scatter-gather gateway, the tables are created with a "pay per request" billing mode.
If you want more control over the billing mode and provisioned capacity, you can create the pair of tables manually, as follows:
- a table for scatter requests, that will contain an item for each scatter request that has been created, having a simple primary key composed of a
RequestId
string field as the partition key - a table for scattered parts, that will contain an item for each scattered sub-operation of a scatter request, having a composite primary key composed of a
RequestId
string field as the partition key and aPartId
string field as the sort key
Note that your application will need the following permissions on those two tables: dynamodb:CreateTable
, dynamodb:DescribeTable
, dynamodb:Query
, dynamodb:PutItem
, dynamodb:DeleteItem
, dynamodb:UpdateItem
and dynamodb:BatchWriteItem
.
For billing, given a scatter-gather operation consisting of N scattered parts, account for about 4 write request units for the request table, and about 2N write request units and N read request units for the part table. This is for normal operation with no errors and restarts involved.
To construct the scatter-gather gateway use one of the constructors of the ScatterGatherGateway
class in the ScatterGather.DynamoDB
namespace:
ScatterGatherGateway(string requestTableName, string partTableName);
ScatterGatherGateway(string dynamoDbServiceUrlOption, string requestTableName, string partTableName);
Pass either constructor the names for the request table and the part table, which may either be already existing or not. The second constructor allows you to pass a custom URL for the DynamoDB service, useful when you want to use, for example, a local DynamoDB for development and testing.
The scatter-gather gateway for MongoDB is provided by the ScatterGather.MongoDB
library.
To construct the scatter-gather gateway use one of the constructors of the ScatterGatherGateway
class in the ScatterGather.MongoDB
namespace:
ScatterGatherGateway(IMongoDatabase mongoDatabase, string collectionNamePrefix);
Pass the constructor the instance of the Mongo database to store the scatter-gather state and a string that will be used as a name prefix for the collections used by the scatter-gather gateway. Two collections are used, named <collectionNamePrefix>.Requests
and <collectionNamePrefix>.Parts
, storing scatted-gather operations and sub-operations respectively.
A full example using either database-specific implementation is provided in the Example directory of the repository.
Automated tests are run against containers created from the DynamoDB-local and MongoDB Docker images. A docker-compose file is provided so that you can just run docker-compose up
to run the DynamoDB and MongoDB containers before running tests. DynamoDB and MongoDB are mapped to TCP ports 8998 and 27017 on the host, respectively, and use no authentication.
Thanks to Matteo Pierangeli for his initial review and comments!
Permissive, 2-clause BSD style
ScatterGather - .NET library to implement the scatter-gather pattern using a database to store distributed progress state
Copyright 2023-2025 Salvatore ISAJA. All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
-
Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
-
Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.