Skip to content

Commit

Permalink
clean code
Browse files Browse the repository at this point in the history
  • Loading branch information
Wojciech Turowicz committed Jul 26, 2023
1 parent 140c297 commit d5ef9c2
Showing 1 changed file with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ namespace Orleans.Streaming.Grains.Tests.Streams.Grains
[ImplicitStreamSubscription(nameof(CompoundMessage))]
public class CompoundReceiverGrain : Grain, IBlobReceiverGrain
{
private IAsyncStream<BlobMessage> _blobStream;
private IAsyncStream<SimpleMessage> _simpleStream;
private StreamSubscriptionHandle<CompoundMessage> _subscription;

public override async Task OnActivateAsync(CancellationToken cancellationToken)
Expand All @@ -23,20 +25,20 @@ public override async Task OnActivateAsync(CancellationToken cancellationToken)
var stream = StreamFactory.Create<CompoundMessage>(streamProvider, this.GetPrimaryKey());

_subscription = await stream.SubscribeAsync(OnNextAsync);
_blobStream = StreamFactory.Create<BlobMessage>(streamProvider, this.GetPrimaryKey());
_simpleStream = StreamFactory.Create<SimpleMessage>(streamProvider, this.GetPrimaryKey());

await base.OnActivateAsync(cancellationToken);
}

private async Task OnNextAsync(CompoundMessage message, StreamSequenceToken token)
{
var streamProvider = this.GetStreamProvider("Default");
var blobStream = StreamFactory.Create<BlobMessage>(streamProvider, this.GetPrimaryKey());
var simpleStream = StreamFactory.Create<SimpleMessage>(streamProvider, this.GetPrimaryKey());

await blobStream.OnNextAsync(new BlobMessage
await _blobStream.OnNextAsync(new BlobMessage
{
Data = message.Data,
});

await simpleStream.OnNextAsync(new SimpleMessage
await _simpleStream.OnNextAsync(new SimpleMessage
{
Text = message.Text,
});
Expand Down

0 comments on commit d5ef9c2

Please sign in to comment.