diff --git a/crates/rpc/rpc-api/src/eth_filter.rs b/crates/rpc/rpc-api/src/eth_filter.rs index 484157898e1d..8ec470c125b9 100644 --- a/crates/rpc/rpc-api/src/eth_filter.rs +++ b/crates/rpc/rpc-api/src/eth_filter.rs @@ -1,6 +1,5 @@ use jsonrpsee::{core::RpcResult, proc_macros::rpc}; -use reth_rpc_types::{Filter, FilterChanges, FilterId, Log}; - +use reth_rpc_types::{Filter, FilterChanges, FilterId, Log, PendingTransactionFilterKind}; /// Rpc Interface for poll-based ethereum filter API. #[cfg_attr(not(feature = "client"), rpc(server, namespace = "eth"))] #[cfg_attr(feature = "client", rpc(server, client, namespace = "eth"))] @@ -15,7 +14,10 @@ pub trait EthFilterApi { /// Creates a pending transaction filter and returns its id. #[method(name = "newPendingTransactionFilter")] - async fn new_pending_transaction_filter(&self) -> RpcResult; + async fn new_pending_transaction_filter( + &self, + kind: Option, + ) -> RpcResult; /// Returns all filter changes since last poll. #[method(name = "getFilterChanges")] diff --git a/crates/rpc/rpc-builder/tests/it/http.rs b/crates/rpc/rpc-builder/tests/it/http.rs index 45931efc441f..85a87b67ec90 100644 --- a/crates/rpc/rpc-builder/tests/it/http.rs +++ b/crates/rpc/rpc-builder/tests/it/http.rs @@ -18,7 +18,10 @@ use reth_rpc_api::{ Web3ApiClient, }; use reth_rpc_builder::RethRpcModule; -use reth_rpc_types::{trace::filter::TraceFilter, CallRequest, Filter, Index, TransactionRequest}; +use reth_rpc_types::{ + trace::filter::TraceFilter, CallRequest, Filter, Index, PendingTransactionFilterKind, + TransactionRequest, +}; use std::collections::HashSet; fn is_unimplemented(err: Error) -> bool { @@ -36,7 +39,13 @@ where C: ClientT + SubscriptionClientT + Sync, { EthFilterApiClient::new_filter(client, Filter::default()).await.unwrap(); - EthFilterApiClient::new_pending_transaction_filter(client).await.unwrap(); + EthFilterApiClient::new_pending_transaction_filter(client, None).await.unwrap(); + EthFilterApiClient::new_pending_transaction_filter( + client, + Some(PendingTransactionFilterKind::Full), + ) + .await + .unwrap(); let id = EthFilterApiClient::new_block_filter(client).await.unwrap(); EthFilterApiClient::filter_changes(client, id.clone()).await.unwrap(); EthFilterApiClient::logs(client, Filter::default()).await.unwrap(); diff --git a/crates/rpc/rpc-types/src/eth/filter.rs b/crates/rpc/rpc-types/src/eth/filter.rs index c56365941c03..6935feca54d0 100644 --- a/crates/rpc/rpc-types/src/eth/filter.rs +++ b/crates/rpc/rpc-types/src/eth/filter.rs @@ -1,4 +1,4 @@ -use crate::{eth::log::Log as RpcLog, BlockNumberOrTag, Log}; +use crate::{eth::log::Log as RpcLog, BlockNumberOrTag, Log, Transaction}; use alloy_primitives::{keccak256, Address, Bloom, BloomInput, B256, U256, U64}; use itertools::{EitherOrBoth::*, Itertools}; use serde::{ @@ -820,7 +820,6 @@ impl FilteredParams { true } } - /// Response of the `eth_getFilterChanges` RPC. #[derive(Clone, Debug, Eq, PartialEq)] pub enum FilterChanges { @@ -828,6 +827,8 @@ pub enum FilterChanges { Logs(Vec), /// New hashes (block or transactions) Hashes(Vec), + /// New transactions. + Transactions(Vec), /// Empty result, Empty, } @@ -840,6 +841,7 @@ impl Serialize for FilterChanges { match self { FilterChanges::Logs(logs) => logs.serialize(s), FilterChanges::Hashes(hashes) => hashes.serialize(s), + FilterChanges::Transactions(transactions) => transactions.serialize(s), FilterChanges::Empty => (&[] as &[serde_json::Value]).serialize(s), } } @@ -908,6 +910,51 @@ impl From> for FilterId { } } } +/// Specifies the kind of information you wish to receive from the `eth_newPendingTransactionFilter` +/// RPC endpoint. +/// +/// When this type is used in a request, it determines whether the client wishes to receive: +/// - Only the transaction hashes (`Hashes` variant), or +/// - Full transaction details (`Full` variant). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum PendingTransactionFilterKind { + /// Receive only the hashes of the transactions. + #[default] + Hashes, + /// Receive full details of the transactions. + Full, +} + +impl Serialize for PendingTransactionFilterKind { + /// Serializes the `PendingTransactionFilterKind` into a boolean value: + /// - `false` for `Hashes` + /// - `true` for `Full` + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self { + PendingTransactionFilterKind::Hashes => false.serialize(serializer), + PendingTransactionFilterKind::Full => true.serialize(serializer), + } + } +} + +impl<'a> Deserialize<'a> for PendingTransactionFilterKind { + /// Deserializes a boolean value into `PendingTransactionFilterKind`: + /// - `false` becomes `Hashes` + /// - `true` becomes `Full` + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'a>, + { + let val = Option::::deserialize(deserializer)?; + match val { + Some(true) => Ok(PendingTransactionFilterKind::Full), + _ => Ok(PendingTransactionFilterKind::Hashes), + } + } +} #[cfg(test)] mod tests { diff --git a/crates/rpc/rpc-types/src/eth/pubsub.rs b/crates/rpc/rpc-types/src/eth/pubsub.rs index 3c4344ca8b66..66a8266fb260 100644 --- a/crates/rpc/rpc-types/src/eth/pubsub.rs +++ b/crates/rpc/rpc-types/src/eth/pubsub.rs @@ -4,7 +4,6 @@ use crate::{ eth::{Filter, Transaction}, Log, RichHeader, }; - use alloy_primitives::B256; use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index d895d7cf9a49..e8e2f4a92d12 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -7,16 +7,20 @@ use crate::{ result::{rpc_error_with_code, ToRpcResult}, EthSubscriptionIdProvider, }; -use alloy_primitives::B256; +use core::fmt; + use async_trait::async_trait; use jsonrpsee::{core::RpcResult, server::IdProvider}; use reth_interfaces::RethError; -use reth_primitives::{BlockHashOrNumber, Receipt, SealedBlock, TxHash}; +use reth_primitives::{BlockHashOrNumber, IntoRecoveredTransaction, Receipt, SealedBlock, TxHash}; use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider}; use reth_rpc_api::EthFilterApiServer; -use reth_rpc_types::{Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log}; +use reth_rpc_types::{ + Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log, + PendingTransactionFilterKind, +}; use reth_tasks::TaskSpawner; -use reth_transaction_pool::TransactionPool; +use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool}; use std::{ collections::HashMap, iter::StepBy, @@ -35,7 +39,7 @@ const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530bytes per header this is ~500k /// `Eth` filter RPC implementation. pub struct EthFilter { - /// All nested fields bundled together. + /// All nested fields bundled together inner: Arc>, } @@ -120,6 +124,7 @@ impl EthFilter where Provider: BlockReader + BlockIdReader + EvmEnvProvider + 'static, Pool: TransactionPool + 'static, + ::Transaction: 'static, { /// Returns all the filter changes for the given id, if any pub async fn filter_changes(&self, id: FilterId) -> Result { @@ -148,10 +153,7 @@ where }; match kind { - FilterKind::PendingTransaction(receiver) => { - let pending_txs = receiver.drain().await; - Ok(FilterChanges::Hashes(pending_txs)) - } + FilterKind::PendingTransaction(filter) => Ok(filter.drain().await), FilterKind::Block => { // Note: we need to fetch the block hashes from inclusive range // [start_block..best_block] @@ -235,13 +237,31 @@ where } /// Handler for `eth_newPendingTransactionFilter` - async fn new_pending_transaction_filter(&self) -> RpcResult { + async fn new_pending_transaction_filter( + &self, + kind: Option, + ) -> RpcResult { trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter"); - let receiver = self.inner.pool.pending_transactions_listener(); - let pending_txs_receiver = PendingTransactionsReceiver::new(receiver); + let transaction_kind = match kind.unwrap_or_default() { + PendingTransactionFilterKind::Hashes => { + let receiver = self.inner.pool.pending_transactions_listener(); + let pending_txs_receiver = PendingTransactionsReceiver::new(receiver); + FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver)) + } + PendingTransactionFilterKind::Full => { + let stream = self.inner.pool.new_pending_pool_transactions_listener(); + let full_txs_receiver = FullTransactionsReceiver::new(stream); + FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new( + full_txs_receiver, + ))) + } + }; + + //let filter = FilterKind::PendingTransaction(transaction_kind); - self.inner.install_filter(FilterKind::PendingTransaction(pending_txs_receiver)).await + // Install the filter and propagate any errors + self.inner.install_filter(transaction_kind).await } /// Handler for `eth_getFilterChanges` @@ -490,14 +510,81 @@ impl PendingTransactionsReceiver { } /// Returns all new pending transactions received since the last poll. - async fn drain(&self) -> Vec { + async fn drain(&self) -> FilterChanges { let mut pending_txs = Vec::new(); let mut prepared_stream = self.txs_receiver.lock().await; while let Ok(tx_hash) = prepared_stream.try_recv() { pending_txs.push(tx_hash); } - pending_txs + + // Convert the vector of hashes into FilterChanges::Hashes + FilterChanges::Hashes(pending_txs) + } +} + +/// A structure to manage and provide access to a stream of full transaction details. +#[derive(Debug, Clone)] +struct FullTransactionsReceiver { + txs_stream: Arc>>, +} + +impl FullTransactionsReceiver +where + T: PoolTransaction + 'static, +{ + /// Creates a new `FullTransactionsReceiver` encapsulating the provided transaction stream. + fn new(stream: NewSubpoolTransactionStream) -> Self { + FullTransactionsReceiver { txs_stream: Arc::new(Mutex::new(stream)) } + } + + /// Returns all new pending transactions received since the last poll. + async fn drain(&self) -> FilterChanges { + let mut pending_txs = Vec::new(); + let mut prepared_stream = self.txs_stream.lock().await; + + while let Ok(tx) = prepared_stream.try_recv() { + pending_txs.push(reth_rpc_types_compat::transaction::from_recovered( + tx.transaction.to_recovered_transaction(), + )) + } + FilterChanges::Transactions(pending_txs) + } +} + +/// Helper trait for [FullTransactionsReceiver] to erase the `Transaction` type. +#[async_trait] +trait FullTransactionsFilter: fmt::Debug + Send + Sync + Unpin + 'static { + async fn drain(&self) -> FilterChanges; +} + +#[async_trait] +impl FullTransactionsFilter for FullTransactionsReceiver +where + T: PoolTransaction + 'static, +{ + async fn drain(&self) -> FilterChanges { + FullTransactionsReceiver::drain(self).await + } +} + +/// Represents the kind of pending transaction data that can be retrieved. +/// +/// This enum differentiates between two kinds of pending transaction data: +/// - Just the transaction hashes. +/// - Full transaction details. +#[derive(Debug, Clone)] +enum PendingTransactionKind { + Hashes(PendingTransactionsReceiver), + FullTransaction(Arc), +} + +impl PendingTransactionKind { + async fn drain(&self) -> FilterChanges { + match self { + PendingTransactionKind::Hashes(receiver) => receiver.drain().await, + PendingTransactionKind::FullTransaction(receiver) => receiver.drain().await, + } } } @@ -505,9 +592,8 @@ impl PendingTransactionsReceiver { enum FilterKind { Log(Box), Block, - PendingTransaction(PendingTransactionsReceiver), + PendingTransaction(PendingTransactionKind), } - /// Errors that can occur in the handler implementation #[derive(Debug, thiserror::Error)] pub enum FilterError { diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index 398fcec1b0cf..9d75721b47d6 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -1113,6 +1113,22 @@ impl NewSubpoolTransactionStream { pub fn new(st: Receiver>, subpool: SubPool) -> Self { Self { st, subpool } } + + /// Tries to receive the next value for this stream. + pub fn try_recv( + &mut self, + ) -> Result, tokio::sync::mpsc::error::TryRecvError> { + loop { + match self.st.try_recv() { + Ok(event) => { + if event.subpool == self.subpool { + return Ok(event) + } + } + Err(e) => return Err(e), + } + } + } } impl Stream for NewSubpoolTransactionStream {