Skip to content

Commit

Permalink
feat(mempool): cache successful transactions and tx count
Browse files Browse the repository at this point in the history
  • Loading branch information
00nktk committed Nov 7, 2024
1 parent 06fac4b commit e5627f0
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 6 deletions.
20 changes: 16 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mempool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ tracing.workspace = true
thiserror.workspace = true
futures-util.workspace = true
rust_decimal = "1.36.0"
lru = "0.12.5"
29 changes: 27 additions & 2 deletions mempool/src/pools/chain_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::time::Duration;

use dashmap::DashMap;
use futures_util::StreamExt;
use lru::LruCache;
use priority_queue::PriorityQueue;
use reth_primitives::alloy_primitives::TxNonce;
use reth_primitives::{Address, BlockNumberOrTag, ChainId};
Expand Down Expand Up @@ -93,6 +94,8 @@ pub struct ChainPool<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> {
txs: Arc<DashMap<EthTxHash, TxRecord>>,
/// Timeout for evicting sender pools from the mempool
eviction_timeout_sec: u64,
tx_count_cache: LruCache<Address, u64>,
tx_cache: LruCache<EthTxHash, ()>,
}

impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
Expand Down Expand Up @@ -120,6 +123,8 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
executor,
txs,
eviction_timeout_sec: config.eviction_timeout_sec,
tx_count_cache: LruCache::new((256 * 1024).try_into().unwrap()), // TODO
tx_cache: LruCache::new((256 * 1024).try_into().unwrap()), // TODO
}
}

Expand Down Expand Up @@ -210,7 +215,9 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
if let Some(sender_pool) = self.sender_pools.get(&addr) {
result_tx.send(sender_pool.get_pending_tx_count()).unwrap();
} else {
result_tx.send(None).unwrap();
result_tx
.send(self.tx_count_cache.get(&addr).copied())
.unwrap();
}
}
Command::GetTxHash(addr, nonce, result_tx) => {
Expand Down Expand Up @@ -294,6 +301,18 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
let chain_id = tx.chain_id();
let sender = tx.sender;

if self
.tx_count_cache
.get(&tx.sender)
.map_or(false, |&count| count > tx.nonce)
{
return Err(MempoolError::NonceTooLow);
}

if self.tx_cache.get(tx.tx_hash()).is_some() {
return Err(MempoolError::AlreadyKnown);
}

let (sender_pool, created) = self.get_or_create_sender_pool(&sender).await;
// Clone necessary data from sender_pool to avoid mutable borrow later
let sender_pool_state = sender_pool.state;
Expand Down Expand Up @@ -569,11 +588,15 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
&mut self,
execution_result: ExecutionResult,
) -> Result<(), MempoolError> {
let Some((_tx_hash, record)) = self.txs.remove(&execution_result.tx_hash) else {
let Some((tx_hash, record)) = self.txs.remove(&execution_result.tx_hash) else {
tracing::error!(?execution_result, "tx not found in the registry");
return Ok(());
};

if execution_result.success {
self.tx_cache.push(tx_hash, ());
}

let Some(sender_pool) = self.sender_pools.get_mut(&record.sender) else {
tracing::error!(?execution_result, sender = ?record.sender, "sender pool not found");
return Ok(());
Expand Down Expand Up @@ -670,6 +693,8 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
self.sender_pools.insert(*sender, sender_pool);
return;
}
self.tx_count_cache
.push(sender_pool.sender, sender_pool.tx_count);

if sender_pool.is_empty() {
return;
Expand Down

0 comments on commit e5627f0

Please sign in to comment.