Skip to content


Add Cosmos support
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jun 12, 2020
1 parent d3d6338 commit d3b99ab
Show file tree
Hide file tree
Showing 13 changed files with 307 additions and 42 deletions.
17 changes: 17 additions & 0 deletions equinox-fc/Domain.Tests/Fixtures.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,23 @@ module EnvVar =

let tryGet k = Environment.GetEnvironmentVariable k |> Option.ofObj

module Cosmos =

open Equinox.Cosmos
let connect () =
| Some s, Some d, Some c ->
let appName = "Domain.Tests"
let discovery = Discovery.FromConnectionString s
let connector = Connector(TimeSpan.FromSeconds 5., 5, TimeSpan.FromSeconds 5., Serilog.Log.Logger)
let connection = connector.Connect(appName, discovery) |> Async.RunSynchronously
let context = Context(connection, d, c)
let cache = Equinox.Cache (appName, 10)
context, cache
| s, d, c ->
failwithf "Connection, Database and Container EQUINOX_COSMOS_* Environment variables are required (%b,%b,%b)"
(Option.isSome s) (Option.isSome d) (Option.isSome c)

module EventStore =

open Equinox.EventStore
Expand Down
14 changes: 14 additions & 0 deletions equinox-fc/Domain.Tests/LocationTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ let [<Property>] ``MemoryStore properties`` epochLen args =
let service = Location.MemoryStore.create (zero, cf, sc) store
run service args

type Cosmos(testOutput) =

let log = TestOutputLogger.create testOutput
do Serilog.Log.Logger <- log

let context, cache = Cosmos.connect ()

let [<Property(MaxTest=5, MaxFail=1)>] properties epochLen args =
let epochLen, idsWindow = max 1 epochLen, 5
let zero, cf, sc = Epoch.zeroBalance, Epoch.toBalanceCarriedForward idsWindow, Epoch.shouldClose epochLen

let service = Location.Cosmos.create (zero, cf, sc) (context, cache, 50)
run service args

type EventStore(testOutput) =

let log = TestOutputLogger.create testOutput
Expand Down
3 changes: 2 additions & 1 deletion equinox-fc/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
<Compile Include="LocationSeries.fs" />
<Compile Include="LocationEpoch.fs" />
<Compile Include="Location.fs" />
<Compile Include="InventorySeries.fs" />
<Compile Include="InventoryEpoch.fs" />
<Compile Include="Inventory.fs" />
<Compile Include="StockTransaction.fs" />
<Compile Include="StockProcessManager.fs" />
<PackageReference Include="Equinox.Cosmos" Version="2.1.0" />
<PackageReference Include="Equinox.EventStore" Version="2.1.0" />
<PackageReference Include="FsCodec.NewtonsoftJson" Version="2.1.0" />

7 changes: 7 additions & 0 deletions equinox-fc/Domain/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ module InventoryId =
let parse (value : string) : InventoryId = %value
let toString (value : InventoryId) : string = %value

type InventoryEpochId = int<inventoryEpochId>
and [<Measure>] inventoryEpochId
module InventoryEpochId =
let parse (value : int) : InventoryEpochId = %value
let next (value : InventoryEpochId) : InventoryEpochId = % (%value + 1)
let toString (value : InventoryEpochId) : string = string %value

type InventoryTransactionId = string<inventoryTransactionId>
and [<Measure>] inventoryTransactionId
module InventoryTransactionId =
Expand Down
65 changes: 46 additions & 19 deletions equinox-fc/Domain/Inventory.fs
Original file line number Diff line number Diff line change
@@ -1,48 +1,64 @@
namespace Fc.Domain.Inventory

open Equinox.Core // we use Equinox's AsyncCacheCell helper below
open FSharp.UMX

type internal IdsCache<'Id>() =
let all = System.Collections.Concurrent.ConcurrentDictionary<'Id, unit>() // Bounded only by relatively low number of physical pick tickets IRL
static member Create init = let x = IdsCache() in x.Add init; x
member __.Add ids = for x in ids do all.[x] <- ()
member __.Contains id = all.ContainsKey id

/// Ingests items into a log of items, making a best effort at deduplicating as it writes
/// Prior to first add, reads recent ids, in order to minimize the number of duplicated Ids we ingest
type Service internal (inventoryId, epochs : Epoch.Service) =
/// Maintains active Epoch Id in a thread-safe manner while ingesting items into the `series` of `epochs`
/// Prior to first add, reads `lookBack` epochs to seed the cache, in order to minimize the number of duplicated Ids we ingest
type Service internal (inventoryId, series : Series.Service, epochs : Epoch.Service, lookBack, capacity) =

static let log = Serilog.Log.ForContext<Service>()
let log = Serilog.Log.ForContext<Service>()

// Maintains what we believe to be the currently open EpochId
// Guaranteed to be set only after `previousIds.AwaitValue()`
let mutable activeEpochId = Unchecked.defaultof<_>

// We want max one request in flight to establish the pre-existing Events from which the TransactionIds cache will be seeded
let previousIds : AsyncCacheCell<Set<InventoryTransactionId>> =
let read = async { let! r = epochs.TryIngest(inventoryId, Seq.empty) in return r.transactionIds }
AsyncCacheCell read
let previousEpochs = AsyncCacheCell<AsyncCacheCell<Set<InventoryTransactionId>> list> <| async {
let! startingId = series.ReadIngestionEpoch(inventoryId)
activeEpochId <- %startingId
let read epochId = async { let! r = epochs.TryIngest(inventoryId, epochId, (fun _ -> 1), Seq.empty) in return r.transactionIds }
return [ for epoch in (max 0 (%startingId - lookBack)) .. (%startingId - 1) -> AsyncCacheCell(read %epoch) ] }

// TransactionIds cache - used to maintain a list of transactions that have already been ingested in order to avoid db round-trips
let previousIds : AsyncCacheCell<IdsCache<_>> = AsyncCacheCell <| async {
let! previousIds = previousIds.AwaitValue()
return IdsCache.Create(previousIds) }
let! previousEpochs = previousEpochs.AwaitValue()
let! ids = seq { for x in previousEpochs -> x.AwaitValue() } |> Async.Parallel
return IdsCache.Create(Seq.concat ids) }

let tryIngest events = async {
let! previousIds = previousIds.AwaitValue()
let initialEpochId = %activeEpochId

let rec aux totalIngested items = async {
let rec aux epochId totalIngested items = async {
let SeqPartition f = Seq.toArray >> Array.partition f
let dup, fresh = items |> SeqPartition (Epoch.Events.chooseInventoryTransactionId >> Option.exists previousIds.Contains)
let fullCount = List.length items
let dropping = fullCount - Array.length fresh
if dropping <> 0 then log.Information("Ignoring {count}/{fullCount} duplicate ids: {ids}", dropping, fullCount, dup)
if dropping <> 0 then log.Information("Ignoring {count}/{fullCount} duplicate ids: {ids} for {epochId}", dropping, fullCount, dup, epochId)
if Array.isEmpty fresh then
return totalIngested
let! res = epochs.TryIngest(inventoryId, fresh)
log.Information("Added {count} items to {inventoryId:l}", res.added, inventoryId)
let! res = epochs.TryIngest(inventoryId, epochId, capacity, fresh)
log.Information("Added {count} items to {inventoryId:l}/{epochId}", res.added, inventoryId, epochId)
// The adding is potentially redundant; we don't care
previousIds.Add res.transactionIds
// Any writer noticing we've moved to a new epoch shares the burden of marking it active
if not res.isClosed && activeEpochId < %epochId then
log.Information("Marking {inventoryId:l}/{epochId} active", inventoryId, epochId)
do! series.AdvanceIngestionEpoch(inventoryId, epochId)
System.Threading.Interlocked.CompareExchange(&activeEpochId, %epochId, activeEpochId) |> ignore
let totalIngestedTransactions = totalIngested + res.added
return totalIngestedTransactions }
return! aux 0 events
match res.rejected with
| [] -> return totalIngestedTransactions
| rej -> return! aux ( epochId) totalIngestedTransactions rej }
return! aux initialEpochId 0 events

/// Upon startup, we initialize the TransactionIds cache with recent epochs; we want to kick that process off before our first ingest
Expand All @@ -53,11 +69,22 @@ type Service internal (inventoryId, epochs : Epoch.Service) =

module internal Helpers =

let create inventoryId epochs =
Service(inventoryId, epochs)
let create inventoryId (maxTransactionsPerEpoch, lookBackLimit) (series, epochs) =
let remainingEpochCapacity (state: Epoch.Fold.State) =
let currentLen = state.ids.Count
max 0 (maxTransactionsPerEpoch - currentLen)
Service(inventoryId, series, epochs, lookBack=lookBackLimit, capacity=remainingEpochCapacity)

module Cosmos =

let create inventoryId (maxTransactionsPerEpoch, lookBackLimit) (context, cache) =
let series = Series.Cosmos.create (context, cache)
let epochs = Epoch.Cosmos.create (context, cache)
Helpers.create inventoryId (maxTransactionsPerEpoch, lookBackLimit) (series, epochs)

module EventStore =

let create inventoryId (context, cache) =
let create inventoryId (maxTransactionsPerEpoch, lookBackLimit) (context, cache) =
let series = Series.EventStore.create (context, cache)
let epochs = Epoch.EventStore.create (context, cache)
Helpers.create inventoryId epochs
Helpers.create inventoryId (maxTransactionsPerEpoch, lookBackLimit) (series, epochs)
58 changes: 46 additions & 12 deletions equinox-fc/Domain/InventoryEpoch.fs
Original file line number Diff line number Diff line change
@@ -1,68 +1,102 @@
/// Manages the ingestion (and deduplication based on a TransactionId) of events reflecting transfers or stock adjustments
/// that have been effected across a given set of Inventory
/// See Inventory.Service for surface level API which manages the ingestion
/// See Inventory.Service for surface level API which manages the ingestion, including transitioning to a new Epoch when an epoch reaches 'full' state
module Fc.Domain.Inventory.Epoch

let [<Literal>] Category = "InventoryEpoch"
let streamName inventoryId = FsCodec.StreamName.compose Category [InventoryId.toString inventoryId; "0"]
let streamName (inventoryId, epochId) = FsCodec.StreamName.compose Category [InventoryId.toString inventoryId; InventoryEpochId.toString epochId]

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =

type TransactionRef = { transactionId : InventoryTransactionId }
type Snapshotted = { closed: bool; ids : InventoryTransactionId[] }

type Event =
| Adjusted of TransactionRef
| Transferred of TransactionRef
| Closed
| Snapshotted of Snapshotted
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

/// Used for deduplicating input events
let chooseInventoryTransactionId = function
| Adjusted { transactionId = id } | Transferred { transactionId = id } -> Some id
| Closed | Snapshotted _ -> None

module Fold =

type State = { closed : bool; ids : Set<InventoryTransactionId> }
let initial = { closed = false; ids = Set.empty }
let evolve state = function
| (Events.Adjusted e | Events.Transferred e) -> { state with ids = Set.add e.transactionId state.ids }
| Events.Closed -> { state with closed = true }
| Events.Snapshotted e -> { closed = e.closed; ids = Set.ofArray e.ids }
let fold : State -> Events.Event seq -> State = Seq.fold evolve
let isOrigin = function Events.Snapshotted _ -> true | _ -> false
let snapshot s = Events.Snapshotted { closed = s.closed; ids = Array.ofSeq s.ids }

type Result =
{ /// Count of items added to this epoch. May be less than requested due to removal of duplicates and/or rejected items
{ /// Indicates whether this epoch is closed (either previously or as a side-effect this time)
isClosed : bool
/// Count of items added to this epoch. May be less than requested due to removal of duplicates and/or rejected items
added : int
/// residual items that [are not duplicates and] were not accepted into this epoch
rejected : Events.Event list
/// identifiers for all items in this epoch
transactionIds : Set<InventoryTransactionId> }

let decideSync events (state : Fold.State) : Result * Events.Event list =
let decideSync capacity events (state : Fold.State) : Result * Events.Event list =
let isFresh = function
| Events.Adjusted { transactionId = id }
| Events.Transferred { transactionId = id } -> (not << state.ids.Contains) id
| Events.Closed | Events.Snapshotted _ -> false
let news = events |> Seq.filter isFresh |> List.ofSeq
let newCount = List.length news
let events = [ if newCount <> 0 then yield! news ]
let closed, allowing, markClosed, residual =
let newCount = List.length news
if state.closed then
true, 0, false, news
let capacityNow = capacity state
let accepting = min capacityNow newCount
let closing = accepting = capacityNow
let residual = List.skip accepting news
closing, accepting, closing, residual
let events =
[ if allowing <> 0 then yield! news
if markClosed then yield Events.Closed ]
let state' = Fold.fold state events
{ added = newCount; transactionIds = state'.ids }, events
{ isClosed = closed; added = allowing; rejected = residual; transactionIds = state'.ids }, events

type Service internal (resolve : InventoryId -> Equinox.Stream<Events.Event, Fold.State>) =
type Service internal (resolve : InventoryId * InventoryEpochId -> Equinox.Stream<Events.Event, Fold.State>) =

/// Attempt ingestion of `events` into the cited Epoch.
/// - None will be accepted if the Epoch is `closed`
/// - The `capacity` function will be passed a non-closed `state` in order to determine number of items that can be admitted prior to closing
/// - If the computed capacity result is >= the number of items being submitted (which may be 0), the Epoch will be marked Closed
/// NOTE the result may include rejected items (which the caller is expected to feed into a successor epoch)
member __.TryIngest(inventoryId, events) : Async<Result> =
let stream = resolve inventoryId
stream.Transact(decideSync events)
member __.TryIngest(inventoryId, epochId, capacity, events) : Async<Result> =
let stream = resolve (inventoryId, epochId)
stream.Transact(decideSync capacity events)

let create resolve =
let resolve ids =
let stream = resolve (streamName ids)
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts=2)
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts = 2)

module Cosmos =

open Equinox.Cosmos

let accessStrategy = AccessStrategy.Snapshot (Fold.isOrigin, Fold.snapshot)
let create (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
create resolver.Resolve

module EventStore =

open Equinox.EventStore
Expand Down
71 changes: 71 additions & 0 deletions equinox-fc/Domain/InventorySeries.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/// Manages a) the ingestion epoch id b) the current checkpointed read position for a long-running Inventory Series
/// See InventoryEpoch for the logic managing the actual events logged within a given epoch
/// See Inventory.Service for the surface API which manages the writing
module Fc.Domain.Inventory.Series

let [<Literal>] Category = "InventorySeries"
let streamName inventoryId = FsCodec.StreamName.create Category (InventoryId.toString inventoryId)

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =

type Started = { epoch : InventoryEpochId }
type Event =
| Started of Started
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

module Fold =

type State = InventoryEpochId option
let initial = None
let evolve _state = function
| Events.Started e -> Some e.epoch
let fold : State -> Events.Event seq -> State = Seq.fold evolve

let queryActiveEpoch state = state |> Option.defaultValue (InventoryEpochId.parse 0)

let interpretAdvanceIngestionEpoch epochId (state : Fold.State) =
if queryActiveEpoch state >= epochId then []
else [Events.Started { epoch = epochId }]

type Service internal (resolve : InventoryId -> Equinox.Stream<Events.Event, Fold.State>) =

member __.ReadIngestionEpoch(inventoryId) : Async<InventoryEpochId> =
let stream = resolve inventoryId
stream.Query queryActiveEpoch

member __.AdvanceIngestionEpoch(inventoryId, epochId) : Async<unit> =
let stream = resolve inventoryId
stream.Transact(interpretAdvanceIngestionEpoch epochId)

let create resolve =
// For this stream, we uniformly use stale reads as:
// a) we don't require any information from competing writers
// b) while there are competing writers [which might cause us to have to retry a Transact], this should be infrequent
let opt = Equinox.ResolveOption.AllowStale
let resolve locationId =
let stream = resolve (streamName locationId, opt)
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts = 2)

module Cosmos =

open Equinox.Cosmos

let accessStrategy = AccessStrategy.LatestKnownEvent
let create (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
create <| fun (id, opt) -> resolver.Resolve(id, opt)

module EventStore =

open Equinox.EventStore

let accessStrategy = AccessStrategy.LatestKnownEvent
let create (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
create <| fun (id, opt) -> resolver.Resolve(id, opt)
7 changes: 7 additions & 0 deletions equinox-fc/Domain/Location.fs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ module Helpers =
let create (zeroBalance, toBalanceCarriedForward, shouldClose) (series, epochs) =
Service(zeroBalance, toBalanceCarriedForward, shouldClose, series, epochs)

module Cosmos =

let create (zeroBalance, toBalanceCarriedForward, shouldClose) (context, cache, maxAttempts) =
let series = Series.Cosmos.create (context, cache, maxAttempts)
let epochs = Epoch.Cosmos.create (context, cache, maxAttempts)
create (zeroBalance, toBalanceCarriedForward, shouldClose) (series, epochs)

module EventStore =

let create (zeroBalance, toBalanceCarriedForward, shouldClose) (context, cache, maxAttempts) =
Expand Down

0 comments on commit d3b99ab

Please sign in to comment.