Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve incoming message handling on both Node and Deno #704

Merged
merged 1 commit into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 33 additions & 25 deletions deno-runtime/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,39 +99,47 @@ async function main() {

const decoder = new TextDecoder();

let messageBuffer = '';
let messageBuffer: string[] = [];

for await (const chunk of Deno.stdin.readable) {
const message = decoder.decode(chunk);
const decoded = decoder.decode(chunk);

messageBuffer += message;
const messages = decoded.split(MESSAGE_SEPARATOR);

if (!message?.endsWith(MESSAGE_SEPARATOR)) {
continue;
}

let JSONRPCMessage;

try {
JSONRPCMessage = Messenger.parseMessage(messageBuffer.replace(MESSAGE_SEPARATOR, ''));
} catch (error) {
if (Messenger.isErrorResponse(error)) {
await Messenger.Transport.send(error);
} else {
await Messenger.sendParseError();
// We can't run these concurrently because they'll screw up the messageBuffer
for (const [index, message] of messages.entries()) {
// If the message is empty, it means that the last chunk ended with a separator
if (!message.length) {
continue;
}

continue;
} finally {
messageBuffer = '';
}
messageBuffer.push(message);

if (Messenger.isRequest(JSONRPCMessage)) {
await requestRouter(JSONRPCMessage);
}
// If the message is the last one, we need to wait for the next chunk to arrive
if (index === messages.length - 1) {
continue;
}

if (Messenger.isResponse(JSONRPCMessage)) {
handleResponse(JSONRPCMessage);
try {
const JSONRPCMessage = Messenger.parseMessage(messageBuffer.join(''));

if (Messenger.isRequest(JSONRPCMessage)) {
await requestRouter(JSONRPCMessage);
continue;
}

if (Messenger.isResponse(JSONRPCMessage)) {
handleResponse(JSONRPCMessage);
}
} catch (error) {
if (Messenger.isErrorResponse(error)) {
await Messenger.Transport.send(error);
} else {
await Messenger.sendParseError();
}
} finally {
messageBuffer = [];
}
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/server/AppManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,12 @@ export class AppManager {
console.warn(`Error while compiling the App "${item.info.name} (${item.id})":`);
console.error(e);

const prl = new ProxiedApp(this, item, {} as DenoRuntimeSubprocessController);
const prl = new ProxiedApp(this, item, {
// Maybe we should have an "EmptyRuntime" class for this?
getStatus() {
return AppStatus.COMPILER_ERROR_DISABLED;
},
} as unknown as DenoRuntimeSubprocessController);
this.apps.set(item.id, prl);
}
}
Expand Down
103 changes: 57 additions & 46 deletions src/server/runtime/AppsEngineDenoRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,16 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
}
}

private send(message: jsonrpc.JsonRpc) {
this.deno.stdin.write(message.serialize().concat(MESSAGE_SEPARATOR));
}

public async sendRequest(message: Pick<jsonrpc.RequestObject, 'method' | 'params'>): Promise<unknown> {
const id = String(Math.random().toString(36)).substring(2);

const request = jsonrpc.request(id, message.method, message.params);

this.deno.stdin.write(request.serialize().concat(MESSAGE_SEPARATOR));
this.send(request);

return this.waitForResponse(request);
}
Expand Down Expand Up @@ -179,10 +183,58 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
}

private setupListeners(): void {
this.deno.stdout.on('data', this.parseOutput.bind(this));
this.deno.stderr.on('data', this.parseError.bind(this));

this.on('ready', this.onReady.bind(this));

let messageBuffer: string[] = [];

this.deno.stdout.on('data', async (chunk: Buffer) => {
Copy link
Member Author

@d-gubert d-gubert Jan 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to move this out of parseOutput because I needed the messageBuffer to live outside the scope of the callback.

There is likely a better way to do this, but it can be left to another time maybe

// Chunk can be multiple JSONRpc messages as the stdout read stream can buffer multiple messages
const messages = chunk.toString().split(MESSAGE_SEPARATOR);

// We can't run these concurrently because they'll screw up the messageBuffer
for (const [index, message] of messages.entries()) {
// If the message is empty, it means that the last chunk ended with a separator
if (!message.length) continue;

messageBuffer.push(message);

// If the message is the last one, we need to wait for the next chunk to arrive
if (index === messages.length - 1) {
continue;
}

try {
const JSONRPCMessage = jsonrpc.parse(messageBuffer.join(''));

if (Array.isArray(JSONRPCMessage)) {
throw new Error('Invalid message format');
}

if (JSONRPCMessage.type === 'request' || JSONRPCMessage.type === 'notification') {
await this.handleIncomingMessage(JSONRPCMessage);
continue;
}

if (JSONRPCMessage.type === 'success' || JSONRPCMessage.type === 'error') {
await this.handleResultMessage(JSONRPCMessage);
continue;
}

console.error('Unrecognized message type', JSONRPCMessage);
} catch (e) {
// SyntaxError is thrown when the message is not a valid JSON
if (e instanceof SyntaxError) {
console.error('Failed to parse message', message);
continue;
}

console.error('Error executing handler', e, message);
} finally {
messageBuffer = [];
}
}
});
}

// Probable should extract this to a separate file
Expand Down Expand Up @@ -305,15 +357,15 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
if (method.startsWith('accessor:')) {
const result = await this.handleAccessorMessage(message as jsonrpc.IParsedObjectRequest);

this.deno.stdin.write(result.serialize());
this.send(result);

return;
}

if (method.startsWith('bridge:')) {
const result = await this.handleBridgeMessage(message as jsonrpc.IParsedObjectRequest);

this.deno.stdin.write(result.serialize());
this.send(result);

return;
}
Expand Down Expand Up @@ -352,47 +404,6 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
this.emit(`result:${id}`, result, error);
}

private async parseOutput(chunk: Buffer): Promise<void> {
// Chunk can be multiple JSONRpc messages as the stdout read stream can buffer multiple messages
const messages = chunk.toString().split(MESSAGE_SEPARATOR);

if (messages.length < 2) {
console.error('Invalid message format', messages);
return;
}

await Promise.all(
messages.map(async (m) => {
if (!m.length) return;

try {
const message = jsonrpc.parse(m);

if (Array.isArray(message)) {
throw new Error('Invalid message format');
}

if (message.type === 'request' || message.type === 'notification') {
return await this.handleIncomingMessage(message);
}

if (message.type === 'success' || message.type === 'error') {
return await this.handleResultMessage(message);
}

console.error('Unrecognized message type', message);
} catch (e) {
// SyntaxError is thrown when the message is not a valid JSON
if (e instanceof SyntaxError) {
return console.error('Failed to parse message', m);
}

console.error('Error executing handler', e, m);
}
}),
).catch(console.error);
}

private async parseError(chunk: Buffer): Promise<void> {
console.error('Subprocess stderr', chunk.toString());
}
Expand Down