Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

fix(p2p): accept listener connection during bootstrap #484

Merged
merged 6 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/topos-p2p/src/behaviour/grpc/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub enum Event {
OutboundSuccess {
peer_id: PeerId,
request_id: RequestId,
#[allow(unused)]
channel: Channel,
},

Expand Down
43 changes: 29 additions & 14 deletions crates/topos-p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
};
use futures::Stream;
use libp2p::{
core::upgrade,
core::{transport::MemoryTransport, upgrade},
dns,
identity::Keypair,
kad::store::MemoryStore,
Expand All @@ -30,6 +30,7 @@ use std::{
};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;

pub fn builder<'a>() -> NetworkBuilder<'a> {
NetworkBuilder::default()
Expand All @@ -48,9 +49,16 @@ pub struct NetworkBuilder<'a> {
local_port: Option<u8>,
config: NetworkConfig,
grpc_context: GrpcContext,
memory_transport: bool,
}

impl<'a> NetworkBuilder<'a> {
#[cfg(test)]
pub(crate) fn memory(mut self) -> Self {
self.memory_transport = true;

self
}
pub fn grpc_context(mut self, grpc_context: GrpcContext) -> Self {
self.grpc_context = grpc_context;

Expand Down Expand Up @@ -131,6 +139,7 @@ impl<'a> NetworkBuilder<'a> {

let grpc = grpc::Behaviour::new(self.grpc_context);

debug!("Known peers: {:?}", self.known_peers);
let behaviour = Behaviour {
gossipsub,
peer_info: PeerInfoBehaviour::new(PEER_INFO_PROTOCOL, &peer_key),
Expand All @@ -148,23 +157,29 @@ impl<'a> NetworkBuilder<'a> {
grpc,
};

let transport = {
let multiplex_config = libp2p::yamux::Config::default();

let transport = if self.memory_transport {
MemoryTransport::new()
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&peer_key)?)
.multiplex(multiplex_config)
.timeout(TWO_HOURS)
.boxed()
} else {
let tcp = libp2p::tcp::tokio::Transport::new(Config::default().nodelay(true));
let dns_tcp = dns::tokio::Transport::system(tcp).unwrap();

let tcp = libp2p::tcp::tokio::Transport::new(Config::default().nodelay(true));
dns_tcp.or_transport(tcp)
dns_tcp
.or_transport(tcp)
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&peer_key)?)
.multiplex(multiplex_config)
.timeout(TWO_HOURS)
.boxed()
};

let multiplex_config = libp2p::yamux::Config::default();

let transport = transport
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&peer_key)?)
.multiplex(multiplex_config)
.timeout(TWO_HOURS)
.boxed();

let swarm = Swarm::new(
transport,
behaviour,
Expand Down Expand Up @@ -216,8 +231,8 @@ impl<'a> NetworkBuilder<'a> {
pending_record_requests: HashMap::new(),
shutdown,
health_state: crate::runtime::HealthState {
bootpeer_connection_retries: 3,
successfully_connected_to_bootpeer: if self.known_peers.is_empty() {
bootnode_connection_retries: 3,
successfully_connected_to_bootnode: if self.known_peers.is_empty() {
// Node seems to be a boot node
Some(ConnectionId::new_unchecked(0))
} else {
Expand Down
61 changes: 43 additions & 18 deletions crates/topos-p2p/src/runtime/handle_event.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use libp2p::{multiaddr::Protocol, swarm::SwarmEvent};
use libp2p::{core::Endpoint, multiaddr::Protocol, swarm::SwarmEvent};
use tracing::{debug, error, info, warn};

use crate::{error::P2PError, event::ComposedEvent, Event, Runtime};
Expand Down Expand Up @@ -62,13 +62,13 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
error,
} if self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
&& self.health_state.dialed_bootpeer.contains(&connection_id) =>
&& self.health_state.dialed_bootnode.contains(&connection_id) =>
{
warn!("Unable to connect to bootpeer {peer_id}: {error:?}");
self.health_state.dialed_bootpeer.remove(&connection_id);
if self.health_state.dialed_bootpeer.is_empty() {
warn!("Unable to connect to bootnode {peer_id}: {error:?}");
self.health_state.dialed_bootnode.remove(&connection_id);
if self.health_state.dialed_bootnode.is_empty() {
// We tried to connect to all bootnode without success
error!("Unable to connect to any bootnode");
}
Expand Down Expand Up @@ -100,25 +100,49 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
num_established,
concurrent_dial_errors,
established_in,
} if self.health_state.dialed_bootpeer.contains(&connection_id) => {
info!("Successfully connected to bootpeer {peer_id}");
} if self.health_state.dialed_bootnode.contains(&connection_id) => {
info!("Successfully connected to bootnode {peer_id}");
if self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
{
self.health_state.successfully_connected_to_bootpeer = Some(connection_id);
_ = self.health_state.dialed_bootpeer.remove(&connection_id);
self.health_state.successfully_connected_to_bootnode = Some(connection_id);
_ = self.health_state.dialed_bootnode.remove(&connection_id);
}
}

SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
peer_id,
endpoint,
connection_id,
..
} => {
info!(
"Connection established with peer {peer_id} as {:?}",
endpoint.to_endpoint()
);
if self
.health_state
.successfully_connected_to_bootnode
.is_none()
&& self.boot_peers.contains(&peer_id)
{
info!(
"Connection established with bootnode {peer_id} as {:?}",
endpoint.to_endpoint()
);

if endpoint.to_endpoint() == Endpoint::Listener {
if let Err(error) = self.swarm.dial(peer_id) {
error!(
"Unable to dial bootnode {peer_id} after incoming connection: \
Freyskeyd marked this conversation as resolved.
Show resolved Hide resolved
{error}"
);
}
}
} else {
info!(
"Connection established with peer {peer_id} as {:?}",
endpoint.to_endpoint()
);
}

if self.swarm.connected_peers().count() >= self.config.minimum_cluster_size {
if let Err(error) = self.swarm.behaviour_mut().gossipsub.subscribe() {
Expand Down Expand Up @@ -164,8 +188,8 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
peer_id: Some(ref peer_id),
connection_id,
} if self.boot_peers.contains(peer_id) => {
info!("Dialing bootpeer {peer_id} on connection: {connection_id}");
self.health_state.dialed_bootpeer.insert(connection_id);
info!("Dialing bootnode {peer_id} on connection: {connection_id}");
self.health_state.dialed_bootnode.insert(connection_id);
}

SwarmEvent::Dialing {
Expand All @@ -185,6 +209,7 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
SwarmEvent::ListenerError { listener_id, error } => {
error!("Unhandled ListenerError {listener_id:?} | {error}")
}

event => {
warn!("Unhandled SwarmEvent: {:?}", event);
}
Expand Down
10 changes: 5 additions & 5 deletions crates/topos-p2p/src/runtime/handle_event/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl EventHandler<Box<Event>> for Runtime {
{
if self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
{
warn!(
Expand Down Expand Up @@ -85,11 +85,11 @@ impl EventHandler<Box<Event>> for Runtime {
} if num_remaining == 0
&& self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
&& self.swarm.behaviour().discovery.health_status == HealthStatus::Unhealthy =>
{
match self.health_state.bootpeer_connection_retries.checked_sub(1) {
match self.health_state.bootnode_connection_retries.checked_sub(1) {
None => {
error!(
"Bootstrap query finished but unable to connect to bootnode, stopping"
Expand All @@ -103,7 +103,7 @@ impl EventHandler<Box<Event>> for Runtime {
{} more times",
new
);
self.health_state.bootpeer_connection_retries = new;
self.health_state.bootnode_connection_retries = new;
}
}
}
Expand All @@ -119,7 +119,7 @@ impl EventHandler<Box<Event>> for Runtime {
} if num_remaining == 0
&& self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_some()
&& self.swarm.behaviour().discovery.health_status == HealthStatus::Unhealthy =>
{
Expand Down
44 changes: 43 additions & 1 deletion crates/topos-p2p/src/runtime/handle_event/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,52 @@
use tracing::debug;

use crate::{behaviour::grpc, Runtime};

use super::{EventHandler, EventResult};

#[async_trait::async_trait]
impl EventHandler<grpc::Event> for Runtime {
async fn handle(&mut self, _event: grpc::Event) -> EventResult {
async fn handle(&mut self, event: grpc::Event) -> EventResult {
match event {
grpc::Event::OutboundFailure {
peer_id,
request_id,
error,
} => {
debug!(
"Outbound connection failure to peer {} for request {}: {}",
peer_id, request_id, error
);
}
grpc::Event::OutboundSuccess {
peer_id,
request_id,
..
} => {
debug!(
"Outbound connection success to peer {} for request {}",
peer_id, request_id
);
}
grpc::Event::InboundNegotiatedConnection {
request_id,
connection_id,
} => {
debug!(
"Inbound connection negotiated for request {} with connection {}",
request_id, connection_id
);
}
grpc::Event::OutboundNegotiatedConnection {
peer_id,
request_id,
} => {
debug!(
"Outbound connection negotiated to peer {} for request {}",
peer_id, request_id
);
}
}
Ok(())
}
}
6 changes: 3 additions & 3 deletions crates/topos-p2p/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ pub(crate) struct HealthState {
/// Indicates if the node is listening on any address
pub(crate) is_listening: bool,
/// List the bootnodes that the node has tried to connect to
pub(crate) dialed_bootpeer: HashSet<ConnectionId>,
pub(crate) dialed_bootnode: HashSet<ConnectionId>,
/// Indicates if the node has successfully connected to a bootnode
pub(crate) successfully_connected_to_bootpeer: Option<ConnectionId>,
pub(crate) successfully_connected_to_bootnode: Option<ConnectionId>,
/// Track the number of remaining retries to connect to any bootnode
pub(crate) bootpeer_connection_retries: usize,
pub(crate) bootnode_connection_retries: usize,
}

impl Runtime {
Expand Down
60 changes: 60 additions & 0 deletions crates/topos-p2p/src/tests/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::time::Duration;

use futures::{future::join_all, FutureExt};
use rstest::rstest;
use test_log::test;
use topos_test_sdk::tce::NodeConfig;
use tracing::Instrument;

#[rstest]
#[test(tokio::test)]
#[timeout(Duration::from_secs(5))]
async fn two_bootnode_communicating() {
let bootnode = NodeConfig::memory(2);
let local = NodeConfig::memory(1);
let bootnode_known_peers = vec![(local.peer_id(), local.addr.clone())];
let local_known_peers = vec![(bootnode.peer_id(), bootnode.addr.clone())];

let mut handlers = Vec::new();

let context_local = tracing::info_span!("start_node", "peer_id" = local.peer_id().to_string());

let context_bootnode =
tracing::info_span!("start_node", "peer_id" = bootnode.peer_id().to_string());
handlers.push(
async move {
let (client, mut stream, runtime) = crate::network::builder()
.minimum_cluster_size(1)
.peer_key(local.keypair.clone())
.listen_addresses(&[local.addr.clone()])
.known_peers(&local_known_peers)
.memory()
.build()
.await
.expect("Unable to create p2p network");

runtime.bootstrap(&mut stream).await
}
.instrument(context_local)
.boxed(),
);

handlers.push(
async move {
let (client, mut stream, runtime) = crate::network::builder()
.minimum_cluster_size(1)
.peer_key(bootnode.keypair.clone())
.listen_addresses(&[bootnode.addr.clone()])
.known_peers(&bootnode_known_peers)
.memory()
.build()
.await
.expect("Unable to create p2p network");

runtime.bootstrap(&mut stream).await
}
.instrument(context_bootnode)
.boxed(),
);
assert!(join_all(handlers).await.iter().all(Result::is_ok));
}
1 change: 1 addition & 0 deletions crates/topos-p2p/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod behaviour;
mod bootstrap;
mod command;
mod support;
2 changes: 1 addition & 1 deletion crates/topos-test-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ethers.workspace = true
async-trait.workspace = true
futures.workspace = true
lazy_static = { version = "1.4.0" }
libp2p.workspace = true
libp2p = { workspace = true, features = ["macros"] }
proc_macro_sdk = { path = "./proc_macro_sdk/" }
rand.workspace = true
rstest.workspace = true
Expand Down
Loading
Loading