diff --git a/CHANGELOG.md b/CHANGELOG.md index 65c9e51..e8d4b02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ The `Unreleased` section name is replaced by the expected version of next releas ## [Unreleased] ### Added + +- `SystemTextJson.Encoding`: Conditional compression as per `FsCodec.Encoding` [#126](https://github.com/jet/FsCodec/pull/126) + ### Changed ### Removed ### Fixed diff --git a/src/FsCodec.Box/ByteArray.fs b/src/FsCodec.Box/ByteArray.fs index 85c456f..914486d 100644 --- a/src/FsCodec.Box/ByteArray.fs +++ b/src/FsCodec.Box/ByteArray.fs @@ -17,6 +17,11 @@ type ByteArray private () = /// Adapt an IEventCodec that handles ReadOnlyMemory<byte> Event Bodies to instead use byte[]
/// Ideally not used as it makes pooling problematic; only provided for interop/porting scaffolding wrt Equinox V3 and EventStore.Client etc
[] + static member AsByteArray<'Event, 'Context>(native: IEventCodec<'Event, ReadOnlyMemory, 'Context>) + : IEventCodec<'Event, byte[], 'Context> = + FsCodec.Core.EventCodec.mapBodies ByteArray.ReadOnlyMemoryToBytes ByteArray.BytesToReadOnlyMemory native + + [] static member ToByteArrayCodec<'Event, 'Context>(native: IEventCodec<'Event, ReadOnlyMemory, 'Context>) : IEventCodec<'Event, byte[], 'Context> = - FsCodec.Core.EventCodec.Map(native, Func<_, _> ByteArray.ReadOnlyMemoryToBytes, Func<_, _> ByteArray.BytesToReadOnlyMemory) + FsCodec.Core.EventCodec.mapBodies ByteArray.ReadOnlyMemoryToBytes ByteArray.BytesToReadOnlyMemory native diff --git a/src/FsCodec.Box/Compression.fs b/src/FsCodec.Box/Compression.fs index eb021e0..c622c0c 100644 --- a/src/FsCodec.Box/Compression.fs +++ b/src/FsCodec.Box/Compression.fs @@ -4,105 +4,46 @@ open System open System.Runtime.CompilerServices open System.Runtime.InteropServices -/// Represents the body of an Event (or its Metadata), holding the encoded form of the buffer together with an enum value signifying the encoding scheme. -/// Enables the decoding side to transparently inflate the data on loading without burdening the application layer with tracking the encoding scheme used -type EncodedBody = (struct(int * ReadOnlyMemory)) - -module private EncodedMaybeCompressed = - - module Encoding = - let [] Direct = 0 // Assumed for all values not listed here - let [] Deflate = 1 // Deprecated encoding produced by versions pre 3.0.0-rc.13; no longer produced - let [] Brotli = 2 // Default encoding as of 3.0.0-rc.13 - - (* Decompression logic: triggered by extension methods below at the point where the Codec's Decode retrieves the Data or Meta properties *) - - // In versions pre 3.0.0-rc.13, the compression was implemented as follows; NOTE: use of Flush vs Close saves space but is unconventional - // let private deflate (eventBody: ReadOnlyMemory): System.IO.MemoryStream = - // let output = new System.IO.MemoryStream() - // let compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true) - // compressor.Write(eventBody.Span) - // compressor.Flush() // NOTE: using Flush in lieu of close means the result is not padded, which can hinder interop - // output - let private inflate (data: ReadOnlyMemory): byte[] = - let s = new System.IO.MemoryStream(data.ToArray(), writable = false) - let decompressor = new System.IO.Compression.DeflateStream(s, System.IO.Compression.CompressionMode.Decompress, leaveOpen = true) - let output = new System.IO.MemoryStream() - decompressor.CopyTo output - output.ToArray() - let private brotliDecompress (data: ReadOnlyMemory): byte[] = - let s = new System.IO.MemoryStream(data.ToArray(), writable = false) - use decompressor = new System.IO.Compression.BrotliStream(s, System.IO.Compression.CompressionMode.Decompress) - use output = new System.IO.MemoryStream() - decompressor.CopyTo output - output.ToArray() - let decode struct (encoding, data): ReadOnlyMemory = - match encoding with - | Encoding.Deflate -> inflate data |> ReadOnlyMemory - | Encoding.Brotli -> brotliDecompress data |> ReadOnlyMemory - | Encoding.Direct | _ -> data - - (* Conditional compression logic: triggered as storage layer pulls Data/Meta fields - Bodies under specified minimum size, or not meeting a required compression gain are stored directly, equivalent to if compression had not been wired in *) - - let private brotliCompress (eventBody: ReadOnlyMemory): System.IO.MemoryStream = - let output = new System.IO.MemoryStream() - use compressor = new System.IO.Compression.BrotliStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true) - compressor.Write(eventBody.Span) - compressor.Close() // NOTE Close, not Flush; we want the output fully terminated to reduce surprises when decompressing - output - let encodeUncompressed (raw: ReadOnlyMemory): EncodedBody = Encoding.Direct, raw - let encode minSize minGain (raw: ReadOnlyMemory): EncodedBody = - if raw.Length < minSize then encodeUncompressed raw - else match brotliCompress raw with - | tmp when raw.Length > int tmp.Length + minGain -> Encoding.Brotli, tmp.ToArray() |> ReadOnlyMemory - | _ -> encodeUncompressed raw - type [] CompressionOptions = { minSize: int; minGain: int } with - /// Attempt to compress anything possible - // TL;DR in general it's worth compressing everything to minimize RU consumption both on insert and update - // For DynamoStore, every time we need to calve from the tip, the RU impact of using TransactWriteItems is significant, - // so preventing or delaying that is of critical importance - // Empirically not much JSON below 48 bytes actually compresses - while we don't assume that, it is what is guiding the derivation of the default static member Default = { minSize = 48; minGain = 4 } - /// Encode the data without attempting to compress, regardless of size static member Uncompressed = { minSize = Int32.MaxValue; minGain = 0 } -[] +[] type Compression private () = - static member Utf8ToEncodedDirect(x: ReadOnlyMemory): EncodedBody = - EncodedMaybeCompressed.encodeUncompressed x - static member Utf8ToEncodedTryCompress(options, x: ReadOnlyMemory): EncodedBody = - EncodedMaybeCompressed.encode options.minSize options.minGain x - static member EncodedToUtf8(x: EncodedBody): ReadOnlyMemory = - EncodedMaybeCompressed.decode x - static member EncodedToByteArray(x: EncodedBody): byte[] = - Compression.EncodedToUtf8(x).ToArray() + static member Utf8ToEncodedDirect(x: ReadOnlyMemory): Encoded = + FsCodec.Encoding.OfBlob x + static member Utf8ToEncodedTryCompress(options, x: ReadOnlyMemory): Encoded = + FsCodec.Encoding.OfBlobCompress({ minSize = options.minSize; minGain = options.minGain }, x) + static member EncodedToUtf8(x: Encoded): ReadOnlyMemory = + FsCodec.Encoding.ToBlob x + static member EncodedToByteArray(x: Encoded): byte[] = + FsCodec.Encoding.ToBlob(x).ToArray() /// Adapts an IEventCodec rendering to ReadOnlyMemory<byte> Event Bodies to attempt to compress the data.
/// If sufficient compression, as defined by options is not achieved, the body is saved as-is.
/// The int conveys a value that must be round tripped alongside the body in order for the decoding process to correctly interpret it.
[] static member EncodeTryCompress<'Event, 'Context>(native: IEventCodec<'Event, ReadOnlyMemory, 'Context>, [] ?options) - : IEventCodec<'Event, EncodedBody, 'Context> = + : IEventCodec<'Event, Encoded, 'Context> = let opts = defaultArg options CompressionOptions.Default - FsCodec.Core.EventCodec.Map(native, (fun x -> Compression.Utf8ToEncodedTryCompress(opts, x)), Func<_, _> Compression.EncodedToUtf8) + let opts: FsCodec.CompressionOptions = { minSize = opts.minSize; minGain = opts.minGain } + FsCodec.Core.EventCodec.mapBodies (fun d -> Encoding.OfBlobCompress(opts, d)) Encoding.ToBlob native /// Adapts an IEventCodec rendering to ReadOnlyMemory<byte> Event Bodies to encode as per EncodeTryCompress, but without attempting compression. [] static member EncodeUncompressed<'Event, 'Context>(native: IEventCodec<'Event, ReadOnlyMemory, 'Context>) - : IEventCodec<'Event, EncodedBody, 'Context> = - FsCodec.Core.EventCodec.Map(native, Func<_, _> Compression.Utf8ToEncodedDirect, Func<_, _> Compression.EncodedToUtf8) + : IEventCodec<'Event, Encoded, 'Context> = + Encoder.Uncompressed native /// Adapts an IEventCodec rendering to int * ReadOnlyMemory<byte> Event Bodies to render and/or consume from Uncompressed ReadOnlyMemory<byte>. [] - static member ToUtf8Codec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>) + static member ToUtf8Codec<'Event, 'Context>(native: IEventCodec<'Event, Encoded, 'Context>) : IEventCodec<'Event, ReadOnlyMemory, 'Context> = - FsCodec.Core.EventCodec.Map(native, Func<_, _> Compression.EncodedToUtf8, Func<_, _> Compression.Utf8ToEncodedDirect) + Encoder.AsBlob native /// Adapts an IEventCodec rendering to int * ReadOnlyMemory<byte> Event Bodies to render and/or consume from Uncompressed byte[]. [] - static member ToByteArrayCodec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>) + static member ToByteArrayCodec<'Event, 'Context>(native: IEventCodec<'Event, Encoded, 'Context>) : IEventCodec<'Event, byte[], 'Context> = - FsCodec.Core.EventCodec.Map(native, Func<_, _> Compression.EncodedToByteArray, Func<_, _> Compression.Utf8ToEncodedDirect) + Encoder.AsByteArray native diff --git a/src/FsCodec.Box/FsCodec.Box.fsproj b/src/FsCodec.Box/FsCodec.Box.fsproj index 5c4ef88..3776537 100644 --- a/src/FsCodec.Box/FsCodec.Box.fsproj +++ b/src/FsCodec.Box/FsCodec.Box.fsproj @@ -23,8 +23,9 @@ + - + diff --git a/src/FsCodec.SystemTextJson/Encoding.fs b/src/FsCodec.SystemTextJson/Encoding.fs new file mode 100644 index 0000000..314bfef --- /dev/null +++ b/src/FsCodec.SystemTextJson/Encoding.fs @@ -0,0 +1,174 @@ +namespace FsCodec.SystemTextJson + +open FsCodec +open FsCodec.SystemTextJson.Interop +open System +open System.Runtime.CompilerServices +open System.Runtime.InteropServices +open System.Text.Json + +/// Represents the body of an Event (or its Metadata), holding the encoded form of the buffer together with an enum value identifying the encoding scheme. +/// Enables the decoding side to transparently inflate the data on loading without burdening the application layer with tracking the encoding scheme used. +type Encoded = (struct(int * JsonElement)) +type BlobEncoded = FsCodec.EncodedBody + +module Encoding = + let [] Direct = 0 // Assumed for all values not listed here + let [] Deflate = 1 // Deprecated encoding produced by versions pre 3.0.0-rc.13; no longer produced + let [] Brotli = 2 // Default encoding as of 3.0.0-rc.13 + +module private Impl = + + (* Decompression logic: triggered by extension methods below at the point where the Codec's Decode retrieves the Data or Meta properties *) + + // Equinox.Cosmos / Equinox.CosmosStore Deflate logic was as below: + // let private deflate (uncompressedBytes: byte[]) = + // let output = new MemoryStream() + // let compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true) + // compressor.Write(uncompressedBytes) + // compressor.Flush() // Could `Close`, but not required + // output.ToArray() + let private inflateTo output (compressedBytes: byte[]) = + let input = new System.IO.MemoryStream(compressedBytes) + let decompressor = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress, leaveOpen = true) + decompressor.CopyTo output + let private brotliDecompressTo output (data: byte[]) = + let s = new System.IO.MemoryStream(data, writable = false) + use decompressor = new System.IO.Compression.BrotliStream(s, System.IO.Compression.CompressionMode.Decompress) + decompressor.CopyTo output + let private unpack post alg compressedBytes = + use output = new System.IO.MemoryStream() + compressedBytes |> alg output + output.ToArray() |> post + let decode_ direct expand (struct (encoding, data: JsonElement) as x) = + match encoding, data.ValueKind with + | Encoding.Deflate, JsonValueKind.String -> data.GetBytesFromBase64() |> expand inflateTo + | Encoding.Brotli, JsonValueKind.String -> data.GetBytesFromBase64() |> expand brotliDecompressTo + | _ -> direct data + let decode = decode_ unbox (unpack InteropHelpers.Utf8ToJsonElement) + let private blobToBase64StringJsonElement = Convert.ToBase64String >> JsonSerializer.SerializeToElement + let direct (raw: JsonElement): Encoded = Encoding.Direct, raw + let ofUtf8Encoded struct (encoding, data: ReadOnlyMemory): Encoded = + match encoding with + | Encoding.Deflate -> Encoding.Deflate, data.ToArray() |> blobToBase64StringJsonElement + | Encoding.Brotli -> Encoding.Brotli, data.ToArray() |> blobToBase64StringJsonElement + | _ -> Encoding.Direct, data |> InteropHelpers.Utf8ToJsonElement + let decodeUtf8 = decode_ InteropHelpers.JsonElementToUtf8 (unpack ReadOnlyMemory) + let toUtf8Encoded struct (encoding, data: JsonElement): BlobEncoded = + match encoding, data.ValueKind with + | Encoding.Deflate, JsonValueKind.String -> Encoding.Deflate, data.GetBytesFromBase64() |> ReadOnlyMemory + | Encoding.Brotli, JsonValueKind.String -> Encoding.Brotli, data.GetBytesFromBase64() |> ReadOnlyMemory + | _ -> Encoding.Direct, data |> InteropHelpers.JsonElementToUtf8 + + (* Conditional compression logic: triggered as storage layer pulls Data/Meta fields + Bodies under specified minimum size, or not meeting a required compression gain are stored directly, equivalent to if compression had not been wired in *) + + let private brotliCompress (eventBody: ReadOnlyMemory): System.IO.MemoryStream = + let output = new System.IO.MemoryStream() + use compressor = new System.IO.Compression.BrotliStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true) + compressor.Write eventBody.Span + compressor.Close() // NOTE Close, not Flush; we want the output fully terminated to reduce surprises when decompressing + output + let compress minSize minGain (raw: JsonElement): Encoded = + let utf8: ReadOnlyMemory = InteropHelpers.JsonElementToUtf8 raw + if utf8.Length < minSize then direct raw else + + let brotli = brotliCompress utf8 + if utf8.Length <= int brotli.Length + minGain then direct raw else + Encoding.Brotli, brotli.ToArray() |> blobToBase64StringJsonElement + let directUtf8 (raw: ReadOnlyMemory): Encoded = Encoding.Direct, InteropHelpers.Utf8ToJsonElement raw + let compressUtf8 minSize minGain (utf8: ReadOnlyMemory): Encoded = + if utf8.Length < minSize then directUtf8 utf8 else + + let brotli = brotliCompress utf8 + if utf8.Length <= int brotli.Length + minGain then directUtf8 utf8 else + Encoding.Brotli, brotli.ToArray() |> blobToBase64StringJsonElement + +[] +type Encoding private () = + + static member OfJsonElement(x: JsonElement): Encoded = + Impl.direct x + static member OfJsonElementCompress(options, x: JsonElement): Encoded = + Impl.compress options.minSize options.minGain x + static member OfUtf8(x: ReadOnlyMemory): Encoded = + Impl.directUtf8 x + static member OfUtf8Compress(options, x: ReadOnlyMemory): Encoded = + Impl.compressUtf8 options.minSize options.minGain x + static member OfUtf8Encoded(x: BlobEncoded): Encoded = + Impl.ofUtf8Encoded x + static member Utf8EncodedToJsonElement(x: BlobEncoded): JsonElement = + Encoding.OfUtf8Encoded x |> Encoding.ToJsonElement + static member ByteCount((_encoding, data): Encoded) = + data.GetRawText() |> System.Text.Encoding.UTF8.GetByteCount + static member ByteCountExpanded(x: Encoded) = + Impl.decode x |> _.GetRawText() |> System.Text.Encoding.UTF8.GetByteCount + static member ToJsonElement(x: Encoded): JsonElement = + Impl.decode x + static member ToUtf8(x: Encoded): ReadOnlyMemory = + Impl.decodeUtf8 x + static member ToEncodedUtf8(x: Encoded): BlobEncoded = + Impl.toUtf8Encoded x + static member ToStream(ms: System.IO.Stream, x: Encoded) = + Impl.decode_ (fun el -> JsonSerializer.Serialize(ms, el)) (fun dec -> dec ms) x + +[] +type Encoder private () = + + /// Adapts an IEventCodec rendering to JsonElement Event Bodies to encode as per Compress, but without attempting compression. + [] + static member Uncompressed<'Event, 'Context>(native: IEventCodec<'Event, JsonElement, 'Context>) + : IEventCodec<'Event, Encoded, 'Context> = + FsCodec.Core.EventCodec.mapBodies Encoding.OfJsonElement Encoding.ToJsonElement native + + /// The body will be saved as-is under the following circumstances:
+ /// - the shouldCompress predicate is not satisfied for the event in question.
+ /// - sufficient compression, as defined by options is not achieved, the body is saved as-is.
+ /// The int produced when Encodeing conveys the encoding used, and must be round tripped alongside the body as a required input of a future Decode.
+ /// NOTE this is intended for interoperability only; a Codec (such as CodecJsonElement) that encodes to JsonElement is strongly recommended unless you don't have a choice. + [] + static member CompressedUtf8<'Event, 'Context>( + native: IEventCodec<'Event, ReadOnlyMemory, 'Context>, + [] ?shouldCompress: Func>, bool>, + [] ?options) + : IEventCodec<'Event, Encoded, 'Context> = + let opts = defaultArg options CompressionOptions.Default + let encode = shouldCompress |> function + | None -> fun _x (d: ReadOnlyMemory) -> Encoding.OfUtf8Compress(opts, d) + | Some predicate -> fun x d -> if predicate.Invoke x then Encoding.OfUtf8Compress(opts, d) else Encoding.OfUtf8 d + FsCodec.Core.EventCodec.mapBodies_ encode Encoding.ToUtf8 native + + /// Adapts an IEventCodec rendering to JsonElement Event Bodies to attempt to compress the data.
+ /// The body will be saved as-is under the following circumstances:
+ /// - the shouldCompress predicate is not satisfied for the event in question.
+ /// - sufficient compression, as defined by options is not achieved, the body is saved as-is.
+ /// The int produced when Encodeing conveys the encoding used, and must be round tripped alongside the body as a required input of a future Decode.
+ [] + static member Compressed<'Event, 'Context>( + native: IEventCodec<'Event, JsonElement, 'Context>, + [] ?shouldCompress: Func, bool>, + [] ?options) + : IEventCodec<'Event, Encoded, 'Context> = + let opts = defaultArg options CompressionOptions.Default + let encode = shouldCompress |> function + | None -> fun _x (d: JsonElement) -> Encoding.OfJsonElementCompress(opts, d) + | Some predicate -> fun x d -> if predicate.Invoke x then Encoding.OfJsonElementCompress(opts, d) else Encoding.OfJsonElement d + FsCodec.Core.EventCodec.mapBodies_ encode Encoding.ToJsonElement native + + /// Adapts an IEventCodec rendering to int * JsonElement Event Bodies to render and/or consume uncompressed ReadOnlyMemory<byte>. + [] + static member AsUtf8<'Event, 'Context>(native: IEventCodec<'Event, Encoded, 'Context>) + : IEventCodec<'Event, ReadOnlyMemory, 'Context> = + FsCodec.Core.EventCodec.mapBodies Encoding.ToUtf8 Encoding.OfUtf8 native + + /// Adapts an IEventCodec rendering to int * JsonElement Event Bodies to render and/or consume uncompressed byte[]. + [] + static member AsUtf8ByteArray<'Event, 'Context>(native: IEventCodec<'Event, Encoded, 'Context>) + : IEventCodec<'Event, byte[], 'Context> = + FsCodec.Core.EventCodec.mapBodies (Encoding.ToUtf8 >> _.ToArray()) Encoding.OfUtf8 native + + /// Adapts an IEventCodec rendering to int * ReadOnlyMemory<byte> Event Bodies to encode to JsonElement, with the Decode side of the roundtrip not attempting to Compress. + [] + static member Utf8AsJsonElement<'Event, 'Context>(native: IEventCodec<'Event, BlobEncoded, 'Context>) + : IEventCodec<'Event, JsonElement, 'Context> = + FsCodec.Core.EventCodec.mapBodies (Encoding.OfUtf8Encoded >> Encoding.ToJsonElement) (Encoding.OfJsonElement >> Encoding.ToEncodedUtf8) native diff --git a/src/FsCodec.SystemTextJson/FsCodec.SystemTextJson.fsproj b/src/FsCodec.SystemTextJson/FsCodec.SystemTextJson.fsproj index ee35d82..7629b2f 100644 --- a/src/FsCodec.SystemTextJson/FsCodec.SystemTextJson.fsproj +++ b/src/FsCodec.SystemTextJson/FsCodec.SystemTextJson.fsproj @@ -19,6 +19,7 @@ + @@ -30,10 +31,9 @@ - - - - + + + diff --git a/src/FsCodec.SystemTextJson/Interop.fs b/src/FsCodec.SystemTextJson/Interop.fs index 5e6bb1e..5230d1c 100644 --- a/src/FsCodec.SystemTextJson/Interop.fs +++ b/src/FsCodec.SystemTextJson/Interop.fs @@ -21,11 +21,11 @@ type InteropHelpers private () = [] static member ToUtf8Codec<'Event, 'Context>(native: FsCodec.IEventCodec<'Event, JsonElement, 'Context>) : FsCodec.IEventCodec<'Event, ReadOnlyMemory, 'Context> = - FsCodec.Core.EventCodec.Map(native, Func<_, _> InteropHelpers.JsonElementToUtf8, Func<_, _> InteropHelpers.Utf8ToJsonElement) + FsCodec.Core.EventCodec.mapBodies InteropHelpers.JsonElementToUtf8 InteropHelpers.Utf8ToJsonElement native /// Adapts an IEventCodec that's rendering to ReadOnlyMemory<byte> Event Bodies to handle JsonElement bodies instead.
/// NOTE where possible, it's better to use CodecJsonElement in preference to Codec to encode directly in order to avoid this mapping process.
[] static member ToJsonElementCodec<'Event, 'Context>(native: FsCodec.IEventCodec<'Event, ReadOnlyMemory, 'Context>) : FsCodec.IEventCodec<'Event, JsonElement, 'Context> = - FsCodec.Core.EventCodec.Map(native, Func<_, _> InteropHelpers.Utf8ToJsonElement, Func<_, _> InteropHelpers.JsonElementToUtf8) + FsCodec.Core.EventCodec.mapBodies InteropHelpers.Utf8ToJsonElement InteropHelpers.JsonElementToUtf8 native diff --git a/src/FsCodec/Encoding.fs b/src/FsCodec/Encoding.fs new file mode 100644 index 0000000..4cf3cc1 --- /dev/null +++ b/src/FsCodec/Encoding.fs @@ -0,0 +1,109 @@ +namespace FsCodec + +open System +open System.Runtime.CompilerServices +open System.Runtime.InteropServices + +/// Represents the body of an Event (or its Metadata), holding the encoded form of the buffer together with an enum value signifying the encoding scheme. +/// Enables the decoding side to transparently inflate the data on loading without burdening the application layer with tracking the encoding scheme used +type Encoded = (struct(int * ReadOnlyMemory)) + +module Encoding = + let [] Direct = 0 // Assumed for all values not listed here + let [] Deflate = 1 // Deprecated encoding produced by versions pre 3.0.0-rc.13; no longer produced + let [] Brotli = 2 // Default encoding as of 3.0.0-rc.13 + +module private Impl = + + (* Decompression logic: triggered by extension methods below at the point where the Codec's Decode retrieves the Data or Meta properties *) + + // In versions pre 3.0.0-rc.13, the compression was implemented as follows; NOTE: use of Flush vs Close saves space but is unconventional + // let private deflate (eventBody: ReadOnlyMemory): System.IO.MemoryStream = + // let output = new System.IO.MemoryStream() + // let compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true) + // compressor.Write(eventBody.Span) + // compressor.Flush() // NOTE: using Flush in lieu of close means the result is not padded, which can hinder interop + // output + let private inflateTo output (data: ReadOnlyMemory) = + let input = new System.IO.MemoryStream(data.ToArray(), writable = false) + let decompressor = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress, leaveOpen = true) + decompressor.CopyTo output + let private brotliDecompressTo output (data: ReadOnlyMemory) = + let input = new System.IO.MemoryStream(data.ToArray(), writable = false) + use decompressor = new System.IO.Compression.BrotliStream(input, System.IO.Compression.CompressionMode.Decompress) + decompressor.CopyTo output + let private unpack alg compressedBytes = + use output = new System.IO.MemoryStream() + compressedBytes |> alg output + output.ToArray() |> ReadOnlyMemory + let decode struct (encoding, data): ReadOnlyMemory = + match encoding with + | Encoding.Deflate -> data |> unpack inflateTo + | Encoding.Brotli -> data |> unpack brotliDecompressTo + | Encoding.Direct | _ -> data + + (* Conditional compression logic: triggered as storage layer pulls Data/Meta fields + Bodies under specified minimum size, or not meeting a required compression gain are stored directly, equivalent to if compression had not been wired in *) + + let private brotliCompress (eventBody: ReadOnlyMemory): System.IO.MemoryStream = + let output = new System.IO.MemoryStream() + use compressor = new System.IO.Compression.BrotliStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true) + compressor.Write(eventBody.Span) + compressor.Close() // NOTE Close, not Flush; we want the output fully terminated to reduce surprises when decompressing + output + let encodeUncompressed (raw: ReadOnlyMemory): Encoded = Encoding.Direct, raw + let tryCompress minSize minGain (raw: ReadOnlyMemory): Encoded = + if raw.Length < minSize then encodeUncompressed raw + else match brotliCompress raw with + | tmp when raw.Length > int tmp.Length + minGain -> Encoding.Brotli, tmp.ToArray() |> ReadOnlyMemory + | _ -> encodeUncompressed raw + +type [] CompressionOptions = { minSize: int; minGain: int } with + /// Attempt to compress anything possible + // TL;DR in general it's worth compressing everything to minimize RU consumption both on insert and update + // For DynamoStore, every time we need to calve from the tip, the RU impact of using TransactWriteItems is significant, + // so preventing or delaying that is of critical importance + // Empirically not much JSON below 48 bytes actually compresses - while we don't assume that, it is what is guiding the derivation of the default + static member Default = { minSize = 48; minGain = 4 } + +[] +type Encoding private () = + + static member OfBlob(x: ReadOnlyMemory): Encoded = + Impl.encodeUncompressed x + static member OfBlobCompress(options, x: ReadOnlyMemory): Encoded = + Impl.tryCompress options.minSize options.minGain x + static member ToBlob(x: Encoded): ReadOnlyMemory = + Impl.decode x + static member ByteCount((_encoding, data): Encoded) = + data.Length + +[] +type Encoder private () = + + /// Adapts an IEventCodec rendering to ReadOnlyMemory<byte> Event Bodies to attempt to compress the data.
+ /// If sufficient compression, as defined by options is not achieved, the body is saved as-is.
+ /// The int conveys a value that must be round tripped alongside the body in order for the decoding process to correctly interpret it.
+ [] + static member Compressed<'Event, 'Context>(native: IEventCodec<'Event, ReadOnlyMemory, 'Context>, [] ?options) + : IEventCodec<'Event, Encoded, 'Context> = + let opts = defaultArg options CompressionOptions.Default + FsCodec.Core.EventCodec.mapBodies (fun d -> Encoding.OfBlobCompress(opts, d)) Encoding.ToBlob native + + /// Adapts an IEventCodec rendering to ReadOnlyMemory<byte> Event Bodies to encode as per Compressed, but without attempting compression. + [] + static member Uncompressed<'Event, 'Context>(native: IEventCodec<'Event, ReadOnlyMemory, 'Context>) + : IEventCodec<'Event, Encoded, 'Context> = + FsCodec.Core.EventCodec.mapBodies Encoding.OfBlob Encoding.ToBlob native + + /// Adapts an IEventCodec rendering to int * ReadOnlyMemory<byte> Event Bodies to render and/or consume from Uncompressed ReadOnlyMemory<byte>. + [] + static member AsBlob<'Event, 'Context>(native: IEventCodec<'Event, Encoded, 'Context>) + : IEventCodec<'Event, ReadOnlyMemory, 'Context> = + FsCodec.Core.EventCodec.mapBodies Encoding.ToBlob Encoding.OfBlob native + + /// Adapts an IEventCodec rendering to int * ReadOnlyMemory<byte> Event Bodies to render and/or consume from Uncompressed byte[]. + [] + static member AsByteArray<'Event, 'Context>(native: IEventCodec<'Event, Encoded, 'Context>) + : IEventCodec<'Event, byte[], 'Context> = + FsCodec.Core.EventCodec.mapBodies (Encoding.ToBlob >> _.ToArray()) Encoding.OfBlob native diff --git a/src/FsCodec/FsCodec.fs b/src/FsCodec/FsCodec.fs index ff203ed..28311c8 100755 --- a/src/FsCodec/FsCodec.fs +++ b/src/FsCodec/FsCodec.fs @@ -169,9 +169,7 @@ type EventCodec<'Event, 'Format, 'Context> private () = let encoded = downConvert.Invoke target native.Decode encoded } - // NOTE To be be replaced by MapBodies/EventCodec.mapBodies for symmetry with TimelineEvent and EventData - // TO BE be Obsoleted and whenever FsCodec.Box is next released - [] + [] static member Map<'TargetFormat>(native: IEventCodec<'Event, 'Format, 'Context>, up: Func<'Format, 'TargetFormat>, down: Func<'TargetFormat, 'Format>) : IEventCodec<'Event, 'TargetFormat, 'Context> = EventCodec.MapBodies(native, Func<_, _, _>(fun _x -> up.Invoke), down) diff --git a/src/FsCodec/FsCodec.fsproj b/src/FsCodec/FsCodec.fsproj index 2fa424e..cb5b675 100644 --- a/src/FsCodec/FsCodec.fsproj +++ b/src/FsCodec/FsCodec.fsproj @@ -3,13 +3,16 @@ - netstandard2.0 + netstandard2.1 + + PKV006 3.0.0 + diff --git a/tests/FsCodec.SystemTextJson.Tests/EncodingTests.fs b/tests/FsCodec.SystemTextJson.Tests/EncodingTests.fs new file mode 100644 index 0000000..0e3545a --- /dev/null +++ b/tests/FsCodec.SystemTextJson.Tests/EncodingTests.fs @@ -0,0 +1,100 @@ +module FsCodec.SystemTextJson.Tests.EncodingTests + +open Swensen.Unquote +open System +open System.Text.Json +open Xunit + +let inline roundtrip (sut: FsCodec.IEventCodec<'event, 'F, unit>) value = + let encoded = sut.Encode((), value = value) + let loaded = FsCodec.Core.TimelineEvent.Create(-1L, encoded) + sut.Decode loaded + +(* Base Fixture Round-trips a String encoded as JsonElement *) + +module StringUtf8 = + + let eventType = "EventType" + let enc (x: 't): JsonElement = JsonSerializer.SerializeToElement x + let dec (b: JsonElement): 't = JsonSerializer.Deserialize b + let jsonElementCodec<'t> = + let encode e = struct (eventType, enc e) + let decode s (b: JsonElement) = if s = eventType then ValueSome (dec b) else invalidOp "Invalid eventType value" + FsCodec.Codec.Create(encode, decode) + + let sut<'t> = jsonElementCodec<'t> + + let [] roundtrips () = + let value = {| value = "Hello World" |} + let res' = roundtrip sut value + res' =! ValueSome value + +module InternalDecoding = + + let inputValue = {| value = "Hello World" |} + // A JsonElement that's a JSON Object should be handled as an uncompressed value + let direct = struct (0, JsonSerializer.SerializeToElement inputValue) + let explicitDeflate = struct (1, JsonSerializer.SerializeToElement "qlYqS8wpTVWyUvJIzcnJVwjPL8pJUaoFAAAA//8=") + let explicitBrotli = struct (2, JsonSerializer.SerializeToElement "CwuAeyJ2YWx1ZSI6IkhlbGxvIFdvcmxkIn0D") + + let decode useRom = + if useRom then FsCodec.SystemTextJson.Encoding.ToUtf8 >> _.ToArray() >> JsonSerializer.Deserialize + else FsCodec.SystemTextJson.Encoding.ToJsonElement >> JsonSerializer.Deserialize + + let [] ``Can decode all known representations`` useRom = + test <@ decode useRom direct = inputValue @> + test <@ decode useRom explicitDeflate = inputValue @> + test <@ decode useRom explicitBrotli = inputValue @> + + let [] ``Defaults to leaving the body alone if unknown`` useRom = + let struct (_, je) = direct + let body = struct (99, je) + let decoded = decode useRom body + test <@ decoded = inputValue @> + + let [] ``Defaults to leaving the body alone if string`` useRom = + let body = struct (99, JsonSerializer.SerializeToElement "test") + let decoded = decode useRom body + test <@ "test" = decoded @> + +type JsonElement with member x.Utf8ByteCount = if x.ValueKind = JsonValueKind.Null then 0 else x.GetRawText() |> System.Text.Encoding.UTF8.GetByteCount + +module TryCompress = + + let sut = FsCodec.SystemTextJson.Encoder.Compressed StringUtf8.sut + + let compressibleValue = {| value = String('x', 5000) |} + + let [] roundtrips () = + let res' = roundtrip sut compressibleValue + res' =! ValueSome compressibleValue + + let [] ``compresses when possible`` () = + let encoded = sut.Encode((), value = compressibleValue) + let struct (_encoding, encodedValue) = encoded.Data + encodedValue.Utf8ByteCount ] ``produces equivalent JsonElement where compression not possible`` () = + let value = {| value = "NotCompressible" |} + let directResult = StringUtf8.sut.Encode((), value).Data + let failedToCompressResult = sut.Encode((), value = value) + let struct (_encoding, result) = failedToCompressResult.Data + true =! JsonElement.DeepEquals(directResult, result) + +module Uncompressed = + + let sut = FsCodec.SystemTextJson.Encoder.Uncompressed StringUtf8.sut + + // Borrow the value we just demonstrated to be compressible + let compressibleValue = TryCompress.compressibleValue + + let [] roundtrips () = + let rom = ReadOnlyMemory(null : byte[]) + let res' = roundtrip sut compressibleValue + res' =! ValueSome compressibleValue + + let [] ``does not compress (despite it being possible to)`` () = + let directResult = StringUtf8.sut.Encode((), compressibleValue).Data + let shouldNotBeCompressedResult = sut.Encode((), value = compressibleValue) + let struct (_encoding, result) = shouldNotBeCompressedResult.Data + result.Utf8ByteCount =! directResult.Utf8ByteCount diff --git a/tests/FsCodec.SystemTextJson.Tests/Examples.fsx b/tests/FsCodec.SystemTextJson.Tests/Examples.fsx index ee24363..3f40945 100755 --- a/tests/FsCodec.SystemTextJson.Tests/Examples.fsx +++ b/tests/FsCodec.SystemTextJson.Tests/Examples.fsx @@ -511,3 +511,26 @@ Client ClientB, event 2 meta { principal = "me" } event Removed { name = null } Unhandled Event: Category Misc, Id x, Index 0, Event: "Dummy" *) + +(* Well known states for Compression regression tests *) + +open System +let private brotliCompress (eventBody: ReadOnlyMemory): System.IO.MemoryStream = + let output = new System.IO.MemoryStream() + use compressor = new System.IO.Compression.BrotliStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true) + compressor.Write eventBody.Span + compressor.Close() // NOTE Close, not Flush; we want the output fully terminated to reduce surprises when decompressing + output + +/// Equinox.Cosmos / Equinox.CosmosStore Deflate logic was exactly as below, do not tweak: +let private deflate (uncompressedBytes: byte[]) = + let output = new System.IO.MemoryStream() + let compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true) + compressor.Write(uncompressedBytes) + compressor.Flush() // Could `Close`, but not required + output.ToArray() +let raw = {| value = "Hello World" |} + +[| raw |> System.Text.Json.JsonSerializer.SerializeToUtf8Bytes |> ReadOnlyMemory |> brotliCompress |> _.ToArray() + raw |> System.Text.Json.JsonSerializer.SerializeToUtf8Bytes |> deflate |] +|> Array.map Convert.ToBase64String diff --git a/tests/FsCodec.SystemTextJson.Tests/FsCodec.SystemTextJson.Tests.fsproj b/tests/FsCodec.SystemTextJson.Tests/FsCodec.SystemTextJson.Tests.fsproj index ac6b204..dbc02fc 100644 --- a/tests/FsCodec.SystemTextJson.Tests/FsCodec.SystemTextJson.Tests.fsproj +++ b/tests/FsCodec.SystemTextJson.Tests/FsCodec.SystemTextJson.Tests.fsproj @@ -43,6 +43,7 @@ SomeNullHandlingTests.fs + diff --git a/tests/FsCodec.Tests/CompressionTests.fs b/tests/FsCodec.Tests/EncodingTests.fs similarity index 88% rename from tests/FsCodec.Tests/CompressionTests.fs rename to tests/FsCodec.Tests/EncodingTests.fs index ab89e32..2dcc7ce 100644 --- a/tests/FsCodec.Tests/CompressionTests.fs +++ b/tests/FsCodec.Tests/EncodingTests.fs @@ -1,4 +1,4 @@ -module FsCodec.Tests.CompressionTests +module FsCodec.Tests.EncodingTests open System open Swensen.Unquote @@ -30,7 +30,7 @@ module StringUtf8 = module TryCompress = - let sut = FsCodec.Compression.EncodeTryCompress(StringUtf8.sut) + let sut = FsCodec.Encoder.Compressed(StringUtf8.sut) let compressibleValue = String('x', 5000) @@ -52,7 +52,7 @@ module TryCompress = module Uncompressed = - let sut = FsCodec.Compression.EncodeUncompressed(StringUtf8.sut) + let sut = FsCodec.Encoder.Uncompressed(StringUtf8.sut) // Borrow a demonstrably compressible value let value = TryCompress.compressibleValue @@ -74,13 +74,13 @@ module Decoding = let brotli = struct(2, Convert.FromBase64String("CwWASGVsbG8gV29ybGQ=") |> ReadOnlyMemory) let [] ``Can decode all known bodies`` () = - let decode = FsCodec.Compression.EncodedToByteArray >> Text.Encoding.UTF8.GetString + let decode = FsCodec.Encoding.ToBlob >> _.ToArray() >> Text.Encoding.UTF8.GetString test <@ decode raw = "Hello World" @> test <@ decode deflated = "Hello World" @> test <@ decode brotli = "Hello World" @> let [] ``Defaults to leaving the memory alone if unknown`` () = let struct(_, mem) = raw - let body = struct(99, mem) - let decoded = body |> FsCodec.Compression.EncodedToByteArray |> Text.Encoding.UTF8.GetString + let body = struct (99, mem) + let decoded = body |> FsCodec.Encoding.ToBlob |> _.ToArray() |> Text.Encoding.UTF8.GetString test <@ decoded = "Hello World" @> diff --git a/tests/FsCodec.Tests/FsCodec.Tests.fsproj b/tests/FsCodec.Tests/FsCodec.Tests.fsproj index e46994f..c635b36 100644 --- a/tests/FsCodec.Tests/FsCodec.Tests.fsproj +++ b/tests/FsCodec.Tests/FsCodec.Tests.fsproj @@ -7,7 +7,7 @@ - + @@ -21,6 +21,7 @@ +