From 0f8e963fc1e09693a627674253e431b738b2c276 Mon Sep 17 00:00:00 2001 From: Wojciech Turowicz Date: Wed, 26 Jul 2023 09:59:44 +0000 Subject: [PATCH] broadcast test --- .../Streams/Grains/BlobReceiverGrain.cs | 11 +++++ .../Streams/Grains/EmitterGrain.cs | 14 ++++++ .../Streams/Grains/IEmitterGrain.cs | 2 + .../Streams/Grains/SimpleReceiverGrain.cs | 11 +++++ .../Streams/Messages/BroadcastMessage.cs | 22 +++++++++ .../Streams/Scenarios/OneToMany.cs | 47 +++++++++++++++++++ .../Streams/Scenarios/OneToManyWait.cs | 46 ++++++++++++++++++ 7 files changed, 153 insertions(+) create mode 100644 src/Orleans.Streaming.Grains.Tests/Streams/Messages/BroadcastMessage.cs diff --git a/src/Orleans.Streaming.Grains.Tests/Streams/Grains/BlobReceiverGrain.cs b/src/Orleans.Streaming.Grains.Tests/Streams/Grains/BlobReceiverGrain.cs index e77612d..762ba40 100644 --- a/src/Orleans.Streaming.Grains.Tests/Streams/Grains/BlobReceiverGrain.cs +++ b/src/Orleans.Streaming.Grains.Tests/Streams/Grains/BlobReceiverGrain.cs @@ -9,11 +9,13 @@ namespace Orleans.Streaming.Grains.Tests.Streams.Grains { [ImplicitStreamSubscription(nameof(BlobMessage))] + [ImplicitStreamSubscription(nameof(BroadcastMessage))] public class BlobReceiverGrain : Grain, IBlobReceiverGrain { private readonly IProcessor _processor; private StreamSubscriptionHandle _subscription; + private StreamSubscriptionHandle _broadcast; public BlobReceiverGrain(IProcessor processor) { @@ -24,8 +26,10 @@ public override async Task OnActivateAsync(CancellationToken cancellationToken) { var streamProvider = this.GetStreamProvider("Default"); var stream = StreamFactory.Create(streamProvider, this.GetPrimaryKey()); + var broadcastStream = StreamFactory.Create(streamProvider, this.GetPrimaryKey()); _subscription = await stream.SubscribeAsync(OnNextAsync); + _broadcast = await broadcastStream.SubscribeAsync(OnBroadcastAsync); } private Task OnNextAsync(BlobMessage message, StreamSequenceToken token) @@ -34,5 +38,12 @@ private Task OnNextAsync(BlobMessage message, StreamSequenceToken token) return Task.CompletedTask; } + + private Task OnBroadcastAsync(BroadcastMessage message, StreamSequenceToken token) + { + _processor.Process(message.Text.Value); + + return Task.CompletedTask; + } } } \ No newline at end of file diff --git a/src/Orleans.Streaming.Grains.Tests/Streams/Grains/EmitterGrain.cs b/src/Orleans.Streaming.Grains.Tests/Streams/Grains/EmitterGrain.cs index ad665e1..de9138e 100644 --- a/src/Orleans.Streaming.Grains.Tests/Streams/Grains/EmitterGrain.cs +++ b/src/Orleans.Streaming.Grains.Tests/Streams/Grains/EmitterGrain.cs @@ -14,6 +14,7 @@ public class EmitterGrain : Grain, IEmitterGrain private IAsyncStream _blobStream; private IAsyncStream _simpleStream; private IAsyncStream _compoundStream; + private IAsyncStream _broadcastStream; public override async Task OnActivateAsync(CancellationToken cancellationToken) { @@ -23,6 +24,7 @@ public override async Task OnActivateAsync(CancellationToken cancellationToken) _blobStream = StreamFactory.Create(streamProvider, id); _simpleStream = StreamFactory.Create(streamProvider, id); _compoundStream = StreamFactory.Create(streamProvider, id); + _broadcastStream = StreamFactory.Create(streamProvider, id); await base.OnActivateAsync(cancellationToken); } @@ -60,5 +62,17 @@ await _compoundStream.OnNextAsync(new CompoundMessage }); } } + + public async Task BroadcastAsync(string text, byte[] data) + { + if (_broadcastStream != null) + { + await _broadcastStream.OnNextAsync(new BroadcastMessage + { + Text = new Immutable(text), + Data = new Immutable(data), + }); + } + } } } \ No newline at end of file diff --git a/src/Orleans.Streaming.Grains.Tests/Streams/Grains/IEmitterGrain.cs b/src/Orleans.Streaming.Grains.Tests/Streams/Grains/IEmitterGrain.cs index d4f5541..d08f163 100644 --- a/src/Orleans.Streaming.Grains.Tests/Streams/Grains/IEmitterGrain.cs +++ b/src/Orleans.Streaming.Grains.Tests/Streams/Grains/IEmitterGrain.cs @@ -11,5 +11,7 @@ public interface IEmitterGrain : IGrainWithGuidKey Task SendAsync(byte[] data); Task SendAsync(string text, byte[] data); + + Task BroadcastAsync(string text, byte[] data); } } \ No newline at end of file diff --git a/src/Orleans.Streaming.Grains.Tests/Streams/Grains/SimpleReceiverGrain.cs b/src/Orleans.Streaming.Grains.Tests/Streams/Grains/SimpleReceiverGrain.cs index ac57496..fc13829 100644 --- a/src/Orleans.Streaming.Grains.Tests/Streams/Grains/SimpleReceiverGrain.cs +++ b/src/Orleans.Streaming.Grains.Tests/Streams/Grains/SimpleReceiverGrain.cs @@ -9,11 +9,13 @@ namespace Orleans.Streaming.Grains.Tests.Streams.Grains { [ImplicitStreamSubscription(nameof(SimpleMessage))] + [ImplicitStreamSubscription(nameof(BroadcastMessage))] public class SimpleReceiverGrain : Grain, ISimpleReceiverGrain { private readonly IProcessor _processor; private StreamSubscriptionHandle _subscription; + private StreamSubscriptionHandle _broadcast; public SimpleReceiverGrain(IProcessor processor) { @@ -24,8 +26,10 @@ public override async Task OnActivateAsync(CancellationToken cancellationToken) { var streamProvider = this.GetStreamProvider("Default"); var stream = StreamFactory.Create(streamProvider, this.GetPrimaryKey()); + var broadcastStream = StreamFactory.Create(streamProvider, this.GetPrimaryKey()); _subscription = await stream.SubscribeAsync(OnNextAsync); + _broadcast = await broadcastStream.SubscribeAsync(OnBroadcastAsync); } private Task OnNextAsync(SimpleMessage message, StreamSequenceToken token) @@ -34,5 +38,12 @@ private Task OnNextAsync(SimpleMessage message, StreamSequenceToken token) return Task.CompletedTask; } + + private Task OnBroadcastAsync(BroadcastMessage message, StreamSequenceToken token) + { + _processor.Process(message.Text.Value); + + return Task.CompletedTask; + } } } \ No newline at end of file diff --git a/src/Orleans.Streaming.Grains.Tests/Streams/Messages/BroadcastMessage.cs b/src/Orleans.Streaming.Grains.Tests/Streams/Messages/BroadcastMessage.cs new file mode 100644 index 0000000..5b63905 --- /dev/null +++ b/src/Orleans.Streaming.Grains.Tests/Streams/Messages/BroadcastMessage.cs @@ -0,0 +1,22 @@ +// +// Copyright (c) Surveily Sp. z o.o.. All rights reserved. +// + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Orleans.Concurrency; + +namespace Orleans.Streaming.Grains.Tests.Streams.Messages +{ + [GenerateSerializer] + public class BroadcastMessage + { + [Id(0)] + public Immutable Data { get; set; } + + [Id(1)] + public Immutable Text { get; set; } + } +} \ No newline at end of file diff --git a/src/Orleans.Streaming.Grains.Tests/Streams/Scenarios/OneToMany.cs b/src/Orleans.Streaming.Grains.Tests/Streams/Scenarios/OneToMany.cs index 03c900a..d519205 100644 --- a/src/Orleans.Streaming.Grains.Tests/Streams/Scenarios/OneToMany.cs +++ b/src/Orleans.Streaming.Grains.Tests/Streams/Scenarios/OneToMany.cs @@ -113,5 +113,52 @@ public void It_Should_Deliver_Data() Processor!.Verify(x => x.Process(expectedData), Times.Exactly(10)); } } + + public class When_Sending_Broadcast_Message_One_To_One : BaseOneToManyTest + { + protected string resultText; + protected string expectedText = "text"; + + protected byte[] resultData; + protected byte[] expectedData = new byte[1024]; + + public override void Prepare() + { + base.Prepare(); + + Processor!.Setup(x => x.Process(It.IsAny())) + .Callback(x => resultText = x); + + Processor!.Setup(x => x.Process(It.IsAny())) + .Callback(x => resultData = x); + + for (var i = 0; i < 1024; i++) + { + expectedData[i] = Convert.ToByte(i % 2); + } + } + + public override async Task Act() + { + var grain = Subject.GetGrain(Guid.NewGuid()); + + for (var i = 0; i < 10; i++) + { + await grain.BroadcastAsync(expectedText, expectedData); + } + } + + [Test] + public void It_Should_Deliver_Text() + { + Processor!.Verify(x => x.Process(expectedText), Times.Exactly(10)); + } + + [Test] + public void It_Should_Deliver_Data() + { + Processor!.Verify(x => x.Process(expectedData), Times.Exactly(10)); + } + } } } \ No newline at end of file diff --git a/src/Orleans.Streaming.Grains.Tests/Streams/Scenarios/OneToManyWait.cs b/src/Orleans.Streaming.Grains.Tests/Streams/Scenarios/OneToManyWait.cs index 2ca596e..8087d19 100644 --- a/src/Orleans.Streaming.Grains.Tests/Streams/Scenarios/OneToManyWait.cs +++ b/src/Orleans.Streaming.Grains.Tests/Streams/Scenarios/OneToManyWait.cs @@ -112,5 +112,51 @@ public void It_Should_Deliver_Data() Processor!.Verify(x => x.Process(expectedData), Times.Once); } } + + public class When_Sending_Broadcast_Message_One_To_One : BaseOneToManyTest + { + protected string resultText; + protected string expectedText = "text"; + + protected byte[] resultData; + protected byte[] expectedData = new byte[1024]; + + public override void Prepare() + { + base.Prepare(); + + Processor!.Setup(x => x.Process(It.IsAny())) + .Callback(x => resultText = x); + + Processor!.Setup(x => x.Process(It.IsAny())) + .Callback(x => resultData = x); + + for (var i = 0; i < 1024; i++) + { + expectedData[i] = Convert.ToByte(i % 2); + } + } + + public override async Task Act() + { + var grain = Subject.GetGrain(Guid.NewGuid()); + + await grain.BroadcastAsync(expectedText, expectedData); + + await WaitFor(() => resultText); + } + + [Test] + public void It_Should_Deliver_Text() + { + Processor!.Verify(x => x.Process(expectedText), Times.Once); + } + + [Test] + public void It_Should_Deliver_Data() + { + Processor!.Verify(x => x.Process(expectedData), Times.Once); + } + } } } \ No newline at end of file