diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml index 7f8e5db2..0cbff665 100644 --- a/.github/workflows/pull-request.yml +++ b/.github/workflows/pull-request.yml @@ -20,6 +20,9 @@ jobs: name: "Build and test" # runs-on: ubuntu-latest runs-on: self-hosted + strategy: + matrix: + dotnet-version: [ '8.0', '9.0' ] env: NUGET_PACKAGES: ${{ github.workspace }}/.nuget/packages TC_CLOUD_TOKEN: ${{ secrets.TC_TOKEN }} @@ -32,13 +35,15 @@ jobs: name: Setup .NET uses: actions/setup-dotnet@v4 with: - dotnet-version: | - 8.0.x - 9.0.x + dotnet-version: ${{ matrix.dotnet-version }} + - + name: Restore + run: | + dotnet restore -p:TargetFramework=net${{ matrix.dotnet-version }} -p:Configuration="Debug CI" - name: Build run: | - dotnet build -c "Debug CI" + dotnet build -c "Debug CI" -f net${{ matrix.dotnet-version }} --no-restore - name: Prepare Testcontainers Cloud agent if: env.TC_CLOUD_TOKEN != '' @@ -46,13 +51,13 @@ jobs: - name: Run tests run: | - dotnet test -c "Debug CI" --no-build + dotnet test -c "Debug CI" --no-build -f net${{ matrix.dotnet-version }} - name: Upload Test Results if: always() uses: actions/upload-artifact@v4 with: - name: Test Results ${{ matrix.dotnet }} + name: Test Results ${{ matrix.dotnet-version }} path: | test-results/**/*.xml test-results/**/*.trx diff --git a/src/Core/src/Eventuous.Application/AggregateService/CommandHandlerBuilder.cs b/src/Core/src/Eventuous.Application/AggregateService/CommandHandlerBuilder.cs index f826f44b..9b00261a 100644 --- a/src/Core/src/Eventuous.Application/AggregateService/CommandHandlerBuilder.cs +++ b/src/Core/src/Eventuous.Application/AggregateService/CommandHandlerBuilder.cs @@ -1,6 +1,8 @@ // Copyright (C) Eventuous HQ OÜ.All rights reserved // Licensed under the Apache License, Version 2.0. +using Eventuous.Persistence; +using Eventuous.Shared; using static Eventuous.CommandServiceDelegates; namespace Eventuous; @@ -38,10 +40,9 @@ public interface IDefineIdentity ICommandHandlerBuilder GetIdAsync(Func> getId); } -public interface IDefineStore +public interface IDefineStore where TAggregate : Aggregate where TState : State, new() - where TId : Id where TCommand : class { /// /// Defines how to resolve the event store from the command. It assigns both reader and writer. @@ -49,13 +50,12 @@ public interface IDefineStore /// /// Function to resolve the event writer /// - IDefineExecution ResolveStore(Func resolveStore); + IDefineExecution ResolveStore(Func resolveStore); } -public interface IDefineReader +public interface IDefineReader where TAggregate : Aggregate where TState : State, new() - where TId : Id where TCommand : class { /// /// Defines how to resolve the event reader from the command. @@ -63,13 +63,12 @@ public interface IDefineReader /// /// Function to resolve the event reader /// - IDefineWriter ResolveReader(Func resolveReader); + IDefineWriter ResolveReader(Func resolveReader); } -public interface IDefineWriter +public interface IDefineWriter where TAggregate : Aggregate where TState : State, new() - where TId : Id where TCommand : class { /// /// Defines how to resolve the event writer from the command. @@ -77,7 +76,7 @@ public interface IDefineWriter /// /// Function to resolve the event writer /// - IDefineExecution ResolveWriter(Func resolveWriter); + IDefineExecution ResolveWriter(Func resolveWriter); } public interface IDefineEventAmendment @@ -90,44 +89,42 @@ public interface IDefineEventAmendment /// A function to amend the event /// - IDefineStoreOrExecution AmendEvent(AmendEvent amendEvent); + IDefineStoreOrExecution AmendEvent(AmendEvent amendEvent); } -public interface IDefineExecution +public interface IDefineExecution where TAggregate : Aggregate where TState : State, new() - where TId : Id where TCommand : class { /// /// Defines how the command that acts on the aggregate. /// /// A function that executes an operation on an aggregate /// - void Act(Action action); + IDefineAppendAmendment Act(Action action); /// /// Defines how the command that acts on the aggregate. /// /// A function that executes an asynchronous operation on an aggregate /// - void ActAsync(Func action); + IDefineAppendAmendment ActAsync(Func action); } -public interface IDefineStoreOrExecution - : IDefineStore, - IDefineReader, - IDefineWriter, - IDefineExecution +public interface IDefineStoreOrExecution + : IDefineStore, + IDefineReader, + IDefineWriter, + IDefineExecution where TAggregate : Aggregate where TState : State, new() - where TId : Id where TCommand : class; public interface ICommandHandlerBuilder - : IDefineStore, - IDefineReader, - IDefineWriter, - IDefineExecution, + : IDefineStore, + IDefineReader, + IDefineWriter, + IDefineExecution, IDefineEventAmendment where TAggregate : Aggregate where TState : State, new() @@ -152,18 +149,20 @@ public class CommandHandlerBuilder( ) : IDefineExpectedState, IDefineIdentity, - IDefineStoreOrExecution, + IDefineStoreOrExecution, + IDefineAppendAmendment, ICommandHandlerBuilder where TCommand : class where TAggregate : Aggregate where TState : State, new() where TId : Id { - GetIdFromUntypedCommand? _getId; - HandleUntypedCommand? _action; - Func? _reader; - Func? _writer; - AmendEvent? _amendEvent; - ExpectedState _expectedState = ExpectedState.Any; + GetIdFromUntypedCommand? _getId; + HandleUntypedCommand? _action; + Func? _reader; + Func? _writer; + AmendEvent? _amendEvent; + ExpectedState _expectedState = ExpectedState.Any; + RegisteredHandler? _handler; IDefineIdentity IDefineExpectedState.InState(ExpectedState expectedState) { _expectedState = expectedState; @@ -183,25 +182,31 @@ ICommandHandlerBuilder IDefineIdentity.Act(Action action) { + IDefineAppendAmendment IDefineExecution.Act(Action action) { _action = (aggregate, cmd, _) => { action(aggregate, (TCommand)cmd); return ValueTask.FromResult(aggregate); }; - service.AddHandler(Build()); + _handler = Build(); + service.AddHandler(_handler); + + return this; } - void IDefineExecution.ActAsync(Func action) { + IDefineAppendAmendment IDefineExecution.ActAsync(Func action) { _action = async (aggregate, cmd, token) => { await action(aggregate, (TCommand)cmd, token).NoContext(); return aggregate; }; - service.AddHandler(Build()); + _handler = Build(); + service.AddHandler(_handler); + + return this; } - IDefineExecution IDefineStore.ResolveStore(Func resolveStore) { + IDefineExecution IDefineStore.ResolveStore(Func resolveStore) { Ensure.NotNull(resolveStore, nameof(resolveStore)); _reader = resolveStore; _writer = resolveStore; @@ -209,24 +214,28 @@ IDefineExecution IDefineStore IDefineReader.ResolveReader(Func resolveReader) { + IDefineWriter IDefineReader.ResolveReader(Func resolveReader) { _reader = resolveReader; return this; } - IDefineExecution IDefineWriter.ResolveWriter(Func resolveWriter) { + IDefineExecution IDefineWriter.ResolveWriter(Func resolveWriter) { _writer = resolveWriter; return this; } - IDefineStoreOrExecution IDefineEventAmendment.AmendEvent(AmendEvent amendEvent) { + IDefineStoreOrExecution IDefineEventAmendment.AmendEvent(AmendEvent amendEvent) { _amendEvent = amendEvent; return this; } + void IDefineAppendAmendment.AmendAppend(AmendAppend amendAppend) { + Ensure.NotNull(_handler, "Handler hasn't been built yet").AmendAppend = (append, cmd) => amendAppend(append, (TCommand)cmd); + } + RegisteredHandler Build() { return new( _expectedState, diff --git a/src/Core/src/Eventuous.Application/AggregateService/CommandHandlersMap.cs b/src/Core/src/Eventuous.Application/AggregateService/CommandHandlersMap.cs index d6d06320..ebfea2d1 100644 --- a/src/Core/src/Eventuous.Application/AggregateService/CommandHandlersMap.cs +++ b/src/Core/src/Eventuous.Application/AggregateService/CommandHandlersMap.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. using System.Reflection; +using Eventuous.Persistence; using static Eventuous.CommandServiceDelegates; using static Eventuous.FuncServiceDelegates; @@ -16,7 +17,9 @@ record RegisteredHandler( ResolveReaderFromCommand ResolveReader, ResolveWriterFromCommand ResolveWriter, AmendEventFromCommand? AmendEvent - ) where TAggregate : Aggregate where TId : Id where TState : State, new(); + ) where TAggregate : Aggregate where TId : Id where TState : State, new() { + public AmendAppend? AmendAppend { get; set; } +} class HandlersMap where TAggregate : Aggregate where TId : Id where TState : State, new() { readonly TypeMap> _typeMap = new(); diff --git a/src/Core/src/Eventuous.Application/AggregateService/CommandService.cs b/src/Core/src/Eventuous.Application/AggregateService/CommandService.cs index 1e2e39ee..6b196946 100644 --- a/src/Core/src/Eventuous.Application/AggregateService/CommandService.cs +++ b/src/Core/src/Eventuous.Application/AggregateService/CommandService.cs @@ -1,6 +1,8 @@ // Copyright (C) Eventuous HQ OÜ. All rights reserved // Licensed under the Apache License, Version 2.0. +using Eventuous.Persistence; + namespace Eventuous; using static Diagnostics.ApplicationEventSource; @@ -87,8 +89,10 @@ public async Task> Handle(TCommand command, Cancellatio // Zero in the global position would mean nothing, so the receiver needs to check the Changes.Length if (result.Changes.Count == 0) return Result.FromSuccess(result.State, Array.Empty(), 0); + var proposed = new ProposedAppend(stream, new(result.OriginalVersion), result.Changes.Select(x => new ProposedEvent(x, new())).ToArray()); + var final = registeredHandler.AmendAppend?.Invoke(proposed, command) ?? proposed; var writer = registeredHandler.ResolveWriter(command); - var storeResult = await writer.StoreAggregate(stream, result, Amend, cancellationToken).NoContext(); + var storeResult = await writer.Store(final, Amend, cancellationToken).NoContext(); var changes = result.Changes.Select(x => Change.FromEvent(x, _typeMap)); Log.CommandHandled(); diff --git a/src/Core/src/Eventuous.Application/FunctionalService/CommandHandlerBuilder.cs b/src/Core/src/Eventuous.Application/FunctionalService/CommandHandlerBuilder.cs index 97c1a1b1..00feeae6 100644 --- a/src/Core/src/Eventuous.Application/FunctionalService/CommandHandlerBuilder.cs +++ b/src/Core/src/Eventuous.Application/FunctionalService/CommandHandlerBuilder.cs @@ -1,6 +1,8 @@ // Copyright (C) Eventuous HQ OÜ.All rights reserved // Licensed under the Apache License, Version 2.0. +using Eventuous.Persistence; +using Eventuous.Shared; using static Eventuous.FuncServiceDelegates; namespace Eventuous; @@ -77,28 +79,28 @@ public interface IDefineExecution where TState : State /// /// Function to be executed on the stream for the command /// - void Act(Func executeCommand); + IDefineAppendAmendment Act(Func executeCommand); /// /// Defines the action to take on the stream for the command. The expected state should be New for this to work. /// /// Function to be executed on a new stream for the command /// - void ActAsync(Func> executeCommand); + IDefineAppendAmendment ActAsync(Func> executeCommand); /// /// Defines the action to take on the stream for the command, asynchronously. /// /// Function to be executed on a stream for the command /// - void Act(Func executeCommand); + IDefineAppendAmendment Act(Func executeCommand); /// /// Defines the action to take on the new stream for the command, asynchronously. /// /// Function to be executed on a stream for the command /// - void ActAsync(Func> executeCommand); + IDefineAppendAmendment ActAsync(Func> executeCommand); } public interface IDefineStoreOrExecution @@ -120,6 +122,7 @@ public class CommandHandlerBuilder(CommandService serv : IDefineExpectedState, IDefineStreamName, IDefineStoreOrExecution, + IDefineAppendAmendment, ICommandHandlerBuilder where TState : State, new() where TCommand : class { ExpectedState _expectedState = ExpectedState.Any; @@ -128,6 +131,7 @@ public class CommandHandlerBuilder(CommandService serv Func? _reader; Func? _writer; AmendEvent? _amendEvent; + RegisteredHandler? _handler; IDefineStreamName IDefineExpectedState.InState(ExpectedState expectedState) { _expectedState = expectedState; @@ -141,40 +145,50 @@ ICommandHandlerBuilder IDefineStreamName.Get return this; } - ICommandHandlerBuilder IDefineStreamName.GetStreamAsync( - Func> getStream - ) { + ICommandHandlerBuilder IDefineStreamName.GetStreamAsync(Func> getStream) { _getStream = (cmd, token) => getStream((TCommand)cmd, token); return this; } - void IDefineExecution.Act(Func executeCommand) { + IDefineAppendAmendment IDefineExecution.Act(Func executeCommand) { _execute = (state, events, command, _) => ValueTask.FromResult(executeCommand(state, events, (TCommand)command)); - service.AddHandler(Build()); + _handler = Build(); + service.AddHandler(_handler); + + return this; } - void IDefineExecution.ActAsync(Func> executeCommand) { + IDefineAppendAmendment IDefineExecution.ActAsync(Func> executeCommand) { _execute = async (state, events, cmd, token) => await executeCommand(state, events, (TCommand)cmd, token).NoContext(); - service.AddHandler(Build()); + _handler = Build(); + service.AddHandler(_handler); + + return this; } - void IDefineExecution.Act(Func executeCommand) { + IDefineAppendAmendment IDefineExecution.Act(Func executeCommand) { if (_expectedState != ExpectedState.New) { throw new InvalidOperationException("Action without state is only allowed for new streams"); } _execute = (_, _, command, _) => ValueTask.FromResult(executeCommand((TCommand)command)); - service.AddHandler(Build()); + _handler = Build(); + service.AddHandler(_handler); + + return this; } - void IDefineExecution.ActAsync(Func> executeCommand) { + IDefineAppendAmendment IDefineExecution.ActAsync(Func> executeCommand) { if (_expectedState != ExpectedState.New) { throw new InvalidOperationException("Action without state is only allowed for new streams"); } _execute = executeCommand.AsExecute(); - service.AddHandler(Build()); + _handler = Build(); + service.AddHandler(_handler); + + return this; } IDefineWriter IDefineReader.ResolveReader(Func resolveReader) { @@ -203,6 +217,10 @@ IDefineStoreOrExecution IDefineEventAmendment.AmendAppend(AmendAppend amendAppend) { + Ensure.NotNull(_handler, "Handler hasn't been built yet").AmendAppend = (append, cmd) => amendAppend(append, (TCommand)cmd); + } + RegisteredHandler Build() { return new( _expectedState, diff --git a/src/Core/src/Eventuous.Application/FunctionalService/CommandService.cs b/src/Core/src/Eventuous.Application/FunctionalService/CommandService.cs index 1e0c2e51..9cf8fb7e 100644 --- a/src/Core/src/Eventuous.Application/FunctionalService/CommandService.cs +++ b/src/Core/src/Eventuous.Application/FunctionalService/CommandService.cs @@ -1,6 +1,8 @@ // Copyright (C) Eventuous HQ OÜ. All rights reserved // Licensed under the Apache License, Version 2.0. +using Eventuous.Persistence; + namespace Eventuous; using static Diagnostics.ApplicationEventSource; @@ -84,19 +86,18 @@ public async Task> Handle(TCommand command, Cancellatio _ => throw new ArgumentOutOfRangeException(nameof(registeredHandler.ExpectedState), "Unknown expected state") }; - var result = await registeredHandler - .Handler(loadedState.State, loadedState.Events, command, cancellationToken) - .NoContext(); + var result = (await registeredHandler.Handler(loadedState.State, loadedState.Events, command, cancellationToken).NoContext()).ToArray(); - var newEvents = result.ToArray(); + var newEvents = result.Select(x => new ProposedEvent(x, new())).ToArray(); var newState = newEvents.Aggregate(loadedState.State, (current, evt) => current.When(evt)); // Zero in the global position would mean nothing, so the receiver needs to check the Changes.Length if (newEvents.Length == 0) return Result.FromSuccess(newState, Array.Empty(), 0); - var storeResult = await resolvedWriter.Store(streamName, loadedState.StreamVersion, newEvents, Amend, cancellationToken) - .NoContext(); - var changes = newEvents.Select(x => Change.FromEvent(x, _typeMap)); + var proposed = new ProposedAppend(streamName, loadedState.StreamVersion, newEvents); + var final = registeredHandler.AmendAppend?.Invoke(proposed, command) ?? proposed; + var storeResult = await resolvedWriter.Store(final, Amend, cancellationToken).NoContext(); + var changes = result.Select(x => Change.FromEvent(x, _typeMap)); Log.CommandHandled(); return Result.FromSuccess(newState, changes, storeResult.GlobalPosition); @@ -115,7 +116,5 @@ NewStreamEvent Amend(NewStreamEvent streamEvent) { protected static StreamName GetStream(string id) => StreamName.ForState(id); - internal void AddHandler(RegisteredHandler handler) where TCommand : class { - _handlers.AddHandlerUntyped(typeof(TCommand), handler); - } + internal void AddHandler(RegisteredHandler handler) where TCommand : class => _handlers.AddHandlerUntyped(typeof(TCommand), handler); } diff --git a/src/Core/src/Eventuous.Application/FunctionalService/HandlersMap.cs b/src/Core/src/Eventuous.Application/FunctionalService/HandlersMap.cs index 68712e0b..640defeb 100644 --- a/src/Core/src/Eventuous.Application/FunctionalService/HandlersMap.cs +++ b/src/Core/src/Eventuous.Application/FunctionalService/HandlersMap.cs @@ -3,6 +3,7 @@ using System.Reflection; using Eventuous.Diagnostics; +using Eventuous.Persistence; using static Eventuous.FuncServiceDelegates; namespace Eventuous; @@ -14,7 +15,9 @@ record RegisteredHandler( ResolveReaderFromCommand ResolveReaderFromCommand, ResolveWriterFromCommand ResolveWriterFromCommand, AmendEventFromCommand? AmendEvent - ) where TState : State; + ) where TState : State { + public AmendAppend? AmendAppend { get; set; } +} class HandlersMap where TState : State { readonly TypeMap> _typeMap = new(); diff --git a/src/Core/src/Eventuous.Application/Persistence/ProposedAppend.cs b/src/Core/src/Eventuous.Application/Persistence/ProposedAppend.cs new file mode 100644 index 00000000..fd6301c7 --- /dev/null +++ b/src/Core/src/Eventuous.Application/Persistence/ProposedAppend.cs @@ -0,0 +1,16 @@ +// Copyright (C) Eventuous HQ OÜ.All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Runtime.InteropServices; + +namespace Eventuous.Persistence; + +[StructLayout(LayoutKind.Auto)] +public record struct ProposedEvent(object Data, Metadata Metadata); + +[StructLayout(LayoutKind.Auto)] +public record struct ProposedAppend(StreamName StreamName, ExpectedStreamVersion ExpectedVersion, ProposedEvent[] Events); + +public delegate ProposedAppend AmendAppend(ProposedAppend originalEvent, T context); + +delegate ProposedAppend AmendAppend(ProposedAppend originalEvent, object context); diff --git a/src/Core/src/Eventuous.Application/Persistence/WriterExtensions.cs b/src/Core/src/Eventuous.Application/Persistence/WriterExtensions.cs new file mode 100644 index 00000000..c34a9f88 --- /dev/null +++ b/src/Core/src/Eventuous.Application/Persistence/WriterExtensions.cs @@ -0,0 +1,32 @@ +// Copyright (C) Eventuous HQ OÜ.All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous.Persistence; + +static class WriterExtensions { + public static async Task Store(this IEventWriter writer, ProposedAppend append, AmendEvent? amendEvent, CancellationToken cancellationToken) { + Ensure.NotNull(append.Events); + + if (append.Events.Length == 0) return AppendEventsResult.NoOp; + + try { + return await writer.AppendEvents( + append.StreamName, + append.ExpectedVersion, + append.Events.Select(ToStreamEvent).ToArray(), + cancellationToken + ) + .NoContext(); + } catch (Exception e) { + throw e.InnerException?.Message.Contains("WrongExpectedVersion") == true + ? new OptimisticConcurrencyException(append.StreamName, e) + : e; + } + + NewStreamEvent ToStreamEvent(ProposedEvent evt) { + var streamEvent = new NewStreamEvent(Guid.NewGuid(), evt.Data, evt.Metadata); + + return amendEvent?.Invoke(streamEvent) ?? streamEvent; + } + } +} diff --git a/src/Core/src/Eventuous.Application/Shared/IDefineAppendAmendment.cs b/src/Core/src/Eventuous.Application/Shared/IDefineAppendAmendment.cs new file mode 100644 index 00000000..3765aa5b --- /dev/null +++ b/src/Core/src/Eventuous.Application/Shared/IDefineAppendAmendment.cs @@ -0,0 +1,15 @@ +// Copyright (C) Eventuous HQ OÜ.All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Eventuous.Persistence; + +namespace Eventuous.Shared; + +public interface IDefineAppendAmendment where TCommand : class { + /// + /// Amends the proposed append before it gets stored. + /// + /// A function to amend the proposed append + /// + void AmendAppend(AmendAppend amendAppend); +} diff --git a/src/Core/test/Eventuous.Tests.Application/BookingFuncService.cs b/src/Core/test/Eventuous.Tests.Application/BookingFuncService.cs index 5ffbb372..9cd7609d 100644 --- a/src/Core/test/Eventuous.Tests.Application/BookingFuncService.cs +++ b/src/Core/test/Eventuous.Tests.Application/BookingFuncService.cs @@ -26,6 +26,12 @@ public BookingFuncService(IEventStore store, ITypeMapper? typeMap = null, AmendE .GetStream(cmd => GetStream(cmd.BookingId)) .Act((_, _, _) => [new BookingCancelled()]); + On() + .InState(ExpectedState.New) + .GetStream(cmd => GetStream(cmd.BookingId)) + .Act((_, _, _) => [new Executed()]) + .AmendAppend((append, _) => append with { ExpectedVersion = ExpectedStreamVersion.Any }); + return; static IEnumerable RecordPayment(BookingState state, object[] originalEvents, RecordPayment cmd) { diff --git a/src/Core/test/Eventuous.Tests.Application/CommandServiceTests.cs b/src/Core/test/Eventuous.Tests.Application/CommandServiceTests.cs index a6c3dfe7..23a9de3f 100644 --- a/src/Core/test/Eventuous.Tests.Application/CommandServiceTests.cs +++ b/src/Core/test/Eventuous.Tests.Application/CommandServiceTests.cs @@ -6,10 +6,7 @@ namespace Eventuous.Tests.Application; // ReSharper disable once UnusedType.Global [InheritsTests] public class CommandServiceTests : ServiceTestBase { - protected override ICommandService CreateService( - AmendEvent? amendEvent = null, - AmendEvent? amendAll = null - ) + protected override ICommandService CreateService(AmendEvent? amendEvent = null, AmendEvent? amendAll = null) => new ExtendedService(Store, TypeMap, amendEvent, amendAll); class ExtendedService : BookingService { diff --git a/src/Core/test/Eventuous.Tests.Application/FunctionalServiceTests.cs b/src/Core/test/Eventuous.Tests.Application/FunctionalServiceTests.cs index df45d302..e681c78b 100644 --- a/src/Core/test/Eventuous.Tests.Application/FunctionalServiceTests.cs +++ b/src/Core/test/Eventuous.Tests.Application/FunctionalServiceTests.cs @@ -1,16 +1,11 @@ -#pragma warning disable CS0618 // Type or member is obsolete - namespace Eventuous.Tests.Application; using Sut.Domain; // ReSharper disable once UnusedType.Global [InheritsTests] -public class FunctionalServiceTests() : ServiceTestBase() { - protected override ICommandService CreateService( - AmendEvent? amendEvent = null, - AmendEvent? amendAll = null - ) +public class FunctionalServiceTests : ServiceTestBase { + protected override ICommandService CreateService(AmendEvent? amendEvent = null, AmendEvent? amendAll = null) => new ExtendedService(Store, TypeMap, amendEvent, amendAll); class ExtendedService : BookingFuncService { diff --git a/src/Core/test/Eventuous.Tests.Application/ServiceTestBase.OnNew.cs b/src/Core/test/Eventuous.Tests.Application/ServiceTestBase.OnNew.cs index 3731188a..f47efe15 100644 --- a/src/Core/test/Eventuous.Tests.Application/ServiceTestBase.OnNew.cs +++ b/src/Core/test/Eventuous.Tests.Application/ServiceTestBase.OnNew.cs @@ -1,3 +1,4 @@ +using Eventuous.Sut.App; using Eventuous.Sut.Domain; using Eventuous.Testing; @@ -27,4 +28,17 @@ await CommandServiceFixture .When(cmd) .Then(result => result.ResultIsError()); } + + [Test] + public async Task Should_execute_amended_append() { + var seedCmd = Helpers.GetBookRoom(); + var seed = new BookingEvents.RoomBooked(seedCmd.RoomId, seedCmd.CheckIn, seedCmd.CheckOut, seedCmd.Price); + var cmd = new Commands.ExecuteNoMatterWhat(seedCmd.BookingId); + + await CommandServiceFixture + .ForService(() => CreateService(), Store) + .Given(seedCmd.BookingId, seed) + .When(cmd) + .Then(result => result.ResultIsOk(x => x.Changes.Should().HaveCount(1)).FullStreamEventsAre(seed, new BookingEvents.Executed())); + } } diff --git a/src/Core/test/Eventuous.Tests.Application/ServiceTestBase.cs b/src/Core/test/Eventuous.Tests.Application/ServiceTestBase.cs index 53461379..1be69aea 100644 --- a/src/Core/test/Eventuous.Tests.Application/ServiceTestBase.cs +++ b/src/Core/test/Eventuous.Tests.Application/ServiceTestBase.cs @@ -16,7 +16,7 @@ public async Task Ensure_builder_is_thread_safe(CancellationToken cancellationTo var tasks = Enumerable .Range(1, threadCount) - .Select(bookingId => Task.Run(() => service.Handle(Helpers.GetBookRoom(bookingId.ToString()), cancellationToken))) + .Select(bookingId => Task.Run(() => service.Handle(Helpers.GetBookRoom(bookingId.ToString()), cancellationToken), cancellationToken)) .ToList(); await Task.WhenAll(tasks); @@ -52,10 +52,7 @@ protected ServiceTestBase() { readonly TestEventListener _listener; - protected abstract ICommandService CreateService( - AmendEvent? amendEvent = null, - AmendEvent? amendAll = null - ); + protected abstract ICommandService CreateService(AmendEvent? amendEvent = null, AmendEvent? amendAll = null); protected record ImportBooking( string BookingId, diff --git a/src/Diagnostics/test/Eventuous.Tests.OpenTelemetry/Fixtures/MetricsSubscriptionFixtureBase.cs b/src/Diagnostics/test/Eventuous.Tests.OpenTelemetry/Fixtures/MetricsSubscriptionFixtureBase.cs index eacb6d17..ab9308c1 100644 --- a/src/Diagnostics/test/Eventuous.Tests.OpenTelemetry/Fixtures/MetricsSubscriptionFixtureBase.cs +++ b/src/Diagnostics/test/Eventuous.Tests.OpenTelemetry/Fixtures/MetricsSubscriptionFixtureBase.cs @@ -45,13 +45,9 @@ static MetricsSubscriptionFixtureBase() { // ReSharper disable once ConvertToConstant.Global public string SubscriptionId => "test-sub"; - TestListener? _listener; - protected abstract void ConfigureSubscription(TSubscriptionOptions options); protected override void SetupServices(IServiceCollection services) { - _listener = new(); - services.AddProducer(); services.AddSingleton(); @@ -79,10 +75,5 @@ protected override void GetDependencies(IServiceProvider provider) { public override async ValueTask DisposeAsync() { await base.DisposeAsync(); Exporter.Dispose(); - _listener?.Dispose(); } -} - -class TestListener() : GenericListener(SubscriptionMetrics.ListenerName) { - protected override void OnEvent(KeyValuePair obj) => TestContext.Current?.OutputWriter.WriteLine($"{obj.Key} {obj.Value}"); -} +} \ No newline at end of file diff --git a/src/Diagnostics/test/Eventuous.Tests.OpenTelemetry/MetricsTests.cs b/src/Diagnostics/test/Eventuous.Tests.OpenTelemetry/MetricsTests.cs index 318ae4f5..820a8ea4 100644 --- a/src/Diagnostics/test/Eventuous.Tests.OpenTelemetry/MetricsTests.cs +++ b/src/Diagnostics/test/Eventuous.Tests.OpenTelemetry/MetricsTests.cs @@ -8,18 +8,16 @@ namespace Eventuous.Tests.OpenTelemetry; public abstract class MetricsTestsBase(IMetricsSubscriptionFixtureBase fixture) { - [Test] - [Retry(3)] - public async Task ShouldMeasureSubscriptionGapCountBase() { + protected async Task ShouldMeasureSubscriptionGapCountBase() { TestContext.Current?.OutputWriter.WriteLine($"Stream {fixture.Stream}"); await Assert.That(_values).IsNotNull(); var gapCount = GetValue(_values!, SubscriptionMetrics.GapCountMetricName)!; var expectedGap = fixture.Count - fixture.Counter.Count; - gapCount.Should().NotBeNull(); - gapCount.Value.Should().BeInRange(expectedGap - 20, expectedGap + 20); - gapCount.CheckTag(SubscriptionMetrics.SubscriptionIdTag, fixture.SubscriptionId); - gapCount.CheckTag(fixture.DefaultTagKey, fixture.DefaultTagValue); + await Assert.That(gapCount).IsNotNull(); + await Assert.That(gapCount.Value).IsBetween(expectedGap - 20, expectedGap + 20); + await gapCount.CheckTag(SubscriptionMetrics.SubscriptionIdTag, fixture.SubscriptionId); + await gapCount.CheckTag(fixture.DefaultTagKey, fixture.DefaultTagValue); } // [Fact] @@ -65,8 +63,8 @@ public void Teardown() { } static class TagExtensions { - public static void CheckTag(this MetricValue metric, string tag, string expectedValue) { - metric.GetTag(tag).Should().Be(expectedValue); + public static async Task CheckTag(this MetricValue metric, string tag, string expectedValue) { + await Assert.That(metric.GetTag(tag)).IsEqualTo(expectedValue); } static object GetTag(this MetricValue metric, string key) { diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/EsdbContainer.cs b/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/EsdbContainer.cs index 7b02ed3e..bf4690c6 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/EsdbContainer.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/EsdbContainer.cs @@ -12,8 +12,6 @@ public static EventStoreDbContainer Create() { return new EventStoreDbBuilder() .WithImage(image) .WithEnvironment("EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP", "true") - // .WithCleanUp(false) - // .WithAutoRemove(false) .Build(); } } diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/Metrics/MetricsTests.cs b/src/EventStore/test/Eventuous.Tests.EventStore/Metrics/MetricsTests.cs index ac1415af..fd1539e8 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/Metrics/MetricsTests.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/Metrics/MetricsTests.cs @@ -3,5 +3,11 @@ namespace Eventuous.Tests.EventStore.Metrics; [ClassDataSource] -[InheritsTests] -public class MetricsTests(MetricsFixture fixture) : MetricsTestsBase(fixture); +[NotInParallel] +public class MetricsTests(MetricsFixture fixture) : MetricsTestsBase(fixture) { + [Test] + [Retry(3)] + public async Task ShouldMeasureSubscriptionGapCountBase_Esdb() { + await ShouldMeasureSubscriptionGapCountBase(); + } +} diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/Store/TieredStoreTests.cs b/src/EventStore/test/Eventuous.Tests.EventStore/Store/TieredStoreTests.cs index 3da1f3dd..2253c8bf 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/Store/TieredStoreTests.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/Store/TieredStoreTests.cs @@ -9,4 +9,4 @@ public class TieredStoreTests(StoreFixture storeFixture) : TieredStoreTestsBase< public async Task Esdb_should_load_hot_and_archive() { await Should_load_hot_and_archive(); } -} +} \ No newline at end of file diff --git a/src/Kafka/test/Eventuous.Tests.Kafka/BasicProducerTests.cs b/src/Kafka/test/Eventuous.Tests.Kafka/BasicProducerTests.cs index 426809b9..c57bc731 100644 --- a/src/Kafka/test/Eventuous.Tests.Kafka/BasicProducerTests.cs +++ b/src/Kafka/test/Eventuous.Tests.Kafka/BasicProducerTests.cs @@ -12,6 +12,8 @@ namespace Eventuous.Tests.Kafka; [ClassDataSource] +[NotInParallel] +[Retry(3)] public class BasicProducerTests { readonly KafkaFixture _fixture; diff --git a/src/Postgres/test/Eventuous.Tests.Postgres/Metrics/MetricsTests.cs b/src/Postgres/test/Eventuous.Tests.Postgres/Metrics/MetricsTests.cs index b7553cf1..2dddc4ae 100644 --- a/src/Postgres/test/Eventuous.Tests.Postgres/Metrics/MetricsTests.cs +++ b/src/Postgres/test/Eventuous.Tests.Postgres/Metrics/MetricsTests.cs @@ -1,10 +1,14 @@ using Eventuous.Tests.OpenTelemetry; -// ReSharper disable UnusedType.Global - namespace Eventuous.Tests.Postgres.Metrics; [ClassDataSource] -[InheritsTests] -public class MetricsTests(MetricsFixture fixture) : MetricsTestsBase(fixture); +[NotInParallel] +public class MetricsTests(MetricsFixture fixture) : MetricsTestsBase(fixture) { + [Test] + [Retry(3)] + public async Task ShouldMeasureSubscriptionGapCountBase_Postgres() { + await ShouldMeasureSubscriptionGapCountBase(); + } +} diff --git a/src/Postgres/test/Eventuous.Tests.Postgres/Subscriptions/SubscribeTests.cs b/src/Postgres/test/Eventuous.Tests.Postgres/Subscriptions/SubscribeTests.cs index 0825c95e..70740f69 100644 --- a/src/Postgres/test/Eventuous.Tests.Postgres/Subscriptions/SubscribeTests.cs +++ b/src/Postgres/test/Eventuous.Tests.Postgres/Subscriptions/SubscribeTests.cs @@ -6,6 +6,7 @@ namespace Eventuous.Tests.Postgres.Subscriptions; +[NotInParallel] public class SubscribeToAll() : SubscribeToAllBase( new SubscriptionFixture(_ => { }, false) @@ -26,7 +27,8 @@ public async Task Postgres_ShouldUseExistingCheckpoint(CancellationToken cancell } } -[ClassDataSource] +[ClassDataSource(Shared = SharedType.None)] +[NotInParallel] public class SubscribeToStream(StreamNameFixture streamNameFixture) : SubscribeToStreamBase( streamNameFixture.StreamName, diff --git a/src/SqlServer/test/Eventuous.Tests.SqlServer/Metrics/MetricsTests.cs b/src/SqlServer/test/Eventuous.Tests.SqlServer/Metrics/MetricsTests.cs index 8eb32120..faaa22a5 100644 --- a/src/SqlServer/test/Eventuous.Tests.SqlServer/Metrics/MetricsTests.cs +++ b/src/SqlServer/test/Eventuous.Tests.SqlServer/Metrics/MetricsTests.cs @@ -1,9 +1,13 @@ using Eventuous.Tests.OpenTelemetry; -// ReSharper disable UnusedType.Global - namespace Eventuous.Tests.SqlServer.Metrics; [ClassDataSource] -[InheritsTests] -public class MetricsTests(MetricsFixture fixture) : MetricsTestsBase(fixture); +[NotInParallel] +public class MetricsTests(MetricsFixture fixture) : MetricsTestsBase(fixture) { + [Test] + [Retry(3)] + public async Task ShouldMeasureSubscriptionGapCountBase_SqlServer() { + await ShouldMeasureSubscriptionGapCountBase(); + } +} diff --git a/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeTests.cs b/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeTests.cs index 289f564a..f4d31e81 100644 --- a/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeTests.cs +++ b/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeTests.cs @@ -6,6 +6,7 @@ namespace Eventuous.Tests.SqlServer.Subscriptions; +[NotInParallel] public class SubscribeToAll() : SubscribeToAllBase( new SubscriptionFixture(_ => { }, false) @@ -27,6 +28,7 @@ public async Task SqlServer_ShouldUseExistingCheckpoint(CancellationToken cancel } [ClassDataSource(Shared = SharedType.None)] +[NotInParallel] public class SubscribeToStream(StreamNameFixture streamNameFixture) : SubscribeToStreamBase( streamNameFixture.StreamName, diff --git a/src/Testing/src/Eventuous.Testing/AggregateSpec.cs b/src/Testing/src/Eventuous.Testing/AggregateSpec.cs index 9bb7f33d..d052f1cf 100644 --- a/src/Testing/src/Eventuous.Testing/AggregateSpec.cs +++ b/src/Testing/src/Eventuous.Testing/AggregateSpec.cs @@ -1,6 +1,7 @@ // Copyright (C) Eventuous HQ OÜ.All rights reserved // Licensed under the Apache License, Version 2.0. +using System.Diagnostics; using Shouldly; namespace Eventuous.Testing; @@ -27,7 +28,7 @@ public abstract class AggregateSpec(AggregateFactoryRegistry /// /// protected abstract void When(TAggregate aggregate); - + /// /// Function to create aggregate instances. /// @@ -53,6 +54,7 @@ protected TAggregate Then() { /// Events to verify /// Aggregate instance for further inspection // ReSharper disable once UnusedMethodReturnValue.Global + [StackTraceHidden] protected TAggregate Emitted(params object[] events) { if (Instance == null) { Then(); diff --git a/src/Testing/src/Eventuous.Testing/CommandServiceFixture.cs b/src/Testing/src/Eventuous.Testing/CommandServiceFixture.cs index 566541f6..6b87c408 100644 --- a/src/Testing/src/Eventuous.Testing/CommandServiceFixture.cs +++ b/src/Testing/src/Eventuous.Testing/CommandServiceFixture.cs @@ -1,6 +1,7 @@ // Copyright (C) Eventuous HQ OÜ.All rights reserved // Licensed under the Apache License, Version 2.0. +using System.Diagnostics; using Shouldly; namespace Eventuous.Testing; @@ -124,9 +125,10 @@ async Task> Execute(TCommand command) where TCommand : } public class FixtureResult { - readonly StreamEvent[] _streamEvents; - readonly long _version; - public Result Result { get; } + readonly StreamEvent[] _streamEvents; + readonly long _version; + + public Result Result { get; } internal FixtureResult(Result result, StreamEvent[] streamEvents, long version) { _streamEvents = streamEvents; @@ -140,16 +142,28 @@ internal FixtureResult(Result result, StreamEvent[] streamEvents, long v /// Assertion function for successful result /// /// Thrown if the result is not ok + [StackTraceHidden] public FixtureResult ResultIsOk(Action.Ok>? assert = null) { - if (!Result.TryGet(out var ok)) { - throw new ShouldAssertException("Expected the result to be Ok, but it was not"); - } + if (Result.TryGet(out var ok)) { + assert?.Invoke(ok); - assert?.Invoke(ok); + return this; + } + + if (Result.TryGetError(out var error)) { + throw new ShouldAssertException($"Expected the result to be Ok, but it was Error \"{error.ErrorMessage}\"", error.Exception); + } - return this; + throw new ShouldAssertException("Expected the result to be Ok, but it was not"); } + /// + /// Asserts if the result is Error and executes the provided assertions + /// + /// Assertion function for error result + /// + /// Thrown if the result is not an error + [StackTraceHidden] public FixtureResult ResultIsError(Action.Error>? assert = null) { if (!Result.TryGetError(out var error)) { throw new ShouldAssertException("Expected the result to be Error, but it was Ok"); @@ -160,13 +174,21 @@ public FixtureResult ResultIsError(Action.Error>? assert = null) return this; } + /// + /// Asserts if the result is Error and the exception is of type T and executes the provided assertions + /// + /// Assertion function for error result + /// Type of exception that is expected + /// + /// Thrown if the result is not an error or if the exception is not of type T + [StackTraceHidden] public FixtureResult ResultIsError(Action? assert = null) where T : Exception { if (!Result.TryGetError(out var error)) { throw new ShouldAssertException("Expected the result to be Error, but it was Ok"); } - + error.Exception.ShouldBeOfType(); - + assert?.Invoke((T)error.Exception); return this; @@ -177,6 +199,7 @@ public FixtureResult ResultIsError(Action? assert = null) where T : Except /// /// Events to be found in the stream /// + [StackTraceHidden] public FixtureResult FullStreamEventsAre(params object[] events) { var stream = _streamEvents.Select(x => x.Payload); stream.ShouldBe(events); @@ -189,6 +212,7 @@ public FixtureResult FullStreamEventsAre(params object[] events) { /// /// Events that are expected to be new /// + [StackTraceHidden] public FixtureResult NewStreamEventsAre(params object[] events) { var stream = _streamEvents.Where(x => x.Position >= _version).Select(x => x.Payload); stream.ShouldBe(events); @@ -201,6 +225,7 @@ public FixtureResult NewStreamEventsAre(params object[] events) { /// /// Assertion function to check StreamEvent collection /// + [StackTraceHidden] public FixtureResult StreamIs(Action assert) { assert(_streamEvents); diff --git a/src/Testing/src/Eventuous.Testing/InMemoryEventStore.cs b/src/Testing/src/Eventuous.Testing/InMemoryEventStore.cs index 121cadb8..7fff376f 100644 --- a/src/Testing/src/Eventuous.Testing/InMemoryEventStore.cs +++ b/src/Testing/src/Eventuous.Testing/InMemoryEventStore.cs @@ -69,7 +69,7 @@ class InMemoryStream(StreamName name) { readonly List _events = []; public void CheckVersion(ExpectedStreamVersion expectedVersion) { - if (expectedVersion.Value != Version) throw new WrongVersion(expectedVersion, Version); + if (expectedVersion != ExpectedStreamVersion.Any && expectedVersion.Value != Version) throw new WrongVersion(expectedVersion, Version); } public void AppendEvents(ExpectedStreamVersion expectedVersion, IReadOnlyCollection events) { diff --git a/test/Eventuous.Sut.App/BookingService.cs b/test/Eventuous.Sut.App/BookingService.cs index 478cdbbb..88bad5cb 100644 --- a/test/Eventuous.Sut.App/BookingService.cs +++ b/test/Eventuous.Sut.App/BookingService.cs @@ -37,5 +37,11 @@ public BookingService( .InState(ExpectedState.Any) .GetId(cmd => cmd.BookingId) .Act((booking, _) => booking.Cancel()); + + On() + .InState(ExpectedState.New) + .GetId(cmd => new(cmd.BookingId)) + .Act((booking, _) => booking.Execute()) + .AmendAppend((append, _) => append with { ExpectedVersion = ExpectedStreamVersion.Any }); } } diff --git a/test/Eventuous.Sut.App/Commands.cs b/test/Eventuous.Sut.App/Commands.cs index 7f3a361f..c23b988b 100644 --- a/test/Eventuous.Sut.App/Commands.cs +++ b/test/Eventuous.Sut.App/Commands.cs @@ -21,4 +21,6 @@ public record RecordPayment(BookingId BookingId, string PaymentId, Money Amount, }; public record CancelBooking(BookingId BookingId); + + public record ExecuteNoMatterWhat(string BookingId); } diff --git a/test/Eventuous.Sut.Domain/Booking.cs b/test/Eventuous.Sut.Domain/Booking.cs index 892d17ac..6a107ef1 100644 --- a/test/Eventuous.Sut.Domain/Booking.cs +++ b/test/Eventuous.Sut.Domain/Booking.cs @@ -34,6 +34,8 @@ public void RecordPayment(string paymentId, Money amount, DateTimeOffset paidAt) } public bool HasPaymentRecord(string paymentId) => Current.OfType().Any(x => x.PaymentId == paymentId); + + public void Execute() => Apply(new Executed()); } public record BookingId(string Value) : Id(Value); diff --git a/test/Eventuous.Sut.Domain/BookingEvents.cs b/test/Eventuous.Sut.Domain/BookingEvents.cs index 2eb8713b..574a013f 100644 --- a/test/Eventuous.Sut.Domain/BookingEvents.cs +++ b/test/Eventuous.Sut.Domain/BookingEvents.cs @@ -26,6 +26,9 @@ public record BookingCancelled; [EventType(TypeNames.BookingImported)] public record BookingImported(string RoomId, float Price, LocalDate CheckIn, LocalDate CheckOut); + [EventType(TypeNames.Executed)] + public record Executed; + // These constants are for test purpose, use inline names in real apps public static class TypeNames { public const string BookingCancelled = "V1.BookingCancelled"; @@ -35,5 +38,6 @@ public static class TypeNames { public const string OutstandingAmountChanged = "V1.OutstandingAmountChanged"; public const string BookingFullyPaid = "V1.BookingFullyPaid"; public const string BookingOverpaid = "V1.BookingOverpaid"; + public const string Executed = "V1.Executed"; } }