Skip to content

Commit

Permalink
Ensure DisposeAsync and Dispose are idempotent
Browse files Browse the repository at this point in the history
Fixes #1749

* Add test project for GH #1749.
* Add `FirstChanceException` logging in #1749 test project.
* Add code to repeatedly close connection.
* Allow passing hostname to GH-1749 program as the first arg.
* Toxiproxy tests fixup.
* Ensure `_disposed` is set in `finally` block.
* Add lock object for disposal of Channels and Connections. Note: a
  `SemaphoreSlim` can't be used because it must be disposed as well,
  and that can't happen cleanly in a `Dispose` method.
* Add basic test to see what dispose after a channel exception does.
* Modify `CreateChannel` app to try and trigger GH1751
* Add TCS for server-originated channel closure. This is the "real" fix
  for the issue.
  • Loading branch information
lukebakken committed Jan 7, 2025
1 parent 5b1c9cc commit fd2377b
Show file tree
Hide file tree
Showing 13 changed files with 982 additions and 406 deletions.
7 changes: 7 additions & 0 deletions RabbitMQDotNetClient.sln
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1647", "projects\Applica
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PublisherConfirms", "projects\Applications\PublisherConfirms\PublisherConfirms.csproj", "{13149F73-2CDB-4ECF-BF2C-403860045751}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1749", "projects\Applications\GH-1749\GH-1749.csproj", "{725D9986-ACD1-424E-AF4C-2BEB407D2BD9}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -108,6 +110,10 @@ Global
{13149F73-2CDB-4ECF-BF2C-403860045751}.Debug|Any CPU.Build.0 = Debug|Any CPU
{13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.ActiveCfg = Release|Any CPU
{13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.Build.0 = Release|Any CPU
{725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -123,6 +129,7 @@ Global
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
{13149F73-2CDB-4ECF-BF2C-403860045751} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
{725D9986-ACD1-424E-AF4C-2BEB407D2BD9} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1}
Expand Down
77 changes: 59 additions & 18 deletions projects/Applications/CreateChannel/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;

namespace CreateChannel
{
Expand All @@ -44,49 +45,89 @@ public static class Program
private const int ChannelsToOpen = 50;

private static int channelsOpened;
private static AutoResetEvent doneEvent;

public static async Task Main()
{
doneEvent = new AutoResetEvent(false);
var doneTcs = new TaskCompletionSource<bool>();

var connectionFactory = new ConnectionFactory { };
await using IConnection connection = await connectionFactory.CreateConnectionAsync();

var watch = Stopwatch.StartNew();
_ = Task.Run(async () =>
var workTask = Task.Run(async () =>
{
var channels = new IChannel[ChannelsToOpen];
for (int i = 0; i < Repeats; i++)
try
{
for (int j = 0; j < channels.Length; j++)
var channelOpenTasks = new List<Task<IChannel>>();
var channelDisposeTasks = new List<ValueTask>();
var channels = new List<IChannel>();
for (int i = 0; i < Repeats; i++)
{
channels[j] = await connection.CreateChannelAsync();
channelsOpened++;
}
for (int j = 0; j < ChannelsToOpen; j++)
{
channelOpenTasks.Add(connection.CreateChannelAsync());
}

for (int j = 0; j < channels.Length; j++)
{
await channels[j].DisposeAsync();
for (int j = 0; j < channelOpenTasks.Count; j++)
{
IChannel ch = await channelOpenTasks[j];
if (j % 8 == 0)
{
try
{
await ch.QueueDeclarePassiveAsync(Guid.NewGuid().ToString());
}
catch (OperationInterruptedException)
{
await ch.DisposeAsync();
}
catch (Exception ex)
{
_ = Console.Error.WriteLineAsync($"{DateTime.Now:s} [ERROR] {ex}");
}
}
else
{
channels.Add(ch);
channelsOpened++;
}
}
channelOpenTasks.Clear();

for (int j = 0; j < channels.Count; j++)
{
channelDisposeTasks.Add(channels[j].DisposeAsync());
}

for (int j = 0; j < channels.Count; j++)
{
await channelDisposeTasks[j];
}
channelDisposeTasks.Clear();
}
}

doneEvent.Set();
doneTcs.SetResult(true);
}
catch (Exception ex)
{
doneTcs.SetException(ex);
}
});

Console.WriteLine($"{Repeats} times opening {ChannelsToOpen} channels on a connection. => Total channel open/close: {Repeats * ChannelsToOpen}");
Console.WriteLine();
Console.WriteLine("Opened");
while (!doneEvent.WaitOne(500))
while (false == doneTcs.Task.IsCompleted)
{
Console.WriteLine($"{channelsOpened,5}");
await Task.Delay(150);
}
watch.Stop();
Console.WriteLine($"{channelsOpened,5}");
Console.WriteLine();
Console.WriteLine($"Took {watch.Elapsed.TotalMilliseconds} ms");
Console.WriteLine($"Took {watch.Elapsed}");

Console.ReadLine();
await workTask;
}
}
}
2 changes: 1 addition & 1 deletion projects/Applications/GH-1647/GH-1647.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="../../RabbitMQ.Client\RabbitMQ.Client.csproj" />
<ProjectReference Include="../../RabbitMQ.Client/RabbitMQ.Client.csproj" />
</ItemGroup>

</Project>
15 changes: 15 additions & 0 deletions projects/Applications/GH-1749/GH-1749.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<RootNamespace>GH_1749</RootNamespace>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="../../RabbitMQ.Client/RabbitMQ.Client.csproj" />
</ItemGroup>

</Project>
199 changes: 199 additions & 0 deletions projects/Applications/GH-1749/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task

using System.Runtime.ExceptionServices;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;

namespace GH_1749
{
class GH1749Consumer : AsyncDefaultBasicConsumer
{
public GH1749Consumer(IChannel channel) : base(channel)
{
}

protected override Task OnCancelAsync(string[] consumerTags, CancellationToken cancellationToken = default)
{
Console.WriteLine("{0} [INFO] OnCancelAsync, tags[0]: {1}", DateTime.Now.ToString("s"), consumerTags[0]);
return base.OnCancelAsync(consumerTags, cancellationToken);
}
}

static class Program
{
const string DefaultHostName = "localhost";
const string ConnectionClientProvidedName = "GH_1749";
static readonly CancellationTokenSource s_cancellationTokenSource = new();
static readonly CancellationToken s_cancellationToken = s_cancellationTokenSource.Token;

static async Task Main(string[] args)
{
string hostname = DefaultHostName;
if (args.Length > 0)
{
hostname = args[0];
}

AppDomain.CurrentDomain.FirstChanceException += CurrentDomain_FirstChanceException;

ConnectionFactory connectionFactory = new()
{
HostName = hostname,
AutomaticRecoveryEnabled = false,
TopologyRecoveryEnabled = false,
RequestedConnectionTimeout = TimeSpan.FromSeconds(600),
RequestedHeartbeat = TimeSpan.FromSeconds(600),
UserName = "guest",
Password = "guest",
ClientProvidedName = ConnectionClientProvidedName
};

var channelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true);
await using var connection = await connectionFactory.CreateConnectionAsync();

connection.RecoverySucceededAsync += (object sender, AsyncEventArgs ea) =>
{
Console.WriteLine("{0} [INFO] saw RecoverySucceededAsync, event: {1}", Now, ea);
return Task.CompletedTask;
};

connection.CallbackExceptionAsync += Connection_CallbackExceptionAsync;

connection.ConnectionBlockedAsync += Connection_ConnectionBlockedAsync;
connection.ConnectionUnblockedAsync += Connection_ConnectionUnblockedAsync;

connection.ConnectionRecoveryErrorAsync += Connection_ConnectionRecoveryErrorAsync;

connection.ConnectionShutdownAsync += (object sender, ShutdownEventArgs ea) =>
{
Console.WriteLine("{0} [INFO] saw ConnectionShutdownAsync, event: {1}", Now, ea);
return Task.CompletedTask;
};

connection.ConsumerTagChangeAfterRecoveryAsync += Connection_ConsumerTagChangeAfterRecoveryAsync;
connection.QueueNameChangedAfterRecoveryAsync += Connection_QueueNameChangedAfterRecoveryAsync;

connection.RecoveringConsumerAsync += Connection_RecoveringConsumerAsync;

await using var channel = await connection.CreateChannelAsync(options: channelOptions);

channel.CallbackExceptionAsync += Channel_CallbackExceptionAsync;
channel.ChannelShutdownAsync += Channel_ChannelShutdownAsync;

try
{
await channel.QueueDeclarePassiveAsync(Guid.NewGuid().ToString());
}
catch (OperationInterruptedException)
{
await channel.DisposeAsync();
// rabbitmq-dotnet-client-1749
// await Task.Delay(2000);
}
}

private static string Now => DateTime.Now.ToString("s");

private static Task Channel_CallbackExceptionAsync(object sender, CallbackExceptionEventArgs ea)
{
Console.WriteLine("{0} [INFO] channel saw CallbackExceptionAsync, event: {1}", Now, ea);
Console.WriteLine("{0} [INFO] channel CallbackExceptionAsync, exception: {1}", Now, ea.Exception);
return Task.CompletedTask;
}

private static Task Channel_ChannelShutdownAsync(object sender, ShutdownEventArgs ea)
{
Console.WriteLine("{0} [INFO] saw ChannelShutdownAsync, event: {1}", Now, ea);
return Task.CompletedTask;
// rabbitmq-dotnet-client-1749
// return Task.Delay(1000);
}

private static void CurrentDomain_FirstChanceException(object? sender, FirstChanceExceptionEventArgs e)
{
if (e.Exception is ObjectDisposedException)
{
Console.WriteLine("{0} [INFO] saw FirstChanceException, exception: {1}", Now, e.Exception);
}
}

private static Task Connection_CallbackExceptionAsync(object sender, CallbackExceptionEventArgs ea)
{
Console.WriteLine("{0} [INFO] connection saw CallbackExceptionAsync, event: {1}", Now, ea);
Console.WriteLine("{0} [INFO] connection CallbackExceptionAsync, exception: {1}", Now, ea.Exception);
return Task.CompletedTask;
}

private static Task Connection_ConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs ea)
{
Console.WriteLine("{0} [INFO] saw ConnectionBlockedAsync, event: {1}", Now, ea);
return Task.CompletedTask;
}

private static Task Connection_ConnectionUnblockedAsync(object sender, AsyncEventArgs ea)
{
Console.WriteLine("{0} [INFO] saw ConnectionUnlockedAsync, event: {1}", Now, ea);
return Task.CompletedTask;
}

private static Task Connection_ConnectionRecoveryErrorAsync(object sender, ConnectionRecoveryErrorEventArgs ea)
{
Console.WriteLine("{0} [INFO] saw ConnectionRecoveryErrorAsync, event: {1}", Now, ea);
Console.WriteLine("{0} [INFO] ConnectionRecoveryErrorAsync, exception: {1}", Now, ea.Exception);
return Task.CompletedTask;
}

private static Task Connection_ConsumerTagChangeAfterRecoveryAsync(object sender, ConsumerTagChangedAfterRecoveryEventArgs ea)
{
Console.WriteLine("{0} [INFO] saw ConsumerTagChangeAfterRecoveryAsync, event: {1}", Now, ea);
Console.WriteLine("{0} [INFO] ConsumerTagChangeAfterRecoveryAsync, tags: {1} {2}", Now, ea.TagBefore, ea.TagAfter);
return Task.CompletedTask;
}

private static Task Connection_QueueNameChangedAfterRecoveryAsync(object sender, QueueNameChangedAfterRecoveryEventArgs ea)
{
Console.WriteLine("{0} [INFO] saw QueueNameChangedAfterRecoveryAsync, event: {1}", Now, ea);
Console.WriteLine("{0} [INFO] QueueNameChangedAfterRecoveryAsync, queue names: {1} {2}", Now, ea.NameBefore, ea.NameAfter);
return Task.CompletedTask;
}

private static Task Connection_RecoveringConsumerAsync(object sender, RecoveringConsumerEventArgs ea)
{
Console.WriteLine("{0} [INFO] saw RecoveringConsumerAsync, event: {1}, tag: {2}", Now, ea, ea.ConsumerTag);
return Task.CompletedTask;
}
}
}

Loading

0 comments on commit fd2377b

Please sign in to comment.