Skip to content

Commit

Permalink
feat: reset route scenario test
Browse files Browse the repository at this point in the history
  • Loading branch information
BastienFaivre committed Jan 11, 2025
1 parent f5b1fc8 commit d3e42ea
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 8 deletions.
4 changes: 3 additions & 1 deletion dog/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,9 @@ where
transaction,
}));

self.forward_transaction(&tx_id, raw_transaction, propagation_source);
if self.config.forward_transactions() {
self.forward_transaction(&tx_id, raw_transaction, propagation_source);
}
}

fn handle_invalid_transaction(
Expand Down
15 changes: 15 additions & 0 deletions dog/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub struct Config {
connection_handler_publish_duration: Duration,
connection_handler_forward_duration: Duration,
deliver_own_transactions: bool,
forward_transactions: bool,
}

impl Config {
Expand Down Expand Up @@ -93,6 +94,12 @@ impl Config {
pub fn deliver_own_transactions(&self) -> bool {
self.deliver_own_transactions
}

/// Whether the node should forward transactions to other peers. The default is `true`.
/// This is used if a node just wants to be a "client" in the network.
pub fn forward_transactions(&self) -> bool {
self.forward_transactions
}
}

impl Default for Config {
Expand Down Expand Up @@ -128,6 +135,7 @@ impl Default for ConfigBuilder {
connection_handler_publish_duration: Duration::from_secs(5),
connection_handler_forward_duration: Duration::from_secs(1),
deliver_own_transactions: false,
forward_transactions: true,
},
}
}
Expand Down Expand Up @@ -222,6 +230,13 @@ impl ConfigBuilder {
self
}

/// Whether the node should forward transactions to other peers. The default is `true`.
/// This is used if a node just wants to be a "client" in the network.
pub fn forward_transactions(&mut self, forward_transactions: bool) -> &mut Self {
self.config.forward_transactions = forward_transactions;
self
}

/// Determines the level of validation used when receiving transactions. See [`ValidationMode`]
/// for the available types. the default is `ValidationMode::Strict`.
pub fn validation_mode(&mut self, validation_mode: ValidationMode) -> &mut Self {
Expand Down
43 changes: 36 additions & 7 deletions dog/tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tokio::{sync::mpsc, task::JoinHandle, time::sleep};

pub struct Test<const N: usize> {
nodes: [TestNode; N],
handlers: [Option<JoinHandle<()>>; N],
}

impl<const N: usize> Test<N> {
Expand Down Expand Up @@ -45,8 +46,8 @@ impl<const N: usize> Test<N> {
bootstrap_sets
}

pub fn new_with_unique_config(
config: libp2p_dog::Config,
pub fn new_with_each_config(
configs: [libp2p_dog::Config; N],
bootstrap_sets: [Vec<usize>; N],
signed_transactions: bool,
) -> Result<Self, Box<dyn std::error::Error>> {
Expand All @@ -64,20 +65,48 @@ impl<const N: usize> Test<N> {
})
.collect();

TestNode::new(addr, bootstrap_set, config.clone(), signed_transactions).unwrap()
TestNode::new(addr, bootstrap_set, configs[i].clone(), signed_transactions).unwrap()
})
.collect::<Vec<_>>()
.try_into()
.map_err(|_| "Failed to convert Vec to array")?;

Ok(Self { nodes })
Ok(Self {
nodes,
handlers: [const { None }; N],
})
}

pub fn new_with_unique_config(
config: libp2p_dog::Config,
bootstrap_sets: [Vec<usize>; N],
signed_transactions: bool,
) -> Result<Self, Box<dyn std::error::Error>> {
let configs = std::array::from_fn(|_| config.clone());
Self::new_with_each_config(configs, bootstrap_sets, signed_transactions)
}

pub async fn spawn_all(&mut self) -> Vec<JoinHandle<()>> {
let join_handlers = self.nodes.iter_mut().map(|node| node.spawn()).collect();
pub async fn spawn_all(&mut self) {
self.handlers = self
.nodes
.iter_mut()
.map(|node| Some(node.spawn()))
.collect::<Vec<Option<JoinHandle<()>>>>()
.try_into()
.unwrap_or_else(|_| panic!("Failed to convert Vec to array"));
// Wait for the swarms to initialize and dial each other
sleep(Duration::from_secs(2)).await;
join_handlers
}

pub async fn kill_node(&mut self, node: usize) {
assert!(node < N);
match &self.handlers[node] {
Some(handle) => {
handle.abort();
self.handlers[node] = None;
}
None => {}
}
}

pub fn peer_ids(&self) -> [PeerId; N] {
Expand Down
222 changes: 222 additions & 0 deletions dog/tests/tests/dog.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Duration;

use libp2p_dog_tests::Test;
use rand::seq::SliceRandom;
use tokio::time::sleep;

// Testing the dog behaviour with two nodes sending transactions to each other
Expand Down Expand Up @@ -262,3 +263,224 @@ pub async fn random_network_no_redundancy() {
}
}
}

// Testing that a node will request to reset a route previously blocked by a have_tx message if
// the redundancy is too low.
// We consider the following network:
// A <---> Bi <---> C
// where A is the transaction sender, Bi (1 <= i <= N) are intermediary nodes and C is the node
// on which we want to test the behaviour.
// We define a redundancy R (0 <= R <= N - 1). First, A publishes a bunch of transactions. After
// some time, C should have sent N - R have_tx messages to nodes Bi (in order to reach the target
// redundancy). Then, A continues to publish transactions, but we kill M nodes Bi (0 <= M <= N - R,
// i.e. the ones that have not received a have_tx message from C). We expect C to reset some routes
// to recover the target redundancy.
// For a better test tracking, we define N >= (2 * R) + 1 to make sure node C will be able to
// reach the target redundancy.
#[tokio::test]
pub async fn simple_reset_route_scenario() {
const R: usize = 3;
const B: usize = 2 * R + 1; // Number of Bi nodes
const N: usize = B + 2; // Total number of nodes

let config_a_bi = libp2p_dog::ConfigBuilder::default()
// We force the nodes to remove any redundancy
.target_redundancy(0.0)
.redundancy_delta_percent(0)
// Speed up have_tx unblocking
.redundancy_interval(Duration::from_secs(100))
// Disable signature to speed up the test
.validation_mode(libp2p_dog::ValidationMode::None)
.build()
.unwrap();
let config_c = libp2p_dog::ConfigBuilder::default()
// We force the nodes to remove any redundancy
.target_redundancy(R as f64)
.redundancy_delta_percent(0)
// Speed up have_tx unblocking
.redundancy_interval(Duration::from_millis(10))
// Disable signature to speed up the test
.validation_mode(libp2p_dog::ValidationMode::None)
// For simplicity, node C acts as a client
.forward_transactions(false)
.build()
.unwrap();

let mut configs = Vec::with_capacity(N);
for _ in 0..B + 1 {
configs.push(config_a_bi.clone());
}
configs.push(config_c);

let mut bootstrap_sets: Vec<Vec<usize>> = Vec::with_capacity(N);
bootstrap_sets.push((1..B + 1).collect());
for _ in 0..B {
bootstrap_sets.push(vec![N - 1]);
}
bootstrap_sets.push(vec![]);

let mut test = match Test::<N>::new_with_each_config(
match configs.try_into() {
Ok(configs_array) => configs_array,
Err(_) => panic!("Failed to convert Vec to array"),
},
bootstrap_sets
.try_into()
.expect("Failed to convert Vec to array"),
false,
) {
Ok(test) => test,
Err(e) => panic!("Failed to create test: {}", e),
};

test.spawn_all().await;

for i in 0..B - R + 1 {
test.publish_on_node(0, format!("Hello #{} from node A!", i).into_bytes());
sleep(Duration::from_millis(100)).await;
}

sleep(Duration::from_secs(5)).await;

let peer_ids = test.peer_ids();
let events = test.collect_events();

assert_eq!(peer_ids.len(), N);
assert_eq!(events.len(), N);

for (i, (transactions, _)) in events.iter().enumerate() {
if i == 0 {
// Node A
continue;
}

assert_eq!(transactions.len(), B - R + 1);
let mut expected = (0..B - R + 1)
.map(|j| libp2p_dog::Transaction {
from: peer_ids[0],
seqno: 0, // ignored
data: format!("Hello #{} from node A!", j).into_bytes(),
})
.collect::<Vec<_>>();

for transaction in transactions {
let index = match expected.iter().position(|expected| {
expected.from == transaction.from && expected.data == transaction.data
}) {
Some(index) => index,
None => panic!("Unexpected transaction: {:?}", transaction),
};
expected.remove(index);
}
}

let mut exception = false;
for (i, (_, routing_updates)) in events.iter().enumerate() {
if i == 0 || i == N - 1 {
assert!(routing_updates.is_empty());
continue;
}

match routing_updates.len() {
0 => {}
1 => {
assert_eq!(routing_updates[0].len(), 1);
}
2 => {
if exception {
panic!("There can only be one exception");
}
exception = true;
assert_eq!(routing_updates[0].len(), 1);
assert_eq!(routing_updates[1].len(), 0);
}
n => panic!("Unexpected number of routing updates: {}", n),
}
}

// Retrieve the Bi nodes that have not received a have_tx message from C
let mut bi_nodes_to_kill = Vec::new();
for (i, (_, routing_updates)) in events.iter().enumerate() {
if !(1..B + 1).contains(&i) {
continue;
}

match routing_updates.last() {
Some(routes) => {
if routes.is_empty() {
bi_nodes_to_kill.push(i);
}
}
None => {
bi_nodes_to_kill.push(i);
}
}
}

// Drop a random Bi node as we need to keep one route alive
bi_nodes_to_kill.shuffle(&mut rand::thread_rng());
bi_nodes_to_kill.pop();

// Kill the Bi nodes that have not received a have_tx message from C
for bi_node in bi_nodes_to_kill.iter() {
test.kill_node(*bi_node).await;
}

for i in B - R - 1..B - 1 {
test.publish_on_node(0, format!("Hello #{} from node A!", i).into_bytes());
sleep(Duration::from_millis(100)).await;
}

sleep(Duration::from_secs(5)).await;

let events = test.collect_events();

assert_eq!(events.len(), N);

for (i, (transactions, _)) in events.iter().enumerate() {
if i == 0 || bi_nodes_to_kill.contains(&i) {
// Node A
continue;
}

assert_eq!(transactions.len(), R);
let mut expected = (B - R - 1..B - 1)
.map(|j| libp2p_dog::Transaction {
from: peer_ids[0],
seqno: 0, // ignored
data: format!("Hello #{} from node A!", j).into_bytes(),
})
.collect::<Vec<_>>();

for transaction in transactions {
let index = match expected.iter().position(|expected| {
expected.from == transaction.from && expected.data == transaction.data
}) {
Some(index) => index,
None => panic!("Unexpected transaction: {:?}", transaction),
};
expected.remove(index);
}
}

let mut exception = false;
for (i, (_, routing_updates)) in events.iter().enumerate() {
if i == 0 || bi_nodes_to_kill.contains(&i) || i == N - 1 {
assert!(routing_updates.is_empty());
continue;
}

match routing_updates.len() {
0 => {
if exception {
panic!("There can only be one exception");
}
exception = true;
}
1 => {
assert_eq!(routing_updates[0].len(), 0);
}
n => panic!("Unexpected number of routing updates: {}", n),
}
}
}

0 comments on commit d3e42ea

Please sign in to comment.