Skip to content

Commit

Permalink
feat(outbox): add cancellation token to IsReadyAsync method
Browse files Browse the repository at this point in the history
- Added CancellationToken parameter to the IsReadyAsync method in both MassTransitOutboxRelay and IOutboxMessageHandler classes.
- Updated OutboxDeliveryService class to handle OperationCanceledException when checking if message handler is ready.
- Replaced timeout with deliveryTimeout in various methods within OutboxDeliveryService for better clarity.
  • Loading branch information
winromulus committed Sep 13, 2024
1 parent 6586483 commit 5d5e59a
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ namespace Playground.Microservice.Api.Host.Outbox;

public class MassTransitOutboxRelay(IBusControl busControl, IPublishEndpoint publishEndpoint) : IOutboxMessageHandler
{
public async ValueTask<bool> IsReadyAsync() =>
await busControl.WaitForHealthStatus(BusHealthStatus.Healthy, TimeSpan.FromSeconds(5)).ConfigureAwait(false) ==
public async ValueTask<bool> IsReadyAsync(CancellationToken cancellationToken = default) =>
await busControl.WaitForHealthStatus(BusHealthStatus.Healthy, cancellationToken).ConfigureAwait(false) ==
BusHealthStatus.Healthy;

public async ValueTask<bool> HandleAsync(OutboxMessageHandlerContext context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public interface IOutboxMessageHandler
/// Returns whether the handler can deliver messages. This can be used to delay the delivery of messages until certain
/// conditions are met (example: health checks)
/// </summary>
public ValueTask<bool> IsReadyAsync() => ValueTask.FromResult(true);
public ValueTask<bool> IsReadyAsync(CancellationToken cancellationToken = default) => ValueTask.FromResult(true);


/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,21 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
var options = scope.ServiceProvider
.GetRequiredService<IOptionsMonitor<OutboxDeliveryOptions<TDbContext>>>()
.CurrentValue;

using var handlerReadyTimeout = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken,
new CancellationTokenSource(options.DeliveryTimeout).Token);
var messageHandler = scope.ServiceProvider.GetRequiredService<IOutboxMessageHandler>();
if (!await messageHandler.IsReadyAsync())
bool handlerReady;
try
{
handlerReady = await messageHandler.IsReadyAsync(handlerReadyTimeout.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
handlerReady = false;
}

if (!handlerReady)
{
logger.LogTrace("The message handler is not ready yet.");
continue;
Expand All @@ -61,17 +74,17 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
//Use an execution strategy to handle transient exceptions. This is required for SQL Server with retries enabled
await dbContext.Database.CreateExecutionStrategy().ExecuteAsync(async () =>
{
using var timeout = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken,
using var deliveryTimeout = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken,
new CancellationTokenSource(options.DeliveryTimeout).Token);

await using var transaction = await dbContext.Database
.BeginTransactionAsync(options.TransactionIsolationLevel, timeout.Token)
.BeginTransactionAsync(options.TransactionIsolationLevel, deliveryTimeout.Token)
.ConfigureAwait(false);

try
{
var outbox = await options.OutboxProvider
.GetNextExclusiveOutboxWithoutDelay(dbContext, timeout.Token)
.GetNextExclusiveOutboxWithoutDelay(dbContext, deliveryTimeout.Token)
.ConfigureAwait(false);

if (outbox is null)
Expand All @@ -87,7 +100,7 @@ await dbContext.Database.CreateExecutionStrategy().ExecuteAsync(async () =>
dbContext.Update((object)outbox);
try
{
await dbContext.SaveChangesAsync(timeout.Token).ConfigureAwait(false);
await dbContext.SaveChangesAsync(deliveryTimeout.Token).ConfigureAwait(false);
}
catch (DbUpdateConcurrencyException)
{
Expand All @@ -103,7 +116,7 @@ await dbContext.Database.CreateExecutionStrategy().ExecuteAsync(async () =>
.OrderBy(x => x.Id)
.Take(options.BatchSize)
.AsTracking()
.ToListAsync(timeout.Token)
.ToListAsync(deliveryTimeout.Token)
.ConfigureAwait(false);


Expand Down Expand Up @@ -137,7 +150,7 @@ await dbContext.Database.CreateExecutionStrategy().ExecuteAsync(async () =>
message.DeliveryAttempts++;


if (await DeliverMessage(message, messageHandler, timeout.Token)
if (await DeliverMessage(message, messageHandler, deliveryTimeout.Token)
.ConfigureAwait(false))
{
dbContext.Remove(message);
Expand Down Expand Up @@ -197,8 +210,8 @@ await dbContext.Database.CreateExecutionStrategy().ExecuteAsync(async () =>
dbContext.Remove((object)outbox);
}

await dbContext.SaveChangesAsync(timeout.Token).ConfigureAwait(false);
await transaction.CommitAsync(timeout.Token).ConfigureAwait(false);
await dbContext.SaveChangesAsync(deliveryTimeout.Token).ConfigureAwait(false);
await transaction.CommitAsync(deliveryTimeout.Token).ConfigureAwait(false);
sleep = false;
}
catch (OperationCanceledException)
Expand Down

0 comments on commit 5d5e59a

Please sign in to comment.