Skip to content

Commit

Permalink
fix(mempool): fix queue rebalance
Browse files Browse the repository at this point in the history
  • Loading branch information
00nktk committed Nov 19, 2024
1 parent 508ba0d commit 274fedf
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 73 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions common/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod tx_envelope_ext {
fn gas_price(&self) -> Result<Option<u128>, anyhow::Error>;
fn value(&self) -> Result<alloy_primitives::U256, anyhow::Error>;
fn signature(&self) -> Result<&alloy_primitives::Signature, anyhow::Error>;
fn nonce(&self) -> Result<u64, anyhow::Error>;
}

impl TxEnvelopeExt for TxEnvelope {
Expand Down Expand Up @@ -93,6 +94,16 @@ mod tx_envelope_ext {
_ => bail!("unsupported tx type"),
}
}

fn nonce(&self) -> Result<u64, anyhow::Error> {
match self {
TxEnvelope::Legacy(signed) => Ok(signed.tx().nonce),
TxEnvelope::Eip1559(signed) => Ok(signed.tx().nonce),
TxEnvelope::Eip2930(signed) => Ok(signed.tx().nonce),
TxEnvelope::Eip4844(signed) => Ok(signed.tx().nonce()),
_ => bail!("unsupported tx type"),
}
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions mempool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ thiserror.workspace = true
futures-util.workspace = true
rust_decimal = "1.36.0"
lru = "0.12.5"

[dev-dependencies]
tracing-subscriber.workspace = true
111 changes: 96 additions & 15 deletions mempool/src/pools/chain_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,9 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
}

async fn add_tx(&mut self, mut tx: TxRecord) -> Result<(), MempoolError> {
tracing::debug!(tx_hash = %tx.tx_hash(), "schedule tx command");
let chain_id = tx.chain_id();
let sender = tx.sender;
tracing::debug!(chain_id, %sender, tx_hash = %tx.tx_hash(), "schedule tx command");

if self
.tx_count_cache
Expand Down Expand Up @@ -339,6 +339,7 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
if let SenderPoolState::Processing(nonce) = sender_pool_state {
if nonce == tx.nonce {
tracing::debug!(
%sender,
tx_hash = %tx.tx_hash(),
nonce,
"tx with the same nonce is already processing"
Expand All @@ -349,6 +350,7 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {

if tx_count > tx.nonce {
tracing::debug!(
%sender,
tx_hash = %tx.tx_hash(),
tx_nonce = tx.nonce,
tx_count,
Expand Down Expand Up @@ -408,7 +410,7 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {

self.purge_over_capacity_txs();

tracing::debug!(%tx_hash, "adding tx to pool");
tracing::debug!(chain_id, %sender, %tx_hash, "adding tx to pool");
let record = QueueRecord {
sender,
tx_hash,
Expand All @@ -418,7 +420,7 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
self.txs.insert(tx_hash, tx);
self.add_record(record).await;
self.queue_new_tx(&sender, !created).await?;
tracing::debug!(%tx_hash, "tx added to pool");
tracing::debug!(chain_id, %sender, %tx_hash, "tx added to pool");
Ok(())
}

Expand Down Expand Up @@ -462,10 +464,11 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
return Err(MempoolError::UnknownSender(*sender));
};

sender_pool.log_self("get_for_queueing");
if let Some(tx) = sender_pool.get_for_queueing() {
let gas_price = tx.sorting_gas_price;
tracing::debug!(%sender, tx_count = %sender_pool.tx_count, record = ?tx, "tx queued");
self.tx_price_queue.push(tx, gas_price);
tracing::debug!(%sender, tx_count = %sender_pool.tx_count, "tx queued");
} else if !was_suspended && sender_pool.is_suspended() {
self.heartbeat_queue.insert(
HeartBeatTask {
Expand Down Expand Up @@ -627,7 +630,7 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
} else {
// Tx count is updated when state transitions into Proccesing, there is no point in
// updating it again.
self.queue_new_tx(&record.sender, !execution_result.success)
self.queue_new_tx(&record.sender, true) // !execution_result.success)
.await?;
}

Expand Down Expand Up @@ -755,7 +758,7 @@ impl<E: Execute, G: GasPricesTrait, C: GetTxCountTrait> ChainPool<E, G, C> {
let mut sender_pool = SenderPool::new(self.chain_id, *sender);
// it's an empty sender pool, we don't care about queues update
if let Err(err) = sender_pool.update_tx_count(&self.tx_count_api).await {
tracing::error!(?err, "failed to update tx count");
tracing::error!(%sender, ?err, "failed to update tx count");
}
self.sender_pools.insert(*sender, sender_pool);
created = true;
Expand All @@ -775,18 +778,30 @@ where

#[cfg(test)]
mod tests {
use super::*;
use std::io::IsTerminal;
use std::sync::Mutex;
use std::sync::Once;

use alloy_consensus::{SignableTransaction, TxLegacy};
use alloy_network::TxSignerSync;
use alloy_signer_wallet::LocalWallet;
use reth_primitives::{TxKind, U256};
use tokio::sync::mpsc::channel;
use tokio::time::sleep;
use tracing_subscriber::filter::EnvFilter;

use common::solana_sdk::signature::Keypair;
use common::solana_sdk::signature::Signature;
use common::types::TxEnvelopeExt;
use executor::{ExecuteRequest, ExecuteResult};

struct MockExecutor;
use super::*;

#[derive(Default)]
struct MockExecutor {
txs: Arc<Mutex<Vec<ExecuteRequest>>>,
nonces: Arc<DashMap<Address, TxNonce>>,
}

impl Execute for MockExecutor {
async fn handle_transaction(
Expand All @@ -795,6 +810,13 @@ mod tests {
result_sender: Option<oneshot::Sender<ExecuteResult>>,
) -> anyhow::Result<Signature> {
tracing::info!(?tx_request, "mock executor: handling tx");
if let Some(old) = self.nonces.insert(
tx_request.recover_signer().unwrap(),
tx_request.tx().nonce().unwrap() + 1,
) {
assert!(old <= tx_request.tx().nonce().unwrap());
}
self.txs.lock().unwrap().push(tx_request);
if let Some(sender) = result_sender {
let _ = sender.send(ExecuteResult::Success);
}
Expand All @@ -816,32 +838,44 @@ mod tests {
}

#[derive(Clone)]
struct MockGetTxCount;
struct MockGetTxCount(Arc<DashMap<Address, TxNonce>>);

impl GetTxCountTrait for MockGetTxCount {
async fn get_transaction_count(
&self,
_addr: BalanceAddress,
addr: BalanceAddress,
_tag: Option<BlockNumberOrTag>,
) -> Result<u64, NeonApiError> {
Ok(0)
Ok(self
.0
.get(&addr.address.0)
.map(|ref_| *ref_.value())
.unwrap_or(0))
}
}

fn create_chain_pool() -> ChainPool<MockExecutor, MockGasPrices, MockGetTxCount> {
static LOGS: Once = Once::new();
LOGS.call_once(|| {
tracing_subscriber::fmt::fmt()
.with_env_filter(EnvFilter::builder().from_env_lossy())
.with_ansi(std::io::stdout().is_terminal())
.init()
});
let config = Config {
chain_id: 1,
capacity: 10,
capacity: 1000,
capacity_high_watermark: 0.8,
eviction_timeout_sec: 60,
tx_cache_size: 0,
tx_count_cache_size: 0,
};
let executor = MockExecutor::default();
ChainPool::new(
config,
MockGasPrices,
MockGetTxCount,
Arc::new(MockExecutor),
MockGetTxCount(executor.nonces.clone()),
executor.into(),
Arc::new(DashMap::new()),
)
}
Expand All @@ -863,14 +897,29 @@ mod tests {
ExecuteRequest::new(tx.into(), 1)
}

fn create_req_with_addr(wallet: &LocalWallet, nonce: TxNonce) -> ExecuteRequest {
let mut tx = TxLegacy {
nonce,
gas_price: 2,
gas_limit: 2_000_000,
to: TxKind::Create,
value: U256::ZERO,
input: Default::default(),
chain_id: Some(1),
};
let signature = wallet.sign_transaction_sync(&mut tx).unwrap();
let tx = tx.into_signed(signature);
ExecuteRequest::new(tx.into(), 1)
}

fn create_tx_record(
gas_price: Option<GasPrice>,
sorting_gas_price: GasPrice,
nonce: TxNonce,
sender: Address,
) -> TxRecord {
TxRecord {
tx_request: create_exec_req(0),
tx_request: create_exec_req(nonce),
tx_chain_id: Some(1),
sender,
nonce,
Expand Down Expand Up @@ -981,4 +1030,36 @@ mod tests {
let result = chain_pool.add_tx(tx1.clone()).await;
assert!(matches!(result, Err(MempoolError::NonceTooHigh)));
}

#[tokio::test]
async fn test_random_sequence_from_basic() {
let sender = LocalWallet::random();
let mut chain_pool = create_chain_pool();
let chain = [
22, 2, 5, 24, 19, 10, 6, 17, 4, 16, 11, 14, 12, 15, 0, 18, 1, 21, 8, 23, 20, 9, 3, 13,
7,
];
let chain_len = chain.len();
for nonce in chain {
let tx = create_req_with_addr(&sender, nonce);
chain_pool.add_tx(tx.try_into().unwrap()).await.unwrap();
}

let txs = chain_pool.executor.txs.clone();
let (_tx, rx) = channel(1);
tokio::spawn(chain_pool.start(rx));
sleep(Duration::from_millis(
EXEC_INTERVAL_MS * (chain_len as u64 + 5),
))
.await;

let done = txs
.lock()
.unwrap()
.iter()
.map(|tx| tx.tx().nonce().unwrap())
.collect::<Vec<_>>();
let expected = (0..25).collect::<Vec<_>>();
assert_eq!(done, expected);
}
}
Loading

0 comments on commit 274fedf

Please sign in to comment.