Skip to content

Commit

Permalink
fix(l1): peers negotiated capabilities (#1887)
Browse files Browse the repository at this point in the history
**Motivation**

As mentioned in #1885 the logic of storing peers capabilities wasn't
entirely right.

**Description**
This update improves the negotiation of capabilities during the `Hello`
message exchange when establishing a connection. The main issue
previously was that we were not correctly handling capability matching
and so we were initiating connections with incompatible peers. Changes:
- Verifies compatibility by matching supported versions and storing the
highest available version: currently, we only support `eth/68`, so there
isn’t a higher version, but keeping this check aligns with the protocol
specification and we avoid having repeated hardcoded values.
- Sends and validates that the eth version in the `Status` message
matches the one negotiated in the `Hello` exchange.
- Adds a check to confirm that peers support at least one of our `eth`
versions. Previously, we encountered a case where a node supported
`snap` and `eth` but not an `eth` version we support. Despite this, the
connection was established because `snap` was shared. However, since
`snap` requires `eth` as well, this led to connections with incompatible
peers.

Closes None
  • Loading branch information
MarcosNicolau authored Feb 13, 2025
1 parent 090e6ab commit 08cf8f4
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 28 deletions.
76 changes: 56 additions & 20 deletions crates/networking/p2p/rlpx/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ use tokio_util::codec::Framed;

use super::utils::log_peer_warn;

const CAP_P2P: (Capability, u8) = (Capability::P2p, 5);
const CAP_ETH: (Capability, u8) = (Capability::Eth, 68);
const CAP_SNAP: (Capability, u8) = (Capability::Snap, 1);
const SUPPORTED_CAPABILITIES: [(Capability, u8); 3] = [CAP_P2P, CAP_ETH, CAP_SNAP];
const CAP_P2P_5: (Capability, u8) = (Capability::P2p, 5);
const CAP_ETH_68: (Capability, u8) = (Capability::Eth, 68);
const CAP_SNAP_1: (Capability, u8) = (Capability::Snap, 1);
const SUPPORTED_CAPABILITIES: [(Capability, u8); 3] = [CAP_P2P_5, CAP_ETH_68, CAP_SNAP_1];
const PERIODIC_TASKS_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15);

pub(crate) type Aes256Ctr64BE = ctr::Ctr64BE<aes::Aes256>;
Expand All @@ -70,6 +70,8 @@ pub(crate) struct RLPxConnection<S> {
framed: Framed<S, RLPxCodec>,
storage: Store,
capabilities: Vec<(Capability, u8)>,
negotiated_eth_version: u8,
negotiated_snap_version: u8,
next_periodic_task_check: Instant,
/// Send end of the channel used to broadcast messages
/// to other connected peers, is ok to have it here,
Expand Down Expand Up @@ -97,6 +99,8 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
framed: Framed::new(stream, codec),
storage,
capabilities: vec![],
negotiated_eth_version: 0,
negotiated_snap_version: 0,
next_periodic_task_check: Instant::now() + PERIODIC_TASKS_CHECK_INTERVAL,
connection_broadcast_send: connection_broadcast,
}
Expand Down Expand Up @@ -201,23 +205,42 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {

match msg {
Message::Hello(hello_message) => {
let mut negotiated_eth_cap = (Capability::Eth, 0);
let mut negotiated_snap_cap = (Capability::Snap, 0);

log_peer_debug(
&self.node,
&format!(
"Hello message capabilities {:?}",
hello_message.capabilities
),
);
self.capabilities = hello_message.capabilities;

// Check if we have any capability in common
for cap in self.capabilities.clone() {
if SUPPORTED_CAPABILITIES.contains(&cap) {
return Ok(());
// Check if we have any capability in common and store the highest version
for cap in &hello_message.capabilities {
match *cap {
CAP_ETH_68 if CAP_ETH_68.1 > negotiated_eth_cap.1 => {
negotiated_eth_cap = CAP_ETH_68
}
CAP_SNAP_1 if CAP_SNAP_1.1 > negotiated_snap_cap.1 => {
negotiated_snap_cap = CAP_SNAP_1
}
_ => {}
}
}
// Return error if not
Err(RLPxError::NoMatchingCapabilities())

self.capabilities = hello_message.capabilities;

if negotiated_eth_cap.1 == 0 {
return Err(RLPxError::NoMatchingCapabilities());
}
self.negotiated_eth_version = negotiated_eth_cap.1;

if negotiated_snap_cap.1 != 0 {
self.negotiated_snap_version = negotiated_snap_cap.1;
}

Ok(())
}
Message::Disconnect(disconnect) => Err(RLPxError::DisconnectRequested(
disconnect.reason().to_string(),
Expand All @@ -239,7 +262,10 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {

// Subscribe this connection to the broadcasting channel.
let mut broadcaster_receive = {
if self.capabilities.contains(&CAP_ETH) {
if self
.capabilities
.contains(&(Capability::Eth, self.negotiated_eth_version))
{
Some(self.connection_broadcast_send.subscribe())
} else {
None
Expand Down Expand Up @@ -308,7 +334,7 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
message: Message,
sender: mpsc::Sender<Message>,
) -> Result<(), RLPxError> {
let peer_supports_eth = self.capabilities.contains(&CAP_ETH);
let peer_supports_eth = self.negotiated_eth_version != 0;
let is_synced = self.storage.is_synced()?;
match message {
Message::Disconnect(msg_data) => {
Expand All @@ -328,9 +354,11 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
Message::Pong(_) => {
// We ignore received Pong messages
}
Message::Status(msg_data) if !peer_supports_eth => {
backend::validate_status(msg_data, &self.storage)?
}
Message::Status(msg_data) if !peer_supports_eth => backend::validate_status(
msg_data,
&self.storage,
self.negotiated_eth_version as u32,
)?,
Message::GetAccountRange(req) => {
let response = process_account_range_request(req, self.storage.clone())?;
self.send(Message::AccountRange(response)).await?
Expand Down Expand Up @@ -446,8 +474,11 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {

async fn init_peer_conn(&mut self) -> Result<(), RLPxError> {
// Sending eth Status if peer supports it
if self.capabilities.contains(&CAP_ETH) {
let status = backend::get_status(&self.storage)?;
if self
.capabilities
.contains(&(Capability::Eth, self.negotiated_eth_version))
{
let status = backend::get_status(&self.storage, self.negotiated_eth_version as u32)?;
log_peer_debug(&self.node, "Sending status");
self.send(Message::Status(status)).await?;
// The next immediate message in the ETH protocol is the
Expand All @@ -459,8 +490,12 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
};
match msg {
Message::Status(msg_data) => {
log_peer_debug(&self.node, &format!("Received Status {:?}", msg_data));
backend::validate_status(msg_data, &self.storage)?
log_peer_debug(&self.node, "Received Status");
backend::validate_status(
msg_data,
&self.storage,
self.negotiated_eth_version as u32,
)?
}
Message::Disconnect(disconnect) => {
return Err(RLPxError::HandshakeError(format!(
Expand All @@ -475,6 +510,7 @@ impl<S: AsyncWrite + AsyncRead + std::marker::Unpin> RLPxConnection<S> {
}
}
}

Ok(())
}

Expand Down
19 changes: 11 additions & 8 deletions crates/networking/p2p/rlpx/eth/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use crate::rlpx::error::RLPxError;

use super::status::StatusMessage;

pub const ETH_VERSION: u32 = 68;

pub fn get_status(storage: &Store) -> Result<StatusMessage, RLPxError> {
pub fn get_status(storage: &Store, eth_version: u32) -> Result<StatusMessage, RLPxError> {
let chain_config = storage.get_chain_config()?;
let total_difficulty = U256::from(chain_config.terminal_total_difficulty.unwrap_or_default());
let network_id = chain_config.chain_id;
Expand All @@ -25,7 +23,7 @@ pub fn get_status(storage: &Store) -> Result<StatusMessage, RLPxError> {
let block_hash = block_header.compute_block_hash();
let fork_id = ForkId::new(chain_config, genesis, block_header.timestamp, block_number);
Ok(StatusMessage {
eth_version: ETH_VERSION,
eth_version,
network_id,
total_difficulty,
block_hash,
Expand All @@ -34,7 +32,11 @@ pub fn get_status(storage: &Store) -> Result<StatusMessage, RLPxError> {
})
}

pub fn validate_status(msg_data: StatusMessage, storage: &Store) -> Result<(), RLPxError> {
pub fn validate_status(
msg_data: StatusMessage,
storage: &Store,
eth_version: u32,
) -> Result<(), RLPxError> {
let chain_config = storage.get_chain_config()?;

// These blocks must always be available
Expand All @@ -60,7 +62,7 @@ pub fn validate_status(msg_data: StatusMessage, storage: &Store) -> Result<(), R
));
}
//Check Protocol Version
if msg_data.eth_version != ETH_VERSION {
if msg_data.eth_version != eth_version {
return Err(RLPxError::HandshakeError(
"Eth protocol version does not match".to_string(),
));
Expand Down Expand Up @@ -116,15 +118,16 @@ mod tests {
let genesis_hash = genesis.get_block().hash();
let fork_id = ForkId::new(config, genesis_hash, 2707305664, 123);

let eth_version = 68;
let message = StatusMessage {
eth_version: 68u32,
eth_version,
network_id: 3503995874084926,
total_difficulty,
block_hash: H256::random(),
genesis: genesis_hash,
fork_id,
};
let result = validate_status(message, &storage);
let result = validate_status(message, &storage, eth_version);
assert!(result.is_ok());
}
}

0 comments on commit 08cf8f4

Please sign in to comment.