diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/Delivery/ClusterShardingDeliveryGracefulShutdownSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/Delivery/ClusterShardingDeliveryGracefulShutdownSpec.cs new file mode 100644 index 00000000000..b019110225e --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/Delivery/ClusterShardingDeliveryGracefulShutdownSpec.cs @@ -0,0 +1,292 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2023 Lightbend Inc. +// Copyright (C) 2013-2023 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Immutable; +using System.Linq; +using Akka.Actor; +using Akka.Cluster.Sharding.Delivery; +using Akka.Configuration; +using Akka.MultiNode.TestAdapter; +using Akka.Remote.TestKit; +using Akka.Util; +using FluentAssertions; + +namespace Akka.Cluster.Sharding.Tests.MultiNode.Delivery +{ + public class ClusterShardingDeliveryGracefulShutdownSpecConfig : MultiNodeClusterShardingConfig + { + public RoleName First { get; } + public RoleName Second { get; } + + public ClusterShardingDeliveryGracefulShutdownSpecConfig(StateStoreMode mode) + : base(mode: mode, loglevel: "DEBUG", additionalConfig: @" +# don't leak ddata state across runs +akka.cluster.sharding.distributed-data.durable.keys = [] +akka.reliable-delivery.sharding.consumer-controller.allow-bypass = true +") + { + First = Role("first"); + Second = Role("second"); + } + } + + public class PersistentClusterShardingDeliveryGracefulShutdownSpecConfig : ClusterShardingDeliveryGracefulShutdownSpecConfig + { + public PersistentClusterShardingDeliveryGracefulShutdownSpecConfig() + : base(StateStoreMode.Persistence) + { + } + } + + public class DDataClusterShardingDeliveryGracefulShutdownSpecConfig : ClusterShardingDeliveryGracefulShutdownSpecConfig + { + public DDataClusterShardingDeliveryGracefulShutdownSpecConfig() + : base(StateStoreMode.DData) + { + } + } + + public class PersistentClusterShardingDeliveryGracefulShutdownSpec : ClusterShardingDeliveryGracefulShutdownSpec + { + public PersistentClusterShardingDeliveryGracefulShutdownSpec() + : base(new PersistentClusterShardingDeliveryGracefulShutdownSpecConfig(), typeof(PersistentClusterShardingDeliveryGracefulShutdownSpec)) + { + } + } + + public class DDataClusterShardingDeliveryGracefulShutdownSpec : ClusterShardingDeliveryGracefulShutdownSpec + { + public DDataClusterShardingDeliveryGracefulShutdownSpec() + : base(new DDataClusterShardingDeliveryGracefulShutdownSpecConfig(), typeof(DDataClusterShardingDeliveryGracefulShutdownSpec)) + { + } + } + + public abstract class ClusterShardingDeliveryGracefulShutdownSpec : MultiNodeClusterShardingSpec + { + #region setup + + public class TerminationOrderActor : ActorBase + { + public class RegionTerminated + { + public static RegionTerminated Instance = new(); + + private RegionTerminated() + { + } + } + + public class CoordinatorTerminated + { + public static CoordinatorTerminated Instance = new(); + + private CoordinatorTerminated() + { + } + } + + public static Props Props(IActorRef probe, IActorRef coordinator, IActorRef region) + { + return Actor.Props.Create(() => new TerminationOrderActor(probe, coordinator, region)); + } + + private readonly IActorRef _probe; + private readonly IActorRef _coordinator; + private readonly IActorRef _region; + + public TerminationOrderActor(IActorRef probe, IActorRef coordinator, IActorRef region) + { + _probe = probe; + _coordinator = coordinator; + _region = region; + + Context.Watch(coordinator); + Context.Watch(region); + } + + protected override bool Receive(object message) + { + switch (message) + { + case Terminated t when t.ActorRef.Equals(_coordinator): + _probe.Tell(CoordinatorTerminated.Instance); + return true; + + case Terminated t when t.ActorRef.Equals(_region): + _probe.Tell(RegionTerminated.Instance); + return true; + } + return false; + } + } + + private sealed class MessageExtractor: IMessageExtractor + { + public string EntityId(object message) + => message switch + { + SlowStopConsumerEntity.Job j => j.Payload.ToString(), + _ => null + }; + + public object EntityMessage(object message) + => message; + + public string ShardId(object message) + => message switch + { + SlowStopConsumerEntity.Job j => j.Payload.ToString(), + _ => null + }; + + public string ShardId(string entityId, object messageHint = null) + => entityId; + } + + private const string TypeName = "SlowStopEntity"; + private IActorRef _producer; + private IActorRef _producerController; + + protected ClusterShardingDeliveryGracefulShutdownSpec(ClusterShardingDeliveryGracefulShutdownSpecConfig config, Type type) + : base(config, type) + { + } + + private IActorRef CreateProducer(string producerId) + { + _producerController = + Sys.ActorOf( + ShardingProducerController.Create( + producerId: producerId, + shardRegion: ClusterSharding.Get(Sys).ShardRegion(TypeName), + durableQueue: Option.None, + settings: ShardingProducerController.Settings.Create(Sys)), + "shardingProducerController"); + _producer = Sys.ActorOf(Props.Create(() => new TestShardingProducer(_producerController, TestActor)), + "producer"); + return _producer; + } + + private IActorRef StartSharding() + { + return ClusterSharding.Get(Sys).Start( + typeName: TypeName, + entityPropsFactory: e => ShardingConsumerController.Create( + c => Props.Create(() => new SlowStopConsumerEntity(e, c)), + ShardingConsumerController.Settings.Create(Sys)), + settings: Settings.Value.WithRole(null), + messageExtractor: new MessageExtractor(), + allocationStrategy: ShardAllocationStrategy.LeastShardAllocationStrategy(absoluteLimit: 2, relativeLimit: 1.0), + handOffStopMessage: SlowStopConsumerEntity.Stop.Instance); + } + + #endregion + + [MultiNodeFact] + public void ClusterShardingGracefulShutdownSpecs() + { + Cluster_sharding_must_join_cluster(); + Cluster_sharding_must_start_some_shards_in_both_regions(); + Cluster_sharding_must_gracefully_shutdown_the_oldest_region(); + } + + private void Cluster_sharding_must_join_cluster() + { + StartPersistenceIfNeeded(startOn: Config.First, Config.First, Config.Second); + + Join(Config.First, Config.First); + Join(Config.Second, Config.First); + + // make sure all nodes are up + AwaitAssert(() => + { + Cluster.Get(Sys).SendCurrentClusterState(TestActor); + ExpectMsg().Members.Count.Should().Be(2); + }); + + RunOn(() => + { + StartSharding(); + }, Config.First); + + RunOn(() => + { + StartSharding(); + }, Config.Second); + + EnterBarrier("sharding started"); + } + + private void Cluster_sharding_must_start_some_shards_in_both_regions() + { + RunOn(() => + { + var producer = CreateProducer("p-1"); + Within(TimeSpan.FromSeconds(30), () => + { + var regionAddresses = Enumerable.Range(1, 20).Select(n => + { + producer.Tell(n, TestActor); + ExpectMsg(n, TimeSpan.FromSeconds(1)); + return LastSender.Path.Address; + }).ToImmutableHashSet(); + + regionAddresses.Count.Should().Be(2); + }); + }, Config.First); + + EnterBarrier("after-2"); + } + + private void Cluster_sharding_must_gracefully_shutdown_the_oldest_region() + { + Within(TimeSpan.FromSeconds(30), () => + { + RunOn(() => + { + IActorRef coordinator = null; + AwaitAssert(() => + { + coordinator = Sys + .ActorSelection($"/system/sharding/{TypeName}Coordinator/singleton/coordinator") + .ResolveOne(RemainingOrDefault).Result; + }); + var terminationProbe = CreateTestProbe(); + var region = ClusterSharding.Get(Sys).ShardRegion(TypeName); + Sys.ActorOf(TerminationOrderActor.Props(terminationProbe.Ref, coordinator, region)); + + // trigger graceful shutdown + Cluster.Leave(GetAddress(Config.First)); + + // region first + terminationProbe.ExpectMsg(); + terminationProbe.ExpectMsg(); + }, Config.First); + + EnterBarrier("terminated"); + + RunOn(() => + { + var producer = CreateProducer("p-2"); + AwaitAssert(() => + { + var responses = Enumerable.Range(1, 20).Select(n => + { + producer.Tell(n, TestActor); + return ExpectMsg(n, TimeSpan.FromSeconds(1)); + }).ToImmutableHashSet(); + + responses.Count.Should().Be(20); + }); + }, Config.Second); + EnterBarrier("done-o"); + }); + } + } +} diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/Delivery/SlowStopConsumerEntity.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/Delivery/SlowStopConsumerEntity.cs new file mode 100644 index 00000000000..155e0924b93 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/Delivery/SlowStopConsumerEntity.cs @@ -0,0 +1,61 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2023 Lightbend Inc. +// Copyright (C) 2013-2023 .NET Foundation +// +// ----------------------------------------------------------------------- +#nullable enable +using System; +using Akka.Actor; +using Akka.Delivery; +using Akka.Event; + +namespace Akka.Cluster.Sharding.Tests.MultiNode.Delivery; + +/// +/// INTERNAL API +/// +public sealed class SlowStopConsumerEntity : ReceiveActor, IWithTimers +{ + private readonly IActorRef _consumerController; + + public SlowStopConsumerEntity(string persistenceId, IActorRef consumerController) + { + _consumerController = consumerController; + + Receive>(delivery => + { + var job = delivery.Message; + job.Probe.Tell(job.Payload); + delivery.ConfirmTo.Tell(ConsumerController.Confirmed.Instance); + }); + + Receive(_ => + { + Timers.StartSingleTimer(ActualStop.Instance, ActualStop.Instance, TimeSpan.FromMilliseconds(50)); + }); + + Receive(_ => Context.Stop(Self)); + } + + protected override void PreStart() + { + _consumerController.Tell(new ConsumerController.Start(Self)); + } + + public sealed class Stop: ConsumerController.IConsumerCommand + { + public static readonly Stop Instance = new(); + private Stop() { } + } + + public sealed class ActualStop + { + public static readonly ActualStop Instance = new(); + private ActualStop() { } + } + + public sealed record Job(int Payload, IActorRef Probe); + + public ITimerScheduler Timers { get; set; } = null!; +} diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/Delivery/TestProducer.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/Delivery/TestProducer.cs new file mode 100644 index 00000000000..31704976205 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/Delivery/TestProducer.cs @@ -0,0 +1,79 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2023 Lightbend Inc. +// Copyright (C) 2013-2023 .NET Foundation +// +// ----------------------------------------------------------------------- +#nullable enable +using System; +using Akka.Actor; +using Akka.Cluster.Sharding.Delivery; +using Akka.Event; + +namespace Akka.Cluster.Sharding.Tests.MultiNode.Delivery; + +internal class TestShardingProducer : ReceiveActor, IWithTimers, IWithStash +{ + private readonly ILoggingAdapter _log = Context.GetLogger(); + private readonly IActorRef _producerController; + private readonly IActorRef _probe; + + public TestShardingProducer(IActorRef producerController, IActorRef probe) + { + _producerController = producerController; + _probe = probe; + Idle(); + } + + public ITimerScheduler Timers { get; set; } = null!; + public IStash Stash { get; set; } = null!; + public IActorRef SendNext { get; set; } = ActorRefs.Nobody; + + protected override void PreStart() + { + // simulate fast producer + Timers.StartPeriodicTimer("tick", Tick.Instance, TimeSpan.FromMilliseconds(100)); + _producerController.Tell(new ShardingProducerController.Start(Self)); + } + + private void Idle() + { + Receive(_ => { }); // ignore + + Receive(_ => Stash.Stash()); + + Receive>(req => + { + SendNext = req.SendNextTo; + Become(Active); + }); + } + + private void Active() + { + Receive(_ => + { + if(!Stash.IsEmpty) + Stash.Unstash(); + }); + + Receive(n => + { + _log.Info("Sending {0}", n); + Become(Idle); + var job = new SlowStopConsumerEntity.Job(n, _probe); + SendNext.Tell(new ShardingEnvelope(n.ToString(), job)); + }); + + Receive>(req => + { + SendNext = req.SendNextTo; + }); + } + + public sealed class Tick + { + private Tick() { } + public static readonly Tick Instance = new(); + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs index f2f8206ff97..56282af39e5 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/Internal/ShardingConsumerControllerImpl.cs @@ -69,6 +69,19 @@ private void WaitForStart() _log.Debug("Consumer terminated before initialized."); Context.Stop(Self); }); + + ReceiveAny(msg => + { + if (Settings.AllowBypass) + { + _consumer.Forward(msg); + } + else + { + _log.Warning($"Message unhandled [{msg}]. If you need to pass this message to the consumer sharding entity actor, set \"akka.reliable-delivery.sharding.consumer-controller.allow-bypass\" to true"); + Unhandled(msg); + } + }); } private void Active() @@ -128,6 +141,19 @@ private void Active() } } }); + + ReceiveAny(msg => + { + if (Settings.AllowBypass) + { + _consumer.Forward(msg); + } + else + { + _log.Warning($"Message unhandled [{msg}]. If you need to pass this message to the consumer sharding entity actor, set \"akka.reliable-delivery.sharding.consumer-controller.allow-bypass\" to true"); + Unhandled(msg); + } + }); } private ImmutableDictionary UpdatedProducerControllers(IActorRef producerController, @@ -147,4 +173,4 @@ protected override void PreStart() } public IStash Stash { get; set; } = null!; -} \ No newline at end of file +} diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/ShardingConsumerController.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/ShardingConsumerController.cs index 872bf7cb98d..bf500e5fa8a 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/ShardingConsumerController.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Delivery/ShardingConsumerController.cs @@ -39,12 +39,14 @@ public static class ShardingConsumerController { public sealed record Settings { - private Settings(int bufferSize, ConsumerController.Settings consumerControllerSettings) + private Settings(Config config, ConsumerController.Settings consumerControllerSettings) { - BufferSize = bufferSize; + AllowBypass = config.GetBoolean("allow-bypass"); + BufferSize = config.GetInt("buffer-size"); ConsumerControllerSettings = consumerControllerSettings; } + public bool AllowBypass { get; init; } public int BufferSize { get; init; } public ConsumerController.Settings ConsumerControllerSettings { get; init; } @@ -58,7 +60,7 @@ public static Settings Create(ActorSystem system) internal static Settings Create(Config config, Config consumerControllerConfig) // made internal so users can't foot-gun themselves { - return new Settings(config.GetInt("buffer-size"), ConsumerController.Settings.Create(config.WithFallback(consumerControllerConfig))); + return new Settings(config, ConsumerController.Settings.Create(config.WithFallback(consumerControllerConfig))); } public override string ToString() diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf b/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf index 666babaa682..51522973768 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf +++ b/src/contrib/cluster/Akka.Cluster.Sharding/reference.conf @@ -286,6 +286,11 @@ akka.reliable-delivery { } consumer-controller { + # Allow arbitrary message to be directly sent to the customer actor, + # this lets you use custom handoff messages to be used along side with + # cluster sharding reliable delivery + allow-bypass = false + # Limit of how many messages that can be buffered before the # ShardingConsumerController is initialized by the Start message. buffer-size = 1000 diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt index 5fca63870b6..326890414e5 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.DotNet.verified.txt @@ -407,6 +407,7 @@ namespace Akka.Cluster.Sharding.Delivery [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class Settings : System.IEquatable { + public bool AllowBypass { get; set; } public int BufferSize { get; set; } public Akka.Delivery.ConsumerController.Settings ConsumerControllerSettings { get; set; } public static Akka.Cluster.Sharding.Delivery.ShardingConsumerController.Settings Create(Akka.Actor.ActorSystem system) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt index 584f3aa95db..91e85bad906 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterSharding.Net.verified.txt @@ -407,6 +407,7 @@ namespace Akka.Cluster.Sharding.Delivery [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class Settings : System.IEquatable { + public bool AllowBypass { get; set; } public int BufferSize { get; set; } public Akka.Delivery.ConsumerController.Settings ConsumerControllerSettings { get; set; } public static Akka.Cluster.Sharding.Delivery.ShardingConsumerController.Settings Create(Akka.Actor.ActorSystem system) { }