diff --git a/propulsion-projector/Handler.fs b/propulsion-projector/Handler.fs index d7d8b1a0f..727800802 100644 --- a/propulsion-projector/Handler.fs +++ b/propulsion-projector/Handler.fs @@ -85,7 +85,7 @@ let render (stream : FsCodec.StreamName, span : Propulsion.Streams.StreamSpan<_> #else // !kafka // Each outcome from `handle` is passed to `HandleOk` or `HandleExn` by the scheduler, DumpStats is called at `statsInterval` // The incoming calls are all sequential - the logic does not need to consider concurrent incoming calls -type ProjectorStats(log, statsInterval, stateInterval) = +type Stats(log, statsInterval, stateInterval) = inherit Propulsion.Streams.Stats(log, statsInterval, stateInterval) let mutable totalCount = 0 diff --git a/propulsion-projector/Infrastructure.fs b/propulsion-projector/Infrastructure.fs index 00db11efa..d628bd820 100644 --- a/propulsion-projector/Infrastructure.fs +++ b/propulsion-projector/Infrastructure.fs @@ -17,6 +17,7 @@ type LoggerConfigurationExtensions() = let cfpl = if verbose then Serilog.Events.LogEventLevel.Debug else Serilog.Events.LogEventLevel.Warning // TODO figure out what CFP v3 requires c.MinimumLevel.Override("Microsoft.Azure.Documents.ChangeFeedProcessor", cfpl) + #else #if esdb module CosmosStoreContext = @@ -26,6 +27,7 @@ module CosmosStoreContext = let maxEvents = 256 // default is 0 Equinox.CosmosStore.CosmosStoreContext(storeClient, tipMaxEvents=maxEvents) #endif + #endif //#if (cosmos || esdb) type Equinox.CosmosStore.CosmosStoreConnector with diff --git a/propulsion-projector/Program.fs b/propulsion-projector/Program.fs index 51716ed4b..a628e50a5 100644 --- a/propulsion-projector/Program.fs +++ b/propulsion-projector/Program.fs @@ -62,12 +62,12 @@ module Args = | FromTail _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." | MaxDocuments _ -> "maximum document count to supply for the Change Feed query. Default: use response size limit" | LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: off" - | LeaseContainer _ -> "specify Container Name (in this [target] Database) for Leases container. Default: `SourceContainer` + `-aux`." | ConnectionMode _ -> "override the connection mode. Default: Direct." | Connection _ -> "specify a connection string for a Cosmos account. (optional if environment variable EQUINOX_COSMOS_CONNECTION specified)" | Database _ -> "specify a database name for store. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)" | Container _ -> "specify a container name for store. (optional if environment variable EQUINOX_COSMOS_CONTAINER specified)" + | LeaseContainer _ -> "specify Container Name (in this [target] Database) for Leases container. Default: `SourceContainer` + `-aux`." | Timeout _ -> "specify operation timeout in seconds. Default: 5." | Retries _ -> "specify operation retries. Default: 1." | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." @@ -298,8 +298,8 @@ module Args = and Arguments(c : Configuration, a : ParseResults) = member val Verbose = a.Contains Verbose member val ConsumerGroupName = a.GetResult ConsumerGroupName - member val MaxReadAhead = a.GetResult(MaxReadAhead, 64) - member val MaxConcurrentProcessors =a.GetResult(MaxWriters, 1024) + member val private MaxReadAhead = a.GetResult(MaxReadAhead, 64) + member val private MaxConcurrentProcessors =a.GetResult(MaxWriters, 1024) member val StatsInterval = TimeSpan.FromMinutes 1. member val StateInterval = TimeSpan.FromMinutes 2. member x.BuildProcessorParams() = @@ -310,7 +310,6 @@ module Args = member x.MonitoringParams() = let srcC = x.Cosmos let leases : Microsoft.Azure.Cosmos.Container = srcC.ConnectLeases() - Log.Information("Processing... {dop} writers, max {maxReadAhead} batches read ahead", x.MaxConcurrentProcessors, x.MaxReadAhead) Log.Information("Monitoring Group {processorName} in Database {db} Container {container} with maximum document count limited to {maxDocuments}", x.ConsumerGroupName, leases.Database.Id, leases.Id, Option.toNullable srcC.MaxDocuments) if srcC.FromTail then Log.Warning("(If new projector group) Skipping projection of all existing events.") @@ -326,8 +325,7 @@ module Args = let context = srcE.Cosmos.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create Log.Information("Processing Consumer Group {groupName} from {startPos} (force: {forceRestart}) in Database {db} Container {container}", x.ConsumerGroupName, startPos, srcE.ForceRestart) - Log.Information("Ingesting in batches of [{minBatchSize}..{batchSize}], reading up to {maxReadAhead} uncommitted batches ahead", - srcE.MinBatchSize, srcE.StartingBatchSize, x.MaxReadAhead) + Log.Information("Ingesting in batches of [{minBatchSize}..{batchSize}]", srcE.MinBatchSize, srcE.StartingBatchSize) srcE, context, { groupName = x.ConsumerGroupName; start = startPos; checkpointInterval = srcE.CheckpointInterval; tailInterval = srcE.TailInterval forceRestart = srcE.ForceRestart @@ -358,7 +356,6 @@ module Args = Arguments(Configuration tryGetConfigValue, parser.ParseCommandLine argv) //#if esdb - module Checkpoints = open Equinox.CosmosStore @@ -374,8 +371,8 @@ module Checkpoints = let cat = CosmosStoreCategory(context, codec, Checkpoint.Fold.fold, Checkpoint.Fold.initial, caching, access) let resolve streamName = cat.Resolve(streamName, Equinox.AllowStale) Checkpoint.CheckpointSeries(groupName, resolve) -//#endif // esdb +//#endif // esdb let [] AppName = "ProjectorTemplate" let build (args : Args.Arguments) = @@ -391,7 +388,7 @@ let build (args : Args.Arguments) = let sink = Propulsion.Kafka.StreamsProducerSink.Start(Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.render, producer, stats, args.StatsInterval) #endif // cosmos && kafka && !parallelOnly #else // cosmos && !kafka - let stats = Handler.ProjectorStats(Log.Logger, args.StatsInterval, args.StateInterval) + let stats = Handler.Stats(Log.Logger, args.StatsInterval, args.StateInterval) let sink = Propulsion.Streams.StreamsProjector.Start(Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.handle, stats, args.StatsInterval) #endif // cosmos && !kafka let pipeline = @@ -413,14 +410,14 @@ let build (args : Args.Arguments) = let stats = Handler.ProductionStats(Log.Logger, args.StatsInterval, args.StateInterval) let sink = Propulsion.Kafka.StreamsProducerSink.Start(Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.render, producer, stats, args.StatsInterval) #else // esdb && !kafka - let stats = Handler.ProjectorStats(Log.Logger, args.StatsInterval, args.StateInterval) + let stats = Handler.Stats(Log.Logger, args.StatsInterval, args.StateInterval) let sink = Propulsion.Streams.StreamsProjector.Start(Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.handle, stats, args.StatsInterval) #endif // esdb && !kafka let pipeline = let filterByStreamName _ = true // see `dotnet new proReactor --filter` for an example of how to rig filtering arguments Propulsion.EventStore.EventStoreSource.Run( Log.Logger, sink, checkpoints, connectEs, spec, Handler.tryMapEvent filterByStreamName, - args.MaxReadAhead, args.StatsInterval) + maxReadAhead, args.StatsInterval) #endif // esdb //#if sss let srcSql, spec = args.BuildSqlStreamStoreParams() @@ -436,7 +433,7 @@ let build (args : Args.Arguments) = let stats = Handler.ProductionStats(Log.Logger, args.StatsInterval, args.StateInterval) let sink = Propulsion.Kafka.StreamsProducerSink.Start(Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.render, producer, stats, args.StatsInterval) #else // sss && !kafka - let stats = Handler.ProjectorStats(Log.Logger, args.StatsInterval, args.StateInterval) + let stats = Handler.Stats(Log.Logger, args.StatsInterval, args.StateInterval) let sink = Propulsion.Streams.StreamsProjector.Start(Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.handle, stats, args.StatsInterval) #endif // sss && !kafka let pipeline = Propulsion.SqlStreamStore.SqlStreamStoreSource.Run(Log.Logger, monitored, checkpointer, spec, sink, args.StatsInterval) diff --git a/propulsion-projector/README.md b/propulsion-projector/README.md index 5c94a1492..2e64ca10a 100644 --- a/propulsion-projector/README.md +++ b/propulsion-projector/README.md @@ -138,7 +138,7 @@ This project was generated using: $env:PROPULSION_KAFKA_BROKER="instance.kafka.example.com:9092" # or use -b //#if cosmos // kafka && cosmos - # `-g default` defines the Projector Group identity - each id has separated state in the aux container (aka LeaseId) + # `-g default` defines the Projector Group identity - each has separated state in the Leases (`-aux`) Container (aka processorName) # `-t topic0` identifies the Kafka topic to which the Projector should write # cosmos specifies the source (if you have specified 3x EQUINOX_COSMOS_* environment vars, no arguments are needed) # `-md 1000` sets the change feed maximum document limit to 1000 @@ -165,10 +165,10 @@ This project was generated using: //#if cosmos # (either add environment variables as per step 0 or use -s/-d/-c to specify them) - # `-g default` defines the Projector Group identity - each id has separated state in the aux container (aka LeaseId) + # `-g default` defines the Projector Group identity - each has separated state in the Leases (`-aux`) Container (aka processorName) # cosmos specifies the source (if you have specified 3x EQUINOX_COSMOS_* environment vars, no arguments are needed) # `-md 1000` sets the max batch size to 1000 - dotnet run -- -g default cosmos -md 1000 + dotnet run -- -g default cosmos -md 1000 # NB (assuming you've scaled up enough to have >1 physical partition range, you can run a second instance in a second console with the same arguments) //#endif // !kafka && cosmos