Skip to content

Commit

Permalink
Allow to override events and expected version (#397)
Browse files Browse the repository at this point in the history
* Allow to override events and expected version
  • Loading branch information
alexeyzimarev authored Dec 27, 2024
1 parent 4007e08 commit 1d3ccf2
Show file tree
Hide file tree
Showing 32 changed files with 295 additions and 134 deletions.
17 changes: 11 additions & 6 deletions .github/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ jobs:
name: "Build and test"
# runs-on: ubuntu-latest
runs-on: self-hosted
strategy:
matrix:
dotnet-version: [ '8.0', '9.0' ]
env:
NUGET_PACKAGES: ${{ github.workspace }}/.nuget/packages
TC_CLOUD_TOKEN: ${{ secrets.TC_TOKEN }}
Expand All @@ -32,27 +35,29 @@ jobs:
name: Setup .NET
uses: actions/setup-dotnet@v4
with:
dotnet-version: |
8.0.x
9.0.x
dotnet-version: ${{ matrix.dotnet-version }}
-
name: Restore
run: |
dotnet restore -p:TargetFramework=net${{ matrix.dotnet-version }} -p:Configuration="Debug CI"
-
name: Build
run: |
dotnet build -c "Debug CI"
dotnet build -c "Debug CI" -f net${{ matrix.dotnet-version }} --no-restore
-
name: Prepare Testcontainers Cloud agent
if: env.TC_CLOUD_TOKEN != ''
uses: atomicjar/testcontainers-cloud-setup-action@main
-
name: Run tests
run: |
dotnet test -c "Debug CI" --no-build
dotnet test -c "Debug CI" --no-build -f net${{ matrix.dotnet-version }}
-
name: Upload Test Results
if: always()
uses: actions/upload-artifact@v4
with:
name: Test Results ${{ matrix.dotnet }}
name: Test Results ${{ matrix.dotnet-version }}
path: |
test-results/**/*.xml
test-results/**/*.trx
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (C) Eventuous HQ OÜ.All rights reserved
// Licensed under the Apache License, Version 2.0.

using Eventuous.Persistence;
using Eventuous.Shared;
using static Eventuous.CommandServiceDelegates;

namespace Eventuous;
Expand Down Expand Up @@ -38,46 +40,43 @@ public interface IDefineIdentity<out TCommand, out TAggregate, out TState, TId>
ICommandHandlerBuilder<TCommand, TAggregate, TState, TId> GetIdAsync(Func<TCommand, CancellationToken, ValueTask<TId>> getId);
}

public interface IDefineStore<out TCommand, out TAggregate, out TState, TId>
public interface IDefineStore<out TCommand, out TAggregate, out TState>
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
where TId : Id
where TCommand : class {
/// <summary>
/// Defines how to resolve the event store from the command. It assigns both reader and writer.
/// If not defined, the reader and writer provided by the functional service will be used.
/// </summary>
/// <param name="resolveStore">Function to resolve the event writer</param>
/// <returns></returns>
IDefineExecution<TCommand, TAggregate, TState, TId> ResolveStore(Func<TCommand, IEventStore> resolveStore);
IDefineExecution<TCommand, TAggregate, TState> ResolveStore(Func<TCommand, IEventStore> resolveStore);
}

public interface IDefineReader<out TCommand, out TAggregate, out TState, TId>
public interface IDefineReader<out TCommand, out TAggregate, out TState>
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
where TId : Id
where TCommand : class {
/// <summary>
/// Defines how to resolve the event reader from the command.
/// If not defined, the reader provided by the functional service will be used.
/// </summary>
/// <param name="resolveReader">Function to resolve the event reader</param>
/// <returns></returns>
IDefineWriter<TCommand, TAggregate, TState, TId> ResolveReader(Func<TCommand, IEventReader> resolveReader);
IDefineWriter<TCommand, TAggregate, TState> ResolveReader(Func<TCommand, IEventReader> resolveReader);
}

public interface IDefineWriter<out TCommand, out TAggregate, out TState, TId>
public interface IDefineWriter<out TCommand, out TAggregate, out TState>
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
where TId : Id
where TCommand : class {
/// <summary>
/// Defines how to resolve the event writer from the command.
/// If not defined, the writer provided by the functional service will be used.
/// </summary>
/// <param name="resolveWriter">Function to resolve the event writer</param>
/// <returns></returns>
IDefineExecution<TCommand, TAggregate, TState, TId> ResolveWriter(Func<TCommand, IEventWriter> resolveWriter);
IDefineExecution<TCommand, TAggregate, TState> ResolveWriter(Func<TCommand, IEventWriter> resolveWriter);
}

public interface IDefineEventAmendment<out TCommand, out TAggregate, out TState, TId>
Expand All @@ -90,44 +89,42 @@ public interface IDefineEventAmendment<out TCommand, out TAggregate, out TState,
/// </summary>
/// <param name="amendEvent">A function to amend the event</param>
/// <returns></returns>
IDefineStoreOrExecution<TCommand, TAggregate, TState, TId> AmendEvent(AmendEvent<TCommand> amendEvent);
IDefineStoreOrExecution<TCommand, TAggregate, TState> AmendEvent(AmendEvent<TCommand> amendEvent);
}

public interface IDefineExecution<out TCommand, out TAggregate, out TState, TId>
public interface IDefineExecution<out TCommand, out TAggregate, out TState>
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
where TId : Id
where TCommand : class {
/// <summary>
/// Defines how the command that acts on the aggregate.
/// </summary>
/// <param name="action">A function that executes an operation on an aggregate</param>
/// <returns></returns>
void Act(Action<TAggregate, TCommand> action);
IDefineAppendAmendment<TCommand> Act(Action<TAggregate, TCommand> action);

/// <summary>
/// Defines how the command that acts on the aggregate.
/// </summary>
/// <param name="action">A function that executes an asynchronous operation on an aggregate</param>
/// <returns></returns>
void ActAsync(Func<TAggregate, TCommand, CancellationToken, Task> action);
IDefineAppendAmendment<TCommand> ActAsync(Func<TAggregate, TCommand, CancellationToken, Task> action);
}

public interface IDefineStoreOrExecution<out TCommand, out TAggregate, out TState, TId>
: IDefineStore<TCommand, TAggregate, TState, TId>,
IDefineReader<TCommand, TAggregate, TState, TId>,
IDefineWriter<TCommand, TAggregate, TState, TId>,
IDefineExecution<TCommand, TAggregate, TState, TId>
public interface IDefineStoreOrExecution<out TCommand, out TAggregate, out TState>
: IDefineStore<TCommand, TAggregate, TState>,
IDefineReader<TCommand, TAggregate, TState>,
IDefineWriter<TCommand, TAggregate, TState>,
IDefineExecution<TCommand, TAggregate, TState>
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
where TId : Id
where TCommand : class;

public interface ICommandHandlerBuilder<out TCommand, out TAggregate, out TState, TId>
: IDefineStore<TCommand, TAggregate, TState, TId>,
IDefineReader<TCommand, TAggregate, TState, TId>,
IDefineWriter<TCommand, TAggregate, TState, TId>,
IDefineExecution<TCommand, TAggregate, TState, TId>,
: IDefineStore<TCommand, TAggregate, TState>,
IDefineReader<TCommand, TAggregate, TState>,
IDefineWriter<TCommand, TAggregate, TState>,
IDefineExecution<TCommand, TAggregate, TState>,
IDefineEventAmendment<TCommand, TAggregate, TState, TId>
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
Expand All @@ -152,18 +149,20 @@ public class CommandHandlerBuilder<TCommand, TAggregate, TState, TId>(
)
: IDefineExpectedState<TCommand, TAggregate, TState, TId>,
IDefineIdentity<TCommand, TAggregate, TState, TId>,
IDefineStoreOrExecution<TCommand, TAggregate, TState, TId>,
IDefineStoreOrExecution<TCommand, TAggregate, TState>,
IDefineAppendAmendment<TCommand>,
ICommandHandlerBuilder<TCommand, TAggregate, TState, TId>
where TCommand : class
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
where TId : Id {
GetIdFromUntypedCommand<TId>? _getId;
HandleUntypedCommand<TAggregate, TState>? _action;
Func<TCommand, IEventReader>? _reader;
Func<TCommand, IEventWriter>? _writer;
AmendEvent<TCommand>? _amendEvent;
ExpectedState _expectedState = ExpectedState.Any;
GetIdFromUntypedCommand<TId>? _getId;
HandleUntypedCommand<TAggregate, TState>? _action;
Func<TCommand, IEventReader>? _reader;
Func<TCommand, IEventWriter>? _writer;
AmendEvent<TCommand>? _amendEvent;
ExpectedState _expectedState = ExpectedState.Any;
RegisteredHandler<TAggregate, TState, TId>? _handler;

IDefineIdentity<TCommand, TAggregate, TState, TId> IDefineExpectedState<TCommand, TAggregate, TState, TId>.InState(ExpectedState expectedState) {
_expectedState = expectedState;
Expand All @@ -183,50 +182,60 @@ ICommandHandlerBuilder<TCommand, TAggregate, TState, TId> IDefineIdentity<TComma
return this;
}

void IDefineExecution<TCommand, TAggregate, TState, TId>.Act(Action<TAggregate, TCommand> action) {
IDefineAppendAmendment<TCommand> IDefineExecution<TCommand, TAggregate, TState>.Act(Action<TAggregate, TCommand> action) {
_action = (aggregate, cmd, _) => {
action(aggregate, (TCommand)cmd);

return ValueTask.FromResult(aggregate);
};
service.AddHandler<TCommand>(Build());
_handler = Build();
service.AddHandler<TCommand>(_handler);

return this;
}

void IDefineExecution<TCommand, TAggregate, TState, TId>.ActAsync(Func<TAggregate, TCommand, CancellationToken, Task> action) {
IDefineAppendAmendment<TCommand> IDefineExecution<TCommand, TAggregate, TState>.ActAsync(Func<TAggregate, TCommand, CancellationToken, Task> action) {
_action = async (aggregate, cmd, token) => {
await action(aggregate, (TCommand)cmd, token).NoContext();

return aggregate;
};
service.AddHandler<TCommand>(Build());
_handler = Build();
service.AddHandler<TCommand>(_handler);

return this;
}

IDefineExecution<TCommand, TAggregate, TState, TId> IDefineStore<TCommand, TAggregate, TState, TId>.ResolveStore(Func<TCommand, IEventStore> resolveStore) {
IDefineExecution<TCommand, TAggregate, TState> IDefineStore<TCommand, TAggregate, TState>.ResolveStore(Func<TCommand, IEventStore> resolveStore) {
Ensure.NotNull(resolveStore, nameof(resolveStore));
_reader = resolveStore;
_writer = resolveStore;

return this;
}

IDefineWriter<TCommand, TAggregate, TState, TId> IDefineReader<TCommand, TAggregate, TState, TId>.ResolveReader(Func<TCommand, IEventReader> resolveReader) {
IDefineWriter<TCommand, TAggregate, TState> IDefineReader<TCommand, TAggregate, TState>.ResolveReader(Func<TCommand, IEventReader> resolveReader) {
_reader = resolveReader;

return this;
}

IDefineExecution<TCommand, TAggregate, TState, TId> IDefineWriter<TCommand, TAggregate, TState, TId>.ResolveWriter(Func<TCommand, IEventWriter> resolveWriter) {
IDefineExecution<TCommand, TAggregate, TState> IDefineWriter<TCommand, TAggregate, TState>.ResolveWriter(Func<TCommand, IEventWriter> resolveWriter) {
_writer = resolveWriter;

return this;
}

IDefineStoreOrExecution<TCommand, TAggregate, TState, TId> IDefineEventAmendment<TCommand, TAggregate, TState, TId>.AmendEvent(AmendEvent<TCommand> amendEvent) {
IDefineStoreOrExecution<TCommand, TAggregate, TState> IDefineEventAmendment<TCommand, TAggregate, TState, TId>.AmendEvent(AmendEvent<TCommand> amendEvent) {
_amendEvent = amendEvent;

return this;
}

void IDefineAppendAmendment<TCommand>.AmendAppend(AmendAppend<TCommand> amendAppend) {
Ensure.NotNull(_handler, "Handler hasn't been built yet").AmendAppend = (append, cmd) => amendAppend(append, (TCommand)cmd);
}

RegisteredHandler<TAggregate, TState, TId> Build() {
return new(
_expectedState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0.

using System.Reflection;
using Eventuous.Persistence;
using static Eventuous.CommandServiceDelegates;
using static Eventuous.FuncServiceDelegates;

Expand All @@ -16,7 +17,9 @@ record RegisteredHandler<TAggregate, TState, TId>(
ResolveReaderFromCommand ResolveReader,
ResolveWriterFromCommand ResolveWriter,
AmendEventFromCommand? AmendEvent
) where TAggregate : Aggregate<TState> where TId : Id where TState : State<TState>, new();
) where TAggregate : Aggregate<TState> where TId : Id where TState : State<TState>, new() {
public AmendAppend? AmendAppend { get; set; }
}

class HandlersMap<TAggregate, TState, TId> where TAggregate : Aggregate<TState> where TId : Id where TState : State<TState>, new() {
readonly TypeMap<RegisteredHandler<TAggregate, TState, TId>> _typeMap = new();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (C) Eventuous HQ OÜ. All rights reserved
// Licensed under the Apache License, Version 2.0.

using Eventuous.Persistence;

namespace Eventuous;

using static Diagnostics.ApplicationEventSource;
Expand Down Expand Up @@ -87,8 +89,10 @@ public async Task<Result<TState>> Handle<TCommand>(TCommand command, Cancellatio
// Zero in the global position would mean nothing, so the receiver needs to check the Changes.Length
if (result.Changes.Count == 0) return Result<TState>.FromSuccess(result.State, Array.Empty<Change>(), 0);

var proposed = new ProposedAppend(stream, new(result.OriginalVersion), result.Changes.Select(x => new ProposedEvent(x, new())).ToArray());
var final = registeredHandler.AmendAppend?.Invoke(proposed, command) ?? proposed;
var writer = registeredHandler.ResolveWriter(command);
var storeResult = await writer.StoreAggregate<TAggregate, TState>(stream, result, Amend, cancellationToken).NoContext();
var storeResult = await writer.Store(final, Amend, cancellationToken).NoContext();
var changes = result.Changes.Select(x => Change.FromEvent(x, _typeMap));
Log.CommandHandled<TCommand>();

Expand Down
Loading

0 comments on commit 1d3ccf2

Please sign in to comment.