Skip to content

Commit

Permalink
Fix how ClusterShardingSettings were applied (#480)
Browse files Browse the repository at this point in the history
* Fix how ClusterShardingSettings were applied

* Add default cluster sharding API

* Update API Approver list

* Do not apply DistributedData options

* Update API Approval list

* Add unit test

* Update API Approval list
  • Loading branch information
Arkatufus authored Jul 29, 2024
1 parent 57c65bc commit 2e56b9f
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 21 deletions.
68 changes: 68 additions & 0 deletions src/Akka.Cluster.Hosting.Tests/ClusterShardingSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Hosting.SBR;
using Akka.Cluster.Hosting.Tests.Lease;
using Akka.Cluster.Sharding;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.Hosting;
using FluentAssertions;
using FluentAssertions.Extensions;
using Microsoft.Extensions.DependencyInjection;
using Xunit;
using Xunit.Abstractions;
Expand Down Expand Up @@ -110,4 +115,67 @@ public async Task Should_use_ActorRegistry_with_ShardRegion()
id.Should().Be("foo");
sourceRef.Should().Be(actorRegistry.Get<MyTopLevelActor>());
}

[Fact(DisplayName = "ShardOptions with different values should generate valid ClusterShardSettings")]
public void ShardOptionsTest()
{
var settings1 = ToSettings(new ShardOptions
{
RememberEntities = true,
StateStoreMode = StateStoreMode.Persistence,
RememberEntitiesStore = RememberEntitiesStore.Eventsourced,
Role = "first",
PassivateIdleEntityAfter = 1.Seconds(),
SnapshotPluginId = "firstSnapshot",
JournalPluginId = "firstJournal",
LeaseImplementation = new TestLeaseOption(),
LeaseRetryInterval = 2.Seconds(),
ShardRegionQueryTimeout = 3.Seconds(),
});
settings1.RememberEntities.Should().BeTrue();
settings1.StateStoreMode.Should().Be(StateStoreMode.Persistence);
settings1.RememberEntitiesStore.Should().Be(RememberEntitiesStore.Eventsourced);
settings1.Role.Should().Be("first");
settings1.PassivateIdleEntityAfter.Should().Be(1.Seconds());
settings1.SnapshotPluginId.Should().Be("firstSnapshot");
settings1.JournalPluginId.Should().Be("firstJournal");
settings1.LeaseSettings.LeaseImplementation.Should().Be("test-lease");
settings1.LeaseSettings.LeaseRetryInterval.Should().Be(2.Seconds());
settings1.ShardRegionQueryTimeout.Should().Be(3.Seconds());

var settings2 = ToSettings(new ShardOptions
{
RememberEntities = false,
StateStoreMode = StateStoreMode.DData,
RememberEntitiesStore = RememberEntitiesStore.DData,
Role = "second",
PassivateIdleEntityAfter = 4.Seconds(),
SnapshotPluginId = "secondSnapshot",
JournalPluginId = "secondJournal",
ShardRegionQueryTimeout = 5.Seconds(),
});
settings2.RememberEntities.Should().BeFalse();
settings2.StateStoreMode.Should().Be(StateStoreMode.DData);
settings2.RememberEntitiesStore.Should().Be(RememberEntitiesStore.DData);
settings2.Role.Should().Be("second");
settings2.PassivateIdleEntityAfter.Should().Be(4.Seconds());
settings2.JournalPluginId.Should().Be("secondJournal");
settings2.SnapshotPluginId.Should().Be("secondSnapshot");
settings2.LeaseSettings.Should().BeNull();
settings2.ShardRegionQueryTimeout.Should().Be(5.Seconds());
}

private static ClusterShardingSettings ToSettings(ShardOptions shardOptions)
{
var defaultConfig = ClusterSharding.DefaultConfig()
.WithFallback(DistributedData.DistributedData.DefaultConfig())
.WithFallback(ClusterSingletonManager.DefaultConfig());

var shardingConfig = ConfigurationFactory.ParseString(shardOptions.ToString())
.WithFallback(defaultConfig.GetConfig("akka.cluster.sharding"));
var coordinatorConfig = defaultConfig.GetConfig(
shardingConfig.GetString("coordinator-singleton"));

return ClusterShardingSettings.Create(shardingConfig, coordinatorConfig);
}
}
55 changes: 34 additions & 21 deletions src/Akka.Cluster.Hosting/AkkaClusterHostingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -279,14 +279,17 @@ public sealed class ShardOptions
/// <summary>
/// <para>
/// Settings for the Distributed Data replicator.
/// The <see cref="DistributedData.Role"/> property is not used. The distributed-data
/// The <see cref="ShardingDDataOptions.Role"/> property is not used. The distributed-data
/// role will be the same as <see cref="ShardOptions.Role"/>.
/// Note that there is one Replicator per role and it's not possible
/// to have different distributed-data settings for different sharding entity types.
/// </para>
/// <b>NOTE</b> This setting is only used when <see cref="StateStoreMode"/> is set to
/// <see cref="Akka.Cluster.Sharding.StateStoreMode.DData"/>
/// </summary>
[Obsolete("This property is not being applied to the ActorSystem anymore. " +
"Use manual HOCON configuration to set \"akka.cluster.sharding.distributed-data\" values. " +
"Since v1.5.27")]
public ShardingDDataOptions DistributedData { get; } = new();

/// <summary>
Expand All @@ -303,12 +306,12 @@ public sealed class ShardOptions
/// </summary>
public TimeSpan? PassivateIdleEntityAfter { get; set; }

internal void Apply(AkkaConfigurationBuilder builder)
public TimeSpan? ShardRegionQueryTimeout { get; set; }

public override string ToString()
{
DistributedData.Apply(builder);

var sb = new StringBuilder();

if (Role is not null)
sb.AppendLine($"role = {Role.ToHocon()}");

Expand Down Expand Up @@ -344,12 +347,10 @@ internal void Apply(AkkaConfigurationBuilder builder)
else if(PassivateIdleEntityAfter is not null)
sb.AppendLine($"passivate-idle-entity-after = {PassivateIdleEntityAfter.ToHocon()}");

if (sb.Length > 0)
{
sb.Insert(0, "akka.cluster.sharding {\n");
sb.AppendLine("}");
builder.AddHocon(sb.ToString(), HoconAddMode.Prepend);
}
if (ShardRegionQueryTimeout is not null)
sb.AppendLine($"shard-region-query-timeout = {ShardRegionQueryTimeout.ToHocon()}");

return sb.ToString();
}
}

Expand Down Expand Up @@ -646,7 +647,7 @@ public static AkkaConfigurationBuilder WithDistributedData(
builder.AddHocon(DistributedData.DistributedData.DefaultConfig(), HoconAddMode.Append);
return builder;
}

/// <summary>
/// Starts a <see cref="ShardRegion"/> actor for the given entity <see cref="typeName"/>
/// and registers the ShardRegion <see cref="IActorRef"/> with <see cref="TKey"/> in the
Expand Down Expand Up @@ -850,17 +851,23 @@ public static AkkaConfigurationBuilder WithShardRegion<TKey>(
IMessageExtractor messageExtractor,
ShardOptions shardOptions)
{
shardOptions.Apply(builder);
builder.AddHocon(
ClusterSharding.DefaultConfig().WithFallback(DistributedData.DistributedData.DefaultConfig()),
ClusterSharding.DefaultConfig()
.WithFallback(DistributedData.DistributedData.DefaultConfig())
.WithFallback(ClusterSingletonManager.DefaultConfig()),
HoconAddMode.Append);

return builder.StartActors(Resolver);

async Task Resolver(ActorSystem system, IActorRegistry registry, IDependencyResolver resolver)
{
var props = entityPropsFactory(system, registry, resolver);
var settings = ClusterShardingSettings.Create(system);
var shardingConfig = ConfigurationFactory.ParseString(shardOptions.ToString())
.WithFallback(system.Settings.Config.GetConfig("akka.cluster.sharding"));
var coordinatorConfig = system.Settings.Config.GetConfig(
shardingConfig.GetString("coordinator-singleton"));

var settings = ClusterShardingSettings.Create(shardingConfig, coordinatorConfig);
var allocationStrategy = ClusterSharding.Get(system).DefaultShardAllocationStrategy(settings);
var shardRegion = await ClusterSharding.Get(system).StartAsync(
typeName, props, settings, messageExtractor, allocationStrategy,
Expand Down Expand Up @@ -911,23 +918,29 @@ public static AkkaConfigurationBuilder WithShardRegion<TKey>(
ExtractShardId extractShardId,
ShardOptions shardOptions)
{
shardOptions.Apply(builder);
builder.AddHocon(
ClusterSharding.DefaultConfig().WithFallback(DistributedData.DistributedData.DefaultConfig()),
ClusterSharding.DefaultConfig()
.WithFallback(DistributedData.DistributedData.DefaultConfig())
.WithFallback(ClusterSingletonManager.DefaultConfig()),
HoconAddMode.Append);


return builder.StartActors(Resolver);

async Task Resolver(ActorSystem system, IActorRegistry registry, IDependencyResolver resolver)
{
var props = entityPropsFactory(system, registry, resolver);
var settings = ClusterShardingSettings.Create(system);
var shardingConfig = ConfigurationFactory.ParseString(shardOptions.ToString())
.WithFallback(system.Settings.Config.GetConfig("akka.cluster.sharding"));
var coordinatorConfig = system.Settings.Config.GetConfig(
shardingConfig.GetString("coordinator-singleton"));

var settings = ClusterShardingSettings.Create(shardingConfig, coordinatorConfig);
var allocationStrategy = ClusterSharding.Get(system).DefaultShardAllocationStrategy(settings);
var shardRegion = await ClusterSharding.Get(system).StartAsync(
typeName, props, settings, extractEntityId, extractShardId, allocationStrategy,
shardOptions.HandOffStopMessage ?? PoisonPill.Instance).ConfigureAwait(false);
registry.Register<TKey>(shardRegion);
}

return builder.StartActors(Resolver);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ namespace Akka.Cluster.Hosting
public sealed class ShardOptions
{
public ShardOptions() { }
[System.Obsolete("This property is not being applied to the ActorSystem anymore. Use manual HOCON c" +
"onfiguration to set \"akka.cluster.sharding.distributed-data\" values. Since v1.5." +
"27")]
public Akka.Cluster.Hosting.ShardingDDataOptions DistributedData { get; }
public bool? FailOnInvalidEntityStateTransition { get; set; }
public object? HandOffStopMessage { get; set; }
Expand All @@ -100,10 +103,12 @@ namespace Akka.Cluster.Hosting
public bool? RememberEntities { get; set; }
public Akka.Cluster.Sharding.RememberEntitiesStore? RememberEntitiesStore { get; set; }
public string? Role { get; set; }
public System.TimeSpan? ShardRegionQueryTimeout { get; set; }
public bool? ShouldPassivateIdleEntities { get; set; }
public Akka.Persistence.Hosting.SnapshotOptions? SnapshotOptions { get; set; }
public string? SnapshotPluginId { get; set; }
public Akka.Cluster.Sharding.StateStoreMode? StateStoreMode { get; set; }
public override string ToString() { }
}
public sealed class ShardingDDataOptions : Akka.Cluster.Hosting.DDataOptions
{
Expand Down

0 comments on commit 2e56b9f

Please sign in to comment.