diff --git a/package.json b/package.json index ad350ef..1b1ce5f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "turbo-stream", - "version": "0.0.1", + "version": "0.0.2", "description": "A streaming data transport format that aims to support built-in features such as Promises, Dates, RegExps, Maps, Sets and more.", "type": "module", "files": [ diff --git a/src/constants.ts b/src/constants.ts deleted file mode 100644 index cc1135f..0000000 --- a/src/constants.ts +++ /dev/null @@ -1,15 +0,0 @@ -export const UNDEFINED = -1; -export const HOLE = -1; -export const NAN = -2; -export const POSITIVE_INFINITY = -3; -export const NEGATIVE_INFINITY = -4; -export const NEGATIVE_ZERO = -5; - -export const TYPE_BIGINT = "B"; -export const TYPE_DATE = "D"; -export const TYPE_MAP = "M"; -export const TYPE_SET = "S"; -export const TYPE_REGEXP = "R"; -export const TYPE_SYMBOL = "Y"; -export const TYPE_NULL_OBJECT = "N"; -export const TYPE_PROMISE = "P"; diff --git a/src/decode.spec.ts b/src/decode.spec.ts deleted file mode 100644 index f9d5845..0000000 --- a/src/decode.spec.ts +++ /dev/null @@ -1,128 +0,0 @@ -import { test } from "node:test"; -import { expect } from "expect"; - -import { unflatten, type ThisDecode } from "./decode.js"; -import { flatten, type ThisEncode } from "./encode.js"; - -function quickEncode(value: unknown) { - const encoder: ThisEncode = { - index: 0, - indicies: new Map(), - stringified: [], - deferred: [], - }; - - const id = flatten.call(encoder, value); - const encoded = - id < 0 ? String(id) : "[" + encoder.stringified.join(",") + "]"; - - return JSON.parse(encoded); -} - -function quickDecode(value: unknown) { - const decoder: ThisDecode = { - hydrated: [], - values: [], - deferred: {}, - }; - - return unflatten.call(decoder, value); -} - -test("should encode and decode undefined", () => { - const input = undefined; - const output = quickDecode(quickEncode(input)); - expect(output).toEqual(input); -}); - -test("should encode and decode null", () => { - const input = null; - const output = quickDecode(quickEncode(input)); - expect(output).toEqual(input); -}); - -test("should encode and decode boolean", () => { - const input = true; - const output = quickDecode(quickEncode(input)); - expect(output).toEqual(input); - - const input2 = false; - const output2 = quickDecode(quickEncode(input2)); - expect(output2).toEqual(input2); -}); - -test("should encode and decode number", () => { - const input = 42; - const output = quickDecode(quickEncode(input)); - expect(output).toEqual(input); -}); - -test("should encode and decode string", () => { - const input = "Hello World"; - const output = quickDecode(quickEncode(input)); - expect(output).toEqual(input); -}); - -test("should encode and decode array", () => { - const input = [1, 2, 3]; - const output = quickDecode(quickEncode(input)); - expect(output).toEqual(input); -}); - -test("should encode and decode array with holes", () => { - const input = [1, , 3]; - const output = quickDecode(quickEncode(input)); - expect(output).toEqual(input); -}); - -test("should encode and decode object", () => { - const input = { foo: "bar" }; - const output = quickDecode(quickEncode(input)); - expect(output).toEqual(input); -}); - -test("should encode and decode object with undefined", () => { - const input = { foo: undefined }; - const output = quickDecode(quickEncode(input)) as typeof input; - expect(output).toEqual(input); - expect("foo" in output).toBe(true); -}); - -test("should encode and decode a symbol", () => { - const input = Symbol.for("foo"); - const output = quickDecode(quickEncode(input)); - expect(output).toEqual(input); -}); - -test("should encode and decode a bigint", () => { - const input = BigInt(42); - const output = quickDecode(quickEncode(input)); - expect(output).toEqual(input); -}); - -test("should encode and decode a date", () => { - const input = new Date(42); - const output = quickDecode(quickEncode(input)); - expect(output).toEqual(input); -}); - -test("should encode and decode a regexp", () => { - const input = /foo/g; - const output = quickDecode(quickEncode(input)); - expect(output).toEqual(input); -}); - -test("should encode and decode a set", () => { - const input = new Set([1, 2, 3]); - const output = quickDecode(quickEncode(input)); - expect(output).toEqual(input); -}); - -test("should encode and decode a map", () => { - const input = new Map([ - [1, 2], - [3, 4], - ]); - const output = quickDecode(quickEncode(input)); - expect(output).toEqual(input); -}); diff --git a/src/deferred.ts b/src/deferred.ts deleted file mode 100644 index 749ce44..0000000 --- a/src/deferred.ts +++ /dev/null @@ -1,12 +0,0 @@ -export class Deferred { - promise: Promise; - resolve!: (value: R | PromiseLike) => void; - reject!: (reason?: unknown) => void; - - constructor() { - this.promise = new Promise((resolve, reject) => { - this.resolve = resolve; - this.reject = reject; - }); - } -} diff --git a/src/encode.spec.ts b/src/encode.spec.ts deleted file mode 100644 index 45965cd..0000000 --- a/src/encode.spec.ts +++ /dev/null @@ -1,193 +0,0 @@ -import { test } from "node:test"; -import { expect } from "expect"; - -import { - HOLE, - NAN, - NEGATIVE_INFINITY, - NEGATIVE_ZERO, - POSITIVE_INFINITY, - UNDEFINED, - TYPE_BIGINT, - TYPE_DATE, - TYPE_MAP, - TYPE_REGEXP, - TYPE_SET, - TYPE_SYMBOL, - TYPE_PROMISE, -} from "./constants.js"; -import { flatten, ThisEncode } from "./encode.js"; - -function quickEncode(value: unknown) { - const encoder: ThisEncode = { - index: 0, - indicies: new Map(), - stringified: [], - deferred: [], - }; - - return { - id: flatten.call(encoder, value), - encoder, - }; -} - -test("should flatten undfined", () => { - const { id, encoder } = quickEncode(undefined); - expect(id).toEqual(UNDEFINED); - expect(encoder.stringified).toEqual([]); -}); - -test("should flatten null", () => { - const { id, encoder } = quickEncode(null); - expect(id).toEqual(0); - expect(encoder.stringified).toEqual(["null"]); -}); - -test("should flatten boolean", () => { - const { id, encoder } = quickEncode(true); - expect(id).toEqual(0); - expect(encoder.stringified).toEqual(["true"]); - - const { id: id2, encoder: encoder2 } = quickEncode(false); - expect(id2).toEqual(0); - expect(encoder2.stringified).toEqual(["false"]); -}); - -test("should flatten number", () => { - const { id, encoder } = quickEncode(42); - expect(id).toEqual(0); - expect(encoder.stringified).toEqual(["42"]); -}); - -test("should flatten NaN", () => { - const { id, encoder } = quickEncode(NaN); - expect(id).toEqual(NAN); - expect(encoder.stringified).toEqual([]); -}); - -test("should flatten Infinity", () => { - const { id, encoder } = quickEncode(Infinity); - expect(id).toEqual(POSITIVE_INFINITY); - expect(encoder.stringified).toEqual([]); -}); - -test("should flatten -Infinity", () => { - const { id, encoder } = quickEncode(-Infinity); - expect(id).toEqual(NEGATIVE_INFINITY); - expect(encoder.stringified).toEqual([]); -}); - -test("should flatten -0", () => { - const { id, encoder } = quickEncode(-0); - expect(id).toEqual(NEGATIVE_ZERO); - expect(encoder.stringified).toEqual([]); -}); - -test("should flatten string", () => { - const { id, encoder } = quickEncode("Hello World"); - expect(id).toEqual(0); - expect(encoder.stringified).toEqual(['"Hello World"']); -}); - -test("should flatten bigint", () => { - const { id, encoder } = quickEncode(BigInt(42)); - expect(id).toEqual(0); - expect(encoder.stringified).toEqual([`["${TYPE_BIGINT}","42"]`]); -}); - -test("should flatten symbol", () => { - const { id, encoder } = quickEncode(Symbol.for("foo")); - expect(id).toEqual(0); - expect(encoder.stringified).toEqual([`["${TYPE_SYMBOL}","foo"]`]); -}); - -test("should flatten date", () => { - const { id, encoder } = quickEncode(new Date(42)); - expect(id).toEqual(0); - expect(encoder.stringified).toEqual([`["${TYPE_DATE}",42]`]); -}); - -test("should flatten regexp", () => { - const { id, encoder } = quickEncode(/foo/g); - expect(id).toEqual(0); - expect(encoder.stringified).toEqual([`["${TYPE_REGEXP}","foo","g"]`]); -}); - -test("should flatten array", () => { - const { id, encoder } = quickEncode(["a", "b", "c", undefined]); - expect(id).toEqual(0); - console.log(encoder.stringified); - expect(encoder.stringified).toEqual(["[1,2,3,-1]", '"a"', '"b"', '"c"']); -}); - -test("should flatten array with holes", () => { - const { id, encoder } = quickEncode(["a", , "c"]); - expect(id).toEqual(0); - expect(encoder.stringified).toEqual([`[1,${HOLE},2]`, '"a"', '"c"']); -}); - -test("should dedupe array", () => { - const { id, encoder } = quickEncode(["a", "b", "c", "a", "b", "c"]); - expect(id).toEqual(0); - expect(encoder.stringified).toEqual(["[1,2,3,1,2,3]", '"a"', '"b"', '"c"']); -}); - -test("should flatten self referencing array", () => { - const input: any = ["a", "b", "c"]; - input.push(input); - const { id, encoder } = quickEncode(input); - expect(id).toEqual(0); - expect(encoder.stringified).toEqual(["[1,2,3,0]", '"a"', '"b"', '"c"']); -}); - -test("should flatten object", () => { - const { id, encoder } = quickEncode({ foo: "bar" }); - expect(id).toEqual(0); - expect(encoder.stringified).toEqual(['{"foo":1}', '"bar"']); -}); - -test("should flatten object with undefined", () => { - const { id, encoder } = quickEncode({ foo: undefined }); - expect(id).toEqual(0); - expect(encoder.stringified).toEqual(['{"foo":-1}']); -}); - -test("should flatten self referencing object", () => { - const input: any = { foo: "bar" }; - input.a = input; - const { id, encoder } = quickEncode(input); - expect(id).toEqual(0); - expect(encoder.stringified).toEqual(['{"foo":1,"a":0}', '"bar"']); -}); - -test("should flatten set", () => { - const { id, encoder } = quickEncode(new Set(["a", "b"])); - expect(id).toEqual(0); - expect(encoder.stringified).toEqual([`["${TYPE_SET}",1,2]`, '"a"', '"b"']); -}); - -test("should flatten map", () => { - const { id, encoder } = quickEncode( - new Map([ - ["a", "b"], - ["b", "a"], - ]) - ); - expect(id).toEqual(0); - expect(encoder.stringified).toEqual([ - `["${TYPE_MAP}",1,2,2,1]`, - '"a"', - '"b"', - ]); -}); - -test("should flatten promise", async () => { - const { id, encoder } = quickEncode(Promise.resolve("foo")); - expect(id).toEqual(0); - expect(encoder.stringified).toEqual([`["${TYPE_PROMISE}",0]`]); - expect(encoder.deferred).toHaveLength(1); - expect(encoder.deferred[0][0]).toEqual(0); - expect(encoder.deferred[0][1]).toBeInstanceOf(Promise); - expect(await encoder.deferred[0][1]).toEqual("foo"); -}); diff --git a/src/encode.ts b/src/flatten.ts similarity index 93% rename from src/encode.ts rename to src/flatten.ts index 9fd8216..0c9e827 100644 --- a/src/encode.ts +++ b/src/flatten.ts @@ -4,22 +4,16 @@ import { NEGATIVE_INFINITY, NEGATIVE_ZERO, POSITIVE_INFINITY, - UNDEFINED, TYPE_BIGINT, TYPE_DATE, TYPE_MAP, + TYPE_PROMISE, TYPE_REGEXP, TYPE_SET, TYPE_SYMBOL, - TYPE_PROMISE, -} from "./constants.js"; - -export interface ThisEncode { - index: number; - indicies: Map; - stringified: string[]; - deferred: [number, Promise][]; -} + UNDEFINED, + type ThisEncode, +} from "./utils.js"; export function flatten(this: ThisEncode, input: unknown): number { if (this.indicies.has(input)) { @@ -110,12 +104,11 @@ function stringify(this: ThisEncode, input: unknown, index: number) { if (input instanceof Promise) { this.stringified[index] = `["${TYPE_PROMISE}",${index}]`; - this.deferred.push([index, input]); + this.deferred[index] = input; break; } if (!isPlainObject(input)) { - console.log(input); throw new Error("Cannot encode object with prototype"); } diff --git a/src/turbo-stream.spec.ts b/src/turbo-stream.spec.ts index ef325db..f79c087 100644 --- a/src/turbo-stream.spec.ts +++ b/src/turbo-stream.spec.ts @@ -71,7 +71,26 @@ test("should encode and decode object with undefined", async () => { test("should encode and decode promise", async () => { const input = Promise.resolve("foo"); const decoded = await decode(encode(input)); - expect(decoded.value).toBeInstanceOf(Promise); - expect(await decoded.value).toEqual(await input); + expect(decoded.value).toEqual(await input); + await decoded.done; +}); + +test("should encode and decode object with promises as values", async () => { + const input = { foo: Promise.resolve("bar") }; + const decoded = await decode(encode(input)); + const value = decoded.value as typeof input; + expect(value).toEqual({ foo: expect.any(Promise) }); + expect(await value.foo).toEqual(await input.foo); + await decoded.done; +}); + +test("should encode and decode set with promises as values", async () => { + const prom = Promise.resolve("foo"); + const input = new Set([prom, prom]); + const decoded = await decode(encode(input)); + const value = decoded.value as typeof input; + expect(value).toEqual(new Set([expect.any(Promise)])); + const proms = Array.from(value); + expect(await proms[0]).toEqual(await Array.from(input)[0]); await decoded.done; }); diff --git a/src/turbo-stream.ts b/src/turbo-stream.ts index e8f84d0..553ad11 100644 --- a/src/turbo-stream.ts +++ b/src/turbo-stream.ts @@ -1,146 +1,177 @@ -import { TYPE_PROMISE } from "./constants.js"; -import { unflatten, type ThisDecode } from "./decode.js"; -import { Deferred } from "./deferred.js"; -import { flatten, type ThisEncode } from "./encode.js"; +import { flatten } from "./flatten.js"; +import { unflatten } from "./unflatten.js"; +import { + createLineSplittingTransform, + Deferred, + TYPE_PROMISE, + type ThisDecode, + type ThisEncode, +} from "./utils.js"; -export async function decode( - input: ReadableStream -): Promise<{ value: T; done: Promise }> { - const decoder = new Decoder(input); - return decoder.decode() as Promise<{ value: T; done: Promise }>; -} - -class Decoder { - private reader: ReadableStreamDefaultReader; - private decoder: ThisDecode; - constructor(input: ReadableStream) { - this.reader = input.getReader(); - this.decoder = { - deferred: {}, - hydrated: [], - values: [], - }; - } - async decode(): Promise<{ value: unknown; done: Promise }> { - const iterator = makeTextFileLineIterator(this.reader); +export async function decode(readable: ReadableStream) { + const done = new Deferred(); + const reader = readable + .pipeThrough(createLineSplittingTransform()) + .getReader(); - const read = await iterator.next(); - if (!read.value || read.done) throw new Error("Invalid input"); - const decoded = unflatten.call(this.decoder, JSON.parse(read.value)); + const decoder: ThisDecode = { + values: [], + hydrated: [], + deferred: {}, + }; - const done = (async () => { - for await (const line of iterator) { - let type = line[0]; + const decoded = await decodeInitial.call(decoder, reader); - switch (type) { - case TYPE_PROMISE: - const colonIndex = line.indexOf(":"); - const deferredId = Number(line.slice(1, colonIndex)); - const lineData = line.slice(colonIndex + 1); - const deferredResult = unflatten.call( - this.decoder, - JSON.parse(lineData) - ); - this.decoder.deferred[deferredId].resolve(deferredResult); - break; - default: - throw new Error("Invalid input"); + let donePromise = done.promise; + if (decoded.done) { + done.resolve(); + } else { + donePromise = decodeDeferred + .call(decoder, reader) + .then(done.resolve) + .catch((reason) => { + for (const deferred of Object.values(decoder.deferred)) { + deferred.reject(reason); } - } - })(); - return { value: decoded, done }; + done.reject(reason); + }); } + + return { + done: donePromise.then(() => reader.closed), + value: decoded.value, + }; } -export function encode(input: unknown): ReadableStream { - return new ReadableStream({ - async start(controller) { - const textEncoder = new TextEncoder(); - const encoder: ThisEncode = { - index: 0, - indicies: new Map(), - stringified: [], - deferred: [], - }; +class SyntaxError extends Error { + name = "SyntaxError"; + constructor(message?: string) { + super(message ?? `Invalid input`); + } +} - const id = flatten.call(encoder, input); - const encoded = - id < 0 ? String(id) : "[" + encoder.stringified.join(",") + "]"; - controller.enqueue(textEncoder.encode(encoded + "\n")); +async function decodeInitial( + this: ThisDecode, + reader: ReadableStreamDefaultReader +) { + const read = await reader.read(); + if (!read.value) { + throw new SyntaxError(); + } + + let line; + try { + line = JSON.parse(read.value); + } catch (reason) { + throw new SyntaxError(); + } - let activeDeferred = 0; - const done = new Deferred(); - let alreadyDone = false; + return { + done: read.done, + value: unflatten.call(this, line), + }; +} - if (encoder.deferred.length === 0) { - alreadyDone = true; - done.resolve(); +async function decodeDeferred( + this: ThisDecode, + reader: ReadableStreamDefaultReader +) { + let read = await reader.read(); + while (!read.done) { + if (!read.value) continue; + const line = read.value; + switch (line[0]) { + case TYPE_PROMISE: + const colonIndex = line.indexOf(":"); + const deferredId = Number(line.slice(1, colonIndex)); + const deferred = this.deferred[deferredId]; + if (!deferred) { + throw new Error(`Deferred ID ${deferredId} not found in stream`); + } + const lineData = line.slice(colonIndex + 1); + let jsonLine; + try { + jsonLine = JSON.parse(lineData); + } catch (reason) { + throw new SyntaxError(); + } + const value = unflatten.call(this, jsonLine); + deferred.resolve(value); + break; + // case TYPE_PROMISE_ERROR: + // // TODO: transport promise rejections + // break; + default: + throw new SyntaxError(); + } + read = await reader.read(); + } +} + +export function encode(input: unknown) { + const encoder: ThisEncode = { + deferred: {}, + index: 0, + indicies: new Map(), + stringified: [], + }; + const textEncoder = new TextEncoder(); + let lastSentIndex = 0; + const readable = new ReadableStream({ + async start(controller) { + const id = flatten.call(encoder, await input); + if (id < 0) { + controller.enqueue(textEncoder.encode(`${id}\n`)); } else { - for (const [promiseId, promise] of encoder.deferred) { - activeDeferred++; - promise - .then((value) => { - const id = flatten.call(encoder, value); - const encoded = - id < 0 - ? String(id) - : "[" + encoder.stringified.slice(id).join(",") + "]"; - controller.enqueue( - textEncoder.encode( - `${TYPE_PROMISE}${promiseId}:` + encoded + "\n" - ) - ); + controller.enqueue( + textEncoder.encode(`[${encoder.stringified.join(",")}]\n`) + ); + lastSentIndex = encoder.stringified.length - 1; + } - activeDeferred--; - if (activeDeferred === 0) { - alreadyDone = true; - done.resolve(); - } - }) - .catch((reason) => { - if (alreadyDone) return; - alreadyDone = true; - done.reject(reason); - }); + const seenPromises = new WeakSet>(); + while (Object.keys(encoder.deferred).length > 0) { + for (const [deferredId, deferred] of Object.entries(encoder.deferred)) { + if (seenPromises.has(deferred)) continue; + seenPromises.add( + (encoder.deferred[Number(deferredId)] = deferred + .then( + (resolved) => { + const id = flatten.call(encoder, resolved); + if (id < 0) { + controller.enqueue( + textEncoder.encode(`${TYPE_PROMISE}${deferredId}:${id}\n`) + ); + } else { + const values = encoder.stringified + .slice(lastSentIndex + 1) + .join(","); + controller.enqueue( + textEncoder.encode( + `${TYPE_PROMISE}${deferredId}:[${values}]\n` + ) + ); + lastSentIndex = encoder.stringified.length - 1; + } + }, + (reason) => { + // TODO: Encode and send errors + throw reason; + } + ) + .finally(() => { + delete encoder.deferred[Number(deferredId)]; + })) + ); } + await Promise.race(Object.values(encoder.deferred)); } + await Promise.all(Object.values(encoder.deferred)); - await done.promise; controller.close(); }, }); -} -async function* makeTextFileLineIterator( - reader: ReadableStreamDefaultReader -): AsyncGenerator { - const decoder = new TextDecoder(); - let read = await reader.read(); - let chunk = read.value ? decoder.decode(read.value, { stream: true }) : ""; - - let re = /\r\n|\n|\r/gm; - let startIndex = 0; - - for (;;) { - let result = re.exec(chunk); - if (!result) { - if (read.done) { - break; - } - let remainder = chunk.slice(startIndex); - read = await reader.read(); - chunk = - remainder + - (read.value ? decoder.decode(read.value, { stream: true }) : ""); - startIndex = re.lastIndex = 0; - continue; - } - yield chunk.substring(startIndex, result.index); - startIndex = re.lastIndex; - } - if (startIndex < chunk.length) { - // last line didn't end in a newline char - yield chunk.slice(startIndex); - } + return readable; } diff --git a/src/decode.ts b/src/unflatten.ts similarity index 89% rename from src/decode.ts rename to src/unflatten.ts index 9fb603a..6240059 100644 --- a/src/decode.ts +++ b/src/unflatten.ts @@ -1,26 +1,20 @@ import { + Deferred, HOLE, NAN, NEGATIVE_INFINITY, NEGATIVE_ZERO, POSITIVE_INFINITY, - UNDEFINED, TYPE_BIGINT, TYPE_DATE, TYPE_MAP, + TYPE_PROMISE, TYPE_REGEXP, TYPE_SET, TYPE_SYMBOL, - TYPE_PROMISE, -} from "./constants.js"; - -import { Deferred } from "./deferred.js"; - -export interface ThisDecode { - values: unknown[]; - hydrated: unknown[]; - deferred: Record>; -} + UNDEFINED, + type ThisDecode, +} from "./utils.js"; export function unflatten(this: ThisDecode, parsed: unknown): unknown { if (typeof parsed === "number") return hydrate.call(this, parsed, true); @@ -83,9 +77,13 @@ function hydrate(this: ThisDecode, index: number, standalone?: true) { } break; case TYPE_PROMISE: - const deferred = new Deferred(); - this.deferred[value[1]] = deferred; - this.hydrated[index] = deferred.promise; + if (this.hydrated[value[1]]) { + this.hydrated[index] = this.hydrated[value[1]]; + } else { + const deferred = new Deferred(); + this.deferred[value[1]] = deferred; + this.hydrated[index] = deferred.promise; + } break; default: throw new Error(`Invalid input`); diff --git a/src/utils.ts b/src/utils.ts new file mode 100644 index 0000000..73e5d35 --- /dev/null +++ b/src/utils.ts @@ -0,0 +1,67 @@ +export const UNDEFINED = -1; +export const HOLE = -1; +export const NAN = -2; +export const POSITIVE_INFINITY = -3; +export const NEGATIVE_INFINITY = -4; +export const NEGATIVE_ZERO = -5; + +export const TYPE_BIGINT = "B"; +export const TYPE_DATE = "D"; +export const TYPE_MAP = "M"; +export const TYPE_SET = "S"; +export const TYPE_REGEXP = "R"; +export const TYPE_SYMBOL = "Y"; +export const TYPE_NULL_OBJECT = "N"; +export const TYPE_PROMISE = "P"; + +export interface ThisDecode { + values: unknown[]; + hydrated: unknown[]; + deferred: Record>; +} + +export interface ThisEncode { + index: number; + indicies: Map; + stringified: string[]; + deferred: Record>; +} + +export class Deferred { + promise: Promise; + resolve!: (value: T) => void; + reject!: (reason: unknown) => void; + + constructor() { + this.promise = new Promise((resolve, reject) => { + this.resolve = resolve; + this.reject = reject; + }); + } +} + +export function createLineSplittingTransform() { + let decoder = new TextDecoder(); + let leftover = ""; + + return new TransformStream({ + transform(chunk, controller) { + let str = decoder.decode(chunk, { stream: true }); + let parts = (leftover + str).split("\n"); + + // The last part might be a partial line, so keep it for the next chunk. + leftover = parts.pop() || ""; + + for (const part of parts) { + controller.enqueue(part); + } + }, + + flush(controller) { + // If there's any leftover data, enqueue it before closing. + if (leftover) { + controller.enqueue(leftover); + } + }, + }); +}