Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chronicle funds check #1007

Merged
merged 7 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions chronicle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub struct ChronicleConfig {
pub target_keyfile: PathBuf,
/// Path to a cache for TSS key shares.
pub tss_keyshare_cache: PathBuf,
/// Minimum balance chronicle should have.
pub target_min_balance: u128,
}

impl ChronicleConfig {
Expand Down Expand Up @@ -92,6 +94,7 @@ pub async fn run_chronicle(
let task_spawner_params = TaskSpawnerParams {
tss: tss_tx,
blockchain: chain,
min_balance: config.target_min_balance,
network: subchain,
network_id: config.network_id,
url: config.target_url,
Expand Down
4 changes: 4 additions & 0 deletions chronicle/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ pub struct ChronicleArgs {
/// key file for connector wallet
#[clap(long)]
pub target_keyfile: PathBuf,
/// key file for connector wallet
#[clap(long)]
pub target_min_balance: u128,
/// Metadata version to use to connect to timechain node.
#[clap(long)]
pub timechain_metadata: Option<MetadataVariant>,
Expand All @@ -48,6 +51,7 @@ impl ChronicleArgs {
network_id: self.network_id,
network_keyfile: self.network_keyfile,
network_port: self.network_port,
target_min_balance: self.target_min_balance,
timechain_metadata: self.timechain_metadata.unwrap_or_default(),
timechain_url: self.timechain_url,
timechain_keyfile: self.timechain_keyfile,
Expand Down
4 changes: 4 additions & 0 deletions chronicle/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,10 @@ impl Runtime for Mock {
Ok(())
}

async fn submit_unregister_member(&self) -> Result<()> {
Ok(())
}

async fn submit_heartbeat(&self) -> Result<()> {
Ok(())
}
Expand Down
41 changes: 38 additions & 3 deletions chronicle/src/tasks/spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use rosetta_client::Wallet;
use rosetta_config_ethereum::{query::GetLogs, CallResult, FilterBlockOption};
use rosetta_core::{BlockOrIdentifier, ClientEvent};
use schnorr_evm::VerifyingKey;
use std::time::Duration;
use std::{
future::Future,
num::NonZeroU64,
Expand All @@ -22,11 +23,13 @@ use time_primitives::{
};
use time_primitives::{IGateway, Msg};
use tokio::sync::Mutex;
use tokio::time::sleep;

#[derive(Clone)]
pub struct TaskSpawnerParams<S> {
pub tss: mpsc::Sender<TssSigningRequest>,
pub blockchain: String,
pub min_balance: u128,
pub network: String,
pub network_id: NetworkId,
pub url: String,
Expand All @@ -38,6 +41,7 @@ pub struct TaskSpawnerParams<S> {
pub struct TaskSpawner<S> {
tss: mpsc::Sender<TssSigningRequest>,
wallet: Arc<Wallet>,
wallet_min_balance: u128,
wallet_guard: Arc<Mutex<()>>,
substrate: S,
network_id: NetworkId,
Expand All @@ -58,13 +62,29 @@ where
)
.await?,
);
Ok(Self {

let spawner = Self {
tss: params.tss,
wallet_min_balance: params.min_balance,
wallet,
wallet_guard: Arc::new(Mutex::new(())),
substrate: params.substrate,
network_id: params.network_id,
})
};

while !spawner.is_balance_available().await? {
sleep(Duration::from_secs(10)).await;
tracing::warn!("Chronicle balance is too low, retrying...");
}

Ok(spawner)
}

///
/// Checks if wallet have enough balance
async fn is_balance_available(&self) -> Result<bool> {
let balance = self.wallet.balance().await?;
Ok(balance > self.wallet_min_balance)
}

///
Expand Down Expand Up @@ -229,7 +249,7 @@ where
Ok(payload) => payload,
Err(payload) => Payload::Error(payload),
};
tracing::debug!("debug_latency:{} sending read payloa dor signing", task_id);
tracing::debug!("debug_latency:{} sending read payload for signing", task_id);
let (_, signature) = self
.tss_sign(block_num, shard_id, task_id, TaskPhase::Read, &payload.bytes(task_id))
.await?;
Expand All @@ -242,6 +262,21 @@ where
}

async fn write(self, task_id: TaskId, function: Function) -> Result<()> {
match self.is_balance_available().await {
Ok(false) => {
// unregister member
if let Err(e) = self.substrate.submit_unregister_member().await {
tracing::error!(task_id = task_id, "Failed to unregister member: {:?}", e);
};
tracing::warn!(task_id = task_id, "Chronicle balance too low, exiting");
std::process::exit(1);
},
Ok(true) => {},
Err(err) => {
tracing::error!(task_id = task_id, "Could not fetch account balance: {:?}", err)
},
}

let submission = async move {
match function {
Function::EvmDeploy { bytecode } => {
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ services:
- '--timechain-metadata=development'
- '--timechain-url=ws://validator:9944'
- '--target-keyfile=/etc/target_keyfile'
- '--target-min-balance=1000000000000000'
- '--timechain-keyfile=/etc/timechain_keyfile'
- '--tss-keyshare-cache=/etc'
environment:
Expand Down
2 changes: 2 additions & 0 deletions primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ pub trait Runtime: Clone + Send + Sync + 'static {
stake_amount: u128,
) -> Result<()>;

async fn submit_unregister_member(&self) -> Result<()>;

async fn submit_heartbeat(&self) -> Result<()>;

async fn submit_commitment(
Expand Down
7 changes: 7 additions & 0 deletions tc-subxt/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,10 @@ impl StaticEvent for GatewayRegistered {
const PALLET: &'static str = "Tasks";
const EVENT: &'static str = "GatewayRegistered";
}

#[derive(DecodeAsType, Debug)]
pub struct UnRegisteredMember(pub [u8; 32], pub u64);
impl StaticEvent for UnRegisteredMember {
const PALLET: &'static str = "Members";
const EVENT: &'static str = "UnRegisteredMember";
}
12 changes: 12 additions & 0 deletions tc-subxt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub trait TxSubmitter: Clone + Send + Sync + 'static {

pub enum Tx {
RegisterMember { network: NetworkId, peer_id: PeerId, stake_amount: u128 },
UnregisterMember,
Heartbeat,
Commitment { shard_id: ShardId, commitment: Commitment, proof_of_knowledge: ProofOfKnowledge },
CreateTask { task: TaskDescriptorParams },
Expand Down Expand Up @@ -129,6 +130,10 @@ impl<T: TxSubmitter> SubxtWorker<T> {
);
self.create_signed_payload(&payload).await
},
Tx::UnregisterMember => {
let payload = metadata::tx().members().unregister_member();
self.create_signed_payload(&payload).await
},
Tx::Heartbeat => {
let payload = metadata::tx().members().send_heartbeat();
self.create_signed_payload(&payload).await
Expand Down Expand Up @@ -595,6 +600,13 @@ impl Runtime for SubxtClient {
Ok(())
}

async fn submit_unregister_member(&self) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.tx.unbounded_send((Tx::UnregisterMember, tx))?;
rx.await?;
Ok(())
}

async fn submit_heartbeat(&self) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.tx.unbounded_send((Tx::Heartbeat, tx))?;
Expand Down
71 changes: 70 additions & 1 deletion tester/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ enum Test {
Batch { tasks: u64 },
Gmp,
ChroniclePayment,
ChronicleFundCheck,
Migration,
Restart,
}
Expand Down Expand Up @@ -192,13 +193,19 @@ async fn main() -> Result<()> {
},
// chronicles are refunded the gas for gmp call
Command::Test(Test::ChroniclePayment) => {
println!("This test is only available local with single node shard");
// "This test is only available local with single node shard"
let starting_balance = chronicles[0].wallet().balance().await?;
gmp_test(&tester[0], &tester[1], &contract).await?;
let ending_balance = chronicles[0].wallet().balance().await?;
println!("Verifying balance");
assert!(starting_balance <= ending_balance);
},
// chronicles are refunded the gas for gmp call
Command::Test(Test::ChronicleFundCheck) => {
// "This test is only available in local setup"
gmp_funds_check(&tester[0], &tester[1], &contract, &chronicles, &args.timechain_url)
.await?;
},
Command::Test(Test::Gmp) => {
gmp_test(&tester[0], &tester[1], &contract).await?;
},
Expand Down Expand Up @@ -507,6 +514,68 @@ async fn gmp_test(src: &Tester, dest: &Tester, contract: &Path) -> Result<()> {
Ok(())
}

async fn gmp_funds_check(
src: &Tester,
dest: &Tester,
contract: &Path,
chronicles: &[Tester],
timechain_url: &str,
) -> Result<()> {
let (src_contract, _, _) = setup_gmp_with_contracts(src, dest, contract, 1).await?;

let subxt_client = SubxtClient::get_client(timechain_url).await?;

// submit a vote on source contract (testing contract) which will emit a gmpcreated event on gateway contract
let res = src
.wallet()
.eth_send_call(
src_contract,
VotingContract::voteCall { _vote: true }.abi_encode(),
0,
None,
None,
)
.await?;
let block = res.receipt().unwrap().block_number.unwrap();
println!("submitted vote in block {block}, tx_hash: {:?}", res.tx_hash());
let chronicle_wallet = chronicles.first().unwrap().wallet();
let current_balance = chronicle_wallet.balance().await?;
// leave some space for gas_price
let transfer_balance = current_balance - 100000000000000u128;
println!("Emptying chronicle balance");
chronicle_wallet
.transfer(src.wallet().account(), transfer_balance, None, None)
.await
.unwrap();
println!("looking for unregister event");
let mut block_stream = src.finality_block_stream().await;

'main: loop {
tokio::select! {
block = block_stream.next() => {
if let Some((block_hash, block_number)) = block {
println!("Received block number: {:?}", block_number);
let events = subxt_client.events().at(block_hash).await?;
let member_offline_event = events.find::<events::UnRegisteredMember>().flatten().next();

if let Some(member_offline) = member_offline_event {
let network = member_offline.1;

println!("member offline for network: {:?}", network);
break 'main;
}
}
}
_ = tokio::signal::ctrl_c() => {
println!("aborting...");
anyhow::bail!("abort");
}
}
}
println!("Test Passed");
Ok(())
}

async fn task_migration_test(tester: &Tester, contract: &Path) -> Result<()> {
let (_, contract_address, start_block) = test_setup(tester, contract, 1, 1).await?;

Expand Down
Loading