Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation for streams with http2 module #235

Merged
merged 4 commits into from
Mar 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 116 additions & 3 deletions src/http-client/node-http2-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ import {
HTTPClientOptions,
HTTPRequest,
HTTPResponse,
HTTPStreamClient,
HTTPStreamRequest,
StreamAdapter,
} from "./http-client";
import { NetworkError } from "../errors";
import { ServiceError, NetworkError } from "../errors";

// alias http2 types
type ClientHttp2Session = any;
Expand All @@ -22,7 +25,7 @@ type OutgoingHttpHeaders = any;
/**
* An implementation for {@link HTTPClient} that uses the node http package
*/
export class NodeHTTP2Client implements HTTPClient {
export class NodeHTTP2Client implements HTTPClient, HTTPStreamClient {
static #clients: Map<string, NodeHTTP2Client> = new Map();

#http2_session_idle_ms: number;
Expand Down Expand Up @@ -107,6 +110,11 @@ export class NodeHTTP2Client implements HTTPClient {
});
}

/** {@inheritDoc HTTPStreamClient.stream} */
stream(req: HTTPStreamRequest): StreamAdapter {
return this.#doStream(req);
}

/** {@inheritDoc HTTPClient.close} */
close() {
// defend against redundant close calls
Expand Down Expand Up @@ -170,7 +178,7 @@ export class NodeHTTP2Client implements HTTPClient {

// append response data to the data string every time we receive new
// data chunks in the response
req.on("data", (chunk: any) => {
req.on("data", (chunk: string) => {
responseData += chunk;
});

Expand Down Expand Up @@ -213,4 +221,109 @@ export class NodeHTTP2Client implements HTTPClient {
}
});
}

/** {@inheritDoc HTTPStreamClient.stream} */
#doStream({
data: requestData,
headers: requestHeaders,
method,
}: HTTPStreamRequest): StreamAdapter {
let resolveChunk: (chunk: string) => void;
let rejectChunk: (reason: any) => void;

const setChunkPromise = () =>
new Promise<string>((res, rej) => {
resolveChunk = res;
rejectChunk = rej;
});

let chunkPromise = setChunkPromise();

let req: ClientHttp2Stream;
const onResponse = (
http2ResponseHeaders: IncomingHttpHeaders & IncomingHttpStatusHeader
) => {
const status = Number(
http2ResponseHeaders[http2.constants.HTTP2_HEADER_STATUS]
);
if (!(status >= 200 && status < 400)) {
// Get the error body and then throw an error
let responseData = "";

// append response data to the data string every time we receive new
// data chunks in the response
req.on("data", (chunk: string) => {
responseData += chunk;
});

// Once the response is finished, resolve the promise
// TODO: The Client contains the information for how to parse an error
// into the appropriate class, so lift this logic out of the HTTPClient.
req.on("end", () => {
rejectChunk(new ServiceError(JSON.parse(responseData), status));
});
} else {
let partOfLine = "";

// append response data to the data string every time we receive new
// data chunks in the response
req.on("data", (chunk: string) => {
const chunkLines = (partOfLine + chunk).split("\n");

// Yield all complete lines
for (let i = 0; i < chunkLines.length - 1; i++) {
resolveChunk(chunkLines[i].trim());
chunkPromise = setChunkPromise();
}

// Store the partial line
partOfLine = chunkLines[chunkLines.length - 1];
});

// Once the response is finished, resolve the promise
req.on("end", () => {
resolveChunk(partOfLine);
});
}
};

// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this;

async function* reader(): AsyncGenerator<string> {
const httpRequestHeaders: OutgoingHttpHeaders = {
...requestHeaders,
[http2.constants.HTTP2_HEADER_PATH]: "/stream/1",
[http2.constants.HTTP2_HEADER_METHOD]: method,
};

const session = self.#connect();
req = session
.request(httpRequestHeaders)
.setEncoding("utf8")
.on("error", (error: any) => {
rejectChunk(error);
})
.on("response", onResponse);

const body = JSON.stringify(requestData);

req.write(body, "utf8");

req.end();

while (true) {
yield await chunkPromise;
}
}

return {
read: reader(),
close: () => {
if (req) {
req.close();
}
},
};
}
}
Loading