Skip to content

Commit

Permalink
feat(discv5): plug discv5 crate into network (#7446)
Browse files Browse the repository at this point in the history
  • Loading branch information
emhane authored Apr 4, 2024
1 parent 4a8d2f4 commit 633806e
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 94 deletions.
3 changes: 2 additions & 1 deletion crates/net/discv5/src/enr.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! Interface between node identification on protocol version 5 and 4. Specifically, between types
//! [`discv5::enr::NodeId`] and [`PeerId`].
use discv5::enr::{CombinedPublicKey, Enr, EnrPublicKey, NodeId};
use discv5::enr::{CombinedPublicKey, EnrPublicKey, NodeId};
use enr::Enr;
use reth_primitives::{id2pk, pk2id, PeerId};
use secp256k1::{PublicKey, SecretKey};

Expand Down
11 changes: 4 additions & 7 deletions crates/net/discv5/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ use discv5::IpMode;
pub enum Error {
/// Failure adding node to [`discv5::Discv5`].
#[error("failed adding node to discv5, {0}")]
AddNodeToDiscv5Failed(&'static str),
AddNodeFailed(&'static str),
/// Node record has incompatible key type.
#[error("incompatible key type (not secp256k1)")]
IncompatibleKeyType,
/// Missing key used to identify rlpx network.
#[error("fork missing on enr, 'eth' key missing")]
ForkMissing,
#[error("fork missing on enr, key missing")]
ForkMissing(&'static [u8]),
/// Failed to decode [`ForkId`](reth_primitives::ForkId) rlp value.
#[error("failed to decode fork id, 'eth': {0:?}")]
ForkIdDecodeError(#[from] alloy_rlp::Error),
Expand All @@ -30,9 +30,6 @@ pub enum Error {
#[error("init failed, {0}")]
InitFailure(&'static str),
/// An error from underlying [`discv5::Discv5`] node.
#[error("{0}")]
#[error("sigp/discv5 error, {0}")]
Discv5Error(discv5::Error),
/// An error from underlying [`discv5::Discv5`] node.
#[error("{0}")]
Discv5ErrorStr(&'static str),
}
61 changes: 25 additions & 36 deletions crates/net/discv5/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::{

use ::enr::Enr;
use alloy_rlp::Decodable;
use derive_more::Deref;
use discv5::ListenConfig;
use enr::{discv4_id_to_discv5_id, EnrCombinedKeyWrapper};
use futures::future::join_all;
Expand Down Expand Up @@ -46,9 +45,8 @@ use metrics::Discv5Metrics;
const MAX_LOG2_DISTANCE: usize = 255;

/// Transparent wrapper around [`discv5::Discv5`].
#[derive(Deref, Clone)]
#[derive(Clone)]
pub struct Discv5 {
#[deref]
/// sigp/discv5 node.
discv5: Arc<discv5::Discv5>,
/// [`IpMode`] of the the node.
Expand All @@ -67,9 +65,9 @@ impl Discv5 {
////////////////////////////////////////////////////////////////////////////////////////////////

/// Adds the node to the table, if it is not already present.
pub fn add_node_to_routing_table(&self, node_record: Enr<SecretKey>) -> Result<(), Error> {
pub fn add_node(&self, node_record: Enr<SecretKey>) -> Result<(), Error> {
let EnrCombinedKeyWrapper(enr) = node_record.into();
self.add_enr(enr).map_err(Error::AddNodeToDiscv5Failed)
self.discv5.add_enr(enr).map_err(Error::AddNodeFailed)
}

/// Sets the pair in the EIP-868 [`Enr`] of the node.
Expand All @@ -85,7 +83,7 @@ impl Discv5 {
);
return
};
if let Err(err) = self.enr_insert(key_str, &rlp) {
if let Err(err) = self.discv5.enr_insert(key_str, &rlp) {
error!(target: "discv5",
%err,
"failed to update local enr"
Expand All @@ -109,11 +107,11 @@ impl Discv5 {
/// Adds the peer and id to the ban list.
///
/// This will prevent any future inclusion in the table
pub fn ban_peer_by_ip_and_node_id(&self, peer_id: PeerId, ip: IpAddr) {
pub fn ban(&self, peer_id: PeerId, ip: IpAddr) {
match discv4_id_to_discv5_id(peer_id) {
Ok(node_id) => {
self.ban_node(&node_id, None);
self.ban_peer_by_ip(ip);
self.discv5.ban_node(&node_id, None);
self.ban_ip(ip);
}
Err(err) => error!(target: "discv5",
%err,
Expand All @@ -125,15 +123,15 @@ impl Discv5 {
/// Adds the ip to the ban list.
///
/// This will prevent any future inclusion in the table
pub fn ban_peer_by_ip(&self, ip: IpAddr) {
self.ban_ip(ip, None);
pub fn ban_ip(&self, ip: IpAddr) {
self.discv5.ban_ip(ip, None);
}

/// Returns the [`NodeRecord`] of the local node.
///
/// This includes the currently tracked external IP address of the node.
pub fn node_record(&self) -> NodeRecord {
let enr: Enr<_> = EnrCombinedKeyWrapper(self.local_enr()).into();
let enr: Enr<_> = EnrCombinedKeyWrapper(self.discv5.local_enr()).into();
(&enr).try_into().unwrap()
}

Expand Down Expand Up @@ -238,7 +236,7 @@ impl Discv5 {
//
// 4. add boot nodes
//
Self::bootstrap(bootstrap_nodes, &discv5)?;
Self::bootstrap(bootstrap_nodes, &discv5).await?;

let metrics = Discv5Metrics::default();

Expand All @@ -255,7 +253,7 @@ impl Discv5 {
}

/// Bootstraps underlying [`discv5::Discv5`] node with configured peers.
fn bootstrap(
async fn bootstrap(
bootstrap_nodes: HashSet<BootNode>,
discv5: &Arc<discv5::Discv5>,
) -> Result<(), Error> {
Expand All @@ -269,7 +267,7 @@ impl Discv5 {
match node {
BootNode::Enr(node) => {
if let Err(err) = discv5.add_enr(node) {
return Err(Error::Discv5ErrorStr(err))
return Err(Error::AddNodeFailed(err))
}
}
BootNode::Enode(enode) => {
Expand All @@ -286,18 +284,8 @@ impl Discv5 {
}
}
}
_ = join_all(enr_requests);

debug!(target: "net::discv5",
nodes=format!("[{:#}]", discv5.with_kbuckets(|kbuckets| kbuckets
.write()
.iter()
.map(|peer| format!("enr: {:?}, status: {:?}", peer.node.value, peer.status)).collect::<Vec<_>>()
).into_iter().format(", ")),
"added boot nodes"
);

Ok(())
Ok(_ = join_all(enr_requests).await)
}

/// Backgrounds regular look up queries, in order to keep kbuckets populated.
Expand Down Expand Up @@ -479,7 +467,8 @@ impl Discv5 {
&self,
enr: &discv5::enr::Enr<K>,
) -> Result<ForkId, Error> {
let mut fork_id_bytes = enr.get_raw_rlp(self.fork_id_key()).ok_or(Error::ForkMissing)?;
let key = self.fork_id_key;
let mut fork_id_bytes = enr.get_raw_rlp(key).ok_or(Error::ForkMissing(key))?;

Ok(ForkId::decode(&mut fork_id_bytes)?)
}
Expand All @@ -491,9 +480,9 @@ impl Discv5 {
/// Exposes API of [`discv5::Discv5`].
pub fn with_discv5<F, R>(&self, f: F) -> R
where
F: FnOnce(&Self) -> R,
F: FnOnce(&discv5::Discv5) -> R,
{
f(self)
f(&self.discv5)
}

////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -617,7 +606,7 @@ mod tests {
// add node_2 to discovery handle of node_1 (should add node to discv5 kbuckets)
let node_2_enr_reth_compatible_ty: Enr<SecretKey> =
EnrCombinedKeyWrapper(node_2_enr.clone()).into();
node_1.add_node_to_routing_table(node_2_enr_reth_compatible_ty).unwrap();
node_1.add_node(node_2_enr_reth_compatible_ty).unwrap();

// verify node_2 is in KBuckets of node_1:discv5
assert!(
Expand All @@ -630,21 +619,21 @@ mod tests {
// verify node_1:discv5 is connected to node_2:discv5 and vv
let event_2_v5 = stream_2.recv().await.unwrap();
let event_1_v5 = stream_1.recv().await.unwrap();
matches!(
assert!(matches!(
event_1_v5,
discv5::Event::SessionEstablished(node, socket) if node == node_2_enr && socket == node_2_enr.udp4_socket().unwrap().into()
);
matches!(
));
assert!(matches!(
event_2_v5,
discv5::Event::SessionEstablished(node, socket) if node == node_1_enr && socket == node_1_enr.udp4_socket().unwrap().into()
);
));

// verify node_1 is in KBuckets of node_2:discv5
let event_2_v5 = stream_2.recv().await.unwrap();
matches!(
assert!(matches!(
event_2_v5,
discv5::Event::NodeInserted { node_id, replaced } if node_id == node_1_enr.node_id() && replaced.is_none()
);
));
}

#[test]
Expand Down
7 changes: 2 additions & 5 deletions crates/net/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ reth-rpc-types.workspace = true
reth-tokio-util.workspace = true

# ethereum
enr = { workspace = true, features = ["rust-secp256k1"], optional = true }
enr = { workspace = true, features = ["serde", "rust-secp256k1"] }
alloy-rlp.workspace = true
discv5.workspace = true

Expand Down Expand Up @@ -84,8 +84,6 @@ alloy-node-bindings.workspace = true
ethers-core = { workspace = true, default-features = false }
ethers-providers = { workspace = true, default-features = false, features = ["ws"] }

enr = { workspace = true, features = ["serde", "rust-secp256k1"] }

# misc
serial_test.workspace = true
tempfile.workspace = true
Expand All @@ -96,10 +94,9 @@ criterion = { workspace = true, features = ["async_tokio", "html_reports"] }

[features]
default = ["serde"]
serde = ["dep:serde", "dep:humantime-serde", "secp256k1/serde", "enr?/serde", "dep:serde_json"]
serde = ["dep:serde", "dep:humantime-serde", "secp256k1/serde", "enr/serde", "dep:serde_json"]
test-utils = [
"reth-provider/test-utils",
"dep:enr",
"dep:tempfile",
"reth-transaction-pool/test-utils",
]
Expand Down
8 changes: 3 additions & 5 deletions crates/net/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ pub struct NetworkConfig<C> {
pub boot_nodes: HashSet<NodeRecord>,
/// How to set up discovery over DNS.
pub dns_discovery_config: Option<DnsDiscoveryConfig>,
/// Address to use for discovery v4.
pub discovery_v4_addr: SocketAddr,
/// How to set up discovery.
pub discovery_v4_config: Option<Discv4Config>,
/// How to set up discovery version 5.
pub discovery_v5_config: Option<reth_discv5::Config>,
/// Address to use for discovery
pub discovery_addr: SocketAddr,
/// Address to listen for incoming connections
pub listener_addr: SocketAddr,
/// How to instantiate peer manager.
Expand Down Expand Up @@ -155,10 +155,8 @@ impl<C> NetworkConfig<C> {
}

/// Sets the config to use for the discovery v5 protocol.
pub fn set_discovery_v5(mut self, discv5_config: reth_discv5::Config) -> Self {
self.discovery_v5_config = Some(discv5_config);
self.discovery_addr = self.discovery_v5_config.as_ref().unwrap().discovery_socket();
self
}

Expand Down Expand Up @@ -567,7 +565,7 @@ impl NetworkConfigBuilder {
dns_discovery_config,
discovery_v4_config: discovery_v4_builder.map(|builder| builder.build()),
discovery_v5_config: None,
discovery_addr: discovery_addr.unwrap_or(DEFAULT_DISCOVERY_ADDRESS),
discovery_v4_addr: discovery_addr.unwrap_or(DEFAULT_DISCOVERY_ADDRESS),
listener_addr,
peers_config: peers_config.unwrap_or_default(),
sessions_config: sessions_config.unwrap_or_default(),
Expand Down
Loading

0 comments on commit 633806e

Please sign in to comment.