Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new behavior for network bootstrapping #1257

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
770 changes: 554 additions & 216 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ url = "2.5"
warp_lambda = "0.1.4"
wsts = "9.2.0"
hex = "0.4.3"
libp2p = { version = "0.54.1", default-features = false, features = [
libp2p = { version = "0.55.0", default-features = false, features = [
"macros", "kad", "noise", "ping", "tcp",
"tokio", "yamux", "mdns", "quic", "gossipsub",
"relay", "identify", "tls", "dns", "autonat",
Expand Down
2 changes: 1 addition & 1 deletion signer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ clarity.workspace = true
config.workspace = true
futures.workspace = true
hashbrown.workspace = true
libp2p.workspace = true
libp2p = { workspace = true, default-features = false }
lru = { workspace = true, default-features = false }
metrics.workspace = true
metrics-exporter-prometheus.workspace = true
Expand Down
1 change: 0 additions & 1 deletion signer/src/context/signer_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ impl SbtcLimits {
self.max_mintable_cap.unwrap_or(Amount::MAX_MONEY)
}

/// TODO: Document this
#[cfg(test)]
/// Create a new Self with only the given deposit minimum and maximums
/// set.
Expand Down
314 changes: 314 additions & 0 deletions signer/src/network/libp2p/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
//! LibP2P behavior for bootstrapping the node against the network using
//! the known seed addresses.

use std::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
task::Poll,
time::{Duration, Instant},
};

use libp2p::{
multiaddr::Protocol,
swarm::{dial_opts::DialOpts, FromSwarm, NetworkBehaviour, THandlerInEvent, ToSwarm},
Multiaddr, PeerId,
};

pub struct Config {
local_peer_id: PeerId,
seed_addresses: Vec<Multiaddr>,
bootstrap_interval: Duration,
}

impl Config {
pub fn new(local_peer_id: PeerId) -> Self {
Self {
local_peer_id,
seed_addresses: Default::default(),
bootstrap_interval: Duration::from_secs(60),
}
}

#[allow(dead_code)]
pub fn add_seed_addresses<T>(mut self, seed_addresses: T) -> Self
where
T: IntoIterator<Item = Multiaddr>,
{
self.seed_addresses.extend(seed_addresses);
self
}

#[allow(dead_code)]
pub fn with_bootstrap_interval(mut self, interval: Duration) -> Self {
self.bootstrap_interval = interval;
self
}
}

#[derive(Debug)]
pub enum BootstrapEvent {
Complete,
Needed,
Started { addresses: Vec<Multiaddr> },
}

pub struct Behavior {
config: Config,
pending_events: VecDeque<ToSwarm<BootstrapEvent, THandlerInEvent<Behavior>>>,
connection_count: usize,
connected_seeds: HashMap<PeerId, HashSet<Multiaddr>>,
last_attempted_at: Option<Instant>,
is_bootstrapped: bool,
}

impl Behavior {
pub fn new(config: Config) -> Self {
Self {
config,
pending_events: VecDeque::new(),
connection_count: 0,
connected_seeds: HashMap::new(),
last_attempted_at: None,
is_bootstrapped: false,
}
}

fn is_seed_address(&self, addr: &Multiaddr) -> bool {
let mut addr = addr.clone();

// If the address ends with a p2p protocol, we need to remove it
// before checking if it's a seed address.
if let Some(Protocol::P2p(_)) = addr.iter().last() {
addr.pop();
}

self.config.seed_addresses.iter().any(|seed| seed == &addr)
}

fn local_peer_id(&self) -> PeerId {
self.config.local_peer_id
}

pub fn add_seed_addresses<T>(&mut self, seed_addresses: T) -> &mut Self
where
T: IntoIterator<Item = Multiaddr>,
{
self.config.seed_addresses.extend(seed_addresses);
matteojug marked this conversation as resolved.
Show resolved Hide resolved
tracing::debug!(addresses = ?self.config.seed_addresses, "added seed addresses");
self
}
}

impl NetworkBehaviour for Behavior {
type ConnectionHandler = libp2p::swarm::dummy::ConnectionHandler;
type ToSwarm = BootstrapEvent;

fn handle_established_inbound_connection(
&mut self,
_connection_id: libp2p::swarm::ConnectionId,
_peer: PeerId,
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
Ok(libp2p::swarm::dummy::ConnectionHandler)
}

fn handle_established_outbound_connection(
&mut self,
_connection_id: libp2p::swarm::ConnectionId,
_peer: PeerId,
_addr: &Multiaddr,
_role_override: libp2p::core::Endpoint,
_port_use: libp2p::core::transport::PortUse,
) -> Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
Ok(libp2p::swarm::dummy::ConnectionHandler)
}

fn handle_pending_outbound_connection(
&mut self,
_connection_id: libp2p::swarm::ConnectionId,
_maybe_peer: Option<PeerId>,
addresses: &[Multiaddr],
effective_role: libp2p::core::Endpoint,
) -> Result<Vec<Multiaddr>, libp2p::swarm::ConnectionDenied> {
tracing::debug!(?addresses, "handling pending outbound connection");
let addresses = addresses.to_vec();

// We're only interested in our outbound dialing activity. This _can_ be
// a listener if the swarm is attempting a hole-punch.
if !effective_role.is_dialer() {
return Ok(addresses);
}

for addr in &addresses {
if self.is_seed_address(addr) {
tracing::debug!(%addr, "attempting to dial seed address");
}
}

// Return the addresses as-is.
Ok(addresses)
matteojug marked this conversation as resolved.
Show resolved Hide resolved
}

fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) {
match event {
FromSwarm::ConnectionEstablished(e) => {
// We're only interested in our outbound activity.
if !e.endpoint.is_dialer() {
return;
}

// If we've connected to ourselves, we don't need to do anything.
// This might happen if a node has its own address in the seed list.
if e.peer_id == self.local_peer_id() {
return;
}

// Increment our total connection count. We're not interested
// in counting connections to ourselves.
self.connection_count = self.connection_count.saturating_add(1);
matteojug marked this conversation as resolved.
Show resolved Hide resolved

// Get the remote address, and remove the p2p protocol if it exists.
let mut addr = e.endpoint.get_remote_address().clone();
if matches!(addr.iter().last(), Some(Protocol::P2p(_))) {
addr.pop();
}

// Check if the address is a seed address. If it's not, then
// we don't need to do anything so we return early.
if self.is_seed_address(&addr) {
tracing::debug!(peer_id = %e.peer_id, %addr, "connected to seed");
} else {
return;
}

// Update our connected seeds map.
match self.connected_seeds.entry(e.peer_id) {
matteojug marked this conversation as resolved.
Show resolved Hide resolved
Entry::Occupied(mut entry) => {
if entry.get_mut().insert(addr) {
tracing::trace!("added connected seed: {:?}", e.peer_id);
matteojug marked this conversation as resolved.
Show resolved Hide resolved
}
}
Entry::Vacant(entry) => {
let mut set = HashSet::new();
set.insert(addr);
entry.insert(set);
}
}
}
FromSwarm::ConnectionClosed(e) => {
// If this was a connection to ourselves then we don't need to do
// anything.
if e.peer_id == self.local_peer_id() {
return;
}
matteojug marked this conversation as resolved.
Show resolved Hide resolved

// Decrement our total connection count.
self.connection_count = self.connection_count.saturating_sub(1);

// Get the remote address, and remove the p2p protocol if it exists.
let mut addr = e.endpoint.get_remote_address().clone();
if matches!(addr.iter().last(), Some(Protocol::P2p(_))) {
addr.pop();
}

// Check if the address is a seed address. If it's not, then
// we don't need to do anything so we return early.
if self.is_seed_address(&addr) {
tracing::debug!(peer_id = %e.peer_id, %addr, "disconnected from seed");
} else {
return;
}

// Update our connected seeds map.
if let Some(set) = self.connected_seeds.get_mut(&e.peer_id) {
if set.remove(&addr) {
tracing::trace!("removed connected seed: {:?}", e.peer_id);
}
}
}
_ => {}
}
}

fn on_connection_handler_event(
&mut self,
_peer_id: PeerId,
_connection_id: libp2p::swarm::ConnectionId,
_event: libp2p::swarm::THandlerOutEvent<Self>,
) {
}

#[tracing::instrument(skip_all)]
fn poll(
&mut self,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>>
{
// If we have any pending events, we return them immediately.
if let Some(event) = self.pending_events.pop_front() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use self.next_pending_event()? It may end up uglier, but at least we are using the same interface

return Poll::Ready(event);
}

// If we have any connections, we consider ourselves bootstrapped.
// Determine if there was a state change or not and return the correct
// poll result.
if self.connection_count >= 1 {
return match self.is_bootstrapped {
true => Poll::Pending,
false => {
// We've just bootstrapped, so we generate an event to notify the
// swarm.
tracing::debug!(connection_count = %self.connection_count, "bootstrapping complete");
self.is_bootstrapped = true;
Poll::Ready(ToSwarm::GenerateEvent(BootstrapEvent::Complete))
}
};
}

// If we're here then we're not connected to any peers. If our current
// state is bootstrapped, we need to re-bootstrap.
if self.is_bootstrapped {
tracing::debug!(
"state is bootstrapped but not connected to any peers; re-bootstrapping is needed"
);
self.is_bootstrapped = false;
self.pending_events
.push_back(ToSwarm::GenerateEvent(BootstrapEvent::Needed));
}

// If we've attempted to bootstrap recently, we wait until the interval
// has passed.
if let Some(last_bootstrap) = self.last_attempted_at {
if last_bootstrap.elapsed() < self.config.bootstrap_interval {
return Poll::Pending;
matteojug marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Queue the bootstrap started event.
tracing::debug!(addresses = ?self.config.seed_addresses, "initiating network bootstrapping from seed addresses");
matteojug marked this conversation as resolved.
Show resolved Hide resolved
self.pending_events
.push_back(ToSwarm::GenerateEvent(BootstrapEvent::Started {
addresses: self.config.seed_addresses.clone(),
}));

// Iterate over the seed addresses and queue dial events for each. Note
// that we queue a dial event for each seed address, regardless of our
// current connection count (which will then be propagated in subsequent
// polls). This is to ensure the best chance of connecting to the
// network.
self.config.seed_addresses.iter().for_each(|addr| {
// Construct the dialing options and `Dial` event which will be
// sent to the swarm.
let dial_opts = DialOpts::unknown_peer_id().address(addr.clone()).build();
let event = ToSwarm::Dial { opts: dial_opts };

// Queue the dial event.
self.pending_events.push_back(event);
});

// Update the last bootstrap attempt time.
self.last_attempted_at = Some(Instant::now());

Poll::Pending
matteojug marked this conversation as resolved.
Show resolved Hide resolved
}
}
23 changes: 23 additions & 0 deletions signer/src/network/libp2p/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::context::{Context, P2PEvent, SignerCommand, SignerSignal};
use crate::error::Error;
use crate::network::Msg;

use super::bootstrap::BootstrapEvent;
use super::swarm::{SignerBehavior, SignerBehaviorEvent};
use super::TOPIC;

Expand Down Expand Up @@ -166,6 +167,17 @@ pub async fn run(ctx: &impl Context, swarm: Arc<Mutex<Swarm<SignerBehavior>>>) {
"autonat server event"
);
}
SwarmEvent::Behaviour(SignerBehaviorEvent::Bootstrap(event)) => match event {
BootstrapEvent::Complete => {
tracing::info!("network bootstraping complete");
matteojug marked this conversation as resolved.
Show resolved Hide resolved
}
BootstrapEvent::Started { addresses } => {
tracing::info!(?addresses, "network bootstrapping started");
}
BootstrapEvent::Needed => {
tracing::info!("no connected peers; network bootstrapping needed");
}
},
// The derived `SwarmEvent` is marked as #[non_exhaustive], so we must have a
// catch-all.
event => tracing::trace!(?event, "unhandled swarm event"),
Expand Down Expand Up @@ -400,6 +412,17 @@ fn handle_gossipsub_event(
Event::GossipsubNotSupported { peer_id } => {
tracing::warn!(%peer_id, "peer does not support gossipsub");
}
Event::SlowPeer { peer_id, failed_messages } => {
tracing::warn!(
%peer_id,
failed_publishes = %failed_messages.publish,
failed_forwards = %failed_messages.forward,
failed_priority = %failed_messages.priority,
failed_non_priority = %failed_messages.non_priority,
failed_timeout = %failed_messages.timeout,
"peer has been flagged as slow"
);
}
}
}

Expand Down
1 change: 1 addition & 0 deletions signer/src/network/libp2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::LazyLock;

use libp2p::gossipsub::IdentTopic;

mod bootstrap;
mod errors;
mod event_loop;
mod multiaddr;
Expand Down
Loading