Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure broker-originated channel closure completes #1752

Merged
merged 1 commit into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 31 additions & 7 deletions projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -48,6 +49,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
private AutorecoveringConnection _connection;
private RecoveryAwareChannel _innerChannel;
private bool _disposed;
private int _isDisposing;

private ushort _prefetchCountConsumer;
private ushort _prefetchCountGlobal;
Expand Down Expand Up @@ -256,19 +258,25 @@ public override string ToString()

public async ValueTask DisposeAsync()
{
if (_disposed)
if (IsDisposing)
{
return;
}

if (IsOpen)
try
{
await this.AbortAsync()
.ConfigureAwait(false);
}
if (IsOpen)
{
await this.AbortAsync()
.ConfigureAwait(false);
}

_recordedConsumerTags.Clear();
_disposed = true;
_recordedConsumerTags.Clear();
}
finally
{
_disposed = true;
}
}

public ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) => InnerChannel.GetNextPublishSequenceNumberAsync(cancellationToken);
Expand Down Expand Up @@ -482,7 +490,23 @@ private void ThrowIfDisposed()
ThrowDisposed();
}

return;

[DoesNotReturn]
static void ThrowDisposed() => throw new ObjectDisposedException(typeof(AutorecoveringChannel).FullName);
}

private bool IsDisposing
{
get
{
if (Interlocked.Exchange(ref _isDisposing, 1) != 0)
{
return true;
}

return false;
}
}
}
}
29 changes: 24 additions & 5 deletions projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -50,6 +51,7 @@ internal sealed partial class AutorecoveringConnection : IConnection

private Connection _innerConnection;
private bool _disposed;
private int _isDisposing;

private Connection InnerConnection
{
Expand Down Expand Up @@ -272,7 +274,7 @@ await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, ca

public async ValueTask DisposeAsync()
{
if (_disposed)
if (IsDisposing)
{
return;
}
Expand All @@ -281,17 +283,18 @@ public async ValueTask DisposeAsync()
{
await _innerConnection.DisposeAsync()
.ConfigureAwait(false);

_channels.Clear();
_recordedEntitiesSemaphore.Dispose();
_channelsSemaphore.Dispose();
_recoveryCancellationTokenSource.Dispose();
}
catch (OperationInterruptedException)
{
// ignored, see rabbitmq/rabbitmq-dotnet-client#133
}
finally
{
_channels.Clear();
_recordedEntitiesSemaphore.Dispose();
_channelsSemaphore.Dispose();
_recoveryCancellationTokenSource.Dispose();
_disposed = true;
}
}
Expand All @@ -307,7 +310,23 @@ private void ThrowIfDisposed()
ThrowDisposed();
}

return;

[DoesNotReturn]
static void ThrowDisposed() => throw new ObjectDisposedException(typeof(AutorecoveringConnection).FullName);
}

private bool IsDisposing
{
get
{
if (Interlocked.Exchange(ref _isDisposing, 1) != 0)
{
return true;
}

return false;
}
}
}
}
132 changes: 103 additions & 29 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,13 @@ internal partial class Channel : IChannel, IRecoverable
private ShutdownEventArgs? _closeReason;
public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason);

private TaskCompletionSource<bool>? _serverOriginatedChannelCloseTcs;

internal readonly IConsumerDispatcher ConsumerDispatcher;

private bool _disposed;
private int _isDisposing;

public Channel(ISession session, CreateChannelOptions createChannelOptions)
{
ContinuationTimeout = createChannelOptions.ContinuationTimeout;
Expand Down Expand Up @@ -514,22 +519,41 @@ public override string ToString()

void IDisposable.Dispose()
{
if (_disposed)
{
return;
}

Dispose(true);
}

protected virtual void Dispose(bool disposing)
{
if (IsDisposing)
{
return;
}

if (disposing)
{
if (IsOpen)
try
{
this.AbortAsync().GetAwaiter().GetResult();
}
if (IsOpen)
{
this.AbortAsync().GetAwaiter().GetResult();
}

ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();
_outstandingPublisherConfirmationsRateLimiter?.Dispose();
_serverOriginatedChannelCloseTcs?.Task.Wait(TimeSpan.FromSeconds(5));

ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();
_outstandingPublisherConfirmationsRateLimiter?.Dispose();
}
finally
{
_disposed = true;
}
}
}

Expand All @@ -543,18 +567,37 @@ await DisposeAsyncCore()

protected virtual async ValueTask DisposeAsyncCore()
{
if (IsOpen)
if (IsDisposing)
{
await this.AbortAsync().ConfigureAwait(false);
return;
}

ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();
if (_outstandingPublisherConfirmationsRateLimiter is not null)
try
{
await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
.ConfigureAwait(false);
if (IsOpen)
{
await this.AbortAsync().ConfigureAwait(false);
}

if (_serverOriginatedChannelCloseTcs is not null)
{
await _serverOriginatedChannelCloseTcs.Task.WaitAsync(TimeSpan.FromSeconds(5))
.ConfigureAwait(false);
}

ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();

if (_outstandingPublisherConfirmationsRateLimiter is not null)
{
await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
.ConfigureAwait(false);
}
}
finally
{
_disposed = true;
}
}

Expand Down Expand Up @@ -651,23 +694,41 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag)

protected async Task<bool> HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
var channelClose = new ChannelClose(cmd.MethodSpan);
SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer,
channelClose._replyCode,
channelClose._replyText,
channelClose._classId,
channelClose._methodId));
TaskCompletionSource<bool>? serverOriginatedChannelCloseTcs = _serverOriginatedChannelCloseTcs;
if (serverOriginatedChannelCloseTcs is null)
{
// Attempt to assign the new TCS only if _tcs is still null
_ = Interlocked.CompareExchange(ref _serverOriginatedChannelCloseTcs,
new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously), null);
}

await Session.CloseAsync(_closeReason, notify: false)
.ConfigureAwait(false);
try
{
var channelClose = new ChannelClose(cmd.MethodSpan);
SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer,
channelClose._replyCode,
channelClose._replyText,
channelClose._classId,
channelClose._methodId));

var method = new ChannelCloseOk();
await ModelSendAsync(in method, cancellationToken)
.ConfigureAwait(false);
await Session.CloseAsync(_closeReason, notify: false)
.ConfigureAwait(false);

await Session.NotifyAsync(cancellationToken)
.ConfigureAwait(false);
return true;
var method = new ChannelCloseOk();
await ModelSendAsync(in method, cancellationToken)
.ConfigureAwait(false);

await Session.NotifyAsync(cancellationToken)
.ConfigureAwait(false);

_serverOriginatedChannelCloseTcs?.TrySetResult(true);
return true;
}
catch (Exception ex)
{
_serverOriginatedChannelCloseTcs?.TrySetException(ex);
throw;
}
}

protected async Task<bool> HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken)
Expand Down Expand Up @@ -1587,5 +1648,18 @@ private Task<bool> DispatchCommandAsync(IncomingCommand cmd, CancellationToken c
}
}
}

private bool IsDisposing
{
get
{
if (Interlocked.Exchange(ref _isDisposing, 1) != 0)
{
return true;
}

return false;
}
}
}
}
28 changes: 22 additions & 6 deletions projects/RabbitMQ.Client/Impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
Expand All @@ -46,6 +47,7 @@ namespace RabbitMQ.Client.Framing
internal sealed partial class Connection : IConnection
{
private bool _disposed;
private int _isDisposing;
private volatile bool _closed;

private readonly ConnectionConfig _config;
Expand Down Expand Up @@ -489,7 +491,7 @@ internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellatio

public async ValueTask DisposeAsync()
{
if (_disposed)
if (IsDisposing)
{
return;
}
Expand Down Expand Up @@ -523,23 +525,37 @@ private void ThrowIfDisposed()
{
if (_disposed)
{
ThrowObjectDisposedException();
ThrowDisposed();
}

static void ThrowObjectDisposedException()
{
throw new ObjectDisposedException(typeof(Connection).FullName);
}
return;

[DoesNotReturn]
static void ThrowDisposed() => throw new ObjectDisposedException(typeof(Connection).FullName);
}

public override string ToString()
{
return $"Connection({_id},{Endpoint})";
}

[DoesNotReturn]
private static void ThrowAlreadyClosedException(ShutdownEventArgs closeReason)
{
throw new AlreadyClosedException(closeReason);
}

private bool IsDisposing
{
get
{
if (Interlocked.Exchange(ref _isDisposing, 1) != 0)
{
return true;
}

return false;
}
}
}
}
Loading
Loading