Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
fix: refactor test
Browse files Browse the repository at this point in the history
  • Loading branch information
atanmarko committed Dec 21, 2023
1 parent a97c98e commit 979e653
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 140 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/topos/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ rstest = { workspace = true, features = ["async-timeout"] }
tempfile = "3.8.0"
predicates = "3.0.3"
sysinfo = "0.29.11"
serial_test = {version = "0.9.0"}

[features]
default = []
257 changes: 117 additions & 140 deletions crates/topos/tests/cert_delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use futures::{future::join_all, StreamExt};
use libp2p::PeerId;
use rand::seq::{IteratorRandom, SliceRandom};
use rstest::*;
use serial_test::serial;
use std::collections::{HashMap, HashSet};
use std::time::Duration;
use test_log::test;
use tokio::spawn;
use tokio::sync::mpsc;
use tokio::time::error::Elapsed;
use tonic::transport::Uri;
use topos_core::{
api::grpc::{
Expand All @@ -22,6 +22,7 @@ use topos_core::{
},
uci::{Certificate, SubnetId, CERTIFICATE_ID_LENGTH, SUBNET_ID_LENGTH},
};
use topos_tce_transport::ReliableBroadcastParams;
use topos_test_sdk::{certificates::create_certificate_chains, tce::create_network};
use tracing::{debug, info, warn};

Expand All @@ -40,6 +41,7 @@ fn get_subset_of_subnets(subnets: &[SubnetId], subset_size: usize) -> Vec<Subnet
#[rstest]
#[test(tokio::test)]
#[timeout(Duration::from_secs(10))]
#[serial]
async fn start_a_cluster() {
let mut peers_context = create_network(5, vec![]).await;

Expand All @@ -61,6 +63,7 @@ async fn start_a_cluster() {
#[rstest]
#[tokio::test]
#[timeout(Duration::from_secs(30))]
#[serial]
// FIXME: This test is flaky, it fails sometimes because of gRPC connection error (StreamClosed)
async fn cert_delivery() {
let subscriber = tracing_subscriber::FmtSubscriber::builder()
Expand Down Expand Up @@ -319,144 +322,122 @@ async fn cert_delivery() {
// 1. No errors, returns Ok
// 2. There were errors, returns a list of all errors encountered
// 3. timeout
async fn check_certificate_delivery(
async fn assert_certificate_full_delivery(
timeout_broadcast: u64,
peers: Vec<Uri>,
timeout: Duration,
) -> Result<Result<(), Vec<String>>, Elapsed> {
tokio::time::timeout(timeout, async move {
let random_peer: Uri = peers
.choose(&mut rand::thread_rng())
.ok_or_else(|| {
vec![format!(
"Unable to select a random peer from the list: {peers:?}"
)]
})?
.try_into()
.map_err(|_| vec![format!("Unable to parse the peer address")])?;

let pushed_certificate = Certificate::new_with_default_fields(
[0u8; CERTIFICATE_ID_LENGTH],
[1u8; SUBNET_ID_LENGTH].into(),
&[[2u8; SUBNET_ID_LENGTH].into()],
)
.map_err(|_| vec![format!("Unable to create the certificate")])?;
let certificate_id = pushed_certificate.id;

let mut join_handlers = Vec::new();

// check that all nodes delivered the certificate
for peer in peers {
join_handlers.push(tokio::spawn(async move {
let peer_string = peer.clone();
let mut client = ConsoleServiceClient::connect(peer_string.clone())
.await
.map_err(|_| (peer_string.clone(), "Unable to connect to the api console"))?;

let result = client.status(StatusRequest {}).await.map_err(|_| {
(
peer_string.clone(),
"Unable to get the status from the api console",
)
})?;

let status = result.into_inner();
if !status.has_active_sample {
return Err((peer_string, "failed to find active sample"));
}
) -> Result<(), Box<dyn std::error::Error>> {
use std::io::{Error, ErrorKind};
let random_peer: Uri = peers
.choose(&mut rand::thread_rng())
.ok_or_else(|| {
Error::new(
ErrorKind::Other,
"Unable to select a random peer from the list: {peers:?}",
)
})?
.try_into()?;

let pushed_certificate = Certificate::new_with_default_fields(
[0u8; CERTIFICATE_ID_LENGTH],
[1u8; SUBNET_ID_LENGTH].into(),
&[[2u8; SUBNET_ID_LENGTH].into()],
)?;
let certificate_id = pushed_certificate.id;

let mut join_handlers = Vec::new();

// check that all nodes delivered the certificate
for peer in peers {
join_handlers.push(tokio::spawn(async move {
let peer_string = peer.clone();
let mut client = ConsoleServiceClient::connect(peer_string.clone())
.await
.map_err(|_| (peer_string.clone(), "Unable to connect to the api console"))?;

let result = client.status(StatusRequest {}).await.map_err(|_| {
(
peer_string.clone(),
"Unable to get the status from the api console",
)
})?;

let mut client = ApiServiceClient::connect(peer_string.clone())
.await
.map_err(|_| (peer_string.clone(), "Unable to connect to the TCE api"))?;

let in_stream = async_stream::stream! {
yield OpenStream {
target_checkpoint: Some(TargetCheckpoint {
target_subnet_ids: vec![[2u8; SUBNET_ID_LENGTH].into()],
positions: vec![]
}),
source_checkpoint: None
}.into()
};

let response = client.watch_certificates(in_stream).await.map_err(|_| {
(
peer_string.clone(),
"Unable to execute the watch_certificates on TCE api",
)
})?;
let mut resp_stream = response.into_inner();
async move {
while let Some(received) = resp_stream.next().await {
let received = received.unwrap();
if let Some(Event::CertificatePushed(CertificatePushed {
certificate: Some(certificate),
..
})) = received.event
{
// unwrap is safe because we are sure that the certificate is present
if certificate_id == certificate.id.unwrap() {
debug!("Received the certificate on {}", peer_string);
return Ok(());
}
let status = result.into_inner();
if !status.has_active_sample {
return Err((peer_string, "failed to find active sample"));
}

let mut client = ApiServiceClient::connect(peer_string.clone())
.await
.map_err(|_| (peer_string.clone(), "Unable to connect to the TCE api"))?;

let in_stream = async_stream::stream! {
yield OpenStream {
target_checkpoint: Some(TargetCheckpoint {
target_subnet_ids: vec![[2u8; SUBNET_ID_LENGTH].into()],
positions: vec![]
}),
source_checkpoint: None
}.into()
};

let response = client.watch_certificates(in_stream).await.map_err(|_| {
(
peer_string.clone(),
"Unable to execute the watch_certificates on TCE api",
)
})?;
let mut resp_stream = response.into_inner();
async move {
while let Some(received) = resp_stream.next().await {
let received = received.unwrap();
if let Some(Event::CertificatePushed(CertificatePushed {
certificate: Some(certificate),
..
})) = received.event
{
// unwrap is safe because we are sure that the certificate is present
if certificate_id == certificate.id.unwrap() {
debug!("Received the certificate on {}", peer_string);
return Ok(());
}
}

Err((peer_string.clone(), "didn't receive any certificate"))
}
.await
}));
}

let mut client = ApiServiceClient::connect(random_peer.clone())
Err((peer_string.clone(), "didn't receive any certificate"))
}
.await
.map_err(|_| vec![format!("Unable to connect to the TCE api on {random_peer}")])?;
}));
}

// submit a certificate to one node
_ = client
.submit_certificate(SubmitCertificateRequest {
certificate: Some(pushed_certificate.into()),
})
.await
.map_err(|_| {
vec![format!(
"Unable to submit the certificate to the TCE api on {random_peer}"
)]
})?;
let mut client = ApiServiceClient::connect(random_peer.clone()).await?;

tokio::time::sleep(Duration::from_secs(timeout_broadcast)).await;
let mut errors = vec![];
// submit a certificate to one node
_ = client
.submit_certificate(SubmitCertificateRequest {
certificate: Some(pushed_certificate.into()),
})
.await?;

join_all(join_handlers)
.await
.iter()
.for_each(|result| match result {
Err(_) => {
errors.push("Unable to properly execute command".to_string());
}
Ok(Err((peer, error))) => {
errors.push(format!("{peer} {error}"));
}
_ => {}
});
tokio::time::sleep(Duration::from_secs(timeout_broadcast)).await;

if errors.is_empty() {
Ok(())
} else {
Err(errors)
}
})
.await
.map_err(|error| {
warn!("Timeout reached: {:?}", error);
error
})
join_all(join_handlers)
.await
.iter()
.for_each(|result| match result {
Err(e) => {
panic!("Join error: {e}");
}
Ok(Err((peer, error))) => {
panic!("Peer {peer} error: {error}");
}
_ => {}
});
Ok(())
}

async fn run_check_certificate_delivery(
async fn run_assert_certificate_full_delivery(
number_of_nodes: usize,
timeout_broadcast: u64,
timeout: Duration,
) -> Result<(), Box<dyn std::error::Error>> {
let mut peers_context = create_network(number_of_nodes, vec![]).await;

Expand Down Expand Up @@ -485,15 +466,12 @@ async fn run_check_certificate_delivery(
.map_err(|e| format!("Unable to parse node list: {e}"))
.expect("Valid node list");

match check_certificate_delivery(timeout_broadcast, peers, timeout).await {
Ok(Err(e)) => {
panic!("Error with certificate delivery for network of {number_of_nodes}: {e:?}");
match assert_certificate_full_delivery(timeout_broadcast, peers).await {
Ok(()) => {
info!("Check certificate delivery passed for network of {number_of_nodes}!");
}
Err(e) => {
panic!("Timeout elapsed: {e}");
}
Ok(_) => {
info!("Check certificate delivery passed for network of {number_of_nodes}!");
panic!("Test error: {e}");
}
}
};
Expand All @@ -503,15 +481,14 @@ async fn run_check_certificate_delivery(
}

#[rstest]
#[test(tokio::test)]
#[timeout(Duration::from_secs(20))]
async fn push_and_deliver_cert_5() -> Result<(), Box<dyn std::error::Error>> {
run_check_certificate_delivery(5, 5, Duration::from_secs(20)).await
}

#[rstest]
#[test(tokio::test)]
#[case(5usize)]
#[case(9usize)]
#[test_log::test(tokio::test)]
#[trace]
#[timeout(Duration::from_secs(20))]
async fn push_and_deliver_cert_9() -> Result<(), Box<dyn std::error::Error>> {
run_check_certificate_delivery(9, 5, Duration::from_secs(20)).await
#[serial]
async fn push_and_deliver_cert(
#[case] number_of_nodes: usize,
) -> Result<(), Box<dyn std::error::Error>> {
run_assert_certificate_full_delivery(number_of_nodes, 5).await
}

0 comments on commit 979e653

Please sign in to comment.