From e8afac814a7d2e5fc28c7050acb4043769c2fd40 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?=
Date: Sun, 11 Jun 2023 21:47:10 -0400
Subject: [PATCH 1/6] feat: add telemetry to the ingestion pipeline

---
 src/Propulsion/Streams.fs                     |  20 +-
 .../Propulsion.MessageDb.Integration.fsproj   |   4 +
 .../Propulsion.MessageDb.Integration/Tests.fs | 193 +++++++++---------
 3 files changed, 121 insertions(+), 96 deletions(-)

diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs
index b6000c01..73595fbf 100755
--- a/src/Propulsion/Streams.fs
+++ b/src/Propulsion/Streams.fs
@@ -1,5 +1,6 @@
 namespace Propulsion.Streams
 
+open System.Diagnostics
 open Propulsion
 open Propulsion.Internal
 open Serilog
@@ -848,6 +849,8 @@ module Dispatcher =
             member _.AwaitCapacity(ct) = inner.AwaitButRelease(ct)
             member _.TryReplenish(pending, markStarted, project) = tryFillDispatcher pending markStarted project
 
+    let private source = new ActivitySource("Propulsion")
+
     /// Implementation of IDispatcher that feeds items to an item dispatcher that maximizes concurrent requests (within a limit)
     type Concurrent<'P, 'R, 'E, 'F> internal
         (   inner : ItemDispatcher<Result<'P, 'E>, 'F>,
@@ -857,9 +860,22 @@
         (   maxDop,
             project : FsCodec.StreamName -> FsCodec.ITimelineEvent<'F>[] -> CancellationToken -> Task<struct (bool * Result<'P, 'E>)>,
            interpretProgress) =
-        let project struct (startTs, item : Scheduling.Item<'F>) (ct : CancellationToken) = task {
+        let project struct (startTs: int64, item : Scheduling.Item<'F>) (ct : CancellationToken) = task {
+            use act = source.StartActivity("process", ActivityKind.Consumer)
+            if act <> null then
+                let struct(category, streamId) = FsCodec.StreamName.splitCategoryAndStreamId item.stream
+                act.DisplayName <- $"{category} process"
+                act.SetTag("eqx.stream_name", item.stream)
+                   .SetTag("eqx.stream_id", streamId)
+                   .SetTag("eqx.category", category)
+                   .SetTag("eqx.timestamp", item.span[0].Timestamp)
+                   |> ignore
             let! 
struct (progressed, res) = project item.stream item.span ct - return struct (Stopwatch.elapsed startTs, item.stream, progressed, res) } + let elapsed = Stopwatch.elapsed startTs + if act <> null then + let oldestItemTs = item.span[0].Timestamp + act.SetTag("eqx.lead_time_ms", (DateTimeOffset.UtcNow - oldestItemTs).TotalMilliseconds) |> ignore + return struct (elapsed, item.stream, progressed, res) } Concurrent<_, _, _, _>(ItemDispatcher(maxDop), project, interpretProgress) static member Create(maxDop, prepare : Func<_, _, _>, handle : Func<_, _, CancellationToken, Task<_>>, toIndex : Func<_, 'R, int64>) = let project sn span ct = task { diff --git a/tests/Propulsion.MessageDb.Integration/Propulsion.MessageDb.Integration.fsproj b/tests/Propulsion.MessageDb.Integration/Propulsion.MessageDb.Integration.fsproj index 0e8f6fe1..ec43d995 100644 --- a/tests/Propulsion.MessageDb.Integration/Propulsion.MessageDb.Integration.fsproj +++ b/tests/Propulsion.MessageDb.Integration/Propulsion.MessageDb.Integration.fsproj @@ -9,7 +9,11 @@ + + + + diff --git a/tests/Propulsion.MessageDb.Integration/Tests.fs b/tests/Propulsion.MessageDb.Integration/Tests.fs index 5de46a98..16dd60e3 100644 --- a/tests/Propulsion.MessageDb.Integration/Tests.fs +++ b/tests/Propulsion.MessageDb.Integration/Tests.fs @@ -1,7 +1,5 @@ -module Propulsion.MessageDb.Integration.Tests +module Propulsion.MessageDb.Integration -open Npgsql -open NpgsqlTypes open Propulsion.Internal open Propulsion.MessageDb open Swensen.Unquote @@ -10,6 +8,11 @@ open System.Collections.Generic open System.Diagnostics open System.Threading.Tasks open Xunit +open OpenTelemetry +open OpenTelemetry.Trace +open OpenTelemetry.Resources + +let source = new ActivitySource("Propulsion.MessageDb.Integration") module Simple = type Hello = { name : string} @@ -17,15 +20,9 @@ module Simple = | Hello of Hello interface TypeShape.UnionContract.IUnionContract let codec = FsCodec.SystemTextJson.Codec.Create() - -let createStreamMessage streamName = - let cmd = NpgsqlBatchCommand() - cmd.CommandText <- "select 1 from write_message(@Id::text, @StreamName, @EventType, @Data, null, null)" - cmd.Parameters.AddWithValue("Id", NpgsqlDbType.Uuid, Guid.NewGuid()) |> ignore - cmd.Parameters.AddWithValue("StreamName", NpgsqlDbType.Text, streamName) |> ignore - cmd.Parameters.AddWithValue("EventType", NpgsqlDbType.Text, "Hello") |> ignore - cmd.Parameters.AddWithValue("Data", NpgsqlDbType.Jsonb, """{"name": "world"}""") |> ignore - cmd + type State = unit + let initial = () + let fold state events = state let ConnectionString = match Environment.GetEnvironmentVariable "MSG_DB_CONNECTION_STRING" with @@ -36,25 +33,18 @@ let CheckpointConnectionString = | null -> "Host=localhost; Database=message_store; Port=5432; Username=postgres; Password=postgres" | s -> s +let decider categoryName id = + let client = Equinox.MessageDb.MessageDbClient(ConnectionString) + let ctx = Equinox.MessageDb.MessageDbContext(client) + let category = Equinox.MessageDb.MessageDbCategory(ctx, Simple.codec, Simple.fold, Simple.initial) + Equinox.Decider.resolve Serilog.Log.Logger category categoryName (Equinox.StreamId.gen string id) -let connect () = task { - let conn = new NpgsqlConnection(ConnectionString) - do! conn.OpenAsync() - return conn -} - -let writeMessagesToStream (conn: NpgsqlConnection) streamName = task { - let batch = conn.CreateBatch() - for _ in 1..20 do - let cmd = createStreamMessage streamName - batch.BatchCommands.Add(cmd) - do! 
batch.ExecuteNonQueryAsync() :> Task }
-
-let writeMessagesToCategory conn category = task {
+let writeMessagesToCategory category = task {
     for _ in 1..50 do
-        let streamName = $"{category}-{Guid.NewGuid():N}"
-        do! writeMessagesToStream conn streamName
-}
+        let streamId = Guid.NewGuid().ToString("N")
+        let decider = decider category streamId
+        let decide _ = List.replicate 20 (Simple.Event.Hello { name = "world" })
+        do! decider.Transact(decide, load = Equinox.LoadOption.AssumeEmpty) }
 
 let stats log = { new Propulsion.Streams.Stats<_>(log, TimeSpan.FromMinutes 1, TimeSpan.FromMinutes 1) with
                     member _.HandleOk x = ()
                     member _.HandleExn(log, x) = () }
 
 let makeCheckpoints consumerGroup = task {
     let checkpoints = ReaderCheckpoint.CheckpointStore(CheckpointConnectionString, "public", consumerGroup, TimeSpan.FromSeconds 10)
     do! checkpoints.CreateSchemaIfNotExists()
     return checkpoints }
 
-[<Fact>]
-let ``It processes events for a category`` () = task {
-    use! conn = connect ()
-    let log = Serilog.Log.Logger
-    let consumerGroup = $"{Guid.NewGuid():N}"
-    let category1 = $"{Guid.NewGuid():N}"
-    let category2 = $"{Guid.NewGuid():N}"
-    do! writeMessagesToCategory conn category1
-    do! writeMessagesToCategory conn category2
-    let! checkpoints = makeCheckpoints consumerGroup
-    let stats = stats log
-    let mutable stop = ignore
-    let handled = HashSet<_>()
-    let handle stream (events: Propulsion.Sinks.Event[]) _ct = task {
-        lock handled (fun _ ->
-            for evt in events do
-                handled.Add((stream, evt.Index)) |> ignore)
-        test <@ Array.chooseV Simple.codec.TryDecode events |> Array.forall ((=) (Simple.Hello { name = "world" })) @>
-        if handled.Count >= 2000 then
-            stop ()
-        return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) }
-    use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 2, 2, handle, stats)
-    let source = MessageDbSource(
-        log, TimeSpan.FromMinutes 1,
-        ConnectionString, 1000, TimeSpan.FromMilliseconds 100,
-        checkpoints, sink, [| category1; category2 |])
-    use src = source.Start()
-    stop <- src.Stop
-
-    Task.Delay(TimeSpan.FromSeconds 30).ContinueWith(fun _ -> src.Stop()) |> ignore
-
-    do! src.Await()
-
-    // 2000 total events
-    test <@ handled.Count = 2000 @>
-    // 20 in each stream
-    test <@ handled |> Array.ofSeq |> Array.groupBy fst |> Array.map (snd >> Array.length) |> Array.forall ((=) 20) @>
-    // they were handled in order within streams
-    let ordering = handled |> Seq.groupBy fst |> Seq.map (snd >> Seq.map snd >> Seq.toArray) |> Seq.toArray
-    test <@ ordering |> Array.forall ((=) [| 0L..19L |]) @> }
 
 type ActivityCapture() =
     let operations = ResizeArray()
     let listener =
         let l = new ActivityListener(ShouldListenTo = (fun s -> s.Name = "Npgsql"),
                                      Sample = (fun _ -> ActivitySamplingResult.AllDataAndRecorded),
                                      ActivityStopped = (fun act -> operations.Add(act)))
         ActivitySource.AddActivityListener(l)
         l
 
     member _.Operations = operations
 
     interface IDisposable with
         member _.Dispose() = listener.Dispose()
 
-[<Fact>]
-let ``It doesn't read the tail event again`` () = task {
-    let log = Serilog.LoggerConfiguration().CreateLogger()
-    let consumerGroup = $"{Guid.NewGuid():N}"
-    let category = $"{Guid.NewGuid():N}"
-    use! conn = connect ()
-    do! writeMessagesToStream conn $"{category}-1"
-    let! checkpoints = makeCheckpoints consumerGroup
-    let stats = stats log
 
+type Tests() =
+    let sdk =
+        Sdk.CreateTracerProviderBuilder()
+            .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(serviceName = "Propulsion.MessageDb.Integration"))
+            .AddSource("Equinox")
+            .AddSource("Equinox.MessageDb")
+            .AddSource("Propulsion")
+            .AddSource("Propulsion.MessageDb.Integration")
+            .AddSource("Npqsl")
+            .AddOtlpExporter(fun opts -> opts.Endpoint <- Uri("http://localhost:4317"))
+            .AddConsoleExporter()
+            .Build()
+
+    [<Fact>]
+    let ``It processes events for a category`` () = task {
+        use _ = source.StartActivity("It processes events for a category", ActivityKind.Server)
+        let log = Serilog.Log.Logger
+        let consumerGroup = $"{Guid.NewGuid():N}"
+        let category1 = $"{Guid.NewGuid():N}"
+        let category2 = $"{Guid.NewGuid():N}"
+        do! writeMessagesToCategory category1
+        do! writeMessagesToCategory category2
+        let! checkpoints = makeCheckpoints consumerGroup
+        let stats = stats log
+        let mutable stop = ignore
+        let handled = HashSet<_>()
+        let handle stream (events: Propulsion.Sinks.Event[]) _ct = task {
+            lock handled (fun _ -> for evt in events do handled.Add((stream, evt.Index)) |> ignore)
+            test <@ Array.chooseV Simple.codec.TryDecode events |> Array.forall ((=) (Simple.Hello { name = "world" })) @>
+            if handled.Count >= 2000 then stop ()
+            return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) }
+        use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 2, 2, handle, stats)
+        let source = MessageDbSource(
+            log, TimeSpan.FromMinutes 1,
+            ConnectionString, 1000, TimeSpan.FromMilliseconds 100,
+            checkpoints, sink, [| category1; category2 |])
+        use src = source.Start()
+        stop <- src.Stop
+
+        Task.Delay(TimeSpan.FromSeconds 30).ContinueWith(fun _ -> src.Stop()) |> ignore
+
+        do! src.Await()
+
+        // 2000 total events
+        test <@ handled.Count = 2000 @>
+        // 20 in each stream
+        test <@ handled |> Array.ofSeq |> Array.groupBy fst |> Array.map (snd >> Array.length) |> Array.forall ((=) 20) @>
+        // they were handled in order within streams
+        let ordering = handled |> Seq.groupBy fst |> Seq.map (snd >> Seq.map snd >> Seq.toArray) |> Seq.toArray
+        test <@ ordering |> Array.forall ((=) [| 0L..19L |]) @> }
+
+    [<Fact>]
+    let ``It doesn't read the tail event again`` () = task {
+        use _ = source.StartActivity("It doesn't read the tail event again", ActivityKind.Server)
+        let log = Serilog.LoggerConfiguration().CreateLogger()
+        let consumerGroup = $"{Guid.NewGuid():N}"
+        let category = $"{Guid.NewGuid():N}"
+        let decider = decider category "1"
+        do! decider.Transact((fun _ -> List.replicate 20 (Simple.Hello { name = "world" })), load = Equinox.LoadOption.AssumeEmpty)
+        let! checkpoints = makeCheckpoints consumerGroup
+
+        let stats = stats log
+
+        let handle _ _ _ = task {
+            return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) }
+        use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 1, 1, handle, stats)
+        let batchSize = 10
+        let source = MessageDbSource(
+            log, TimeSpan.FromMilliseconds 1000,
+            ConnectionString, batchSize, TimeSpan.FromMilliseconds 10000,
+            checkpoints, sink, [| category |])
+
+        use capture = new ActivityCapture()
+
+        do! 
source.RunUntilCaughtUp(TimeSpan.FromSeconds(10), stats.StatsInterval) :> Task + + // 3 batches fetched, 1 checkpoint read, and 1 checkpoint write + test <@ capture.Operations.Count = 5 @> } - let handle _ _ _ = task { - return struct (Propulsion.Sinks.StreamResult.AllProcessed, ()) } - use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 1, 1, handle, stats) - let batchSize = 10 - let source = MessageDbSource( - log, TimeSpan.FromMilliseconds 1000, - ConnectionString, batchSize, TimeSpan.FromMilliseconds 1000, - checkpoints, sink, [| category |]) - - use capture = new ActivityCapture() - - do! source.RunUntilCaughtUp(TimeSpan.FromSeconds(10), stats.StatsInterval) :> Task - - // 3 batches fetched, 1 checkpoint read, and 1 checkpoint write - test <@ capture.Operations.Count = 5 @> } + interface IDisposable with + member _.Dispose() = sdk.Shutdown() |> ignore From 9f6429d6218e860bc0eadd4315dfa597d9120ae9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sun, 11 Jun 2023 22:01:44 -0400 Subject: [PATCH 2/6] fixes --- src/Propulsion/Streams.fs | 11 ++++++----- tests/Propulsion.MessageDb.Integration/Tests.fs | 4 ++-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs index 73595fbf..ebb8469c 100755 --- a/src/Propulsion/Streams.fs +++ b/src/Propulsion/Streams.fs @@ -865,16 +865,17 @@ module Dispatcher = if act <> null then let struct(category, streamId) = FsCodec.StreamName.splitCategoryAndStreamId item.stream act.DisplayName <- $"{category} process" - act.SetTag("eqx.stream_name", item.stream) - .SetTag("eqx.stream_id", streamId) - .SetTag("eqx.category", category) - .SetTag("eqx.timestamp", item.span[0].Timestamp) + act.SetTag("propulsion.stream_name", item.stream) + .SetTag("propulsion.stream_id", streamId) + .SetTag("propulsion.category", category) + .SetTag("propulsion.batch_size", item.span.Length) + .SetTag("propulsion.first_timestamp", item.span[0].Timestamp) |> ignore let! 
struct (progressed, res) = project item.stream item.span ct let elapsed = Stopwatch.elapsed startTs if act <> null then let oldestItemTs = item.span[0].Timestamp - act.SetTag("eqx.lead_time_ms", (DateTimeOffset.UtcNow - oldestItemTs).TotalMilliseconds) |> ignore + act.SetTag("propulsion.lead_time_ms", (DateTimeOffset.UtcNow - oldestItemTs).TotalMilliseconds) |> ignore return struct (elapsed, item.stream, progressed, res) } Concurrent<_, _, _, _>(ItemDispatcher(maxDop), project, interpretProgress) static member Create(maxDop, prepare : Func<_, _, _>, handle : Func<_, _, CancellationToken, Task<_>>, toIndex : Func<_, 'R, int64>) = diff --git a/tests/Propulsion.MessageDb.Integration/Tests.fs b/tests/Propulsion.MessageDb.Integration/Tests.fs index 16dd60e3..35a6d439 100644 --- a/tests/Propulsion.MessageDb.Integration/Tests.fs +++ b/tests/Propulsion.MessageDb.Integration/Tests.fs @@ -74,12 +74,12 @@ type ActivityCapture() = type Tests() = let sdk = Sdk.CreateTracerProviderBuilder() - .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(serviceName = "Propulsion.MessageDb.Integration")) + .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(serviceName = "Tests")) .AddSource("Equinox") .AddSource("Equinox.MessageDb") .AddSource("Propulsion") .AddSource("Propulsion.MessageDb.Integration") - .AddSource("Npqsl") + .AddSource("Npgsql") .AddOtlpExporter(fun opts -> opts.Endpoint <- Uri("http://localhost:4317")) .AddConsoleExporter() .Build() From df3c9972708ff49fee2f8d48cd87c918868da848 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Sun, 11 Jun 2023 22:08:08 -0400 Subject: [PATCH 3/6] docs --- DOCUMENTATION.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index 43696094..825deb9e 100755 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -482,3 +482,20 @@ Resources: - [low level documentation of the client settings](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) - [thorough free book](https://www.confluent.io/wp-content/uploads/confluent-kafka-definitive-guide-complete.pdf) - [medium post covering some high level structures that Jet explored in this space](https://medium.com/@eulerfx/scaling-event-sourcing-at-jet-9c873cac33b8). + + +# Telemetry + +Propulsion emits OpenTelemetry spans for stream processing. 
All span attributes are prefixed with the `propulsion.`
+namespace.
+
+## {category} process
+
+| Attribute         | Description                                                                                                        |
+|-------------------|--------------------------------------------------------------------------------------------------------------------|
+| `category`        | The category being processed                                                                                       |
+| `stream_name`     | The full stream name being processed                                                                               |
+| `stream_id`       | The id of the stream being processed                                                                               |
+| `batch_size`      | The size of the batch being processed                                                                              |
+| `first_timestamp` | The receive timestamp of the first event in the batch being handled                                                |
+| `lead_time_ms`    | The [lead time](https://www.merriam-webster.com/dictionary/lead%20time) in milliseconds for processing the batch   |

From 68403f4c12f955598eb7eeb3e5eba22664abca0b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?=
Date: Sun, 11 Jun 2023 23:07:25 -0400
Subject: [PATCH 4/6] expose consumer group name from checkpoint store to
 enable using it as a tag on metrics in future

---
 src/Propulsion.CosmosStore/ReaderCheckpoint.fs    | 1 +
 src/Propulsion.Feed/FeedSource.fs                 | 4 +---
 src/Propulsion.MessageDb/ReaderCheckpoint.fs      | 1 +
 src/Propulsion.SqlStreamStore/ReaderCheckpoint.fs | 1 +
 src/Propulsion/Feed.fs                            | 1 +
 5 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/Propulsion.CosmosStore/ReaderCheckpoint.fs b/src/Propulsion.CosmosStore/ReaderCheckpoint.fs
index 9c8e88ce..e1c10439 100644
--- a/src/Propulsion.CosmosStore/ReaderCheckpoint.fs
+++ b/src/Propulsion.CosmosStore/ReaderCheckpoint.fs
@@ -123,6 +123,7 @@ type Decider<'e, 's> = Equinox.Decider<'e, 's>
 type Service internal (resolve : SourceId * TrancheId * string -> Decider<Events.Event, Fold.State>, consumerGroupName, defaultCheckpointFrequency) =
 
     interface IFeedCheckpointStore with
+        member _.ConsumerGroupName = consumerGroupName
 
         /// Start a checkpointing series with the supplied parameters
         /// Yields the checkpoint interval and the starting position
diff --git a/src/Propulsion.Feed/FeedSource.fs b/src/Propulsion.Feed/FeedSource.fs
index c6998aa9..4dfdf83c 100644
--- a/src/Propulsion.Feed/FeedSource.fs
+++ b/src/Propulsion.Feed/FeedSource.fs
@@ -269,9 +269,7 @@ type TailingFeedSource
     let crawl trancheId (wasLast, startPos) ct = taskSeq {
         if wasLast then do! Task.delay tailSleepInterval ct
-        try let batches = crawl.Invoke(trancheId, startPos, ct)
-            for batch in batches do
-                yield batch
+        try yield! crawl.Invoke(trancheId, startPos, ct)
         with e -> // Swallow (and sleep, if requested) if there's an issue reading from a tailing log
             match logReadFailure with None -> log.ForContext("tranche", trancheId).ForContext<TailingFeedSource>().Warning(e, "Read failure") | Some l -> l e
             match readFailureSleepInterval with None -> () | Some interval -> do! Task.delay interval ct }
diff --git a/src/Propulsion.MessageDb/ReaderCheckpoint.fs b/src/Propulsion.MessageDb/ReaderCheckpoint.fs
index 39f38fea..9d8daf31 100644
--- a/src/Propulsion.MessageDb/ReaderCheckpoint.fs
+++ b/src/Propulsion.MessageDb/ReaderCheckpoint.fs
@@ -61,6 +61,7 @@ type CheckpointStore(connString : string, schema : string, consumerGroupName, de
         setPos source tranche pos ct |> Task.ignore
 
     interface IFeedCheckpointStore with
+        member _.ConsumerGroupName = consumerGroupName
 
         member _.Start(source, tranche, establishOrigin, ct) =
             let start conn ct = task {
diff --git a/src/Propulsion.SqlStreamStore/ReaderCheckpoint.fs b/src/Propulsion.SqlStreamStore/ReaderCheckpoint.fs
index 9ca8283a..a2b8bcfa 100644
--- a/src/Propulsion.SqlStreamStore/ReaderCheckpoint.fs
+++ b/src/Propulsion.SqlStreamStore/ReaderCheckpoint.fs
@@ -60,6 +60,7 @@ type Service(connString : string, consumerGroupName, defaultCheckpointFrequency)
         return! createIfNotExists conn }
 
     interface IFeedCheckpointStore with
+        member _.ConsumerGroupName = consumerGroupName
 
         member _.Start(source, tranche, establishOrigin, ct) = task {
             use conn = createConnection connString
diff --git a/src/Propulsion/Feed.fs b/src/Propulsion/Feed.fs
index d4f915fa..26642d87 100644
--- a/src/Propulsion/Feed.fs
+++ b/src/Propulsion/Feed.fs
@@ -31,3 +31,4 @@ type IFeedCheckpointStore =
     /// Determines the starting position, and checkpointing frequency for a given tranche
     abstract member Start: source: SourceId * tranche : TrancheId * establishOrigin : Func<CancellationToken, Task<Position>> option * ct : CancellationToken -> Task<struct (TimeSpan * Position)>
     abstract member Commit: source: SourceId * tranche : TrancheId * pos: Position * CancellationToken -> Task
+    abstract member ConsumerGroupName: string

From dbe2c7b2bf9a92103d036a50eb9b0d476de38cb9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3%B6r=C3=B0?=
Date: Sun, 11 Jun 2023 23:09:36 -0400
Subject: [PATCH 5/6] Revert "expose consumer group name from checkpoint store
 to enable using it as a tag on metrics in future"

This reverts commit 68403f4c12f955598eb7eeb3e5eba22664abca0b.
---
 src/Propulsion.CosmosStore/ReaderCheckpoint.fs    | 1 -
 src/Propulsion.Feed/FeedSource.fs                 | 4 +++-
 src/Propulsion.MessageDb/ReaderCheckpoint.fs      | 1 -
 src/Propulsion.SqlStreamStore/ReaderCheckpoint.fs | 1 -
 src/Propulsion/Feed.fs                            | 1 -
 5 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/src/Propulsion.CosmosStore/ReaderCheckpoint.fs b/src/Propulsion.CosmosStore/ReaderCheckpoint.fs
index e1c10439..9c8e88ce 100644
--- a/src/Propulsion.CosmosStore/ReaderCheckpoint.fs
+++ b/src/Propulsion.CosmosStore/ReaderCheckpoint.fs
@@ -123,7 +123,6 @@ type Decider<'e, 's> = Equinox.Decider<'e, 's>
 type Service internal (resolve : SourceId * TrancheId * string -> Decider<Events.Event, Fold.State>, consumerGroupName, defaultCheckpointFrequency) =
 
     interface IFeedCheckpointStore with
-        member _.ConsumerGroupName = consumerGroupName
 
         /// Start a checkpointing series with the supplied parameters
         /// Yields the checkpoint interval and the starting position
diff --git a/src/Propulsion.Feed/FeedSource.fs b/src/Propulsion.Feed/FeedSource.fs
index 4dfdf83c..c6998aa9 100644
--- a/src/Propulsion.Feed/FeedSource.fs
+++ b/src/Propulsion.Feed/FeedSource.fs
@@ -269,7 +269,9 @@ type TailingFeedSource
     let crawl trancheId (wasLast, startPos) ct = taskSeq {
         if wasLast then do! Task.delay tailSleepInterval ct
-        try yield! crawl.Invoke(trancheId, startPos, ct)
+        try let batches = crawl.Invoke(trancheId, startPos, ct)
+            for batch in batches do
+                yield batch
         with e -> // Swallow (and sleep, if requested) if there's an issue reading from a tailing log
             match logReadFailure with None -> log.ForContext("tranche", trancheId).ForContext<TailingFeedSource>().Warning(e, "Read failure") | Some l -> l e
             match readFailureSleepInterval with None -> () | Some interval -> do! Task.delay interval ct }
diff --git a/src/Propulsion.MessageDb/ReaderCheckpoint.fs b/src/Propulsion.MessageDb/ReaderCheckpoint.fs
index 9d8daf31..39f38fea 100644
--- a/src/Propulsion.MessageDb/ReaderCheckpoint.fs
+++ b/src/Propulsion.MessageDb/ReaderCheckpoint.fs
@@ -61,7 +61,6 @@ type CheckpointStore(connString : string, schema : string, consumerGroupName, de
         setPos source tranche pos ct |> Task.ignore
 
     interface IFeedCheckpointStore with
-        member _.ConsumerGroupName = consumerGroupName
 
         member _.Start(source, tranche, establishOrigin, ct) =
             let start conn ct = task {
diff --git a/src/Propulsion.SqlStreamStore/ReaderCheckpoint.fs b/src/Propulsion.SqlStreamStore/ReaderCheckpoint.fs
index a2b8bcfa..9ca8283a 100644
--- a/src/Propulsion.SqlStreamStore/ReaderCheckpoint.fs
+++ b/src/Propulsion.SqlStreamStore/ReaderCheckpoint.fs
@@ -60,7 +60,6 @@ type Service(connString : string, consumerGroupName, defaultCheckpointFrequency)
         return! createIfNotExists conn }
 
     interface IFeedCheckpointStore with
-        member _.ConsumerGroupName = consumerGroupName
 
         member _.Start(source, tranche, establishOrigin, ct) = task {
             use conn = createConnection connString
diff --git a/src/Propulsion/Feed.fs b/src/Propulsion/Feed.fs
index 26642d87..d4f915fa 100644
--- a/src/Propulsion/Feed.fs
+++ b/src/Propulsion/Feed.fs
@@ -31,4 +31,3 @@ type IFeedCheckpointStore =
     /// Determines the starting position, and checkpointing frequency for a given tranche
     abstract member Start: source: SourceId * tranche : TrancheId * establishOrigin : Func<CancellationToken, Task<Position>> option * ct : CancellationToken -> Task<struct (TimeSpan * Position)>
     abstract member Commit: source: SourceId * tranche : TrancheId * pos: Position * CancellationToken -> Task
-    abstract member ConsumerGroupName: string

From bce5a03b7dfd0742d2e16c22dc6206ec1e17d054 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?=
Date: Sun, 11 Jun 2023 23:10:12 -0400
Subject: [PATCH 6/6] simplify crawl

---
 src/Propulsion.Feed/FeedSource.fs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/src/Propulsion.Feed/FeedSource.fs b/src/Propulsion.Feed/FeedSource.fs
index c6998aa9..4dfdf83c 100644
--- a/src/Propulsion.Feed/FeedSource.fs
+++ b/src/Propulsion.Feed/FeedSource.fs
@@ -269,9 +269,7 @@ type TailingFeedSource
     let crawl trancheId (wasLast, startPos) ct = taskSeq {
         if wasLast then do! Task.delay tailSleepInterval ct
-        try let batches = crawl.Invoke(trancheId, startPos, ct)
-            for batch in batches do
-                yield batch
+        try yield! crawl.Invoke(trancheId, startPos, ct)
         with e -> // Swallow (and sleep, if requested) if there's an issue reading from a tailing log
             match logReadFailure with None -> log.ForContext("tranche", trancheId).ForContext<TailingFeedSource>().Warning(e, "Read failure") | Some l -> l e
             match readFailureSleepInterval with None -> () | Some interval -> do! Task.delay interval ct }
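
A note on consuming the telemetry this series adds: the dispatcher's `{category} process` spans only reach an exporter once the host wires up an OpenTelemetry `TracerProvider` subscribed to the `Propulsion` `ActivitySource`. A minimal sketch of such wiring, mirroring what the integration tests above do; the service name and OTLP endpoint are illustrative assumptions, not values mandated by the series:

```fsharp
open System
open OpenTelemetry
open OpenTelemetry.Resources
open OpenTelemetry.Trace

// Subscribes to the "Propulsion" ActivitySource registered in Streams.fs, so the
// "{category} process" spans (with their propulsion.* tags) get exported.
let tracerProvider =
    Sdk.CreateTracerProviderBuilder()
        .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(serviceName = "MyConsumer")) // assumed service name
        .AddSource("Propulsion")
        .AddOtlpExporter(fun opts -> opts.Endpoint <- Uri("http://localhost:4317")) // assumed collector endpoint
        .Build()
```

Disposing the provider (or calling `Shutdown()`, as the tests' `Dispose` does) flushes any pending spans on host exit.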