From 787c12ee45bdbaf2ded25bc9054039f6ab6d7ccf Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 1 Feb 2024 09:24:41 -0800 Subject: [PATCH] * Merge `AsyncIntegration` tests into `Integration`. * Move `TestConnectionFactory` to Integration * Move `TestExchangeDeclareAsync` to `TestExchangeDeclare`. * `AsyncIntegrationFixture` is now a placeholder. * Move `TestExtensionsAsync` to `TestExtensions` * Move `TestFloodPublishingAsync` to `TestFloodPublishing` * Remove the rest of `AsyncIntegration`. --- .github/workflows/build-test.yaml | 9 - Build.csproj | 1 - Makefile | 1 - RabbitMQDotNetClient.sln | 7 - build.ps1 | 3 +- .../RabbitMQ.Client/RabbitMQ.Client.csproj | 3 - .../AsyncIntegrationFixture.cs | 97 --- .../AsyncIntegration/TestBasicGetAsync.cs | 64 -- .../AsyncIntegration/TestConnectionFactory.cs | 105 --- .../TestExchangeDeclareAsync.cs | 102 --- .../TestPassiveDeclareAsync.cs | 67 -- .../AsyncIntegration/TestQueueDeclareAsync.cs | 150 ---- ...onFixtureBase.cs => IntegrationFixture.cs} | 44 +- .../TestConnectionRecoveryBase.cs | 42 +- .../Test/Integration/IntegrationFixture.cs | 43 - .../TestAsyncConsumer.cs | 4 +- .../TestAsyncConsumerExceptions.cs | 4 +- projects/Test/Integration/TestBasicGet.cs | 19 + .../TestBasicPublishAsync.cs | 4 +- ...ncurrentAccessWithSharedConnectionAsync.cs | 4 +- .../TestConfirmSelectAsync.cs | 4 +- .../Test/Integration/TestConnectionFactory.cs | 49 ++ .../Integration/TestConnectionRecovery.cs | 741 ++++++++++++++++++ .../TestConnectionRecoveryWithoutSetup.cs | 3 +- .../TestConnectionTopologyRecovery.cs | 4 +- .../TestConsumerOperationDispatch.cs | 2 +- .../Test/Integration/TestExchangeDeclare.cs | 56 ++ .../TestExtensions.cs} | 8 +- .../TestFloodPublishing.cs} | 10 +- .../TestMessageCount.cs} | 6 +- .../TestPublishSharedChannelAsync.cs | 4 +- .../TestPublisherConfirms.cs} | 6 +- projects/Test/Integration/TestQueueDeclare.cs | 103 +++ .../SequentialIntegrationFixture.cs | 16 +- .../TestConnectionRecovery.cs | 715 +---------------- 35 files changed, 1066 insertions(+), 1434 deletions(-) delete mode 100644 projects/Test/AsyncIntegration/AsyncIntegrationFixture.cs delete mode 100644 projects/Test/AsyncIntegration/TestBasicGetAsync.cs delete mode 100644 projects/Test/AsyncIntegration/TestConnectionFactory.cs delete mode 100644 projects/Test/AsyncIntegration/TestExchangeDeclareAsync.cs delete mode 100644 projects/Test/AsyncIntegration/TestPassiveDeclareAsync.cs delete mode 100644 projects/Test/AsyncIntegration/TestQueueDeclareAsync.cs rename projects/Test/Common/{IntegrationFixtureBase.cs => IntegrationFixture.cs} (92%) rename projects/Test/{SequentialIntegration => Common}/TestConnectionRecoveryBase.cs (92%) delete mode 100644 projects/Test/Integration/IntegrationFixture.cs rename projects/Test/{AsyncIntegration => Integration}/TestAsyncConsumer.cs (99%) rename projects/Test/{AsyncIntegration => Integration}/TestAsyncConsumerExceptions.cs (98%) rename projects/Test/{AsyncIntegration => Integration}/TestBasicPublishAsync.cs (96%) rename projects/Test/{AsyncIntegration => Integration}/TestConcurrentAccessWithSharedConnectionAsync.cs (99%) rename projects/Test/{AsyncIntegration => Integration}/TestConfirmSelectAsync.cs (96%) create mode 100644 projects/Test/Integration/TestConnectionRecovery.cs rename projects/Test/{SequentialIntegration => Integration}/TestConnectionRecoveryWithoutSetup.cs (99%) rename projects/Test/{SequentialIntegration => Integration}/TestConnectionTopologyRecovery.cs (99%) rename projects/Test/{AsyncIntegration/TestExtensionsAsync.cs => Integration/TestExtensions.cs} (93%) rename projects/Test/{AsyncIntegration/TestFloodPublishingAsync.cs => Integration/TestFloodPublishing.cs} (96%) rename projects/Test/{AsyncIntegration/TestMessageCountAsync.cs => Integration/TestMessageCount.cs} (92%) rename projects/Test/{AsyncIntegration => Integration}/TestPublishSharedChannelAsync.cs (97%) rename projects/Test/{AsyncIntegration/TestPublisherConfirmsAsync.cs => Integration/TestPublisherConfirms.cs} (97%) diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml index 15c3e03845..3356bfee7c 100644 --- a/.github/workflows/build-test.yaml +++ b/.github/workflows/build-test.yaml @@ -37,7 +37,6 @@ jobs: name: rabbitmq-dotnet-client-build-win32 path: | projects/Test/Unit/bin - projects/Test/AsyncIntegration/bin projects/Test/Integration/bin projects/Test/SequentialIntegration/bin projects/RabbitMQ.*/bin @@ -67,8 +66,6 @@ jobs: - name: Install and Start RabbitMQ id: install-start-rabbitmq run: .\.ci\windows\gha-setup.ps1 - - name: Async Integration Tests - run: dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' "${{ github.workspace }}\projects\Test\AsyncIntegration\AsyncIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' - name: Integration Tests run: dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' --environment 'PASSWORD=grapefruit' --environment SSL_CERTS_DIR="${{ github.workspace }}\.ci\certs" "${{ github.workspace }}\projects\Test\Integration\Integration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' - name: Maybe upload RabbitMQ logs @@ -146,7 +143,6 @@ jobs: name: rabbitmq-dotnet-client-build-ubuntu path: | projects/Test/Unit/bin - projects/Test/AsyncIntegration/bin projects/Test/Integration/bin projects/Test/SequentialIntegration/bin projects/RabbitMQ.*/bin @@ -170,11 +166,6 @@ jobs: - name: Start RabbitMQ id: start-rabbitmq run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh - - name: Async Integration Tests - run: | - dotnet test \ - --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \ - "${{ github.workspace }}/projects/Test/AsyncIntegration/AsyncIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' - name: Integration Tests run: | dotnet test \ diff --git a/Build.csproj b/Build.csproj index c59632d35f..84aa80559a 100644 --- a/Build.csproj +++ b/Build.csproj @@ -12,7 +12,6 @@ - diff --git a/Makefile b/Makefile index 88b8227407..7a47cc721f 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,6 @@ build: test: dotnet test $(CURDIR)/projects/Test/Unit/Unit.csproj --logger 'console;verbosity=detailed' - dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" $(CURDIR)/projects/Test/AsyncIntegration/AsyncIntegration.csproj --logger 'console;verbosity=detailed' dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" $(CURDIR)/projects/Test/Integration/Integration.csproj --logger 'console;verbosity=detailed' dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" $(CURDIR)/projects/Test/SequentialIntegration/SequentialIntegration.csproj --logger 'console;verbosity=detailed' diff --git a/RabbitMQDotNetClient.sln b/RabbitMQDotNetClient.sln index 8c37f5f06c..ead1354def 100644 --- a/RabbitMQDotNetClient.sln +++ b/RabbitMQDotNetClient.sln @@ -38,8 +38,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SequentialIntegration", "pr EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Common", "projects\Test\Common\Common.csproj", "{C11F25F4-7EA1-4874-9E25-DEB42E3A7C67}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AsyncIntegration", "projects\Test\AsyncIntegration\AsyncIntegration.csproj", "{D98F96C5-F7FB-45FC-92A0-9133850FB432}" -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -86,10 +84,6 @@ Global {C11F25F4-7EA1-4874-9E25-DEB42E3A7C67}.Debug|Any CPU.Build.0 = Debug|Any CPU {C11F25F4-7EA1-4874-9E25-DEB42E3A7C67}.Release|Any CPU.ActiveCfg = Release|Any CPU {C11F25F4-7EA1-4874-9E25-DEB42E3A7C67}.Release|Any CPU.Build.0 = Release|Any CPU - {D98F96C5-F7FB-45FC-92A0-9133850FB432}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {D98F96C5-F7FB-45FC-92A0-9133850FB432}.Debug|Any CPU.Build.0 = Debug|Any CPU - {D98F96C5-F7FB-45FC-92A0-9133850FB432}.Release|Any CPU.ActiveCfg = Release|Any CPU - {D98F96C5-F7FB-45FC-92A0-9133850FB432}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -103,7 +97,6 @@ Global {B01347D8-C327-471B-A1FE-7B86F7684A27} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} {F25725D7-2978-45F4-B90F-25D6F8B71C9E} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} {C11F25F4-7EA1-4874-9E25-DEB42E3A7C67} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} - {D98F96C5-F7FB-45FC-92A0-9133850FB432} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1} diff --git a/build.ps1 b/build.ps1 index 124320fdb8..5e1fe39ba3 100644 --- a/build.ps1 +++ b/build.ps1 @@ -17,10 +17,9 @@ if ($RunTests) $tests_dir = Join-Path -Path $PSScriptRoot -ChildPath 'projects' | Join-Path -ChildPath 'Test' $unit_csproj_file = Resolve-Path -LiteralPath (Join-Path -Path $tests_dir -ChildPath 'Unit' | Join-Path -ChildPath 'Unit.csproj') $integration_csproj_file = Resolve-Path -LiteralPath (Join-Path -Path $tests_dir -ChildPath 'Integration' | Join-Path -ChildPath 'Integration.csproj') - $async_integration_csproj_file = Resolve-Path -LiteralPath (Join-Path -Path $tests_dir -ChildPath 'AsyncIntegration' | Join-Path -ChildPath 'AsyncIntegration.csproj') $sequential_integration_csproj_file = Resolve-Path -LiteralPath (Join-Path -Path $tests_dir -ChildPath 'SequentialIntegration' | Join-Path -ChildPath 'SequentialIntegration.csproj') - foreach ($csproj_file in $unit_csproj_file, $integration_csproj_file, $async_integration_csproj_file, $sequential_integration_csproj_file) + foreach ($csproj_file in $unit_csproj_file, $integration_csproj_file, $sequential_integration_csproj_file) { Write-Host "[INFO] running Unit / Integration tests from '$csproj_file' (all frameworks)" -ForegroundColor "Magenta" dotnet test $csproj_file --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' --no-restore --no-build --logger "console;verbosity=detailed" diff --git a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj index a539ea28fb..74b6a7fae3 100644 --- a/projects/RabbitMQ.Client/RabbitMQ.Client.csproj +++ b/projects/RabbitMQ.Client/RabbitMQ.Client.csproj @@ -56,9 +56,6 @@ <_Parameter1>Unit, PublicKey=00240000048000009400000006020000002400005253413100040000010001008d20ec856aeeb8c3153a77faa2d80e6e43b5db93224a20cc7ae384f65f142e89730e2ff0fcc5d578bbe96fa98a7196c77329efdee4579b3814c0789e5a39b51df6edd75b602a33ceabdfcf19a3feb832f31d8254168cd7ba5700dfbca301fbf8db614ba41ba18474de0a5f4c2d51c995bc3636c641c8cbe76f45717bfcb943b5 - - <_Parameter1>AsyncIntegration, PublicKey=00240000048000009400000006020000002400005253413100040000010001008d20ec856aeeb8c3153a77faa2d80e6e43b5db93224a20cc7ae384f65f142e89730e2ff0fcc5d578bbe96fa98a7196c77329efdee4579b3814c0789e5a39b51df6edd75b602a33ceabdfcf19a3feb832f31d8254168cd7ba5700dfbca301fbf8db614ba41ba18474de0a5f4c2d51c995bc3636c641c8cbe76f45717bfcb943b5 - <_Parameter1>Integration, PublicKey=00240000048000009400000006020000002400005253413100040000010001008d20ec856aeeb8c3153a77faa2d80e6e43b5db93224a20cc7ae384f65f142e89730e2ff0fcc5d578bbe96fa98a7196c77329efdee4579b3814c0789e5a39b51df6edd75b602a33ceabdfcf19a3feb832f31d8254168cd7ba5700dfbca301fbf8db614ba41ba18474de0a5f4c2d51c995bc3636c641c8cbe76f45717bfcb943b5 diff --git a/projects/Test/AsyncIntegration/AsyncIntegrationFixture.cs b/projects/Test/AsyncIntegration/AsyncIntegrationFixture.cs deleted file mode 100644 index 23398f6147..0000000000 --- a/projects/Test/AsyncIntegration/AsyncIntegrationFixture.cs +++ /dev/null @@ -1,97 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2020 VMware, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using System.Collections.Generic; -using System.Threading.Tasks; -using RabbitMQ.Client; -using Xunit; -using Xunit.Abstractions; - -namespace Test.AsyncIntegration -{ - public class AsyncIntegrationFixture : IntegrationFixtureBase, IAsyncLifetime - { - protected readonly bool _dispatchConsumersAsync = false; - protected readonly ushort _consumerDispatchConcurrency = 1; - protected readonly bool _openChannel = true; - - public AsyncIntegrationFixture(ITestOutputHelper output, - bool dispatchConsumersAsync = false, - ushort consumerDispatchConcurrency = 1, - bool openChannel = true) : base(output) - { - _dispatchConsumersAsync = dispatchConsumersAsync; - _consumerDispatchConcurrency = consumerDispatchConcurrency; - _openChannel = openChannel; - } - - protected static Task AssertRanToCompletion(params Task[] tasks) - { - return DoAssertRanToCompletion(tasks); - } - - protected static Task AssertRanToCompletion(IEnumerable tasks) - { - return DoAssertRanToCompletion(tasks); - } - - public override async Task InitializeAsync() - { - _connFactory = CreateConnectionFactory(); - _connFactory.DispatchConsumersAsync = _dispatchConsumersAsync; - _connFactory.ConsumerDispatchConcurrency = _consumerDispatchConcurrency; - - _conn = await _connFactory.CreateConnectionAsync(); - if (_connFactory.AutomaticRecoveryEnabled) - { - Assert.IsType(_conn); - } - else - { - Assert.IsType(_conn); - } - - if (_openChannel) - { - _channel = await _conn.CreateChannelAsync(); - } - - base.AddCallbackHandlers(); - } - - private static async Task DoAssertRanToCompletion(IEnumerable tasks) - { - Task whenAllTask = Task.WhenAll(tasks); - await whenAllTask; - Assert.True(whenAllTask.IsCompletedSuccessfully()); - } - } -} diff --git a/projects/Test/AsyncIntegration/TestBasicGetAsync.cs b/projects/Test/AsyncIntegration/TestBasicGetAsync.cs deleted file mode 100644 index 48309660df..0000000000 --- a/projects/Test/AsyncIntegration/TestBasicGetAsync.cs +++ /dev/null @@ -1,64 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2020 VMware, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using System.Threading.Tasks; -using RabbitMQ.Client; -using Xunit; -using Xunit.Abstractions; - -namespace Test.AsyncIntegration -{ - public class TestBasicGetAsync : AsyncIntegrationFixture - { - public TestBasicGetAsync(ITestOutputHelper output) : base(output) - { - } - - [Fact] - public async Task TestBasicGet() - { - const string msg = "for async basic.get"; - - QueueDeclareOk queueResult = await _channel.QueueDeclareAsync(string.Empty, false, true, true); - string queueName = queueResult.QueueName; - - await _channel.BasicPublishAsync(string.Empty, queueName, _encoding.GetBytes(msg), true); - - BasicGetResult getResult = await _channel.BasicGetAsync(queueName, true); - Assert.Equal(msg, _encoding.GetString(getResult.Body.ToArray())); - - QueueDeclareOk queueResultPassive = await _channel.QueueDeclarePassiveAsync(queue: queueName); - Assert.Equal((uint)0, queueResultPassive.MessageCount); - - Assert.Null(await _channel.BasicGetAsync(queueName, true)); - } - } -} diff --git a/projects/Test/AsyncIntegration/TestConnectionFactory.cs b/projects/Test/AsyncIntegration/TestConnectionFactory.cs deleted file mode 100644 index 15763a6d1d..0000000000 --- a/projects/Test/AsyncIntegration/TestConnectionFactory.cs +++ /dev/null @@ -1,105 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2020 VMware, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2011-2020 VMware, Inc. or its affiliates. All rights reserved. -//--------------------------------------------------------------------------- - -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using RabbitMQ.Client; -using Xunit; -using Xunit.Abstractions; - -namespace Test.AsyncIntegration -{ - public class TestConnectionFactory : AsyncIntegrationFixture - { - public TestConnectionFactory(ITestOutputHelper output) : base(output) - { - } - - public override Task InitializeAsync() - { - // NB: nothing to do here since each test creates its own factory, - // connections and channels - Assert.Null(_connFactory); - Assert.Null(_conn); - Assert.Null(_channel); - return Task.CompletedTask; - } - - [Fact] - public async Task TestCreateConnectionAsync_WithAlreadyCanceledToken() - { - using (var cts = new CancellationTokenSource()) - { - cts.Cancel(); - - ConnectionFactory cf = CreateConnectionFactory(); - - bool passed = false; - /* - * If anyone wonders why TaskCanceledException is explicitly checked, - * even though it's a subclass of OperationCanceledException: - * https://github.com/rabbitmq/rabbitmq-dotnet-client/commit/383ca5c5f161edb717cf8fae7bf143c13143f634#r135400615 - */ - try - { - await cf.CreateConnectionAsync(cts.Token); - } - catch (TaskCanceledException) - { - passed = true; - } - catch (OperationCanceledException) - { - passed = true; - } - - Assert.True(passed, "FAIL did not see TaskCanceledException nor OperationCanceledException"); - } - } - - [Fact] - public async Task TestCreateConnectionAsync_UsesValidEndpointWhenMultipleSupplied() - { - using (var cts = new CancellationTokenSource(WaitSpan)) - { - ConnectionFactory cf = CreateConnectionFactory(); - var invalidEp = new AmqpTcpEndpoint("not_localhost"); - var ep = new AmqpTcpEndpoint("localhost"); - using (IConnection conn = await cf.CreateConnectionAsync(new List { invalidEp, ep }, cts.Token)) - { - await conn.CloseAsync(); - } - } - } - } -} diff --git a/projects/Test/AsyncIntegration/TestExchangeDeclareAsync.cs b/projects/Test/AsyncIntegration/TestExchangeDeclareAsync.cs deleted file mode 100644 index cb72ad6dbb..0000000000 --- a/projects/Test/AsyncIntegration/TestExchangeDeclareAsync.cs +++ /dev/null @@ -1,102 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2020 VMware, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Threading.Tasks; -using Xunit; -using Xunit.Abstractions; - -namespace Test.AsyncIntegration -{ - public class TestExchangeDeclareAsync : AsyncIntegrationFixture - { - public TestExchangeDeclareAsync(ITestOutputHelper output) : base(output) - { - } - - [Fact] - public async Task TestConcurrentExchangeDeclareAndBindAsync() - { - var exchangeNames = new ConcurrentBag(); - var tasks = new List(); - NotSupportedException nse = null; - for (int i = 0; i < 256; i++) - { - async Task f() - { - try - { - await Task.Delay(S_Random.Next(5, 50)); - string exchangeName = GenerateExchangeName(); - await _channel.ExchangeDeclareAsync(exchange: exchangeName, type: "fanout", false, false); - await _channel.ExchangeBindAsync(destination: "amq.fanout", source: exchangeName, routingKey: "unused"); - exchangeNames.Add(exchangeName); - } - catch (NotSupportedException e) - { - nse = e; - } - } - var t = Task.Run(f); - tasks.Add(t); - } - - await AssertRanToCompletion(tasks); - Assert.Null(nse); - tasks.Clear(); - - foreach (string exchangeName in exchangeNames) - { - async Task f() - { - try - { - await Task.Delay(S_Random.Next(5, 50)); - await _channel.ExchangeUnbindAsync(destination: "amq.fanout", source: exchangeName, routingKey: "unused", - noWait: false, arguments: null); - await _channel.ExchangeDeleteAsync(exchange: exchangeName, ifUnused: false); - } - catch (NotSupportedException e) - { - nse = e; - } - } - var t = Task.Run(f); - tasks.Add(t); - } - - await AssertRanToCompletion(tasks); - Assert.Null(nse); - } - } -} diff --git a/projects/Test/AsyncIntegration/TestPassiveDeclareAsync.cs b/projects/Test/AsyncIntegration/TestPassiveDeclareAsync.cs deleted file mode 100644 index 43fd9a24e2..0000000000 --- a/projects/Test/AsyncIntegration/TestPassiveDeclareAsync.cs +++ /dev/null @@ -1,67 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2020 VMware, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using System; -using System.Threading.Tasks; -using RabbitMQ.Client; -using RabbitMQ.Client.Exceptions; -using Xunit; -using Xunit.Abstractions; - -namespace Test.AsyncIntegration -{ - public class TestPassiveDeclareAsync : AsyncIntegrationFixture - { - public TestPassiveDeclareAsync(ITestOutputHelper output) : base(output) - { - } - - [Fact] - public Task TestPassiveExchangeDeclareWhenExchangeDoesNotExist() - { - return Assert.ThrowsAsync(() => - { - return _channel.ExchangeDeclareAsync(exchange: Guid.NewGuid().ToString(), type: ExchangeType.Fanout, - passive: true, durable: true, autoDelete: false); - }); - } - - [Fact] - public Task TestPassiveQueueDeclareWhenQueueDoesNotExist() - { - return Assert.ThrowsAsync(() => - { - return _channel.QueueDeclareAsync(queue: Guid.NewGuid().ToString(), passive: true, - durable: true, exclusive: true, autoDelete: false); - }); - } - } -} diff --git a/projects/Test/AsyncIntegration/TestQueueDeclareAsync.cs b/projects/Test/AsyncIntegration/TestQueueDeclareAsync.cs deleted file mode 100644 index e8092e00ca..0000000000 --- a/projects/Test/AsyncIntegration/TestQueueDeclareAsync.cs +++ /dev/null @@ -1,150 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2020 VMware, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Threading.Tasks; -using RabbitMQ.Client; -using Xunit; -using Xunit.Abstractions; - -namespace Test.AsyncIntegration -{ - public class TestQueueDeclareAsync : AsyncIntegrationFixture - { - public TestQueueDeclareAsync(ITestOutputHelper output) : base(output) - { - } - - [Fact] - public async void TestQueueDeclare() - { - string q = GenerateQueueName(); - - QueueDeclareOk declareResult = await _channel.QueueDeclareAsync(q, false, false, false); - Assert.Equal(q, declareResult.QueueName); - - QueueDeclareOk passiveDeclareResult = await _channel.QueueDeclarePassiveAsync(q); - Assert.Equal(q, passiveDeclareResult.QueueName); - } - - [Fact] - public async void TestConcurrentQueueDeclareAndBindAsync() - { - bool sawShutdown = false; - - _conn.ConnectionShutdown += (o, ea) => - { - HandleConnectionShutdown(_conn, ea, (args) => - { - if (ea.Initiator == ShutdownInitiator.Peer) - { - sawShutdown = true; - } - }); - }; - - _channel.ChannelShutdown += (o, ea) => - { - HandleChannelShutdown(_channel, ea, (args) => - { - if (args.Initiator == ShutdownInitiator.Peer) - { - sawShutdown = true; - } - }); - }; - - var tasks = new List(); - var queues = new ConcurrentBag(); - - NotSupportedException nse = null; - for (int i = 0; i < 256; i++) - { - async Task f() - { - try - { - // sleep for a random amount of time to increase the chances - // of thread interleaving. MK. - await Task.Delay(S_Random.Next(5, 50)); - QueueDeclareOk r = await _channel.QueueDeclareAsync(queue: string.Empty, false, false, false); - string queueName = r.QueueName; - await _channel.QueueBindAsync(queue: queueName, exchange: "amq.fanout", routingKey: queueName); - queues.Add(queueName); - } - catch (NotSupportedException e) - { - nse = e; - } - } - var t = Task.Run(f); - tasks.Add(t); - } - - await AssertRanToCompletion(tasks); - Assert.Null(nse); - tasks.Clear(); - - nse = null; - foreach (string q in queues) - { - async Task f() - { - string qname = q; - try - { - await Task.Delay(S_Random.Next(5, 50)); - - QueueDeclareOk r = await _channel.QueueDeclarePassiveAsync(qname); - Assert.Equal(qname, r.QueueName); - - await _channel.QueueUnbindAsync(queue: qname, exchange: "amq.fanout", routingKey: qname, null); - - uint deletedMessageCount = await _channel.QueueDeleteAsync(qname, false, false); - Assert.Equal((uint)0, deletedMessageCount); - } - catch (NotSupportedException e) - { - nse = e; - } - } - var t = Task.Run(f); - tasks.Add(t); - } - - await AssertRanToCompletion(tasks); - Assert.Null(nse); - Assert.False(sawShutdown); - } - } -} diff --git a/projects/Test/Common/IntegrationFixtureBase.cs b/projects/Test/Common/IntegrationFixture.cs similarity index 92% rename from projects/Test/Common/IntegrationFixtureBase.cs rename to projects/Test/Common/IntegrationFixture.cs index 68e5098887..0b1d33f153 100644 --- a/projects/Test/Common/IntegrationFixtureBase.cs +++ b/projects/Test/Common/IntegrationFixture.cs @@ -47,7 +47,7 @@ namespace Test { - public abstract class IntegrationFixtureBase : IAsyncLifetime + public abstract class IntegrationFixture : IAsyncLifetime { private static bool s_isRunningInCI = false; private static bool s_isWindows = false; @@ -66,6 +66,10 @@ public abstract class IntegrationFixtureBase : IAsyncLifetime protected readonly ITestOutputHelper _output; protected readonly string _testDisplayName; + protected readonly bool _dispatchConsumersAsync = false; + protected readonly ushort _consumerDispatchConcurrency = 1; + protected readonly bool _openChannel = true; + public static readonly TimeSpan WaitSpan; public static readonly TimeSpan LongWaitSpan; public static readonly TimeSpan RecoveryInterval = TimeSpan.FromSeconds(2); @@ -73,7 +77,7 @@ public abstract class IntegrationFixtureBase : IAsyncLifetime public static readonly TimeSpan RequestedConnectionTimeout = TimeSpan.FromSeconds(1); public static readonly Random S_Random; - static IntegrationFixtureBase() + static IntegrationFixture() { S_Random = new Random(); InitIsRunningInCI(); @@ -93,9 +97,16 @@ static IntegrationFixtureBase() } } - public IntegrationFixtureBase(ITestOutputHelper output) + public IntegrationFixture(ITestOutputHelper output, + bool dispatchConsumersAsync = false, + ushort consumerDispatchConcurrency = 1, + bool openChannel = true) { + _dispatchConsumersAsync = dispatchConsumersAsync; + _consumerDispatchConcurrency = consumerDispatchConcurrency; + _openChannel = openChannel; _output = output; + _rabbitMQCtl = new RabbitMQCtl(_output); Type type = _output.GetType(); @@ -103,7 +114,6 @@ public IntegrationFixtureBase(ITestOutputHelper output) ITest test = (ITest)testMember.GetValue(output); _testDisplayName = test.DisplayName .Replace("Test.", string.Empty) - .Replace("AsyncIntegration.", "AI.") .Replace("Integration.", "I.") .Replace("SequentialI.", "SI."); @@ -121,12 +131,19 @@ public virtual async Task InitializeAsync() if (_connFactory == null) { _connFactory = CreateConnectionFactory(); + _connFactory.DispatchConsumersAsync = _dispatchConsumersAsync; + _connFactory.ConsumerDispatchConcurrency = _consumerDispatchConcurrency; } if (_conn == null) { _conn = await CreateConnectionAsyncWithRetries(_connFactory); - _channel = await _conn.CreateChannelAsync(); + + if (_openChannel) + { + _channel = await _conn.CreateChannelAsync(); + } + AddCallbackHandlers(); } @@ -378,6 +395,16 @@ protected static void AssertPreconditionFailed(ShutdownEventArgs args) AssertShutdownError(args, Constants.PreconditionFailed); } + protected static Task AssertRanToCompletion(params Task[] tasks) + { + return DoAssertRanToCompletion(tasks); + } + + protected static Task AssertRanToCompletion(IEnumerable tasks) + { + return DoAssertRanToCompletion(tasks); + } + protected static Task WaitAsync(TaskCompletionSource tcs, string desc) { return WaitAsync(tcs, WaitSpan, desc); @@ -500,6 +527,13 @@ private static int GetConnectionIdx() return Interlocked.Increment(ref _connectionIdx); } + private static async Task DoAssertRanToCompletion(IEnumerable tasks) + { + Task whenAllTask = Task.WhenAll(tasks); + await whenAllTask; + Assert.True(whenAllTask.IsCompletedSuccessfully()); + } + protected static string GetUniqueString(ushort length) { byte[] bytes = GetRandomBody(length); diff --git a/projects/Test/SequentialIntegration/TestConnectionRecoveryBase.cs b/projects/Test/Common/TestConnectionRecoveryBase.cs similarity index 92% rename from projects/Test/SequentialIntegration/TestConnectionRecoveryBase.cs rename to projects/Test/Common/TestConnectionRecoveryBase.cs index d76298f02c..ee50887c2a 100644 --- a/projects/Test/SequentialIntegration/TestConnectionRecoveryBase.cs +++ b/projects/Test/Common/TestConnectionRecoveryBase.cs @@ -38,25 +38,19 @@ using Xunit; using Xunit.Abstractions; -namespace Test.SequentialIntegration +namespace Test { - public class TestConnectionRecoveryBase : SequentialIntegrationFixture + public class TestConnectionRecoveryBase : IntegrationFixture { protected readonly byte[] _messageBody; - protected const ushort _totalMessageCount = 16384; - protected const ushort _closeAtCount = 16; + protected const ushort TotalMessageCount = 16384; + protected const ushort CloseAtCount = 16; public TestConnectionRecoveryBase(ITestOutputHelper output) : base(output) { _messageBody = GetRandomBody(4096); } - public override async Task DisposeAsync() - { - await UnblockAsync(); - await base.DisposeAsync(); - } - protected Task AssertConsumerCountAsync(string q, int count) { return WithTemporaryChannelAsync(async ch => @@ -125,7 +119,7 @@ internal Task CreateAutorecoveringConnectionAsync() internal async Task CreateAutorecoveringConnectionAsync(TimeSpan networkRecoveryInterval) { - var cf = CreateConnectionFactory(); + ConnectionFactory cf = CreateConnectionFactory(); cf.AutomaticRecoveryEnabled = true; cf.NetworkRecoveryInterval = networkRecoveryInterval; IConnection conn = await cf.CreateConnectionAsync(); @@ -134,7 +128,7 @@ internal async Task CreateAutorecoveringConnectionAsyn internal async Task CreateAutorecoveringConnectionAsync(IList endpoints) { - var cf = CreateConnectionFactory(); + ConnectionFactory cf = CreateConnectionFactory(); cf.AutomaticRecoveryEnabled = true; // tests that use this helper will likely list unreachable hosts, // make sure we time out quickly on those @@ -146,7 +140,7 @@ internal async Task CreateAutorecoveringConnectionAsyn internal async Task CreateAutorecoveringConnectionWithTopologyRecoveryDisabledAsync() { - var cf = CreateConnectionFactory(); + ConnectionFactory cf = CreateConnectionFactory(); cf.AutomaticRecoveryEnabled = true; cf.TopologyRecoveryEnabled = false; cf.NetworkRecoveryInterval = RecoveryInterval; @@ -156,7 +150,7 @@ internal async Task CreateAutorecoveringConnectionWith internal async Task CreateAutorecoveringConnectionWithTopologyRecoveryFilterAsync(TopologyRecoveryFilter filter) { - var cf = CreateConnectionFactory(); + ConnectionFactory cf = CreateConnectionFactory(); cf.AutomaticRecoveryEnabled = true; cf.TopologyRecoveryEnabled = true; cf.TopologyRecoveryFilter = filter; @@ -166,7 +160,7 @@ internal async Task CreateAutorecoveringConnectionWith internal async Task CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandlerAsync(TopologyRecoveryExceptionHandler handler) { - var cf = CreateConnectionFactory(); + ConnectionFactory cf = CreateConnectionFactory(); cf.AutomaticRecoveryEnabled = true; cf.TopologyRecoveryEnabled = true; cf.TopologyRecoveryExceptionHandler = handler; @@ -212,9 +206,9 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName) { using (IChannel publishingChannel = await publishingConn.CreateChannelAsync()) { - for (ushort i = 0; i < _totalMessageCount; i++) + for (ushort i = 0; i < TotalMessageCount; i++) { - if (i == _closeAtCount) + if (i == CloseAtCount) { await CloseConnectionAsync(_conn); } @@ -247,20 +241,6 @@ protected static TaskCompletionSource PrepareForRecovery(IConnection conn) return tcs; } - protected Task RestartServerAndWaitForRecoveryAsync() - { - return RestartServerAndWaitForRecoveryAsync((AutorecoveringConnection)_conn); - } - - private async Task RestartServerAndWaitForRecoveryAsync(AutorecoveringConnection conn) - { - TaskCompletionSource sl = PrepareForShutdown(conn); - TaskCompletionSource rl = PrepareForRecovery(conn); - await RestartRabbitMqAsync(); - await WaitAsync(sl, "connection shutdown"); - await WaitAsync(rl, "connection recovery"); - } - protected static Task WaitForConfirmsWithCancellationAsync(IChannel m) { using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(4))) diff --git a/projects/Test/Integration/IntegrationFixture.cs b/projects/Test/Integration/IntegrationFixture.cs deleted file mode 100644 index e8d389fd8f..0000000000 --- a/projects/Test/Integration/IntegrationFixture.cs +++ /dev/null @@ -1,43 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 2.0. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2020 VMware, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v2.0: -// -//--------------------------------------------------------------------------- -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. -// -// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using Xunit.Abstractions; - -namespace Test.Integration -{ - public class IntegrationFixture : IntegrationFixtureBase - { - public IntegrationFixture(ITestOutputHelper output) - : base(output) - { - } - } -} diff --git a/projects/Test/AsyncIntegration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs similarity index 99% rename from projects/Test/AsyncIntegration/TestAsyncConsumer.cs rename to projects/Test/Integration/TestAsyncConsumer.cs index fbfc24eaa7..be64a4f71a 100644 --- a/projects/Test/AsyncIntegration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -37,9 +37,9 @@ using Xunit; using Xunit.Abstractions; -namespace Test.AsyncIntegration +namespace Test.Integration { - public class TestAsyncConsumer : AsyncIntegrationFixture + public class TestAsyncConsumer : IntegrationFixture { private readonly ShutdownEventArgs _closeArgs = new ShutdownEventArgs(ShutdownInitiator.Application, Constants.ReplySuccess, "normal shutdown"); diff --git a/projects/Test/AsyncIntegration/TestAsyncConsumerExceptions.cs b/projects/Test/Integration/TestAsyncConsumerExceptions.cs similarity index 98% rename from projects/Test/AsyncIntegration/TestAsyncConsumerExceptions.cs rename to projects/Test/Integration/TestAsyncConsumerExceptions.cs index 63f35a5fd5..92403dcb37 100644 --- a/projects/Test/AsyncIntegration/TestAsyncConsumerExceptions.cs +++ b/projects/Test/Integration/TestAsyncConsumerExceptions.cs @@ -37,9 +37,9 @@ using Xunit; using Xunit.Abstractions; -namespace Test.AsyncIntegration +namespace Test.Integration { - public class TestAsyncConsumerExceptions : AsyncIntegrationFixture + public class TestAsyncConsumerExceptions : IntegrationFixture { private static readonly Exception TestException = new Exception("oops"); diff --git a/projects/Test/Integration/TestBasicGet.cs b/projects/Test/Integration/TestBasicGet.cs index 254b42e547..b616e81192 100644 --- a/projects/Test/Integration/TestBasicGet.cs +++ b/projects/Test/Integration/TestBasicGet.cs @@ -44,6 +44,25 @@ public TestBasicGet(ITestOutputHelper output) : base(output) { } + [Fact] + public async Task TestBasicGetRoundTrip() + { + const string msg = "for async basic.get"; + + QueueDeclareOk queueResult = await _channel.QueueDeclareAsync(string.Empty, false, true, true); + string queueName = queueResult.QueueName; + + await _channel.BasicPublishAsync(string.Empty, queueName, _encoding.GetBytes(msg), true); + + BasicGetResult getResult = await _channel.BasicGetAsync(queueName, true); + Assert.Equal(msg, _encoding.GetString(getResult.Body.ToArray())); + + QueueDeclareOk queueResultPassive = await _channel.QueueDeclarePassiveAsync(queue: queueName); + Assert.Equal((uint)0, queueResultPassive.MessageCount); + + Assert.Null(await _channel.BasicGetAsync(queueName, true)); + } + [Fact] public Task TestBasicGetWithClosedChannel() { diff --git a/projects/Test/AsyncIntegration/TestBasicPublishAsync.cs b/projects/Test/Integration/TestBasicPublishAsync.cs similarity index 96% rename from projects/Test/AsyncIntegration/TestBasicPublishAsync.cs rename to projects/Test/Integration/TestBasicPublishAsync.cs index 6c58fb3f2f..f2a67ab6ea 100644 --- a/projects/Test/AsyncIntegration/TestBasicPublishAsync.cs +++ b/projects/Test/Integration/TestBasicPublishAsync.cs @@ -34,9 +34,9 @@ using Xunit; using Xunit.Abstractions; -namespace Test.AsyncIntegration +namespace Test.Integration { - public class TestBasicPublishAsync : AsyncIntegrationFixture + public class TestBasicPublishAsync : IntegrationFixture { public TestBasicPublishAsync(ITestOutputHelper output) : base(output) { diff --git a/projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedConnectionAsync.cs similarity index 99% rename from projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs rename to projects/Test/Integration/TestConcurrentAccessWithSharedConnectionAsync.cs index e91088b47c..de11787dbf 100644 --- a/projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedConnectionAsync.cs @@ -38,9 +38,9 @@ using Xunit; using Xunit.Abstractions; -namespace Test.AsyncIntegration +namespace Test.Integration { - public class TestConcurrentAccessWithSharedConnectionAsync : AsyncIntegrationFixture + public class TestConcurrentAccessWithSharedConnectionAsync : IntegrationFixture { private const ushort _messageCount = 200; diff --git a/projects/Test/AsyncIntegration/TestConfirmSelectAsync.cs b/projects/Test/Integration/TestConfirmSelectAsync.cs similarity index 96% rename from projects/Test/AsyncIntegration/TestConfirmSelectAsync.cs rename to projects/Test/Integration/TestConfirmSelectAsync.cs index e755c3e5ef..c3658c1b53 100644 --- a/projects/Test/AsyncIntegration/TestConfirmSelectAsync.cs +++ b/projects/Test/Integration/TestConfirmSelectAsync.cs @@ -34,9 +34,9 @@ using Xunit; using Xunit.Abstractions; -namespace Test.AsyncIntegration +namespace Test.Integration { - public class TestConfirmSelectAsync : AsyncIntegrationFixture + public class TestConfirmSelectAsync : IntegrationFixture { readonly byte[] _message = GetRandomBody(64); diff --git a/projects/Test/Integration/TestConnectionFactory.cs b/projects/Test/Integration/TestConnectionFactory.cs index 6a291f552a..660517040e 100644 --- a/projects/Test/Integration/TestConnectionFactory.cs +++ b/projects/Test/Integration/TestConnectionFactory.cs @@ -29,8 +29,10 @@ // Copyright (c) 2011-2020 VMware, Inc. or its affiliates. All rights reserved. //--------------------------------------------------------------------------- +using System; using System.Collections.Generic; using System.Net.Sockets; +using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; @@ -389,5 +391,52 @@ public async Task TestCreateConnectionWithHostnameListUsesConnectionFactoryMaxMe await conn.CloseAsync(); } } + + [Fact] + public async Task TestCreateConnectionAsync_WithAlreadyCanceledToken() + { + using (var cts = new CancellationTokenSource()) + { + cts.Cancel(); + + ConnectionFactory cf = CreateConnectionFactory(); + + bool passed = false; + /* + * If anyone wonders why TaskCanceledException is explicitly checked, + * even though it's a subclass of OperationCanceledException: + * https://github.com/rabbitmq/rabbitmq-dotnet-client/commit/383ca5c5f161edb717cf8fae7bf143c13143f634#r135400615 + */ + try + { + await cf.CreateConnectionAsync(cts.Token); + } + catch (TaskCanceledException) + { + passed = true; + } + catch (OperationCanceledException) + { + passed = true; + } + + Assert.True(passed, "FAIL did not see TaskCanceledException nor OperationCanceledException"); + } + } + + [Fact] + public async Task TestCreateConnectionAsync_UsesValidEndpointWhenMultipleSupplied() + { + using (var cts = new CancellationTokenSource(WaitSpan)) + { + ConnectionFactory cf = CreateConnectionFactory(); + var invalidEp = new AmqpTcpEndpoint("not_localhost"); + var ep = new AmqpTcpEndpoint("localhost"); + using (IConnection conn = await cf.CreateConnectionAsync(new List { invalidEp, ep }, cts.Token)) + { + await conn.CloseAsync(); + } + } + } } } diff --git a/projects/Test/Integration/TestConnectionRecovery.cs b/projects/Test/Integration/TestConnectionRecovery.cs new file mode 100644 index 0000000000..89ace073b6 --- /dev/null +++ b/projects/Test/Integration/TestConnectionRecovery.cs @@ -0,0 +1,741 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2020 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using RabbitMQ.Client.Exceptions; +using RabbitMQ.Client.Framing.Impl; +using RabbitMQ.Client.Impl; +using Xunit; +using Xunit.Abstractions; +using QueueDeclareOk = RabbitMQ.Client.QueueDeclareOk; + +namespace Test.Integration +{ + public class TestConnectionRecovery : TestConnectionRecoveryBase + { + private readonly string _queueName; + + public TestConnectionRecovery(ITestOutputHelper output) : base(output) + { + _queueName = $"{nameof(TestConnectionRecovery)}-{Guid.NewGuid()}"; + } + + public override async Task DisposeAsync() + { + ConnectionFactory cf = CreateConnectionFactory(); + cf.ClientProvidedName += "-TearDown"; + using (IConnection conn = await cf.CreateConnectionAsync()) + { + using (IChannel ch = await conn.CreateChannelAsync()) + { + await ch.QueueDeleteAsync(_queueName); + await ch.CloseAsync(); + } + await conn.CloseAsync(); + } + + await base.DisposeAsync(); + } + + [Fact] + public async Task TestBasicAckAfterChannelRecovery() + { + var allMessagesSeenTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var cons = new AckingBasicConsumer(_channel, TotalMessageCount, allMessagesSeenTcs); + + QueueDeclareOk q = await _channel.QueueDeclareAsync(_queueName, false, false, false); + string queueName = q.QueueName; + Assert.Equal(queueName, _queueName); + + await _channel.BasicQosAsync(0, 1, false); + await _channel.BasicConsumeAsync(queueName, false, cons); + + TaskCompletionSource sl = PrepareForShutdown(_conn); + TaskCompletionSource rl = PrepareForRecovery(_conn); + + await PublishMessagesWhileClosingConnAsync(queueName); + + await WaitAsync(sl, "connection shutdown"); + await WaitAsync(rl, "connection recovery"); + await WaitAsync(allMessagesSeenTcs, "all messages seen"); + } + + [Fact] + public async Task TestBasicNackAfterChannelRecovery() + { + var allMessagesSeenTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var cons = new NackingBasicConsumer(_channel, TotalMessageCount, allMessagesSeenTcs); + + QueueDeclareOk q = await _channel.QueueDeclareAsync(_queueName, false, false, false); + string queueName = q.QueueName; + Assert.Equal(queueName, _queueName); + + await _channel.BasicQosAsync(0, 1, false); + await _channel.BasicConsumeAsync(queueName, false, cons); + + TaskCompletionSource sl = PrepareForShutdown(_conn); + TaskCompletionSource rl = PrepareForRecovery(_conn); + + await PublishMessagesWhileClosingConnAsync(queueName); + + await WaitAsync(sl, "connection shutdown"); + await WaitAsync(rl, "connection recovery"); + await WaitAsync(allMessagesSeenTcs, "all messages seen"); + } + + [Fact] + public async Task TestBasicRejectAfterChannelRecovery() + { + var allMessagesSeenTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var cons = new RejectingBasicConsumer(_channel, TotalMessageCount, allMessagesSeenTcs); + + string queueName = (await _channel.QueueDeclareAsync(_queueName, false, false, false)).QueueName; + Assert.Equal(queueName, _queueName); + + await _channel.BasicQosAsync(0, 1, false); + await _channel.BasicConsumeAsync(queueName, false, cons); + + TaskCompletionSource sl = PrepareForShutdown(_conn); + TaskCompletionSource rl = PrepareForRecovery(_conn); + + await PublishMessagesWhileClosingConnAsync(queueName); + + await WaitAsync(sl, "connection shutdown"); + await WaitAsync(rl, "connection recovery"); + await WaitAsync(allMessagesSeenTcs, "all messages seen"); + } + + [Fact] + public async Task TestBasicAckAfterBasicGetAndChannelRecovery() + { + string q = GenerateQueueName(); + await _channel.QueueDeclareAsync(q, false, false, false); + // create an offset + await _channel.BasicPublishAsync("", q, _messageBody); + await Task.Delay(50); + BasicGetResult g = await _channel.BasicGetAsync(q, false); + await CloseAndWaitForRecoveryAsync(); + Assert.True(_conn.IsOpen); + Assert.True(_channel.IsOpen); + // ack the message after recovery - this should be out of range and ignored + await _channel.BasicAckAsync(g.DeliveryTag, false); + // do a sync operation to 'check' there is no channel exception + await _channel.BasicGetAsync(q, false); + } + + [Fact] + public async Task TestBasicAckEventHandlerRecovery() + { + await _channel.ConfirmSelectAsync(); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + ((AutorecoveringChannel)_channel).BasicAcks += (m, args) => tcs.SetResult(true); + ((AutorecoveringChannel)_channel).BasicNacks += (m, args) => tcs.SetResult(true); + + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + Assert.True(_channel.IsOpen); + + await WithTemporaryNonExclusiveQueueAsync(_channel, (ch, q) => + { + return ch.BasicPublishAsync("", q, _messageBody).AsTask(); + }); + + await WaitAsync(tcs, "basic acks/nacks"); + } + + [Fact] + public async Task TestBasicConnectionRecovery() + { + Assert.True(_conn.IsOpen); + await CloseAndWaitForRecoveryAsync(); + Assert.True(_conn.IsOpen); + } + + [Fact] + public async Task TestBasicChannelRecovery() + { + Assert.True(_channel.IsOpen); + await CloseAndWaitForRecoveryAsync(); + Assert.True(_channel.IsOpen); + } + + [Fact] + public Task TestClientNamedQueueRecovery() + { + string s = "dotnet-client.test.recovery.q1"; + return WithTemporaryNonExclusiveQueueAsync(_channel, async (m, q) => + { + await CloseAndWaitForRecoveryAsync(); + await AssertQueueRecoveryAsync(m, q, false); + await _channel.QueueDeleteAsync(q); + }, s); + } + + [Fact] + public Task TestClientNamedQueueRecoveryNoWait() + { + string s = "dotnet-client.test.recovery.q1-nowait"; + return WithTemporaryExclusiveQueueNoWaitAsync(_channel, async (ch, q) => + { + await CloseAndWaitForRecoveryAsync(); + await AssertExclusiveQueueRecoveryAsync(ch, q); + }, s); + } + + [Fact] + public async Task TestConsumerRecoveryWithManyConsumers() + { + string q = (await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false)).QueueName; + int n = 1024; + + for (int i = 0; i < n; i++) + { + var cons = new EventingBasicConsumer(_channel); + await _channel.BasicConsumeAsync(q, true, cons); + } + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + ((AutorecoveringConnection)_conn).ConsumerTagChangeAfterRecovery += (prev, current) => tcs.SetResult(true); + + await CloseAndWaitForRecoveryAsync(); + await WaitAsync(tcs, "consumer tag change after recovery"); + Assert.True(_channel.IsOpen); + await AssertConsumerCountAsync(q, n); + } + + [Fact] + public async Task TestDeclarationOfManyAutoDeleteExchangesWithTransientExchangesThatAreDeleted() + { + AssertRecordedExchanges((AutorecoveringConnection)_conn, 0); + for (int i = 0; i < 3; i++) + { + string x1 = $"source-{Guid.NewGuid()}"; + await _channel.ExchangeDeclareAsync(x1, "fanout", false, true); + + string x2 = $"destination-{Guid.NewGuid()}"; + await _channel.ExchangeDeclareAsync(x2, "fanout", false, false); + + await _channel.ExchangeBindAsync(x2, x1, ""); + await _channel.ExchangeDeleteAsync(x2); + } + AssertRecordedExchanges((AutorecoveringConnection)_conn, 0); + } + + [Fact] + public async Task TestDeclarationOfManyAutoDeleteExchangesWithTransientExchangesThatAreUnbound() + { + AssertRecordedExchanges((AutorecoveringConnection)_conn, 0); + for (int i = 0; i < 1000; i++) + { + string x1 = $"source-{Guid.NewGuid()}"; + await _channel.ExchangeDeclareAsync(x1, "fanout", false, true); + string x2 = $"destination-{Guid.NewGuid()}"; + await _channel.ExchangeDeclareAsync(x2, "fanout", false, false); + await _channel.ExchangeBindAsync(x2, x1, ""); + await _channel.ExchangeUnbindAsync(x2, x1, ""); + await _channel.ExchangeDeleteAsync(x2); + } + AssertRecordedExchanges((AutorecoveringConnection)_conn, 0); + } + + [Fact] + public async Task TestDeclarationOfManyAutoDeleteExchangesWithTransientQueuesThatAreDeleted() + { + AssertRecordedExchanges((AutorecoveringConnection)_conn, 0); + for (int i = 0; i < 1000; i++) + { + string x = Guid.NewGuid().ToString(); + await _channel.ExchangeDeclareAsync(x, "fanout", false, true); + RabbitMQ.Client.QueueDeclareOk q = await _channel.QueueDeclareAsync(); + await _channel.QueueBindAsync(q, x, ""); + await _channel.QueueDeleteAsync(q); + } + AssertRecordedExchanges((AutorecoveringConnection)_conn, 0); + } + + [Fact] + public async Task TestDeclarationOfManyAutoDeleteExchangesWithTransientQueuesThatAreUnbound() + { + AssertRecordedExchanges((AutorecoveringConnection)_conn, 0); + for (int i = 0; i < 1000; i++) + { + string x = Guid.NewGuid().ToString(); + await _channel.ExchangeDeclareAsync(x, "fanout", false, true); + RabbitMQ.Client.QueueDeclareOk q = await _channel.QueueDeclareAsync(); + await _channel.QueueBindAsync(q, x, ""); + await _channel.QueueUnbindAsync(q, x, "", null); + } + AssertRecordedExchanges((AutorecoveringConnection)_conn, 0); + } + + [Fact] + public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() + { + AssertRecordedQueues((AutorecoveringConnection)_conn, 0); + for (int i = 0; i < 1000; i++) + { + string q = Guid.NewGuid().ToString(); + await _channel.QueueDeclareAsync(q, false, false, true); + var dummy = new EventingBasicConsumer(_channel); + string tag = await _channel.BasicConsumeAsync(q, true, dummy); + await _channel.BasicCancelAsync(tag); + } + AssertRecordedQueues((AutorecoveringConnection)_conn, 0); + } + + [Fact] + public async Task TestExchangeRecovery() + { + string x = "dotnet-client.test.recovery.x1"; + await DeclareNonDurableExchangeAsync(_channel, x); + await CloseAndWaitForRecoveryAsync(); + await AssertExchangeRecoveryAsync(_channel, x); + await _channel.ExchangeDeleteAsync(x); + } + + [Fact] + public async Task TestExchangeToExchangeBindingRecovery() + { + string q = (await _channel.QueueDeclareAsync("", false, false, false)).QueueName; + string x1 = "amq.fanout"; + string x2 = GenerateExchangeName(); + + await _channel.ExchangeDeclareAsync(x2, "fanout"); + await _channel.ExchangeBindAsync(x1, x2, ""); + await _channel.QueueBindAsync(q, x1, ""); + + try + { + await CloseAndWaitForRecoveryAsync(); + Assert.True(_channel.IsOpen); + await _channel.BasicPublishAsync(x2, "", _encoding.GetBytes("msg")); + await AssertMessageCountAsync(q, 1); + } + finally + { + await WithTemporaryChannelAsync(async ch => + { + await ch.ExchangeDeleteAsync(x2); + await ch.QueueDeleteAsync(q); + }); + } + } + + [Fact] + public async Task TestQueueRecoveryWithManyQueues() + { + var qs = new List(); + int n = 1024; + for (int i = 0; i < n; i++) + { + QueueDeclareOk q = await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false); + qs.Add(q.QueueName); + } + await CloseAndWaitForRecoveryAsync(); + Assert.True(_channel.IsOpen); + foreach (string q in qs) + { + await AssertQueueRecoveryAsync(_channel, q, false); + await _channel.QueueDeleteAsync(q); + } + } + + [Fact] + public async Task TestRecoveryEventHandlersOnConnection() + { + int counter = 0; + ((AutorecoveringConnection)_conn).RecoverySucceeded += (source, ea) => Interlocked.Increment(ref counter); + + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + Assert.True(_conn.IsOpen); + Assert.True(counter >= 3); + } + + [Fact] + public async Task TestRecoveryEventHandlersOnChannel() + { + int counter = 0; + ((AutorecoveringChannel)_channel).Recovery += (source, ea) => Interlocked.Increment(ref counter); + + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + Assert.True(_channel.IsOpen); + Assert.True(counter >= 3); + } + + [Theory] + [InlineData(1)] + [InlineData(3)] + public async Task TestRecoveringConsumerHandlerOnConnection(int iterations) + { + string q = (await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false)).QueueName; + var cons = new EventingBasicConsumer(_channel); + await _channel.BasicConsumeAsync(q, true, cons); + + int counter = 0; + ((AutorecoveringConnection)_conn).RecoveringConsumer += (sender, args) => Interlocked.Increment(ref counter); + + for (int i = 0; i < iterations; i++) + { + await CloseAndWaitForRecoveryAsync(); + } + + Assert.Equal(iterations, counter); + } + + [Fact] + public async Task TestRecoveringConsumerHandlerOnConnection_EventArgumentsArePassedDown() + { + var myArgs = new Dictionary { { "first-argument", "some-value" } }; + string q = (await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false)).QueueName; + var cons = new EventingBasicConsumer(_channel); + string expectedCTag = await _channel.BasicConsumeAsync(cons, q, arguments: myArgs); + + bool ctagMatches = false; + bool consumerArgumentMatches = false; + ((AutorecoveringConnection)_conn).RecoveringConsumer += (sender, args) => + { + // We cannot assert here because NUnit throws when an assertion fails. This exception is caught and + // passed to a CallbackExceptionHandler, instead of failing the test. Instead, we have to do this trick + // and assert in the test function. + ctagMatches = args.ConsumerTag == expectedCTag; + consumerArgumentMatches = (string)args.ConsumerArguments["first-argument"] == "some-value"; + args.ConsumerArguments["first-argument"] = "event-handler-set-this-value"; + }; + + await CloseAndWaitForRecoveryAsync(); + Assert.True(ctagMatches, "expected consumer tag to match"); + Assert.True(consumerArgumentMatches, "expected consumer arguments to match"); + string actualVal = (string)Assert.Contains("first-argument", myArgs as IDictionary); + Assert.Equal("event-handler-set-this-value", actualVal); + } + + [Fact] + public async Task TestServerNamedQueueRecovery() + { + string q = (await _channel.QueueDeclareAsync("", false, false, false)).QueueName; + string x = "amq.fanout"; + await _channel.QueueBindAsync(q, x, ""); + + string nameBefore = q; + string nameAfter = null; + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var connection = (AutorecoveringConnection)_conn; + connection.RecoverySucceeded += (source, ea) => tcs.SetResult(true); + connection.QueueNameChangedAfterRecovery += (source, ea) => { nameAfter = ea.NameAfter; }; + + await CloseAndWaitForRecoveryAsync(); + await WaitAsync(tcs, "recovery succeeded"); + + Assert.NotNull(nameAfter); + Assert.StartsWith("amq.", nameBefore); + Assert.StartsWith("amq.", nameAfter); + Assert.NotEqual(nameBefore, nameAfter); + + await _channel.QueueDeclarePassiveAsync(nameAfter); + } + + [Fact] + public async Task TestShutdownEventHandlersRecoveryOnConnection() + { + int counter = 0; + _conn.ConnectionShutdown += (c, args) => Interlocked.Increment(ref counter); + + Assert.True(_conn.IsOpen); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + Assert.True(_conn.IsOpen); + + Assert.True(counter >= 3); + } + + [Fact] + public async Task TestShutdownEventHandlersRecoveryOnChannel() + { + int counter = 0; + _channel.ChannelShutdown += (c, args) => Interlocked.Increment(ref counter); + + Assert.True(_channel.IsOpen); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + await CloseAndWaitForRecoveryAsync(); + Assert.True(_channel.IsOpen); + + Assert.True(counter >= 3); + } + + [Fact] + public async Task TestPublishRpcRightAfterReconnect() + { + string testQueueName = $"dotnet-client.test.{nameof(TestPublishRpcRightAfterReconnect)}"; + await _channel.QueueDeclareAsync(testQueueName, false, false, false); + var replyConsumer = new EventingBasicConsumer(_channel); + await _channel.BasicConsumeAsync("amq.rabbitmq.reply-to", true, replyConsumer); + var properties = new BasicProperties(); + properties.ReplyTo = "amq.rabbitmq.reply-to"; + + TimeSpan doneSpan = TimeSpan.FromMilliseconds(100); + var doneTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + Task closeTask = Task.Run(async () => + { + try + { + + await CloseAndWaitForRecoveryAsync(); + } + finally + { + doneTcs.SetResult(true); + } + }); + + while (false == doneTcs.Task.IsCompletedSuccessfully()) + { + try + { + await _channel.BasicPublishAsync(string.Empty, testQueueName, properties, _messageBody); + } + catch (Exception e) + { + if (e is AlreadyClosedException a) + { + // 406 is received, when the reply consumer isn't yet recovered + Assert.NotEqual(406, a.ShutdownReason.ReplyCode); + } + } + + try + { + await doneTcs.Task.WaitAsync(doneSpan); + } + catch (TimeoutException) + { + } + } + + await closeTask; + } + + [Fact] + public async Task TestThatCancelledConsumerDoesNotReappearOnRecovery() + { + string q = (await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false)).QueueName; + int n = 1024; + + for (int i = 0; i < n; i++) + { + var cons = new EventingBasicConsumer(_channel); + string tag = await _channel.BasicConsumeAsync(q, true, cons); + await _channel.BasicCancelAsync(tag); + } + await CloseAndWaitForRecoveryAsync(); + Assert.True(_channel.IsOpen); + await AssertConsumerCountAsync(q, 0); + } + + [Fact] + public async Task TestThatDeletedExchangeBindingsDontReappearOnRecovery() + { + string q = (await _channel.QueueDeclareAsync("", false, false, false)).QueueName; + string x1 = "amq.fanout"; + string x2 = GenerateExchangeName(); + + await _channel.ExchangeDeclareAsync(x2, "fanout"); + await _channel.ExchangeBindAsync(x1, x2, ""); + await _channel.QueueBindAsync(q, x1, ""); + await _channel.ExchangeUnbindAsync(x1, x2, ""); + + try + { + await CloseAndWaitForRecoveryAsync(); + Assert.True(_channel.IsOpen); + await _channel.BasicPublishAsync(x2, "", _encoding.GetBytes("msg")); + await AssertMessageCountAsync(q, 0); + } + finally + { + await WithTemporaryChannelAsync(async ch => + { + await ch.ExchangeDeleteAsync(x2); + await ch.QueueDeleteAsync(q); + }); + } + } + + [Fact] + public async Task TestThatDeletedExchangesDontReappearOnRecovery() + { + string x = GenerateExchangeName(); + await _channel.ExchangeDeclareAsync(x, "fanout"); + await _channel.ExchangeDeleteAsync(x); + + try + { + await CloseAndWaitForRecoveryAsync(); + Assert.True(_channel.IsOpen); + await _channel.ExchangeDeclarePassiveAsync(x); + Assert.Fail("Expected an exception"); + } + catch (OperationInterruptedException e) + { + // expected + AssertShutdownError(e.ShutdownReason, 404); + } + } + + [Fact] + public async Task TestThatDeletedQueueBindingsDontReappearOnRecovery() + { + string q = (await _channel.QueueDeclareAsync("", false, false, false)).QueueName; + string x1 = "amq.fanout"; + string x2 = GenerateExchangeName(); + + await _channel.ExchangeDeclareAsync(x2, "fanout"); + await _channel.ExchangeBindAsync(x1, x2, ""); + await _channel.QueueBindAsync(q, x1, ""); + await _channel.QueueUnbindAsync(q, x1, "", null); + + try + { + await CloseAndWaitForRecoveryAsync(); + Assert.True(_channel.IsOpen); + await _channel.BasicPublishAsync(x2, "", _encoding.GetBytes("msg")); + await AssertMessageCountAsync(q, 0); + } + finally + { + await WithTemporaryChannelAsync(async ch => + { + await ch.ExchangeDeleteAsync(x2); + await ch.QueueDeleteAsync(q); + }); + } + } + + [Fact] + public async Task TestThatDeletedQueuesDontReappearOnRecovery() + { + string q = "dotnet-client.recovery.q1"; + await _channel.QueueDeclareAsync(q, false, false, false); + await _channel.QueueDeleteAsync(q); + + try + { + await CloseAndWaitForRecoveryAsync(); + Assert.True(_channel.IsOpen); + await _channel.QueueDeclarePassiveAsync(q); + Assert.Fail("Expected an exception"); + } + catch (OperationInterruptedException e) + { + // expected + AssertShutdownError(e.ShutdownReason, 404); + } + } + + [Fact] + public async Task TestBindingRecovery_GH1035() + { + const string routingKey = "unused"; + byte[] body = GetRandomBody(); + + var receivedMessageSemaphore = new SemaphoreSlim(0, 1); + + void MessageReceived(object sender, BasicDeliverEventArgs e) + { + receivedMessageSemaphore.Release(); + } + + string exchangeName = $"ex-gh-1035-{Guid.NewGuid()}"; + string queueName = $"q-gh-1035-{Guid.NewGuid()}"; + + await _channel.ExchangeDeclareAsync(exchange: exchangeName, + type: "fanout", durable: false, autoDelete: true, + arguments: null); + + QueueDeclareOk q0 = await _channel.QueueDeclareAsync(queue: queueName, exclusive: true); + Assert.Equal(queueName, q0); + + await _channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: routingKey); + + await _channel.CloseAsync(); + _channel.Dispose(); + _channel = null; + + _channel = await _conn.CreateChannelAsync(); + + await _channel.ExchangeDeclareAsync(exchange: exchangeName, + type: "fanout", durable: false, autoDelete: true, + arguments: null); + + QueueDeclareOk q1 = await _channel.QueueDeclareAsync(queue: queueName, exclusive: true); + Assert.Equal(queueName, q1.QueueName); + + await _channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: routingKey); + + var c = new EventingBasicConsumer(_channel); + c.Received += MessageReceived; + await _channel.BasicConsumeAsync(queue: queueName, autoAck: true, consumer: c); + + using (IChannel pubCh = await _conn.CreateChannelAsync()) + { + await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: routingKey, body: body); + await pubCh.CloseAsync(); + } + + Assert.True(await receivedMessageSemaphore.WaitAsync(WaitSpan)); + + await CloseAndWaitForRecoveryAsync(); + + using (IChannel pubCh = await _conn.CreateChannelAsync()) + { + await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: "unused", body: body); + await pubCh.CloseAsync(); + } + + Assert.True(await receivedMessageSemaphore.WaitAsync(WaitSpan)); + } + } +} diff --git a/projects/Test/SequentialIntegration/TestConnectionRecoveryWithoutSetup.cs b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs similarity index 99% rename from projects/Test/SequentialIntegration/TestConnectionRecoveryWithoutSetup.cs rename to projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs index a9c088dff0..783da332c4 100644 --- a/projects/Test/SequentialIntegration/TestConnectionRecoveryWithoutSetup.cs +++ b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs @@ -39,7 +39,7 @@ using Xunit; using Xunit.Abstractions; -namespace Test.SequentialIntegration +namespace Test.Integration { public class TestConnectionRecoveryWithoutSetup : TestConnectionRecoveryBase { @@ -251,7 +251,6 @@ public async Task TestCreateChannelOnClosedAutorecoveringConnectionDoesNotHang() } finally { - await StartRabbitMqAsync(); await conn.CloseAsync(); } } diff --git a/projects/Test/SequentialIntegration/TestConnectionTopologyRecovery.cs b/projects/Test/Integration/TestConnectionTopologyRecovery.cs similarity index 99% rename from projects/Test/SequentialIntegration/TestConnectionTopologyRecovery.cs rename to projects/Test/Integration/TestConnectionTopologyRecovery.cs index 5c6d3f0725..8abf11b4fc 100644 --- a/projects/Test/SequentialIntegration/TestConnectionTopologyRecovery.cs +++ b/projects/Test/Integration/TestConnectionTopologyRecovery.cs @@ -40,7 +40,7 @@ using Xunit; using Xunit.Abstractions; -namespace Test.SequentialIntegration +namespace Test.Integration { public class TestConnectionTopologyRecovery : TestConnectionRecoveryBase { @@ -116,7 +116,7 @@ public async Task TestTopologyRecoveryQueueFilter() } catch (OperationInterruptedException e) { - IntegrationFixtureBase.AssertShutdownError(e.ShutdownReason, 404); + IntegrationFixture.AssertShutdownError(e.ShutdownReason, 404); } finally { diff --git a/projects/Test/Integration/TestConsumerOperationDispatch.cs b/projects/Test/Integration/TestConsumerOperationDispatch.cs index 2340ca60e0..575092fac4 100644 --- a/projects/Test/Integration/TestConsumerOperationDispatch.cs +++ b/projects/Test/Integration/TestConsumerOperationDispatch.cs @@ -105,7 +105,7 @@ public override Task HandleBasicDeliverAsync(string consumerTag, [SkippableFact] public async Task TestDeliveryOrderingWithSingleChannel() { - Skip.If(IntegrationFixture.IsRunningInCI && IntegrationFixtureBase.IsWindows, "TODO - test is slow in CI on Windows"); + Skip.If(IntegrationFixture.IsRunningInCI && IntegrationFixture.IsWindows, "TODO - test is slow in CI on Windows"); await _channel.ExchangeDeclareAsync(_x, "fanout", durable: false); diff --git a/projects/Test/Integration/TestExchangeDeclare.cs b/projects/Test/Integration/TestExchangeDeclare.cs index 3fdbb45462..0c803ebf98 100644 --- a/projects/Test/Integration/TestExchangeDeclare.cs +++ b/projects/Test/Integration/TestExchangeDeclare.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading.Tasks; using Xunit; @@ -45,6 +46,61 @@ public TestExchangeDeclare(ITestOutputHelper output) : base(output) { } + [Fact] + public async Task TestConcurrentExchangeDeclareAndBind() + { + var exchangeNames = new ConcurrentBag(); + var tasks = new List(); + NotSupportedException nse = null; + for (int i = 0; i < 256; i++) + { + async Task f() + { + try + { + await Task.Delay(S_Random.Next(5, 50)); + string exchangeName = GenerateExchangeName(); + await _channel.ExchangeDeclareAsync(exchange: exchangeName, type: "fanout", false, false); + await _channel.ExchangeBindAsync(destination: "amq.fanout", source: exchangeName, routingKey: "unused"); + exchangeNames.Add(exchangeName); + } + catch (NotSupportedException e) + { + nse = e; + } + } + var t = Task.Run(f); + tasks.Add(t); + } + + await AssertRanToCompletion(tasks); + Assert.Null(nse); + tasks.Clear(); + + foreach (string exchangeName in exchangeNames) + { + async Task f() + { + try + { + await Task.Delay(S_Random.Next(5, 50)); + await _channel.ExchangeUnbindAsync(destination: "amq.fanout", source: exchangeName, routingKey: "unused", + noWait: false, arguments: null); + await _channel.ExchangeDeleteAsync(exchange: exchangeName, ifUnused: false); + } + catch (NotSupportedException e) + { + nse = e; + } + } + var t = Task.Run(f); + tasks.Add(t); + } + + await AssertRanToCompletion(tasks); + Assert.Null(nse); + } + [Fact] public async Task TestConcurrentExchangeDeclareAndDelete() { diff --git a/projects/Test/AsyncIntegration/TestExtensionsAsync.cs b/projects/Test/Integration/TestExtensions.cs similarity index 93% rename from projects/Test/AsyncIntegration/TestExtensionsAsync.cs rename to projects/Test/Integration/TestExtensions.cs index 22a8ec1ff4..3d0cfe672d 100644 --- a/projects/Test/AsyncIntegration/TestExtensionsAsync.cs +++ b/projects/Test/Integration/TestExtensions.cs @@ -35,11 +35,11 @@ using Xunit; using Xunit.Abstractions; -namespace Test.AsyncIntegration +namespace Test.Integration { - public class TestExtensionsAsync : AsyncIntegrationFixture + public class TestExtensions : IntegrationFixture { - public TestExtensionsAsync(ITestOutputHelper output) : base(output) + public TestExtensions(ITestOutputHelper output) : base(output) { } @@ -61,7 +61,7 @@ public async Task TestConfirmBeforeWait() } [Fact] - public async Task TestExchangeBindingAsync() + public async Task TestExchangeBinding() { await _channel.ConfirmSelectAsync(); diff --git a/projects/Test/AsyncIntegration/TestFloodPublishingAsync.cs b/projects/Test/Integration/TestFloodPublishing.cs similarity index 96% rename from projects/Test/AsyncIntegration/TestFloodPublishingAsync.cs rename to projects/Test/Integration/TestFloodPublishing.cs index 98a626b459..11c0bec638 100644 --- a/projects/Test/AsyncIntegration/TestFloodPublishingAsync.cs +++ b/projects/Test/Integration/TestFloodPublishing.cs @@ -38,14 +38,14 @@ using Xunit; using Xunit.Abstractions; -namespace Test.AsyncIntegration +namespace Test.Integration { - public class TestFloodPublishingAsync : AsyncIntegrationFixture + public class TestFloodPublishing : IntegrationFixture { private static readonly TimeSpan TenSeconds = TimeSpan.FromSeconds(10); private readonly byte[] _body = GetRandomBody(2048); - public TestFloodPublishingAsync(ITestOutputHelper output) : base(output) + public TestFloodPublishing(ITestOutputHelper output) : base(output) { } @@ -56,7 +56,7 @@ public override Task InitializeAsync() } [Fact] - public async Task TestUnthrottledFloodPublishingAsync() + public async Task TestUnthrottledFloodPublishing() { bool sawUnexpectedShutdown = false; _connFactory = CreateConnectionFactory(); @@ -115,7 +115,7 @@ public async Task TestUnthrottledFloodPublishingAsync() } [Fact] - public async Task TestMultithreadFloodPublishingAsync() + public async Task TestMultithreadFloodPublishing() { _connFactory = CreateConnectionFactory(); _connFactory.DispatchConsumersAsync = true; diff --git a/projects/Test/AsyncIntegration/TestMessageCountAsync.cs b/projects/Test/Integration/TestMessageCount.cs similarity index 92% rename from projects/Test/AsyncIntegration/TestMessageCountAsync.cs rename to projects/Test/Integration/TestMessageCount.cs index e40dc79200..845dce6461 100644 --- a/projects/Test/AsyncIntegration/TestMessageCountAsync.cs +++ b/projects/Test/Integration/TestMessageCount.cs @@ -34,11 +34,11 @@ using Xunit; using Xunit.Abstractions; -namespace Test.AsyncIntegration +namespace Test.Integration { - public class TestMessageCountAsync : AsyncIntegrationFixture + public class TestMessageCount : IntegrationFixture { - public TestMessageCountAsync(ITestOutputHelper output) : base(output) + public TestMessageCount(ITestOutputHelper output) : base(output) { } diff --git a/projects/Test/AsyncIntegration/TestPublishSharedChannelAsync.cs b/projects/Test/Integration/TestPublishSharedChannelAsync.cs similarity index 97% rename from projects/Test/AsyncIntegration/TestPublishSharedChannelAsync.cs rename to projects/Test/Integration/TestPublishSharedChannelAsync.cs index 067a097800..d132d51339 100644 --- a/projects/Test/AsyncIntegration/TestPublishSharedChannelAsync.cs +++ b/projects/Test/Integration/TestPublishSharedChannelAsync.cs @@ -35,9 +35,9 @@ using Xunit; using Xunit.Abstractions; -namespace Test.AsyncIntegration +namespace Test.Integration { - public class TestPublishSharedChannelAsync : AsyncIntegrationFixture + public class TestPublishSharedChannelAsync : IntegrationFixture { private const string QueueName = "TestPublishSharedChannel_Queue"; private static readonly CachedString ExchangeName = new CachedString("TestPublishSharedChannel_Ex"); diff --git a/projects/Test/AsyncIntegration/TestPublisherConfirmsAsync.cs b/projects/Test/Integration/TestPublisherConfirms.cs similarity index 97% rename from projects/Test/AsyncIntegration/TestPublisherConfirmsAsync.cs rename to projects/Test/Integration/TestPublisherConfirms.cs index 8385a5ea7b..4b24d7c997 100644 --- a/projects/Test/AsyncIntegration/TestPublisherConfirmsAsync.cs +++ b/projects/Test/Integration/TestPublisherConfirms.cs @@ -38,13 +38,13 @@ using Xunit; using Xunit.Abstractions; -namespace Test.AsyncIntegration +namespace Test.Integration { - public class TestPublisherConfirmsAsync : AsyncIntegrationFixture + public class TestPublisherConfirms : IntegrationFixture { private readonly byte[] _messageBody; - public TestPublisherConfirmsAsync(ITestOutputHelper output) + public TestPublisherConfirms(ITestOutputHelper output) : base(output, openChannel: false) { _messageBody = GetRandomBody(4096); diff --git a/projects/Test/Integration/TestQueueDeclare.cs b/projects/Test/Integration/TestQueueDeclare.cs index 9d461873ea..6c797bc0c0 100644 --- a/projects/Test/Integration/TestQueueDeclare.cs +++ b/projects/Test/Integration/TestQueueDeclare.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading.Tasks; using RabbitMQ.Client; @@ -44,6 +45,108 @@ public TestQueueDeclare(ITestOutputHelper output) : base(output) { } + [Fact] + public async void TestQueueDeclareAsync() + { + string q = GenerateQueueName(); + + QueueDeclareOk declareResult = await _channel.QueueDeclareAsync(q, false, false, false); + Assert.Equal(q, declareResult.QueueName); + + QueueDeclareOk passiveDeclareResult = await _channel.QueueDeclarePassiveAsync(q); + Assert.Equal(q, passiveDeclareResult.QueueName); + } + + [Fact] + public async void TestConcurrentQueueDeclareAndBindAsync() + { + bool sawShutdown = false; + + _conn.ConnectionShutdown += (o, ea) => + { + HandleConnectionShutdown(_conn, ea, (args) => + { + if (ea.Initiator == ShutdownInitiator.Peer) + { + sawShutdown = true; + } + }); + }; + + _channel.ChannelShutdown += (o, ea) => + { + HandleChannelShutdown(_channel, ea, (args) => + { + if (args.Initiator == ShutdownInitiator.Peer) + { + sawShutdown = true; + } + }); + }; + + var tasks = new List(); + var queues = new ConcurrentBag(); + + NotSupportedException nse = null; + for (int i = 0; i < 256; i++) + { + async Task f() + { + try + { + // sleep for a random amount of time to increase the chances + // of thread interleaving. MK. + await Task.Delay(S_Random.Next(5, 50)); + QueueDeclareOk r = await _channel.QueueDeclareAsync(queue: string.Empty, false, false, false); + string queueName = r.QueueName; + await _channel.QueueBindAsync(queue: queueName, exchange: "amq.fanout", routingKey: queueName); + queues.Add(queueName); + } + catch (NotSupportedException e) + { + nse = e; + } + } + var t = Task.Run(f); + tasks.Add(t); + } + + await AssertRanToCompletion(tasks); + Assert.Null(nse); + tasks.Clear(); + + nse = null; + foreach (string q in queues) + { + async Task f() + { + string qname = q; + try + { + await Task.Delay(S_Random.Next(5, 50)); + + QueueDeclareOk r = await _channel.QueueDeclarePassiveAsync(qname); + Assert.Equal(qname, r.QueueName); + + await _channel.QueueUnbindAsync(queue: qname, exchange: "amq.fanout", routingKey: qname, null); + + uint deletedMessageCount = await _channel.QueueDeleteAsync(qname, false, false); + Assert.Equal((uint)0, deletedMessageCount); + } + catch (NotSupportedException e) + { + nse = e; + } + } + var t = Task.Run(f); + tasks.Add(t); + } + + await AssertRanToCompletion(tasks); + Assert.Null(nse); + Assert.False(sawShutdown); + } + [Fact] public async Task TestConcurrentQueueDeclare() { diff --git a/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs b/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs index 960ac0d6ad..734ea638fe 100644 --- a/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs +++ b/projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs @@ -36,7 +36,7 @@ namespace Test.SequentialIntegration { - public class SequentialIntegrationFixture : IntegrationFixtureBase + public class SequentialIntegrationFixture : TestConnectionRecoveryBase { public SequentialIntegrationFixture(ITestOutputHelper output) : base(output) { @@ -73,6 +73,20 @@ public Task StartRabbitMqAsync() return _rabbitMQCtl.ExecRabbitMQCtlAsync("start_app"); } + public Task RestartServerAndWaitForRecoveryAsync() + { + return RestartServerAndWaitForRecoveryAsync(_conn); + } + + public async Task RestartServerAndWaitForRecoveryAsync(IConnection conn) + { + TaskCompletionSource sl = PrepareForShutdown(conn); + TaskCompletionSource rl = PrepareForRecovery(conn); + await RestartRabbitMqAsync(); + await WaitAsync(sl, "connection shutdown"); + await WaitAsync(rl, "connection recovery"); + } + private Task AwaitRabbitMqAsync() { return _rabbitMQCtl.ExecRabbitMQCtlAsync("await_startup"); diff --git a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs index 2deed73598..bc60318b80 100644 --- a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs +++ b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs @@ -30,21 +30,17 @@ //--------------------------------------------------------------------------- using System; -using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; -using RabbitMQ.Client.Events; -using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing.Impl; -using RabbitMQ.Client.Impl; using Xunit; using Xunit.Abstractions; using QueueDeclareOk = RabbitMQ.Client.QueueDeclareOk; namespace Test.SequentialIntegration { - public class TestConnectionRecovery : TestConnectionRecoveryBase + public class TestConnectionRecovery : SequentialIntegrationFixture { private readonly string _queueName; @@ -70,136 +66,6 @@ public override async Task DisposeAsync() await base.DisposeAsync(); } - [Fact] - public async Task TestBasicAckAfterChannelRecovery() - { - var allMessagesSeenTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var cons = new AckingBasicConsumer(_channel, _totalMessageCount, allMessagesSeenTcs); - - QueueDeclareOk q = await _channel.QueueDeclareAsync(_queueName, false, false, false); - string queueName = q.QueueName; - Assert.Equal(queueName, _queueName); - - await _channel.BasicQosAsync(0, 1, false); - await _channel.BasicConsumeAsync(queueName, false, cons); - - TaskCompletionSource sl = PrepareForShutdown(_conn); - TaskCompletionSource rl = PrepareForRecovery(_conn); - - await PublishMessagesWhileClosingConnAsync(queueName); - - await WaitAsync(sl, "connection shutdown"); - await WaitAsync(rl, "connection recovery"); - await WaitAsync(allMessagesSeenTcs, "all messages seen"); - } - - [Fact] - public async Task TestBasicNackAfterChannelRecovery() - { - var allMessagesSeenTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var cons = new NackingBasicConsumer(_channel, _totalMessageCount, allMessagesSeenTcs); - - QueueDeclareOk q = await _channel.QueueDeclareAsync(_queueName, false, false, false); - string queueName = q.QueueName; - Assert.Equal(queueName, _queueName); - - await _channel.BasicQosAsync(0, 1, false); - await _channel.BasicConsumeAsync(queueName, false, cons); - - TaskCompletionSource sl = PrepareForShutdown(_conn); - TaskCompletionSource rl = PrepareForRecovery(_conn); - - await PublishMessagesWhileClosingConnAsync(queueName); - - await WaitAsync(sl, "connection shutdown"); - await WaitAsync(rl, "connection recovery"); - await WaitAsync(allMessagesSeenTcs, "all messages seen"); - } - - [Fact] - public async Task TestBasicRejectAfterChannelRecovery() - { - var allMessagesSeenTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var cons = new RejectingBasicConsumer(_channel, _totalMessageCount, allMessagesSeenTcs); - - string queueName = (await _channel.QueueDeclareAsync(_queueName, false, false, false)).QueueName; - Assert.Equal(queueName, _queueName); - - await _channel.BasicQosAsync(0, 1, false); - await _channel.BasicConsumeAsync(queueName, false, cons); - - TaskCompletionSource sl = PrepareForShutdown(_conn); - TaskCompletionSource rl = PrepareForRecovery(_conn); - - await PublishMessagesWhileClosingConnAsync(queueName); - - await WaitAsync(sl, "connection shutdown"); - await WaitAsync(rl, "connection recovery"); - await WaitAsync(allMessagesSeenTcs, "all messages seen"); - } - - [Fact] - public async Task TestBasicAckAfterBasicGetAndChannelRecovery() - { - string q = GenerateQueueName(); - await _channel.QueueDeclareAsync(q, false, false, false); - // create an offset - await _channel.BasicPublishAsync("", q, _messageBody); - await Task.Delay(50); - BasicGetResult g = await _channel.BasicGetAsync(q, false); - await CloseAndWaitForRecoveryAsync(); - Assert.True(_conn.IsOpen); - Assert.True(_channel.IsOpen); - // ack the message after recovery - this should be out of range and ignored - await _channel.BasicAckAsync(g.DeliveryTag, false); - // do a sync operation to 'check' there is no channel exception - await _channel.BasicGetAsync(q, false); - } - - [Fact] - public async Task TestBasicAckEventHandlerRecovery() - { - await _channel.ConfirmSelectAsync(); - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - ((AutorecoveringChannel)_channel).BasicAcks += (m, args) => tcs.SetResult(true); - ((AutorecoveringChannel)_channel).BasicNacks += (m, args) => tcs.SetResult(true); - - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - Assert.True(_channel.IsOpen); - - await WithTemporaryNonExclusiveQueueAsync(_channel, (ch, q) => - { - return ch.BasicPublishAsync("", q, _messageBody).AsTask(); - }); - - await WaitAsync(tcs, "basic acks/nacks"); - } - - [Fact] - public async Task TestBasicConnectionRecovery() - { - Assert.True(_conn.IsOpen); - await CloseAndWaitForRecoveryAsync(); - Assert.True(_conn.IsOpen); - } - - [Fact] - public async Task BasicConnectionRecoveryOnBrokerRestart() - { - Assert.True(_conn.IsOpen); - await RestartServerAndWaitForRecoveryAsync(); - Assert.True(_conn.IsOpen); - } - - [Fact] - public async Task TestBasicChannelRecovery() - { - Assert.True(_channel.IsOpen); - await CloseAndWaitForRecoveryAsync(); - Assert.True(_channel.IsOpen); - } - [Fact] public async Task TestBasicChannelRecoveryOnServerRestart() { @@ -282,29 +148,6 @@ public async Task TestBlockedListenersRecovery() } } - [Fact] - public Task TestClientNamedQueueRecovery() - { - string s = "dotnet-client.test.recovery.q1"; - return WithTemporaryNonExclusiveQueueAsync(_channel, async (m, q) => - { - await CloseAndWaitForRecoveryAsync(); - await AssertQueueRecoveryAsync(m, q, false); - await _channel.QueueDeleteAsync(q); - }, s); - } - - [Fact] - public Task TestClientNamedQueueRecoveryNoWait() - { - string s = "dotnet-client.test.recovery.q1-nowait"; - return WithTemporaryExclusiveQueueNoWaitAsync(_channel, async (ch, q) => - { - await CloseAndWaitForRecoveryAsync(); - await AssertExclusiveQueueRecoveryAsync(ch, q); - }, s); - } - [Fact] public Task TestClientNamedQueueRecoveryOnServerRestart() { @@ -317,164 +160,6 @@ public Task TestClientNamedQueueRecoveryOnServerRestart() }, s); } - [Fact] - public async Task TestConsumerRecoveryWithManyConsumers() - { - string q = (await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false)).QueueName; - int n = 1024; - - for (int i = 0; i < n; i++) - { - var cons = new EventingBasicConsumer(_channel); - await _channel.BasicConsumeAsync(q, true, cons); - } - - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - ((AutorecoveringConnection)_conn).ConsumerTagChangeAfterRecovery += (prev, current) => tcs.SetResult(true); - - await CloseAndWaitForRecoveryAsync(); - await WaitAsync(tcs, "consumer tag change after recovery"); - Assert.True(_channel.IsOpen); - await AssertConsumerCountAsync(q, n); - } - - [Fact] - public async Task TestDeclarationOfManyAutoDeleteExchangesWithTransientExchangesThatAreDeleted() - { - AssertRecordedExchanges((AutorecoveringConnection)_conn, 0); - for (int i = 0; i < 3; i++) - { - string x1 = $"source-{Guid.NewGuid()}"; - await _channel.ExchangeDeclareAsync(x1, "fanout", false, true); - - string x2 = $"destination-{Guid.NewGuid()}"; - await _channel.ExchangeDeclareAsync(x2, "fanout", false, false); - - await _channel.ExchangeBindAsync(x2, x1, ""); - await _channel.ExchangeDeleteAsync(x2); - } - AssertRecordedExchanges((AutorecoveringConnection)_conn, 0); - } - - [Fact] - public async Task TestDeclarationOfManyAutoDeleteExchangesWithTransientExchangesThatAreUnbound() - { - AssertRecordedExchanges((AutorecoveringConnection)_conn, 0); - for (int i = 0; i < 1000; i++) - { - string x1 = $"source-{Guid.NewGuid()}"; - await _channel.ExchangeDeclareAsync(x1, "fanout", false, true); - string x2 = $"destination-{Guid.NewGuid()}"; - await _channel.ExchangeDeclareAsync(x2, "fanout", false, false); - await _channel.ExchangeBindAsync(x2, x1, ""); - await _channel.ExchangeUnbindAsync(x2, x1, ""); - await _channel.ExchangeDeleteAsync(x2); - } - AssertRecordedExchanges((AutorecoveringConnection)_conn, 0); - } - - [Fact] - public async Task TestDeclarationOfManyAutoDeleteExchangesWithTransientQueuesThatAreDeleted() - { - AssertRecordedExchanges((AutorecoveringConnection)_conn, 0); - for (int i = 0; i < 1000; i++) - { - string x = Guid.NewGuid().ToString(); - await _channel.ExchangeDeclareAsync(x, "fanout", false, true); - RabbitMQ.Client.QueueDeclareOk q = await _channel.QueueDeclareAsync(); - await _channel.QueueBindAsync(q, x, ""); - await _channel.QueueDeleteAsync(q); - } - AssertRecordedExchanges((AutorecoveringConnection)_conn, 0); - } - - [Fact] - public async Task TestDeclarationOfManyAutoDeleteExchangesWithTransientQueuesThatAreUnbound() - { - AssertRecordedExchanges((AutorecoveringConnection)_conn, 0); - for (int i = 0; i < 1000; i++) - { - string x = Guid.NewGuid().ToString(); - await _channel.ExchangeDeclareAsync(x, "fanout", false, true); - RabbitMQ.Client.QueueDeclareOk q = await _channel.QueueDeclareAsync(); - await _channel.QueueBindAsync(q, x, ""); - await _channel.QueueUnbindAsync(q, x, "", null); - } - AssertRecordedExchanges((AutorecoveringConnection)_conn, 0); - } - - [Fact] - public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() - { - AssertRecordedQueues((AutorecoveringConnection)_conn, 0); - for (int i = 0; i < 1000; i++) - { - string q = Guid.NewGuid().ToString(); - await _channel.QueueDeclareAsync(q, false, false, true); - var dummy = new EventingBasicConsumer(_channel); - string tag = await _channel.BasicConsumeAsync(q, true, dummy); - await _channel.BasicCancelAsync(tag); - } - AssertRecordedQueues((AutorecoveringConnection)_conn, 0); - } - - [Fact] - public async Task TestExchangeRecovery() - { - string x = "dotnet-client.test.recovery.x1"; - await DeclareNonDurableExchangeAsync(_channel, x); - await CloseAndWaitForRecoveryAsync(); - await AssertExchangeRecoveryAsync(_channel, x); - await _channel.ExchangeDeleteAsync(x); - } - - [Fact] - public async Task TestExchangeToExchangeBindingRecovery() - { - string q = (await _channel.QueueDeclareAsync("", false, false, false)).QueueName; - string x1 = "amq.fanout"; - string x2 = GenerateExchangeName(); - - await _channel.ExchangeDeclareAsync(x2, "fanout"); - await _channel.ExchangeBindAsync(x1, x2, ""); - await _channel.QueueBindAsync(q, x1, ""); - - try - { - await CloseAndWaitForRecoveryAsync(); - Assert.True(_channel.IsOpen); - await _channel.BasicPublishAsync(x2, "", _encoding.GetBytes("msg")); - await AssertMessageCountAsync(q, 1); - } - finally - { - await WithTemporaryChannelAsync(async ch => - { - await ch.ExchangeDeleteAsync(x2); - await ch.QueueDeleteAsync(q); - }); - } - } - - [Fact] - public async Task TestQueueRecoveryWithManyQueues() - { - var qs = new List(); - int n = 1024; - for (int i = 0; i < n; i++) - { - QueueDeclareOk q = await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false); - qs.Add(q.QueueName); - } - await CloseAndWaitForRecoveryAsync(); - Assert.True(_channel.IsOpen); - foreach (string q in qs) - { - await AssertQueueRecoveryAsync(_channel, q, false); - await _channel.QueueDeleteAsync(q); - } - } - // rabbitmq/rabbitmq-dotnet-client#43 [Fact] public async Task TestClientNamedTransientAutoDeleteQueueAndBindingRecovery() @@ -549,123 +234,6 @@ public async Task TestServerNamedTransientAutoDeleteQueueAndBindingRecovery() await _channel.ExchangeDeleteAsync(x); } - [Fact] - public async Task TestRecoveryEventHandlersOnConnection() - { - int counter = 0; - ((AutorecoveringConnection)_conn).RecoverySucceeded += (source, ea) => Interlocked.Increment(ref counter); - - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - Assert.True(_conn.IsOpen); - Assert.True(counter >= 3); - } - - [Fact] - public async Task TestRecoveryEventHandlersOnChannel() - { - int counter = 0; - ((AutorecoveringChannel)_channel).Recovery += (source, ea) => Interlocked.Increment(ref counter); - - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - Assert.True(_channel.IsOpen); - Assert.True(counter >= 3); - } - - [Theory] - [InlineData(1)] - [InlineData(3)] - public async Task TestRecoveringConsumerHandlerOnConnection(int iterations) - { - string q = (await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false)).QueueName; - var cons = new EventingBasicConsumer(_channel); - await _channel.BasicConsumeAsync(q, true, cons); - - int counter = 0; - ((AutorecoveringConnection)_conn).RecoveringConsumer += (sender, args) => Interlocked.Increment(ref counter); - - for (int i = 0; i < iterations; i++) - { - await CloseAndWaitForRecoveryAsync(); - } - - Assert.Equal(iterations, counter); - } - - [Fact] - public async Task TestRecoveringConsumerHandlerOnConnection_EventArgumentsArePassedDown() - { - var myArgs = new Dictionary { { "first-argument", "some-value" } }; - string q = (await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false)).QueueName; - var cons = new EventingBasicConsumer(_channel); - string expectedCTag = await _channel.BasicConsumeAsync(cons, q, arguments: myArgs); - - bool ctagMatches = false; - bool consumerArgumentMatches = false; - ((AutorecoveringConnection)_conn).RecoveringConsumer += (sender, args) => - { - // We cannot assert here because NUnit throws when an assertion fails. This exception is caught and - // passed to a CallbackExceptionHandler, instead of failing the test. Instead, we have to do this trick - // and assert in the test function. - ctagMatches = args.ConsumerTag == expectedCTag; - consumerArgumentMatches = (string)args.ConsumerArguments["first-argument"] == "some-value"; - args.ConsumerArguments["first-argument"] = "event-handler-set-this-value"; - }; - - await CloseAndWaitForRecoveryAsync(); - Assert.True(ctagMatches, "expected consumer tag to match"); - Assert.True(consumerArgumentMatches, "expected consumer arguments to match"); - string actualVal = (string)Assert.Contains("first-argument", myArgs as IDictionary); - Assert.Equal("event-handler-set-this-value", actualVal); - } - - [Fact] - public async Task TestServerNamedQueueRecovery() - { - string q = (await _channel.QueueDeclareAsync("", false, false, false)).QueueName; - string x = "amq.fanout"; - await _channel.QueueBindAsync(q, x, ""); - - string nameBefore = q; - string nameAfter = null; - - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var connection = (AutorecoveringConnection)_conn; - connection.RecoverySucceeded += (source, ea) => tcs.SetResult(true); - connection.QueueNameChangedAfterRecovery += (source, ea) => { nameAfter = ea.NameAfter; }; - - await CloseAndWaitForRecoveryAsync(); - await WaitAsync(tcs, "recovery succeeded"); - - Assert.NotNull(nameAfter); - Assert.StartsWith("amq.", nameBefore); - Assert.StartsWith("amq.", nameAfter); - Assert.NotEqual(nameBefore, nameAfter); - - await _channel.QueueDeclarePassiveAsync(nameAfter); - } - - [Fact] - public async Task TestShutdownEventHandlersRecoveryOnConnection() - { - int counter = 0; - _conn.ConnectionShutdown += (c, args) => Interlocked.Increment(ref counter); - - Assert.True(_conn.IsOpen); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - Assert.True(_conn.IsOpen); - - Assert.True(counter >= 3); - } - [Fact] public async Task TestShutdownEventHandlersRecoveryOnConnectionAfterDelayedServerRestart() { @@ -692,224 +260,6 @@ public async Task TestShutdownEventHandlersRecoveryOnConnectionAfterDelayedServe Assert.True(counter >= 1); } - [Fact] - public async Task TestShutdownEventHandlersRecoveryOnChannel() - { - int counter = 0; - _channel.ChannelShutdown += (c, args) => Interlocked.Increment(ref counter); - - Assert.True(_channel.IsOpen); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - await CloseAndWaitForRecoveryAsync(); - Assert.True(_channel.IsOpen); - - Assert.True(counter >= 3); - } - - [Fact] - public async Task TestRecoverTopologyOnDisposedChannel() - { - string x = GenerateExchangeName(); - string q = GenerateQueueName(); - const string rk = "routing-key"; - - using (IChannel ch = await _conn.CreateChannelAsync()) - { - await ch.ExchangeDeclareAsync(exchange: x, type: "fanout"); - await ch.QueueDeclareAsync(q, false, false, false); - await ch.QueueBindAsync(q, x, rk); - await ch.CloseAsync(); - } - - var cons = new EventingBasicConsumer(_channel); - await _channel.BasicConsumeAsync(q, true, cons); - await AssertConsumerCountAsync(_channel, q, 1); - - await CloseAndWaitForRecoveryAsync(); - await AssertConsumerCountAsync(_channel, q, 1); - - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - cons.Received += (s, args) => tcs.SetResult(true); - - await _channel.BasicPublishAsync("", q, _messageBody); - await WaitAsync(tcs, "received event"); - - await _channel.QueueUnbindAsync(q, x, rk); - await _channel.ExchangeDeleteAsync(x); - await _channel.QueueDeleteAsync(q); - } - - [Fact] - public async Task TestPublishRpcRightAfterReconnect() - { - string testQueueName = $"dotnet-client.test.{nameof(TestPublishRpcRightAfterReconnect)}"; - await _channel.QueueDeclareAsync(testQueueName, false, false, false); - var replyConsumer = new EventingBasicConsumer(_channel); - await _channel.BasicConsumeAsync("amq.rabbitmq.reply-to", true, replyConsumer); - var properties = new BasicProperties(); - properties.ReplyTo = "amq.rabbitmq.reply-to"; - - TimeSpan doneSpan = TimeSpan.FromMilliseconds(100); - var doneTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - Task closeTask = Task.Run(async () => - { - try - { - - await CloseAndWaitForRecoveryAsync(); - } - finally - { - doneTcs.SetResult(true); - } - }); - - while (false == doneTcs.Task.IsCompletedSuccessfully()) - { - try - { - await _channel.BasicPublishAsync(string.Empty, testQueueName, properties, _messageBody); - } - catch (Exception e) - { - if (e is AlreadyClosedException a) - { - // 406 is received, when the reply consumer isn't yet recovered - Assert.NotEqual(406, a.ShutdownReason.ReplyCode); - } - } - - try - { - await doneTcs.Task.WaitAsync(doneSpan); - } - catch (TimeoutException) - { - } - } - - await closeTask; - } - - [Fact] - public async Task TestThatCancelledConsumerDoesNotReappearOnRecovery() - { - string q = (await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false)).QueueName; - int n = 1024; - - for (int i = 0; i < n; i++) - { - var cons = new EventingBasicConsumer(_channel); - string tag = await _channel.BasicConsumeAsync(q, true, cons); - await _channel.BasicCancelAsync(tag); - } - await CloseAndWaitForRecoveryAsync(); - Assert.True(_channel.IsOpen); - await AssertConsumerCountAsync(q, 0); - } - - [Fact] - public async Task TestThatDeletedExchangeBindingsDontReappearOnRecovery() - { - string q = (await _channel.QueueDeclareAsync("", false, false, false)).QueueName; - string x1 = "amq.fanout"; - string x2 = GenerateExchangeName(); - - await _channel.ExchangeDeclareAsync(x2, "fanout"); - await _channel.ExchangeBindAsync(x1, x2, ""); - await _channel.QueueBindAsync(q, x1, ""); - await _channel.ExchangeUnbindAsync(x1, x2, ""); - - try - { - await CloseAndWaitForRecoveryAsync(); - Assert.True(_channel.IsOpen); - await _channel.BasicPublishAsync(x2, "", _encoding.GetBytes("msg")); - await AssertMessageCountAsync(q, 0); - } - finally - { - await WithTemporaryChannelAsync(async ch => - { - await ch.ExchangeDeleteAsync(x2); - await ch.QueueDeleteAsync(q); - }); - } - } - - [Fact] - public async Task TestThatDeletedExchangesDontReappearOnRecovery() - { - string x = GenerateExchangeName(); - await _channel.ExchangeDeclareAsync(x, "fanout"); - await _channel.ExchangeDeleteAsync(x); - - try - { - await CloseAndWaitForRecoveryAsync(); - Assert.True(_channel.IsOpen); - await _channel.ExchangeDeclarePassiveAsync(x); - Assert.Fail("Expected an exception"); - } - catch (OperationInterruptedException e) - { - // expected - AssertShutdownError(e.ShutdownReason, 404); - } - } - - [Fact] - public async Task TestThatDeletedQueueBindingsDontReappearOnRecovery() - { - string q = (await _channel.QueueDeclareAsync("", false, false, false)).QueueName; - string x1 = "amq.fanout"; - string x2 = GenerateExchangeName(); - - await _channel.ExchangeDeclareAsync(x2, "fanout"); - await _channel.ExchangeBindAsync(x1, x2, ""); - await _channel.QueueBindAsync(q, x1, ""); - await _channel.QueueUnbindAsync(q, x1, "", null); - - try - { - await CloseAndWaitForRecoveryAsync(); - Assert.True(_channel.IsOpen); - await _channel.BasicPublishAsync(x2, "", _encoding.GetBytes("msg")); - await AssertMessageCountAsync(q, 0); - } - finally - { - await WithTemporaryChannelAsync(async ch => - { - await ch.ExchangeDeleteAsync(x2); - await ch.QueueDeleteAsync(q); - }); - } - } - - [Fact] - public async Task TestThatDeletedQueuesDontReappearOnRecovery() - { - string q = "dotnet-client.recovery.q1"; - await _channel.QueueDeclareAsync(q, false, false, false); - await _channel.QueueDeleteAsync(q); - - try - { - await CloseAndWaitForRecoveryAsync(); - Assert.True(_channel.IsOpen); - await _channel.QueueDeclarePassiveAsync(q); - Assert.Fail("Expected an exception"); - } - catch (OperationInterruptedException e) - { - // expected - AssertShutdownError(e.ShutdownReason, 404); - } - } - [Fact] public async Task TestUnblockedListenersRecovery() { @@ -921,68 +271,5 @@ public async Task TestUnblockedListenersRecovery() await UnblockAsync(); await WaitAsync(tcs, "connection unblocked"); } - - [Fact] - public async Task TestBindingRecovery_GH1035() - { - const string routingKey = "unused"; - byte[] body = GetRandomBody(); - - var receivedMessageSemaphore = new SemaphoreSlim(0, 1); - - void MessageReceived(object sender, BasicDeliverEventArgs e) - { - receivedMessageSemaphore.Release(); - } - - string exchangeName = $"ex-gh-1035-{Guid.NewGuid()}"; - string queueName = $"q-gh-1035-{Guid.NewGuid()}"; - - await _channel.ExchangeDeclareAsync(exchange: exchangeName, - type: "fanout", durable: false, autoDelete: true, - arguments: null); - - QueueDeclareOk q0 = await _channel.QueueDeclareAsync(queue: queueName, exclusive: true); - Assert.Equal(queueName, q0); - - await _channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: routingKey); - - await _channel.CloseAsync(); - _channel.Dispose(); - _channel = null; - - _channel = await _conn.CreateChannelAsync(); - - await _channel.ExchangeDeclareAsync(exchange: exchangeName, - type: "fanout", durable: false, autoDelete: true, - arguments: null); - - QueueDeclareOk q1 = await _channel.QueueDeclareAsync(queue: queueName, exclusive: true); - Assert.Equal(queueName, q1.QueueName); - - await _channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: routingKey); - - var c = new EventingBasicConsumer(_channel); - c.Received += MessageReceived; - await _channel.BasicConsumeAsync(queue: queueName, autoAck: true, consumer: c); - - using (IChannel pubCh = await _conn.CreateChannelAsync()) - { - await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: routingKey, body: body); - await pubCh.CloseAsync(); - } - - Assert.True(await receivedMessageSemaphore.WaitAsync(WaitSpan)); - - await CloseAndWaitForRecoveryAsync(); - - using (IChannel pubCh = await _conn.CreateChannelAsync()) - { - await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: "unused", body: body); - await pubCh.CloseAsync(); - } - - Assert.True(await receivedMessageSemaphore.WaitAsync(WaitSpan)); - } } }