From 7ea3627188d22a9e423ab6806b68af52e128b6c1 Mon Sep 17 00:00:00 2001 From: Shoichi Yasui Date: Tue, 26 Mar 2024 15:31:15 +0900 Subject: [PATCH 1/4] feat: define EventMessage --- .../Controller/EventMessage.cs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 src/DFrame.Controller/Controller/EventMessage.cs diff --git a/src/DFrame.Controller/Controller/EventMessage.cs b/src/DFrame.Controller/Controller/EventMessage.cs new file mode 100644 index 0000000..0726b83 --- /dev/null +++ b/src/DFrame.Controller/Controller/EventMessage.cs @@ -0,0 +1,19 @@ +namespace DFrame.Controller.EventMessage; + +/// +/// Type of +/// +public enum ControllerEventMessageType +{ + WorkflowStarted, + SetupCompleted, + ExecuteCompleted, + TeardownCompleted, + WorkflowCompleted, +} + +/// +/// Controller event message. +/// +public record class ControllerEventMessage(ControllerEventMessageType MessageType, ExecutionSummary ExecutionSummary); + From ea6541f680126db3d7b6dbb3c9626d8bb019bd64 Mon Sep 17 00:00:00 2001 From: Shoichi Yasui Date: Tue, 26 Mar 2024 15:31:55 +0900 Subject: [PATCH 2/4] feat: publish EventMessage --- .../DFrameControllerExecutionEngine.cs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/DFrame.Controller/Controller/DFrameControllerExecutionEngine.cs b/src/DFrame.Controller/Controller/DFrameControllerExecutionEngine.cs index c4209a6..7d27a10 100644 --- a/src/DFrame.Controller/Controller/DFrameControllerExecutionEngine.cs +++ b/src/DFrame.Controller/Controller/DFrameControllerExecutionEngine.cs @@ -1,4 +1,5 @@ using MagicOnion.Server.Hubs; +using MessagePipe; namespace DFrame.Controller; @@ -13,6 +14,7 @@ public class DFrameControllerExecutionEngine : INotifyStateChanged readonly ILoggerFactory loggerFactory; readonly IExecutionResultHistoryProvider historyProvider; readonly DFrameControllerOptions options; + readonly IPublisher eventMessagePublisher; // Global states readonly Dictionary connections = new(); @@ -32,12 +34,13 @@ public class DFrameControllerExecutionEngine : INotifyStateChanged public ExecutionSummary? LatestExecutionSummary { get; private set; } = default; public SummarizedExecutionResult[] LatestSortedSummarizedExecutionResults { get; private set; } = Array.Empty(); - public DFrameControllerExecutionEngine(ILoggerFactory loggerFactory, IExecutionResultHistoryProvider historyProvider, DFrameControllerOptions options) + public DFrameControllerExecutionEngine(ILoggerFactory loggerFactory, IExecutionResultHistoryProvider historyProvider, DFrameControllerOptions options, IPublisher eventMessagePublisher) { this.loggerFactory = loggerFactory; this.logger = loggerFactory.CreateLogger(); this.historyProvider = historyProvider; this.options = options; + this.eventMessagePublisher = eventMessagePublisher; } public bool StartWorkerFlow(string workloadName, int concurrency, long totalRequestCount, int workerLimit, KeyValuePair[] parameters) @@ -135,7 +138,9 @@ public bool StartWorkerFlow(string workloadName, int concurrency, long totalRequ broadcaster.CreateWorkloadAndSetup(executionId, createWorkloadCount, concurrency, totalRequestCount, workloadName, parameters!); StateChanged?.Invoke(); - } + } + + eventMessagePublisher.Publish(new(EventMessage.ControllerEventMessageType.WorkflowStarted, LatestExecutionSummary)); return true; } @@ -220,7 +225,9 @@ public void ReportExecuteResult(WorkerId workerId, BatchedExecuteResult result) } public void CreateWorkloadAndSetupComplete(WorkerId workerId, IWorkerReceiver broadcaster, IWorkerReceiver broadcastToSelf) - { + { + eventMessagePublisher.Publish(new(EventMessage.ControllerEventMessageType.SetupCompleted, LatestExecutionSummary!)); + lock (EngineSync) { if (RunningState?.CreateWorkloadAndSetupComplete(workerId, broadcaster, broadcastToSelf) ?? true) @@ -231,7 +238,9 @@ public void CreateWorkloadAndSetupComplete(WorkerId workerId, IWorkerReceiver br } public void TeardownComplete(WorkerId workerId) - { + { + eventMessagePublisher.Publish(new(EventMessage.ControllerEventMessageType.TeardownCompleted, LatestExecutionSummary!)); + lock (EngineSync) { if (RunningState?.TeardownComplete(workerId) ?? true) @@ -243,6 +252,8 @@ public void TeardownComplete(WorkerId workerId) public void ExecuteComplete(WorkerId workerId, Dictionary?> results) { + eventMessagePublisher.Publish(new(EventMessage.ControllerEventMessageType.ExecuteCompleted, LatestExecutionSummary!)); + lock (EngineSync) { if (RunningState?.ExecuteComplete(workerId, results) ?? true) From cc5fd3bd4e4163745243e773f0dea5128142c5b2 Mon Sep 17 00:00:00 2001 From: Shoichi Yasui Date: Tue, 26 Mar 2024 15:32:13 +0900 Subject: [PATCH 3/4] chore: add sample EventHandler --- sandbox/ConsoleController/EventHandler.cs | 65 +++++++++++++++++++++++ sandbox/ConsoleController/Program.cs | 12 ++++- 2 files changed, 76 insertions(+), 1 deletion(-) create mode 100644 sandbox/ConsoleController/EventHandler.cs diff --git a/sandbox/ConsoleController/EventHandler.cs b/sandbox/ConsoleController/EventHandler.cs new file mode 100644 index 0000000..fb7334d --- /dev/null +++ b/sandbox/ConsoleController/EventHandler.cs @@ -0,0 +1,65 @@ +using DFrame.Controller; +using DFrame.Controller.EventMessage; +using MessagePipe; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace ConsoleController; + +/// +/// Controller event handler +/// +internal record class EventHandler(ILogger logger, ISubscriber subscriber) : IHostedService +{ + private IDisposable? eventMessageSubscription = null; + + public Task StartAsync(CancellationToken cancellationToken) + { + var bag = DisposableBag.CreateBuilder(initialCapacity: 1); + subscriber.Subscribe( + eventMessage => + { + switch (eventMessage.MessageType) + { + case ControllerEventMessageType.WorkflowStarted: OnWorkflowStarted(eventMessage.ExecutionSummary); break; + case ControllerEventMessageType.SetupCompleted: OnSetupCompleted(eventMessage.ExecutionSummary); break; + case ControllerEventMessageType.ExecuteCompleted: OnExecuteCompleted(eventMessage.ExecutionSummary); break; + case ControllerEventMessageType.TeardownCompleted: OnTeardownCompleted(eventMessage.ExecutionSummary); break; + case ControllerEventMessageType.WorkflowCompleted: OnWorkflowCompleted(eventMessage.ExecutionSummary); break; + default: logger.LogWarning("Unknown message type: {0}", eventMessage.MessageType); break; + } + } + ).AddTo(bag); + + eventMessageSubscription = bag.Build(); + + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + Interlocked.Exchange(ref eventMessageSubscription, null)?.Dispose(); + return Task.CompletedTask; + } + + /// + void OnWorkflowStarted(ExecutionSummary executionSummary) + => logger.LogInformation("{0} : {1} : {2}", nameof(ControllerEventMessageType.WorkflowStarted), executionSummary.Workload, executionSummary.ExecutionId); + + /// + void OnSetupCompleted(ExecutionSummary executionSummary) + => logger.LogInformation("{0} : {1} : {2}", nameof(ControllerEventMessageType.SetupCompleted), executionSummary.Workload, executionSummary.ExecutionId); + + /// + void OnExecuteCompleted(ExecutionSummary executionSummary) + => logger.LogInformation("{0} : {1} : {2}", nameof(ControllerEventMessageType.ExecuteCompleted), executionSummary.Workload, executionSummary.ExecutionId); + + /// + void OnTeardownCompleted(ExecutionSummary executionSummary) + => logger.LogInformation("{0} : {1} : {2}", nameof(ControllerEventMessageType.TeardownCompleted), executionSummary.Workload, executionSummary.ExecutionId); + + /// + void OnWorkflowCompleted(ExecutionSummary executionSummary) + => logger.LogInformation("{0} : {1} : {2}", nameof(ControllerEventMessageType.WorkflowCompleted), executionSummary.Workload, executionSummary.ExecutionId); + +} diff --git a/sandbox/ConsoleController/Program.cs b/sandbox/ConsoleController/Program.cs index dc9c419..fc733fb 100644 --- a/sandbox/ConsoleController/Program.cs +++ b/sandbox/ConsoleController/Program.cs @@ -1,7 +1,16 @@ using DFrame; +using MessagePipe; using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.DependencyInjection; var builder = WebApplication.CreateBuilder(args); +builder.WebHost.ConfigureServices( + (ctx, services) => + { + services.AddMessagePipe(); + services.AddHostedService(); + } +); await builder.RunDFrameControllerAsync(opt => { opt.Title = "foo"; @@ -12,4 +21,5 @@ await builder.RunDFrameControllerAsync(opt => //builder.Logging.ClearProviders(); //builder.Logging.SetMinimumLevel(LogLevel.Information); -//builder.Logging.AddZLoggerConsole(options => { }); \ No newline at end of file +//builder.Logging.AddZLoggerConsole(options => { }); + From e94d8e1fbf3d14d753812323299c775d9ea04c48 Mon Sep 17 00:00:00 2001 From: Shoichi Yasui Date: Thu, 28 Mar 2024 23:04:39 +0900 Subject: [PATCH 4/4] doc: Append document for ControllerEvent --- README.md | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/README.md b/README.md index 15ab9ee..36811fd 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,7 @@ Don't forget about performance. It is very important to hit a lot of RPS on sing - [Persistent execute results](#persistent-execute-results) - [REST API for Automation](#rest-api-for-automation) - [Unity](#unity) +- [Controller event handling](#controller-event-handling) - [License](#license) @@ -743,6 +744,49 @@ internal class PreserveAttribute : System.Attribute ![image](https://user-images.githubusercontent.com/46207/155901725-4ce8a36f-46e9-4437-aba7-639425f4b93f.png) +## Controller event handling + +`DFrame.Controller` provides event publishing using [MessagePipe](https://github.com/Cysharp/MessagePipe). +You can intercept processing at each step of the controller by obtaining `ISubscriber` subscriber from DI. + +```csharp +class SomeService : IHostedService +{ + readonly ISubscriber subscriber; + + private IDisposable? eventMessageSubscription = null; + + // ISubscriber is from DI. + public SomeService(ISubscriber subscriber) + { + this.subscriber = subscriber; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + var bag = DisposableBag.CreateBuilder(initialCapacity: 1); + subscriber.Subscribe( + eventMessage => + { + // Your code. + } + ).AddTo(bag); + eventMessageSubscription = bag.Build(); + + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + eventMessageSubscription?.Dispose(); + return Task.CompletedTask; + } +} +``` + +A sample using this is available [here](sandbox/ConsoleController/EventHandler.cs). +For more information on usage, please refer to the [MessagePipe](https://github.com/Cysharp/MessagePipe) documentation. + License --- This library is under the MIT License.