diff --git a/RabbitMQDotNetClient.sln b/RabbitMQDotNetClient.sln index 4307f8500..deb687a47 100644 --- a/RabbitMQDotNetClient.sln +++ b/RabbitMQDotNetClient.sln @@ -46,6 +46,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1647", "projects\Applica EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PublisherConfirms", "projects\Applications\PublisherConfirms\PublisherConfirms.csproj", "{13149F73-2CDB-4ECF-BF2C-403860045751}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1749", "projects\Applications\GH-1749\GH-1749.csproj", "{725D9986-ACD1-424E-AF4C-2BEB407D2BD9}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -108,6 +110,10 @@ Global {13149F73-2CDB-4ECF-BF2C-403860045751}.Debug|Any CPU.Build.0 = Debug|Any CPU {13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.ActiveCfg = Release|Any CPU {13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.Build.0 = Release|Any CPU + {725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -123,6 +129,7 @@ Global {AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} {64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69} {13149F73-2CDB-4ECF-BF2C-403860045751} = {D21B282C-49E6-4A30-887B-9626D94B8D69} + {725D9986-ACD1-424E-AF4C-2BEB407D2BD9} = {D21B282C-49E6-4A30-887B-9626D94B8D69} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1} diff --git a/projects/Applications/CreateChannel/Program.cs b/projects/Applications/CreateChannel/Program.cs index 4ed61c6f9..01c33860c 100644 --- a/projects/Applications/CreateChannel/Program.cs +++ b/projects/Applications/CreateChannel/Program.cs @@ -30,11 +30,12 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Diagnostics; -using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; namespace CreateChannel { @@ -44,49 +45,89 @@ public static class Program private const int ChannelsToOpen = 50; private static int channelsOpened; - private static AutoResetEvent doneEvent; public static async Task Main() { - doneEvent = new AutoResetEvent(false); + var doneTcs = new TaskCompletionSource(); var connectionFactory = new ConnectionFactory { }; await using IConnection connection = await connectionFactory.CreateConnectionAsync(); var watch = Stopwatch.StartNew(); - _ = Task.Run(async () => + var workTask = Task.Run(async () => { - var channels = new IChannel[ChannelsToOpen]; - for (int i = 0; i < Repeats; i++) + try { - for (int j = 0; j < channels.Length; j++) + var channelOpenTasks = new List>(); + var channelDisposeTasks = new List(); + var channels = new List(); + for (int i = 0; i < Repeats; i++) { - channels[j] = await connection.CreateChannelAsync(); - channelsOpened++; - } + for (int j = 0; j < ChannelsToOpen; j++) + { + channelOpenTasks.Add(connection.CreateChannelAsync()); + } - for (int j = 0; j < channels.Length; j++) - { - await channels[j].DisposeAsync(); + for (int j = 0; j < channelOpenTasks.Count; j++) + { + IChannel ch = await channelOpenTasks[j]; + if (j % 8 == 0) + { + try + { + await ch.QueueDeclarePassiveAsync(Guid.NewGuid().ToString()); + } + catch (OperationInterruptedException) + { + await ch.DisposeAsync(); + } + catch (Exception ex) + { + _ = Console.Error.WriteLineAsync($"{DateTime.Now:s} [ERROR] {ex}"); + } + } + else + { + channels.Add(ch); + channelsOpened++; + } + } + channelOpenTasks.Clear(); + + for (int j = 0; j < channels.Count; j++) + { + channelDisposeTasks.Add(channels[j].DisposeAsync()); + } + + for (int j = 0; j < channels.Count; j++) + { + await channelDisposeTasks[j]; + } + channelDisposeTasks.Clear(); } - } - doneEvent.Set(); + doneTcs.SetResult(true); + } + catch (Exception ex) + { + doneTcs.SetException(ex); + } }); Console.WriteLine($"{Repeats} times opening {ChannelsToOpen} channels on a connection. => Total channel open/close: {Repeats * ChannelsToOpen}"); Console.WriteLine(); Console.WriteLine("Opened"); - while (!doneEvent.WaitOne(500)) + while (false == doneTcs.Task.IsCompleted) { Console.WriteLine($"{channelsOpened,5}"); + await Task.Delay(150); } watch.Stop(); Console.WriteLine($"{channelsOpened,5}"); Console.WriteLine(); - Console.WriteLine($"Took {watch.Elapsed.TotalMilliseconds} ms"); + Console.WriteLine($"Took {watch.Elapsed}"); - Console.ReadLine(); + await workTask; } } } diff --git a/projects/Applications/GH-1647/GH-1647.csproj b/projects/Applications/GH-1647/GH-1647.csproj index f2591d159..f08f12982 100644 --- a/projects/Applications/GH-1647/GH-1647.csproj +++ b/projects/Applications/GH-1647/GH-1647.csproj @@ -9,7 +9,7 @@ - + diff --git a/projects/Applications/GH-1749/GH-1749.csproj b/projects/Applications/GH-1749/GH-1749.csproj new file mode 100644 index 000000000..9b238226a --- /dev/null +++ b/projects/Applications/GH-1749/GH-1749.csproj @@ -0,0 +1,15 @@ + + + + Exe + net8.0 + GH_1749 + enable + enable + + + + + + + diff --git a/projects/Applications/GH-1749/Program.cs b/projects/Applications/GH-1749/Program.cs new file mode 100644 index 000000000..d543e0263 --- /dev/null +++ b/projects/Applications/GH-1749/Program.cs @@ -0,0 +1,199 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task + +using System.Runtime.ExceptionServices; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using RabbitMQ.Client.Exceptions; + +namespace GH_1749 +{ + class GH1749Consumer : AsyncDefaultBasicConsumer + { + public GH1749Consumer(IChannel channel) : base(channel) + { + } + + protected override Task OnCancelAsync(string[] consumerTags, CancellationToken cancellationToken = default) + { + Console.WriteLine("{0} [INFO] OnCancelAsync, tags[0]: {1}", DateTime.Now.ToString("s"), consumerTags[0]); + return base.OnCancelAsync(consumerTags, cancellationToken); + } + } + + static class Program + { + const string DefaultHostName = "localhost"; + const string ConnectionClientProvidedName = "GH_1749"; + static readonly CancellationTokenSource s_cancellationTokenSource = new(); + static readonly CancellationToken s_cancellationToken = s_cancellationTokenSource.Token; + + static async Task Main(string[] args) + { + string hostname = DefaultHostName; + if (args.Length > 0) + { + hostname = args[0]; + } + + AppDomain.CurrentDomain.FirstChanceException += CurrentDomain_FirstChanceException; + + ConnectionFactory connectionFactory = new() + { + HostName = hostname, + AutomaticRecoveryEnabled = false, + TopologyRecoveryEnabled = false, + RequestedConnectionTimeout = TimeSpan.FromSeconds(600), + RequestedHeartbeat = TimeSpan.FromSeconds(600), + UserName = "guest", + Password = "guest", + ClientProvidedName = ConnectionClientProvidedName + }; + + var channelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true); + await using var connection = await connectionFactory.CreateConnectionAsync(); + + connection.RecoverySucceededAsync += (object sender, AsyncEventArgs ea) => + { + Console.WriteLine("{0} [INFO] saw RecoverySucceededAsync, event: {1}", Now, ea); + return Task.CompletedTask; + }; + + connection.CallbackExceptionAsync += Connection_CallbackExceptionAsync; + + connection.ConnectionBlockedAsync += Connection_ConnectionBlockedAsync; + connection.ConnectionUnblockedAsync += Connection_ConnectionUnblockedAsync; + + connection.ConnectionRecoveryErrorAsync += Connection_ConnectionRecoveryErrorAsync; + + connection.ConnectionShutdownAsync += (object sender, ShutdownEventArgs ea) => + { + Console.WriteLine("{0} [INFO] saw ConnectionShutdownAsync, event: {1}", Now, ea); + return Task.CompletedTask; + }; + + connection.ConsumerTagChangeAfterRecoveryAsync += Connection_ConsumerTagChangeAfterRecoveryAsync; + connection.QueueNameChangedAfterRecoveryAsync += Connection_QueueNameChangedAfterRecoveryAsync; + + connection.RecoveringConsumerAsync += Connection_RecoveringConsumerAsync; + + await using var channel = await connection.CreateChannelAsync(options: channelOptions); + + channel.CallbackExceptionAsync += Channel_CallbackExceptionAsync; + channel.ChannelShutdownAsync += Channel_ChannelShutdownAsync; + + try + { + await channel.QueueDeclarePassiveAsync(Guid.NewGuid().ToString()); + } + catch (OperationInterruptedException) + { + await channel.DisposeAsync(); + // rabbitmq-dotnet-client-1749 + // await Task.Delay(2000); + } + } + + private static string Now => DateTime.Now.ToString("s"); + + private static Task Channel_CallbackExceptionAsync(object sender, CallbackExceptionEventArgs ea) + { + Console.WriteLine("{0} [INFO] channel saw CallbackExceptionAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] channel CallbackExceptionAsync, exception: {1}", Now, ea.Exception); + return Task.CompletedTask; + } + + private static Task Channel_ChannelShutdownAsync(object sender, ShutdownEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ChannelShutdownAsync, event: {1}", Now, ea); + return Task.CompletedTask; + // rabbitmq-dotnet-client-1749 + // return Task.Delay(1000); + } + + private static void CurrentDomain_FirstChanceException(object? sender, FirstChanceExceptionEventArgs e) + { + if (e.Exception is ObjectDisposedException) + { + Console.WriteLine("{0} [INFO] saw FirstChanceException, exception: {1}", Now, e.Exception); + } + } + + private static Task Connection_CallbackExceptionAsync(object sender, CallbackExceptionEventArgs ea) + { + Console.WriteLine("{0} [INFO] connection saw CallbackExceptionAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] connection CallbackExceptionAsync, exception: {1}", Now, ea.Exception); + return Task.CompletedTask; + } + + private static Task Connection_ConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ConnectionBlockedAsync, event: {1}", Now, ea); + return Task.CompletedTask; + } + + private static Task Connection_ConnectionUnblockedAsync(object sender, AsyncEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ConnectionUnlockedAsync, event: {1}", Now, ea); + return Task.CompletedTask; + } + + private static Task Connection_ConnectionRecoveryErrorAsync(object sender, ConnectionRecoveryErrorEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ConnectionRecoveryErrorAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] ConnectionRecoveryErrorAsync, exception: {1}", Now, ea.Exception); + return Task.CompletedTask; + } + + private static Task Connection_ConsumerTagChangeAfterRecoveryAsync(object sender, ConsumerTagChangedAfterRecoveryEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw ConsumerTagChangeAfterRecoveryAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] ConsumerTagChangeAfterRecoveryAsync, tags: {1} {2}", Now, ea.TagBefore, ea.TagAfter); + return Task.CompletedTask; + } + + private static Task Connection_QueueNameChangedAfterRecoveryAsync(object sender, QueueNameChangedAfterRecoveryEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw QueueNameChangedAfterRecoveryAsync, event: {1}", Now, ea); + Console.WriteLine("{0} [INFO] QueueNameChangedAfterRecoveryAsync, queue names: {1} {2}", Now, ea.NameBefore, ea.NameAfter); + return Task.CompletedTask; + } + + private static Task Connection_RecoveringConsumerAsync(object sender, RecoveringConsumerEventArgs ea) + { + Console.WriteLine("{0} [INFO] saw RecoveringConsumerAsync, event: {1}, tag: {2}", Now, ea, ea.ConsumerTag); + return Task.CompletedTask; + } + } +} + diff --git a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index 963fced57..761ec8bc9 100644 --- a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -202,6 +202,15 @@ protected override Task InternalShutdownAsync() protected abstract Task ProcessChannelAsync(); + protected enum WorkType : byte + { + Shutdown, + Cancel, + CancelOk, + Deliver, + ConsumeOk + } + protected readonly struct WorkStruct : IDisposable { public readonly IAsyncBasicConsumer Consumer; @@ -276,42 +285,35 @@ public static WorkStruct CreateDeliver(IAsyncBasicConsumer consumer, string cons public void Dispose() => Body.Dispose(); } - protected enum WorkType : byte + public void Dispose() { - Shutdown, - Cancel, - CancelOk, - Deliver, - ConsumeOk + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); } protected virtual void Dispose(bool disposing) { - if (!_disposed) + if (_disposed) { - try - { - if (disposing) - { - Quiesce(); - } - } - catch - { - // CHOMP - } - finally + return; + } + + try + { + if (disposing) { - _disposed = true; + Quiesce(); } } - } - - public void Dispose() - { - // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method - Dispose(disposing: true); - GC.SuppressFinalize(this); + catch + { + // CHOMP + } + finally + { + _disposed = true; + } } } } diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index 26d77f8ea..454df1a5e 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -47,7 +47,10 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable private AutorecoveringConnection _connection; private RecoveryAwareChannel _innerChannel; + private bool _disposed; + private bool _isDisposing; + private readonly object _locker = new(); private ushort _prefetchCountConsumer; private ushort _prefetchCountGlobal; @@ -252,7 +255,15 @@ await _connection.DeleteRecordedChannelAsync(this, public override string ToString() => InnerChannel.ToString(); - public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); + public void Dispose() + { + if (_disposed) + { + return; + } + + DisposeAsync().AsTask().GetAwaiter().GetResult(); + } public async ValueTask DisposeAsync() { @@ -261,14 +272,30 @@ public async ValueTask DisposeAsync() return; } - if (IsOpen) + lock (_locker) { - await this.AbortAsync() - .ConfigureAwait(false); + if (_isDisposing) + { + return; + } + _isDisposing = true; } - _recordedConsumerTags.Clear(); - _disposed = true; + try + { + if (IsOpen) + { + await this.AbortAsync() + .ConfigureAwait(false); + } + + _recordedConsumerTags.Clear(); + } + finally + { + _disposed = true; + _isDisposing = false; + } } public ValueTask GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) => InnerChannel.GetNextPublishSequenceNumberAsync(cancellationToken); diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs index 61df604f7..23878c64c 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs @@ -268,7 +268,15 @@ await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, ca return autorecoveringChannel; } - public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); + public void Dispose() + { + if (_disposed) + { + return; + } + + DisposeAsync().AsTask().GetAwaiter().GetResult(); + } public async ValueTask DisposeAsync() { @@ -288,10 +296,17 @@ await _innerConnection.DisposeAsync() } finally { - _channels.Clear(); - _recordedEntitiesSemaphore.Dispose(); - _channelsSemaphore.Dispose(); - _recoveryCancellationTokenSource.Dispose(); + try + { + _channels.Clear(); + _recordedEntitiesSemaphore.Dispose(); + _channelsSemaphore.Dispose(); + _recoveryCancellationTokenSource.Dispose(); + } + catch + { + } + _disposed = true; } } diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 4acc3da62..532609723 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -57,10 +57,15 @@ internal partial class Channel : IChannel, IRecoverable private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue(); private ShutdownEventArgs? _closeReason; - public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason); + private TaskCompletionSource? _serverOriginatedChannelCloseTcs; internal readonly IConsumerDispatcher ConsumerDispatcher; + private bool _disposed; + private bool _isDisposing; + + private readonly object _locker = new(); + public Channel(ISession session, CreateChannelOptions createChannelOptions) { ContinuationTimeout = createChannelOptions.ContinuationTimeout; @@ -82,6 +87,8 @@ public Channel(ISession session, CreateChannelOptions createChannelOptions) internal TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10); + public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason); + public TimeSpan ContinuationTimeout { get; set; } public event AsyncEventHandler BasicAcksAsync @@ -182,17 +189,6 @@ public void MaybeSetConnectionStartException(Exception ex) } } - protected void TakeOver(Channel other) - { - _basicAcksAsyncWrapper.Takeover(other._basicAcksAsyncWrapper); - _basicNacksAsyncWrapper.Takeover(other._basicNacksAsyncWrapper); - _basicReturnAsyncWrapper.Takeover(other._basicReturnAsyncWrapper); - _callbackExceptionAsyncWrapper.Takeover(other._callbackExceptionAsyncWrapper); - _flowControlAsyncWrapper.Takeover(other._flowControlAsyncWrapper); - _channelShutdownAsyncWrapper.Takeover(other._channelShutdownAsyncWrapper); - _recoveryAsyncWrapper.Takeover(other._recoveryAsyncWrapper); - } - public Task CloseAsync(ushort replyCode, string replyText, bool abort, CancellationToken cancellationToken) { @@ -345,20 +341,6 @@ await ModelSendAsync(in method, k.CancellationToken) } } - protected bool Enqueue(IRpcContinuation k) - { - if (IsOpen) - { - _continuationQueue.Enqueue(k); - return true; - } - else - { - k.HandleChannelShutdown(CloseReason); - return false; - } - } - internal async Task OpenAsync(CreateChannelOptions createChannelOptions, CancellationToken cancellationToken) { @@ -409,30 +391,15 @@ await Session.CloseAsync(reason) m_connectionStartCell?.TrySetResult(null); } - private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken) + protected void TakeOver(Channel other) { - /* - * If DispatchCommandAsync returns `true`, it means that the incoming command is server-originated, and has - * already been handled. - * - * Else, the incoming command is the return of an RPC call, and must be handled. - */ - try - { - if (false == await DispatchCommandAsync(cmd, cancellationToken) - .ConfigureAwait(false)) - { - using (IRpcContinuation c = _continuationQueue.Next()) - { - await c.HandleCommandAsync(cmd) - .ConfigureAwait(false); - } - } - } - finally - { - cmd.ReturnBuffers(); - } + _basicAcksAsyncWrapper.Takeover(other._basicAcksAsyncWrapper); + _basicNacksAsyncWrapper.Takeover(other._basicNacksAsyncWrapper); + _basicReturnAsyncWrapper.Takeover(other._basicReturnAsyncWrapper); + _callbackExceptionAsyncWrapper.Takeover(other._callbackExceptionAsyncWrapper); + _flowControlAsyncWrapper.Takeover(other._flowControlAsyncWrapper); + _channelShutdownAsyncWrapper.Takeover(other._channelShutdownAsyncWrapper); + _recoveryAsyncWrapper.Takeover(other._recoveryAsyncWrapper); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -454,6 +421,20 @@ internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args) return _callbackExceptionAsyncWrapper.InvokeAsync(this, args); } + private bool Enqueue(IRpcContinuation k) + { + if (IsOpen) + { + _continuationQueue.Enqueue(k); + return true; + } + else + { + k.HandleChannelShutdown(CloseReason); + return false; + } + } + ///Broadcasts notification of the final shutdown of the channel. /// /// @@ -514,347 +495,182 @@ public override string ToString() void IDisposable.Dispose() { + if (_disposed) + { + return; + } + Dispose(true); } - protected virtual void Dispose(bool disposing) + public async ValueTask DisposeAsync() { - if (disposing) + if (_disposed) { - if (IsOpen) - { - this.AbortAsync().GetAwaiter().GetResult(); - } - - ConsumerDispatcher.Dispose(); - _rpcSemaphore.Dispose(); - _confirmSemaphore.Dispose(); - _outstandingPublisherConfirmationsRateLimiter?.Dispose(); + return; } - } - public async ValueTask DisposeAsync() - { - await DisposeAsyncCore() + await DisposeAsyncCore(true) .ConfigureAwait(false); Dispose(false); } - protected virtual async ValueTask DisposeAsyncCore() + protected virtual void Dispose(bool disposing) { - if (IsOpen) + if (_disposed) { - await this.AbortAsync().ConfigureAwait(false); + return; } - ConsumerDispatcher.Dispose(); - _rpcSemaphore.Dispose(); - _confirmSemaphore.Dispose(); - if (_outstandingPublisherConfirmationsRateLimiter is not null) + if (disposing) { - await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync() - .ConfigureAwait(false); - } - } - - public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken) - { - var method = new ConnectionTuneOk(channelMax, frameMax, heartbeat); - return ModelSendAsync(in method, cancellationToken).AsTask(); - } + lock (_locker) + { + if (_isDisposing) + { + return; + } + _isDisposing = true; + } - protected async Task HandleBasicAck(IncomingCommand cmd, - CancellationToken cancellationToken = default) - { - var ack = new BasicAck(cmd.MethodSpan); - if (!_basicAcksAsyncWrapper.IsEmpty) - { - var args = new BasicAckEventArgs(ack._deliveryTag, ack._multiple, cancellationToken); - await _basicAcksAsyncWrapper.InvokeAsync(this, args) - .ConfigureAwait(false); - } + try + { + MaybeAbort(); - HandleAck(ack._deliveryTag, ack._multiple); + MaybeDisposeOutstandingPublisherConfirmationsRateLimiter(); - return true; + MaybeWaitForServerOriginatedClose(); + } + finally + { + try + { + ConsumerDispatcher.Dispose(); + _rpcSemaphore.Dispose(); + _confirmSemaphore.Dispose(); + } + catch + { + } + _disposed = true; + _isDisposing = false; + } + } } - protected async Task HandleBasicNack(IncomingCommand cmd, - CancellationToken cancellationToken = default) + protected virtual async ValueTask DisposeAsyncCore(bool disposing) { - var nack = new BasicNack(cmd.MethodSpan); - if (!_basicNacksAsyncWrapper.IsEmpty) + if (_disposed) { - var args = new BasicNackEventArgs( - nack._deliveryTag, nack._multiple, nack._requeue, cancellationToken); - await _basicNacksAsyncWrapper.InvokeAsync(this, args) - .ConfigureAwait(false); + return; } - HandleNack(nack._deliveryTag, nack._multiple, false); - - return true; - } + if (disposing) + { + lock (_locker) + { + if (_isDisposing) + { + return; + } + _isDisposing = true; + } - protected async Task HandleBasicReturn(IncomingCommand cmd, CancellationToken cancellationToken) - { - var basicReturn = new BasicReturn(cmd.MethodSpan); + try + { + await MaybeAbortAsync() + .ConfigureAwait(false); - var e = new BasicReturnEventArgs(basicReturn._replyCode, basicReturn._replyText, - basicReturn._exchange, basicReturn._routingKey, - new ReadOnlyBasicProperties(cmd.HeaderSpan), cmd.Body.Memory, cancellationToken); + await MaybeDisposeOutstandingPublisherConfirmationsRateLimiterAsync() + .ConfigureAwait(false); - if (!_basicReturnAsyncWrapper.IsEmpty) - { - await _basicReturnAsyncWrapper.InvokeAsync(this, e) - .ConfigureAwait(false); + await MaybeWaitForServerOriginatedCloseAsync() + .ConfigureAwait(false); + } + finally + { + try + { + ConsumerDispatcher.Dispose(); + _rpcSemaphore.Dispose(); + _confirmSemaphore.Dispose(); + } + catch + { + } + _disposed = true; + _isDisposing = false; + } } + } - HandleReturn(e); + public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken) + { + var method = new ConnectionTuneOk(channelMax, frameMax, heartbeat); + return ModelSendAsync(in method, cancellationToken).AsTask(); + } - return true; + protected virtual ulong AdjustDeliveryTag(ulong deliveryTag) + { + return deliveryTag; } - protected async Task HandleBasicCancelAsync(IncomingCommand cmd, CancellationToken cancellationToken) + public virtual ValueTask BasicAckAsync(ulong deliveryTag, bool multiple, + CancellationToken cancellationToken) { - string consumerTag = new BasicCancel(cmd.MethodSpan)._consumerTag; - await ConsumerDispatcher.HandleBasicCancelAsync(consumerTag, cancellationToken) - .ConfigureAwait(false); - return true; + var method = new BasicAck(deliveryTag, multiple); + return ModelSendAsync(in method, cancellationToken); } - protected async Task HandleBasicDeliverAsync(IncomingCommand cmd, CancellationToken cancellationToken) + public virtual ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue, + CancellationToken cancellationToken) { - var method = new BasicDeliver(cmd.MethodSpan); - var header = new ReadOnlyBasicProperties(cmd.HeaderSpan); - await ConsumerDispatcher.HandleBasicDeliverAsync( - method._consumerTag, - AdjustDeliveryTag(method._deliveryTag), - method._redelivered, - method._exchange, - method._routingKey, - header, - /* - * Takeover Body so it doesn't get returned as it is necessary - * for handling the Basic.Deliver method by client code. - */ - cmd.TakeoverBody(), - cancellationToken).ConfigureAwait(false); - return true; + var method = new BasicNack(deliveryTag, multiple, requeue); + return ModelSendAsync(in method, cancellationToken); } - protected virtual ulong AdjustDeliveryTag(ulong deliveryTag) + public virtual ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue, + CancellationToken cancellationToken) { - return deliveryTag; + var method = new BasicReject(deliveryTag, requeue); + return ModelSendAsync(in method, cancellationToken); } - protected async Task HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) + public async Task BasicCancelAsync(string consumerTag, bool noWait, + CancellationToken cancellationToken) { - var channelClose = new ChannelClose(cmd.MethodSpan); - SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer, - channelClose._replyCode, - channelClose._replyText, - channelClose._classId, - channelClose._methodId)); + bool enqueued = false; + // NOTE: + // Maybe don't dispose these instances because the CancellationTokens must remain + // valid for processing the response. + var k = new BasicCancelAsyncRpcContinuation(consumerTag, ConsumerDispatcher, + ContinuationTimeout, cancellationToken); - await Session.CloseAsync(_closeReason, notify: false) + await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); + try + { + var method = new BasicCancel(consumerTag, noWait); - var method = new ChannelCloseOk(); - await ModelSendAsync(in method, cancellationToken) - .ConfigureAwait(false); + if (noWait) + { + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + ConsumerDispatcher.GetAndRemoveConsumer(consumerTag); + } + else + { + enqueued = Enqueue(k); - await Session.NotifyAsync(cancellationToken) - .ConfigureAwait(false); - return true; - } + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); - protected async Task HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken) - { - /* - * Note: - * This call _must_ come before completing the async continuation - */ - await FinishCloseAsync(cancellationToken) - .ConfigureAwait(false); + bool result = await k; + Debug.Assert(result); + } - if (_continuationQueue.TryPeek(out ChannelCloseAsyncRpcContinuation? k)) - { - _continuationQueue.Next(); - await k.HandleCommandAsync(cmd) - .ConfigureAwait(false); - } - - return true; - } - - protected async Task HandleChannelFlowAsync(IncomingCommand cmd, CancellationToken cancellationToken) - { - bool active = new ChannelFlow(cmd.MethodSpan)._active; - if (active) - { - _flowControlBlock.Set(); - } - else - { - _flowControlBlock.Reset(); - } - - var method = new ChannelFlowOk(active); - await ModelSendAsync(in method, cancellationToken). - ConfigureAwait(false); - - if (!_flowControlAsyncWrapper.IsEmpty) - { - await _flowControlAsyncWrapper.InvokeAsync(this, new FlowControlEventArgs(active, cancellationToken)) - .ConfigureAwait(false); - } - - return true; - } - - protected async Task HandleConnectionBlockedAsync(IncomingCommand cmd, CancellationToken cancellationToken) - { - string reason = new ConnectionBlocked(cmd.MethodSpan)._reason; - await Session.Connection.HandleConnectionBlockedAsync(reason, cancellationToken) - .ConfigureAwait(false); - return true; - } - - protected async Task HandleConnectionCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) - { - var method = new ConnectionClose(cmd.MethodSpan); - var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId); - try - { - await Session.Connection.ClosedViaPeerAsync(reason, cancellationToken) - .ConfigureAwait(false); - - var replyMethod = new ConnectionCloseOk(); - await ModelSendAsync(in replyMethod, cancellationToken) - .ConfigureAwait(false); - - SetCloseReason(Session.Connection.CloseReason!); - } - catch (IOException) - { - // Ignored. We're only trying to be polite by sending - // the close-ok, after all. - } - catch (AlreadyClosedException) - { - // Ignored. We're only trying to be polite by sending - // the close-ok, after all. - } - - return true; - } - - protected async Task HandleConnectionSecureAsync(IncomingCommand cmd, CancellationToken cancellationToken) - { - var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next(); - await k.HandleCommandAsync(new IncomingCommand()) - .ConfigureAwait(false); // release the continuation. - return true; - } - - protected async Task HandleConnectionStartAsync(IncomingCommand cmd, CancellationToken cancellationToken) - { - if (m_connectionStartCell is null) - { - var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start"); - await Session.Connection.CloseAsync(reason, false, - InternalConstants.DefaultConnectionCloseTimeout, - cancellationToken) - .ConfigureAwait(false); - } - else - { - var method = new ConnectionStart(cmd.MethodSpan); - var details = new ConnectionStartDetails(method._locales, method._mechanisms, - method._serverProperties, method._versionMajor, method._versionMinor); - m_connectionStartCell.SetResult(details); - m_connectionStartCell = null; - } - - return true; - } - - protected async Task HandleConnectionTuneAsync(IncomingCommand cmd, CancellationToken cancellationToken) - { - // Note: `using` here to ensure instance is disposed - using var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next(); - - // Note: releases the continuation and returns the buffers - await k.HandleCommandAsync(cmd) - .ConfigureAwait(false); - - return true; - } - - protected async Task HandleConnectionUnblockedAsync(CancellationToken cancellationToken) - { - await Session.Connection.HandleConnectionUnblockedAsync(cancellationToken) - .ConfigureAwait(false); - return true; - } - - public virtual ValueTask BasicAckAsync(ulong deliveryTag, bool multiple, - CancellationToken cancellationToken) - { - var method = new BasicAck(deliveryTag, multiple); - return ModelSendAsync(in method, cancellationToken); - } - - public virtual ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue, - CancellationToken cancellationToken) - { - var method = new BasicNack(deliveryTag, multiple, requeue); - return ModelSendAsync(in method, cancellationToken); - } - - public virtual ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue, - CancellationToken cancellationToken) - { - var method = new BasicReject(deliveryTag, requeue); - return ModelSendAsync(in method, cancellationToken); - } - - public async Task BasicCancelAsync(string consumerTag, bool noWait, - CancellationToken cancellationToken) - { - bool enqueued = false; - // NOTE: - // Maybe don't dispose these instances because the CancellationTokens must remain - // valid for processing the response. - var k = new BasicCancelAsyncRpcContinuation(consumerTag, ConsumerDispatcher, - ContinuationTimeout, cancellationToken); - - await _rpcSemaphore.WaitAsync(k.CancellationToken) - .ConfigureAwait(false); - try - { - var method = new BasicCancel(consumerTag, noWait); - - if (noWait) - { - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - ConsumerDispatcher.GetAndRemoveConsumer(consumerTag); - } - else - { - enqueued = Enqueue(k); - - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - - bool result = await k; - Debug.Assert(result); - } - - return; + return; } finally { @@ -1500,6 +1316,32 @@ internal static Task CreateAndOpenAsync(CreateChannelOptions createCha return channel.OpenAsync(createChannelOptions, cancellationToken); } + private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + /* + * If DispatchCommandAsync returns `true`, it means that the incoming command is server-originated, and has + * already been handled. + * + * Else, the incoming command is the return of an RPC call, and must be handled. + */ + try + { + if (false == await DispatchCommandAsync(cmd, cancellationToken) + .ConfigureAwait(false)) + { + using (IRpcContinuation c = _continuationQueue.Next()) + { + await c.HandleCommandAsync(cmd) + .ConfigureAwait(false); + } + } + } + finally + { + cmd.ReturnBuffers(); + } + } + /// /// Returning true from this method means that the command was server-originated, /// and handled already. @@ -1587,5 +1429,329 @@ private Task DispatchCommandAsync(IncomingCommand cmd, CancellationToken c } } } + + private async Task HandleBasicAck(IncomingCommand cmd, + CancellationToken cancellationToken = default) + { + var ack = new BasicAck(cmd.MethodSpan); + if (!_basicAcksAsyncWrapper.IsEmpty) + { + var args = new BasicAckEventArgs(ack._deliveryTag, ack._multiple, cancellationToken); + await _basicAcksAsyncWrapper.InvokeAsync(this, args) + .ConfigureAwait(false); + } + + HandleAck(ack._deliveryTag, ack._multiple); + + return true; + } + + private async Task HandleBasicNack(IncomingCommand cmd, + CancellationToken cancellationToken = default) + { + var nack = new BasicNack(cmd.MethodSpan); + if (!_basicNacksAsyncWrapper.IsEmpty) + { + var args = new BasicNackEventArgs( + nack._deliveryTag, nack._multiple, nack._requeue, cancellationToken); + await _basicNacksAsyncWrapper.InvokeAsync(this, args) + .ConfigureAwait(false); + } + + HandleNack(nack._deliveryTag, nack._multiple, false); + + return true; + } + + private async Task HandleBasicReturn(IncomingCommand cmd, CancellationToken cancellationToken) + { + var basicReturn = new BasicReturn(cmd.MethodSpan); + + var e = new BasicReturnEventArgs(basicReturn._replyCode, basicReturn._replyText, + basicReturn._exchange, basicReturn._routingKey, + new ReadOnlyBasicProperties(cmd.HeaderSpan), cmd.Body.Memory, cancellationToken); + + if (!_basicReturnAsyncWrapper.IsEmpty) + { + await _basicReturnAsyncWrapper.InvokeAsync(this, e) + .ConfigureAwait(false); + } + + HandleReturn(e); + + return true; + } + + private async Task HandleBasicCancelAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + string consumerTag = new BasicCancel(cmd.MethodSpan)._consumerTag; + await ConsumerDispatcher.HandleBasicCancelAsync(consumerTag, cancellationToken) + .ConfigureAwait(false); + return true; + } + + private async Task HandleBasicDeliverAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + var method = new BasicDeliver(cmd.MethodSpan); + var header = new ReadOnlyBasicProperties(cmd.HeaderSpan); + await ConsumerDispatcher.HandleBasicDeliverAsync( + method._consumerTag, + AdjustDeliveryTag(method._deliveryTag), + method._redelivered, + method._exchange, + method._routingKey, + header, + /* + * Takeover Body so it doesn't get returned as it is necessary + * for handling the Basic.Deliver method by client code. + */ + cmd.TakeoverBody(), + cancellationToken).ConfigureAwait(false); + return true; + } + + private async Task HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + lock (_locker) + { + _serverOriginatedChannelCloseTcs ??= new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } + + try + { + var channelClose = new ChannelClose(cmd.MethodSpan); + SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer, + channelClose._replyCode, + channelClose._replyText, + channelClose._classId, + channelClose._methodId)); + + await Session.CloseAsync(_closeReason, notify: false) + .ConfigureAwait(false); + + var method = new ChannelCloseOk(); + await ModelSendAsync(in method, cancellationToken) + .ConfigureAwait(false); + + await Session.NotifyAsync(cancellationToken) + .ConfigureAwait(false); + + _serverOriginatedChannelCloseTcs.TrySetResult(true); + return true; + } + catch (Exception ex) + { + _serverOriginatedChannelCloseTcs.TrySetException(ex); + throw; + } + } + + private async Task HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + /* + * Note: + * This call _must_ come before completing the async continuation + */ + await FinishCloseAsync(cancellationToken) + .ConfigureAwait(false); + + if (_continuationQueue.TryPeek(out ChannelCloseAsyncRpcContinuation? k)) + { + _continuationQueue.Next(); + await k.HandleCommandAsync(cmd) + .ConfigureAwait(false); + } + + return true; + } + + private async Task HandleChannelFlowAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + bool active = new ChannelFlow(cmd.MethodSpan)._active; + if (active) + { + _flowControlBlock.Set(); + } + else + { + _flowControlBlock.Reset(); + } + + var method = new ChannelFlowOk(active); + await ModelSendAsync(in method, cancellationToken). + ConfigureAwait(false); + + if (!_flowControlAsyncWrapper.IsEmpty) + { + await _flowControlAsyncWrapper.InvokeAsync(this, new FlowControlEventArgs(active, cancellationToken)) + .ConfigureAwait(false); + } + + return true; + } + + private async Task HandleConnectionBlockedAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + string reason = new ConnectionBlocked(cmd.MethodSpan)._reason; + await Session.Connection.HandleConnectionBlockedAsync(reason, cancellationToken) + .ConfigureAwait(false); + return true; + } + + private async Task HandleConnectionCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + var method = new ConnectionClose(cmd.MethodSpan); + var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId); + try + { + await Session.Connection.ClosedViaPeerAsync(reason, cancellationToken) + .ConfigureAwait(false); + + var replyMethod = new ConnectionCloseOk(); + await ModelSendAsync(in replyMethod, cancellationToken) + .ConfigureAwait(false); + + SetCloseReason(Session.Connection.CloseReason!); + } + catch (IOException) + { + // Ignored. We're only trying to be polite by sending + // the close-ok, after all. + } + catch (AlreadyClosedException) + { + // Ignored. We're only trying to be polite by sending + // the close-ok, after all. + } + + return true; + } + + private async Task HandleConnectionSecureAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next(); + await k.HandleCommandAsync(new IncomingCommand()) + .ConfigureAwait(false); // release the continuation. + return true; + } + + private async Task HandleConnectionStartAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + if (m_connectionStartCell is null) + { + var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start"); + await Session.Connection.CloseAsync(reason, false, + InternalConstants.DefaultConnectionCloseTimeout, + cancellationToken) + .ConfigureAwait(false); + } + else + { + var method = new ConnectionStart(cmd.MethodSpan); + var details = new ConnectionStartDetails(method._locales, method._mechanisms, + method._serverProperties, method._versionMajor, method._versionMinor); + m_connectionStartCell.SetResult(details); + m_connectionStartCell = null; + } + + return true; + } + + private async Task HandleConnectionTuneAsync(IncomingCommand cmd, CancellationToken cancellationToken) + { + // Note: `using` here to ensure instance is disposed + using var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next(); + + // Note: releases the continuation and returns the buffers + await k.HandleCommandAsync(cmd) + .ConfigureAwait(false); + + return true; + } + + private async Task HandleConnectionUnblockedAsync(CancellationToken cancellationToken) + { + await Session.Connection.HandleConnectionUnblockedAsync(cancellationToken) + .ConfigureAwait(false); + return true; + } + + private void MaybeAbort() + { + if (IsOpen) + { + this.AbortAsync().GetAwaiter().GetResult(); + } + } + + private Task MaybeAbortAsync() + { + if (IsOpen) + { + return this.AbortAsync(); + } + else + { + return Task.CompletedTask; + } + } + + private void MaybeDisposeOutstandingPublisherConfirmationsRateLimiter() + { + if (_outstandingPublisherConfirmationsRateLimiter is not null) + { + try + { + _outstandingPublisherConfirmationsRateLimiter.Dispose(); + } + catch + { + } + } + } + + private async Task MaybeDisposeOutstandingPublisherConfirmationsRateLimiterAsync() + { + if (_outstandingPublisherConfirmationsRateLimiter is not null) + { + try + { + await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync() + .ConfigureAwait(false); + } + catch + { + } + } + } + + private void MaybeWaitForServerOriginatedClose() + { + if (_serverOriginatedChannelCloseTcs is not null) + { + try + { + _serverOriginatedChannelCloseTcs.Task.Wait(TimeSpan.FromSeconds(5)); + } + catch + { + } + } + } + + private async Task MaybeWaitForServerOriginatedCloseAsync() + { + if (_serverOriginatedChannelCloseTcs is not null) + { + try + { + await _serverOriginatedChannelCloseTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)) + .ConfigureAwait(false); + } + catch + { + } + } + } } } diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index 68cadcfd7..6bf2a335e 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -46,6 +46,9 @@ namespace RabbitMQ.Client.Framing internal sealed partial class Connection : IConnection { private bool _disposed; + private bool _isDisposing; + private readonly object _isDisposingLock = new(); + private volatile bool _closed; private readonly ConnectionConfig _config; @@ -485,7 +488,15 @@ internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellatio return _frameHandler.WriteAsync(frames, cancellationToken); } - public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); + public void Dispose() + { + if (_disposed) + { + return; + } + + DisposeAsync().AsTask().GetAwaiter().GetResult(); + } public async ValueTask DisposeAsync() { @@ -494,6 +505,15 @@ public async ValueTask DisposeAsync() return; } + lock (_isDisposingLock) + { + if (_isDisposing) + { + return; + } + _isDisposing = true; + } + try { if (IsOpen) @@ -515,6 +535,7 @@ await _channel0.DisposeAsync() finally { _disposed = true; + _isDisposing = false; } } diff --git a/projects/Test/Integration/TestChannelShutdown.cs b/projects/Test/Integration/TestChannelShutdown.cs index 1774555b3..3b2e6f2d3 100644 --- a/projects/Test/Integration/TestChannelShutdown.cs +++ b/projects/Test/Integration/TestChannelShutdown.cs @@ -30,6 +30,8 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Impl; @@ -61,5 +63,52 @@ public async Task TestConsumerDispatcherShutdown() await WaitAsync(tcs, TimeSpan.FromSeconds(5), "channel shutdown"); Assert.True(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after CloseAsync"); } + + [Fact] + public async Task TestConcurrentDisposeAsync_GH1749() + { + bool sawCallbackException = false; + int channelShutdownCount = 0; + + _channel.CallbackExceptionAsync += (channel, ea) => + { + sawCallbackException = true; + return Task.CompletedTask; + }; + + _channel.ChannelShutdownAsync += (channel, args) => + { + Interlocked.Increment(ref channelShutdownCount); + return Task.CompletedTask; + }; + + var disposeTasks = new List + { + _channel.DisposeAsync(), + _channel.DisposeAsync(), + _channel.DisposeAsync() + }; + + foreach (ValueTask vt in disposeTasks) + { + await vt; + } + + Assert.Equal(1, channelShutdownCount); + Assert.False(sawCallbackException); + + disposeTasks.Clear(); + disposeTasks.Add(_conn.DisposeAsync()); + disposeTasks.Add(_conn.DisposeAsync()); + disposeTasks.Add(_conn.DisposeAsync()); + + foreach (ValueTask vt in disposeTasks) + { + await vt; + } + + _channel = null; + _conn = null; + } } } diff --git a/projects/Test/Integration/TestQueueDeclare.cs b/projects/Test/Integration/TestQueueDeclare.cs index 94e758a64..91510029f 100644 --- a/projects/Test/Integration/TestQueueDeclare.cs +++ b/projects/Test/Integration/TestQueueDeclare.cs @@ -57,6 +57,22 @@ public async Task TestQueueDeclareAsync() Assert.Equal(q, passiveDeclareResult.QueueName); } + [Fact] + public async Task TestPassiveQueueDeclareException_GH1749() + { + string q = GenerateQueueName(); + try + { + await _channel.QueueDeclarePassiveAsync(q); + } + catch (Exception ex) + { + _output.WriteLine("{0} ex: {1}", _testDisplayName, ex); + await _channel.DisposeAsync(); + _channel = null; + } + } + [Fact] public async Task TestConcurrentQueueDeclareAndBindAsync() { diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs index 4aeb219ec..3f0307bb7 100644 --- a/projects/Test/Integration/TestToxiproxy.cs +++ b/projects/Test/Integration/TestToxiproxy.cs @@ -41,12 +41,14 @@ using Xunit; using Xunit.Abstractions; +#nullable enable + namespace Test.Integration { public class TestToxiproxy : IntegrationFixture { private readonly TimeSpan _heartbeatTimeout = TimeSpan.FromSeconds(1); - private ToxiproxyManager _toxiproxyManager; + private ToxiproxyManager? _toxiproxyManager; private int _proxyPort; public TestToxiproxy(ITestOutputHelper output) : base(output) @@ -61,14 +63,24 @@ public override Task InitializeAsync() Assert.Null(_conn); Assert.Null(_channel); - _toxiproxyManager = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows); - _proxyPort = ToxiproxyManager.ProxyPort; - return _toxiproxyManager.InitializeAsync(); + if (AreToxiproxyTestsEnabled) + { + _toxiproxyManager = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows); + _proxyPort = ToxiproxyManager.ProxyPort; + return _toxiproxyManager.InitializeAsync(); + } + else + { + return Task.CompletedTask; + } } public override async Task DisposeAsync() { - await _toxiproxyManager.DisposeAsync(); + if (_toxiproxyManager is not null) + { + await _toxiproxyManager.DisposeAsync(); + } await base.DisposeAsync(); } @@ -77,6 +89,7 @@ public override async Task DisposeAsync() public async Task TestCloseConnection() { Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); + Assert.NotNull(_toxiproxyManager); ConnectionFactory cf = CreateConnectionFactory(); cf.Port = _proxyPort; @@ -199,6 +212,7 @@ async Task PublishLoop() public async Task TestThatStoppedSocketResultsInHeartbeatTimeout() { Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); + Assert.NotNull(_toxiproxyManager); ConnectionFactory cf = CreateConnectionFactory(); cf.Port = _proxyPort; @@ -246,6 +260,7 @@ await Assert.ThrowsAsync(() => public async Task TestTcpReset_GH1464() { Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); + Assert.NotNull(_toxiproxyManager); ConnectionFactory cf = CreateConnectionFactory(); cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), _proxyPort); @@ -298,6 +313,7 @@ public async Task TestTcpReset_GH1464() public async Task TestPublisherConfirmationThrottling() { Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); + Assert.NotNull(_toxiproxyManager); const int TotalMessageCount = 64; const int MaxOutstandingConfirms = 8; @@ -397,7 +413,7 @@ private bool AreToxiproxyTestsEnabled { get { - string s = Environment.GetEnvironmentVariable("RABBITMQ_TOXIPROXY_TESTS"); + string? s = Environment.GetEnvironmentVariable("RABBITMQ_TOXIPROXY_TESTS"); if (string.IsNullOrEmpty(s)) {