diff --git a/src/benchmark/Akka.Cluster.Benchmarks/Sharding/ShardMessageRoutingBenchmarks.cs b/src/benchmark/Akka.Cluster.Benchmarks/Sharding/ShardMessageRoutingBenchmarks.cs index 4e456f5e8c2..59a28ec48b8 100644 --- a/src/benchmark/Akka.Cluster.Benchmarks/Sharding/ShardMessageRoutingBenchmarks.cs +++ b/src/benchmark/Akka.Cluster.Benchmarks/Sharding/ShardMessageRoutingBenchmarks.cs @@ -20,7 +20,7 @@ namespace Akka.Cluster.Benchmarks.Sharding { - //[DotTraceDiagnoser] + [DotTraceDiagnoser] [Config(typeof(MonitoringConfig))] [SimpleJob(RunStrategy.Monitoring, launchCount: 10, warmupCount: 10)] public class ShardMessageRoutingBenchmarks diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesNewExtractorSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesNewExtractorSpec.cs index c3caf1eca5f..6a195ada788 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesNewExtractorSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesNewExtractorSpec.cs @@ -263,7 +263,7 @@ private void Cluster_with_min_nr_of_members_using_sharding_must_start_new_nodes_ { Within(TimeSpan.FromSeconds(30), () => { - // start it with a new shard id extractor, which will put the entities + // start it with a new shard id messageExtractor, which will put the entities // on different shards RunOn(() => diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs index 6a0a004faff..b6553f50db9 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs @@ -523,8 +523,7 @@ private IActorRef CreateRegion(string typeName, bool rememberEntities) entityProps: _ => QualifiedCounter.Props(typeName), settings: settings, coordinatorPath: "/user/" + typeName + "Coordinator/singleton/coordinator", - extractEntityId: ExtractEntityId, - extractShardId: ExtractShardId, + new DeprecatedHandlerExtractorAdapter(ExtractEntityId, ExtractShardId), handOffStopMessage: PoisonPill.Instance, rememberEntitiesProvider: rememberEntitiesProvider), name: typeName + "Region"); @@ -684,8 +683,7 @@ private void ClusterSharding_should_support_proxy_only_mode() typeName: "counter", settings: settings, coordinatorPath: "/user/counterCoordinator/singleton/coordinator", - extractEntityId: ExtractEntityId, - extractShardId: ExtractShardId), + new DeprecatedHandlerExtractorAdapter(ExtractEntityId, ExtractShardId)), "regionProxy"); proxy.Tell(new Get(1)); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/RememberEntitiesShardIdExtractorChangeSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/RememberEntitiesShardIdExtractorChangeSpec.cs index 0482621555d..e24dd558e2e 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/RememberEntitiesShardIdExtractorChangeSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/RememberEntitiesShardIdExtractorChangeSpec.cs @@ -21,7 +21,7 @@ namespace Akka.Cluster.Sharding.Tests { /// - /// Covers that remembered entities is correctly migrated when used and the shard id extractor + /// Covers that remembered entities is correctly migrated when used and the shard id messageExtractor /// is changed so that entities should live on other shards after a full restart of the cluster. /// public class RememberEntitiesShardIdExtractorChangeSpec : AkkaSpec diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardEntityFailureSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardEntityFailureSpec.cs index 56446c49cf0..874b24c049d 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardEntityFailureSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ShardEntityFailureSpec.cs @@ -126,8 +126,7 @@ public async Task Persistent_Shard_must_recover_from_failing_entity(Props entity "shard-1", _ => entityProp, settings, - extractEntityId, - extractShardId, + new ExtractorAdapter(new DeprecatedHandlerExtractorAdapter(extractEntityId, extractShardId)), PoisonPill.Instance, provider )); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs index e944e0f18e6..896d755ba5d 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs @@ -5,15 +5,18 @@ // //----------------------------------------------------------------------- +#nullable enable using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; +using System.Runtime.CompilerServices; using System.Runtime.ExceptionServices; using System.Threading.Tasks; using Akka.Actor; +using Akka.Cluster.Sharding.Serialization.Proto.Msg; using Akka.Cluster.Tools.Singleton; using Akka.Configuration; using Akka.Dispatch; @@ -33,15 +36,10 @@ namespace Akka.Cluster.Sharding public interface IClusterShardingSerializable { } /// - /// TBD + /// INTERNAL API /// - public class ClusterShardingExtensionProvider : ExtensionIdProvider + public sealed class ClusterShardingExtensionProvider : ExtensionIdProvider { - /// - /// TBD - /// - /// TBD - /// TBD public override ClusterSharding CreateExtension(ExtendedActorSystem system) { var extension = new ClusterSharding(system); @@ -49,6 +47,53 @@ public override ClusterSharding CreateExtension(ExtendedActorSystem system) } } + /// + /// INTERNAL API + /// + /// Used to automatically handle built-in sharding messages when used with ClusterSharding. + /// + internal sealed class ExtractorAdapter : IMessageExtractor + { + private readonly IMessageExtractor _underlying; + + public ExtractorAdapter(IMessageExtractor underlying) + { + _underlying = underlying; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public string? EntityId(Msg message) + { + return message switch + { + ShardingEnvelope se => se.EntityId, + _ => _underlying.EntityId(message) + }; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public Msg? EntityMessage(Msg message) + { + return message switch + { + ShardingEnvelope se => se.Message, + _ => _underlying.EntityMessage(message) + }; + } + + [Obsolete("Use ShardId(EntityId, object) instead.")] + public string? ShardId(Msg message) + { + return _underlying.ShardId(message); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public string ShardId(string entityId, Msg? messageHint = null) + { + return _underlying.ShardId(entityId, messageHint); + } + } + /// /// Convenience implementation of that /// construct ShardId based on the of the EntityId. @@ -56,20 +101,20 @@ public override ClusterSharding CreateExtension(ExtendedActorSystem system) /// public abstract class HashCodeMessageExtractor : IMessageExtractor { - private class Implementation : HashCodeMessageExtractor + private sealed class Implementation : HashCodeMessageExtractor { - private readonly Func _entityIdExtractor; - private readonly Func _messageExtractor; - public Implementation(int maxNumberOfShards, Func entityIdExtractor, Func messageExtractor = null) : base(maxNumberOfShards) + private readonly Func _entityIdExtractor; + private readonly Func? _messageExtractor; + public Implementation(int maxNumberOfShards, Func entityIdExtractor, Func? messageExtractor = null) : base(maxNumberOfShards) { _entityIdExtractor = entityIdExtractor ?? throw new NullReferenceException(nameof(entityIdExtractor)); _messageExtractor = messageExtractor; } - public override string EntityId(object message) + public override string EntityId(Msg message) => _entityIdExtractor.Invoke(message); - public override object EntityMessage(object message) + public override Msg EntityMessage(Msg message) => _messageExtractor?.Invoke(message) ?? base.EntityMessage(message); } @@ -80,7 +125,7 @@ public override object EntityMessage(object message) /// /// /// - public static HashCodeMessageExtractor Create(int maxNumberOfShards, Func entityIdExtractor, Func messageExtractor = null) + public static HashCodeMessageExtractor Create(int maxNumberOfShards, Func entityIdExtractor, Func? messageExtractor = null) => new Implementation(maxNumberOfShards, entityIdExtractor, messageExtractor); /// @@ -109,14 +154,16 @@ protected HashCodeMessageExtractor(int maxNumberOfShards) /// /// TBD /// TBD - public abstract string EntityId(object message); + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public abstract string EntityId(Msg message); /// /// Default implementation pass on the message as is. /// /// TBD /// TBD - public virtual object EntityMessage(object message) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public virtual Msg EntityMessage(Msg message) { return message; } @@ -126,7 +173,9 @@ public virtual object EntityMessage(object message) /// /// TBD /// TBD - public virtual string ShardId(object message) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + [Obsolete("Use ShardId(string, object?) instead")] + public virtual string ShardId(Msg message) { EntityId id; if (message is ShardRegion.StartEntity se) @@ -136,6 +185,12 @@ public virtual string ShardId(object message) return _cachedIds[(Math.Abs(MurmurHash.StringHash(id)) % MaxNumberOfShards)]; } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public virtual string ShardId(string entityId, Msg? messageHint = null) + { + return _cachedIds[(Math.Abs(MurmurHash.StringHash(entityId)) % MaxNumberOfShards)]; + } } @@ -365,14 +420,13 @@ public IActorRef Start( ExtractEntityId extractEntityId, ExtractShardId extractShardId, IShardAllocationStrategy allocationStrategy, - object handOffStopMessage) + Msg handOffStopMessage) { return InternalStart( typeName, _ => entityProps, settings, - extractEntityId, - extractShardId, + new DeprecatedHandlerExtractorAdapter(extractEntityId, extractShardId), allocationStrategy, handOffStopMessage); } @@ -420,14 +474,13 @@ public Task StartAsync( ExtractEntityId extractEntityId, ExtractShardId extractShardId, IShardAllocationStrategy allocationStrategy, - object handOffStopMessage) + Msg handOffStopMessage) { return InternalStartAsync( typeName, _ => entityProps, settings, - extractEntityId, - extractShardId, + new DeprecatedHandlerExtractorAdapter(extractEntityId, extractShardId), allocationStrategy, handOffStopMessage); } @@ -562,14 +615,13 @@ public IActorRef Start( ClusterShardingSettings settings, IMessageExtractor messageExtractor, IShardAllocationStrategy allocationStrategy, - object handOffStopMessage) + Msg handOffStopMessage) { - return Start( + return InternalStart( typeName, - entityProps, + _ => entityProps, settings, - messageExtractor.ToExtractEntityId(), - messageExtractor.ShardId, + messageExtractor, allocationStrategy, handOffStopMessage); } @@ -610,14 +662,13 @@ public Task StartAsync( ClusterShardingSettings settings, IMessageExtractor messageExtractor, IShardAllocationStrategy allocationStrategy, - object handOffStopMessage) + Msg handOffStopMessage) { - return StartAsync( + return InternalStartAsync( typeName, - entityProps, + _ => entityProps, settings, - messageExtractor.ToExtractEntityId(), - messageExtractor.ShardId, + messageExtractor, allocationStrategy, handOffStopMessage); } @@ -744,14 +795,13 @@ public IActorRef Start( ExtractEntityId extractEntityId, ExtractShardId extractShardId, IShardAllocationStrategy allocationStrategy, - object handOffStopMessage) + Msg handOffStopMessage) { return InternalStart( typeName, entityPropsFactory, settings, - extractEntityId, - extractShardId, + new DeprecatedHandlerExtractorAdapter(extractEntityId, extractShardId), allocationStrategy, handOffStopMessage); } @@ -799,14 +849,13 @@ public Task StartAsync( ExtractEntityId extractEntityId, ExtractShardId extractShardId, IShardAllocationStrategy allocationStrategy, - object handOffStopMessage) + Msg handOffStopMessage) { return InternalStartAsync( typeName, entityPropsFactory, settings, - extractEntityId, - extractShardId, + new DeprecatedHandlerExtractorAdapter(extractEntityId, extractShardId), allocationStrategy, handOffStopMessage); } @@ -815,10 +864,9 @@ private IActorRef InternalStart( string typeName, Func entityPropsFactory, ClusterShardingSettings settings, - ExtractEntityId extractEntityId, - ExtractShardId extractShardId, + IMessageExtractor extractor, IShardAllocationStrategy allocationStrategy, - object handOffStopMessage) + Msg handOffStopMessage) { if (settings.ShouldHostShard(_cluster)) { @@ -834,8 +882,7 @@ private IActorRef InternalStart( typeName, entityPropsFactory, settings, - extractEntityId, - extractShardId, + extractor, allocationStrategy, handOffStopMessage); @@ -861,8 +908,7 @@ private IActorRef InternalStart( return StartProxy( typeName, settings.Role, - extractEntityId, - extractShardId); + extractor); } } @@ -870,10 +916,9 @@ private async Task InternalStartAsync( string typeName, Func entityPropsFactory, ClusterShardingSettings settings, - ExtractEntityId extractEntityId, - ExtractShardId extractShardId, + IMessageExtractor extractor, IShardAllocationStrategy allocationStrategy, - object handOffStopMessage) + Msg handOffStopMessage) { if (settings.ShouldHostShard(_cluster)) { @@ -889,8 +934,7 @@ private async Task InternalStartAsync( typeName, entityPropsFactory, settings, - extractEntityId, - extractShardId, + extractor, allocationStrategy, handOffStopMessage); @@ -916,8 +960,7 @@ private async Task InternalStartAsync( return StartProxy( typeName, settings.Role, - extractEntityId, - extractShardId); + extractor); } } @@ -1051,14 +1094,13 @@ public IActorRef Start( ClusterShardingSettings settings, IMessageExtractor messageExtractor, IShardAllocationStrategy allocationStrategy, - object handOffStopMessage) + Msg handOffStopMessage) { - return Start( + return InternalStart( typeName, entityPropsFactory, settings, - messageExtractor.ToExtractEntityId(), - messageExtractor.ShardId, + messageExtractor, allocationStrategy, handOffStopMessage); } @@ -1099,14 +1141,13 @@ public Task StartAsync( ClusterShardingSettings settings, IMessageExtractor messageExtractor, IShardAllocationStrategy allocationStrategy, - object handOffStopMessage) + Msg handOffStopMessage) { - return StartAsync( + return InternalStartAsync( typeName, entityPropsFactory, settings, - messageExtractor.ToExtractEntityId(), - messageExtractor.ShardId, + messageExtractor, allocationStrategy, handOffStopMessage); } @@ -1219,30 +1260,10 @@ public IActorRef StartProxy( ExtractEntityId extractEntityId, ExtractShardId extractShardId) { - if (_proxies.TryGetValue(typeName, out var shardProxy)) - { - // already started, use cached ActorRef - return shardProxy; - } - // it's ok to StartProxy several time, the guardian will deduplicate concurrent requests - var timeout = _system.Settings.CreationTimeout; - var settings = ClusterShardingSettings.Create(_system).WithRole(role); - var startMsg = new ClusterShardingGuardian.StartProxy(typeName, settings, extractEntityId, extractShardId); - var reply = _guardian.Value.Ask(startMsg, timeout).Result; - switch (reply) - { - case ClusterShardingGuardian.Started started: - shardProxy = started.ShardRegion; - _proxies.TryAdd(typeName, shardProxy); - return shardProxy; - - case Status.Failure failure: - ExceptionDispatchInfo.Capture(failure.Cause).Throw(); - return ActorRefs.Nobody; - - default: - throw new ActorInitializationException($"Unsupported guardian response: {reply}"); - } + return StartProxy( + typeName, + role, + new DeprecatedHandlerExtractorAdapter(extractEntityId, extractShardId)); } /// @@ -1269,7 +1290,33 @@ public IActorRef StartProxy( /// that passed the `extractEntityId` will be used /// /// The actor ref of the that is to be responsible for the shard. - public async Task StartProxyAsync(string typeName, string role, ExtractEntityId extractEntityId, ExtractShardId extractShardId) + public Task StartProxyAsync(string typeName, string role, ExtractEntityId extractEntityId, ExtractShardId extractShardId) + { + return StartProxyAsync( + typeName, + role, + new DeprecatedHandlerExtractorAdapter(extractEntityId, extractShardId)); + } + + /// + /// Register a named entity type on this node that will run in proxy only mode, + /// i.e. it will delegate messages to other actors on other nodes, but not host any + /// entity actors itself. The actor for this type can later be retrieved with the + /// method. + /// + /// Some settings can be configured as described in the `akka.cluster.sharding` section + /// of the `reference.conf`. + /// + /// The name of the entity type. + /// + /// Specifies that this entity type is located on cluster nodes with a specific role. + /// If the role is not specified all nodes in the cluster are used. + /// + /// + /// Functions to extract the entity id, shard id, and the message to send to the entity from the incoming message. + /// + /// The actor ref of the that is to be responsible for the shard. + public IActorRef StartProxy(string typeName, string role, IMessageExtractor messageExtractor) { if (_proxies.TryGetValue(typeName, out var shardProxy)) { @@ -1279,8 +1326,8 @@ public async Task StartProxyAsync(string typeName, string role, Extra // it's ok to StartProxy several time, the guardian will deduplicate concurrent requests var timeout = _system.Settings.CreationTimeout; var settings = ClusterShardingSettings.Create(_system).WithRole(role); - var startMsg = new ClusterShardingGuardian.StartProxy(typeName, settings, extractEntityId, extractShardId); - var reply = await _guardian.Value.Ask(startMsg, timeout).ConfigureAwait(false); + var startMsg = new ClusterShardingGuardian.StartProxy(typeName, settings, messageExtractor); + var reply = _guardian.Value.Ask(startMsg, timeout).Result; switch (reply) { case ClusterShardingGuardian.Started started: @@ -1315,40 +1362,32 @@ public async Task StartProxyAsync(string typeName, string role, Extra /// Functions to extract the entity id, shard id, and the message to send to the entity from the incoming message. /// /// The actor ref of the that is to be responsible for the shard. - public IActorRef StartProxy(string typeName, string role, IMessageExtractor messageExtractor) + public async Task StartProxyAsync(string typeName, string role, IMessageExtractor messageExtractor) { - return StartProxy( - typeName, - role, - messageExtractor.ToExtractEntityId(), - messageExtractor.ShardId); - } + if (_proxies.TryGetValue(typeName, out var shardProxy)) + { + // already started, use cached ActorRef + return shardProxy; + } + // it's ok to StartProxy several time, the guardian will deduplicate concurrent requests + var timeout = _system.Settings.CreationTimeout; + var settings = ClusterShardingSettings.Create(_system).WithRole(role); + var startMsg = new ClusterShardingGuardian.StartProxy(typeName, settings, messageExtractor); + var reply = await _guardian.Value.Ask(startMsg, timeout).ConfigureAwait(false); + switch (reply) + { + case ClusterShardingGuardian.Started started: + shardProxy = started.ShardRegion; + _proxies.TryAdd(typeName, shardProxy); + return shardProxy; - /// - /// Register a named entity type on this node that will run in proxy only mode, - /// i.e. it will delegate messages to other actors on other nodes, but not host any - /// entity actors itself. The actor for this type can later be retrieved with the - /// method. - /// - /// Some settings can be configured as described in the `akka.cluster.sharding` section - /// of the `reference.conf`. - /// - /// The name of the entity type. - /// - /// Specifies that this entity type is located on cluster nodes with a specific role. - /// If the role is not specified all nodes in the cluster are used. - /// - /// - /// Functions to extract the entity id, shard id, and the message to send to the entity from the incoming message. - /// - /// The actor ref of the that is to be responsible for the shard. - public Task StartProxyAsync(string typeName, string role, IMessageExtractor messageExtractor) - { - return StartProxyAsync( - typeName, - role, - messageExtractor.ToExtractEntityId(), - messageExtractor.ShardId); + case Status.Failure failure: + ExceptionDispatchInfo.Capture(failure.Cause).Throw(); + return ActorRefs.Nobody; + + default: + throw new ActorInitializationException($"Unsupported guardian response: {reply}"); + } } /// @@ -1359,7 +1398,7 @@ public Task StartProxyAsync(string typeName, string role, IMessageExt #pragma warning disable CS0419 // Ambiguous reference in cref attribute /// /// Retrieve the actor reference of the actor responsible for the named entity type. - /// The entity type must be registered with the or method before it + /// The entity type must be registered with the or method before it /// can be used here. Messages to the entity is always sent via the . /// /// TBD @@ -1385,7 +1424,7 @@ public IActorRef ShardRegion(string typeName) /// Retrieve the actor reference of the actor that will act as a proxy to the /// named entity type running in another data center. A proxy within the same data center can be accessed /// with instead of this method. The entity type must be registered with the - /// method before it can be used here. Messages to the entity is always sent + /// method before it can be used here. Messages to the entity is always sent /// via the . /// /// @@ -1432,6 +1471,7 @@ public IShardAllocationStrategy DefaultShardAllocationStrategy(ClusterShardingSe /// Only messages that passed the will be used /// as input to this function. /// + [Obsolete("Use HashCodeMessageExtractor or IMessageExtractor instead.")] public delegate ShardId ExtractShardId(Msg message); /// @@ -1444,6 +1484,7 @@ public IShardAllocationStrategy DefaultShardAllocationStrategy(ClusterShardingSe /// message to support wrapping in message envelope that is unwrapped before /// sending to the entity actor. /// + [Obsolete("Use HashCodeMessageExtractor or IMessageExtractor instead.")] public delegate Option<(EntityId, Msg)> ExtractEntityId(Msg message); /// @@ -1459,7 +1500,7 @@ public interface IMessageExtractor /// /// TBD /// TBD - EntityId EntityId(object message); + EntityId? EntityId(Msg message); /// /// Extract the message to send to the entity from an incoming . @@ -1469,38 +1510,67 @@ public interface IMessageExtractor /// /// TBD /// TBD - object EntityMessage(object message); + Msg? EntityMessage(Msg message); /// /// Extract the shard id from an incoming . Only messages that /// passed the method will be used as input to this method. /// - /// TBD - /// TBD - string ShardId(object message); + /// The message being delivered to the entity actor. + /// The ShardId. + [Obsolete("Use ShardId(EntityId, object) instead.")] + ShardId? ShardId(Msg message); + + /// + /// More performant overload of that accepts an entity id in order to + /// allow faster method chaining and comparisons inside Akka.NET. + /// + /// Should always be populated with a non-null value. + /// The message - FOR BACKWARDS COMPATIBILITY ONLY. + /// The ShardId. + ShardId ShardId(EntityId entityId, Msg? messageHint = null); } /// - /// TBD + /// INTERNAL API + /// + /// For backwards compatibility reasons, we need to support the old delegate-based extractor API /// - internal static class Extensions + internal sealed class DeprecatedHandlerExtractorAdapter : IMessageExtractor { - /// - /// TBD - /// - /// TBD - /// TBD - public static ExtractEntityId ToExtractEntityId(this IMessageExtractor self) +#pragma warning disable CS0618 // Type or member is obsolete + private readonly ExtractEntityId _extractEntityId; + private readonly ExtractShardId _extractShardId; + + public DeprecatedHandlerExtractorAdapter(ExtractEntityId extractEntityId, ExtractShardId extractShardId) { - Option<(EntityId, Msg)> ExtractEntityId(object msg) - { - if (self.EntityId(msg) != null) - return (self.EntityId(msg), self.EntityMessage(msg)); + _extractEntityId = extractEntityId; + _extractShardId = extractShardId; + } - return Option<(string, object)>.None; - }; + public string? EntityId(Msg message) + { + var entityId = _extractEntityId(message); + return entityId.HasValue ? entityId.Value.Item1 : null; + } - return ExtractEntityId; + public Msg? EntityMessage(Msg message) + { + var entityId = _extractEntityId(message); + return entityId.HasValue ? entityId.Value.Item2 : null; } + + public string? ShardId(Msg message) + { + return _extractShardId(message); + } + + public string ShardId(string entityId, Msg? messageHint = null) + { + if(messageHint is null) + throw new ArgumentNullException(nameof(messageHint), "DeprecatedHandlerExtractorAdapter: Message hint must be provided when using the ShardId(EntityId, object) overload."); + return _extractShardId(messageHint); + } +#pragma warning restore CS0618 // Type or member is obsolete } } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs index 1f76a1818c6..7366f114ded 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterShardingGuardian.cs @@ -11,7 +11,6 @@ using Akka.Actor; using Akka.Cluster.Sharding.Internal; using Akka.Cluster.Tools.Singleton; -using Akka.Dispatch.SysMsg; using Akka.DistributedData; using Akka.Pattern; @@ -63,14 +62,9 @@ public sealed class Start : INoSerializationVerificationNeeded /// TBD /// public readonly ClusterShardingSettings Settings; - /// - /// TBD - /// - public readonly ExtractEntityId ExtractEntityId; - /// - /// TBD - /// - public readonly ExtractShardId ExtractShardId; + + public readonly IMessageExtractor MessageExtractor; + /// /// TBD /// @@ -86,8 +80,7 @@ public sealed class Start : INoSerializationVerificationNeeded /// TBD /// TBD /// TBD - /// TBD - /// TBD + /// /// TBD /// TBD /// @@ -97,8 +90,7 @@ public Start( string typeName, Func entityProps, ClusterShardingSettings settings, - ExtractEntityId extractEntityId, - ExtractShardId extractShardId, + IMessageExtractor extractor, IShardAllocationStrategy allocationStrategy, object handOffStopMessage) { @@ -108,8 +100,7 @@ public Start( TypeName = typeName; EntityProps = entityProps; Settings = settings; - ExtractEntityId = extractEntityId; - ExtractShardId = extractShardId; + MessageExtractor = extractor; AllocationStrategy = allocationStrategy; HandOffStopMessage = handOffStopMessage; } @@ -129,37 +120,28 @@ public sealed class StartProxy : INoSerializationVerificationNeeded /// TBD /// public readonly ClusterShardingSettings Settings; - /// - /// TBD - /// - public readonly ExtractEntityId ExtractEntityId; - /// - /// TBD - /// - public readonly ExtractShardId ExtractShardId; + + public IMessageExtractor MessageExtractor; /// /// TBD /// /// TBD /// TBD - /// TBD - /// TBD + /// /// /// This exception is thrown when the specified is undefined. /// public StartProxy( string typeName, ClusterShardingSettings settings, - ExtractEntityId extractEntityId, - ExtractShardId extractShardId) + IMessageExtractor messageExtractor) { if (string.IsNullOrEmpty(typeName)) throw new ArgumentNullException(nameof(typeName), "ClusterSharding start proxy requires type name to be provided"); TypeName = typeName; Settings = settings; - ExtractEntityId = extractEntityId; - ExtractShardId = extractShardId; + MessageExtractor = messageExtractor; } } @@ -248,8 +230,7 @@ public ClusterShardingGuardian( entityProps: start.EntityProps, settings: settings, coordinatorPath: coordinatorPath, - extractEntityId: start.ExtractEntityId, - extractShardId: start.ExtractShardId, + messageExtractor: start.MessageExtractor, handOffStopMessage: start.HandOffStopMessage, rememberEntitiesStoreProvider) .WithDispatcher(Context.Props.Dispatcher), encName); @@ -281,8 +262,7 @@ public ClusterShardingGuardian( typeName: startProxy.TypeName, settings: settings, coordinatorPath: coordinatorPath, - extractEntityId: startProxy.ExtractEntityId, - extractShardId: startProxy.ExtractShardId) + messageExtractor: startProxy.MessageExtractor) .WithDispatcher(Context.Props.Dispatcher), encName)); _proxies.TryAdd(startProxy.TypeName, shardRegion); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Internal/RememberEntityStarter.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Internal/RememberEntityStarter.cs index 485ec915c97..5ac5a157213 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Internal/RememberEntityStarter.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Internal/RememberEntityStarter.cs @@ -144,7 +144,7 @@ private void OnStartBatch(int batchSize) private void OnStartBatch(IImmutableSet entityIds) { // these go through the region rather the directly to the shard - // so that shard id extractor changes make them start on the right shard + // so that shard id messageExtractor changes make them start on the right shard _waitingForAck = _waitingForAck.Union(entityIds); foreach (var entityId in entityIds) _region.Tell(new ShardRegion.StartEntity(entityId)); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs index 1ed5b982b37..f672fe9acee 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs @@ -5,6 +5,7 @@ // //----------------------------------------------------------------------- +#nullable enable using System; using System.Collections.Generic; using System.Collections.Immutable; @@ -17,6 +18,7 @@ using Akka.Pattern; using Akka.Util; using Akka.Util.Internal; +using Debug = System.Diagnostics.Debug; namespace Akka.Cluster.Sharding { @@ -38,7 +40,9 @@ internal sealed class Shard : ActorBase, IWithTimers, IWithUnboundedStash /// /// A Shard command /// - public interface IRememberEntityCommand { } + public interface IRememberEntityCommand + { + } /// /// When remembering entities and the entity stops without issuing a `Passivate`, we @@ -56,12 +60,12 @@ public RestartTerminatedEntity(EntityId entity) #region Equals /// - public override bool Equals(object obj) + public override bool Equals(object? obj) { return Equals(obj as RestartTerminatedEntity); } - public bool Equals(RestartTerminatedEntity other) + public bool Equals(RestartTerminatedEntity? other) { if (ReferenceEquals(other, null)) return false; if (ReferenceEquals(other, this)) return true; @@ -82,7 +86,7 @@ public override int GetHashCode() } /// - /// If the shard id extractor is changed, remembered entities will start in a different shard + /// If the shard id messageExtractor is changed, remembered entities will start in a different shard /// and this message is sent to the shard to not leak `entityId -> RememberedButNotStarted` entries /// public sealed class EntitiesMovedToOtherShard : IRememberEntityCommand, IEquatable @@ -97,12 +101,12 @@ public EntitiesMovedToOtherShard(IImmutableSet ids) #region Equals /// - public override bool Equals(object obj) + public override bool Equals(object? obj) { return Equals(obj as EntitiesMovedToOtherShard); } - public bool Equals(EntitiesMovedToOtherShard other) + public bool Equals(EntitiesMovedToOtherShard? other) { if (ReferenceEquals(other, null)) return false; if (ReferenceEquals(other, this)) return true; @@ -131,7 +135,9 @@ public override int GetHashCode() /// /// A query for information about the shard /// - public interface IShardQuery { } + public interface IShardQuery + { + } /// /// TBD @@ -159,6 +165,7 @@ public sealed class CurrentShardState : IClusterShardingSerializable, IEquatable /// TBD /// public readonly ShardId ShardId; + /// /// TBD /// @@ -178,18 +185,18 @@ public CurrentShardState(ShardId shardId, IImmutableSet entityIds) #region Equals /// - public override bool Equals(object obj) + public override bool Equals(object? obj) { return Equals(obj as CurrentShardState); } - public bool Equals(CurrentShardState other) + public bool Equals(CurrentShardState? other) { if (ReferenceEquals(other, null)) return false; if (ReferenceEquals(other, this)) return true; return ShardId.Equals(other.ShardId) - && EntityIds.SetEquals(other.EntityIds); + && EntityIds.SetEquals(other.EntityIds); } /// @@ -205,7 +212,8 @@ public override int GetHashCode() } /// - public override string ToString() => $"CurrentShardState(shardId:{ShardId}, entityIds:{string.Join(", ", EntityIds)})"; + public override string ToString() => + $"CurrentShardState(shardId:{ShardId}, entityIds:{string.Join(", ", EntityIds)})"; #endregion } @@ -239,6 +247,7 @@ public sealed class ShardStats : IClusterShardingSerializable, IEquatable public readonly ShardId ShardId; + /// /// TBD /// @@ -258,18 +267,18 @@ public ShardStats(ShardId shardId, int entityCount) #region Equals /// - public override bool Equals(object obj) + public override bool Equals(object? obj) { return Equals(obj as ShardStats); } - public bool Equals(ShardStats other) + public bool Equals(ShardStats? other) { if (ReferenceEquals(other, null)) return false; if (ReferenceEquals(other, this)) return true; return ShardId.Equals(other.ShardId) - && EntityCount.Equals(other.EntityCount); + && EntityCount.Equals(other.EntityCount); } /// @@ -294,9 +303,9 @@ public sealed class LeaseAcquireResult : IDeadLetterSuppression, INoSerializatio { public readonly bool Acquired; - public readonly Exception Reason; + public readonly Exception? Reason; - public LeaseAcquireResult(bool acquired, Exception reason) + public LeaseAcquireResult(bool acquired, Exception? reason) { Acquired = acquired; Reason = reason; @@ -318,7 +327,10 @@ public LeaseLost(Exception reason) public sealed class LeaseRetry : IDeadLetterSuppression, INoSerializationVerificationNeeded { public static readonly LeaseRetry Instance = new(); - private LeaseRetry() { } + + private LeaseRetry() + { + } } @@ -329,18 +341,16 @@ public static Props Props( ShardId shardId, Func entityProps, ClusterShardingSettings settings, - ExtractEntityId extractEntityId, - ExtractShardId extractShardId, + IMessageExtractor extractor, object handOffStopMessage, - IRememberEntitiesProvider rememberEntitiesProvider) + IRememberEntitiesProvider? rememberEntitiesProvider) { return Actor.Props.Create(() => new Shard( typeName, shardId, entityProps, settings, - extractEntityId, - extractShardId, + extractor, handOffStopMessage, rememberEntitiesProvider)).WithDeploy(Deploy.Local); } @@ -349,7 +359,10 @@ public static Props Props( public sealed class PassivateIdleTick : INoSerializationVerificationNeeded { public static readonly PassivateIdleTick Instance = new(); - private PassivateIdleTick() { } + + private PassivateIdleTick() + { + } } private sealed class EntityTerminated @@ -394,12 +407,11 @@ public RememberEntityTimeout(RememberEntitiesShardStore.ICommand operation) public RememberEntitiesShardStore.ICommand Operation { get; } } - #endregion // // State machine for an entity: - // Started on another shard bc. shard id extractor changed (we need to store that) + // Started on another shard bc. shard id messageExtractor changed (we need to store that) // +------------------------------------------------------------------+ // | | // Entity id remembered on shard start +-------------------------+ StartEntity or early message for entity | @@ -437,7 +449,9 @@ internal abstract class EntityState protected EntityState InvalidTransition(EntityState to, Entities entities) { - var exception = new ArgumentException($"Transition from {this} to {to} not allowed, remember entities: {entities.RememberingEntities}"); + var exception = + new ArgumentException( + $"Transition from {this} to {to} not allowed, remember entities: {entities.RememberingEntities}"); if (entities.FailOnIllegalTransition) { // crash shard @@ -528,7 +542,7 @@ internal sealed class RememberingStart : EntityState, IEquatable.Empty); - public static RememberingStart Create(IActorRef ackTo) + public static RememberingStart Create(IActorRef? ackTo) { if (ackTo == null) return Empty; @@ -571,12 +585,12 @@ public override EntityState Transition(EntityState newState, Entities entities) #region Equals /// - public override bool Equals(object obj) + public override bool Equals(object? obj) { return Equals(obj as RememberingStart); } - public bool Equals(RememberingStart other) + public bool Equals(RememberingStart? other) { if (ReferenceEquals(other, null)) return false; if (ReferenceEquals(other, this)) return true; @@ -642,12 +656,12 @@ public WithRef(IActorRef @ref) #region Equals /// - public override bool Equals(object obj) + public override bool Equals(object? obj) { return Equals(obj as WithRef); } - public bool Equals(WithRef other) + public bool Equals(WithRef? other) { if (ReferenceEquals(other, null)) return false; if (ReferenceEquals(other, this)) return true; @@ -739,8 +753,10 @@ public override EntityState Transition(EntityState newState, Entities entities) internal sealed class Entities { private readonly Dictionary _entities = new(); + // needed to look up entity by ref when a Passivating is received private readonly Dictionary _byRef = new(); + // optimization to not have to go through all entities to find batched writes private readonly HashSet _remembering = new(); @@ -774,7 +790,7 @@ public void AlreadyRemembered(IImmutableSet set) } } - public void RememberingStart(EntityId entityId, IActorRef ackTo) + public void RememberingStart(EntityId entityId, IActorRef? ackTo) { var newState = Shard.RememberingStart.Create(ackTo); var state = EntityState(entityId).Transition(newState, this); @@ -821,13 +837,11 @@ public void AddEntity(EntityId entityId, IActorRef @ref) _remembering.Remove(entityId); } - public IActorRef Entity(EntityId entityId) + public IActorRef? Entity(EntityId entityId) { - if (_entities.TryGetValue(entityId, out var state)) - { - if (state is WithRef wr) - return wr.Ref; - } + if (!_entities.TryGetValue(entityId, out var state)) return null; + if (state is WithRef wr) + return wr.Ref; return null; } @@ -838,7 +852,7 @@ public EntityState EntityState(EntityId id) return NoState.Instance; } - public EntityId EntityId(IActorRef @ref) + public EntityId? EntityId(IActorRef @ref) { if (_byRef.TryGetValue(@ref, out var entityId)) return entityId; @@ -862,7 +876,8 @@ public void EntityPassivating(EntityId entityId) } else { - throw new IllegalStateException($"Tried to passivate entity without an actor ref {entityId}. Current state {oldState}"); + throw new IllegalStateException( + $"Tried to passivate entity without an actor ref {entityId}. Current state {oldState}"); } } @@ -880,13 +895,15 @@ private void RemoveRefIfThereIsOne(EntityState state) // only called for getting shard stats public IImmutableSet ActiveEntityIds => _byRef.Values.ToImmutableHashSet(); - public (IImmutableDictionary Start, IImmutableSet Stop) PendingRememberEntities + public (IImmutableDictionary Start, IImmutableSet Stop) + PendingRememberEntities { get { if (_remembering.Count == 0) { - return (ImmutableDictionary.Empty, ImmutableHashSet.Empty); + return (ImmutableDictionary.Empty, + ImmutableHashSet.Empty); } else { @@ -903,9 +920,11 @@ private void RemoveRefIfThereIsOne(EntityState state) stops.Add(entityId); break; case var state: - throw new IllegalStateException($"{entityId} was in the remembering set but has state {state}"); + throw new IllegalStateException( + $"{entityId} was in the remembering set but has state {state}"); } } + return (starts.ToImmutable(), stops.ToImmutable()); } } @@ -928,49 +947,47 @@ public override string ToString() private readonly string _shardId; private readonly Func _entityProps; private readonly ClusterShardingSettings _settings; - private readonly ExtractEntityId _extractEntityId; - private readonly ExtractShardId _extractShardId; + private readonly IMessageExtractor _extractor; private readonly object _handOffStopMessage; private readonly bool _verboseDebug; - private readonly IActorRef _rememberEntitiesStore; + private readonly IActorRef? _rememberEntitiesStore; private readonly bool _rememberEntities; private readonly Entities _entities; private readonly Dictionary _lastMessageTimestamp = new(); private readonly MessageBufferMap _messageBuffers = new(); - private IActorRef _handOffStopper; - private readonly ICancelable _passivateIdleTask; - private readonly Lease _lease; + private IActorRef? _handOffStopper; + private readonly ICancelable? _passivateIdleTask; + private readonly Lease? _lease; private readonly TimeSpan _leaseRetryInterval = TimeSpan.FromSeconds(5); // won't be used public ILoggingAdapter Log { get; } = Context.GetLogger(); - public IStash Stash { get; set; } - public ITimerScheduler Timers { get; set; } + public IStash Stash { get; set; } = null!; + public ITimerScheduler Timers { get; set; } = null!; public Shard( string typeName, string shardId, Func entityProps, ClusterShardingSettings settings, - ExtractEntityId extractEntityId, - ExtractShardId extractShardId, + IMessageExtractor extractor, object handOffStopMessage, - IRememberEntitiesProvider rememberEntitiesProvider) + IRememberEntitiesProvider? rememberEntitiesProvider) { _typeName = typeName; _shardId = shardId; _entityProps = entityProps; _settings = settings; - _extractEntityId = extractEntityId; - _extractShardId = extractShardId; + _extractor = extractor; _handOffStopMessage = handOffStopMessage; _verboseDebug = Context.System.Settings.Config.GetBoolean("akka.cluster.sharding.verbose-debug-logging"); if (rememberEntitiesProvider != null) { - var store = Context.ActorOf(rememberEntitiesProvider.ShardStoreProps(shardId).WithDeploy(Deploy.Local), "RememberEntitiesStore"); + var store = Context.ActorOf(rememberEntitiesProvider.ShardStoreProps(shardId).WithDeploy(Deploy.Local), + "RememberEntitiesStore"); Context.WatchWith(store, new RememberEntityStoreCrashed(store)); _rememberEntitiesStore = store; } @@ -980,12 +997,14 @@ public Shard( //private val flightRecorder = ShardingFlightRecorder(context.system) var failOnInvalidStateTransition = - Context.System.Settings.Config.GetBoolean("akka.cluster.sharding.fail-on-invalid-entity-state-transition"); + Context.System.Settings.Config.GetBoolean( + "akka.cluster.sharding.fail-on-invalid-entity-state-transition"); _entities = new Entities(Log, settings.RememberEntities, _verboseDebug, failOnInvalidStateTransition); var idleInterval = TimeSpan.FromTicks(_settings.PassivateIdleEntityAfter.Ticks / 2); _passivateIdleTask = _settings.ShouldPassivateIdleEntities - ? Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(idleInterval, idleInterval, Self, PassivateIdleTick.Instance, Self) + ? Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(idleInterval, idleInterval, Self, + PassivateIdleTick.Instance, Self) : null; if (settings.LeaseSettings != null) @@ -1039,7 +1058,8 @@ private void ReleaseLeaseIfNeeded() if (r.IsFaulted || r.IsCanceled) { Log.Error(r.Exception, - "{0}: Failed to release lease of shardId [{1}]. Shard may not be able to run on another node until lease timeout occurs.", _typeName, _shardId); + "{0}: Failed to release lease of shardId [{1}]. Shard may not be able to run on another node until lease timeout occurs.", + _typeName, _shardId); } else if (r.Result) { @@ -1048,7 +1068,8 @@ private void ReleaseLeaseIfNeeded() else { Log.Error( - "{0}: Failed to release lease of shardId [{1}]. Shard may not be able to run on another node until lease timeout occurs.", _typeName, _shardId); + "{0}: Failed to release lease of shardId [{1}]. Shard may not be able to run on another node until lease timeout occurs.", + _typeName, _shardId); } }); } @@ -1082,13 +1103,15 @@ private bool AwaitingLease(object message) return true; case LeaseRetry _: - TryGetLease(_lease); + Debug.Assert(_lease != null, nameof(_lease) + " != null"); + TryGetLease(_lease!); return true; case LeaseLost ll: ReceiveLeaseLost(ll); return true; } + if (_verboseDebug) Log.Debug("{0}: Got msg of type [{1}] from [{2}] while waiting for lease, stashing", _typeName, @@ -1103,10 +1126,7 @@ private void TryGetLease(Lease lease) Log.Info("{0}: Acquiring lease {1}", _typeName, lease.Settings); var self = Self; - lease.Acquire(reason => - { - self.Tell(new LeaseLost(reason)); - }).ContinueWith(r => + lease.Acquire(reason => { self.Tell(new LeaseLost(reason)); }).ContinueWith(r => { if (r.IsFaulted || r.IsCanceled) return new LeaseAcquireResult(false, r.Exception); @@ -1120,7 +1140,8 @@ private void TryLoadRememberedEntities() { if (_rememberEntitiesStore != null) { - Log.Debug("{0}: Waiting for load of entity ids using [{1}] to complete", _typeName, _rememberEntitiesStore); + Log.Debug("{0}: Waiting for load of entity ids using [{1}] to complete", _typeName, + _rememberEntitiesStore); _rememberEntitiesStore.Tell(RememberEntitiesShardStore.GetEntities.Instance); Timers.StartSingleTimer( RememberEntityTimeoutKey, @@ -1144,6 +1165,7 @@ private bool AwaitingRememberedEntities(object message) LoadingEntityIdsFailed(); return true; } + if (_verboseDebug) Log.Debug("{0}: Got msg of type [{1}] from [{2}] while waiting for remember entities, stashing", _typeName, @@ -1156,7 +1178,8 @@ private bool AwaitingRememberedEntities(object message) private void LoadingEntityIdsFailed() { - Log.Error("{0}: Failed to load initial entity ids from remember entities store within [{1}], stopping shard for backoff and restart", + Log.Error( + "{0}: Failed to load initial entity ids from remember entities store within [{1}], stopping shard for backoff and restart", _typeName, _settings.TuningParameters.UpdatingStateTimeout); // parent ShardRegion supervisor will notice that it terminated and will start it again, after backoff @@ -1170,9 +1193,10 @@ private void OnEntitiesRemembered(IImmutableSet ids) _entities.AlreadyRemembered(ids); Log.Debug("{0}: Restarting set of [{1}] entities", _typeName, ids.Count); Context.ActorOf( - RememberEntityStarter.Props(Context.Parent, Self, _shardId, ids, _settings), - "RememberEntitiesStarter"); + RememberEntityStarter.Props(Context.Parent, Self, _shardId, ids, _settings), + "RememberEntitiesStarter"); } + ShardInitialized(); } @@ -1221,18 +1245,26 @@ private bool Idle(object message) case RememberEntityStoreCrashed msg: ReceiveRememberEntityStoreCrashed(msg); return true; - case var msg when _extractEntityId(msg).HasValue: - DeliverMessage(msg, Sender); + default: + var entityId = _extractor.EntityId(message); + if (string.IsNullOrEmpty(entityId)) + { + Log.Warning("{0}: Id must not be empty, dropping message [{1}]", _typeName, + message.GetType().Name); + Context.System.DeadLetters.Tell(new Dropped(message, "No recipient entity id", Sender, Self)); + return true; + } + + DeliverMessage(entityId!, message, Sender); return true; } - return false; } - private void RememberUpdate(IImmutableSet add = null, IImmutableSet remove = null) + private void RememberUpdate(IImmutableSet? add = null, IImmutableSet? remove = null) { - add = add ?? ImmutableHashSet.Empty; - remove = remove ?? ImmutableHashSet.Empty; + add ??= ImmutableHashSet.Empty; + remove ??= ImmutableHashSet.Empty; if (_rememberEntitiesStore == null) OnUpdateDone(add, remove); else @@ -1240,7 +1272,8 @@ private void RememberUpdate(IImmutableSet add = null, IImmutableSet storingStarts, IImmutableSet storingStops) + private void SendToRememberStore(IActorRef store, IImmutableSet storingStarts, + IImmutableSet storingStops) { if (_verboseDebug) Log.Debug("{0}: Remember update [{1}] and stops [{2}] triggered", @@ -1290,7 +1323,8 @@ bool WaitingForRememberEntitiesStore(object message) return true; case RememberEntityTimeout t: Log.Error("{0}: Remember entity store did not respond, restarting shard", _typeName); - throw new InvalidOperationException($"Async write timed out after {_settings.TuningParameters.UpdatingStateTimeout}"); + throw new InvalidOperationException( + $"Async write timed out after {_settings.TuningParameters.UpdatingStateTimeout}"); case ShardRegion.StartEntity se: StartEntity(se.EntityId, Sender); return true; @@ -1325,20 +1359,26 @@ bool WaitingForRememberEntitiesStore(object message) case RememberEntityStoreCrashed msg: ReceiveRememberEntityStoreCrashed(msg); return true; - case var msg when _extractEntityId(msg).HasValue: - DeliverMessage(msg, Sender); - return true; - case var msg: - // shouldn't be any other message types, but just in case - Log.Warning("{0}: Stashing unexpected message [{1}] while waiting for remember entities update of starts [{2}], stops [{3}]", - _typeName, - msg.GetType().Name, - string.Join(", ", update.Started), - string.Join(", ", update.Stopped)); - Stash.Stash(); + default: + var entityId = _extractor.EntityId(message); + if (string.IsNullOrEmpty(entityId)) + { + // shouldn't be any other message types, but just in case + Log.Warning( + "{0}: Stashing unexpected message [{1}] while waiting for remember entities update of starts [{2}], stops [{3}]", + _typeName, + message.GetType().Name, + string.Join(", ", update.Started), + string.Join(", ", update.Stopped)); + Stash.Stash(); + return true; + } + + DeliverMessage(entityId!, message, Sender); return true; } } + return WaitingForRememberEntitiesStore; } @@ -1357,8 +1397,10 @@ private void OnUpdateDone(IImmutableSet starts, IImmutableSet starts, IImmutableSet 0) { - var entityHandOffTimeout = (_settings.TuningParameters.HandOffTimeout - TimeSpan.FromSeconds(5)).Max(TimeSpan.FromSeconds(1)); + var entityHandOffTimeout = + (_settings.TuningParameters.HandOffTimeout - TimeSpan.FromSeconds(5)).Max( + TimeSpan.FromSeconds(1)); Log.Debug("{0}: Starting HandOffStopper for shard [{1}] to terminate [{2}] entities.", _typeName, _shardId, @@ -1551,7 +1605,8 @@ private void HandOff(IActorRef replyTo) foreach (var e in activeEntities) Context.Unwatch(e); _handOffStopper = Context.Watch(Context.ActorOf( - ShardRegion.HandOffStopper.Props(_typeName, _shardId, replyTo, activeEntities, _handOffStopMessage, entityHandOffTimeout), + ShardRegion.HandOffStopper.Props(_typeName, _shardId, replyTo, activeEntities, + _handOffStopMessage, entityHandOffTimeout), "HandOffStopper")); //During hand off we only care about watching for termination of the hand off stopper @@ -1563,6 +1618,7 @@ private void HandOff(IActorRef replyTo) ReceiveTerminated(t.ActorRef); return true; } + return false; }); } @@ -1594,12 +1650,14 @@ private void ReceiveEntityTerminated(IActorRef @ref) { case RememberingStop _: if (_verboseDebug) - Log.Debug("{0}: Stop of [{1}] arrived, already is among the pending stops", _typeName, entityId); + Log.Debug("{0}: Stop of [{1}] arrived, already is among the pending stops", _typeName, + entityId); break; case Active _: if (_rememberEntitiesStore != null) { - Log.Debug("{0}: Entity [{1}] stopped without passivating, will restart after backoff", _typeName, entityId); + Log.Debug("{0}: Entity [{1}] stopped without passivating, will restart after backoff", + _typeName, entityId); _entities.WaitingForRestart(entityId); var msg = new RestartTerminatedEntity(entityId); Timers.StartSingleTimer(msg, msg, _settings.TuningParameters.EntityRestartBackoff); @@ -1609,6 +1667,7 @@ private void ReceiveEntityTerminated(IActorRef @ref) Log.Debug("{0}: Entity [{1}] terminated", _typeName, entityId); _entities.RemoveEntity(entityId); } + break; case Passivating _: @@ -1618,7 +1677,8 @@ private void ReceiveEntityTerminated(IActorRef @ref) { // will go in next batch update if (_verboseDebug) - Log.Debug("{0}: [{1}] terminated after passivating, arrived while updating, adding it to batch of pending stops", + Log.Debug( + "{0}: [{1}] terminated after passivating, arrived while updating, adding it to batch of pending stops", _typeName, entityId); _entities.RememberingStop(entityId); @@ -1634,7 +1694,8 @@ private void ReceiveEntityTerminated(IActorRef @ref) if (_messageBuffers.GetOrEmpty(entityId).NonEmpty) { if (_verboseDebug) - Log.Debug("{0}: [{1}] terminated after passivating, buffered messages found, restarting", + Log.Debug( + "{0}: [{1}] terminated after passivating, buffered messages found, restarting", _typeName, entityId); _entities.RemoveEntity(entityId); @@ -1648,9 +1709,11 @@ private void ReceiveEntityTerminated(IActorRef @ref) _entities.RemoveEntity(entityId); } } + break; case var unexpected: - Log.Warning("{0}: Got a terminated for [{1}], entityId [{2}] which is in unexpected state [{3}]", + Log.Warning( + "{0}: Got a terminated for [{1}], entityId [{2}] which is in unexpected state [{3}]", _typeName, _entities.Entity(entityId), entityId, @@ -1677,7 +1740,8 @@ private void Passivate(IActorRef entity, object stopMessage) } else if (_messageBuffers.GetOrEmpty(entityId).NonEmpty) { - Log.Debug("{0}: Passivation when there are buffered messages for [{2}], ignoring passivation", _typeName, entityId); + Log.Debug("{0}: Passivation when there are buffered messages for [{2}], ignoring passivation", + _typeName, entityId); } else { @@ -1690,7 +1754,8 @@ private void Passivate(IActorRef entity, object stopMessage) } else { - Log.Debug("{0}: Unknown entity passivating [{1}]. Not sending stopMessage back to entity", _typeName, entity); + Log.Debug("{0}: Unknown entity passivating [{1}]. Not sending stopMessage back to entity", _typeName, + entity); } } @@ -1706,12 +1771,15 @@ private void PassivateIdleEntities() { var deadline = DateTime.UtcNow - _settings.PassivateIdleEntityAfter; - var refsToPassivate = _lastMessageTimestamp.Where(i => i.Value < deadline).Select(i => _entities.Entity(i.Key)).Where(i => i != null).ToList(); + var refsToPassivate = _lastMessageTimestamp + .Where(i => i.Value < deadline) + .Select(i => _entities.Entity(i.Key)) + .Where(i => i != null).ToList(); if (refsToPassivate.Count > 0) { Log.Debug("{0}: Passivating [{1}] idle entities", _typeName, refsToPassivate.Count); foreach (var r in refsToPassivate) - Passivate(r, _handOffStopMessage); + Passivate(r!, _handOffStopMessage); } } @@ -1725,7 +1793,8 @@ private void PassivateCompleted(EntityId entityId) _entities.RemoveEntity(entityId); if (hasBufferedMessages) { - Log.Debug("{0}: Entity stopped after passivation [{1}], but will be started again due to buffered messages", + Log.Debug( + "{0}: Entity stopped after passivation [{1}], but will be started again due to buffered messages", _typeName, entityId); //flightRecorder.entityPassivateRestart(entityId) @@ -1748,102 +1817,95 @@ private void PassivateCompleted(EntityId entityId) } } - private void DeliverMessage(object msg, IActorRef snd) + private void DeliverMessage(string entityId, object msg, IActorRef snd) { - var t = _extractEntityId(msg); - var entityId = t.Value.Item1; - var payload = t.Value.Item2; + var payload = _extractor.EntityMessage(msg)!; // payload can't be null unless dev really screwed up - if (string.IsNullOrEmpty(entityId)) - { - Log.Warning("{0}: Id must not be empty, dropping message [{1}]", _typeName, msg.GetType().Name); - Context.System.DeadLetters.Tell(new Dropped(msg, "No recipient entity id", snd, Self)); - } - else + if (payload is ShardRegion.StartEntity start) { - switch (payload) + // Handling StartEntity both here and in the receives allows for sending it both as is and in an envelope + // to be extracted by the entity id messageExtractor. + + // we can only start a new entity if we are not currently waiting for another write + if (_entities.PendingRememberedEntitiesExist) + { + if (_verboseDebug) + Log.Debug("{0}: StartEntity({1}) from [{2}], adding to batch", _typeName, + start.EntityId, snd); + _entities.RememberingStart(entityId, ackTo: snd); + } + else { - case ShardRegion.StartEntity start: - // Handling StartEntity both here and in the receives allows for sending it both as is and in an envelope - // to be extracted by the entity id extractor. + if (_verboseDebug) + Log.Debug("{0}: StartEntity({1}) from [{2}], starting", _typeName, start.EntityId, snd); + StartEntity(start.EntityId, snd); + } - // we can only start a new entity if we are not currently waiting for another write + return; + } + + switch (_entities.EntityState(entityId)) + { + case Active a: + if (_verboseDebug) + Log.Debug("{0}: Delivering message of type [{1}] to [{2}]", _typeName, + payload.GetType().Name, entityId); + TouchLastMessageTimestamp(entityId); + a.Ref.Tell(payload, snd); + break; + case RememberingStart: + case RememberingStop: + case Passivating: + AppendToMessageBuffer(entityId, msg, snd); + break; + case { } state and (WaitingForRestart or RememberedButNotCreated): + if (_verboseDebug) + Log.Debug("{0}: Delivering message of type [{1}] to [{2}] (starting because [{3}])", + _typeName, + payload.GetType().Name, + entityId, + state); + var actor = GetOrCreateEntity(entityId); + TouchLastMessageTimestamp(entityId); + actor.Tell(payload, snd); + break; + case NoState: + if (!_rememberEntities) + { + // don't buffer if remember entities not enabled + GetOrCreateEntity(entityId).Tell(payload, snd); + TouchLastMessageTimestamp(entityId); + } + else + { if (_entities.PendingRememberedEntitiesExist) { + // No actor running and write in progress for some other entity id (can only happen with remember entities enabled) if (_verboseDebug) - Log.Debug("{0}: StartEntity({1}) from [{2}], adding to batch", _typeName, start.EntityId, snd); - _entities.RememberingStart(entityId, ackTo: snd); + Log.Debug( + "{0}: Buffer message [{1}] to [{2}] (which is not started) because of write in progress for [{3}]", + _typeName, + payload.GetType().Name, + entityId, + _entities.PendingRememberEntities); + AppendToMessageBuffer(entityId, msg, snd); + _entities.RememberingStart(entityId, ackTo: null); } else { + // No actor running and no write in progress, start actor and deliver message when started if (_verboseDebug) - Log.Debug("{0}: StartEntity({1}) from [{2}], starting", _typeName, start.EntityId, snd); - StartEntity(start.EntityId, snd); + Log.Debug("{0}: Buffering message [{1}] to [{2}] and starting actor", + _typeName, + payload.GetType().Name, + entityId); + AppendToMessageBuffer(entityId, msg, snd); + _entities.RememberingStart(entityId, ackTo: null); + RememberUpdate(add: ImmutableHashSet.Create(entityId)); } - break; - case var _: - switch (_entities.EntityState(entityId)) - { - case Active a: - if (_verboseDebug) - Log.Debug("{0}: Delivering message of type [{1}] to [{2}]", _typeName, payload.GetType().Name, entityId); - TouchLastMessageTimestamp(entityId); - a.Ref.Tell(payload, snd); - break; - case RememberingStart _: - case RememberingStop _: - case Passivating _: - AppendToMessageBuffer(entityId, msg, snd); - break; - case EntityState state and (WaitingForRestart or RememberedButNotCreated): - if (_verboseDebug) - Log.Debug("{0}: Delivering message of type [{1}] to [{2}] (starting because [{3}])", - _typeName, - payload.GetType().Name, - entityId, - state); - var actor = GetOrCreateEntity(entityId); - TouchLastMessageTimestamp(entityId); - actor.Tell(payload, snd); - break; - case NoState _: - if (!_rememberEntities) - { - // don't buffer if remember entities not enabled - GetOrCreateEntity(entityId).Tell(payload, snd); - TouchLastMessageTimestamp(entityId); - } - else - { - if (_entities.PendingRememberedEntitiesExist) - { - // No actor running and write in progress for some other entity id (can only happen with remember entities enabled) - if (_verboseDebug) - Log.Debug("{0}: Buffer message [{1}] to [{2}] (which is not started) because of write in progress for [{3}]", - _typeName, - payload.GetType().Name, - entityId, - _entities.PendingRememberEntities); - AppendToMessageBuffer(entityId, msg, snd); - _entities.RememberingStart(entityId, ackTo: null); - } - else - { - // No actor running and no write in progress, start actor and deliver message when started - if (_verboseDebug) - Log.Debug("{0}: Buffering message [{1}] to [{2}] and starting actor", - _typeName, - payload.GetType().Name, - entityId); - AppendToMessageBuffer(entityId, msg, snd); - _entities.RememberingStart(entityId, ackTo: null); - RememberUpdate(add: ImmutableHashSet.Create(entityId)); - } - } - break; - } - break; - } + } + + break; } } @@ -1887,7 +1949,8 @@ private void AppendToMessageBuffer(EntityId id, object msg, IActorRef snd) else { if (Log.IsDebugEnabled) - Log.Debug("{0}: Message of type [{1}] for entity [{2}] buffered", _typeName, msg.GetType().Name, id); + Log.Debug("{0}: Message of type [{1}] for entity [{2}] buffered", _typeName, msg.GetType().Name, + id); _messageBuffers.Append(id, msg, snd); } } @@ -1905,16 +1968,18 @@ private void SendMsgBuffer(EntityId entityId) if (messages.NonEmpty) { GetOrCreateEntity(entityId); - Log.Debug("{0}: Sending message buffer for entity [{1}] ([{2}] messages)", _typeName, entityId, messages.Count); + Log.Debug("{0}: Sending message buffer for entity [{1}] ([{2}] messages)", _typeName, entityId, + messages.Count); // Now there is no deliveryBuffer we can try to redeliver // and as the child exists, the message will be directly forwarded - foreach (var (Message, Ref) in messages) + foreach (var (message, @ref) in messages) { - if (Message is ShardRegion.StartEntity se) - StartEntity(se.EntityId, Ref); + if (message is ShardRegion.StartEntity se) + StartEntity(se.EntityId, @ref); else - DeliverMessage(Message, Ref); + DeliverMessage(entityId, message, @ref); } + TouchLastMessageTimestamp(entityId); } } @@ -1924,7 +1989,8 @@ private void DropBufferFor(EntityId entityId, string reason) var count = _messageBuffers.Drop(entityId, reason, Context.System.DeadLetters); if (Log.IsDebugEnabled && count > 0) { - Log.Debug("{0}: Dropping [{1}] buffered messages for [{2}] because {3}", _typeName, count, entityId, reason); + Log.Debug("{0}: Dropping [{1}] buffered messages for [{2}] because {3}", _typeName, count, entityId, + reason); } } @@ -1940,4 +2006,4 @@ protected override void PostStop() Log.Debug("{0}: Shard [{1}] shutting down", _typeName, _shardId); } } -} +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs index a0b3bc2f255..5cfce901080 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs @@ -5,6 +5,7 @@ // //----------------------------------------------------------------------- +#nullable enable using System; using System.Collections.Generic; using System.Collections.Immutable; @@ -48,7 +49,10 @@ internal sealed class Retry : IShardRegionCommand, INoSerializationVerificationN /// TBD /// public static readonly Retry Instance = new(); - private Retry() { } + + private Retry() + { + } } /// @@ -69,7 +73,10 @@ internal sealed class RegisterRetry : IShardRegionCommand /// TBD /// public static readonly RegisterRetry Instance = new(); - private RegisterRetry() { } + + private RegisterRetry() + { + } } /// @@ -83,6 +90,7 @@ internal sealed class RestartShard /// TBD /// public readonly ShardId ShardId; + /// /// TBD /// @@ -96,7 +104,7 @@ public RestartShard(ShardId shardId) /// /// When remembering entities and a shard is started, each entity id that needs to /// be running will trigger this message being sent through sharding. For this to work - /// the message *must* be handled by the shard id extractor. + /// the message *must* be handled by the shard id messageExtractor. /// [Serializable] public sealed class StartEntity : IClusterShardingSerializable, IEquatable @@ -118,13 +126,12 @@ public StartEntity(EntityId entityId) #region Equals - - public override bool Equals(object obj) + public override bool Equals(object? obj) { return Equals(obj as StartEntity); } - public bool Equals(StartEntity other) + public bool Equals(StartEntity? other) { if (ReferenceEquals(other, null)) return false; if (ReferenceEquals(other, this)) return true; @@ -132,12 +139,12 @@ public bool Equals(StartEntity other) return EntityId.Equals(other.EntityId); } - + public override int GetHashCode() { return EntityId.GetHashCode(); } - + public override string ToString() => $"StartEntity({EntityId})"; #endregion @@ -148,7 +155,8 @@ public override int GetHashCode() /// to start(it does not guarantee the entity successfully started) /// [Serializable] - internal sealed class StartEntityAck : IClusterShardingSerializable, IDeadLetterSuppression, IEquatable + internal sealed class StartEntityAck : IClusterShardingSerializable, IDeadLetterSuppression, + IEquatable { /// /// An identifier of a newly started entity. Unique in scope of a given shard. @@ -174,22 +182,21 @@ public StartEntityAck(EntityId entityId, ShardId shardId) #region Equals - - public override bool Equals(object obj) + public override bool Equals(object? obj) { return Equals(obj as StartEntityAck); } - public bool Equals(StartEntityAck other) + public bool Equals(StartEntityAck? other) { if (ReferenceEquals(other, null)) return false; if (ReferenceEquals(other, this)) return true; return EntityId.Equals(other.EntityId) - && ShardId.Equals(other.ShardId); + && ShardId.Equals(other.ShardId); } - + public override int GetHashCode() { unchecked @@ -235,13 +242,9 @@ private StopTimeoutWarning() private static readonly TimeSpan StopTimeoutWarningAfter = TimeSpan.FromSeconds(5); - private ILoggingAdapter _log; - /// - /// TBD - /// - public ILoggingAdapter Log { get { return _log ??= Context.GetLogger(); } } + public ILoggingAdapter Log { get; } = Context.GetLogger(); - public ITimerScheduler Timers { get; set; } + public ITimerScheduler Timers { get; set; } = null!; /// /// TBD @@ -261,7 +264,8 @@ public static Props Props( object stopMessage, TimeSpan handoffTimeout) { - return Actor.Props.Create(() => new HandOffStopper(typeName, shard, replyTo, entities, stopMessage, handoffTimeout)) + return Actor.Props.Create(() => + new HandOffStopper(typeName, shard, replyTo, entities, stopMessage, handoffTimeout)) .WithDeploy(Deploy.Local); } @@ -304,13 +308,14 @@ public HandOffStopper( shard, StopTimeoutWarningAfter, stopMessage.GetType(), - (CoordinatedShutdown.Get(Context.System).ShutdownReason != null) ? - "" // the region will be shutdown earlier so would be confusing to say more + (CoordinatedShutdown.Get(Context.System).ShutdownReason != null) + ? "" // the region will be shutdown earlier so would be confusing to say more : $"Waiting additional [{handoffTimeout}] before stopping the remaining entities."); }); Receive(_ => { - Log.Warning("{0}: HandOffStopMessage[{1}] is not handled by some of the entities in shard [{2}] after [{3}], " + + Log.Warning( + "{0}: HandOffStopMessage[{1}] is not handled by some of the entities in shard [{2}] after [{3}], " + "stopping the remaining [{4}] entities.", typeName, stopMessage.GetType().Name, shard, handoffTimeout, remaining.Count); @@ -318,7 +323,8 @@ public HandOffStopper( Context.Stop(r); }); - Timers.StartSingleTimer(StopTimeoutWarning.Instance, StopTimeoutWarning.Instance, StopTimeoutWarningAfter); + Timers.StartSingleTimer(StopTimeoutWarning.Instance, StopTimeoutWarning.Instance, + StopTimeoutWarningAfter); Timers.StartSingleTimer(StopTimeout.Instance, StopTimeout.Instance, handoffTimeout); foreach (var aref in entities) @@ -336,8 +342,7 @@ public HandOffStopper( /// TBD /// TBD /// TBD - /// TBD - /// TBD + /// /// TBD /// TBD /// TBD @@ -346,20 +351,18 @@ internal static Props Props( Func entityProps, ClusterShardingSettings settings, string coordinatorPath, - ExtractEntityId extractEntityId, - ExtractShardId extractShardId, + IMessageExtractor messageExtractor, object handOffStopMessage, IRememberEntitiesProvider rememberEntitiesProvider) { return Actor.Props.Create(() => new ShardRegion( - typeName, - entityProps, - settings, - coordinatorPath, - extractEntityId, - extractShardId, - handOffStopMessage, - rememberEntitiesProvider)) + typeName, + entityProps, + settings, + coordinatorPath, + messageExtractor, + handOffStopMessage, + rememberEntitiesProvider)) .WithDeploy(Deploy.Local); } @@ -369,37 +372,33 @@ internal static Props Props( /// TBD /// TBD /// TBD - /// TBD - /// TBD + /// /// TBD internal static Props ProxyProps( string typeName, ClusterShardingSettings settings, string coordinatorPath, - ExtractEntityId extractEntityId, - ExtractShardId extractShardId) + IMessageExtractor messageExtractor) { return Actor.Props.Create(() => new ShardRegion( - typeName, - null, - settings, - coordinatorPath, - extractEntityId, - extractShardId, - PoisonPill.Instance, - null)) + typeName, + null, + settings, + coordinatorPath, + messageExtractor, + PoisonPill.Instance, + null)) .WithDeploy(Deploy.Local); } private readonly string _typeName; - private readonly Func _entityProps; + private readonly Func? _entityProps; private readonly ClusterShardingSettings _settings; private readonly string _coordinatorPath; - private readonly ExtractEntityId _extractEntityId; - private readonly ExtractShardId _extractShardId; + private readonly IMessageExtractor _messageExtractor; private readonly object _handOffStopMessage; - private readonly IRememberEntitiesProvider _rememberEntitiesProvider; + private readonly IRememberEntitiesProvider? _rememberEntitiesProvider; private readonly bool _verboseDebug; private readonly Cluster _cluster = Cluster.Get(Context.System); private readonly ILoggingAdapter _log = Context.GetLogger(); @@ -407,9 +406,12 @@ internal static Props ProxyProps( private IImmutableSet _membersByAge = ImmutableSortedSet.Empty.WithComparer(Member.AgeOrdering); // membersByAge contains members with these status - private static readonly ImmutableHashSet MemberStatusOfInterest = ImmutableHashSet.Create(MemberStatus.Up, MemberStatus.Leaving, MemberStatus.Exiting); + private static readonly ImmutableHashSet MemberStatusOfInterest = + ImmutableHashSet.Create(MemberStatus.Up, MemberStatus.Leaving, MemberStatus.Exiting); + + private IImmutableDictionary> _regions = + ImmutableDictionary>.Empty; - private IImmutableDictionary> _regions = ImmutableDictionary>.Empty; private IImmutableDictionary _regionByShard = ImmutableDictionary.Empty; private readonly MessageBufferMap _shardBuffers = new(); private IImmutableDictionary _shards = ImmutableDictionary.Empty; @@ -417,7 +419,7 @@ internal static Props ProxyProps( private IImmutableSet _startingShards = ImmutableHashSet.Empty; private IImmutableSet _handingOff = ImmutableHashSet.Empty; - private IActorRef _coordinator; + private IActorRef? _coordinator; private int _retryCount; private readonly TimeSpan _retryInterval; private readonly TimeSpan _initRegistrationDelay; @@ -436,26 +438,23 @@ internal static Props ProxyProps( /// TBD /// TBD /// TBD - /// TBD - /// TBD + /// /// TBD /// TBD public ShardRegion( string typeName, - Func entityProps, + Func? entityProps, ClusterShardingSettings settings, string coordinatorPath, - ExtractEntityId extractEntityId, - ExtractShardId extractShardId, + IMessageExtractor messageExtractor, object handOffStopMessage, - IRememberEntitiesProvider rememberEntitiesProvider) + IRememberEntitiesProvider? rememberEntitiesProvider) { _typeName = typeName; _entityProps = entityProps; _settings = settings; _coordinatorPath = coordinatorPath; - _extractEntityId = extractEntityId; - _extractShardId = extractShardId; + _messageExtractor = new ExtractorAdapter(messageExtractor); _handOffStopMessage = handOffStopMessage; _rememberEntitiesProvider = rememberEntitiesProvider; @@ -485,7 +484,7 @@ private void SetupCoordinatedShutdown() }); } - public ITimerScheduler Timers { get; set; } + public ITimerScheduler Timers { get; set; } = null!; /// /// When leaving the coordinator singleton is started rather quickly on next @@ -554,7 +553,8 @@ private void LogPassivateIdleEntities() _settings.PassivateIdleEntityAfter); if (_settings.RememberEntities) - _log.Debug("{0}: Idle entities will not be passivated because 'rememberEntities' is enabled.", _typeName); + _log.Debug("{0}: Idle entities will not be passivated because 'rememberEntities' is enabled.", + _typeName); } private bool MatchingRole(Member member) @@ -585,7 +585,6 @@ protected override bool Receive(object message) { switch (message) { - case Terminated t: HandleTerminated(t); return true; @@ -607,18 +606,23 @@ protected override bool Receive(object message) case IShardRegionQuery srq: HandleShardRegionQuery(srq); return true; - case RestartShard _: - DeliverMessage(message, Sender); + case RestartShard restart: + DeliverRestartShard(restart, Sender); return true; - case StartEntity _: - DeliverStartEntity(message, Sender); - return true; - case var _ when _extractEntityId(message).HasValue: - DeliverMessage(message, Sender); + case StartEntity se: + DeliverMessage(se.EntityId, message, Sender); return true; default: - _log.Warning("{0}: Message does not have an extractor defined in shard so it was ignored: {1}", _typeName, message); - return false; + var entityId = _messageExtractor.EntityId(message); + if (entityId is null) + { + _log.Warning("{0}: Message does not have an extractor defined in shard so it was ignored: {1}", + _typeName, message); + return false; + } + + DeliverMessage(entityId, message, Sender); + return true; } } @@ -682,10 +686,10 @@ private void Register() else if (_log.IsDebugEnabled) { _log.Debug( - "{0}: Trying to register to coordinator at [{1}], but no acknowledgement. No buffered messages yet. [{2}]", - _typeName, - string.Join(", ", actorSelections.Select(i => i.PathString)), - coordinatorMessage); + "{0}: Trying to register to coordinator at [{1}], but no acknowledgement. No buffered messages yet. [{2}]", + _typeName, + string.Join(", ", actorSelections.Select(i => i.PathString)), + coordinatorMessage); } } else @@ -713,92 +717,80 @@ private void Register() } } - private void DeliverStartEntity(object message, IActorRef sender) + private void DeliverMessage(string entityId, object message, IActorRef sender) { - try - { - DeliverMessage(message, sender); - } - catch (Exception ex) + var shardId = _messageExtractor.ShardId(entityId, message); + if (_regionByShard.TryGetValue(shardId!, out var region)) { - //case ex: MatchError ⇒ - _log.Error(ex, "{0}: When using remember-entities the shard id extractor must handle ShardRegion.StartEntity(id).", _typeName); + if (region.Equals(Self)) + { + var sref = GetShard(shardId); + if (Equals(sref, ActorRefs.Nobody)) + BufferMessage(shardId, message, sender); + else + { + if (_shardBuffers.Contains(shardId)) + { + // Since now messages to a shard is buffered then those messages must be in right order + BufferMessage(shardId, message, sender); + DeliverBufferedMessages(shardId, sref); + } + else + sref.Tell(message, sender); + } + } + else + { + if (_verboseDebug) + _log.Debug("{0}: Forwarding request for shard [{1}] to [{2}]", _typeName, shardId, region); + region.Tell(message, sender); + } } - } - - private void DeliverMessage(object message, IActorRef sender) - { - if (message is RestartShard restart) + else { - var shardId = restart.ShardId; - if (_regionByShard.TryGetValue(shardId, out var regionRef)) + if (string.IsNullOrEmpty(shardId)) { - if (Self.Equals(regionRef)) - GetShard(shardId); + _log.Warning("{0}: Shard must not be empty, dropping message [{1}]", _typeName, + message.GetType().Name); + Context.System.DeadLetters.Tell(message); } else { if (!_shardBuffers.Contains(shardId)) { - _log.Debug("{0}: Request shard [{1}] home. Coordinator [{2}]", _typeName, shardId, _coordinator); + _log.Debug("{0}: Request shard [{1}] home. Coordinator [{2}]", _typeName, shardId, + _coordinator); _coordinator?.Tell(new ShardCoordinator.GetShardHome(shardId)); } - var buffer = _shardBuffers.GetOrEmpty(shardId); - _log.Debug("{0}: Buffer message for shard [{1}]. Total [{2}] buffered messages.", - _typeName, - shardId, - buffer.Count + 1); - _shardBuffers.Append(shardId, message, sender); + BufferMessage(shardId, message, sender); } } + } + + private void DeliverRestartShard(RestartShard restart, IActorRef sender) + { + var shardId = restart.ShardId; + if (_regionByShard.TryGetValue(shardId, out var regionRef)) + { + if (Self.Equals(regionRef)) + GetShard(shardId); + } else { - var shardId = _extractShardId(message); - if (_regionByShard.TryGetValue(shardId, out var region)) + if (!_shardBuffers.Contains(shardId)) { - if (region.Equals(Self)) - { - var sref = GetShard(shardId); - if (Equals(sref, ActorRefs.Nobody)) - BufferMessage(shardId, message, sender); - else - { - if (_shardBuffers.Contains(shardId)) - { - // Since now messages to a shard is buffered then those messages must be in right order - BufferMessage(shardId, message, sender); - DeliverBufferedMessages(shardId, sref); - } - else - sref.Tell(message, sender); - } - } - else - { - if (_verboseDebug) - _log.Debug("{0}: Forwarding request for shard [{1}] to [{2}]", _typeName, shardId, region); - region.Tell(message, sender); - } + _log.Debug("{0}: Request shard [{1}] home. Coordinator [{2}]", _typeName, shardId, _coordinator); + _coordinator?.Tell(new ShardCoordinator.GetShardHome(shardId)); } - else - { - if (string.IsNullOrEmpty(shardId)) - { - _log.Warning("{0}: Shard must not be empty, dropping message [{1}]", _typeName, message.GetType().Name); - Context.System.DeadLetters.Tell(message); - } - else - { - if (!_shardBuffers.Contains(shardId)) - { - _log.Debug("{0}: Request shard [{1}] home. Coordinator [{2}]", _typeName, shardId, _coordinator); - _coordinator?.Tell(new ShardCoordinator.GetShardHome(shardId)); - } - BufferMessage(shardId, message, sender); - } - } + var buffer = _shardBuffers.GetOrEmpty(shardId); + + _log.Debug("{0}: Buffer message for shard [{1}]. Total [{2}] buffered messages.", + _typeName, + shardId, + buffer.Count + 1); + _shardBuffers.Append(shardId, restart, sender); } } @@ -828,7 +820,10 @@ private void BufferMessage(ShardId shardId, Msg message, IActorRef sender) { const string logMsg = "{0}: ShardRegion is using [{1} %] of its buffer capacity."; if (total > bufferSize / 2) - _log.Warning(logMsg + " The coordinator might not be available. You might want to check cluster membership status.", _typeName, 100 * total / bufferSize); + _log.Warning( + logMsg + + " The coordinator might not be available. You might want to check cluster membership status.", + _typeName, 100 * total / bufferSize); else _log.Info(logMsg, _typeName, 100 * total / bufferSize); } @@ -868,6 +863,7 @@ private void HandleShardRegionCommand(IShardRegionCommand command) Register(); ScheduleNextRegistration(); } + break; case GracefulShutdown _: @@ -877,10 +873,12 @@ private void HandleShardRegionCommand(IShardRegionCommand command) if (coordShutdown.ShutdownReason != null) { // use a shorter timeout than the coordinated shutdown phase to be able to log better reason for the timeout - var timeout = coordShutdown.Timeout(CoordinatedShutdown.PhaseClusterShardingShutdownRegion) - TimeSpan.FromSeconds(1); + var timeout = coordShutdown.Timeout(CoordinatedShutdown.PhaseClusterShardingShutdownRegion) - + TimeSpan.FromSeconds(1); if (timeout > TimeSpan.Zero) { - Timers.StartSingleTimer(GracefulShutdownTimeout.Instance, GracefulShutdownTimeout.Instance, timeout); + Timers.StartSingleTimer(GracefulShutdownTimeout.Instance, GracefulShutdownTimeout.Instance, + timeout); } } @@ -952,7 +950,8 @@ Address GetNodeAddress(IActorRef shardOrRegionRef) try { var entityId = getEntityLocation.EntityId; - var shardId = _extractShardId(new StartEntity(getEntityLocation.EntityId)); + var shardId = _messageExtractor.ShardId(getEntityLocation.EntityId, + new StartEntity(getEntityLocation.EntityId)); if (string.IsNullOrEmpty(shardId)) { // unsupported entityId - could only happen in highly customized extractors @@ -960,7 +959,7 @@ Address GetNodeAddress(IActorRef shardOrRegionRef) Option.None)); return; } - + async Task ResolveEntityRef(Address destinationAddress, ActorPath entityPath) { // now we just need to check to see if an entity ref exists @@ -993,15 +992,17 @@ async Task ResolveEntityRef(Address destinationAddress, ActorPath entityPath) // NOTE: in the event that we're querying a shard's location from a ShardRegionProxy // the shard may not be technically "homed" inside the proxy, but it does exist. #pragma warning disable CS4014 - ResolveEntityRef(GetNodeAddress(shardRegionRef), shardRegionRef.Path / shardId / entityId); // needs to run as a detached task + ResolveEntityRef(GetNodeAddress(shardRegionRef), + shardRegionRef.Path / shardId / entityId); // needs to run as a detached task #pragma warning restore CS4014 } return; } - + #pragma warning disable CS4014 - ResolveEntityRef(GetNodeAddress(shardActorRef), shardActorRef.Path / entityId); // needs to run as a detached task + ResolveEntityRef(GetNodeAddress(shardActorRef), + shardActorRef.Path / entityId); // needs to run as a detached task #pragma warning restore CS4014 } catch (Exception ex) @@ -1022,7 +1023,8 @@ private void ReplyToRegionStateQuery(IActorRef sender) .ContinueWith(qr => { return new CurrentShardRegionState( - qr.Result.Responses.Select(state => new ShardState(state.ShardId, state.EntityIds)).ToImmutableHashSet(), + qr.Result.Responses.Select(state => new ShardState(state.ShardId, state.EntityIds)) + .ToImmutableHashSet(), qr.Result.Failed); }, TaskContinuationOptions.ExecuteSynchronously).PipeTo(sender); } @@ -1057,10 +1059,12 @@ private Task> QueryShardsAsync(object message) if (timeout == TimeSpan.Zero) { //simulate timeout - return Task.FromResult(new ShardsQueryResult(_shards.Keys.ToImmutableHashSet(), ImmutableList.Empty, _shards.Count, timeout)); + return Task.FromResult(new ShardsQueryResult(_shards.Keys.ToImmutableHashSet(), + ImmutableList.Empty, _shards.Count, timeout)); } - var tasks = _shards.Select(entity => (Entity: entity.Key, Task: entity.Value.Ask(message, timeout))).ToImmutableList(); + var tasks = _shards.Select(entity => (Entity: entity.Key, Task: entity.Value.Ask(message, timeout))) + .ToImmutableList(); return Task.WhenAll(tasks.Select(i => i.Task)).ContinueWith(_ => { var qr = ShardsQueryResult.Create(tasks, _shards.Count, timeout); @@ -1075,7 +1079,7 @@ private void TryCompleteGracefulShutdownIfInProgress() if (_gracefulShutdownInProgress && _shards.Count == 0 && _shardBuffers.IsEmpty) { _log.Debug("{0}: Completed graceful shutdown of region.", _typeName); - Context.Stop(Self); // all shards have been rebalanced, complete graceful shutdown + Context.Stop(Self); // all shards have been rebalanced, complete graceful shutdown } } @@ -1084,7 +1088,8 @@ private void SendGracefulShutdownToCoordinatorIfInProgress() if (_gracefulShutdownInProgress) { var actorSelections = CoordinatorSelection; - _log.Debug("{0}: Sending graceful shutdown to [{1}]", _typeName, string.Join(", ", actorSelections.Select(i => $"({i})"))); + _log.Debug("{0}: Sending graceful shutdown to [{1}]", _typeName, + string.Join(", ", actorSelections.Select(i => $"({i})"))); actorSelections.ForEach(c => c.Tell(new ShardCoordinator.GracefulShutdownRequest(Self))); } } @@ -1094,29 +1099,30 @@ private void HandleCoordinatorMessage(ShardCoordinator.ICoordinatorMessage messa switch (message) { case ShardCoordinator.HostShard hs: + { + if (_gracefulShutdownInProgress) { - if (_gracefulShutdownInProgress) - { - _log.Debug("{0}: Ignoring Host Shard request for [{1}] as region is shutting down", _typeName, hs.Shard); + _log.Debug("{0}: Ignoring Host Shard request for [{1}] as region is shutting down", _typeName, + hs.Shard); - // if the coordinator is sending HostShard to a region that is shutting down - // it means that it missed the shutting down message (coordinator moved?) - // we want to inform it as soon as possible so it doesn't keep trying to allocate the shard here - SendGracefulShutdownToCoordinatorIfInProgress(); - } - else - { - var shard = hs.Shard; - _log.Debug("{0}: Host shard [{1}]", _typeName, shard); - _regionByShard = _regionByShard.SetItem(shard, Self); - UpdateRegionShards(Self, shard); + // if the coordinator is sending HostShard to a region that is shutting down + // it means that it missed the shutting down message (coordinator moved?) + // we want to inform it as soon as possible so it doesn't keep trying to allocate the shard here + SendGracefulShutdownToCoordinatorIfInProgress(); + } + else + { + var shard = hs.Shard; + _log.Debug("{0}: Host shard [{1}]", _typeName, shard); + _regionByShard = _regionByShard.SetItem(shard, Self); + UpdateRegionShards(Self, shard); - // Start the shard, if already started this does nothing - GetShard(shard); + // Start the shard, if already started this does nothing + GetShard(shard); - Sender.Tell(new ShardCoordinator.ShardStarted(shard)); - } + Sender.Tell(new ShardCoordinator.ShardStarted(shard)); } + } break; case ShardCoordinator.ShardHome home: _log.Debug("{0}: Shard [{1}] located at [{2}]", _typeName, home.Shard, home.Ref); @@ -1126,7 +1132,8 @@ private void HandleCoordinatorMessage(ShardCoordinator.ICoordinatorMessage messa if (region.Equals(Self) && !home.Ref.Equals(Self)) { // should not happen, inconsistency between ShardRegion and ShardCoordinator - throw new IllegalStateException($"{_typeName}: Unexpected change of shard [{home.Shard}] from self to [{home.Ref}]"); + throw new IllegalStateException( + $"{_typeName}: Unexpected change of shard [{home.Shard}] from self to [{home.Ref}]"); } } @@ -1144,6 +1151,7 @@ private void HandleCoordinatorMessage(ShardCoordinator.ICoordinatorMessage messa } else DeliverBufferedMessages(home.Shard, home.Ref); + break; case ShardCoordinator.RegisterAck ra: _coordinator = ra.Coordinator; @@ -1152,54 +1160,56 @@ private void HandleCoordinatorMessage(ShardCoordinator.ICoordinatorMessage messa TryRequestShardBufferHomes(); break; case ShardCoordinator.BeginHandOff bho: + { + var shard = bho.Shard; + _log.Debug("{0}: BeginHandOff shard [{1}]", _typeName, shard); + if (_regionByShard.TryGetValue(shard, out var regionRef)) { - var shard = bho.Shard; - _log.Debug("{0}: BeginHandOff shard [{1}]", _typeName, shard); - if (_regionByShard.TryGetValue(shard, out var regionRef)) - { - if (!_regions.TryGetValue(regionRef, out var updatedShards)) - updatedShards = ImmutableHashSet.Empty; - - updatedShards = updatedShards.Remove(shard); + if (!_regions.TryGetValue(regionRef, out var updatedShards)) + updatedShards = ImmutableHashSet.Empty; - _regions = updatedShards.Count == 0 - ? _regions.Remove(regionRef) - : _regions.SetItem(regionRef, updatedShards); + updatedShards = updatedShards.Remove(shard); - _regionByShard = _regionByShard.Remove(shard); - } + _regions = updatedShards.Count == 0 + ? _regions.Remove(regionRef) + : _regions.SetItem(regionRef, updatedShards); - Sender.Tell(new ShardCoordinator.BeginHandOffAck(shard)); + _regionByShard = _regionByShard.Remove(shard); } + + Sender.Tell(new ShardCoordinator.BeginHandOffAck(shard)); + } break; case ShardCoordinator.HandOff ho: - { - var shard = ho.Shard; - _log.Debug("{0}: HandOff shard [{1}]", _typeName, shard); + { + var shard = ho.Shard; + _log.Debug("{0}: HandOff shard [{1}]", _typeName, shard); - // must drop requests that came in between the BeginHandOff and now, - // because they might be forwarded from other regions and there - // is a risk or message re-ordering otherwise - if (_shardBuffers.Contains(shard)) - { - var dropped = _shardBuffers - .Drop(shard, "Avoiding reordering of buffered messages at shard handoff", Context.System.DeadLetters); - if (dropped > 0) - _log.Warning("{0}: Dropping [{1}] buffered messages to shard [{2}] during hand off to avoid re-ordering", - _typeName, - dropped, - shard); - _loggedFullBufferWarning = false; - } + // must drop requests that came in between the BeginHandOff and now, + // because they might be forwarded from other regions and there + // is a risk or message re-ordering otherwise + if (_shardBuffers.Contains(shard)) + { + var dropped = _shardBuffers + .Drop(shard, "Avoiding reordering of buffered messages at shard handoff", + Context.System.DeadLetters); + if (dropped > 0) + _log.Warning( + "{0}: Dropping [{1}] buffered messages to shard [{2}] during hand off to avoid re-ordering", + _typeName, + dropped, + shard); + _loggedFullBufferWarning = false; + } - if (_shards.TryGetValue(shard, out var actorRef)) - { - _handingOff = _handingOff.Add(actorRef); - actorRef.Forward(message); - } - else - Sender.Tell(new ShardCoordinator.ShardStopped(shard)); + if (_shards.TryGetValue(shard, out var actorRef)) + { + _handingOff = _handingOff.Add(actorRef); + actorRef.Forward(message); } + else + Sender.Tell(new ShardCoordinator.ShardStopped(shard)); + } break; default: Unhandled(message); @@ -1224,7 +1234,8 @@ private void TryRequestShardBufferHomes() { foreach (var buffer in _shardBuffers) { - _log.Debug("{0}: Requesting shard home for [{1}] from coordinator at [{2}]. [{3}] buffered messages.", + _log.Debug( + "{0}: Requesting shard home for [{1}] from coordinator at [{2}]. [{3}] buffered messages.", _typeName, buffer.Key, _coordinator, @@ -1236,12 +1247,13 @@ private void TryRequestShardBufferHomes() if (_retryCount >= RetryCountThreshold && _retryCount % RetryCountThreshold == 0 && _log.IsWarningEnabled) { - _log.Warning("{0}: Requested shard homes [{1}] from coordinator at [{2}]. [{3}] total buffered messages.", + _log.Warning( + "{0}: Requested shard homes [{1}] from coordinator at [{2}]. [{3}] total buffered messages.", _typeName, string.Join(", ", _shardBuffers.Select(i => i.Key).OrderBy(i => i)), _coordinator, _shardBuffers.TotalCount - ); + ); } } @@ -1256,7 +1268,8 @@ private void DeliverBufferedMessages(ShardId shardId, IActorRef receiver) { if (Message is RestartShard && !receiver.Equals(Self)) { - _log.Debug("{0}: Dropping buffered message {1}, these are only processed by a local ShardRegion.", + _log.Debug( + "{0}: Dropping buffered message {1}, these are only processed by a local ShardRegion.", _typeName, Message); } @@ -1278,40 +1291,34 @@ private IActorRef GetShard(ShardId id) if (_startingShards.Contains(id)) return ActorRefs.Nobody; - if (!_shards.TryGetValue(id, out var shard)) - { - if (_entityProps == null) - throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion"); - - if (_shardsByRef.Values.All(shardId => shardId != id)) - { - _log.Debug("{0}: Starting shard [{1}] in region", _typeName, id); + if (_shards.TryGetValue(id, out var shard)) return shard ?? ActorRefs.Nobody; + if (_entityProps == null) + throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion"); - var name = Uri.EscapeDataString(id); - var shardRef = Context.Watch(Context.ActorOf(Shard.Props( - _typeName, - id, - _entityProps, - _settings, - _extractEntityId, - _extractShardId, - _handOffStopMessage, - _rememberEntitiesProvider) - .WithDispatcher(Context.Props.Dispatcher), name)); - - _shardsByRef = _shardsByRef.SetItem(shardRef, id); - _shards = _shards.SetItem(id, shardRef); - _startingShards = _startingShards.Add(id); - return shardRef; - } - } + if (_shardsByRef.Values.Any(shardId => shardId == id)) return shard ?? ActorRefs.Nobody; + _log.Debug("{0}: Starting shard [{1}] in region", _typeName, id); - return shard ?? ActorRefs.Nobody; + var name = Uri.EscapeDataString(id); + var shardRef = Context.Watch(Context.ActorOf(Shard.Props( + _typeName, + id, + _entityProps, + _settings, + _messageExtractor, + _handOffStopMessage, + _rememberEntitiesProvider) + .WithDispatcher(Context.Props.Dispatcher), name)); + + _shardsByRef = _shardsByRef.SetItem(shardRef, id); + _shards = _shards.SetItem(id, shardRef); + _startingShards = _startingShards.Add(id); + return shardRef; } private void HandleClusterState(ClusterEvent.CurrentClusterState state) { - var members = ImmutableSortedSet.Empty.WithComparer(Member.AgeOrdering).Union(state.Members.Where(m => MemberStatusOfInterest.Contains(m.Status) && MatchingRole(m))); + var members = ImmutableSortedSet.Empty.WithComparer(Member.AgeOrdering) + .Union(state.Members.Where(m => MemberStatusOfInterest.Contains(m.Status) && MatchingRole(m))); ChangeMembers(members); } @@ -1330,13 +1337,13 @@ private void HandleClusterEvent(ClusterEvent.IClusterDomainEvent e) break; case ClusterEvent.MemberRemoved mr: - { - var m = mr.Member; - if (m.UniqueAddress == _cluster.SelfUniqueAddress) - Context.Stop(Self); - else if (MatchingRole(m)) - ChangeMembers(_membersByAge.Remove(m)); - } + { + var m = mr.Member; + if (m.UniqueAddress == _cluster.SelfUniqueAddress) + Context.Stop(Self); + else if (MatchingRole(m)) + ChangeMembers(_membersByAge.Remove(m)); + } break; case ClusterEvent.MemberDowned md: @@ -1345,6 +1352,7 @@ private void HandleClusterEvent(ClusterEvent.IClusterDomainEvent e) _log.Info("{0}: Self downed, stopping ShardRegion [{1}]", _typeName, Self.Path); Context.Stop(Self); } + break; case ClusterEvent.IMemberEvent _: // these are expected, no need to warn about them @@ -1379,11 +1387,11 @@ private void HandleTerminated(Terminated terminated) if (_log.IsDebugEnabled) if (_verboseDebug) _log.Debug( - "{0}: Region [{1}] terminated with [{2}] shards [{3}]", - _typeName, - terminated.ActorRef, - shards.Count, - string.Join(", ", shards)); + "{0}: Region [{1}] terminated with [{2}] shards [{3}]", + _typeName, + terminated.ActorRef, + shards.Count, + string.Join(", ", shards)); else _log.Debug("{0}: Region [{1}] terminated with [{2}] shards", _typeName, terminated.ActorRef, shards.Count); @@ -1403,7 +1411,8 @@ private void HandleTerminated(Terminated terminated) // if persist fails it will stop _log.Debug("{0}: Shard [{1}] terminated while not being handed off", _typeName, shard); if (_settings.RememberEntities) - Context.System.Scheduler.ScheduleTellOnce(_settings.TuningParameters.ShardFailureBackoff, Self, new RestartShard(shard), Self); + Context.System.Scheduler.ScheduleTellOnce(_settings.TuningParameters.ShardFailureBackoff, Self, + new RestartShard(shard), Self); } // did this shard get removed because the ShardRegion is shutting down? @@ -1412,4 +1421,4 @@ private void HandleTerminated(Terminated terminated) } } } -} +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs index 0de178d708c..1a46a6c9b80 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardedDaemonProcess.cs @@ -70,7 +70,10 @@ public MessageExtractor(int maxNumberOfShards) public override string EntityId(object message) => (message as ShardingEnvelope)?.EntityId; public override object EntityMessage(object message) => (message as ShardingEnvelope)?.Message; - public override string ShardId(object message) => message is ShardRegion.StartEntity se ? se.EntityId : EntityId(message); + public override string ShardId(string entityId, object messageHint = null) + { + return entityId; + } } /// diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt index 2765e2cef3c..7d4085f72cf 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt @@ -7,6 +7,7 @@ [assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETCoreApp,Version=v6.0", FrameworkDisplayName=".NET 6.0")] namespace Akka.Cluster.Sharding { + [System.Runtime.CompilerServices.NullableAttribute(0)] public class ClusterSharding : Akka.Actor.IExtension { public ClusterSharding(Akka.Actor.ExtendedActorSystem system) { } @@ -38,7 +39,10 @@ namespace Akka.Cluster.Sharding public System.Threading.Tasks.Task StartProxyAsync(string typeName, string role, Akka.Cluster.Sharding.ExtractEntityId extractEntityId, Akka.Cluster.Sharding.ExtractShardId extractShardId) { } public System.Threading.Tasks.Task StartProxyAsync(string typeName, string role, Akka.Cluster.Sharding.IMessageExtractor messageExtractor) { } } - public class ClusterShardingExtensionProvider : Akka.Actor.ExtensionIdProvider + [System.Runtime.CompilerServices.NullableAttribute(new byte[] { + 0, + 1})] + public sealed class ClusterShardingExtensionProvider : Akka.Actor.ExtensionIdProvider { public ClusterShardingExtensionProvider() { } public override Akka.Cluster.Sharding.ClusterSharding CreateExtension(Akka.Actor.ExtendedActorSystem system) { } @@ -113,7 +117,14 @@ namespace Akka.Cluster.Sharding { public static System.Collections.Generic.IEnumerable> Grouped(this System.Collections.Generic.IEnumerable items, int size) { } } + [System.ObsoleteAttribute("Use HashCodeMessageExtractor or IMessageExtractor instead.")] + [return: System.Runtime.CompilerServices.NullableAttribute(new byte[] { + 0, + 0, + 1, + 1})] public delegate Akka.Util.Option> ExtractEntityId(object message); + [System.ObsoleteAttribute("Use HashCodeMessageExtractor or IMessageExtractor instead.")] public delegate string ExtractShardId(object message); public sealed class GetClusterShardingStats : Akka.Cluster.Sharding.IClusterShardingSerializable, Akka.Cluster.Sharding.IShardRegionQuery, System.IEquatable { @@ -149,14 +160,20 @@ namespace Akka.Cluster.Sharding { public static readonly Akka.Cluster.Sharding.GracefulShutdown Instance; } + [System.Runtime.CompilerServices.NullableAttribute(0)] public abstract class HashCodeMessageExtractor : Akka.Cluster.Sharding.IMessageExtractor { public readonly int MaxNumberOfShards; protected HashCodeMessageExtractor(int maxNumberOfShards) { } - public static Akka.Cluster.Sharding.HashCodeMessageExtractor Create(int maxNumberOfShards, System.Func entityIdExtractor, System.Func messageExtractor = null) { } + public static Akka.Cluster.Sharding.HashCodeMessageExtractor Create(int maxNumberOfShards, System.Func entityIdExtractor, [System.Runtime.CompilerServices.NullableAttribute(new byte[] { + 2, + 1, + 1})] System.Func messageExtractor = null) { } public abstract string EntityId(object message); public virtual object EntityMessage(object message) { } + [System.ObsoleteAttribute("Use ShardId(string, object?) instead")] public virtual string ShardId(object message) { } + public virtual string ShardId(string entityId, [System.Runtime.CompilerServices.NullableAttribute(2)] object messageHint = null) { } } public interface IActorSystemDependentAllocationStrategy : Akka.Actor.INoSerializationVerificationNeeded, Akka.Cluster.Sharding.IShardAllocationStrategy { @@ -165,9 +182,14 @@ namespace Akka.Cluster.Sharding public interface IClusterShardingSerializable { } public interface IMessageExtractor { + [return: System.Runtime.CompilerServices.NullableAttribute(2)] string EntityId(object message); + [return: System.Runtime.CompilerServices.NullableAttribute(2)] object EntityMessage(object message); + [System.ObsoleteAttribute("Use ShardId(EntityId, object) instead.")] + [return: System.Runtime.CompilerServices.NullableAttribute(2)] string ShardId(object message); + string ShardId(string entityId, [System.Runtime.CompilerServices.NullableAttribute(2)] object messageHint = null); } public interface IShardAllocationStrategy : Akka.Actor.INoSerializationVerificationNeeded { @@ -203,13 +225,18 @@ namespace Akka.Cluster.Sharding public static Akka.Cluster.Sharding.IShardAllocationStrategy LeastShardAllocationStrategy(int absoluteLimit, double relativeLimit) { } } [Akka.Annotations.InternalStableApiAttribute()] + [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class ShardRegion : Akka.Actor.ActorBase, Akka.Actor.IWithTimers { - public ShardRegion(string typeName, System.Func entityProps, Akka.Cluster.Sharding.ClusterShardingSettings settings, string coordinatorPath, Akka.Cluster.Sharding.ExtractEntityId extractEntityId, Akka.Cluster.Sharding.ExtractShardId extractShardId, object handOffStopMessage, Akka.Cluster.Sharding.Internal.IRememberEntitiesProvider rememberEntitiesProvider) { } + public ShardRegion(string typeName, [System.Runtime.CompilerServices.NullableAttribute(new byte[] { + 2, + 1, + 1})] System.Func entityProps, Akka.Cluster.Sharding.ClusterShardingSettings settings, string coordinatorPath, Akka.Cluster.Sharding.IMessageExtractor messageExtractor, object handOffStopMessage, [System.Runtime.CompilerServices.NullableAttribute(2)] Akka.Cluster.Sharding.Internal.IRememberEntitiesProvider rememberEntitiesProvider) { } public Akka.Actor.ITimerScheduler Timers { get; set; } protected override void PostStop() { } protected override void PreStart() { } protected override bool Receive(object message) { } + [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class StartEntity : Akka.Cluster.Sharding.IClusterShardingSerializable, System.IEquatable { public readonly string EntityId; diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt index 5509ef03982..fbdae31b6b2 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt @@ -7,6 +7,7 @@ [assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETStandard,Version=v2.0", FrameworkDisplayName=".NET Standard 2.0")] namespace Akka.Cluster.Sharding { + [System.Runtime.CompilerServices.NullableAttribute(0)] public class ClusterSharding : Akka.Actor.IExtension { public ClusterSharding(Akka.Actor.ExtendedActorSystem system) { } @@ -38,7 +39,10 @@ namespace Akka.Cluster.Sharding public System.Threading.Tasks.Task StartProxyAsync(string typeName, string role, Akka.Cluster.Sharding.ExtractEntityId extractEntityId, Akka.Cluster.Sharding.ExtractShardId extractShardId) { } public System.Threading.Tasks.Task StartProxyAsync(string typeName, string role, Akka.Cluster.Sharding.IMessageExtractor messageExtractor) { } } - public class ClusterShardingExtensionProvider : Akka.Actor.ExtensionIdProvider + [System.Runtime.CompilerServices.NullableAttribute(new byte[] { + 0, + 1})] + public sealed class ClusterShardingExtensionProvider : Akka.Actor.ExtensionIdProvider { public ClusterShardingExtensionProvider() { } public override Akka.Cluster.Sharding.ClusterSharding CreateExtension(Akka.Actor.ExtendedActorSystem system) { } @@ -113,7 +117,14 @@ namespace Akka.Cluster.Sharding { public static System.Collections.Generic.IEnumerable> Grouped(this System.Collections.Generic.IEnumerable items, int size) { } } + [System.ObsoleteAttribute("Use HashCodeMessageExtractor or IMessageExtractor instead.")] + [return: System.Runtime.CompilerServices.NullableAttribute(new byte[] { + 0, + 0, + 1, + 1})] public delegate Akka.Util.Option> ExtractEntityId(object message); + [System.ObsoleteAttribute("Use HashCodeMessageExtractor or IMessageExtractor instead.")] public delegate string ExtractShardId(object message); public sealed class GetClusterShardingStats : Akka.Cluster.Sharding.IClusterShardingSerializable, Akka.Cluster.Sharding.IShardRegionQuery, System.IEquatable { @@ -149,14 +160,20 @@ namespace Akka.Cluster.Sharding { public static readonly Akka.Cluster.Sharding.GracefulShutdown Instance; } + [System.Runtime.CompilerServices.NullableAttribute(0)] public abstract class HashCodeMessageExtractor : Akka.Cluster.Sharding.IMessageExtractor { public readonly int MaxNumberOfShards; protected HashCodeMessageExtractor(int maxNumberOfShards) { } - public static Akka.Cluster.Sharding.HashCodeMessageExtractor Create(int maxNumberOfShards, System.Func entityIdExtractor, System.Func messageExtractor = null) { } + public static Akka.Cluster.Sharding.HashCodeMessageExtractor Create(int maxNumberOfShards, System.Func entityIdExtractor, [System.Runtime.CompilerServices.NullableAttribute(new byte[] { + 2, + 1, + 1})] System.Func messageExtractor = null) { } public abstract string EntityId(object message); public virtual object EntityMessage(object message) { } + [System.ObsoleteAttribute("Use ShardId(string, object?) instead")] public virtual string ShardId(object message) { } + public virtual string ShardId(string entityId, [System.Runtime.CompilerServices.NullableAttribute(2)] object messageHint = null) { } } public interface IActorSystemDependentAllocationStrategy : Akka.Actor.INoSerializationVerificationNeeded, Akka.Cluster.Sharding.IShardAllocationStrategy { @@ -165,9 +182,14 @@ namespace Akka.Cluster.Sharding public interface IClusterShardingSerializable { } public interface IMessageExtractor { + [return: System.Runtime.CompilerServices.NullableAttribute(2)] string EntityId(object message); + [return: System.Runtime.CompilerServices.NullableAttribute(2)] object EntityMessage(object message); + [System.ObsoleteAttribute("Use ShardId(EntityId, object) instead.")] + [return: System.Runtime.CompilerServices.NullableAttribute(2)] string ShardId(object message); + string ShardId(string entityId, [System.Runtime.CompilerServices.NullableAttribute(2)] object messageHint = null); } public interface IShardAllocationStrategy : Akka.Actor.INoSerializationVerificationNeeded { @@ -203,13 +225,18 @@ namespace Akka.Cluster.Sharding public static Akka.Cluster.Sharding.IShardAllocationStrategy LeastShardAllocationStrategy(int absoluteLimit, double relativeLimit) { } } [Akka.Annotations.InternalStableApiAttribute()] + [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class ShardRegion : Akka.Actor.ActorBase, Akka.Actor.IWithTimers { - public ShardRegion(string typeName, System.Func entityProps, Akka.Cluster.Sharding.ClusterShardingSettings settings, string coordinatorPath, Akka.Cluster.Sharding.ExtractEntityId extractEntityId, Akka.Cluster.Sharding.ExtractShardId extractShardId, object handOffStopMessage, Akka.Cluster.Sharding.Internal.IRememberEntitiesProvider rememberEntitiesProvider) { } + public ShardRegion(string typeName, [System.Runtime.CompilerServices.NullableAttribute(new byte[] { + 2, + 1, + 1})] System.Func entityProps, Akka.Cluster.Sharding.ClusterShardingSettings settings, string coordinatorPath, Akka.Cluster.Sharding.IMessageExtractor messageExtractor, object handOffStopMessage, [System.Runtime.CompilerServices.NullableAttribute(2)] Akka.Cluster.Sharding.Internal.IRememberEntitiesProvider rememberEntitiesProvider) { } public Akka.Actor.ITimerScheduler Timers { get; set; } protected override void PostStop() { } protected override void PreStart() { } protected override bool Receive(object message) { } + [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class StartEntity : Akka.Cluster.Sharding.IClusterShardingSerializable, System.IEquatable { public readonly string EntityId;