From 23e89f5b183e2ab550a9a1156d4c5a1841d759f4 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Fri, 6 Dec 2024 07:43:17 -0600 Subject: [PATCH] More adjustments to the projections between JasperFx and Marten --- src/JasperFx/Events/Grouping/EventSlicer.cs | 93 +++++++++++++++++++ src/JasperFx/Events/Grouping/SliceGroup.cs | 2 + .../Events/Grouping/TenantRollupSlicer.cs | 11 +++ .../Events/NewStuff/NewProjectionCode.cs | 3 +- src/JasperFx/Events/Projections/EventRange.cs | 14 +++ 5 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 src/JasperFx/Events/Grouping/EventSlicer.cs create mode 100644 src/JasperFx/Events/Grouping/TenantRollupSlicer.cs diff --git a/src/JasperFx/Events/Grouping/EventSlicer.cs b/src/JasperFx/Events/Grouping/EventSlicer.cs new file mode 100644 index 0000000..eae3c51 --- /dev/null +++ b/src/JasperFx/Events/Grouping/EventSlicer.cs @@ -0,0 +1,93 @@ +namespace JasperFx.Events.Grouping; + +public class EventSlicer: IEventSlicer +{ + private readonly List, IReadOnlyList>> _configurations = new(); + private readonly List _afterGroupingFanoutRules = new(); + private readonly List _beforeGroupingFanoutRules = new(); + + public ValueTask SliceAsync(IReadOnlyList events, SliceGroup grouping) + { + grouping.ApplyFanOutRules(_beforeGroupingFanoutRules); + + foreach (var configuration in _configurations) + { + configuration(grouping, events); + } + + grouping.ApplyFanOutRules(_afterGroupingFanoutRules); + + return new ValueTask(); + } + + public bool HasAnyRules() + { + return _configurations.Any(); + } + + public IEnumerable DetermineEventTypes() + { + foreach (var rule in _beforeGroupingFanoutRules) yield return rule.OriginatingType; + + foreach (var rule in _afterGroupingFanoutRules) yield return rule.OriginatingType; + } + + public EventSlicer Identity(Func identityFunc) + { + _configurations.Add((group, events) => group.AddEvents(identityFunc, events)); + + return this; + } + + public EventSlicer Identities(Func> identitiesFunc) + { + _configurations.Add((group, events) => group.AddEvents(identitiesFunc, events)); + + return this; + } + + /// + /// Apply "fan out" operations to the given TEvent type that inserts an enumerable of TChild events right behind the + /// parent + /// event in the event stream + /// + /// + /// Should the fan out operation happen after grouping, or before? Default is after + /// + /// + public EventSlicer FanOut(Func> fanOutFunc, + FanoutMode mode = FanoutMode.AfterGrouping) + { + return FanOut(new FanOutEventDataOperator(fanOutFunc) { Mode = mode }, mode); + } + + /// + /// Apply "fan out" operations to the given TEvent type that inserts an enumerable of TChild events right behind the + /// parent + /// event in the event stream + /// + /// + /// Should the fan out operation happen after grouping, or before? Default is after + /// + /// + public EventSlicer FanOut(Func, IEnumerable> fanOutFunc, FanoutMode mode = FanoutMode.AfterGrouping) + { + return FanOut(new FanOutEventOperator(fanOutFunc) { Mode = mode }, mode); + } + + private EventSlicer FanOut(IFanOutRule fanout, FanoutMode mode) + { + switch (mode) + { + case FanoutMode.AfterGrouping: + _afterGroupingFanoutRules.Add(fanout); + break; + + case FanoutMode.BeforeGrouping: + _beforeGroupingFanoutRules.Add(fanout); + break; + } + + return this; + } +} diff --git a/src/JasperFx/Events/Grouping/SliceGroup.cs b/src/JasperFx/Events/Grouping/SliceGroup.cs index 97df4dd..2717850 100644 --- a/src/JasperFx/Events/Grouping/SliceGroup.cs +++ b/src/JasperFx/Events/Grouping/SliceGroup.cs @@ -3,6 +3,8 @@ namespace JasperFx.Events.Grouping; +// TODO -- make sure the methods can handle IEvent too because duh + /// /// Structure to hold and help organize events in "slices" by identity to apply /// to the matching aggregate document TDoc. Note that TDoc might be a marker type. diff --git a/src/JasperFx/Events/Grouping/TenantRollupSlicer.cs b/src/JasperFx/Events/Grouping/TenantRollupSlicer.cs new file mode 100644 index 0000000..bd8cbc1 --- /dev/null +++ b/src/JasperFx/Events/Grouping/TenantRollupSlicer.cs @@ -0,0 +1,11 @@ +#nullable enable +namespace JasperFx.Events.Grouping; + +public class TenantRollupSlicer: IEventSlicer +{ + public ValueTask SliceAsync(IReadOnlyList events, SliceGroup grouping) + { + grouping.AddEvents(e => e.TenantId, events); + return new ValueTask(); + } +} diff --git a/src/JasperFx/Events/NewStuff/NewProjectionCode.cs b/src/JasperFx/Events/NewStuff/NewProjectionCode.cs index f34c098..19ddab6 100644 --- a/src/JasperFx/Events/NewStuff/NewProjectionCode.cs +++ b/src/JasperFx/Events/NewStuff/NewProjectionCode.cs @@ -64,6 +64,7 @@ public interface IProjectionSource: IReadOnlyPro bool TryBuildReplayExecutor(TStore store, TDatabase database, out IReplayExecutor executor); IInlineProjection BuildForInline(); + } public interface ISubscriptionSource @@ -77,7 +78,7 @@ public interface ISubscriptionSource } // Assuming that DocumentStore et al will be embedded into this -public interface IAsyncShard +public interface IAsyncShard // might have a subclass for projections { AsyncOptions Options { get; } ShardRole Role { get; } diff --git a/src/JasperFx/Events/Projections/EventRange.cs b/src/JasperFx/Events/Projections/EventRange.cs index 496ac25..2e358bb 100644 --- a/src/JasperFx/Events/Projections/EventRange.cs +++ b/src/JasperFx/Events/Projections/EventRange.cs @@ -9,6 +9,13 @@ namespace JasperFx.Events.Projections; /// public class EventRange { + public EventRange(ShardName name, long floor, long ceiling) + { + ShardName = name; + SequenceFloor = floor; + SequenceCeiling = ceiling; + } + public EventRange(ISubscriptionAgent agent, long floor, long ceiling) { ShardName = agent.Name; @@ -23,6 +30,12 @@ public EventRange(ISubscriptionAgent agent, long ceiling) Agent = agent; SequenceCeiling = ceiling; } + + public EventRange(ShardName shardName, long ceiling) + { + ShardName = shardName; + SequenceCeiling = ceiling; + } /// /// Identifies the projection shard consuming this event range @@ -65,6 +78,7 @@ public async ValueTask SliceAsync(IEventSlicer slicer) private readonly List _groups = new(); + public IReadOnlyList Groups => _groups; protected bool Equals(EventRange other)