Skip to content

Commit

Permalink
Update to Propulsion 2.12.0-rc.2 (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Dec 20, 2021
1 parent f5016b6 commit 43f1bd1
Show file tree
Hide file tree
Showing 22 changed files with 110 additions and 97 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Changed

- Remove usage of `type Command` DUs [#103](https://github.com/jet/dotnet-templates/pull/103)
- Target `Equinox` v `3.0.5`
- Target `Equinox` v `3.0.5`, `Propulsion` v `2.12.0-rc.2`
- Target `Destructurama.FSharp` v `1.2.0`, `Serilog.Sinks.Async` v `1.5.0`, `Serilog.Sinks.Console` v `4.0.0` [#101](https://github.com/jet/dotnet-templates/pull/101)

### Removed
### Fixed
Expand Down
2 changes: 1 addition & 1 deletion equinox-shipping/Watchdog.Integration/ReactorFixture.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type MemoryReactorFixture(testOutput) =
let processManager = Shipping.Domain.FinalizationWorkflow.Config.create 4 store.Config
let sink =
let processingTimeout = TimeSpan.FromSeconds 1.
let maxReadAhead, maxConcurrentStreams = 1024, 4 // TODO make Int32.MaxValue work
let maxReadAhead, maxConcurrentStreams = Int32.MaxValue, 4
Program.startWatchdog log (processingTimeout, stats) (maxReadAhead, maxConcurrentStreams) processManager.Pump
let projector = MemoryStoreProjector.Start(log, sink)
let projectorStoreSubscription =
Expand Down
21 changes: 11 additions & 10 deletions equinox-shipping/Watchdog/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -123,17 +123,18 @@ let build (args : Args.Arguments) =
let processManager = Shipping.Domain.FinalizationWorkflow.Config.create args.ProcessManagerMaxDop storeCfg
let stats = Handler.Stats(Log.Logger, statsInterval=args.StatsInterval, stateInterval=args.StateInterval)
startWatchdog Log.Logger (args.ProcessingTimeout, stats) (maxReadAhead, maxConcurrentStreams) processManager.Pump
let pipeline =
use observer = CosmosStoreSource.CreateObserver(Log.Logger, sink.StartIngester, Seq.collect Handler.transformOrFilter)
let source =
let observer = CosmosStoreSource.CreateObserver(Log.Logger, sink.StartIngester, Seq.collect Handler.transformOrFilter)
let leases, startFromTail, maxItems, lagFrequency = args.Cosmos.MonitoringParams()
CosmosStoreSource.Run(Log.Logger, monitored, leases, processorName, observer, startFromTail, ?maxItems=maxItems, lagReportFreq=lagFrequency)
sink, pipeline

let run args = async {
let sink, pipeline = build args
pipeline |> Async.Start
return! sink.AwaitWithStopOnCancellation()
}
CosmosStoreSource.Start(Log.Logger, monitored, leases, processorName, observer, startFromTail, ?maxItems=maxItems, lagReportFreq=lagFrequency)
sink, source

open Propulsion.CosmosStore.Infrastructure // AwaitKeyboardInterruptAsTaskCancelledException

let run args =
let sink, source = build args
[ Async.AwaitKeyboardInterruptAsTaskCancelledException(); source.AwaitWithStopOnCancellation(); sink.AwaitWithStopOnCancellation() ]
|> Async.Parallel |> Async.Ignore<unit[]>

[<EntryPoint>]
let main argv =
Expand Down
2 changes: 1 addition & 1 deletion equinox-shipping/Watchdog/Watchdog.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<ItemGroup>
<PackageReference Include="Argu" Version="6.1.1" />
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.11.0" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.12.0-rc.2" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
</ItemGroup>

Expand Down
4 changes: 2 additions & 2 deletions feed-consumer/FeedConsumer.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="Equinox.CosmosStore" Version="3.0.5" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="3.0.5" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.11.0" />
<PackageReference Include="Propulsion.Feed" Version="2.11.0" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.12.0-rc.2" />
<PackageReference Include="Propulsion.Feed" Version="2.12.0-rc.2" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.5.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion periodic-ingester/ApiClient.fs
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ type TicketsFeed(baseUri) =
let tickets = TicketsClient(client)

// TODO add retries - consumer loop will abort if this throws
member _.Crawl(): AsyncSeq<Propulsion.Feed.SourceItem[]> =
member _.Crawl(_trancheId): AsyncSeq<Propulsion.Feed.SourceItem[]> =
tickets.Crawl()
4 changes: 2 additions & 2 deletions periodic-ingester/PeriodicIngester.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="3.0.5" />
<PackageReference Include="prometheus-net.AspNetCore" Version="3.6.0" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.11.0" />
<PackageReference Include="Propulsion.Feed" Version="2.11.0" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.12.0-rc.2" />
<PackageReference Include="Propulsion.Feed" Version="2.12.0-rc.2" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.5.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion propulsion-archiver/Archiver.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<PackageReference Include="Argu" Version="6.1.1" />
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="3.0.5" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.11.0" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.12.0-rc.2" />
<PackageReference Include="prometheus-net.AspNetCore" Version="3.6.0" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.5.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
Expand Down
15 changes: 8 additions & 7 deletions propulsion-archiver/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,11 @@ let build (args : Args.Arguments, log) =
let context = args.DestinationArchive.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create
let eventsContext = Equinox.CosmosStore.Core.EventsContext(context, Config.log)
CosmosStoreSink.Start(log, args.MaxReadAhead, eventsContext, args.MaxWriters, args.StatsInterval, args.StateInterval, (*purgeInterval=TimeSpan.FromMinutes 10.,*) maxBytes = args.MaxBytes)
let pipeline =
use observer = CosmosStoreSource.CreateObserver(log, archiverSink.StartIngester, Seq.collect Handler.selectArchivable)
let source =
let observer = CosmosStoreSource.CreateObserver(log, archiverSink.StartIngester, Seq.collect Handler.selectArchivable)
let monitored, leases, processorName, startFromTail, maxItems, lagFrequency = args.MonitoringParams()
CosmosStoreSource.Run(log, monitored, leases, processorName, observer, startFromTail, ?maxItems=maxItems, lagReportFreq=lagFrequency)
archiverSink, pipeline
CosmosStoreSource.Start(log, monitored, leases, processorName, observer, startFromTail, ?maxItems=maxItems, lagReportFreq=lagFrequency)
archiverSink, source

// A typical app will likely have health checks etc, implying the wireup would be via `endpoints.MapMetrics()` and thus not use this ugly code directly
let startMetricsServer port : IDisposable =
Expand All @@ -197,12 +197,13 @@ let startMetricsServer port : IDisposable =
Log.Information("Prometheus /metrics endpoint on port {port}", port)
{ new IDisposable with member x.Dispose() = ms.Stop(); (metricsServer :> IDisposable).Dispose() }

open Propulsion.CosmosStore.Infrastructure // AwaitKeyboardInterruptAsTaskCancelledException

let run args = async {
let log = Log.ForContext<Propulsion.Streams.Scheduling.StreamSchedulingEngine>()
let sink, pipeline = build (args, log)
pipeline |> Async.Start
let sink, source = build (args, log)
use _metricsServer : IDisposable = args.PrometheusPort |> Option.map startMetricsServer |> Option.toObj
do! sink.AwaitWithStopOnCancellation()
return! Async.Parallel [ Async.AwaitKeyboardInterruptAsTaskCancelledException(); source.AwaitWithStopOnCancellation(); sink.AwaitWithStopOnCancellation() ] |> Async.Ignore<unit[]>
}

[<EntryPoint>]
Expand Down
2 changes: 1 addition & 1 deletion propulsion-consumer/Consumer.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<ItemGroup>
<PackageReference Include="Argu" Version="6.1.1" />
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="Propulsion.Kafka" Version="2.11.0" />
<PackageReference Include="Propulsion.Kafka" Version="2.12.0-rc.2" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
</ItemGroup>

Expand Down
15 changes: 8 additions & 7 deletions propulsion-cosmos-reactor/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,12 @@ let build (args : Args.Arguments) =
let stats = Reactor.Stats(Log.Logger, args.StatsInterval, args.StateInterval)
let handle = Reactor.Config.createHandler store
Propulsion.Streams.StreamsProjector.Start(Log.Logger, maxReadAhead, maxConcurrentStreams, handle, stats, args.StatsInterval)
let pipeline =
let source =
let parseFeedDoc : _ -> Propulsion.Streams.StreamEvent<_> seq = Seq.collect Propulsion.CosmosStore.EquinoxNewtonsoftParser.enumStreamEvents
use observer = Propulsion.CosmosStore.CosmosStoreSource.CreateObserver(Log.Logger, sink.StartIngester, parseFeedDoc)
let observer = Propulsion.CosmosStore.CosmosStoreSource.CreateObserver(Log.Logger, sink.StartIngester, parseFeedDoc)
let leases, startFromTail, maxItems, lagFrequency = args.Cosmos.MonitoringParams()
Propulsion.CosmosStore.CosmosStoreSource.Run(Log.Logger, monitored, leases, processorName, observer, startFromTail, ?maxItems=maxItems, lagReportFreq=lagFrequency)
sink, pipeline
Propulsion.CosmosStore.CosmosStoreSource.Start(Log.Logger, monitored, leases, processorName, observer, startFromTail, ?maxItems=maxItems, lagReportFreq=lagFrequency)
sink, source

// A typical app will likely have health checks etc, implying the wireup would be via `endpoints.MapMetrics()` and thus not use this ugly code directly
let startMetricsServer port : IDisposable =
Expand All @@ -133,11 +133,12 @@ let startMetricsServer port : IDisposable =
Log.Information("Prometheus /metrics endpoint on port {port}", port)
{ new IDisposable with member x.Dispose() = ms.Stop(); (metricsServer :> IDisposable).Dispose() }

open Propulsion.CosmosStore.Infrastructure // AwaitKeyboardInterruptAsTaskCancelledException

let run args = async {
let sink, pipeline = build args
pipeline |> Async.Start
let sink, source = build args
use _metricsServer : IDisposable = args.PrometheusPort |> Option.map startMetricsServer |> Option.toObj
return! sink.AwaitWithStopOnCancellation()
return! Async.Parallel [ Async.AwaitKeyboardInterruptAsTaskCancelledException(); source.AwaitWithStopOnCancellation(); sink.AwaitWithStopOnCancellation() ] |> Async.Ignore<unit[]>
}

[<EntryPoint>]
Expand Down
2 changes: 1 addition & 1 deletion propulsion-cosmos-reactor/Reactor.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="3.0.5" />
<PackageReference Include="prometheus-net.AspNetCore" Version="3.6.0" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.11.0" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.12.0-rc.2" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.5.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
</ItemGroup>
Expand Down
45 changes: 25 additions & 20 deletions propulsion-projector/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,6 @@ module Args =
#endif
//#if sss
member val SqlStreamStore = SqlStreamStoreSourceArguments(c, a.GetResult SqlMs)
member x.BuildSqlStreamStoreParams() =
let src = x.SqlStreamStore
let spec : Propulsion.SqlStreamStore.ReaderSpec =
{ consumerGroup = x.ProcessorName
maxBatchSize = src.MaxBatchSize
tailSleepInterval = src.TailInterval }
src, spec
//#endif
//#if kafka
member val Target = TargetInfo (c, a)
Expand Down Expand Up @@ -369,6 +362,10 @@ module Checkpoints =
//#endif // esdb
let [<Literal>] AppName = "ProjectorTemplate"

#if cosmos // cosmos
open Propulsion.CosmosStore.Infrastructure // AwaitKeyboardInterruptAsTaskCancelledException

#endif
let build (args : Args.Arguments) =
let maxReadAhead, maxConcurrentStreams = args.ProcessorParams()
#if cosmos // cosmos
Expand All @@ -385,10 +382,11 @@ let build (args : Args.Arguments) =
let stats = Handler.Stats(Log.Logger, args.StatsInterval, args.StateInterval)
let sink = Propulsion.Streams.StreamsProjector.Start(Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.handle, stats, args.StatsInterval)
#endif // cosmos && !kafka
let pipeline =
use observer = Propulsion.CosmosStore.CosmosStoreSource.CreateObserver(Log.Logger, sink.StartIngester, Handler.mapToStreamItems)
let source =
let observer = Propulsion.CosmosStore.CosmosStoreSource.CreateObserver(Log.Logger, sink.StartIngester, Handler.mapToStreamItems)
let monitored, leases, processorName, startFromTail, maxItems, lagFrequency = args.MonitoringParams()
Propulsion.CosmosStore.CosmosStoreSource.Run(Log.Logger, monitored, leases, processorName, observer, startFromTail, ?maxItems=maxItems, lagReportFreq=lagFrequency)
Propulsion.CosmosStore.CosmosStoreSource.Start(Log.Logger, monitored, leases, processorName, observer, startFromTail, ?maxItems=maxItems, lagReportFreq=lagFrequency)
[ Async.AwaitKeyboardInterruptAsTaskCancelledException(); source.AwaitWithStopOnCancellation(); sink.AwaitWithStopOnCancellation() ]
#endif // cosmos
#if esdb
let srcE, context, spec = args.BuildEventStoreParams()
Expand All @@ -407,19 +405,20 @@ let build (args : Args.Arguments) =
let stats = Handler.Stats(Log.Logger, args.StatsInterval, args.StateInterval)
let sink = Propulsion.Streams.StreamsProjector.Start(Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.handle, stats, args.StatsInterval)
#endif // esdb && !kafka
let pipeline =
let pumpSource =
let filterByStreamName _ = true // see `dotnet new proReactor --filter` for an example of how to rig filtering arguments
Propulsion.EventStore.EventStoreSource.Run(
Log.Logger, sink, checkpoints, connectEs, spec, Handler.tryMapEvent filterByStreamName,
maxReadAhead, args.StatsInterval)
[ pumpSource; sink.AwaitWithStopOnCancellation() ]
#endif // esdb
//#if sss
let srcSql, spec = args.BuildSqlStreamStoreParams()
let srcSql = args.SqlStreamStore

let monitored = srcSql.Connect()

let connectionString = srcSql.BuildCheckpointsConnectionString()
let checkpointer = Propulsion.SqlStreamStore.SqlCheckpointer(connectionString)
let checkpoints = Propulsion.SqlStreamStore.ReaderCheckpoint.Service(connectionString)

#if kafka // sss && kafka
let broker, topic = args.Target.BuildTargetParams()
Expand All @@ -430,15 +429,21 @@ let build (args : Args.Arguments) =
let stats = Handler.Stats(Log.Logger, args.StatsInterval, args.StateInterval)
let sink = Propulsion.Streams.StreamsProjector.Start(Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.handle, stats, args.StatsInterval)
#endif // sss && !kafka
let pipeline = Propulsion.SqlStreamStore.SqlStreamStoreSource.Run(Log.Logger, monitored, checkpointer, spec, sink, args.StatsInterval)
let pumpSource =
let sourceId = Propulsion.Feed.SourceId.parse "default" // (was hardwired as '$all' prior to v 2.12.0)
let checkpointEventInterval = TimeSpan.FromHours 1. // Ignored when storing to Propulsion.SqlStreamStore.ReaderCheckpoint; relevant for Cosmos
let source =
Propulsion.SqlStreamStore.SqlStreamStoreSource
( Log.Logger, args.StatsInterval,
sourceId, srcSql.MaxBatchSize, srcSql.TailInterval,
checkpoints, checkpointEventInterval,
monitored, sink)
source.Pump(args.ProcessorName)
[ pumpSource; sink.AwaitWithStopOnCancellation() ]
//#endif // sss
sink, pipeline

let run args = async {
let sink, pipeline = build args
pipeline |> Async.Start
do! sink.AwaitWithStopOnCancellation()
}
let run args =
build args |> Async.Parallel |> Async.Ignore<unit[]>

[<EntryPoint>]
let main argv =
Expand Down
8 changes: 4 additions & 4 deletions propulsion-projector/Projector.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@
<PackageReference Include="Equinox.CosmosStore" Version="3.0.5" />
<!--#endif-->
<!--#if cosmos-->
<PackageReference Include="Propulsion.CosmosStore" Version="2.11.0" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.12.0-rc.2" />
<!--#endif-->
<!--#if (esdb)-->
<PackageReference Include="Propulsion.EventStore" Version="2.11.0" />
<PackageReference Include="Propulsion.EventStore" Version="2.12.0-rc.2" />
<!--#endif-->
<!--#if (sss)-->
<PackageReference Include="Equinox.SqlStreamStore.MsSql" Version="3.0.5" />
<PackageReference Include="Propulsion.SqlStreamStore" Version="2.11.0" />
<PackageReference Include="Propulsion.SqlStreamStore" Version="2.12.0-rc.2" />
<!--#endif-->
<!--#if kafka-->
<PackageReference Include="Propulsion.Kafka" Version="2.11.0" />
<PackageReference Include="Propulsion.Kafka" Version="2.12.0-rc.2" />
<!--#endif-->
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
</ItemGroup>
Expand Down
15 changes: 8 additions & 7 deletions propulsion-pruner/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,11 @@ let build (args : Args.Arguments, log : ILogger) =
let context = target.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create
let eventsContext = Equinox.CosmosStore.Core.EventsContext(context, Config.log)
CosmosStorePruner.Start(Log.Logger, args.MaxReadAhead, eventsContext, args.MaxWriters, args.StatsInterval, args.StateInterval)
let pipeline =
use observer = CosmosStoreSource.CreateObserver(log.ForContext<CosmosStoreSource>(), deletingEventsSink.StartIngester, Seq.collect Handler.selectPrunable)
let source =
let observer = CosmosStoreSource.CreateObserver(log.ForContext<CosmosStoreSource>(), deletingEventsSink.StartIngester, Seq.collect Handler.selectPrunable)
let monitored, leases, processorName, startFromTail, maxItems, lagFrequency = args.MonitoringParams()
CosmosStoreSource.Run(log, monitored, leases, processorName, observer, startFromTail, ?maxItems=maxItems, lagReportFreq=lagFrequency)
deletingEventsSink, pipeline
CosmosStoreSource.Start(log, monitored, leases, processorName, observer, startFromTail, ?maxItems=maxItems, lagReportFreq=lagFrequency)
deletingEventsSink, source

// A typical app will likely have health checks etc, implying the wireup would be via `endpoints.MapMetrics()` and thus not use this ugly code directly
let startMetricsServer port : IDisposable =
Expand All @@ -186,12 +186,13 @@ let startMetricsServer port : IDisposable =
Log.Information("Prometheus /metrics endpoint on port {port}", port)
{ new IDisposable with member x.Dispose() = ms.Stop(); (metricsServer :> IDisposable).Dispose() }

open Propulsion.CosmosStore.Infrastructure // AwaitKeyboardInterruptAsTaskCancelledException

let run args = async {
let log = Log.ForContext<Propulsion.Streams.Scheduling.StreamSchedulingEngine>()
let sink, pipeline = build (args, log)
pipeline |> Async.Start
let sink, source = build (args, log)
use _metricsServer : IDisposable = args.PrometheusPort |> Option.map startMetricsServer |> Option.toObj
do! sink.AwaitWithStopOnCancellation()
return! Async.Parallel [ Async.AwaitKeyboardInterruptAsTaskCancelledException(); source.AwaitWithStopOnCancellation(); sink.AwaitWithStopOnCancellation() ] |> Async.Ignore<unit[]>
}

[<EntryPoint>]
Expand Down
2 changes: 1 addition & 1 deletion propulsion-pruner/Pruner.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="3.0.5" />
<PackageReference Include="prometheus-net.AspNetCore" Version="3.6.0" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.11.0" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.12.0-rc.2" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.5.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
<PackageReference Include="Serilog.Sinks.Seq" Version="4.0.0" />
Expand Down
Loading

0 comments on commit 43f1bd1

Please sign in to comment.