diff --git a/Cargo.lock b/Cargo.lock index b7575b5c3..bb402de19 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7390,6 +7390,7 @@ dependencies = [ "rstest", "serde", "serde_json", + "serial_test", "sysinfo", "tar", "tempfile", diff --git a/crates/topos/Cargo.toml b/crates/topos/Cargo.toml index b349761d5..02134c598 100644 --- a/crates/topos/Cargo.toml +++ b/crates/topos/Cargo.toml @@ -69,6 +69,7 @@ rstest = { workspace = true, features = ["async-timeout"] } tempfile = "3.8.0" predicates = "3.0.3" sysinfo = "0.29.11" +serial_test = {version = "0.9.0"} [features] default = [] diff --git a/crates/topos/tests/cert_delivery.rs b/crates/topos/tests/cert_delivery.rs index f1fdbe5f8..b328d18a7 100644 --- a/crates/topos/tests/cert_delivery.rs +++ b/crates/topos/tests/cert_delivery.rs @@ -2,12 +2,12 @@ use futures::{future::join_all, StreamExt}; use libp2p::PeerId; use rand::seq::{IteratorRandom, SliceRandom}; use rstest::*; +use serial_test::serial; use std::collections::{HashMap, HashSet}; use std::time::Duration; use test_log::test; use tokio::spawn; use tokio::sync::mpsc; -use tokio::time::error::Elapsed; use tonic::transport::Uri; use topos_core::{ api::grpc::{ @@ -22,6 +22,7 @@ use topos_core::{ }, uci::{Certificate, SubnetId, CERTIFICATE_ID_LENGTH, SUBNET_ID_LENGTH}, }; +use topos_tce_transport::ReliableBroadcastParams; use topos_test_sdk::{certificates::create_certificate_chains, tce::create_network}; use tracing::{debug, info, warn}; @@ -40,6 +41,7 @@ fn get_subset_of_subnets(subnets: &[SubnetId], subset_size: usize) -> Vec, - timeout: Duration, -) -> Result>, Elapsed> { - tokio::time::timeout(timeout, async move { - let random_peer: Uri = peers - .choose(&mut rand::thread_rng()) - .ok_or_else(|| { - vec![format!( - "Unable to select a random peer from the list: {peers:?}" - )] - })? - .try_into() - .map_err(|_| vec![format!("Unable to parse the peer address")])?; - - let pushed_certificate = Certificate::new_with_default_fields( - [0u8; CERTIFICATE_ID_LENGTH], - [1u8; SUBNET_ID_LENGTH].into(), - &[[2u8; SUBNET_ID_LENGTH].into()], - ) - .map_err(|_| vec![format!("Unable to create the certificate")])?; - let certificate_id = pushed_certificate.id; - - let mut join_handlers = Vec::new(); - - // check that all nodes delivered the certificate - for peer in peers { - join_handlers.push(tokio::spawn(async move { - let peer_string = peer.clone(); - let mut client = ConsoleServiceClient::connect(peer_string.clone()) - .await - .map_err(|_| (peer_string.clone(), "Unable to connect to the api console"))?; - - let result = client.status(StatusRequest {}).await.map_err(|_| { - ( - peer_string.clone(), - "Unable to get the status from the api console", - ) - })?; - - let status = result.into_inner(); - if !status.has_active_sample { - return Err((peer_string, "failed to find active sample")); - } +) -> Result<(), Box> { + use std::io::{Error, ErrorKind}; + let random_peer: Uri = peers + .choose(&mut rand::thread_rng()) + .ok_or_else(|| { + Error::new( + ErrorKind::Other, + "Unable to select a random peer from the list: {peers:?}", + ) + })? + .try_into()?; + + let pushed_certificate = Certificate::new_with_default_fields( + [0u8; CERTIFICATE_ID_LENGTH], + [1u8; SUBNET_ID_LENGTH].into(), + &[[2u8; SUBNET_ID_LENGTH].into()], + )?; + let certificate_id = pushed_certificate.id; + + let mut join_handlers = Vec::new(); + + // check that all nodes delivered the certificate + for peer in peers { + join_handlers.push(tokio::spawn(async move { + let peer_string = peer.clone(); + let mut client = ConsoleServiceClient::connect(peer_string.clone()) + .await + .map_err(|_| (peer_string.clone(), "Unable to connect to the api console"))?; + + let result = client.status(StatusRequest {}).await.map_err(|_| { + ( + peer_string.clone(), + "Unable to get the status from the api console", + ) + })?; - let mut client = ApiServiceClient::connect(peer_string.clone()) - .await - .map_err(|_| (peer_string.clone(), "Unable to connect to the TCE api"))?; - - let in_stream = async_stream::stream! { - yield OpenStream { - target_checkpoint: Some(TargetCheckpoint { - target_subnet_ids: vec![[2u8; SUBNET_ID_LENGTH].into()], - positions: vec![] - }), - source_checkpoint: None - }.into() - }; - - let response = client.watch_certificates(in_stream).await.map_err(|_| { - ( - peer_string.clone(), - "Unable to execute the watch_certificates on TCE api", - ) - })?; - let mut resp_stream = response.into_inner(); - async move { - while let Some(received) = resp_stream.next().await { - let received = received.unwrap(); - if let Some(Event::CertificatePushed(CertificatePushed { - certificate: Some(certificate), - .. - })) = received.event - { - // unwrap is safe because we are sure that the certificate is present - if certificate_id == certificate.id.unwrap() { - debug!("Received the certificate on {}", peer_string); - return Ok(()); - } + let status = result.into_inner(); + if !status.has_active_sample { + return Err((peer_string, "failed to find active sample")); + } + + let mut client = ApiServiceClient::connect(peer_string.clone()) + .await + .map_err(|_| (peer_string.clone(), "Unable to connect to the TCE api"))?; + + let in_stream = async_stream::stream! { + yield OpenStream { + target_checkpoint: Some(TargetCheckpoint { + target_subnet_ids: vec![[2u8; SUBNET_ID_LENGTH].into()], + positions: vec![] + }), + source_checkpoint: None + }.into() + }; + + let response = client.watch_certificates(in_stream).await.map_err(|_| { + ( + peer_string.clone(), + "Unable to execute the watch_certificates on TCE api", + ) + })?; + let mut resp_stream = response.into_inner(); + async move { + while let Some(received) = resp_stream.next().await { + let received = received.unwrap(); + if let Some(Event::CertificatePushed(CertificatePushed { + certificate: Some(certificate), + .. + })) = received.event + { + // unwrap is safe because we are sure that the certificate is present + if certificate_id == certificate.id.unwrap() { + debug!("Received the certificate on {}", peer_string); + return Ok(()); } } - - Err((peer_string.clone(), "didn't receive any certificate")) } - .await - })); - } - let mut client = ApiServiceClient::connect(random_peer.clone()) + Err((peer_string.clone(), "didn't receive any certificate")) + } .await - .map_err(|_| vec![format!("Unable to connect to the TCE api on {random_peer}")])?; + })); + } - // submit a certificate to one node - _ = client - .submit_certificate(SubmitCertificateRequest { - certificate: Some(pushed_certificate.into()), - }) - .await - .map_err(|_| { - vec![format!( - "Unable to submit the certificate to the TCE api on {random_peer}" - )] - })?; + let mut client = ApiServiceClient::connect(random_peer.clone()).await?; - tokio::time::sleep(Duration::from_secs(timeout_broadcast)).await; - let mut errors = vec![]; + // submit a certificate to one node + _ = client + .submit_certificate(SubmitCertificateRequest { + certificate: Some(pushed_certificate.into()), + }) + .await?; - join_all(join_handlers) - .await - .iter() - .for_each(|result| match result { - Err(_) => { - errors.push("Unable to properly execute command".to_string()); - } - Ok(Err((peer, error))) => { - errors.push(format!("{peer} {error}")); - } - _ => {} - }); + tokio::time::sleep(Duration::from_secs(timeout_broadcast)).await; - if errors.is_empty() { - Ok(()) - } else { - Err(errors) - } - }) - .await - .map_err(|error| { - warn!("Timeout reached: {:?}", error); - error - }) + join_all(join_handlers) + .await + .iter() + .for_each(|result| match result { + Err(e) => { + panic!("Join error: {e}"); + } + Ok(Err((peer, error))) => { + panic!("Peer {peer} error: {error}"); + } + _ => {} + }); + Ok(()) } -async fn run_check_certificate_delivery( +async fn run_assert_certificate_full_delivery( number_of_nodes: usize, timeout_broadcast: u64, - timeout: Duration, ) -> Result<(), Box> { let mut peers_context = create_network(number_of_nodes, vec![]).await; @@ -485,15 +466,12 @@ async fn run_check_certificate_delivery( .map_err(|e| format!("Unable to parse node list: {e}")) .expect("Valid node list"); - match check_certificate_delivery(timeout_broadcast, peers, timeout).await { - Ok(Err(e)) => { - panic!("Error with certificate delivery for network of {number_of_nodes}: {e:?}"); + match assert_certificate_full_delivery(timeout_broadcast, peers).await { + Ok(()) => { + info!("Check certificate delivery passed for network of {number_of_nodes}!"); } Err(e) => { - panic!("Timeout elapsed: {e}"); - } - Ok(_) => { - info!("Check certificate delivery passed for network of {number_of_nodes}!"); + panic!("Test error: {e}"); } } }; @@ -503,15 +481,14 @@ async fn run_check_certificate_delivery( } #[rstest] -#[test(tokio::test)] -#[timeout(Duration::from_secs(20))] -async fn push_and_deliver_cert_5() -> Result<(), Box> { - run_check_certificate_delivery(5, 5, Duration::from_secs(20)).await -} - -#[rstest] -#[test(tokio::test)] +#[case(5usize)] +#[case(9usize)] +#[test_log::test(tokio::test)] +#[trace] #[timeout(Duration::from_secs(20))] -async fn push_and_deliver_cert_9() -> Result<(), Box> { - run_check_certificate_delivery(9, 5, Duration::from_secs(20)).await +#[serial] +async fn push_and_deliver_cert( + #[case] number_of_nodes: usize, +) -> Result<(), Box> { + run_assert_certificate_full_delivery(number_of_nodes, 5).await }