From 6c8c343ca1c5318d90709787ad0deebf0feb4dc2 Mon Sep 17 00:00:00 2001
From: James Hinshelwood
Date: Thu, 5 Sep 2024 10:53:44 +0100
Subject: [PATCH] Ensure we retry `BlockRequest`s which never receive a `BlockResponse` (#1398)

We do this by properly utilising `libp2p::request_response` and representing a `BlockRequest` and a `BlockResponse` as an actual request and response pair. Previously, both were requests at the libp2p level and would be responded to by an empty ACK.

`libp2p::request_response` now sends us a request timeout failure if a response is not received for a request within 10 seconds. This triggers the existing retry logic in `BlockStore` and we successfully make the request again (hopefully from a different peer).

To achieve this I refactored the interaction between the libp2p layer and `Node`. Instead of all messages being passed to `handle_network_message`, I have split this into 4 different methods:

* `handle_broadcast`
* `handle_request`
* `handle_request_failure`
* `handle_response`

Importantly, `handle_request` takes `ResponseChannel` as an argument. When the handler is ready to respond to the request it sends that `ResponseChannel` (and the actual response) to a new `request_responses` channel. The libp2p layer picks up these responses and passes them on to the network.

This is a slightly leaky abstraction, since the `ResponseChannel` is libp2p-specific but the node knows about it. This isn't awful, but it does mean we need some compile-time feature flag shenanigans to allow the integration test suite (which doesn't use libp2p) to construct fake versions of the `ResponseChannel`. I hope this is reasonably well documented in the code and not too ugly.
--- Cargo.lock | 1 + z2/src/docgen.rs | 3 +- zilliqa/Cargo.toml | 5 +- zilliqa/src/consensus.rs | 2 +- zilliqa/src/message.rs | 6 +- zilliqa/src/node.rs | 168 +++++++++++++++++++++----------- zilliqa/src/node_launcher.rs | 140 +++++++++++++++++--------- zilliqa/src/p2p_node.rs | 125 +++++++++++------------- zilliqa/tests/it/consensus.rs | 2 +- zilliqa/tests/it/main.rs | 144 +++++++++++++++++++++------ zilliqa/tests/it/persistence.rs | 2 +- 11 files changed, 393 insertions(+), 205 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3b94ab07e..7c97f9b21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9465,6 +9465,7 @@ dependencies = [ "bs58", "bytes", "cbor4ii", + "cfg-if", "clap", "criterion", "crypto-bigint", diff --git a/z2/src/docgen.rs b/z2/src/docgen.rs index b153e479d..16e702c92 100644 --- a/z2/src/docgen.rs +++ b/z2/src/docgen.rs @@ -324,10 +324,11 @@ pub fn get_implemented_jsonrpc_methods() -> Result Result> { + pub fn new_view(&mut self, new_view: NewView) -> Result> { trace!("Received new view for height: {:?}", new_view.view); // The leader for this view should be chosen according to the parent of the highest QC diff --git a/zilliqa/src/message.rs b/zilliqa/src/message.rs index 8388e8d5d..6260b4217 100644 --- a/zilliqa/src/message.rs +++ b/zilliqa/src/message.rs @@ -218,7 +218,9 @@ pub enum ExternalMessage { BlockRequest(BlockRequest), BlockResponse(BlockResponse), NewTransaction(SignedTransaction), - RequestResponse, + /// An acknowledgement of the receipt of a message. Note this is only used as a response when the caller doesn't + /// require any data in the response. 
+ Acknowledgement, } impl ExternalMessage { @@ -254,7 +256,7 @@ impl Display for ExternalMessage { } } ExternalMessage::NewTransaction(_) => write!(f, "NewTransaction"), - ExternalMessage::RequestResponse => write!(f, "RequestResponse"), + ExternalMessage::Acknowledgement => write!(f, "RequestResponse"), } } } diff --git a/zilliqa/src/node.rs b/zilliqa/src/node.rs index 37d4106ae..9b09b3bee 100644 --- a/zilliqa/src/node.rs +++ b/zilliqa/src/node.rs @@ -34,9 +34,10 @@ use crate::{ exec::{PendingState, TransactionApplyResult}, inspector::{self, ScillaInspector}, message::{ - Block, BlockHeader, BlockRequest, BlockResponse, ExternalMessage, InternalMessage, - IntershardCall, Proposal, + Block, BlockHeader, BlockResponse, ExternalMessage, InternalMessage, IntershardCall, + Proposal, }, + node_launcher::ResponseChannel, p2p_node::{LocalMessageTuple, OutboundMessageTuple}, pool::TxPoolContent, state::State, @@ -134,6 +135,9 @@ pub struct Node { pub db: Arc, peer_id: PeerId, message_sender: MessageSender, + /// Send responses to requests down this channel. The `ResponseChannel` passed must correspond to a + /// `ResponseChannel` received via `handle_request`. 
+ request_responses: UnboundedSender<(ResponseChannel, ExternalMessage)>, reset_timeout: UnboundedSender, pub consensus: Consensus, peer_num: Arc, @@ -162,6 +166,7 @@ impl Node { secret_key: SecretKey, message_sender_channel: UnboundedSender, local_sender_channel: UnboundedSender, + request_responses: UnboundedSender<(ResponseChannel, ExternalMessage)>, reset_timeout: UnboundedSender, peer_num: Arc, ) -> Result { @@ -178,6 +183,7 @@ impl Node { config: config.clone(), peer_id, message_sender: message_sender.clone(), + request_responses, reset_timeout: reset_timeout.clone(), db: db.clone(), chain_id: ChainId::new(config.eth_chain_id), @@ -187,35 +193,37 @@ impl Node { Ok(node) } - // TODO: Multithreading - `&mut self` -> `&self` - pub fn handle_network_message( - &mut self, - from: PeerId, - message: Result, - ) -> Result<()> { - let to = self.peer_id; - let to_self = from == to; - let message = match message { - Ok(message) => message, - Err(failure) => { - debug!(?failure, "handling message failure"); - self.consensus.report_outgoing_message_failure(failure)?; - - return Ok(()); - } - }; - debug!(%from, %to, %message, "handling message"); + pub fn handle_broadcast(&mut self, from: PeerId, message: ExternalMessage) -> Result<()> { + debug!(%from, to = %self.peer_id, %message, "handling broadcast"); + // We only expect `Proposal`s and `NewTransaction`s to be broadcast. match message { ExternalMessage::Proposal(m) => { - if let Some((to, message)) = self.consensus.proposal(from, m, false)? { - self.reset_timeout.send(DEFAULT_SLEEP_TIME_MS)?; - if let Some(to) = to { - self.message_sender.send_external_message(to, message)?; - } else { + self.handle_proposal(from, m)?; + } + ExternalMessage::NewTransaction(t) => { + let inserted = self.consensus.new_transaction(t.verify()?)?; + if inserted { + if let Some((_, message)) = self.consensus.try_to_propose_new_block()? 
{ self.message_sender.broadcast_external_message(message)?; } } } + _ => { + warn!("unexpected message type"); + } + } + + Ok(()) + } + + pub fn handle_request( + &mut self, + from: PeerId, + message: ExternalMessage, + response_channel: ResponseChannel, + ) -> Result<()> { + debug!(%from, to = %self.peer_id, %message, "handling request"); + match message { ExternalMessage::Vote(m) => { if let Some((block, transactions)) = self.consensus.vote(*m)? { self.message_sender @@ -223,33 +231,91 @@ impl Node { Proposal::from_parts(block, transactions), ))?; } + // Acknowledge this vote. + self.request_responses + .send((response_channel, ExternalMessage::Acknowledgement))?; } ExternalMessage::NewView(m) => { - if let Some(block) = self.consensus.new_view(from, *m)? { + if let Some(block) = self.consensus.new_view(*m)? { self.message_sender .broadcast_external_message(ExternalMessage::Proposal( Proposal::from_parts(block, vec![]), ))?; } + // Acknowledge this new view. + self.request_responses + .send((response_channel, ExternalMessage::Acknowledgement))?; } - ExternalMessage::BlockRequest(m) => { - if !to_self { - self.handle_block_request(from, m)?; - } else { + ExternalMessage::BlockRequest(request) => { + if from == self.peer_id { debug!("ignoring blocks request to self"); + return Ok(()); } + + let proposals = (request.from_view..=request.to_view) + .take(self.config.block_request_limit) + .filter_map(|view| { + self.consensus + .get_block_by_view(view) + .transpose() + .map(|block| Ok(self.block_to_proposal(block?))) + }) + .collect::>()?; + + trace!("responding to new blocks request of {request:?}"); + + // Send the response to this block request. 
+ self.request_responses.send(( + response_channel, + ExternalMessage::BlockResponse(BlockResponse { proposals }), + ))?; } + // We don't usually expect a [BlockResponse] to be received as a request, however this can occur when our + // [BlockStore] has re-sent a previously unusable block because we didn't (yet) have the block's parent. + // Having knowledge of this here breaks our abstraction boundaries slightly, but it also keeps things + // simple. ExternalMessage::BlockResponse(m) => { self.handle_block_response(from, m)?; + // Acknowledge this block response. This does nothing because the `BlockResponse` request was sent by + // us, but we keep it here for symmetry with the other handlers. + self.request_responses + .send((response_channel, ExternalMessage::Acknowledgement))?; } - ExternalMessage::RequestResponse => {} - ExternalMessage::NewTransaction(t) => { - let inserted = self.consensus.new_transaction(t.verify()?)?; - if inserted { - if let Some((_, message)) = self.consensus.try_to_propose_new_block()? { - self.message_sender.broadcast_external_message(message)?; - } - } + // Handle requests which just contain a block proposal. This shouldn't actually happen in production, but + // annoyingly we have some test cases which trigger this (by rewriting broadcasts to direct messages) and + // the easiest fix is just to handle requests with proposals gracefully. + ExternalMessage::Proposal(m) => { + self.handle_proposal(from, m)?; + + // Acknowledge the proposal. 
+ self.request_responses + .send((response_channel, ExternalMessage::Acknowledgement))?; + } + _ => { + warn!("unexpected message type"); + } + } + + Ok(()) + } + + pub fn handle_request_failure( + &mut self, + from: PeerId, + failure: OutgoingMessageFailure, + ) -> Result<()> { + debug!(%from, to = %self.peer_id, ?failure, "handling message failure"); + self.consensus.report_outgoing_message_failure(failure)?; + Ok(()) + } + + pub fn handle_response(&mut self, from: PeerId, message: ExternalMessage) -> Result<()> { + debug!(%from, to = %self.peer_id, %message, "handling response"); + match message { + ExternalMessage::BlockResponse(m) => self.handle_block_response(from, m)?, + ExternalMessage::Acknowledgement => {} + _ => { + warn!("unexpected message type"); } } @@ -801,23 +867,15 @@ impl Node { Proposal::from_parts(block, txs) } - fn handle_block_request(&mut self, source: PeerId, request: BlockRequest) -> Result<()> { - let proposals = (request.from_view..=request.to_view) - .take(self.config.block_request_limit) - .filter_map(|view| { - self.consensus - .get_block_by_view(view) - .transpose() - .map(|block| Ok(self.block_to_proposal(block?))) - }) - .collect::>()?; - - trace!("responding to new blocks request of {request:?}"); - - self.message_sender.send_external_message( - source, - ExternalMessage::BlockResponse(BlockResponse { proposals }), - )?; + fn handle_proposal(&mut self, from: PeerId, proposal: Proposal) -> Result<()> { + if let Some((to, message)) = self.consensus.proposal(from, proposal, false)? 
{ + self.reset_timeout.send(DEFAULT_SLEEP_TIME_MS)?; + if let Some(to) = to { + self.message_sender.send_external_message(to, message)?; + } else { + self.message_sender.broadcast_external_message(message)?; + } + } Ok(()) } diff --git a/zilliqa/src/node_launcher.rs b/zilliqa/src/node_launcher.rs index 4a9f1c5fa..d5a733607 100644 --- a/zilliqa/src/node_launcher.rs +++ b/zilliqa/src/node_launcher.rs @@ -32,45 +32,85 @@ pub struct NodeLauncher { pub node: Arc>, pub config: NodeConfig, pub rpc_module: RpcModule>>, - /// The following two message streams are used for networked messages. - /// The sender is provided to the p2p coordinator, to forward messages to the node. - pub inbound_message_sender: - UnboundedSender<(PeerId, Result)>, - /// The corresponding receiver is handled here, forwarding messages to the node struct. - pub inbound_message_receiver: - UnboundedReceiverStream<(PeerId, Result)>, - /// The following two message streams are used for local messages. - /// The sender is provided to the p2p coordinator, to forward cross-shard messages to the node. - pub local_inbound_message_sender: UnboundedSender<(u64, InternalMessage)>, - /// The corresponding receiver is handled here, forwarding messages to the node struct. - pub local_inbound_message_receiver: UnboundedReceiverStream<(u64, InternalMessage)>, + pub broadcasts: UnboundedReceiverStream<(PeerId, ExternalMessage)>, + pub requests: UnboundedReceiverStream<(PeerId, ExternalMessage, ResponseChannel)>, + pub request_failures: UnboundedReceiverStream<(PeerId, OutgoingMessageFailure)>, + pub responses: UnboundedReceiverStream<(PeerId, ExternalMessage)>, + pub local_messages: UnboundedReceiverStream<(u64, InternalMessage)>, /// Channel used to steer next sleep time pub reset_timeout_receiver: UnboundedReceiverStream, node_launched: bool, } +// If the `fake_response_channel` feature is enabled, swap out the libp2p ResponseChannel for a `u64`. 
In our +// integration tests we are not able to construct a ResponseChannel manually, so we need an alternative way of linking +// a request and a response. +#[cfg(not(feature = "fake_response_channel"))] +type ChannelType = libp2p::request_response::ResponseChannel; +#[cfg(feature = "fake_response_channel")] +type ChannelType = u64; + +/// A wrapper around [libp2p::request_response::ResponseChannel] which also handles the case where the node has sent a +/// request to itself. In this case, we don't require a response. +#[derive(Debug)] +#[cfg_attr(feature = "fake_response_channel", derive(Clone, Hash, PartialEq, Eq))] +pub enum ResponseChannel { + Local, + Remote(ChannelType), +} + +impl ResponseChannel { + pub fn into_inner(self) -> Option { + match self { + ResponseChannel::Local => None, + ResponseChannel::Remote(c) => Some(c), + } + } +} + +/// The collection of channels used to send messages to a [NodeLauncher]. +pub struct NodeInputChannels { + /// Send broadcast messages (received via gossipsub) down this channel. + pub broadcasts: UnboundedSender<(PeerId, ExternalMessage)>, + /// Send direct requests down this channel. The `ResponseChannel` must be used by the receiver to respond to this + /// request. + pub requests: UnboundedSender<(PeerId, ExternalMessage, ResponseChannel)>, + /// Send failed requests down this channel. + pub request_failures: UnboundedSender<(PeerId, OutgoingMessageFailure)>, + /// Send direct responses to direct requests down this channel. + pub responses: UnboundedSender<(PeerId, ExternalMessage)>, + /// Send local messages down this channel. This is used to forward cross-shard messages to the node. 
+ pub local_messages: UnboundedSender<(u64, InternalMessage)>, +} + impl NodeLauncher { pub async fn new( secret_key: SecretKey, config: NodeConfig, outbound_message_sender: UnboundedSender, local_outbound_message_sender: UnboundedSender, + request_responses_sender: UnboundedSender<(ResponseChannel, ExternalMessage)>, peer_num: Arc, - ) -> Result { - let (inbound_message_sender, inbound_message_receiver) = mpsc::unbounded_channel(); - let inbound_message_receiver = UnboundedReceiverStream::new(inbound_message_receiver); - let (local_inbound_message_sender, local_inbound_message_receiver) = - mpsc::unbounded_channel(); - let local_inbound_message_receiver = - UnboundedReceiverStream::new(local_inbound_message_receiver); - let (reset_timeout_sender, reset_timeout_receiver) = mpsc::unbounded_channel(); - let reset_timeout_receiver = UnboundedReceiverStream::new(reset_timeout_receiver); + ) -> Result<(Self, NodeInputChannels)> { + /// Helper to create a (sender, receiver) pair for a channel. 
+ fn sender_receiver() -> (UnboundedSender, UnboundedReceiverStream) { + let (sender, receiver) = mpsc::unbounded_channel(); + (sender, UnboundedReceiverStream::new(receiver)) + } + + let (broadcasts_sender, broadcasts_receiver) = sender_receiver(); + let (requests_sender, requests_receiver) = sender_receiver(); + let (request_failures_sender, request_failures_receiver) = sender_receiver(); + let (responses_sender, responses_receiver) = sender_receiver(); + let (local_messages_sender, local_messages_receiver) = sender_receiver(); + let (reset_timeout_sender, reset_timeout_receiver) = sender_receiver(); let node = Node::new( config.clone(), secret_key, - outbound_message_sender.clone(), - local_outbound_message_sender.clone(), + outbound_message_sender, + local_outbound_message_sender, + request_responses_sender, reset_timeout_sender.clone(), peer_num, )?; @@ -105,27 +145,27 @@ impl NodeLauncher { } } - Ok(Self { + let launcher = NodeLauncher { node, rpc_module, - inbound_message_sender, - inbound_message_receiver, + broadcasts: broadcasts_receiver, + requests: requests_receiver, + request_failures: request_failures_receiver, + responses: responses_receiver, + local_messages: local_messages_receiver, reset_timeout_receiver, - local_inbound_message_sender, - local_inbound_message_receiver, node_launched: false, config, - }) - } + }; + let input_channels = NodeInputChannels { + broadcasts: broadcasts_sender, + requests: requests_sender, + request_failures: request_failures_sender, + responses: responses_sender, + local_messages: local_messages_sender, + }; - pub fn message_input( - &self, - ) -> UnboundedSender<(PeerId, Result)> { - self.inbound_message_sender.clone() - } - - pub fn local_message_input(&self) -> UnboundedSender<(u64, InternalMessage)> { - self.local_inbound_message_sender.clone() + Ok((launcher, input_channels)) } pub async fn start_shard_node(&mut self) -> Result<()> { @@ -140,14 +180,26 @@ impl NodeLauncher { loop { select! 
{ - _message = self.local_inbound_message_receiver.next() => { - let (_source, _message) = _message.expect("message stream should be infinite"); - todo!("Local messages will need to be handled once cross-shard messaging is implemented"); + message = self.broadcasts.next() => { + let (source, message) = message.expect("message stream should be infinite"); + self.node.lock().unwrap().handle_broadcast(source, message).unwrap(); + } + message = self.requests.next() => { + let (source, message, response_channel) = message.expect("message stream should be infinite"); + self.node.lock().unwrap().handle_request(source, message, response_channel).unwrap(); } - message = self.inbound_message_receiver.next() => { + message = self.request_failures.next() => { let (source, message) = message.expect("message stream should be infinite"); - self.node.lock().unwrap().handle_network_message(source, message).unwrap(); - }, + self.node.lock().unwrap().handle_request_failure(source, message).unwrap(); + } + message = self.responses.next() => { + let (source, message) = message.expect("message stream should be infinite"); + self.node.lock().unwrap().handle_response(source, message).unwrap(); + } + message = self.local_messages.next() => { + let (_source, _message) = message.expect("message stream should be infinite"); + todo!("Local messages will need to be handled once cross-shard messaging is implemented"); + } () = &mut sleep => { // No messages for a while, so check if consensus wants to timeout self.node.lock().unwrap().handle_timeout().unwrap(); diff --git a/zilliqa/src/p2p_node.rs b/zilliqa/src/p2p_node.rs index 6205b2763..f6e0e426b 100644 --- a/zilliqa/src/p2p_node.rs +++ b/zilliqa/src/p2p_node.rs @@ -8,6 +8,7 @@ use std::{ }; use anyhow::{anyhow, Result}; +use cfg_if::cfg_if; use libp2p::{ autonat, core::upgrade, @@ -26,7 +27,7 @@ use libp2p::{ use tokio::{ select, signal::{self, unix::SignalKind}, - sync::{mpsc, mpsc::UnboundedSender}, + sync::mpsc::{self, error::SendError, 
UnboundedSender}, task::JoinSet, }; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -38,7 +39,7 @@ use crate::{ db, message::{ExternalMessage, InternalMessage}, node::{OutgoingMessageFailure, RequestId}, - node_launcher::NodeLauncher, + node_launcher::{NodeInputChannels, NodeLauncher, ResponseChannel}, }; /// Messages are a tuple of the destination shard ID and the actual message. @@ -46,7 +47,7 @@ type DirectMessage = (u64, ExternalMessage); #[derive(NetworkBehaviour)] struct Behaviour { - request_response: request_response::cbor::Behaviour, + request_response: request_response::cbor::Behaviour, gossipsub: gossipsub::Behaviour, mdns: mdns::tokio::Behaviour, autonat: libp2p::autonat::Behaviour, @@ -62,11 +63,6 @@ pub type OutboundMessageTuple = (Option<(PeerId, RequestId)>, u64, ExternalMessa /// (source_shard, destination_shard, message) pub type LocalMessageTuple = (u64, u64, InternalMessage); -struct NodeInputChannels { - external: UnboundedSender<(PeerId, Result)>, - internal: UnboundedSender<(u64, InternalMessage)>, -} - pub struct P2pNode { shard_nodes: HashMap, shard_threads: JoinSet>, @@ -78,11 +74,13 @@ pub struct P2pNode { /// Shard nodes get a copy of these senders to propagate messages. outbound_message_sender: UnboundedSender, local_message_sender: UnboundedSender, + request_responses_sender: UnboundedSender<(ResponseChannel, ExternalMessage)>, /// The p2p node keeps a handle to these receivers, to obtain messages from shards and propagate /// them as necessary. outbound_message_receiver: UnboundedReceiverStream, local_message_receiver: UnboundedReceiverStream, - // Map of pending direct requests. Maps the libp2p request ID to our request ID. + request_responses_receiver: UnboundedReceiverStream<(ResponseChannel, ExternalMessage)>, + /// Map of pending direct requests. Maps the libp2p request ID to our request ID. 
pending_requests: HashMap, // Count of current peers for API peer_num: Arc, @@ -96,6 +94,9 @@ impl P2pNode { let (local_message_sender, local_message_receiver) = mpsc::unbounded_channel(); let local_message_receiver = UnboundedReceiverStream::new(local_message_receiver); + let (request_responses_sender, request_responses_receiver) = mpsc::unbounded_channel(); + let request_responses_receiver = UnboundedReceiverStream::new(request_responses_receiver); + let key_pair = secret_key.to_libp2p_keypair(); let peer_id = PeerId::from(key_pair.public()); info!(%peer_id); @@ -156,8 +157,10 @@ impl P2pNode { task_threads: JoinSet::new(), outbound_message_sender, local_message_sender, + request_responses_sender, outbound_message_receiver, local_message_receiver, + request_responses_receiver, pending_requests: HashMap::new(), peer_num: Arc::new(AtomicUsize::new(0)), }) @@ -191,67 +194,32 @@ impl P2pNode { info!("LaunchShard message received for a shard we're already running. Ignoring..."); return Ok(()); } - let mut node = NodeLauncher::new( + let (mut node, input_channels) = NodeLauncher::new( self.secret_key, config, self.outbound_message_sender.clone(), self.local_message_sender.clone(), + self.request_responses_sender.clone(), self.peer_num.clone(), ) .await?; - self.shard_nodes.insert( - topic.hash(), - NodeInputChannels { - external: node.message_input(), - internal: node.local_message_input(), - }, - ); + self.shard_nodes.insert(topic.hash(), input_channels); self.shard_threads .spawn(async move { node.start_shard_node().await }); self.swarm.behaviour_mut().gossipsub.subscribe(&topic)?; Ok(()) } - fn forward_external_message_to_node( - &self, - topic_hash: &TopicHash, - source: PeerId, - message: Result, - ) -> Result<()> { - match self.shard_nodes.get(topic_hash) { - Some(inbound_message_sender) => { - inbound_message_sender.external.send((source, message))? 
- } - None => warn!( - ?topic_hash, - ?source, - ?message, - "Message received for unknown shard/topic" - ), - }; - Ok(()) - } - - fn forward_local_message_to_shard( + fn send_to( &self, topic_hash: &TopicHash, - source_shard: u64, - message: InternalMessage, + sender: impl FnOnce(&NodeInputChannels) -> Result<(), SendError>, ) -> Result<()> { - match self.shard_nodes.get(topic_hash) { - Some(inbound_message_sender) => inbound_message_sender - .internal - .send((source_shard, message))?, - None => { - warn!( - ?topic_hash, - ?source_shard, - ?message, - "Message received for unknown shard/topic" - ) - } + let Some(channels) = self.shard_nodes.get(topic_hash) else { + warn!(?topic_hash, "message received for unknown shard or topic"); + return Ok(()); }; - Ok(()) + Ok(sender(channels)?) } pub async fn start(&mut self) -> Result<()> { @@ -348,21 +316,30 @@ impl P2pNode { let message = cbor4ii::serde::from_slice::(&data).unwrap(); let to = self.peer_id; debug!(%source, %to, %message, "broadcast recieved"); - self.forward_external_message_to_node(&topic_hash, source, Ok(message))?; + self.send_to(&topic_hash, |c| c.broadcasts.send((source, message)))?; } - SwarmEvent::Behaviour(BehaviourEvent::RequestResponse(request_response::Event::Message { message, peer: source })) => { + SwarmEvent::Behaviour(BehaviourEvent::RequestResponse(request_response::Event::Message { message, peer: _source })) => { match message { - request_response::Message::Request { request, channel, .. } => { + request_response::Message::Request { request, channel: _channel, .. 
} => { let to = self.peer_id; - let (shard_id, external_message) = request; - debug!(%source, %to, %external_message, "message received"); - let topic = Self::shard_id_to_topic(shard_id); - self.forward_external_message_to_node(&topic.hash(), source, Ok(external_message))?; - let _ = self.swarm.behaviour_mut().request_response.send_response(channel, (shard_id, ExternalMessage::RequestResponse)); + let (shard_id, _external_message) = request; + debug!(source = %_source, %to, external_message = %_external_message, "message received"); + let _topic = Self::shard_id_to_topic(shard_id); + cfg_if! { + if #[cfg(not(feature = "fake_response_channel"))] { + self.send_to(&_topic.hash(), |c| c.requests.send((_source, _external_message, ResponseChannel::Remote(_channel))))?; + } else { + panic!("fake_response_channel is enabled and you are trying to use a real libp2p network"); + } + } } - request_response::Message::Response { request_id, .. } => { - self.pending_requests.remove(&request_id); + request_response::Message::Response { request_id, response } => { + if let Some((shard_id, _)) = self.pending_requests.remove(&request_id) { + self.send_to(&Self::shard_id_to_topic(shard_id).hash(), |c| c.responses.send((_source, response)))?; + } else { + return Err(anyhow!("response to request with no id")); + } } } } @@ -377,7 +354,7 @@ impl P2pNode { if let Some((shard_id, request_id)) = self.pending_requests.remove(&request_id) { let error = OutgoingMessageFailure { peer, request_id, error }; - self.forward_external_message_to_node(&Self::shard_id_to_topic(shard_id).hash(), peer, Err(error))?; + self.send_to(&Self::shard_id_to_topic(shard_id).hash(), |c| c.request_failures.send((peer, error)))?; } else { return Err(anyhow!("request without id failed")); } @@ -401,13 +378,25 @@ impl P2pNode { self.add_shard_node(shard_config.clone()).await?; }, InternalMessage::LaunchLink(_) | InternalMessage::IntershardCall(_) => { - 
self.forward_local_message_to_shard(&Self::shard_id_to_topic(destination).hash(), source, message)?; + self.send_to(&Self::shard_id_to_topic(destination).hash(), |c| c.local_messages.send((source, message)))?; } InternalMessage::ExportBlockCheckpoint(block, parent, trie_storage, path) => { self.task_threads.spawn(async move { db::checkpoint_block_with_state(&block, &parent, trie_storage, source, path) }); } } }, + message = self.request_responses_receiver.next() => { + let (ch, _rs) = message.expect("message stream should be infinite"); + if let Some(_ch) = ch.into_inner() { + cfg_if! { + if #[cfg(not(feature = "fake_response_channel"))] { + let _ = self.swarm.behaviour_mut().request_response.send_response(_ch, _rs); + } else { + panic!("fake_response_channel is enabled and you are trying to use a real libp2p network"); + } + } + } + } message = self.outbound_message_receiver.next() => { let (dest, shard_id, message) = message.expect("message stream should be infinite"); let data = cbor4ii::serde::to_vec(Vec::new(), &message).unwrap(); @@ -417,9 +406,9 @@ impl P2pNode { match dest { Some((dest, request_id)) => { - debug!(%from, %dest, %message, "sending direct message"); + debug!(%from, %dest, %message, ?request_id, "sending direct message"); if from == dest { - self.forward_external_message_to_node(&topic.hash(), from, Ok(message))?; + self.send_to(&topic.hash(), |c| c.requests.send((from, message, ResponseChannel::Local)))?; } else { let libp2p_request_id = self.swarm.behaviour_mut().request_response.send_request(&dest, (shard_id, message)); self.pending_requests.insert(libp2p_request_id, (shard_id, request_id)); @@ -434,7 +423,7 @@ impl P2pNode { } } // Also broadcast the message to ourselves. 
- self.forward_external_message_to_node(&topic.hash(), from, Ok(message))?; + self.send_to(&topic.hash(), |c| c.broadcasts.send((from, message)))?; }, } }, diff --git a/zilliqa/tests/it/consensus.rs b/zilliqa/tests/it/consensus.rs index 75bb455ea..a0ab482a7 100644 --- a/zilliqa/tests/it/consensus.rs +++ b/zilliqa/tests/it/consensus.rs @@ -185,7 +185,7 @@ async fn create_shard( }); } - network.run_until_block(wallet, 10.into(), 100).await; + network.run_until_block(wallet, 10.into(), 200).await; assert_eq!( network.nodes.len(), initial_main_shard_nodes + child_shard_nodes diff --git a/zilliqa/tests/it/main.rs b/zilliqa/tests/it/main.rs index 448992286..a4afa993d 100644 --- a/zilliqa/tests/it/main.rs +++ b/zilliqa/tests/it/main.rs @@ -74,6 +74,7 @@ use zilliqa::{ db, message::{ExternalMessage, InternalMessage}, node::{Node, RequestId}, + node_launcher::ResponseChannel, transaction::EvmGas, }; @@ -104,6 +105,10 @@ impl NewNodeOptions { enum AnyMessage { External(ExternalMessage), Internal(u64, u64, InternalMessage), + Response { + channel: ResponseChannel, + message: ExternalMessage, + }, } type Wallet = SignerMiddleware, LocalWallet>; @@ -123,6 +128,7 @@ fn node( TestNode, BoxStream<'static, StreamMessage>, BoxStream<'static, StreamMessage>, + BoxStream<'static, StreamMessage>, )> { let (message_sender, message_receiver) = mpsc::unbounded_channel(); let message_receiver = UnboundedReceiverStream::new(message_receiver); @@ -145,6 +151,15 @@ fn node( }) .boxed(); + let (request_responses_sender, request_responses_receiver) = mpsc::unbounded_channel(); + let request_responses_receiver = + UnboundedReceiverStream::new(request_responses_receiver).boxed(); + let request_responses_receiver = request_responses_receiver + // A bit of a hack here - We keep the destination of responses as `None` for now (as if they were a broadcast) + // and look up the destination via the channel later. 
+ .map(move |(channel, message)| (peer_id, None, AnyMessage::Response { channel, message })) + .boxed(); + let (reset_timeout_sender, reset_timeout_receiver) = mpsc::unbounded_channel(); std::mem::forget(reset_timeout_receiver); @@ -158,6 +173,7 @@ fn node( secret_key, message_sender, local_message_sender, + request_responses_sender, reset_timeout_sender, Arc::new(AtomicUsize::new(0)), )?; @@ -176,6 +192,7 @@ fn node( }, message_receiver, local_message_receiver, + request_responses_receiver, )) } @@ -204,6 +221,12 @@ struct Network { /// A stream of messages from each node. The stream items are a tuple of (source, destination, message). /// If the destination is `None`, the message is a broadcast. receivers: Vec>, + /// When we send a request to a node, we also send it a [ResponseChannel]. The node sends a response to that + /// request by passing the [ResponseChannel] back to us. This map lets us remember who to send that response to, + /// based on who the initial request was from. + pending_responses: HashMap, + /// Counter for the next unassigned response channel ID. Starts at 0 and increments with each request. 
+ response_channel_id: u64, resend_message: UnboundedSender, send_to_parent: Option>, rng: Arc>, @@ -317,7 +340,12 @@ impl Network { failed_request_sleep_duration: failed_request_sleep_duration_default(), }; - let (nodes, external_receivers, local_receivers): (Vec<_>, Vec<_>, Vec<_>) = keys + let (nodes, external_receivers, local_receivers, request_response_receivers): ( + Vec<_>, + Vec<_>, + Vec<_>, + Vec<_>, + ) = keys .into_iter() .enumerate() .map(|(i, key)| { @@ -335,6 +363,7 @@ impl Network { let mut receivers: Vec<_> = external_receivers .into_iter() .chain(local_receivers) + .chain(request_response_receivers) .collect(); let (resend_message, receive_resend_message) = mpsc::unbounded_channel::(); @@ -357,6 +386,8 @@ impl Network { send_to_parent, shard_id, receivers, + pending_responses: HashMap::new(), + response_channel_id: 0, resend_message, rng, seed, @@ -423,7 +454,7 @@ impl Network { let secret_key = options.secret_key_or_random(self.rng.clone()); let onchain_key = options.onchain_key_or_random(self.rng.clone()); - let (node, receiver, local_receiver) = + let (node, receiver, local_receiver, request_responses) = node(config, secret_key, onchain_key, self.nodes.len(), None).unwrap(); trace!("Node {}: {}", node.index, node.peer_id); @@ -433,6 +464,7 @@ impl Network { self.nodes.push(node); self.receivers.push(receiver); self.receivers.push(local_receiver); + self.receivers.push(request_responses); index } @@ -470,7 +502,12 @@ impl Network { nodes.inner.lock().unwrap().db.flush_state(); } - let (nodes, external_receivers, local_receivers): (Vec<_>, Vec<_>, Vec<_>) = keys + let (nodes, external_receivers, local_receivers, request_response_receivers): ( + Vec<_>, + Vec<_>, + Vec<_>, + Vec<_>, + ) = keys .into_iter() .enumerate() .map(|(i, key)| { @@ -531,6 +568,7 @@ impl Network { let mut receivers: Vec<_> = external_receivers .into_iter() .chain(local_receivers) + .chain(request_response_receivers) .collect(); for node in &nodes { @@ -789,6 +827,13 @@ impl 
Network { fn handle_message(&mut self, message: StreamMessage) { let (source, destination, ref contents) = message; + info!(%source, ?destination); + let sender_node = self + .nodes + .iter() + .find(|&node| node.peer_id == source) + .expect("Sender should be on the nodes list"); + let sender_chain_id = sender_node.inner.lock().unwrap().config.eth_chain_id; match contents { AnyMessage::Internal(source_shard, destination_shard, ref internal_message) => { trace!("Handling internal message from node in shard {source_shard}, targetting {destination_shard}"); @@ -873,41 +918,77 @@ impl Network { } } AnyMessage::External(external_message) => { - let nodes: Vec<(usize, &TestNode)> = if let Some((destination, _)) = destination { - let (index, node) = self - .nodes - .iter() - .enumerate() - .find(|(_, n)| n.peer_id == destination) - .unwrap(); - if self.disconnected.contains(&index) { - vec![] - } else { - vec![(index, node)] + info!(%external_message, "external"); + match destination { + Some((destination, _)) => { + // Direct message + let (index, node) = self + .nodes + .iter() + .enumerate() + .find(|(_, n)| n.peer_id == destination) + .unwrap(); + if !self.disconnected.contains(&index) { + let span = + tracing::span!(tracing::Level::INFO, "handle_message", index); + span.in_scope(|| { + let mut inner = node.inner.lock().unwrap(); + // Send to nodes only in the same shard (having same chain_id) + if inner.config.eth_chain_id == sender_chain_id { + let response_channel = + ResponseChannel::Remote(self.response_channel_id); + self.response_channel_id += 1; + self.pending_responses + .insert(response_channel.clone(), source); + + inner + .handle_request( + source, + external_message.clone(), + response_channel, + ) + .unwrap(); + } + }); + } } - } else { - self.nodes - .iter() - .enumerate() - .filter(|(index, _)| !self.disconnected.contains(index)) - .collect() - }; - let sender_node = self + None => { + // Broadcast + for (index, node) in self.nodes.iter().enumerate() { 
+ if self.disconnected.contains(&index) { + continue; + } + let span = + tracing::span!(tracing::Level::INFO, "handle_message", index); + span.in_scope(|| { + let mut inner = node.inner.lock().unwrap(); + // Send to nodes only in the same shard (having same chain_id) + if inner.config.eth_chain_id == sender_chain_id { + inner + .handle_broadcast(source, external_message.clone()) + .unwrap(); + } + }); + } + } + } + } + AnyMessage::Response { channel, message } => { + info!(%message, ?channel, "response"); + let destination = self.pending_responses.remove(channel).unwrap(); + let (index, node) = self .nodes .iter() - .find(|&node| node.peer_id == source) - .expect("Sender should be on the nodes list"); - let sender_chain_id = sender_node.inner.lock().unwrap().config.eth_chain_id; - - for (index, node) in nodes.iter() { + .enumerate() + .find(|(_, n)| n.peer_id == destination) + .unwrap(); + if !self.disconnected.contains(&index) { let span = tracing::span!(tracing::Level::INFO, "handle_message", index); span.in_scope(|| { let mut inner = node.inner.lock().unwrap(); - // Send broadcast to nodes only in the same shard (having same chain_id) + // Send to nodes only in the same shard (having same chain_id) if inner.config.eth_chain_id == sender_chain_id { - inner - .handle_network_message(source, Ok(external_message.clone())) - .unwrap(); + inner.handle_response(source, message.clone()).unwrap(); } }); } @@ -1087,6 +1168,7 @@ fn format_message( let message = match message { AnyMessage::External(message) => format!("{message}"), AnyMessage::Internal(_source_shard, _destination_shard, message) => format!("{message}"), + AnyMessage::Response { message, .. 
} => format!("{message}"), }; let source_index = nodes.iter().find(|n| n.peer_id == source).unwrap().index; diff --git a/zilliqa/tests/it/persistence.rs b/zilliqa/tests/it/persistence.rs index 2f8d61529..a4ea3a3c2 100644 --- a/zilliqa/tests/it/persistence.rs +++ b/zilliqa/tests/it/persistence.rs @@ -129,7 +129,7 @@ async fn block_and_tx_data_persistence(mut network: Network) { // As this is very painful to debug, should only ever be relevant for tests like these, and CI // should run enough samples to still have decent test coverage, we simply skip the rest of the // test if this happens. - let Ok((newnode, _, _)) = result else { + let Ok((newnode, _, _, _)) = result else { warn!( "Failed to release database lock. Skipping test, with seed {}.", network.seed