Skip to content

Commit

Permalink
[Workflow] Improve management API usability (#1087)
Browse files Browse the repository at this point in the history
* [Workflow] Improve management API usability

Signed-off-by: Chris Gillum <cgillum@microsoft.com>

* PR feedback and update E2E test

Signed-off-by: Chris Gillum <cgillum@microsoft.com>

* PR feedback

Signed-off-by: Chris Gillum <cgillum@microsoft.com>

---------

Signed-off-by: Chris Gillum <cgillum@microsoft.com>
  • Loading branch information
cgillum authored May 12, 2023
1 parent 610632a commit 8152c74
Show file tree
Hide file tree
Showing 11 changed files with 304 additions and 101 deletions.
1 change: 1 addition & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ charset = utf-8-bom
# Organize usings
dotnet_sort_system_directives_first = true
dotnet_separate_import_directive_groups = false
csharp_using_directive_placement = outside_namespace

# this. preferences
dotnet_style_qualification_for_field = false:silent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public override async Task<object> RunAsync(WorkflowActivityContext context, Pay
this.logger.LogInformation(
"Payment for request ID '{requestId}' could not be processed. Insufficient inventory.",
req.RequestId);
throw new InvalidOperationException("Not enough inventory!");
throw new InvalidOperationException($"Not enough '{req.ItemName}' inventory! Requested {req.Amount} but only {item.Quantity} available.");
}

// Update the statestore with the new amount of the item
Expand Down
28 changes: 17 additions & 11 deletions examples/Workflow/WorkflowConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
using WorkflowConsoleApp.Models;
using WorkflowConsoleApp.Workflows;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.DependencyInjection;

const string storeName = "statestore";
const string StoreName = "statestore";
const string DaprWorkflowComponent = "dapr";

// The workflow host is a background service that connects to the sidecar over gRPC
var builder = Host.CreateDefaultBuilder(args).ConfigureServices(services =>
Expand Down Expand Up @@ -58,8 +58,6 @@
// This is just to make the log output look a little nicer.
Thread.Sleep(TimeSpan.FromSeconds(1));

DaprWorkflowClient workflowClient = host.Services.GetRequiredService<DaprWorkflowClient>();

var baseInventory = new List<InventoryItem>
{
new InventoryItem(Name: "Paperclips", PerItemCost: 5, Quantity: 100),
Expand Down Expand Up @@ -114,23 +112,31 @@

// Start the workflow using the order ID as the workflow ID
Console.WriteLine($"Starting order workflow '{orderId}' purchasing {amount} {itemName}");
await workflowClient.ScheduleNewWorkflowAsync(
name: nameof(OrderProcessingWorkflow),
await daprClient.StartWorkflowAsync(
workflowComponent: DaprWorkflowComponent,
workflowName: nameof(OrderProcessingWorkflow),
input: orderInfo,
instanceId: orderId);

// Wait for the workflow to start and confirm the input
GetWorkflowResponse state = await daprClient.WaitForWorkflowStartAsync(
instanceId: orderId,
input: orderInfo);
workflowComponent: DaprWorkflowComponent);

Console.WriteLine($"{state.WorkflowName} (ID = {orderId}) started successfully with {state.ReadInputAs<OrderPayload>()}");

// Wait for the workflow to complete
WorkflowState state = await workflowClient.WaitForWorkflowCompletionAsync(
state = await daprClient.WaitForWorkflowCompletionAsync(
instanceId: orderId,
getInputsAndOutputs: true);
workflowComponent: DaprWorkflowComponent);

if (state.RuntimeStatus == WorkflowRuntimeStatus.Completed)
{
OrderResult result = state.ReadOutputAs<OrderResult>();
if (result.Processed)
{
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"Order workflow is {state.RuntimeStatus} and the order was processed successfully.");
Console.WriteLine($"Order workflow is {state.RuntimeStatus} and the order was processed successfully ({result}).");
Console.ResetColor();
}
else
Expand All @@ -154,6 +160,6 @@ static async Task RestockInventory(DaprClient daprClient, List<InventoryItem> in
foreach (var item in inventory)
{
Console.WriteLine($"*** \t{item.Name}: {item.Quantity}");
await daprClient.SaveStateAsync(storeName, item.Name.ToLowerInvariant(), item);
await daprClient.SaveStateAsync(StoreName, item.Name.ToLowerInvariant(), item);
}
}
81 changes: 77 additions & 4 deletions src/Dapr.Client/DaprClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -978,22 +978,95 @@ public abstract Task<UnlockResponse> Unlock(
/// <summary>
/// Attempt to start the given workflow with response indicating success.
/// </summary>
/// <param name="instanceId">Identifier of the specific run.</param>
/// <param name="workflowComponent">The component to interface with.</param>
/// <param name="workflowName">Name of the workflow to run.</param>
/// <param name="instanceId">Identifier of the specific run.</param>
/// <param name="input">The JSON-serializeable input for the given workflow.</param>
/// <param name="workflowOptions">The list of options that are potentially needed to start a workflow.</param>
/// <param name="input">The input input for the given workflow.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task"/> containing a <see cref="StartWorkflowResponse"/></returns>
[Obsolete("This API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
public abstract Task<StartWorkflowResponse> StartWorkflowAsync(
string instanceId,
string workflowComponent,
string workflowName,
Object input,
string instanceId = null,
object input = null,
IReadOnlyDictionary<string, string> workflowOptions = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Waits for a workflow to start running and returns a <see cref="GetWorkflowResponse"/> object that contains metadata
/// about the started workflow.
/// </summary>
/// <remarks>
/// <para>
/// A "started" workflow instance is any instance not in the <see cref="WorkflowRuntimeStatus.Pending"/> state.
/// </para><para>
/// This method will return a completed task if the workflow has already started running or has already completed.
/// </para>
/// </remarks>
/// <param name="instanceId">The unique ID of the workflow instance to wait for.</param>
/// <param name="workflowComponent">The component to interface with.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> that can be used to cancel the wait operation.</param>
/// <returns>
/// Returns a <see cref="GetWorkflowResponse"/> record that describes the workflow instance and its execution status.
/// </returns>
[Obsolete("This API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
public virtual async Task<GetWorkflowResponse> WaitForWorkflowStartAsync(
string instanceId,
string workflowComponent,
CancellationToken cancellationToken = default)
{
while (true)
{
var response = await this.GetWorkflowAsync(instanceId, workflowComponent, cancellationToken);
if (response.RuntimeStatus != WorkflowRuntimeStatus.Pending)
{
return response;
}

await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken);
}
}

/// <summary>
/// Waits for a workflow to complete and returns a <see cref="GetWorkflowResponse"/>
/// object that contains metadata about the started instance.
/// </summary>
/// <remarks>
/// <para>
/// A "completed" workflow instance is any instance in one of the terminal states. For example, the
/// <see cref="WorkflowRuntimeStatus.Completed"/>, <see cref="WorkflowRuntimeStatus.Failed"/>, or
/// <see cref="WorkflowRuntimeStatus.Terminated"/> states.
/// </para><para>
/// Workflows are long-running and could take hours, days, or months before completing.
/// Workflows can also be eternal, in which case they'll never complete unless terminated.
/// In such cases, this call may block indefinitely, so care must be taken to ensure appropriate timeouts are
/// enforced using the <paramref name="cancellationToken"/> parameter.
/// </para><para>
/// If a workflow instance is already complete when this method is called, the method will return immediately.
/// </para>
/// </remarks>
[Obsolete("This API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
public virtual async Task<GetWorkflowResponse> WaitForWorkflowCompletionAsync(
string instanceId,
string workflowComponent,
CancellationToken cancellationToken = default)
{
while (true)
{
var response = await this.GetWorkflowAsync(instanceId, workflowComponent, cancellationToken);
if (response.RuntimeStatus == WorkflowRuntimeStatus.Completed ||
response.RuntimeStatus == WorkflowRuntimeStatus.Failed ||
response.RuntimeStatus == WorkflowRuntimeStatus.Terminated)
{
return response;
}

await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken);
}
}

/// <summary>
/// Attempt to get information about the given workflow.
/// </summary>
Expand Down
64 changes: 45 additions & 19 deletions src/Dapr.Client/DaprClientGrpc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1467,10 +1467,10 @@ public async override Task<UnlockResponse> Unlock(
/// <inheritdoc/>
[Obsolete]
public async override Task<StartWorkflowResponse> StartWorkflowAsync(
string instanceId,
string workflowComponent,
string workflowName,
Object input,
string instanceId = null,
object input = null,
IReadOnlyDictionary<string, string> workflowOptions = default,
CancellationToken cancellationToken = default)
{
Expand All @@ -1480,14 +1480,18 @@ public async override Task<StartWorkflowResponse> StartWorkflowAsync(
ArgumentVerifier.ThrowIfNull(input, nameof(input));

// Serialize json data. Converts input object to bytes and then bytestring inside the request.
var jsonUtf8Bytes = JsonSerializer.SerializeToUtf8Bytes(input);
byte[] jsonUtf8Bytes = null;
if (input is not null)
{
jsonUtf8Bytes = JsonSerializer.SerializeToUtf8Bytes(input);
}

var request = new Autogenerated.StartWorkflowRequest()
{
InstanceId = instanceId,
WorkflowComponent = workflowComponent,
WorkflowName = workflowName,
Input = ByteString.CopyFrom(jsonUtf8Bytes),
Input = jsonUtf8Bytes is not null ? ByteString.CopyFrom(jsonUtf8Bytes) : null,
};

if (workflowOptions?.Count > 0)
Expand Down Expand Up @@ -1533,31 +1537,53 @@ public async override Task<GetWorkflowResponse> GetWorkflowAsync(
var response = await client.GetWorkflowAlpha1Async(request, options);
if (response == null)
{
throw new DaprException("Get workflow operation failed: the object response is null");
}
if (response.CreatedAt == null)
{
response.CreatedAt = new Timestamp();
throw new DaprException("Get workflow operation failed: CreatedAt object response is null");
throw new DaprException("Get workflow operation failed: the Dapr endpoint returned an empty result.");
}
if (response.LastUpdatedAt == null)

response.CreatedAt ??= new Timestamp();
response.LastUpdatedAt ??= response.CreatedAt;

return new GetWorkflowResponse
{
response.LastUpdatedAt = response.CreatedAt;
}
return new GetWorkflowResponse(response.InstanceId,
response.WorkflowName,
response.CreatedAt.ToDateTime(),
response.LastUpdatedAt.ToDateTime(),
response.RuntimeStatus,
response.Properties);
InstanceId = response.InstanceId,
WorkflowName = response.WorkflowName,
WorkflowComponentName = workflowComponent,
CreatedAt = response.CreatedAt.ToDateTime(),
LastUpdatedAt = response.LastUpdatedAt.ToDateTime(),
RuntimeStatus = GetWorkflowRuntimeStatus(response.RuntimeStatus),
Properties = response.Properties,
FailureDetails = GetWorkflowFailureDetails(response, workflowComponent),
};
}
catch (RpcException ex)
{
throw new DaprException("Get workflow operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
}
}

private static WorkflowRuntimeStatus GetWorkflowRuntimeStatus(string runtimeStatus)
{
if (!System.Enum.TryParse(runtimeStatus, true /* ignoreCase */, out WorkflowRuntimeStatus status))
{
status = WorkflowRuntimeStatus.Unknown;
}

return status;
}

private static WorkflowFailureDetails GetWorkflowFailureDetails(Autogenerated.GetWorkflowResponse response, string componentName)
{
// FUTURE: Make this part of the protobuf contract instead of getting it from properties
// NOTE: The use of | instead of || is intentional. We want to get all the values.
if (response.Properties.TryGetValue($"{componentName}.workflow.failure.error_type", out string errorType) |
response.Properties.TryGetValue($"{componentName}.workflow.failure.error_message", out string errorMessage) |
response.Properties.TryGetValue($"{componentName}.workflow.failure.stack_trace", out string stackTrace))
{
return new WorkflowFailureDetails(errorMessage, errorType, stackTrace);
}

return null;
}

/// <inheritdoc/>
[Obsolete]
Expand Down
Loading

0 comments on commit 8152c74

Please sign in to comment.