Skip to content

Commit

Permalink
add retry and auto-poison
Browse files Browse the repository at this point in the history
  • Loading branch information
Wojciech Turowicz committed Aug 3, 2023
1 parent d8b4f8f commit cc07377
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 46 deletions.
5 changes: 4 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
{
"dotnet.defaultSolution": "src/Orleans.Streaming.Grains.sln"
"dotnet.defaultSolution": "src/Orleans.Streaming.Grains.sln",
"cSpell.words": [
"Surveily"
]
}
4 changes: 2 additions & 2 deletions src/Orleans.Streaming.Grains.Tests/BaseGrainTestConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ public void Configure(ISiloBuilder siloBuilder)
siloBuilder.ConfigureServices(Configure)
.AddMemoryGrainStorageAsDefault()
.AddMemoryGrainStorage("PubSubStore")
.AddGrainsStreams("Default", 1);
.AddGrainsStreams("Default", 1, 3);
}
else
{
#pragma warning disable CS0618
siloBuilder.ConfigureServices(Configure)
.AddMemoryGrainStorageAsDefault()
.AddMemoryGrainStorage("PubSubStore")
.AddGrainsStreamsForTests("Default", 3, new[]
.AddGrainsStreamsForTests("Default", 3, 3, new[]
{
typeof(BlobMessage),
typeof(SimpleMessage),
Expand Down
63 changes: 43 additions & 20 deletions src/Orleans.Streaming.Grains.Tests/Grains/TransactionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Orleans.Concurrency;
using Orleans.Streaming.Grains.Abstract;
using Orleans.Streaming.Grains.Services;
using Orleans.Streaming.Grains.State;
using Orleans.Streaming.Grains.Streams;
using Orleans.Streaming.Grains.Test;
using Should;
Expand All @@ -36,9 +37,7 @@ public abstract class BaseTransactionTest : BaseGrainTest<Config>
protected ITransactionService service;
protected IOptions<GrainsOptions> settings;

protected Queue<Guid> queue;
protected Queue<Guid> poison;
protected Dictionary<Guid, DateTimeOffset> transactions;
protected TransactionGrainState state;

public override void Prepare()
{
Expand Down Expand Up @@ -102,25 +101,31 @@ public override async Task Act()

var transaction = client.GetGrain<ITransactionGrain>("1");

(queue, poison, transactions) = await transaction.GetStateAsync();
state = await transaction.GetStateAsync();
}

[Test]
public void State_Should_Have_Poison_Empty()
{
poison.ShouldBeEmpty();
state.Poison.ShouldBeEmpty();
}

[Test]
public void State_Should_Have_Queue_Empty()
{
queue.ShouldBeEmpty();
state.Queue.ShouldBeEmpty();
}

[Test]
public void State_Should_Have_Transactions_Single()
{
transactions.Count.ShouldEqual(1);
state.Transactions.Count.ShouldEqual(1);
}

[Test]
public void State_Should_Have_Transaction_Counts_Single()
{
state.TransactionCounts.Count.ShouldEqual(1);
}
}

Expand All @@ -136,25 +141,31 @@ public override async Task Act()

var transaction = client.GetGrain<ITransactionGrain>("1");

(queue, poison, transactions) = await transaction.GetStateAsync();
state = await transaction.GetStateAsync();
}

[Test]
public void State_Should_Have_Poison_Empty()
{
poison.ShouldBeEmpty();
state.Poison.ShouldBeEmpty();
}

[Test]
public void State_Should_Have_Queue_Empty()
{
queue.Count.ShouldEqual(1);
state.Queue.Count.ShouldEqual(1);
}

[Test]
public void State_Should_Have_Transactions_Single()
public void State_Should_Have_Transactions_Empty()
{
state.Transactions.ShouldBeEmpty();
}

[Test]
public void State_Should_Have_Transaction_Counts_Empty()
{
transactions.ShouldBeEmpty();
state.TransactionCounts.ShouldBeEmpty();
}
}

Expand All @@ -174,7 +185,7 @@ public override async Task Act()

var transaction = client.GetGrain<ITransactionGrain>("1");

(queue, poison, transactions) = await transaction.GetStateAsync();
state = await transaction.GetStateAsync();
}

[Test]
Expand All @@ -186,19 +197,25 @@ public void It_Should_Return_Second_Null()
[Test]
public void State_Should_Have_Poison_Empty()
{
poison.ShouldBeEmpty();
state.Poison.ShouldBeEmpty();
}

[Test]
public void State_Should_Have_Queue_Empty()
{
queue.ShouldBeEmpty();
state.Queue.ShouldBeEmpty();
}

[Test]
public void State_Should_Have_Transactions_Empty()
{
transactions.ShouldBeEmpty();
state.Transactions.ShouldBeEmpty();
}

[Test]
public void State_Should_Have_Transaction_Counts_Empty()
{
state.TransactionCounts.ShouldBeEmpty();
}
}

Expand All @@ -218,7 +235,7 @@ public override async Task Act()

var transaction = client.GetGrain<ITransactionGrain>("1");

(queue, poison, transactions) = await transaction.GetStateAsync();
state = await transaction.GetStateAsync();
}

[Test]
Expand All @@ -230,19 +247,25 @@ public void It_Should_Return_Second_Null()
[Test]
public void State_Should_Have_Poison_Single()
{
poison.Count.ShouldEqual(1);
state.Poison.Count.ShouldEqual(1);
}

[Test]
public void State_Should_Have_Queue_Empty()
{
queue.ShouldBeEmpty();
state.Queue.ShouldBeEmpty();
}

[Test]
public void State_Should_Have_Transactions_Empty()
{
transactions.ShouldBeEmpty();
state.Transactions.ShouldBeEmpty();
}

[Test]
public void State_Should_Have_Transaction_Counts_Empty()
{
state.TransactionCounts.ShouldBeEmpty();
}
}
}
Expand Down
100 changes: 94 additions & 6 deletions src/Orleans.Streaming.Grains.Tests/Streams/Scenarios/OneToMany.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Orleans.Hosting;
using Orleans.Streaming.Grains.Abstract;
using Orleans.Streaming.Grains.Services;
using Orleans.Streaming.Grains.State;
using Orleans.Streaming.Grains.Streams;
using Orleans.Streaming.Grains.Test;
using Orleans.Streaming.Grains.Tests.Streams.Grains;
Expand Down Expand Up @@ -248,7 +249,93 @@ public void It_Should_Deliver_Expected_Data()

public class When_Sending_Broadcast_Message_One_To_Many_Error : BaseOneToManyTest
{
protected TimeSpan wait = TimeSpan.FromSeconds(10);
protected TransactionGrainState state;

protected string resultText;
protected string expectedText = "text";

protected byte[] resultData;
protected byte[] expectedData = new byte[1024];

public override void Prepare()
{
base.Prepare();

Processor!.Setup(x => x.Process(It.IsAny<string>()))
.Throws<Exception>();

Processor!.Setup(x => x.Process(It.IsAny<byte[]>()))
.Throws<Exception>();

for (var i = 0; i < 1024; i++)
{
expectedData[i] = Convert.ToByte(i % 2);
}
}

public override async Task Act()
{
var grain = Subject.GetGrain<IEmitterGrain>(Guid.NewGuid());
var transaction = Subject.GetGrain<ITransactionGrain>("broadcastmessage-0");

await grain.BroadcastAsync(expectedText, expectedData);

state = await transaction.GetStateAsync();
}

[Test]
public void It_Should_Deliver_Text()
{
Processor!.Verify(x => x.Process(expectedText), Times.Exactly(4));
}

[Test]
public void It_Should_Not_Deliver_Expected_Text()
{
resultText.ShouldBeNull();
}

[Test]
public void It_Should_Deliver_Data()
{
Processor!.Verify(x => x.Process(expectedData), Times.Exactly(4));
}

[Test]
public void It_Should_Not_Deliver_Expected_Data()
{
resultData.ShouldBeNull();
}

[Test]
public void State_Should_Have_Poison_Single()
{
state.Poison.Count.ShouldEqual(1);
}

[Test]
public void State_Should_Have_Queue_Empty()
{
state.Queue.ShouldBeEmpty();
}

[Test]
public void State_Should_Have_Transactions_Empty()
{
state.Transactions.ShouldBeEmpty();
}

[Test]
public void State_Should_Have_Transaction_Counts_Empty()
{
state.TransactionCounts.ShouldBeEmpty();
}
}

/* TODO Retry
public class When_Sending_Broadcast_Message_One_To_Many_Error : BaseOneToManyTest
{
protected TimeSpan wait = TimeSpan.FromSeconds(60);
protected string resultText;
protected Stopwatch timerText;
Expand All @@ -265,7 +352,7 @@ public override void Prepare()
Processor!.Setup(x => x.Process(It.IsAny<string>()))
.Callback<string>(x =>
{
timerText = timerText ?? Stopwatch.StartNew();
timerText ??= Stopwatch.StartNew();
if (timerText.Elapsed < wait)
{
Expand All @@ -280,7 +367,7 @@ public override void Prepare()
Processor!.Setup(x => x.Process(It.IsAny<byte[]>()))
.Callback<byte[]>(x =>
{
timerData = timerData ?? Stopwatch.StartNew();
timerData ??= Stopwatch.StartNew();
if (timerData.Elapsed < wait)
{
Expand All @@ -296,15 +383,16 @@ public override void Prepare()
{
expectedData[i] = Convert.ToByte(i % 2);
}

settings.Value.Timeout = TimeSpan.FromSeconds(2);
}
public override async Task Act()
{
var grain = Subject.GetGrain<IEmitterGrain>(Guid.NewGuid());
var transaction = Subject.GetGrain<ITransactionGrain>(nameof(BroadcastMessage));
await grain.BroadcastAsync(expectedText, expectedData);
var state = await transaction.GetStateAsync();
}
[Test]
Expand All @@ -330,6 +418,6 @@ public void It_Should_Deliver_Expected_Data()
{
expectedData.ShouldEqual(resultData);
}
}
}*/
}
}
7 changes: 5 additions & 2 deletions src/Orleans.Streaming.Grains/Abstract/ITransactionGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,24 @@
using System.Linq;
using System.Threading.Tasks;
using Orleans;
using Orleans.Streaming.Grains.State;

namespace Orleans.Streaming.Grains.Abstract
{
public interface ITransactionGrain : IGrainWithStringKey
{
Task FlushAsync();

Task<Guid?> PopAsync();

Task PostAsync(Guid id);

Task CompleteAsync(Guid id, bool success);

Task<TransactionGrainState> GetStateAsync();

Task SubscribeAsync(ITransactionObserver observer);

Task UnsubscribeAsync(ITransactionObserver observer);

Task<(Queue<Guid> Queue, Queue<Guid> Poison, Dictionary<Guid, DateTimeOffset> Transactions)> GetStateAsync();
}
}
Loading

0 comments on commit cc07377

Please sign in to comment.