diff --git a/Directory.Build.props b/Directory.Build.props index af36e16f..0f823c0d 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -6,7 +6,7 @@ https://github.com/jet/propulsion fsharp eventsourcing cosmosdb changefeedprocessor dynamodb equinox eventstoredb messagedb kafka Apache-2.0 - Copyright © 2018-23 + Copyright © 2018-24 5 diff --git a/src/Propulsion/Parallel.fs b/src/Propulsion/Parallel.fs index c324b614..62628832 100755 --- a/src/Propulsion/Parallel.fs +++ b/src/Propulsion/Parallel.fs @@ -14,7 +14,6 @@ module Scheduling = /// Single instance per system; coordinates the dispatching of work, subject to the maxDop concurrent processors constraint /// Semaphore is allocated on queueing, deallocated on completion of the processing type Dispatcher(maxDop) = - // Using a Queue as a) the ordering is more correct, favoring more important work b) we are adding from many threads so no value in ConcurrentBag's thread-affinity let tryWrite, wait, apply = let c = Channel.unboundedSwSr<_> in let r, w = c.Reader, c.Writer w.TryWrite, Channel.awaitRead r, Channel.apply r diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs index 1fd63ea3..6bcbb826 100755 --- a/src/Propulsion/Streams.fs +++ b/src/Propulsion/Streams.fs @@ -876,7 +876,6 @@ module Dispatcher = let result = Event<'R>() let dop = Sem maxDop - // NOTE this obviously depends on the passed computation never throwing, or we'd leak dop let runHandler struct (computation: CancellationToken -> Task<'R>, ct) = task { let! res = computation ct dop.Release() @@ -886,7 +885,8 @@ module Dispatcher = member _.State = dop.State member _.HasCapacity = dop.HasCapacity member _.AwaitButRelease(ct) = dop.WaitButRelease(ct) - member _.TryAdd(item) = dop.TryTake() && tryWrite item + // NOTE computation is required/trusted to have an outer catch (or results would not be posted and dop would leak) + member _.TryAdd(computation) = dop.TryTake() && tryWrite computation member _.Pump(ct: CancellationToken) = task { while not ct.IsCancellationRequested do @@ -926,6 +926,7 @@ module Dispatcher = interpretProgress: Scheduling.StreamStates<'F> -> FsCodec.StreamName -> Result<'P,'E> -> struct (int64 voption * Result<'R, 'E>)) = static member Create ( maxDop, + // NOTE `project` must not throw under any circumstances, or the exception will go unobserved, and DOP will leak in the dispatcher project: FsCodec.StreamName -> FsCodec.ITimelineEvent<'F>[] -> CancellationToken -> Task>, interpretProgress: Scheduling.StreamStates<'F> -> FsCodec.StreamName -> Result<'P, 'E> -> struct (int64 voption * Result<'R, 'E>)) = let project struct (startTs, item: Scheduling.Item<'F>) (ct: CancellationToken) = task { @@ -955,7 +956,7 @@ module Dispatcher = /// Implementation of IDispatcher that allows a supplied handler select work and declare completion based on arbitrarily defined criteria type Batched<'F> ( select: Func seq, Scheduling.Item<'F>[]>, - // NOTE Handler must not throw under any circumstances, or the exception will go unobserved + // NOTE `handle` must not throw under any circumstances, or the exception will go unobserved handle: Scheduling.Item<'F>[] -> CancellationToken -> Task>[]>) = let inner = DopDispatcher 1