Skip to content

Commit

Permalink
refactor(Outbox): enhance OutboxDbContextInterceptor and remove Outbo…
Browse files Browse the repository at this point in the history
…xDbContextTransactionInterceptor

- Refactored `IOutboxMessageHandler` to return a default value in `IsReadyAsync()`.
- Renamed `OutboxDbContextSaveChangesInterceptor.cs` to `OutboxDbContextInterceptor.cs`.
- Enhanced the functionality of `OutboxDbContextInterceptor` by implementing both `ISaveChangesInterceptor` and `IDbTransactionInterceptor`.
- Removed the file `OutboxDbContextTransactionInterceptor.cs`, as its functionalities are now covered by the enhanced `OutboxDbContextInterceptor`.
- Updated type parameter in methods of class 'OutBoxDeliverySignal' from Type to string.
- Removed commented out package reference in `.csproj` file.
- Updated sleep interruption logging in 'OutBoxDeliveryService'.
- Simplified method 'AddOutBoxBehavior' in 'OutBoxExtensions' due to removal of 'OutBoxDbContextTransactionInterceptor'.
  • Loading branch information
winromulus committed Sep 7, 2024
1 parent dd4dab3 commit 23c9b7d
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
var factory = serviceProvider.GetRequiredService<IDbContextFactory<SimpleDbContext>>();
while (true)
{
await using var dbContext = await factory.CreateDbContextAsync(stoppingToken);


await using var dbContext = await factory.CreateDbContextAsync(stoppingToken).ConfigureAwait(false);
for (var i = 0; i < 50; i++)
{
dbContext.AddOutboxMessage(new OutboxTestMessage("Property"), new OutboxMessageOptions
Expand All @@ -36,8 +34,9 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
}

await dbContext.SaveChangesAsync(stoppingToken).ConfigureAwait(false);
await Task.Delay(10_000, stoppingToken).ConfigureAwait(false);
await Task.CompletedTask;

await Task.Delay(3_000, stoppingToken).ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,26 @@

<ItemGroup>
<ProjectReference Include="..\..\src\ES.FX.Hosting\ES.FX.Hosting.csproj" />
<ProjectReference
Include="..\..\src\ES.FX.Ignite.AspNetCore.HealthChecks.UI\ES.FX.Ignite.AspNetCore.HealthChecks.UI.csproj" />
<ProjectReference Include="..\..\src\ES.FX.Ignite.Asp.Versioning\ES.FX.Ignite.Asp.Versioning.csproj" />
<ProjectReference Include="..\..\src\ES.FX.Ignite.AspNetCore.HealthChecks.UI\ES.FX.Ignite.AspNetCore.HealthChecks.UI.csproj" />
<ProjectReference Include="..\..\src\ES.FX.Ignite.Azure.Data.Tables\ES.FX.Ignite.Azure.Data.Tables.csproj" />
<ProjectReference
Include="..\..\src\ES.FX.Ignite.Azure.Security.KeyVault.Secrets\ES.FX.Ignite.Azure.Security.KeyVault.Secrets.csproj" />
<ProjectReference Include="..\..\src\ES.FX.Ignite.Azure.Security.KeyVault.Secrets\ES.FX.Ignite.Azure.Security.KeyVault.Secrets.csproj" />
<ProjectReference Include="..\..\src\ES.FX.Ignite.Azure.Storage.Blobs\ES.FX.Ignite.Azure.Storage.Blobs.csproj" />
<ProjectReference Include="..\..\src\ES.FX.Ignite.Azure.Storage.Queues\ES.FX.Ignite.Azure.Storage.Queues.csproj" />
<ProjectReference Include="..\..\src\ES.FX.Ignite.FluentValidation\ES.FX.Ignite.FluentValidation.csproj" />
<ProjectReference
Include="..\..\src\ES.FX.Ignite.Microsoft.Data.SqlClient\ES.FX.Ignite.Microsoft.Data.SqlClient.csproj" />
<ProjectReference
Include="..\..\src\ES.FX.Ignite.Microsoft.EntityFrameworkCore.SqlServer\ES.FX.Ignite.Microsoft.EntityFrameworkCore.SqlServer.csproj" />
<ProjectReference Include="..\..\src\ES.FX.Ignite.Microsoft.Data.SqlClient\ES.FX.Ignite.Microsoft.Data.SqlClient.csproj" />
<ProjectReference Include="..\..\src\ES.FX.Ignite.Microsoft.EntityFrameworkCore.SqlServer\ES.FX.Ignite.Microsoft.EntityFrameworkCore.SqlServer.csproj" />
<ProjectReference Include="..\..\src\ES.FX.Ignite.Migrations\ES.FX.Ignite.Migrations.csproj" />
<ProjectReference Include="..\..\src\ES.FX.Ignite.NSwag\ES.FX.Ignite.NSwag.csproj" />
<ProjectReference
Include="..\..\src\ES.FX.Ignite.OpenTelemetry.Exporter.Seq\ES.FX.Ignite.OpenTelemetry.Exporter.Seq.csproj" />
<ProjectReference Include="..\..\src\ES.FX.Ignite.OpenTelemetry.Exporter.Seq\ES.FX.Ignite.OpenTelemetry.Exporter.Seq.csproj" />
<ProjectReference Include="..\..\src\ES.FX.Ignite.Serilog\ES.FX.Ignite.Serilog.csproj" />
<ProjectReference Include="..\..\src\ES.FX.Ignite.StackExchange.Redis\ES.FX.Ignite.StackExchange.Redis.csproj" />
<ProjectReference Include="..\..\src\ES.FX.Ignite.Swashbuckle\ES.FX.Ignite.Swashbuckle.csproj" />
<ProjectReference Include="..\..\src\ES.FX.Ignite\ES.FX.Ignite.csproj" />
<ProjectReference Include="..\..\src\ES.FX.Serilog\ES.FX.Serilog.csproj" />
<ProjectReference
Include="..\..\src\ES.FX.TransactionalOutbox.EntityFrameworkCore.SqlServer\ES.FX.TransactionalOutbox.EntityFrameworkCore.SqlServer.csproj" />
<ProjectReference
Include="..\Playground.Shared.Data.Simple.EntityFrameworkCore.SqlServer\Playground.Shared.Data.Simple.EntityFrameworkCore.SqlServer.csproj" />
<ProjectReference
Include="..\Playground.Shared.Data.Simple.EntityFrameworkCore\Playground.Shared.Data.Simple.EntityFrameworkCore.csproj" />
<ProjectReference Include="..\..\src\ES.FX.TransactionalOutbox.EntityFrameworkCore.SqlServer\ES.FX.TransactionalOutbox.EntityFrameworkCore.SqlServer.csproj" />
<ProjectReference Include="..\Playground.Shared.Data.Simple.EntityFrameworkCore.SqlServer\Playground.Shared.Data.Simple.EntityFrameworkCore.SqlServer.csproj" />
<ProjectReference Include="..\Playground.Shared.Data.Simple.EntityFrameworkCore\Playground.Shared.Data.Simple.EntityFrameworkCore.csproj" />
</ItemGroup>

</Project>
73 changes: 57 additions & 16 deletions playground/Playground.Microservice.Api.Host/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Asp.Versioning;
using ES.FX.Hosting.Lifetime;
using ES.FX.Ignite.Asp.Versioning.Hosting;
using ES.FX.Ignite.Azure.Data.Tables.Hosting;
using ES.FX.Ignite.Azure.Storage.Blobs.Hosting;
using ES.FX.Ignite.Azure.Storage.Queues.Hosting;
Expand All @@ -12,6 +14,7 @@
using ES.FX.Ignite.OpenTelemetry.Exporter.Seq.Hosting;
using ES.FX.Ignite.Serilog.Hosting;
using ES.FX.Ignite.StackExchange.Redis.Hosting;
using ES.FX.NSwag.AspNetCore.Generation;
using ES.FX.Serilog.Lifetime;
using ES.FX.TransactionalOutbox.EntityFrameworkCore;
using ES.FX.TransactionalOutbox.EntityFrameworkCore.SqlServer;
Expand All @@ -22,6 +25,7 @@
using Playground.Microservice.Api.Host.Testing;
using Playground.Shared.Data.Simple.EntityFrameworkCore;
using Playground.Shared.Data.Simple.EntityFrameworkCore.SqlServer;
using SharpGrip.FluentValidation.AutoValidation.Endpoints.Extensions;

return await ProgramEntry.CreateBuilder(args).UseSerilog().Build().RunAsync(async _ =>
{
Expand All @@ -34,19 +38,46 @@
builder.Ignite(settings =>
{
settings.HealthChecks.ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse;
settings.OpenTelemetry.AspNetCoreTracingHealthChecksRequestsFiltered = true;
//settings.OpenTelemetry.AspNetCoreTracingHealthChecksRequestsFiltered = true;
});

//Add Seq
builder.IgniteSeqOpenTelemetryExporter();

//Fluent Validation
builder.IgniteFluentValidation();

// Add health checks UI
//builder.IgniteHealthChecksUi();


//Migrations service
builder.IgniteMigrationsService();

#region Ignite API Versioning

var apiVersions = new[]
{
new ApiVersion(1)
}.Order().ToArray();
builder.Services.AddSingleton<IEnumerable<ApiVersion>>(apiVersions);

builder.IgniteApiVersioning(options =>
options.DefaultApiVersion = apiVersions.Order().Last());

var openApiDocs = new List<(string Name, ApiVersion Version)> { (Name: "latest", Version: apiVersions.Last()) }
.Concat(apiVersions.OrderDescending().Select(v => (Name: $"v{v}", Version: v))).ToList();
openApiDocs.ForEach(doc => builder.Services.AddOpenApiDocument((settings, provider) =>
{
settings.Title = provider.GetRequiredService<IHostEnvironment>().ApplicationName;
settings.DocumentName = doc.Name;
settings.Version = $"v{doc.Version}";
settings.ApiGroupNames = [settings.Version];
settings.SchemaSettings.SchemaNameGenerator = new TypeToStringSchemaNameGenerator();
settings.SchemaSettings.GenerateEnumMappingDescription = true;
}));

#endregion

// Add health checks UI
//builder.IgniteHealthChecksUi();


//SqlServerDbContext
builder.IgniteSqlServerDbContextFactory<SimpleDbContext>(
Expand All @@ -59,44 +90,54 @@
sqlServerDbContextOptionsBuilder.MigrationsAssembly(
typeof(SimpleDbContextDesignTimeFactory).Assembly.FullName);
});

//DbContext Migrations
builder.AddDbContextMigrationsTask<SimpleDbContext>();


//Sql Server Client
builder.IgniteSqlServerClientFactory(nameof(SimpleDbContext));


//Add Seq
builder.IgniteSeqOpenTelemetryExporter();

// Add Storage
builder.IgniteAzureBlobServiceClient();
builder.IgniteAzureQueueServiceClient();
builder.IgniteAzureTableServiceClient();

// Add Redis
builder.IgniteRedisClient();

builder.Services.AddOpenApiDocument();


builder.Services.AddHostedService<TestHostedService>();
builder.Services.AddScoped<IValidator<TestRequest>, TestValidator>();
builder.Services.AddScoped<IValidator<TestComplexRequest>, TestComplexRequestValidator>();


builder.Services.AddOutboxMessageType<OutboxTestMessage>();

builder.Services.AddOutboxDeliveryService<SimpleDbContext, OutboxMessageHandler>();

builder.Services.AddOpenTelemetry().WithTracing(traceBuilder =>
traceBuilder.AddTransactionalOutboxInstrumentation());


var app = builder.Build();
app.Ignite();

app.IgniteNSwag();

var root = app
.MapGroup("v{version:apiVersion}")
.AddFluentValidationAutoValidation()
.WithApiVersionSet(app.NewApiVersionSet()
.ReportApiVersions()
.Build());

root.MapGet("test", (IServiceProvider serviceProvider) =>
{
var dbContext = serviceProvider.GetRequiredService<SimpleDbContext>();
//using var tx = dbContext.Database.BeginTransaction();
dbContext.AddOutboxMessage(new OutboxTestMessage("test"));
dbContext.SaveChanges();
//Task.Delay(5000).Wait();
//tx.Commit();
return Results.Ok();
});

//app.IgniteHealthChecksUi();

await app.RunAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ public class OutboxMessageHandler : IOutboxMessageHandler
public async ValueTask<bool> HandleAsync(OutboxMessageHandlerContext context,
CancellationToken cancellationToken = default)
{
//await Task.Delay(Random.Shared.Next(10,300), cancellationToken);

await Task.CompletedTask;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public interface IOutboxMessageHandler
/// Returns whether the handler can deliver messages. This can be used to delay the delivery of messages until certain
/// conditions are met (example: health checks)
/// </summary>
public ValueTask<bool> IsReadyAsync();
public ValueTask<bool> IsReadyAsync() => ValueTask.FromResult(true);


/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Collections.Concurrent;
using System.Data.Common;
using System.Runtime.CompilerServices;
using ES.FX.TransactionalOutbox.EntityFrameworkCore.Entities;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Diagnostics;
Expand All @@ -9,59 +11,86 @@ namespace ES.FX.TransactionalOutbox.EntityFrameworkCore.Delivery;
/// Interceptor used to handle the saving of <see cref="Outbox" /> and <see cref="OutboxMessage" /> entities and
/// signalling the delivery process
/// </summary>
internal class OutboxDbContextSaveChangesInterceptor : SaveChangesInterceptor
internal class OutboxDbContextInterceptor : ISaveChangesInterceptor, IDbTransactionInterceptor
{
private static readonly ConcurrentDictionary<Type, List<WeakReference>> DbContextDictionary = new();
private static readonly ConcurrentDictionary<Type, List<WeakReference?>> DbContextDictionary = new();

public override InterceptionResult<int> SavingChanges(DbContextEventData eventData,
public InterceptionResult<int> SavingChanges(DbContextEventData eventData,
InterceptionResult<int> result)
{
OnOutboxSaving(eventData.Context);
return base.SavingChanges(eventData, result);
return result;
}

public override ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData,
public ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData,
InterceptionResult<int> result,
CancellationToken cancellationToken = default)
{
OnOutboxSaving(eventData.Context);
return base.SavingChangesAsync(eventData, result, cancellationToken);
return ValueTask.FromResult(result);
}

public override ValueTask<int> SavedChangesAsync(SaveChangesCompletedEventData eventData, int result,
public ValueTask<int> SavedChangesAsync(SaveChangesCompletedEventData eventData, int result,
CancellationToken cancellationToken = default)
{
OnOutboxSaved(eventData.Context);
return ValueTask.FromResult(result);
}

public override int SavedChanges(SaveChangesCompletedEventData eventData, int result)
public int SavedChanges(SaveChangesCompletedEventData eventData, int result)
{
OnOutboxSaved(eventData.Context);
return result;
}

public static void OnOutboxSaved(DbContext? context)


public InterceptionResult TransactionCommitting(DbTransaction transaction, TransactionEventData eventData,
InterceptionResult result)
{
OnOutboxSaving(eventData.Context);
return result;
}

public ValueTask<InterceptionResult> TransactionCommittingAsync(DbTransaction transaction,
TransactionEventData eventData,
InterceptionResult result, CancellationToken cancellationToken = new())
{
OnOutboxSaving(eventData.Context);
return ValueTask.FromResult(result);
}

public void TransactionCommitted(DbTransaction transaction, TransactionEndEventData eventData)
{
OnOutboxSaved(eventData.Context, true);
}


public Task TransactionCommittedAsync(DbTransaction transaction, TransactionEndEventData eventData,
CancellationToken cancellationToken = new())
{
OnOutboxSaved(eventData.Context, true);
return Task.CompletedTask;
}


public static void OnOutboxSaved(DbContext? context, bool transactionCommitted = false, [CallerMemberName] string? callerMemberName = null)
{
if (context is null) return;
var type = context.GetType();

// If the transaction is not committed and there is an active transaction, do not signal, wait for the transaction to commit
if (!transactionCommitted && context.Database.CurrentTransaction is not null) return;

DbContextDictionary.AddOrUpdate(context.GetType(),
_ => [],
(_, bag) =>
{
try
{
var matches = bag.RemoveAll(s => context.Equals(s.Target));
if (matches > 0) OutboxDeliverySignal.GetChannel(type).Writer.TryWrite(type);
bag.RemoveAll(s => s is { IsAlive: true });
return bag;
}
catch
{
// Ignored. This needs to complete successfully
}

return [];
var matches = bag.RemoveAll(s => context.Equals(s?.Target));
if (matches > 0)
OutboxDeliverySignal.GetChannel(type).Writer.TryWrite(callerMemberName ?? nameof(OnOutboxSaved));
bag.RemoveAll(s => !s?.IsAlive ?? true);
return bag;
});
}

Expand Down Expand Up @@ -96,18 +125,9 @@ private static void OnOutboxSaving(DbContext? context)
_ => [new WeakReference(context)],
(_, bag) =>
{
try
{
bag.RemoveAll(s => !s.IsAlive);
bag.Add(new WeakReference(context));
return bag;
}
catch
{
// Ignored. This needs to complete successfully
}

return [];
bag.RemoveAll(s => !s?.IsAlive ?? true);
bag.Add(new WeakReference(context));
return bag;
});
}
}
Loading

0 comments on commit 23c9b7d

Please sign in to comment.