Skip to content

Commit

Permalink
Add unit tes, fixed remarks of reusing the httpclient
Browse files Browse the repository at this point in the history
Signed-off-by: Michiel van Praat <michiel.vanpraat@humandigital.nl>
  • Loading branch information
humandigital-michiel committed Aug 26, 2024
1 parent 6f5033c commit b3a368a
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 23 deletions.
9 changes: 5 additions & 4 deletions src/Dapr.Workflow/WorkflowServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,22 @@ public static IServiceCollection AddDaprWorkflow(
if (TryGetGrpcAddress(out string address))
{
var client = new HttpClient();
var client = options?.GrpcChannelOptions?.HttpClient ?? new HttpClient();
var daprApiToken = DaprDefaults.GetDefaultDaprApiToken();
if (!string.IsNullOrEmpty(daprApiToken))
{
client.DefaultRequestHeaders.Add("Dapr-Api-Token", daprApiToken);
}
builder.UseGrpc(CreateChannel(address, client, options.GrpcChannelOptions));
builder.UseGrpc(CreateChannel(address, client, options?.GrpcChannelOptions));
}
else
{
builder.UseGrpc();
}
builder.AddTasks(registry => options.AddWorkflowsAndActivitiesToRegistry(registry));
builder.AddTasks(registry => options?.AddWorkflowsAndActivitiesToRegistry(registry));
});

return serviceCollection;
Expand Down Expand Up @@ -151,8 +151,9 @@ static bool TryGetGrpcAddress(out string address)

static GrpcChannel CreateChannel(string address, HttpClient client, GrpcChannelOptions? grpcChannelOptions = null)
{
ArgumentNullException.ThrowIfNull(client);

GrpcChannelOptions options = grpcChannelOptions ?? new();
options.HttpClient ??= client;

var daprEndpoint = DaprDefaults.GetDefaultGrpcEndpoint();
if (!String.IsNullOrEmpty(daprEndpoint))
Expand Down
20 changes: 20 additions & 0 deletions test/Dapr.E2E.Test.App/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ namespace Dapr.E2E.Test
using System;
using Microsoft.Extensions.Logging;
using Serilog;
using Grpc.Net.Client;

/// <summary>
/// Startup class.
Expand Down Expand Up @@ -98,6 +99,25 @@ public void ConfigureServices(IServiceCollection services)
return Task.FromResult($"We are shipping {input} to the customer using our hoard of drones!");
});
});
services.AddDaprWorkflow(options =>
{
// Example of registering a "StartOrder" workflow function
options.RegisterWorkflow<string, string>("StartLargeOrder", implementation: async (context, input) =>
{
var itemToPurchase = input;
itemToPurchase = await context.WaitForExternalEventAsync<string>("FinishLargeOrder");
return itemToPurchase;
});
options.RegisterActivity<string, string>("FinishLargeOrder", implementation: (context, input) =>
{
return Task.FromResult($"We are finishing, it's huge!");
});
options.UseGrpcChannelOptions(new GrpcChannelOptions
{
MaxReceiveMessageSize = 32 * 1024 * 1024,
MaxSendMessageSize = 32 * 1024 * 1024
});
});
services.AddActors(options =>
{
options.UseJsonSerialization = JsonSerializationEnabled;
Expand Down
1 change: 1 addition & 0 deletions test/Dapr.E2E.Test/DaprTestApp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public DaprTestApp(ITestOutputHelper output, string appId)
"--components-path", componentsPath,
"--config", configPath,
"--log-level", "debug",
"--dapr-http-max-request-size", "32",

};

Expand Down
105 changes: 86 additions & 19 deletions test/Dapr.E2E.Test/Workflows/WorkflowTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
using Xunit;
using System.Linq;
using System.Diagnostics;
using Grpc.Net.Client;

namespace Dapr.E2E.Test
{
Expand Down Expand Up @@ -82,13 +83,11 @@ public async Task TestWorkflowLogging()
[Fact]
public async Task TestWorkflows()
{
var instanceId = "testInstanceId";
var instanceId2 = "EventRaiseId";
var workflowComponent = "dapr";
var workflowName = "PlaceOrder";
const string instanceId = "testInstanceId";
const string workflowComponent = "dapr";
const string workflowName = "PlaceOrder";
object input = "paperclips";
Dictionary<string, string> workflowOptions = new Dictionary<string, string>();
workflowOptions.Add("task_queue", "testQueue");
var workflowOptions = new Dictionary<string, string> { { "task_queue", "testQueue" } };

using var daprClient = new DaprClientBuilder().UseGrpcEndpoint(this.GrpcEndpoint).UseHttpEndpoint(this.HttpEndpoint).Build();
var health = await daprClient.CheckHealthAsync();
Expand Down Expand Up @@ -140,33 +139,101 @@ public async Task TestWorkflows()
{
ex.InnerException.Message.Should().Contain("no such instance exists", $"Instance {instanceId} was not correctly purged");
}

// Start another workflow for event raising purposes
startResponse = await daprClient.StartWorkflowAsync(
instanceId: instanceId2,

}
[Fact]
public async Task TestEventRaisingWorkflows()
{
const string instanceId = "EventRaiseId";
const string workflowComponent = "dapr";
const string workflowName = "PlaceOrder";
object input = "paperclips";
var workflowOptions = new Dictionary<string, string> { { "task_queue", "testQueue" } };

using var daprClient = new DaprClientBuilder()
.UseGrpcEndpoint(this.GrpcEndpoint)
.UseHttpEndpoint(this.HttpEndpoint).Build();
var health = await daprClient.CheckHealthAsync();

health.Should().Be(true, "DaprClient is not healthy");

var startResponse = await daprClient.StartWorkflowAsync(
instanceId: instanceId,
workflowComponent: workflowComponent,
workflowName: workflowName,
input: input,
workflowOptions: workflowOptions);

// PARALLEL RAISE EVENT TEST
var event1 = daprClient.RaiseWorkflowEventAsync(instanceId2, workflowComponent, "ChangePurchaseItem", "computers");
var event2 = daprClient.RaiseWorkflowEventAsync(instanceId2, workflowComponent, "ChangePurchaseItem", "computers");
var event3 = daprClient.RaiseWorkflowEventAsync(instanceId2, workflowComponent, "ChangePurchaseItem", "computers");
var event4 = daprClient.RaiseWorkflowEventAsync(instanceId2, workflowComponent, "ChangePurchaseItem", "computers");
var event5 = daprClient.RaiseWorkflowEventAsync(instanceId2, workflowComponent, "ChangePurchaseItem", "computers");

var event1 = daprClient.RaiseWorkflowEventAsync(instanceId, workflowComponent, "ChangePurchaseItem", "computers");
var event2 = daprClient.RaiseWorkflowEventAsync(instanceId, workflowComponent, "ChangePurchaseItem", "computers");
var event3 = daprClient.RaiseWorkflowEventAsync(instanceId, workflowComponent, "ChangePurchaseItem", "computers");
var event4 = daprClient.RaiseWorkflowEventAsync(instanceId, workflowComponent, "ChangePurchaseItem", "computers");
var event5 = daprClient.RaiseWorkflowEventAsync(instanceId, workflowComponent, "ChangePurchaseItem", "computers");
var externalEvents = Task.WhenAll(event1, event2, event3, event4, event5);
var winner = await Task.WhenAny(externalEvents, Task.Delay(TimeSpan.FromSeconds(30)));
await Task.WhenAny(externalEvents, Task.Delay(TimeSpan.FromSeconds(30)));
externalEvents.IsCompletedSuccessfully.Should().BeTrue($"Unsuccessful at raising events. Status of events: {externalEvents.IsCompletedSuccessfully}");

// Wait up to 30 seconds for the workflow to complete and check the output
using var cts = new CancellationTokenSource(delay: TimeSpan.FromSeconds(30));
getResponse = await daprClient.WaitForWorkflowCompletionAsync(instanceId2, workflowComponent, cts.Token);
var getResponse = await daprClient.WaitForWorkflowCompletionAsync(instanceId, workflowComponent, cts.Token);
var outputString = getResponse.Properties["dapr.workflow.output"];
outputString.Should().Be("\"computers\"", $"Purchased item {outputString} was not correct");
var deserializedOutput = getResponse.ReadOutputAs<string>();
deserializedOutput.Should().Be("computers", $"Deserialized output '{deserializedOutput}' was not expected");
}
[Fact]
public async Task TestLargeMessageWorkflow()
{
const string instanceId = "testLargeMessageId";
const string workflowComponent = "dapr";
const string workflowName = "StartLargeOrder";
object input = "paperclips";
var workflowOptions = new Dictionary<string, string> { { "task_queue", "testQueue" } };
const int messageSize = 32 * 1024 * 1024; // 32Mb
const int payloadOverhead = 2000; //substract to allow for some overhead.
var largeString = GetRandomAlphaNumericString(messageSize - payloadOverhead);

var channelOptions = new GrpcChannelOptions
{
MaxReceiveMessageSize = messageSize, MaxSendMessageSize = messageSize
};

using var daprClient = new DaprClientBuilder()
.UseGrpcEndpoint(this.GrpcEndpoint)
.UseGrpcChannelOptions(channelOptions)
.UseHttpEndpoint(this.HttpEndpoint)
.Build();

var health = await daprClient.CheckHealthAsync();
health.Should().Be(true, "DaprClient is not healthy");

var startResponse = await daprClient.StartWorkflowAsync(
instanceId: instanceId,
workflowComponent: workflowComponent,
workflowName: workflowName,
input: input,
workflowOptions: workflowOptions);

var event1 = daprClient.RaiseWorkflowEventAsync(instanceId, workflowComponent, "FinishLargeOrder", largeString);
await event1;
event1.IsCompletedSuccessfully.Should().BeTrue($"Cant send large message {event1.Exception}");

// Wait up to 30 seconds for the workflow to complete and check the output
using var cts = new CancellationTokenSource(delay: TimeSpan.FromSeconds(30));
var getResponse = await daprClient.WaitForWorkflowCompletionAsync(instanceId, workflowComponent, cts.Token);
var outputString = getResponse.Properties["dapr.workflow.output"];
outputString.Should().Be("\"" + largeString + "\"", $"Purchased item {outputString} was not correct");
var deserializedOutput = getResponse.ReadOutputAs<string>();
deserializedOutput.Should().Be(largeString, $"Deserialized output '{deserializedOutput}' was not expected");
}
private static string GetRandomAlphaNumericString(int length)
{
const string chars = "abcdefghijklmnopqrstuvwxyz0123456789";
var rand = new Random();
return new string(Enumerable.Repeat(chars, length)
.Select(s => s[rand.Next(s.Length)]).ToArray());
}
}
}

0 comments on commit b3a368a

Please sign in to comment.