Skip to content

Commit

Permalink
some updates
Browse files Browse the repository at this point in the history
  • Loading branch information
haider-rs committed Feb 12, 2025
1 parent 2adddda commit fe88918
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 33 deletions.
1 change: 0 additions & 1 deletion tc-subxt/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ impl ITransactionDbOps for TransactionsDB {
let write_tx = self.db.begin_write()?;
let mut pending_txs = Vec::new();

// Use a scope to contain the table borrow
{
let mut table = write_tx.open_table(TX_TABLE)?;
let mut delete_keys = Vec::new();
Expand Down
12 changes: 5 additions & 7 deletions tc-subxt/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,10 @@ where
let txs_data = db.load_pending_txs(nonce)?;
let pending_tx: VecDeque<TxStatus<C>> = txs_data
.into_iter()
.filter_map(|tx_data| {
(tx_data.nonce >= nonce).then(|| TxStatus {
data: tx_data,
event_sender: None,
best_block: None,
})
.map(|tx_data| TxStatus {
data: tx_data,
event_sender: None,
best_block: None,
})
.collect();
Ok(Self {
Expand Down Expand Up @@ -409,7 +407,7 @@ where
self.pending_tx = new_pending;
},
Some(Err(e)) => {
tracing::error!("Error processing block: {:?}", e);
tracing::error!("Error processing finalized blocks: {:?}", e);
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
continue;
},
Expand Down
24 changes: 6 additions & 18 deletions tc-subxt/tests/mock_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::{HashMap, VecDeque};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::Result;
use futures::stream::{self, BoxStream};
Expand All @@ -13,7 +14,6 @@ use tc_subxt::timechain_client::{
use tc_subxt::worker::TxData;
use tc_subxt::ExtrinsicParams;
use tokio::sync::{broadcast, Mutex};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tokio_stream::wrappers::BroadcastStream;

#[derive(Clone)]
Expand Down Expand Up @@ -102,6 +102,7 @@ impl MockClient {
};
self.push_best_block(&block).await;
self.push_finalized_block(&block).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
tracing::info!("Done inserting blocks");
}
Expand All @@ -117,14 +118,14 @@ pub struct MockTransaction {
failing_hashes: Arc<Mutex<Vec<H256>>>,
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct MockBlock {
pub number: u64,
pub hash: H256,
pub extrinsics: Vec<MockExtrinsic>,
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct MockExtrinsic {
pub hash: H256,
pub is_success: bool,
Expand Down Expand Up @@ -193,21 +194,8 @@ impl ITimechainClient for MockClient {
*counter += 1;
}
let stream = BroadcastStream::new(rx)
.map(|res| match res {
Ok(block) => Ok(Some(block)),
Err(BroadcastStreamRecvError::Lagged(n)) => {
tracing::warn!("Skipped {} best blocks due to lag", n);
Ok(None)
},
Err(e) => Err(anyhow::anyhow!(e)),
})
.filter_map(|res| async {
match res {
Ok(Some(block)) => Some(Ok((block.clone(), block.extrinsics.clone()))),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
})
.map(|res| res.map_err(|e| anyhow::anyhow!(e)))
.map(|block_result| block_result.map(|block| (block.clone(), block.extrinsics.clone())))
.boxed();
Ok(stream)
}
Expand Down
14 changes: 7 additions & 7 deletions tc-subxt/tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,23 @@ async fn test_transaction_mortality_outage_flow() {
#[tokio::test]
#[ignore]
// not working tbf
async fn test_transaction_mortality_outage_flow_50() {
let total_tasks: usize = 50;
async fn test_transaction_mortality_outage_flow_25() {
let total_tasks: usize = 25;
let env = new_env().await;
let mut receivers = VecDeque::new();
// init 100 transactions
for _ in 0..total_tasks {
let (tx, rx) = oneshot::channel();
tokio::time::sleep(Duration::from_millis(100)).await;
env.tx_sender.unbounded_send((Tx::Ready { shard_id: 0 }, tx)).unwrap();
receivers.push_back(rx);
}

let hashes = wait_for_submission(&env.client, total_tasks).await;
assert!(hashes.len() == total_tasks);
env.client.inc_empty_blocks(MORTALITY / 2).await;
tokio::time::sleep(Duration::from_secs(1)).await;
env.client.inc_empty_blocks(MORTALITY / 2).await;
tokio::time::sleep(Duration::from_secs(1)).await;
env.client.inc_empty_blocks(1).await;
tokio::time::sleep(Duration::from_secs(5)).await;
env.client.inc_empty_blocks(MORTALITY + 1).await;

let hashes = wait_for_submission(&env.client, total_tasks + total_tasks).await;
assert_eq!(hashes[0], hashes[total_tasks + 1]);
env.client.inc_block_with_tx(hashes[0], true).await;
Expand Down

0 comments on commit fe88918

Please sign in to comment.