From b3a368abff8c38948ed18ec1503f55ce445d48f6 Mon Sep 17 00:00:00 2001 From: Michiel van Praat Date: Thu, 18 Jul 2024 12:43:03 +0200 Subject: [PATCH] Add unit tes, fixed remarks of reusing the httpclient Signed-off-by: Michiel van Praat --- .../WorkflowServiceCollectionExtensions.cs | 9 +- test/Dapr.E2E.Test.App/Startup.cs | 20 ++++ test/Dapr.E2E.Test/DaprTestApp.cs | 1 + test/Dapr.E2E.Test/Workflows/WorkflowTest.cs | 105 ++++++++++++++---- 4 files changed, 112 insertions(+), 23 deletions(-) diff --git a/src/Dapr.Workflow/WorkflowServiceCollectionExtensions.cs b/src/Dapr.Workflow/WorkflowServiceCollectionExtensions.cs index 8c64ccb85..59afef923 100644 --- a/src/Dapr.Workflow/WorkflowServiceCollectionExtensions.cs +++ b/src/Dapr.Workflow/WorkflowServiceCollectionExtensions.cs @@ -60,7 +60,7 @@ public static IServiceCollection AddDaprWorkflow( if (TryGetGrpcAddress(out string address)) { - var client = new HttpClient(); + var client = options?.GrpcChannelOptions?.HttpClient ?? new HttpClient(); var daprApiToken = DaprDefaults.GetDefaultDaprApiToken(); if (!string.IsNullOrEmpty(daprApiToken)) @@ -68,14 +68,14 @@ public static IServiceCollection AddDaprWorkflow( client.DefaultRequestHeaders.Add("Dapr-Api-Token", daprApiToken); } - builder.UseGrpc(CreateChannel(address, client, options.GrpcChannelOptions)); + builder.UseGrpc(CreateChannel(address, client, options?.GrpcChannelOptions)); } else { builder.UseGrpc(); } - builder.AddTasks(registry => options.AddWorkflowsAndActivitiesToRegistry(registry)); + builder.AddTasks(registry => options?.AddWorkflowsAndActivitiesToRegistry(registry)); }); return serviceCollection; @@ -151,8 +151,9 @@ static bool TryGetGrpcAddress(out string address) static GrpcChannel CreateChannel(string address, HttpClient client, GrpcChannelOptions? grpcChannelOptions = null) { + ArgumentNullException.ThrowIfNull(client); + GrpcChannelOptions options = grpcChannelOptions ?? new(); - options.HttpClient ??= client; var daprEndpoint = DaprDefaults.GetDefaultGrpcEndpoint(); if (!String.IsNullOrEmpty(daprEndpoint)) diff --git a/test/Dapr.E2E.Test.App/Startup.cs b/test/Dapr.E2E.Test.App/Startup.cs index bfca60f91..7cd7b496a 100644 --- a/test/Dapr.E2E.Test.App/Startup.cs +++ b/test/Dapr.E2E.Test.App/Startup.cs @@ -33,6 +33,7 @@ namespace Dapr.E2E.Test using System; using Microsoft.Extensions.Logging; using Serilog; + using Grpc.Net.Client; /// /// Startup class. @@ -98,6 +99,25 @@ public void ConfigureServices(IServiceCollection services) return Task.FromResult($"We are shipping {input} to the customer using our hoard of drones!"); }); }); + services.AddDaprWorkflow(options => + { + // Example of registering a "StartOrder" workflow function + options.RegisterWorkflow("StartLargeOrder", implementation: async (context, input) => + { + var itemToPurchase = input; + itemToPurchase = await context.WaitForExternalEventAsync("FinishLargeOrder"); + return itemToPurchase; + }); + options.RegisterActivity("FinishLargeOrder", implementation: (context, input) => + { + return Task.FromResult($"We are finishing, it's huge!"); + }); + options.UseGrpcChannelOptions(new GrpcChannelOptions + { + MaxReceiveMessageSize = 32 * 1024 * 1024, + MaxSendMessageSize = 32 * 1024 * 1024 + }); + }); services.AddActors(options => { options.UseJsonSerialization = JsonSerializationEnabled; diff --git a/test/Dapr.E2E.Test/DaprTestApp.cs b/test/Dapr.E2E.Test/DaprTestApp.cs index 83f9948ac..152aeee98 100644 --- a/test/Dapr.E2E.Test/DaprTestApp.cs +++ b/test/Dapr.E2E.Test/DaprTestApp.cs @@ -58,6 +58,7 @@ public DaprTestApp(ITestOutputHelper output, string appId) "--components-path", componentsPath, "--config", configPath, "--log-level", "debug", + "--dapr-http-max-request-size", "32", }; diff --git a/test/Dapr.E2E.Test/Workflows/WorkflowTest.cs b/test/Dapr.E2E.Test/Workflows/WorkflowTest.cs index d95929ca3..a14a73f89 100644 --- a/test/Dapr.E2E.Test/Workflows/WorkflowTest.cs +++ b/test/Dapr.E2E.Test/Workflows/WorkflowTest.cs @@ -20,6 +20,7 @@ using Xunit; using System.Linq; using System.Diagnostics; +using Grpc.Net.Client; namespace Dapr.E2E.Test { @@ -82,13 +83,11 @@ public async Task TestWorkflowLogging() [Fact] public async Task TestWorkflows() { - var instanceId = "testInstanceId"; - var instanceId2 = "EventRaiseId"; - var workflowComponent = "dapr"; - var workflowName = "PlaceOrder"; + const string instanceId = "testInstanceId"; + const string workflowComponent = "dapr"; + const string workflowName = "PlaceOrder"; object input = "paperclips"; - Dictionary workflowOptions = new Dictionary(); - workflowOptions.Add("task_queue", "testQueue"); + var workflowOptions = new Dictionary { { "task_queue", "testQueue" } }; using var daprClient = new DaprClientBuilder().UseGrpcEndpoint(this.GrpcEndpoint).UseHttpEndpoint(this.HttpEndpoint).Build(); var health = await daprClient.CheckHealthAsync(); @@ -140,33 +139,101 @@ public async Task TestWorkflows() { ex.InnerException.Message.Should().Contain("no such instance exists", $"Instance {instanceId} was not correctly purged"); } - - // Start another workflow for event raising purposes - startResponse = await daprClient.StartWorkflowAsync( - instanceId: instanceId2, + + } + [Fact] + public async Task TestEventRaisingWorkflows() + { + const string instanceId = "EventRaiseId"; + const string workflowComponent = "dapr"; + const string workflowName = "PlaceOrder"; + object input = "paperclips"; + var workflowOptions = new Dictionary { { "task_queue", "testQueue" } }; + + using var daprClient = new DaprClientBuilder() + .UseGrpcEndpoint(this.GrpcEndpoint) + .UseHttpEndpoint(this.HttpEndpoint).Build(); + var health = await daprClient.CheckHealthAsync(); + + health.Should().Be(true, "DaprClient is not healthy"); + + var startResponse = await daprClient.StartWorkflowAsync( + instanceId: instanceId, workflowComponent: workflowComponent, workflowName: workflowName, input: input, workflowOptions: workflowOptions); - + // PARALLEL RAISE EVENT TEST - var event1 = daprClient.RaiseWorkflowEventAsync(instanceId2, workflowComponent, "ChangePurchaseItem", "computers"); - var event2 = daprClient.RaiseWorkflowEventAsync(instanceId2, workflowComponent, "ChangePurchaseItem", "computers"); - var event3 = daprClient.RaiseWorkflowEventAsync(instanceId2, workflowComponent, "ChangePurchaseItem", "computers"); - var event4 = daprClient.RaiseWorkflowEventAsync(instanceId2, workflowComponent, "ChangePurchaseItem", "computers"); - var event5 = daprClient.RaiseWorkflowEventAsync(instanceId2, workflowComponent, "ChangePurchaseItem", "computers"); - + var event1 = daprClient.RaiseWorkflowEventAsync(instanceId, workflowComponent, "ChangePurchaseItem", "computers"); + var event2 = daprClient.RaiseWorkflowEventAsync(instanceId, workflowComponent, "ChangePurchaseItem", "computers"); + var event3 = daprClient.RaiseWorkflowEventAsync(instanceId, workflowComponent, "ChangePurchaseItem", "computers"); + var event4 = daprClient.RaiseWorkflowEventAsync(instanceId, workflowComponent, "ChangePurchaseItem", "computers"); + var event5 = daprClient.RaiseWorkflowEventAsync(instanceId, workflowComponent, "ChangePurchaseItem", "computers"); + var externalEvents = Task.WhenAll(event1, event2, event3, event4, event5); - var winner = await Task.WhenAny(externalEvents, Task.Delay(TimeSpan.FromSeconds(30))); + await Task.WhenAny(externalEvents, Task.Delay(TimeSpan.FromSeconds(30))); externalEvents.IsCompletedSuccessfully.Should().BeTrue($"Unsuccessful at raising events. Status of events: {externalEvents.IsCompletedSuccessfully}"); // Wait up to 30 seconds for the workflow to complete and check the output using var cts = new CancellationTokenSource(delay: TimeSpan.FromSeconds(30)); - getResponse = await daprClient.WaitForWorkflowCompletionAsync(instanceId2, workflowComponent, cts.Token); + var getResponse = await daprClient.WaitForWorkflowCompletionAsync(instanceId, workflowComponent, cts.Token); var outputString = getResponse.Properties["dapr.workflow.output"]; outputString.Should().Be("\"computers\"", $"Purchased item {outputString} was not correct"); var deserializedOutput = getResponse.ReadOutputAs(); deserializedOutput.Should().Be("computers", $"Deserialized output '{deserializedOutput}' was not expected"); } + [Fact] + public async Task TestLargeMessageWorkflow() + { + const string instanceId = "testLargeMessageId"; + const string workflowComponent = "dapr"; + const string workflowName = "StartLargeOrder"; + object input = "paperclips"; + var workflowOptions = new Dictionary { { "task_queue", "testQueue" } }; + const int messageSize = 32 * 1024 * 1024; // 32Mb + const int payloadOverhead = 2000; //substract to allow for some overhead. + var largeString = GetRandomAlphaNumericString(messageSize - payloadOverhead); + + var channelOptions = new GrpcChannelOptions + { + MaxReceiveMessageSize = messageSize, MaxSendMessageSize = messageSize + }; + + using var daprClient = new DaprClientBuilder() + .UseGrpcEndpoint(this.GrpcEndpoint) + .UseGrpcChannelOptions(channelOptions) + .UseHttpEndpoint(this.HttpEndpoint) + .Build(); + + var health = await daprClient.CheckHealthAsync(); + health.Should().Be(true, "DaprClient is not healthy"); + + var startResponse = await daprClient.StartWorkflowAsync( + instanceId: instanceId, + workflowComponent: workflowComponent, + workflowName: workflowName, + input: input, + workflowOptions: workflowOptions); + + var event1 = daprClient.RaiseWorkflowEventAsync(instanceId, workflowComponent, "FinishLargeOrder", largeString); + await event1; + event1.IsCompletedSuccessfully.Should().BeTrue($"Cant send large message {event1.Exception}"); + + // Wait up to 30 seconds for the workflow to complete and check the output + using var cts = new CancellationTokenSource(delay: TimeSpan.FromSeconds(30)); + var getResponse = await daprClient.WaitForWorkflowCompletionAsync(instanceId, workflowComponent, cts.Token); + var outputString = getResponse.Properties["dapr.workflow.output"]; + outputString.Should().Be("\"" + largeString + "\"", $"Purchased item {outputString} was not correct"); + var deserializedOutput = getResponse.ReadOutputAs(); + deserializedOutput.Should().Be(largeString, $"Deserialized output '{deserializedOutput}' was not expected"); + } + private static string GetRandomAlphaNumericString(int length) + { + const string chars = "abcdefghijklmnopqrstuvwxyz0123456789"; + var rand = new Random(); + return new string(Enumerable.Repeat(chars, length) + .Select(s => s[rand.Next(s.Length)]).ToArray()); + } } }