From 8875e8c2209bf72fe7ca83fbe23fe67e7c7be19e Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 2 Jan 2025 00:19:06 +0000 Subject: [PATCH] Add shouldCompress predicate --- .../{EncodedBody.fs => Encoding.fs} | 91 +++++++++++-------- .../FsCodec.SystemTextJson.fsproj | 2 +- src/FsCodec/FsCodec.fs | 21 +++-- .../{EncodedBodyTests.fs => EncodingTests.fs} | 10 +- .../FsCodec.SystemTextJson.Tests.fsproj | 2 +- 5 files changed, 75 insertions(+), 51 deletions(-) rename src/FsCodec.SystemTextJson/{EncodedBody.fs => Encoding.fs} (61%) rename tests/FsCodec.SystemTextJson.Tests/{EncodedBodyTests.fs => EncodingTests.fs} (90%) diff --git a/src/FsCodec.SystemTextJson/EncodedBody.fs b/src/FsCodec.SystemTextJson/Encoding.fs similarity index 61% rename from src/FsCodec.SystemTextJson/EncodedBody.fs rename to src/FsCodec.SystemTextJson/Encoding.fs index 71c1f7e..99af8cb 100644 --- a/src/FsCodec.SystemTextJson/EncodedBody.fs +++ b/src/FsCodec.SystemTextJson/Encoding.fs @@ -9,7 +9,7 @@ open System.Text.Json /// Represents the body of an Event (or its Metadata), holding the encoded form of the buffer together with an enum value identifying the encoding scheme. /// Enables the decoding side to transparently inflate the data on loading without burdening the application layer with tracking the encoding scheme used. -type EncodedBodyT = (struct(int * JsonElement)) +type EncodedBody = (struct(int * JsonElement)) module private Impl = @@ -35,7 +35,7 @@ module private Impl = let s = new System.IO.MemoryStream(data, writable = false) use decompressor = new System.IO.Compression.BrotliStream(s, System.IO.Compression.CompressionMode.Decompress) decompressor.CopyTo output - let expand post alg compressedBytes = + let private unpack post alg compressedBytes = use output = new System.IO.MemoryStream() compressedBytes |> alg output output.ToArray() |> post @@ -44,34 +44,34 @@ module private Impl = | Encoding.Deflate, JsonValueKind.String -> data.GetBytesFromBase64() |> expand inflateTo | Encoding.Brotli, JsonValueKind.String -> data.GetBytesFromBase64() |> expand brotliDecompressTo | _ -> data |> direct - let decode = decode_ id (expand InteropHelpers.Utf8ToJsonElement) - let decodeUtf8 = decode_ InteropHelpers.JsonElementToUtf8 (expand ReadOnlyMemory) + let decode = decode_ id (unpack InteropHelpers.Utf8ToJsonElement) + let decodeUtf8 = decode_ InteropHelpers.JsonElementToUtf8 (unpack ReadOnlyMemory) (* Conditional compression logic: triggered as storage layer pulls Data/Meta fields Bodies under specified minimum size, or not meeting a required compression gain are stored directly, equivalent to if compression had not been wired in *) - let encodeUncompressed (raw: JsonElement): EncodedBodyT = Encoding.Direct, raw - let private blobToStringElement = Convert.ToBase64String >> JsonSerializer.SerializeToElement + let encodeUncompressed (raw: JsonElement): EncodedBody = Encoding.Direct, raw + let private blobToBase64StringJsonElement = Convert.ToBase64String >> JsonSerializer.SerializeToElement let private brotliCompress (eventBody: ReadOnlyMemory): System.IO.MemoryStream = let output = new System.IO.MemoryStream() use compressor = new System.IO.Compression.BrotliStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true) compressor.Write eventBody.Span compressor.Close() // NOTE Close, not Flush; we want the output fully terminated to reduce surprises when decompressing output - let tryCompress minSize minGain (raw: JsonElement): EncodedBodyT = + let tryCompress minSize minGain (raw: JsonElement): EncodedBody = let utf8: ReadOnlyMemory = InteropHelpers.JsonElementToUtf8 raw if utf8.Length < minSize then encodeUncompressed raw else let brotli = brotliCompress utf8 if utf8.Length <= int brotli.Length + minGain then encodeUncompressed raw else - Encoding.Brotli, brotli.ToArray() |> blobToStringElement - let encodeUncompressedUtf8 (raw: ReadOnlyMemory): EncodedBodyT = Encoding.Direct, InteropHelpers.Utf8ToJsonElement raw - let tryCompressUtf8 minSize minGain (utf8: ReadOnlyMemory): EncodedBodyT = + Encoding.Brotli, brotli.ToArray() |> blobToBase64StringJsonElement + let encodeUncompressedUtf8 (raw: ReadOnlyMemory): EncodedBody = Encoding.Direct, InteropHelpers.Utf8ToJsonElement raw + let tryCompressUtf8 minSize minGain (utf8: ReadOnlyMemory): EncodedBody = if utf8.Length < minSize then encodeUncompressedUtf8 utf8 else let brotli = brotliCompress utf8 if utf8.Length <= int brotli.Length + minGain then encodeUncompressedUtf8 utf8 else - Encoding.Brotli, brotli.ToArray() |> blobToStringElement + Encoding.Brotli, brotli.ToArray() |> blobToBase64StringJsonElement type [] CompressionOptions = { minSize: int; minGain: int } with /// Attempt to compress anything possible @@ -82,57 +82,72 @@ type [] CompressionOptions = { minSize: int; minGain: int } with static member Default = { minSize = 48; minGain = 4 } [] -type EncodedBody private () = +type Encoding private () = - static member Uncompressed(x: JsonElement): EncodedBodyT = + static member Uncompressed(x: JsonElement): EncodedBody = Impl.encodeUncompressed x - static member Uncompressed(x: ReadOnlyMemory): EncodedBodyT = + static member Uncompressed(x: ReadOnlyMemory): EncodedBody = Impl.encodeUncompressedUtf8 x - static member TryCompress(options, x: JsonElement): EncodedBodyT = + static member TryCompress(options, x: JsonElement): EncodedBody = Impl.tryCompress options.minSize options.minGain x - static member TryCompress(options, x: ReadOnlyMemory): EncodedBodyT = + static member TryCompress(options, x: ReadOnlyMemory): EncodedBody = Impl.tryCompressUtf8 options.minSize options.minGain x - static member ToJsonElement(x: EncodedBodyT): JsonElement = + static member ToJsonElement(x: EncodedBody): JsonElement = Impl.decode x - static member ToUtf8(x: EncodedBodyT): ReadOnlyMemory = + static member ToUtf8(x: EncodedBody): ReadOnlyMemory = Impl.decodeUtf8 x - static member ToByteArray(x: EncodedBodyT): byte[] = - EncodedBody.ToUtf8(x).ToArray() - static member ExpandTo(ms: System.IO.Stream, x: EncodedBodyT) = + static member ToByteArray(x: EncodedBody): byte[] = + Encoding.ToUtf8(x).ToArray() + static member ExpandTo(ms: System.IO.Stream, x: EncodedBody) = Impl.decode_ (fun el -> JsonSerializer.Serialize(ms, el)) (fun dec -> dec ms) x - /// Adapts an IEventCodec rendering to JsonElement Event Bodies to attempt to compress the data.
- /// If sufficient compression, as defined by options is not achieved, the body is saved as-is.
- /// The int conveys a value that must be round tripped alongside the body in order for the decoding process to correctly interpret it.
+ /// The body will be saved as-is under the following circumstances:
+ /// - the shouldCompress predicate is not satisfied for the event in question.
+ /// - sufficient compression, as defined by options is not achieved, the body is saved as-is.
+ /// The int produced when Encodeing conveys the encoding used, and must be round tripped alongside the body as a required input of a future Decode.
[] - static member EncodeTryCompress<'Event, 'Context>(native: IEventCodec<'Event, ReadOnlyMemory, 'Context>, [] ?options) - : IEventCodec<'Event, EncodedBodyT, 'Context> = + static member EncodeTryCompress<'Event, 'Context>( + native: IEventCodec<'Event, ReadOnlyMemory, 'Context>, + [] ?shouldCompress: Func>, bool>, + [] ?options) + : IEventCodec<'Event, EncodedBody, 'Context> = let opts = defaultArg options CompressionOptions.Default - FsCodec.Core.EventCodec.Map(native, (fun x -> EncodedBody.TryCompress(opts, x)), Func<_, _> EncodedBody.ToUtf8) + let encode = shouldCompress |> function + | None -> fun _x (d: ReadOnlyMemory) -> Encoding.TryCompress(opts, d) + | Some predicate -> fun x d -> if predicate.Invoke x then Encoding.TryCompress(opts, d) else Encoding.Uncompressed d + FsCodec.Core.EventCodec.MapEx(native, encode, Func<_, _> Encoding.ToUtf8) /// Adapts an IEventCodec rendering to JsonElement Event Bodies to attempt to compress the data.
- /// If sufficient compression, as defined by options is not achieved, the body is saved as-is.
- /// The int conveys a value that must be round tripped alongside the body in order for the decoding process to correctly interpret it.
+ /// The body will be saved as-is under the following circumstances:
+ /// - the shouldCompress predicate is not satisfied for the event in question.
+ /// - sufficient compression, as defined by options is not achieved, the body is saved as-is.
+ /// The int produced when Encodeing conveys the encoding used, and must be round tripped alongside the body as a required input of a future Decode. [] - static member EncodeTryCompress<'Event, 'Context>(native: IEventCodec<'Event, JsonElement, 'Context>, [] ?options) - : IEventCodec<'Event, EncodedBodyT, 'Context> = + static member EncodeTryCompress<'Event, 'Context>( + native: IEventCodec<'Event, JsonElement, 'Context>, + [] ?shouldCompress: Func, bool>, + [] ?options) + : IEventCodec<'Event, EncodedBody, 'Context> = let opts = defaultArg options CompressionOptions.Default - FsCodec.Core.EventCodec.Map(native, (fun x -> EncodedBody.TryCompress(opts, x)), Func<_, _> EncodedBody.ToJsonElement) + let encode = shouldCompress |> function + | None -> fun _x (d: JsonElement) -> Encoding.TryCompress(opts, d) + | Some predicate -> fun x d -> if predicate.Invoke x then Encoding.TryCompress(opts, d) else Encoding.Uncompressed d + FsCodec.Core.EventCodec.MapEx(native, encode, Func<_, _> Encoding.ToJsonElement) /// Adapts an IEventCodec rendering to JsonElement Event Bodies to encode as per EncodeTryCompress, but without attempting compression. [] static member EncodeUncompressed<'Event, 'Context>(native: IEventCodec<'Event, JsonElement, 'Context>) - : IEventCodec<'Event, EncodedBodyT, 'Context> = - FsCodec.Core.EventCodec.Map(native, Func<_, _> EncodedBody.Uncompressed, Func<_, _> EncodedBody.ToJsonElement) + : IEventCodec<'Event, EncodedBody, 'Context> = + FsCodec.Core.EventCodec.Map(native, Func<_, _> Encoding.Uncompressed, Func<_, _> Encoding.ToJsonElement) /// Adapts an IEventCodec rendering to int * JsonElement Event Bodies to render and/or consume Uncompressed ReadOnlyMemory<byte>. [] - static member ToUtf8Codec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBodyT, 'Context>) + static member ToUtf8Codec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>) : IEventCodec<'Event, ReadOnlyMemory, 'Context> = - FsCodec.Core.EventCodec.Map(native, Func<_, _> EncodedBody.ToUtf8, Func<_, _> EncodedBody.Uncompressed) + FsCodec.Core.EventCodec.Map(native, Func<_, _> Encoding.ToUtf8, Func<_, _> Encoding.Uncompressed) /// Adapts an IEventCodec rendering to int * JsonElement Event Bodies to render and/or consume Uncompressed byte[]. [] - static member ToByteArrayCodec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBodyT, 'Context>) + static member ToByteArrayCodec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>) : IEventCodec<'Event, byte[], 'Context> = - FsCodec.Core.EventCodec.Map(native, Func<_, _> EncodedBody.ToByteArray, Func<_, _> EncodedBody.Uncompressed) + FsCodec.Core.EventCodec.Map(native, Func<_, _> Encoding.ToByteArray, Func<_, _> Encoding.Uncompressed) diff --git a/src/FsCodec.SystemTextJson/FsCodec.SystemTextJson.fsproj b/src/FsCodec.SystemTextJson/FsCodec.SystemTextJson.fsproj index ad3c1b4..515eeec 100644 --- a/src/FsCodec.SystemTextJson/FsCodec.SystemTextJson.fsproj +++ b/src/FsCodec.SystemTextJson/FsCodec.SystemTextJson.fsproj @@ -19,7 +19,7 @@ - + diff --git a/src/FsCodec/FsCodec.fs b/src/FsCodec/FsCodec.fs index f811585..3f8e56e 100755 --- a/src/FsCodec/FsCodec.fs +++ b/src/FsCodec/FsCodec.fs @@ -61,17 +61,21 @@ type EventData<'Format>(eventType, data, meta, eventId, correlationId, causation member _.CausationId = causationId member _.Timestamp = timestamp - static member Map<'Mapped>(f: Func<'Format, 'Mapped>) + static member MapEx<'Mapped>(f: Func, 'Format, 'Mapped>) (x: IEventData<'Format>): IEventData<'Mapped> = { new IEventData<'Mapped> with member _.EventType = x.EventType - member _.Data = f.Invoke x.Data - member _.Meta = f.Invoke x.Meta + member _.Data = f.Invoke(x, x.Data) + member _.Meta = f.Invoke(x, x.Meta) member _.EventId = x.EventId member _.CorrelationId = x.CorrelationId member _.CausationId = x.CausationId member _.Timestamp = x.Timestamp } + static member Map<'Mapped>(f: Func<'Format, 'Mapped>) + (x: IEventData<'Format>): IEventData<'Mapped> = + EventData.MapEx(Func<_, _, _>(fun _x -> f.Invoke)) x + /// An Event or Unfold that's been read from a Store and hence has a defined Index on the Event Timeline. [] type TimelineEvent<'Format>(index, eventType, data, meta, eventId, correlationId, causationId, timestamp, isUnfold, context, size) = @@ -90,7 +94,7 @@ type TimelineEvent<'Format>(index, eventType, data, meta, eventId, correlationId TimelineEvent(index, inner.EventType, inner.Data, inner.Meta, inner.EventId, inner.CorrelationId, inner.CausationId, inner.Timestamp, isUnfold, Option.toObj context, size) :> _ override _.ToString() = sprintf "%s %s @%i" (if isUnfold then "Unfold" else "Event") eventType index - + interface ITimelineEvent<'Format> with member _.Index = index member _.IsUnfold = isUnfold @@ -122,10 +126,10 @@ type TimelineEvent<'Format>(index, eventType, data, meta, eventId, correlationId [] type EventCodec<'Event, 'Format, 'Context> private () = - static member Map<'TargetFormat>(native: IEventCodec<'Event, 'Format, 'Context>, up: Func<'Format,'TargetFormat>, down: Func<'TargetFormat, 'Format>) + static member MapEx<'TargetFormat>(native: IEventCodec<'Event, 'Format, 'Context>, up: Func, 'Format,'TargetFormat>, down: Func<'TargetFormat, 'Format>) : IEventCodec<'Event, 'TargetFormat, 'Context> = - let upConvert = EventData.Map up + let upConvert = EventData.MapEx up let downConvert = TimelineEvent.Map down { new IEventCodec<'Event, 'TargetFormat, 'Context> with @@ -137,3 +141,8 @@ type EventCodec<'Event, 'Format, 'Context> private () = member _.Decode target = let encoded = downConvert target native.Decode encoded } + + static member Map<'TargetFormat>(native: IEventCodec<'Event, 'Format, 'Context>, up: Func<'Format,'TargetFormat>, down: Func<'TargetFormat, 'Format>) + : IEventCodec<'Event, 'TargetFormat, 'Context> = + + EventCodec.MapEx(native, Func<_, _, _>(fun _x -> up.Invoke), down) diff --git a/tests/FsCodec.SystemTextJson.Tests/EncodedBodyTests.fs b/tests/FsCodec.SystemTextJson.Tests/EncodingTests.fs similarity index 90% rename from tests/FsCodec.SystemTextJson.Tests/EncodedBodyTests.fs rename to tests/FsCodec.SystemTextJson.Tests/EncodingTests.fs index 3a894f7..76002c2 100644 --- a/tests/FsCodec.SystemTextJson.Tests/EncodedBodyTests.fs +++ b/tests/FsCodec.SystemTextJson.Tests/EncodingTests.fs @@ -1,4 +1,4 @@ -module FsCodec.SystemTextJson.Tests.EncodedBodyTests +module FsCodec.SystemTextJson.Tests.EncodingTests open Swensen.Unquote open System @@ -38,8 +38,8 @@ module InternalDecoding = let explicitBrotli = struct (2, JsonSerializer.SerializeToElement "CwuAeyJ2YWx1ZSI6IkhlbGxvIFdvcmxkIn0D") let decode useRom = - if useRom then FsCodec.SystemTextJson.EncodedBody.ToByteArray >> JsonSerializer.Deserialize - else FsCodec.SystemTextJson.EncodedBody.ToJsonElement >> JsonSerializer.Deserialize + if useRom then FsCodec.SystemTextJson.Encoding.ToByteArray >> JsonSerializer.Deserialize + else FsCodec.SystemTextJson.Encoding.ToJsonElement >> JsonSerializer.Deserialize let [] ``Can decode all known representations`` useRom = test <@ decode useRom direct = inputValue @> @@ -61,7 +61,7 @@ type JsonElement with member x.Utf8ByteCount = if x.ValueKind = JsonValueKind.Nu module TryCompress = - let sut = FsCodec.SystemTextJson.EncodedBody.EncodeTryCompress StringUtf8.sut + let sut = FsCodec.SystemTextJson.Encoding.EncodeTryCompress StringUtf8.sut let compressibleValue = {| value = String('x', 5000) |} @@ -83,7 +83,7 @@ module TryCompress = module Uncompressed = - let sut = FsCodec.SystemTextJson.EncodedBody.EncodeUncompressed StringUtf8.sut + let sut = FsCodec.SystemTextJson.Encoding.EncodeUncompressed StringUtf8.sut // Borrow the value we just demonstrated to be compressible let compressibleValue = TryCompress.compressibleValue diff --git a/tests/FsCodec.SystemTextJson.Tests/FsCodec.SystemTextJson.Tests.fsproj b/tests/FsCodec.SystemTextJson.Tests/FsCodec.SystemTextJson.Tests.fsproj index ae7c372..dbc02fc 100644 --- a/tests/FsCodec.SystemTextJson.Tests/FsCodec.SystemTextJson.Tests.fsproj +++ b/tests/FsCodec.SystemTextJson.Tests/FsCodec.SystemTextJson.Tests.fsproj @@ -43,7 +43,7 @@ SomeNullHandlingTests.fs - +