From 5fc8fe862d3f0c0d724ae35af61f89034135e8b9 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 22 Jan 2025 14:43:41 +0100 Subject: [PATCH] feat: install payload delay (#134) this adds a hack that wraps the engine_getPayloadV3 and applies a delay, that delays the request up to 500ms into the slot. the motivation for this is, so we can give the payload builder a bit more time to include more txs, because the previous block building window was observed to be ~250ms --- Cargo.lock | 4 + Cargo.toml | 1 + bin/odyssey/src/main.rs | 14 +++ crates/node/Cargo.toml | 4 + crates/node/src/delayed_resolve.rs | 146 +++++++++++++++++++++++++++++ crates/node/src/lib.rs | 1 + crates/wallet/src/lib.rs | 1 + 7 files changed, 171 insertions(+) create mode 100644 crates/node/src/delayed_resolve.rs diff --git a/Cargo.lock b/Cargo.lock index f8e47ce..6e254b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4635,9 +4635,12 @@ dependencies = [ "alloy-rpc-types", "alloy-rpc-types-eth", "eyre", + "futures", "jsonrpsee", "odyssey-common", "op-alloy-consensus", + "parking_lot", + "reth-chain-state", "reth-chainspec", "reth-cli", "reth-errors", @@ -4661,6 +4664,7 @@ dependencies = [ "reth-trie-db", "revm-precompile", "revm-primitives", + "serde", "serde_json", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 0f06d59..c08f80b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -235,6 +235,7 @@ serde_json = "1" thiserror = "2" futures = "0.3" url = "2.5" +parking_lot = "0.12" # misc-testing rstest = "0.18.2" diff --git a/bin/odyssey/src/main.rs b/bin/odyssey/src/main.rs index ea3e031..b59e97a 100644 --- a/bin/odyssey/src/main.rs +++ b/bin/odyssey/src/main.rs @@ -30,6 +30,7 @@ use eyre::Context; use odyssey_node::{ broadcaster::periodic_broadcaster, chainspec::OdysseyChainSpecParser, + delayed_resolve::{DelayedResolver, MAX_DELAY_INTO_SLOT}, forwarder::forward_raw_transactions, node::OdysseyNode, rpc::{EthApiExt, EthApiOverrideServer}, @@ -40,6 +41,7 @@ use reth_node_builder::{engine_tree_config::TreeConfig, EngineNodeLauncher, Node use reth_optimism_cli::Cli; use reth_optimism_node::{args::RollupArgs, node::OpAddOnsBuilder}; use reth_provider::{providers::BlockchainProvider2, CanonStateSubscriptions}; +use std::time::Duration; use tracing::{info, warn}; #[global_allocator] @@ -110,6 +112,18 @@ fn main() { ctx.modules.merge_configured(walltime.into_rpc())?; info!(target: "reth::cli", "Walltime configured"); + // wrap the getPayloadV3 method in a delay + let engine_module = ctx.auth_module.module_mut().clone(); + let delay_into_slot = std::env::var("MAX_PAYLOAD_DELAY") + .ok() + .and_then(|val| val.parse::().map(Duration::from_millis).ok()) + .unwrap_or(MAX_DELAY_INTO_SLOT); + + let delayed_payload = DelayedResolver::new(engine_module, delay_into_slot); + delayed_payload.clone().spawn(ctx.provider().canonical_state_stream()); + ctx.auth_module.replace_auth_methods(delayed_payload.into_rpc_module())?; + info!(target: "reth::cli", "Configured payload delay"); + Ok(()) }) .launch_with_fn(|builder| { diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 2f19e15..21e543d 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -33,6 +33,7 @@ reth-trie-common.workspace = true reth-trie-db.workspace = true reth-network.workspace = true reth-network-types.workspace = true +reth-chain-state.workspace = true alloy-consensus.workspace = true alloy-eips.workspace = true @@ -50,6 +51,9 @@ tokio.workspace = true tracing.workspace = true eyre.workspace = true jsonrpsee.workspace = true +futures.workspace = true +parking_lot.workspace = true +serde.workspace = true [lints] workspace = true diff --git a/crates/node/src/delayed_resolve.rs b/crates/node/src/delayed_resolve.rs new file mode 100644 index 0000000..9a64697 --- /dev/null +++ b/crates/node/src/delayed_resolve.rs @@ -0,0 +1,146 @@ +//! Helper that delays resolving the payload + +use futures::{Stream, StreamExt}; +use jsonrpsee::{ + core::traits::ToRpcParams, + types::{error::INVALID_PARAMS_CODE, ErrorObject, Params}, + MethodsError, RpcModule, +}; +use parking_lot::Mutex; +use reth_chain_state::CanonStateNotification; +use serde::de::Error; +use serde_json::value::RawValue; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + +/// Delay into the slot +pub const MAX_DELAY_INTO_SLOT: Duration = Duration::from_millis(500); + +/// The getpayload fn we want to delay +pub const GET_PAYLOAD_V3: &str = "engine_getPayloadV3"; + +/// A helper that tracks the block clock timestamp and can delay resolving the payload to give the +/// payload builder more time to build a block. +#[derive(Debug, Clone)] +pub struct DelayedResolver { + inner: Arc, +} + +impl DelayedResolver { + /// Creates a new instance with the engine module and the duration we should target + pub fn new(engine_module: RpcModule<()>, max_delay_into_slot: Duration) -> Self { + Self { + inner: Arc::new(DelayedResolverInner { + last_block_time: Mutex::new(Instant::now()), + engine_module, + max_delay_into_slot, + }), + } + } + + /// Listen for new blocks and track the local timestamp. + pub fn spawn(self, mut st: St) + where + St: Stream + Send + Unpin + 'static, + { + tokio::task::spawn(async move { + while st.next().await.is_some() { + *self.inner.last_block_time.lock() = Instant::now(); + } + }); + } + + async fn call(&self, params: Params<'static>) -> Result { + let last = *self.inner.last_block_time.lock(); + let now = Instant::now(); + // how far we're into the slot + let offset = now.duration_since(last); + + if offset < self.inner.max_delay_into_slot { + // if we received the request before the max delay exceeded we can delay the request to + // give the payload builder more time to build the payload. + let delay = self.inner.max_delay_into_slot.saturating_sub(offset); + tokio::time::sleep(delay).await; + } + + let params = params + .as_str() + .ok_or_else(|| MethodsError::Parse(serde_json::Error::missing_field("payload id")))?; + + self.inner.engine_module.call(GET_PAYLOAD_V3, PayloadParam(params.to_string())).await + } + + /// Converts this type into a new [`RpcModule`] that delegates the get payload call. + pub fn into_rpc_module(self) -> RpcModule<()> { + let mut module = RpcModule::new(()); + module + .register_async_method(GET_PAYLOAD_V3, move |params, _ctx, _| { + let value = self.clone(); + async move { + value.call(params).await.map_err(|err| match err { + MethodsError::JsonRpc(err) => err, + err => ErrorObject::owned( + INVALID_PARAMS_CODE, + format!("invalid payload call: {:?}", err), + None::<()>, + ), + }) + } + }) + .unwrap(); + + module + } +} + +#[derive(Debug)] +struct DelayedResolverInner { + /// Tracks the time when the last block was emitted + last_block_time: Mutex, + engine_module: RpcModule<()>, + /// By how much we want to delay getPayload into the slot + max_delay_into_slot: Duration, +} + +struct PayloadParam(String); + +impl ToRpcParams for PayloadParam { + fn to_rpc_params(self) -> Result>, serde_json::Error> { + RawValue::from_string(self.0).map(Some) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_rpc_types::engine::PayloadId; + + /// Mocked payload object + #[derive(Clone, Debug, serde::Serialize, serde::Deserialize, Default)] + struct Payload { + attributes: serde_json::Value, + header: serde_json::Value, + } + + #[tokio::test] + async fn test_delayed_forward() { + use jsonrpsee::{core::RpcResult, RpcModule}; + + let mut module = RpcModule::new(()); + module + .register_method::, _>(GET_PAYLOAD_V3, |params, _, _| { + params.one::()?; + Ok(Payload::default()) + }) + .unwrap(); + + let id = PayloadId::default(); + + let _echo: Payload = module.call(GET_PAYLOAD_V3, [id]).await.unwrap(); + + let delayer = DelayedResolver::new(module, MAX_DELAY_INTO_SLOT).into_rpc_module(); + let _echo: Payload = delayer.call(GET_PAYLOAD_V3, [id]).await.unwrap(); + } +} diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index fe707ea..7d99ed9 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -17,6 +17,7 @@ pub mod broadcaster; pub mod chainspec; +pub mod delayed_resolve; pub mod evm; pub mod forwarder; pub mod node; diff --git a/crates/wallet/src/lib.rs b/crates/wallet/src/lib.rs index 61af625..f5d3437 100644 --- a/crates/wallet/src/lib.rs +++ b/crates/wallet/src/lib.rs @@ -310,6 +310,7 @@ impl OdysseyWallet { Self { inner: Arc::new(inner) } } + #[allow(clippy::missing_const_for_fn)] fn chain_id(&self) -> ChainId { self.inner.chain_id }