Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(SystemTextJson): Encoding with conditional compression #126

Open
wants to merge 11 commits into
base: mapex
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Added

- `Core.EventData.MapEx`: Enable contextual encoding of bodies [#127](https://github.com/jet/FsCodec/pull/127)
- `SystemTextJson.Compression`: Conditional compression as per `Box.Compression` [#126](https://github.com/jet/FsCodec/pull/126)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix


### Changed
### Removed
Expand Down
2 changes: 1 addition & 1 deletion src/FsCodec.Box/ByteArray.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ type ByteArray private () =
[<Extension>]
static member ToByteArrayCodec<'Event, 'Context>(native: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context>)
: IEventCodec<'Event, byte[], 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> ByteArray.ReadOnlyMemoryToBytes, Func<_, _> ByteArray.BytesToReadOnlyMemory)
FsCodec.Core.EventCodec.mapBodies ByteArray.ReadOnlyMemoryToBytes ByteArray.BytesToReadOnlyMemory native
75 changes: 10 additions & 65 deletions src/FsCodec.Box/Compression.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,77 +8,21 @@ open System.Runtime.InteropServices
/// Enables the decoding side to transparently inflate the data on loading without burdening the application layer with tracking the encoding scheme used
type EncodedBody = (struct(int * ReadOnlyMemory<byte>))

module private EncodedMaybeCompressed =

module Encoding =
let [<Literal>] Direct = 0 // Assumed for all values not listed here
let [<Literal>] Deflate = 1 // Deprecated encoding produced by versions pre 3.0.0-rc.13; no longer produced
let [<Literal>] Brotli = 2 // Default encoding as of 3.0.0-rc.13

(* Decompression logic: triggered by extension methods below at the point where the Codec's Decode retrieves the Data or Meta properties *)

// In versions pre 3.0.0-rc.13, the compression was implemented as follows; NOTE: use of Flush vs Close saves space but is unconventional
// let private deflate (eventBody: ReadOnlyMemory<byte>): System.IO.MemoryStream =
// let output = new System.IO.MemoryStream()
// let compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true)
// compressor.Write(eventBody.Span)
// compressor.Flush() // NOTE: using Flush in lieu of close means the result is not padded, which can hinder interop
// output
let private inflate (data: ReadOnlyMemory<byte>): byte[] =
let s = new System.IO.MemoryStream(data.ToArray(), writable = false)
let decompressor = new System.IO.Compression.DeflateStream(s, System.IO.Compression.CompressionMode.Decompress, leaveOpen = true)
let output = new System.IO.MemoryStream()
decompressor.CopyTo output
output.ToArray()
let private brotliDecompress (data: ReadOnlyMemory<byte>): byte[] =
let s = new System.IO.MemoryStream(data.ToArray(), writable = false)
use decompressor = new System.IO.Compression.BrotliStream(s, System.IO.Compression.CompressionMode.Decompress)
use output = new System.IO.MemoryStream()
decompressor.CopyTo output
output.ToArray()
let decode struct (encoding, data): ReadOnlyMemory<byte> =
match encoding with
| Encoding.Deflate -> inflate data |> ReadOnlyMemory
| Encoding.Brotli -> brotliDecompress data |> ReadOnlyMemory
| Encoding.Direct | _ -> data

(* Conditional compression logic: triggered as storage layer pulls Data/Meta fields
Bodies under specified minimum size, or not meeting a required compression gain are stored directly, equivalent to if compression had not been wired in *)

let private brotliCompress (eventBody: ReadOnlyMemory<byte>): System.IO.MemoryStream =
let output = new System.IO.MemoryStream()
use compressor = new System.IO.Compression.BrotliStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true)
compressor.Write(eventBody.Span)
compressor.Close() // NOTE Close, not Flush; we want the output fully terminated to reduce surprises when decompressing
output
let encodeUncompressed (raw: ReadOnlyMemory<byte>): EncodedBody = Encoding.Direct, raw
let encode minSize minGain (raw: ReadOnlyMemory<byte>): EncodedBody =
if raw.Length < minSize then encodeUncompressed raw
else match brotliCompress raw with
| tmp when raw.Length > int tmp.Length + minGain -> Encoding.Brotli, tmp.ToArray() |> ReadOnlyMemory
| _ -> encodeUncompressed raw

type [<Struct>] CompressionOptions = { minSize: int; minGain: int } with
/// Attempt to compress anything possible
// TL;DR in general it's worth compressing everything to minimize RU consumption both on insert and update
// For DynamoStore, every time we need to calve from the tip, the RU impact of using TransactWriteItems is significant,
// so preventing or delaying that is of critical importance
// Empirically not much JSON below 48 bytes actually compresses - while we don't assume that, it is what is guiding the derivation of the default
static member Default = { minSize = 48; minGain = 4 }
/// Encode the data without attempting to compress, regardless of size
static member Uncompressed = { minSize = Int32.MaxValue; minGain = 0 }

[<Extension; AbstractClass; Sealed>]
[<Extension; AbstractClass; Sealed; Obsolete "Please use FsCodec.Encoding instead">]
type Compression private () =

static member Utf8ToEncodedDirect(x: ReadOnlyMemory<byte>): EncodedBody =
EncodedMaybeCompressed.encodeUncompressed x
FsCodec.Encoding.OfBlob x
static member Utf8ToEncodedTryCompress(options, x: ReadOnlyMemory<byte>): EncodedBody =
EncodedMaybeCompressed.encode options.minSize options.minGain x
FsCodec.Encoding.OfBlobCompress({ minSize = options.minSize; minGain = options.minGain }, x)
static member EncodedToUtf8(x: EncodedBody): ReadOnlyMemory<byte> =
EncodedMaybeCompressed.decode x
FsCodec.Encoding.ToBlob x
static member EncodedToByteArray(x: EncodedBody): byte[] =
Compression.EncodedToUtf8(x).ToArray()
FsCodec.Encoding.ToBlob(x).ToArray()

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>ReadOnlyMemory&lt;byte&gt;</c> Event Bodies to attempt to compress the data.<br/>
/// If sufficient compression, as defined by <c>options</c> is not achieved, the body is saved as-is.<br/>
Expand All @@ -87,22 +31,23 @@ type Compression private () =
static member EncodeTryCompress<'Event, 'Context>(native: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context>, [<Optional; DefaultParameterValue null>] ?options)
: IEventCodec<'Event, EncodedBody, 'Context> =
let opts = defaultArg options CompressionOptions.Default
FsCodec.Core.EventCodec.Map(native, (fun d -> Compression.Utf8ToEncodedTryCompress(opts, d)), Func<_, _> Compression.EncodedToUtf8)
let opts: FsCodec.CompressionOptions = { minSize = opts.minSize; minGain = opts.minGain }
FsCodec.Core.EventCodec.mapBodies (fun d -> Encoding.OfBlobCompress(opts, d)) Encoding.ToBlob native

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>ReadOnlyMemory&lt;byte&gt;</c> Event Bodies to encode as per <c>EncodeTryCompress</c>, but without attempting compression.</summary>
[<Extension>]
static member EncodeUncompressed<'Event, 'Context>(native: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context>)
: IEventCodec<'Event, EncodedBody, 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> Compression.Utf8ToEncodedDirect, Func<_, _> Compression.EncodedToUtf8)
Encoder.Uncompressed native

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>int * ReadOnlyMemory&lt;byte&gt;</c> Event Bodies to render and/or consume from Uncompressed <c>ReadOnlyMemory&lt;byte&gt;</c>.</summary>
[<Extension>]
static member ToUtf8Codec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>)
: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> Compression.EncodedToUtf8, Func<_, _> Compression.Utf8ToEncodedDirect)
Encoder.AsBlob native

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>int * ReadOnlyMemory&lt;byte&gt;</c> Event Bodies to render and/or consume from Uncompressed <c>byte[]</c>.</summary>
[<Extension>]
static member ToByteArrayCodec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>)
: IEventCodec<'Event, byte[], 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> Compression.EncodedToByteArray, Func<_, _> Compression.Utf8ToEncodedDirect)
Encoder.AsByteArray native
6 changes: 3 additions & 3 deletions src/FsCodec.Box/FsCodec.Box.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Condition=" '$(Configuration)' == 'Debug' " Include="../FsCodec/FsCodec.fsproj" />
<!-- TODO if taking a dependency on 3.1, the impl should switch to EventCodec.mapBodies, and EventCodec.Map should be Obsoleted -->
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="FsCodec" Version="[3.0.0, 4.0.0)" />
<ProjectReference Include="../FsCodec/FsCodec.fsproj" />
<!-- <ProjectReference Condition=" '$(Configuration)' == 'Debug' " Include="../FsCodec/FsCodec.fsproj" />-->
<!-- <PackageReference Condition=" '$(Configuration)' == 'Release' " Include="FsCodec" Version="[3.1.0, 4.0.0)" />-->
</ItemGroup>

</Project>
153 changes: 153 additions & 0 deletions src/FsCodec.SystemTextJson/Encoding.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
namespace FsCodec.SystemTextJson

open FsCodec
open FsCodec.SystemTextJson.Interop
open System
open System.Runtime.CompilerServices
open System.Runtime.InteropServices
open System.Text.Json

/// Represents the body of an Event (or its Metadata), holding the encoded form of the buffer together with an enum value identifying the encoding scheme.
/// Enables the decoding side to transparently inflate the data on loading without burdening the application layer with tracking the encoding scheme used.
type EncodedBody = (struct(int * JsonElement))

module private Impl =

    (* Decompression logic: triggered by extension methods below at the point where the Codec's Decode retrieves the Data or Meta properties *)

    // Equinox.Cosmos / Equinox.CosmosStore Deflate logic was as below:
    // let private deflate (uncompressedBytes: byte[]) =
    //     let output = new MemoryStream()
    //     let compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true)
    //     compressor.Write(uncompressedBytes)
    //     compressor.Flush() // Could `Close`, but not required
    //     output.ToArray()

    /// Expands Deflate-compressed bytes, copying the decompressed content into <c>output</c>.
    let private inflateTo output (compressedBytes: byte[]) =
        // `use` (rather than `let`) so both the input buffer and the decompressor are released deterministically
        use input = new System.IO.MemoryStream(compressedBytes)
        use decompressor = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress, leaveOpen = true)
        decompressor.CopyTo output

    /// Expands Brotli-compressed bytes, copying the decompressed content into <c>output</c>.
    let private brotliDecompressTo output (data: byte[]) =
        use s = new System.IO.MemoryStream(data, writable = false)
        use decompressor = new System.IO.Compression.BrotliStream(s, System.IO.Compression.CompressionMode.Decompress)
        decompressor.CopyTo output

    /// Runs the decompression algorithm <c>alg</c> against a temporary buffer, then maps the expanded bytes via <c>post</c>.
    let private unpack post alg compressedBytes =
        use output = new System.IO.MemoryStream()
        compressedBytes |> alg output
        output.ToArray() |> post

    /// Core decoding dispatch, parameterised over the target representation.<br/>
    /// <c>direct</c> handles bodies that are passed through as-is.<br/>
    /// <c>expand</c> is handed the relevant decompression routine together with the bytes obtained from the base64 string.<br/>
    /// Only bodies tagged Deflate or Brotli whose element is a JSON String are expanded; every other combination goes via <c>direct</c>.
    let decode_ direct expand struct (encoding, data: JsonElement) =
        match encoding, data.ValueKind with
        | Encoding.Deflate, JsonValueKind.String -> data.GetBytesFromBase64() |> expand inflateTo
        | Encoding.Brotli, JsonValueKind.String -> data.GetBytesFromBase64() |> expand brotliDecompressTo
        | _ -> data |> direct

    /// Decodes to a <c>JsonElement</c>, expanding compressed bodies where required.
    let decode = decode_ id (unpack InteropHelpers.Utf8ToJsonElement)

    /// Renders a blob as a JSON String element holding its base64 representation.
    let private blobToBase64StringJsonElement = Convert.ToBase64String >> JsonSerializer.SerializeToElement

    /// Tags a <c>JsonElement</c> as a Direct (not compressed) body.
    let direct (raw: JsonElement): EncodedBody = Encoding.Direct, raw

    /// Converts a UTF-8 blob based encoded body into the <c>JsonElement</c> based equivalent.<br/>
    /// Compressed payloads are carried as a base64 string, retaining their encoding tag.<br/>
    /// Any other payload is treated as Direct, i.e. the blob is parsed as the UTF-8 JSON document it represents.
    let recode struct (encoding, data: ReadOnlyMemory<byte>): EncodedBody =
        match encoding with
        | Encoding.Deflate -> Encoding.Deflate, data.ToArray() |> blobToBase64StringJsonElement
        | Encoding.Brotli -> Encoding.Brotli, data.ToArray() |> blobToBase64StringJsonElement
        // NOTE Direct bodies must NOT be base64 encoded: decode_ passes Direct bodies through untouched,
        // so wrapping them in a base64 string would yield that string instead of the original document
        | _ -> Encoding.Direct, InteropHelpers.Utf8ToJsonElement data

    /// Decodes to UTF-8 bytes, expanding compressed bodies where required.
    let decodeUtf8 = decode_ InteropHelpers.JsonElementToUtf8 (unpack ReadOnlyMemory<byte>)

    (* Conditional compression logic: triggered as storage layer pulls Data/Meta fields
       Bodies under specified minimum size, or not meeting a required compression gain are stored directly, equivalent to if compression had not been wired in *)

    /// Brotli-compresses the supplied body into a fresh <c>MemoryStream</c>; the caller owns (and should dispose) the result.
    let private brotliCompress (eventBody: ReadOnlyMemory<byte>): System.IO.MemoryStream =
        let output = new System.IO.MemoryStream()
        use compressor = new System.IO.Compression.BrotliStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true)
        compressor.Write eventBody.Span
        compressor.Close() // NOTE Close, not Flush; we want the output fully terminated to reduce surprises when decompressing
        output

    /// Attempts to compress a <c>JsonElement</c> body.<br/>
    /// Falls back to Direct if the UTF-8 rendition is shorter than <c>minSize</c>, or compression does not save more than <c>minGain</c> bytes.
    let compress minSize minGain (raw: JsonElement): EncodedBody =
        let utf8: ReadOnlyMemory<byte> = InteropHelpers.JsonElementToUtf8 raw
        if utf8.Length < minSize then direct raw
        else
            use brotli = brotliCompress utf8
            if utf8.Length <= int brotli.Length + minGain then direct raw
            else Encoding.Brotli, brotli.ToArray() |> blobToBase64StringJsonElement

    /// Tags a UTF-8 JSON blob as a Direct (not compressed) body, parsing it into a <c>JsonElement</c>.
    let directUtf8 (raw: ReadOnlyMemory<byte>): EncodedBody = Encoding.Direct, InteropHelpers.Utf8ToJsonElement raw

    /// Attempts to compress a UTF-8 JSON blob.<br/>
    /// Falls back to Direct if the blob is shorter than <c>minSize</c>, or compression does not save more than <c>minGain</c> bytes.
    let compressUtf8 minSize minGain (utf8: ReadOnlyMemory<byte>): EncodedBody =
        if utf8.Length < minSize then directUtf8 utf8
        else
            use brotli = brotliCompress utf8
            if utf8.Length <= int brotli.Length + minGain then directUtf8 utf8
            else Encoding.Brotli, brotli.ToArray() |> blobToBase64StringJsonElement

/// Conversions between <c>EncodedBody</c> (an encoding tag paired with a <c>JsonElement</c>) and the raw representations of an Event Body.
[<AbstractClass; Sealed>]
type Encoding private () =

    /// Wraps a <c>JsonElement</c> as-is, without attempting compression.
    static member OfJsonElement(x: JsonElement): EncodedBody =
        x |> Impl.direct

    /// Wraps a <c>JsonElement</c>, compressing it where the thresholds in <c>options</c> are met.
    static member OfJsonElementCompress(options, x: JsonElement): EncodedBody =
        x |> Impl.compress options.minSize options.minGain

    /// Wraps a UTF-8 JSON blob, without attempting compression.
    static member OfUtf8(x: ReadOnlyMemory<byte>): EncodedBody =
        x |> Impl.directUtf8

    /// Wraps a UTF-8 JSON blob, compressing it where the thresholds in <c>options</c> are met.
    static member OfUtf8Compress(options, x: ReadOnlyMemory<byte>): EncodedBody =
        x |> Impl.compressUtf8 options.minSize options.minGain

    /// Converts a blob based <c>FsCodec.EncodedBody</c> into the <c>JsonElement</c> based form.
    static member OfEncodedUtf8(x: FsCodec.EncodedBody): EncodedBody =
        x |> Impl.recode

    /// Counts the UTF-8 bytes in the raw JSON text of the body exactly as held (i.e. without expanding it).
    static member ByteCount(struct (_encoding, data): EncodedBody) =
        let rawText = data.GetRawText()
        System.Text.Encoding.UTF8.GetByteCount rawText

    /// Counts the UTF-8 bytes in the raw JSON text of the body after decoding it.
    static member ByteCountExpanded(x: EncodedBody) =
        let expanded = Impl.decode x
        let rawText = expanded.GetRawText()
        System.Text.Encoding.UTF8.GetByteCount rawText

    /// Decodes the body to a <c>JsonElement</c>.
    static member ToJsonElement(x: EncodedBody): JsonElement =
        x |> Impl.decode

    /// Decodes the body to UTF-8 bytes.
    static member ToUtf8(x: EncodedBody): ReadOnlyMemory<byte> =
        x |> Impl.decodeUtf8

    /// Writes the decoded body to <c>ms</c>: compressed bodies are expanded straight into the stream, anything else is serialized into it.
    static member ToStream(ms: System.IO.Stream, x: EncodedBody) =
        x
        |> Impl.decode_
            (fun el -> JsonSerializer.Serialize(ms, el))
            (fun dec -> dec ms)

/// Extension methods that adapt an <c>IEventCodec</c> to and from <c>EncodedBody</c> Event Bodies.
[<Extension; AbstractClass; Sealed>]
type Encoder private () =

    /// <summary>Adapts an <c>IEventCodec</c> rendering to <c>JsonElement</c> Event Bodies to encode as per <c>Compressed</c>, but without attempting compression.</summary>
    [<Extension>]
    static member Uncompressed<'Event, 'Context>(native: IEventCodec<'Event, JsonElement, 'Context>)
        : IEventCodec<'Event, EncodedBody, 'Context> =
        native |> FsCodec.Core.EventCodec.mapBodies Encoding.OfJsonElement Encoding.ToJsonElement

    /// <summary>Adapts an <c>IEventCodec</c> rendering to <c>ReadOnlyMemory&lt;byte&gt;</c> Event Bodies to attempt to compress the data.<br/>
    /// The body will be saved as-is under the following circumstances:<br/>
    /// - the <c>shouldCompress</c> predicate is not satisfied for the event in question.<br/>
    /// - sufficient compression, as defined by <c>options</c> is not achieved.<br/>
    /// The <c>int</c> produced when <c>Encode</c>ing conveys the encoding used, and must be round tripped alongside the body as a required input of a future <c>Decode</c>.</summary>
    /// <remarks>NOTE this is intended for interoperability only; a Codec (such as <c>CodecJsonElement</c>) that encodes to <c>JsonElement</c> is strongly recommended unless you don't have a choice.</remarks>
    [<Extension>]
    static member CompressedUtf8<'Event, 'Context>(
            native: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context>,
            [<Optional; DefaultParameterValue null>] ?shouldCompress: Func<IEventData<ReadOnlyMemory<byte>>, bool>,
            [<Optional; DefaultParameterValue null>] ?options)
        : IEventCodec<'Event, EncodedBody, 'Context> =
        let opts = defaultArg options CompressionOptions.Default
        let encode =
            match shouldCompress with
            // No predicate supplied: every body is a candidate for compression
            | None -> fun _x (d: ReadOnlyMemory<byte>) -> Encoding.OfUtf8Compress(opts, d)
            // Predicate supplied: only attempt compression for the events it selects
            | Some predicate ->
                fun x d ->
                    if predicate.Invoke x then Encoding.OfUtf8Compress(opts, d)
                    else Encoding.OfUtf8 d
        native |> FsCodec.Core.EventCodec.mapBodies_ encode Encoding.ToUtf8

    /// <summary>Adapts an <c>IEventCodec</c> rendering to <c>JsonElement</c> Event Bodies to attempt to compress the data.<br/>
    /// The body will be saved as-is under the following circumstances:<br/>
    /// - the <c>shouldCompress</c> predicate is not satisfied for the event in question.<br/>
    /// - sufficient compression, as defined by <c>options</c> is not achieved.<br/>
    /// The <c>int</c> produced when <c>Encode</c>ing conveys the encoding used, and must be round tripped alongside the body as a required input of a future <c>Decode</c>.</summary>
    [<Extension>]
    static member Compressed<'Event, 'Context>(
            native: IEventCodec<'Event, JsonElement, 'Context>,
            [<Optional; DefaultParameterValue null>] ?shouldCompress: Func<IEventData<JsonElement>, bool>,
            [<Optional; DefaultParameterValue null>] ?options)
        : IEventCodec<'Event, EncodedBody, 'Context> =
        let opts = defaultArg options CompressionOptions.Default
        let encode =
            match shouldCompress with
            // No predicate supplied: every body is a candidate for compression
            | None -> fun _x (d: JsonElement) -> Encoding.OfJsonElementCompress(opts, d)
            // Predicate supplied: only attempt compression for the events it selects
            | Some predicate ->
                fun x d ->
                    if predicate.Invoke x then Encoding.OfJsonElementCompress(opts, d)
                    else Encoding.OfJsonElement d
        native |> FsCodec.Core.EventCodec.mapBodies_ encode Encoding.ToJsonElement

    /// <summary>Adapts an <c>IEventCodec</c> rendering to <c>int * JsonElement</c> Event Bodies to render and/or consume uncompressed <c>ReadOnlyMemory&lt;byte&gt;</c>.</summary>
    [<Extension>]
    static member AsUtf8<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>)
        : IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context> =
        native |> FsCodec.Core.EventCodec.mapBodies Encoding.ToUtf8 Encoding.OfUtf8

    /// <summary>Adapts an <c>IEventCodec</c> rendering to <c>int * JsonElement</c> Event Bodies to render and/or consume uncompressed <c>byte[]</c>.</summary>
    [<Extension>]
    static member AsUtf8ByteArray<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>)
        : IEventCodec<'Event, byte[], 'Context> =
        let toByteArray (encoded: EncodedBody): byte[] = Encoding.ToUtf8(encoded).ToArray()
        native |> FsCodec.Core.EventCodec.mapBodies toByteArray Encoding.OfUtf8
Loading