diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Internal/RememberEntitiesStarterSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Internal/RememberEntitiesStarterSpec.cs index 31804d52621..0bbf64c2484 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Internal/RememberEntitiesStarterSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/Internal/RememberEntitiesStarterSpec.cs @@ -6,8 +6,10 @@ //----------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Collections.Immutable; using System.Linq; +using System.Threading.Tasks; using Akka.Actor; using Akka.Cluster.Sharding.Internal; using Akka.Cluster.Tools.Singleton; @@ -15,6 +17,7 @@ using Akka.TestKit; using Akka.TestKit.Xunit2.Attributes; using FluentAssertions; +using FluentAssertions.Extensions; using Xunit; using Xunit.Abstractions; @@ -140,7 +143,8 @@ public void RememberEntitiesStarter_must_inform_the_shard_when_entities_has_been ExpectTerminated(rememberEntityStarter); } - [LocalFact(SkipLocal = "Racy in Azure AzDo, strict timing does not work well on AzDo")] + // TODO: check the timing code to make sure that this actually works, it was flaky/racy even when run locally. + [LocalFact(SkipLocal = "Racy unit test, suspected bad code underneath")] public void RememberEntitiesStarter_must_try_start_all_entities_in_a_throttled_way_with_entity_recovery_strategy_constant() { var regionProbe = CreateTestProbe(); diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt index 49f702c590b..df6b1461b8e 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt @@ -2134,6 +2134,11 @@ namespace Akka.Actor.Internal public UnboundedStashImpl(Akka.Actor.IActorContext context) { } } } +namespace Akka.Actor.Scheduler +{ + [Akka.Annotations.InternalApiAttribute()] + public interface IScheduledTellMsg : Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.IWrappedMessage { } +} namespace Akka.Actor.Setup { public sealed class ActorSystemSetup diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index a61e5263e79..c4378d979e3 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -2161,6 +2161,11 @@ namespace Akka.Actor.Internal public override int Capacity { get; } } } +namespace Akka.Actor.Scheduler +{ + [Akka.Annotations.InternalApiAttribute()] + public interface IScheduledTellMsg : Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.IWrappedMessage { } +} namespace Akka.Actor.Setup { public sealed class ActorSystemSetup diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index 56cd25e4edd..bb05355c7fa 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -2159,6 +2159,11 @@ namespace Akka.Actor.Internal public override int Capacity { get; } } } +namespace Akka.Actor.Scheduler +{ + [Akka.Annotations.InternalApiAttribute()] + public interface IScheduledTellMsg : Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.IWrappedMessage { } +} namespace Akka.Actor.Setup { public sealed class ActorSystemSetup diff --git a/src/core/Akka.Remote/RemoteActorRefProvider.cs b/src/core/Akka.Remote/RemoteActorRefProvider.cs index 5cb9553b5de..4615c5d6545 100644 --- a/src/core/Akka.Remote/RemoteActorRefProvider.cs +++ b/src/core/Akka.Remote/RemoteActorRefProvider.cs @@ -799,6 +799,7 @@ public RemoteDeadLetterActorRef(IActorRefProvider provider, ActorPath actorPath, protected override void TellInternal(object message, IActorRef sender) { var deadLetter = message as DeadLetter; + if (message is EndpointManager.Send send) { if (send.Seq == null) diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowSplitWhenSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowSplitWhenSpec.cs index 968abbf0ee5..b000a7fb494 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowSplitWhenSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowSplitWhenSpec.cs @@ -14,13 +14,15 @@ using Akka.Streams.Implementation; using Akka.Streams.TestKit; using Akka.TestKit; +using Akka.TestKit.Extensions; using Akka.TestKit.Xunit2.Attributes; using FluentAssertions; using Reactive.Streams; using Xunit; using Xunit.Abstractions; -// ReSharper disable InvokeAsExtensionMethod +using static FluentAssertions.FluentActions; +#nullable enable namespace Akka.Streams.Tests.Dsl { public class FlowSplitWhenSpec : AkkaSpec @@ -53,39 +55,35 @@ public StreamPuppet(IPublisher p, TestKitBase kit) public void Request(int demand) => _subscription.Request(demand); public async Task ExpectNextAsync(int element) => await _probe.ExpectNextAsync(element); - - public void ExpectNext(int element) => _probe.ExpectNext(element); - + public async Task ExpectNoMsgAsync(TimeSpan max) => await _probe.ExpectNoMsgAsync(max); - - public void ExpectNoMsg(TimeSpan max) => _probe.ExpectNoMsg(max); - + public async Task ExpectCompleteAsync() => await _probe.ExpectCompleteAsync(); - public void ExpectComplete() => _probe.ExpectComplete(); - - public void ExpectError(Exception ex) => _probe.ExpectError().Should().Be(ex); + + public async Task ExpectErrorAsync(Exception ex) => (await _probe.ExpectErrorAsync()).Should().Be(ex); public void Cancel() => _subscription.Cancel(); } - private async Task WithSubstreamsSupportAsync(int splitWhen = 3, int elementCount = 6, + private async Task WithSubstreamsSupportAsync( + int splitWhen = 3, + int elementCount = 6, SubstreamCancelStrategy substreamCancelStrategy = SubstreamCancelStrategy.Drain, - Action>, ISubscription, Func>> run = null) + Func>, ISubscription, Func>>, Task>? run = null) { - var source = Source.From(Enumerable.Range(1, elementCount)); var groupStream = source.SplitWhen(substreamCancelStrategy, i => i == splitWhen) .Lift() .RunWith(Sink.AsPublisher>(false), Materializer); - var masterSubscriber = TestSubscriber.CreateManualSubscriberProbe>(this); + var masterSubscriber = this.CreateManualSubscriberProbe>(); groupStream.Subscribe(masterSubscriber); var masterSubscription = await masterSubscriber.ExpectSubscriptionAsync(); - run?.Invoke(masterSubscriber, masterSubscription, () => + run?.Invoke(masterSubscriber, masterSubscription, async () => { masterSubscription.Request(1); - return masterSubscriber.ExpectNext(); + return await masterSubscriber.ExpectNextAsync(); }); } @@ -94,33 +92,33 @@ public async Task SplitWhen_must_work_in_the_happy_case() { await this.AssertAllStagesStoppedAsync(async () => { await WithSubstreamsSupportAsync(elementCount: 4, - run: (masterSubscriber, masterSubscription, getSubFlow) => + run: async (masterSubscriber, masterSubscription, getSubFlow) => { - var s1 = new StreamPuppet(getSubFlow() + var s1 = new StreamPuppet((await getSubFlow()) .RunWith(Sink.AsPublisher(false), Materializer), this); - masterSubscriber.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await masterSubscriber.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); s1.Request(2); - s1.ExpectNext(1); - s1.ExpectNext(2); + await s1.ExpectNextAsync(1); + await s1.ExpectNextAsync(2); s1.Request(1); - s1.ExpectComplete(); + await s1.ExpectCompleteAsync(); - var s2 = new StreamPuppet(getSubFlow() + var s2 = new StreamPuppet((await getSubFlow()) .RunWith(Sink.AsPublisher(false), Materializer), this); - masterSubscriber.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await masterSubscriber.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); s2.Request(1); - s2.ExpectNext(3); - s2.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await s2.ExpectNextAsync(3); + await s2.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); s2.Request(1); - s2.ExpectNext(4); + await s2.ExpectNextAsync(4); s2.Request(1); - s2.ExpectComplete(); + await s2.ExpectCompleteAsync(); masterSubscription.Request(1); - masterSubscriber.ExpectComplete(); + await masterSubscriber.ExpectCompleteAsync(); }); }, Materializer); } @@ -128,19 +126,17 @@ await WithSubstreamsSupportAsync(elementCount: 4, [Fact] public async Task SplitWhen_must_not_emit_substreams_if_the_parent_stream_is_empty() { - await this.AssertAllStagesStoppedAsync(() => { - var task = - Source.Empty() - .SplitWhen(_ => true) - .Lift() - .SelectAsync(1, s => s.RunWith(Sink.FirstOrDefault(), Materializer)) - .Grouped(10) - .RunWith(Sink.FirstOrDefault>(), - Materializer); - task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); - task.Result.Should().BeEquivalentTo(default(IEnumerable)); - return Task.CompletedTask; - }, Materializer); + await this.AssertAllStagesStoppedAsync(async () => + { + var result = await Source.Empty() + .SplitWhen(_ => true) + .Lift() + .SelectAsync(1, s => s.RunWith(Sink.FirstOrDefault(), Materializer)) + .Grouped(10) + .RunWith(Sink.FirstOrDefault>(), Materializer); + result.Should().BeEquivalentTo(default(IEnumerable)); + }, Materializer) + .ShouldCompleteWithin(RemainingOrDefault); } [Fact] @@ -150,7 +146,7 @@ await this.AssertAllStagesStoppedAsync(async () => { await WithSubstreamsSupportAsync(1, 3, run: async (masterSubscriber, masterSubscription, getSubFlow) => { - var s1 = new StreamPuppet(getSubFlow() + var s1 = new StreamPuppet((await getSubFlow()) .RunWith(Sink.AsPublisher(false), Materializer), this); s1.Request(5); @@ -162,7 +158,8 @@ await WithSubstreamsSupportAsync(1, 3, masterSubscription.Request(1); await masterSubscriber.ExpectCompleteAsync(); }); - }, Materializer); + }, Materializer) + .ShouldCompleteWithin(RemainingOrDefault); } [Fact] @@ -172,10 +169,10 @@ await this.AssertAllStagesStoppedAsync(async () => { await WithSubstreamsSupportAsync(5, 8, run: async (masterSubscriber, masterSubscription, getSubFlow) => { - var s1 = new StreamPuppet(getSubFlow() + var s1 = new StreamPuppet((await getSubFlow()) .RunWith(Sink.AsPublisher(false), Materializer), this); s1.Cancel(); - var s2 = new StreamPuppet(getSubFlow() + var s2 = new StreamPuppet((await getSubFlow()) .RunWith(Sink.AsPublisher(false), Materializer), this); s2.Request(4); @@ -189,8 +186,8 @@ await WithSubstreamsSupportAsync(5, 8, masterSubscription.Request(1); await masterSubscriber.ExpectCompleteAsync(); }); - return Task.CompletedTask; - }, Materializer); + }, Materializer) + .ShouldCompleteWithin(RemainingOrDefault); } [Fact] @@ -259,7 +256,8 @@ await this.AssertAllStagesStoppedAsync(async () => { masterStream3.Cancel(); await inputs3.ExpectCancellationAsync(); - }, Materializer); + }, Materializer) + .ShouldCompleteWithin(RemainingOrDefault); } [Fact] @@ -269,7 +267,7 @@ await this.AssertAllStagesStoppedAsync(async () => { await WithSubstreamsSupportAsync(5, 8, run: async (_, masterSubscription, getSubFlow) => { - var s1 = new StreamPuppet(getSubFlow() + var s1 = new StreamPuppet((await getSubFlow()) .RunWith(Sink.AsPublisher(false), Materializer), this); masterSubscription.Cancel(); @@ -281,7 +279,8 @@ await WithSubstreamsSupportAsync(5, 8, s1.Request(1); await s1.ExpectCompleteAsync(); }); - }, Materializer); + }, Materializer) + .ShouldCompleteWithin(RemainingOrDefault); } [Fact] @@ -317,86 +316,83 @@ await this.AssertAllStagesStoppedAsync(async () => { upstreamSubscription.SendNext(3); - subscriber.ExpectError().Should().Be(ex); - substreamPuppet.ExpectError(ex); + (await subscriber.ExpectErrorAsync()).Should().Be(ex); + await substreamPuppet.ExpectErrorAsync(ex); await upstreamSubscription.ExpectCancellationAsync(); - }, Materializer); + }, Materializer) + .ShouldCompleteWithin(RemainingOrDefault); } [Fact] public async Task SplitWhen_must_work_with_single_element_splits() { - await this.AssertAllStagesStoppedAsync(() => { - var task = Source.From(Enumerable.Range(1, 100)) - .SplitWhen(_ => true) - .Lift() - .SelectAsync(1, s => s.RunWith(Sink.First(), Materializer)) // Please note that this line *also* implicitly asserts nonempty substreams - .Grouped(200) - .RunWith(Sink.First>(), Materializer); - task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); - task.Result.Should().BeEquivalentTo(Enumerable.Range(1, 100)); - return Task.CompletedTask; - }, Materializer); + await this.AssertAllStagesStoppedAsync(async () => { + var result = await Source.From(Enumerable.Range(1, 100)) + .SplitWhen(_ => true) + .Lift() + .SelectAsync(1, s => s.RunWith(Sink.First(), Materializer)) // Please note that this line *also* implicitly asserts nonempty substreams + .Grouped(200) + .RunWith(Sink.First>(), Materializer); + result.Should().BeEquivalentTo(Enumerable.Range(1, 100)); + }, Materializer) + .ShouldCompleteWithin(RemainingOrDefault); } [LocalFact(SkipLocal = "Racy on Azure DevOps")] public async Task SplitWhen_must_fail_substream_if_materialized_twice() { - await this.AssertAllStagesStoppedAsync(() => { - var task = Source.Single(1).SplitWhen(_ => true).Lift() - .SelectAsync(1, source => + await this.AssertAllStagesStoppedAsync(async () => { - source.RunWith(Sink.Ignore(), Materializer); - // Sink.ignore+mapAsync pipes error back - return Task.Run(() => - { - source.RunWith(Sink.Ignore(), Materializer).Wait(TimeSpan.FromSeconds(3)); - return 1; - }); - }) - .RunWith(Sink.Ignore(), Materializer); - task.Invoking(t => t.Wait(TimeSpan.FromSeconds(3))) - .Should().Throw(); - return Task.CompletedTask; - }, Materializer); + await Awaiting(async () => + await Source.Single(1) + .SplitWhen(_ => true) + .Lift() + .SelectAsync(1, source => + { + source.RunWith(Sink.Ignore(), Materializer); + // Sink.ignore+mapAsync pipes error back + return Task.Run(() => + { + source.RunWith(Sink.Ignore(), Materializer).Wait(TimeSpan.FromSeconds(3)); + return 1; + }); + }) + .RunWith(Sink.Ignore(), Materializer) + ).Should().ThrowAsync(); + }, Materializer) + .ShouldCompleteWithin(RemainingOrDefault); } [Fact] public async Task SplitWhen_must_fail_stream_if_substream_not_materialized_in_time() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async () => { var tightTimeoutMaterializer = ActorMaterializer.Create(Sys, ActorMaterializerSettings.Create(Sys) - .WithSubscriptionTimeoutSettings( - new StreamSubscriptionTimeoutSettings( - StreamSubscriptionTimeoutTerminationMode.CancelTermination, - TimeSpan.FromMilliseconds(500)))); - var testSource = - Source.Single(1) - .MapMaterializedValue>(_ => null) - .Concat(Source.Maybe()) - .SplitWhen(_ => true); - Action action = () => + .WithSubscriptionTimeoutSettings( + new StreamSubscriptionTimeoutSettings( + StreamSubscriptionTimeoutTerminationMode.CancelTermination, + TimeSpan.FromMilliseconds(500)))); + var testSource = Source.Single(1) + .MapMaterializedValue>(_ => null) + .Concat(Source.Maybe()) + .SplitWhen(_ => true); + + await Awaiting(async () => { - var task = - testSource.Lift() - .Delay(TimeSpan.FromSeconds(1)) - .ConcatMany(s => s.MapMaterializedValue>(_ => null)) - .RunWith(Sink.Ignore(), tightTimeoutMaterializer); - task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); - }; - - action.Should().Throw(); - return Task.CompletedTask; - }, Materializer); + await testSource.Lift() + .Delay(TimeSpan.FromSeconds(1)) + .ConcatMany(s => s.MapMaterializedValue>(_ => null)) + .RunWith(Sink.Ignore(), tightTimeoutMaterializer); + }).Should().ThrowAsync(); + }, Materializer) + .ShouldCompleteWithin(RemainingOrDefault); } [Fact(Skip = "Supervision is not supported fully by GraphStages yet")] public async Task SplitWhen_must_resume_stream_when_splitWhen_function_throws() { - await this.AssertAllStagesStoppedAsync(() => { - return Task.CompletedTask; - }, Materializer); + await this.AssertAllStagesStoppedAsync(() => Task.CompletedTask, Materializer); } [Fact] @@ -406,18 +402,18 @@ await this.AssertAllStagesStoppedAsync(async () => { var up = this.CreateManualPublisherProbe(); var down = this.CreateManualSubscriberProbe>(); - var flowSubscriber = - Source.AsSubscriber() - .SplitWhen(i => i % 3 == 0) - .Lift() - .To(Sink.FromSubscriber(down)) - .Run(Materializer); + var flowSubscriber = Source.AsSubscriber() + .SplitWhen(i => i % 3 == 0) + .Lift() + .To(Sink.FromSubscriber(down)) + .Run(Materializer); var downstream = await down.ExpectSubscriptionAsync(); downstream.Cancel(); up.Subscribe(flowSubscriber); var upSub = await up.ExpectSubscriptionAsync(); await upSub.ExpectCancellationAsync(); - }, Materializer); + }, Materializer) + .ShouldCompleteWithin(RemainingOrDefault); } [Fact] @@ -427,15 +423,14 @@ await this.AssertAllStagesStoppedAsync(async () => { await WithSubstreamsSupportAsync(5, 8, SubstreamCancelStrategy.Propagate, async (masterSubscriber, _, expectSubFlow) => { - var s1 = new StreamPuppet(expectSubFlow() + var s1 = new StreamPuppet((await expectSubFlow()) .RunWith(Sink.AsPublisher(false), Materializer), this); s1.Cancel(); await masterSubscriber.ExpectCompleteAsync(); - }); - return Task.CompletedTask; - }, Materializer); + }, Materializer) + .ShouldCompleteWithin(RemainingOrDefault); } } } diff --git a/src/core/Akka/Actor/ActorBase.cs b/src/core/Akka/Actor/ActorBase.cs index ddcfdf717af..091e6625be9 100644 --- a/src/core/Akka/Actor/ActorBase.cs +++ b/src/core/Akka/Actor/ActorBase.cs @@ -7,6 +7,7 @@ using System; using Akka.Actor.Internal; +using Akka.Actor.Scheduler; using Akka.Event; namespace Akka.Actor @@ -178,9 +179,9 @@ protected static IActorContext Context /// TBD protected internal virtual bool AroundReceive(Receive receive, object message) { - if (message is Scheduler.TimerScheduler.ITimerMsg tm) + if (message is TimerScheduler.ITimerMsg tm) { - if (this is IWithTimers withTimers && withTimers.Timers is Scheduler.TimerScheduler timers) + if (this is IWithTimers { Timers: TimerScheduler timers }) { switch (timers.InterceptTimerMsg(Context.System.Log, tm)) { @@ -192,7 +193,7 @@ protected internal virtual bool AroundReceive(Receive receive, object message) // discard return true; - case object m: + case var m: if (this is IActorStash) { var actorCell = (ActorCell)Context; @@ -242,8 +243,7 @@ protected internal virtual bool AroundReceive(Receive receive, object message) /// protected virtual void Unhandled(object message) { - var terminatedMessage = message as Terminated; - if (terminatedMessage != null) + if (message is Terminated terminatedMessage) { throw new DeathPactException(terminatedMessage.ActorRef); } diff --git a/src/core/Akka/Actor/ActorCell.DefaultMessages.cs b/src/core/Akka/Actor/ActorCell.DefaultMessages.cs index 14e01309757..7a28c5c4ce0 100644 --- a/src/core/Akka/Actor/ActorCell.DefaultMessages.cs +++ b/src/core/Akka/Actor/ActorCell.DefaultMessages.cs @@ -14,6 +14,7 @@ using Akka.Event; using Debug = Akka.Event.Debug; using System.Globalization; +using Akka.Actor.Scheduler; namespace Akka.Actor { @@ -55,14 +56,16 @@ public int CurrentEnvelopeId /// > public void Invoke(Envelope envelope) { - var message = envelope.Message; - var influenceReceiveTimeout = !(message is INotInfluenceReceiveTimeout); + if (message is IScheduledTellMsg scheduled) + message = scheduled.Message; + + var influenceReceiveTimeout = message is not INotInfluenceReceiveTimeout; try { // Akka JVM doesn't have these lines - CurrentMessage = envelope.Message; + CurrentMessage = message; _currentEnvelopeId++; if (_currentEnvelopeId == int.MaxValue) _currentEnvelopeId = 0; @@ -72,6 +75,7 @@ public void Invoke(Envelope envelope) { CancelReceiveTimeout(); } + if (message is IAutoReceivedMessage) { @@ -79,7 +83,8 @@ public void Invoke(Envelope envelope) } else { - ReceiveMessage(message); + // Intentional, we want to preserve IScheduledMsg + ReceiveMessage(envelope.Message); } CurrentMessage = null; } @@ -105,24 +110,7 @@ private IActorRef MatchSender(Envelope envelope) var sender = envelope.Sender; return sender ?? System.DeadLetters; } - - - /* - def autoReceiveMessage(msg: Envelope): Unit = { - if (system.settings.DebugAutoReceive) - publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) - - msg.message match { - case t: Terminated ⇒ receivedTerminated(t) - case AddressTerminated(address) ⇒ addressTerminated(address) - case Kill ⇒ throw new ActorKilledException("Kill") - case PoisonPill ⇒ self.stop() - case sel: ActorSelectionMessage ⇒ receiveSelection(sel) - case Identify(messageId) ⇒ sender() ! ActorIdentity(messageId, Some(self)) - } - } - */ - + /// /// TBD /// @@ -133,15 +121,16 @@ msg.message match { protected internal virtual void AutoReceiveMessage(Envelope envelope) { var message = envelope.Message; + if (message is IScheduledTellMsg scheduled) + message = scheduled.Message; var actor = _actor; - var actorType = actor != null ? actor.GetType() : null; + var actorType = actor?.GetType(); if (System.Settings.DebugAutoReceive) Publish(new Debug(Self.Path.ToString(), actorType, "received AutoReceiveMessage " + message)); - var m = envelope.Message; - switch (m) + switch (message) { case Terminated terminated: ReceivedTerminated(terminated); @@ -190,6 +179,9 @@ public void ReceiveMessageForTest(Envelope envelope) /// The message that will be sent to the actor. protected virtual void ReceiveMessage(object message) { + if (message is IScheduledTellMsg scheduled) + message = scheduled.Message; + var wasHandled = _actor.AroundReceive(_state.GetCurrentBehavior(), message); if (System.Settings.AddLoggingReceive && _actor is ILogReceive) diff --git a/src/core/Akka/Actor/ActorRef.cs b/src/core/Akka/Actor/ActorRef.cs index fabab41f25c..203b72afa33 100644 --- a/src/core/Akka/Actor/ActorRef.cs +++ b/src/core/Akka/Actor/ActorRef.cs @@ -14,6 +14,7 @@ using System.Threading; using System.Threading.Tasks; using Akka.Actor.Internal; +using Akka.Actor.Scheduler; using Akka.Annotations; using Akka.Dispatch.SysMsg; using Akka.Event; @@ -106,7 +107,7 @@ public FutureActorRef(TaskCompletionSource result, ActorPath path, IActorRefP protected override void TellInternal(object message, IActorRef sender) { var handled = false; - + switch (message) { case ISystemMessage msg: diff --git a/src/core/Akka/Actor/Scheduler/HashedWheelTimerScheduler.cs b/src/core/Akka/Actor/Scheduler/HashedWheelTimerScheduler.cs index 388f8562956..6f225f11164 100644 --- a/src/core/Akka/Actor/Scheduler/HashedWheelTimerScheduler.cs +++ b/src/core/Akka/Actor/Scheduler/HashedWheelTimerScheduler.cs @@ -10,6 +10,7 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; +using Akka.Actor.Scheduler; using Akka.Configuration; using Akka.Dispatch; using Akka.Event; @@ -548,7 +549,11 @@ private sealed class ScheduledTell : IRunnable public ScheduledTell(ICanTell receiver, object message, IActorRef sender) { _receiver = receiver; - _message = message; + _message = receiver is not ActorRefWithCell + ? message + : message is INotInfluenceReceiveTimeout + ? new ScheduledTellMsgNoInfluenceReceiveTimeout(message) + : new ScheduledTellMsg(message); _sender = sender; } diff --git a/src/core/Akka/Actor/Scheduler/IScheduledTellMsg.cs b/src/core/Akka/Actor/Scheduler/IScheduledTellMsg.cs new file mode 100644 index 00000000000..3aa33606dfc --- /dev/null +++ b/src/core/Akka/Actor/Scheduler/IScheduledTellMsg.cs @@ -0,0 +1,47 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2023 Lightbend Inc. +// Copyright (C) 2013-2023 .NET Foundation +// +//----------------------------------------------------------------------- + +using Akka.Annotations; + +namespace Akka.Actor.Scheduler; + +/// +/// Marker interface used to indicate the presence of a scheduled message from the +/// classic scheduler API. +/// +/// +/// Made public so these messages can be filtered for telemetry purposes +/// +[InternalApi] +public interface IScheduledTellMsg : IWrappedMessage, INoSerializationVerificationNeeded +{ +} + +/// +/// INTERNAL API +/// +internal sealed class ScheduledTellMsg : IScheduledTellMsg +{ + public ScheduledTellMsg(object message) + { + Message = message; + } + public object Message { get; } +} + +/// +/// INTERNAL API +/// +internal sealed class ScheduledTellMsgNoInfluenceReceiveTimeout : IScheduledTellMsg, INotInfluenceReceiveTimeout +{ + public ScheduledTellMsgNoInfluenceReceiveTimeout(object message) + { + Message = message; + } + + public object Message { get; } +} \ No newline at end of file