Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

fix(p2p): accept listener connection during bootstrap #484

Merged
merged 6 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 29 additions & 14 deletions crates/topos-p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
};
use futures::Stream;
use libp2p::{
core::upgrade,
core::{transport::MemoryTransport, upgrade},
dns,
identity::Keypair,
kad::store::MemoryStore,
Expand All @@ -30,6 +30,7 @@ use std::{
};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use tracing::warn;

pub fn builder<'a>() -> NetworkBuilder<'a> {
NetworkBuilder::default()
Expand All @@ -48,9 +49,16 @@ pub struct NetworkBuilder<'a> {
local_port: Option<u8>,
config: NetworkConfig,
grpc_context: GrpcContext,
memory_transport: bool,
}

impl<'a> NetworkBuilder<'a> {
#[cfg(test)]
pub(crate) fn memory(mut self) -> Self {
self.memory_transport = true;

self
}
pub fn grpc_context(mut self, grpc_context: GrpcContext) -> Self {
self.grpc_context = grpc_context;

Expand Down Expand Up @@ -131,6 +139,7 @@ impl<'a> NetworkBuilder<'a> {

let grpc = grpc::Behaviour::new(self.grpc_context);

warn!("Known peers: {:?}", self.known_peers);
hadjiszs marked this conversation as resolved.
Show resolved Hide resolved
let behaviour = Behaviour {
gossipsub,
peer_info: PeerInfoBehaviour::new(PEER_INFO_PROTOCOL, &peer_key),
Expand All @@ -148,23 +157,29 @@ impl<'a> NetworkBuilder<'a> {
grpc,
};

let transport = {
let multiplex_config = libp2p::yamux::Config::default();

let transport = if self.memory_transport {
MemoryTransport::new()
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&peer_key)?)
.multiplex(multiplex_config)
.timeout(TWO_HOURS)
.boxed()
} else {
let tcp = libp2p::tcp::tokio::Transport::new(Config::default().nodelay(true));
let dns_tcp = dns::tokio::Transport::system(tcp).unwrap();

let tcp = libp2p::tcp::tokio::Transport::new(Config::default().nodelay(true));
dns_tcp.or_transport(tcp)
dns_tcp
.or_transport(tcp)
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&peer_key)?)
.multiplex(multiplex_config)
.timeout(TWO_HOURS)
.boxed()
};

let multiplex_config = libp2p::yamux::Config::default();

let transport = transport
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&peer_key)?)
.multiplex(multiplex_config)
.timeout(TWO_HOURS)
.boxed();

let swarm = Swarm::new(
transport,
behaviour,
Expand Down Expand Up @@ -216,8 +231,8 @@ impl<'a> NetworkBuilder<'a> {
pending_record_requests: HashMap::new(),
shutdown,
health_state: crate::runtime::HealthState {
bootpeer_connection_retries: 3,
successfully_connected_to_bootpeer: if self.known_peers.is_empty() {
bootnode_connection_retries: 3,
successfully_connected_to_bootnode: if self.known_peers.is_empty() {
// Node seems to be a boot node
Some(ConnectionId::new_unchecked(0))
} else {
Expand Down
61 changes: 43 additions & 18 deletions crates/topos-p2p/src/runtime/handle_event.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use libp2p::{multiaddr::Protocol, swarm::SwarmEvent};
use libp2p::{core::Endpoint, multiaddr::Protocol, swarm::SwarmEvent};
use tracing::{debug, error, info, warn};

use crate::{error::P2PError, event::ComposedEvent, Event, Runtime};
Expand Down Expand Up @@ -62,13 +62,13 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
error,
} if self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
&& self.health_state.dialed_bootpeer.contains(&connection_id) =>
&& self.health_state.dialed_bootnode.contains(&connection_id) =>
{
warn!("Unable to connect to bootpeer {peer_id}: {error:?}");
self.health_state.dialed_bootpeer.remove(&connection_id);
if self.health_state.dialed_bootpeer.is_empty() {
warn!("Unable to connect to bootnode {peer_id}: {error:?}");
self.health_state.dialed_bootnode.remove(&connection_id);
if self.health_state.dialed_bootnode.is_empty() {
// We tried to connect to all bootnode without success
error!("Unable to connect to any bootnode");
}
Expand Down Expand Up @@ -100,25 +100,49 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
num_established,
concurrent_dial_errors,
established_in,
} if self.health_state.dialed_bootpeer.contains(&connection_id) => {
info!("Successfully connected to bootpeer {peer_id}");
} if self.health_state.dialed_bootnode.contains(&connection_id) => {
info!("Successfully connected to bootnode {peer_id}");
if self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
{
self.health_state.successfully_connected_to_bootpeer = Some(connection_id);
_ = self.health_state.dialed_bootpeer.remove(&connection_id);
self.health_state.successfully_connected_to_bootnode = Some(connection_id);
_ = self.health_state.dialed_bootnode.remove(&connection_id);
}
}

SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
peer_id,
endpoint,
connection_id,
..
} => {
info!(
"Connection established with peer {peer_id} as {:?}",
endpoint.to_endpoint()
);
if self
.health_state
.successfully_connected_to_bootnode
.is_none()
&& self.boot_peers.contains(&peer_id)
{
info!(
"Connection established with bootnode {peer_id} as {:?}",
endpoint.to_endpoint()
);

if endpoint.to_endpoint() == Endpoint::Listener {
if let Err(error) = self.swarm.dial(peer_id) {
error!(
"Unable to dial bootnode {peer_id} after incoming connection: \
Freyskeyd marked this conversation as resolved.
Show resolved Hide resolved
{error}"
);
}
}
} else {
info!(
"Connection established with peer {peer_id} as {:?}",
endpoint.to_endpoint()
);
}

if self.swarm.connected_peers().count() >= self.config.minimum_cluster_size {
if let Err(error) = self.swarm.behaviour_mut().gossipsub.subscribe() {
Expand Down Expand Up @@ -164,8 +188,8 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
peer_id: Some(ref peer_id),
connection_id,
} if self.boot_peers.contains(peer_id) => {
info!("Dialing bootpeer {peer_id} on connection: {connection_id}");
self.health_state.dialed_bootpeer.insert(connection_id);
info!("Dialing bootnode {peer_id} on connection: {connection_id}");
self.health_state.dialed_bootnode.insert(connection_id);
}

SwarmEvent::Dialing {
Expand All @@ -185,6 +209,7 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
SwarmEvent::ListenerError { listener_id, error } => {
error!("Unhandled ListenerError {listener_id:?} | {error}")
}

event => {
warn!("Unhandled SwarmEvent: {:?}", event);
}
Expand Down
10 changes: 5 additions & 5 deletions crates/topos-p2p/src/runtime/handle_event/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl EventHandler<Box<Event>> for Runtime {
{
if self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
{
warn!(
Expand Down Expand Up @@ -85,11 +85,11 @@ impl EventHandler<Box<Event>> for Runtime {
} if num_remaining == 0
&& self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
&& self.swarm.behaviour().discovery.health_status == HealthStatus::Unhealthy =>
{
match self.health_state.bootpeer_connection_retries.checked_sub(1) {
match self.health_state.bootnode_connection_retries.checked_sub(1) {
None => {
error!(
"Bootstrap query finished but unable to connect to bootnode, stopping"
Expand All @@ -103,7 +103,7 @@ impl EventHandler<Box<Event>> for Runtime {
{} more times",
new
);
self.health_state.bootpeer_connection_retries = new;
self.health_state.bootnode_connection_retries = new;
}
}
}
Expand All @@ -119,7 +119,7 @@ impl EventHandler<Box<Event>> for Runtime {
} if num_remaining == 0
&& self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_some()
&& self.swarm.behaviour().discovery.health_status == HealthStatus::Unhealthy =>
{
Expand Down
6 changes: 3 additions & 3 deletions crates/topos-p2p/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ pub(crate) struct HealthState {
/// Indicates if the node is listening on any address
pub(crate) is_listening: bool,
/// List the bootnodes that the node has tried to connect to
pub(crate) dialed_bootpeer: HashSet<ConnectionId>,
pub(crate) dialed_bootnode: HashSet<ConnectionId>,
/// Indicates if the node has successfully connected to a bootnode
pub(crate) successfully_connected_to_bootpeer: Option<ConnectionId>,
pub(crate) successfully_connected_to_bootnode: Option<ConnectionId>,
/// Track the number of remaining retries to connect to any bootnode
pub(crate) bootpeer_connection_retries: usize,
pub(crate) bootnode_connection_retries: usize,
}

impl Runtime {
Expand Down
60 changes: 60 additions & 0 deletions crates/topos-p2p/src/tests/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::time::Duration;

use futures::{future::join_all, FutureExt};
use rstest::rstest;
use test_log::test;
use topos_test_sdk::tce::NodeConfig;
use tracing::Instrument;

#[rstest]
#[test(tokio::test)]
#[timeout(Duration::from_secs(5))]
async fn two_bootnode_communicating() {
let bootnode = NodeConfig::memory(2);
let local = NodeConfig::memory(1);
let bootnode_known_peers = vec![(local.peer_id(), local.addr.clone())];
let local_known_peers = vec![(bootnode.peer_id(), bootnode.addr.clone())];

let mut handlers = Vec::new();

let context_local = tracing::info_span!("start_node", "peer_id" = local.peer_id().to_string());

let context_bootnode =
tracing::info_span!("start_node", "peer_id" = bootnode.peer_id().to_string());
handlers.push(
async move {
let (client, mut stream, runtime) = crate::network::builder()
.minimum_cluster_size(1)
.peer_key(local.keypair.clone())
.listen_addresses(&[local.addr.clone()])
.known_peers(&local_known_peers)
.memory()
.build()
.await
.expect("Unable to create p2p network");

runtime.bootstrap(&mut stream).await
}
.instrument(context_local)
.boxed(),
);

handlers.push(
async move {
let (client, mut stream, runtime) = crate::network::builder()
.minimum_cluster_size(1)
.peer_key(bootnode.keypair.clone())
.listen_addresses(&[bootnode.addr.clone()])
.known_peers(&bootnode_known_peers)
.memory()
.build()
.await
.expect("Unable to create p2p network");

runtime.bootstrap(&mut stream).await
}
.instrument(context_bootnode)
.boxed(),
);
assert!(join_all(handlers).await.iter().all(Result::is_ok));
}
1 change: 1 addition & 0 deletions crates/topos-p2p/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod behaviour;
mod bootstrap;
mod command;
mod support;
2 changes: 1 addition & 1 deletion crates/topos-test-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ethers.workspace = true
async-trait.workspace = true
futures.workspace = true
lazy_static = { version = "1.4.0" }
libp2p.workspace = true
libp2p = { workspace = true, features = ["macros"] }
proc_macro_sdk = { path = "./proc_macro_sdk/" }
rand.workspace = true
rstest.workspace = true
Expand Down
28 changes: 16 additions & 12 deletions crates/topos-test-sdk/src/p2p/mod.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
use libp2p::{
build_multiaddr,
identity::{self, Keypair},
Multiaddr,
};
use rand::{thread_rng, Rng};

use crate::networking::get_available_port;

pub type Port = u16;

pub fn local_peer(peer_index: u8) -> (Keypair, Port, Multiaddr) {
pub fn local_peer(peer_index: u8, memory_transport: bool) -> (Keypair, Multiaddr) {
let peer_id: Keypair = keypair_from_seed(peer_index);
let port = get_available_port();
let local_listen_addr: Multiaddr = format!(
"/ip4/127.0.0.1/tcp/{}/p2p/{}",
port,
peer_id.public().to_peer_id()
)
.parse()
.unwrap();
let local_listen_addr = if memory_transport {
build_multiaddr![Memory(thread_rng().gen::<u64>())]
} else {
let port = get_available_port();
format!(
"/ip4/127.0.0.1/tcp/{}/p2p/{}",
port,
peer_id.public().to_peer_id()
)
.parse()
.unwrap()
};

(peer_id, port, local_listen_addr)
(peer_id, local_listen_addr)
}

pub fn keypair_from_seed(seed: u8) -> Keypair {
Expand Down
Loading
Loading