Skip to content

Commit

Permalink
Akka.Actor: Context.Watch on FutureActorRef<T> creates memory lea…
Browse files Browse the repository at this point in the history
…ks (#7502)

* added repro for #7501

* added `Context.Watch` support to `FutureActorRef<T>`

close #7501

* added comment about `Unwatch`

* hardened death watch task listening

* fixed `AskSpec` `ISystemMessage` handling

* Update API approval list

---------

Co-authored-by: Gregorius Soedharmo <[email protected]>
  • Loading branch information
Aaronontheweb and Arkatufus authored Feb 14, 2025
1 parent 1b52686 commit 50b6cfe
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,7 @@ namespace Akka.Actor
public override Akka.Actor.ActorPath Path { get; }
public override Akka.Actor.IActorRefProvider Provider { get; }
public virtual void DeliverAsk(object message, Akka.Actor.ICanTell destination) { }
public override void SendSystemMessage(Akka.Dispatch.SysMsg.ISystemMessage message) { }
protected override void TellInternal(object message, Akka.Actor.IActorRef sender) { }
}
public class static Futures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,7 @@ namespace Akka.Actor
public override Akka.Actor.ActorPath Path { get; }
public override Akka.Actor.IActorRefProvider Provider { get; }
public virtual void DeliverAsk(object message, Akka.Actor.ICanTell destination) { }
public override void SendSystemMessage(Akka.Dispatch.SysMsg.ISystemMessage message) { }
protected override void TellInternal(object message, Akka.Actor.IActorRef sender) { }
}
public class static Futures
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Tests/Actor/AskSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public SomeActor()
Sender.Tell(123);
break;
case "system":
Sender.Tell(new DummySystemMessage());
Sender.As<IInternalActorRef>().SendSystemMessage(new DummySystemMessage());
break;
}

Expand Down
64 changes: 64 additions & 0 deletions src/core/Akka.Tests/Actor/Bugfix7501Specs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// -----------------------------------------------------------------------
// <copyright file="Bugfix7501Specs.cs" company="Akka.NET Project">
// Copyright (C) 2009-2025 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Dsl;
using Akka.TestKit;
using Akka.TestKit.TestActors;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Tests.Actor;

public class Bugfix7501Specs : AkkaSpec
{
public Bugfix7501Specs(ITestOutputHelper output) : base(output)
{

}

[Fact]
public async Task FutureActorRefShouldSupportDeathWatch()
{
// arrange
var customDeathWatchProbe = CreateTestProbe();
var watcher = Sys.ActorOf(act =>
{
act.Receive<string>((_, context) =>
{
// complete the Ask
context.Sender.Tell("hi");

// DeathWatch the FutureActorRef<T> BEFORE it completes
context.Watch(context.Sender);

// deliver the IActorRef of the Ask-er to TestActor
TestActor.Tell(context.Sender);
});

act.Receive<Terminated>((terminated, context) =>
{
// shut ourselves down to signal that we got our Terminated from FutureActorRef
context.Stop(context.Self);
});
});

// act
await customDeathWatchProbe.WatchAsync(watcher);
await watcher.Ask<string>("boo", RemainingOrDefault);
var futureActorRef = await ExpectMsgAsync<IActorRef>();
await WatchAsync(futureActorRef); // Ask is finished - should immediately dead-letter

// assert
await ExpectTerminatedAsync(futureActorRef);

// get the DeathWatch notification from the original actor
// this can only be received if the original actor got a Terminated message from FutureActorRef
await customDeathWatchProbe.ExpectTerminatedAsync(watcher);
}
}
62 changes: 58 additions & 4 deletions src/core/Akka/Actor/ActorRef.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,35 @@ public interface IRepointableRef : IActorRefScope
bool IsStarted { get; }
}

/// <summary>
/// INTERNAL API - didn't want static helper methods declared inside generic class
/// </summary>
internal static class FutureActorRefDeathWatchSupport
{
internal static async Task ScheduleDeathWatch(IInternalActorRef notifier, IActorRef self, Task completionTask)
{
try
{
await completionTask;
}
catch
{
// we don't do error handling for this - we do not care
}
finally
{
// regardless of whether we succeeded or failed, we notify watchers
notifier.SendSystemMessage(TerminatedFor(self));
}

}

internal static DeathWatchNotification TerminatedFor(IActorRef self)
{
return new DeathWatchNotification(self, true, false);
}
}

/// <summary>
/// INTERNAL API.
///
Expand Down Expand Up @@ -110,9 +139,6 @@ protected override void TellInternal(object message, IActorRef sender)

switch (message)
{
case ISystemMessage msg:
handled = _result.TrySetException(new InvalidOperationException($"system message of type '{msg.GetType().Name}' is invalid for {nameof(FutureActorRef<T>)}"));
break;
case T t:
handled = _result.TrySetResult(t);
break;
Expand Down Expand Up @@ -140,7 +166,35 @@ protected override void TellInternal(object message, IActorRef sender)
if (!handled && !_result.Task.IsCanceled)
_provider.DeadLetters.Tell(message ?? default(T), this);
}


public override void SendSystemMessage(ISystemMessage message)
{
if (message is Watch watch)
{
if (_result.Task.IsCompleted)
{
watch.Watcher.SendSystemMessage(FutureActorRefDeathWatchSupport.TerminatedFor(this));
}
else
{
_ = FutureActorRefDeathWatchSupport.ScheduleDeathWatch(watch.Watcher, watch.Watchee, _result.Task);
}

}
else if (message is Unwatch unwatch)
{
// we're not going to support Unwatch - watchers
// already have to handle scenarios where the Unwatch arrives too late
// anyway, so we're just going to treat this like that in order to keep
// state management as simple as possible
}
else
{
// TODO: blow up the caller here by just throwing the exception at the callsite?
_result.TrySetException(new InvalidOperationException($"system message of type '{message.GetType().Name}' is invalid for {nameof(FutureActorRef<T>)}"));
}
}

public virtual void DeliverAsk(object message, ICanTell destination){
destination.Tell(message, this);
}
Expand Down

0 comments on commit 50b6cfe

Please sign in to comment.