diff --git a/playground/Playground.Microservice.Api.Host/HostedServices/TestHostedService.cs b/playground/Playground.Microservice.Api.Host/HostedServices/TestHostedService.cs index d4c1f41..dc29c9f 100644 --- a/playground/Playground.Microservice.Api.Host/HostedServices/TestHostedService.cs +++ b/playground/Playground.Microservice.Api.Host/HostedServices/TestHostedService.cs @@ -21,9 +21,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) var factory = serviceProvider.GetRequiredService>(); while (true) { - await using var dbContext = await factory.CreateDbContextAsync(stoppingToken); - - + await using var dbContext = await factory.CreateDbContextAsync(stoppingToken).ConfigureAwait(false); for (var i = 0; i < 50; i++) { dbContext.AddOutboxMessage(new OutboxTestMessage("Property"), new OutboxMessageOptions @@ -36,8 +34,9 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) } await dbContext.SaveChangesAsync(stoppingToken).ConfigureAwait(false); + await Task.Delay(10_000, stoppingToken).ConfigureAwait(false); + await Task.CompletedTask; - await Task.Delay(3_000, stoppingToken).ConfigureAwait(false); } } } \ No newline at end of file diff --git a/playground/Playground.Microservice.Api.Host/Playground.Microservice.Api.Host.csproj b/playground/Playground.Microservice.Api.Host/Playground.Microservice.Api.Host.csproj index 4388652..83351a3 100644 --- a/playground/Playground.Microservice.Api.Host/Playground.Microservice.Api.Host.csproj +++ b/playground/Playground.Microservice.Api.Host/Playground.Microservice.Api.Host.csproj @@ -18,33 +18,26 @@ - + + - + - - + + - + - - - + + + \ No newline at end of file diff --git a/playground/Playground.Microservice.Api.Host/Program.cs b/playground/Playground.Microservice.Api.Host/Program.cs index 89d6d41..11ea69b 100644 --- a/playground/Playground.Microservice.Api.Host/Program.cs +++ b/playground/Playground.Microservice.Api.Host/Program.cs @@ -1,4 +1,6 @@ +using Asp.Versioning; using ES.FX.Hosting.Lifetime; +using ES.FX.Ignite.Asp.Versioning.Hosting; using ES.FX.Ignite.Azure.Data.Tables.Hosting; using ES.FX.Ignite.Azure.Storage.Blobs.Hosting; using ES.FX.Ignite.Azure.Storage.Queues.Hosting; @@ -12,6 +14,7 @@ using ES.FX.Ignite.OpenTelemetry.Exporter.Seq.Hosting; using ES.FX.Ignite.Serilog.Hosting; using ES.FX.Ignite.StackExchange.Redis.Hosting; +using ES.FX.NSwag.AspNetCore.Generation; using ES.FX.Serilog.Lifetime; using ES.FX.TransactionalOutbox.EntityFrameworkCore; using ES.FX.TransactionalOutbox.EntityFrameworkCore.SqlServer; @@ -22,6 +25,7 @@ using Playground.Microservice.Api.Host.Testing; using Playground.Shared.Data.Simple.EntityFrameworkCore; using Playground.Shared.Data.Simple.EntityFrameworkCore.SqlServer; +using SharpGrip.FluentValidation.AutoValidation.Endpoints.Extensions; return await ProgramEntry.CreateBuilder(args).UseSerilog().Build().RunAsync(async _ => { @@ -34,19 +38,46 @@ builder.Ignite(settings => { settings.HealthChecks.ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse; - settings.OpenTelemetry.AspNetCoreTracingHealthChecksRequestsFiltered = true; + //settings.OpenTelemetry.AspNetCoreTracingHealthChecksRequestsFiltered = true; }); + //Add Seq + builder.IgniteSeqOpenTelemetryExporter(); + //Fluent Validation builder.IgniteFluentValidation(); - // Add health checks UI - //builder.IgniteHealthChecksUi(); - - //Migrations service builder.IgniteMigrationsService(); + #region Ignite API Versioning + + var apiVersions = new[] + { + new ApiVersion(1) + }.Order().ToArray(); + builder.Services.AddSingleton>(apiVersions); + + builder.IgniteApiVersioning(options => + options.DefaultApiVersion = apiVersions.Order().Last()); + + var openApiDocs = new List<(string Name, ApiVersion Version)> { (Name: "latest", Version: apiVersions.Last()) } + .Concat(apiVersions.OrderDescending().Select(v => (Name: $"v{v}", Version: v))).ToList(); + openApiDocs.ForEach(doc => builder.Services.AddOpenApiDocument((settings, provider) => + { + settings.Title = provider.GetRequiredService().ApplicationName; + settings.DocumentName = doc.Name; + settings.Version = $"v{doc.Version}"; + settings.ApiGroupNames = [settings.Version]; + settings.SchemaSettings.SchemaNameGenerator = new TypeToStringSchemaNameGenerator(); + settings.SchemaSettings.GenerateEnumMappingDescription = true; + })); + + #endregion + + // Add health checks UI + //builder.IgniteHealthChecksUi(); + //SqlServerDbContext builder.IgniteSqlServerDbContextFactory( @@ -59,28 +90,18 @@ sqlServerDbContextOptionsBuilder.MigrationsAssembly( typeof(SimpleDbContextDesignTimeFactory).Assembly.FullName); }); - //DbContext Migrations builder.AddDbContextMigrationsTask(); - - //Sql Server Client builder.IgniteSqlServerClientFactory(nameof(SimpleDbContext)); - - //Add Seq - builder.IgniteSeqOpenTelemetryExporter(); - // Add Storage builder.IgniteAzureBlobServiceClient(); builder.IgniteAzureQueueServiceClient(); builder.IgniteAzureTableServiceClient(); - // Add Redis builder.IgniteRedisClient(); - builder.Services.AddOpenApiDocument(); - builder.Services.AddHostedService(); builder.Services.AddScoped, TestValidator>(); @@ -88,15 +109,35 @@ builder.Services.AddOutboxMessageType(); - builder.Services.AddOutboxDeliveryService(); + builder.Services.AddOpenTelemetry().WithTracing(traceBuilder => + traceBuilder.AddTransactionalOutboxInstrumentation()); + var app = builder.Build(); app.Ignite(); app.IgniteNSwag(); + var root = app + .MapGroup("v{version:apiVersion}") + .AddFluentValidationAutoValidation() + .WithApiVersionSet(app.NewApiVersionSet() + .ReportApiVersions() + .Build()); + + root.MapGet("test", (IServiceProvider serviceProvider) => + { + var dbContext = serviceProvider.GetRequiredService(); + //using var tx = dbContext.Database.BeginTransaction(); + dbContext.AddOutboxMessage(new OutboxTestMessage("test")); + dbContext.SaveChanges(); + //Task.Delay(5000).Wait(); + //tx.Commit(); + return Results.Ok(); + }); + //app.IgniteHealthChecksUi(); await app.RunAsync(); diff --git a/playground/Playground.Microservice.Api.Host/Testing/OutboxMessageHandler.cs b/playground/Playground.Microservice.Api.Host/Testing/OutboxMessageHandler.cs index 9cb7ed5..e9a638a 100644 --- a/playground/Playground.Microservice.Api.Host/Testing/OutboxMessageHandler.cs +++ b/playground/Playground.Microservice.Api.Host/Testing/OutboxMessageHandler.cs @@ -9,6 +9,8 @@ public class OutboxMessageHandler : IOutboxMessageHandler public async ValueTask HandleAsync(OutboxMessageHandlerContext context, CancellationToken cancellationToken = default) { + //await Task.Delay(Random.Shared.Next(10,300), cancellationToken); + await Task.CompletedTask; diff --git a/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/IOutboxMessageHandler.cs b/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/IOutboxMessageHandler.cs index ee5d87a..ebbbb30 100644 --- a/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/IOutboxMessageHandler.cs +++ b/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/IOutboxMessageHandler.cs @@ -9,7 +9,7 @@ public interface IOutboxMessageHandler /// Returns whether the handler can deliver messages. This can be used to delay the delivery of messages until certain /// conditions are met (example: health checks) /// - public ValueTask IsReadyAsync(); + public ValueTask IsReadyAsync() => ValueTask.FromResult(true); /// diff --git a/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/OutboxDbContextSaveChangesInterceptor.cs b/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/OutboxDbContextInterceptor.cs similarity index 50% rename from src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/OutboxDbContextSaveChangesInterceptor.cs rename to src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/OutboxDbContextInterceptor.cs index faba19a..d06ee88 100644 --- a/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/OutboxDbContextSaveChangesInterceptor.cs +++ b/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/OutboxDbContextInterceptor.cs @@ -1,4 +1,6 @@ using System.Collections.Concurrent; +using System.Data.Common; +using System.Runtime.CompilerServices; using ES.FX.TransactionalOutbox.EntityFrameworkCore.Entities; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Diagnostics; @@ -9,59 +11,86 @@ namespace ES.FX.TransactionalOutbox.EntityFrameworkCore.Delivery; /// Interceptor used to handle the saving of and entities and /// signalling the delivery process /// -internal class OutboxDbContextSaveChangesInterceptor : SaveChangesInterceptor +internal class OutboxDbContextInterceptor : ISaveChangesInterceptor, IDbTransactionInterceptor { - private static readonly ConcurrentDictionary> DbContextDictionary = new(); + private static readonly ConcurrentDictionary> DbContextDictionary = new(); - public override InterceptionResult SavingChanges(DbContextEventData eventData, + public InterceptionResult SavingChanges(DbContextEventData eventData, InterceptionResult result) { OnOutboxSaving(eventData.Context); - return base.SavingChanges(eventData, result); + return result; } - public override ValueTask> SavingChangesAsync(DbContextEventData eventData, + public ValueTask> SavingChangesAsync(DbContextEventData eventData, InterceptionResult result, CancellationToken cancellationToken = default) { OnOutboxSaving(eventData.Context); - return base.SavingChangesAsync(eventData, result, cancellationToken); + return ValueTask.FromResult(result); } - public override ValueTask SavedChangesAsync(SaveChangesCompletedEventData eventData, int result, + public ValueTask SavedChangesAsync(SaveChangesCompletedEventData eventData, int result, CancellationToken cancellationToken = default) { OnOutboxSaved(eventData.Context); return ValueTask.FromResult(result); } - public override int SavedChanges(SaveChangesCompletedEventData eventData, int result) + public int SavedChanges(SaveChangesCompletedEventData eventData, int result) { OnOutboxSaved(eventData.Context); return result; } - public static void OnOutboxSaved(DbContext? context) + + + public InterceptionResult TransactionCommitting(DbTransaction transaction, TransactionEventData eventData, + InterceptionResult result) + { + OnOutboxSaving(eventData.Context); + return result; + } + + public ValueTask TransactionCommittingAsync(DbTransaction transaction, + TransactionEventData eventData, + InterceptionResult result, CancellationToken cancellationToken = new()) + { + OnOutboxSaving(eventData.Context); + return ValueTask.FromResult(result); + } + + public void TransactionCommitted(DbTransaction transaction, TransactionEndEventData eventData) + { + OnOutboxSaved(eventData.Context, true); + } + + + public Task TransactionCommittedAsync(DbTransaction transaction, TransactionEndEventData eventData, + CancellationToken cancellationToken = new()) + { + OnOutboxSaved(eventData.Context, true); + return Task.CompletedTask; + } + + + public static void OnOutboxSaved(DbContext? context, bool transactionCommitted = false, [CallerMemberName] string? callerMemberName = null) { if (context is null) return; var type = context.GetType(); + + // If the transaction is not committed and there is an active transaction, do not signal, wait for the transaction to commit + if (!transactionCommitted && context.Database.CurrentTransaction is not null) return; + DbContextDictionary.AddOrUpdate(context.GetType(), _ => [], (_, bag) => { - try - { - var matches = bag.RemoveAll(s => context.Equals(s.Target)); - if (matches > 0) OutboxDeliverySignal.GetChannel(type).Writer.TryWrite(type); - bag.RemoveAll(s => s is { IsAlive: true }); - return bag; - } - catch - { - // Ignored. This needs to complete successfully - } - - return []; + var matches = bag.RemoveAll(s => context.Equals(s?.Target)); + if (matches > 0) + OutboxDeliverySignal.GetChannel(type).Writer.TryWrite(callerMemberName ?? nameof(OnOutboxSaved)); + bag.RemoveAll(s => !s?.IsAlive ?? true); + return bag; }); } @@ -96,18 +125,9 @@ private static void OnOutboxSaving(DbContext? context) _ => [new WeakReference(context)], (_, bag) => { - try - { - bag.RemoveAll(s => !s.IsAlive); - bag.Add(new WeakReference(context)); - return bag; - } - catch - { - // Ignored. This needs to complete successfully - } - - return []; + bag.RemoveAll(s => !s?.IsAlive ?? true); + bag.Add(new WeakReference(context)); + return bag; }); } } \ No newline at end of file diff --git a/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/OutboxDbContextTransactionInterceptor.cs b/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/OutboxDbContextTransactionInterceptor.cs deleted file mode 100644 index 3350ee9..0000000 --- a/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/OutboxDbContextTransactionInterceptor.cs +++ /dev/null @@ -1,96 +0,0 @@ -using System.Collections.Concurrent; -using System.Data.Common; -using ES.FX.TransactionalOutbox.EntityFrameworkCore.Entities; -using Microsoft.EntityFrameworkCore; -using Microsoft.EntityFrameworkCore.Diagnostics; - -namespace ES.FX.TransactionalOutbox.EntityFrameworkCore.Delivery; - -/// -/// Interceptor used to signal the delivery process of the when -/// entities are added within a transaction -/// This interceptor is used to signal the delivery process when the transaction is committed -/// -internal class OutboxDbContextTransactionInterceptor : DbTransactionInterceptor -{ - private static readonly ConcurrentDictionary> DbContextDictionary = new(); - - public override InterceptionResult TransactionCommitting(DbTransaction transaction, TransactionEventData eventData, - InterceptionResult result) - { - OnOutboxSaving(eventData.Context); - return base.TransactionCommitting(transaction, eventData, result); - } - - public override async ValueTask TransactionCommittingAsync(DbTransaction transaction, - TransactionEventData eventData, - InterceptionResult result, CancellationToken cancellationToken = new()) - { - OnOutboxSaving(eventData.Context); - return await base.TransactionCommittingAsync(transaction, eventData, result, cancellationToken) - .ConfigureAwait(false); - } - - public override void TransactionCommitted(DbTransaction transaction, TransactionEndEventData eventData) - { - OnOutboxSaving(eventData.Context); - base.TransactionCommitted(transaction, eventData); - } - - - public override async Task TransactionCommittedAsync(DbTransaction transaction, TransactionEndEventData eventData, - CancellationToken cancellationToken = new()) - { - OnOutboxSaved(eventData.Context); - await base.TransactionCommittedAsync(transaction, eventData, cancellationToken).ConfigureAwait(false); - } - - - public static void OnOutboxSaved(DbContext? context) - { - if (context is null) return; - var type = context.GetType(); - DbContextDictionary.AddOrUpdate(context.GetType(), - _ => [], - (_, bag) => - { - try - { - var matches = bag.RemoveAll(s => context.Equals(s.Target)); - if (matches > 0) OutboxDeliverySignal.GetChannel(type).Writer.TryWrite(type); - bag.RemoveAll(s => s is { IsAlive: true }); - return bag; - } - catch - { - // Ignored. This needs to complete successfully - } - - return []; - }); - } - - private static void OnOutboxSaving(DbContext? context) - { - if (context == null) return; - if (!context.ChangeTracker.Entries().Any()) return; - - DbContextDictionary.AddOrUpdate(context.GetType(), - _ => [new WeakReference(context)], - (_, bag) => - { - try - { - bag.RemoveAll(s => !s.IsAlive); - bag.Add(new WeakReference(context)); - return bag; - } - catch - { - // Ignored. This needs to complete successfully - } - - return []; - }); - } -} \ No newline at end of file diff --git a/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/OutboxDeliverySignal.cs b/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/OutboxDeliverySignal.cs index e0e45ca..33ab180 100644 --- a/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/OutboxDeliverySignal.cs +++ b/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/Delivery/OutboxDeliverySignal.cs @@ -8,21 +8,21 @@ namespace ES.FX.TransactionalOutbox.EntityFrameworkCore.Delivery; /// internal static class OutboxDeliverySignal { - private static readonly ConcurrentDictionary> Channels = new(); + private static readonly ConcurrentDictionary> Channels = new(); - public static Channel GetChannel(Type type) + public static Channel GetChannel(Type type) { - return Channels.GetOrAdd(type, _ => Channel.CreateUnbounded()); + return Channels.GetOrAdd(type, _ => Channel.CreateUnbounded()); } - public static Channel GetChannel() => GetChannel(typeof(TType)); + public static Channel GetChannel() => GetChannel(typeof(TType)); - public static Channel RenewChannel(Type type) + public static Channel RenewChannel(Type type) { - var channel = Channel.CreateUnbounded(); + var channel = Channel.CreateUnbounded(); return Channels.AddOrUpdate(type, _ => channel, (_, _) => channel); } - public static Channel RenewChannel() => RenewChannel(typeof(TType)); + public static Channel RenewChannel() => RenewChannel(typeof(TType)); } \ No newline at end of file diff --git a/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/ES.FX.TransactionalOutbox.EntityFrameworkCore.csproj b/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/ES.FX.TransactionalOutbox.EntityFrameworkCore.csproj index 75e8a76..49cd87f 100644 --- a/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/ES.FX.TransactionalOutbox.EntityFrameworkCore.csproj +++ b/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/ES.FX.TransactionalOutbox.EntityFrameworkCore.csproj @@ -15,7 +15,6 @@ - diff --git a/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/OutboxDeliveryService.cs b/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/OutboxDeliveryService.cs index 59d2899..94d9f85 100644 --- a/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/OutboxDeliveryService.cs +++ b/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/OutboxDeliveryService.cs @@ -290,14 +290,15 @@ private async Task Sleep(TimeSpan delay, CancellationToken cancellationToken) using var delayTokenCts = new CancellationTokenSource(delay); using var sleepCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, delayTokenCts.Token); + string source; try { - await OutboxDeliverySignal.GetChannel().Reader.ReadAsync(sleepCts.Token).ConfigureAwait(false); - logger.LogTrace("Sleep interrupted by signal"); + source = await OutboxDeliverySignal.GetChannel().Reader.ReadAsync(sleepCts.Token).ConfigureAwait(false); } catch (OperationCanceledException) { - // Ignored. This is expected due to the delay token + source = nameof(OutboxDeliveryOptions.PollingInterval); } + logger.LogTrace("Sleep interrupted by {source}", source); } } \ No newline at end of file diff --git a/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/OutboxExtensions.cs b/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/OutboxExtensions.cs index 493801f..9806264 100644 --- a/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/OutboxExtensions.cs +++ b/src/ES.FX.TransactionalOutbox.EntityFrameworkCore/OutboxExtensions.cs @@ -38,8 +38,7 @@ public static void AddOutboxEntities(this ModelBuilder modelBuilder) public static void AddOutboxBehavior(this DbContextOptionsBuilder optionsBuilder) { optionsBuilder.AddInterceptors( - new OutboxDbContextSaveChangesInterceptor(), - new OutboxDbContextTransactionInterceptor()); + new OutboxDbContextInterceptor()); }