Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
Merge pull request #56 from SoCreate/feature/BrokerClient
Browse files Browse the repository at this point in the history
BrokerClient
  • Loading branch information
loekd authored Mar 6, 2019
2 parents 68ce8d9 + de1b4e7 commit 40bfb6e
Show file tree
Hide file tree
Showing 53 changed files with 828 additions and 2,762 deletions.
164 changes: 74 additions & 90 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@ Do you need to reliably broadcast messages between Actors and Services?
This code will help you do that.
It supports both Actors and Services as publishers and subscribers.

It uses extension methods to
- Actor
- StatelessService
- StatefulService

## Contribute!
Contributions are welcome.
Please upgrade the package version with a minor tick if there are no breaking changes. And add a line to the readme.md, stating the changes, e.g. 'upgraded to SF version x.y.z'.
Expand All @@ -17,6 +12,7 @@ Please also make sure all feature additions have a corresponding unit test.

## Release notes:

- 8.0.0 Major cleanup and usability improvements. Replaced Helper classes with a single `BrokerClient`. Removed obsolete code (BrokerActor, RelayBrokerActor, extension methods). Removed the `ServiceFabric.PubSubActors.Interfaces` library. Simplified the BrokerService interface.
- 7.6.2 Fix routing key issue.
- 7.6.1 Fix hashing helper null ref issue.
- 7.6.0 Add routing key support, to support attribute based messaging. Fix hashing issue in dotnet core.
Expand Down Expand Up @@ -58,17 +54,15 @@ Please also make sure all feature additions have a corresponding unit test.

## Nuget:
https://www.nuget.org/packages/ServiceFabric.PubSubActors
https://www.nuget.org/packages/ServiceFabric.PubSubActors.Interfaces (for Actor interfaces)


## Getting started



Using this package you can reliably send messages from Publishers (Actors/Services) to many Subscribers (Actors/Services).
This is done using an intermediate, which is the BrokerActor or BrokerService.
This is done using an intermediate, which is the BrokerService.
Add this package to all Reliable Actor & Service projects that participate in the pub/sub messaging.
Add the package 'ServiceFabric.PubSubActors.Interfaces' to all (*ReliableActor).Interfaces projects.


| publisher | broker |subscriber|
Expand All @@ -88,16 +82,15 @@ Add the package 'ServiceFabric.PubSubActors.Interfaces' to all (*ReliableActor).

Add a new Stateful Reliable Service project. Call it 'PubSubService' (optional).
Add Nuget package 'ServiceFabric.PubSubActors' to the 'PubSubActor' project
Add Nuget package 'ServiceFabric.PubSubActors.Interfaces' to the project.
Replace the code of PubSubService with the following code:
```csharp
internal sealed class PubSubService : BrokerService
{
public PubSubService(StatefulServiceContext context)
: base(context)
{
//optional: provide a logging callback
ServiceEventSourceMessageCallback = message => ServiceEventSource.Current.ServiceMessage(this, message);
//optional: provide a logging callback
ServiceEventSourceMessageCallback = message => ServiceEventSource.Current.ServiceMessage(this, message);
}
}
```
Expand All @@ -123,37 +116,52 @@ public class PublishedMessageTwo
}
```

### Publishing messages from Service or Actor
```csharp
var brokerClient = new BrokerClient();
brokerClient.PublishMessageAsync(new PublishedMessageOne { Content = "Hello PubSub World, from Subscriber, using Broker Service!" })
```

### Subscribing to messages using Actors
*Create a sample Actor that implements 'ISubscriberActor', to become a subscriber to messages.*
In this example, the Actor called 'SubscribingActor' subscribes to messages of Type 'PublishedMessageOne'.

Add a Reliable Stateless Actor project called 'SubscribingActor'.
Add Nuget package 'ServiceFabric.PubSubActors' to the 'SubscribingActor' project
Add Nuget package 'ServiceFabric.PubSubActors.Interfaces' to the 'SubscribingActor.Interfaces' project.
Add a project reference to the shared data contracts library ('DataContracts').

Go to the SubscribingActor.Interfaces project, open the file 'ISubscribingActor' and replace the contents with this code:
**notice this implements ISubscriberActor from the package 'ServiceFabric.PubSubActors.Interfaces' which adds a Receive method. The additional methods are to enable this actor to be manipulated from the outside.**
Open the file 'SubscribingActor.cs' and replace the contents with the code below.
**notice that this Actor now implements 'ISubscriberActor' indirectly.**
```csharp
public interface ISubscribingActor : ISubscriberActor
[ActorService(Name = nameof(SubscribingActor))]
[StatePersistence(StatePersistence.None)]
internal class SubscribingActor : Actor, ISubscriberActor
{
private readonly IBrokerClient _brokerClient;

public SubscribingActor(ActorService actorService, ActorId actorId, IBrokerClient brokerClient)
: base(actorService, actorId)
{
// allow external callers to manipulate register/unregister on this sample actor:
//for regular messaging:
Task RegisterAsync();
Task UnregisterAsync();
//for relayed messaging:
Task RegisterWithRelayAsync();
Task UnregisterWithRelayAsync();
//for service broker messaging:
Task RegisterWithBrokerServiceAsync();
Task UnregisterWithBrokerServiceAsync();
_brokerClient = brokerClient ?? new BrokerClient();
}
```

Open the file 'SubscribingActor.cs' and replace the contents with the code below.
**notice that this Actor now implements 'ISubscriberActor' indirectly.**
https://github.com/loekd/ServiceFabric.PubSubActors/blob/master/ServiceFabric.PubSubActors.Demo/SubscribingActor/SubscribingActor.cs
protected override async Task OnActivateAsync()
{
await _brokerClient.SubscribeAsync<PublishedMessageOne>(this, HandleMessageOne);
}

public Task ReceiveMessageAsync(MessageWrapper message)
{
return _brokerClient.ProcessMessageAsync(message);
}

private Task HandleMessageOne(PublishedMessageOne message)
{
ActorEventSource.Current.ActorMessage(this, $"Received message: {message.Content}");
return Task.CompletedTask;
}
}
```

### Subscribing to messages using Services using our base class

Expand All @@ -162,14 +170,14 @@ In this example, the Service called 'SubscribingStatelessService' subscribes to

Add a Reliable Stateless Service project called 'SubscribingStatelessService'.
Add Nuget package 'ServiceFabric.PubSubActors'.
Add Nuget package 'ServiceFabric.PubSubActors.Interfaces'.
Add a project reference to the shared data contracts library ('DataContracts').

Now open the file SubscribingStatelessService.cs in the project 'SubscribingStatelessService' and replace the SubscribingStatelessService class with this code:
```csharp
internal sealed class SubscribingStatelessService : SubscriberStatelessServiceBase
{
public SubscribingStatelessService(StatelessServiceContext serviceContext, ISubscriberServiceHelper subscriberServiceHelper = null) : base(serviceContext, subscriberServiceHelper)
public SubscribingStatelessService(StatelessServiceContext serviceContext, IBrokerClient brokerClient = null)
: base(serviceContext, brokerClient)
{
}

Expand Down Expand Up @@ -198,9 +206,9 @@ If you don't want to inherit from our base classes, you can use the `StatefulSub
The code in `Program` will look like this:

```csharp
var helper = new SubscriberServiceHelper();
var brokerClient = new BrokerClient();
ServiceRuntime.RegisterServiceAsync("SubscribingStatelessServiceType",
context => new StatelessSubscriberServiceBootstrapper<SubscribingStatelessService >(context, ctx => new SubscribingStatelessService (ctx, helper), helper).Build())
context => new StatelessSubscriberServiceBootstrapper<SubscribingStatelessService>(context, ctx => new SubscribingStatelessService (ctx, brokerClient), brokerClient).Build())
.GetAwaiter().GetResult();
```

Expand All @@ -209,14 +217,17 @@ The service looks like this:
```csharp
internal sealed class SubscribingStatelessService : StatelessService, ISubscriberService
{
public SubscribingStatelessService(StatelessServiceContext serviceContext, ISubscriberServiceHelper subscriberServiceHelper = null) : base(serviceContext, subscriberServiceHelper)
private readonly IBrokerClient _brokerClient;

public SubscribingStatelessService(StatelessServiceContext serviceContext, IBrokerClient brokerClient = null) : base(serviceContext)
{
_brokerClient = brokerClient;
}

public Task ReceiveMessageAsync(MessageWrapper messageWrapper)
{
// Automatically delegates work to annotated methods withing this class.
return _subscriberServiceHelper.ProccessMessageAsync(messageWrapper);
return _brokerClient.ProccessMessageAsync(messageWrapper);
}

protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
Expand Down Expand Up @@ -246,64 +257,11 @@ For stateful services, use the `StatefulSubscriberServiceBootstrapper`.
*Check the Demo project for a working reference implementation.*


### Publishing messages from Actors
*Create a sample Actor that publishes messages.*
In this example, the Actor called 'PublishingActor' publishes messages of Type 'PublishedMessageOne'.

In this example, the Publisher Actor publishes messages of Type 'PublishedMessageOne'.
Add a Reliable Stateless Actor project called 'PublishingActor'.
Add Nuget package 'ServiceFabric.PubSubActors' to 'PublishingActor'.
Add Nuget package 'ServiceFabric.PubSubActors.Interfaces' to 'PublishingActor.Interfaces'.
Add a project reference to the shared data contracts library ('DataContracts').

Go to the project 'PublishingActor.Interfaces' and open the file IPublishingActor.cs.
Replace the contents with the code below, to allow external callers to trigger a publish action (not required, Actors can decide for themselves too):

```csharp
public interface IPublishingActor : IActor
{
//enables external callers to trigger a publish action, not required for functionality
Task<string> PublishMessageOneAsync();
Task<string> PublishMessageTwoAsync();
}
```

Now open the file PublishingActor.cs in the project 'PublishingActor' and replace the contents with this code:

https://github.com/loekd/ServiceFabric.PubSubActors/blob/master/ServiceFabric.PubSubActors.Demo/PublishingActor/PublishingActor.cs

### Publishing messages from Services
*Create a sample Service that publishes messages.*
In this example, the Service called 'PublishingStatelessService' publishes messages of Type 'PublishedMessageOne'.

Add a Reliable Stateless Service project called 'PublishingStatelessService'.
Add Nuget package 'ServiceFabric.PubSubActors' to 'PublishingStatelessService'.
Add Nuget package 'ServiceFabric.PubSubActors.Interfaces' to 'PublishingStatelessService'.
Add a project reference to the shared data contracts library ('DataContracts').

Go to the project 'DataContracts' and add an interface file IPublishingStatelessService.cs.
Add the code below:
```csharp
[ServiceContract]
public interface IPublishingStatelessService : IService
{
//allows external callers to trigger a publish action, not required for functionality
[OperationContract]
Task<string> PublishMessageOneAsync();
[OperationContract]
Task<string> PublishMessageTwoAsync();
}
```
Open the file 'PublishingStatelessService.cs'. Replace the contents with the code below:

https://github.com/loekd/ServiceFabric.PubSubActors/blob/master/ServiceFabric.PubSubActors.Demo/PublishingStatelessService/PublishingStatelessService.cs


## Routing
**This experimental feature works only when using the `DefaultPayloadSerializer`.**
It adds support for an additional subscriber filter, based on message content.

### Example
### Example

Given this message type:

Expand All @@ -322,7 +280,33 @@ And given a subscriber that is interested in Customers named 'Customer1'.
The subscription would be registered like this:

```csharp
await brokerService.RegisterServiceSubscriberAsync(serviceReference, typeof(CustomerMessage).FullName, "Customer.Name=Customer1");
await brokerClient.SubscribeAsync<CustomerMessage>(this, HandleMessageOne, routingKey: "Customer.Name=Customer1");
```

The routing key is queried by using `JToken.SelectToken`. More info [here](https://www.newtonsoft.com/json/help/html/SelectToken.htm).


## Upgrading to version 8
Significant changes were made in v8.0.0, including breaking changes to interfaces and tools.
* BrokerActor and RelayBrokerActor were removed. Actors don't make a good Broker, use BrokerService instead.
* ServiceFabric.PubSubActors.Interfaces library was removed. Only the main library is required now.
* Obsolete extension methods were removed.
* Helper classes were removed and replaced with `BrokerClient`.
* V2 is the only supported Service Fabric Remoting version.

Here are some tips to help you through the upgrade process:
1. Upgrade remoting to V2
* v8.0 only supports remoting V2. If you are using .NET Standard, you should already be using V2 and you can skip this step.
* Refer to documentation here: [Upgrade From Remoting V1 to Remoting V2](https://docs.microsoft.com/en-us/azure/service-fabric/service-fabric-reliable-services-communication-remoting#upgrade-from-remoting-v1-to-remoting-v2)
* Upgrade the Broker first. Override `BrokerService.CreateServiceReplicaListeners()` to use the extension method mentioned in the above link so you can use the assembly attribute to set up listening on V1 and V2.
* Upgrade Publisher and Subscriber services. Use the `useRemotingV2` option when creating the BrokerServiceLocator that is used by Helper classes.
2. Upgrade the Broker.
* Remove the ServiceFabric.PubSubActors.Interfaces library and upgrade the ServiceFabric.PubSubActors library to 8.0.
* Publish message and Receive message are backwards compatible, so update BrokerService first and publishers and subscribers should continue to function.
3. Upgrade Subscribers.
* Register/Unregister has been replaced by Subscribe/Unsubscribe, so you won't be able to subscribe/unsubscribe until you update subscriber services.
* Remove the ServiceFabric.PubSubActors.Interfaces library and upgrade the ServiceFabric.PubSubActors library to 8.0.
* Extension methods and helper classes have been removed. If you are using any of them in your Subscriber services, they will need to be refactored to use the BrokerClient or one of the Subscriber base classes (see documentation above).
4. Upgrade Publishers. (Optional, publishing is backwards compatible)
* Remove the ServiceFabric.PubSubActors.Interfaces library and upgrade the ServiceFabric.PubSubActors library to 8.0.
* Extension method and helper classes have been removed. Use BrokerClient.PublishMessageAsync() instead.
49 changes: 0 additions & 49 deletions ServiceFabric.PubSubActors.Interfaces/IBrokerActor.cs

This file was deleted.

44 changes: 0 additions & 44 deletions ServiceFabric.PubSubActors.Interfaces/IRelayBrokerActor.cs

This file was deleted.

22 changes: 0 additions & 22 deletions ServiceFabric.PubSubActors.Interfaces/Properties/AssemblyInfo.cs

This file was deleted.

Loading

0 comments on commit 40bfb6e

Please sign in to comment.