Skip to content

Commit

Permalink
Subxt tests (#1478)
Browse files Browse the repository at this point in the history
Co-authored-by: David Craven <[email protected]>
  • Loading branch information
haider-rs and dvc94ch authored Feb 18, 2025
1 parent 1aefc61 commit 54a346c
Show file tree
Hide file tree
Showing 16 changed files with 1,058 additions and 221 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ runtime/target/srtool
# secrets
.env
gcp-key.json

#cached db
tc-subxt/cached_tx.redb
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion chronicle/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ pub struct ChronicleArgs {
/// Gmp backend to use.
#[clap(long, default_value = "evm")]
pub backend: Backend,
/// Chronicle db path.
#[clap(long, default_value = "cached_tx.redb")]
pub tx_db: String,
/// Cctp Sender.
#[clap(long)]
pub cctp_sender: Option<String>,
Expand Down Expand Up @@ -130,7 +133,8 @@ async fn main() -> Result<()> {
}
}

let subxt = SubxtClient::with_key(&args.timechain_url, &timechain_mnemonic).await?;
let subxt =
SubxtClient::with_key(&args.timechain_url, &timechain_mnemonic, &args.tx_db).await?;

let config = args.config(network_key, target_mnemonic)?;

Expand Down
1 change: 1 addition & 0 deletions primitives/src/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub type TssHash = [u8; 32];
pub type PeerId = [u8; 32];
pub type ShardId = u64;
pub type ProofOfKnowledge = [u8; 65];

#[derive(Encode, Decode, TypeInfo, PartialEq, Eq, Clone, Debug)]
pub struct Commitment(pub BoundedVec<TssPublicKey, ConstU32<MAX_SHARD_SIZE>>);

Expand Down
7 changes: 4 additions & 3 deletions tc-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ impl Tc {
tracing::info!("waiting for chain to start: {err:?}");
sleep_or_abort(Duration::from_secs(10)).await?;
}
let runtime = SubxtClient::with_key(&timechain_url, &env.timechain_mnemonic)
.await
.context("failed to connect to timechain")?;
let runtime =
SubxtClient::with_key(&timechain_url, &env.timechain_mnemonic, "cached_tx.redb")
.await
.context("failed to connect to timechain")?;
Ok::<_, anyhow::Error>(runtime)
});
let mut connectors = HashMap::new();
Expand Down
8 changes: 7 additions & 1 deletion tc-subxt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ async-trait.workspace = true
clap.workspace = true
futures.workspace = true
hex.workspace = true
redb = "2.1.2"
serde = { workspace = true, features = ["derive"] }
tokio.workspace = true
tokio = { workspace = true, features = ["full"] }
tracing.workspace = true

scale-codec.workspace = true
Expand All @@ -29,6 +30,11 @@ subxt-signer.workspace = true
time-primitives = { workspace = true, default-features = true }
tc-subxt-metadata = { path = "metadata" }

[dev-dependencies]
env_logger.workspace = true
tokio-stream = { version = "0.1", features = ["sync"]}


[features]
testnet = [
"time-primitives/testnet",
Expand Down
6 changes: 3 additions & 3 deletions tc-subxt/src/api/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,23 +84,23 @@ impl SubxtClient {
tx,
))?;
let tx = rx.await?;
self.wait_for_success(tx).await?;
self.is_success(&tx).await?;
Ok(())
}

pub async fn unregister_member(&self, member: AccountId) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.tx.unbounded_send((Tx::UnregisterMember { member }, tx))?;
let tx = rx.await?;
self.wait_for_success(tx).await?;
self.is_success(&tx).await?;
Ok(())
}

pub async fn submit_heartbeat(&self) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.tx.unbounded_send((Tx::Heartbeat, tx))?;
let tx = rx.await?;
self.wait_for_success(tx).await?;
self.is_success(&tx).await?;
Ok(())
}
}
4 changes: 2 additions & 2 deletions tc-subxt/src/api/networks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ impl SubxtClient {
let (tx, rx) = oneshot::channel();
self.tx.unbounded_send((Tx::RegisterNetwork { network }, tx))?;
let tx = rx.await?;
self.wait_for_success(tx).await?;
self.is_success(&tx).await?;
Ok(())
}

Expand All @@ -21,7 +21,7 @@ impl SubxtClient {
let (tx, rx) = oneshot::channel();
self.tx.unbounded_send((Tx::SetNetworkConfig { network, config }, tx))?;
let tx = rx.await?;
self.wait_for_success(tx).await?;
self.is_success(&tx).await?;
Ok(())
}

Expand Down
6 changes: 3 additions & 3 deletions tc-subxt/src/api/shards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,23 +84,23 @@ impl SubxtClient {
tx,
))?;
let tx = rx.await?;
self.wait_for_success(tx).await?;
self.is_success(&tx).await?;
Ok(())
}

pub async fn submit_online(&self, shard_id: ShardId) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.tx.unbounded_send((Tx::Ready { shard_id }, tx))?;
let tx = rx.await?;
self.wait_for_success(tx).await?;
self.is_success(&tx).await?;
Ok(())
}

pub async fn force_shard_offline(&self, shard_id: ShardId) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.tx.unbounded_send((Tx::ForceShardOffline { shard_id }, tx))?;
let tx = rx.await?;
self.wait_for_success(tx).await?;
self.is_success(&tx).await?;
Ok(())
}
}
6 changes: 3 additions & 3 deletions tc-subxt/src/api/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,23 @@ impl SubxtClient {
let (tx, rx) = oneshot::channel();
self.tx.unbounded_send((Tx::SubmitTaskResult { task_id, result }, tx))?;
let tx = rx.await?;
self.wait_for_success(tx).await?;
self.is_success(&tx).await?;
Ok(())
}

pub async fn submit_gmp_events(&self, network: NetworkId, gmp_events: GmpEvents) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.tx.unbounded_send((Tx::SubmitGmpEvents { network, gmp_events }, tx))?;
let tx = rx.await?;
self.wait_for_success(tx).await?;
self.is_success(&tx).await?;
Ok(())
}

pub async fn remove_task(&self, task_id: TaskId) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.tx.unbounded_send((Tx::RemoveTask { task_id }, tx))?;
let tx = rx.await?;
self.wait_for_success(tx).await?;
self.is_success(&tx).await?;
Ok(())
}

Expand Down
94 changes: 94 additions & 0 deletions tc-subxt/src/db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use anyhow::Result;
use redb::{Database, ReadableTable, TableDefinition};
use scale_codec::{Decode, Encode};
use std::collections::VecDeque;
use subxt::utils::H256;

use crate::{timechain_client::ITransactionDbOps, worker::TxData};

const TX_TABLE: TableDefinition<[u8; 64], &[u8]> = TableDefinition::new("pending_txs");

pub struct TransactionsDB {
db: Database,
public_key: [u8; 32],
}

impl TransactionsDB {
pub fn new(path: &str, public_key: [u8; 32]) -> Result<Self> {
let db = Database::create(path)?;

let write_tx = db.begin_write()?;
{
write_tx.open_table(TX_TABLE)?;
}
write_tx.commit()?;
Ok(Self { db, public_key })
}
}

impl ITransactionDbOps for TransactionsDB {
fn store_tx(&self, tx_data: &TxData) -> Result<()> {
let mut composite_key = [0u8; 64];
composite_key[..32].copy_from_slice(&self.public_key);
composite_key[32..].copy_from_slice(tx_data.hash.as_bytes());

let tx_value = tx_data.encode();

let write_tx = self.db.begin_write()?;
{
let mut table = write_tx.open_table(TX_TABLE)?;
table.insert(&composite_key, &*tx_value)?;
}
write_tx.commit()?;
Ok(())
}

fn remove_tx(&self, hash: H256) -> Result<()> {
let mut composite_key = [0u8; 64];
composite_key[..32].copy_from_slice(&self.public_key);
composite_key[32..].copy_from_slice(hash.as_bytes());

let write_tx = self.db.begin_write()?;
{
let mut table = write_tx.open_table(TX_TABLE)?;
table.remove(&composite_key)?;
}
write_tx.commit()?;
Ok(())
}

fn load_pending_txs(&self, nonce: u64) -> Result<VecDeque<TxData>> {
let write_tx = self.db.begin_write()?;
let mut pending_txs = Vec::new();

{
let mut table = write_tx.open_table(TX_TABLE)?;
let mut delete_keys = Vec::new();

let mut lower_bound = [0u8; 64];
lower_bound[..32].copy_from_slice(&self.public_key);
let mut upper_bound = [0xffu8; 64];
upper_bound[..32].copy_from_slice(&self.public_key);

for entry in table.range(lower_bound..=upper_bound)? {
let (key, value) = entry?;
let tx_data = TxData::decode(&mut value.value())?;
if tx_data.nonce < nonce {
let key_bytes: [u8; 64] = key.value();
delete_keys.push(key_bytes);
} else {
pending_txs.push(tx_data);
}
}

for key_bytes in delete_keys {
table.remove(&key_bytes)?;
}
}

write_tx.commit()?;

pending_txs.sort_by_key(|tx| tx.nonce);
Ok(pending_txs.into())
}
}
Loading

0 comments on commit 54a346c

Please sign in to comment.