diff --git a/.gitignore b/.gitignore index a125bc365..badefcbf3 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,6 @@ runtime/target/srtool # secrets .env gcp-key.json + +#cached db +tc-subxt/cached_tx.redb diff --git a/Cargo.lock b/Cargo.lock index 175f09101..40a2976a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20038,9 +20038,11 @@ dependencies = [ "async-stream", "async-trait", "clap", + "env_logger 0.11.6", "futures", "hex", "parity-scale-codec", + "redb", "scale-decode 0.13.1", "scale-info", "serde", @@ -20049,6 +20051,7 @@ dependencies = [ "tc-subxt-metadata", "time-primitives", "tokio", + "tokio-stream", "tracing", ] diff --git a/chronicle/src/main.rs b/chronicle/src/main.rs index f1dcd694c..a7af175bb 100644 --- a/chronicle/src/main.rs +++ b/chronicle/src/main.rs @@ -45,6 +45,9 @@ pub struct ChronicleArgs { /// Gmp backend to use. #[clap(long, default_value = "evm")] pub backend: Backend, + /// Chronicle db path. + #[clap(long, default_value = "cached_tx.redb")] + pub tx_db: String, /// Cctp Sender. #[clap(long)] pub cctp_sender: Option, @@ -130,7 +133,8 @@ async fn main() -> Result<()> { } } - let subxt = SubxtClient::with_key(&args.timechain_url, &timechain_mnemonic).await?; + let subxt = + SubxtClient::with_key(&args.timechain_url, &timechain_mnemonic, &args.tx_db).await?; let config = args.config(network_key, target_mnemonic)?; diff --git a/primitives/src/shard.rs b/primitives/src/shard.rs index 8bef59707..654a6a300 100644 --- a/primitives/src/shard.rs +++ b/primitives/src/shard.rs @@ -23,6 +23,7 @@ pub type TssHash = [u8; 32]; pub type PeerId = [u8; 32]; pub type ShardId = u64; pub type ProofOfKnowledge = [u8; 65]; + #[derive(Encode, Decode, TypeInfo, PartialEq, Eq, Clone, Debug)] pub struct Commitment(pub BoundedVec>); diff --git a/tc-cli/src/lib.rs b/tc-cli/src/lib.rs index cf13a8986..5cb08c23a 100644 --- a/tc-cli/src/lib.rs +++ b/tc-cli/src/lib.rs @@ -59,9 +59,10 @@ impl Tc { tracing::info!("waiting for chain to start: {err:?}"); sleep_or_abort(Duration::from_secs(10)).await?; } - let runtime = SubxtClient::with_key(&timechain_url, &env.timechain_mnemonic) - .await - .context("failed to connect to timechain")?; + let runtime = + SubxtClient::with_key(&timechain_url, &env.timechain_mnemonic, "cached_tx.redb") + .await + .context("failed to connect to timechain")?; Ok::<_, anyhow::Error>(runtime) }); let mut connectors = HashMap::new(); diff --git a/tc-subxt/Cargo.toml b/tc-subxt/Cargo.toml index 131a76f5b..18ad97b43 100644 --- a/tc-subxt/Cargo.toml +++ b/tc-subxt/Cargo.toml @@ -15,8 +15,9 @@ async-trait.workspace = true clap.workspace = true futures.workspace = true hex.workspace = true +redb = "2.1.2" serde = { workspace = true, features = ["derive"] } -tokio.workspace = true +tokio = { workspace = true, features = ["full"] } tracing.workspace = true scale-codec.workspace = true @@ -29,6 +30,11 @@ subxt-signer.workspace = true time-primitives = { workspace = true, default-features = true } tc-subxt-metadata = { path = "metadata" } +[dev-dependencies] +env_logger.workspace = true +tokio-stream = { version = "0.1", features = ["sync"]} + + [features] testnet = [ "time-primitives/testnet", diff --git a/tc-subxt/src/api/members.rs b/tc-subxt/src/api/members.rs index 0e4a8895c..c5d5deaf7 100644 --- a/tc-subxt/src/api/members.rs +++ b/tc-subxt/src/api/members.rs @@ -84,7 +84,7 @@ impl SubxtClient { tx, ))?; let tx = rx.await?; - self.wait_for_success(tx).await?; + self.is_success(&tx).await?; Ok(()) } @@ -92,7 +92,7 @@ impl SubxtClient { let (tx, rx) = oneshot::channel(); self.tx.unbounded_send((Tx::UnregisterMember { member }, tx))?; let tx = rx.await?; - self.wait_for_success(tx).await?; + self.is_success(&tx).await?; Ok(()) } @@ -100,7 +100,7 @@ impl SubxtClient { let (tx, rx) = oneshot::channel(); self.tx.unbounded_send((Tx::Heartbeat, tx))?; let tx = rx.await?; - self.wait_for_success(tx).await?; + self.is_success(&tx).await?; Ok(()) } } diff --git a/tc-subxt/src/api/networks.rs b/tc-subxt/src/api/networks.rs index 443de081a..d953416e8 100644 --- a/tc-subxt/src/api/networks.rs +++ b/tc-subxt/src/api/networks.rs @@ -9,7 +9,7 @@ impl SubxtClient { let (tx, rx) = oneshot::channel(); self.tx.unbounded_send((Tx::RegisterNetwork { network }, tx))?; let tx = rx.await?; - self.wait_for_success(tx).await?; + self.is_success(&tx).await?; Ok(()) } @@ -21,7 +21,7 @@ impl SubxtClient { let (tx, rx) = oneshot::channel(); self.tx.unbounded_send((Tx::SetNetworkConfig { network, config }, tx))?; let tx = rx.await?; - self.wait_for_success(tx).await?; + self.is_success(&tx).await?; Ok(()) } diff --git a/tc-subxt/src/api/shards.rs b/tc-subxt/src/api/shards.rs index dad965b07..b846dedd7 100644 --- a/tc-subxt/src/api/shards.rs +++ b/tc-subxt/src/api/shards.rs @@ -84,7 +84,7 @@ impl SubxtClient { tx, ))?; let tx = rx.await?; - self.wait_for_success(tx).await?; + self.is_success(&tx).await?; Ok(()) } @@ -92,7 +92,7 @@ impl SubxtClient { let (tx, rx) = oneshot::channel(); self.tx.unbounded_send((Tx::Ready { shard_id }, tx))?; let tx = rx.await?; - self.wait_for_success(tx).await?; + self.is_success(&tx).await?; Ok(()) } @@ -100,7 +100,7 @@ impl SubxtClient { let (tx, rx) = oneshot::channel(); self.tx.unbounded_send((Tx::ForceShardOffline { shard_id }, tx))?; let tx = rx.await?; - self.wait_for_success(tx).await?; + self.is_success(&tx).await?; Ok(()) } } diff --git a/tc-subxt/src/api/tasks.rs b/tc-subxt/src/api/tasks.rs index 474f6fdd8..8bface8d3 100644 --- a/tc-subxt/src/api/tasks.rs +++ b/tc-subxt/src/api/tasks.rs @@ -60,7 +60,7 @@ impl SubxtClient { let (tx, rx) = oneshot::channel(); self.tx.unbounded_send((Tx::SubmitTaskResult { task_id, result }, tx))?; let tx = rx.await?; - self.wait_for_success(tx).await?; + self.is_success(&tx).await?; Ok(()) } @@ -68,7 +68,7 @@ impl SubxtClient { let (tx, rx) = oneshot::channel(); self.tx.unbounded_send((Tx::SubmitGmpEvents { network, gmp_events }, tx))?; let tx = rx.await?; - self.wait_for_success(tx).await?; + self.is_success(&tx).await?; Ok(()) } @@ -76,7 +76,7 @@ impl SubxtClient { let (tx, rx) = oneshot::channel(); self.tx.unbounded_send((Tx::RemoveTask { task_id }, tx))?; let tx = rx.await?; - self.wait_for_success(tx).await?; + self.is_success(&tx).await?; Ok(()) } diff --git a/tc-subxt/src/db.rs b/tc-subxt/src/db.rs new file mode 100644 index 000000000..d12a46620 --- /dev/null +++ b/tc-subxt/src/db.rs @@ -0,0 +1,94 @@ +use anyhow::Result; +use redb::{Database, ReadableTable, TableDefinition}; +use scale_codec::{Decode, Encode}; +use std::collections::VecDeque; +use subxt::utils::H256; + +use crate::{timechain_client::ITransactionDbOps, worker::TxData}; + +const TX_TABLE: TableDefinition<[u8; 64], &[u8]> = TableDefinition::new("pending_txs"); + +pub struct TransactionsDB { + db: Database, + public_key: [u8; 32], +} + +impl TransactionsDB { + pub fn new(path: &str, public_key: [u8; 32]) -> Result { + let db = Database::create(path)?; + + let write_tx = db.begin_write()?; + { + write_tx.open_table(TX_TABLE)?; + } + write_tx.commit()?; + Ok(Self { db, public_key }) + } +} + +impl ITransactionDbOps for TransactionsDB { + fn store_tx(&self, tx_data: &TxData) -> Result<()> { + let mut composite_key = [0u8; 64]; + composite_key[..32].copy_from_slice(&self.public_key); + composite_key[32..].copy_from_slice(tx_data.hash.as_bytes()); + + let tx_value = tx_data.encode(); + + let write_tx = self.db.begin_write()?; + { + let mut table = write_tx.open_table(TX_TABLE)?; + table.insert(&composite_key, &*tx_value)?; + } + write_tx.commit()?; + Ok(()) + } + + fn remove_tx(&self, hash: H256) -> Result<()> { + let mut composite_key = [0u8; 64]; + composite_key[..32].copy_from_slice(&self.public_key); + composite_key[32..].copy_from_slice(hash.as_bytes()); + + let write_tx = self.db.begin_write()?; + { + let mut table = write_tx.open_table(TX_TABLE)?; + table.remove(&composite_key)?; + } + write_tx.commit()?; + Ok(()) + } + + fn load_pending_txs(&self, nonce: u64) -> Result> { + let write_tx = self.db.begin_write()?; + let mut pending_txs = Vec::new(); + + { + let mut table = write_tx.open_table(TX_TABLE)?; + let mut delete_keys = Vec::new(); + + let mut lower_bound = [0u8; 64]; + lower_bound[..32].copy_from_slice(&self.public_key); + let mut upper_bound = [0xffu8; 64]; + upper_bound[..32].copy_from_slice(&self.public_key); + + for entry in table.range(lower_bound..=upper_bound)? { + let (key, value) = entry?; + let tx_data = TxData::decode(&mut value.value())?; + if tx_data.nonce < nonce { + let key_bytes: [u8; 64] = key.value(); + delete_keys.push(key_bytes); + } else { + pending_txs.push(tx_data); + } + } + + for key_bytes in delete_keys { + table.remove(&key_bytes)?; + } + } + + write_tx.commit()?; + + pending_txs.sort_by_key(|tx| tx.nonce); + Ok(pending_txs.into()) + } +} diff --git a/tc-subxt/src/lib.rs b/tc-subxt/src/lib.rs index 9e6dbb260..7e45285a3 100644 --- a/tc-subxt/src/lib.rs +++ b/tc-subxt/src/lib.rs @@ -1,6 +1,7 @@ #![allow(clippy::missing_transmute_annotations)] use crate::worker::{SubxtWorker, Tx}; use anyhow::{Context, Result}; +use db::TransactionsDB; use futures::channel::{mpsc, oneshot}; use futures::stream::BoxStream; use std::future::Future; @@ -11,12 +12,15 @@ use subxt::backend::rpc::RpcClient; use subxt::config::DefaultExtrinsicParams; use subxt::PolkadotConfig; use subxt_signer::SecretUri; +use timechain_client::{IExtrinsic, TimechainExtrinsic, TimechainOnlineClient}; use time_primitives::{AccountId, BlockHash, BlockNumber, PublicKey, H256}; mod api; -mod metadata; -mod worker; +pub mod db; +pub mod metadata; +pub mod timechain_client; +pub mod worker; use metadata::technical_committee::events as CommitteeEvent; @@ -29,24 +33,26 @@ pub type ExtrinsicDetails = subxt::blocks::ExtrinsicDetails; pub type ExtrinsicParams = as subxt::config::ExtrinsicParams>::Params; -pub type TxInBlock = subxt::tx::TxInBlock; -pub type TxProgress = subxt::tx::TxProgress; -#[derive(Clone)] pub struct SubxtClient { client: OnlineClient, - tx: mpsc::UnboundedSender<(Tx, oneshot::Sender)>, + tx: mpsc::UnboundedSender<(Tx, oneshot::Sender)>, public_key: PublicKey, account_id: AccountId, } impl SubxtClient { - pub async fn new(url: &str, keypair: Keypair) -> Result { + pub async fn new(url: &str, keypair: Keypair, tx_db: &str) -> Result { let rpc = Self::get_client(url).await?; let client = OnlineClient::from_rpc_client(rpc.clone()) .await .map_err(|_| anyhow::anyhow!("Failed to create a new client"))?; - let worker = SubxtWorker::new(rpc, client.clone(), keypair).await?; + let account_id: subxt::utils::AccountId32 = keypair.public_key().into(); + let legacy_rpc = LegacyRpcMethods::new(rpc.clone()); + let nonce = legacy_rpc.system_account_next_index(&account_id).await?; + let timechain_client = TimechainOnlineClient::new(client.clone(), keypair.clone()); + let db = TransactionsDB::new(tx_db, keypair.public_key().0)?; + let worker = SubxtWorker::new(nonce, timechain_client, db, keypair).await?; let public_key = worker.public_key(); let account_id = worker.account_id(); tracing::info!("account id {}", account_id); @@ -59,11 +65,11 @@ impl SubxtClient { }) } - pub async fn with_key(url: &str, mnemonic: &str) -> Result { + pub async fn with_key(url: &str, mnemonic: &str, tx_db: &str) -> Result { let secret = SecretUri::from_str(mnemonic.trim()).context("failed to parse substrate keyfile")?; let keypair = Keypair::from_uri(&secret).context("substrate keyfile contains uri")?; - Self::new(url, keypair).await + Self::new(url, keypair, tx_db).await } pub async fn get_client(url: &str) -> Result { @@ -104,7 +110,7 @@ impl SubxtClient { let (tx, rx) = oneshot::channel(); self.tx.unbounded_send((Tx::SetCode { code }, tx))?; let tx = rx.await?; - self.wait_for_success(tx).await?; + self.is_success(&tx).await?; Ok(()) } @@ -112,7 +118,7 @@ impl SubxtClient { let (tx, rx) = oneshot::channel(); self.tx.unbounded_send((Tx::Transfer { account, balance }, tx))?; let tx = rx.await?; - self.wait_for_success(tx).await?; + self.is_success(&tx).await?; Ok(()) } @@ -123,43 +129,9 @@ impl SubxtClient { Ok(if let Some(info) = result { info.data.free } else { 0 }) } - pub async fn wait_for_success(&self, extrinsic: ExtrinsicDetails) -> Result { - type SpRuntimeDispatchError = metadata::runtime_types::sp_runtime::DispatchError; - let events = extrinsic.events().await?; - - for ev in events.iter() { - let ev = ev?; - - if ev.pallet_name() == "System" && ev.variant_name() == "ExtrinsicFailed" { - let dispatch_error = subxt::error::DispatchError::decode_from( - ev.field_bytes(), - self.client.metadata(), - )?; - return Err(dispatch_error.into()); - } - - if let Some(event) = ev.as_event::()? { - if let Err(err) = event.result { - let SpRuntimeDispatchError::Module(error) = err else { - anyhow::bail!("Tx failed with error: {:?}", err); - }; - - let metadata = self.client.metadata(); - let error_pallet = metadata - .pallet_by_index(error.index) - .ok_or_else(|| anyhow::anyhow!("Pallet not found: {:?}", error.index))?; - - let Some(error_metadata) = error_pallet.error_variant_by_index(error.error[0]) - else { - anyhow::bail!("Tx failed with error: {:?}", error); - }; - - anyhow::bail!("Tx failed with error: {:?}", error_metadata.name); - } - } - } - - Ok(events) + pub async fn is_success(&self, extrinsic: &E) -> Result<()> { + extrinsic.is_success().await?; + Ok(()) } } diff --git a/tc-subxt/src/timechain_client.rs b/tc-subxt/src/timechain_client.rs new file mode 100644 index 000000000..8acc5807a --- /dev/null +++ b/tc-subxt/src/timechain_client.rs @@ -0,0 +1,250 @@ +use std::collections::VecDeque; + +use anyhow::Result; +use futures::{stream::BoxStream, StreamExt, TryStreamExt}; +use subxt::utils::H256; +use subxt::{client::Update, tx::Payload}; +pub use subxt_signer::sr25519::Keypair; + +use crate::worker::TxData; +use crate::{metadata, CommitteeEvent, ExtrinsicParams, OnlineClient, SubmittableExtrinsic}; + +#[derive(Clone, Copy, Debug, Default)] +pub struct BlockId { + pub number: u64, + pub hash: H256, +} + +#[async_trait::async_trait] +pub trait ITimechainClient { + type Submitter: ITransactionSubmitter + Send + Sync; + type Block: IBlock + Send + Sync; + type Update: Send + Sync; + async fn get_latest_block(&self) -> Result; + fn sign_payload(&self, call: &Call, params: ExtrinsicParams) -> Vec + where + Call: Payload + Send + Sync; + fn submittable_transaction(&self, tx: Vec) -> Self::Submitter; + async fn finalized_block_stream( + &self, + ) -> Result::Extrinsic>)>>>; + async fn best_block_stream( + &self, + ) -> Result::Extrinsic>)>>>; + async fn runtime_updates(&self) -> Result>>; + fn apply_update(&self, update: Self::Update) -> Result<()>; +} + +#[async_trait::async_trait] +pub trait ITransactionSubmitter: Send + Sync { + fn hash(&self) -> H256; + async fn submit(&self) -> Result; +} + +#[async_trait::async_trait] +pub trait IBlock: Send + Sync { + type Extrinsic: IExtrinsic + Send + Sync; + async fn extrinsics(&self) -> Result>; + fn number(&self) -> u64; + fn hash(&self) -> H256; +} + +#[async_trait::async_trait] +pub trait IExtrinsic: Send + Sync { + type Events: Send + Sync; + async fn events(&self) -> Result; + fn hash(&self) -> H256; + async fn is_success(&self) -> Result<()>; +} + +pub trait ITransactionDbOps: Send + Sync { + fn store_tx(&self, tx_data: &TxData) -> Result<()>; + fn remove_tx(&self, hash: H256) -> Result<()>; + fn load_pending_txs(&self, nonce: u64) -> Result>; +} + +#[derive(Clone)] +pub struct TimechainOnlineClient { + client: OnlineClient, + keypair: Keypair, +} + +impl TimechainOnlineClient { + pub fn new(client: OnlineClient, keypair: Keypair) -> Self { + Self { client, keypair } + } +} +pub struct SignedTransaction { + tx: SubmittableExtrinsic, +} + +pub struct TimechainBlock { + pub block: crate::Block, +} + +pub struct TimechainExtrinsic { + pub extrinsic: crate::ExtrinsicDetails, +} + +pub struct TimechainEvents { + pub events: crate::ExtrinsicEvents, +} + +#[async_trait::async_trait] +impl ITimechainClient for TimechainOnlineClient { + type Submitter = SignedTransaction; + type Block = TimechainBlock; + type Update = Update; + + async fn get_latest_block(&self) -> Result { + let block = self.client.blocks().at_latest().await?; + Ok(BlockId { + number: block.number().into(), + hash: block.hash(), + }) + } + + fn sign_payload(&self, call: &Call, params: ExtrinsicParams) -> Vec + where + Call: Payload + Send + Sync, + { + self.client + .tx() + .create_signed_offline(call, &self.keypair, params) + .expect("Metadata is invalid") + .into_encoded() + } + + fn submittable_transaction(&self, tx: Vec) -> Self::Submitter { + let tx = SubmittableExtrinsic::from_bytes(self.client.clone(), tx); + SignedTransaction { tx } + } + + async fn finalized_block_stream( + &self, + ) -> Result::Extrinsic>)>>> + { + let finalized_stream = self.client.blocks().subscribe_finalized().await?; + let stream_with_txs = finalized_stream.map(|res| res.map_err(anyhow::Error::new)).and_then( + |block| async move { + let block = TimechainBlock { block }; + let extrinsics = IBlock::extrinsics(&block).await?; + Ok((block, extrinsics)) + }, + ); + Ok(stream_with_txs.boxed()) + } + async fn best_block_stream( + &self, + ) -> Result::Extrinsic>)>>> + { + let best_stream = self.client.blocks().subscribe_best().await?; + let stream_with_txs = + best_stream + .map(|res| res.map_err(anyhow::Error::new)) + .and_then(|block| async move { + let block = TimechainBlock { block }; + let extrinsics = IBlock::extrinsics(&block).await?; + Ok((block, extrinsics)) + }); + Ok(stream_with_txs.boxed()) + } + async fn runtime_updates(&self) -> Result>> { + let updater = self.client.updater(); + let stream = updater.runtime_updates().await?; + let stream = futures::stream::try_unfold(stream, |mut stream| async move { + match stream.next().await { + Some(Ok(update)) => Ok(Some((update, stream))), + Some(Err(e)) => Err(e.into()), + None => Ok(None), + } + }); + + Ok(stream.boxed()) + } + + fn apply_update(&self, update: Self::Update) -> Result<()> { + let updater = self.client.updater(); + let version = update.runtime_version().spec_version; + if let Err(e) = updater.apply_update(update) { + tracing::error!("Update to version {} failed: {:?}", version, e); + } else { + tracing::info!("Updating to version {}", version); + }; + Ok(()) + } +} + +#[async_trait::async_trait] +impl ITransactionSubmitter for SignedTransaction { + fn hash(&self) -> H256 { + self.tx.hash() + } + async fn submit(&self) -> Result { + self.tx.submit().await.map_err(|e| anyhow::anyhow!(e)) + } +} + +#[async_trait::async_trait] +impl IBlock for TimechainBlock { + type Extrinsic = TimechainExtrinsic; + async fn extrinsics(&self) -> Result> { + let extrinsics = self.block.extrinsics().await?; + Ok(extrinsics.iter().map(|extrinsic| TimechainExtrinsic { extrinsic }).collect()) + } + fn number(&self) -> u64 { + self.block.number().into() + } + fn hash(&self) -> H256 { + self.block.hash() + } +} + +#[async_trait::async_trait] +impl IExtrinsic for TimechainExtrinsic { + type Events = TimechainEvents; + async fn events(&self) -> Result { + Ok(TimechainEvents { + events: self.extrinsic.events().await?, + }) + } + fn hash(&self) -> H256 { + self.extrinsic.hash() + } + async fn is_success(&self) -> Result<()> { + type SpRuntimeDispatchError = metadata::runtime_types::sp_runtime::DispatchError; + let events = self.extrinsic.events().await?; + for ev in events.iter() { + let ev = ev?; + + if ev.pallet_name() == "System" && ev.variant_name() == "ExtrinsicFailed" { + let event_metadata = ev.event_metadata(); + anyhow::bail!( + "{:?} extrinsic failed with code: {:?}, pallet idx: {}, variant idx: {}", + self.hash(), + ev.field_bytes(), + event_metadata.pallet.index(), + event_metadata.variant.index, + ) + } + + if let Some(event) = ev.as_event::()? { + if let Err(err) = event.result { + let SpRuntimeDispatchError::Module(error) = err else { + anyhow::bail!("Tx failed with error: {:?}", err); + }; + let event_metadata = ev.event_metadata(); + + let Some(error_metadata) = + event_metadata.pallet.error_variant_by_index(error.error[0]) + else { + anyhow::bail!("Tx failed with error: {:?}", error); + }; + + anyhow::bail!("Tx failed with error: {:?}", error_metadata.name); + } + } + } + Ok(()) + } +} diff --git a/tc-subxt/src/worker.rs b/tc-subxt/src/worker.rs index 2ebbc6316..8d5868ee5 100644 --- a/tc-subxt/src/worker.rs +++ b/tc-subxt/src/worker.rs @@ -1,17 +1,17 @@ use crate::metadata::{self, runtime_types, RuntimeCall}; -use crate::{ - ExtrinsicDetails, ExtrinsicParams, LegacyRpcMethods, OnlineClient, SubmittableExtrinsic, +use crate::timechain_client::{ + BlockId, IBlock, IExtrinsic, ITimechainClient, ITransactionDbOps, ITransactionSubmitter, }; -use anyhow::{Context, Result}; +use crate::ExtrinsicParams; +use anyhow::Result; use futures::channel::{mpsc, oneshot}; -use futures::stream::FuturesUnordered; -use futures::{Future, FutureExt, StreamExt, TryStreamExt}; +use futures::future::BoxFuture; +use futures::stream::{BoxStream, Fuse, FuturesUnordered}; +use futures::{Future, FutureExt, StreamExt}; +use scale_codec::{Decode, Encode}; use std::collections::{HashSet, VecDeque}; use std::pin::Pin; -use std::time::Duration; -use subxt::backend::rpc::RpcClient; use subxt::config::DefaultExtrinsicParamsBuilder; -use subxt::tx::Payload as TxPayload; use subxt::utils::H256; use subxt_signer::sr25519::Keypair; use time_primitives::{ @@ -19,11 +19,11 @@ use time_primitives::{ PeerId, ProofOfKnowledge, PublicKey, ShardId, TaskId, TaskResult, }; -const MORTALITY: u8 = 32; -type TransactionFuture = Pin> + Send>>; +pub const MORTALITY: u8 = 32; +type TransactionFuture = Pin> + Send>>; type TransactionsUnordered = FuturesUnordered; -#[derive(Clone)] +#[derive(Clone, Encode, Decode)] pub enum Tx { // system SetCode { @@ -60,6 +60,7 @@ pub enum Tx { Commitment { shard_id: ShardId, commitment: Commitment, + // #[serde(with = "serde_proof_of_knowledge")] proof_of_knowledge: ProofOfKnowledge, }, Ready { @@ -79,52 +80,63 @@ pub enum Tx { }, } +#[derive(Clone, Encode, Decode)] pub struct TxData { - transaction: Tx, - era: u64, - hash: H256, - nonce: u64, + pub hash: H256, + pub era: u64, + pub nonce: u64, + pub transaction: Tx, } -pub struct TxStatus { +pub struct TxStatus { data: TxData, - event_sender: oneshot::Sender, + event_sender: Option::Extrinsic>>, best_block: Option, } -pub struct BlockDetail { - number: u64, - hash: H256, -} - -pub struct SubxtWorker { - client: OnlineClient, +pub struct SubxtWorker +where + C: ITimechainClient + Send + Sync + Clone + 'static, + D: ITransactionDbOps + Send + Sync + 'static, +{ + client: C, keypair: Keypair, nonce: u64, - latest_block: BlockDetail, - pending_tx: VecDeque, + latest_block: BlockId, + pending_tx: VecDeque>, transaction_pool: TransactionsUnordered, + db: D, } -impl SubxtWorker { - pub async fn new(rpc: RpcClient, client: OnlineClient, keypair: Keypair) -> Result { - let block = client.blocks().at_latest().await?; - let account_id: subxt::utils::AccountId32 = keypair.public_key().into(); - let legacy_rpc = LegacyRpcMethods::new(rpc.clone()); - let nonce = legacy_rpc.system_account_next_index(&account_id).await?; - let tx_pool = FuturesUnordered::new(); - // adding a never ending future to avoid tx_pool flood of None in select! loop - tx_pool.push(futures::future::pending().boxed()); +impl SubxtWorker +where + C: ITimechainClient + Send + Sync + Clone + 'static, + C::Submitter: ITransactionSubmitter + Send + Sync + 'static, + C::Block: IBlock + Send + Sync + 'static, + D: ITransactionDbOps + Send + Sync + 'static, +{ + pub async fn new(nonce: u64, client: C, db: D, keypair: Keypair) -> Result { + let latest_block = client.get_latest_block().await?; + let transaction_pool = FuturesUnordered::new(); + transaction_pool.push(futures::future::pending().boxed()); + + let txs_data = db.load_pending_txs(nonce)?; + let pending_tx: VecDeque> = txs_data + .into_iter() + .map(|tx_data| TxStatus { + data: tx_data, + event_sender: None, + best_block: None, + }) + .collect(); Ok(Self { client, keypair, nonce, - latest_block: BlockDetail { - number: block.number().into(), - hash: block.hash(), - }, - pending_tx: Default::default(), - transaction_pool: tx_pool, + latest_block, + pending_tx, + transaction_pool, + db, }) } @@ -136,17 +148,6 @@ impl SubxtWorker { self.public_key().into_account() } - fn create_signed_payload(&self, call: &Call, params: ExtrinsicParams) -> Vec - where - Call: TxPayload, - { - self.client - .tx() - .create_signed_offline(call, &self.keypair, params) - .expect("Metadata is invalid") - .into_encoded() - } - fn build_tx(&mut self, tx: Tx, params: ExtrinsicParams) -> Vec { match tx.clone() { // system @@ -156,14 +157,14 @@ impl SubxtWorker { code, }); let payload = metadata::sudo(runtime_call); - self.create_signed_payload(&payload, params) + self.client.sign_payload(&payload, params) }, // balances Tx::Transfer { account, balance } => { let account = subxt::utils::Static(account); let payload = metadata::tx().balances().transfer_allow_death(account.into(), balance); - self.create_signed_payload(&payload, params) + self.client.sign_payload(&payload, params) }, // networks Tx::RegisterNetwork { network } => { @@ -172,7 +173,7 @@ impl SubxtWorker { runtime_types::pallet_networks::pallet::Call::register_network { network }, ); let payload = metadata::sudo(runtime_call); - self.create_signed_payload(&payload, params) + self.client.sign_payload(&payload, params) }, Tx::ForceShardOffline { shard_id } => { let runtime_call = RuntimeCall::Shards( @@ -181,7 +182,7 @@ impl SubxtWorker { }, ); let payload = metadata::sudo(runtime_call); - self.create_signed_payload(&payload, params) + self.client.sign_payload(&payload, params) }, Tx::SetNetworkConfig { network, config } => { let config = subxt::utils::Static(config); @@ -192,7 +193,7 @@ impl SubxtWorker { }, ); let payload = metadata::sudo(runtime_call); - self.create_signed_payload(&payload, params) + self.client.sign_payload(&payload, params) }, // members Tx::RegisterMember { @@ -208,16 +209,16 @@ impl SubxtWorker { peer_id, stake_amount, ); - self.create_signed_payload(&payload, params) + self.client.sign_payload(&payload, params) }, Tx::UnregisterMember { member } => { let member = subxt::utils::Static(member); let payload = metadata::tx().members().unregister_member(member); - self.create_signed_payload(&payload, params) + self.client.sign_payload(&payload, params) }, Tx::Heartbeat => { let payload = metadata::tx().members().send_heartbeat(); - self.create_signed_payload(&payload, params) + self.client.sign_payload(&payload, params) }, // shards Tx::Commitment { @@ -228,17 +229,17 @@ impl SubxtWorker { let commitment = subxt::utils::Static(commitment); let payload = metadata::tx().shards().commit(shard_id, commitment, proof_of_knowledge); - self.create_signed_payload(&payload, params) + self.client.sign_payload(&payload, params) }, Tx::Ready { shard_id } => { let payload = metadata::tx().shards().ready(shard_id); - self.create_signed_payload(&payload, params) + self.client.sign_payload(&payload, params) }, // tasks Tx::SubmitTaskResult { task_id, result } => { let result = subxt::utils::Static(result); let payload = metadata::tx().tasks().submit_task_result(task_id, result); - self.create_signed_payload(&payload, params) + self.client.sign_payload(&payload, params) }, Tx::SubmitGmpEvents { network, gmp_events } => { let runtime_call = RuntimeCall::Tasks( @@ -248,7 +249,7 @@ impl SubxtWorker { }, ); let payload = metadata::sudo(runtime_call); - self.create_signed_payload(&payload, params) + self.client.sign_payload(&payload, params) }, Tx::RemoveTask { task_id } => { let runtime_call = RuntimeCall::Tasks( @@ -257,7 +258,7 @@ impl SubxtWorker { }, ); let payload = metadata::sudo(runtime_call); - self.create_signed_payload(&payload, params) + self.client.sign_payload(&payload, params) }, } } @@ -265,7 +266,7 @@ impl SubxtWorker { fn add_tx_to_pool( &mut self, transaction: Tx, - sender: oneshot::Sender, + event_sender: Option::Extrinsic>>, nonce: Option, ) { let mut is_new_tx = true; @@ -282,7 +283,7 @@ impl SubxtWorker { .mortal_unchecked(block.number, block.hash, MORTALITY.into()) .build(); let tx = self.build_tx(transaction.clone(), params); - let tx = SubmittableExtrinsic::from_bytes(self.client.clone(), tx.clone()); + let tx = self.client.submittable_transaction(tx.clone()); let hash = tx.hash(); let tx_status = TxStatus { data: TxData { @@ -291,9 +292,14 @@ impl SubxtWorker { hash, nonce, }, - event_sender: sender, + event_sender, best_block: None, }; + + if let Err(e) = self.db.store_tx(&tx_status.data) { + tracing::error!("Unable to add transaction into cache: {e}"); + }; + if is_new_tx { self.pending_tx.push_back(tx_status); self.nonce += 1; @@ -304,97 +310,126 @@ impl SubxtWorker { self.transaction_pool.push(fut); } - pub fn into_sender(mut self) -> mpsc::UnboundedSender<(Tx, oneshot::Sender)> { - let updater = self.client.updater(); + pub fn into_sender( + mut self, + ) -> mpsc::UnboundedSender<(Tx, oneshot::Sender<::Extrinsic>)> { let (tx, mut rx) = mpsc::unbounded(); tokio::task::spawn(async move { tracing::info!("starting subxt worker"); - let mut update_stream = - updater.runtime_updates().await.context("failed to start subxt worker").unwrap(); - let finalized_block_stream = self.client.blocks().subscribe_finalized().await.unwrap(); - let mut finalized_blocks = finalized_block_stream - .and_then(|block| async move { - let ex = block.extrinsics().await?; - Ok(ex) - }) - .boxed(); - let best_block_stream = self.client.blocks().subscribe_best().await.unwrap(); - let mut best_blocks = best_block_stream - .and_then(|block| async { - let ex = block.extrinsics().await?; - Ok((block, ex)) - }) - .boxed(); + let mut update_stream = self.client.runtime_updates().await.unwrap().boxed(); + let mut finalized_blocks = Self::create_stream_with_retry(self.client.clone(), |c| { + async move { c.finalized_block_stream().await }.boxed() + }) + .await; + let mut best_blocks = Self::create_stream_with_retry(self.client.clone(), |c| { + async move { c.best_block_stream().await }.boxed() + }) + .await; loop { futures::select! { tx = rx.next().fuse() => { - let Some((command, channel)) = tx else { continue; }; - self.add_tx_to_pool(command, channel, None); + let Some((command, channel)) = tx else { break; }; + tracing::info!("tx added to pool"); + self.add_tx_to_pool(command, Some(channel), None); } - block_data = finalized_blocks.next().fuse() => { - let Some(block_res) = block_data else { - tracing::error!("Finalized block strem terminated"); - tokio::time::sleep(Duration::from_secs(1)).await; - continue; - }; - - let Ok(extrinsics) = block_res else { - tracing::error!("Error processing finalized blocks: {:?}", block_res.err()); - continue; - }; - - if self.pending_tx.is_empty() { - continue; - } - - for extrinsic in extrinsics.iter() { - let extrinsic_hash = extrinsic.hash(); - if Some(extrinsic_hash) == self.pending_tx.front().map(|tx| tx.data.hash) { - let Some(tx) = self.pending_tx.pop_front() else { + block_data = finalized_blocks.next() => { + match block_data { + Some(Ok((_block, extrinsics))) => { + if self.pending_tx.is_empty() { continue; - }; - tx.event_sender.send(extrinsic).ok(); + } + + for extrinsic in extrinsics.into_iter() { + let extrinsic_hash = extrinsic.hash(); + let front = self.pending_tx.front().map(|tx| tx.data.hash); + if Some(extrinsic_hash) == front { + let Some(tx) = self.pending_tx.pop_front() else { + continue; + }; + tx.event_sender.unwrap().send(extrinsic).ok(); + if let Err(e) = self.db.remove_tx(tx.data.hash){ + tracing::error!("Unable to remove tx from db {e}"); + }; + } + } + }, + Some(Err(e)) => { + tracing::error!("Error processing finalized blocks: {:?}", e); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + continue; + }, + None => { + tracing::error!("Finalized block stream terminated"); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + finalized_blocks = Self:: + create_stream_with_retry( + self.client.clone(), + |c| async move { + let client = c.clone(); + client.finalized_block_stream().await + }.boxed() + ) + .await; } } } - block_data = best_blocks.next().fuse() => { - let Some(block_res) = block_data else { - tracing::error!("Latest block stream terminated"); - tokio::time::sleep(Duration::from_secs(1)).await; - continue; - }; - - let Ok((block, extrinsics)) = block_res else { - tracing::error!("Error processing block: {:?}", block_res.err()); - continue; - }; - - self.latest_block = BlockDetail { - number: block.number().into(), - hash: block.hash() - }; + block_data = best_blocks.next() => { + match block_data { + Some(Ok((block, extrinsics))) => { + self.latest_block = BlockId { + number: block.number(), + hash: block.hash() + }; + tracing::info!("best block stream hit: {}", self.latest_block.number); - if self.pending_tx.is_empty() { - continue; - } + if self.pending_tx.is_empty() { + continue; + } - let hashes: HashSet<_> = extrinsics.iter().map(|extrinsic| extrinsic.hash()).collect(); - for tx in self.pending_tx.iter_mut() { - if hashes.contains(&tx.data.hash) { - tx.best_block = Some(self.latest_block.number); - } - } + let hashes: HashSet<_> = extrinsics.iter() + .map(|extrinsic| extrinsic.hash()) + .collect(); + for tx in self.pending_tx.iter_mut() { + if hashes.contains(&tx.data.hash) { + tracing::info!("tx found in block {}", self.latest_block.number); + tx.best_block = Some(self.latest_block.number); + } + } - let mut new_pending: VecDeque = VecDeque::new(); - while let Some(tx) = self.pending_tx.pop_front() { - if tx.best_block.is_none() && self.latest_block.number > tx.data.era { - tracing::warn!("Outdated tx found retrying with nonce: {}", tx.data.nonce); - self.add_tx_to_pool(tx.data.transaction, tx.event_sender, Some(tx.data.nonce)); - } else { - new_pending.push_back(tx); + let mut new_pending: VecDeque> = VecDeque::new(); + while let Some(tx) = self.pending_tx.pop_front() { + if tx.best_block.is_none() && self.latest_block.number > tx.data.era { + tracing::warn!("outdated tx found retrying with nonce {}", tx.data.nonce); + self.add_tx_to_pool( + tx.data.transaction, + tx.event_sender, + Some(tx.data.nonce), + ); + } else { + new_pending.push_back(tx); + } + } + self.pending_tx = new_pending; + }, + Some(Err(e)) => { + tracing::error!("Error processing finalized blocks: {:?}", e); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + continue; + }, + None => { + tracing::error!("Latest block stream terminated"); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + best_blocks = Self:: + create_stream_with_retry( + self.client.clone(), + |c| async move { + let client = c.clone(); + client.best_block_stream().await + }.boxed() + ) + .await; } } - self.pending_tx = new_pending; } tx_result = self.transaction_pool.next().fuse() => { let Some(result) = tx_result else { @@ -402,29 +437,44 @@ impl SubxtWorker { }; match result { Ok(hash) => { - tracing::info!("Transaction completed: {:?}", hash); + tracing::info!("tx completed: {:?}", hash); } Err(e) => { - tracing::error!("Transaction failed {e}"); + tracing::error!("tx failed {e}"); } } } update = update_stream.next().fuse() => { - let Some(Ok(update)) = update else { continue; }; - let version = update.runtime_version().spec_version; - match updater.apply_update(update) { - Ok(()) => { - tracing::info!("Upgrade to version: {} successful", version) - }, - Err(subxt::client::UpgradeError::SameVersion) => {} - Err(e) => { - tracing::error!("Upgrade to version {} failed {:?}", version, e); - }, - }; + let Some(Ok(update)) = update else {continue}; + self.client.apply_update(update).ok(); } } } }); tx } + + async fn create_stream_with_retry( + client: C, + stream_creator: F, + ) -> Fuse>> + where + F: Fn(C) -> BoxFuture<'static, Result>>> + + Clone + + Send + + 'static, + { + loop { + match stream_creator(client.clone()).await { + Ok(stream) => { + tracing::info!("stream created successfully returning"); + return stream.fuse(); + }, + Err(e) => { + tracing::warn!("Couldn't create stream, retrying: {:?}", e); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + }, + } + } + } } diff --git a/tc-subxt/tests/mock.rs b/tc-subxt/tests/mock.rs new file mode 100644 index 000000000..c95b34734 --- /dev/null +++ b/tc-subxt/tests/mock.rs @@ -0,0 +1,321 @@ +use anyhow::Result; +use futures::channel::{mpsc, oneshot}; +use futures::stream::{self, BoxStream}; +use futures::{SinkExt, Stream, StreamExt}; +use std::collections::{HashMap, VecDeque}; +use std::pin::Pin; +use std::str::FromStr; +use std::sync::Arc; +use std::task::{Context, Poll}; +use subxt::{tx::Payload as TxPayload, utils::H256}; +use subxt_signer::{sr25519::Keypair, SecretUri}; +use tc_subxt::timechain_client::{ + BlockId, IBlock, IExtrinsic, ITimechainClient, ITransactionDbOps, ITransactionSubmitter, +}; +use tc_subxt::worker::{SubxtWorker, Tx, TxData}; +use tc_subxt::ExtrinsicParams; +use tokio::sync::{broadcast, Mutex}; +use tokio_stream::wrappers::BroadcastStream; + +type TxSender = futures::channel::mpsc::UnboundedSender<( + Tx, + oneshot::Sender<::Extrinsic>, +)>; + +pub struct TestingEnv { + client: MockClient, + tx_sender: TxSender, + pub db: MockDb, +} + +impl TestingEnv { + pub async fn new() -> Self { + env_logger::try_init().ok(); + let client = MockClient::new(); + let uri = SecretUri::from_str("//Alice").unwrap(); + let keypair = Keypair::from_uri(&uri).unwrap(); + let db = MockDb::default(); + let worker = SubxtWorker::new(0, client.clone(), db.clone(), keypair).await.unwrap(); + let tx_sender = worker.into_sender(); + Self { client, tx_sender, db } + } + + pub async fn submit_tx(&self) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + self.tx_sender.unbounded_send((Tx::Ready { shard_id: 0 }, tx)).unwrap(); + rx + } +} + +impl std::ops::Deref for TestingEnv { + type Target = MockClient; + + fn deref(&self) -> &Self::Target { + &self.client + } +} + +#[derive(Clone)] +pub struct MockClient { + best_sender: broadcast::Sender, + finalized_sender: broadcast::Sender, + pool_sender: mpsc::Sender, + pool: Arc>>, + executed: Arc>>, + best_block: Arc>, + subscription_counter: Arc>, + force_stream_error: Arc>, +} + +impl Default for MockClient { + fn default() -> Self { + Self::new() + } +} + +impl MockClient { + pub fn new() -> Self { + let (finalized_sender, _) = broadcast::channel(10_000); + let (best_sender, _) = broadcast::channel(10_000); + let (pool_sender, pool) = mpsc::channel(1); + Self { + finalized_sender, + best_sender, + pool_sender, + pool: Arc::new(Mutex::new(pool)), + executed: Default::default(), + best_block: Default::default(), + subscription_counter: Default::default(), + force_stream_error: Default::default(), + } + } + + pub async fn submission(&self) -> u64 { + self.pool.lock().await.next().await.unwrap() + } + + pub async fn execute_tx(&self, nonce: u64, success: bool) { + self.executed.lock().await.push(MockExtrinsic { nonce, success }); + } + + pub async fn make_block(&self) -> MockBlock { + let mut best_block = self.best_block.lock().await; + let extrinsics = std::mem::take(&mut *self.executed.lock().await); + let id = BlockId { + number: best_block.number + 1, + hash: H256::random(), + }; + let block = MockBlock { id, extrinsics }; + *best_block = id; + self.best_sender.send(block.clone()).ok(); + self.finalized_sender.send(block.clone()).ok(); + block + } + + pub async fn set_force_stream_error(&self, flag: bool) { + *self.force_stream_error.lock().await = flag; + } +} + +#[async_trait::async_trait] +impl ITimechainClient for MockClient { + type Submitter = MockSubmitter; + type Block = MockBlock; + type Update = (); + async fn get_latest_block(&self) -> Result { + Ok(*self.best_block.lock().await) + } + + fn sign_payload(&self, _call: &Call, params: ExtrinsicParams) -> Vec + where + Call: TxPayload + Send + Sync, + { + let nonce = params.2 .0.unwrap_or_default(); + nonce.to_le_bytes().to_vec() + } + + fn submittable_transaction(&self, tx: Vec) -> Self::Submitter { + let nonce = u64::from_le_bytes(tx.try_into().unwrap()); + MockSubmitter { + nonce, + pool_sender: self.pool_sender.clone(), + } + } + + async fn finalized_block_stream( + &self, + ) -> Result::Extrinsic>)>>> + { + let rx = self.finalized_sender.subscribe(); + { + let mut counter = self.subscription_counter.lock().await; + *counter += 1; + } + let stream = BroadcastStream::new(rx) + .map(|res| res.map_err(|e| anyhow::anyhow!(e))) + .map(|block_result| block_result.map(|block| (block.clone(), block.extrinsics.clone()))) + .boxed(); + + let stream = ErrorInjectingStream::new(stream, self.force_stream_error.clone()).boxed(); + Ok(stream) + } + + async fn best_block_stream( + &self, + ) -> Result::Extrinsic>)>>> + { + let rx = self.best_sender.subscribe(); + { + let mut counter = self.subscription_counter.lock().await; + *counter += 1; + } + let stream = BroadcastStream::new(rx) + .map(|res| res.map_err(|e| anyhow::anyhow!(e))) + .map(|block_result| block_result.map(|block| (block.clone(), block.extrinsics.clone()))) + .boxed(); + Ok(stream) + } + + async fn runtime_updates(&self) -> Result>> { + Ok(stream::empty().boxed()) + } + + fn apply_update(&self, _update: Self::Update) -> Result<()> { + Ok(()) + } +} + +pub struct MockSubmitter { + nonce: u64, + pool_sender: mpsc::Sender, +} + +#[async_trait::async_trait] +impl ITransactionSubmitter for MockSubmitter { + fn hash(&self) -> H256 { + let mut hash = [0; 32]; + hash[..8].copy_from_slice(&self.nonce.to_le_bytes()); + hash.into() + } + + async fn submit(&self) -> Result { + self.pool_sender.clone().send(self.nonce).await?; + Ok(self.hash()) + } +} + +#[derive(Clone, Debug)] +pub struct MockBlock { + pub id: BlockId, + pub extrinsics: Vec, +} + +#[async_trait::async_trait] +impl IBlock for MockBlock { + type Extrinsic = MockExtrinsic; + + async fn extrinsics(&self) -> Result> { + Ok(self.extrinsics.clone()) + } + + fn number(&self) -> u64 { + self.id.number + } + + fn hash(&self) -> H256 { + self.id.hash + } +} + +#[derive(Clone, Debug)] +pub struct MockExtrinsic { + pub nonce: u64, + pub success: bool, +} + +#[async_trait::async_trait] +impl IExtrinsic for MockExtrinsic { + type Events = (); + + async fn events(&self) -> Result { + Ok(()) + } + + fn hash(&self) -> H256 { + let mut hash = [0; 32]; + hash[..8].copy_from_slice(&self.nonce.to_le_bytes()); + hash.into() + } + + async fn is_success(&self) -> Result<()> { + if self.success { + Ok(()) + } else { + anyhow::bail!("tx is failed") + } + } +} + +#[derive(Default, Clone)] +pub struct MockDb { + data: std::sync::Arc>>, +} + +impl ITransactionDbOps for MockDb { + fn store_tx(&self, tx_data: &TxData) -> Result<()> { + self.data.lock().unwrap().insert(tx_data.hash.0, tx_data.clone()); + Ok(()) + } + + fn remove_tx(&self, hash: H256) -> Result<()> { + self.data.lock().unwrap().remove(&hash.0); + Ok(()) + } + + fn load_pending_txs(&self, _nonce: u64) -> Result> { + let data = self.data.lock().unwrap(); + let mut txs: Vec = data.values().cloned().collect(); + txs.sort_by_key(|tx| tx.nonce); + Ok(txs.into()) + } +} + +struct ErrorInjectingStream { + inner: S, + flag: Arc>, + errored: bool, +} + +impl ErrorInjectingStream { + fn new(inner: S, flag: Arc>) -> Self { + Self { inner, flag, errored: false } + } +} + +impl Stream for ErrorInjectingStream +where + S: Stream> + Unpin, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let flag_set = { + if let Ok(lock) = self.flag.try_lock() { + *lock + } else { + false + } + }; + + if flag_set && !self.errored { + self.errored = true; + return Poll::Ready(Some(Err(anyhow::anyhow!("Forced stream error for testing")))); + } + + let next = Pin::new(&mut self.inner).poll_next(cx); + if self.errored { + return Poll::Ready(None); + } + next + } +} diff --git a/tc-subxt/tests/tests.rs b/tc-subxt/tests/tests.rs new file mode 100644 index 000000000..09a73d3ff --- /dev/null +++ b/tc-subxt/tests/tests.rs @@ -0,0 +1,132 @@ +use crate::mock::TestingEnv; +use anyhow::Result; +use std::time::Duration; +use tc_subxt::timechain_client::ITransactionDbOps; +use tc_subxt::worker::MORTALITY; +use tokio::time::sleep; + +mod mock; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_transaction_flow() -> Result<()> { + let env = TestingEnv::new().await; + let rx = env.submit_tx().await; + let nonce = env.submission().await; + assert_eq!(nonce, 0); + env.execute_tx(nonce, true).await; + env.make_block().await; + let tx = rx.await?; + assert_eq!(tx.nonce, 0); + assert!(tx.success); + Ok(()) +} + +// test tx failing and next tx being processed. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_transaction_flow_with_error() -> Result<()> { + let env = TestingEnv::new().await; + let rx = env.submit_tx().await; + let nonce = env.submission().await; + assert_eq!(nonce, 0); + env.execute_tx(nonce, false).await; + env.make_block().await; + let tx = rx.await?; + assert_eq!(tx.nonce, 0); + assert!(!tx.success); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_transaction_mortality_outage_flow() -> Result<()> { + let env = TestingEnv::new().await; + let rx = env.submit_tx().await; + let nonce = env.submission().await; + assert_eq!(nonce, 0); + for _ in 0..(MORTALITY + 1) { + env.make_block().await; + } + let nonce = env.submission().await; + assert_eq!(nonce, 0); + env.execute_tx(nonce, true).await; + env.make_block().await; + let tx = rx.await?; + assert_eq!(tx.nonce, 0); + assert!(tx.success); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_transaction_mortality_outage_flow_20() -> Result<()> { + let num_txs = 20; + let env = TestingEnv::new().await; + let mut rxs = vec![]; + for _ in 0..num_txs { + let rx = env.submit_tx().await; + rxs.push(rx); + } + for _ in 0..num_txs { + env.submission().await; + } + for _ in 0..(MORTALITY + 1) { + env.make_block().await; + } + for _ in 0..num_txs { + let nonce = env.submission().await; + env.execute_tx(nonce, true).await; + } + env.make_block().await; + for rx in rxs { + let tx = rx.await?; + assert!(tx.success); + } + Ok(()) +} + +// test add tx to db +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_tc_subxt_db_ops() -> Result<()> { + let env = TestingEnv::new().await; + let rx = env.submit_tx().await; + let nonce = env.submission().await; + // check db insertion + let txs = env.db.load_pending_txs(0).unwrap(); + assert!(txs.len() == 1); + env.execute_tx(nonce, true).await; + env.make_block().await; + let _ = rx.await.unwrap(); + let txs = env.db.load_pending_txs(0).unwrap(); + assert!(txs.is_empty()); + Ok(()) +} + +// Add test to check stream in case of errors. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_finalized_stream_error_and_recovery() -> Result<()> { + let env = TestingEnv::new().await; + let rx = env.submit_tx().await; + let nonce = env.submission().await; + assert_eq!(nonce, 0); + env.execute_tx(nonce, true).await; + env.make_block().await; + let tx = rx.await?; + assert_eq!(tx.nonce, 0); + assert!(tx.success); + + // stream restart + sleep(Duration::from_secs(1)).await; + env.set_force_stream_error(true).await; + sleep(Duration::from_millis(100)).await; + env.set_force_stream_error(false).await; + env.make_block().await; + + // adding new + let rx = env.submit_tx().await; + let nonce = env.submission().await; + assert_eq!(nonce, 1); + env.execute_tx(nonce, true).await; + env.make_block().await; + let tx = rx.await?; + assert_eq!(tx.nonce, 1); + assert!(tx.success); + Ok(()) +}