diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 5772f4b1..09e8c338 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -25,10 +25,6 @@ jobs:
- name: Checkout
uses: actions/checkout@v2
- - uses: actions/setup-dotnet@v1
- with:
- dotnet-version: '6.0.x'
-
- name: Install message-db
env:
MESSAGE_DB_VERSION: 1.3.0
diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index 72bbd880..bb6d770b 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -47,10 +47,5 @@ jobs:
pool:
vmImage: 'macOS-latest'
steps:
- - task: UseDotNet@2
- displayName: 'Install .NET Core sdk'
- inputs:
- packageType: sdk
- version: 6.x
- script: dotnet pack build.proj
displayName: dotnet pack build.proj
diff --git a/global.json b/global.json
index 0f050ba5..9bb4dcbe 100644
--- a/global.json
+++ b/global.json
@@ -1,6 +1,6 @@
{
"sdk": {
- "version": "6.0.300",
+ "version": "8.0.101",
"rollForward": "latestMajor"
}
}
diff --git a/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs b/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs
index e1a136f5..b59df5f3 100644
--- a/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs
+++ b/src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs
@@ -4,8 +4,8 @@ open Equinox.CosmosStore.Core
open Propulsion.Sinks
-/// Maps fields in an Event within an Equinox.Cosmos V1+ Event (in a Batch or Tip) to the interface defined by Propulsion.Streams
-/// NOTE No attempt is made to filter out Tip (`id=-1`) batches from the ChangeFeed; Equinox versions >= 3, Tip batches can bear events.
+/// Maps fields in an Event within an Equinox.Cosmos V1+ Event (in a Batch or Tip) to the interface defined by Propulsion.Streams.
+/// NOTE No attempt is made to filter out Tip (`id=-1`) batches from the ChangeFeed, as in Equinox versions >= 3, Tip batches can bear events.
[]
#if !COSMOSV3
module EquinoxSystemTextJsonParser =
diff --git a/src/Propulsion.DynamoStore/AppendsEpoch.fs b/src/Propulsion.DynamoStore/AppendsEpoch.fs
index 4fb361d2..ed079151 100644
--- a/src/Propulsion.DynamoStore/AppendsEpoch.fs
+++ b/src/Propulsion.DynamoStore/AppendsEpoch.fs
@@ -45,7 +45,7 @@ module Events =
let next (x: Events.StreamSpan) = int x.i + x.c.Length
/// Aggregates all spans per stream into a single Span from the lowest index to the highest
let flatten: Events.StreamSpan seq -> Events.StreamSpan seq =
- Seq.groupBy (fun x -> x.p)
+ Seq.groupBy _.p
>> Seq.map (fun (p, xs) ->
let mutable i = -1L
let c = ResizeArray()
@@ -175,7 +175,7 @@ module Reader =
member _.ReadVersion(partitionId, epochId): Async =
let decider = resolve (partitionId, epochId, System.Int64.MaxValue)
- decider.QueryEx(fun c -> c.Version)
+ decider.QueryEx _.Version
module Factory =
diff --git a/src/Propulsion.DynamoStore/DynamoDbExport.fs b/src/Propulsion.DynamoStore/DynamoDbExport.fs
index 93cf4f99..015ed9e2 100644
--- a/src/Propulsion.DynamoStore/DynamoDbExport.fs
+++ b/src/Propulsion.DynamoStore/DynamoDbExport.fs
@@ -47,7 +47,7 @@ type Importer(buffer: DynamoStoreIndex.Buffer, emit, dump) =
let _ok, _ = buffer.LogIndexed(string streamSpan.p, { i = int streamSpan.i; c = streamSpan.c })
pending.Remove(string streamSpan.p) |> ignore
totalIngestedSpans <- totalIngestedSpans + batch.LongLength
- return pending.Values |> Seq.sumBy (fun x -> x.c.Length) }
+ return pending.Values |> Seq.sumBy _.c.Length }
/// Ingest a file worth of data, flushing whenever we've accumulated enough pending data to be written
member _.IngestDynamoDbJsonFile(file, bufferedEventsFlushThreshold) = async {
diff --git a/src/Propulsion.DynamoStore/DynamoStoreIndex.fs b/src/Propulsion.DynamoStore/DynamoStoreIndex.fs
index e213eef0..f31a591c 100644
--- a/src/Propulsion.DynamoStore/DynamoStoreIndex.fs
+++ b/src/Propulsion.DynamoStore/DynamoStoreIndex.fs
@@ -100,7 +100,7 @@ module Reader =
let! maybeStreamBytes, _version, state = epochs.Read(partitionId, epochId, 0)
let sizeB, loadS = defaultValueArg maybeStreamBytes 0L, Stopwatch.elapsedSeconds ts
let spans = state.changes |> Array.collect (fun struct (_i, spans) -> spans)
- let totalEvents = spans |> Array.sumBy (fun x -> x.c.Length)
+ let totalEvents = spans |> Array.sumBy _.c.Length
let totalStreams = spans |> AppendsEpoch.flatten |> Seq.length
log.Information("Epoch {epochId} {totalE} events {totalS} streams ({spans} spans, {batches} batches, {k:n3} MiB) {loadS:n1}s",
string epochId, totalEvents, totalStreams, spans.Length, state.changes.Length, Log.miB sizeB, loadS)
diff --git a/src/Propulsion.DynamoStore/DynamoStoreSource.fs b/src/Propulsion.DynamoStore/DynamoStoreSource.fs
index 0ab5f3e7..96fc6b4d 100644
--- a/src/Propulsion.DynamoStore/DynamoStoreSource.fs
+++ b/src/Propulsion.DynamoStore/DynamoStoreSource.fs
@@ -50,7 +50,7 @@ module private Impl =
sw.Stop()
let totalStreams, chosenEvents, totalEvents, streamEvents =
let all = state.changes |> Seq.collect (fun struct (_i, xs) -> xs) |> AppendsEpoch.flatten |> Array.ofSeq
- let totalEvents = all |> Array.sumBy (fun x -> x.c.Length)
+ let totalEvents = all |> Array.sumBy _.c.Length
let mutable chosenEvents = 0
let chooseStream (span: AppendsEpoch.Events.StreamSpan) =
match maybeLoad (IndexStreamId.toStreamName span.p) (span.i, span.c) with
diff --git a/src/Propulsion.EventStoreDb/EventStoreSource.fs b/src/Propulsion.EventStoreDb/EventStoreSource.fs
index 302426d8..ae4918fb 100644
--- a/src/Propulsion.EventStoreDb/EventStoreSource.fs
+++ b/src/Propulsion.EventStoreDb/EventStoreSource.fs
@@ -16,7 +16,7 @@ module private Impl =
let readBatch withData batchSize streamFilter (store: EventStoreClient) pos ct = task {
let pos = let p = pos |> Propulsion.Feed.Position.toInt64 |> uint64 in Position(p, p)
let res = store.ReadAllAsync(Direction.Forwards, pos, batchSize, withData, cancellationToken = ct)
- let! batch = res |> TaskSeq.map (fun e -> e.Event) |> TaskSeq.toArrayAsync
+ let! batch = res |> TaskSeq.map _.Event |> TaskSeq.toArrayAsync
return ({ checkpoint = checkpointPos batch; items = toItems streamFilter batch; isTail = batch.LongLength <> batchSize }: Propulsion.Feed.Core.Batch<_>) }
// @scarvel8: event_global_position = 256 x 1024 x 1024 x chunk_number + chunk_header_size (128) + event_position_offset_in_chunk
diff --git a/src/Propulsion.MemoryStore/MemoryStoreLogger.fs b/src/Propulsion.MemoryStore/MemoryStoreLogger.fs
index 3be9d71f..94586730 100644
--- a/src/Propulsion.MemoryStore/MemoryStoreLogger.fs
+++ b/src/Propulsion.MemoryStore/MemoryStoreLogger.fs
@@ -19,7 +19,7 @@ let renderSubmit (log: Serilog.ILogger) struct (epoch, categoryName, streamId, e
if (not << log.IsEnabled) Serilog.Events.LogEventLevel.Debug then log
elif typedefof<'F> <> typeof> then log
else log |> propEventJsonUtf8 "Json" (unbox events)
- let types = events |> Seq.map (fun e -> e.EventType)
+ let types = events |> Seq.map _.EventType
log.ForContext("types", types).Debug("Submit #{epoch} {categoryName}-{streamId}x{count}", epoch, categoryName, streamId, events.Length)
elif log.IsEnabled Serilog.Events.LogEventLevel.Debug then
let types = seq { for e in events -> e.EventType } |> Seq.truncate 5
diff --git a/src/Propulsion/FeedMonitor.fs b/src/Propulsion/FeedMonitor.fs
index 688be4d3..f4767ba9 100644
--- a/src/Propulsion/FeedMonitor.fs
+++ b/src/Propulsion/FeedMonitor.fs
@@ -77,7 +77,7 @@ and FeedMonitor(log: Serilog.ILogger, fetchPositions: unit -> struct (TrancheId
let logInterval = IntervalTimer logInterval
let logWaitStatusUpdateNow () =
let current = fetchPositions ()
- let currentRead, completed = current |> choose (fun v -> v.ReadPos), current |> choose (fun v -> v.CompletedPos)
+ let currentRead, completed = current |> choose _.ReadPos, current |> choose _.CompletedPos
match waitMode with
| OriginalWorkOnly -> log.Information("FeedMonitor {totalTime:n1}s Awaiting Started {starting} Completed {completed}",
sw.ElapsedSeconds, startReadPositions, completed)
@@ -89,7 +89,7 @@ and FeedMonitor(log: Serilog.ILogger, fetchPositions: unit -> struct (TrancheId
let busy () =
let current = fetchPositions ()
match waitMode with
- | OriginalWorkOnly -> let completed = current |> choose (fun v -> v.CompletedPos)
+ | OriginalWorkOnly -> let completed = current |> choose _.CompletedPos
let trancheCompletedPos = System.Collections.Generic.Dictionary(completed |> Seq.map ValueTuple.toKvp)
let startPosStillPendingCompletion trancheStartPos trancheId =
match trancheCompletedPos.TryGetValue trancheId with
@@ -145,7 +145,7 @@ and FeedMonitor(log: Serilog.ILogger, fetchPositions: unit -> struct (TrancheId
| xs when Array.any xs && requireTail && xs |> Array.forall (ValueTuple.snd >> TranchePosition.isDrained) ->
xs |> choose (fun v -> v.ReadPos |> orDummyValue)
| xs when xs |> Array.forall (fun struct (_, v) -> TranchePosition.isEmpty v && (not requireTail || v.IsTail)) -> Array.empty
- | originals -> originals |> choose (fun v -> v.ReadPos)
+ | originals -> originals |> choose _.ReadPos
match! awaitPropagation sleep propagationDelay activeTranches ct with
| [||] ->
if propagationDelay = TimeSpan.Zero then log.Debug("FeedSource Wait Skipped; no processing pending. Completed {completed}", currentCompleted)
diff --git a/src/Propulsion/Internal.fs b/src/Propulsion/Internal.fs
index 21fb74e3..1b477dd0 100644
--- a/src/Propulsion/Internal.fs
+++ b/src/Propulsion/Internal.fs
@@ -234,7 +234,7 @@ module Stats =
let emit = logStatsPadded log keys
let summary =
cats.Values
- |> Seq.collect (fun x -> x.All)
+ |> Seq.collect _.All
|> Seq.groupBy ValueTuple.fst
|> Seq.map (fun (g, xs) -> struct (g, Seq.sumBy ValueTuple.snd xs))
|> Seq.sortByDescending ValueTuple.snd
@@ -254,7 +254,7 @@ module Stats =
open MathNet.Numerics.Statistics
let private logLatencyPercentiles (log: Serilog.ILogger) (label: string) (xs: TimeSpan seq) =
- let sortedLatencies = xs |> Seq.map (fun ts -> ts.TotalSeconds) |> Seq.sort |> Seq.toArray
+ let sortedLatencies = xs |> Seq.map _.TotalSeconds |> Seq.sort |> Seq.toArray
let pc p = SortedArrayStatistics.Percentile(sortedLatencies, p) |> TimeSpan.FromSeconds
let l = {
@@ -310,9 +310,9 @@ module Stats =
if buckets.Count <> 0 then
let clusters = buckets |> Seq.groupBy (fun kv -> bucketGroup kv.Key) |> Seq.sortBy fst |> Seq.toArray
let emit = logLatencyPercentilesPadded log (clusters |> Seq.map fst)
- totalLabel |> Option.iter (fun l -> emit l (buckets |> Seq.collect (fun kv -> kv.Value)))
+ totalLabel |> Option.iter (fun l -> emit l (buckets |> Seq.collect _.Value))
for name, items in clusters do
- emit name (items |> Seq.collect (fun kv -> kv.Value))
+ emit name (items |> Seq.collect _.Value)
member _.Clear() = buckets.Clear()
type LogEventLevel = Serilog.Events.LogEventLevel
diff --git a/src/Propulsion/Parallel.fs b/src/Propulsion/Parallel.fs
index 62628832..33b47dd5 100755
--- a/src/Propulsion/Parallel.fs
+++ b/src/Propulsion/Parallel.fs
@@ -98,9 +98,9 @@ module Scheduling =
log.Information("Scheduler {cycles} cycles Started {startedBatches}b {startedItems}i Completed {completedBatches}b {completedItems}i latency {completedLatency:f1}ms Ready {readyitems} Waiting {waitingBatches}b",
cycles, statsTotal startedB, statsTotal startedI, statsTotal completedB, totalItemsCompleted, latencyMs, waiting.Count, incoming.Count)
let active =
- seq { for KeyValue(pid,q) in active -> pid, q |> Seq.sumBy (fun x -> x.remaining) }
- |> Seq.filter (fun (_,snd) -> snd <> 0)
- |> Seq.sortBy (fun (_,snd) -> -snd)
+ seq { for KeyValue (pid, q) in active -> pid, q |> Seq.sumBy _.remaining }
+ |> Seq.filter (fun (_, snd) -> snd <> 0)
+ |> Seq.sortBy (fun (_, snd) -> -snd)
log.Information("Partitions Active items {@active} Started batches {@startedBatches} items {@startedItems} Completed batches {@completedBatches} items {@completedItems}",
active, startedB, startedI, completedB, completedI)
cycles <- 0; processingDuration <- TimeSpan.Zero; startedBatches.Clear(); completedBatches.Clear(); startedItems.Clear(); completedItems.Clear()
diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs
index 6bcbb826..e045182c 100755
--- a/src/Propulsion/Streams.fs
+++ b/src/Propulsion/Streams.fs
@@ -198,7 +198,7 @@ module Buffer =
let sn, wp = FsCodec.StreamName.toString stream, defaultValueArg state.WritePos 0L
waitingStreams.Ingest(sprintf "%s@%dx%d" sn wp state.queue[0].Length, (sz + 512L) / 1024L)
waiting <- waiting + 1
- waitingE <- waitingE + (state.queue |> Array.sumBy (fun x -> x.Length))
+ waitingE <- waitingE + (state.queue |> Array.sumBy _.Length)
waitingB <- waitingB + sz
let m = Log.Metric.BufferReport { cats = waitingCats.Count; streams = waiting; events = waitingE; bytes = waitingB }
(log |> Log.withMetric m).Information(" Streams Waiting {busy:n0}/{busyMb:n1}MB", waiting, Log.miB waitingB)
diff --git a/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs b/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs
index 8521363a..cd225e25 100644
--- a/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs
+++ b/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs
@@ -272,7 +272,7 @@ and [] ConsumerIntegration(testOutputHelper, expectConcurrentSche
// ``should have consumed all expected messages`
let unconsumed =
allMessages
- |> Array.groupBy (fun msg -> msg.payload.producerId)
+ |> Array.groupBy _.payload.producerId
|> Array.map (fun (_, gp) -> gp |> Array.distinctBy (fun msg -> msg.payload.messageId))
|> Array.where (fun gp -> gp.Length <> messagesPerProducer)
let unconsumedCounts =
diff --git a/tests/Propulsion.Kafka.Integration/Propulsion.Kafka.Integration.fsproj b/tests/Propulsion.Kafka.Integration/Propulsion.Kafka.Integration.fsproj
index f67727ca..74b429fb 100644
--- a/tests/Propulsion.Kafka.Integration/Propulsion.Kafka.Integration.fsproj
+++ b/tests/Propulsion.Kafka.Integration/Propulsion.Kafka.Integration.fsproj
@@ -1,7 +1,7 @@
- net6.0
+ net8.0
false
diff --git a/tests/Propulsion.MessageDb.Integration/Propulsion.MessageDb.Integration.fsproj b/tests/Propulsion.MessageDb.Integration/Propulsion.MessageDb.Integration.fsproj
index 171da889..0c5a9d63 100644
--- a/tests/Propulsion.MessageDb.Integration/Propulsion.MessageDb.Integration.fsproj
+++ b/tests/Propulsion.MessageDb.Integration/Propulsion.MessageDb.Integration.fsproj
@@ -1,7 +1,7 @@
- net6.0
+ net8.0
diff --git a/tests/Propulsion.Tests/Propulsion.Tests.fsproj b/tests/Propulsion.Tests/Propulsion.Tests.fsproj
index 70be998d..f5117433 100644
--- a/tests/Propulsion.Tests/Propulsion.Tests.fsproj
+++ b/tests/Propulsion.Tests/Propulsion.Tests.fsproj
@@ -1,7 +1,7 @@
- net6.0
+ net8.0
TRIM_FEED
diff --git a/tools/Propulsion.Tool/Propulsion.Tool.fsproj b/tools/Propulsion.Tool/Propulsion.Tool.fsproj
index f1f8fddc..4959f367 100644
--- a/tools/Propulsion.Tool/Propulsion.Tool.fsproj
+++ b/tools/Propulsion.Tool/Propulsion.Tool.fsproj
@@ -1,7 +1,7 @@
- net6.0
+ net8.0
Exe
true