diff --git a/tc-subxt/src/db.rs b/tc-subxt/src/db.rs index f0a4dfd6b..d12a46620 100644 --- a/tc-subxt/src/db.rs +++ b/tc-subxt/src/db.rs @@ -61,7 +61,6 @@ impl ITransactionDbOps for TransactionsDB { let write_tx = self.db.begin_write()?; let mut pending_txs = Vec::new(); - // Use a scope to contain the table borrow { let mut table = write_tx.open_table(TX_TABLE)?; let mut delete_keys = Vec::new(); diff --git a/tc-subxt/src/worker.rs b/tc-subxt/src/worker.rs index f3d4e0b5e..e3d38ea9f 100644 --- a/tc-subxt/src/worker.rs +++ b/tc-subxt/src/worker.rs @@ -123,12 +123,10 @@ where let txs_data = db.load_pending_txs(nonce)?; let pending_tx: VecDeque> = txs_data .into_iter() - .filter_map(|tx_data| { - (tx_data.nonce >= nonce).then(|| TxStatus { - data: tx_data, - event_sender: None, - best_block: None, - }) + .map(|tx_data| TxStatus { + data: tx_data, + event_sender: None, + best_block: None, }) .collect(); Ok(Self { @@ -409,7 +407,7 @@ where self.pending_tx = new_pending; }, Some(Err(e)) => { - tracing::error!("Error processing block: {:?}", e); + tracing::error!("Error processing finalized blocks: {:?}", e); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; continue; }, diff --git a/tc-subxt/tests/mock_client/mod.rs b/tc-subxt/tests/mock_client/mod.rs index 2c8aa4170..1c09bd270 100644 --- a/tc-subxt/tests/mock_client/mod.rs +++ b/tc-subxt/tests/mock_client/mod.rs @@ -2,6 +2,7 @@ use std::collections::{HashMap, VecDeque}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use std::time::Duration; use anyhow::Result; use futures::stream::{self, BoxStream}; @@ -13,7 +14,6 @@ use tc_subxt::timechain_client::{ use tc_subxt::worker::TxData; use tc_subxt::ExtrinsicParams; use tokio::sync::{broadcast, Mutex}; -use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use tokio_stream::wrappers::BroadcastStream; #[derive(Clone)] @@ -102,6 +102,7 @@ impl MockClient { }; self.push_best_block(&block).await; self.push_finalized_block(&block).await; + tokio::time::sleep(Duration::from_millis(100)).await; } tracing::info!("Done inserting blocks"); } @@ -117,14 +118,14 @@ pub struct MockTransaction { failing_hashes: Arc>>, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct MockBlock { pub number: u64, pub hash: H256, pub extrinsics: Vec, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct MockExtrinsic { pub hash: H256, pub is_success: bool, @@ -193,21 +194,8 @@ impl ITimechainClient for MockClient { *counter += 1; } let stream = BroadcastStream::new(rx) - .map(|res| match res { - Ok(block) => Ok(Some(block)), - Err(BroadcastStreamRecvError::Lagged(n)) => { - tracing::warn!("Skipped {} best blocks due to lag", n); - Ok(None) - }, - Err(e) => Err(anyhow::anyhow!(e)), - }) - .filter_map(|res| async { - match res { - Ok(Some(block)) => Some(Ok((block.clone(), block.extrinsics.clone()))), - Ok(None) => None, - Err(e) => Some(Err(e)), - } - }) + .map(|res| res.map_err(|e| anyhow::anyhow!(e))) + .map(|block_result| block_result.map(|block| (block.clone(), block.extrinsics.clone()))) .boxed(); Ok(stream) } diff --git a/tc-subxt/tests/test_cases.rs b/tc-subxt/tests/test_cases.rs index db635f6ff..3ee2c4582 100644 --- a/tc-subxt/tests/test_cases.rs +++ b/tc-subxt/tests/test_cases.rs @@ -73,23 +73,23 @@ async fn test_transaction_mortality_outage_flow() { #[tokio::test] #[ignore] // not working tbf -async fn test_transaction_mortality_outage_flow_50() { - let total_tasks: usize = 50; +async fn test_transaction_mortality_outage_flow_25() { + let total_tasks: usize = 25; let env = new_env().await; let mut receivers = VecDeque::new(); // init 100 transactions for _ in 0..total_tasks { let (tx, rx) = oneshot::channel(); + tokio::time::sleep(Duration::from_millis(100)).await; env.tx_sender.unbounded_send((Tx::Ready { shard_id: 0 }, tx)).unwrap(); receivers.push_back(rx); } + let hashes = wait_for_submission(&env.client, total_tasks).await; assert!(hashes.len() == total_tasks); - env.client.inc_empty_blocks(MORTALITY / 2).await; - tokio::time::sleep(Duration::from_secs(1)).await; - env.client.inc_empty_blocks(MORTALITY / 2).await; - tokio::time::sleep(Duration::from_secs(1)).await; - env.client.inc_empty_blocks(1).await; + tokio::time::sleep(Duration::from_secs(5)).await; + env.client.inc_empty_blocks(MORTALITY + 1).await; + let hashes = wait_for_submission(&env.client, total_tasks + total_tasks).await; assert_eq!(hashes[0], hashes[total_tasks + 1]); env.client.inc_block_with_tx(hashes[0], true).await;