Skip to content

Commit

Permalink
feat(ES.FX): enhance IgniteHostingExtensions and refactor OutboxDbCon…
Browse files Browse the repository at this point in the history
…textInterceptor

- In `IgniteHostingExtensions.cs`, enhanced the configuration of OpenTelemetry by adding an environment variable detector.
- Refactored `OutboxDbContextInterceptor.cs` to change the interception points from SavingChanges/SavedChanges to TransactionCommitting/TransactionCommitted. This ensures that outbox operations are performed at transaction boundaries rather than save changes boundaries.
- Minor formatting adjustments in `OutboxDeliveryService.cs` for better readability.
  • Loading branch information
winromulus committed Sep 7, 2024
1 parent 23c9b7d commit 2466768
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 29 deletions.
6 changes: 5 additions & 1 deletion src/ES.FX.Ignite/Hosting/IgniteHostingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ private static void AddOpenTelemetry(IHostApplicationBuilder builder, IgniteOpen
});

builder.Services.AddOpenTelemetry()
.ConfigureResource(r => r.AddService(builder.Environment.ApplicationName))
.ConfigureResource(r =>
{
r.AddService(builder.Environment.ApplicationName);
r.AddEnvironmentVariableDetector();
})
.WithMetrics(metrics =>
{
if (settings.RuntimeMetricsEnabled) metrics.AddRuntimeInstrumentation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,66 +15,66 @@ internal class OutboxDbContextInterceptor : ISaveChangesInterceptor, IDbTransact
{
private static readonly ConcurrentDictionary<Type, List<WeakReference?>> DbContextDictionary = new();

public InterceptionResult<int> SavingChanges(DbContextEventData eventData,
InterceptionResult<int> result)

public InterceptionResult TransactionCommitting(DbTransaction transaction, TransactionEventData eventData,
InterceptionResult result)
{
OnOutboxSaving(eventData.Context);
return result;
}

public ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData,
InterceptionResult<int> result,
CancellationToken cancellationToken = default)
public ValueTask<InterceptionResult> TransactionCommittingAsync(DbTransaction transaction,
TransactionEventData eventData,
InterceptionResult result, CancellationToken cancellationToken = new())
{
OnOutboxSaving(eventData.Context);
return ValueTask.FromResult(result);
}

public ValueTask<int> SavedChangesAsync(SaveChangesCompletedEventData eventData, int result,
CancellationToken cancellationToken = default)
public void TransactionCommitted(DbTransaction transaction, TransactionEndEventData eventData)
{
OnOutboxSaved(eventData.Context);
return ValueTask.FromResult(result);
OnOutboxSaved(eventData.Context, true);
}

public int SavedChanges(SaveChangesCompletedEventData eventData, int result)

public Task TransactionCommittedAsync(DbTransaction transaction, TransactionEndEventData eventData,
CancellationToken cancellationToken = new())
{
OnOutboxSaved(eventData.Context);
return result;
OnOutboxSaved(eventData.Context, true);
return Task.CompletedTask;
}



public InterceptionResult TransactionCommitting(DbTransaction transaction, TransactionEventData eventData,
InterceptionResult result)
public InterceptionResult<int> SavingChanges(DbContextEventData eventData,
InterceptionResult<int> result)
{
OnOutboxSaving(eventData.Context);
return result;
}

public ValueTask<InterceptionResult> TransactionCommittingAsync(DbTransaction transaction,
TransactionEventData eventData,
InterceptionResult result, CancellationToken cancellationToken = new())
public ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData,
InterceptionResult<int> result,
CancellationToken cancellationToken = default)
{
OnOutboxSaving(eventData.Context);
return ValueTask.FromResult(result);
}

public void TransactionCommitted(DbTransaction transaction, TransactionEndEventData eventData)
public ValueTask<int> SavedChangesAsync(SaveChangesCompletedEventData eventData, int result,
CancellationToken cancellationToken = default)
{
OnOutboxSaved(eventData.Context, true);
OnOutboxSaved(eventData.Context);
return ValueTask.FromResult(result);
}


public Task TransactionCommittedAsync(DbTransaction transaction, TransactionEndEventData eventData,
CancellationToken cancellationToken = new())
public int SavedChanges(SaveChangesCompletedEventData eventData, int result)
{
OnOutboxSaved(eventData.Context, true);
return Task.CompletedTask;
OnOutboxSaved(eventData.Context);
return result;
}


public static void OnOutboxSaved(DbContext? context, bool transactionCommitted = false, [CallerMemberName] string? callerMemberName = null)
public static void OnOutboxSaved(DbContext? context, bool transactionCommitted = false,
[CallerMemberName] string? callerMemberName = null)
{
if (context is null) return;
var type = context.GetType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,14 @@ private async Task Sleep(TimeSpan delay, CancellationToken cancellationToken)
string source;
try
{
source = await OutboxDeliverySignal.GetChannel<TDbContext>().Reader.ReadAsync(sleepCts.Token).ConfigureAwait(false);
source = await OutboxDeliverySignal.GetChannel<TDbContext>().Reader.ReadAsync(sleepCts.Token)
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
source = nameof(OutboxDeliveryOptions<TDbContext>.PollingInterval);
}

logger.LogTrace("Sleep interrupted by {source}", source);
}
}

0 comments on commit 2466768

Please sign in to comment.