diff --git a/chronicle/src/lib.rs b/chronicle/src/lib.rs index 8f7fd01c8..9a1061d1f 100644 --- a/chronicle/src/lib.rs +++ b/chronicle/src/lib.rs @@ -46,6 +46,8 @@ pub struct ChronicleConfig { pub target_keyfile: PathBuf, /// Path to a cache for TSS key shares. pub tss_keyshare_cache: PathBuf, + /// Minimum balance chronicle should have. + pub target_min_balance: u128, } impl ChronicleConfig { @@ -92,6 +94,7 @@ pub async fn run_chronicle( let task_spawner_params = TaskSpawnerParams { tss: tss_tx, blockchain: chain, + min_balance: config.target_min_balance, network: subchain, network_id: config.network_id, url: config.target_url, diff --git a/chronicle/src/main.rs b/chronicle/src/main.rs index 29e88e933..2a30a0ae9 100644 --- a/chronicle/src/main.rs +++ b/chronicle/src/main.rs @@ -22,6 +22,9 @@ pub struct ChronicleArgs { /// key file for connector wallet #[clap(long)] pub target_keyfile: PathBuf, + /// key file for connector wallet + #[clap(long)] + pub target_min_balance: u128, /// Metadata version to use to connect to timechain node. #[clap(long)] pub timechain_metadata: Option, @@ -48,6 +51,7 @@ impl ChronicleArgs { network_id: self.network_id, network_keyfile: self.network_keyfile, network_port: self.network_port, + target_min_balance: self.target_min_balance, timechain_metadata: self.timechain_metadata.unwrap_or_default(), timechain_url: self.timechain_url, timechain_keyfile: self.timechain_keyfile, diff --git a/chronicle/src/mock.rs b/chronicle/src/mock.rs index ff1a4e585..98895766c 100644 --- a/chronicle/src/mock.rs +++ b/chronicle/src/mock.rs @@ -386,6 +386,10 @@ impl Runtime for Mock { Ok(()) } + async fn submit_unregister_member(&self) -> Result<()> { + Ok(()) + } + async fn submit_heartbeat(&self) -> Result<()> { Ok(()) } diff --git a/chronicle/src/tasks/spawner.rs b/chronicle/src/tasks/spawner.rs index 7f4f92b2c..91d290946 100644 --- a/chronicle/src/tasks/spawner.rs +++ b/chronicle/src/tasks/spawner.rs @@ -8,6 +8,7 @@ use rosetta_client::Wallet; use rosetta_config_ethereum::{query::GetLogs, CallResult, FilterBlockOption}; use rosetta_core::{BlockOrIdentifier, ClientEvent}; use schnorr_evm::VerifyingKey; +use std::time::Duration; use std::{ future::Future, num::NonZeroU64, @@ -22,11 +23,13 @@ use time_primitives::{ }; use time_primitives::{IGateway, Msg}; use tokio::sync::Mutex; +use tokio::time::sleep; #[derive(Clone)] pub struct TaskSpawnerParams { pub tss: mpsc::Sender, pub blockchain: String, + pub min_balance: u128, pub network: String, pub network_id: NetworkId, pub url: String, @@ -38,6 +41,7 @@ pub struct TaskSpawnerParams { pub struct TaskSpawner { tss: mpsc::Sender, wallet: Arc, + wallet_min_balance: u128, wallet_guard: Arc>, substrate: S, network_id: NetworkId, @@ -58,13 +62,29 @@ where ) .await?, ); - Ok(Self { + + let spawner = Self { tss: params.tss, + wallet_min_balance: params.min_balance, wallet, wallet_guard: Arc::new(Mutex::new(())), substrate: params.substrate, network_id: params.network_id, - }) + }; + + while !spawner.is_balance_available().await? { + sleep(Duration::from_secs(10)).await; + tracing::warn!("Chronicle balance is too low, retrying..."); + } + + Ok(spawner) + } + + /// + /// Checks if wallet have enough balance + async fn is_balance_available(&self) -> Result { + let balance = self.wallet.balance().await?; + Ok(balance > self.wallet_min_balance) } /// @@ -229,7 +249,7 @@ where Ok(payload) => payload, Err(payload) => Payload::Error(payload), }; - tracing::debug!("debug_latency:{} sending read payloa dor signing", task_id); + tracing::debug!("debug_latency:{} sending read payload for signing", task_id); let (_, signature) = self .tss_sign(block_num, shard_id, task_id, TaskPhase::Read, &payload.bytes(task_id)) .await?; @@ -242,6 +262,21 @@ where } async fn write(self, task_id: TaskId, function: Function) -> Result<()> { + match self.is_balance_available().await { + Ok(false) => { + // unregister member + if let Err(e) = self.substrate.submit_unregister_member().await { + tracing::error!(task_id = task_id, "Failed to unregister member: {:?}", e); + }; + tracing::warn!(task_id = task_id, "Chronicle balance too low, exiting"); + std::process::exit(1); + }, + Ok(true) => {}, + Err(err) => { + tracing::error!(task_id = task_id, "Could not fetch account balance: {:?}", err) + }, + } + let submission = async move { match function { Function::EvmDeploy { bytecode } => { diff --git a/docker-compose.yml b/docker-compose.yml index fb25cbd85..eff7cd32d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -70,6 +70,7 @@ services: - '--timechain-metadata=development' - '--timechain-url=ws://validator:9944' - '--target-keyfile=/etc/target_keyfile' + - '--target-min-balance=1000000000000000' - '--timechain-keyfile=/etc/timechain_keyfile' - '--tss-keyshare-cache=/etc' environment: diff --git a/primitives/src/lib.rs b/primitives/src/lib.rs index 79a42f37f..d13e58227 100644 --- a/primitives/src/lib.rs +++ b/primitives/src/lib.rs @@ -233,6 +233,8 @@ pub trait Runtime: Clone + Send + Sync + 'static { stake_amount: u128, ) -> Result<()>; + async fn submit_unregister_member(&self) -> Result<()>; + async fn submit_heartbeat(&self) -> Result<()>; async fn submit_commitment( diff --git a/tc-subxt/src/events.rs b/tc-subxt/src/events.rs index f22f2e3e0..13f13f098 100644 --- a/tc-subxt/src/events.rs +++ b/tc-subxt/src/events.rs @@ -21,3 +21,10 @@ impl StaticEvent for GatewayRegistered { const PALLET: &'static str = "Tasks"; const EVENT: &'static str = "GatewayRegistered"; } + +#[derive(DecodeAsType, Debug)] +pub struct UnRegisteredMember(pub [u8; 32], pub u64); +impl StaticEvent for UnRegisteredMember { + const PALLET: &'static str = "Members"; + const EVENT: &'static str = "UnRegisteredMember"; +} diff --git a/tc-subxt/src/lib.rs b/tc-subxt/src/lib.rs index 5bc723037..693b25530 100644 --- a/tc-subxt/src/lib.rs +++ b/tc-subxt/src/lib.rs @@ -48,6 +48,7 @@ pub trait TxSubmitter: Clone + Send + Sync + 'static { pub enum Tx { RegisterMember { network: NetworkId, peer_id: PeerId, stake_amount: u128 }, + UnregisterMember, Heartbeat, Commitment { shard_id: ShardId, commitment: Commitment, proof_of_knowledge: ProofOfKnowledge }, CreateTask { task: TaskDescriptorParams }, @@ -129,6 +130,10 @@ impl SubxtWorker { ); self.create_signed_payload(&payload).await }, + Tx::UnregisterMember => { + let payload = metadata::tx().members().unregister_member(); + self.create_signed_payload(&payload).await + }, Tx::Heartbeat => { let payload = metadata::tx().members().send_heartbeat(); self.create_signed_payload(&payload).await @@ -595,6 +600,13 @@ impl Runtime for SubxtClient { Ok(()) } + async fn submit_unregister_member(&self) -> Result<()> { + let (tx, rx) = oneshot::channel(); + self.tx.unbounded_send((Tx::UnregisterMember, tx))?; + rx.await?; + Ok(()) + } + async fn submit_heartbeat(&self) -> Result<()> { let (tx, rx) = oneshot::channel(); self.tx.unbounded_send((Tx::Heartbeat, tx))?; diff --git a/tester/src/main.rs b/tester/src/main.rs index cf0b88df2..7fd23b8e1 100644 --- a/tester/src/main.rs +++ b/tester/src/main.rs @@ -95,6 +95,7 @@ enum Test { Batch { tasks: u64 }, Gmp, ChroniclePayment, + ChronicleFundCheck, Migration, Restart, } @@ -192,13 +193,19 @@ async fn main() -> Result<()> { }, // chronicles are refunded the gas for gmp call Command::Test(Test::ChroniclePayment) => { - println!("This test is only available local with single node shard"); + // "This test is only available local with single node shard" let starting_balance = chronicles[0].wallet().balance().await?; gmp_test(&tester[0], &tester[1], &contract).await?; let ending_balance = chronicles[0].wallet().balance().await?; println!("Verifying balance"); assert!(starting_balance <= ending_balance); }, + // chronicles are refunded the gas for gmp call + Command::Test(Test::ChronicleFundCheck) => { + // "This test is only available in local setup" + gmp_funds_check(&tester[0], &tester[1], &contract, &chronicles, &args.timechain_url) + .await?; + }, Command::Test(Test::Gmp) => { gmp_test(&tester[0], &tester[1], &contract).await?; }, @@ -507,6 +514,68 @@ async fn gmp_test(src: &Tester, dest: &Tester, contract: &Path) -> Result<()> { Ok(()) } +async fn gmp_funds_check( + src: &Tester, + dest: &Tester, + contract: &Path, + chronicles: &[Tester], + timechain_url: &str, +) -> Result<()> { + let (src_contract, _, _) = setup_gmp_with_contracts(src, dest, contract, 1).await?; + + let subxt_client = SubxtClient::get_client(timechain_url).await?; + + // submit a vote on source contract (testing contract) which will emit a gmpcreated event on gateway contract + let res = src + .wallet() + .eth_send_call( + src_contract, + VotingContract::voteCall { _vote: true }.abi_encode(), + 0, + None, + None, + ) + .await?; + let block = res.receipt().unwrap().block_number.unwrap(); + println!("submitted vote in block {block}, tx_hash: {:?}", res.tx_hash()); + let chronicle_wallet = chronicles.first().unwrap().wallet(); + let current_balance = chronicle_wallet.balance().await?; + // leave some space for gas_price + let transfer_balance = current_balance - 100000000000000u128; + println!("Emptying chronicle balance"); + chronicle_wallet + .transfer(src.wallet().account(), transfer_balance, None, None) + .await + .unwrap(); + println!("looking for unregister event"); + let mut block_stream = src.finality_block_stream().await; + + 'main: loop { + tokio::select! { + block = block_stream.next() => { + if let Some((block_hash, block_number)) = block { + println!("Received block number: {:?}", block_number); + let events = subxt_client.events().at(block_hash).await?; + let member_offline_event = events.find::().flatten().next(); + + if let Some(member_offline) = member_offline_event { + let network = member_offline.1; + + println!("member offline for network: {:?}", network); + break 'main; + } + } + } + _ = tokio::signal::ctrl_c() => { + println!("aborting..."); + anyhow::bail!("abort"); + } + } + } + println!("Test Passed"); + Ok(()) +} + async fn task_migration_test(tester: &Tester, contract: &Path) -> Result<()> { let (_, contract_address, start_block) = test_setup(tester, contract, 1, 1).await?;