diff --git a/projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs b/projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs index 1e0068a48..93d565016 100644 --- a/projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs +++ b/projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs @@ -41,7 +41,12 @@ namespace RabbitMQ.Client.Impl { - internal abstract class AsyncRpcContinuation : IRpcContinuation + internal interface IAsyncRpcContinuation : IRpcContinuation + { + CancellationToken CancellationToken { get; } + } + + internal abstract class AsyncRpcContinuation : IAsyncRpcContinuation { private readonly CancellationTokenSource _continuationTimeoutCancellationTokenSource; private readonly CancellationTokenRegistration _continuationTimeoutCancellationTokenRegistration; diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index dff4504d1..5be92a41d 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -126,43 +126,30 @@ private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled, _outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter; } - private async Task MaybeConfirmSelect(CancellationToken cancellationToken) + private async Task MaybeConfirmSelect(RpcParentLease parentLease, + CancellationToken cancellationToken) { if (_publisherConfirmationsEnabled) { - // NOTE: _rpcSemaphore is held - bool enqueued = false; - var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + using var lease = AcquireRpcChildLease(parentLease, new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken)); - try + if (_nextPublishSeqNo == 0UL) { - if (_nextPublishSeqNo == 0UL) + if (_publisherConfirmationTrackingEnabled) { - if (_publisherConfirmationTrackingEnabled) - { - _confirmsTaskCompletionSources.Clear(); - } - _nextPublishSeqNo = 1; + _confirmsTaskCompletionSources.Clear(); } + _nextPublishSeqNo = 1; + } - enqueued = Enqueue(k); - - var method = new ConfirmSelect(false); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + var method = new ConfirmSelect(false); + await ModelSendAsync(in method, lease.CancellationToken) + .ConfigureAwait(false); - bool result = await k; - Debug.Assert(result); + bool result = await lease.Continuation; + Debug.Assert(result); - return; - } - finally - { - if (false == enqueued) - { - k.Dispose(); - } - } + return; } } diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 64be4152e..fe81d2c89 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -203,26 +203,23 @@ public Task CloseAsync(ushort replyCode, string replyText, bool abort, public async Task CloseAsync(ShutdownEventArgs args, bool abort, CancellationToken cancellationToken) { - bool enqueued = false; - var k = new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, cancellationToken); - - await _rpcSemaphore.WaitAsync(k.CancellationToken) + using var lease = await AcquireRpcLeaseAsync(new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, cancellationToken)) .ConfigureAwait(false); + try { - ChannelShutdownAsync += k.OnConnectionShutdownAsync; - enqueued = Enqueue(k); + ChannelShutdownAsync += lease.Continuation.OnConnectionShutdownAsync; ConsumerDispatcher.Quiesce(); if (SetCloseReason(args)) { var method = new ChannelClose( args.ReplyCode, args.ReplyText, args.ClassId, args.MethodId); - await ModelSendAsync(in method, k.CancellationToken) + await ModelSendAsync(in method, lease.CancellationToken) .ConfigureAwait(false); } - bool result = await k; + bool result = await lease.Continuation; Debug.Assert(result); await ConsumerDispatcher.WaitForShutdownAsync() @@ -251,12 +248,7 @@ await ConsumerDispatcher.WaitForShutdownAsync() } finally { - if (false == enqueued) - { - k.Dispose(); - } - _rpcSemaphore.Release(); - ChannelShutdownAsync -= k.OnConnectionShutdownAsync; + ChannelShutdownAsync -= lease.Continuation.OnConnectionShutdownAsync; } } @@ -272,38 +264,22 @@ internal async ValueTask ConnectionOpenAsync(string virtualHost, CancellationTok internal async ValueTask ConnectionSecureOkAsync(byte[] response, CancellationToken cancellationToken) { - bool enqueued = false; - var k = new ConnectionSecureOrTuneAsyncRpcContinuation(ContinuationTimeout, cancellationToken); - - await _rpcSemaphore.WaitAsync(k.CancellationToken) + using var lease = await AcquireRpcLeaseAsync(new ConnectionSecureOrTuneAsyncRpcContinuation(ContinuationTimeout, cancellationToken)) .ConfigureAwait(false); try { - enqueued = Enqueue(k); - - try - { - var method = new ConnectionSecureOk(response); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - } - catch (AlreadyClosedException) - { - // let continuation throw OperationInterruptedException, - // which is a much more suitable exception before connection - // negotiation finishes - } - - return await k; + var method = new ConnectionSecureOk(response); + await ModelSendAsync(in method, lease.CancellationToken) + .ConfigureAwait(false); } - finally + catch (AlreadyClosedException) { - if (false == enqueued) - { - k.Dispose(); - } - _rpcSemaphore.Release(); + // let continuation throw OperationInterruptedException, + // which is a much more suitable exception before connection + // negotiation finishes } + + return await lease.Continuation; } internal async ValueTask ConnectionStartOkAsync( @@ -359,40 +335,111 @@ protected bool Enqueue(IRpcContinuation k) } } - internal async Task OpenAsync(CreateChannelOptions createChannelOptions, - CancellationToken cancellationToken) + private RpcChildLease AcquireRpcChildLease(TParent parent, TContinuation k) + where TParent : struct + where TContinuation : IAsyncRpcContinuation { - ConfigurePublisherConfirmations(createChannelOptions.PublisherConfirmationsEnabled, - createChannelOptions.PublisherConfirmationTrackingEnabled, - createChannelOptions.OutstandingPublisherConfirmationsRateLimiter); - - bool enqueued = false; - var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + if (IsOpen) + { + _continuationQueue.Enqueue(k); + return new RpcChildLease(parent, k, false); + } + else + { + k.HandleChannelShutdown(CloseReason); + return new RpcChildLease(parent, k, true); + } + } + private async ValueTask> AcquireRpcLeaseAsync(TContinuation k) + where TContinuation : IAsyncRpcContinuation + { await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); - try + if (IsOpen) { - enqueued = Enqueue(k); + _continuationQueue.Enqueue(k); + return new RpcParentLease(_rpcSemaphore, k, false); + } + else + { + k.HandleChannelShutdown(CloseReason); + return new RpcParentLease(_rpcSemaphore, k, true); + } + } - var method = new ChannelOpen(); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + readonly struct RpcChildLease : IDisposable + where TParent : struct + where TContinuation : IAsyncRpcContinuation + { - bool result = await k; - Debug.Assert(result); + private readonly bool _ownsContinuation; - await MaybeConfirmSelect(cancellationToken) - .ConfigureAwait(false); + public RpcChildLease(TParent parent, TContinuation continuation, bool ownsContinuation) + { + Continuation = continuation; + _ownsContinuation = ownsContinuation; } - finally + + public TContinuation Continuation { get; } + + public CancellationToken CancellationToken => Continuation.CancellationToken; + + public void Dispose() { - if (false == enqueued) + if (_ownsContinuation) { - k.Dispose(); + Continuation.Dispose(); } - _rpcSemaphore.Release(); } + } + + readonly struct RpcParentLease : IDisposable + where TContinuation : IAsyncRpcContinuation + { + + private readonly bool _ownsContinuation; + private readonly SemaphoreSlim _semaphore; + + public RpcParentLease(SemaphoreSlim semaphore, TContinuation continuation, bool ownsContinuation) + { + _semaphore = semaphore; + Continuation = continuation; + _ownsContinuation = ownsContinuation; + } + + public TContinuation Continuation { get; } + + public CancellationToken CancellationToken => Continuation.CancellationToken; + + public void Dispose() + { + if (_ownsContinuation) + { + Continuation.Dispose(); + } + _semaphore.Release(); + } + } + + internal async Task OpenAsync(CreateChannelOptions createChannelOptions, + CancellationToken cancellationToken) + { + ConfigurePublisherConfirmations(createChannelOptions.PublisherConfirmationsEnabled, + createChannelOptions.PublisherConfirmationTrackingEnabled, + createChannelOptions.OutstandingPublisherConfirmationsRateLimiter); + + using var lease = await AcquireRpcLeaseAsync(new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken)) + .ConfigureAwait(false); + var method = new ChannelOpen(); + await ModelSendAsync(in method, lease.CancellationToken) + .ConfigureAwait(false); + + bool result = await lease.Continuation; + Debug.Assert(result); + + await MaybeConfirmSelect(lease, cancellationToken) + .ConfigureAwait(false); return this; } @@ -422,11 +469,9 @@ private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken can if (false == await DispatchCommandAsync(cmd, cancellationToken) .ConfigureAwait(false)) { - using (IRpcContinuation c = _continuationQueue.Next()) - { - await c.HandleCommandAsync(cmd) - .ConfigureAwait(false); - } + using IRpcContinuation c = _continuationQueue.Next(); + await c.HandleCommandAsync(cmd) + .ConfigureAwait(false); } } finally