diff --git a/playground/Playground.Microservice.Api.Host/Outbox/MassTransitOutboxRelay.cs b/playground/Playground.Microservice.Api.Host/Outbox/MassTransitOutboxRelay.cs index fa96e57..1abc5b6 100644 --- a/playground/Playground.Microservice.Api.Host/Outbox/MassTransitOutboxRelay.cs +++ b/playground/Playground.Microservice.Api.Host/Outbox/MassTransitOutboxRelay.cs @@ -5,8 +5,8 @@ namespace Playground.Microservice.Api.Host.Outbox; public class MassTransitOutboxRelay(IBusControl busControl, IPublishEndpoint publishEndpoint) : IOutboxMessageHandler { - public async ValueTask IsReadyAsync() => - await busControl.WaitForHealthStatus(BusHealthStatus.Healthy, TimeSpan.FromSeconds(5)).ConfigureAwait(false) == + public async ValueTask IsReadyAsync(CancellationToken cancellationToken = default) => + await busControl.WaitForHealthStatus(BusHealthStatus.Healthy, cancellationToken).ConfigureAwait(false) == BusHealthStatus.Healthy; public async ValueTask HandleAsync(OutboxMessageHandlerContext context, diff --git a/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/IOutboxMessageHandler.cs b/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/IOutboxMessageHandler.cs index ebbbb30..21e1f2e 100644 --- a/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/IOutboxMessageHandler.cs +++ b/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/IOutboxMessageHandler.cs @@ -9,7 +9,7 @@ public interface IOutboxMessageHandler /// Returns whether the handler can deliver messages. This can be used to delay the delivery of messages until certain /// conditions are met (example: health checks) /// - public ValueTask IsReadyAsync() => ValueTask.FromResult(true); + public ValueTask IsReadyAsync(CancellationToken cancellationToken = default) => ValueTask.FromResult(true); /// diff --git a/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/OutboxDeliveryService.cs b/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/OutboxDeliveryService.cs index 6b3444b..7bb52b5 100644 --- a/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/OutboxDeliveryService.cs +++ b/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/OutboxDeliveryService.cs @@ -43,8 +43,21 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) var options = scope.ServiceProvider .GetRequiredService>>() .CurrentValue; + + using var handlerReadyTimeout = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, + new CancellationTokenSource(options.DeliveryTimeout).Token); var messageHandler = scope.ServiceProvider.GetRequiredService(); - if (!await messageHandler.IsReadyAsync()) + bool handlerReady; + try + { + handlerReady = await messageHandler.IsReadyAsync(handlerReadyTimeout.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + handlerReady = false; + } + + if (!handlerReady) { logger.LogTrace("The message handler is not ready yet."); continue; @@ -61,17 +74,17 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) //Use an execution strategy to handle transient exceptions. This is required for SQL Server with retries enabled await dbContext.Database.CreateExecutionStrategy().ExecuteAsync(async () => { - using var timeout = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, + using var deliveryTimeout = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, new CancellationTokenSource(options.DeliveryTimeout).Token); await using var transaction = await dbContext.Database - .BeginTransactionAsync(options.TransactionIsolationLevel, timeout.Token) + .BeginTransactionAsync(options.TransactionIsolationLevel, deliveryTimeout.Token) .ConfigureAwait(false); try { var outbox = await options.OutboxProvider - .GetNextExclusiveOutboxWithoutDelay(dbContext, timeout.Token) + .GetNextExclusiveOutboxWithoutDelay(dbContext, deliveryTimeout.Token) .ConfigureAwait(false); if (outbox is null) @@ -87,7 +100,7 @@ await dbContext.Database.CreateExecutionStrategy().ExecuteAsync(async () => dbContext.Update((object)outbox); try { - await dbContext.SaveChangesAsync(timeout.Token).ConfigureAwait(false); + await dbContext.SaveChangesAsync(deliveryTimeout.Token).ConfigureAwait(false); } catch (DbUpdateConcurrencyException) { @@ -103,7 +116,7 @@ await dbContext.Database.CreateExecutionStrategy().ExecuteAsync(async () => .OrderBy(x => x.Id) .Take(options.BatchSize) .AsTracking() - .ToListAsync(timeout.Token) + .ToListAsync(deliveryTimeout.Token) .ConfigureAwait(false); @@ -137,7 +150,7 @@ await dbContext.Database.CreateExecutionStrategy().ExecuteAsync(async () => message.DeliveryAttempts++; - if (await DeliverMessage(message, messageHandler, timeout.Token) + if (await DeliverMessage(message, messageHandler, deliveryTimeout.Token) .ConfigureAwait(false)) { dbContext.Remove(message); @@ -197,8 +210,8 @@ await dbContext.Database.CreateExecutionStrategy().ExecuteAsync(async () => dbContext.Remove((object)outbox); } - await dbContext.SaveChangesAsync(timeout.Token).ConfigureAwait(false); - await transaction.CommitAsync(timeout.Token).ConfigureAwait(false); + await dbContext.SaveChangesAsync(deliveryTimeout.Token).ConfigureAwait(false); + await transaction.CommitAsync(deliveryTimeout.Token).ConfigureAwait(false); sleep = false; } catch (OperationCanceledException)