Skip to content

Commit

Permalink
feat(SystemTextJson): Encoding with conditional compression (#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Jan 29, 2025
1 parent a87c89d commit 309fb78
Show file tree
Hide file tree
Showing 15 changed files with 455 additions and 96 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ The `Unreleased` section name is replaced by the expected version of next releas
## [Unreleased]

### Added

- `SystemTextJson.Encoding`: Conditional compression as per `FsCodec.Encoding` [#126](https://github.com/jet/FsCodec/pull/126)

### Changed
### Removed
### Fixed
Expand Down
7 changes: 6 additions & 1 deletion src/FsCodec.Box/ByteArray.fs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ type ByteArray private () =
/// <summary>Adapt an IEventCodec that handles ReadOnlyMemory&lt;byte&gt; Event Bodies to instead use <c>byte[]</c><br/>
/// Ideally not used as it makes pooling problematic; only provided for interop/porting scaffolding wrt Equinox V3 and EventStore.Client etc</summary>
[<Extension>]
static member AsByteArray<'Event, 'Context>(native: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context>)
: IEventCodec<'Event, byte[], 'Context> =
FsCodec.Core.EventCodec.mapBodies ByteArray.ReadOnlyMemoryToBytes ByteArray.BytesToReadOnlyMemory native

[<Extension; Obsolete "Renamed to AsByteArray for consistency">]
static member ToByteArrayCodec<'Event, 'Context>(native: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context>)
: IEventCodec<'Event, byte[], 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> ByteArray.ReadOnlyMemoryToBytes, Func<_, _> ByteArray.BytesToReadOnlyMemory)
FsCodec.Core.EventCodec.mapBodies ByteArray.ReadOnlyMemoryToBytes ByteArray.BytesToReadOnlyMemory native
95 changes: 18 additions & 77 deletions src/FsCodec.Box/Compression.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,105 +4,46 @@ open System
open System.Runtime.CompilerServices
open System.Runtime.InteropServices

/// Represents the body of an Event (or its Metadata), holding the encoded form of the buffer together with an enum value signifying the encoding scheme.
/// Enables the decoding side to transparently inflate the data on loading without burdening the application layer with tracking the encoding scheme used
type EncodedBody = (struct(int * ReadOnlyMemory<byte>))

module private EncodedMaybeCompressed =

module Encoding =
let [<Literal>] Direct = 0 // Assumed for all values not listed here
let [<Literal>] Deflate = 1 // Deprecated encoding produced by versions pre 3.0.0-rc.13; no longer produced
let [<Literal>] Brotli = 2 // Default encoding as of 3.0.0-rc.13

(* Decompression logic: triggered by extension methods below at the point where the Codec's Decode retrieves the Data or Meta properties *)

// In versions pre 3.0.0-rc.13, the compression was implemented as follows; NOTE: use of Flush vs Close saves space but is unconventional
// let private deflate (eventBody: ReadOnlyMemory<byte>): System.IO.MemoryStream =
// let output = new System.IO.MemoryStream()
// let compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true)
// compressor.Write(eventBody.Span)
// compressor.Flush() // NOTE: using Flush in lieu of close means the result is not padded, which can hinder interop
// output
let private inflate (data: ReadOnlyMemory<byte>): byte[] =
let s = new System.IO.MemoryStream(data.ToArray(), writable = false)
let decompressor = new System.IO.Compression.DeflateStream(s, System.IO.Compression.CompressionMode.Decompress, leaveOpen = true)
let output = new System.IO.MemoryStream()
decompressor.CopyTo output
output.ToArray()
let private brotliDecompress (data: ReadOnlyMemory<byte>): byte[] =
let s = new System.IO.MemoryStream(data.ToArray(), writable = false)
use decompressor = new System.IO.Compression.BrotliStream(s, System.IO.Compression.CompressionMode.Decompress)
use output = new System.IO.MemoryStream()
decompressor.CopyTo output
output.ToArray()
let decode struct (encoding, data): ReadOnlyMemory<byte> =
match encoding with
| Encoding.Deflate -> inflate data |> ReadOnlyMemory
| Encoding.Brotli -> brotliDecompress data |> ReadOnlyMemory
| Encoding.Direct | _ -> data

(* Conditional compression logic: triggered as storage layer pulls Data/Meta fields
Bodies under specified minimum size, or not meeting a required compression gain are stored directly, equivalent to if compression had not been wired in *)

let private brotliCompress (eventBody: ReadOnlyMemory<byte>): System.IO.MemoryStream =
let output = new System.IO.MemoryStream()
use compressor = new System.IO.Compression.BrotliStream(output, System.IO.Compression.CompressionLevel.Optimal, leaveOpen = true)
compressor.Write(eventBody.Span)
compressor.Close() // NOTE Close, not Flush; we want the output fully terminated to reduce surprises when decompressing
output
let encodeUncompressed (raw: ReadOnlyMemory<byte>): EncodedBody = Encoding.Direct, raw
let encode minSize minGain (raw: ReadOnlyMemory<byte>): EncodedBody =
if raw.Length < minSize then encodeUncompressed raw
else match brotliCompress raw with
| tmp when raw.Length > int tmp.Length + minGain -> Encoding.Brotli, tmp.ToArray() |> ReadOnlyMemory
| _ -> encodeUncompressed raw

type [<Struct>] CompressionOptions = { minSize: int; minGain: int } with
/// Attempt to compress anything possible
// TL;DR in general it's worth compressing everything to minimize RU consumption both on insert and update
// For DynamoStore, every time we need to calve from the tip, the RU impact of using TransactWriteItems is significant,
// so preventing or delaying that is of critical importance
// Empirically not much JSON below 48 bytes actually compresses - while we don't assume that, it is what is guiding the derivation of the default
static member Default = { minSize = 48; minGain = 4 }
/// Encode the data without attempting to compress, regardless of size
static member Uncompressed = { minSize = Int32.MaxValue; minGain = 0 }

[<Extension; AbstractClass; Sealed>]
[<Extension; AbstractClass; Sealed; Obsolete "Please use FsCodec.Encoding instead">]
type Compression private () =

static member Utf8ToEncodedDirect(x: ReadOnlyMemory<byte>): EncodedBody =
EncodedMaybeCompressed.encodeUncompressed x
static member Utf8ToEncodedTryCompress(options, x: ReadOnlyMemory<byte>): EncodedBody =
EncodedMaybeCompressed.encode options.minSize options.minGain x
static member EncodedToUtf8(x: EncodedBody): ReadOnlyMemory<byte> =
EncodedMaybeCompressed.decode x
static member EncodedToByteArray(x: EncodedBody): byte[] =
Compression.EncodedToUtf8(x).ToArray()
static member Utf8ToEncodedDirect(x: ReadOnlyMemory<byte>): Encoded =
FsCodec.Encoding.OfBlob x
static member Utf8ToEncodedTryCompress(options, x: ReadOnlyMemory<byte>): Encoded =
FsCodec.Encoding.OfBlobCompress({ minSize = options.minSize; minGain = options.minGain }, x)
static member EncodedToUtf8(x: Encoded): ReadOnlyMemory<byte> =
FsCodec.Encoding.ToBlob x
static member EncodedToByteArray(x: Encoded): byte[] =
FsCodec.Encoding.ToBlob(x).ToArray()

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>ReadOnlyMemory&lt;byte&gt;</c> Event Bodies to attempt to compress the data.<br/>
/// If sufficient compression, as defined by <c>options</c> is not achieved, the body is saved as-is.<br/>
/// The <c>int</c> conveys a value that must be round tripped alongside the body in order for the decoding process to correctly interpret it.</summary>
[<Extension>]
static member EncodeTryCompress<'Event, 'Context>(native: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context>, [<Optional; DefaultParameterValue null>] ?options)
: IEventCodec<'Event, EncodedBody, 'Context> =
: IEventCodec<'Event, Encoded, 'Context> =
let opts = defaultArg options CompressionOptions.Default
FsCodec.Core.EventCodec.Map(native, (fun x -> Compression.Utf8ToEncodedTryCompress(opts, x)), Func<_, _> Compression.EncodedToUtf8)
let opts: FsCodec.CompressionOptions = { minSize = opts.minSize; minGain = opts.minGain }
FsCodec.Core.EventCodec.mapBodies (fun d -> Encoding.OfBlobCompress(opts, d)) Encoding.ToBlob native

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>ReadOnlyMemory&lt;byte&gt;</c> Event Bodies to encode as per <c>EncodeTryCompress</c>, but without attempting compression.</summary>
[<Extension>]
static member EncodeUncompressed<'Event, 'Context>(native: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context>)
: IEventCodec<'Event, EncodedBody, 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> Compression.Utf8ToEncodedDirect, Func<_, _> Compression.EncodedToUtf8)
: IEventCodec<'Event, Encoded, 'Context> =
Encoder.Uncompressed native

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>int * ReadOnlyMemory&lt;byte&gt;</c> Event Bodies to render and/or consume from Uncompressed <c>ReadOnlyMemory&lt;byte&gt;</c>.</summary>
[<Extension>]
static member ToUtf8Codec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>)
static member ToUtf8Codec<'Event, 'Context>(native: IEventCodec<'Event, Encoded, 'Context>)
: IEventCodec<'Event, ReadOnlyMemory<byte>, 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> Compression.EncodedToUtf8, Func<_, _> Compression.Utf8ToEncodedDirect)
Encoder.AsBlob native

/// <summary>Adapts an <c>IEventCodec</c> rendering to <c>int * ReadOnlyMemory&lt;byte&gt;</c> Event Bodies to render and/or consume from Uncompressed <c>byte[]</c>.</summary>
[<Extension>]
static member ToByteArrayCodec<'Event, 'Context>(native: IEventCodec<'Event, EncodedBody, 'Context>)
static member ToByteArrayCodec<'Event, 'Context>(native: IEventCodec<'Event, Encoded, 'Context>)
: IEventCodec<'Event, byte[], 'Context> =
FsCodec.Core.EventCodec.Map(native, Func<_, _> Compression.EncodedToByteArray, Func<_, _> Compression.Utf8ToEncodedDirect)
Encoder.AsByteArray native
3 changes: 2 additions & 1 deletion src/FsCodec.Box/FsCodec.Box.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="../FsCodec/FsCodec.fsproj" />
<ProjectReference Condition=" '$(Configuration)' == 'Debug' " Include="../FsCodec/FsCodec.fsproj" />
<PackageReference Condition=" '$(Configuration)' == 'Release' " Include="FsCodec" Version="[3.1.0-rc.1, 4.0.0)" />
<!-- <PackageReference Condition=" '$(Configuration)' == 'Release' " Include="FsCodec" Version="[3.1.0-rc.2, 4.0.0)" />-->
</ItemGroup>

</Project>
Loading

0 comments on commit 309fb78

Please sign in to comment.