From b1d80bfb4d51d0df9b5f1e77592bc0b589270c1f Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 3 Jan 2024 14:19:18 -0600 Subject: [PATCH 1/4] Troubleshoot ReliableDeliveryRandomSpecs reproduce #7033 --- .../Delivery/ReliableDeliveryRandomSpecs.cs | 67 ++++++++++--------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/src/core/Akka.Tests/Delivery/ReliableDeliveryRandomSpecs.cs b/src/core/Akka.Tests/Delivery/ReliableDeliveryRandomSpecs.cs index 5d8ddcc74d6..e2e56e4e530 100644 --- a/src/core/Akka.Tests/Delivery/ReliableDeliveryRandomSpecs.cs +++ b/src/core/Akka.Tests/Delivery/ReliableDeliveryRandomSpecs.cs @@ -20,11 +20,13 @@ namespace Akka.Tests.Delivery; public class ReliableDeliveryRandomSpecs : TestKit.Xunit2.TestKit { - internal static readonly Config Config = @"akka.reliable-delivery.consumer-controller{ + private static readonly Config Config = @"akka.reliable-delivery.consumer-controller{ flow-control-window = 20 resend-interval-min = 500ms resend-interval-max = 2s - }"; + } + akka.loglevel = DEBUG + "; public ReliableDeliveryRandomSpecs(ITestOutputHelper output) : this(output, Config) { @@ -57,16 +59,6 @@ private async Task Test(int numberOfMessages, double producerDropProbability, do consumerDropProbability, producerDropProbability, consumerDelay, producerDelay, durableFailProbability, durableDelay); - // RandomFlakyNetwork to simulate lost messages from producerController to consumerController - double ConsumerDrop(object msg) - { - return msg switch - { - ConsumerController.SequencedMessage _ => consumerDropProbability, - _ => 0 - }; - } - var consumerEndProbe = CreateTestProbe(); var consumerController = Sys.ActorOf(ConsumerController.CreateWithFuzzing(Sys, Option.None, ConsumerDrop, consumerControllerSettings), $"consumer-controller-{_idCount}"); @@ -75,18 +67,6 @@ double ConsumerDrop(object msg) TestConsumer.PropsFor(consumerDelay, numberOfMessages, consumerEndProbe.Ref, consumerController), $"consumer-{_idCount}"); - // RandomFlakyNetwork to simulate lost messages from consumerController to producerController - double ProducerDrop(object msg) - { - return msg switch - { - ProducerController.Request _ => producerDropProbability, - ProducerController.Resend _ => producerDropProbability, - ProducerController.RegisterConsumer _ => producerDropProbability, - _ => 0 - }; - } - var stateHolder = new AtomicReference>(DurableProducerQueueStateHolder.Empty); var durableQueue = durableFailProbability.Select(p => { @@ -105,31 +85,54 @@ double ProducerDrop(object msg) new ConsumerController.RegisterToProducerController(producerController)); await consumerEndProbe.ExpectMsgAsync(TimeSpan.FromSeconds(120)); + return; + + // RandomFlakyNetwork to simulate lost messages from producerController to consumerController + double ConsumerDrop(object msg) + { + return msg switch + { + ConsumerController.SequencedMessage _ => consumerDropProbability, + _ => 0 + }; + } + + // RandomFlakyNetwork to simulate lost messages from consumerController to producerController + double ProducerDrop(object msg) + { + return msg switch + { + ProducerController.Request _ => producerDropProbability, + ProducerController.Resend _ => producerDropProbability, + ProducerController.RegisterConsumer _ => producerDropProbability, + _ => 0 + }; + } } [Fact] - public async Task ReliableDelivery_with_random_failures_must_work_with_flaky_network() + public Task ReliableDelivery_with_random_failures_must_work_with_flaky_network() { NextId(); var consumerDropProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.2; var producerDropProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.2; - await Test(numberOfMessages: 63, producerDropProbability: producerDropProbability, + return Test(numberOfMessages: 63, producerDropProbability: producerDropProbability, consumerDropProbability: consumerDropProbability, Option.None, true); } [Fact] - public async Task ReliableDelivery_with_random_failures_must_work_with_flaky_DurableProducerQueue() + public Task ReliableDelivery_with_random_failures_must_work_with_flaky_DurableProducerQueue() { NextId(); var durableFailProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.1; - await Test(numberOfMessages: 31, producerDropProbability: 0.0, + return Test(numberOfMessages: 31, producerDropProbability: 0.0, consumerDropProbability:0.0, durableFailProbability, true); } [Fact] - public async Task ReliableDelivery_with_random_failures_must_work_with_flaky_network_and_flaky_DurableProducerQueue() + public Task ReliableDelivery_with_random_failures_must_work_with_flaky_network_and_flaky_DurableProducerQueue() { NextId(); var consumerDropProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.1; @@ -137,18 +140,18 @@ public async Task ReliableDelivery_with_random_failures_must_work_with_flaky_net var durableFailProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.1; - await Test(numberOfMessages: 17, producerDropProbability: producerDropProbability, + return Test(numberOfMessages: 17, producerDropProbability: producerDropProbability, consumerDropProbability: consumerDropProbability, durableFailProbability, true); } [Fact] - public async Task ReliableDelivery_with_random_failures_must_work_with_flaky_network_without_resending() + public Task ReliableDelivery_with_random_failures_must_work_with_flaky_network_without_resending() { NextId(); var consumerDropProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.4; var producerDropProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.3; - await Test(numberOfMessages: 63, producerDropProbability: producerDropProbability, + return Test(numberOfMessages: 63, producerDropProbability: producerDropProbability, consumerDropProbability: consumerDropProbability, Option.None, false); } } \ No newline at end of file From a5b47f8d359a1ec80a8b027f42f57dd76d4d6532 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 3 Jan 2024 14:34:33 -0600 Subject: [PATCH 2/4] fixed logging bug --- src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs b/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs index 29e61035044..ca43253af6b 100644 --- a/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs +++ b/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs @@ -490,7 +490,7 @@ private void ReceiveRequest(long newConfirmedSeqNr, long newRequestedSeqNr, bool : newRequestedSeqNr; if (newRequestedSeqNr2 != newRequestedSeqNr) - _log.Debug("Expanded requestedSeqNr from [{0}] to [{1}], because current [{3}] and all were probably lost.", + _log.Debug("Expanded requestedSeqNr from [{0}] to [{1}], because current [{2}] and all were probably lost.", newRequestedSeqNr, newRequestedSeqNr2, stateAfterAck.CurrentSeqNr); if (newRequestedSeqNr > CurrentState.RequestedSeqNr) From 1f7a6832993a1fcbd45d92d1a6bc2d13869a91a9 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 3 Jan 2024 14:58:56 -0600 Subject: [PATCH 3/4] adding better logging around fuzz factor --- .../Internal/ConsumerControllerImpl.cs | 70 ++++++++++--------- 1 file changed, 38 insertions(+), 32 deletions(-) diff --git a/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs b/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs index 89053c457ad..94db04dc68c 100644 --- a/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs +++ b/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs @@ -57,7 +57,11 @@ protected internal override bool AroundReceive(Receive receive, object message) { // TESTING PURPOSES ONLY - used to simulate network failures. if (_fuzzingControl != null && ThreadLocalRandom.Current.NextDouble() < _fuzzingControl(message)) + { + _log.Debug("[Testing] dropping message [{0}] due to fuzzing factor", message); return true; + } + return base.AroundReceive(receive, message); } @@ -281,38 +285,6 @@ private void WaitingForConfirmation(SequencedMessage sequencedMessage) if (_log.IsDebugEnabled) _log.Debug("Received Confirmed seqNr [{0}] from consumer, stashed size [{1}].", seqNr, Stash.Count); - long ComputeNextSeqNr() - { - if (sequencedMessage.First) - { - // confirm the first message immediately to cancel resending of first - var newRequestedSeqNr = seqNr - 1 + Settings.FlowControlWindow; - _log.Debug("Sending Request after first with confirmedSeqNr [{0}], requestUpToSeqNr [{1}]", seqNr, - newRequestedSeqNr); - CurrentState.ProducerController.Tell(new Request(seqNr, newRequestedSeqNr, ResendLost, false)); - return newRequestedSeqNr; - } - - if (CurrentState.RequestedSeqNr - seqNr == Settings.FlowControlWindow / 2) - { - var newRequestedSeqNr = CurrentState.RequestedSeqNr + Settings.FlowControlWindow / 2; - _log.Debug("Sending Request with confirmedSeqNr [{0}], requestUpToSeqNr [{1}]", seqNr, - newRequestedSeqNr); - CurrentState.ProducerController.Tell(new Request(seqNr, newRequestedSeqNr, ResendLost, false)); - _retryTimer.Start(); // reset interval since Request was just sent - return newRequestedSeqNr; - } - - if (sequencedMessage.Ack) - { - if (_log.IsDebugEnabled) - _log.Debug("Sending Ack seqNr [{0}]", seqNr); - CurrentState.ProducerController.Tell(new Ack(seqNr)); - } - - return CurrentState.RequestedSeqNr; - } - var requestedSeqNr = ComputeNextSeqNr(); if (CurrentState.Stopping && Stash.IsEmpty) { @@ -351,6 +323,40 @@ async Task ShutDownAndStop() Stash.Unstash(); Become(Active); } + + return; + + long ComputeNextSeqNr() + { + if (sequencedMessage.First) + { + // confirm the first message immediately to cancel resending of first + var newRequestedSeqNr = seqNr - 1 + Settings.FlowControlWindow; + _log.Debug("Sending Request after first with confirmedSeqNr [{0}], requestUpToSeqNr [{1}]", seqNr, + newRequestedSeqNr); + CurrentState.ProducerController.Tell(new Request(seqNr, newRequestedSeqNr, ResendLost, false)); + return newRequestedSeqNr; + } + + if (CurrentState.RequestedSeqNr - seqNr == Settings.FlowControlWindow / 2) + { + var newRequestedSeqNr = CurrentState.RequestedSeqNr + Settings.FlowControlWindow / 2; + _log.Debug("Sending Request with confirmedSeqNr [{0}], requestUpToSeqNr [{1}]", seqNr, + newRequestedSeqNr); + CurrentState.ProducerController.Tell(new Request(seqNr, newRequestedSeqNr, ResendLost, false)); + _retryTimer.Start(); // reset interval since Request was just sent + return newRequestedSeqNr; + } + + if (sequencedMessage.Ack) + { + if (_log.IsDebugEnabled) + _log.Debug("Sending Ack seqNr [{0}]", seqNr); + CurrentState.ProducerController.Tell(new Ack(seqNr)); + } + + return CurrentState.RequestedSeqNr; + } }); Receive>(msg => From d784f5d931bad36cbc9af48a05cb3e1f0917cd8e Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Thu, 4 Jan 2024 08:14:56 -0600 Subject: [PATCH 4/4] found the bug --- src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs b/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs index ca43253af6b..d011d2b5c51 100644 --- a/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs +++ b/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs @@ -493,7 +493,7 @@ private void ReceiveRequest(long newConfirmedSeqNr, long newRequestedSeqNr, bool _log.Debug("Expanded requestedSeqNr from [{0}] to [{1}], because current [{2}] and all were probably lost.", newRequestedSeqNr, newRequestedSeqNr2, stateAfterAck.CurrentSeqNr); - if (newRequestedSeqNr > CurrentState.RequestedSeqNr) + if (newRequestedSeqNr2 > CurrentState.RequestedSeqNr) { bool newRequested; if (CurrentState.StoreMessageSentInProgress != 0)