From e5627f03f6a7eca962c4768cc0ba416f54fc6980 Mon Sep 17 00:00:00 2001 From: Nikita Podoliako <00nktk@hey.com> Date: Fri, 8 Nov 2024 00:33:05 +0300 Subject: [PATCH] feat(mempool): cache successful transactions and tx count --- Cargo.lock | 20 ++++++++++++++++---- mempool/Cargo.toml | 1 + mempool/src/pools/chain_pool.rs | 29 +++++++++++++++++++++++++++-- 3 files changed, 44 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3a379a77..cd0b4922 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2919,6 +2919,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" + [[package]] name = "foreign-types" version = "0.3.2" @@ -3259,6 +3265,11 @@ name = "hashbrown" version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] [[package]] name = "hashlink" @@ -3706,7 +3717,7 @@ dependencies = [ "clap 4.5.4", "db", "hex", - "lru 0.12.3", + "lru 0.12.5", "neon-proxy-common", "parse", "prometheus", @@ -4151,11 +4162,11 @@ dependencies = [ [[package]] name = "lru" -version = "0.12.3" +version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" dependencies = [ - "hashbrown 0.14.2", + "hashbrown 0.15.0", ] [[package]] @@ -4267,6 +4278,7 @@ dependencies = [ "executor", "futures-util", "jsonrpsee", + "lru 0.12.5", "neon-api", "neon-proxy-common", "priority-queue", diff --git a/mempool/Cargo.toml b/mempool/Cargo.toml index 4806576d..92d0a2f5 100644 --- a/mempool/Cargo.toml +++ b/mempool/Cargo.toml @@ -31,3 +31,4 @@ tracing.workspace = true thiserror.workspace = true futures-util.workspace = true rust_decimal = "1.36.0" +lru = "0.12.5" diff --git a/mempool/src/pools/chain_pool.rs b/mempool/src/pools/chain_pool.rs index 57fd49a1..16b8defb 100644 --- a/mempool/src/pools/chain_pool.rs +++ b/mempool/src/pools/chain_pool.rs @@ -6,6 +6,7 @@ use std::time::Duration; use dashmap::DashMap; use futures_util::StreamExt; +use lru::LruCache; use priority_queue::PriorityQueue; use reth_primitives::alloy_primitives::TxNonce; use reth_primitives::{Address, BlockNumberOrTag, ChainId}; @@ -93,6 +94,8 @@ pub struct ChainPool { txs: Arc>, /// Timeout for evicting sender pools from the mempool eviction_timeout_sec: u64, + tx_count_cache: LruCache, + tx_cache: LruCache, } impl ChainPool { @@ -120,6 +123,8 @@ impl ChainPool { executor, txs, eviction_timeout_sec: config.eviction_timeout_sec, + tx_count_cache: LruCache::new((256 * 1024).try_into().unwrap()), // TODO + tx_cache: LruCache::new((256 * 1024).try_into().unwrap()), // TODO } } @@ -210,7 +215,9 @@ impl ChainPool { if let Some(sender_pool) = self.sender_pools.get(&addr) { result_tx.send(sender_pool.get_pending_tx_count()).unwrap(); } else { - result_tx.send(None).unwrap(); + result_tx + .send(self.tx_count_cache.get(&addr).copied()) + .unwrap(); } } Command::GetTxHash(addr, nonce, result_tx) => { @@ -294,6 +301,18 @@ impl ChainPool { let chain_id = tx.chain_id(); let sender = tx.sender; + if self + .tx_count_cache + .get(&tx.sender) + .map_or(false, |&count| count > tx.nonce) + { + return Err(MempoolError::NonceTooLow); + } + + if self.tx_cache.get(tx.tx_hash()).is_some() { + return Err(MempoolError::AlreadyKnown); + } + let (sender_pool, created) = self.get_or_create_sender_pool(&sender).await; // Clone necessary data from sender_pool to avoid mutable borrow later let sender_pool_state = sender_pool.state; @@ -569,11 +588,15 @@ impl ChainPool { &mut self, execution_result: ExecutionResult, ) -> Result<(), MempoolError> { - let Some((_tx_hash, record)) = self.txs.remove(&execution_result.tx_hash) else { + let Some((tx_hash, record)) = self.txs.remove(&execution_result.tx_hash) else { tracing::error!(?execution_result, "tx not found in the registry"); return Ok(()); }; + if execution_result.success { + self.tx_cache.push(tx_hash, ()); + } + let Some(sender_pool) = self.sender_pools.get_mut(&record.sender) else { tracing::error!(?execution_result, sender = ?record.sender, "sender pool not found"); return Ok(()); @@ -670,6 +693,8 @@ impl ChainPool { self.sender_pools.insert(*sender, sender_pool); return; } + self.tx_count_cache + .push(sender_pool.sender, sender_pool.tx_count); if sender_pool.is_empty() { return;