From 248466c158f35c270b8d38703267bdf6d4b1ce74 Mon Sep 17 00:00:00 2001 From: diego Date: Fri, 13 Dec 2024 11:18:02 +0100 Subject: [PATCH 01/23] update discv5 to 0.9.0 --- Cargo.lock | 33 +-------------------------------- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0fa4f900..644090c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1939,37 +1939,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "discv5" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23e6b70634e26c909d1edbb3142b3eaf3b89da0e52f284f00ca7c80d9901ad9e" -dependencies = [ - "aes 0.8.4", - "aes-gcm", - "alloy-rlp", - "arrayvec", - "ctr 0.9.2", - "delay_map 0.4.0", - "enr 0.12.1", - "fnv", - "futures", - "hashlink 0.9.1", - "hex", - "hkdf", - "lazy_static", - "lru", - "more-asserts", - "parking_lot 0.12.3", - "rand", - "smallvec", - "socket2 0.5.8", - "tokio", - "tracing", - "uint 0.10.0", - "zeroize", -] - [[package]] name = "discv5" version = "0.9.0" @@ -5295,7 +5264,7 @@ version = "0.1.0" dependencies = [ "async-channel", "dirs 5.0.1", - "discv5 0.8.0", + "discv5 0.9.0", "futures", "libp2p", "lighthouse_network 0.2.0 (git+https://github.com/sigp/lighthouse?branch=unstable)", diff --git a/Cargo.toml b/Cargo.toml index aa1e61d6..6faa7e66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ derive_more = { version = "1.0.0", features = ["full"] } async-channel = "1.9" axum = "0.7.7" clap = { version = "4.5.15", features = ["derive", "wrap_help"] } -discv5 = "0.8.0" +discv5 = "0.9.0" dirs = "5.0.1" either = "1.13.0" futures = "0.3.30" From f8ca4b47446e212194becb7f1c76560611f14982 Mon Sep 17 00:00:00 2001 From: diego Date: Fri, 13 Dec 2024 11:20:24 +0100 Subject: [PATCH 02/23] add discovery module --- anchor/network/src/discovery.rs | 0 anchor/network/src/lib.rs | 1 + 2 files changed, 1 insertion(+) create mode 100644 anchor/network/src/discovery.rs diff --git a/anchor/network/src/discovery.rs b/anchor/network/src/discovery.rs new file mode 100644 index 00000000..e69de29b diff --git a/anchor/network/src/lib.rs b/anchor/network/src/lib.rs index 161c0960..1dcfde25 100644 --- a/anchor/network/src/lib.rs +++ b/anchor/network/src/lib.rs @@ -6,6 +6,7 @@ mod keypair_utils; mod network; mod transport; mod types; +mod discovery; pub use config::Config; pub use lighthouse_network::{ListenAddr, ListenAddress}; From c016651b3781189050a8942973dba0477f846388 Mon Sep 17 00:00:00 2001 From: diego Date: Fri, 13 Dec 2024 11:22:42 +0100 Subject: [PATCH 03/23] add Discovery struct and implement NetworkBehaviour --- anchor/network/src/discovery.rs | 38 +++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/anchor/network/src/discovery.rs b/anchor/network/src/discovery.rs index e69de29b..04d81716 100644 --- a/anchor/network/src/discovery.rs +++ b/anchor/network/src/discovery.rs @@ -0,0 +1,38 @@ +use std::task::{Context, Poll}; + +use discv5::Discv5; +use discv5::libp2p_identity::PeerId; +use discv5::multiaddr::Multiaddr; +use libp2p::core::Endpoint; +use libp2p::core::transport::PortUse; +use libp2p::swarm::{ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm}; +use libp2p::swarm::dummy::ConnectionHandler; + +pub struct Discovery { + pub discv5: Discv5, +} + +impl NetworkBehaviour for Discovery { + type ConnectionHandler = ConnectionHandler; + type ToSwarm = (); + + fn handle_established_inbound_connection(&mut self, _connection_id: ConnectionId, peer: PeerId, local_addr: &Multiaddr, remote_addr: &Multiaddr) -> Result, ConnectionDenied> { + todo!() + } + + fn handle_established_outbound_connection(&mut self, _connection_id: ConnectionId, peer: PeerId, addr: &Multiaddr, role_override: Endpoint, port_use: PortUse) -> Result, ConnectionDenied> { + todo!() + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + + } + + fn on_connection_handler_event(&mut self, _peer_id: PeerId, _connection_id: ConnectionId, _event: THandlerOutEvent) { + todo!() + } + + fn poll(&mut self, cx: &mut Context<'_>) -> Poll>> { + todo!() + } +} From daff657b3cb8f763eb3029faca053d5b16401446 Mon Sep 17 00:00:00 2001 From: diego Date: Fri, 13 Dec 2024 11:24:39 +0100 Subject: [PATCH 04/23] add build_enr function --- anchor/network/src/discovery.rs | 80 ++++++++++++++++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) diff --git a/anchor/network/src/discovery.rs b/anchor/network/src/discovery.rs index 04d81716..48d064a2 100644 --- a/anchor/network/src/discovery.rs +++ b/anchor/network/src/discovery.rs @@ -1,12 +1,15 @@ use std::task::{Context, Poll}; -use discv5::Discv5; +use discv5::{Discv5, Enr}; +use discv5::enr::CombinedKey; use discv5::libp2p_identity::PeerId; use discv5::multiaddr::Multiaddr; use libp2p::core::Endpoint; use libp2p::core::transport::PortUse; use libp2p::swarm::{ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm}; use libp2p::swarm::dummy::ConnectionHandler; +use lighthouse_network::discovery::enr_ext::{QUIC6_ENR_KEY, QUIC_ENR_KEY}; +use crate::Config; pub struct Discovery { pub discv5: Discv5, @@ -36,3 +39,78 @@ impl NetworkBehaviour for Discovery { todo!() } } + +/// Builds a anchor ENR given a `network::Config`. +pub fn build_enr( + enr_key: &CombinedKey, + config: &Config, +) -> Result { + let mut builder = discv5::enr::Enr::builder(); + let (maybe_ipv4_address, maybe_ipv6_address) = &config.enr_address; + + if let Some(ip) = maybe_ipv4_address { + builder.ip4(*ip); + } + + if let Some(ip) = maybe_ipv6_address { + builder.ip6(*ip); + } + + if let Some(udp4_port) = config.enr_udp4_port { + builder.udp4(udp4_port.get()); + } + + if let Some(udp6_port) = config.enr_udp6_port { + builder.udp6(udp6_port.get()); + } + + // Add QUIC fields to the ENR. + // Since QUIC is used as an alternative transport for the libp2p protocols, + // the related fields should only be added when both QUIC and libp2p are enabled + if !config.disable_quic_support { + // If we are listening on ipv4, add the quic ipv4 port. + if let Some(quic4_port) = config.enr_quic4_port.or_else(|| { + config + .listen_addresses + .v4() + .and_then(|v4_addr| v4_addr.quic_port.try_into().ok()) + }) { + builder.add_value(QUIC_ENR_KEY, &quic4_port.get()); + } + + // If we are listening on ipv6, add the quic ipv6 port. + if let Some(quic6_port) = config.enr_quic6_port.or_else(|| { + config + .listen_addresses + .v6() + .and_then(|v6_addr| v6_addr.quic_port.try_into().ok()) + }) { + builder.add_value(QUIC6_ENR_KEY, &quic6_port.get()); + } + } + + // If the ENR port is not set, and we are listening over that ip version, use the listening port instead. + let tcp4_port = config.enr_tcp4_port.or_else(|| { + config + .listen_addresses + .v4() + .and_then(|v4_addr| v4_addr.tcp_port.try_into().ok()) + }); + if let Some(tcp4_port) = tcp4_port { + builder.tcp4(tcp4_port.get()); + } + + let tcp6_port = config.enr_tcp6_port.or_else(|| { + config + .listen_addresses + .v6() + .and_then(|v6_addr| v6_addr.tcp_port.try_into().ok()) + }); + if let Some(tcp6_port) = tcp6_port { + builder.tcp6(tcp6_port.get()); + } + + builder + .build(enr_key) + .map_err(|e| format!("Could not build Local ENR: {:?}", e)) +} From e1b09d69c359215352bafdcf263de295d79b8977 Mon Sep 17 00:00:00 2001 From: diego Date: Fri, 13 Dec 2024 11:25:44 +0100 Subject: [PATCH 05/23] add discovery to AnchorBehaviour --- anchor/network/src/behaviour.rs | 4 ++++ anchor/network/src/network.rs | 25 ++++++++++++++++++++++--- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/anchor/network/src/behaviour.rs b/anchor/network/src/behaviour.rs index f52459a5..0b8bbc63 100644 --- a/anchor/network/src/behaviour.rs +++ b/anchor/network/src/behaviour.rs @@ -1,5 +1,7 @@ +use discv5::Discv5; use libp2p::swarm::NetworkBehaviour; use libp2p::{gossipsub, identify, ping}; +use crate::discovery::Discovery; #[derive(NetworkBehaviour)] pub struct AnchorBehaviour { @@ -9,4 +11,6 @@ pub struct AnchorBehaviour { pub ping: ping::Behaviour, /// The routing pub-sub mechanism for Anchor. pub gossipsub: gossipsub::Behaviour, + /// Discv5 Discovery protocol. + pub discovery: Discovery, } diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index dc0325a9..055eb075 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -1,3 +1,4 @@ +use std::net::Ipv4Addr; use crate::behaviour::AnchorBehaviour; use crate::behaviour::AnchorBehaviourEvent; use crate::keypair_utils::load_private_key; @@ -15,8 +16,13 @@ use lighthouse_network::discv5::enr::k256::sha2::{Digest, Sha256}; use std::num::{NonZeroU8, NonZeroUsize}; use std::pin::Pin; use std::time::Duration; +use discv5::Discv5; +use discv5::enr::Enr; +use lighthouse_network::CombinedKeyExt; +use lighthouse_network::discovery::CombinedKey; use task_executor::TaskExecutor; use tracing::{info, log}; +use crate::discovery::{build_enr, Discovery}; pub struct Network { swarm: Swarm, @@ -29,7 +35,7 @@ impl Network { pub async fn try_new(config: &Config, executor: TaskExecutor) -> Result { let local_keypair: Keypair = load_private_key(&config.network_dir); let transport = build_transport(local_keypair.clone(), !config.disable_quic_support); - let behaviour = build_anchor_behaviour(local_keypair.clone()); + let behaviour = build_anchor_behaviour(local_keypair.clone(), config); let peer_id = local_keypair.public().to_peer_id(); let mut network = Network { @@ -102,7 +108,7 @@ impl Network { } } -fn build_anchor_behaviour(local_keypair: Keypair) -> AnchorBehaviour { +fn build_anchor_behaviour(local_keypair: Keypair, network_config: &Config) -> AnchorBehaviour { // TODO setup discv5 let identify = { let local_public_key = local_keypair.public(); @@ -140,12 +146,25 @@ fn build_anchor_behaviour(local_keypair: Keypair) -> AnchorBehaviour { .unwrap(); let gossipsub = - gossipsub::Behaviour::new(MessageAuthenticity::Signed(local_keypair), config).unwrap(); + gossipsub::Behaviour::new(MessageAuthenticity::Signed(local_keypair.clone()), config).unwrap(); + + let discv5_listen_config = + discv5::ListenConfig::from_ip(Ipv4Addr::UNSPECIFIED.into(), 9000); + + // discv5 configuration + let discv5_config = discv5::ConfigBuilder::new(discv5_listen_config) + .build(); + + // convert the keypair into an ENR key + let enr_key: CombinedKey = CombinedKey::from_libp2p(local_keypair).unwrap(); + let enr = build_enr(&enr_key, network_config).unwrap(); + let discv5 = Discv5::new(enr, enr_key, discv5_config).unwrap(); AnchorBehaviour { identify, ping: ping::Behaviour::default(), gossipsub, + discovery: Discovery { discv5 }, } } From 467b48a677e8a6a352226508aba40a9d7c83bdf4 Mon Sep 17 00:00:00 2001 From: diego Date: Fri, 13 Dec 2024 11:27:41 +0100 Subject: [PATCH 06/23] cargo fmt --- anchor/network/src/behaviour.rs | 2 +- anchor/network/src/discovery.rs | 49 +++++++++++++++++++++++---------- anchor/network/src/lib.rs | 2 +- anchor/network/src/network.rs | 21 +++++++------- 4 files changed, 46 insertions(+), 28 deletions(-) diff --git a/anchor/network/src/behaviour.rs b/anchor/network/src/behaviour.rs index 0b8bbc63..5e89ed66 100644 --- a/anchor/network/src/behaviour.rs +++ b/anchor/network/src/behaviour.rs @@ -1,7 +1,7 @@ +use crate::discovery::Discovery; use discv5::Discv5; use libp2p::swarm::NetworkBehaviour; use libp2p::{gossipsub, identify, ping}; -use crate::discovery::Discovery; #[derive(NetworkBehaviour)] pub struct AnchorBehaviour { diff --git a/anchor/network/src/discovery.rs b/anchor/network/src/discovery.rs index 48d064a2..9959d6bd 100644 --- a/anchor/network/src/discovery.rs +++ b/anchor/network/src/discovery.rs @@ -1,15 +1,18 @@ use std::task::{Context, Poll}; -use discv5::{Discv5, Enr}; +use crate::Config; use discv5::enr::CombinedKey; use discv5::libp2p_identity::PeerId; use discv5::multiaddr::Multiaddr; -use libp2p::core::Endpoint; +use discv5::{Discv5, Enr}; use libp2p::core::transport::PortUse; -use libp2p::swarm::{ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm}; +use libp2p::core::Endpoint; use libp2p::swarm::dummy::ConnectionHandler; +use libp2p::swarm::{ + ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, + THandlerOutEvent, ToSwarm, +}; use lighthouse_network::discovery::enr_ext::{QUIC6_ENR_KEY, QUIC_ENR_KEY}; -use crate::Config; pub struct Discovery { pub discv5: Discv5, @@ -19,32 +22,48 @@ impl NetworkBehaviour for Discovery { type ConnectionHandler = ConnectionHandler; type ToSwarm = (); - fn handle_established_inbound_connection(&mut self, _connection_id: ConnectionId, peer: PeerId, local_addr: &Multiaddr, remote_addr: &Multiaddr) -> Result, ConnectionDenied> { + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { todo!() } - fn handle_established_outbound_connection(&mut self, _connection_id: ConnectionId, peer: PeerId, addr: &Multiaddr, role_override: Endpoint, port_use: PortUse) -> Result, ConnectionDenied> { + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + port_use: PortUse, + ) -> Result, ConnectionDenied> { todo!() } - fn on_swarm_event(&mut self, event: FromSwarm) { - - } + fn on_swarm_event(&mut self, event: FromSwarm) {} - fn on_connection_handler_event(&mut self, _peer_id: PeerId, _connection_id: ConnectionId, _event: THandlerOutEvent) { + fn on_connection_handler_event( + &mut self, + _peer_id: PeerId, + _connection_id: ConnectionId, + _event: THandlerOutEvent, + ) { todo!() } - fn poll(&mut self, cx: &mut Context<'_>) -> Poll>> { + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { todo!() } } /// Builds a anchor ENR given a `network::Config`. -pub fn build_enr( - enr_key: &CombinedKey, - config: &Config, -) -> Result { +pub fn build_enr(enr_key: &CombinedKey, config: &Config) -> Result { let mut builder = discv5::enr::Enr::builder(); let (maybe_ipv4_address, maybe_ipv6_address) = &config.enr_address; diff --git a/anchor/network/src/lib.rs b/anchor/network/src/lib.rs index 1dcfde25..4e461fcb 100644 --- a/anchor/network/src/lib.rs +++ b/anchor/network/src/lib.rs @@ -2,11 +2,11 @@ mod behaviour; mod config; +mod discovery; mod keypair_utils; mod network; mod transport; mod types; -mod discovery; pub use config::Config; pub use lighthouse_network::{ListenAddr, ListenAddress}; diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 055eb075..2b039ffc 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -1,9 +1,11 @@ -use std::net::Ipv4Addr; use crate::behaviour::AnchorBehaviour; use crate::behaviour::AnchorBehaviourEvent; +use crate::discovery::{build_enr, Discovery}; use crate::keypair_utils::load_private_key; use crate::transport::build_transport; use crate::Config; +use discv5::enr::Enr; +use discv5::Discv5; use futures::StreamExt; use libp2p::core::muxing::StreamMuxerBox; use libp2p::core::transport::Boxed; @@ -12,17 +14,15 @@ use libp2p::identity::Keypair; use libp2p::multiaddr::Protocol; use libp2p::swarm::SwarmEvent; use libp2p::{futures, gossipsub, identify, ping, PeerId, Swarm, SwarmBuilder}; +use lighthouse_network::discovery::CombinedKey; use lighthouse_network::discv5::enr::k256::sha2::{Digest, Sha256}; +use lighthouse_network::CombinedKeyExt; +use std::net::Ipv4Addr; use std::num::{NonZeroU8, NonZeroUsize}; use std::pin::Pin; use std::time::Duration; -use discv5::Discv5; -use discv5::enr::Enr; -use lighthouse_network::CombinedKeyExt; -use lighthouse_network::discovery::CombinedKey; use task_executor::TaskExecutor; use tracing::{info, log}; -use crate::discovery::{build_enr, Discovery}; pub struct Network { swarm: Swarm, @@ -146,14 +146,13 @@ fn build_anchor_behaviour(local_keypair: Keypair, network_config: &Config) -> An .unwrap(); let gossipsub = - gossipsub::Behaviour::new(MessageAuthenticity::Signed(local_keypair.clone()), config).unwrap(); + gossipsub::Behaviour::new(MessageAuthenticity::Signed(local_keypair.clone()), config) + .unwrap(); - let discv5_listen_config = - discv5::ListenConfig::from_ip(Ipv4Addr::UNSPECIFIED.into(), 9000); + let discv5_listen_config = discv5::ListenConfig::from_ip(Ipv4Addr::UNSPECIFIED.into(), 9000); // discv5 configuration - let discv5_config = discv5::ConfigBuilder::new(discv5_listen_config) - .build(); + let discv5_config = discv5::ConfigBuilder::new(discv5_listen_config).build(); // convert the keypair into an ENR key let enr_key: CombinedKey = CombinedKey::from_libp2p(local_keypair).unwrap(); From a12a8e96fad04460e8c1bac36692120f48e904c8 Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 16 Dec 2024 23:38:07 +0100 Subject: [PATCH 07/23] expose Enr --- anchor/network/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/anchor/network/src/lib.rs b/anchor/network/src/lib.rs index 4e461fcb..0d1b2cf7 100644 --- a/anchor/network/src/lib.rs +++ b/anchor/network/src/lib.rs @@ -11,3 +11,5 @@ mod types; pub use config::Config; pub use lighthouse_network::{ListenAddr, ListenAddress}; pub use network::Network; + +pub type Enr = discv5::enr::Enr; \ No newline at end of file From f71df58de250f01421dd4a473608d6e347962258 Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 16 Dec 2024 23:38:40 +0100 Subject: [PATCH 08/23] add disable_discovery --- anchor/network/src/config.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/anchor/network/src/config.rs b/anchor/network/src/config.rs index 921c6630..3587da76 100644 --- a/anchor/network/src/config.rs +++ b/anchor/network/src/config.rs @@ -55,6 +55,9 @@ pub struct Config { /// Disables peer scoring altogether. pub disable_peer_scoring: bool, + /// Disables the discovery protocol from starting. + pub disable_discovery: bool, + /// Disables quic support. pub disable_quic_support: bool, @@ -94,6 +97,7 @@ impl Default for Config { boot_nodes_enr: vec![], boot_nodes_multiaddr: vec![], disable_peer_scoring: false, + disable_discovery: false, disable_quic_support: false, topics: vec![], } From 1f3d9554d117746b11ef22ee9821622be4c2e00b Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 16 Dec 2024 23:39:08 +0100 Subject: [PATCH 09/23] hardcode holesky bootnode --- anchor/client/src/config.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/anchor/client/src/config.rs b/anchor/client/src/config.rs index 9cb8e1ea..01289358 100644 --- a/anchor/client/src/config.rs +++ b/anchor/client/src/config.rs @@ -2,7 +2,7 @@ // use clap_utils::{flags::DISABLE_MALLOC_TUNING_FLAG, parse_optional, parse_required}; use crate::cli::Anchor; -use network::{ListenAddr, ListenAddress}; +use network::{Enr, ListenAddr, ListenAddress}; use sensitive_url::SensitiveUrl; use serde::{Deserialize, Serialize}; use std::fs; @@ -131,6 +131,12 @@ pub fn from_cli(cli_args: &Anchor) -> Result { */ config.network.listen_addresses = parse_listening_addresses(cli_args)?; + let mut enrs: Vec = vec![]; + if let Ok(enr) = "enr:-Li4QFIQzamdvTxGJhvcXG_DFmCeyggSffDnllY5DiU47pd_K_1MRnSaJimWtfKJ-MD46jUX9TwgW5Jqe0t4pH41RYWGAYuFnlyth2F0dG5ldHOIAAAAAAAAAACEZXRoMpD1pf1CAAAAAP__________gmlkgnY0gmlwhCLdu_SJc2VjcDI1NmsxoQN4v-N9zFYwEqzGPBBX37q24QPFvAVUtokIo1fblIsmTIN0Y3CCE4uDdWRwgg-j".parse::() { + enrs.push(enr.clone()); + } + config.network.boot_nodes_enr = enrs; + config.beacon_nodes_tls_certs = cli_args.beacon_nodes_tls_certs.clone(); config.execution_nodes_tls_certs = cli_args.execution_nodes_tls_certs.clone(); From b5cdbca27e62cfcfb91a14f7be3463e5f44d973b Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 16 Dec 2024 23:40:00 +0100 Subject: [PATCH 10/23] add discovery --- anchor/network/src/behaviour.rs | 1 - anchor/network/src/discovery.rs | 392 ++++++++++++++++++++++++++++---- anchor/network/src/network.rs | 67 +++--- 3 files changed, 388 insertions(+), 72 deletions(-) diff --git a/anchor/network/src/behaviour.rs b/anchor/network/src/behaviour.rs index 5e89ed66..baa2dd17 100644 --- a/anchor/network/src/behaviour.rs +++ b/anchor/network/src/behaviour.rs @@ -1,5 +1,4 @@ use crate::discovery::Discovery; -use discv5::Discv5; use libp2p::swarm::NetworkBehaviour; use libp2p::{gossipsub, identify, ping}; diff --git a/anchor/network/src/discovery.rs b/anchor/network/src/discovery.rs index 9959d6bd..998583f3 100644 --- a/anchor/network/src/discovery.rs +++ b/anchor/network/src/discovery.rs @@ -1,69 +1,377 @@ +use std::collections::HashMap; +use std::future::Future; +use std::net::Ipv4Addr; +use std::pin::Pin; use std::task::{Context, Poll}; +use std::time::Instant; -use crate::Config; -use discv5::enr::CombinedKey; -use discv5::libp2p_identity::PeerId; -use discv5::multiaddr::Multiaddr; use discv5::{Discv5, Enr}; -use libp2p::core::transport::PortUse; +use discv5::enr::{CombinedKey, NodeId}; +use discv5::libp2p_identity::{Keypair, PeerId}; +use discv5::multiaddr::Multiaddr; +use futures::{StreamExt, TryFutureExt}; +use futures::FutureExt; +use futures::stream::FuturesUnordered; use libp2p::core::Endpoint; +use libp2p::core::transport::PortUse; +use libp2p::swarm::{ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm}; use libp2p::swarm::dummy::ConnectionHandler; -use libp2p::swarm::{ - ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, - THandlerOutEvent, ToSwarm, -}; +use lighthouse_network::{CombinedKeyExt, Subnet}; +use lighthouse_network::discovery::DiscoveredPeers; use lighthouse_network::discovery::enr_ext::{QUIC6_ENR_KEY, QUIC_ENR_KEY}; +use tokio::sync::mpsc; + +use crate::Config; + +/// The number of closest peers to search for when doing a regular peer search. +/// +/// We could reduce this constant to speed up queries however at the cost of security. It will +/// make it easier to peers to eclipse this node. Kademlia suggests a value of 16. +pub const FIND_NODE_QUERY_CLOSEST_PEERS: usize = 16; + +#[derive(Debug, Clone, PartialEq)] +struct SubnetQuery { + subnet: Subnet, + min_ttl: Option, + retries: usize, +} + +#[derive(Debug, Clone, PartialEq)] +enum QueryType { + /// We are searching for subnet peers. + Subnet(Vec), + /// We are searching for more peers without ENR or time constraints. + FindPeers, +} + +/// The result of a query. +struct QueryResult { + query_type: QueryType, + result: Result, discv5::QueryError>, +} + +// Awaiting the event stream future +enum EventStream { + /// Awaiting an event stream to be generated. This is required due to the poll nature of + /// `Discovery` + Awaiting( + Pin, discv5::Error>> + Send>>, + ), + /// The future has completed. + Present(mpsc::Receiver), + // The future has failed or discv5 has been disabled. There are no events from discv5. + InActive, +} pub struct Discovery { - pub discv5: Discv5, + /// The handle for the underlying discv5 Server. + /// + /// This is behind a Reference counter to allow for futures to be spawned and polled with a + /// static lifetime. + discv5: Discv5, + + /// Indicates if we are actively searching for peers. We only allow a single FindPeers query at + /// a time, regardless of the query concurrency. + find_peer_active: bool, + + /// Active discovery queries. + active_queries: FuturesUnordered + Send>>>, + + /// The discv5 event stream. + event_stream: EventStream, + + /// Indicates if the discovery service has been started. When the service is disabled, this is + /// always false. + pub started: bool, } -impl NetworkBehaviour for Discovery { - type ConnectionHandler = ConnectionHandler; - type ToSwarm = (); +impl Discovery { + pub async fn new( + local_keypair: Keypair, + network_config: &Config, + ) -> Result { - fn handle_established_inbound_connection( - &mut self, - _connection_id: ConnectionId, - peer: PeerId, - local_addr: &Multiaddr, - remote_addr: &Multiaddr, - ) -> Result, ConnectionDenied> { - todo!() - } + let _enr_dir = match network_config.network_dir.to_str() { + Some(path) => String::from(path), + None => String::from(""), + }; - fn handle_established_outbound_connection( - &mut self, - _connection_id: ConnectionId, - peer: PeerId, - addr: &Multiaddr, - role_override: Endpoint, - port_use: PortUse, - ) -> Result, ConnectionDenied> { - todo!() + // TODO info!(log, "ENR Initialised"; "enr" => local_enr.to_base64(), "seq" => local_enr.seq(), "id"=> %local_enr.node_id(), + // "ip4" => ?local_enr.ip4(), "udp4"=> ?local_enr.udp4(), "tcp4" => ?local_enr.tcp4(), "tcp6" => ?local_enr.tcp6(), "udp6" => ?local_enr.udp6(), + // "quic4" => ?local_enr.quic4(), "quic6" => ?local_enr.quic6() + // ); + + let discv5_listen_config = discv5::ListenConfig::from_ip(Ipv4Addr::UNSPECIFIED.into(), 9000); + + // discv5 configuration + let discv5_config = discv5::ConfigBuilder::new(discv5_listen_config).build(); + + // convert the keypair into an ENR key + let enr_key: CombinedKey = CombinedKey::from_libp2p(local_keypair)?; + + let enr = build_enr(&enr_key, network_config).unwrap(); + let mut discv5 = Discv5::new(enr, enr_key, discv5_config) + .map_err(|e| format!("Discv5 service failed. Error: {:?}", e))?; + + // Add bootnodes to routing table + for bootnode_enr in network_config.boot_nodes_enr.clone() { + // TODO if bootnode_enr.node_id() == local_node_id { + // // If we are a boot node, ignore adding it to the routing table + // continue; + // } + // TODO debug!( + // log, + // "Adding node to routing table"; + // "node_id" => %bootnode_enr.node_id(), + // "peer_id" => %bootnode_enr.peer_id(), + // "ip" => ?bootnode_enr.ip4(), + // "udp" => ?bootnode_enr.udp4(), + // "tcp" => ?bootnode_enr.tcp4(), + // "quic" => ?bootnode_enr.quic4() + // ); + + //let repr = bootnode_enr.to_string(); + let _ = discv5.add_enr(bootnode_enr).map_err(|_e| { + // TODO error!( + // log, + // "Could not add peer to the local routing table"; + // "addr" => repr, + // "error" => e.to_string(), + // ) + }); + } + + // Start the discv5 service and obtain an event stream + let event_stream = if !network_config.disable_discovery { + discv5.start().map_err(|e| e.to_string()).await?; + // TODO debug!(log, "Discovery service started"); + EventStream::Awaiting(Box::pin(discv5.event_stream())) + } else { + EventStream::InActive + }; + + if !network_config.boot_nodes_multiaddr.is_empty() { + // TODO info!(log, "Contacting Multiaddr boot-nodes for their ENR"); + } + + // get futures for requesting the Enrs associated to these multiaddr and wait for their + // completion + let mut fut_coll = network_config + .boot_nodes_multiaddr + .iter() + .map(|addr| addr.to_string()) + // request the ENR for this multiaddr and keep the original for logging + .map(|addr| { + futures::future::join( + discv5.request_enr(addr.clone()), + futures::future::ready(addr), + ) + }) + .collect::>(); + + while let Some((result, _original_addr)) = fut_coll.next().await { + match result { + Ok(enr) => { + // TODO debug!( + // log, + // "Adding node to routing table"; + // "node_id" => %enr.node_id(), + // "peer_id" => %enr.peer_id(), + // "ip" => ?enr.ip4(), + // "udp" => ?enr.udp4(), + // "tcp" => ?enr.tcp4(), + // "quic" => ?enr.quic4() + // ); + let _ = discv5.add_enr(enr).map_err(|_e| { + // TODO error!( + // log, + // "Could not add peer to the local routing table"; + // "addr" => original_addr.to_string(), + // "error" => e.to_string(), + // ) + }); + } + Err(_e) => { + // TODO error!(log, "Error getting mapping to ENR"; "multiaddr" => original_addr.to_string(), "error" => e.to_string()) + } + } + } + + // TODO let update_ports = UpdatePorts { + // tcp4: config.enr_tcp4_port.is_none(), + // tcp6: config.enr_tcp6_port.is_none(), + // quic4: config.enr_quic4_port.is_none(), + // quic6: config.enr_quic6_port.is_none(), + // }; + + Ok(Self { + // cached_enrs: LruCache::new(ENR_CACHE_CAPACITY), + // network_globals, + find_peer_active: false, + // queued_queries: VecDeque::with_capacity(10), + active_queries: FuturesUnordered::new(), + discv5, + event_stream, + started: !network_config.disable_discovery, + // update_ports, + // log, + // enr_dir, + // spec: Arc::new(spec.clone()), + }) } - fn on_swarm_event(&mut self, event: FromSwarm) {} + /// This adds a new `FindPeers` query to the queue if one doesn't already exist. + /// The `target_peers` parameter informs discovery to end the query once the target is found. + /// The maximum this can be is 16. + pub fn discover_peers(&mut self, target_peers: usize) { + // If the discv5 service isn't running or we are in the process of a query, don't bother queuing a new one. + if !self.started || self.find_peer_active { + return; + } + // Immediately start a FindNode query + let target_peers = std::cmp::min(FIND_NODE_QUERY_CLOSEST_PEERS, target_peers); + // TODO debug!(self.log, "Starting a peer discovery request"; "target_peers" => target_peers ); + self.find_peer_active = true; + self.start_query(QueryType::FindPeers, target_peers, |_| true); + } - fn on_connection_handler_event( + /// Search for a specified number of new peers using the underlying discovery mechanism. + /// + /// This can optionally search for peers for a given predicate. Regardless of the predicate + /// given, this will only search for peers on the same enr_fork_id as specified in the local + /// ENR. + fn start_query( &mut self, - _peer_id: PeerId, - _connection_id: ConnectionId, - _event: THandlerOutEvent, + query: QueryType, + target_peers: usize, + _additional_predicate: impl Fn(&Enr) -> bool + Send + 'static, ) { - todo!() + // let enr_fork_id = match self.local_enr().eth2() { + // Ok(v) => v, + // Err(e) => { + // // TODO crit!(self.log, "Local ENR has no fork id"; "error" => e); + // return; + // } + // }; + // predicate for finding nodes with a matching fork and valid tcp port + let eth2_fork_predicate = move |enr: &Enr| { + // `next_fork_epoch` and `next_fork_version` can be different so that + // we can connect to peers who aren't compatible with an upcoming fork. + // `fork_digest` **must** be same. + + // enr.eth2().map(|e| e.fork_digest) == Ok(enr_fork_id.fork_digest) + // && + enr.tcp4().is_some() || enr.tcp6().is_some() + }; + + // General predicate + let predicate: Box bool + Send> = + //Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && additional_predicate(enr)); + Box::new(move |enr: &Enr| eth2_fork_predicate(enr)); + + // Build the future + let query_future = self + .discv5 + // Generate a random target node id. + .find_node_predicate(NodeId::random(), predicate, target_peers) + .map(|v| QueryResult { + query_type: query, + result: v, + }); + + // Add the future to active queries, to be executed. + self.active_queries.push(Box::pin(query_future)); } - fn poll( + /// Process the completed QueryResult returned from discv5. + fn process_completed_queries( &mut self, - cx: &mut Context<'_>, - ) -> Poll>> { - todo!() + query: QueryResult, + ) -> Option>> { + match query.query_type { + QueryType::FindPeers => { + self.find_peer_active = false; + match query.result { + Ok(r) if r.is_empty() => { + //debug!(self.log, "Discovery query yielded no results."); + } + Ok(r) => { + // debug!(self.log, "Discovery query completed"; "peers_found" => r.len()); + let results = r + .into_iter() + .map(|enr| { + // cache the found ENR's + //self.cached_enrs.put(enr.peer_id(), enr.clone()); + (enr, None) + }) + .collect(); + return Some(results); + } + Err(_e) => { + //warn!(self.log, "Discovery query failed"; "error" => %e); + } + } + } + _ => {} + } + None + } + + /// Drives the queries returning any results from completed queries. + fn poll_queries(&mut self, cx: &mut Context) -> Option>> { + while let Poll::Ready(Some(query_result)) = self.active_queries.poll_next_unpin(cx) { + let result = self.process_completed_queries(query_result); + if result.is_some() { + return result; + } + } + None + } + +} + +impl NetworkBehaviour for Discovery { + // Discovery is not a real NetworkBehaviour... + type ConnectionHandler = ConnectionHandler; + type ToSwarm = DiscoveredPeers; + + fn handle_established_inbound_connection(&mut self, _connection_id: ConnectionId, _peer: PeerId, _local_addr: &Multiaddr, _remote_addr: &Multiaddr) -> Result, ConnectionDenied> { + Ok(ConnectionHandler) + } + + fn handle_established_outbound_connection(&mut self, _connection_id: ConnectionId, _peer: PeerId, _addr: &Multiaddr, _role_override: Endpoint, _port_use: PortUse) -> Result, ConnectionDenied> { + Ok(ConnectionHandler) + } + + fn on_swarm_event(&mut self, _event: FromSwarm) { + + } + + fn on_connection_handler_event(&mut self, _peer_id: PeerId, _connection_id: ConnectionId, _event: THandlerOutEvent) { + } + + fn poll(&mut self, cx: &mut Context<'_>) -> Poll>> { + if !self.started { + return Poll::Pending; + } + + // Process the query queue + //self.process_queue(); + + // Drive the queries and return any results from completed queries + if let Some(peers) = self.poll_queries(cx) { + // return the result to the peer manager + return Poll::Ready(ToSwarm::GenerateEvent(DiscoveredPeers { peers })); + } + Poll::Pending } } /// Builds a anchor ENR given a `network::Config`. -pub fn build_enr(enr_key: &CombinedKey, config: &Config) -> Result { +pub fn build_enr( + enr_key: &CombinedKey, + config: &Config, +) -> Result { let mut builder = discv5::enr::Enr::builder(); let (maybe_ipv4_address, maybe_ipv6_address) = &config.enr_address; diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 2b039ffc..04350beb 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -1,29 +1,27 @@ -use crate::behaviour::AnchorBehaviour; -use crate::behaviour::AnchorBehaviourEvent; -use crate::discovery::{build_enr, Discovery}; -use crate::keypair_utils::load_private_key; -use crate::transport::build_transport; -use crate::Config; -use discv5::enr::Enr; -use discv5::Discv5; +use std::num::{NonZeroU8, NonZeroUsize}; +use std::pin::Pin; +use std::time::Duration; + use futures::StreamExt; +use libp2p::{futures, gossipsub, identify, PeerId, ping, Swarm, SwarmBuilder}; use libp2p::core::muxing::StreamMuxerBox; use libp2p::core::transport::Boxed; use libp2p::gossipsub::{MessageAuthenticity, ValidationMode}; use libp2p::identity::Keypair; use libp2p::multiaddr::Protocol; use libp2p::swarm::SwarmEvent; -use libp2p::{futures, gossipsub, identify, ping, PeerId, Swarm, SwarmBuilder}; -use lighthouse_network::discovery::CombinedKey; +use lighthouse_network::discovery::DiscoveredPeers; use lighthouse_network::discv5::enr::k256::sha2::{Digest, Sha256}; -use lighthouse_network::CombinedKeyExt; -use std::net::Ipv4Addr; -use std::num::{NonZeroU8, NonZeroUsize}; -use std::pin::Pin; -use std::time::Duration; use task_executor::TaskExecutor; use tracing::{info, log}; +use crate::behaviour::AnchorBehaviour; +use crate::behaviour::AnchorBehaviourEvent; +use crate::Config; +use crate::discovery::{Discovery, FIND_NODE_QUERY_CLOSEST_PEERS}; +use crate::keypair_utils::load_private_key; +use crate::transport::build_transport; + pub struct Network { swarm: Swarm, peer_id: PeerId, @@ -35,7 +33,7 @@ impl Network { pub async fn try_new(config: &Config, executor: TaskExecutor) -> Result { let local_keypair: Keypair = load_private_key(&config.network_dir); let transport = build_transport(local_keypair.clone(), !config.disable_quic_support); - let behaviour = build_anchor_behaviour(local_keypair.clone(), config); + let behaviour = build_anchor_behaviour(local_keypair.clone(), config).await; let peer_id = local_keypair.public().to_peer_id(); let mut network = Network { @@ -91,12 +89,21 @@ impl Network { AnchorBehaviourEvent::Gossipsub(_ge) => { // TODO handle gossipsub events }, + // Inform the peer manager about discovered peers. + // + // The peer manager will subsequently decide which peers need to be dialed and then dial + // them. + AnchorBehaviourEvent::Discovery(DiscoveredPeers { peers }) => { + //self.peer_manager_mut().peers_discovered(peers); + log::debug!("Discovered peers: {:?}", peers); + } // TODO handle other behaviour events _ => { log::debug!("Unhandled behaviour event: {:?}", behaviour_event); } }, // TODO handle other swarm events + SwarmEvent::NewListenAddr { .. } => {}, _ => { log::debug!("Unhandled swarm event: {:?}", swarm_message); } @@ -108,7 +115,7 @@ impl Network { } } -fn build_anchor_behaviour(local_keypair: Keypair, network_config: &Config) -> AnchorBehaviour { +async fn build_anchor_behaviour(local_keypair: Keypair, network_config: &Config) -> AnchorBehaviour { // TODO setup discv5 let identify = { let local_public_key = local_keypair.public(); @@ -149,21 +156,22 @@ fn build_anchor_behaviour(local_keypair: Keypair, network_config: &Config) -> An gossipsub::Behaviour::new(MessageAuthenticity::Signed(local_keypair.clone()), config) .unwrap(); - let discv5_listen_config = discv5::ListenConfig::from_ip(Ipv4Addr::UNSPECIFIED.into(), 9000); - - // discv5 configuration - let discv5_config = discv5::ConfigBuilder::new(discv5_listen_config).build(); - - // convert the keypair into an ENR key - let enr_key: CombinedKey = CombinedKey::from_libp2p(local_keypair).unwrap(); - let enr = build_enr(&enr_key, network_config).unwrap(); - let discv5 = Discv5::new(enr, enr_key, discv5_config).unwrap(); + let discovery = { + // Build and start the discovery sub-behaviour + let mut discovery = Discovery::new( + local_keypair.clone(), + &network_config, + ).await.unwrap(); + // start searching for peers + discovery.discover_peers(FIND_NODE_QUERY_CLOSEST_PEERS); + discovery + }; AnchorBehaviour { identify, ping: ping::Behaviour::default(), gossipsub, - discovery: Discovery { discv5 }, + discovery, } } @@ -222,10 +230,11 @@ fn build_swarm( #[cfg(test)] mod test { - use crate::network::Network; - use crate::Config; use task_executor::TaskExecutor; + use crate::Config; + use crate::network::Network; + #[tokio::test] async fn create_network() { let handle = tokio::runtime::Handle::current(); From 2ca416704aeb0880dc95947bbc4cb6174524b527 Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 17 Dec 2024 00:08:52 +0100 Subject: [PATCH 11/23] cargo fmt --- anchor/network/src/discovery.rs | 67 ++++++++++++++++++++------------- anchor/network/src/lib.rs | 2 +- anchor/network/src/network.rs | 18 +++++---- 3 files changed, 52 insertions(+), 35 deletions(-) diff --git a/anchor/network/src/discovery.rs b/anchor/network/src/discovery.rs index 998583f3..1e68395a 100644 --- a/anchor/network/src/discovery.rs +++ b/anchor/network/src/discovery.rs @@ -5,20 +5,23 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Instant; -use discv5::{Discv5, Enr}; use discv5::enr::{CombinedKey, NodeId}; use discv5::libp2p_identity::{Keypair, PeerId}; use discv5::multiaddr::Multiaddr; -use futures::{StreamExt, TryFutureExt}; -use futures::FutureExt; +use discv5::{Discv5, Enr}; use futures::stream::FuturesUnordered; -use libp2p::core::Endpoint; +use futures::FutureExt; +use futures::{StreamExt, TryFutureExt}; use libp2p::core::transport::PortUse; -use libp2p::swarm::{ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm}; +use libp2p::core::Endpoint; use libp2p::swarm::dummy::ConnectionHandler; -use lighthouse_network::{CombinedKeyExt, Subnet}; -use lighthouse_network::discovery::DiscoveredPeers; +use libp2p::swarm::{ + ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, + THandlerOutEvent, ToSwarm, +}; use lighthouse_network::discovery::enr_ext::{QUIC6_ENR_KEY, QUIC_ENR_KEY}; +use lighthouse_network::discovery::DiscoveredPeers; +use lighthouse_network::{CombinedKeyExt, Subnet}; use tokio::sync::mpsc; use crate::Config; @@ -86,11 +89,7 @@ pub struct Discovery { } impl Discovery { - pub async fn new( - local_keypair: Keypair, - network_config: &Config, - ) -> Result { - + pub async fn new(local_keypair: Keypair, network_config: &Config) -> Result { let _enr_dir = match network_config.network_dir.to_str() { Some(path) => String::from(path), None => String::from(""), @@ -101,7 +100,8 @@ impl Discovery { // "quic4" => ?local_enr.quic4(), "quic6" => ?local_enr.quic6() // ); - let discv5_listen_config = discv5::ListenConfig::from_ip(Ipv4Addr::UNSPECIFIED.into(), 9000); + let discv5_listen_config = + discv5::ListenConfig::from_ip(Ipv4Addr::UNSPECIFIED.into(), 9000); // discv5 configuration let discv5_config = discv5::ConfigBuilder::new(discv5_listen_config).build(); @@ -296,7 +296,7 @@ impl Discovery { //debug!(self.log, "Discovery query yielded no results."); } Ok(r) => { - // debug!(self.log, "Discovery query completed"; "peers_found" => r.len()); + // debug!(self.log, "Discovery query completed"; "peers_found" => r.len()); let results = r .into_iter() .map(|enr| { @@ -327,7 +327,6 @@ impl Discovery { } None } - } impl NetworkBehaviour for Discovery { @@ -335,22 +334,41 @@ impl NetworkBehaviour for Discovery { type ConnectionHandler = ConnectionHandler; type ToSwarm = DiscoveredPeers; - fn handle_established_inbound_connection(&mut self, _connection_id: ConnectionId, _peer: PeerId, _local_addr: &Multiaddr, _remote_addr: &Multiaddr) -> Result, ConnectionDenied> { + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { Ok(ConnectionHandler) } - fn handle_established_outbound_connection(&mut self, _connection_id: ConnectionId, _peer: PeerId, _addr: &Multiaddr, _role_override: Endpoint, _port_use: PortUse) -> Result, ConnectionDenied> { + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _addr: &Multiaddr, + _role_override: Endpoint, + _port_use: PortUse, + ) -> Result, ConnectionDenied> { Ok(ConnectionHandler) } - fn on_swarm_event(&mut self, _event: FromSwarm) { + fn on_swarm_event(&mut self, _event: FromSwarm) {} + fn on_connection_handler_event( + &mut self, + _peer_id: PeerId, + _connection_id: ConnectionId, + _event: THandlerOutEvent, + ) { } - fn on_connection_handler_event(&mut self, _peer_id: PeerId, _connection_id: ConnectionId, _event: THandlerOutEvent) { - } - - fn poll(&mut self, cx: &mut Context<'_>) -> Poll>> { + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { if !self.started { return Poll::Pending; } @@ -368,10 +386,7 @@ impl NetworkBehaviour for Discovery { } /// Builds a anchor ENR given a `network::Config`. -pub fn build_enr( - enr_key: &CombinedKey, - config: &Config, -) -> Result { +pub fn build_enr(enr_key: &CombinedKey, config: &Config) -> Result { let mut builder = discv5::enr::Enr::builder(); let (maybe_ipv4_address, maybe_ipv6_address) = &config.enr_address; diff --git a/anchor/network/src/lib.rs b/anchor/network/src/lib.rs index 0d1b2cf7..777af44e 100644 --- a/anchor/network/src/lib.rs +++ b/anchor/network/src/lib.rs @@ -12,4 +12,4 @@ pub use config::Config; pub use lighthouse_network::{ListenAddr, ListenAddress}; pub use network::Network; -pub type Enr = discv5::enr::Enr; \ No newline at end of file +pub type Enr = discv5::enr::Enr; diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 04350beb..b4565976 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -3,13 +3,13 @@ use std::pin::Pin; use std::time::Duration; use futures::StreamExt; -use libp2p::{futures, gossipsub, identify, PeerId, ping, Swarm, SwarmBuilder}; use libp2p::core::muxing::StreamMuxerBox; use libp2p::core::transport::Boxed; use libp2p::gossipsub::{MessageAuthenticity, ValidationMode}; use libp2p::identity::Keypair; use libp2p::multiaddr::Protocol; use libp2p::swarm::SwarmEvent; +use libp2p::{futures, gossipsub, identify, ping, PeerId, Swarm, SwarmBuilder}; use lighthouse_network::discovery::DiscoveredPeers; use lighthouse_network::discv5::enr::k256::sha2::{Digest, Sha256}; use task_executor::TaskExecutor; @@ -17,10 +17,10 @@ use tracing::{info, log}; use crate::behaviour::AnchorBehaviour; use crate::behaviour::AnchorBehaviourEvent; -use crate::Config; use crate::discovery::{Discovery, FIND_NODE_QUERY_CLOSEST_PEERS}; use crate::keypair_utils::load_private_key; use crate::transport::build_transport; +use crate::Config; pub struct Network { swarm: Swarm, @@ -115,7 +115,10 @@ impl Network { } } -async fn build_anchor_behaviour(local_keypair: Keypair, network_config: &Config) -> AnchorBehaviour { +async fn build_anchor_behaviour( + local_keypair: Keypair, + network_config: &Config, +) -> AnchorBehaviour { // TODO setup discv5 let identify = { let local_public_key = local_keypair.public(); @@ -158,10 +161,9 @@ async fn build_anchor_behaviour(local_keypair: Keypair, network_config: &Config) let discovery = { // Build and start the discovery sub-behaviour - let mut discovery = Discovery::new( - local_keypair.clone(), - &network_config, - ).await.unwrap(); + let mut discovery = Discovery::new(local_keypair.clone(), &network_config) + .await + .unwrap(); // start searching for peers discovery.discover_peers(FIND_NODE_QUERY_CLOSEST_PEERS); discovery @@ -232,8 +234,8 @@ fn build_swarm( mod test { use task_executor::TaskExecutor; - use crate::Config; use crate::network::Network; + use crate::Config; #[tokio::test] async fn create_network() { From cc5004e4dc6efa3d3dc7eea001a484f4376f75d6 Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 17 Dec 2024 09:39:21 +0100 Subject: [PATCH 12/23] cargo clippy --- anchor/network/src/discovery.rs | 4 +++- anchor/network/src/network.rs | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/anchor/network/src/discovery.rs b/anchor/network/src/discovery.rs index 1e68395a..e7923bc5 100644 --- a/anchor/network/src/discovery.rs +++ b/anchor/network/src/discovery.rs @@ -312,7 +312,9 @@ impl Discovery { } } } - _ => {} + _ => { + // TODO handle subnet queries + } } None } diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index b4565976..0c2b7961 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -161,7 +161,7 @@ async fn build_anchor_behaviour( let discovery = { // Build and start the discovery sub-behaviour - let mut discovery = Discovery::new(local_keypair.clone(), &network_config) + let mut discovery = Discovery::new(local_keypair.clone(), network_config) .await .unwrap(); // start searching for peers From e532014dedd12e9b3f37a5092745eaf1fdf4d88f Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 17 Dec 2024 15:09:47 +0100 Subject: [PATCH 13/23] remove hardcoded bootnode enr --- anchor/client/src/cli.rs | 8 ++++++++ anchor/client/src/config.rs | 23 ++++++++++++++++++----- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/anchor/client/src/cli.rs b/anchor/client/src/cli.rs index 053c86fd..944f438a 100644 --- a/anchor/client/src/cli.rs +++ b/anchor/client/src/cli.rs @@ -329,6 +329,14 @@ pub struct Anchor { help_heading = FLAG_HEADER )] help: Option, + #[clap( + long, + global = true, + value_delimiter = ',', + help = "One or more comma-delimited base64-encoded ENR's to bootstrap the p2p network", + display_order = 0 + )] + pub boot_nodes_enr: Vec, } pub fn get_color_style() -> Styles { diff --git a/anchor/client/src/config.rs b/anchor/client/src/config.rs index 01289358..8f8a12a1 100644 --- a/anchor/client/src/config.rs +++ b/anchor/client/src/config.rs @@ -2,7 +2,7 @@ // use clap_utils::{flags::DISABLE_MALLOC_TUNING_FLAG, parse_optional, parse_required}; use crate::cli::Anchor; -use network::{Enr, ListenAddr, ListenAddress}; +use network::{ListenAddr, ListenAddress}; use sensitive_url::SensitiveUrl; use serde::{Deserialize, Serialize}; use std::fs; @@ -131,11 +131,24 @@ pub fn from_cli(cli_args: &Anchor) -> Result { */ config.network.listen_addresses = parse_listening_addresses(cli_args)?; - let mut enrs: Vec = vec![]; - if let Ok(enr) = "enr:-Li4QFIQzamdvTxGJhvcXG_DFmCeyggSffDnllY5DiU47pd_K_1MRnSaJimWtfKJ-MD46jUX9TwgW5Jqe0t4pH41RYWGAYuFnlyth2F0dG5ldHOIAAAAAAAAAACEZXRoMpD1pf1CAAAAAP__________gmlkgnY0gmlwhCLdu_SJc2VjcDI1NmsxoQN4v-N9zFYwEqzGPBBX37q24QPFvAVUtokIo1fblIsmTIN0Y3CCE4uDdWRwgg-j".parse::() { - enrs.push(enr.clone()); + for addr in cli_args.boot_nodes_enr.clone() { + match addr.parse() { + Ok(enr) => config.network.boot_nodes_enr.push(enr), + Err(_) => { + // parsing as ENR failed, try as Multiaddr + // let multi: Multiaddr = addr + // .parse() + // .map_err(|_| format!("Not valid as ENR nor Multiaddr: {}", addr))?; + // if !multi.iter().any(|proto| matches!(proto, Protocol::Udp(_))) { + // slog::error!(log, "Missing UDP in Multiaddr {}", multi.to_string()); + // } + // if !multi.iter().any(|proto| matches!(proto, Protocol::P2p(_))) { + // slog::error!(log, "Missing P2P in Multiaddr {}", multi.to_string()); + // } + // multiaddrs.push(multi); + } + } } - config.network.boot_nodes_enr = enrs; config.beacon_nodes_tls_certs = cli_args.beacon_nodes_tls_certs.clone(); config.execution_nodes_tls_certs = cli_args.execution_nodes_tls_certs.clone(); From 3080f3dc0b5834bca0a82004418f6e99ab76b9ed Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 17 Dec 2024 17:47:28 +0100 Subject: [PATCH 14/23] dial discovered peers --- anchor/network/src/discovery.rs | 12 +++++++++++- anchor/network/src/network.rs | 8 ++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/anchor/network/src/discovery.rs b/anchor/network/src/discovery.rs index e7923bc5..84627c01 100644 --- a/anchor/network/src/discovery.rs +++ b/anchor/network/src/discovery.rs @@ -23,6 +23,7 @@ use lighthouse_network::discovery::enr_ext::{QUIC6_ENR_KEY, QUIC_ENR_KEY}; use lighthouse_network::discovery::DiscoveredPeers; use lighthouse_network::{CombinedKeyExt, Subnet}; use tokio::sync::mpsc; +use tracing::log; use crate::Config; @@ -357,7 +358,16 @@ impl NetworkBehaviour for Discovery { Ok(ConnectionHandler) } - fn on_swarm_event(&mut self, _event: FromSwarm) {} + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::ConnectionEstablished(c) => { + log::debug!("Connection established: {:?}", c); + } + _ => { + // TODO handle other events + } + } + } fn on_connection_handler_event( &mut self, diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 0c2b7961..9bcc6e6e 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -22,6 +22,8 @@ use crate::keypair_utils::load_private_key; use crate::transport::build_transport; use crate::Config; +use lighthouse_network::EnrExt; + pub struct Network { swarm: Swarm, peer_id: PeerId, @@ -96,6 +98,12 @@ impl Network { AnchorBehaviourEvent::Discovery(DiscoveredPeers { peers }) => { //self.peer_manager_mut().peers_discovered(peers); log::debug!("Discovered peers: {:?}", peers); + for (enr, _) in peers { + for tcp in enr.multiaddr_tcp() { + log::debug!("Dialing peer: {:?}", tcp); + self.swarm.dial(tcp).unwrap(); + } + } } // TODO handle other behaviour events _ => { From c091041b9cf4a7b1eecb8201e04ab7e99fe8eeda Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 17 Dec 2024 18:29:59 +0100 Subject: [PATCH 15/23] remove event --- anchor/network/src/network.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 9bcc6e6e..8b632222 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -111,7 +111,6 @@ impl Network { } }, // TODO handle other swarm events - SwarmEvent::NewListenAddr { .. } => {}, _ => { log::debug!("Unhandled swarm event: {:?}", swarm_message); } From 2cd0e4338e28f8cd7ed11d26854be8dd4554af8f Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 17 Dec 2024 19:11:46 +0100 Subject: [PATCH 16/23] uncomment logs --- anchor/network/src/discovery.rs | 101 ++++++++++++++++---------------- 1 file changed, 52 insertions(+), 49 deletions(-) diff --git a/anchor/network/src/discovery.rs b/anchor/network/src/discovery.rs index 84627c01..761aa583 100644 --- a/anchor/network/src/discovery.rs +++ b/anchor/network/src/discovery.rs @@ -23,7 +23,9 @@ use lighthouse_network::discovery::enr_ext::{QUIC6_ENR_KEY, QUIC_ENR_KEY}; use lighthouse_network::discovery::DiscoveredPeers; use lighthouse_network::{CombinedKeyExt, Subnet}; use tokio::sync::mpsc; -use tracing::log; +use tracing::{debug, error, warn}; + +use lighthouse_network::EnrExt; use crate::Config; @@ -96,9 +98,10 @@ impl Discovery { None => String::from(""), }; - // TODO info!(log, "ENR Initialised"; "enr" => local_enr.to_base64(), "seq" => local_enr.seq(), "id"=> %local_enr.node_id(), - // "ip4" => ?local_enr.ip4(), "udp4"=> ?local_enr.udp4(), "tcp4" => ?local_enr.tcp4(), "tcp6" => ?local_enr.tcp6(), "udp6" => ?local_enr.udp6(), - // "quic4" => ?local_enr.quic4(), "quic6" => ?local_enr.quic6() + // info!("enr" = local_enr.to_base64(), "seq" = local_enr.seq(), "id" = local_enr.node_id(), + // "ip4" = ?local_enr.ip4(), "udp4" = ?local_enr.udp4(), "tcp4" = ?local_enr.tcp4(), "tcp6" = ?local_enr.tcp6(), "udp6" = ?local_enr.udp6(), + // "quic4" = ?local_enr.quic4(), "quic6" = ?local_enr.quic6(), + // "ENR Initialised" // ); let discv5_listen_config = @@ -120,32 +123,30 @@ impl Discovery { // // If we are a boot node, ignore adding it to the routing table // continue; // } - // TODO debug!( - // log, - // "Adding node to routing table"; - // "node_id" => %bootnode_enr.node_id(), - // "peer_id" => %bootnode_enr.peer_id(), - // "ip" => ?bootnode_enr.ip4(), - // "udp" => ?bootnode_enr.udp4(), - // "tcp" => ?bootnode_enr.tcp4(), - // "quic" => ?bootnode_enr.quic4() - // ); - - //let repr = bootnode_enr.to_string(); - let _ = discv5.add_enr(bootnode_enr).map_err(|_e| { - // TODO error!( - // log, - // "Could not add peer to the local routing table"; - // "addr" => repr, - // "error" => e.to_string(), - // ) + debug!( + node_id = %bootnode_enr.node_id(), + peer_id = %bootnode_enr.peer_id(), + ip = ?bootnode_enr.ip4(), + udp = ?bootnode_enr.udp4(), + tcp = ?bootnode_enr.tcp4(), + quic = ?bootnode_enr.quic4(), + "Adding node to routing table", + ); + + let repr = bootnode_enr.to_string(); + let _ = discv5.add_enr(bootnode_enr).map_err(|e| { + error!( + addr = repr, + error = e.to_string(), + "Could not add peer to the local routing table" + ) }); } // Start the discv5 service and obtain an event stream let event_stream = if !network_config.disable_discovery { discv5.start().map_err(|e| e.to_string()).await?; - // TODO debug!(log, "Discovery service started"); + debug!("Discovery service started"); EventStream::Awaiting(Box::pin(discv5.event_stream())) } else { EventStream::InActive @@ -170,30 +171,32 @@ impl Discovery { }) .collect::>(); - while let Some((result, _original_addr)) = fut_coll.next().await { + while let Some((result, original_addr)) = fut_coll.next().await { match result { Ok(enr) => { - // TODO debug!( - // log, - // "Adding node to routing table"; - // "node_id" => %enr.node_id(), - // "peer_id" => %enr.peer_id(), - // "ip" => ?enr.ip4(), - // "udp" => ?enr.udp4(), - // "tcp" => ?enr.tcp4(), - // "quic" => ?enr.quic4() - // ); - let _ = discv5.add_enr(enr).map_err(|_e| { - // TODO error!( - // log, - // "Could not add peer to the local routing table"; - // "addr" => original_addr.to_string(), - // "error" => e.to_string(), - // ) + debug!( + node_id = %enr.node_id(), + peer_id = %enr.peer_id(), + ip = ?enr.ip4(), + udp = ?enr.udp4(), + tcp = ?enr.tcp4(), + quic = ?enr.quic4(), + "Adding node to routing table" + ); + let _ = discv5.add_enr(enr).map_err(|e| { + error!( + addr = original_addr.to_string(), + error = e.to_string(), + "Could not add peer to the local routing table" + ) }); } - Err(_e) => { - // TODO error!(log, "Error getting mapping to ENR"; "multiaddr" => original_addr.to_string(), "error" => e.to_string()) + Err(e) => { + error!( + multiaddr = original_addr.to_string(), + error = e.to_string(), + "Error getting mapping to ENR" + ) } } } @@ -294,10 +297,10 @@ impl Discovery { self.find_peer_active = false; match query.result { Ok(r) if r.is_empty() => { - //debug!(self.log, "Discovery query yielded no results."); + debug!("Discovery query yielded no results."); } Ok(r) => { - // debug!(self.log, "Discovery query completed"; "peers_found" => r.len()); + debug!(peers_found = r.len(), "Discovery query completed"); let results = r .into_iter() .map(|enr| { @@ -308,8 +311,8 @@ impl Discovery { .collect(); return Some(results); } - Err(_e) => { - //warn!(self.log, "Discovery query failed"; "error" => %e); + Err(e) => { + warn!(error = %e, "Discovery query failed"); } } } @@ -361,7 +364,7 @@ impl NetworkBehaviour for Discovery { fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(c) => { - log::debug!("Connection established: {:?}", c); + debug!("Connection established: {:?}", c); } _ => { // TODO handle other events From afcfe99f4442abf14e87d3d165c91f1b9d80585d Mon Sep 17 00:00:00 2001 From: diego Date: Tue, 17 Dec 2024 19:38:51 +0100 Subject: [PATCH 17/23] simplify error handling --- anchor/network/src/discovery.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/anchor/network/src/discovery.rs b/anchor/network/src/discovery.rs index 761aa583..5a4f8e5f 100644 --- a/anchor/network/src/discovery.rs +++ b/anchor/network/src/discovery.rs @@ -134,13 +134,13 @@ impl Discovery { ); let repr = bootnode_enr.to_string(); - let _ = discv5.add_enr(bootnode_enr).map_err(|e| { + if let Err(e) = discv5.add_enr(bootnode_enr) { error!( addr = repr, error = e.to_string(), "Could not add peer to the local routing table" ) - }); + }; } // Start the discv5 service and obtain an event stream From 14fc072ef58eeba785ee2a4879092072e7c1ef92 Mon Sep 17 00:00:00 2001 From: diego Date: Wed, 18 Dec 2024 12:54:19 +0100 Subject: [PATCH 18/23] TODO handle local enr --- anchor/network/src/discovery.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/anchor/network/src/discovery.rs b/anchor/network/src/discovery.rs index 5a4f8e5f..ad0a45e9 100644 --- a/anchor/network/src/discovery.rs +++ b/anchor/network/src/discovery.rs @@ -98,11 +98,7 @@ impl Discovery { None => String::from(""), }; - // info!("enr" = local_enr.to_base64(), "seq" = local_enr.seq(), "id" = local_enr.node_id(), - // "ip4" = ?local_enr.ip4(), "udp4" = ?local_enr.udp4(), "tcp4" = ?local_enr.tcp4(), "tcp6" = ?local_enr.tcp6(), "udp6" = ?local_enr.udp6(), - // "quic4" = ?local_enr.quic4(), "quic6" = ?local_enr.quic6(), - // "ENR Initialised" - // ); + // TODO handle local enr let discv5_listen_config = discv5::ListenConfig::from_ip(Ipv4Addr::UNSPECIFIED.into(), 9000); From ae8d4a93926181fd199ad256b445d16239e3f823 Mon Sep 17 00:00:00 2001 From: diego Date: Thu, 19 Dec 2024 12:47:22 +0100 Subject: [PATCH 19/23] add ssv predicate --- anchor/network/src/discovery.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/anchor/network/src/discovery.rs b/anchor/network/src/discovery.rs index ad0a45e9..74640fb2 100644 --- a/anchor/network/src/discovery.rs +++ b/anchor/network/src/discovery.rs @@ -253,21 +253,20 @@ impl Discovery { // return; // } // }; - // predicate for finding nodes with a matching fork and valid tcp port - let eth2_fork_predicate = move |enr: &Enr| { - // `next_fork_epoch` and `next_fork_version` can be different so that - // we can connect to peers who aren't compatible with an upcoming fork. - // `fork_digest` **must** be same. - - // enr.eth2().map(|e| e.fork_digest) == Ok(enr_fork_id.fork_digest) - // && - enr.tcp4().is_some() || enr.tcp6().is_some() + + // predicate for finding ssv nodes with a valid tcp port + let ssv_node_predicate = move |enr: &Enr| { + if let Some(Ok(is_ssv)) = enr.get_decodable("ssv") { + is_ssv && enr.tcp4().is_some() || enr.tcp6().is_some() + } else { + false + } }; // General predicate let predicate: Box bool + Send> = //Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && additional_predicate(enr)); - Box::new(move |enr: &Enr| eth2_fork_predicate(enr)); + Box::new(move |enr: &Enr| ssv_node_predicate(enr)); // Build the future let query_future = self From 92c046890238d279fae0160aeacd68c9b1b6f5f0 Mon Sep 17 00:00:00 2001 From: diegomrsantos Date: Mon, 23 Dec 2024 12:56:00 +0100 Subject: [PATCH 20/23] Update anchor/network/src/network.rs Co-authored-by: jking-aus <72330194+jking-aus@users.noreply.github.com> --- anchor/network/src/network.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 8b632222..d6445c98 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -101,7 +101,9 @@ impl Network { for (enr, _) in peers { for tcp in enr.multiaddr_tcp() { log::debug!("Dialing peer: {:?}", tcp); - self.swarm.dial(tcp).unwrap(); +if let Err(e) = self.swarm.dial(tcp.clone()) { + log::error!("Error dialing peer {}: {}", tcp, e); + } } } } From a32363596f22bb1435426f0106302991f8c61d75 Mon Sep 17 00:00:00 2001 From: diegomrsantos Date: Mon, 23 Dec 2024 12:56:12 +0100 Subject: [PATCH 21/23] Update anchor/network/src/network.rs Co-authored-by: jking-aus <72330194+jking-aus@users.noreply.github.com> --- anchor/network/src/network.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index d6445c98..ec13ae73 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -100,7 +100,7 @@ impl Network { log::debug!("Discovered peers: {:?}", peers); for (enr, _) in peers { for tcp in enr.multiaddr_tcp() { - log::debug!("Dialing peer: {:?}", tcp); + log::trace!("Dialing peer: {:?}", tcp); if let Err(e) = self.swarm.dial(tcp.clone()) { log::error!("Error dialing peer {}: {}", tcp, e); } From a8b5f52c5752037bd1f2c743aeb76be3fd2e35d4 Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 23 Dec 2024 12:59:28 +0100 Subject: [PATCH 22/23] cargo fmt --- anchor/network/src/network.rs | 62 +++++++++++++++++------------------ 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index ec13ae73..930283e8 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -85,41 +85,41 @@ impl Network { pub async fn run(mut self) { loop { tokio::select! { - swarm_message = self.swarm.select_next_some() => { - match swarm_message { - SwarmEvent::Behaviour(behaviour_event) => match behaviour_event { - AnchorBehaviourEvent::Gossipsub(_ge) => { - // TODO handle gossipsub events - }, - // Inform the peer manager about discovered peers. - // - // The peer manager will subsequently decide which peers need to be dialed and then dial - // them. - AnchorBehaviourEvent::Discovery(DiscoveredPeers { peers }) => { - //self.peer_manager_mut().peers_discovered(peers); - log::debug!("Discovered peers: {:?}", peers); - for (enr, _) in peers { - for tcp in enr.multiaddr_tcp() { - log::trace!("Dialing peer: {:?}", tcp); -if let Err(e) = self.swarm.dial(tcp.clone()) { - log::error!("Error dialing peer {}: {}", tcp, e); - } + swarm_message = self.swarm.select_next_some() => { + match swarm_message { + SwarmEvent::Behaviour(behaviour_event) => match behaviour_event { + AnchorBehaviourEvent::Gossipsub(_ge) => { + // TODO handle gossipsub events + }, + // Inform the peer manager about discovered peers. + // + // The peer manager will subsequently decide which peers need to be dialed and then dial + // them. + AnchorBehaviourEvent::Discovery(DiscoveredPeers { peers }) => { + //self.peer_manager_mut().peers_discovered(peers); + log::debug!("Discovered peers: {:?}", peers); + for (enr, _) in peers { + for tcp in enr.multiaddr_tcp() { + log::trace!("Dialing peer: {:?}", tcp); + if let Err(e) = self.swarm.dial(tcp.clone()) { + log::error!("Error dialing peer {}: {}", tcp, e); + } + } + } + } + // TODO handle other behaviour events + _ => { + log::debug!("Unhandled behaviour event: {:?}", behaviour_event); + } + }, + // TODO handle other swarm events + _ => { + log::debug!("Unhandled swarm event: {:?}", swarm_message); } } } - // TODO handle other behaviour events - _ => { - log::debug!("Unhandled behaviour event: {:?}", behaviour_event); - } - }, - // TODO handle other swarm events - _ => { - log::debug!("Unhandled swarm event: {:?}", swarm_message); + // TODO match input channels } - } - } - // TODO match input channels - } } } } From 24fa59aa2a0195f4251705ad01ef8feeadd517e9 Mon Sep 17 00:00:00 2001 From: diego Date: Mon, 23 Dec 2024 22:26:53 +0100 Subject: [PATCH 23/23] fix formatting --- anchor/network/src/network.rs | 60 +++++++++++++++++------------------ 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 930283e8..803438f7 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -85,41 +85,41 @@ impl Network { pub async fn run(mut self) { loop { tokio::select! { - swarm_message = self.swarm.select_next_some() => { - match swarm_message { - SwarmEvent::Behaviour(behaviour_event) => match behaviour_event { - AnchorBehaviourEvent::Gossipsub(_ge) => { - // TODO handle gossipsub events - }, - // Inform the peer manager about discovered peers. - // - // The peer manager will subsequently decide which peers need to be dialed and then dial - // them. - AnchorBehaviourEvent::Discovery(DiscoveredPeers { peers }) => { - //self.peer_manager_mut().peers_discovered(peers); - log::debug!("Discovered peers: {:?}", peers); - for (enr, _) in peers { - for tcp in enr.multiaddr_tcp() { - log::trace!("Dialing peer: {:?}", tcp); - if let Err(e) = self.swarm.dial(tcp.clone()) { - log::error!("Error dialing peer {}: {}", tcp, e); - } - } - } + swarm_message = self.swarm.select_next_some() => { + match swarm_message { + SwarmEvent::Behaviour(behaviour_event) => match behaviour_event { + AnchorBehaviourEvent::Gossipsub(_ge) => { + // TODO handle gossipsub events + }, + // Inform the peer manager about discovered peers. + // + // The peer manager will subsequently decide which peers need to be dialed and then dial + // them. + AnchorBehaviourEvent::Discovery(DiscoveredPeers { peers }) => { + //self.peer_manager_mut().peers_discovered(peers); + log::debug!("Discovered peers: {:?}", peers); + for (enr, _) in peers { + for tcp in enr.multiaddr_tcp() { + log::trace!("Dialing peer: {:?}", tcp); + if let Err(e) = self.swarm.dial(tcp.clone()) { + log::error!("Error dialing peer {}: {}", tcp, e); } - // TODO handle other behaviour events - _ => { - log::debug!("Unhandled behaviour event: {:?}", behaviour_event); - } - }, - // TODO handle other swarm events - _ => { - log::debug!("Unhandled swarm event: {:?}", swarm_message); } } } - // TODO match input channels + // TODO handle other behaviour events + _ => { + log::debug!("Unhandled behaviour event: {:?}", behaviour_event); + } + }, + // TODO handle other swarm events + _ => { + log::debug!("Unhandled swarm event: {:?}", swarm_message); } + } + } + // TODO match input channels + } } } }