diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index 6a0f57ea371..4bd36b8e712 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -2559,6 +2559,7 @@ namespace Akka.Delivery public bool OnlyFlowControl { get; set; } public System.TimeSpan ResendIntervalMax { get; set; } public System.TimeSpan ResendIntervalMin { get; set; } + public bool RetryConfirmation { get; set; } public static Akka.Delivery.ConsumerController.Settings Create(Akka.Actor.ActorSystem actorSystem) { } public static Akka.Delivery.ConsumerController.Settings Create(Akka.Configuration.Config config) { } public override string ToString() { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index 88832976eb4..55a5ef3780b 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -2557,6 +2557,7 @@ namespace Akka.Delivery public bool OnlyFlowControl { get; set; } public System.TimeSpan ResendIntervalMax { get; set; } public System.TimeSpan ResendIntervalMin { get; set; } + public bool RetryConfirmation { get; set; } public static Akka.Delivery.ConsumerController.Settings Create(Akka.Actor.ActorSystem actorSystem) { } public static Akka.Delivery.ConsumerController.Settings Create(Akka.Configuration.Config config) { } public override string ToString() { } diff --git a/src/core/Akka.Tests/Delivery/ConsumerControllerRetryConfirmSpecs.cs b/src/core/Akka.Tests/Delivery/ConsumerControllerRetryConfirmSpecs.cs new file mode 100644 index 00000000000..74d6e821cc9 --- /dev/null +++ b/src/core/Akka.Tests/Delivery/ConsumerControllerRetryConfirmSpecs.cs @@ -0,0 +1,75 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Configuration; +using Akka.Delivery; +using Akka.Util; +using FluentAssertions; +using FluentAssertions.Extensions; +using Xunit; +using Xunit.Abstractions; +using static Akka.Tests.Delivery.TestConsumer; + +namespace Akka.Tests.Delivery; + +public class ConsumerControllerRetryConfirmSpecs : TestKit.Xunit2.TestKit +{ + public static readonly Config Config = @" + akka.reliable-delivery.consumer-controller { + flow-control-window = 20 + resend-interval-min = 500ms + retry-confirmation = true + }"; + + public ConsumerControllerRetryConfirmSpecs(ITestOutputHelper outputHelper) : base( + Config.WithFallback(TestSerializer.Config).WithFallback(ZeroLengthSerializer.Config), output: outputHelper) + { + } + + private int _idCount = 0; + private int NextId() => _idCount++; + private string ProducerId => $"p-{_idCount}"; + + [Fact] + public void ConsumerController_Settings_confirmation_retry_must_not_be_set_by_default() + { + var config = ConfigurationFactory.Default(); + var settings = ConsumerController.Settings.Create(config.GetConfig("akka.reliable-delivery.consumer-controller")); + settings.RetryConfirmation.Should().BeFalse(); + } + + [Fact] + public void ConsumerController_Settings_confirmation_retry_must_be_set() + { + var settings = ConsumerController.Settings.Create(Sys); + settings.RetryConfirmation.Should().BeTrue(); + } + + [Fact] + public async Task ConsumerController_must_resend_Delivery_on_confirmation_retry() + { + var id = NextId(); + var consumerProbe = CreateTestProbe(); + var consumerController = Sys.ActorOf(ConsumerController.Create(Sys, Option.None), + $"consumerController-{id}"); + var producerControllerProbe = CreateTestProbe(); + + consumerController.Tell(new ConsumerController.Start(consumerProbe.Ref)); + consumerController.Tell(new ConsumerController.RegisterToProducerController(producerControllerProbe.Ref)); + await producerControllerProbe.ExpectMsgAsync>(); + + consumerController.Tell(SequencedMessage(ProducerId, 1, producerControllerProbe.Ref)); + + await consumerProbe.ExpectMsgAsync>(); + + // expected resend + await consumerProbe.ExpectMsgAsync>(1.5.Seconds()); + } + +} \ No newline at end of file diff --git a/src/core/Akka.Tests/Delivery/ConsumerControllerSpecs.cs b/src/core/Akka.Tests/Delivery/ConsumerControllerSpecs.cs index 4ca05b86d06..d407942a449 100644 --- a/src/core/Akka.Tests/Delivery/ConsumerControllerSpecs.cs +++ b/src/core/Akka.Tests/Delivery/ConsumerControllerSpecs.cs @@ -7,6 +7,7 @@ using Akka.Delivery.Internal; using Akka.Util; using FluentAssertions; +using FluentAssertions.Extensions; using Xunit; using Xunit.Abstractions; using static Akka.Tests.Delivery.TestConsumer; @@ -672,4 +673,26 @@ public async Task ConsumerController_can_process_zero_length_Chunk() consumerController.Tell(seqMessages1.First()); (await consumerProbe.ExpectMsgAsync>()).Message.Should().Be(ZeroLengthSerializer.TestMsg.Instance); } + + [Fact] + public async Task ConsumerController_must_not_resend_Delivery() + { + var id = NextId(); + var consumerProbe = CreateTestProbe(); + var consumerController = Sys.ActorOf(ConsumerController.Create(Sys, Option.None), + $"consumerController-{id}"); + var producerControllerProbe = CreateTestProbe(); + + consumerController.Tell(new ConsumerController.Start(consumerProbe.Ref)); + consumerController.Tell(new ConsumerController.RegisterToProducerController(producerControllerProbe.Ref)); + await producerControllerProbe.ExpectMsgAsync>(); + + consumerController.Tell(SequencedMessage(ProducerId, 1, producerControllerProbe.Ref)); + + await consumerProbe.ExpectMsgAsync>(); + + // expects no resend + await consumerProbe.ExpectNoMsgAsync(1.5.Seconds()); + } + } \ No newline at end of file diff --git a/src/core/Akka/Configuration/akka.conf b/src/core/Akka/Configuration/akka.conf index e4594491f39..fb95fb6612e 100644 --- a/src/core/Akka/Configuration/akka.conf +++ b/src/core/Akka/Configuration/akka.conf @@ -664,6 +664,12 @@ akka { # kept in memory in the `ProducerController` until they have been # confirmed, but the drawback is that lost messages will not be delivered. only-flow-control = false + + # When disabled, the will discard any Retry messages when it is + # waiting for a message delivery confirmation. + # + # When enabled, timed-out message delivery will be subject to the same retry mechanism as all other message types. + retry-confirmation = false } work-pulling { diff --git a/src/core/Akka/Delivery/ConsumerController.cs b/src/core/Akka/Delivery/ConsumerController.cs index 415c3349643..6912dc26858 100644 --- a/src/core/Akka/Delivery/ConsumerController.cs +++ b/src/core/Akka/Delivery/ConsumerController.cs @@ -229,26 +229,59 @@ public static Settings Create(ActorSystem actorSystem) public static Settings Create(Config config) { - return new Settings(config.GetInt("flow-control-window"), config.GetTimeSpan("resend-interval-min"), - config.GetTimeSpan("resend-interval-max"), config.GetBoolean("only-flow-control")); + return new Settings( + flowControlWindow: config.GetInt("flow-control-window"), + resendIntervalMin: config.GetTimeSpan("resend-interval-min"), + resendIntervalMax: config.GetTimeSpan("resend-interval-max"), + onlyFlowControl: config.GetBoolean("only-flow-control"), + retryConfirmation: config.GetBoolean("retry-confirmation")); } private Settings(int flowControlWindow, TimeSpan resendIntervalMin, TimeSpan resendIntervalMax, - bool onlyFlowControl) + bool onlyFlowControl, bool retryConfirmation) { FlowControlWindow = flowControlWindow; ResendIntervalMin = resendIntervalMin; ResendIntervalMax = resendIntervalMax; OnlyFlowControl = onlyFlowControl; + RetryConfirmation = retryConfirmation; } + /// + /// Number of messages in flight between and . + /// + /// The requests for more message when half of the window has been used. + /// public int FlowControlWindow { get; init; } + /// + /// The ConsumerController resends flow control messages to the ProducerController with the , + /// and increasing it gradually to when idle. + /// public TimeSpan ResendIntervalMin { get; init; } + /// + /// The ConsumerController resends flow control messages to the ProducerController with the , + /// and increasing it gradually to when idle. + /// public TimeSpan ResendIntervalMax { get; init; } + /// + /// If this is enabled lost messages will not be resent, but flow control is used. + /// + /// This can be more efficient since messages don't have to be kept in memory in the + /// until they have been confirmed, but the drawback is that lost messages + /// will not be delivered. + /// public bool OnlyFlowControl { get; init; } + + /// + /// When disabled, the will discard any Retry messages when it is + /// waiting for a message delivery confirmation. + /// + /// When enabled, timed-out message delivery will be subject to the same retry mechanism as all other message types. + /// + public bool RetryConfirmation { get; init; } public override string ToString() { diff --git a/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs b/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs index 94db04dc68c..5eeafbfe3d5 100644 --- a/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs +++ b/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs @@ -386,6 +386,17 @@ long ComputeNextSeqNr() Receive(_ => { // no retries when WaitingForConfirmation, will be performed from (idle) active + if (!Settings.RetryConfirmation) + return; + + _log.Debug("Consumer received Retry while waiting for confirmation."); + ReceiveRetry(() => + { + _log.Debug("Retry sending SequencedMessage [{0}].", sequencedMessage.SeqNr); + CurrentState.Consumer.Tell(new Delivery(sequencedMessage.Message.Message!, Context.Self, + sequencedMessage.ProducerId, sequencedMessage.SeqNr)); + Become(() => WaitingForConfirmation(sequencedMessage)); + }); }); Receive>(start =>