Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
haider-rs committed Feb 13, 2025
1 parent 9d0e2ea commit ec4c9d1
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 9 deletions.
4 changes: 2 additions & 2 deletions chronicle/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use scale_codec::Encode;
use std::sync::Arc;
use std::{collections::BTreeMap, pin::Pin};
use time_primitives::{
Address, BlockNumber, ErrorMsg, GmpEvents, GmpParams, IConnector, NetworkId, ShardId, Task,
TaskId, TaskResult, TssSignature, TssSigningRequest,
Address, BlockNumber, ErrorMsg, GatewayOp, GmpEvents, GmpParams, IConnector, NetworkId,
ShardId, Task, TaskId, TaskResult, TssSignature, TssSigningRequest,
};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
Expand Down
4 changes: 4 additions & 0 deletions runtime/src/apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,10 @@ impl_runtime_apis! {
fn get_batch_message(batch_id: BatchId) -> Option<GatewayMessage> {
Tasks::get_batch_message(batch_id)
}

fn get_failed_tasks() -> Vec<TaskId> {
Tasks::get_failed_tasks()
}
}

#[cfg(feature = "testnet")]
Expand Down
17 changes: 11 additions & 6 deletions tc-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,13 +545,13 @@ impl Tc {
Ok(tasks)
}

pub async fn assigned_tasks(&self, shard: ShardId) -> Result<Vec<Task>> {
let task_ids = self.runtime.assigned_tasks(shard).await?;
let mut tasks = Vec::with_capacity(task_ids.len());
for id in task_ids {
tasks.push(self.task(id).await?);
pub async fn get_failed_batches(&self) -> Result<Vec<Batch>> {
let batch_ids = self.runtime.get_failed_tasks().await?;
let mut batches = Vec::with_capacity(batch_ids.len());
for id in batch_ids {
batches.push(self.batch(id).await?);
}
Ok(tasks)
Ok(batches)
}

pub async fn members(&self, shard: ShardId) -> Result<Vec<Member>> {
Expand Down Expand Up @@ -852,6 +852,11 @@ impl Tc {
self.runtime.force_shard_offline(shard).await?;
Ok(())
}

pub async fn restart_failed_batch(&self, batch_id: BatchId) -> Result<()> {
self.runtime.restart_failed_batch(batch_id).await?;
Ok(())
}
}

impl Tc {
Expand Down
11 changes: 11 additions & 0 deletions tc-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ enum Command {
AssignedTasks {
shard: ShardId,
},
FailedBatches,
TransactionBaseFee {
network: NetworkId,
},
Expand Down Expand Up @@ -128,6 +129,9 @@ enum Command {
RegisterShards {
network: NetworkId,
},
RetryFailedBatch {
batch_id: BatchId,
},
SetGatewayAdmin {
network: NetworkId,
admin: String,
Expand Down Expand Up @@ -269,6 +273,10 @@ async fn real_main() -> Result<()> {
let tasks = tc.assigned_tasks(shard).await?;
tc.print_table(None, "assigned-tasks", tasks).await?;
},
Command::FailedBatches => {
let batches = tc.get_failed_batches().await?;
tc.print_table(None, "failed-batches", batches).await?;
},
Command::TransactionBaseFee { network } => {
let base_fee = tc.transaction_base_fee(network).await?;
tc.println(
Expand Down Expand Up @@ -461,6 +469,9 @@ async fn real_main() -> Result<()> {
let address = tc.parse_address(Some(network), &address)?;
tc.withdraw_funds(network, amount, address).await?;
},
Command::RetryFailedBatch { batch_id } => {
tc.restart_failed_batch(batch_id).await?;
},
}
tracing::info!("executed query in {}s", now.elapsed().unwrap().as_secs());
Ok(())
Expand Down
10 changes: 9 additions & 1 deletion tc-subxt/src/api/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl SubxtClient {
Ok(self.client.runtime_api().at_latest().await?.call(runtime_call).await?)
}

pub async fn list_failed_tasks(&self) -> Result<Vec<TaskId>> {
pub async fn get_failed_tasks(&self) -> Result<Vec<TaskId>> {
let runtime_call = metadata::apis().tasks_api().get_failed_tasks();
Ok(self.client.runtime_api().at_latest().await?.call(runtime_call).await?)
}
Expand Down Expand Up @@ -85,6 +85,14 @@ impl SubxtClient {
Ok(())
}

pub async fn restart_failed_batch(&self, batch_id: TaskId) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.tx.unbounded_send((Tx::RestartBatch { batch_id }, tx))?;
let tx = rx.await?;
self.wait_for_success(tx).await?;
Ok(())
}

pub async fn read_events_task(&self, network: NetworkId) -> Result<Option<TaskId>> {
let storage_query = metadata::storage().tasks().read_events_task(network);
Ok(self.client.storage().at_latest().await?.fetch(&storage_query).await?)
Expand Down
11 changes: 11 additions & 0 deletions tc-subxt/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use subxt::config::DefaultExtrinsicParamsBuilder;
use subxt::tx::Payload as TxPayload;
use subxt::utils::H256;
use subxt_signer::sr25519::Keypair;
use time_primitives::BatchId;
use time_primitives::{
traits::IdentifyAccount, AccountId, Commitment, GmpEvents, Network, NetworkConfig, NetworkId,
PeerId, ProofOfKnowledge, PublicKey, ShardId, TaskId, TaskResult,
Expand Down Expand Up @@ -77,6 +78,9 @@ pub enum Tx {
RemoveTask {
task_id: TaskId,
},
RestartBatch {
batch_id: BatchId,
},
}

pub struct TxData {
Expand Down Expand Up @@ -259,6 +263,13 @@ impl SubxtWorker {
let payload = metadata::sudo(runtime_call);
self.create_signed_payload(&payload, params)
},
Tx::RestartBatch { batch_id } => {
let runtime_call = RuntimeCall::Tasks(
metadata::runtime_types::pallet_tasks::pallet::Call::restart_batch { batch_id },
);
let payload = metadata::sudo(runtime_call);
self.create_signed_payload(&payload, params)
},
}
}

Expand Down

0 comments on commit ec4c9d1

Please sign in to comment.