Skip to content

Commit

Permalink
feat(OutboxMessage): refactor ConsumeAsync methods and add new migration
Browse files Browse the repository at this point in the history
- Refactored `ConsumeAsync` method in `Consumer` class to use expression-bodied member.
- Modified `ConsumeAsync` method in `MessageRecordConsumer` class to be asynchronous.
- Added a new outbox message with delivery options in the `TestHostedService`.
- Created a new migration file '20240905162734_V1.cs' for database schema changes related to Outboxes, OutboxFailedMessages, OutboxMessages, and SimpleUsers tables.
- Renamed 'V2.Designer.cs' to '20240905162734_V1.Designer.cs' reflecting the updated migration version.
- Deleted old migration files 'V1.cs' and 'V1.Designer.cs'.
  • Loading branch information
winromulus committed Sep 5, 2024
1 parent 064621e commit ccd5004
Show file tree
Hide file tree
Showing 54 changed files with 765 additions and 452 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,15 @@ public record MessageRecord(string SomeProp);

public class Consumer : IOutboxMessageConsumer<Message>
{
public ValueTask<bool> ConsumeAsync(Message message)
{
return ValueTask.FromResult(true);
}
public ValueTask<bool> ConsumeAsync(Message message) => ValueTask.FromResult(true);
}

public class MessageRecordConsumer : IOutboxMessageConsumer<MessageRecord>
{


public ValueTask<bool> ConsumeAsync(MessageRecord message)
public async ValueTask<bool> ConsumeAsync(MessageRecord message)
{
return ValueTask.FromResult(true);
//await Task.Delay(5000);
await Task.CompletedTask;
return true;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma warning disable CS9113 // Parameter is unread.

using ES.FX.TransactionalOutbox.EntityFrameworkCore;
using ES.FX.TransactionalOutbox.EntityFrameworkCore.Configuration;
using Microsoft.EntityFrameworkCore;
using Playground.Shared.Data.Simple.EntityFrameworkCore;
using Playground.Shared.Data.Simple.EntityFrameworkCore.Entities;
Expand All @@ -24,13 +25,18 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

for (var i = 0; i < 50; i++)
{
dbContext.AddOutboxMessage(new MessageRecord("Property"));
dbContext.AddOutboxMessage(new MessageRecord("Property"), new OutboxMessageDeliveryOptions
{
MaxAttempts = 5,
DelayBetweenAttempts = 5,
DelayBetweenAttemptsIsExponential = true
});
dbContext.SimpleUsers.Add(new SimpleUser { Id = Guid.NewGuid() });
}

await dbContext.SaveChangesAsync(stoppingToken).ConfigureAwait(false);

await Task.Delay(3_000, stoppingToken).ConfigureAwait(false);

}
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
using System;
using Microsoft.EntityFrameworkCore.Migrations;

#nullable disable

namespace Playground.Shared.Data.Simple.EntityFrameworkCore.SqlServer.Migrations
{
/// <inheritdoc />
public partial class V1 : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "__Outboxes",
columns: table => new
{
Id = table.Column<Guid>(type: "uniqueidentifier", nullable: false),
AddedAt = table.Column<DateTimeOffset>(type: "datetimeoffset", nullable: false),
Lock = table.Column<Guid>(type: "uniqueidentifier", nullable: false),
DeliveryDelayedUntil = table.Column<DateTimeOffset>(type: "datetimeoffset", nullable: true),
RowVersion = table.Column<byte[]>(type: "rowversion", rowVersion: true, nullable: true)
},
constraints: table =>
{
table.PrimaryKey("PK___Outboxes", x => x.Id);
});

migrationBuilder.CreateTable(
name: "__OutboxFailedMessages",
columns: table => new
{
Id = table.Column<long>(type: "bigint", nullable: false),
OutboxId = table.Column<Guid>(type: "uniqueidentifier", nullable: false),
AddedAt = table.Column<DateTimeOffset>(type: "datetimeoffset", nullable: false),
Headers = table.Column<string>(type: "nvarchar(max)", nullable: false),
PayloadType = table.Column<string>(type: "nvarchar(max)", nullable: false),
Payload = table.Column<string>(type: "nvarchar(max)", nullable: false),
DeliveryAttempts = table.Column<int>(type: "int", nullable: false),
DeliveryFirstAttemptedAt = table.Column<DateTimeOffset>(type: "datetimeoffset", nullable: true),
DeliveryLastAttemptedAt = table.Column<DateTimeOffset>(type: "datetimeoffset", nullable: true),
DeliveryLastAttemptError = table.Column<string>(type: "nvarchar(4000)", maxLength: 4000, nullable: true),
DeliveryNotBefore = table.Column<DateTimeOffset>(type: "datetimeoffset", nullable: true),
DeliveryNotAfter = table.Column<DateTimeOffset>(type: "datetimeoffset", nullable: true),
FailedAt = table.Column<DateTimeOffset>(type: "datetimeoffset", nullable: false)
},
constraints: table =>
{
table.PrimaryKey("PK___OutboxFailedMessages", x => x.Id);
});

migrationBuilder.CreateTable(
name: "__OutboxMessages",
columns: table => new
{
Id = table.Column<long>(type: "bigint", nullable: false)
.Annotation("SqlServer:Identity", "1, 1"),
OutboxId = table.Column<Guid>(type: "uniqueidentifier", nullable: false),
AddedAt = table.Column<DateTimeOffset>(type: "datetimeoffset", nullable: false),
Headers = table.Column<string>(type: "nvarchar(max)", nullable: false),
PayloadType = table.Column<string>(type: "nvarchar(max)", nullable: false),
Payload = table.Column<string>(type: "nvarchar(max)", nullable: false),
DeliveryAttempts = table.Column<int>(type: "int", nullable: false),
DeliveryMaxAttempts = table.Column<int>(type: "int", nullable: true),
DeliveryFirstAttemptedAt = table.Column<DateTimeOffset>(type: "datetimeoffset", nullable: true),
DeliveryLastAttemptedAt = table.Column<DateTimeOffset>(type: "datetimeoffset", nullable: true),
DeliveryLastAttemptError = table.Column<string>(type: "nvarchar(4000)", maxLength: 4000, nullable: true),
DeliveryNotBefore = table.Column<DateTimeOffset>(type: "datetimeoffset", nullable: true),
DeliveryNotAfter = table.Column<DateTimeOffset>(type: "datetimeoffset", nullable: true),
DeliveryAttemptDelay = table.Column<int>(type: "int", nullable: false),
DeliveryAttemptDelayIsExponential = table.Column<bool>(type: "bit", nullable: false),
RowVersion = table.Column<byte[]>(type: "rowversion", rowVersion: true, nullable: true)
},
constraints: table =>
{
table.PrimaryKey("PK___OutboxMessages", x => x.Id);
});

migrationBuilder.CreateTable(
name: "SimpleUsers",
columns: table => new
{
Id = table.Column<Guid>(type: "uniqueidentifier", nullable: false)
},
constraints: table =>
{
table.PrimaryKey("PK_SimpleUsers", x => x.Id);
});

migrationBuilder.CreateIndex(
name: "IX___Outboxes_AddedAt",
table: "__Outboxes",
column: "AddedAt");

migrationBuilder.CreateIndex(
name: "IX___Outboxes_DeliveryDelayedUntil",
table: "__Outboxes",
column: "DeliveryDelayedUntil");
}

/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "__Outboxes");

migrationBuilder.DropTable(
name: "__OutboxFailedMessages");

migrationBuilder.DropTable(
name: "__OutboxMessages");

migrationBuilder.DropTable(
name: "SimpleUsers");
}
}
}
Loading

0 comments on commit ccd5004

Please sign in to comment.