Skip to content

Commit

Permalink
fix: buffer types for TextDecoder
Browse files Browse the repository at this point in the history
  • Loading branch information
freaz committed Jan 31, 2025
1 parent 42be4b0 commit 491b05c
Show file tree
Hide file tree
Showing 4 changed files with 474 additions and 238 deletions.
195 changes: 127 additions & 68 deletions packages/cloudflare_worker_host/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,43 @@
import { WASI } from '@cloudflare/workers-wasi';
import { WASI } from "@cloudflare/workers-wasi";

import { App, HandleMap, UnexpectedError } from './common/index.js';
import { App, HandleMap, UnexpectedError } from "./common/index.js";
// @ts-ignore
import coreModule from '../assets/core-async.wasm';
import type { FileSystem, Network, SecurityValuesMap, TextCoder, Timers, WasiContext } from './common/index.js';
import { ErrorCode, HostError, Persistence, WasiErrno, WasiError } from './common/index.js';

const pkg = require('../package.json');

export { PerformError, UnexpectedError } from './common/error.js';
import coreModule from "../assets/core-async.wasm";
import type {
FileSystem,
Network,
SecurityValuesMap,
TextCoder,
Timers,
WasiContext,
} from "./common/index.js";
import {
ErrorCode,
HostError,
Persistence,
WasiErrno,
WasiError,
} from "./common/index.js";

const pkg = require("../package.json");

export { PerformError, UnexpectedError } from "./common/error.js";

class CfwTextCoder implements TextCoder {
private encoder: TextEncoder = new TextEncoder();
private decoder: TextDecoder = new TextDecoder();

decodeUtf8(buffer: ArrayBufferLike): string {
decodeUtf8(buffer: ArrayBuffer): string {
return this.decoder.decode(buffer);
}

encodeUtf8(string: string): ArrayBuffer {
encodeUtf8(string: string): Uint8Array<ArrayBufferLike> {
return this.encoder.encode(string);
}
}
class CfwFileSystem implements FileSystem {
private readonly preopens: Record<string, Uint8Array>;
private readonly files: HandleMap<{ data: Uint8Array, cursor: number }>;
private readonly files: HandleMap<{ data: Uint8Array; cursor: number }>;

constructor(preopens: Record<string, Uint8Array>) {
this.preopens = preopens;
Expand All @@ -35,7 +48,17 @@ class CfwFileSystem implements FileSystem {
return this.preopens[path] !== undefined;
}

async open(path: string, options: { createNew?: boolean, create?: boolean, truncate?: boolean, append?: boolean, write?: boolean, read?: boolean }): Promise<number> {
async open(
path: string,
options: {
createNew?: boolean;
create?: boolean;
truncate?: boolean;
append?: boolean;
write?: boolean;
read?: boolean;
}
): Promise<number> {
if (options.read !== true) {
throw new WasiError(WasiErrno.EROFS);
}
Expand All @@ -53,7 +76,10 @@ class CfwFileSystem implements FileSystem {
throw new WasiError(WasiErrno.EBADF);
}

const readCount = Math.min(out.byteLength, file.data.byteLength - file.cursor);
const readCount = Math.min(
out.byteLength,
file.data.byteLength - file.cursor
);
const data = file.data.subarray(file.cursor, file.cursor + readCount);
for (let i = 0; i < readCount; i += 1) {
out[i] = data[i];
Expand Down Expand Up @@ -92,7 +118,7 @@ class CfwNetwork implements Network {
try {
response = await fetch(input, init);
} catch (err: unknown) {
if (typeof err === 'object' && err !== null && 'message' in err) {
if (typeof err === "object" && err !== null && "message" in err) {
// found a `Error: Network connection lost` caused by `kj/async-io-unix.c++:186: disconnected` in the wild
throw new HostError(ErrorCode.NetworkError, `${err.message}`);
}
Expand All @@ -102,18 +128,21 @@ class CfwNetwork implements Network {
if (response.status === 530) {
// TODO: DNS error is 530 with a human-readable HTML body describing the error
// this is not trivial to parse and map to our error codes
throw new HostError(ErrorCode.NetworkError, await response.text().catch(_ => 'Unknown Cloudflare 530 error'));
throw new HostError(
ErrorCode.NetworkError,
await response.text().catch((_) => "Unknown Cloudflare 530 error")
);
}

return response;
}
}
class CfwTextStreamDecoder {
private decoder: TextDecoder = new TextDecoder();
private decoderBuffer: string = '';
private decoderBuffer: string = "";

/** Decodes streaming data and splits it by newline. */
decodeUtf8Lines(buffer: ArrayBufferLike): string[] {
decodeUtf8Lines(buffer: ArrayBuffer): string[] {
this.decoderBuffer += this.decoder.decode(buffer, { stream: true });

return this.getLines();
Expand All @@ -126,7 +155,7 @@ class CfwTextStreamDecoder {
}

private getLines() {
const lines = this.decoderBuffer.split('\n');
const lines = this.decoderBuffer.split("\n");
if (lines.length > 1) {
this.decoderBuffer = lines[lines.length - 1];
return lines.slice(0, -1);
Expand Down Expand Up @@ -156,8 +185,8 @@ class CfwWasiCompat implements WasiContext {
this.wasi.start({
exports: {
...instance.exports,
_start() { }
}
_start() {},
},
});

this.memory = instance.exports.memory as WebAssembly.Memory;
Expand All @@ -169,18 +198,18 @@ class CfwWasiCompat implements WasiContext {
iovs_ptr: number,
iovs_len: number
): Array<Uint8Array> {
let result = Array<Uint8Array>(iovs_len)
let result = Array<Uint8Array>(iovs_len);

for (let i = 0; i < iovs_len; i++) {
const bufferPtr = view.getUint32(iovs_ptr, true)
iovs_ptr += 4
const bufferPtr = view.getUint32(iovs_ptr, true);
iovs_ptr += 4;

const bufferLen = view.getUint32(iovs_ptr, true)
iovs_ptr += 4
const bufferLen = view.getUint32(iovs_ptr, true);
iovs_ptr += 4;

result[i] = new Uint8Array(view.buffer, bufferPtr, bufferLen)
result[i] = new Uint8Array(view.buffer, bufferPtr, bufferLen);
}
return result
return result;
}

private fd_write(
Expand Down Expand Up @@ -208,9 +237,13 @@ class CfwWasiCompat implements WasiContext {
view.setUint32(retptr0, writeCount, true);

if (fd === 1) {
this.stdoutDecoder.decodeUtf8Lines(buffer).forEach(line => console.log(line));
this.stdoutDecoder
.decodeUtf8Lines(buffer.buffer)
.forEach((line) => console.log(line));
} else {
this.stderrDecoder.decodeUtf8Lines(buffer).forEach(line => console.error(line));
this.stderrDecoder
.decodeUtf8Lines(buffer.buffer)
.forEach((line) => console.error(line));
}

return 0; // SUCCESS
Expand All @@ -231,7 +264,7 @@ class CfwPersistence implements Persistence {
if (superfaceApiUrl !== undefined) {
this.insightsUrl = `${superfaceApiUrl}/insights/sdk_event`;
} else {
this.insightsUrl = 'https://superface.ai/insights/sdk_event';
this.insightsUrl = "https://superface.ai/insights/sdk_event";
}
}

Expand All @@ -240,23 +273,20 @@ class CfwPersistence implements Persistence {
// 2. Logpush https://developers.cloudflare.com/workers/platform/logpush
async persistMetrics(events: string[]): Promise<void> {
const headers: Record<string, string> = {
'content-type': 'application/json'
"content-type": "application/json",
};
if (this.token !== undefined) {
headers['authorization'] = `SUPERFACE-SDK-TOKEN ${this.token}`;
headers["authorization"] = `SUPERFACE-SDK-TOKEN ${this.token}`;
}
if (this.userAgent !== undefined) {
headers['user-agent'] = this.userAgent;
headers["user-agent"] = this.userAgent;
}

await fetch(
`${this.insightsUrl}/batch`,
{
method: 'POST',
body: '[' + events.join(',') + ']',
headers
}
);
await fetch(`${this.insightsUrl}/batch`, {
method: "POST",
body: "[" + events.join(",") + "]",
headers,
});
}

async persistDeveloperDump(events: string[]): Promise<void> {
Expand Down Expand Up @@ -287,13 +317,20 @@ class InternalClient {

constructor(readonly options: ClientOptions = {}) {
this.fileSystem = new CfwFileSystem(options.preopens ?? {});
this.app = new App({
fileSystem: this.fileSystem,
textCoder: new CfwTextCoder(),
timers: new CfwTimers(),
network: new CfwNetwork(),
persistence: new CfwPersistence(options.token, options.superfaceApiUrl, this.userAgent)
}, { metricsTimeout: 0 });
this.app = new App(
{
fileSystem: this.fileSystem,
textCoder: new CfwTextCoder(),
timers: new CfwTimers(),
network: new CfwNetwork(),
persistence: new CfwPersistence(
options.token,
options.superfaceApiUrl,
this.userAgent
),
},
{ metricsTimeout: 0 }
);
}

public async init() {
Expand All @@ -302,14 +339,14 @@ class InternalClient {
return;
}

console.log('INIT useragent', this.userAgent);
console.log("INIT useragent", this.userAgent);

await this.app.loadCoreModule(coreModule);
const wasi = new WASI({
env: {
ONESDK_DEFAULT_USERAGENT: this.userAgent,
...this.options.env
}
...this.options.env,
},
});
await this.app.init(new CfwWasiCompat(wasi));

Expand All @@ -331,15 +368,15 @@ class InternalClient {
): Promise<any> {
await this.init();

const resolvedProfile = profile.replace(/\//g, '.'); // TODO: be smarter about this
const assetsPath = this.options.assetsPath ?? 'superface'; // TODO: path join? - not sure if we are going to stick with this VFS
const resolvedProfile = profile.replace(/\//g, "."); // TODO: be smarter about this
const assetsPath = this.options.assetsPath ?? "superface"; // TODO: path join? - not sure if we are going to stick with this VFS

let profilePath = `${assetsPath}/${resolvedProfile}.profile.ts`;
// migration from Comlink to TypeScript profiles
const profilePathComlink = `${assetsPath}/${resolvedProfile}.profile`;
if (
!(await this.fileSystem.exists(profilePath))
&& (await this.fileSystem.exists(profilePathComlink))
!(await this.fileSystem.exists(profilePath)) &&
(await this.fileSystem.exists(profilePathComlink))
) {
profilePath = profilePathComlink;
}
Expand All @@ -356,7 +393,10 @@ class InternalClient {
security
);
} catch (err: unknown) {
if (err instanceof UnexpectedError && (err.name === 'WebAssemblyRuntimeError')) {
if (
err instanceof UnexpectedError &&
err.name === "WebAssemblyRuntimeError"
) {
await this.destroy();
}

Expand Down Expand Up @@ -393,20 +433,26 @@ export class OneClient {
}

/** Send metrics to Superface.
*
*
* If `token` was passed in `ClientOptions` then the metrics will be associated with that account and project.
*/
*/
public async sendMetricsToSuperface(): Promise<void> {
return this.internal.sendMetrics();
}
}

export class Profile {
private constructor(private readonly internal: InternalClient, public readonly name: string, public readonly url: string) {
}
private constructor(
private readonly internal: InternalClient,
public readonly name: string,
public readonly url: string
) {}

public static async loadLocal(internal: InternalClient, name: string): Promise<Profile> {
return new Profile(internal, name, ''); // TODO: why do we need the url here?
public static async loadLocal(
internal: InternalClient,
name: string
): Promise<Profile> {
return new Profile(internal, name, ""); // TODO: why do we need the url here?
}

public getUseCase(usecaseName: string): UseCase {
Expand All @@ -415,10 +461,23 @@ export class Profile {
}

export class UseCase {
constructor(private readonly internal: InternalClient, private readonly profile: Profile, public readonly name: string) {
}

public async perform<TInput = unknown, TResult = unknown>(input: TInput | undefined, options: ClientPerformOptions): Promise<TResult> {
return await this.internal.perform(this.profile.name, options.provider, this.name, input, options?.parameters, options?.security) as TResult;
constructor(
private readonly internal: InternalClient,
private readonly profile: Profile,
public readonly name: string
) {}

public async perform<TInput = unknown, TResult = unknown>(
input: TInput | undefined,
options: ClientPerformOptions
): Promise<TResult> {
return (await this.internal.perform(
this.profile.name,
options.provider,
this.name,
input,
options?.parameters,
options?.security
)) as TResult;
}
}
Loading

0 comments on commit 491b05c

Please sign in to comment.