Skip to content

Commit

Permalink
Split ReadGatewayEvents into multiple transactions (#1470)
Browse files Browse the repository at this point in the history
Co-authored-by: David Craven <[email protected]>
  • Loading branch information
4meta5 and dvc94ch authored Feb 21, 2025
1 parent 5c7fb73 commit e22634e
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 35 deletions.
6 changes: 3 additions & 3 deletions chronicle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,14 @@ mod tests {
tracing::info!("creating task");
// Create a task and assign it to the shard.
let task_id = mock.create_task(Task::ReadGatewayEvents { blocks: 0..1 });
tracing::info!("assigning task");
tracing::info!("assigning task {task_id} {shard_id}");
mock.assign_task(task_id, shard_id);
// Wait for the task to complete.
loop {
tracing::info!("waiting for task");
tracing::info!("waiting for task {task_id}");
let task = mock.task(task_id).unwrap();
if task.result.is_none() {
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(10)).await;
continue;
}
break;
Expand Down
1 change: 1 addition & 0 deletions chronicle/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ impl Runtime for Mock {
}

async fn submit_task_result(&self, task_id: TaskId, result: TaskResult) -> Result<()> {
tracing::info!("submit_task_result {task_id} {result:?}");
let mut tasks = self.tasks.lock().unwrap();
tasks.get_mut(&task_id).unwrap().result = Some(result);
Ok(())
Expand Down
81 changes: 56 additions & 25 deletions chronicle/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use scale_codec::Encode;
use std::sync::Arc;
use std::{collections::BTreeMap, pin::Pin};
use time_primitives::{
Address, BlockNumber, ErrorMsg, GmpEvents, GmpParams, IConnector, NetworkId, ShardId, Task,
TaskId, TaskResult, TssSignature, TssSigningRequest,
Address, BlockNumber, ErrorMsg, GmpEvent, GmpEvents, GmpParams, IConnector, NetworkId, ShardId,
Task, TaskId, TaskResult, TssSignature, TssSigningRequest, MAX_GMP_EVENTS,
};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -96,6 +96,24 @@ impl TaskParams {
Ok(true)
}

async fn submit_events(
&self,
block_number: BlockNumber,
shard_id: ShardId,
task_id: TaskId,
events: Vec<GmpEvent>,
span: &Span,
) -> Result<()> {
let payload = time_primitives::encode_gmp_events(task_id, &events);
let signature = self.tss_sign(block_number, shard_id, task_id, payload, span).await?;
let result = TaskResult::ReadGatewayEvents {
events: GmpEvents(BoundedVec::truncate_from(events)),
signature,
};
tracing::debug!(parent: span, "submitting task result",);
self.runtime.submit_task_result(task_id, result).await
}

#[allow(clippy::too_many_arguments)]
async fn execute(
self,
Expand All @@ -107,18 +125,20 @@ impl TaskParams {
task: Task,
span: Span,
) -> Result<()> {
let result = match task {
match task {
Task::ReadGatewayEvents { blocks } => {
let events =
self.connector.read_events(gateway, blocks).await.context("read_events")?;
tracing::info!(parent: &span, "read {} events", events.len(),);
let payload = time_primitives::encode_gmp_events(task_id, &events);
let signature =
self.tss_sign(block_number, shard_id, task_id, payload, &span).await?;
Some(TaskResult::ReadGatewayEvents {
events: GmpEvents(events),
signature,
})
tracing::info!(parent: &span, "read {} events", events.len());
let mut remaining = true;
for chunk in events.chunks(MAX_GMP_EVENTS as _) {
remaining = chunk.len() != MAX_GMP_EVENTS as usize;
self.submit_events(block_number, shard_id, task_id, chunk.to_vec(), &span)
.await?;
}
if remaining {
self.submit_events(block_number, shard_id, task_id, vec![], &span).await?;
}
},
Task::SubmitGatewayMessage { batch_id } => {
let msg =
Expand All @@ -134,23 +154,13 @@ impl TaskParams {
{
tracing::error!(parent: &span, batch_id, "Error while executing batch: {e}");
e.truncate(time_primitives::MAX_ERROR_LEN as usize - 4);
Some(TaskResult::SubmitGatewayMessage {
let result = TaskResult::SubmitGatewayMessage {
error: ErrorMsg(BoundedVec::truncate_from(e.encode())),
})
} else {
None
};
tracing::debug!(parent: &span, "submitting task result");
self.runtime.submit_task_result(task_id, result).await?;
}
},
};
if let Some(result) = result {
tracing::debug!(parent: &span, "submitting task result",);
if let Err(e) = self.runtime.submit_task_result(task_id, result).await {
tracing::error!(
parent: &span,
"error submitting task result: {:?}",
e
);
}
}
Ok(())
}
Expand Down Expand Up @@ -247,3 +257,24 @@ impl TaskExecutor {
Ok((start_sessions, completed_sessions, failed_tasks))
}
}

#[cfg(test)]
mod tests {
#[test]
fn test_event_chunking_preserves_all_events() {
const MAX_EVENTS: usize = 4;

// Create a sample list of events
let events: Vec<u32> = (1..=11).collect();

// Perform chunking
let event_chunks: Vec<Vec<_>> =
events.chunks(MAX_EVENTS).map(|chunk| chunk.to_vec()).collect();

// Flatten the chunks back into a single vector
let flattened_events: Vec<_> = event_chunks.iter().flatten().copied().collect();

// Ensure the original and reconstructed lists match
assert_eq!(events, flattened_events, "Chunking should not lose any events");
}
}
4 changes: 2 additions & 2 deletions pallets/tasks/src/benchmarking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ benchmarks! {
}: _(
RawOrigin::Signed([0u8; 32].into()),
task_id,
TaskResult::ReadGatewayEvents { events: GmpEvents(vec![]), signature: SIGNATURE }
TaskResult::ReadGatewayEvents { events: GmpEvents(BoundedVec::truncate_from(vec![])), signature: SIGNATURE }
) verify {
assert_eq!(TaskOutput::<T>::get(task_id), Some(Ok(())));
assert!(TaskShard::<T>::get(task_id).is_none());
Expand Down Expand Up @@ -129,7 +129,7 @@ benchmarks! {
}

submit_gmp_events {
}: _(RawOrigin::Root, ETHEREUM, GmpEvents(vec![]))
}: _(RawOrigin::Root, ETHEREUM, GmpEvents(BoundedVec::truncate_from(vec![])))
verify { }

sync_network {}: _(RawOrigin::Root, ETHEREUM, 100u64) verify { }
Expand Down
7 changes: 7 additions & 0 deletions pallets/tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub mod pallet {
AccountId, Balance, BatchBuilder, BatchId, ErrorMsg, GatewayMessage, GatewayOp, GmpEvent,
GmpEvents, Hash as TxHash, MessageId, NetworkId, NetworksInterface, PublicKey, ShardId,
ShardsInterface, Task, TaskId, TaskResult, TasksInterface, TssPublicKey, TssSignature,
MAX_GMP_EVENTS,
};

/// Trait to define the weights for various extrinsics in the pallet.
Expand Down Expand Up @@ -371,7 +372,13 @@ pub mod pallet {
Self::read_gateway_events(network);
}
// process events
let remaining = events.0.len() == MAX_GMP_EVENTS as usize;
Self::process_events(network, task_id, events);
if remaining {
// more events to submit in future transactions so task
// is NOT finished yet
return Ok(());
}
Ok(())
},
(
Expand Down
2 changes: 1 addition & 1 deletion pallets/tasks/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ fn register_shard(shard: ShardId) {
fn submit_gateway_events(shard: ShardId, task_id: TaskId, events: &[GmpEvent]) {
let signature = MockTssSigner::new(shard).sign_gmp_events(task_id, events);
let result = TaskResult::ReadGatewayEvents {
events: GmpEvents(events.to_vec()),
events: GmpEvents(BoundedVec::truncate_from(events.to_vec())),
signature,
};
assert_ok!(Tasks::submit_task_result(
Expand Down
6 changes: 3 additions & 3 deletions primitives/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ pub fn encode_gmp_events(task_id: TaskId, events: &[GmpEvent]) -> Vec<u8> {
(task_id, events).encode()
}

//const MAX_GMP_EVENTS: u32 = 1_000;
pub const MAX_ERROR_LEN: u32 = 500;
pub const MAX_GMP_EVENTS: u32 = 10_000;
pub const MAX_ERROR_LEN: u32 = 10_000;

/// Bounded vec alias for GMP events submitted in results
#[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
#[derive(Encode, Decode, TypeInfo, PartialEq, Eq, Clone, Debug)]
pub struct GmpEvents(pub Vec<GmpEvent>);
pub struct GmpEvents(pub BoundedVec<GmpEvent, ConstU32<MAX_GMP_EVENTS>>);
/// Bounded vec alias for SubmitGatewayMessage error
#[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
#[derive(Encode, Decode, TypeInfo, PartialEq, Eq, Clone, Debug)]
Expand Down
2 changes: 1 addition & 1 deletion tc-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1033,7 +1033,7 @@ impl Tc {

pub async fn complete_batch(&self, network_id: NetworkId, batch_id: BatchId) -> Result<()> {
let gmp_event = GmpEvent::BatchExecuted { batch_id, tx_hash: None };
let events = GmpEvents(vec![gmp_event]);
let events = GmpEvents(BoundedVec::truncate_from(vec![gmp_event]));
self.runtime.submit_gmp_events(network_id, events).await
}

Expand Down

0 comments on commit e22634e

Please sign in to comment.