Skip to content

Commit

Permalink
broadcast test
Browse files Browse the repository at this point in the history
  • Loading branch information
Wojciech Turowicz committed Jul 26, 2023
1 parent 4c38ec6 commit 0f8e963
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
namespace Orleans.Streaming.Grains.Tests.Streams.Grains
{
[ImplicitStreamSubscription(nameof(BlobMessage))]
[ImplicitStreamSubscription(nameof(BroadcastMessage))]
public class BlobReceiverGrain : Grain, IBlobReceiverGrain
{
private readonly IProcessor _processor;

private StreamSubscriptionHandle<BlobMessage> _subscription;
private StreamSubscriptionHandle<BroadcastMessage> _broadcast;

public BlobReceiverGrain(IProcessor processor)
{
Expand All @@ -24,8 +26,10 @@ public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider("Default");
var stream = StreamFactory.Create<BlobMessage>(streamProvider, this.GetPrimaryKey());
var broadcastStream = StreamFactory.Create<BroadcastMessage>(streamProvider, this.GetPrimaryKey());

_subscription = await stream.SubscribeAsync(OnNextAsync);
_broadcast = await broadcastStream.SubscribeAsync(OnBroadcastAsync);
}

private Task OnNextAsync(BlobMessage message, StreamSequenceToken token)
Expand All @@ -34,5 +38,12 @@ private Task OnNextAsync(BlobMessage message, StreamSequenceToken token)

return Task.CompletedTask;
}

private Task OnBroadcastAsync(BroadcastMessage message, StreamSequenceToken token)
{
_processor.Process(message.Text.Value);

return Task.CompletedTask;
}
}
}
14 changes: 14 additions & 0 deletions src/Orleans.Streaming.Grains.Tests/Streams/Grains/EmitterGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class EmitterGrain : Grain, IEmitterGrain
private IAsyncStream<BlobMessage> _blobStream;
private IAsyncStream<SimpleMessage> _simpleStream;
private IAsyncStream<CompoundMessage> _compoundStream;
private IAsyncStream<BroadcastMessage> _broadcastStream;

public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
Expand All @@ -23,6 +24,7 @@ public override async Task OnActivateAsync(CancellationToken cancellationToken)
_blobStream = StreamFactory.Create<BlobMessage>(streamProvider, id);
_simpleStream = StreamFactory.Create<SimpleMessage>(streamProvider, id);
_compoundStream = StreamFactory.Create<CompoundMessage>(streamProvider, id);
_broadcastStream = StreamFactory.Create<BroadcastMessage>(streamProvider, id);

await base.OnActivateAsync(cancellationToken);
}
Expand Down Expand Up @@ -60,5 +62,17 @@ await _compoundStream.OnNextAsync(new CompoundMessage
});
}
}

public async Task BroadcastAsync(string text, byte[] data)
{
if (_broadcastStream != null)
{
await _broadcastStream.OnNextAsync(new BroadcastMessage
{
Text = new Immutable<string>(text),
Data = new Immutable<byte[]>(data),
});
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ public interface IEmitterGrain : IGrainWithGuidKey
Task SendAsync(byte[] data);

Task SendAsync(string text, byte[] data);

Task BroadcastAsync(string text, byte[] data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
namespace Orleans.Streaming.Grains.Tests.Streams.Grains
{
[ImplicitStreamSubscription(nameof(SimpleMessage))]
[ImplicitStreamSubscription(nameof(BroadcastMessage))]
public class SimpleReceiverGrain : Grain, ISimpleReceiverGrain
{
private readonly IProcessor _processor;

private StreamSubscriptionHandle<SimpleMessage> _subscription;
private StreamSubscriptionHandle<BroadcastMessage> _broadcast;

public SimpleReceiverGrain(IProcessor processor)
{
Expand All @@ -24,8 +26,10 @@ public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider("Default");
var stream = StreamFactory.Create<SimpleMessage>(streamProvider, this.GetPrimaryKey());
var broadcastStream = StreamFactory.Create<BroadcastMessage>(streamProvider, this.GetPrimaryKey());

_subscription = await stream.SubscribeAsync(OnNextAsync);
_broadcast = await broadcastStream.SubscribeAsync(OnBroadcastAsync);
}

private Task OnNextAsync(SimpleMessage message, StreamSequenceToken token)
Expand All @@ -34,5 +38,12 @@ private Task OnNextAsync(SimpleMessage message, StreamSequenceToken token)

return Task.CompletedTask;
}

private Task OnBroadcastAsync(BroadcastMessage message, StreamSequenceToken token)
{
_processor.Process(message.Text.Value);

return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// <copyright file="BroadcastMessage.cs" company="Surveily Sp. z o.o.">
// Copyright (c) Surveily Sp. z o.o.. All rights reserved.
// </copyright>

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Orleans.Concurrency;

namespace Orleans.Streaming.Grains.Tests.Streams.Messages
{
[GenerateSerializer]
public class BroadcastMessage
{
[Id(0)]
public Immutable<byte[]> Data { get; set; }

[Id(1)]
public Immutable<string> Text { get; set; }
}
}
47 changes: 47 additions & 0 deletions src/Orleans.Streaming.Grains.Tests/Streams/Scenarios/OneToMany.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,52 @@ public void It_Should_Deliver_Data()
Processor!.Verify(x => x.Process(expectedData), Times.Exactly(10));
}
}

public class When_Sending_Broadcast_Message_One_To_One : BaseOneToManyTest
{
protected string resultText;
protected string expectedText = "text";

protected byte[] resultData;
protected byte[] expectedData = new byte[1024];

public override void Prepare()
{
base.Prepare();

Processor!.Setup(x => x.Process(It.IsAny<string>()))
.Callback<string>(x => resultText = x);

Processor!.Setup(x => x.Process(It.IsAny<byte[]>()))
.Callback<byte[]>(x => resultData = x);

for (var i = 0; i < 1024; i++)
{
expectedData[i] = Convert.ToByte(i % 2);
}
}

public override async Task Act()
{
var grain = Subject.GetGrain<IEmitterGrain>(Guid.NewGuid());

for (var i = 0; i < 10; i++)
{
await grain.BroadcastAsync(expectedText, expectedData);
}
}

[Test]
public void It_Should_Deliver_Text()
{
Processor!.Verify(x => x.Process(expectedText), Times.Exactly(10));
}

[Test]
public void It_Should_Deliver_Data()
{
Processor!.Verify(x => x.Process(expectedData), Times.Exactly(10));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,5 +112,51 @@ public void It_Should_Deliver_Data()
Processor!.Verify(x => x.Process(expectedData), Times.Once);
}
}

public class When_Sending_Broadcast_Message_One_To_One : BaseOneToManyTest
{
protected string resultText;
protected string expectedText = "text";

protected byte[] resultData;
protected byte[] expectedData = new byte[1024];

public override void Prepare()
{
base.Prepare();

Processor!.Setup(x => x.Process(It.IsAny<string>()))
.Callback<string>(x => resultText = x);

Processor!.Setup(x => x.Process(It.IsAny<byte[]>()))
.Callback<byte[]>(x => resultData = x);

for (var i = 0; i < 1024; i++)
{
expectedData[i] = Convert.ToByte(i % 2);
}
}

public override async Task Act()
{
var grain = Subject.GetGrain<IEmitterGrain>(Guid.NewGuid());

await grain.BroadcastAsync(expectedText, expectedData);

await WaitFor(() => resultText);
}

[Test]
public void It_Should_Deliver_Text()
{
Processor!.Verify(x => x.Process(expectedText), Times.Once);
}

[Test]
public void It_Should_Deliver_Data()
{
Processor!.Verify(x => x.Process(expectedData), Times.Once);
}
}
}
}

0 comments on commit 0f8e963

Please sign in to comment.