diff --git a/src/Akka.Cluster.Hosting.Tests/ShardedDaemonProcessProxySpecs.cs b/src/Akka.Cluster.Hosting.Tests/ShardedDaemonProcessProxySpecs.cs new file mode 100644 index 0000000..0231b09 --- /dev/null +++ b/src/Akka.Cluster.Hosting.Tests/ShardedDaemonProcessProxySpecs.cs @@ -0,0 +1,150 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Cluster.Sharding; +using Akka.Hosting; +using Akka.Remote.Hosting; +using FluentAssertions; +using FluentAssertions.Extensions; +using Microsoft.Extensions.DependencyInjection; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Cluster.Hosting.Tests; + +public class ShardedDaemonProcessProxySpecs: Akka.Hosting.TestKit.TestKit +{ + private class EchoActor : ReceiveActor + { + public static Props EchoProps(int i) => Props.Create(() => new EchoActor()); + + public EchoActor() + { + ReceiveAny(msg => Sender.Tell(msg)); + } + } + + internal enum ShardedDaemonRouter { } + + public const int NumWorkers = 10; + public const string Name = "daemonTest"; + public const string Role = "workers"; + + protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider) + { + builder + .WithRemoting(new RemoteOptions + { + Port = 0 + }) + .WithClustering(new ClusterOptions + { + Roles = new[]{ Role } + }) + .WithShardedDaemonProcess( + name: Name, + numberOfInstances: NumWorkers, + entityPropsFactory: (_, _, _) => EchoActor.EchoProps, + options: new ClusterDaemonOptions + { + KeepAliveInterval = 500.Milliseconds(), + Role = Role, + HandoffStopMessage = PoisonPill.Instance + }) + .AddStartup((system, _) => + { + var cluster = Cluster.Get(system); + cluster.Join(cluster.SelfAddress); + }); + } + + public ShardedDaemonProcessProxySpecs(ITestOutputHelper output) : base(nameof(ShardedDaemonProcessProxySpecs), output) + { } + + [Fact] + public async Task ShardedDaemonProcessProxy_must_start_daemon_process_on_proxy() + { + // validate that we have a cluster + await AwaitAssertAsync(() => + { + Cluster.Get(Sys).State.Members.Count(x => x.Status == MemberStatus.Up).Should().Be(1); + }); + + // + var host = await Host.Services.GetRequiredService().GetAsync(); + + // ping some of the workers via the host + for(var i = 0; i < NumWorkers; i++) + { + var result = await host.Ask(i); + result.Should().Be(i); + } + // + + // + // start the proxy on the proxy system, which runs on a different role not capable of hosting workers + ProxySystem? proxySystem = null; + try + { + proxySystem = new ProxySystem(Output, Sys); + await proxySystem.InitializeAsync(); + + // validate that we have a 2 node cluster with both members marked as up + await AwaitAssertAsync(() => + { + Cluster.Get(Sys).State.Members.Count(x => x.Status == MemberStatus.Up).Should().Be(2); + Cluster.Get(proxySystem.Sys).State.Members.Count(x => x.Status == MemberStatus.Up).Should().Be(2); + }); + + var proxyRouter = await proxySystem.Host.Services + .GetRequiredService().GetAsync(); + + // ping some of the workers via the proxy + for(var i = 0; i < NumWorkers; i++) + { + var result = await proxyRouter.Ask(i); + result.Should().Be(i); + } + } + finally + { + proxySystem?.DisposeAsync(); + } + // + } + +} + +public class ProxySystem: Akka.Hosting.TestKit.TestKit +{ + private readonly Cluster _remoteCluster; + + public ProxySystem(ITestOutputHelper? output, ActorSystem remoteSystem) + : base(nameof(ShardedDaemonProcessProxySpecs), output) + { + _remoteCluster = Cluster.Get(remoteSystem); + } + + protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider) + { + builder + .WithRemoting(new RemoteOptions + { + Port = 0 + }) + .WithClustering(new ClusterOptions + { + Roles = new[]{ "proxy" } + }) + .WithShardedDaemonProcessProxy( + name: ShardedDaemonProcessProxySpecs.Name, + numberOfInstances: ShardedDaemonProcessProxySpecs.NumWorkers, + role: ShardedDaemonProcessProxySpecs.Role) + .AddStartup((system, _) => + { + var cluster = Cluster.Get(system); + cluster.Join(_remoteCluster.SelfAddress); + }); + } +} diff --git a/src/Akka.Cluster.Hosting.Tests/ShardedDaemonProcessSpecs.cs b/src/Akka.Cluster.Hosting.Tests/ShardedDaemonProcessSpecs.cs new file mode 100644 index 0000000..8456798 --- /dev/null +++ b/src/Akka.Cluster.Hosting.Tests/ShardedDaemonProcessSpecs.cs @@ -0,0 +1,195 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Security.Cryptography; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Cluster.Sharding; +using Akka.Event; +using Akka.Hosting; +using Akka.Remote.Hosting; +using Akka.TestKit; +using FluentAssertions; +using FluentAssertions.Extensions; +using Microsoft.Extensions.DependencyInjection; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Cluster.Hosting.Tests; + +public class ShardedDaemonProcessSpecs: Akka.Hosting.TestKit.TestKit +{ + private sealed class Stop + { + public static Stop Instance { get; } = new(); + private Stop() { } + } + + internal sealed class Started + { + public int Id { get; } + public IActorRef SelfRef { get; } + + public Started(int id, IActorRef selfRef) + { + Id = id; + SelfRef = selfRef; + } + } + + internal class MyDaemonActor : UntypedActor + { + private readonly int _id; + private readonly IActorRef _probe; + private readonly ILoggingAdapter _log; + + public MyDaemonActor(int id, IRequiredActor probe) + { + _id = id; + _probe = probe.ActorRef; + _log = Context.GetLogger(); + } + + protected override void PreStart() + { + base.PreStart(); + _probe.Tell(new Started(_id, Context.Self)); + _log.Info("Actor {0} started", _id); + } + + protected override void PostStop() + { + base.PostStop(); + _log.Info("Actor {0} stopped", _id); + } + + protected override void OnReceive(object message) + { + switch (message) + { + case Stop: + Context.Stop(Self); + break; + default: + Unhandled(message); + break; + } + } + } + + internal enum ShardedDaemonRouter { } + + private Cluster _cluster = null!; + + public ShardedDaemonProcessSpecs(ITestOutputHelper output) : base(nameof(ShardedDaemonProcessSpecs), output) + { + } + + protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider) + { + builder + .WithRemoting(new RemoteOptions + { + Port = 0 + }) + .WithClustering() + .WithShardedDaemonProcess( + name: "test", + numberOfInstances: 5, + entityPropsFactory: (_, _, resolver) => id => resolver.Props(typeof(MyDaemonActor), id), + options: new ClusterDaemonOptions + { + KeepAliveInterval = 500.Milliseconds() + }) + .AddStartup((system, _) => + { + var cluster = Cluster.Get(system); + cluster.Join(cluster.SelfAddress); + }); + } + + protected override async Task BeforeTestStart() + { + _cluster = Cluster.Get(Sys); + + await AwaitAssertAsync(() => _cluster.SelfMember.Status.Should().Be(MemberStatus.Up), + TimeSpan.FromSeconds(3)); + } + + [Fact] + public async Task ShardedDaemonProcess_must_start_N_actors_with_unique_ids() + { + var started = new List(); + foreach (var _ in Enumerable.Range(0, 5)) + { + started.Add(await ExpectMsgAsync()); + } + + started.Count.Should().Be(5); + started.Select(s => s.Id).ToList().Should().BeEquivalentTo(new []{0, 1, 2, 3, 4}); + await ExpectNoMsgAsync(1.Seconds()); + } + + [Fact] + public async Task ShardedDaemonProcess_must_restart_actors_if_they_stop() + { + var startMessages = new List(); + foreach (var _ in Enumerable.Range(0, 5)) + { + startMessages.Add(await ExpectMsgAsync()); + } + + startMessages.Count.Should().Be(5); + startMessages.Select(s => s.Id).ToList().Should().BeEquivalentTo(new []{0, 1, 2, 3, 4}); + + // Stop all entities + foreach (var start in startMessages) + { + start.SelfRef.Tell(Stop.Instance); + } + + startMessages.Clear(); + // periodic ping every 1s makes it restart + foreach (var _ in Enumerable.Range(0, 5)) + { + startMessages.Add(await ExpectMsgAsync()); + } + + startMessages.Count.Should().Be(5); + startMessages.Select(s => s.Id).ToList().Should().BeEquivalentTo(new []{0, 1, 2, 3, 4}); + } +} + +public class ShardedDaemonProcessFailureSpecs : Akka.Hosting.TestKit.TestKit +{ + protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider) + { + builder + .WithRemoting() + .WithClustering() + .WithShardedDaemonProcess( + name: "test", + numberOfInstances: 5, + entityPropsFactory: (_, _, resolver) => id => resolver.Props(typeof(ShardedDaemonProcessSpecs.MyDaemonActor), id), + options: new ClusterDaemonOptions + { + KeepAliveInterval = 500.Milliseconds(), + Role = "DoNotExist" + }) + .AddStartup((system, _) => + { + var cluster = Cluster.Get(system); + cluster.Join(cluster.SelfAddress); + }); + } + + [Fact] + public async Task ShardedDaemonProcess_must_not_run_if_the_role_does_not_match_node_role() + { + var registry = Host.Services.GetRequiredService(); + registry.TryGet(out _).Should().BeFalse(); + + await ExpectNoMsgAsync(1.Seconds()); + } +} \ No newline at end of file diff --git a/src/Akka.Cluster.Hosting/AkkaClusterHostingExtensions.cs b/src/Akka.Cluster.Hosting/AkkaClusterHostingExtensions.cs index 0f89ff7..76f7499 100644 --- a/src/Akka.Cluster.Hosting/AkkaClusterHostingExtensions.cs +++ b/src/Akka.Cluster.Hosting/AkkaClusterHostingExtensions.cs @@ -13,6 +13,7 @@ using Akka.Configuration; using Akka.Coordination; using Akka.DependencyInjection; +using Akka.Event; using Akka.Hosting; using Akka.Hosting.Coordination; using Akka.Persistence.Hosting; @@ -1011,6 +1012,129 @@ public static AkkaConfigurationBuilder WithShardRegionProxy( registry.Register(shardRegionProxy); }); } + + /// + /// Starts a actor for the + /// given entity type and registers the + /// with in the for this + /// . + /// + /// + /// The builder instance being configured. + /// + /// + /// The name of the entity type + /// + /// + /// The number of actors the should instantiate during start-up + /// + /// + /// Function that, given an integer, returns the of the entity actors that will + /// be created by the . + /// + /// This function also accepts the and the as inputs. + /// + /// + /// The set of options for configuring + /// + /// + /// The type key to use to retrieve the for this . + /// + /// + /// The same instance originally passed in. + /// + public static AkkaConfigurationBuilder WithShardedDaemonProcess( + this AkkaConfigurationBuilder builder, + string name, + int numberOfInstances, + Func> entityPropsFactory, + ClusterDaemonOptions? options = null) + { + var config = options?.ToHocon(); + if (config != null) + builder.AddHocon(config, HoconAddMode.Prepend); + + builder + .AddHocon(ClusterSharding.DefaultConfig(), HoconAddMode.Append) + .AddHocon(ClusterSingletonManager.DefaultConfig(), HoconAddMode.Append) + .AddHocon(DistributedData.DistributedData.DefaultConfig(), HoconAddMode.Append); + + builder.WithActors((system, registry, resolver) => + { + var settings = ShardedDaemonProcessSettings.Create(system); + + if (options is not null) + { + if (!string.IsNullOrWhiteSpace(options.Role)) + settings = settings.WithRole(options.Role!); + if (options.ShardingSettings is not null) + settings = settings.WithShardingSettings(options.ShardingSettings); + if (options.KeepAliveInterval is not null) + settings = settings.WithKeepAliveInterval(options.KeepAliveInterval.Value); + } + + var props = entityPropsFactory(system, registry, resolver); + + var router = ShardedDaemonProcess.Get(system: system).Init( + name: name, + numberOfInstances: numberOfInstances, + propsFactory: props, + settings: settings, + stopMessage: options?.HandoffStopMessage); + + if(router is not null) + registry.Register(router); + }); + + return builder; + } + + /// + /// Starts a proxy actor that + /// points to a hosted on a different inside + /// the cluster and registers the with in the + /// for this . + /// + /// Note that the , , and + /// argument MUST match the target + /// for the proxy to work. + /// + /// + /// The builder instance being configured. + /// + /// + /// The name of the entity type + /// + /// + /// The number of actors the should instantiate during start-up + /// + /// + /// The role of the Akka.Cluster member that is hosting this . + /// + /// + /// The type key to use to retrieve the for this . + /// + /// + /// The same instance originally passed in. + /// + public static AkkaConfigurationBuilder WithShardedDaemonProcessProxy( + this AkkaConfigurationBuilder builder, + string name, + int numberOfInstances, + string role) + { + builder + .AddHocon(ClusterSharding.DefaultConfig(), HoconAddMode.Append) + .AddHocon(ClusterSingletonProxy.DefaultConfig(), HoconAddMode.Append) + .AddHocon(DistributedData.DistributedData.DefaultConfig(), HoconAddMode.Append) + .WithActors((system, registry) => + { + var proxyRouter = ShardedDaemonProcess.Get(system).InitProxy(name, numberOfInstances, role); + registry.Register(proxyRouter); + }); + + return builder; + } /// /// Starts on this node immediately upon startup. diff --git a/src/Akka.Cluster.Hosting/ClusterDaemonOptions.cs b/src/Akka.Cluster.Hosting/ClusterDaemonOptions.cs new file mode 100644 index 0000000..8afcb0e --- /dev/null +++ b/src/Akka.Cluster.Hosting/ClusterDaemonOptions.cs @@ -0,0 +1,21 @@ +using System; +using Akka.Cluster.Sharding; +using Akka.Configuration; +using Akka.Hosting; + +namespace Akka.Cluster.Hosting; + +public sealed class ClusterDaemonOptions +{ + public TimeSpan? KeepAliveInterval { get; set; } + public ClusterShardingSettings? ShardingSettings { get; set; } + public string? Role { get; set; } + public object? HandoffStopMessage { get; set; } + + internal Config? ToHocon() + { + return KeepAliveInterval is not null + ? $"akka.cluster.sharded-daemon-process.keep-alive-interval = {KeepAliveInterval.ToHocon()}" + : null; + } +} \ No newline at end of file diff --git a/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveCluster.verified.txt b/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveCluster.verified.txt index 602b08f..c2a7ca5 100644 --- a/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveCluster.verified.txt +++ b/src/Akka.Hosting.API.Tests/verify/CoreApiSpec.ApproveCluster.verified.txt @@ -27,10 +27,20 @@ namespace Akka.Cluster.Hosting [System.Obsolete("Use IMessageExtractor instead of the ExtractEntityId and ExtractShardId delegates" + ".")] public static Akka.Hosting.AkkaConfigurationBuilder WithShardRegionProxy(this Akka.Hosting.AkkaConfigurationBuilder builder, string typeName, string roleName, Akka.Cluster.Sharding.ExtractEntityId extractEntityId, Akka.Cluster.Sharding.ExtractShardId extractShardId) { } + public static Akka.Hosting.AkkaConfigurationBuilder WithShardedDaemonProcess(this Akka.Hosting.AkkaConfigurationBuilder builder, string name, int numberOfInstances, System.Func> entityPropsFactory, Akka.Cluster.Hosting.ClusterDaemonOptions? options = null) { } + public static Akka.Hosting.AkkaConfigurationBuilder WithShardedDaemonProcessProxy(this Akka.Hosting.AkkaConfigurationBuilder builder, string name, int numberOfInstances, string role) { } public static Akka.Hosting.AkkaConfigurationBuilder WithSingleton(this Akka.Hosting.AkkaConfigurationBuilder builder, string singletonName, Akka.Actor.Props actorProps, Akka.Cluster.Hosting.ClusterSingletonOptions? options = null, bool createProxyToo = true) { } public static Akka.Hosting.AkkaConfigurationBuilder WithSingleton(this Akka.Hosting.AkkaConfigurationBuilder builder, string singletonName, System.Func propsFactory, Akka.Cluster.Hosting.ClusterSingletonOptions? options = null, bool createProxyToo = true) { } public static Akka.Hosting.AkkaConfigurationBuilder WithSingletonProxy(this Akka.Hosting.AkkaConfigurationBuilder builder, string singletonName, Akka.Cluster.Hosting.ClusterSingletonOptions? options = null, string? singletonManagerPath = null) { } } + public sealed class ClusterDaemonOptions + { + public ClusterDaemonOptions() { } + public object? HandoffStopMessage { get; set; } + public System.TimeSpan? KeepAliveInterval { get; set; } + public string? Role { get; set; } + public Akka.Cluster.Sharding.ClusterShardingSettings? ShardingSettings { get; set; } + } public sealed class ClusterOptions { public ClusterOptions() { } diff --git a/src/Akka.Hosting.TestKit/TestKit.cs b/src/Akka.Hosting.TestKit/TestKit.cs index f52e29f..4f5a62e 100644 --- a/src/Akka.Hosting.TestKit/TestKit.cs +++ b/src/Akka.Hosting.TestKit/TestKit.cs @@ -102,20 +102,25 @@ private void InternalConfigureServices(HostBuilderContext context, IServiceColle }); } - ConfigureAkka(builder, provider); - - builder.AddStartup((system, _) => + builder.StartActors((system, registry) => { try { base.InitializeTest(system, (ActorSystemSetup)null!, null, null); - _initialized.SetResult(Done.Instance); + registry.Register(TestActor); } catch (Exception e) { _initialized.SetException(e); } }); + + ConfigureAkka(builder, provider); + + builder.AddStartup((_, _) => + { + _initialized.TrySetResult(Done.Instance); + }); }); } diff --git a/src/Directory.Build.props b/src/Directory.Build.props index f8e3b32..3f0194f 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -21,11 +21,11 @@ netstandard2.0 net6.0 - 2.7.1 + 2.8.1 17.10.0 6.0.2 - 2.5.8 - 1.5.22 + 2.8.1 + 1.5.24 [6.0.0,)