Skip to content

Commit

Permalink
refactor(Cosmos)!: Cleanup CosmosClientFactory (#430)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Sep 27, 2023
1 parent 84a6f95 commit 551d321
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 97 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `CosmosStore`: Only log `bytes` when log level is `Debug` [#305](https://github.com/jet/equinox/pull/305)
- `CosmosStore.AccessStrategy.MultiSnapshot`,`Custom`: Change `list` and `seq` types to `array` [#338](https://github.com/jet/equinox/pull/338)
- `CosmosStore.Core.Initialization.initAux`: Replace hard-coded manual 400 RU with `mode` parameter [#328](https://github.com/jet/equinox/pull/328) :pray: [@brihadish](https://github.com/brihadish)
- `CosmosStore.CosmosClientFactory`: Moved to Core [#430](https://github.com/jet/equinox/pull/430)
- `EventStore`: Target `EventStore.Client` v `22.0.0-preview`; rename `Connector` -> `EventStoreConnector` [#317](https://github.com/jet/equinox/pull/317)
- `Tool`/`samples/`: switched to use `Equinox.EventStoreDb` [#196](https://github.com/jet/equinox/pull/196)

Expand Down
11 changes: 4 additions & 7 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1840,7 +1840,6 @@ following key benefits:
### Example Code

```fsharp
open Equinox.CosmosStore.Core
// open MyCodecs.Json // example of using specific codec which can yield UTF-8
// byte arrays from a type using `Json.toBytes` via Fleece
Expand All @@ -1852,22 +1851,20 @@ type EventData with
// Load connection string from your Key Vault (example here is the CosmosDB
// simulator's well known key)
// see https://github.com/jet/equinox-provisioning-cosmosdb
// see https://github.com/jet/equinox#provisioning-cosmosdb
let connectionString: string =
"AccountEndpoint=https://localhost:8081;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;"
// Forward to Log (you can use `Log.Logger` and/or `Log.ForContext` if your app
// uses Serilog already)
let outputLog = LoggerConfiguration().WriteTo.NLog().CreateLogger()
// Forward to Log (use `Log.Logger` if your app already uses Serilog)
let outputLog = LoggerConfiguration().WriteTo.Console().CreateLogger()
// Serilog has a `ForContext<T>()`, but if you are using a `module` for the
// wiring, you might create a tagged logger like this:
let gatewayLog =
outputLog.ForContext(Serilog.Core.Constants.SourceContextPropertyName, "Equinox")
let discovery = Discovery.ConnectionString (read "EQUINOX_COSMOS_CONNECTION")
let connector: Equinox.CosmosStore.CosmosStoreConnector =
CosmosStoreConnector(
discovery,
Equinox.CosmosStore.Discovery.ConnectionString connectionString,
requestTimeout = TimeSpan.FromSeconds 5.,
maxRetryAttemptsOnRateLimitedRequests = 1,
maxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromSeconds 3.)
Expand Down
20 changes: 12 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -665,21 +665,25 @@ For more complete instructions, follow https://developers.eventstore.com/server/
#### Using Azure Cosmos DB Service
dotnet run --project tools/Equinox.Tool -- init -ru 400 `
cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER
# Same for a Archive Container for integration testing of the archive store fallback mechanism
$env:EQUINOX_COSMOS_CONTAINER_ARCHIVE="equinox-test-archive"
dotnet run --project tools/Equinox.Tool -- init -ru 400 `
cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER_ARCHIVE
```bash
dotnet run --project tools/Equinox.Tool -- init -ru 400 `
cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER
# Same for a Archive Container for integration testing of the archive store fallback mechanism
$env:EQUINOX_COSMOS_CONTAINER_ARCHIVE="equinox-test-archive"
dotnet run --project tools/Equinox.Tool -- init -ru 400 `
cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER_ARCHIVE
```
#### Using Cosmos Emulator on an Intel Mac
NOTE There's [no Apple Silicon emulator available as yet](https://github.com/Azure/azure-cosmos-db-emulator-docker/issues/54#issuecomment-1399067365).
NOTE Have not tested with the Windows Emulator, but it should work with analogous steps.
docker compose up equinox-cosmos -d
bash docker-compose-cosmos.sh
```bash
docker compose up equinox-cosmos -d
bash docker-compose-cosmos.sh
```
### Provisioning SqlStreamStore
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ services:
- AZURE_COSMOS_EMULATOR_IP_ADDRESS_OVERRIDE=127.0.0.1
ports:
- "8081:8081" # so docker-cosmos-init.sh can get the cert and/or humans can use https://localhost:8081/_explorer/index.html
- "10250-10256:10250-10256" # tests connect using Direct mode
- "10250-10255:10250-10255" # tests connect using Direct mode

equinox-mssql:
container_name: equinox-mssql
Expand Down
122 changes: 51 additions & 71 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,35 @@ module ConnectionString =
| true, (:? string as s) when not (String.IsNullOrEmpty s) -> s
| _ -> invalidOp "Connection string does not contain an \"AccountEndpoint\""

[<RequireQualifiedAccess; NoComparison>]
type DiscoveryMode =
| AccountUriAndKey of accountUri: string * key: string
| ConnectionString of connectionString: string
member x.Endpoint = x |> function
| DiscoveryMode.AccountUriAndKey (u, _k) -> u
| DiscoveryMode.ConnectionString (ConnectionString.AccountEndpoint e) -> e

/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container.
type CosmosClientFactory(options) =
static member CreateDefaultOptions(requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests: TimeSpan) =
CosmosClientOptions(
MaxRetryAttemptsOnRateLimitedRequests = maxRetryAttemptsOnRateLimitedRequests,
MaxRetryWaitTimeOnRateLimitedRequests = maxRetryWaitTimeOnRateLimitedRequests,
RequestTimeout = requestTimeout,
Serializer = CosmosJsonSerializer(JsonSerializerOptions()))
/// CosmosClientOptions for this CosmosClientFactory as configured (NOTE while the Options object is not immutable, it should not have setters called on it)
member val Options = options
/// Creates an instance of CosmosClient without actually validating or establishing the connection
/// It's recommended to use <c>CreateAndInitializeAsync</c> in preference to this API
/// in order to avoid latency spikes, and/or deferring discovery of connectivity or permission issues.
member x.CreateUninitialized(discovery: DiscoveryMode) = discovery |> function
| DiscoveryMode.AccountUriAndKey (accountUri = uri; key = key) -> new CosmosClient(uri, key, x.Options)
| DiscoveryMode.ConnectionString cs -> new CosmosClient(cs, x.Options)
/// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member x.CreateAndInitializeAsync(discovery: DiscoveryMode, containers, ct) = discovery |> function
| DiscoveryMode.AccountUriAndKey (accountUri = uri; key = key) -> CosmosClient.CreateAndInitializeAsync(uri, key, containers, x.Options, ct)
| DiscoveryMode.ConnectionString cs -> CosmosClient.CreateAndInitializeAsync(cs, containers, x.Options, ct)

namespace Equinox.CosmosStore

open Equinox.Core
Expand All @@ -1120,63 +1149,9 @@ type Discovery =
| AccountUriAndKey of accountUri: Uri * key: string
/// Cosmos SDK Connection String
| ConnectionString of connectionString: string
member x.Endpoint: Uri = x |> function
| Discovery.AccountUriAndKey (u, _k) -> u
| Discovery.ConnectionString (ConnectionString.AccountEndpoint e) -> Uri e

/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container.
[<System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)>]
type CosmosClientFactory
( // Timeout to apply to individual reads/write round-trips going to CosmosDB. CosmosDB Default: 1m.
requestTimeout: TimeSpan,
// Maximum number of times to attempt when failure reason is a 429 from CosmosDB, signifying RU limits have been breached. CosmosDB default: 9
maxRetryAttemptsOnRateLimitedRequests: int,
// Maximum number of seconds to wait (especially if a higher wait delay is suggested by CosmosDB in the 429 response). CosmosDB default: 30s
maxRetryWaitTimeOnRateLimitedRequests: TimeSpan,
// Connection mode (default: ConnectionMode.Direct (best performance, same as Microsoft.Azure.Cosmos SDK default)
// NOTE: default for Equinox.Cosmos.Connector (i.e. V2) was Gateway (worst performance, least trouble, Microsoft.Azure.DocumentDb SDK default)
[<O; D null>] ?mode: ConnectionMode,
// Connection limit for Gateway Mode. CosmosDB default: 50
[<O; D null>] ?gatewayModeMaxConnectionLimit,
// consistency mode (default: ConsistencyLevel.Session)
[<O; D null>] ?defaultConsistencyLevel: ConsistencyLevel,
// Inhibits certificate verification when set to <c>true</c>, i.e. for working with the CosmosDB Emulator (default <c>false</c>)
[<O; D null>] ?bypassCertificateValidation: bool) =

/// CosmosClientOptions for this CosmosClientFactory as configured
member val Options =
let co = CosmosClientOptions(
MaxRetryAttemptsOnRateLimitedRequests = maxRetryAttemptsOnRateLimitedRequests,
MaxRetryWaitTimeOnRateLimitedRequests = maxRetryWaitTimeOnRateLimitedRequests,
RequestTimeout = requestTimeout,
Serializer = CosmosJsonSerializer(System.Text.Json.JsonSerializerOptions()))
match mode with
| None | Some ConnectionMode.Direct -> co.ConnectionMode <- ConnectionMode.Direct
| Some ConnectionMode.Gateway | Some _ (* enum total match :( *) -> co.ConnectionMode <- ConnectionMode.Gateway // only supports Https
match gatewayModeMaxConnectionLimit with
| Some _ when co.ConnectionMode = ConnectionMode.Direct -> invalidArg "gatewayModeMaxConnectionLimit" "Not admissible in Direct mode"
| x -> if co.ConnectionMode = ConnectionMode.Gateway then co.GatewayModeMaxConnectionLimit <- defaultArg x 50
match defaultConsistencyLevel with
| Some x -> co.ConsistencyLevel <- x
| None -> ()
// https://github.com/Azure/azure-cosmos-dotnet-v3/blob/1ef6e399f114a0fd580272d4cdca86b9f8732cf3/Microsoft.Azure.Cosmos.Samples/Usage/HttpClientFactory/Program.cs#L96
if bypassCertificateValidation = Some true && co.ConnectionMode = ConnectionMode.Gateway then
let cb = System.Net.Http.HttpClientHandler.DangerousAcceptAnyServerCertificateValidator
let ch = new System.Net.Http.HttpClientHandler(ServerCertificateCustomValidationCallback = cb)
co.HttpClientFactory <- fun () -> new System.Net.Http.HttpClient(ch)
co

/// Creates an instance of CosmosClient without actually validating or establishing the connection
/// It's recommended to use <c>CreateAndInitializeAsync</c> in preference to this API
/// in order to avoid latency spikes, and/or deferring discovery of connectivity or permission issues.
member x.CreateUninitialized(discovery: Discovery) = discovery |> function
| Discovery.AccountUriAndKey (accountUri = uri; key = key) -> new CosmosClient(string uri, key, x.Options)
| Discovery.ConnectionString cs -> new CosmosClient(cs, x.Options)

/// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member x.CreateAndInitializeAsync(discovery: Discovery, containers, ct) = discovery |> function
| Discovery.AccountUriAndKey (accountUri = uri; key = key) -> CosmosClient.CreateAndInitializeAsync(string uri, key, containers, x.Options, ct)
| Discovery.ConnectionString cs -> CosmosClient.CreateAndInitializeAsync(cs, containers, x.Options, ct)
member x.ToDiscoveryMode() = x |> function
| Discovery.AccountUriAndKey (u, k) -> DiscoveryMode.AccountUriAndKey (string u, k)
| Discovery.ConnectionString c -> DiscoveryMode.ConnectionString c

/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container.
type CosmosStoreConnector
Expand All @@ -1191,32 +1166,37 @@ type CosmosStoreConnector
// Connection mode (default: ConnectionMode.Direct (best performance, same as Microsoft.Azure.Cosmos SDK default)
// NOTE: default for Equinox.Cosmos.Connector (i.e. V2) was Gateway (worst performance, least trouble, Microsoft.Azure.DocumentDb SDK default)
[<O; D null>] ?mode: ConnectionMode,
// Connection limit for Gateway Mode. CosmosDB default: 50
[<O; D null>] ?gatewayModeMaxConnectionLimit,
// consistency mode (default: ConsistencyLevel.Session)
// consistency mode (default: use configuration specified for Database)
[<O; D null>] ?defaultConsistencyLevel: ConsistencyLevel,
// Inhibits certificate verification when set to <c>true</c>, i.e. for working with the CosmosDB Emulator (default <c>false</c>)
[<O; D null>] ?bypassCertificateValidation: bool) =

// Inhibits certificate verification when set to `true`. Default: false.
[<O; D null>] ?bypassCertificateValidation: bool,
[<O; D null>] ?customize: Action<CosmosClientOptions>) =
let discoveryMode = discovery.ToDiscoveryMode()
let factory =
CosmosClientFactory
( requestTimeout, maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests, ?mode = mode,
?gatewayModeMaxConnectionLimit = gatewayModeMaxConnectionLimit, ?defaultConsistencyLevel = defaultConsistencyLevel,
?bypassCertificateValidation = bypassCertificateValidation)
let o = CosmosClientFactory.CreateDefaultOptions(requestTimeout, maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests)
mode |> Option.iter (fun x -> o.ConnectionMode <- x)
defaultConsistencyLevel |> Option.iter (fun x -> o.ConsistencyLevel <- x)
// https://github.com/Azure/azure-cosmos-dotnet-v3/blob/1ef6e399f114a0fd580272d4cdca86b9f8732cf3/Microsoft.Azure.Cosmos.Samples/Usage/HttpClientFactory/Program.cs#L96
if defaultArg bypassCertificateValidation false then
let cb = System.Net.Http.HttpClientHandler.DangerousAcceptAnyServerCertificateValidator
let ch = new System.Net.Http.HttpClientHandler(ServerCertificateCustomValidationCallback = cb)
o.HttpClientFactory <- fun () -> new System.Net.Http.HttpClient(ch)
customize |> Option.iter (fun c -> c.Invoke o)
CosmosClientFactory o

/// The <c>CosmosClientOptions</c> used when connecting to CosmosDB
member _.Options = factory.Options

/// The Endpoint Uri for the target CosmosDB
member _.Endpoint = discovery.Endpoint
member val Endpoint = discoveryMode.Endpoint |> Uri

/// Creates an instance of CosmosClient without actually validating or establishing the connection
/// It's recommended to use <c>Connect</c> and/or <c>CreateAndInitialize</c> in preference to this API
/// in order to avoid latency spikes, and/or deferring discovery of connectivity or permission issues.
member _.CreateUninitialized() = factory.CreateUninitialized(discovery)
member _.CreateUninitialized() = factory.CreateUninitialized(discoveryMode)

/// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member _.CreateAndInitializeAsync(containers, ct): Task<CosmosClient> = factory.CreateAndInitializeAsync(discovery, containers, ct)
member _.CreateAndInitializeAsync(containers, ct): Task<CosmosClient> = factory.CreateAndInitializeAsync(discoveryMode, containers, ct)
/// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member x.CreateAndInitialize(databaseAndContainerIds: struct (string * string)[]) =
Async.call (fun ct -> x.CreateAndInitializeAsync(databaseAndContainerIds, ct))
Expand All @@ -1226,7 +1206,7 @@ type CosmosStoreConnector

/// Creates and validates a CosmosStoreClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member _.ConnectAsync(containers, ct): Task<CosmosStoreClient> = task {
let! cosmosClient = factory.CreateAndInitializeAsync(discovery, containers, ct)
let! cosmosClient = factory.CreateAndInitializeAsync(discoveryMode, containers, ct)
return CosmosStoreClient(cosmosClient) }
/// Creates and validates a CosmosStoreClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member x.Connect(databaseAndContainerIds: struct (string * string)[]) =
Expand Down
8 changes: 3 additions & 5 deletions src/Equinox.EventStoreDb/EventStoreDb.fs
Original file line number Diff line number Diff line change
Expand Up @@ -449,23 +449,21 @@ type ConnectionStrategy =
type EventStoreConnector
( reqTimeout: TimeSpan,
[<O; D null>] ?readRetryPolicy, [<O; D null>] ?writeRetryPolicy, [<O; D null>] ?tags,
[<O; D null>] ?customize: EventStoreClientSettings -> unit) =
[<O; D null>] ?customize: Action<EventStoreClientSettings>) =

member _.Connect
( // Name should be sufficient to uniquely identify this connection within a single app instance's logs
name, discovery: Discovery, ?clusterNodePreference): EventStoreClient =
let settings =
match discovery with
| Discovery.ConnectionString s -> EventStoreClientSettings.Create(s)
if name = null then nullArg "name"
let name = String.concat ";" <| seq {
name
string clusterNodePreference
match tags with None -> () | Some tags -> for key, value in tags do sprintf "%s=%s" key value }
let sanitizedName = name.Replace('\'','_').Replace(':','_') // ES internally uses `:` and `'` as separators in log messages and ... people regex logs
let settings = discovery |> function Discovery.ConnectionString s -> EventStoreClientSettings.Create s
settings.ConnectionName <- sanitizedName
match clusterNodePreference with None -> () | Some np -> settings.ConnectivitySettings.NodePreference <- np
match customize with None -> () | Some f -> f settings
match customize with None -> () | Some f -> f.Invoke settings
settings.DefaultDeadline <- reqTimeout
// TODO implement reqRetries
new EventStoreClient(settings)
Expand Down
4 changes: 2 additions & 2 deletions tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ let private databaseId = tryRead "EQUINOX_COSMOS_DATABASE" |> Option.defaultValu
let private containerId = tryRead "EQUINOX_COSMOS_CONTAINER" |> Option.defaultValue "equinox-test"
let private archiveContainerId = tryRead "EQUINOX_COSMOS_CONTAINER_ARCHIVE" |> Option.defaultValue "equinox-test-archive"

// see https://github.com/jet/equinox-provisioning-cosmosdb for details of what's expected in terms of provisioned containers etc
// see https://github.com/jet/equinox#provisioning-cosmosdb for details of what's expected in terms of provisioned containers etc
let discoverConnection () =
match tryRead "EQUINOX_COSMOS_CONNECTION" with
| None -> "localDocDbSim", Discovery.AccountUriAndKey(Uri "https://localhost:8081", "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==")
Expand All @@ -90,7 +90,7 @@ let createConnector (log: Serilog.ILogger) =
let name, discovery = discoverConnection ()
let connector = CosmosStoreConnector(discovery, requestTimeout = TimeSpan.FromSeconds 3.,
maxRetryAttemptsOnRateLimitedRequests = 2, maxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromMinutes 1.)
log.Information("CosmosStore {name} {endpoint}", name, discovery.Endpoint)
log.Information("CosmosStore {name} {endpoint}", name, connector.Endpoint)
connector

[<Xunit.CollectionDefinition "DocStore">]
Expand Down
Loading

0 comments on commit 551d321

Please sign in to comment.