diff --git a/cfx_types/src/lib.rs b/cfx_types/src/lib.rs index e13f52ffe9..207fe2fd95 100644 --- a/cfx_types/src/lib.rs +++ b/cfx_types/src/lib.rs @@ -118,6 +118,49 @@ impl AddressWithSpace { pub fn assert_native(&self) { assert_eq!(self.space, Space::Native) } } +#[derive(Default, Clone)] +pub struct SpaceMap { + pub native: T, + pub evm: T, +} + +impl SpaceMap { + #[inline] + pub fn in_space(&self, space: Space) -> &T { + match space { + Space::Native => &self.native, + Space::Ethereum => &self.evm, + } + } + + #[inline] + pub fn in_space_mut(&mut self, space: Space) -> &mut T { + match space { + Space::Native => &mut self.native, + Space::Ethereum => &mut self.evm, + } + } + + pub fn iter(&self) -> impl Iterator { + vec![&self.native, &self.evm].into_iter() + } + + pub fn map_sum usize>(&self, f: F) -> usize { + f(&self.native) + f(&self.evm) + } + + pub const fn size(&self) -> usize { 2 } + + pub fn apply_all U>( + &mut self, mut f: F, + ) -> SpaceMap { + SpaceMap { + native: f(&mut self.native), + evm: f(&mut self.evm), + } + } +} + pub mod space_util { use super::{Address, AddressWithSpace, Space}; diff --git a/core/src/transaction_pool/mod.rs b/core/src/transaction_pool/mod.rs index b0c2443ec3..7198b66c9c 100644 --- a/core/src/transaction_pool/mod.rs +++ b/core/src/transaction_pool/mod.rs @@ -469,7 +469,7 @@ impl TransactionPool { } } - TX_POOL_DEFERRED_GAUGE.update(self.total_deferred()); + TX_POOL_DEFERRED_GAUGE.update(self.total_deferred(None)); TX_POOL_UNPACKED_GAUGE.update(self.total_unpacked()); TX_POOL_READY_GAUGE.update(self.total_ready_accounts()); @@ -580,7 +580,7 @@ impl TransactionPool { //RwLock is dropped here } - TX_POOL_DEFERRED_GAUGE.update(self.total_deferred()); + TX_POOL_DEFERRED_GAUGE.update(self.total_deferred(None)); TX_POOL_UNPACKED_GAUGE.update(self.total_unpacked()); TX_POOL_READY_GAUGE.update(self.total_ready_accounts()); @@ -751,9 +751,9 @@ impl TransactionPool { inner.clear() } - pub fn total_deferred(&self) -> usize { + pub fn total_deferred(&self, space: Option) -> usize { let inner = self.inner.read(); - inner.total_deferred() + inner.total_deferred(space) } pub fn total_ready_accounts(&self) -> usize { @@ -776,7 +776,7 @@ impl TransactionPool { let inner = self.inner.read(); ( inner.total_ready_accounts(), - inner.total_deferred(), + inner.total_deferred(None), inner.total_received(), inner.total_unpacked(), ) diff --git a/core/src/transaction_pool/transaction_pool_inner.rs b/core/src/transaction_pool/transaction_pool_inner.rs index 0a5f709d8e..1f511be1b8 100644 --- a/core/src/transaction_pool/transaction_pool_inner.rs +++ b/core/src/transaction_pool/transaction_pool_inner.rs @@ -11,8 +11,8 @@ use crate::{ use cfx_parameters::staking::DRIPS_PER_STORAGE_COLLATERAL_UNIT; use cfx_statedb::Result as StateDbResult; use cfx_types::{ - address_util::AddressUtil, Address, AddressWithSpace, Space, H256, U128, - U256, U512, + address_util::AddressUtil, Address, AddressWithSpace, Space, SpaceMap, + H256, U128, U256, U512, }; use heap_map::HeapMap; use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf; @@ -650,6 +650,49 @@ pub enum PendingReason { OutdatedStatus, } +#[derive(Default, DeriveMallocSizeOf)] +pub struct TransactionSet { + inner: HashMap>, + count: SpaceMap, +} + +impl TransactionSet { + fn get(&self, tx_hash: &H256) -> Option<&Arc> { + self.inner.get(tx_hash) + } + + fn values( + &self, + ) -> std::collections::hash_map::Values<'_, H256, Arc> + { + self.inner.values() + } + + fn insert( + &mut self, tx_hash: H256, tx: Arc, + ) -> Option> { + *self.count.in_space_mut(tx.space()) += 1; + let res = self.inner.insert(tx_hash, tx); + if let Some(ref tx) = res { + *self.count.in_space_mut(tx.space()) -= 1; + } + res + } + + fn remove(&mut self, tx_hash: &H256) -> Option> { + let res = self.inner.remove(tx_hash); + if let Some(ref tx) = res { + *self.count.in_space_mut(tx.space()) -= 1; + } + res + } + + fn clear(&mut self) { + self.inner.clear(); + self.count.apply_all(|x| *x = 0); + } +} + #[derive(DeriveMallocSizeOf)] pub struct TransactionPoolInner { capacity: usize, @@ -667,10 +710,10 @@ pub struct TransactionPoolInner { /// (set_tx_packed), after epoch execution, or during transaction /// insertion. ready_nonces_and_balances: HashMap, - garbage_collector: GarbageCollector, + garbage_collector: SpaceMap, /// Keeps all transactions in the transaction pool. /// It should contain the same transaction set as `deferred_pool`. - txs: HashMap>, + txs: TransactionSet, tx_sponsored_gas_map: HashMap, } @@ -691,8 +734,8 @@ impl TransactionPoolInner { total_gas_capacity, ), ready_nonces_and_balances: HashMap::new(), - garbage_collector: GarbageCollector::default(), - txs: HashMap::new(), + garbage_collector: SpaceMap::default(), + txs: TransactionSet::default(), tx_sponsored_gas_map: HashMap::new(), } } @@ -701,14 +744,19 @@ impl TransactionPoolInner { self.deferred_pool.clear(); self.ready_account_pool.clear(); self.ready_nonces_and_balances.clear(); - self.garbage_collector.clear(); + self.garbage_collector.apply_all(|x| x.clear()); self.txs.clear(); self.tx_sponsored_gas_map.clear(); self.total_received_count = 0; self.unpacked_transaction_count = 0; } - pub fn total_deferred(&self) -> usize { self.txs.len() } + pub fn total_deferred(&self, space: Option) -> usize { + match space { + Some(space) => *self.txs.count.in_space(space), + None => self.txs.count.map_sum(|x| *x), + } + } pub fn ready_transacton_hashes_in_evm_pool(&self) -> BTreeSet { self.ready_account_pool.get_transaction_hashes_in_evm_pool() @@ -738,8 +786,8 @@ impl TransactionPoolInner { bucket.get_tx_by_nonce(nonce).map(|tx| tx.transaction) } - pub fn is_full(&self) -> bool { - return self.total_deferred() >= self.capacity; + pub fn is_full(&self, space: Space) -> bool { + return self.total_deferred(Some(space)) >= self.capacity; } pub fn get_current_timestamp(&self) -> u64 { @@ -759,16 +807,19 @@ impl TransactionPoolInner { /// garbage collectable. And if there is a tie, the one who has minimum /// timestamp will be picked. pub fn collect_garbage(&mut self, new_tx: &SignedTransaction) { - let count_before_gc = self.total_deferred(); + let space = new_tx.space(); + let count_before_gc = self.total_deferred(Some(space)); let mut skipped_self_node = None; - while self.is_full() && !self.garbage_collector.is_empty() { + while self.is_full(space) + && !self.garbage_collector.in_space(space).is_empty() + { let current_timestamp = self.get_current_timestamp(); let (victim_address, victim) = - self.garbage_collector.top().unwrap(); + self.garbage_collector.in_space(space).top().unwrap(); // Accounts which are not in `deferred_pool` may be inserted // into `garbage_collector`, we can just ignore them. if !self.deferred_pool.contain_address(victim_address) { - self.garbage_collector.pop(); + self.garbage_collector.in_space_mut(space).pop(); continue; } @@ -779,8 +830,11 @@ impl TransactionPoolInner { if *victim_address == new_tx.sender() { // We do not GC a not-executed transaction from the same // sender, so save it and try another account. - let (victim_address, victim) = - self.garbage_collector.pop().unwrap(); + let (victim_address, victim) = self + .garbage_collector + .in_space_mut(space) + .pop() + .unwrap(); skipped_self_node = Some((victim_address, victim)); continue; } else if victim.has_ready_tx @@ -800,7 +854,7 @@ impl TransactionPoolInner { // victim is now chosen to be evicted. let (victim_address, victim) = - self.garbage_collector.pop().unwrap(); + self.garbage_collector.in_space_mut(space).pop().unwrap(); let (ready_nonce, _) = self .get_local_nonce_and_balance(&victim_address) @@ -863,7 +917,7 @@ impl TransactionPoolInner { } else { 0 }; - self.garbage_collector.insert( + self.garbage_collector.in_space_mut(space).insert( &victim_address, count, current_timestamp, @@ -880,7 +934,7 @@ impl TransactionPoolInner { // Insert back skipped nodes to keep `garbage_collector` // unchanged. if let Some((addr, node)) = skipped_self_node { - self.garbage_collector.insert( + self.garbage_collector.in_space_mut(space).insert( &addr, node.count, node.timestamp, @@ -888,14 +942,15 @@ impl TransactionPoolInner { node.first_tx_gas_price, ); } - GC_METER.mark(count_before_gc - self.total_deferred()); + GC_METER.mark(count_before_gc - self.total_deferred(Some(space))); } /// Collect garbage and return the remaining quota of the pool to insert new /// transactions. pub fn remaining_quota(&self) -> usize { - let len = self.total_deferred(); - self.capacity - len + self.garbage_collector.gc_size() + let len = self.total_deferred(None); + self.garbage_collector.size() * self.capacity - len + + self.garbage_collector.map_sum(|x| x.gc_size()) } pub fn capacity(&self) -> usize { self.capacity } @@ -915,7 +970,7 @@ impl TransactionPoolInner { &transaction.nonce(), ) { self.collect_garbage(transaction.as_ref()); - if self.is_full() { + if self.is_full(transaction.space()) { return InsertResult::Failed("Transaction Pool is full".into()); } } @@ -1136,6 +1191,7 @@ impl TransactionPoolInner { fn recalculate_readiness( &mut self, addr: &AddressWithSpace, nonce: U256, balance: U256, ) { + let space = addr.space; let ret = self .deferred_pool .recalculate_readiness_with_local_info(addr, nonce, balance); @@ -1145,9 +1201,10 @@ impl TransactionPoolInner { let count = self.deferred_pool.count_less(addr, &nonce); let timestamp = self .garbage_collector + .in_space(space) .get_timestamp(addr) .unwrap_or(self.get_current_timestamp()); - self.garbage_collector.insert( + self.garbage_collector.in_space_mut(space).insert( addr, count, timestamp, diff --git a/util/malloc_size_of/src/lib.rs b/util/malloc_size_of/src/lib.rs index 1152d9e50c..9221b5d82e 100644 --- a/util/malloc_size_of/src/lib.rs +++ b/util/malloc_size_of/src/lib.rs @@ -21,7 +21,7 @@ static GLOBAL: Jemalloc = Jemalloc; use cfg_if::cfg_if; use cfx_types::{ - AddressWithSpace, AllChainID, Space, H160, H256, H512, U256, U512, + AddressWithSpace, AllChainID, Space, SpaceMap, H160, H256, H512, U256, U512, }; use hashbrown::HashMap as FastHashMap; use parking_lot; @@ -125,6 +125,12 @@ impl MallocSizeOf for String { } } +impl MallocSizeOf for SpaceMap { + fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize { + self.native.size_of(ops) + self.evm.size_of(ops) + } +} + impl MallocShallowSizeOf for Box { fn shallow_size_of(&self, ops: &mut MallocSizeOfOps) -> usize { unsafe { ops.malloc_size_of(&**self) }