Skip to content

Commit

Permalink
Finished testing
Browse files Browse the repository at this point in the history
  • Loading branch information
AgeManning committed Oct 22, 2024
1 parent b0bbee1 commit b60ef66
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 79 deletions.
10 changes: 5 additions & 5 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,14 @@ pub struct Config {

/// Auto-discovering our IP address, is only one part in discovering our NAT/firewall
/// situation. We need to determine if we are behind a firewall that is preventing incoming
/// connections (this is espcially true for IPv6 where all connections will report the same
/// connections (this is especially true for IPv6 where all connections will report the same
/// external IP). To do this, Discv5 uses a heuristic, which is that after we set an address in
/// our ENR, we wait for this duration to see if we have any incoming connections. If we
/// receive a single INCOMING connection in this duration, we consider ourselves contactable,
/// until we update or change our IP address again. If we fail to receive an incoming
/// connection in this duration, we revoke our ENR address advertisement for 6 hours, before
/// trying again. This can be set to None, to always advertise and never revoke. The default is
/// Some(10 minutes).
/// Some(5 minutes).
pub auto_nat_listen_duration: Option<Duration>,

/// A custom executor which can spawn the discv5 tasks. This must be a tokio runtime, with
Expand Down Expand Up @@ -153,7 +153,7 @@ impl ConfigBuilder {
filter_max_bans_per_ip: Some(5),
permit_ban_list: PermitBanList::default(),
ban_duration: Some(Duration::from_secs(3600)), // 1 hour
auto_nat_listen_duration: Some(Duration::from_secs(600)), // 10 minutes
auto_nat_listen_duration: Some(Duration::from_secs(300)), // 5 minutes
executor: None,
listen_config,
};
Expand Down Expand Up @@ -310,14 +310,14 @@ impl ConfigBuilder {

/// Auto-discovering our IP address, is only one part in discovering our NAT/firewall
/// situation. We need to determine if we are behind a firewall that is preventing incoming
/// connections (this is espcially true for IPv6 where all connections will report the same
/// connections (this is especially true for IPv6 where all connections will report the same
/// external IP). To do this, Discv5 uses a heuristic, which is that after we set an address in
/// our ENR, we wait for this duration to see if we have any incoming connections. If we
/// receive a single INCOMING connection in this duration, we consider ourselves contactable,
/// until we update or change our IP address again. If we fail to receive an incoming
/// connection in this duration, we revoke our ENR address advertisement for 6 hours, before
/// trying again. This can be set to None, to always advertise and never revoke. The default is
/// Some(10 minutes).
/// Some(5 minutes).
pub fn auto_nat_listen_duration(
&mut self,
auto_nat_listen_duration: Option<Duration>,
Expand Down
14 changes: 13 additions & 1 deletion src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

lazy_static! {
pub static ref METRICS: InternalMetrics = InternalMetrics::default();
Expand All @@ -16,6 +16,10 @@ pub struct InternalMetrics {
pub bytes_sent: AtomicUsize,
/// The number of bytes received.
pub bytes_recv: AtomicUsize,
/// Whether we consider ourselves contactable or not on ipv4.
pub ipv4_contactable: AtomicBool,
/// Whether we consider ourselves contactable or not on ipv6.
pub ipv6_contactable: AtomicBool,
}

impl Default for InternalMetrics {
Expand All @@ -26,6 +30,8 @@ impl Default for InternalMetrics {
unsolicited_requests_per_window: AtomicUsize::new(0),
bytes_sent: AtomicUsize::new(0),
bytes_recv: AtomicUsize::new(0),
ipv4_contactable: AtomicBool::new(false),
ipv6_contactable: AtomicBool::new(false),
}
}
}
Expand Down Expand Up @@ -55,6 +61,10 @@ pub struct Metrics {
pub bytes_sent: usize,
/// The number of bytes received.
pub bytes_recv: usize,
/// Whether we consider ourselves contactable or not.
pub ipv4_contactable: bool,
/// Whether we consider ourselves contactable or not.
pub ipv6_contactable: bool,
}

impl From<&METRICS> for Metrics {
Expand All @@ -67,6 +77,8 @@ impl From<&METRICS> for Metrics {
/ internal_metrics.moving_window as f64,
bytes_sent: internal_metrics.bytes_sent.load(Ordering::Relaxed),
bytes_recv: internal_metrics.bytes_recv.load(Ordering::Relaxed),
ipv4_contactable: internal_metrics.ipv4_contactable.load(Ordering::Relaxed),
ipv6_contactable: internal_metrics.ipv6_contactable.load(Ordering::Relaxed),
}
}
}
138 changes: 69 additions & 69 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,77 +885,77 @@ impl Service {
return;
}

// get the advertised local addresses
let (local_ip4_socket, local_ip6_socket) = {
let local_enr = self.local_enr.read();
(local_enr.udp4_socket(), local_enr.udp6_socket())
};

if let Some(ref mut ip_votes) = self.ip_votes {
ip_votes.insert(node_id, socket);
let (maybe_ip4_majority, maybe_ip6_majority) = ip_votes.majority();

let new_ip4 = maybe_ip4_majority.and_then(|majority| {
if Some(majority) != local_ip4_socket {
Some(majority)
} else {
None
}
});
let new_ip6 = maybe_ip6_majority.and_then(|majority| {
if Some(majority) != local_ip6_socket {
Some(majority)
} else {
None
}
});

if new_ip4.is_some() || new_ip6.is_some() {
let mut updated = false;

// Check if our advertised IPV6 address needs to be updated.
if let Some(new_ip6) = new_ip6 {
let new_ip6: SocketAddr = new_ip6.into();
let result = self
.local_enr
.write()
.set_udp_socket(new_ip6, &self.enr_key.read());
match result {
Ok(_) => {
updated = true;
// Inform the connectivity state that we have updated our IP advertisement
self.connectivity_state.enr_socket_update(&new_ip6);
info!(%new_ip6, "Local UDP ip6 socket updated");
self.send_event(Event::SocketUpdated(new_ip6));
match socket {
SocketAddr::V4(_) => {
let local_ip4_socket = self.local_enr.read().udp4_socket();
if let Some(ip_votes) = self.ip_votes.as_mut() {
ip_votes.insert(node_id, socket);
let maybe_ip4_majority = ip_votes.majority().0;

let new_ip4 = maybe_ip4_majority.and_then(|majority| {
if Some(majority) != local_ip4_socket {
Some(majority)
} else {
None
}
Err(e) => {
warn!(ip6 = %new_ip6, error = ?e, "Failed to update local UDP ip6 socket.");
});

// If we have a new ipv4 majority
if let Some(new_ip4) = new_ip4 {
let new_ip4: SocketAddr = new_ip4.into();
let result = self
.local_enr
.write()
.set_udp_socket(new_ip4, &self.enr_key.read());
match result {
Ok(_) => {
// Inform the connectivity state that we have updated our IP advertisement
self.connectivity_state.enr_socket_update(&new_ip4);
info!(ip_version="v4", %new_ip4, "Local UDP socket updated");
self.send_event(Event::SocketUpdated(new_ip4));
self.ping_connected_peers();
}
Err(e) => {
warn!(ip = %new_ip4, error = ?e, "Failed to update local UDP socket.");
}
}
}
}
if let Some(new_ip4) = new_ip4 {
let new_ip4: SocketAddr = new_ip4.into();
let result = self
.local_enr
.write()
.set_udp_socket(new_ip4, &self.enr_key.read());
match result {
Ok(_) => {
updated = true;
// Inform the connectivity state that we have updated our IP advertisement
self.connectivity_state.enr_socket_update(&new_ip4);
info!(%new_ip4, "Local UDP socket updated");
self.send_event(Event::SocketUpdated(new_ip4));
}
SocketAddr::V6(_) => {
let local_ip6_socket = self.local_enr.read().udp6_socket();
if let Some(ip_votes) = self.ip_votes.as_mut() {
ip_votes.insert(node_id, socket);
let maybe_ip6_majority = ip_votes.majority().1;

let new_ip6 = maybe_ip6_majority.and_then(|majority| {
if Some(majority) != local_ip6_socket {
Some(majority)
} else {
None
}
Err(e) => {
warn!(ip = %new_ip4, error = ?e, "Failed to update local UDP socket.");
});
// Check if our advertised IPV6 address needs to be updated.
if let Some(new_ip6) = new_ip6 {
let new_ip6: SocketAddr = new_ip6.into();
let result = self
.local_enr
.write()
.set_udp_socket(new_ip6, &self.enr_key.read());
match result {
Ok(_) => {
// Inform the connectivity state that we have updated our IP advertisement
self.connectivity_state.enr_socket_update(&new_ip6);
info!(ip_version="v6", %new_ip6, "Local UDP socket updated");
self.send_event(Event::SocketUpdated(new_ip6));
self.ping_connected_peers();
}
Err(e) => {
warn!(ip6 = %new_ip6, error = ?e, "Failed to update local UDP ip6 socket.");
}
}
}
}
if updated {
// Ping our peers to inform them of the ENR update
self.ping_connected_peers();
}
}
}
}
Expand Down Expand Up @@ -1465,6 +1465,12 @@ impl Service {
socket: &SocketAddr,
connection_direction: ConnectionDirection,
) {
// Inform the connectivity state that an incoming peer has connected to us. This could
// establish that our externally advertised address is contactable.
if matches!(connection_direction, ConnectionDirection::Incoming) {
self.connectivity_state.received_incoming_connection(socket);
}

// Ignore sessions with non-contactable ENRs
if self.ip_mode.get_contactable_addr(&enr).is_none() {
return;
Expand All @@ -1485,12 +1491,6 @@ impl Service {
_ => connection_direction,
};

// Inform the connectivity state that an incoming peer has connected to us. This could
// establish that our externally advertised address is contactable.
if matches!(direction, ConnectionDirection::Incoming) {
self.connectivity_state.received_incoming_connection(socket);
}

debug!(node = %node_id, %direction, "Session established with Node");
self.connection_updated(node_id, ConnectionStatus::Connected(enr, direction));
}
Expand Down
47 changes: 43 additions & 4 deletions src/service/connectivity_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,22 @@
//! DURATION_UNTIL_NEXT_CONNECTIVITY_ATTEMPT in the future. This will prevent counting votes until
//! this time, which prevents our ENR from being updated.
use crate::metrics::METRICS;
use futures::future::{pending, Either};
use futures::FutureExt;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};
use tokio::time::{sleep, Sleep};
use tracing::info;

// const DURATION_UNTIL_NEXT_CONNECTIVITY_ATTEMPT: Duration = Duration::from_secs(21600); // 6 hours
pub const DURATION_UNTIL_NEXT_CONNECTIVITY_ATTEMPT: Duration = Duration::from_secs(100);
pub const DURATION_UNTIL_NEXT_CONNECTIVITY_ATTEMPT: Duration = Duration::from_secs(21600); // 6 hours

/// The number of incoming connections we need to observe before we consider ourselves contactable.
/// A previously disconnected node reconnecting back through a temporarily open port can be false
/// positive. The higher this number, the lower the probability of false positives.
const NUMBER_OF_INCOMING_CONNECTIONS_REQUIRED_TO_BE_VALID: usize = 2;

/// The error returned from polling the ConnectivityState indicating whether IPv4 or IPv6 has
/// failed a connectivity check.
Expand All @@ -48,6 +55,10 @@ pub(crate) struct ConnectivityState {
pub ipv4_next_connectivity_test: Instant,
/// The time that we begin checking connectivity tests for ipv6.
pub ipv6_next_connectivity_test: Instant,
/// The number of incoming ipv4 nodes we have seen during our awaiting window.
ipv4_incoming_count: usize,
/// The number of incoming ipv6 nodes we have seen during our awaiting window.
ipv6_incoming_count: usize,
}

impl ConnectivityState {
Expand All @@ -58,6 +69,8 @@ impl ConnectivityState {
ipv6_incoming_wait_time: None,
ipv4_next_connectivity_test: Instant::now(),
ipv6_next_connectivity_test: Instant::now(),
ipv4_incoming_count: 0,
ipv6_incoming_count: 0,
}
}

Expand Down Expand Up @@ -85,9 +98,11 @@ impl ConnectivityState {
if let Some(duration_to_wait) = self.duration_for_incoming_connections {
match socket {
SocketAddr::V4(_) => {
self.ipv4_incoming_count = 0;
self.ipv4_incoming_wait_time = Some(Box::pin(sleep(duration_to_wait)))
}
SocketAddr::V6(_) => {
self.ipv6_incoming_count = 0;
self.ipv6_incoming_wait_time = Some(Box::pin(sleep(duration_to_wait)))
}
}
Expand All @@ -99,8 +114,30 @@ impl ConnectivityState {
// to potentially change the IP address if a legitimate change occurs.
pub fn received_incoming_connection(&mut self, socket: &SocketAddr) {
match socket {
SocketAddr::V4(_) => self.ipv4_incoming_wait_time = None,
SocketAddr::V6(_) => self.ipv6_incoming_wait_time = None,
SocketAddr::V4(_) => {
if self.ipv4_incoming_wait_time.is_none() {
// We are not waiting for any v4 connections
return;
}
self.ipv4_incoming_count += 1;
if self.ipv4_incoming_count >= NUMBER_OF_INCOMING_CONNECTIONS_REQUIRED_TO_BE_VALID {
info!(ip_version = "v4", "We are contactable");
self.ipv4_incoming_wait_time = None;
METRICS.ipv4_contactable.store(true, Ordering::Relaxed);
}
}
SocketAddr::V6(_) => {
if self.ipv6_incoming_wait_time.is_none() {
// We are not waiting for any v6 connections
return;
}
self.ipv6_incoming_count += 1;
if self.ipv6_incoming_count >= NUMBER_OF_INCOMING_CONNECTIONS_REQUIRED_TO_BE_VALID {
info!(ip_version = "v6", "We are contactable");
self.ipv6_incoming_wait_time = None;
METRICS.ipv6_contactable.store(true, Ordering::Relaxed);
}
}
}
}

Expand All @@ -124,12 +161,14 @@ impl ConnectivityState {
self.ipv4_next_connectivity_test =
Instant::now() + DURATION_UNTIL_NEXT_CONNECTIVITY_ATTEMPT;
self.ipv4_incoming_wait_time = None;
METRICS.ipv4_contactable.store(false, Ordering::Relaxed);
TimerFailure::V4
} else {
// Ipv6 fired
self.ipv6_next_connectivity_test =
Instant::now() + DURATION_UNTIL_NEXT_CONNECTIVITY_ATTEMPT;
self.ipv6_incoming_wait_time = None;
METRICS.ipv6_contactable.store(false, Ordering::Relaxed);
TimerFailure::V6
}
}
Expand Down

0 comments on commit b60ef66

Please sign in to comment.