Skip to content

Commit

Permalink
Work on ConsumerController deadlock (akkadotnet#7092)
Browse files Browse the repository at this point in the history
* added XML-DOC comments to current `ConsumerController.Settings`

working on akkadotnet#7088

* Add `RetryConfirmation` to `ConsumerController` settings

* Add retry handling during message confirmation

* Add specs

* Add anti-specs and settings tests

* Update API approval list

* Update XML-Doc and configuration comments

* Shorten retry time

---------

Co-authored-by: Gregorius Soedharmo <[email protected]>
  • Loading branch information
Aaronontheweb and Arkatufus authored Feb 16, 2024
1 parent 18247e1 commit 1c8ffc3
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2559,6 +2559,7 @@ namespace Akka.Delivery
public bool OnlyFlowControl { get; set; }
public System.TimeSpan ResendIntervalMax { get; set; }
public System.TimeSpan ResendIntervalMin { get; set; }
public bool RetryConfirmation { get; set; }
public static Akka.Delivery.ConsumerController.Settings Create(Akka.Actor.ActorSystem actorSystem) { }
public static Akka.Delivery.ConsumerController.Settings Create(Akka.Configuration.Config config) { }
public override string ToString() { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2557,6 +2557,7 @@ namespace Akka.Delivery
public bool OnlyFlowControl { get; set; }
public System.TimeSpan ResendIntervalMax { get; set; }
public System.TimeSpan ResendIntervalMin { get; set; }
public bool RetryConfirmation { get; set; }
public static Akka.Delivery.ConsumerController.Settings Create(Akka.Actor.ActorSystem actorSystem) { }
public static Akka.Delivery.ConsumerController.Settings Create(Akka.Configuration.Config config) { }
public override string ToString() { }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// -----------------------------------------------------------------------
// <copyright file="ConsumerControllerRetryConfirmSpecs.cs" company="Akka.NET Project">
// Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Delivery;
using Akka.Util;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
using static Akka.Tests.Delivery.TestConsumer;

namespace Akka.Tests.Delivery;

public class ConsumerControllerRetryConfirmSpecs : TestKit.Xunit2.TestKit
{
public static readonly Config Config = @"
akka.reliable-delivery.consumer-controller {
flow-control-window = 20
resend-interval-min = 500ms
retry-confirmation = true
}";

public ConsumerControllerRetryConfirmSpecs(ITestOutputHelper outputHelper) : base(
Config.WithFallback(TestSerializer.Config).WithFallback(ZeroLengthSerializer.Config), output: outputHelper)
{
}

private int _idCount = 0;
private int NextId() => _idCount++;
private string ProducerId => $"p-{_idCount}";

[Fact]
public void ConsumerController_Settings_confirmation_retry_must_not_be_set_by_default()
{
var config = ConfigurationFactory.Default();
var settings = ConsumerController.Settings.Create(config.GetConfig("akka.reliable-delivery.consumer-controller"));
settings.RetryConfirmation.Should().BeFalse();
}

[Fact]
public void ConsumerController_Settings_confirmation_retry_must_be_set()
{
var settings = ConsumerController.Settings.Create(Sys);
settings.RetryConfirmation.Should().BeTrue();
}

[Fact]
public async Task ConsumerController_must_resend_Delivery_on_confirmation_retry()
{
var id = NextId();
var consumerProbe = CreateTestProbe();
var consumerController = Sys.ActorOf(ConsumerController.Create<Job>(Sys, Option<IActorRef>.None),
$"consumerController-{id}");
var producerControllerProbe = CreateTestProbe();

consumerController.Tell(new ConsumerController.Start<Job>(consumerProbe.Ref));
consumerController.Tell(new ConsumerController.RegisterToProducerController<Job>(producerControllerProbe.Ref));
await producerControllerProbe.ExpectMsgAsync<ProducerController.RegisterConsumer<Job>>();

consumerController.Tell(SequencedMessage(ProducerId, 1, producerControllerProbe.Ref));

await consumerProbe.ExpectMsgAsync<ConsumerController.Delivery<Job>>();

// expected resend
await consumerProbe.ExpectMsgAsync<ConsumerController.Delivery<Job>>(1.5.Seconds());
}

}
23 changes: 23 additions & 0 deletions src/core/Akka.Tests/Delivery/ConsumerControllerSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Akka.Delivery.Internal;
using Akka.Util;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
using static Akka.Tests.Delivery.TestConsumer;
Expand Down Expand Up @@ -672,4 +673,26 @@ public async Task ConsumerController_can_process_zero_length_Chunk()
consumerController.Tell(seqMessages1.First());
(await consumerProbe.ExpectMsgAsync<ConsumerController.Delivery<ZeroLengthSerializer.TestMsg>>()).Message.Should().Be(ZeroLengthSerializer.TestMsg.Instance);
}

[Fact]
public async Task ConsumerController_must_not_resend_Delivery()
{
var id = NextId();
var consumerProbe = CreateTestProbe();
var consumerController = Sys.ActorOf(ConsumerController.Create<Job>(Sys, Option<IActorRef>.None),
$"consumerController-{id}");
var producerControllerProbe = CreateTestProbe();

consumerController.Tell(new ConsumerController.Start<Job>(consumerProbe.Ref));
consumerController.Tell(new ConsumerController.RegisterToProducerController<Job>(producerControllerProbe.Ref));
await producerControllerProbe.ExpectMsgAsync<ProducerController.RegisterConsumer<Job>>();

consumerController.Tell(SequencedMessage(ProducerId, 1, producerControllerProbe.Ref));

await consumerProbe.ExpectMsgAsync<ConsumerController.Delivery<Job>>();

// expects no resend
await consumerProbe.ExpectNoMsgAsync(1.5.Seconds());
}

}
6 changes: 6 additions & 0 deletions src/core/Akka/Configuration/akka.conf
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,12 @@ akka {
# kept in memory in the `ProducerController` until they have been
# confirmed, but the drawback is that lost messages will not be delivered.
only-flow-control = false

# When disabled, the <see cref="ConsumerController"/> will discard any <c>Retry</c> messages when it is
# waiting for a message delivery confirmation.
#
# When enabled, timed-out message delivery will be subject to the same retry mechanism as all other message types.
retry-confirmation = false
}

work-pulling {
Expand Down
39 changes: 36 additions & 3 deletions src/core/Akka/Delivery/ConsumerController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,26 +229,59 @@ public static Settings Create(ActorSystem actorSystem)

public static Settings Create(Config config)
{
return new Settings(config.GetInt("flow-control-window"), config.GetTimeSpan("resend-interval-min"),
config.GetTimeSpan("resend-interval-max"), config.GetBoolean("only-flow-control"));
return new Settings(
flowControlWindow: config.GetInt("flow-control-window"),
resendIntervalMin: config.GetTimeSpan("resend-interval-min"),
resendIntervalMax: config.GetTimeSpan("resend-interval-max"),
onlyFlowControl: config.GetBoolean("only-flow-control"),
retryConfirmation: config.GetBoolean("retry-confirmation"));
}

private Settings(int flowControlWindow, TimeSpan resendIntervalMin, TimeSpan resendIntervalMax,
bool onlyFlowControl)
bool onlyFlowControl, bool retryConfirmation)
{
FlowControlWindow = flowControlWindow;
ResendIntervalMin = resendIntervalMin;
ResendIntervalMax = resendIntervalMax;
OnlyFlowControl = onlyFlowControl;
RetryConfirmation = retryConfirmation;
}

/// <summary>
/// Number of messages in flight between <see cref="ProducerController"/> and <see cref="ConsumerController"/>.
///
/// The <see cref="ConsumerController"/> requests for more message when half of the window has been used.
/// </summary>
public int FlowControlWindow { get; init; }

/// <summary>
/// The ConsumerController resends flow control messages to the ProducerController with the <see cref="ResendIntervalMin"/>,
/// and increasing it gradually to <see cref="ResendIntervalMax"/> when idle.
/// </summary>
public TimeSpan ResendIntervalMin { get; init; }

/// <summary>
/// The ConsumerController resends flow control messages to the ProducerController with the <see cref="ResendIntervalMin"/>,
/// and increasing it gradually to <see cref="ResendIntervalMax"/> when idle.
/// </summary>
public TimeSpan ResendIntervalMax { get; init; }

/// <summary>
/// If this is enabled lost messages will not be resent, but flow control is used.
///
/// This can be more efficient since messages don't have to be kept in memory in the
/// <see cref="ProducerController"/> until they have been confirmed, but the drawback is that lost messages
/// will not be delivered.
/// </summary>
public bool OnlyFlowControl { get; init; }

/// <summary>
/// When disabled, the <see cref="ConsumerController"/> will discard any <c>Retry</c> messages when it is
/// waiting for a message delivery confirmation.
///
/// When enabled, timed-out message delivery will be subject to the same retry mechanism as all other message types.
/// </summary>
public bool RetryConfirmation { get; init; }

public override string ToString()
{
Expand Down
11 changes: 11 additions & 0 deletions src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,17 @@ long ComputeNextSeqNr()
Receive<Retry>(_ =>
{
// no retries when WaitingForConfirmation, will be performed from (idle) active
if (!Settings.RetryConfirmation)
return;

_log.Debug("Consumer received Retry while waiting for confirmation.");
ReceiveRetry(() =>
{
_log.Debug("Retry sending SequencedMessage [{0}].", sequencedMessage.SeqNr);
CurrentState.Consumer.Tell(new Delivery<T>(sequencedMessage.Message.Message!, Context.Self,
sequencedMessage.ProducerId, sequencedMessage.SeqNr));
Become(() => WaitingForConfirmation(sequencedMessage));
});
});

Receive<ConsumerController.Start<T>>(start =>
Expand Down

0 comments on commit 1c8ffc3

Please sign in to comment.