Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc refactoring #1296

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open

Conversation

khaidarkairbek
Copy link
Collaborator

@khaidarkairbek khaidarkairbek commented Nov 30, 2024

Relates to #1291 and #1290

@khaidarkairbek khaidarkairbek marked this pull request as draft November 30, 2024 00:29
@khaidarkairbek khaidarkairbek changed the title feat rpc refactoring to support websocket subscriptions Nov 30, 2024
@khaidarkairbek khaidarkairbek changed the title rpc refactoring to support websocket subscriptions rpc refactoring Nov 30, 2024
? await _eth_getBlockByNumber(args.requestQueue, {
blockTag: "latest",
})
: await _eth_getBlockByHash(args.requestQueue, {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how come we still need to fetch the block? Does newHeads not return the full block object?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newHeads returns the block header, which does not include transactions. So, we still need to fetch via _eth_getBlockByHash to access complete block data.

Comment on lines 852 to 860
// After a certain number of attempts, emit a fatal error.
if (++consecutiveErrors === ERROR_TIMEOUT.length) {
args.common.logger.error({
service: "realtime",
msg: `Fatal warning: Failed to subscribe to the latest '${args.network.name}' block after ${ERROR_TIMEOUT.length} attempts.`,
error,
});

args.onFatalError(error);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way we could reuse the same retry logic we have in enqueue?

Comment on lines 42 to 50
export type SubscribeParameters = Parameters<
ResolvedWebSocketTransport["value"]["subscribe"]
>[0] & {
method: "eth_subscribe";
};

export type SubscribeReturnType = Awaited<
ReturnType<ResolvedWebSocketTransport["value"]["subscribe"]>
>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these types from viem?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, ResolvedWebSocketTransport is based on the WebSocketTransport type defined in viem.

type ResolvedWebSocketTransport = Omit<
  ReturnType<WebSocketTransport>,
  "value"
> & {
  value: NonNullable<ReturnType<WebSocketTransport>["value"]>;
};

const wsTransport = resolveWebsocketTransport(network.transport);

if (wsTransport === undefined) {
throw new Error(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be NonRetryableError

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree, it should always be defined by the way we make use of _eth_subscribe in sync-realtime.

Comment on lines 261 to 284
export function resolveWebsocketTransport(
transport: ReturnType<Transport>,
): ResolvedWebSocketTransport | undefined {
if (transport.config.type === "http") {
return undefined;
}

if (transport.config.type === "fallback") {
const fallbackTransport: ReturnType<FallbackTransport> =
transport as ReturnType<FallbackTransport>;

const wsTransport = fallbackTransport.value!.transports.find(
(t: ReturnType<Transport>) => t.config.type === "webSocket",
) as ResolvedWebSocketTransport | undefined;

return wsTransport;
}

if (transport.config.type === "webSocket") {
return transport as ResolvedWebSocketTransport;
}

return undefined;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about getting away from using the Transport type in this module?

In the future this would let us implement features that aren't available in the transports like request cancellation. I also feel confident that ponder.config.ts should use rpc urls, not transports.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this makes sense, and we can make this change once we get to config and switch to rpcUrls instead of transports.

@khaidarkairbek khaidarkairbek marked this pull request as ready for review December 7, 2024 00:49
Comment on lines 210 to 213
subscribe: (params: SubscribeParameters) => {
const stopClockLag = startClock();

return requestQueue.add({ request: params, stopClockLag });
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about subscribe() not adding to the queue. I think it's safe to assume that we aren't going to get a 429 with the requests sent to it, because there should just be one per chain.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that makes sense.

Comment on lines 188 to 191
return await withRetry({
fn: subscribe,
request: task.request,
});
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the difference between the errors throw in subscribe() vs the errors passed to onError() in the subscribe params

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was confused a bit too, as far as I know, the onError handles the errors thrown during the active websocket connection, while the errors thrown in subscribe happen when establishing this connection. Feel free to correct me here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure either but that makes sense

interval = setInterval(enqueue, args.network.pollingInterval);
} else {
// Check on the connection every second
interval = setInterval(watchWebsocket, 1000);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to do this? Looking at the viem implementation of watchBlocks seems like they are handling it differently

https://github.com/wevm/viem/blob/main/src/actions/public/watchBlocks.ts

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My idea behind this implementation was to have persistency of websocket connection, so if error happens during active connection, we drop it and reconnect again. From the watchblocks implementation, they seem to leave this flexibility via onError callback.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants