Skip to content

Commit

Permalink
Merge pull request #47 from the9ball/feat/controller_event
Browse files Browse the repository at this point in the history
feat: Publish Controller events
  • Loading branch information
hadashiA authored Mar 29, 2024
2 parents 2e4a620 + e94d8e1 commit dd79845
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 5 deletions.
44 changes: 44 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ Don't forget about performance. It is very important to hit a lot of RPS on sing
- [Persistent execute results](#persistent-execute-results)
- [REST API for Automation](#rest-api-for-automation)
- [Unity](#unity)
- [Controller event handling](#controller-event-handling)
- [License](#license)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->
Expand Down Expand Up @@ -743,6 +744,49 @@ internal class PreserveAttribute : System.Attribute

![image](https://user-images.githubusercontent.com/46207/155901725-4ce8a36f-46e9-4437-aba7-639425f4b93f.png)
## Controller event handling

`DFrame.Controller` provides event publishing using [MessagePipe](https://github.com/Cysharp/MessagePipe).
You can intercept processing at each step of the controller by obtaining `ISubscriber<ControllerEventMessage>` subscriber from DI.

```csharp
class SomeService : IHostedService
{
readonly ISubscriber<ControllerEventMessage> subscriber;

private IDisposable? eventMessageSubscription = null;

// ISubscriber<ControllerEventMessage> is from DI.
public SomeService(ISubscriber<ControllerEventMessage> subscriber)
{
this.subscriber = subscriber;
}

public Task StartAsync(CancellationToken cancellationToken)
{
var bag = DisposableBag.CreateBuilder(initialCapacity: 1);
subscriber.Subscribe(
eventMessage =>
{
// Your code.
}
).AddTo(bag);
eventMessageSubscription = bag.Build();

return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
eventMessageSubscription?.Dispose();
return Task.CompletedTask;
}
}
```

A sample using this is available [here](sandbox/ConsoleController/EventHandler.cs).
For more information on usage, please refer to the [MessagePipe](https://github.com/Cysharp/MessagePipe) documentation.
License
---
This library is under the MIT License.
65 changes: 65 additions & 0 deletions sandbox/ConsoleController/EventHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using DFrame.Controller;
using DFrame.Controller.EventMessage;
using MessagePipe;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace ConsoleController;

/// <summary>
/// Controller event handler
/// </summary>
internal record class EventHandler(ILogger<EventHandler> logger, ISubscriber<ControllerEventMessage> subscriber) : IHostedService
{
private IDisposable? eventMessageSubscription = null;

public Task StartAsync(CancellationToken cancellationToken)
{
var bag = DisposableBag.CreateBuilder(initialCapacity: 1);
subscriber.Subscribe(
eventMessage =>
{
switch (eventMessage.MessageType)
{
case ControllerEventMessageType.WorkflowStarted: OnWorkflowStarted(eventMessage.ExecutionSummary); break;
case ControllerEventMessageType.SetupCompleted: OnSetupCompleted(eventMessage.ExecutionSummary); break;
case ControllerEventMessageType.ExecuteCompleted: OnExecuteCompleted(eventMessage.ExecutionSummary); break;
case ControllerEventMessageType.TeardownCompleted: OnTeardownCompleted(eventMessage.ExecutionSummary); break;
case ControllerEventMessageType.WorkflowCompleted: OnWorkflowCompleted(eventMessage.ExecutionSummary); break;
default: logger.LogWarning("Unknown message type: {0}", eventMessage.MessageType); break;
}
}
).AddTo(bag);

eventMessageSubscription = bag.Build();

return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
Interlocked.Exchange(ref eventMessageSubscription, null)?.Dispose();
return Task.CompletedTask;
}

/// <inheritdoc/>
void OnWorkflowStarted(ExecutionSummary executionSummary)
=> logger.LogInformation("{0} : {1} : {2}", nameof(ControllerEventMessageType.WorkflowStarted), executionSummary.Workload, executionSummary.ExecutionId);

/// <inheritdoc/>
void OnSetupCompleted(ExecutionSummary executionSummary)
=> logger.LogInformation("{0} : {1} : {2}", nameof(ControllerEventMessageType.SetupCompleted), executionSummary.Workload, executionSummary.ExecutionId);

/// <inheritdoc/>
void OnExecuteCompleted(ExecutionSummary executionSummary)
=> logger.LogInformation("{0} : {1} : {2}", nameof(ControllerEventMessageType.ExecuteCompleted), executionSummary.Workload, executionSummary.ExecutionId);

/// <inheritdoc/>
void OnTeardownCompleted(ExecutionSummary executionSummary)
=> logger.LogInformation("{0} : {1} : {2}", nameof(ControllerEventMessageType.TeardownCompleted), executionSummary.Workload, executionSummary.ExecutionId);

/// <inheritdoc/>
void OnWorkflowCompleted(ExecutionSummary executionSummary)
=> logger.LogInformation("{0} : {1} : {2}", nameof(ControllerEventMessageType.WorkflowCompleted), executionSummary.Workload, executionSummary.ExecutionId);

}
12 changes: 11 additions & 1 deletion sandbox/ConsoleController/Program.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
using DFrame;
using MessagePipe;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;

var builder = WebApplication.CreateBuilder(args);
builder.WebHost.ConfigureServices(
(ctx, services) =>
{
services.AddMessagePipe();
services.AddHostedService<ConsoleController.EventHandler>();
}
);
await builder.RunDFrameControllerAsync(opt =>
{
opt.Title = "foo";
Expand All @@ -12,4 +21,5 @@ await builder.RunDFrameControllerAsync(opt =>

//builder.Logging.ClearProviders();
//builder.Logging.SetMinimumLevel(LogLevel.Information);
//builder.Logging.AddZLoggerConsole(options => { });
//builder.Logging.AddZLoggerConsole(options => { });

Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using MagicOnion.Server.Hubs;
using MessagePipe;

namespace DFrame.Controller;

Expand All @@ -13,6 +14,7 @@ public class DFrameControllerExecutionEngine : INotifyStateChanged
readonly ILoggerFactory loggerFactory;
readonly IExecutionResultHistoryProvider historyProvider;
readonly DFrameControllerOptions options;
readonly IPublisher<EventMessage.ControllerEventMessage> eventMessagePublisher;

// Global states
readonly Dictionary<WorkerId, WorkerInfo> connections = new();
Expand All @@ -32,12 +34,13 @@ public class DFrameControllerExecutionEngine : INotifyStateChanged
public ExecutionSummary? LatestExecutionSummary { get; private set; } = default;
public SummarizedExecutionResult[] LatestSortedSummarizedExecutionResults { get; private set; } = Array.Empty<SummarizedExecutionResult>();

public DFrameControllerExecutionEngine(ILoggerFactory loggerFactory, IExecutionResultHistoryProvider historyProvider, DFrameControllerOptions options)
public DFrameControllerExecutionEngine(ILoggerFactory loggerFactory, IExecutionResultHistoryProvider historyProvider, DFrameControllerOptions options, IPublisher<EventMessage.ControllerEventMessage> eventMessagePublisher)
{
this.loggerFactory = loggerFactory;
this.logger = loggerFactory.CreateLogger<DFrameControllerExecutionEngine>();
this.historyProvider = historyProvider;
this.options = options;
this.eventMessagePublisher = eventMessagePublisher;
}

public bool StartWorkerFlow(string workloadName, int concurrency, long totalRequestCount, int workerLimit, KeyValuePair<string, string?>[] parameters)
Expand Down Expand Up @@ -135,7 +138,9 @@ public bool StartWorkerFlow(string workloadName, int concurrency, long totalRequ

broadcaster.CreateWorkloadAndSetup(executionId, createWorkloadCount, concurrency, totalRequestCount, workloadName, parameters!);
StateChanged?.Invoke();
}
}

eventMessagePublisher.Publish(new(EventMessage.ControllerEventMessageType.WorkflowStarted, LatestExecutionSummary));

return true;
}
Expand Down Expand Up @@ -220,7 +225,9 @@ public void ReportExecuteResult(WorkerId workerId, BatchedExecuteResult result)
}

public void CreateWorkloadAndSetupComplete(WorkerId workerId, IWorkerReceiver broadcaster, IWorkerReceiver broadcastToSelf)
{
{
eventMessagePublisher.Publish(new(EventMessage.ControllerEventMessageType.SetupCompleted, LatestExecutionSummary!));

lock (EngineSync)
{
if (RunningState?.CreateWorkloadAndSetupComplete(workerId, broadcaster, broadcastToSelf) ?? true)
Expand All @@ -231,7 +238,9 @@ public void CreateWorkloadAndSetupComplete(WorkerId workerId, IWorkerReceiver br
}

public void TeardownComplete(WorkerId workerId)
{
{
eventMessagePublisher.Publish(new(EventMessage.ControllerEventMessageType.TeardownCompleted, LatestExecutionSummary!));

lock (EngineSync)
{
if (RunningState?.TeardownComplete(workerId) ?? true)
Expand All @@ -243,6 +252,8 @@ public void TeardownComplete(WorkerId workerId)

public void ExecuteComplete(WorkerId workerId, Dictionary<WorkloadId, Dictionary<string, string>?> results)
{
eventMessagePublisher.Publish(new(EventMessage.ControllerEventMessageType.ExecuteCompleted, LatestExecutionSummary!));

lock (EngineSync)
{
if (RunningState?.ExecuteComplete(workerId, results) ?? true)
Expand Down
19 changes: 19 additions & 0 deletions src/DFrame.Controller/Controller/EventMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace DFrame.Controller.EventMessage;

/// <summary>
/// Type of <see cref="ControllerEventMessage"/>
/// </summary>
public enum ControllerEventMessageType
{
WorkflowStarted,
SetupCompleted,
ExecuteCompleted,
TeardownCompleted,
WorkflowCompleted,
}

/// <summary>
/// Controller event message.
/// </summary>
public record class ControllerEventMessage(ControllerEventMessageType MessageType, ExecutionSummary ExecutionSummary);

0 comments on commit dd79845

Please sign in to comment.