Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: sigp/discv5
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v0.8.0
Choose a base ref
...
head repository: sigp/discv5
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: master
Choose a head ref
  • 10 commits
  • 12 files changed
  • 3 contributors

Commits on Oct 23, 2024

  1. Copy the full SHA
    ab4ce9a View commit details

Commits on Oct 28, 2024

  1. Increase default vote duration to 2 minutes (#270)

    * Increase default vote duration to 2 minutes
    
    * Update comment
    AgeManning authored Oct 28, 2024
    Copy the full SHA
    a196ef7 View commit details

Commits on Oct 29, 2024

  1. Automatic Firewall/NAT Detection (#271)

    * Initial work towards firewall checking
    
    * Checking connectivity by awaiting incoming connections
    
    * Correct timeout handling
    
    * Testing
    
    * Finished testing
    
    * fmt
    
    * clippy
    
    * Inform peers of ENR update
    AgeManning authored Oct 29, 2024
    Copy the full SHA
    ccc3a57 View commit details
  2. Copy the full SHA
    2fb3bea View commit details

Commits on Jan 20, 2025

  1. Fix encode TalkRequest regression during library transition (#276)

    * Fix encode TalkRequest regression during library transition
    
    * fix: clippy
    KolbyML authored Jan 20, 2025
    Copy the full SHA
    9d211f6 View commit details

Commits on Jan 22, 2025

  1. Prevent dual-stack spamming (#275)

    * Prevent dual-stack spamming
    
    * Fix clippy lint
    
    * update majority function
    
    * Improve comments and function naming
    
    * remove self and make filter_stale_find_most_frequent a free function
    
    * fix test
    
    ---------
    
    Co-authored-by: João Oliveira <hello@jxs.pt>
    AgeManning and jxs authored Jan 22, 2025
    Copy the full SHA
    1445a56 View commit details
  2. Version bump to v0.9.1

    AgeManning committed Jan 22, 2025
    Copy the full SHA
    c3592f2 View commit details

Commits on Feb 12, 2025

  1. Copy the full SHA
    e2ac53b View commit details
  2. Copy the full SHA
    60e24b9 View commit details
  3. Copy the full SHA
    ac91ad4 View commit details
Showing with 707 additions and 314 deletions.
  1. +10 −7 Cargo.toml
  2. +34 −3 src/config.rs
  3. +6 −4 src/discv5.rs
  4. +1 −1 src/handler/mod.rs
  5. +1 −3 src/kbucket/bucket.rs
  6. +13 −1 src/metrics.rs
  7. +12 −1 src/node_info.rs
  8. +50 −4 src/rpc.rs
  9. +314 −256 src/service.rs
  10. +179 −0 src/service/connectivity_state.rs
  11. +73 −29 src/service/ip_vote.rs
  12. +14 −5 src/service/test.rs
17 changes: 10 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@
name = "discv5"
authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = "2018"
version = "0.8.0"
version = "0.9.1"
description = "Implementation of the p2p discv5 discovery protocol"
license = "Apache-2.0"
repository = "https://github.com/sigp/discv5"
@@ -12,17 +12,20 @@ categories = ["network-programming", "asynchronous"]
exclude = [".gitignore", ".github/*"]

[dependencies]
enr = { version = "0.12", features = ["k256", "ed25519"] }
enr = { version = "0.13.0", features = [
"k256",
"ed25519",
] } # enr = { version = "0.12", features = ["k256", "ed25519"] }
tokio = { version = "1", features = ["net", "sync", "macros", "rt"] }
libp2p-identity = { version = "0.2", features = [
"ed25519",
"secp256k1",
"ed25519",
"secp256k1",
], optional = true }
multiaddr = { version = "0.18", optional = true }
zeroize = { version = "1", features = ["zeroize_derive"] }
futures = "0.3"
uint = { version = "0.10", default-features = false }
alloy-rlp = { version = "0.3.8", default-features = true }
alloy-rlp = { version = "0.3", default-features = true }
# This version must be kept up to date do it uses the same dependencies as ENR
hkdf = "0.12"
hex = "0.4"
@@ -33,8 +36,8 @@ socket2 = "0.5"
smallvec = "1"
parking_lot = "0.12"
lazy_static = "1"
aes = "0.8.4"
ctr = "0.9.2"
aes = "0.8"
ctr = "0.9"
aes-gcm = "0.10.3"
tracing = { version = "0.1", features = ["log"] }
lru = "0.12"
37 changes: 34 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ pub struct Config {
pub request_timeout: Duration,

/// The interval over which votes are remembered when determining our external IP. A lower
/// interval will respond faster to IP changes. Default is 30 seconds.
/// interval will respond faster to IP changes. Default is 2 minutes.
pub vote_duration: Duration,

/// The timeout after which a `QueryPeer` in an ongoing query is marked unresponsive.
@@ -92,6 +92,18 @@ pub struct Config {
/// will last indefinitely. Default is 1 hour.
pub ban_duration: Option<Duration>,

/// Auto-discovering our IP address, is only one part in discovering our NAT/firewall
/// situation. We need to determine if we are behind a firewall that is preventing incoming
/// connections (this is especially true for IPv6 where all connections will report the same
/// external IP). To do this, Discv5 uses a heuristic, which is that after we set an address in
/// our ENR, we wait for this duration to see if we have any incoming connections. If we
/// receive a single INCOMING connection in this duration, we consider ourselves contactable,
/// until we update or change our IP address again. If we fail to receive an incoming
/// connection in this duration, we revoke our ENR address advertisement for 6 hours, before
/// trying again. This can be set to None, to always advertise and never revoke. The default is
/// Some(5 minutes).
pub auto_nat_listen_duration: Option<Duration>,

/// A custom executor which can spawn the discv5 tasks. This must be a tokio runtime, with
/// timing support. By default, the executor that created the discv5 struct will be used.
pub executor: Option<Box<dyn Executor + Send + Sync>>,
@@ -121,7 +133,7 @@ impl ConfigBuilder {
let config = Config {
enable_packet_filter: false,
request_timeout: Duration::from_secs(1),
vote_duration: Duration::from_secs(30),
vote_duration: Duration::from_secs(120),
query_peer_timeout: Duration::from_secs(2),
query_timeout: Duration::from_secs(60),
request_retries: 1,
@@ -141,6 +153,7 @@ impl ConfigBuilder {
filter_max_bans_per_ip: Some(5),
permit_ban_list: PermitBanList::default(),
ban_duration: Some(Duration::from_secs(3600)), // 1 hour
auto_nat_listen_duration: Some(Duration::from_secs(300)), // 5 minutes
executor: None,
listen_config,
};
@@ -161,7 +174,7 @@ impl ConfigBuilder {
}

/// The interval over which votes are remembered when determining our external IP. A lower
/// interval will respond faster to IP changes. Default is 30 seconds.
/// interval will respond faster to IP changes. Default is 2 minutes.
pub fn vote_duration(&mut self, vote_duration: Duration) -> &mut Self {
self.config.vote_duration = vote_duration;
self
@@ -295,6 +308,24 @@ impl ConfigBuilder {
self
}

/// Auto-discovering our IP address, is only one part in discovering our NAT/firewall
/// situation. We need to determine if we are behind a firewall that is preventing incoming
/// connections (this is especially true for IPv6 where all connections will report the same
/// external IP). To do this, Discv5 uses a heuristic, which is that after we set an address in
/// our ENR, we wait for this duration to see if we have any incoming connections. If we
/// receive a single INCOMING connection in this duration, we consider ourselves contactable,
/// until we update or change our IP address again. If we fail to receive an incoming
/// connection in this duration, we revoke our ENR address advertisement for 6 hours, before
/// trying again. This can be set to None, to always advertise and never revoke. The default is
/// Some(5 minutes).
pub fn auto_nat_listen_duration(
&mut self,
auto_nat_listen_duration: Option<Duration>,
) -> &mut Self {
self.config.auto_nat_listen_duration = auto_nat_listen_duration;
self
}

/// A custom executor which can spawn the discv5 tasks. This must be a tokio runtime, with
/// timing support.
pub fn executor(&mut self, executor: Box<dyn Executor + Send + Sync>) -> &mut Self {
10 changes: 6 additions & 4 deletions src/discv5.rs
Original file line number Diff line number Diff line change
@@ -267,6 +267,10 @@ impl<P: ProtocolIdentity> Discv5<P> {
nodes_to_send
}

pub fn ip_mode(&self) -> IpMode {
self.ip_mode
}

/// Mark a node in the routing table as `Disconnected`.
///
/// A `Disconnected` node will be present in the routing table and will be only
@@ -568,21 +572,19 @@ impl<P: ProtocolIdentity> Discv5<P> {
}
}

/// Request a TALK message from a node, identified via the ENR.
/// Request a TALK message from a node, identified via the NodeContact.
pub fn talk_req(
&self,
enr: Enr,
node_contact: NodeContact,
protocol: Vec<u8>,
request: Vec<u8>,
) -> impl Future<Output = Result<Vec<u8>, RequestError>> + 'static {
// convert the ENR to a node_contact.

let (callback_send, callback_recv) = oneshot::channel();
let channel = self.clone_channel();
let ip_mode = self.ip_mode;

async move {
let node_contact = NodeContact::try_from_enr(enr, ip_mode)?;
let channel = channel.map_err(|_| RequestError::ServiceNotStarted)?;

let event = ServiceRequest::Talk(node_contact, protocol, request, callback_send);
2 changes: 1 addition & 1 deletion src/handler/mod.rs
Original file line number Diff line number Diff line change
@@ -858,7 +858,7 @@ impl Handler {
//
// We still handle the request, but we do not add the ENR to our routing
// table or consider the ENR valid.
warn!(
debug!(
udp4_socket = ?enr.udp4_socket(),
udp6_socket = ?enr.udp6_socket(),
expected = %node_address,
4 changes: 1 addition & 3 deletions src/kbucket/bucket.rs
Original file line number Diff line number Diff line change
@@ -410,9 +410,7 @@ where
// Adjust `first_connected_pos` accordingly.
match old_status.state {
ConnectionState::Connected => {
if self.first_connected_pos.map_or(false, |p| p == pos.0)
&& pos.0 == self.nodes.len()
{
if self.first_connected_pos == Some(pos.0) && pos.0 == self.nodes.len() {
// It was the last connected node.
self.first_connected_pos = None
}
14 changes: 13 additions & 1 deletion src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

lazy_static! {
pub static ref METRICS: InternalMetrics = InternalMetrics::default();
@@ -16,6 +16,10 @@ pub struct InternalMetrics {
pub bytes_sent: AtomicUsize,
/// The number of bytes received.
pub bytes_recv: AtomicUsize,
/// Whether we consider ourselves contactable or not on ipv4.
pub ipv4_contactable: AtomicBool,
/// Whether we consider ourselves contactable or not on ipv6.
pub ipv6_contactable: AtomicBool,
}

impl Default for InternalMetrics {
@@ -26,6 +30,8 @@ impl Default for InternalMetrics {
unsolicited_requests_per_window: AtomicUsize::new(0),
bytes_sent: AtomicUsize::new(0),
bytes_recv: AtomicUsize::new(0),
ipv4_contactable: AtomicBool::new(false),
ipv6_contactable: AtomicBool::new(false),
}
}
}
@@ -55,6 +61,10 @@ pub struct Metrics {
pub bytes_sent: usize,
/// The number of bytes received.
pub bytes_recv: usize,
/// Whether we consider ourselves contactable or not.
pub ipv4_contactable: bool,
/// Whether we consider ourselves contactable or not.
pub ipv6_contactable: bool,
}

impl From<&METRICS> for Metrics {
@@ -67,6 +77,8 @@ impl From<&METRICS> for Metrics {
/ internal_metrics.moving_window as f64,
bytes_sent: internal_metrics.bytes_sent.load(Ordering::Relaxed),
bytes_recv: internal_metrics.bytes_recv.load(Ordering::Relaxed),
ipv4_contactable: internal_metrics.ipv4_contactable.load(Ordering::Relaxed),
ipv6_contactable: internal_metrics.ipv6_contactable.load(Ordering::Relaxed),
}
}
}
13 changes: 12 additions & 1 deletion src/node_info.rs
Original file line number Diff line number Diff line change
@@ -26,8 +26,19 @@ pub struct NonContactable {
}

impl NodeContact {
pub fn new(public_key: CombinedPublicKey, socket_addr: SocketAddr, enr: Option<Enr>) -> Self {
NodeContact {
public_key,
socket_addr,
enr,
}
}

pub fn node_id(&self) -> NodeId {
self.public_key.clone().into()
match self.enr {
Some(ref enr) => enr.node_id(),
None => self.public_key.clone().into(),
}
}

pub fn seq_no(&self) -> Option<u64> {
54 changes: 50 additions & 4 deletions src/rpc.rs
Original file line number Diff line number Diff line change
@@ -155,8 +155,8 @@ impl Request {
RequestBody::Talk { protocol, request } => {
let mut list = Vec::<u8>::new();
id.as_bytes().encode(&mut list);
protocol.encode(&mut list);
request.encode(&mut list);
protocol.as_slice().encode(&mut list);
request.as_slice().encode(&mut list);
let header = Header {
list: true,
payload_length: list.len(),
@@ -479,8 +479,8 @@ impl Message {
}
5 => {
// Talk Request
let protocol = Vec::<u8>::decode(payload)?;
let request = Vec::<u8>::decode(payload)?;
let protocol = Bytes::decode(payload)?.to_vec();
let request = Bytes::decode(payload)?.to_vec();
if !payload.is_empty() {
return Err(DecoderError::Custom("Payload should be empty"));
}
@@ -812,4 +812,50 @@ mod tests {
let data6 = [6, 193, 128, 128];
Message::decode(&data6).expect_err("should reject extra data");
}

#[test]
fn test_encode_request_talk_request() {
// reference input
let id = RequestId(vec![113, 236, 255, 66, 31, 191, 221, 86]);
let message = Message::Request(Request {
id,
body: RequestBody::Talk {
protocol: hex::decode("757470").unwrap(),
request: hex::decode("0100a028839e1549000003ef001000007619dde7").unwrap(),
},
});

// expected hex output
let expected_output =
hex::decode("05e28871ecff421fbfdd5683757470940100a028839e1549000003ef001000007619dde7")
.unwrap();
dbg!(hex::encode(message.clone().encode()));

let encoded_message = message.clone().encode();
assert_eq!(encoded_message.clone(), expected_output);
assert_eq!(Message::decode(&encoded_message).unwrap(), message);
}

#[test]
fn test_encode_request_talk_response() {
// reference input
let id = RequestId(vec![113, 236, 255, 66, 31, 191, 221, 86]);
let message = Message::Response(Response {
id,
body: ResponseBody::Talk {
response: hex::decode("0100a028839e1549000003ef001000007619dde7").unwrap(),
},
});

// expected hex output
let expected_output =
hex::decode("06de8871ecff421fbfdd56940100a028839e1549000003ef001000007619dde7")
.unwrap();

dbg!(hex::encode(message.clone().encode()));

let encoded_message = message.clone().encode();
assert_eq!(encoded_message.clone(), expected_output);
assert_eq!(Message::decode(&encoded_message).unwrap(), message);
}
}
Loading