From 9bbcb5e2b8891045eeb28f3f850cec7dbda8aea8 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 6 Jan 2025 18:31:41 -0800 Subject: [PATCH] Ensure broker-originated channel closure completes Fixes #1749 * Ensure `Dispose` and `DisposeAsync` are idempotent and thread-safe. * Use TaskCompletionSource when `HandleChannelCloseAsync` runs to allow dispose methods to wait. * Use `Interlocked` for thread safety. * I like `_isDisposing` better. So sue me! * Move the `Interlocked.Exchange` code to a getter, for readability. * Minor nullable change. --- .../Impl/AutorecoveringChannel.cs | 38 ++++- .../Impl/AutorecoveringConnection.cs | 29 +++- projects/RabbitMQ.Client/Impl/Channel.cs | 132 ++++++++++++++---- projects/RabbitMQ.Client/Impl/Connection.cs | 28 +++- .../Test/Integration/TestChannelShutdown.cs | 49 +++++++ projects/Test/Integration/TestQueueDeclare.cs | 16 +++ 6 files changed, 245 insertions(+), 47 deletions(-) diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index 26d77f8ea..b057af115 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -31,6 +31,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -48,6 +49,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable private AutorecoveringConnection _connection; private RecoveryAwareChannel _innerChannel; private bool _disposed; + private int _isDisposing; private ushort _prefetchCountConsumer; private ushort _prefetchCountGlobal; @@ -256,19 +258,25 @@ public override string ToString() public async ValueTask DisposeAsync() { - if (_disposed) + if (IsDisposing) { return; } - if (IsOpen) + try { - await this.AbortAsync() - .ConfigureAwait(false); - } + if (IsOpen) + { + await this.AbortAsync() + .ConfigureAwait(false); + } - _recordedConsumerTags.Clear(); - _disposed = true; + _recordedConsumerTags.Clear(); + } + finally + { + _disposed = true; + } } public ValueTask GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) => InnerChannel.GetNextPublishSequenceNumberAsync(cancellationToken); @@ -482,7 +490,23 @@ private void ThrowIfDisposed() ThrowDisposed(); } + return; + + [DoesNotReturn] static void ThrowDisposed() => throw new ObjectDisposedException(typeof(AutorecoveringChannel).FullName); } + + private bool IsDisposing + { + get + { + if (Interlocked.Exchange(ref _isDisposing, 1) != 0) + { + return true; + } + + return false; + } + } } } diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs index 61df604f7..879ed249a 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs @@ -31,6 +31,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -50,6 +51,7 @@ internal sealed partial class AutorecoveringConnection : IConnection private Connection _innerConnection; private bool _disposed; + private int _isDisposing; private Connection InnerConnection { @@ -272,7 +274,7 @@ await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, ca public async ValueTask DisposeAsync() { - if (_disposed) + if (IsDisposing) { return; } @@ -281,6 +283,11 @@ public async ValueTask DisposeAsync() { await _innerConnection.DisposeAsync() .ConfigureAwait(false); + + _channels.Clear(); + _recordedEntitiesSemaphore.Dispose(); + _channelsSemaphore.Dispose(); + _recoveryCancellationTokenSource.Dispose(); } catch (OperationInterruptedException) { @@ -288,10 +295,6 @@ await _innerConnection.DisposeAsync() } finally { - _channels.Clear(); - _recordedEntitiesSemaphore.Dispose(); - _channelsSemaphore.Dispose(); - _recoveryCancellationTokenSource.Dispose(); _disposed = true; } } @@ -307,7 +310,23 @@ private void ThrowIfDisposed() ThrowDisposed(); } + return; + + [DoesNotReturn] static void ThrowDisposed() => throw new ObjectDisposedException(typeof(AutorecoveringConnection).FullName); } + + private bool IsDisposing + { + get + { + if (Interlocked.Exchange(ref _isDisposing, 1) != 0) + { + return true; + } + + return false; + } + } } } diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 4acc3da62..7293f584c 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -59,8 +59,13 @@ internal partial class Channel : IChannel, IRecoverable private ShutdownEventArgs? _closeReason; public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason); + private TaskCompletionSource? _serverOriginatedChannelCloseTcs; + internal readonly IConsumerDispatcher ConsumerDispatcher; + private bool _disposed; + private int _isDisposing; + public Channel(ISession session, CreateChannelOptions createChannelOptions) { ContinuationTimeout = createChannelOptions.ContinuationTimeout; @@ -514,22 +519,41 @@ public override string ToString() void IDisposable.Dispose() { + if (_disposed) + { + return; + } + Dispose(true); } protected virtual void Dispose(bool disposing) { + if (IsDisposing) + { + return; + } + if (disposing) { - if (IsOpen) + try { - this.AbortAsync().GetAwaiter().GetResult(); - } + if (IsOpen) + { + this.AbortAsync().GetAwaiter().GetResult(); + } - ConsumerDispatcher.Dispose(); - _rpcSemaphore.Dispose(); - _confirmSemaphore.Dispose(); - _outstandingPublisherConfirmationsRateLimiter?.Dispose(); + _serverOriginatedChannelCloseTcs?.Task.Wait(TimeSpan.FromSeconds(5)); + + ConsumerDispatcher.Dispose(); + _rpcSemaphore.Dispose(); + _confirmSemaphore.Dispose(); + _outstandingPublisherConfirmationsRateLimiter?.Dispose(); + } + finally + { + _disposed = true; + } } } @@ -543,18 +567,37 @@ await DisposeAsyncCore() protected virtual async ValueTask DisposeAsyncCore() { - if (IsOpen) + if (IsDisposing) { - await this.AbortAsync().ConfigureAwait(false); + return; } - ConsumerDispatcher.Dispose(); - _rpcSemaphore.Dispose(); - _confirmSemaphore.Dispose(); - if (_outstandingPublisherConfirmationsRateLimiter is not null) + try { - await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync() - .ConfigureAwait(false); + if (IsOpen) + { + await this.AbortAsync().ConfigureAwait(false); + } + + if (_serverOriginatedChannelCloseTcs is not null) + { + await _serverOriginatedChannelCloseTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)) + .ConfigureAwait(false); + } + + ConsumerDispatcher.Dispose(); + _rpcSemaphore.Dispose(); + _confirmSemaphore.Dispose(); + + if (_outstandingPublisherConfirmationsRateLimiter is not null) + { + await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync() + .ConfigureAwait(false); + } + } + finally + { + _disposed = true; } } @@ -651,23 +694,41 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag) protected async Task HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) { - var channelClose = new ChannelClose(cmd.MethodSpan); - SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer, - channelClose._replyCode, - channelClose._replyText, - channelClose._classId, - channelClose._methodId)); + TaskCompletionSource? serverOriginatedChannelCloseTcs = _serverOriginatedChannelCloseTcs; + if (serverOriginatedChannelCloseTcs is null) + { + // Attempt to assign the new TCS only if _tcs is still null + _ = Interlocked.CompareExchange(ref _serverOriginatedChannelCloseTcs, + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), null); + } - await Session.CloseAsync(_closeReason, notify: false) - .ConfigureAwait(false); + try + { + var channelClose = new ChannelClose(cmd.MethodSpan); + SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer, + channelClose._replyCode, + channelClose._replyText, + channelClose._classId, + channelClose._methodId)); - var method = new ChannelCloseOk(); - await ModelSendAsync(in method, cancellationToken) - .ConfigureAwait(false); + await Session.CloseAsync(_closeReason, notify: false) + .ConfigureAwait(false); - await Session.NotifyAsync(cancellationToken) - .ConfigureAwait(false); - return true; + var method = new ChannelCloseOk(); + await ModelSendAsync(in method, cancellationToken) + .ConfigureAwait(false); + + await Session.NotifyAsync(cancellationToken) + .ConfigureAwait(false); + + _serverOriginatedChannelCloseTcs?.TrySetResult(true); + return true; + } + catch (Exception ex) + { + _serverOriginatedChannelCloseTcs?.TrySetException(ex); + throw; + } } protected async Task HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken) @@ -1587,5 +1648,18 @@ private Task DispatchCommandAsync(IncomingCommand cmd, CancellationToken c } } } + + private bool IsDisposing + { + get + { + if (Interlocked.Exchange(ref _isDisposing, 1) != 0) + { + return true; + } + + return false; + } + } } } diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index 68cadcfd7..fd88b5b5b 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -32,6 +32,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; using System.IO; using System.Runtime.CompilerServices; using System.Threading; @@ -46,6 +47,7 @@ namespace RabbitMQ.Client.Framing internal sealed partial class Connection : IConnection { private bool _disposed; + private int _isDisposing; private volatile bool _closed; private readonly ConnectionConfig _config; @@ -489,7 +491,7 @@ internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellatio public async ValueTask DisposeAsync() { - if (_disposed) + if (IsDisposing) { return; } @@ -523,13 +525,13 @@ private void ThrowIfDisposed() { if (_disposed) { - ThrowObjectDisposedException(); + ThrowDisposed(); } - static void ThrowObjectDisposedException() - { - throw new ObjectDisposedException(typeof(Connection).FullName); - } + return; + + [DoesNotReturn] + static void ThrowDisposed() => throw new ObjectDisposedException(typeof(Connection).FullName); } public override string ToString() @@ -537,9 +539,23 @@ public override string ToString() return $"Connection({_id},{Endpoint})"; } + [DoesNotReturn] private static void ThrowAlreadyClosedException(ShutdownEventArgs closeReason) { throw new AlreadyClosedException(closeReason); } + + private bool IsDisposing + { + get + { + if (Interlocked.Exchange(ref _isDisposing, 1) != 0) + { + return true; + } + + return false; + } + } } } diff --git a/projects/Test/Integration/TestChannelShutdown.cs b/projects/Test/Integration/TestChannelShutdown.cs index 1774555b3..3b2e6f2d3 100644 --- a/projects/Test/Integration/TestChannelShutdown.cs +++ b/projects/Test/Integration/TestChannelShutdown.cs @@ -30,6 +30,8 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Impl; @@ -61,5 +63,52 @@ public async Task TestConsumerDispatcherShutdown() await WaitAsync(tcs, TimeSpan.FromSeconds(5), "channel shutdown"); Assert.True(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after CloseAsync"); } + + [Fact] + public async Task TestConcurrentDisposeAsync_GH1749() + { + bool sawCallbackException = false; + int channelShutdownCount = 0; + + _channel.CallbackExceptionAsync += (channel, ea) => + { + sawCallbackException = true; + return Task.CompletedTask; + }; + + _channel.ChannelShutdownAsync += (channel, args) => + { + Interlocked.Increment(ref channelShutdownCount); + return Task.CompletedTask; + }; + + var disposeTasks = new List + { + _channel.DisposeAsync(), + _channel.DisposeAsync(), + _channel.DisposeAsync() + }; + + foreach (ValueTask vt in disposeTasks) + { + await vt; + } + + Assert.Equal(1, channelShutdownCount); + Assert.False(sawCallbackException); + + disposeTasks.Clear(); + disposeTasks.Add(_conn.DisposeAsync()); + disposeTasks.Add(_conn.DisposeAsync()); + disposeTasks.Add(_conn.DisposeAsync()); + + foreach (ValueTask vt in disposeTasks) + { + await vt; + } + + _channel = null; + _conn = null; + } } } diff --git a/projects/Test/Integration/TestQueueDeclare.cs b/projects/Test/Integration/TestQueueDeclare.cs index 94e758a64..91510029f 100644 --- a/projects/Test/Integration/TestQueueDeclare.cs +++ b/projects/Test/Integration/TestQueueDeclare.cs @@ -57,6 +57,22 @@ public async Task TestQueueDeclareAsync() Assert.Equal(q, passiveDeclareResult.QueueName); } + [Fact] + public async Task TestPassiveQueueDeclareException_GH1749() + { + string q = GenerateQueueName(); + try + { + await _channel.QueueDeclarePassiveAsync(q); + } + catch (Exception ex) + { + _output.WriteLine("{0} ex: {1}", _testDisplayName, ex); + await _channel.DisposeAsync(); + _channel = null; + } + } + [Fact] public async Task TestConcurrentQueueDeclareAndBindAsync() {