From fbde0b58c522412d0a14968f1830d871c5a8a514 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Fri, 8 Mar 2024 15:35:13 -0500 Subject: [PATCH 1/4] Initial implementation for streams with http2 module --- src/http-client/node-http2-client.ts | 114 ++++++++++++++++++++++++++- 1 file changed, 112 insertions(+), 2 deletions(-) diff --git a/src/http-client/node-http2-client.ts b/src/http-client/node-http2-client.ts index ebd61bb6..1f8ad9a7 100644 --- a/src/http-client/node-http2-client.ts +++ b/src/http-client/node-http2-client.ts @@ -9,8 +9,10 @@ import { HTTPClientOptions, HTTPRequest, HTTPResponse, + HTTPStreamRequest, + StreamAdapter, } from "./http-client"; -import { NetworkError } from "../errors"; +import { ServiceError, NetworkError } from "../errors"; // alias http2 types type ClientHttp2Session = any; @@ -107,6 +109,11 @@ export class NodeHTTP2Client implements HTTPClient { }); } + /** {@inheritDoc HTTPStreamClient.stream} */ + stream(req: HTTPStreamRequest): StreamAdapter { + return this.#doStream(req); + } + /** {@inheritDoc HTTPClient.close} */ close() { // defend against redundant close calls @@ -170,7 +177,7 @@ export class NodeHTTP2Client implements HTTPClient { // append response data to the data string every time we receive new // data chunks in the response - req.on("data", (chunk: any) => { + req.on("data", (chunk: string) => { responseData += chunk; }); @@ -213,4 +220,107 @@ export class NodeHTTP2Client implements HTTPClient { } }); } + + /** {@inheritDoc HTTPStreamClient.stream} */ + #doStream({ + data: requestData, + headers: requestHeaders, + method, + }: HTTPStreamRequest): StreamAdapter { + let resolveChunk: (chunk: string) => void; + let rejectChunk: (reason: any) => void; + + const setChunkPromise = () => + new Promise((res, rej) => { + resolveChunk = res; + rejectChunk = rej; + }); + + let chunkPromise = setChunkPromise(); + + let req: ClientHttp2Stream; + const onResponse = ( + http2ResponseHeaders: IncomingHttpHeaders & IncomingHttpStatusHeader + ) => { + const status = Number( + http2ResponseHeaders[http2.constants.HTTP2_HEADER_STATUS] + ); + if (!(status >= 200 && status < 400)) { + // TODO: if we get bad status, then we should still finish reading the response, and create + // the appropriate error instance + rejectChunk( + new ServiceError( + { + error: { + code: "fauna error", + message: "fauna error", + }, + }, + status + ) + ); + } + + let partOfLine = ""; + + // append response data to the data string every time we receive new + // data chunks in the response + req.on("data", (chunk: string) => { + const chunkLines = (partOfLine + chunk).split("\n"); + + // Yield all complete lines + for (let i = 0; i < chunkLines.length - 1; i++) { + resolveChunk(chunkLines[i].trim()); + chunkPromise = setChunkPromise(); + } + + // Store the partial line + partOfLine = chunkLines[chunkLines.length - 1]; + }); + + // Once the response is finished, resolve the promise + req.on("end", () => { + resolveChunk(partOfLine); + }); + }; + + // eslint-disable-next-line @typescript-eslint/no-this-alias + const self = this; + + async function* reader(): AsyncGenerator { + const httpRequestHeaders: OutgoingHttpHeaders = { + ...requestHeaders, + [http2.constants.HTTP2_HEADER_PATH]: "/query/1", + [http2.constants.HTTP2_HEADER_METHOD]: method, + }; + + const session = self.#connect(); + req = session + .request(httpRequestHeaders) + .setEncoding("utf8") + .on("error", (error: any) => { + rejectChunk(error); + }) + .on("response", onResponse); + + const body = JSON.stringify(requestData); + + req.write(body, "utf8"); + + req.end(); + + while (true) { + yield await chunkPromise; + } + } + + return { + read: reader(), + close: () => { + if (req) { + req.close(); + } + }, + }; + } } From dd5373305de994aeb7b33706909132b69c62254e Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Fri, 8 Mar 2024 16:03:10 -0500 Subject: [PATCH 2/4] Fix stream path --- src/http-client/node-http2-client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/http-client/node-http2-client.ts b/src/http-client/node-http2-client.ts index 1f8ad9a7..e4c01cc8 100644 --- a/src/http-client/node-http2-client.ts +++ b/src/http-client/node-http2-client.ts @@ -290,7 +290,7 @@ export class NodeHTTP2Client implements HTTPClient { async function* reader(): AsyncGenerator { const httpRequestHeaders: OutgoingHttpHeaders = { ...requestHeaders, - [http2.constants.HTTP2_HEADER_PATH]: "/query/1", + [http2.constants.HTTP2_HEADER_PATH]: "/stream/1", [http2.constants.HTTP2_HEADER_METHOD]: method, }; From e20010e4bdf70f4e72b817ae977af8d8d59c7561 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Fri, 8 Mar 2024 16:05:48 -0500 Subject: [PATCH 3/4] Explicitly implement the HTTPStreamClient --- src/http-client/node-http2-client.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/http-client/node-http2-client.ts b/src/http-client/node-http2-client.ts index e4c01cc8..cde5e5df 100644 --- a/src/http-client/node-http2-client.ts +++ b/src/http-client/node-http2-client.ts @@ -9,6 +9,7 @@ import { HTTPClientOptions, HTTPRequest, HTTPResponse, + HTTPStreamClient, HTTPStreamRequest, StreamAdapter, } from "./http-client"; @@ -24,7 +25,7 @@ type OutgoingHttpHeaders = any; /** * An implementation for {@link HTTPClient} that uses the node http package */ -export class NodeHTTP2Client implements HTTPClient { +export class NodeHTTP2Client implements HTTPClient, HTTPStreamClient { static #clients: Map = new Map(); #http2_session_idle_ms: number; From 1f88f70882269b1ca3887cad88e8ce60adc4918e Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Fri, 8 Mar 2024 16:27:28 -0500 Subject: [PATCH 4/4] Handle error response bodies --- src/http-client/node-http2-client.ts | 64 ++++++++++++++-------------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/src/http-client/node-http2-client.ts b/src/http-client/node-http2-client.ts index cde5e5df..193c2263 100644 --- a/src/http-client/node-http2-client.ts +++ b/src/http-client/node-http2-client.ts @@ -247,42 +247,44 @@ export class NodeHTTP2Client implements HTTPClient, HTTPStreamClient { http2ResponseHeaders[http2.constants.HTTP2_HEADER_STATUS] ); if (!(status >= 200 && status < 400)) { - // TODO: if we get bad status, then we should still finish reading the response, and create - // the appropriate error instance - rejectChunk( - new ServiceError( - { - error: { - code: "fauna error", - message: "fauna error", - }, - }, - status - ) - ); - } + // Get the error body and then throw an error + let responseData = ""; + + // append response data to the data string every time we receive new + // data chunks in the response + req.on("data", (chunk: string) => { + responseData += chunk; + }); - let partOfLine = ""; + // Once the response is finished, resolve the promise + // TODO: The Client contains the information for how to parse an error + // into the appropriate class, so lift this logic out of the HTTPClient. + req.on("end", () => { + rejectChunk(new ServiceError(JSON.parse(responseData), status)); + }); + } else { + let partOfLine = ""; - // append response data to the data string every time we receive new - // data chunks in the response - req.on("data", (chunk: string) => { - const chunkLines = (partOfLine + chunk).split("\n"); + // append response data to the data string every time we receive new + // data chunks in the response + req.on("data", (chunk: string) => { + const chunkLines = (partOfLine + chunk).split("\n"); - // Yield all complete lines - for (let i = 0; i < chunkLines.length - 1; i++) { - resolveChunk(chunkLines[i].trim()); - chunkPromise = setChunkPromise(); - } + // Yield all complete lines + for (let i = 0; i < chunkLines.length - 1; i++) { + resolveChunk(chunkLines[i].trim()); + chunkPromise = setChunkPromise(); + } - // Store the partial line - partOfLine = chunkLines[chunkLines.length - 1]; - }); + // Store the partial line + partOfLine = chunkLines[chunkLines.length - 1]; + }); - // Once the response is finished, resolve the promise - req.on("end", () => { - resolveChunk(partOfLine); - }); + // Once the response is finished, resolve the promise + req.on("end", () => { + resolveChunk(partOfLine); + }); + } }; // eslint-disable-next-line @typescript-eslint/no-this-alias