diff --git a/libs/@local/harpc/client/typescript/src/codec/Decoder.ts b/libs/@local/harpc/client/typescript/src/codec/Decoder.ts new file mode 100644 index 00000000000..2925ba6f0e6 --- /dev/null +++ b/libs/@local/harpc/client/typescript/src/codec/Decoder.ts @@ -0,0 +1,90 @@ +import type { ParseResult, Schema, Stream } from "effect"; +import { Data, Function, Inspectable, Pipeable } from "effect"; +import { GenericTag } from "effect/Context"; + +import { createProto } from "../utils.js"; + +const TypeId: unique symbol = Symbol("@local/harpc-client/codec/Decoder"); +export type TypeId = typeof TypeId; + +export class DecodingError extends Data.TaggedError("DecodingError")<{ + cause: unknown; +}> { + get message() { + return "Failed to encode value"; + } +} + +export interface Decoder + extends Inspectable.Inspectable, + Pipeable.Pipeable { + readonly [TypeId]: TypeId; + + readonly decode: { + ( + schema: Schema.Schema, + ): ( + input: Stream.Stream, + ) => Stream.Stream< + SchemaType, + E | StreamError | ParseResult.ParseError, + R | StreamContext | SchemaContext + >; + + ( + input: Stream.Stream, + schema: Schema.Schema, + ): Stream.Stream< + SchemaType, + E | StreamError | ParseResult.ParseError, + R | StreamContext | SchemaContext + >; + }; +} + +interface DecoderImpl extends Decoder {} + +const DecoderProto: Omit = { + [TypeId]: TypeId, + + toString() { + return `Decoder`; + }, + + toJSON() { + return { + _id: "Decoder", + }; + }, + + [Inspectable.NodeInspectSymbol]() { + return this.toJSON(); + }, + + pipe() { + // eslint-disable-next-line prefer-rest-params + return Pipeable.pipeArguments(this, arguments); + }, +}; + +export const Decoder = GenericTag("@local/harpc-client/codec/Decoder"); + +export const make = ( + decode: < + SchemaType, + SchemaEncoded, + SchemaContext, + StreamError, + StreamContext, + >( + input: Stream.Stream, + schema: Schema.Schema, + ) => Stream.Stream< + SchemaType, + E | StreamError | ParseResult.ParseError, + R | StreamContext | SchemaContext + >, +) => + createProto(DecoderProto, { + decode: Function.dual(2, decode), + }) satisfies DecoderImpl as Decoder; diff --git a/libs/@local/harpc/client/typescript/src/codec/Encoder.ts b/libs/@local/harpc/client/typescript/src/codec/Encoder.ts new file mode 100644 index 00000000000..b1df79f3745 --- /dev/null +++ b/libs/@local/harpc/client/typescript/src/codec/Encoder.ts @@ -0,0 +1,90 @@ +import type { ParseResult, Schema, Stream } from "effect"; +import { Data, Function, Inspectable, Pipeable } from "effect"; +import { GenericTag } from "effect/Context"; + +import { createProto } from "../utils.js"; + +const TypeId: unique symbol = Symbol("@local/harpc-client/codec/Encoder"); +export type TypeId = typeof TypeId; + +export class EncodingError extends Data.TaggedError("EncodingError")<{ + cause: unknown; +}> { + get message() { + return "Failed to encode value"; + } +} + +export interface Encoder + extends Inspectable.Inspectable, + Pipeable.Pipeable { + readonly [TypeId]: TypeId; + + readonly encode: { + ( + schema: Schema.Schema, + ): ( + input: Stream.Stream, + ) => Stream.Stream< + ArrayBuffer, + E | StreamError | ParseResult.ParseError, + R | StreamContext | SchemaContext + >; + + ( + input: Stream.Stream, + schema: Schema.Schema, + ): Stream.Stream< + ArrayBuffer, + E | StreamError | ParseResult.ParseError, + R | StreamContext | SchemaContext + >; + }; +} + +interface EncoderImpl extends Encoder {} + +const EncoderProto: Omit = { + [TypeId]: TypeId, + + toString() { + return `Encoder`; + }, + + toJSON() { + return { + _id: "Encoder", + }; + }, + + [Inspectable.NodeInspectSymbol]() { + return this.toJSON(); + }, + + pipe() { + // eslint-disable-next-line prefer-rest-params + return Pipeable.pipeArguments(this, arguments); + }, +}; + +export const Encoder = GenericTag("@local/harpc-client/codec/Encoder"); + +export const make = ( + encode: < + SchemaType, + SchemaEncoded, + SchemaContext, + StreamError, + StreamContext, + >( + input: Stream.Stream, + schema: Schema.Schema, + ) => Stream.Stream< + ArrayBuffer, + E | StreamError | ParseResult.ParseError, + R | StreamContext | SchemaContext + >, +) => + createProto(EncoderProto, { + encode: Function.dual(2, encode), + }) satisfies EncoderImpl as Encoder; diff --git a/libs/@local/harpc/client/typescript/src/codec/JsonDecoder.ts b/libs/@local/harpc/client/typescript/src/codec/JsonDecoder.ts new file mode 100644 index 00000000000..9ecc559d3a3 --- /dev/null +++ b/libs/@local/harpc/client/typescript/src/codec/JsonDecoder.ts @@ -0,0 +1,130 @@ +import { Data, Effect, Layer, Option, pipe, Schema, Stream } from "effect"; + +import * as Decoder from "./Decoder.js"; + +// 1E is the ASCII record separator character, and is invalid in JSON. +const SEPARATOR = 0x1e; + +export class InvalidUtf8Error extends Data.TaggedError("InvalidUtf8Error")<{ + cause: unknown; +}> { + get message() { + return "Invalid UTF-8 encoding"; + } +} + +export class InvalidJsonError extends Data.TaggedError("InvalidJsonError")<{ + cause: unknown; +}> { + get message() { + return "Invalid JSON encoding"; + } +} + +const textDecode = ( + decoder: TextDecoder, + buffer: ArrayBuffer, + options: { readonly stream: boolean }, +) => + Effect.try({ + try: () => { + return decoder.decode(buffer, options); + }, + catch: (cause) => new InvalidUtf8Error({ cause }), + }).pipe(Effect.mapError((cause) => new Decoder.DecodingError({ cause }))); + +const parseJson = (text: string) => + Effect.try({ + try: () => JSON.parse(text) as unknown as T, + catch: (cause) => new InvalidJsonError({ cause }), + }).pipe(Effect.mapError((cause) => new Decoder.DecodingError({ cause }))); + +const processArrayBuffer = ( + buffer: ArrayBuffer, + decoder: TextDecoder, + input: string, + decodeText: Option.Option<(text: string) => Effect.Effect>, +) => + Effect.gen(function* () { + const items: T[] = []; + + let fragment = input; + let slice = buffer; + + while (slice.byteLength > 0) { + const separatorPosition = pipe( + new Uint8Array(slice), + (array) => array.findIndex((byte) => byte === SEPARATOR), + Option.liftPredicate((position) => position >= 0), + ); + + if (Option.isNone(separatorPosition)) { + fragment += yield* textDecode(decoder, slice, { stream: true }); + + return [fragment, items] as const; + } + + const left = slice.slice(0, separatorPosition.value); + slice = slice.slice(separatorPosition.value + 1); + + fragment += yield* textDecode(decoder, left, { stream: false }); + + if (Option.isSome(decodeText)) { + items.push(yield* decodeText.value(fragment)); + } else { + items.push(yield* parseJson(fragment)); + } + + fragment = ""; + } + + return [fragment, items] as const; + }); + +interface Options { + schema: boolean; +} + +const decoder = (options: Options) => + Decoder.make((input, schema) => { + const useSchema = options.schema; + + const textDecoder = new TextDecoder("utf-8", { + fatal: true, + ignoreBOM: true, + }); + + const schemaJson = Schema.parseJson(schema); + const decodeJson = Schema.decode(schemaJson); + + let fragment = ""; + + return pipe( + input, + Stream.mapConcatEffect((buffer) => + Effect.gen(function* () { + const [nextFragment, items] = yield* processArrayBuffer( + buffer, + textDecoder, + fragment, + useSchema ? Option.some(decodeJson) : Option.none(), + ); + + fragment = nextFragment; + + return items; + }), + ), + ); + }); + +export const layer = Layer.succeed(Decoder.Decoder, decoder({ schema: true })); + +/** + * Like `layer`, but won't invoke the schema decoder, therefore neither transforming or validating the input. + * `layerUnchecked` simply uses `JSON.parse` on the input. + */ +export const layerUnchecked = Layer.succeed( + Decoder.Decoder, + decoder({ schema: false }), +); diff --git a/libs/@local/harpc/client/typescript/src/codec/JsonEncoder.ts b/libs/@local/harpc/client/typescript/src/codec/JsonEncoder.ts new file mode 100644 index 00000000000..66b5bf717d7 --- /dev/null +++ b/libs/@local/harpc/client/typescript/src/codec/JsonEncoder.ts @@ -0,0 +1,46 @@ +import { Effect, Layer, pipe, Schema, Stream } from "effect"; + +import * as Encoder from "./Encoder.js"; + +interface Options { + schema: boolean; +} + +const encoder = (options: Options) => + Encoder.make((input, schema) => { + const useSchema = options.schema; + + const textEncoder = new TextEncoder(); + + const schemaJson = Schema.parseJson(schema); + const encodeJson = Schema.encode(schemaJson); + + return pipe( + input, + Stream.mapEffect((item) => + Effect.gen(function* () { + const json = useSchema + ? yield* encodeJson(item) + : JSON.stringify(item); + + const text = `${json}\x1e`; + + const array = textEncoder.encode(text); + + return array.buffer as ArrayBuffer; + }), + ), + ); + }); + +export const layer = Layer.succeed(Encoder.Encoder, encoder({ schema: true })); + +/** + * Like `layer`, but won't invoke the schema encoder, and instead will just use `JSON.stringify`. + * + * This means that the resulting stream won't have any transformation applied to it. + */ +export const layerUnchecked = Layer.succeed( + Encoder.Encoder, + encoder({ schema: false }), +); diff --git a/libs/@local/harpc/client/typescript/src/codec/index.ts b/libs/@local/harpc/client/typescript/src/codec/index.ts new file mode 100644 index 00000000000..d4241f33c29 --- /dev/null +++ b/libs/@local/harpc/client/typescript/src/codec/index.ts @@ -0,0 +1,6 @@ +/* eslint-disable canonical/filename-no-index */ + +export * as Decoder from "./Decoder.js"; +export * as Encoder from "./Encoder.js"; +export * as JsonDecoder from "./JsonDecoder.js"; +export * as JsonEncoder from "./JsonEncoder.js"; diff --git a/libs/@local/harpc/client/typescript/tests/codec/JsonDecoder.test.ts b/libs/@local/harpc/client/typescript/tests/codec/JsonDecoder.test.ts new file mode 100644 index 00000000000..b3098dbd03e --- /dev/null +++ b/libs/@local/harpc/client/typescript/tests/codec/JsonDecoder.test.ts @@ -0,0 +1,82 @@ +// mirror of the Rust test suite + +import { describe, it } from "@effect/vitest"; +import { Chunk, Effect, pipe, Schema, Stream } from "effect"; + +import { Decoder, JsonDecoder } from "../../src/codec/index.js"; + +const decode = (text: readonly string[]) => + Effect.gen(function* () { + const decoder = yield* Decoder.Decoder; + const textEncoder = new TextEncoder(); + + const schema = Schema.Record({ key: Schema.String, value: Schema.String }); + + return yield* pipe( + Stream.fromIterable(text), + Stream.map((input) => textEncoder.encode(input).buffer), + decoder.decode(schema), + Stream.runCollect, + Effect.map(Chunk.toReadonlyArray), + ); + }); + +describe.concurrent("JsonDecoder", () => { + it.effect("single record in single chunk", (cx) => + Effect.gen(function* () { + const textPayload = '{"key": "value"}\x1E'; + + const items = yield* decode([textPayload]); + cx.expect(items).toMatchObject([{ key: "value" }]); + }).pipe(Effect.provide(JsonDecoder.layer)), + ); + + it.effect("multiple records in single chunk", (cx) => + Effect.gen(function* () { + const textPayload = '{"key": "value1"}\x1E{"key": "value2"}\x1E'; + + const items = yield* decode([textPayload]); + cx.expect(items).toMatchObject([{ key: "value1" }, { key: "value2" }]); + }).pipe(Effect.provide(JsonDecoder.layer)), + ); + + it.effect("ends with partial record", (cx) => + Effect.gen(function* () { + const textPayload = '{"key": "value1"}\x1E{"key": "value2'; + + const items = yield* decode([textPayload]); + cx.expect(items).toMatchObject([{ key: "value1" }]); + }).pipe(Effect.provide(JsonDecoder.layer)), + ); + + it.effect("partial record completed in next chunk", (cx) => + Effect.gen(function* () { + const textPayload = ['{"key": "val', 'ue1"}\x1E']; + + const items = yield* decode(textPayload); + cx.expect(items).toMatchObject([{ key: "value1" }]); + }).pipe(Effect.provide(JsonDecoder.layer)), + ); + + it.effect( + "partial record completed in next chunk with another record in the same chunk", + (cx) => + Effect.gen(function* () { + const textPayload = ['{"key": "val', 'ue1"}\x1E{"key": "value2"}\x1E']; + + const items = yield* decode(textPayload); + cx.expect(items).toMatchObject([{ key: "value1" }, { key: "value2" }]); + }).pipe(Effect.provide(JsonDecoder.layer)), + ); + + it.effect("invalid json", (cx) => + Effect.gen(function* () { + const textPayload = '{"key": "valu\x1E'; + + const error = yield* pipe(decode([textPayload]), Effect.flip); + cx.expect(error.toString()).toMatch( + /Unterminated string in JSON at position 13/, + ); + }).pipe(Effect.provide(JsonDecoder.layer)), + ); +}); diff --git a/libs/@local/harpc/client/typescript/tests/codec/JsonEncoder.test.ts b/libs/@local/harpc/client/typescript/tests/codec/JsonEncoder.test.ts new file mode 100644 index 00000000000..d86c8eddddd --- /dev/null +++ b/libs/@local/harpc/client/typescript/tests/codec/JsonEncoder.test.ts @@ -0,0 +1,44 @@ +import { describe, it } from "@effect/vitest"; +import { Chunk, Effect, pipe, Schema, Stream } from "effect"; +import type { ReadonlyRecord } from "effect/Record"; + +import { Encoder, JsonEncoder } from "../../src/codec/index.js"; + +const encode = (items: readonly ReadonlyRecord[]) => + Effect.gen(function* () { + const encoder = yield* Encoder.Encoder; + const textDecoder = new TextDecoder(); + + const schema = Schema.Record({ key: Schema.String, value: Schema.String }); + + return yield* pipe( + Stream.fromIterable(items), + encoder.encode(schema), + Stream.map((buffer) => textDecoder.decode(buffer)), + Stream.runCollect, + Effect.map(Chunk.toReadonlyArray), + ); + }); + +describe.concurrent("JsonEncoder", () => { + it.effect("single record", (cx) => + Effect.gen(function* () { + const payload = [{ key: "value" }]; + + const items = yield* encode(payload); + cx.expect(items).toMatchObject(['{"key":"value"}\x1E']); + }).pipe(Effect.provide(JsonEncoder.layer)), + ); + + it.effect("multiple records", (cx) => + Effect.gen(function* () { + const payload = [{ key: "value1" }, { key: "value2" }]; + + const items = yield* encode(payload); + cx.expect(items).toMatchObject([ + '{"key":"value1"}\x1E', + '{"key":"value2"}\x1E', + ]); + }).pipe(Effect.provide(JsonEncoder.layer)), + ); +}); diff --git a/libs/@local/harpc/client/typescript/tests/decode.test.ts b/libs/@local/harpc/client/typescript/tests/wire-protocol/decode.test.ts similarity index 96% rename from libs/@local/harpc/client/typescript/tests/decode.test.ts rename to libs/@local/harpc/client/typescript/tests/wire-protocol/decode.test.ts index 01cc94873c9..ff1662dff1d 100644 --- a/libs/@local/harpc/client/typescript/tests/decode.test.ts +++ b/libs/@local/harpc/client/typescript/tests/wire-protocol/decode.test.ts @@ -2,8 +2,8 @@ import { NodeContext } from "@effect/platform-node"; import { describe, it } from "@effect/vitest"; import { Effect, Equal, Schema } from "effect"; -import { ResponseKind } from "../src/types/index.js"; -import { Buffer } from "../src/wire-protocol/index.js"; +import { ResponseKind } from "../../src/types/index.js"; +import { Buffer } from "../../src/wire-protocol/index.js"; import { Response, ResponseBegin, @@ -11,7 +11,7 @@ import { ResponseFlags, ResponseFrame, ResponseHeader, -} from "../src/wire-protocol/models/response/index.js"; +} from "../../src/wire-protocol/models/response/index.js"; import { callDecode } from "./utils.js"; const ResponseHeaderFromSelf = Schema.declare(ResponseHeader.isResponseHeader, { diff --git a/libs/@local/harpc/client/typescript/tests/encode.test.ts b/libs/@local/harpc/client/typescript/tests/wire-protocol/encode.test.ts similarity index 97% rename from libs/@local/harpc/client/typescript/tests/encode.test.ts rename to libs/@local/harpc/client/typescript/tests/wire-protocol/encode.test.ts index ee9d1ac8c77..7bb02aed0fb 100644 --- a/libs/@local/harpc/client/typescript/tests/encode.test.ts +++ b/libs/@local/harpc/client/typescript/tests/wire-protocol/encode.test.ts @@ -3,7 +3,7 @@ import type { TestContext } from "@effect/vitest"; import { describe, it } from "@effect/vitest"; import { Effect, Option, Predicate, Schema } from "effect"; -import { Buffer } from "../src/wire-protocol/index.js"; +import { Buffer } from "../../src/wire-protocol/index.js"; import { Request, RequestBegin, @@ -11,7 +11,7 @@ import { RequestFlags, RequestFrame, RequestHeader, -} from "../src/wire-protocol/models/request/index.js"; +} from "../../src/wire-protocol/models/request/index.js"; import { callEncode } from "./utils.js"; const RequestHeaderFromSelf = Schema.declare(RequestHeader.isRequestHeader, { diff --git a/libs/@local/harpc/client/typescript/tests/utils.ts b/libs/@local/harpc/client/typescript/tests/wire-protocol/utils.ts similarity index 99% rename from libs/@local/harpc/client/typescript/tests/utils.ts rename to libs/@local/harpc/client/typescript/tests/wire-protocol/utils.ts index 8bed60fb1f7..9f7e60cb319 100644 --- a/libs/@local/harpc/client/typescript/tests/utils.ts +++ b/libs/@local/harpc/client/typescript/tests/wire-protocol/utils.ts @@ -20,6 +20,7 @@ const executablePath = () => directory, "..", "..", + "..", "wire-protocol/dist/release/codec", ); }); diff --git a/libs/@local/harpc/client/typescript/tsconfig.json b/libs/@local/harpc/client/typescript/tsconfig.json index 1151ea95661..ab21763bae7 100644 --- a/libs/@local/harpc/client/typescript/tsconfig.json +++ b/libs/@local/harpc/client/typescript/tsconfig.json @@ -3,7 +3,8 @@ "compilerOptions": { "lib": ["dom", "dom.iterable", "ES2023", "ESNext"], "module": "NodeNext", - "moduleResolution": "NodeNext" + "moduleResolution": "NodeNext", + "types": ["vitest/importMeta"] }, "include": ["src", "tests"] }