Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spike RPC acquiring #1743

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@

namespace RabbitMQ.Client.Impl
{
internal abstract class AsyncRpcContinuation<T> : IRpcContinuation
internal interface IAsyncRpcContinuation : IRpcContinuation
{
CancellationToken CancellationToken { get; }
}

internal abstract class AsyncRpcContinuation<T> : IAsyncRpcContinuation
{
private readonly CancellationTokenSource _continuationTimeoutCancellationTokenSource;
private readonly CancellationTokenRegistration _continuationTimeoutCancellationTokenRegistration;
Expand Down
41 changes: 14 additions & 27 deletions projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,43 +126,30 @@ private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled,
_outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
}

private async Task MaybeConfirmSelect(CancellationToken cancellationToken)
private async Task MaybeConfirmSelect(RpcParentLease<ChannelOpenAsyncRpcContinuation> parentLease,
CancellationToken cancellationToken)
{
if (_publisherConfirmationsEnabled)
{
// NOTE: _rpcSemaphore is held
bool enqueued = false;
var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
using var lease = AcquireRpcChildLease(parentLease, new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken));

try
if (_nextPublishSeqNo == 0UL)
{
if (_nextPublishSeqNo == 0UL)
if (_publisherConfirmationTrackingEnabled)
{
if (_publisherConfirmationTrackingEnabled)
{
_confirmsTaskCompletionSources.Clear();
}
_nextPublishSeqNo = 1;
_confirmsTaskCompletionSources.Clear();
}
_nextPublishSeqNo = 1;
}

enqueued = Enqueue(k);

var method = new ConfirmSelect(false);
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);
var method = new ConfirmSelect(false);
await ModelSendAsync(in method, lease.CancellationToken)
.ConfigureAwait(false);

bool result = await k;
Debug.Assert(result);
bool result = await lease.Continuation;
Debug.Assert(result);

return;
}
finally
{
if (false == enqueued)
{
k.Dispose();
}
}
return;
}
}

Expand Down
177 changes: 111 additions & 66 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,26 +203,23 @@ public Task CloseAsync(ushort replyCode, string replyText, bool abort,
public async Task CloseAsync(ShutdownEventArgs args, bool abort,
CancellationToken cancellationToken)
{
bool enqueued = false;
var k = new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, cancellationToken);

await _rpcSemaphore.WaitAsync(k.CancellationToken)
using var lease = await AcquireRpcLeaseAsync(new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, cancellationToken))
.ConfigureAwait(false);

try
{
ChannelShutdownAsync += k.OnConnectionShutdownAsync;
enqueued = Enqueue(k);
ChannelShutdownAsync += lease.Continuation.OnConnectionShutdownAsync;
ConsumerDispatcher.Quiesce();

if (SetCloseReason(args))
{
var method = new ChannelClose(
args.ReplyCode, args.ReplyText, args.ClassId, args.MethodId);
await ModelSendAsync(in method, k.CancellationToken)
await ModelSendAsync(in method, lease.CancellationToken)
.ConfigureAwait(false);
}

bool result = await k;
bool result = await lease.Continuation;
Debug.Assert(result);

await ConsumerDispatcher.WaitForShutdownAsync()
Expand Down Expand Up @@ -251,12 +248,7 @@ await ConsumerDispatcher.WaitForShutdownAsync()
}
finally
{
if (false == enqueued)
{
k.Dispose();
}
_rpcSemaphore.Release();
ChannelShutdownAsync -= k.OnConnectionShutdownAsync;
ChannelShutdownAsync -= lease.Continuation.OnConnectionShutdownAsync;
}
}

Expand All @@ -272,38 +264,22 @@ internal async ValueTask ConnectionOpenAsync(string virtualHost, CancellationTok
internal async ValueTask<ConnectionSecureOrTune> ConnectionSecureOkAsync(byte[] response,
CancellationToken cancellationToken)
{
bool enqueued = false;
var k = new ConnectionSecureOrTuneAsyncRpcContinuation(ContinuationTimeout, cancellationToken);

await _rpcSemaphore.WaitAsync(k.CancellationToken)
using var lease = await AcquireRpcLeaseAsync(new ConnectionSecureOrTuneAsyncRpcContinuation(ContinuationTimeout, cancellationToken))
.ConfigureAwait(false);
try
{
enqueued = Enqueue(k);

try
{
var method = new ConnectionSecureOk(response);
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);
}
catch (AlreadyClosedException)
{
// let continuation throw OperationInterruptedException,
// which is a much more suitable exception before connection
// negotiation finishes
}

return await k;
var method = new ConnectionSecureOk(response);
await ModelSendAsync(in method, lease.CancellationToken)
.ConfigureAwait(false);
}
finally
catch (AlreadyClosedException)
{
if (false == enqueued)
{
k.Dispose();
}
_rpcSemaphore.Release();
// let continuation throw OperationInterruptedException,
// which is a much more suitable exception before connection
// negotiation finishes
}

return await lease.Continuation;
}

internal async ValueTask<ConnectionSecureOrTune> ConnectionStartOkAsync(
Expand Down Expand Up @@ -359,40 +335,111 @@ protected bool Enqueue(IRpcContinuation k)
}
}

internal async Task<IChannel> OpenAsync(CreateChannelOptions createChannelOptions,
CancellationToken cancellationToken)
private RpcChildLease<TParent, TContinuation> AcquireRpcChildLease<TParent, TContinuation>(TParent parent, TContinuation k)
where TParent : struct
where TContinuation : IAsyncRpcContinuation
{
ConfigurePublisherConfirmations(createChannelOptions.PublisherConfirmationsEnabled,
createChannelOptions.PublisherConfirmationTrackingEnabled,
createChannelOptions.OutstandingPublisherConfirmationsRateLimiter);

bool enqueued = false;
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
if (IsOpen)
{
_continuationQueue.Enqueue(k);
return new RpcChildLease<TParent, TContinuation>(parent, k, false);
}
else
{
k.HandleChannelShutdown(CloseReason);
return new RpcChildLease<TParent, TContinuation>(parent, k, true);
}
}

private async ValueTask<RpcParentLease<TContinuation>> AcquireRpcLeaseAsync<TContinuation>(TContinuation k)
where TContinuation : IAsyncRpcContinuation
{
await _rpcSemaphore.WaitAsync(k.CancellationToken)
.ConfigureAwait(false);
try
if (IsOpen)
{
enqueued = Enqueue(k);
_continuationQueue.Enqueue(k);
return new RpcParentLease<TContinuation>(_rpcSemaphore, k, false);
}
else
{
k.HandleChannelShutdown(CloseReason);
return new RpcParentLease<TContinuation>(_rpcSemaphore, k, true);
}
}

var method = new ChannelOpen();
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);
readonly struct RpcChildLease<TParent, TContinuation> : IDisposable
where TParent : struct
where TContinuation : IAsyncRpcContinuation
{

bool result = await k;
Debug.Assert(result);
private readonly bool _ownsContinuation;

await MaybeConfirmSelect(cancellationToken)
.ConfigureAwait(false);
public RpcChildLease(TParent parent, TContinuation continuation, bool ownsContinuation)
{
Continuation = continuation;
_ownsContinuation = ownsContinuation;
}
finally

public TContinuation Continuation { get; }

public CancellationToken CancellationToken => Continuation.CancellationToken;

public void Dispose()
{
if (false == enqueued)
if (_ownsContinuation)
{
k.Dispose();
Continuation.Dispose();
}
_rpcSemaphore.Release();
}
}

readonly struct RpcParentLease<TContinuation> : IDisposable
where TContinuation : IAsyncRpcContinuation
{

private readonly bool _ownsContinuation;
private readonly SemaphoreSlim _semaphore;

public RpcParentLease(SemaphoreSlim semaphore, TContinuation continuation, bool ownsContinuation)
{
_semaphore = semaphore;
Continuation = continuation;
_ownsContinuation = ownsContinuation;
}

public TContinuation Continuation { get; }

public CancellationToken CancellationToken => Continuation.CancellationToken;

public void Dispose()
{
if (_ownsContinuation)
{
Continuation.Dispose();
}
_semaphore.Release();
}
}

internal async Task<IChannel> OpenAsync(CreateChannelOptions createChannelOptions,
CancellationToken cancellationToken)
{
ConfigurePublisherConfirmations(createChannelOptions.PublisherConfirmationsEnabled,
createChannelOptions.PublisherConfirmationTrackingEnabled,
createChannelOptions.OutstandingPublisherConfirmationsRateLimiter);

using var lease = await AcquireRpcLeaseAsync(new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken))
.ConfigureAwait(false);
var method = new ChannelOpen();
await ModelSendAsync(in method, lease.CancellationToken)
.ConfigureAwait(false);

bool result = await lease.Continuation;
Debug.Assert(result);

await MaybeConfirmSelect(lease, cancellationToken)
.ConfigureAwait(false);

return this;
}
Expand Down Expand Up @@ -422,11 +469,9 @@ private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken can
if (false == await DispatchCommandAsync(cmd, cancellationToken)
.ConfigureAwait(false))
{
using (IRpcContinuation c = _continuationQueue.Next())
{
await c.HandleCommandAsync(cmd)
.ConfigureAwait(false);
}
using IRpcContinuation c = _continuationQueue.Next();
await c.HandleCommandAsync(cmd)
.ConfigureAwait(false);
}
}
finally
Expand Down
Loading