Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split ReadGatewayEvents into multiple transactions #1470

Merged
merged 31 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
0360f33
init chronicle changes todo store events not submitted
4meta5 Jan 30, 2025
13fd68d
clean
4meta5 Jan 30, 2025
07d3bc2
update pallet tasks to work with expected changes
4meta5 Jan 30, 2025
634f793
clean
4meta5 Jan 31, 2025
7826c1f
Merge branch 'development' into amar-bound-read-events-v0
4meta5 Jan 31, 2025
9ee2d41
Update chronicle/src/tasks/mod.rs
4meta5 Jan 31, 2025
042ee1d
update tasks pallet benchmarks
4meta5 Jan 31, 2025
3e914b1
update pallet tasks tests
4meta5 Jan 31, 2025
f56356b
clippy fix
4meta5 Feb 2, 2025
6ba6587
bound gmp events to bound submit result inputs altogether
4meta5 Feb 2, 2025
2ed6b20
update tc cli
4meta5 Feb 2, 2025
83bb477
Merge branch 'development' into amar-bound-read-events-v0
4meta5 Feb 3, 2025
ddedaa3
Merge branch 'development' into amar-bound-read-events-v0
4meta5 Feb 3, 2025
389772b
Merge branch 'development' into amar-bound-read-events-v0
4meta5 Feb 4, 2025
4e67e49
Merge branch 'development' into amar-bound-read-events-v0
4meta5 Feb 4, 2025
e906e6f
Merge branch 'development' into amar-bound-read-events-v0
dvc94ch Feb 6, 2025
5e71f88
push test to ensure that chunking events does not throw out any events
4meta5 Feb 6, 2025
b052b78
Merge branch 'development' into amar-bound-read-events-v0
4meta5 Feb 6, 2025
4f7631a
Merge branch 'development' into amar-bound-read-events-v0
4meta5 Feb 7, 2025
5ed4365
Merge branch 'development' into amar-bound-read-events-v0
4meta5 Feb 9, 2025
a48f1ac
Merge branch 'development' into amar-bound-read-events-v0
4meta5 Feb 10, 2025
28e25fb
Merge branch 'development' into amar-bound-read-events-v0
4meta5 Feb 11, 2025
1222fd9
Merge branch 'development' into amar-bound-read-events-v0
4meta5 Feb 11, 2025
b48ae87
Merge branch 'development' into amar-bound-read-events-v0
4meta5 Feb 12, 2025
3cab0ae
Merge branch 'development' into amar-bound-read-events-v0
4meta5 Feb 12, 2025
e5b8a5d
Merge branch 'development' into amar-bound-read-events-v0
dvc94ch Feb 16, 2025
2c943eb
Merge branch 'development' into amar-bound-read-events-v0
4meta5 Feb 17, 2025
9b479ce
Merge branch 'development' into amar-bound-read-events-v0
4meta5 Feb 19, 2025
5d4ec42
fmt
4meta5 Feb 19, 2025
4a44def
Merge branch 'development' into amar-bound-read-events-v0
4meta5 Feb 20, 2025
e26d600
Fix submission.
dvc94ch Feb 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions chronicle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,14 @@ mod tests {
tracing::info!("creating task");
// Create a task and assign it to the shard.
let task_id = mock.create_task(Task::ReadGatewayEvents { blocks: 0..1 });
tracing::info!("assigning task");
tracing::info!("assigning task {task_id} {shard_id}");
mock.assign_task(task_id, shard_id);
// Wait for the task to complete.
loop {
tracing::info!("waiting for task");
tracing::info!("waiting for task {task_id}");
let task = mock.task(task_id).unwrap();
if task.result.is_none() {
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(10)).await;
continue;
}
break;
Expand Down
1 change: 1 addition & 0 deletions chronicle/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ impl Runtime for Mock {
}

async fn submit_task_result(&self, task_id: TaskId, result: TaskResult) -> Result<()> {
tracing::info!("submit_task_result {task_id} {result:?}");
let mut tasks = self.tasks.lock().unwrap();
tasks.get_mut(&task_id).unwrap().result = Some(result);
Ok(())
Expand Down
81 changes: 56 additions & 25 deletions chronicle/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use scale_codec::Encode;
use std::sync::Arc;
use std::{collections::BTreeMap, pin::Pin};
use time_primitives::{
Address, BlockNumber, ErrorMsg, GmpEvents, GmpParams, IConnector, NetworkId, ShardId, Task,
TaskId, TaskResult, TssSignature, TssSigningRequest,
Address, BlockNumber, ErrorMsg, GmpEvent, GmpEvents, GmpParams, IConnector, NetworkId, ShardId,
Task, TaskId, TaskResult, TssSignature, TssSigningRequest, MAX_GMP_EVENTS,
};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -96,6 +96,24 @@ impl TaskParams {
Ok(true)
}

async fn submit_events(
&self,
block_number: BlockNumber,
shard_id: ShardId,
task_id: TaskId,
events: Vec<GmpEvent>,
span: &Span,
) -> Result<()> {
let payload = time_primitives::encode_gmp_events(task_id, &events);
let signature = self.tss_sign(block_number, shard_id, task_id, payload, span).await?;
let result = TaskResult::ReadGatewayEvents {
events: GmpEvents(BoundedVec::truncate_from(events)),
signature,
};
tracing::debug!(parent: span, "submitting task result",);
self.runtime.submit_task_result(task_id, result).await
}

#[allow(clippy::too_many_arguments)]
async fn execute(
self,
Expand All @@ -107,18 +125,20 @@ impl TaskParams {
task: Task,
span: Span,
) -> Result<()> {
let result = match task {
match task {
Task::ReadGatewayEvents { blocks } => {
let events =
self.connector.read_events(gateway, blocks).await.context("read_events")?;
tracing::info!(parent: &span, "read {} events", events.len(),);
let payload = time_primitives::encode_gmp_events(task_id, &events);
let signature =
self.tss_sign(block_number, shard_id, task_id, payload, &span).await?;
Some(TaskResult::ReadGatewayEvents {
events: GmpEvents(events),
signature,
})
tracing::info!(parent: &span, "read {} events", events.len());
let mut remaining = true;
for chunk in events.chunks(MAX_GMP_EVENTS as _) {
remaining = chunk.len() != MAX_GMP_EVENTS as usize;
self.submit_events(block_number, shard_id, task_id, chunk.to_vec(), &span)
.await?;
}
if remaining {
self.submit_events(block_number, shard_id, task_id, vec![], &span).await?;
}
},
Task::SubmitGatewayMessage { batch_id } => {
let msg =
Expand All @@ -134,23 +154,13 @@ impl TaskParams {
{
tracing::error!(parent: &span, batch_id, "Error while executing batch: {e}");
e.truncate(time_primitives::MAX_ERROR_LEN as usize - 4);
Some(TaskResult::SubmitGatewayMessage {
let result = TaskResult::SubmitGatewayMessage {
error: ErrorMsg(BoundedVec::truncate_from(e.encode())),
})
} else {
None
};
tracing::debug!(parent: &span, "submitting task result");
self.runtime.submit_task_result(task_id, result).await?;
}
},
};
if let Some(result) = result {
tracing::debug!(parent: &span, "submitting task result",);
if let Err(e) = self.runtime.submit_task_result(task_id, result).await {
tracing::error!(
parent: &span,
"error submitting task result: {:?}",
e
);
}
}
Ok(())
}
Expand Down Expand Up @@ -247,3 +257,24 @@ impl TaskExecutor {
Ok((start_sessions, completed_sessions, failed_tasks))
}
}

#[cfg(test)]
mod tests {
#[test]
fn test_event_chunking_preserves_all_events() {
const MAX_EVENTS: usize = 4;

// Create a sample list of events
let events: Vec<u32> = (1..=11).collect();

// Perform chunking
let event_chunks: Vec<Vec<_>> =
events.chunks(MAX_EVENTS).map(|chunk| chunk.to_vec()).collect();

// Flatten the chunks back into a single vector
let flattened_events: Vec<_> = event_chunks.iter().flatten().copied().collect();

// Ensure the original and reconstructed lists match
assert_eq!(events, flattened_events, "Chunking should not lose any events");
}
}
4 changes: 2 additions & 2 deletions pallets/tasks/src/benchmarking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ benchmarks! {
}: _(
RawOrigin::Signed([0u8; 32].into()),
task_id,
TaskResult::ReadGatewayEvents { events: GmpEvents(vec![]), signature: SIGNATURE }
TaskResult::ReadGatewayEvents { events: GmpEvents(BoundedVec::truncate_from(vec![])), signature: SIGNATURE }
) verify {
assert_eq!(TaskOutput::<T>::get(task_id), Some(Ok(())));
assert!(TaskShard::<T>::get(task_id).is_none());
Expand Down Expand Up @@ -129,7 +129,7 @@ benchmarks! {
}

submit_gmp_events {
}: _(RawOrigin::Root, ETHEREUM, GmpEvents(vec![]))
}: _(RawOrigin::Root, ETHEREUM, GmpEvents(BoundedVec::truncate_from(vec![])))
verify { }

sync_network {}: _(RawOrigin::Root, ETHEREUM, 100u64) verify { }
Expand Down
7 changes: 7 additions & 0 deletions pallets/tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub mod pallet {
AccountId, Balance, BatchBuilder, BatchId, ErrorMsg, GatewayMessage, GatewayOp, GmpEvent,
GmpEvents, Hash as TxHash, MessageId, NetworkId, NetworksInterface, PublicKey, ShardId,
ShardsInterface, Task, TaskId, TaskResult, TasksInterface, TssPublicKey, TssSignature,
MAX_GMP_EVENTS,
};

/// Trait to define the weights for various extrinsics in the pallet.
Expand Down Expand Up @@ -371,7 +372,13 @@ pub mod pallet {
Self::read_gateway_events(network);
}
// process events
let remaining = events.0.len() == MAX_GMP_EVENTS as usize;
Self::process_events(network, task_id, events);
if remaining {
// more events to submit in future transactions so task
// is NOT finished yet
return Ok(());
}
Ok(())
},
(
Expand Down
2 changes: 1 addition & 1 deletion pallets/tasks/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ fn register_shard(shard: ShardId) {
fn submit_gateway_events(shard: ShardId, task_id: TaskId, events: &[GmpEvent]) {
let signature = MockTssSigner::new(shard).sign_gmp_events(task_id, events);
let result = TaskResult::ReadGatewayEvents {
events: GmpEvents(events.to_vec()),
events: GmpEvents(BoundedVec::truncate_from(events.to_vec())),
signature,
};
assert_ok!(Tasks::submit_task_result(
Expand Down
6 changes: 3 additions & 3 deletions primitives/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ pub fn encode_gmp_events(task_id: TaskId, events: &[GmpEvent]) -> Vec<u8> {
(task_id, events).encode()
}

//const MAX_GMP_EVENTS: u32 = 1_000;
pub const MAX_ERROR_LEN: u32 = 500;
pub const MAX_GMP_EVENTS: u32 = 10_000;
pub const MAX_ERROR_LEN: u32 = 10_000;

/// Bounded vec alias for GMP events submitted in results
#[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
#[derive(Encode, Decode, TypeInfo, PartialEq, Eq, Clone, Debug)]
pub struct GmpEvents(pub Vec<GmpEvent>);
pub struct GmpEvents(pub BoundedVec<GmpEvent, ConstU32<MAX_GMP_EVENTS>>);
/// Bounded vec alias for SubmitGatewayMessage error
#[cfg_attr(feature = "std", derive(Serialize, Deserialize))]
#[derive(Encode, Decode, TypeInfo, PartialEq, Eq, Clone, Debug)]
Expand Down
2 changes: 1 addition & 1 deletion tc-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1033,7 +1033,7 @@ impl Tc {

pub async fn complete_batch(&self, network_id: NetworkId, batch_id: BatchId) -> Result<()> {
let gmp_event = GmpEvent::BatchExecuted { batch_id, tx_hash: None };
let events = GmpEvents(vec![gmp_event]);
let events = GmpEvents(BoundedVec::truncate_from(vec![gmp_event]));
self.runtime.submit_gmp_events(network_id, events).await
}

Expand Down
Loading