Skip to content

Commit

Permalink
Merge pull request #2754 from ChenxingLi/gc_by_space
Browse files Browse the repository at this point in the history
Limit transaction pool size by space
  • Loading branch information
peilun-conflux authored Dec 13, 2023
2 parents 952b1fd + c46e042 commit 622070c
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 30 deletions.
43 changes: 43 additions & 0 deletions cfx_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,49 @@ impl AddressWithSpace {
pub fn assert_native(&self) { assert_eq!(self.space, Space::Native) }
}

#[derive(Default, Clone)]
pub struct SpaceMap<T> {
pub native: T,
pub evm: T,
}

impl<T> SpaceMap<T> {
#[inline]
pub fn in_space(&self, space: Space) -> &T {
match space {
Space::Native => &self.native,
Space::Ethereum => &self.evm,
}
}

#[inline]
pub fn in_space_mut(&mut self, space: Space) -> &mut T {
match space {
Space::Native => &mut self.native,
Space::Ethereum => &mut self.evm,
}
}

pub fn iter(&self) -> impl Iterator<Item = &T> {
vec![&self.native, &self.evm].into_iter()
}

pub fn map_sum<F: Fn(&T) -> usize>(&self, f: F) -> usize {
f(&self.native) + f(&self.evm)
}

pub const fn size(&self) -> usize { 2 }

pub fn apply_all<U, F: FnMut(&mut T) -> U>(
&mut self, mut f: F,
) -> SpaceMap<U> {
SpaceMap {
native: f(&mut self.native),
evm: f(&mut self.evm),
}
}
}

pub mod space_util {
use super::{Address, AddressWithSpace, Space};

Expand Down
10 changes: 5 additions & 5 deletions core/src/transaction_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ impl TransactionPool {
}
}

TX_POOL_DEFERRED_GAUGE.update(self.total_deferred());
TX_POOL_DEFERRED_GAUGE.update(self.total_deferred(None));
TX_POOL_UNPACKED_GAUGE.update(self.total_unpacked());
TX_POOL_READY_GAUGE.update(self.total_ready_accounts());

Expand Down Expand Up @@ -580,7 +580,7 @@ impl TransactionPool {
//RwLock is dropped here
}

TX_POOL_DEFERRED_GAUGE.update(self.total_deferred());
TX_POOL_DEFERRED_GAUGE.update(self.total_deferred(None));
TX_POOL_UNPACKED_GAUGE.update(self.total_unpacked());
TX_POOL_READY_GAUGE.update(self.total_ready_accounts());

Expand Down Expand Up @@ -751,9 +751,9 @@ impl TransactionPool {
inner.clear()
}

pub fn total_deferred(&self) -> usize {
pub fn total_deferred(&self, space: Option<Space>) -> usize {
let inner = self.inner.read();
inner.total_deferred()
inner.total_deferred(space)
}

pub fn total_ready_accounts(&self) -> usize {
Expand All @@ -776,7 +776,7 @@ impl TransactionPool {
let inner = self.inner.read();
(
inner.total_ready_accounts(),
inner.total_deferred(),
inner.total_deferred(None),
inner.total_received(),
inner.total_unpacked(),
)
Expand Down
105 changes: 81 additions & 24 deletions core/src/transaction_pool/transaction_pool_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use crate::{
use cfx_parameters::staking::DRIPS_PER_STORAGE_COLLATERAL_UNIT;
use cfx_statedb::Result as StateDbResult;
use cfx_types::{
address_util::AddressUtil, Address, AddressWithSpace, Space, H256, U128,
U256, U512,
address_util::AddressUtil, Address, AddressWithSpace, Space, SpaceMap,
H256, U128, U256, U512,
};
use heap_map::HeapMap;
use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
Expand Down Expand Up @@ -650,6 +650,49 @@ pub enum PendingReason {
OutdatedStatus,
}

#[derive(Default, DeriveMallocSizeOf)]
pub struct TransactionSet {
inner: HashMap<H256, Arc<SignedTransaction>>,
count: SpaceMap<usize>,
}

impl TransactionSet {
fn get(&self, tx_hash: &H256) -> Option<&Arc<SignedTransaction>> {
self.inner.get(tx_hash)
}

fn values(
&self,
) -> std::collections::hash_map::Values<'_, H256, Arc<SignedTransaction>>
{
self.inner.values()
}

fn insert(
&mut self, tx_hash: H256, tx: Arc<SignedTransaction>,
) -> Option<Arc<SignedTransaction>> {
*self.count.in_space_mut(tx.space()) += 1;
let res = self.inner.insert(tx_hash, tx);
if let Some(ref tx) = res {
*self.count.in_space_mut(tx.space()) -= 1;
}
res
}

fn remove(&mut self, tx_hash: &H256) -> Option<Arc<SignedTransaction>> {
let res = self.inner.remove(tx_hash);
if let Some(ref tx) = res {
*self.count.in_space_mut(tx.space()) -= 1;
}
res
}

fn clear(&mut self) {
self.inner.clear();
self.count.apply_all(|x| *x = 0);
}
}

#[derive(DeriveMallocSizeOf)]
pub struct TransactionPoolInner {
capacity: usize,
Expand All @@ -667,10 +710,10 @@ pub struct TransactionPoolInner {
/// (set_tx_packed), after epoch execution, or during transaction
/// insertion.
ready_nonces_and_balances: HashMap<AddressWithSpace, (U256, U256)>,
garbage_collector: GarbageCollector,
garbage_collector: SpaceMap<GarbageCollector>,
/// Keeps all transactions in the transaction pool.
/// It should contain the same transaction set as `deferred_pool`.
txs: HashMap<H256, Arc<SignedTransaction>>,
txs: TransactionSet,
tx_sponsored_gas_map: HashMap<H256, (U256, u64)>,
}

Expand All @@ -691,8 +734,8 @@ impl TransactionPoolInner {
total_gas_capacity,
),
ready_nonces_and_balances: HashMap::new(),
garbage_collector: GarbageCollector::default(),
txs: HashMap::new(),
garbage_collector: SpaceMap::default(),
txs: TransactionSet::default(),
tx_sponsored_gas_map: HashMap::new(),
}
}
Expand All @@ -701,14 +744,19 @@ impl TransactionPoolInner {
self.deferred_pool.clear();
self.ready_account_pool.clear();
self.ready_nonces_and_balances.clear();
self.garbage_collector.clear();
self.garbage_collector.apply_all(|x| x.clear());
self.txs.clear();
self.tx_sponsored_gas_map.clear();
self.total_received_count = 0;
self.unpacked_transaction_count = 0;
}

pub fn total_deferred(&self) -> usize { self.txs.len() }
pub fn total_deferred(&self, space: Option<Space>) -> usize {
match space {
Some(space) => *self.txs.count.in_space(space),
None => self.txs.count.map_sum(|x| *x),
}
}

pub fn ready_transacton_hashes_in_evm_pool(&self) -> BTreeSet<H256> {
self.ready_account_pool.get_transaction_hashes_in_evm_pool()
Expand Down Expand Up @@ -738,8 +786,8 @@ impl TransactionPoolInner {
bucket.get_tx_by_nonce(nonce).map(|tx| tx.transaction)
}

pub fn is_full(&self) -> bool {
return self.total_deferred() >= self.capacity;
pub fn is_full(&self, space: Space) -> bool {
return self.total_deferred(Some(space)) >= self.capacity;
}

pub fn get_current_timestamp(&self) -> u64 {
Expand All @@ -759,16 +807,19 @@ impl TransactionPoolInner {
/// garbage collectable. And if there is a tie, the one who has minimum
/// timestamp will be picked.
pub fn collect_garbage(&mut self, new_tx: &SignedTransaction) {
let count_before_gc = self.total_deferred();
let space = new_tx.space();
let count_before_gc = self.total_deferred(Some(space));
let mut skipped_self_node = None;
while self.is_full() && !self.garbage_collector.is_empty() {
while self.is_full(space)
&& !self.garbage_collector.in_space(space).is_empty()
{
let current_timestamp = self.get_current_timestamp();
let (victim_address, victim) =
self.garbage_collector.top().unwrap();
self.garbage_collector.in_space(space).top().unwrap();
// Accounts which are not in `deferred_pool` may be inserted
// into `garbage_collector`, we can just ignore them.
if !self.deferred_pool.contain_address(victim_address) {
self.garbage_collector.pop();
self.garbage_collector.in_space_mut(space).pop();
continue;
}

Expand All @@ -779,8 +830,11 @@ impl TransactionPoolInner {
if *victim_address == new_tx.sender() {
// We do not GC a not-executed transaction from the same
// sender, so save it and try another account.
let (victim_address, victim) =
self.garbage_collector.pop().unwrap();
let (victim_address, victim) = self
.garbage_collector
.in_space_mut(space)
.pop()
.unwrap();
skipped_self_node = Some((victim_address, victim));
continue;
} else if victim.has_ready_tx
Expand All @@ -800,7 +854,7 @@ impl TransactionPoolInner {

// victim is now chosen to be evicted.
let (victim_address, victim) =
self.garbage_collector.pop().unwrap();
self.garbage_collector.in_space_mut(space).pop().unwrap();

let (ready_nonce, _) = self
.get_local_nonce_and_balance(&victim_address)
Expand Down Expand Up @@ -863,7 +917,7 @@ impl TransactionPoolInner {
} else {
0
};
self.garbage_collector.insert(
self.garbage_collector.in_space_mut(space).insert(
&victim_address,
count,
current_timestamp,
Expand All @@ -880,22 +934,23 @@ impl TransactionPoolInner {
// Insert back skipped nodes to keep `garbage_collector`
// unchanged.
if let Some((addr, node)) = skipped_self_node {
self.garbage_collector.insert(
self.garbage_collector.in_space_mut(space).insert(
&addr,
node.count,
node.timestamp,
node.has_ready_tx,
node.first_tx_gas_price,
);
}
GC_METER.mark(count_before_gc - self.total_deferred());
GC_METER.mark(count_before_gc - self.total_deferred(Some(space)));
}

/// Collect garbage and return the remaining quota of the pool to insert new
/// transactions.
pub fn remaining_quota(&self) -> usize {
let len = self.total_deferred();
self.capacity - len + self.garbage_collector.gc_size()
let len = self.total_deferred(None);
self.garbage_collector.size() * self.capacity - len
+ self.garbage_collector.map_sum(|x| x.gc_size())
}

pub fn capacity(&self) -> usize { self.capacity }
Expand All @@ -915,7 +970,7 @@ impl TransactionPoolInner {
&transaction.nonce(),
) {
self.collect_garbage(transaction.as_ref());
if self.is_full() {
if self.is_full(transaction.space()) {
return InsertResult::Failed("Transaction Pool is full".into());
}
}
Expand Down Expand Up @@ -1136,6 +1191,7 @@ impl TransactionPoolInner {
fn recalculate_readiness(
&mut self, addr: &AddressWithSpace, nonce: U256, balance: U256,
) {
let space = addr.space;
let ret = self
.deferred_pool
.recalculate_readiness_with_local_info(addr, nonce, balance);
Expand All @@ -1145,9 +1201,10 @@ impl TransactionPoolInner {
let count = self.deferred_pool.count_less(addr, &nonce);
let timestamp = self
.garbage_collector
.in_space(space)
.get_timestamp(addr)
.unwrap_or(self.get_current_timestamp());
self.garbage_collector.insert(
self.garbage_collector.in_space_mut(space).insert(
addr,
count,
timestamp,
Expand Down
8 changes: 7 additions & 1 deletion util/malloc_size_of/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ static GLOBAL: Jemalloc = Jemalloc;

use cfg_if::cfg_if;
use cfx_types::{
AddressWithSpace, AllChainID, Space, H160, H256, H512, U256, U512,
AddressWithSpace, AllChainID, Space, SpaceMap, H160, H256, H512, U256, U512,
};
use hashbrown::HashMap as FastHashMap;
use parking_lot;
Expand Down Expand Up @@ -125,6 +125,12 @@ impl MallocSizeOf for String {
}
}

impl<T: MallocSizeOf> MallocSizeOf for SpaceMap<T> {
fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
self.native.size_of(ops) + self.evm.size_of(ops)
}
}

impl<T: ?Sized> MallocShallowSizeOf for Box<T> {
fn shallow_size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
unsafe { ops.malloc_size_of(&**self) }
Expand Down

0 comments on commit 622070c

Please sign in to comment.