Skip to content

Commit

Permalink
H-3682: Introduce TypeScript harpc codec (#5740)
Browse files Browse the repository at this point in the history
  • Loading branch information
indietyp authored Nov 26, 2024
1 parent 69a8e46 commit f33fa03
Show file tree
Hide file tree
Showing 11 changed files with 496 additions and 6 deletions.
90 changes: 90 additions & 0 deletions libs/@local/harpc/client/typescript/src/codec/Decoder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import type { ParseResult, Schema, Stream } from "effect";
import { Data, Function, Inspectable, Pipeable } from "effect";
import { GenericTag } from "effect/Context";

import { createProto } from "../utils.js";

const TypeId: unique symbol = Symbol("@local/harpc-client/codec/Decoder");
export type TypeId = typeof TypeId;

export class DecodingError extends Data.TaggedError("DecodingError")<{
cause: unknown;
}> {
get message() {
return "Failed to encode value";
}
}

export interface Decoder<E = DecodingError, R = never>
extends Inspectable.Inspectable,
Pipeable.Pipeable {
readonly [TypeId]: TypeId;

readonly decode: {
<SchemaType, SchemaEncoded, SchemaContext>(
schema: Schema.Schema<SchemaType, SchemaEncoded, SchemaContext>,
): <StreamError, StreamContext>(
input: Stream.Stream<ArrayBuffer, StreamError, StreamContext>,
) => Stream.Stream<
SchemaType,
E | StreamError | ParseResult.ParseError,
R | StreamContext | SchemaContext
>;

<SchemaType, SchemaEncoded, SchemaContext, StreamError, StreamContext>(
input: Stream.Stream<ArrayBuffer, StreamError, StreamContext>,
schema: Schema.Schema<SchemaType, SchemaEncoded, SchemaContext>,
): Stream.Stream<
SchemaType,
E | StreamError | ParseResult.ParseError,
R | StreamContext | SchemaContext
>;
};
}

interface DecoderImpl<E = DecodingError, R = never> extends Decoder<E, R> {}

const DecoderProto: Omit<DecoderImpl, "decode"> = {
[TypeId]: TypeId,

toString() {
return `Decoder`;
},

toJSON() {
return {
_id: "Decoder",
};
},

[Inspectable.NodeInspectSymbol]() {
return this.toJSON();
},

pipe() {
// eslint-disable-next-line prefer-rest-params
return Pipeable.pipeArguments(this, arguments);
},
};

export const Decoder = GenericTag<Decoder>("@local/harpc-client/codec/Decoder");

export const make = <E = DecodingError, R = never>(
decode: <
SchemaType,
SchemaEncoded,
SchemaContext,
StreamError,
StreamContext,
>(
input: Stream.Stream<ArrayBuffer, StreamError, StreamContext>,
schema: Schema.Schema<SchemaType, SchemaEncoded, SchemaContext>,
) => Stream.Stream<
SchemaType,
E | StreamError | ParseResult.ParseError,
R | StreamContext | SchemaContext
>,
) =>
createProto(DecoderProto, {
decode: Function.dual(2, decode),
}) satisfies DecoderImpl<E, R> as Decoder<E, R>;
90 changes: 90 additions & 0 deletions libs/@local/harpc/client/typescript/src/codec/Encoder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import type { ParseResult, Schema, Stream } from "effect";
import { Data, Function, Inspectable, Pipeable } from "effect";
import { GenericTag } from "effect/Context";

import { createProto } from "../utils.js";

const TypeId: unique symbol = Symbol("@local/harpc-client/codec/Encoder");
export type TypeId = typeof TypeId;

export class EncodingError extends Data.TaggedError("EncodingError")<{
cause: unknown;
}> {
get message() {
return "Failed to encode value";
}
}

export interface Encoder<E = EncodingError, R = never>
extends Inspectable.Inspectable,
Pipeable.Pipeable {
readonly [TypeId]: TypeId;

readonly encode: {
<SchemaType, SchemaEncoded, SchemaContext>(
schema: Schema.Schema<SchemaType, SchemaEncoded, SchemaContext>,
): <StreamError, StreamContext>(
input: Stream.Stream<SchemaType, StreamError, StreamContext>,
) => Stream.Stream<
ArrayBuffer,
E | StreamError | ParseResult.ParseError,
R | StreamContext | SchemaContext
>;

<SchemaType, SchemaEncoded, SchemaContext, StreamError, StreamContext>(
input: Stream.Stream<SchemaType, StreamError, StreamContext>,
schema: Schema.Schema<SchemaType, SchemaEncoded, SchemaContext>,
): Stream.Stream<
ArrayBuffer,
E | StreamError | ParseResult.ParseError,
R | StreamContext | SchemaContext
>;
};
}

interface EncoderImpl<E = EncodingError, R = never> extends Encoder<E, R> {}

const EncoderProto: Omit<EncoderImpl, "encode"> = {
[TypeId]: TypeId,

toString() {
return `Encoder`;
},

toJSON() {
return {
_id: "Encoder",
};
},

[Inspectable.NodeInspectSymbol]() {
return this.toJSON();
},

pipe() {
// eslint-disable-next-line prefer-rest-params
return Pipeable.pipeArguments(this, arguments);
},
};

export const Encoder = GenericTag<Encoder>("@local/harpc-client/codec/Encoder");

export const make = <E = EncodingError, R = never>(
encode: <
SchemaType,
SchemaEncoded,
SchemaContext,
StreamError,
StreamContext,
>(
input: Stream.Stream<SchemaType, StreamError, StreamContext>,
schema: Schema.Schema<SchemaType, SchemaEncoded, SchemaContext>,
) => Stream.Stream<
ArrayBuffer,
E | StreamError | ParseResult.ParseError,
R | StreamContext | SchemaContext
>,
) =>
createProto(EncoderProto, {
encode: Function.dual(2, encode),
}) satisfies EncoderImpl<E, R> as Encoder<E, R>;
130 changes: 130 additions & 0 deletions libs/@local/harpc/client/typescript/src/codec/JsonDecoder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { Data, Effect, Layer, Option, pipe, Schema, Stream } from "effect";

import * as Decoder from "./Decoder.js";

// 1E is the ASCII record separator character, and is invalid in JSON.
const SEPARATOR = 0x1e;

export class InvalidUtf8Error extends Data.TaggedError("InvalidUtf8Error")<{
cause: unknown;
}> {
get message() {
return "Invalid UTF-8 encoding";
}
}

export class InvalidJsonError extends Data.TaggedError("InvalidJsonError")<{
cause: unknown;
}> {
get message() {
return "Invalid JSON encoding";
}
}

const textDecode = (
decoder: TextDecoder,
buffer: ArrayBuffer,
options: { readonly stream: boolean },
) =>
Effect.try({
try: () => {
return decoder.decode(buffer, options);
},
catch: (cause) => new InvalidUtf8Error({ cause }),
}).pipe(Effect.mapError((cause) => new Decoder.DecodingError({ cause })));

const parseJson = <T>(text: string) =>
Effect.try({
try: () => JSON.parse(text) as unknown as T,
catch: (cause) => new InvalidJsonError({ cause }),
}).pipe(Effect.mapError((cause) => new Decoder.DecodingError({ cause })));

const processArrayBuffer = <T, E, R>(
buffer: ArrayBuffer,
decoder: TextDecoder,
input: string,
decodeText: Option.Option<(text: string) => Effect.Effect<T, E, R>>,
) =>
Effect.gen(function* () {
const items: T[] = [];

let fragment = input;
let slice = buffer;

while (slice.byteLength > 0) {
const separatorPosition = pipe(
new Uint8Array(slice),
(array) => array.findIndex((byte) => byte === SEPARATOR),
Option.liftPredicate((position) => position >= 0),
);

if (Option.isNone(separatorPosition)) {
fragment += yield* textDecode(decoder, slice, { stream: true });

return [fragment, items] as const;
}

const left = slice.slice(0, separatorPosition.value);
slice = slice.slice(separatorPosition.value + 1);

fragment += yield* textDecode(decoder, left, { stream: false });

if (Option.isSome(decodeText)) {
items.push(yield* decodeText.value(fragment));
} else {
items.push(yield* parseJson<T>(fragment));
}

fragment = "";
}

return [fragment, items] as const;
});

interface Options {
schema: boolean;
}

const decoder = (options: Options) =>
Decoder.make((input, schema) => {
const useSchema = options.schema;

const textDecoder = new TextDecoder("utf-8", {
fatal: true,
ignoreBOM: true,
});

const schemaJson = Schema.parseJson(schema);
const decodeJson = Schema.decode(schemaJson);

let fragment = "";

return pipe(
input,
Stream.mapConcatEffect((buffer) =>
Effect.gen(function* () {
const [nextFragment, items] = yield* processArrayBuffer(
buffer,
textDecoder,
fragment,
useSchema ? Option.some(decodeJson) : Option.none(),
);

fragment = nextFragment;

return items;
}),
),
);
});

export const layer = Layer.succeed(Decoder.Decoder, decoder({ schema: true }));

/**
* Like `layer`, but won't invoke the schema decoder, therefore neither transforming or validating the input.
* `layerUnchecked` simply uses `JSON.parse` on the input.
*/
export const layerUnchecked = Layer.succeed(
Decoder.Decoder,
decoder({ schema: false }),
);
46 changes: 46 additions & 0 deletions libs/@local/harpc/client/typescript/src/codec/JsonEncoder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { Effect, Layer, pipe, Schema, Stream } from "effect";

import * as Encoder from "./Encoder.js";

interface Options {
schema: boolean;
}

const encoder = (options: Options) =>
Encoder.make((input, schema) => {
const useSchema = options.schema;

const textEncoder = new TextEncoder();

const schemaJson = Schema.parseJson(schema);
const encodeJson = Schema.encode(schemaJson);

return pipe(
input,
Stream.mapEffect((item) =>
Effect.gen(function* () {
const json = useSchema
? yield* encodeJson(item)
: JSON.stringify(item);

const text = `${json}\x1e`;

const array = textEncoder.encode(text);

return array.buffer as ArrayBuffer;
}),
),
);
});

export const layer = Layer.succeed(Encoder.Encoder, encoder({ schema: true }));

/**
* Like `layer`, but won't invoke the schema encoder, and instead will just use `JSON.stringify`.
*
* This means that the resulting stream won't have any transformation applied to it.
*/
export const layerUnchecked = Layer.succeed(
Encoder.Encoder,
encoder({ schema: false }),
);
6 changes: 6 additions & 0 deletions libs/@local/harpc/client/typescript/src/codec/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/* eslint-disable canonical/filename-no-index */

export * as Decoder from "./Decoder.js";
export * as Encoder from "./Encoder.js";
export * as JsonDecoder from "./JsonDecoder.js";
export * as JsonEncoder from "./JsonEncoder.js";
Loading

0 comments on commit f33fa03

Please sign in to comment.