Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Stream.TransactAsyncEx #194

Merged
merged 2 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ The `Unreleased` section name is replaced by the expected version of next releas
## [Unreleased]

### Added

- `Stream.TransactAsyncEx`, exposing the `Core.ISyncContext` at conclusion of the sync operation, affording the ability to examine the post-state `Version` etc. (This paves the way for exposing [`SessionToken`](https://github.com/jet/equinox/issues/192) at a later point without a breaking change) [#194](https://github.com/jet/equinox/pull/194)

### Changed

- `Stream.QueryEx` to supply `Core.ISyncContext` in lieu of only exposing `Version` (to align with `TransactAsyncEx`) [#194](https://github.com/jet/equinox/pull/194)

### Removed
### Fixed

Expand Down
4 changes: 2 additions & 2 deletions samples/Tutorial/FulfilmentCenter.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ module FulfilmentCenter =
stream.Query id
let queryEx fc (projection : Fold.State -> 't) : Async<int64*'t> =
let stream = resolve fc
stream.QueryEx(fun v s -> v, projection s)
stream.QueryEx(fun c -> c.Version, projection c.State)

member __.UpdateName(id, value) = execute id (Register value)
member __.UpdateAddress(id, value) = execute id (UpdateAddress value)
Expand Down Expand Up @@ -181,4 +181,4 @@ module FulfilmentCenterSummary =
stream.Query(Option.map (fun s -> s.state))

member __.Update(id, version, value) = execute id (Update (version,value))
member __.TryRead id : Async<Summary option> = read id
member __.TryRead id : Async<Summary option> = read id
2 changes: 1 addition & 1 deletion src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1350,4 +1350,4 @@ module Events =

/// Obtains the `index` from the current write Position
let getNextIndex (ctx: Context) (streamName: string) : Async<int64> =
ctx.Sync(ctx.CreateStream streamName) |> stripPosition
ctx.Sync(ctx.CreateStream streamName) |> stripPosition
2 changes: 1 addition & 1 deletion src/Equinox.EventStore/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ module Token =
let (*private*) ofCompactionEventNumber compactedEventNumberOption unstoredEventsPending batchSize streamName streamVersion : StreamToken =
let batchCapacityLimit = batchCapacityLimit compactedEventNumberOption unstoredEventsPending batchSize streamVersion
create compactedEventNumberOption (Some batchCapacityLimit) streamName streamVersion

/// Assume we have not seen any compaction events; use the batchSize and version to infer headroom
let ofUncompactedVersion batchSize streamName streamVersion : StreamToken =
ofCompactionEventNumber None 0 batchSize streamName streamVersion
Expand Down
2 changes: 1 addition & 1 deletion src/Equinox.MemoryStore/MemoryStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,4 @@ type Resolver<'event, 'state, 'Format, 'context>(store : VolatileStore<'Format>,

/// Resolve from a Memento being used in a Continuation [based on position and state typically from Stream.CreateMemento]
member __.FromMemento(Token.Unpack stream as streamToken, state, ?context) =
Stream.ofMemento (streamToken,state) (resolveStream stream.streamName context)
Stream.ofMemento (streamToken,state) (resolveStream stream.streamName context)
2 changes: 1 addition & 1 deletion src/Equinox.SqlStreamStore/SqlStreamStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -562,4 +562,4 @@ type ConnectorBase([<O; D(null)>]?readRetryPolicy, [<O; D(null)>]?writeRetryPoli
member __.Establish(appName) : Async<Connection> = async {
let! store = __.Connect()
return Connection(readConnection=store, writeConnection=store, ?readRetryPolicy=readRetryPolicy, ?writeRetryPolicy=writeRetryPolicy)
}
}
73 changes: 41 additions & 32 deletions src/Equinox/Equinox.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,50 @@ type MaxResyncsExhaustedException(count) =
/// Central Application-facing API. Wraps the handling of decision or query flows in a manner that is store agnostic
type Stream<'event, 'state>
( log, stream : IStream<'event, 'state>, maxAttempts : int,
[<Optional; DefaultParameterValue(null)>] ?mkAttemptsExhaustedException,
[<Optional; DefaultParameterValue(null)>] ?createAttemptsExhaustedException,
[<Optional; DefaultParameterValue(null)>] ?resyncPolicy) =

let transact f =
let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber f -> async { return! f })
let transact decide mapResult =
let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF })
let throwMaxResyncsExhaustedException attempts = MaxResyncsExhaustedException attempts
let handleResyncsExceeded = defaultArg mkAttemptsExhaustedException throwMaxResyncsExhaustedException
Flow.transact (maxAttempts, resyncPolicy, handleResyncsExceeded) (stream, log) f

/// 0. Invoke the supplied `interpret` function with the present state
/// 1. Attempt to sync the accumulated events to the stream
/// Tries up to `maxAttempts` times in the case of a conflict, throwing `MaxResyncsExhaustedException` to signal failure.
member __.Transact(interpret : 'state -> 'event list) : Async<unit> = transact (fun state -> async { return (), interpret state })

/// 0. Invoke the supplied `decide` function with the present state
/// 1. Attempt to sync the accumulated events to the stream
/// 2. Yield result
/// Tries up to `maxAttempts` times in the case of a conflict, throwing `MaxResyncsExhaustedException` to signal failure.
member __.Transact(decide : 'state -> 'result * 'event list) : Async<'result> = transact (fun state -> async { return decide state })

/// 0. Invoke the supplied _Async_ `decide` function with the present state
/// 1. Attempt to sync the accumulated events to the stream
/// 2. Yield result
/// Tries up to `maxAttempts` times in the case of a conflict, throwing `MaxResyncsExhaustedException` to signal failure.
member __.TransactAsync(decide : 'state -> Async<'result * 'event list>) : Async<'result> = transact decide

/// Project from the folded `State` without executing a decision flow as `Decide` does
member __.Query(projection : 'state -> 'view) : Async<'view> = Flow.query(stream, log, fun syncState -> projection syncState.State)

/// Project from the folded `State` (with the current version of the stream supplied for context) without executing a decision flow as `Decide` does
member __.QueryEx(projection : int64 -> 'state -> 'view) : Async<'view> = Flow.query(stream, log, fun syncState -> projection syncState.Version syncState.State)

/// Low-level helper to allow one to obtain a reference to a stream and state pair (including the position) in order to pass it as a continuation within the application
/// Such a memento is then held within the application and passed in lieu of a StreamId to the StreamResolver in order to avoid having to reload state
member __.CreateMemento() : Async<StreamToken * 'state> = Flow.query(stream, log, fun syncState -> syncState.Memento)
let handleResyncsExceeded = defaultArg createAttemptsExhaustedException throwMaxResyncsExhaustedException
Flow.transact (maxAttempts, resyncPolicy, handleResyncsExceeded) (stream, log) decide mapResult

/// 0. Invoke the supplied <c>interpret</c> function with the present state
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
member __.Transact(interpret : 'state -> 'event list) : Async<unit> =
transact (fun state -> async { return (), interpret state }) (fun () _context -> ())

/// 0. Invoke the supplied <c>decide</c> function with the present state, holding the <c>'result</c>
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
/// 2. Yield result
member __.Transact(decide : 'state -> 'result * 'event list) : Async<'result> =
transact (fun state -> async { return decide state }) (fun result _context -> result)

/// 0. Invoke the supplied <c>_Async_</c> <c>decide</c> function with the present state, holding the <c>'result</c>
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
/// 2. Yield result
member __.TransactAsync(decide : 'state -> Async<'result * 'event list>) : Async<'result> =
transact decide (fun result _context -> result)

/// 0. Invoke the supplied <c>_Async_</c> <c>decide</c> function with the present state, holding the <c>'result</c>
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
/// 2. Uses <c>mapResult</c> to render the final outcome from the <c>'result</c> and/or the final <c>ISyncContext</c>
/// 3. Yields the outcome
member __.TransactAsyncEx(decide : 'state -> Async<'result * 'event list>, mapResult : 'result -> ISyncContext<'state> -> 'resultEx) : Async<'resultEx> =
transact decide mapResult

/// Project from the folded <c>'state</c>, without executing a decision flow as <c>Transact</c> does
member __.Query(projection : 'state -> 'view) : Async<'view> =
Flow.query (stream, log, fun syncState -> projection (syncState :> ISyncContext<'state>).State)

/// Project from the stream's <c>'state<c> (including extended context), without executing a decision flow as <c>Transact<c> does
member __.QueryEx(projection : ISyncContext<'state> -> 'view) : Async<'view> =
Flow.query (stream, log, projection)

/// Store-agnostic <c>Context.Resolve</c> Options
type ResolveOption =
Expand Down
50 changes: 32 additions & 18 deletions src/Equinox/Flow.fs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,25 @@ type SyncResult<'state> =
/// Store-agnostic interface representing interactions a Flow can have with the state of a given event stream. Not intended for direct use by consumer code.
type IStream<'event, 'state> =
/// Obtain the state from the target stream
abstract Load : log: ILogger
-> Async<StreamToken * 'state>
abstract Load : log: ILogger -> Async<StreamToken * 'state>

/// Given the supplied `token` [and related `originState`], attempt to move to state `state'` by appending the supplied `events` to the underlying stream
/// SyncResult.Written: implies the state is now the value represented by the Result's value
/// SyncResult.Conflict: implies the `events` were not synced; if desired the consumer can use the included resync workflow in order to retry
abstract TrySync : log: ILogger * token: StreamToken * originState: 'state * events: 'event list -> Async<SyncResult<'state>>

/// Exposed by TransactEx / QueryEx, providing access to extended state information for cases where that's required
type ISyncContext<'state> =

/// Represents a Checkpoint position on a Stream's timeline; Can be used to manage continuations via a Resolver's FromMemento method
abstract member CreateMemento : unit -> StreamToken * 'state

/// Exposes the underlying Store's internal Version/Index (which, depending on the Codec, may or may not be reflected in the last event presented)
abstract member Version : int64

/// The present State of the stream within the context of this Flow
abstract member State : 'state

/// Internal implementation of the Store agnostic load + run/render. See Equinox.fs for App-facing APIs.
module internal Flow =

Expand All @@ -37,11 +48,7 @@ module internal Flow =
trySync : ILogger * StreamToken * 'state * 'event list -> Async<SyncResult<'state>>) =
let mutable tokenAndState = originState

member __.Memento = tokenAndState
member __.State = snd __.Memento
member __.Version = (fst __.Memento).version

member __.TryOr(log, events, handleFailureResync : (Async<StreamToken*'state> -> Async<bool>)) : Async<bool> = async {
let trySyncOr log events (handleFailureResync : Async<StreamToken*'state> -> Async<bool>) : Async<bool> = async {
let! res = let token, state = tokenAndState in trySync (log,token,state,events)
match res with
| SyncResult.Conflict resync ->
Expand All @@ -50,12 +57,19 @@ module internal Flow =
tokenAndState <- token', streamState'
return true }

interface ISyncContext<'state> with
member __.CreateMemento() = tokenAndState
member __.State = snd tokenAndState
member __.Version = (fst tokenAndState).version

member __.TryWithoutResync(log : ILogger, events) : Async<bool> =
trySyncOr log events (fun _resync -> async { return false })
member __.TryOrResync(runResync, attemptNumber: int, log : ILogger, events) : Async<bool> =
let resyncInPreparationForRetry resync = async {
let! streamState' = runResync log attemptNumber resync
tokenAndState <- streamState'
return false }
__.TryOr(log, events, resyncInPreparationForRetry)
trySyncOr log events resyncInPreparationForRetry

/// Process a command, ensuring a consistent final state is established on the stream.
/// 1. make a decision predicated on the known state
Expand All @@ -65,41 +79,41 @@ module internal Flow =
let run (log : ILogger) (maxSyncAttempts : int, resyncRetryPolicy, createMaxAttemptsExhaustedException)
(syncState : SyncState<'event, 'state>)
(decide : 'state -> Async<'result * 'event list>)
: Async<'result> =
(mapResult : 'result -> SyncState<'event, 'state> -> 'resultEx)
: Async<'resultEx> =

if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1")

/// Run a decision cycle - decide what events should be appended given the presented state
let rec loop attempt : Async<'result> = async {
let rec loop attempt : Async<'resultEx> = async {
let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt)
let! result, events = decide syncState.State
let! result, events = decide (syncState :> ISyncContext<'state>).State
if List.isEmpty events then
log.Debug "No events generated"
return result
return mapResult result syncState
elif attempt = maxSyncAttempts then
// Special case: on final attempt, we won't be `resync`ing; we're giving up
let! committed = syncState.TryOr(log, events, fun _resync -> async { return false })

let! committed = syncState.TryWithoutResync(log, events)
if not committed then
log.Debug "Max Sync Attempts exceeded"
return raise (createMaxAttemptsExhaustedException attempt)
else
return result
return mapResult result syncState
else
let! committed = syncState.TryOrResync(resyncRetryPolicy, attempt, log, events)
if not committed then
log.Debug "Resyncing and retrying"
return! loop (attempt + 1)
else
return result }
return mapResult result syncState }

/// Commence, processing based on the incoming state
loop 1

let transact (maxAttempts,resyncRetryPolicy,createMaxAttemptsExhaustedException) (stream : IStream<_, _>, log) decide : Async<'result> = async {
let transact (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) (stream : IStream<_, _>, log) decide mapResult : Async<'result> = async {
let! streamState = stream.Load log
let syncState = SyncState(streamState, stream.TrySync)
return! run log (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) syncState decide }
return! run log (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) syncState decide mapResult }

let query (stream : IStream<'event, 'state>, log : ILogger, project: SyncState<'event, 'state> -> 'result) : Async<'result> = async {
let! streamState = stream.Load log
Expand Down