diff --git a/src/Equinox.Core/Batching.fs b/src/Equinox.Core/Batching.fs index 73e0c6e10..9ae559eb5 100644 --- a/src/Equinox.Core/Batching.fs +++ b/src/Equinox.Core/Batching.fs @@ -18,16 +18,16 @@ type internal AsyncBatch<'Req, 'Res>() = // sadly there's no way to detect without a try/catch try queue.TryAdd(item) with :? InvalidOperationException -> false - let mutable attempt = Unchecked.defaultof>> + let mutable attempt = Unchecked.defaultof>> /// Attempt to add a request to the flight /// Succeeds during linger interval (which commences when the first caller triggers the workflow via AwaitResult) /// Fails if this flight has closed (caller should initialize a fresh Batch, potentially holding off until the current attempt completes) - member _.TryAdd(req, dispatch: Func<'Req[], CancellationToken, Task<'Res[]>>, lingerMs: int, limiter: System.Threading.SemaphoreSlim voption, ct) = + member _.TryAdd(req, dispatch: Func<'Req[], CancellationToken, Task<'Res>>, lingerMs: int, limiter: System.Threading.SemaphoreSlim voption, ct) = if not (tryEnqueue req) then false else - // Prepare a new instance, with cancellation under our control (it won't start until the Force triggers it though) - let newInstance: Lazy> = lazy task { + // Prepare a new instance, with cancellation under our control (it won't start until the .Value triggers it though) + let newInstance: Lazy> = lazy task { do! Task.Delay(lingerMs, ct) match limiter with ValueNone -> () | ValueSome s -> do! s.WaitAsync(ct) try queue.CompleteAdding() @@ -45,12 +45,12 @@ type internal AsyncBatch<'Req, 'Res>() = /// Requests are added to pending batch during the wait period, which consists of two phases: /// 1. a defined linger period (min 1ms) /// 2. (optionally) a wait to acquire capacity on a limiter semaphore (e.g. one might have a limit on concurrent dispatches across a pool) -type Batcher<'Req, 'Res> private (tryInclude: Func, 'Req, CancellationToken, bool>) = +type Batcher<'Req, 'Res> private (tryInclude: Func, 'Req, CancellationToken, bool>) = let mutable cell = AsyncBatch<'Req, 'Res>() - new(dispatch: Func<'Req[], CancellationToken, Task<'Res[]>>, lingerMs, limiter) = + new(dispatch: Func<'Req[], CancellationToken, Task<'Res>>, lingerMs, limiter) = if lingerMs < 1 then invalidArg (nameof(lingerMs)) "Minimum linger period is 1ms" // concurrent waiters need to add work to the batch across their threads Batcher(fun cell req ct -> cell.TryAdd(req, dispatch, lingerMs, limiter, ct = ct)) - new(dispatch: 'Req[] -> Async<'Res[]>, ?linger : TimeSpan, + new(dispatch: 'Req[] -> Async<'Res>, ?linger: TimeSpan, // Extends the linger phase to include a period during which we await capacity on an externally managed Semaphore // The Batcher doesn't care, but a typical use is to enable limiting the number of concurrent in-flight dispatches ?limiter) =