Skip to content

Commit

Permalink
added ExtractorAdapter to automatically handle messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb committed Aug 2, 2023
1 parent 8853190 commit e884298
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ private IActorRef CreateRegion(string typeName, bool rememberEntities)
entityProps: _ => QualifiedCounter.Props(typeName),
settings: settings,
coordinatorPath: "/user/" + typeName + "Coordinator/singleton/coordinator",
new MessageExtractorAdapter(ExtractEntityId, ExtractShardId),
new DeprecatedHandlerExtractorAdapter(ExtractEntityId, ExtractShardId),
handOffStopMessage: PoisonPill.Instance,
rememberEntitiesProvider: rememberEntitiesProvider),
name: typeName + "Region");
Expand Down Expand Up @@ -683,7 +683,7 @@ private void ClusterSharding_should_support_proxy_only_mode()
typeName: "counter",
settings: settings,
coordinatorPath: "/user/counterCoordinator/singleton/coordinator",
new MessageExtractorAdapter(ExtractEntityId, ExtractShardId)),
new DeprecatedHandlerExtractorAdapter(ExtractEntityId, ExtractShardId)),
"regionProxy");
proxy.Tell(new Get(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public async Task Persistent_Shard_must_recover_from_failing_entity(Props entity
"shard-1",
_ => entityProp,
settings,
new MessageExtractorAdapter(extractEntityId, extractShardId),
new DeprecatedHandlerExtractorAdapter(extractEntityId, extractShardId),
PoisonPill.Instance,
provider
));
Expand Down
69 changes: 59 additions & 10 deletions src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using System.Threading.Tasks;

using Akka.Actor;
using Akka.Cluster.Sharding.Serialization.Proto.Msg;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.Dispatch;
Expand All @@ -37,7 +38,7 @@ public interface IClusterShardingSerializable { }
/// <summary>
/// TBD
/// </summary>
public class ClusterShardingExtensionProvider : ExtensionIdProvider<ClusterSharding>
public sealed class ClusterShardingExtensionProvider : ExtensionIdProvider<ClusterSharding>
{
/// <summary>
/// TBD
Expand All @@ -51,6 +52,54 @@ public override ClusterSharding CreateExtension(ExtendedActorSystem system)
}
}

/// <summary>
/// INTERNAL API
///
/// Used to automatically handle built-in sharding messages when used with ClusterSharding.
/// </summary>
internal sealed class ExtractorAdapter : IMessageExtractor
{
private readonly IMessageExtractor _underlying;

public ExtractorAdapter(IMessageExtractor underlying)
{
_underlying = underlying;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public string? EntityId(object message)
{
return message switch
{
StartEntity se => se.EntityId,
ShardingEnvelope se => se.EntityId,
_ => _underlying.EntityId(message)
};
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public object? EntityMessage(object message)
{
return message switch
{
ShardingEnvelope se => se.Message,
_ => _underlying.EntityMessage(message)
};
}

[Obsolete("Use ShardId(EntityId, object) instead.")]
public string? ShardId(object message)
{
return _underlying.ShardId(message);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public string ShardId(string entityId, object? messageHint = null)
{
return _underlying.ShardId(entityId, messageHint);
}
}

/// <summary>
/// Convenience implementation of <see cref="IMessageExtractor"/> that
/// construct ShardId based on the <see cref="MurmurHash.StringHash"/> of the EntityId.
Expand Down Expand Up @@ -383,7 +432,7 @@ public IActorRef Start(
typeName,
_ => entityProps,
settings,
new MessageExtractorAdapter(extractEntityId, extractShardId),
new DeprecatedHandlerExtractorAdapter(extractEntityId, extractShardId),
allocationStrategy,
handOffStopMessage);
}
Expand Down Expand Up @@ -437,7 +486,7 @@ public Task<IActorRef> StartAsync(
typeName,
_ => entityProps,
settings,
new MessageExtractorAdapter(extractEntityId, extractShardId),
new DeprecatedHandlerExtractorAdapter(extractEntityId, extractShardId),
allocationStrategy,
handOffStopMessage);
}
Expand Down Expand Up @@ -758,7 +807,7 @@ public IActorRef Start(
typeName,
entityPropsFactory,
settings,
new MessageExtractorAdapter(extractEntityId, extractShardId),
new DeprecatedHandlerExtractorAdapter(extractEntityId, extractShardId),
allocationStrategy,
handOffStopMessage);
}
Expand Down Expand Up @@ -812,7 +861,7 @@ public Task<IActorRef> StartAsync(
typeName,
entityPropsFactory,
settings,
new MessageExtractorAdapter(extractEntityId, extractShardId),
new DeprecatedHandlerExtractorAdapter(extractEntityId, extractShardId),
allocationStrategy,
handOffStopMessage);
}
Expand Down Expand Up @@ -1220,7 +1269,7 @@ public IActorRef StartProxy(
return StartProxy(
typeName,
role,
new MessageExtractorAdapter(extractEntityId, extractShardId));
new DeprecatedHandlerExtractorAdapter(extractEntityId, extractShardId));
}

/// <summary>
Expand Down Expand Up @@ -1252,7 +1301,7 @@ public Task<IActorRef> StartProxyAsync(string typeName, string role, ExtractEnti
return StartProxyAsync(
typeName,
role,
new MessageExtractorAdapter(extractEntityId, extractShardId));
new DeprecatedHandlerExtractorAdapter(extractEntityId, extractShardId));
}

/// <summary>
Expand Down Expand Up @@ -1488,12 +1537,12 @@ public interface IMessageExtractor
///
/// For backwards compatibility reasons, we need to support the old delegate-based extractor API
/// </summary>
internal sealed class MessageExtractorAdapter : IMessageExtractor
internal sealed class DeprecatedHandlerExtractorAdapter : IMessageExtractor
{
private readonly ExtractEntityId _extractEntityId;
private readonly ExtractShardId _extractShardId;

public MessageExtractorAdapter(ExtractEntityId extractEntityId, ExtractShardId extractShardId)
public DeprecatedHandlerExtractorAdapter(ExtractEntityId extractEntityId, ExtractShardId extractShardId)
{
_extractEntityId = extractEntityId;
_extractShardId = extractShardId;
Expand All @@ -1519,7 +1568,7 @@ public MessageExtractorAdapter(ExtractEntityId extractEntityId, ExtractShardId e
public string ShardId(string entityId, object? messageHint = null)
{
if(messageHint is null)
throw new ArgumentNullException(nameof(messageHint), "MessageExtractorAdapter: Message hint must be provided when using the ShardId(EntityId, object) overload.");
throw new ArgumentNullException(nameof(messageHint), "DeprecatedHandlerExtractorAdapter: Message hint must be provided when using the ShardId(EntityId, object) overload.");
return _extractShardId(messageHint);
}
}
Expand Down
5 changes: 1 addition & 4 deletions src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ public ShardRegion(
_entityProps = entityProps;
_settings = settings;
_coordinatorPath = coordinatorPath;
_messageExtractor = messageExtractor;
_messageExtractor = new ExtractorAdapter(messageExtractor);
_handOffStopMessage = handOffStopMessage;
_rememberEntitiesProvider = rememberEntitiesProvider;

Expand Down Expand Up @@ -605,9 +605,6 @@ protected override bool Receive(object message)
case RestartShard restart:
DeliverRestartShard(restart, Sender);
return true;
case StartEntity se:
DeliverMessage(se.EntityId, message, Sender);
return true;
default:
var entityId = _messageExtractor.EntityId(message);
if (entityId is null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace Akka.Cluster.Sharding
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
0,
1})]
public class ClusterShardingExtensionProvider : Akka.Actor.ExtensionIdProvider<Akka.Cluster.Sharding.ClusterSharding>
public sealed class ClusterShardingExtensionProvider : Akka.Actor.ExtensionIdProvider<Akka.Cluster.Sharding.ClusterSharding>
{
public ClusterShardingExtensionProvider() { }
public override Akka.Cluster.Sharding.ClusterSharding CreateExtension(Akka.Actor.ExtendedActorSystem system) { }
Expand Down Expand Up @@ -169,8 +169,9 @@ namespace Akka.Cluster.Sharding
1})] System.Func<object, object> messageExtractor = null) { }
public abstract string EntityId(object message);
public virtual object EntityMessage(object message) { }
[System.ObsoleteAttribute("Use ShardId(string, object?) instead")]
public virtual string ShardId(object message) { }
public string ShardId(string entityId, [System.Runtime.CompilerServices.NullableAttribute(2)] object messageHint = null) { }
public virtual string ShardId(string entityId, [System.Runtime.CompilerServices.NullableAttribute(2)] object messageHint = null) { }
}
public interface IActorSystemDependentAllocationStrategy : Akka.Actor.INoSerializationVerificationNeeded, Akka.Cluster.Sharding.IShardAllocationStrategy
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace Akka.Cluster.Sharding
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
0,
1})]
public class ClusterShardingExtensionProvider : Akka.Actor.ExtensionIdProvider<Akka.Cluster.Sharding.ClusterSharding>
public sealed class ClusterShardingExtensionProvider : Akka.Actor.ExtensionIdProvider<Akka.Cluster.Sharding.ClusterSharding>
{
public ClusterShardingExtensionProvider() { }
public override Akka.Cluster.Sharding.ClusterSharding CreateExtension(Akka.Actor.ExtendedActorSystem system) { }
Expand Down Expand Up @@ -169,8 +169,9 @@ namespace Akka.Cluster.Sharding
1})] System.Func<object, object> messageExtractor = null) { }
public abstract string EntityId(object message);
public virtual object EntityMessage(object message) { }
[System.ObsoleteAttribute("Use ShardId(string, object?) instead")]
public virtual string ShardId(object message) { }
public string ShardId(string entityId, [System.Runtime.CompilerServices.NullableAttribute(2)] object messageHint = null) { }
public virtual string ShardId(string entityId, [System.Runtime.CompilerServices.NullableAttribute(2)] object messageHint = null) { }
}
public interface IActorSystemDependentAllocationStrategy : Akka.Actor.INoSerializationVerificationNeeded, Akka.Cluster.Sharding.IShardAllocationStrategy
{
Expand Down

0 comments on commit e884298

Please sign in to comment.