diff --git a/crates/net/discv5/src/enr.rs b/crates/net/discv5/src/enr.rs index 502233794248..b810c1dc63e6 100644 --- a/crates/net/discv5/src/enr.rs +++ b/crates/net/discv5/src/enr.rs @@ -1,7 +1,8 @@ //! Interface between node identification on protocol version 5 and 4. Specifically, between types //! [`discv5::enr::NodeId`] and [`PeerId`]. -use discv5::enr::{CombinedPublicKey, Enr, EnrPublicKey, NodeId}; +use discv5::enr::{CombinedPublicKey, EnrPublicKey, NodeId}; +use enr::Enr; use reth_primitives::{id2pk, pk2id, PeerId}; use secp256k1::{PublicKey, SecretKey}; diff --git a/crates/net/discv5/src/error.rs b/crates/net/discv5/src/error.rs index 96929b793634..7e4fa8653009 100644 --- a/crates/net/discv5/src/error.rs +++ b/crates/net/discv5/src/error.rs @@ -7,13 +7,13 @@ use discv5::IpMode; pub enum Error { /// Failure adding node to [`discv5::Discv5`]. #[error("failed adding node to discv5, {0}")] - AddNodeToDiscv5Failed(&'static str), + AddNodeFailed(&'static str), /// Node record has incompatible key type. #[error("incompatible key type (not secp256k1)")] IncompatibleKeyType, /// Missing key used to identify rlpx network. - #[error("fork missing on enr, 'eth' key missing")] - ForkMissing, + #[error("fork missing on enr, key missing")] + ForkMissing(&'static [u8]), /// Failed to decode [`ForkId`](reth_primitives::ForkId) rlp value. #[error("failed to decode fork id, 'eth': {0:?}")] ForkIdDecodeError(#[from] alloy_rlp::Error), @@ -30,9 +30,6 @@ pub enum Error { #[error("init failed, {0}")] InitFailure(&'static str), /// An error from underlying [`discv5::Discv5`] node. - #[error("{0}")] + #[error("sigp/discv5 error, {0}")] Discv5Error(discv5::Error), - /// An error from underlying [`discv5::Discv5`] node. - #[error("{0}")] - Discv5ErrorStr(&'static str), } diff --git a/crates/net/discv5/src/lib.rs b/crates/net/discv5/src/lib.rs index 5f0809cb207d..8a027d180cc9 100644 --- a/crates/net/discv5/src/lib.rs +++ b/crates/net/discv5/src/lib.rs @@ -18,7 +18,6 @@ use std::{ use ::enr::Enr; use alloy_rlp::Decodable; -use derive_more::Deref; use discv5::ListenConfig; use enr::{discv4_id_to_discv5_id, EnrCombinedKeyWrapper}; use futures::future::join_all; @@ -46,9 +45,8 @@ use metrics::Discv5Metrics; const MAX_LOG2_DISTANCE: usize = 255; /// Transparent wrapper around [`discv5::Discv5`]. -#[derive(Deref, Clone)] +#[derive(Clone)] pub struct Discv5 { - #[deref] /// sigp/discv5 node. discv5: Arc, /// [`IpMode`] of the the node. @@ -67,9 +65,9 @@ impl Discv5 { //////////////////////////////////////////////////////////////////////////////////////////////// /// Adds the node to the table, if it is not already present. - pub fn add_node_to_routing_table(&self, node_record: Enr) -> Result<(), Error> { + pub fn add_node(&self, node_record: Enr) -> Result<(), Error> { let EnrCombinedKeyWrapper(enr) = node_record.into(); - self.add_enr(enr).map_err(Error::AddNodeToDiscv5Failed) + self.discv5.add_enr(enr).map_err(Error::AddNodeFailed) } /// Sets the pair in the EIP-868 [`Enr`] of the node. @@ -85,7 +83,7 @@ impl Discv5 { ); return }; - if let Err(err) = self.enr_insert(key_str, &rlp) { + if let Err(err) = self.discv5.enr_insert(key_str, &rlp) { error!(target: "discv5", %err, "failed to update local enr" @@ -109,11 +107,11 @@ impl Discv5 { /// Adds the peer and id to the ban list. /// /// This will prevent any future inclusion in the table - pub fn ban_peer_by_ip_and_node_id(&self, peer_id: PeerId, ip: IpAddr) { + pub fn ban(&self, peer_id: PeerId, ip: IpAddr) { match discv4_id_to_discv5_id(peer_id) { Ok(node_id) => { - self.ban_node(&node_id, None); - self.ban_peer_by_ip(ip); + self.discv5.ban_node(&node_id, None); + self.ban_ip(ip); } Err(err) => error!(target: "discv5", %err, @@ -125,15 +123,15 @@ impl Discv5 { /// Adds the ip to the ban list. /// /// This will prevent any future inclusion in the table - pub fn ban_peer_by_ip(&self, ip: IpAddr) { - self.ban_ip(ip, None); + pub fn ban_ip(&self, ip: IpAddr) { + self.discv5.ban_ip(ip, None); } /// Returns the [`NodeRecord`] of the local node. /// /// This includes the currently tracked external IP address of the node. pub fn node_record(&self) -> NodeRecord { - let enr: Enr<_> = EnrCombinedKeyWrapper(self.local_enr()).into(); + let enr: Enr<_> = EnrCombinedKeyWrapper(self.discv5.local_enr()).into(); (&enr).try_into().unwrap() } @@ -238,7 +236,7 @@ impl Discv5 { // // 4. add boot nodes // - Self::bootstrap(bootstrap_nodes, &discv5)?; + Self::bootstrap(bootstrap_nodes, &discv5).await?; let metrics = Discv5Metrics::default(); @@ -255,7 +253,7 @@ impl Discv5 { } /// Bootstraps underlying [`discv5::Discv5`] node with configured peers. - fn bootstrap( + async fn bootstrap( bootstrap_nodes: HashSet, discv5: &Arc, ) -> Result<(), Error> { @@ -269,7 +267,7 @@ impl Discv5 { match node { BootNode::Enr(node) => { if let Err(err) = discv5.add_enr(node) { - return Err(Error::Discv5ErrorStr(err)) + return Err(Error::AddNodeFailed(err)) } } BootNode::Enode(enode) => { @@ -286,18 +284,8 @@ impl Discv5 { } } } - _ = join_all(enr_requests); - - debug!(target: "net::discv5", - nodes=format!("[{:#}]", discv5.with_kbuckets(|kbuckets| kbuckets - .write() - .iter() - .map(|peer| format!("enr: {:?}, status: {:?}", peer.node.value, peer.status)).collect::>() - ).into_iter().format(", ")), - "added boot nodes" - ); - Ok(()) + Ok(_ = join_all(enr_requests).await) } /// Backgrounds regular look up queries, in order to keep kbuckets populated. @@ -479,7 +467,8 @@ impl Discv5 { &self, enr: &discv5::enr::Enr, ) -> Result { - let mut fork_id_bytes = enr.get_raw_rlp(self.fork_id_key()).ok_or(Error::ForkMissing)?; + let key = self.fork_id_key; + let mut fork_id_bytes = enr.get_raw_rlp(key).ok_or(Error::ForkMissing(key))?; Ok(ForkId::decode(&mut fork_id_bytes)?) } @@ -491,9 +480,9 @@ impl Discv5 { /// Exposes API of [`discv5::Discv5`]. pub fn with_discv5(&self, f: F) -> R where - F: FnOnce(&Self) -> R, + F: FnOnce(&discv5::Discv5) -> R, { - f(self) + f(&self.discv5) } //////////////////////////////////////////////////////////////////////////////////////////////// @@ -617,7 +606,7 @@ mod tests { // add node_2 to discovery handle of node_1 (should add node to discv5 kbuckets) let node_2_enr_reth_compatible_ty: Enr = EnrCombinedKeyWrapper(node_2_enr.clone()).into(); - node_1.add_node_to_routing_table(node_2_enr_reth_compatible_ty).unwrap(); + node_1.add_node(node_2_enr_reth_compatible_ty).unwrap(); // verify node_2 is in KBuckets of node_1:discv5 assert!( @@ -630,21 +619,21 @@ mod tests { // verify node_1:discv5 is connected to node_2:discv5 and vv let event_2_v5 = stream_2.recv().await.unwrap(); let event_1_v5 = stream_1.recv().await.unwrap(); - matches!( + assert!(matches!( event_1_v5, discv5::Event::SessionEstablished(node, socket) if node == node_2_enr && socket == node_2_enr.udp4_socket().unwrap().into() - ); - matches!( + )); + assert!(matches!( event_2_v5, discv5::Event::SessionEstablished(node, socket) if node == node_1_enr && socket == node_1_enr.udp4_socket().unwrap().into() - ); + )); // verify node_1 is in KBuckets of node_2:discv5 let event_2_v5 = stream_2.recv().await.unwrap(); - matches!( + assert!(matches!( event_2_v5, discv5::Event::NodeInserted { node_id, replaced } if node_id == node_1_enr.node_id() && replaced.is_none() - ); + )); } #[test] diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index 1df8067649b8..ecae97de2ca8 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -29,7 +29,7 @@ reth-rpc-types.workspace = true reth-tokio-util.workspace = true # ethereum -enr = { workspace = true, features = ["rust-secp256k1"], optional = true } +enr = { workspace = true, features = ["serde", "rust-secp256k1"] } alloy-rlp.workspace = true discv5.workspace = true @@ -84,8 +84,6 @@ alloy-node-bindings.workspace = true ethers-core = { workspace = true, default-features = false } ethers-providers = { workspace = true, default-features = false, features = ["ws"] } -enr = { workspace = true, features = ["serde", "rust-secp256k1"] } - # misc serial_test.workspace = true tempfile.workspace = true @@ -96,10 +94,9 @@ criterion = { workspace = true, features = ["async_tokio", "html_reports"] } [features] default = ["serde"] -serde = ["dep:serde", "dep:humantime-serde", "secp256k1/serde", "enr?/serde", "dep:serde_json"] +serde = ["dep:serde", "dep:humantime-serde", "secp256k1/serde", "enr/serde", "dep:serde_json"] test-utils = [ "reth-provider/test-utils", - "dep:enr", "dep:tempfile", "reth-transaction-pool/test-utils", ] diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index 85812dee588b..b69730364954 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -43,12 +43,12 @@ pub struct NetworkConfig { pub boot_nodes: HashSet, /// How to set up discovery over DNS. pub dns_discovery_config: Option, + /// Address to use for discovery v4. + pub discovery_v4_addr: SocketAddr, /// How to set up discovery. pub discovery_v4_config: Option, /// How to set up discovery version 5. pub discovery_v5_config: Option, - /// Address to use for discovery - pub discovery_addr: SocketAddr, /// Address to listen for incoming connections pub listener_addr: SocketAddr, /// How to instantiate peer manager. @@ -155,10 +155,8 @@ impl NetworkConfig { } /// Sets the config to use for the discovery v5 protocol. - pub fn set_discovery_v5(mut self, discv5_config: reth_discv5::Config) -> Self { self.discovery_v5_config = Some(discv5_config); - self.discovery_addr = self.discovery_v5_config.as_ref().unwrap().discovery_socket(); self } @@ -567,7 +565,7 @@ impl NetworkConfigBuilder { dns_discovery_config, discovery_v4_config: discovery_v4_builder.map(|builder| builder.build()), discovery_v5_config: None, - discovery_addr: discovery_addr.unwrap_or(DEFAULT_DISCOVERY_ADDRESS), + discovery_v4_addr: discovery_addr.unwrap_or(DEFAULT_DISCOVERY_ADDRESS), listener_addr, peers_config: peers_config.unwrap_or_default(), sessions_config: sessions_config.unwrap_or_default(), diff --git a/crates/net/network/src/discovery.rs b/crates/net/network/src/discovery.rs index 56aa6c68d9f5..c2b70b37fedf 100644 --- a/crates/net/network/src/discovery.rs +++ b/crates/net/network/src/discovery.rs @@ -1,18 +1,21 @@ //! Discovery support for the network. use crate::{ + cache::LruMap, error::{NetworkError, ServiceKind}, manager::DiscoveredEvent, }; +use enr::Enr; use futures::StreamExt; use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config, EnrForkIdEntry}; +use reth_discv5::{DiscoveredPeer, Discv5}; use reth_dns_discovery::{ DnsDiscoveryConfig, DnsDiscoveryHandle, DnsDiscoveryService, DnsNodeRecordUpdate, DnsResolver, }; use reth_primitives::{ForkId, NodeRecord, PeerId}; use secp256k1::SecretKey; use std::{ - collections::{hash_map::Entry, HashMap, VecDeque}, + collections::VecDeque, net::{IpAddr, SocketAddr}, pin::Pin, sync::Arc, @@ -20,6 +23,12 @@ use std::{ }; use tokio::{sync::mpsc, task::JoinHandle}; use tokio_stream::{wrappers::ReceiverStream, Stream}; +use tracing::trace; + +/// Default max capacity for cache of discovered peers. +/// +/// Default is 10 000 peers. +pub const DEFAULT_MAX_CAPACITY_DISCOVERED_PEERS_CACHE: u32 = 10_000; /// An abstraction over the configured discovery protocol. /// @@ -30,8 +39,8 @@ pub struct Discovery { /// All nodes discovered via discovery protocol. /// /// These nodes can be ephemeral and are updated via the discovery protocol. - discovered_nodes: HashMap, - /// Local ENR of the discovery service. + discovered_nodes: LruMap, + /// Local ENR of the discovery v4 service (discv5 ENR has same [`PeerId`]). local_enr: NodeRecord, /// Handler to interact with the Discovery v4 service discv4: Option, @@ -39,6 +48,10 @@ pub struct Discovery { discv4_updates: Option>, /// The handle to the spawned discv4 service _discv4_service: Option>, + /// Handler to interact with the Discovery v5 service + discv5: Option, + /// All KAD table updates from the discv5 service. + discv5_updates: Option>, /// Handler to interact with the DNS discovery service _dns_discovery: Option, /// Updates from the DNS discovery service. @@ -57,24 +70,42 @@ impl Discovery { /// This will spawn the [`reth_discv4::Discv4Service`] onto a new task and establish a listener /// channel to receive all discovered nodes. pub async fn new( - discovery_addr: SocketAddr, + discovery_v4_addr: SocketAddr, sk: SecretKey, discv4_config: Option, + discv5_config: Option, // contains discv5 listen address dns_discovery_config: Option, ) -> Result { // setup discv4 - let local_enr = NodeRecord::from_secret_key(discovery_addr, &sk); - let (discv4, discv4_updates, _discv4_service) = if let Some(disc_config) = discv4_config { - let (discv4, mut discv4_service) = - Discv4::bind(discovery_addr, local_enr, sk, disc_config).await.map_err(|err| { - NetworkError::from_io_error(err, ServiceKind::Discovery(discovery_addr)) - })?; - let discv4_updates = discv4_service.update_stream(); - // spawn the service - let _discv4_service = discv4_service.spawn(); - (Some(discv4), Some(discv4_updates), Some(_discv4_service)) - } else { - (None, None, None) + let local_enr = NodeRecord::from_secret_key(discovery_v4_addr, &sk); + let (discv4, discv4_updates, _discv4_service) = match discv4_config { + Some(disc_config) => { + let (discv4, mut discv4_service) = + Discv4::bind(discovery_v4_addr, local_enr, sk, disc_config).await.map_err( + |err| { + NetworkError::from_io_error( + err, + ServiceKind::Discovery(discovery_v4_addr), + ) + }, + )?; + let discv4_updates = discv4_service.update_stream(); + // spawn the service + let _discv4_service = discv4_service.spawn(); + + (Some(discv4), Some(discv4_updates), Some(_discv4_service)) + } + None => (None, None, None), + }; + + let (discv5, discv5_updates) = match discv5_config { + Some(config) => { + let (discv5, discv5_updates, _local_enr_discv5) = + Discv5::start(&sk, config).await?; + + (Some(discv5), Some(discv5_updates.into())) + } + None => (None, None), }; // setup DNS discovery @@ -97,7 +128,9 @@ impl Discovery { discv4, discv4_updates, _discv4_service, - discovered_nodes: Default::default(), + discv5, + discv5_updates, + discovered_nodes: LruMap::new(DEFAULT_MAX_CAPACITY_DISCOVERED_PEERS_CACHE), queued_events: Default::default(), _dns_disc_service, _dns_discovery, @@ -122,6 +155,7 @@ impl Discovery { // use forward-compatible forkid entry discv4.set_eip868_rlp("eth".as_bytes().to_vec(), EnrForkIdEntry::from(fork_id)) } + // todo: update discv5 enr } /// Bans the [`IpAddr`] in the discovery service. @@ -129,6 +163,9 @@ impl Discovery { if let Some(discv4) = &self.discv4 { discv4.ban_ip(ip) } + if let Some(discv5) = &self.discv5 { + discv5.ban_ip(ip) + } } /// Bans the [`PeerId`] and [`IpAddr`] in the discovery service. @@ -136,6 +173,9 @@ impl Discovery { if let Some(discv4) = &self.discv4 { discv4.ban(peer_id, ip) } + if let Some(discv5) = &self.discv5 { + discv5.ban(peer_id, ip) + } } /// Returns a shared reference to the discv4. @@ -143,9 +183,9 @@ impl Discovery { self.discv4.clone() } - /// Returns the id with which the local identifies itself in the network + /// Returns the id with which the local node identifies itself in the network pub(crate) fn local_id(&self) -> PeerId { - self.local_enr.id + self.local_enr.id // local discv4 and discv5 have same id, since signed with same secret key } /// Add a node to the discv4 table. @@ -155,19 +195,27 @@ impl Discovery { } } + /// Add a node to the discv4 table. + pub(crate) fn add_discv5_node(&self, enr: Enr) -> Result<(), NetworkError> { + if let Some(discv5) = &self.discv5 { + discv5.add_node(enr).map_err(NetworkError::Discv5Error)?; + } + + Ok(()) + } + /// Processes an incoming [NodeRecord] update from a discovery service fn on_node_record_update(&mut self, record: NodeRecord, fork_id: Option) { let id = record.id; let addr = record.tcp_addr(); - match self.discovered_nodes.entry(id) { - Entry::Occupied(_entry) => {} - Entry::Vacant(entry) => { - entry.insert(addr); + _ = + self.discovered_nodes.get_or_insert(id, || { self.queued_events.push_back(DiscoveryEvent::NewNode( DiscoveredEvent::EventQueued { peer_id: id, socket_addr: addr, fork_id }, )); - } - } + + addr + }) } fn on_discv4_update(&mut self, update: DiscoveryUpdate) { @@ -200,17 +248,37 @@ impl Discovery { return Poll::Ready(event) } - // drain the update streams + // drain the discv4 update stream while let Some(Poll::Ready(Some(update))) = self.discv4_updates.as_mut().map(|updates| updates.poll_next_unpin(cx)) { self.on_discv4_update(update) } + // drain the discv5 update stream + while let Some(Poll::Ready(Some(update))) = + self.discv5_updates.as_mut().map(|updates| updates.poll_next_unpin(cx)) + { + if let Some(discv5) = self.discv5.as_mut() { + if let Some(DiscoveredPeer { node_record, fork_id }) = + discv5.on_discv5_update(update) + { + self.on_node_record_update(node_record, fork_id); + } + } + } + + // drain the dns update stream while let Some(Poll::Ready(Some(update))) = self.dns_discovery_updates.as_mut().map(|updates| updates.poll_next_unpin(cx)) { self.add_discv4_node(update.node_record); + if let Err(err) = self.add_discv5_node(update.enr) { + trace!(target: "net::discovery", + %err, + "failed adding node discovered by dns to discv5" + ); + } self.on_node_record_update(update.node_record, update.fork_id); } @@ -239,7 +307,7 @@ impl Discovery { mpsc::unbounded_channel(); Self { - discovered_nodes: Default::default(), + discovered_nodes: LruMap::new(0), local_enr: NodeRecord { address: IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), tcp_port: 0, @@ -248,6 +316,8 @@ impl Discovery { }, discv4: Default::default(), discv4_updates: Default::default(), + discv5: None, + discv5_updates: None, queued_events: Default::default(), _discv4_service: Default::default(), _dns_discovery: None, @@ -259,7 +329,7 @@ impl Discovery { } /// Events produced by the [`Discovery`] manager. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum DiscoveryEvent { /// Discovered a node NewNode(DiscoveredEvent), @@ -279,9 +349,119 @@ mod tests { let mut rng = thread_rng(); let (secret_key, _) = SECP256K1.generate_keypair(&mut rng); let discovery_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)); - let _discovery = - Discovery::new(discovery_addr, secret_key, Default::default(), Default::default()) - .await - .unwrap(); + let _discovery = Discovery::new( + discovery_addr, + secret_key, + Default::default(), + None, + Default::default(), + ) + .await + .unwrap(); + } + + use reth_discv4::Discv4ConfigBuilder; + use reth_discv5::{enr::EnrCombinedKeyWrapper, enr_to_discv4_id}; + use tracing::trace; + + async fn start_discovery_node(udp_port_discv4: u16, udp_port_discv5: u16) -> Discovery { + let secret_key = SecretKey::new(&mut thread_rng()); + + let discv4_addr = format!("127.0.0.1:{udp_port_discv4}").parse().unwrap(); + let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port_discv5}").parse().unwrap(); + + // disable `NatResolver` + let discv4_config = Discv4ConfigBuilder::default().external_ip_resolver(None).build(); + + let discv5_listen_config = discv5::ListenConfig::from(discv5_addr); + let discv5_config = reth_discv5::Config::builder(0) + .discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build()) + .build(); + + Discovery::new(discv4_addr, secret_key, Some(discv4_config), Some(discv5_config), None) + .await + .expect("should build discv5 with discv4 downgrade") + } + + #[tokio::test(flavor = "multi_thread")] + async fn discv5_and_discv4_same_pk() { + reth_tracing::init_test_tracing(); + + // set up test + let mut node_1 = start_discovery_node(40014, 40015).await; + let discv4_enr_1 = node_1.discv4.as_ref().unwrap().node_record(); + let discv5_enr_node_1 = + node_1.discv5.as_ref().unwrap().with_discv5(|discv5| discv5.local_enr()); + let discv4_id_1 = discv4_enr_1.id; + let discv5_id_1 = discv5_enr_node_1.node_id(); + + let mut node_2 = start_discovery_node(40024, 40025).await; + let discv4_enr_2 = node_2.discv4.as_ref().unwrap().node_record(); + let discv5_enr_node_2 = + node_2.discv5.as_ref().unwrap().with_discv5(|discv5| discv5.local_enr()); + let discv4_id_2 = discv4_enr_2.id; + let discv5_id_2 = discv5_enr_node_2.node_id(); + + trace!(target: "net::discovery::tests", + node_1_node_id=format!("{:#}", discv5_id_1), + node_2_node_id=format!("{:#}", discv5_id_2), + "started nodes" + ); + + // test + + // assert discovery version 4 and version 5 nodes have same id + assert_eq!(discv4_id_1, enr_to_discv4_id(&discv5_enr_node_1).unwrap()); + assert_eq!(discv4_id_2, enr_to_discv4_id(&discv5_enr_node_2).unwrap()); + + // add node_2:discv4 manually to node_1:discv4 + node_1.add_discv4_node(discv4_enr_2); + + // verify node_2:discv4 discovered node_1:discv4 and vv + let event_node_1 = node_1.next().await.unwrap(); + let event_node_2 = node_2.next().await.unwrap(); + + assert_eq!( + DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { + peer_id: discv4_id_2, + socket_addr: discv4_enr_2.tcp_addr(), + fork_id: None + }), + event_node_1 + ); + assert_eq!( + DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { + peer_id: discv4_id_1, + socket_addr: discv4_enr_1.tcp_addr(), + fork_id: None + }), + event_node_2 + ); + + assert_eq!(1, node_1.discovered_nodes.len()); + assert_eq!(1, node_2.discovered_nodes.len()); + + // add node_2:discv5 to node_1:discv5, manual insertion won't emit an event + node_1.add_discv5_node(EnrCombinedKeyWrapper(discv5_enr_node_2.clone()).into()).unwrap(); + // verify node_2 is in KBuckets of node_1:discv5 + assert!(node_1 + .discv5 + .as_ref() + .unwrap() + .with_discv5(|discv5| discv5.table_entries_id().contains(&discv5_id_2))); + + // manually trigger connection from node_1:discv5 to node_2:discv5 + node_1 + .discv5 + .as_ref() + .unwrap() + .with_discv5(|discv5| discv5.send_ping(discv5_enr_node_2.clone())) + .await + .unwrap(); + + // this won't emit an event, since the nodes already discovered each other on discv4, the + // number of nodes stored for each node on this level remains 1. + assert_eq!(1, node_1.discovered_nodes.len()); + assert_eq!(1, node_2.discovered_nodes.len()); } } diff --git a/crates/net/network/src/error.rs b/crates/net/network/src/error.rs index 122107e69280..2bfa9f9c108a 100644 --- a/crates/net/network/src/error.rs +++ b/crates/net/network/src/error.rs @@ -53,6 +53,9 @@ pub enum NetworkError { /// IO error when creating the discovery service #[error("failed to launch discovery service: {0}")] Discovery(io::Error), + /// An error occurred with discovery v5 node. + #[error("discv5 error, {0}")] + Discv5Error(#[from] reth_discv5::Error), /// Error when setting up the DNS resolver failed /// /// See also [DnsResolver](reth_dns_discovery::DnsResolver::from_system_conf) diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 3877a1e00894..abcdecef6733 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -177,8 +177,9 @@ where let NetworkConfig { client, secret_key, + discovery_v4_addr, mut discovery_v4_config, - discovery_addr, + discovery_v5_config, listener_addr, peers_config, sessions_config, @@ -195,7 +196,7 @@ where tx_gossip_disabled, #[cfg(feature = "optimism")] optimism_network_config: crate::config::OptimismNetworkConfig { sequencer_endpoint }, - .. + transactions_manager_config: _, } = config; let peers_manager = PeersManager::new(peers_config); @@ -213,9 +214,14 @@ where disc_config }); - let discovery = - Discovery::new(discovery_addr, secret_key, discovery_v4_config, dns_discovery_config) - .await?; + let discovery = Discovery::new( + discovery_v4_addr, + secret_key, + discovery_v4_config, + discovery_v5_config, + dns_discovery_config, + ) + .await?; // need to retrieve the addr here since provided port could be `0` let local_peer_id = discovery.local_id(); let discv4 = discovery.discv4(); @@ -1025,7 +1031,7 @@ pub enum NetworkEvent { PeerRemoved(PeerId), } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum DiscoveredEvent { EventQueued { peer_id: PeerId, socket_addr: SocketAddr, fork_id: Option }, } diff --git a/crates/net/network/tests/it/startup.rs b/crates/net/network/tests/it/startup.rs index ca9d7c61adab..0bef94e08c30 100644 --- a/crates/net/network/tests/it/startup.rs +++ b/crates/net/network/tests/it/startup.rs @@ -59,8 +59,8 @@ async fn test_discovery_addr_in_use() { let any_port_listener = TcpListener::bind(addr).await.unwrap(); let port = any_port_listener.local_addr().unwrap().port(); let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port)); - let _discovery = Discovery::new(addr, secret_key, Some(disc_config), None).await.unwrap(); + let _discovery = Discovery::new(addr, secret_key, Some(disc_config), None, None).await.unwrap(); let disc_config = Discv4Config::default(); - let result = Discovery::new(addr, secret_key, Some(disc_config), None).await; + let result = Discovery::new(addr, secret_key, Some(disc_config), None, None).await; assert!(is_addr_in_use_kind(&result.err().unwrap(), ServiceKind::Discovery(addr))); }