Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Data Pipeline Operators #1378

Draft
wants to merge 16 commits into
base: master
Choose a base branch
from

Conversation

WhitWaldo
Copy link
Contributor

@WhitWaldo WhitWaldo commented Oct 23, 2024

Description

For some time now, there's been an active discussion about how much the SDK should do in terms of handling serialization for developers. I'm in the camp that the SDK should do exactly as much as the developer wants: that is, is the developer wants to provide the SDK some generic type and get that type back, the SDK should handle all the serialization necessary behind the scenes.

Today, the SDK does this, but it handles it differently throughout the clients. Dapr.Client uses System.Text.Json under the hood and while the options can be themselves customized by the developer, the serializer itself cannot be replaced with another. Dapr.Actors.Client gives developers the option of using Data Contracts or System.Text.Json, but again the former is baked into the project and the developer can only configure it with the provided options.

I've seen issues like this and requests in Discord for support for compression and today, there's not a great place to put that in the SDK. Do we want to bake in support for Gzip, Zip and LZMA? How are options configured for each?

I've also thought there could be an opportunity for interplay between different functionality. Today, the state store provides transparent encryption handled by the runtime with a few limited options, but what if the developer wants to utilize the more broadly capable cryptography block to perform that encryption/decryption operation in a more customizable manner?

I can think of other such operations a developer might want to optionally run against data they're putting into and taking out of the various SDKs:

  • Data masking/anonymization
  • Verification
  • Transformation

I propose that instead of making all this declaratively done uniformly on a per-component basis in the runtime, it should instead be done at the SDK level (utilizing the runtime functionality as able).

However, the .NET SDK as built today doesn't have a clear place to put any of that, much less make it optional. As I intend to make the SDK more DI-friendly, I would like to further I propose a solution to this problem adding a shared functionality that heavily relies on DI to provide an opt-in bidirectional data processing pipeline on a per-type basis.

Terminology

I'll review some terminology first so we're on the same page.

  • Pipeline: Providing the core processing apparatus, a pipeline asynchronously performs each of the intended operations. A DaprEncodingPipeline converts from the generic type passed in to the ReadOnlyMemory<byte> it produces and a DaprDecodingPipeline handles the reverse from ReadOnlyMemory<byte> back to the generic type.
  • Operation: Any data-oriented activity that can process the data. All operations must provide a processing and a reversal method as well as a name.
  • Pipeline Factory: A class used internally to provision the services from their DI registration for any given type that produces the DaprDataPipeline used to process or reverse a payload.

Now, this system relies heavily on the DI registration of the various operations used so it can inject them later on, but it does not rely on reflection, by design. I'll cover registration itself in a moment. Do note that if the following seems more convoluted than it should be, I agree, but C# doesn't support second-order generics and I really didn't want to get into reflection, so we're stuck with what we have until that happy day.

Pipeline Lifecycle

The processing lifecycle and the types of operations are as follows in this order (note that you only need to know the interface names if you're going to extend this pipeline with your own operations):

  • The generic type instance is provided to the pipeline for processing and it is serialized into a string by the IDaprTStringTransitionOperation.
  • All operations registered as an IDaprStringBasedOperation will then process this string value and return a string in the order in which they were specified in the type's attribute. More on this later.
  • The string is encoded into a ReadOnlyMemory<byte> by the IDaprStringByteTransitionOperation.
  • All operations registered as an IDaprByteBasedOperation will then process this byte array and return a ReadOnlyMemory<byte> in the order in which they were specified in the type's attribute.

It is mandatory that if the developer opts into this pipeline, they must register at least a serializer and an encoder and that the other operations are optional.

This implementation provides the following:

  • Serialization is provided by System.Text.Json in SystemTextJsonSerializer implemented as an IDaprTStringTransitionOperation.
  • Encoding is provided by the built in C# encoding and supports UTF-8 via Utf8Encoder implemented as an IDaprStringByteTransitionOperation.
  • Verification is performed using the built-in SHA256 functionality via Sha256Verifier implemented as an IDaprByteBasedOperation
  • Compression is supported using Gzip in GzipCompressor implemented as an IDaprByteBasedOperation

Dependency Injection Registration

So how should these types be registered in DI so we can ensure they're mapped up to the right lifecycle portions? There's a fluent registration API provided that looks like the following:

        var services = new ServiceCollection();
        services.AddDaprDataProcessingPipeline()
            .WithCompressor<GzipCompressor>()
            .WithSerializer<SystemTextJsonSerializer<SampleRecord>>()
            .WithIntegrity(_ => new Sha256Validator(ServiceLifetime.Scoped))
            .WithEncoder(c => new Utf8Encoder());

        var serviceProvider = services.BuildServiceProvider();

Operation lifetimes default to singletons, but can be registered as scoped or transient. They can be registered as types or via a Func<IServiceProvider> in which it is provided an instance of IServiceProvider in case other services need to be injected for the operation registration. You must provide a registration for the operation if it is going to be referenced in an attribute, and that's an opportunity for an analyzer down the road. You're welcome to register as many compressors, serializers, integrity operations and encoders as you'd like here as they'll only be used as indicated in the pipeline attribute.

Defining operational order

The order in which the operations are processed are important as one cannot get from a generic type to a ReadOnlyMemory<byte> and do things in the middle without some sort of process, but as the developer is welcome to do any number of string-based and byte-based operations between the mandatory serialization and encoding steps, the system needs to understand the order so it can properly run the operation in reverse when the developer pulls the data back out of the backing store. The developer can specify the order using the DataPipelineAttribute as illustrated below:

[DataPipeline(typeof(GzipCompressor), typeof(SystemTextJsonSerializer), typeof(Utf8Encoder)]
public sealed record SampleRecord(string Name, int Value, bool Flag);

Now, order is important, but only with regards to the IDaprStringBasedOperation and IDaprByteBasedOperation types. Order will be recognized from first to last as provided in the arguments for this attribute with regards to each of those types, but the encoder and serializer and the other types can be defined anywhere at all. If more than one IDaprTStringTransitionOperation (e.g. serializer) or IDaprStringByteTransitionOperation (e.g. encoder) is specified, only the first is used and the others ignored. You're also welcome to duplicate operations in the attribute as much as you'd like and it will run them in their lifecycle order in the order presented. For example:

[DataPipeline(typeof(SystemTextJsonSerializer<MyType>), typeof(GzipCompressor), typeof(Utf8Encoder), typeof(Sha256Validator), typeof(GzipCompressor), typeof(Sha256Validator))]
private record MyType(string Name);

This will run the operations in the following order:

  • SystemTextJsonSerializer (because this is the first pipeline operation)
  • Utf8Encoder (because no string-specific operations are specified, so this is the third pipeline operation)
  • GZipCompressor (first observed byte-specific operation)
  • Sha256Validator
  • GzipCompressor
  • Sha256Validator

Some operations may need to create and persist metadata (e.g. hard to validate an operation down the line without persisting the hash), so the pipeline supports a shared dictionary of string-based values keyed to the ordered type that created it. In the same vein, the pipeline retains the order in which the operations are performed. This because reverse operations are performed in the order specified in the metadata irrespective of the order on the type attribute (only matters during forward processing). That said, any configuration options provided to each operation are not persisted in the encoding exercise, so any migration using different options would require versioning these operations in this version of the implementation.

Conclusion

On the SDK side then, there need only be a check on the optionally injected DataPipelineFactory to see if it was populated or not and if it is either:

  • create an encoding pipeline to serialize the data to the byte array and persist both the data and its metadata.
  • retrieve the payload and metadata for the persisted data and create a decoding pipeline to deserialize it

This is submitted as a draft for review, but is fully functional and tested at the time of this writing.

Checklist

Please make sure you've completed the relevant tasks for this PR, out of the following list:

  • Code compiles correctly
  • Created/updated tests
  • Extended the documentation

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
…ention for each operation. Added more unit tests, including E2E of standard and duplicated operations.

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
@WhitWaldo WhitWaldo added this to the Future milestone Oct 23, 2024
@WhitWaldo WhitWaldo self-assigned this Oct 23, 2024
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
@WhitWaldo WhitWaldo added the kind/enhancement New feature or request label Oct 27, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/client kind/enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant