Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add production field to data messages #135

Merged
merged 3 commits into from
Jan 18, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
protocol: add timeout to stream
  • Loading branch information
fracek committed Jan 18, 2025
commit c48a86805b6dbf48836aef2dace3f12f21d78894
6 changes: 4 additions & 2 deletions examples/starknet-client/src/main.ts
Original file line number Diff line number Diff line change
@@ -75,11 +75,13 @@ const command = defineCommand({
filter: [filter],
finality: "accepted",
startingCursor: {
orderKey: 800_000n,
orderKey: 1_078_335n,
},
});

for await (const message of client.streamData(request)) {
for await (const message of client.streamData(request, {
timeout: 40_000,
})) {
switch (message._tag) {
case "data": {
consola.info("Data", message.data.endCursor?.orderKey);
74 changes: 51 additions & 23 deletions packages/protocol/src/client.ts
Original file line number Diff line number Diff line change
@@ -23,14 +23,25 @@ import { type StreamDataRequest, StreamDataResponse } from "./stream";

export { ClientError, Status } from "nice-grpc";

const DEFAULT_TIMEOUT_MS = 45_000;

export class TimeoutError extends Error {
constructor(timeout: number) {
super(`No message received in ${timeout}ms`);
this.name = "TimeoutError";
}
}

/** Client call options. */
export interface ClientCallOptions {
signal?: AbortSignal;
}

export interface StreamDataOptions extends ClientCallOptions {
/** Stop at the specified cursor (inclusive) */
/** Stop at the specified cursor (inclusive). */
endingCursor?: Cursor;
/** Timeout between messages, in milliseconds. */
timeout?: number;
}

/** DNA client. */
@@ -112,44 +123,61 @@ export class StreamDataIterable<TBlock> {
const inner = this.it[Symbol.asyncIterator]();
const schema = StreamDataResponse(this.schema);
const decoder = Schema.decodeSync(schema);
const { endingCursor } = this.options ?? {};
const { endingCursor, timeout = DEFAULT_TIMEOUT_MS } = this.options ?? {};
let shouldStop = false;

let clock: string | number | NodeJS.Timeout | undefined;

return {
async next() {
if (shouldStop) {
return { done: true, value: undefined };
}

const { done, value } = await inner.next();
// biome-ignore lint/suspicious/noExplicitAny: any is ok
const t: Promise<{ done: boolean; value: any }> = new Promise(
(_, reject) => {
clock = setTimeout(() => {
reject(new TimeoutError(timeout));
}, timeout);
},
);

if (done || value.message === undefined) {
return { done: true, value: undefined };
}
try {
const { done, value } = await Promise.race([inner.next(), t]);

clearTimeout(clock);

if (done || value.message === undefined) {
return { done: true, value: undefined };
}

const decodedMessage = decoder(value.message);
const decodedMessage = decoder(value.message);

if (endingCursor) {
assert(value.message.$case === "data");
assert(decodedMessage._tag === "data");
if (endingCursor) {
assert(value.message.$case === "data");
assert(decodedMessage._tag === "data");

const { orderKey, uniqueKey } = endingCursor;
const endCursor = decodedMessage.data.endCursor;
const { orderKey, uniqueKey } = endingCursor;
const endCursor = decodedMessage.data.endCursor;

// Check if the orderKey matches
if (orderKey === endCursor?.orderKey) {
// If a uniqueKey is specified, it must also match
if (!uniqueKey || uniqueKey === endCursor.uniqueKey) {
shouldStop = true;
return { done: false, value: decodedMessage };
// Check if the orderKey matches
if (orderKey === endCursor?.orderKey) {
// If a uniqueKey is specified, it must also match
if (!uniqueKey || uniqueKey === endCursor.uniqueKey) {
shouldStop = true;
return { done: false, value: decodedMessage };
}
}
}
}

return {
done: false,
value: decodedMessage,
};
return {
done: false,
value: decodedMessage,
};
} finally {
clearTimeout(clock);
}
},
};
}