Skip to content

Commit

Permalink
proProjector: minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jun 22, 2021
1 parent 3227c7c commit 0f9a1d5
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 16 deletions.
2 changes: 1 addition & 1 deletion propulsion-projector/Handler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ let render (stream : FsCodec.StreamName, span : Propulsion.Streams.StreamSpan<_>
#else // !kafka
// Each outcome from `handle` is passed to `HandleOk` or `HandleExn` by the scheduler, DumpStats is called at `statsInterval`
// The incoming calls are all sequential - the logic does not need to consider concurrent incoming calls
type ProjectorStats(log, statsInterval, stateInterval) =
type Stats(log, statsInterval, stateInterval) =
inherit Propulsion.Streams.Stats<int>(log, statsInterval, stateInterval)

let mutable totalCount = 0
Expand Down
2 changes: 2 additions & 0 deletions propulsion-projector/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type LoggerConfigurationExtensions() =
let cfpl = if verbose then Serilog.Events.LogEventLevel.Debug else Serilog.Events.LogEventLevel.Warning
// TODO figure out what CFP v3 requires
c.MinimumLevel.Override("Microsoft.Azure.Documents.ChangeFeedProcessor", cfpl)

#else
#if esdb
module CosmosStoreContext =
Expand All @@ -26,6 +27,7 @@ module CosmosStoreContext =
let maxEvents = 256 // default is 0
Equinox.CosmosStore.CosmosStoreContext(storeClient, tipMaxEvents=maxEvents)
#endif

#endif
//#if (cosmos || esdb)
type Equinox.CosmosStore.CosmosStoreConnector with
Expand Down
21 changes: 9 additions & 12 deletions propulsion-projector/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ module Args =
| FromTail _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event."
| MaxDocuments _ -> "maximum document count to supply for the Change Feed query. Default: use response size limit"
| LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: off"
| LeaseContainer _ -> "specify Container Name (in this [target] Database) for Leases container. Default: `SourceContainer` + `-aux`."

| ConnectionMode _ -> "override the connection mode. Default: Direct."
| Connection _ -> "specify a connection string for a Cosmos account. (optional if environment variable EQUINOX_COSMOS_CONNECTION specified)"
| Database _ -> "specify a database name for store. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)"
| Container _ -> "specify a container name for store. (optional if environment variable EQUINOX_COSMOS_CONTAINER specified)"
| LeaseContainer _ -> "specify Container Name (in this [target] Database) for Leases container. Default: `SourceContainer` + `-aux`."
| Timeout _ -> "specify operation timeout in seconds. Default: 5."
| Retries _ -> "specify operation retries. Default: 1."
| RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5."
Expand Down Expand Up @@ -298,8 +298,8 @@ module Args =
and Arguments(c : Configuration, a : ParseResults<Parameters>) =
member val Verbose = a.Contains Verbose
member val ConsumerGroupName = a.GetResult ConsumerGroupName
member val MaxReadAhead = a.GetResult(MaxReadAhead, 64)
member val MaxConcurrentProcessors =a.GetResult(MaxWriters, 1024)
member val private MaxReadAhead = a.GetResult(MaxReadAhead, 64)
member val private MaxConcurrentProcessors =a.GetResult(MaxWriters, 1024)
member val StatsInterval = TimeSpan.FromMinutes 1.
member val StateInterval = TimeSpan.FromMinutes 2.
member x.BuildProcessorParams() =
Expand All @@ -310,7 +310,6 @@ module Args =
member x.MonitoringParams() =
let srcC = x.Cosmos
let leases : Microsoft.Azure.Cosmos.Container = srcC.ConnectLeases()
Log.Information("Processing... {dop} writers, max {maxReadAhead} batches read ahead", x.MaxConcurrentProcessors, x.MaxReadAhead)
Log.Information("Monitoring Group {processorName} in Database {db} Container {container} with maximum document count limited to {maxDocuments}",
x.ConsumerGroupName, leases.Database.Id, leases.Id, Option.toNullable srcC.MaxDocuments)
if srcC.FromTail then Log.Warning("(If new projector group) Skipping projection of all existing events.")
Expand All @@ -326,8 +325,7 @@ module Args =
let context = srcE.Cosmos.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create
Log.Information("Processing Consumer Group {groupName} from {startPos} (force: {forceRestart}) in Database {db} Container {container}",
x.ConsumerGroupName, startPos, srcE.ForceRestart)
Log.Information("Ingesting in batches of [{minBatchSize}..{batchSize}], reading up to {maxReadAhead} uncommitted batches ahead",
srcE.MinBatchSize, srcE.StartingBatchSize, x.MaxReadAhead)
Log.Information("Ingesting in batches of [{minBatchSize}..{batchSize}]", srcE.MinBatchSize, srcE.StartingBatchSize)
srcE, context,
{ groupName = x.ConsumerGroupName; start = startPos; checkpointInterval = srcE.CheckpointInterval; tailInterval = srcE.TailInterval
forceRestart = srcE.ForceRestart
Expand Down Expand Up @@ -358,7 +356,6 @@ module Args =
Arguments(Configuration tryGetConfigValue, parser.ParseCommandLine argv)

//#if esdb

module Checkpoints =

open Equinox.CosmosStore
Expand All @@ -374,8 +371,8 @@ module Checkpoints =
let cat = CosmosStoreCategory(context, codec, Checkpoint.Fold.fold, Checkpoint.Fold.initial, caching, access)
let resolve streamName = cat.Resolve(streamName, Equinox.AllowStale)
Checkpoint.CheckpointSeries(groupName, resolve)
//#endif // esdb

//#endif // esdb
let [<Literal>] AppName = "ProjectorTemplate"

let build (args : Args.Arguments) =
Expand All @@ -391,7 +388,7 @@ let build (args : Args.Arguments) =
let sink = Propulsion.Kafka.StreamsProducerSink.Start(Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.render, producer, stats, args.StatsInterval)
#endif // cosmos && kafka && !parallelOnly
#else // cosmos && !kafka
let stats = Handler.ProjectorStats(Log.Logger, args.StatsInterval, args.StateInterval)
let stats = Handler.Stats(Log.Logger, args.StatsInterval, args.StateInterval)
let sink = Propulsion.Streams.StreamsProjector.Start(Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.handle, stats, args.StatsInterval)
#endif // cosmos && !kafka
let pipeline =
Expand All @@ -413,14 +410,14 @@ let build (args : Args.Arguments) =
let stats = Handler.ProductionStats(Log.Logger, args.StatsInterval, args.StateInterval)
let sink = Propulsion.Kafka.StreamsProducerSink.Start(Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.render, producer, stats, args.StatsInterval)
#else // esdb && !kafka
let stats = Handler.ProjectorStats(Log.Logger, args.StatsInterval, args.StateInterval)
let stats = Handler.Stats(Log.Logger, args.StatsInterval, args.StateInterval)
let sink = Propulsion.Streams.StreamsProjector.Start(Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.handle, stats, args.StatsInterval)
#endif // esdb && !kafka
let pipeline =
let filterByStreamName _ = true // see `dotnet new proReactor --filter` for an example of how to rig filtering arguments
Propulsion.EventStore.EventStoreSource.Run(
Log.Logger, sink, checkpoints, connectEs, spec, Handler.tryMapEvent filterByStreamName,
args.MaxReadAhead, args.StatsInterval)
maxReadAhead, args.StatsInterval)
#endif // esdb
//#if sss
let srcSql, spec = args.BuildSqlStreamStoreParams()
Expand All @@ -436,7 +433,7 @@ let build (args : Args.Arguments) =
let stats = Handler.ProductionStats(Log.Logger, args.StatsInterval, args.StateInterval)
let sink = Propulsion.Kafka.StreamsProducerSink.Start(Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.render, producer, stats, args.StatsInterval)
#else // sss && !kafka
let stats = Handler.ProjectorStats(Log.Logger, args.StatsInterval, args.StateInterval)
let stats = Handler.Stats(Log.Logger, args.StatsInterval, args.StateInterval)
let sink = Propulsion.Streams.StreamsProjector.Start(Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.handle, stats, args.StatsInterval)
#endif // sss && !kafka
let pipeline = Propulsion.SqlStreamStore.SqlStreamStoreSource.Run(Log.Logger, monitored, checkpointer, spec, sink, args.StatsInterval)
Expand Down
6 changes: 3 additions & 3 deletions propulsion-projector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ This project was generated using:
$env:PROPULSION_KAFKA_BROKER="instance.kafka.example.com:9092" # or use -b

//#if cosmos // kafka && cosmos
# `-g default` defines the Projector Group identity - each id has separated state in the aux container (aka LeaseId)
# `-g default` defines the Projector Group identity - each has separated state in the Leases (`-aux`) Container (aka processorName)
# `-t topic0` identifies the Kafka topic to which the Projector should write
# cosmos specifies the source (if you have specified 3x EQUINOX_COSMOS_* environment vars, no arguments are needed)
# `-md 1000` sets the change feed maximum document limit to 1000
Expand All @@ -165,10 +165,10 @@ This project was generated using:
//#if cosmos
# (either add environment variables as per step 0 or use -s/-d/-c to specify them)

# `-g default` defines the Projector Group identity - each id has separated state in the aux container (aka LeaseId)
# `-g default` defines the Projector Group identity - each has separated state in the Leases (`-aux`) Container (aka processorName)
# cosmos specifies the source (if you have specified 3x EQUINOX_COSMOS_* environment vars, no arguments are needed)
# `-md 1000` sets the max batch size to 1000
dotnet run -- -g default cosmos -md 1000
dotnet run -- -g default cosmos -md 1000

# NB (assuming you've scaled up enough to have >1 physical partition range, you can run a second instance in a second console with the same arguments)
//#endif // !kafka && cosmos
Expand Down

0 comments on commit 0f9a1d5

Please sign in to comment.