Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry failed tasks via admin #1549

Merged
merged 12 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions pallets/tasks/src/benchmarking.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
BatchIdCounter, Call, Config, Pallet, ReadEventsTask, ShardRegistered, TaskIdCounter,
TaskOutput, TaskShard,
BatchIdCounter, BatchTaskId, Call, Config, FailedBatchIds, Pallet, ReadEventsTask,
ShardRegistered, TaskIdCounter, TaskNetwork, TaskOutput, TaskShard,
};
use frame_benchmarking::benchmarks;
use frame_support::pallet_prelude::Get;
Expand Down Expand Up @@ -143,5 +143,30 @@ benchmarks! {
TaskOutput::<T>::insert(task_id, Ok::<(), ErrorMsg>(()));
}: _(RawOrigin::Root, task_id) verify { }



// Benchmark for the `restart_batch` extrinsic, dispatched as root on a batch marked as failed.
restart_batch {
create_shard::<T>(ETHEREUM);
let network = ETHEREUM;
// The current counter value is used as the id of the batch under test.
let batch_id = BatchIdCounter::<T>::get();
let initial_task_id = Pallet::<T>::create_task(
network,
Task::SubmitGatewayMessage { batch_id }
);
// Seed the storage entries the extrinsic touches: the failed marker, the
// batch -> task link and the network of the original task.
FailedBatchIds::<T>::insert(batch_id, ());
BatchTaskId::<T>::insert(batch_id, initial_task_id);
TaskNetwork::<T>::insert(initial_task_id, network);
}: _(RawOrigin::Root, batch_id) verify {
// NOTE(review): assumes `create_task` hands out consecutive ids and that no other
// task is created between setup and dispatch — confirm.
let new_task_id = initial_task_id + 1;
assert_eq!(
BatchTaskId::<T>::get(batch_id),
Some(new_task_id),
"New task not created"
);
assert!(
!FailedBatchIds::<T>::contains_key(batch_id),
"Batch not removed from failed list"
);
}
impl_benchmark_test_suite!(Pallet, crate::mock::new_test_ext(), crate::mock::Test);
}
37 changes: 35 additions & 2 deletions pallets/tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ pub mod pallet {
use crate::queue::*;

use polkadot_sdk::{
frame_support, frame_system, pallet_balances, pallet_treasury, sp_runtime, sp_std,
frame_support::{self, Blake2_128Concat},
frame_system, pallet_balances, pallet_treasury, sp_runtime, sp_std,
};

use frame_support::{
Expand Down Expand Up @@ -82,6 +83,7 @@ pub mod pallet {
fn sync_network() -> Weight;
fn stop_network() -> Weight;
fn remove_task() -> Weight;
fn restart_batch() -> Weight;
}

impl WeightInfo for () {
Expand All @@ -106,6 +108,9 @@ pub mod pallet {
fn remove_task() -> Weight {
Weight::default()
}
// Default weight for `restart_batch` in the unit `WeightInfo` implementation.
fn restart_batch() -> Weight {
Weight::default()
}
}

#[pallet::pallet]
Expand Down Expand Up @@ -260,6 +265,10 @@ pub mod pallet {
#[pallet::storage]
pub type BatchTaskId<T: Config> = StorageMap<_, Blake2_128Concat, BatchId, TaskId, OptionQuery>;

/// Batches currently marked as failed, stored as a set keyed by batch id (the value is always `()`).
///
/// An entry is inserted when a `SubmitGatewayMessage` task result carrying an error is processed
/// and removed again when the batch is restarted through `restart_batch`.
#[pallet::storage]
pub type FailedBatchIds<T: Config> = StorageMap<_, Blake2_128Concat, BatchId, (), OptionQuery>;

/// TxHash of the batch executed.
///
/// It can be `None` either if the BatchExecuted event was not received or it was received but tx_hash was `None`.
Expand All @@ -282,6 +291,8 @@ pub mod pallet {
ShardTaskLimitSet(NetworkId, u32),
/// Set the network batch size
BatchSizeSet(NetworkId, u64, u64),
/// Batch Restarted (old_task_id, new_task_id)
BatchRestarted(TaskId, TaskId),
/// Insufficient Treasury Balance to payout rewards
InsufficientTreasuryBalance(AccountId, Balance),
/// Message received
Expand Down Expand Up @@ -363,11 +374,15 @@ pub mod pallet {
Self::process_events(network, task_id, events);
Ok(())
},
(Task::SubmitGatewayMessage { .. }, TaskResult::SubmitGatewayMessage { error }) => {
(
Task::SubmitGatewayMessage { batch_id },
TaskResult::SubmitGatewayMessage { error },
) => {
// verify signature
let expected_signer =
TaskSubmitter::<T>::get(task_id).map(|s| s.into_account());
ensure!(Some(&signer) == expected_signer.as_ref(), Error::<T>::InvalidSigner);
FailedBatchIds::<T>::insert(batch_id, ());
Err(error)
},
(_, _) => return Err(Error::<T>::InvalidTaskResult.into()),
Expand Down Expand Up @@ -434,6 +449,19 @@ pub mod pallet {
TaskSubmitter::<T>::remove(task);
Ok(())
}

#[pallet::call_index(14)]
#[pallet::weight(<T as Config>::WeightInfo::restart_batch())]
pub fn restart_batch(origin: OriginFor<T>, batch_id: BatchId) -> DispatchResult {
T::AdminOrigin::ensure_origin(origin)?;
let old_task_id = BatchTaskId::<T>::get(batch_id).ok_or(Error::<T>::InvalidBatchId)?;
let network = TaskNetwork::<T>::get(old_task_id).ok_or(Error::<T>::UnknownTask)?;
let new_task_id = Self::create_task(network, Task::SubmitGatewayMessage { batch_id });
BatchTaskId::<T>::insert(batch_id, new_task_id);
FailedBatchIds::<T>::remove(batch_id);
Self::deposit_event(Event::BatchRestarted(old_task_id, new_task_id));
Ok(())
}
}

impl<T: Config> Pallet<T> {
Expand Down Expand Up @@ -765,6 +793,11 @@ pub mod pallet {
pub fn get_batch_message(batch: BatchId) -> Option<GatewayMessage> {
// Plain lookup of `batch` in the `BatchMessage` storage item.
BatchMessage::<T>::get(batch)
}

/// Returns the ids of every batch currently present in `FailedBatchIds`.
///
/// Despite the function name, the returned values are batch ids, not task ids.
/// The ids are yielded in the storage map's key iteration order.
pub fn get_failed_tasks() -> Vec<BatchId> {
    FailedBatchIds::<T>::iter_keys().collect::<Vec<_>>()
}
}

impl<T: Config> TasksInterface for Pallet<T> {
Expand Down
33 changes: 32 additions & 1 deletion pallets/tasks/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::mock::*;
use crate::{mock::*, BatchTaskId, Event, FailedBatchIds};
use crate::{BatchIdCounter, BatchTxHash, ShardRegistered};

use frame_support::assert_ok;
Expand Down Expand Up @@ -405,6 +405,37 @@ fn test_task_stuck_in_unassigned_queue() {
})
}

/// A batch whose submission task reported an error can be restarted by root: a new task is
/// created for the batch, the failed marker is cleared, the old task result is kept and a
/// `BatchRestarted` event links the old and the new task.
#[test]
fn test_restart_failed_batch() {
    new_test_ext().execute_with(|| {
        // Ids this scenario is expected to produce.
        let batch_id = 0;
        let initial_task_id = 2;
        let new_task_id = 3;

        register_gateway(ETHEREUM, 42);
        let shard = create_shard(ETHEREUM, 3, 1);
        roll(1);
        Tasks::assign_task(shard, initial_task_id);
        assert_eq!(Tasks::get_task(initial_task_id), Some(Task::SubmitGatewayMessage { batch_id }));
        roll(1);

        // Fail the submission so the batch lands in the failed set.
        let submitter = Tasks::get_task_submitter(initial_task_id).unwrap();
        submit_submission_error(submitter, initial_task_id, "batch failed");
        assert!(FailedBatchIds::<Test>::contains_key(batch_id));

        assert_ok!(Tasks::restart_batch(RawOrigin::Root.into(), batch_id));

        // The batch now points at a fresh task and is no longer marked as failed.
        assert_eq!(Tasks::get_task(new_task_id), Some(Task::SubmitGatewayMessage { batch_id }));
        assert_eq!(BatchTaskId::<Test>::get(batch_id), Some(new_task_id));
        assert!(!FailedBatchIds::<Test>::contains_key(batch_id));
        // The result of the original task is still available.
        assert!(Tasks::get_task_result(initial_task_id).is_some());

        let restarted = System::events().into_iter().find_map(|record| match record.event {
            RuntimeEvent::Tasks(Event::BatchRestarted(old, new)) => Some((old, new)),
            _ => None,
        });
        assert_eq!(restarted, Some((initial_task_id, new_task_id)));
    });
}

mod bench_helper {
use super::*;

Expand Down
1 change: 1 addition & 0 deletions primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ sp_api::decl_runtime_apis! {
fn get_task_submitter(task_id: TaskId) -> Option<PublicKey>;
fn get_task_result(task_id: TaskId) -> Option<Result<(), ErrorMsg>>;
fn get_batch_message(batch_id: BatchId) -> Option<GatewayMessage>;
fn get_failed_tasks() -> Vec<TaskId>;
}

pub trait SubmitTransactionApi{
Expand Down
4 changes: 4 additions & 0 deletions runtime/src/apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,10 @@ impl_runtime_apis! {
fn get_batch_message(batch_id: BatchId) -> Option<GatewayMessage> {
// Forwards to the tasks pallet.
Tasks::get_batch_message(batch_id)
}

fn get_failed_tasks() -> Vec<TaskId> {
Tasks::get_failed_tasks()
}
}

#[cfg(feature = "testnet")]
Expand Down
18 changes: 18 additions & 0 deletions runtime/src/weights/develop/pallet_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,22 @@ impl<T: frame_system::Config> pallet_tasks::WeightInfo for WeightInfo<T> {
.saturating_add(T::DbWeight::get().reads(2))
.saturating_add(T::DbWeight::get().writes(4))
}
/// Storage: `Tasks::TaskOutput` (r:1 w:1)
/// Proof: `Tasks::TaskOutput` (`max_values`: None, `max_size`: None, mode: `Measured`)
/// Storage: `Tasks::Tasks` (r:1 w:1)
/// Proof: `Tasks::Tasks` (`max_values`: None, `max_size`: None, mode: `Measured`)
/// Storage: `Tasks::TaskSubmitter` (r:0 w:1)
/// Proof: `Tasks::TaskSubmitter` (`max_values`: None, `max_size`: None, mode: `Measured`)
/// Storage: `Tasks::TaskNetwork` (r:0 w:1)
/// Proof: `Tasks::TaskNetwork` (`max_values`: None, `max_size`: None, mode: `Measured`)
fn restart_batch() -> Weight {
// NOTE(review): the storage list above names neither `BatchTaskId` nor `FailedBatchIds`,
// and the read/write counts equal those of the preceding function in this file —
// presumably a placeholder carried over from another benchmark; regenerate from the
// `restart_batch` benchmark to confirm.
// Proof Size summary in bytes:
// Measured: `286`
// Estimated: `3751`
// Minimum execution time: 21_431_000 picoseconds.
Weight::from_parts(25_146_000, 0)
.saturating_add(Weight::from_parts(0, 3751))
.saturating_add(T::DbWeight::get().reads(2))
.saturating_add(T::DbWeight::get().writes(4))
}
}
18 changes: 18 additions & 0 deletions runtime/src/weights/mainnet/pallet_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,22 @@ impl<T: frame_system::Config> pallet_tasks::WeightInfo for WeightInfo<T> {
.saturating_add(T::DbWeight::get().reads(2))
.saturating_add(T::DbWeight::get().writes(4))
}
/// Storage: `Tasks::TaskOutput` (r:1 w:1)
/// Proof: `Tasks::TaskOutput` (`max_values`: None, `max_size`: None, mode: `Measured`)
/// Storage: `Tasks::Tasks` (r:1 w:1)
/// Proof: `Tasks::Tasks` (`max_values`: None, `max_size`: None, mode: `Measured`)
/// Storage: `Tasks::TaskSubmitter` (r:0 w:1)
/// Proof: `Tasks::TaskSubmitter` (`max_values`: None, `max_size`: None, mode: `Measured`)
/// Storage: `Tasks::TaskNetwork` (r:0 w:1)
/// Proof: `Tasks::TaskNetwork` (`max_values`: None, `max_size`: None, mode: `Measured`)
fn restart_batch() -> Weight {
// NOTE(review): the storage list above names neither `BatchTaskId` nor `FailedBatchIds`,
// and the read/write counts equal those of the preceding function in this file —
// presumably a placeholder carried over from another benchmark; regenerate from the
// `restart_batch` benchmark to confirm.
// Proof Size summary in bytes:
// Measured: `286`
// Estimated: `3751`
// Minimum execution time: 8_666_000 picoseconds.
Weight::from_parts(8_986_000, 0)
.saturating_add(Weight::from_parts(0, 3751))
.saturating_add(T::DbWeight::get().reads(2))
.saturating_add(T::DbWeight::get().writes(4))
}
}
18 changes: 18 additions & 0 deletions runtime/src/weights/testnet/pallet_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,22 @@ impl<T: frame_system::Config> pallet_tasks::WeightInfo for WeightInfo<T> {
.saturating_add(T::DbWeight::get().reads(2))
.saturating_add(T::DbWeight::get().writes(4))
}
/// Storage: `Tasks::TaskOutput` (r:1 w:1)
/// Proof: `Tasks::TaskOutput` (`max_values`: None, `max_size`: None, mode: `Measured`)
/// Storage: `Tasks::Tasks` (r:1 w:1)
/// Proof: `Tasks::Tasks` (`max_values`: None, `max_size`: None, mode: `Measured`)
/// Storage: `Tasks::TaskSubmitter` (r:0 w:1)
/// Proof: `Tasks::TaskSubmitter` (`max_values`: None, `max_size`: None, mode: `Measured`)
/// Storage: `Tasks::TaskNetwork` (r:0 w:1)
/// Proof: `Tasks::TaskNetwork` (`max_values`: None, `max_size`: None, mode: `Measured`)
fn restart_batch() -> Weight {
// NOTE(review): the storage list above names neither `BatchTaskId` nor `FailedBatchIds`,
// and the read/write counts equal those of the preceding function in this file —
// presumably a placeholder carried over from another benchmark; regenerate from the
// `restart_batch` benchmark to confirm.
// Proof Size summary in bytes:
// Measured: `286`
// Estimated: `3751`
// Minimum execution time: 12_533_000 picoseconds.
Weight::from_parts(12_894_000, 0)
.saturating_add(Weight::from_parts(0, 3751))
.saturating_add(T::DbWeight::get().reads(2))
.saturating_add(T::DbWeight::get().writes(4))
}
}
14 changes: 14 additions & 0 deletions tc-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,15 @@ impl Tc {
Ok(tasks)
}

/// Fetches every batch the chain currently reports as failed.
///
/// The ids come from `self.runtime.get_failed_tasks()`; each one is then resolved through
/// `Self::batch`, one request at a time. The first error aborts the whole call.
pub async fn get_failed_batches(&self) -> Result<Vec<Batch>> {
    let failed_ids = self.runtime.get_failed_tasks().await?;
    let mut failed = Vec::with_capacity(failed_ids.len());
    for batch_id in failed_ids {
        let batch = self.batch(batch_id).await?;
        failed.push(batch);
    }
    Ok(failed)
}

pub async fn members(&self, shard: ShardId) -> Result<Vec<Member>> {
let shard_members = self.runtime.shard_members(shard).await?;
let mut members = Vec::with_capacity(shard_members.len());
Expand Down Expand Up @@ -859,6 +868,11 @@ impl Tc {
self.runtime.force_shard_offline(shard).await?;
Ok(())
}

/// Asks the runtime client to restart the batch identified by `batch_id`.
///
/// Any error returned by `self.runtime.restart_failed_batch` is propagated to the caller.
pub async fn restart_failed_batch(&self, batch_id: BatchId) -> Result<()> {
self.runtime.restart_failed_batch(batch_id).await?;
Ok(())
}
}

impl Tc {
Expand Down
11 changes: 11 additions & 0 deletions tc-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ enum Command {
AssignedTasks {
shard: ShardId,
},
FailedBatches,
TransactionBaseFee {
network: NetworkId,
},
Expand Down Expand Up @@ -128,6 +129,9 @@ enum Command {
RegisterShards {
network: NetworkId,
},
RetryFailedBatch {
batch_id: BatchId,
},
SetGatewayAdmin {
network: NetworkId,
admin: String,
Expand Down Expand Up @@ -292,6 +296,10 @@ async fn real_main() -> Result<()> {
let tasks = tc.assigned_tasks(shard).await?;
tc.print_table(None, "assigned-tasks", tasks).await?;
},
Command::FailedBatches => {
let batches = tc.get_failed_batches().await?;
tc.print_table(None, "failed-batches", batches).await?;
},
Command::TransactionBaseFee { network } => {
let base_fee = tc.transaction_base_fee(network).await?;
tc.println(
Expand Down Expand Up @@ -518,6 +526,9 @@ async fn real_main() -> Result<()> {
let output = tc.debug_transaction(network, hash).await?;
tc.println(None, output).await?;
},
Command::RetryFailedBatch { batch_id } => {
tc.restart_failed_batch(batch_id).await?;
},
}
tracing::info!("executed query in {}s", now.elapsed().unwrap().as_secs());
Ok(())
Expand Down
13 changes: 13 additions & 0 deletions tc-subxt/src/api/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ impl SubxtClient {
Ok(self.client.runtime_api().at_latest().await?.call(runtime_call).await?)
}

/// Calls the `get_failed_tasks` runtime API of the tasks API at the latest block.
///
/// NOTE(review): the pallet side of this call returns batch ids; the `TaskId` in the return
/// type presumably only works because both ids share one underlying type — confirm.
pub async fn get_failed_tasks(&self) -> Result<Vec<TaskId>> {
    let runtime_call = metadata::apis().tasks_api().get_failed_tasks();
    let api = self.client.runtime_api().at_latest().await?;
    let failed_ids = api.call(runtime_call).await?;
    Ok(failed_ids)
}

pub async fn unassigned_tasks(&self, network: NetworkId) -> Result<Vec<TaskId>> {
let storage_query = metadata::storage().tasks().ua_tasks_iter1(network);
let mut items = self.client.storage().at_latest().await?.iter(storage_query).await?;
Expand Down Expand Up @@ -80,6 +85,14 @@ impl SubxtClient {
Ok(())
}

/// Queues a `Tx::RestartBatch` for `batch_id` and waits until it has been checked.
///
/// The request is sent over `self.tx` together with a oneshot sender; the value received on
/// the matching receiver is then passed to `is_success`, whose error (if any) is returned.
///
/// NOTE(review): `batch_id` is declared as `TaskId` but is used as the `batch_id` of
/// `Tx::RestartBatch` — presumably both ids share one underlying type; confirm.
pub async fn restart_failed_batch(&self, batch_id: TaskId) -> Result<()> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let request = Tx::RestartBatch { batch_id };
    self.tx.unbounded_send((request, reply_tx))?;
    let submitted = reply_rx.await?;
    self.is_success(&submitted).await?;
    Ok(())
}

/// Fetches the `read_events_task` storage entry of the tasks pallet for `network` at the
/// latest block; the result is `None` when the fetch yields no value.
pub async fn read_events_task(&self, network: NetworkId) -> Result<Option<TaskId>> {
let storage_query = metadata::storage().tasks().read_events_task(network);
Ok(self.client.storage().at_latest().await?.fetch(&storage_query).await?)
}
Expand Down
11 changes: 11 additions & 0 deletions tc-subxt/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::pin::Pin;
use subxt::config::DefaultExtrinsicParamsBuilder;
use subxt::utils::H256;
use subxt_signer::sr25519::Keypair;
use time_primitives::BatchId;
use time_primitives::{
traits::IdentifyAccount, AccountId, Commitment, GmpEvents, Network, NetworkConfig, NetworkId,
PeerId, ProofOfKnowledge, PublicKey, ShardId, TaskId, TaskResult,
Expand Down Expand Up @@ -78,6 +79,9 @@ pub enum Tx {
RemoveTask {
task_id: TaskId,
},
RestartBatch {
batch_id: BatchId,
},
}

#[derive(Clone, Encode, Decode)]
Expand Down Expand Up @@ -260,6 +264,13 @@ where
let payload = metadata::sudo(runtime_call);
self.client.sign_payload(&payload, params)
},
Tx::RestartBatch { batch_id } => {
let runtime_call = RuntimeCall::Tasks(
metadata::runtime_types::pallet_tasks::pallet::Call::restart_batch { batch_id },
);
let payload = metadata::sudo(runtime_call);
self.client.sign_payload(&payload, params)
},
}
}

Expand Down
Loading