From d3b99aba7cfcc99d39bd85e536feaf6030fcb3e3 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 12 Jun 2020 14:38:25 +0100 Subject: [PATCH] Add Cosmos support --- equinox-fc/Domain.Tests/Fixtures.fs | 17 ++++++ equinox-fc/Domain.Tests/LocationTests.fs | 14 +++++ equinox-fc/Domain/Domain.fsproj | 3 +- equinox-fc/Domain/Infrastructure.fs | 7 +++ equinox-fc/Domain/Inventory.fs | 65 +++++++++++++++------- equinox-fc/Domain/InventoryEpoch.fs | 58 +++++++++++++++---- equinox-fc/Domain/InventorySeries.fs | 71 ++++++++++++++++++++++++ equinox-fc/Domain/Location.fs | 7 +++ equinox-fc/Domain/LocationEpoch.fs | 10 ++++ equinox-fc/Domain/LocationSeries.fs | 11 ++++ equinox-fc/Domain/StockProcessManager.fs | 15 ++++- equinox-fc/Domain/StockTransaction.fs | 11 ++++ equinox-fc/Web/Program.fs | 60 +++++++++++++++++--- 13 files changed, 307 insertions(+), 42 deletions(-) create mode 100644 equinox-fc/Domain/InventorySeries.fs diff --git a/equinox-fc/Domain.Tests/Fixtures.fs b/equinox-fc/Domain.Tests/Fixtures.fs index 58dbe1e3e..a7a2d54a4 100644 --- a/equinox-fc/Domain.Tests/Fixtures.fs +++ b/equinox-fc/Domain.Tests/Fixtures.fs @@ -8,6 +8,23 @@ module EnvVar = let tryGet k = Environment.GetEnvironmentVariable k |> Option.ofObj +module Cosmos = + + open Equinox.Cosmos + let connect () = + match EnvVar.tryGet "EQUINOX_COSMOS_CONNECTION", EnvVar.tryGet "EQUINOX_COSMOS_DATABASE", EnvVar.tryGet "EQUINOX_COSMOS_CONTAINER" with + | Some s, Some d, Some c -> + let appName = "Domain.Tests" + let discovery = Discovery.FromConnectionString s + let connector = Connector(TimeSpan.FromSeconds 5., 5, TimeSpan.FromSeconds 5., Serilog.Log.Logger) + let connection = connector.Connect(appName, discovery) |> Async.RunSynchronously + let context = Context(connection, d, c) + let cache = Equinox.Cache (appName, 10) + context, cache + | s, d, c -> + failwithf "Connection, Database and Container EQUINOX_COSMOS_* Environment variables are required (%b,%b,%b)" + (Option.isSome s) (Option.isSome d) (Option.isSome c) + module EventStore = open Equinox.EventStore diff --git a/equinox-fc/Domain.Tests/LocationTests.fs b/equinox-fc/Domain.Tests/LocationTests.fs index 4e4a67645..287f0d0b2 100644 --- a/equinox-fc/Domain.Tests/LocationTests.fs +++ b/equinox-fc/Domain.Tests/LocationTests.fs @@ -60,6 +60,20 @@ let [] ``MemoryStore properties`` epochLen args = let service = Location.MemoryStore.create (zero, cf, sc) store run service args +type Cosmos(testOutput) = + + let log = TestOutputLogger.create testOutput + do Serilog.Log.Logger <- log + + let context, cache = Cosmos.connect () + + let [] properties epochLen args = + let epochLen, idsWindow = max 1 epochLen, 5 + let zero, cf, sc = Epoch.zeroBalance, Epoch.toBalanceCarriedForward idsWindow, Epoch.shouldClose epochLen + + let service = Location.Cosmos.create (zero, cf, sc) (context, cache, 50) + run service args + type EventStore(testOutput) = let log = TestOutputLogger.create testOutput diff --git a/equinox-fc/Domain/Domain.fsproj b/equinox-fc/Domain/Domain.fsproj index 3d0c2a748..8a592ca3f 100644 --- a/equinox-fc/Domain/Domain.fsproj +++ b/equinox-fc/Domain/Domain.fsproj @@ -11,14 +11,15 @@ + + - diff --git a/equinox-fc/Domain/Infrastructure.fs b/equinox-fc/Domain/Infrastructure.fs index febe0e6b8..2f41917e5 100644 --- a/equinox-fc/Domain/Infrastructure.fs +++ b/equinox-fc/Domain/Infrastructure.fs @@ -21,6 +21,13 @@ module InventoryId = let parse (value : string) : InventoryId = %value let toString (value : InventoryId) : string = %value +type InventoryEpochId = int +and [] inventoryEpochId +module InventoryEpochId = + let parse (value : int) : InventoryEpochId = %value + let next (value : InventoryEpochId) : InventoryEpochId = % (%value + 1) + let toString (value : InventoryEpochId) : string = string %value + type InventoryTransactionId = string and [] inventoryTransactionId module InventoryTransactionId = diff --git a/equinox-fc/Domain/Inventory.fs b/equinox-fc/Domain/Inventory.fs index 6d6578df4..d27768d11 100644 --- a/equinox-fc/Domain/Inventory.fs +++ b/equinox-fc/Domain/Inventory.fs @@ -1,6 +1,7 @@ namespace Fc.Domain.Inventory open Equinox.Core // we use Equinox's AsyncCacheCell helper below +open FSharp.UMX type internal IdsCache<'Id>() = let all = System.Collections.Concurrent.ConcurrentDictionary<'Id, unit>() // Bounded only by relatively low number of physical pick tickets IRL @@ -8,41 +9,56 @@ type internal IdsCache<'Id>() = member __.Add ids = for x in ids do all.[x] <- () member __.Contains id = all.ContainsKey id -/// Ingests items into a log of items, making a best effort at deduplicating as it writes -/// Prior to first add, reads recent ids, in order to minimize the number of duplicated Ids we ingest -type Service internal (inventoryId, epochs : Epoch.Service) = +/// Maintains active Epoch Id in a thread-safe manner while ingesting items into the `series` of `epochs` +/// Prior to first add, reads `lookBack` epochs to seed the cache, in order to minimize the number of duplicated Ids we ingest +type Service internal (inventoryId, series : Series.Service, epochs : Epoch.Service, lookBack, capacity) = - static let log = Serilog.Log.ForContext() + let log = Serilog.Log.ForContext() + + // Maintains what we believe to be the currently open EpochId + // Guaranteed to be set only after `previousIds.AwaitValue()` + let mutable activeEpochId = Unchecked.defaultof<_> // We want max one request in flight to establish the pre-existing Events from which the TransactionIds cache will be seeded - let previousIds : AsyncCacheCell> = - let read = async { let! r = epochs.TryIngest(inventoryId, Seq.empty) in return r.transactionIds } - AsyncCacheCell read + let previousEpochs = AsyncCacheCell> list> <| async { + let! startingId = series.ReadIngestionEpoch(inventoryId) + activeEpochId <- %startingId + let read epochId = async { let! r = epochs.TryIngest(inventoryId, epochId, (fun _ -> 1), Seq.empty) in return r.transactionIds } + return [ for epoch in (max 0 (%startingId - lookBack)) .. (%startingId - 1) -> AsyncCacheCell(read %epoch) ] } // TransactionIds cache - used to maintain a list of transactions that have already been ingested in order to avoid db round-trips let previousIds : AsyncCacheCell> = AsyncCacheCell <| async { - let! previousIds = previousIds.AwaitValue() - return IdsCache.Create(previousIds) } + let! previousEpochs = previousEpochs.AwaitValue() + let! ids = seq { for x in previousEpochs -> x.AwaitValue() } |> Async.Parallel + return IdsCache.Create(Seq.concat ids) } let tryIngest events = async { let! previousIds = previousIds.AwaitValue() + let initialEpochId = %activeEpochId - let rec aux totalIngested items = async { + let rec aux epochId totalIngested items = async { let SeqPartition f = Seq.toArray >> Array.partition f let dup, fresh = items |> SeqPartition (Epoch.Events.chooseInventoryTransactionId >> Option.exists previousIds.Contains) let fullCount = List.length items let dropping = fullCount - Array.length fresh - if dropping <> 0 then log.Information("Ignoring {count}/{fullCount} duplicate ids: {ids}", dropping, fullCount, dup) + if dropping <> 0 then log.Information("Ignoring {count}/{fullCount} duplicate ids: {ids} for {epochId}", dropping, fullCount, dup, epochId) if Array.isEmpty fresh then return totalIngested else - let! res = epochs.TryIngest(inventoryId, fresh) - log.Information("Added {count} items to {inventoryId:l}", res.added, inventoryId) + let! res = epochs.TryIngest(inventoryId, epochId, capacity, fresh) + log.Information("Added {count} items to {inventoryId:l}/{epochId}", res.added, inventoryId, epochId) // The adding is potentially redundant; we don't care previousIds.Add res.transactionIds + // Any writer noticing we've moved to a new epoch shares the burden of marking it active + if not res.isClosed && activeEpochId < %epochId then + log.Information("Marking {inventoryId:l}/{epochId} active", inventoryId, epochId) + do! series.AdvanceIngestionEpoch(inventoryId, epochId) + System.Threading.Interlocked.CompareExchange(&activeEpochId, %epochId, activeEpochId) |> ignore let totalIngestedTransactions = totalIngested + res.added - return totalIngestedTransactions } - return! aux 0 events + match res.rejected with + | [] -> return totalIngestedTransactions + | rej -> return! aux (InventoryEpochId.next epochId) totalIngestedTransactions rej } + return! aux initialEpochId 0 events } /// Upon startup, we initialize the TransactionIds cache with recent epochs; we want to kick that process off before our first ingest @@ -53,11 +69,22 @@ type Service internal (inventoryId, epochs : Epoch.Service) = module internal Helpers = - let create inventoryId epochs = - Service(inventoryId, epochs) + let create inventoryId (maxTransactionsPerEpoch, lookBackLimit) (series, epochs) = + let remainingEpochCapacity (state: Epoch.Fold.State) = + let currentLen = state.ids.Count + max 0 (maxTransactionsPerEpoch - currentLen) + Service(inventoryId, series, epochs, lookBack=lookBackLimit, capacity=remainingEpochCapacity) + +module Cosmos = + + let create inventoryId (maxTransactionsPerEpoch, lookBackLimit) (context, cache) = + let series = Series.Cosmos.create (context, cache) + let epochs = Epoch.Cosmos.create (context, cache) + Helpers.create inventoryId (maxTransactionsPerEpoch, lookBackLimit) (series, epochs) module EventStore = - let create inventoryId (context, cache) = + let create inventoryId (maxTransactionsPerEpoch, lookBackLimit) (context, cache) = + let series = Series.EventStore.create (context, cache) let epochs = Epoch.EventStore.create (context, cache) - Helpers.create inventoryId epochs + Helpers.create inventoryId (maxTransactionsPerEpoch, lookBackLimit) (series, epochs) diff --git a/equinox-fc/Domain/InventoryEpoch.fs b/equinox-fc/Domain/InventoryEpoch.fs index eff0cb5a8..5b6086867 100644 --- a/equinox-fc/Domain/InventoryEpoch.fs +++ b/equinox-fc/Domain/InventoryEpoch.fs @@ -1,26 +1,30 @@ /// Manages the ingestion (and deduplication based on a TransactionId) of events reflecting transfers or stock adjustments /// that have been effected across a given set of Inventory -/// See Inventory.Service for surface level API which manages the ingestion +/// See Inventory.Service for surface level API which manages the ingestion, including transitioning to a new Epoch when an epoch reaches 'full' state module Fc.Domain.Inventory.Epoch let [] Category = "InventoryEpoch" -let streamName inventoryId = FsCodec.StreamName.compose Category [InventoryId.toString inventoryId; "0"] +let streamName (inventoryId, epochId) = FsCodec.StreamName.compose Category [InventoryId.toString inventoryId; InventoryEpochId.toString epochId] // NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care [] module Events = type TransactionRef = { transactionId : InventoryTransactionId } + type Snapshotted = { closed: bool; ids : InventoryTransactionId[] } type Event = | Adjusted of TransactionRef | Transferred of TransactionRef + | Closed + | Snapshotted of Snapshotted interface TypeShape.UnionContract.IUnionContract let codec = FsCodec.NewtonsoftJson.Codec.Create() /// Used for deduplicating input events let chooseInventoryTransactionId = function | Adjusted { transactionId = id } | Transferred { transactionId = id } -> Some id + | Closed | Snapshotted _ -> None module Fold = @@ -28,41 +32,71 @@ module Fold = let initial = { closed = false; ids = Set.empty } let evolve state = function | (Events.Adjusted e | Events.Transferred e) -> { state with ids = Set.add e.transactionId state.ids } + | Events.Closed -> { state with closed = true } + | Events.Snapshotted e -> { closed = e.closed; ids = Set.ofArray e.ids } let fold : State -> Events.Event seq -> State = Seq.fold evolve + let isOrigin = function Events.Snapshotted _ -> true | _ -> false + let snapshot s = Events.Snapshotted { closed = s.closed; ids = Array.ofSeq s.ids } type Result = - { /// Count of items added to this epoch. May be less than requested due to removal of duplicates and/or rejected items + { /// Indicates whether this epoch is closed (either previously or as a side-effect this time) + isClosed : bool + /// Count of items added to this epoch. May be less than requested due to removal of duplicates and/or rejected items added : int + /// residual items that [are not duplicates and] were not accepted into this epoch + rejected : Events.Event list /// identifiers for all items in this epoch transactionIds : Set } -let decideSync events (state : Fold.State) : Result * Events.Event list = +let decideSync capacity events (state : Fold.State) : Result * Events.Event list = let isFresh = function | Events.Adjusted { transactionId = id } | Events.Transferred { transactionId = id } -> (not << state.ids.Contains) id + | Events.Closed | Events.Snapshotted _ -> false let news = events |> Seq.filter isFresh |> List.ofSeq - let newCount = List.length news - let events = [ if newCount <> 0 then yield! news ] + let closed, allowing, markClosed, residual = + let newCount = List.length news + if state.closed then + true, 0, false, news + else + let capacityNow = capacity state + let accepting = min capacityNow newCount + let closing = accepting = capacityNow + let residual = List.skip accepting news + closing, accepting, closing, residual + let events = + [ if allowing <> 0 then yield! news + if markClosed then yield Events.Closed ] let state' = Fold.fold state events - { added = newCount; transactionIds = state'.ids }, events + { isClosed = closed; added = allowing; rejected = residual; transactionIds = state'.ids }, events -type Service internal (resolve : InventoryId -> Equinox.Stream) = +type Service internal (resolve : InventoryId * InventoryEpochId -> Equinox.Stream) = /// Attempt ingestion of `events` into the cited Epoch. /// - None will be accepted if the Epoch is `closed` /// - The `capacity` function will be passed a non-closed `state` in order to determine number of items that can be admitted prior to closing /// - If the computed capacity result is >= the number of items being submitted (which may be 0), the Epoch will be marked Closed /// NOTE the result may include rejected items (which the caller is expected to feed into a successor epoch) - member __.TryIngest(inventoryId, events) : Async = - let stream = resolve inventoryId - stream.Transact(decideSync events) + member __.TryIngest(inventoryId, epochId, capacity, events) : Async = + let stream = resolve (inventoryId, epochId) + stream.Transact(decideSync capacity events) let create resolve = let resolve ids = let stream = resolve (streamName ids) - Equinox.Stream(Serilog.Log.ForContext(), stream, maxAttempts=2) + Equinox.Stream(Serilog.Log.ForContext(), stream, maxAttempts = 2) Service(resolve) +module Cosmos = + + open Equinox.Cosmos + + let accessStrategy = AccessStrategy.Snapshot (Fold.isOrigin, Fold.snapshot) + let create (context, cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + create resolver.Resolve + module EventStore = open Equinox.EventStore diff --git a/equinox-fc/Domain/InventorySeries.fs b/equinox-fc/Domain/InventorySeries.fs new file mode 100644 index 000000000..9d83ef4c8 --- /dev/null +++ b/equinox-fc/Domain/InventorySeries.fs @@ -0,0 +1,71 @@ +/// Manages a) the ingestion epoch id b) the current checkpointed read position for a long-running Inventory Series +/// See InventoryEpoch for the logic managing the actual events logged within a given epoch +/// See Inventory.Service for the surface API which manages the writing +module Fc.Domain.Inventory.Series + +let [] Category = "InventorySeries" +let streamName inventoryId = FsCodec.StreamName.create Category (InventoryId.toString inventoryId) + +// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care +[] +module Events = + + type Started = { epoch : InventoryEpochId } + type Event = + | Started of Started + interface TypeShape.UnionContract.IUnionContract + let codec = FsCodec.NewtonsoftJson.Codec.Create() + +module Fold = + + type State = InventoryEpochId option + let initial = None + let evolve _state = function + | Events.Started e -> Some e.epoch + let fold : State -> Events.Event seq -> State = Seq.fold evolve + +let queryActiveEpoch state = state |> Option.defaultValue (InventoryEpochId.parse 0) + +let interpretAdvanceIngestionEpoch epochId (state : Fold.State) = + if queryActiveEpoch state >= epochId then [] + else [Events.Started { epoch = epochId }] + +type Service internal (resolve : InventoryId -> Equinox.Stream) = + + member __.ReadIngestionEpoch(inventoryId) : Async = + let stream = resolve inventoryId + stream.Query queryActiveEpoch + + member __.AdvanceIngestionEpoch(inventoryId, epochId) : Async = + let stream = resolve inventoryId + stream.Transact(interpretAdvanceIngestionEpoch epochId) + +let create resolve = + // For this stream, we uniformly use stale reads as: + // a) we don't require any information from competing writers + // b) while there are competing writers [which might cause us to have to retry a Transact], this should be infrequent + let opt = Equinox.ResolveOption.AllowStale + let resolve locationId = + let stream = resolve (streamName locationId, opt) + Equinox.Stream(Serilog.Log.ForContext(), stream, maxAttempts = 2) + Service(resolve) + +module Cosmos = + + open Equinox.Cosmos + + let accessStrategy = AccessStrategy.LatestKnownEvent + let create (context, cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + create <| fun (id, opt) -> resolver.Resolve(id, opt) + +module EventStore = + + open Equinox.EventStore + + let accessStrategy = AccessStrategy.LatestKnownEvent + let create (context, cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + create <| fun (id, opt) -> resolver.Resolve(id, opt) diff --git a/equinox-fc/Domain/Location.fs b/equinox-fc/Domain/Location.fs index dddfd29fb..22d42731a 100644 --- a/equinox-fc/Domain/Location.fs +++ b/equinox-fc/Domain/Location.fs @@ -40,6 +40,13 @@ module Helpers = let create (zeroBalance, toBalanceCarriedForward, shouldClose) (series, epochs) = Service(zeroBalance, toBalanceCarriedForward, shouldClose, series, epochs) +module Cosmos = + + let create (zeroBalance, toBalanceCarriedForward, shouldClose) (context, cache, maxAttempts) = + let series = Series.Cosmos.create (context, cache, maxAttempts) + let epochs = Epoch.Cosmos.create (context, cache, maxAttempts) + create (zeroBalance, toBalanceCarriedForward, shouldClose) (series, epochs) + module EventStore = let create (zeroBalance, toBalanceCarriedForward, shouldClose) (context, cache, maxAttempts) = diff --git a/equinox-fc/Domain/LocationEpoch.fs b/equinox-fc/Domain/LocationEpoch.fs index 1b0422e05..fb0e1c52f 100644 --- a/equinox-fc/Domain/LocationEpoch.fs +++ b/equinox-fc/Domain/LocationEpoch.fs @@ -155,6 +155,16 @@ let create resolve maxAttempts = Equinox.Stream(Serilog.Log.ForContext(), stream, maxAttempts=maxAttempts) Service (resolve) +module Cosmos = + + open Equinox.Cosmos + + let accessStrategy = AccessStrategy.Unoptimized + let create (context, cache, maxAttempts) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + create resolver.Resolve maxAttempts + module EventStore = open Equinox.EventStore diff --git a/equinox-fc/Domain/LocationSeries.fs b/equinox-fc/Domain/LocationSeries.fs index d58274aa8..e4d800361 100644 --- a/equinox-fc/Domain/LocationSeries.fs +++ b/equinox-fc/Domain/LocationSeries.fs @@ -44,6 +44,17 @@ let create resolve maxAttempts = Equinox.Stream(Serilog.Log.ForContext(), stream, maxAttempts=maxAttempts) Service(resolve) +module Cosmos = + + open Equinox.Cosmos + + let accessStrategy = AccessStrategy.LatestKnownEvent + let create (context, cache, maxAttempts) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + let resolve (id, opt) = resolver.Resolve(id, opt) + create resolve maxAttempts + module EventStore = open Equinox.EventStore diff --git a/equinox-fc/Domain/StockProcessManager.fs b/equinox-fc/Domain/StockProcessManager.fs index 4722c3c29..e1eebb6d5 100644 --- a/equinox-fc/Domain/StockProcessManager.fs +++ b/equinox-fc/Domain/StockProcessManager.fs @@ -45,13 +45,24 @@ type Service(transactions : StockTransaction.Service, locations : Location.Servi member __.Drive(transactionId) = async { let! _ = execute transactionId None in () } +module Cosmos = + + let create (context, cache) inventoryId (maxTransactionsPerEpoch, lookBackLimit) (epochLen, idsWindow, maxAttempts) = + let transactions, locations, inventory = + let transactions = StockTransaction.Cosmos.create (context, cache) + let zero, cf, sc = Location.Epoch.zeroBalance, Location.Epoch.toBalanceCarriedForward idsWindow, Location.Epoch.shouldClose epochLen + let locations = Location.Cosmos.create (zero, cf, sc) (context, cache, maxAttempts) + let inventory = Inventory.Cosmos.create inventoryId (maxTransactionsPerEpoch, lookBackLimit) (context, cache) + transactions, locations, inventory + Service(transactions, locations, inventory) + module EventStore = - let create (context, cache) inventoryId (epochLen, idsWindow, maxAttempts) = + let create (context, cache) inventoryId (maxTransactionsPerEpoch, lookBackLimit) (epochLen, idsWindow, maxAttempts) = let transactions, locations, inventory = let transactions = StockTransaction.EventStore.create (context, cache) let zero, cf, sc = Location.Epoch.zeroBalance, Location.Epoch.toBalanceCarriedForward idsWindow, Location.Epoch.shouldClose epochLen let locations = Location.EventStore.create (zero, cf, sc) (context, cache, maxAttempts) - let inventory = Inventory.EventStore.create inventoryId (context, cache) + let inventory = Inventory.EventStore.create inventoryId (maxTransactionsPerEpoch, lookBackLimit) (context, cache) transactions, locations, inventory Service(transactions, locations, inventory) diff --git a/equinox-fc/Domain/StockTransaction.fs b/equinox-fc/Domain/StockTransaction.fs index f21d6513f..7227ec118 100644 --- a/equinox-fc/Domain/StockTransaction.fs +++ b/equinox-fc/Domain/StockTransaction.fs @@ -144,6 +144,17 @@ let create resolve = Equinox.Stream(Serilog.Log.ForContext(), stream, maxAttempts=2) Service(resolve) +module Cosmos = + + open Equinox.Cosmos + + // in the happy path case, the event stream will typically be short, and the state cached, so snapshotting is less critical + let accessStrategy = AccessStrategy.Unoptimized + let create (context, cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + create <| fun (id, opt) -> resolver.Resolve(id, opt) + module EventStore = open Equinox.EventStore diff --git a/equinox-fc/Web/Program.fs b/equinox-fc/Web/Program.fs index aa9562926..2a8cf25ba 100644 --- a/equinox-fc/Web/Program.fs +++ b/equinox-fc/Web/Program.fs @@ -37,24 +37,28 @@ module Args = | Some x -> x let isEnvVarTrue varName = EnvVar.tryGet varName |> Option.exists (fun s -> String.Equals(s, bool.TrueString, StringComparison.OrdinalIgnoreCase)) open Argu + open Equinox.Cosmos open Equinox.EventStore [] type Parameters = | [] Verbose | [] Es of ParseResults + | [] Cosmos of ParseResults interface IArgParserTemplate with member a.Usage = match a with | Verbose -> "request Verbose Logging. Default: off." | Es _ -> "specify EventStore input parameters." + | Cosmos _ -> "specify CosmosDB input parameters." and Arguments(a : ParseResults) = member __.Verbose = a.Contains Parameters.Verbose member __.StatsInterval = TimeSpan.FromMinutes 1. - member val Source : EsArguments = + member val Source : Choice = match a.TryGetSubCommand() with - | Some (Es es) -> (EsArguments es) + | Some (Es es) -> Choice1Of2 (EsArguments es) + | Some (Cosmos cosmos) -> Choice2Of2 (CosmosArguments cosmos) | _ -> raise (MissingArg "Must specify one of cosmos or es for Src") and [] EsParameters = | [] Verbose @@ -82,7 +86,7 @@ module Args = match __.Tcp, __.Port with | false, None -> Discovery.GossipDns __.Host | false, Some p -> Discovery.GossipDnsCustomPort (__.Host, p) - | true, None -> Discovery.Uri (UriBuilder("tcp", __.Host, 1113).Uri) + | true, None -> Discovery.Uri (UriBuilder("tcp", __.Host).Uri) | true, Some p -> Discovery.Uri (UriBuilder("tcp", __.Host, p).Uri) member __.Tcp = a.Contains Tcp || isEnvVarTrue "EQUINOX_ES_TCP" member __.Port = match a.TryGetResult Port with Some x -> Some x | None -> EnvVar.tryGet "EQUINOX_ES_PORT" |> Option.map int @@ -102,6 +106,40 @@ module Args = let tags=["M", Environment.MachineName; "I", Guid.NewGuid() |> string] Connector(x.User, x.Password, x.Timeout, x.Retries, log=log, heartbeatTimeout=x.Heartbeat, tags=tags) .Establish(appName, discovery, connectionStrategy) |> Async.RunSynchronously + and [] CosmosParameters = + | [] Connection of string + | [] ConnectionMode of ConnectionMode + | [] Database of string + | [] Container of string + | [] Timeout of float + | [] Retries of int + | [] RetriesWaitTime of float + interface IArgParserTemplate with + member a.Usage = + match a with + | ConnectionMode _ -> "override the connection mode. Default: Direct." + | Connection _ -> "specify a connection string for a Cosmos account. (optional if environment variable EQUINOX_COSMOS_CONNECTION specified)" + | Database _ -> "specify a database name for Cosmos store. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)" + | Container _ -> "specify a container name for Cosmos store. (optional if environment variable EQUINOX_COSMOS_CONTAINER specified)" + | Timeout _ -> "specify operation timeout in seconds. Default: 5." + | Retries _ -> "specify operation retries. Default: 1." + | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." + and CosmosArguments(a : ParseResults) = + member __.Mode = a.GetResult(CosmosParameters.ConnectionMode, Equinox.Cosmos.ConnectionMode.Direct) + member __.Connection = a.TryGetResult CosmosParameters.Connection |> defaultWithEnvVar "EQUINOX_COSMOS_CONNECTION" "Connection" + member __.Database = a.TryGetResult CosmosParameters.Database |> defaultWithEnvVar "EQUINOX_COSMOS_DATABASE" "Database" + member __.Container = a.TryGetResult CosmosParameters.Container |> defaultWithEnvVar "EQUINOX_COSMOS_CONTAINER" "Container" + member __.Timeout = a.GetResult(CosmosParameters.Timeout, 5.) |> TimeSpan.FromSeconds + member __.Retries = a.GetResult(CosmosParameters.Retries, 1) + member __.MaxRetryWaitTime = a.GetResult(CosmosParameters.RetriesWaitTime, 5.) |> TimeSpan.FromSeconds + member x.BuildConnectionDetails() = + let (Discovery.UriAndKey (endpointUri, _) as discovery) = Discovery.FromConnectionString x.Connection + Log.Information("CosmosDb {mode} {endpointUri} Database {database} Container {container}", + x.Mode, endpointUri, x.Database, x.Container) + Log.Information("CosmosDb timeout {timeout}s; Throttling retries {retries}, max wait {maxRetryWaitTime}s", + (let t = x.Timeout in t.TotalSeconds), x.Retries, (let t = x.MaxRetryWaitTime in t.TotalSeconds)) + let connector = Equinox.Cosmos.Connector(x.Timeout, x.Retries, x.MaxRetryWaitTime, Log.Logger, mode=x.Mode) + discovery, x.Database, x.Container, connector /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args let parse argv = @@ -149,12 +187,18 @@ type Startup() = let build (args : Args.Arguments) = let cache = Equinox.Cache(AppName, sizeMb=10) let create = - let es = args.Source - let connection = es.Connect(Log.Logger, Log.Logger, AppName, Equinox.EventStore.ConnectionStrategy.ClusterSingle Equinox.EventStore.NodePreference.Master) - let context = Equinox.EventStore.Context(connection, Equinox.EventStore.BatchingPolicy(maxBatchSize=500)) - Fc.Domain.StockProcessManager.EventStore.create (context, cache) + match args.Source with + | Choice1Of2 es -> + let connection = es.Connect(Log.Logger, Log.Logger, AppName, Equinox.EventStore.ConnectionStrategy.ClusterSingle Equinox.EventStore.NodePreference.Master) + let context = Equinox.EventStore.Context(connection, Equinox.EventStore.BatchingPolicy(maxBatchSize=500)) + Fc.Domain.StockProcessManager.EventStore.create (context, cache) + | Choice2Of2 cosmos -> + let (discovery, database, container, connector) = cosmos.BuildConnectionDetails() + let connection = connector.Connect(AppName, discovery) |> Async.RunSynchronously + let context = Equinox.Cosmos.Context(connection, database, container) + Fc.Domain.StockProcessManager.Cosmos.create (context, cache) let inventoryId = InventoryId.parse "FC000" - create inventoryId (1000, 5, 3) + create inventoryId (1000, 10) (1000, 5, 3) let run argv args = let processManager = build args