From 088bbcf24c26afdf2cbef0bbde25f132896909e4 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Wed, 22 Jan 2025 18:51:59 +0200 Subject: [PATCH 1/4] BACKPORT-CONFLICT --- prdoc/pr_7222.prdoc | 19 + .../client/network/src/request_responses.rs | 998 +++++++++++------- 2 files changed, 636 insertions(+), 381 deletions(-) create mode 100644 prdoc/pr_7222.prdoc diff --git a/prdoc/pr_7222.prdoc b/prdoc/pr_7222.prdoc new file mode 100644 index 000000000000..40b89b0a1820 --- /dev/null +++ b/prdoc/pr_7222.prdoc @@ -0,0 +1,19 @@ +title: Enforce libp2p outbound request-response timeout limits + +doc: + - audience: Node Dev + description: | + This PR enforces that outbound requests are finished within the specified protocol timeout. + The stable2412 version running libp2p 0.52.4 contains a bug which does not track request timeouts properly + https://github.com/libp2p/rust-libp2p/pull/5429. + + The issue has been detected while submitting libp2p to litep2p requests in Kusama. + This aims to check that pending outbound requests have not timed out. + Although the issue has been fixed in libp2p, there might be other cases where this may happen. + For example, https://github.com/libp2p/rust-libp2p/pull/5417. + + For more context see https://github.com/paritytech/polkadot-sdk/issues/7076#issuecomment-2596085096. + +crates: +- name: sc-network + bump: patch diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 6c2631924df4..a686fbeec62a 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -66,6 +66,9 @@ use std::{ pub use libp2p::request_response::{Config, RequestId}; +/// Periodically check if requests are taking too long. +const PERIODIC_REQUEST_CHECK: Duration = Duration::from_secs(2); + /// Possible failures occurring in the context of sending an outbound request and receiving the /// response. #[derive(Debug, thiserror::Error)] @@ -245,8 +248,14 @@ pub struct OutgoingResponse { /// Information stored about a pending request. struct PendingRequest { + /// The time when the request was sent to the libp2p request-response protocol. started_at: Instant, - response_tx: oneshot::Sender, ProtocolName), RequestFailure>>, + /// The channel to send the response back to the caller. + /// + /// This is wrapped in an `Option` to allow for the channel to be taken out + /// on force-detected timeouts. + response_tx: Option, ProtocolName), RequestFailure>>>, + /// Fallback request to send if the primary request fails. fallback_request: Option<(Vec, ProtocolName)>, } @@ -330,16 +339,20 @@ impl From<(ProtocolName, RequestId)> for ProtocolRequestId { } } +/// Details of a request-response protocol. +struct ProtocolDetails { + behaviour: Behaviour, + inbound_queue: Option>, + request_timeout: Duration, +} + /// Implementation of `NetworkBehaviour` that provides support for request-response protocols. pub struct RequestResponsesBehaviour { /// The multiple sub-protocols, by name. /// /// Contains the underlying libp2p request-response [`Behaviour`], plus an optional /// "response builder" used to build responses for incoming requests. - protocols: HashMap< - ProtocolName, - (Behaviour, Option>), - >, + protocols: HashMap, /// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply. pending_requests: HashMap, @@ -359,6 +372,14 @@ pub struct RequestResponsesBehaviour { /// Primarily used to get a reputation of a node. peer_store: Arc, + + /// Interval to check that the requests are not taking too long. + /// + /// We had issues in the past where libp2p did not produce a timeout event in due time. + /// + /// For more details, see: + /// - + periodic_request_check: tokio::time::Interval, } /// Generated by the response builder and waiting to be processed. @@ -388,7 +409,7 @@ impl RequestResponsesBehaviour { ProtocolSupport::Outbound }; - let rq_rp = Behaviour::with_codec( + let behaviour = Behaviour::with_codec( GenericCodec { max_request_size: protocol.max_request_size, max_response_size: protocol.max_response_size, @@ -400,7 +421,11 @@ impl RequestResponsesBehaviour { ); match protocols.entry(protocol.name) { - Entry::Vacant(e) => e.insert((rq_rp, protocol.inbound_queue)), + Entry::Vacant(e) => e.insert(ProtocolDetails { + behaviour, + inbound_queue: protocol.inbound_queue, + request_timeout: protocol.request_timeout, + }), Entry::Occupied(e) => return Err(RegisterError::DuplicateProtocol(e.key().clone())), }; } @@ -412,6 +437,7 @@ impl RequestResponsesBehaviour { pending_responses_arrival_time: Default::default(), send_feedback: Default::default(), peer_store, + periodic_request_check: tokio::time::interval(PERIODIC_REQUEST_CHECK), }) } @@ -432,9 +458,11 @@ impl RequestResponsesBehaviour { ) { log::trace!(target: "sub-libp2p", "send request to {target} ({protocol_name:?}), {} bytes", request.len()); - if let Some((protocol, _)) = self.protocols.get_mut(protocol_name.deref()) { + if let Some(ProtocolDetails { behaviour, .. }) = + self.protocols.get_mut(protocol_name.deref()) + { Self::send_request_inner( - protocol, + behaviour, &mut self.pending_requests, target, protocol_name, @@ -469,7 +497,7 @@ impl RequestResponsesBehaviour { (protocol_name.to_string().into(), request_id).into(), PendingRequest { started_at: Instant::now(), - response_tx: pending_response, + response_tx: Some(pending_response), fallback_request, }, ); @@ -516,18 +544,19 @@ impl NetworkBehaviour for RequestResponsesBehaviour { local_addr: &Multiaddr, remote_addr: &Multiaddr, ) -> Result, ConnectionDenied> { - let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| { - if let Ok(handler) = r.handle_established_inbound_connection( - connection_id, - peer, - local_addr, - remote_addr, - ) { - Some((p.to_string(), handler)) - } else { - None - } - }); + let iter = + self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| { + if let Ok(handler) = behaviour.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) { + Some((p.to_string(), handler)) + } else { + None + } + }); Ok(MultiHandler::try_from_iter(iter).expect( "Protocols are in a HashMap and there can be at most one handler per protocol name, \ @@ -542,6 +571,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { addr: &Multiaddr, role_override: Endpoint, ) -> Result, ConnectionDenied> { +<<<<<<< HEAD let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| { if let Ok(handler) = r.handle_established_outbound_connection(connection_id, peer, addr, role_override) @@ -551,6 +581,22 @@ impl NetworkBehaviour for RequestResponsesBehaviour { None } }); +======= + let iter = + self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| { + if let Ok(handler) = behaviour.handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + port_use, + ) { + Some((p.to_string(), handler)) + } else { + None + } + }); +>>>>>>> fd64a1e7 (net/libp2p: Enforce outbound request-response timeout limits (#7222)) Ok(MultiHandler::try_from_iter(iter).expect( "Protocols are in a HashMap and there can be at most one handler per protocol name, \ @@ -558,6 +604,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { )) } +<<<<<<< HEAD fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(e) => @@ -632,6 +679,11 @@ impl NetworkBehaviour for RequestResponsesBehaviour { for (p, _) in self.protocols.values_mut() { NetworkBehaviour::on_swarm_event(p, FromSwarm::NewListenAddr(e)); }, +======= + fn on_swarm_event(&mut self, event: FromSwarm) { + for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() { + behaviour.on_swarm_event(event); +>>>>>>> fd64a1e7 (net/libp2p: Enforce outbound request-response timeout limits (#7222)) } } @@ -642,8 +694,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour { event: THandlerOutEvent, ) { let p_name = event.0; - if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) { - return proto.on_connection_handler_event(peer_id, connection_id, event.1) + if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(p_name.as_str()) { + return behaviour.on_connection_handler_event(peer_id, connection_id, event.1) } else { log::warn!( target: "sub-libp2p", @@ -659,6 +711,51 @@ impl NetworkBehaviour for RequestResponsesBehaviour { params: &mut impl PollParameters, ) -> Poll>> { 'poll_all: loop { + // Poll the periodic request check. + if self.periodic_request_check.poll_tick(cx).is_ready() { + self.pending_requests.retain(|id, req| { + let Some(ProtocolDetails { request_timeout, .. }) = + self.protocols.get(&id.protocol) + else { + log::warn!( + target: "sub-libp2p", + "Request {id:?} has no protocol registered.", + ); + + if let Some(response_tx) = req.response_tx.take() { + if response_tx.send(Err(RequestFailure::UnknownProtocol)).is_err() { + log::debug!( + target: "sub-libp2p", + "Request {id:?} has no protocol registered. At the same time local node is no longer interested in the result.", + ); + } + } + return false + }; + + let elapsed = req.started_at.elapsed(); + if elapsed > *request_timeout { + log::debug!( + target: "sub-libp2p", + "Request {id:?} force detected as timeout.", + ); + + if let Some(response_tx) = req.response_tx.take() { + if response_tx.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).is_err() { + log::debug!( + target: "sub-libp2p", + "Request {id:?} force detected as timeout. At the same time local node is no longer interested in the result.", + ); + } + } + + false + } else { + true + } + }); + } + // Poll to see if any response is ready to be sent back. while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) { let RequestProcessingOutcome { @@ -675,10 +772,12 @@ impl NetworkBehaviour for RequestResponsesBehaviour { }; if let Ok(payload) = result { - if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) { + if let Some(ProtocolDetails { behaviour, .. }) = + self.protocols.get_mut(&*protocol_name) + { log::trace!(target: "sub-libp2p", "send response to {peer} ({protocol_name:?}), {} bytes", payload.len()); - if protocol.send_response(inner_channel, Ok(payload)).is_err() { + if behaviour.send_response(inner_channel, Ok(payload)).is_err() { // Note: Failure is handled further below when receiving // `InboundFailure` event from request-response [`Behaviour`]. log::debug!( @@ -706,8 +805,14 @@ impl NetworkBehaviour for RequestResponsesBehaviour { let mut fallback_requests = vec![]; // Poll request-responses protocols. +<<<<<<< HEAD for (protocol, (ref mut behaviour, ref mut resp_builder)) in &mut self.protocols { 'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx, params) { +======= + for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols + { + 'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx) { +>>>>>>> fd64a1e7 (net/libp2p: Enforce outbound request-response timeout limits (#7222)) let ev = match ev { // Main events we are interested in. ToSwarm::GenerateEvent(ev) => ev, @@ -767,7 +872,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // Submit the request to the "response builder" passed by the user at // initialization. - if let Some(resp_builder) = resp_builder { + if let Some(resp_builder) = inbound_queue { // If the response builder is too busy, silently drop `tx`. This // will be reported by the corresponding request-response // [`Behaviour`] through an `InboundFailure::Omission` event. @@ -815,7 +920,11 @@ impl NetworkBehaviour for RequestResponsesBehaviour { .pending_requests .remove(&(protocol.clone(), request_id).into()) { - Some(PendingRequest { started_at, response_tx, .. }) => { + Some(PendingRequest { + started_at, + response_tx: Some(response_tx), + .. + }) => { log::trace!( target: "sub-libp2p", "received response from {peer} ({protocol:?}), {} bytes", @@ -831,13 +940,13 @@ impl NetworkBehaviour for RequestResponsesBehaviour { .map_err(|_| RequestFailure::Obsolete); (started_at, delivered) }, - None => { - log::warn!( + _ => { + log::debug!( target: "sub-libp2p", - "Received `RequestResponseEvent::Message` with unexpected request id {:?}", + "Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}", request_id, + peer, ); - debug_assert!(false); continue }, }; @@ -865,7 +974,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { { Some(PendingRequest { started_at, - response_tx, + response_tx: Some(response_tx), fallback_request, }) => { // Try using the fallback request if the protocol was not @@ -905,13 +1014,14 @@ impl NetworkBehaviour for RequestResponsesBehaviour { } started_at }, - None => { - log::warn!( + _ => { + log::debug!( target: "sub-libp2p", - "Received `RequestResponseEvent::Message` with unexpected request id {:?}", + "Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}", request_id, + error, + peer ); - debug_assert!(false); continue }, }; @@ -976,7 +1086,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // Send out fallback requests. for (peer, protocol, request, pending_response) in fallback_requests.drain(..) { - if let Some((behaviour, _)) = self.protocols.get_mut(&protocol) { + if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(&protocol) { Self::send_request_inner( behaviour, &mut self.pending_requests, @@ -1145,7 +1255,7 @@ mod tests { use crate::mock::MockPeerStore; use assert_matches::assert_matches; - use futures::{channel::oneshot, executor::LocalPool, task::Spawn}; + use futures::channel::oneshot; use libp2p::{ core::{ transport::{MemoryTransport, Transport}, @@ -1158,10 +1268,10 @@ mod tests { }; use std::{iter, time::Duration}; - struct TokioExecutor(tokio::runtime::Runtime); + struct TokioExecutor; impl Executor for TokioExecutor { fn exec(&self, f: Pin + Send>>) { - let _ = self.0.spawn(f); + tokio::spawn(f); } } @@ -1178,13 +1288,18 @@ mod tests { let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap(); - let runtime = tokio::runtime::Runtime::new().unwrap(); - let mut swarm = Swarm::new( transport, behaviour, keypair.public().to_peer_id(), +<<<<<<< HEAD SwarmConfig::with_executor(TokioExecutor(runtime)), +======= + SwarmConfig::with_executor(TokioExecutor {}) + // This is taken care of by notification protocols in non-test environment + // It is very slow in test environment for some reason, hence larger timeout + .with_idle_connection_timeout(Duration::from_secs(10)), +>>>>>>> fd64a1e7 (net/libp2p: Enforce outbound request-response timeout limits (#7222)) ); let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); @@ -1194,34 +1309,27 @@ mod tests { (swarm, listen_addr) } - #[test] - fn basic_request_response_works() { + #[tokio::test] + async fn basic_request_response_works() { let protocol_name = ProtocolName::from("/test/req-resp/1"); - let mut pool = LocalPool::new(); // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. let mut swarms = (0..2) .map(|_| { let (tx, mut rx) = async_channel::bounded::(64); - pool.spawner() - .spawn_obj( - async move { - while let Some(rq) = rx.next().await { - let (fb_tx, fb_rx) = oneshot::channel(); - assert_eq!(rq.payload, b"this is a request"); - let _ = rq.pending_response.send(super::OutgoingResponse { - result: Ok(b"this is a response".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: Some(fb_tx), - }); - fb_rx.await.unwrap(); - } - } - .boxed() - .into(), - ) - .unwrap(); + tokio::spawn(async move { + while let Some(rq) = rx.next().await { + let (fb_tx, fb_rx) = oneshot::channel(); + assert_eq!(rq.payload, b"this is a request"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this is a response".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: Some(fb_tx), + }); + fb_rx.await.unwrap(); + } + }); let protocol_config = ProtocolConfig { name: protocol_name.clone(), @@ -1245,84 +1353,69 @@ mod tests { let (mut swarm, _) = swarms.remove(0); // Running `swarm[0]` in the background. - pool.spawner() - .spawn_obj({ - async move { - loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { - result.unwrap(); - }, - _ => {}, - } - } - } - .boxed() - .into() - }) - .unwrap(); - - // Remove and run the remaining swarm. - let (mut swarm, _) = swarms.remove(0); - pool.run_until(async move { - let mut response_receiver = None; - + tokio::spawn(async move { loop { match swarm.select_next_some().await { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - let (sender, receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - &peer_id, - protocol_name.clone(), - b"this is a request".to_vec(), - None, - sender, - IfDisconnected::ImmediateError, - ); - assert!(response_receiver.is_none()); - response_receiver = Some(receiver); - }, - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { result.unwrap(); - break }, _ => {}, } } - - assert_eq!( - response_receiver.unwrap().await.unwrap().unwrap(), - (b"this is a response".to_vec(), protocol_name) - ); }); + + // Remove and run the remaining swarm. + let (mut swarm, _) = swarms.remove(0); + let mut response_receiver = None; + + loop { + match swarm.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let (sender, receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + &peer_id, + protocol_name.clone(), + b"this is a request".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + assert!(response_receiver.is_none()); + response_receiver = Some(receiver); + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, + } + } + + assert_eq!( + response_receiver.unwrap().await.unwrap().unwrap(), + (b"this is a response".to_vec(), protocol_name) + ); } - #[test] - fn max_response_size_exceeded() { + #[tokio::test] + async fn max_response_size_exceeded() { let protocol_name = ProtocolName::from("/test/req-resp/1"); - let mut pool = LocalPool::new(); // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. let mut swarms = (0..2) .map(|_| { let (tx, mut rx) = async_channel::bounded::(64); - pool.spawner() - .spawn_obj( - async move { - while let Some(rq) = rx.next().await { - assert_eq!(rq.payload, b"this is a request"); - let _ = rq.pending_response.send(super::OutgoingResponse { - result: Ok(b"this response exceeds the limit".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: None, - }); - } - } - .boxed() - .into(), - ) - .unwrap(); + tokio::spawn(async move { + while let Some(rq) = rx.next().await { + assert_eq!(rq.payload, b"this is a request"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this response exceeds the limit".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }); + } + }); let protocol_config = ProtocolConfig { name: protocol_name.clone(), @@ -1347,6 +1440,7 @@ mod tests { // Running `swarm[0]` in the background until a `InboundRequest` event happens, // which is a hint about the test having ended. let (mut swarm, _) = swarms.remove(0); +<<<<<<< HEAD pool.spawner() .spawn_obj({ async move { @@ -1370,34 +1464,62 @@ mod tests { pool.run_until(async move { let mut response_receiver = None; +======= + tokio::spawn(async move { +>>>>>>> fd64a1e7 (net/libp2p: Enforce outbound request-response timeout limits (#7222)) loop { match swarm.select_next_some().await { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - let (sender, receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - &peer_id, - protocol_name.clone(), - b"this is a request".to_vec(), - None, - sender, - IfDisconnected::ImmediateError, - ); - assert!(response_receiver.is_none()); - response_receiver = Some(receiver); + SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { + assert!(result.is_ok()); }, - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - assert!(result.is_err()); - break + SwarmEvent::ConnectionClosed { .. } => { + break; }, _ => {}, } } +<<<<<<< HEAD match response_receiver.unwrap().await.unwrap().unwrap_err() { RequestFailure::Network(OutboundFailure::ConnectionClosed) => {}, _ => panic!(), } +======= +>>>>>>> fd64a1e7 (net/libp2p: Enforce outbound request-response timeout limits (#7222)) }); + + // Remove and run the remaining swarm. + let (mut swarm, _) = swarms.remove(0); + + let mut response_receiver = None; + + loop { + match swarm.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let (sender, receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + &peer_id, + protocol_name.clone(), + b"this is a request".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + assert!(response_receiver.is_none()); + response_receiver = Some(receiver); + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + assert!(result.is_err()); + break + }, + _ => {}, + } + } + + match response_receiver.unwrap().await.unwrap().unwrap_err() { + RequestFailure::Network(OutboundFailure::Io(_)) => {}, + request_failure => panic!("Unexpected failure: {request_failure:?}"), + } } /// A [`RequestId`] is a unique identifier among either all inbound or all outbound requests for @@ -1410,11 +1532,10 @@ mod tests { /// without a [`RequestId`] collision. /// /// See [`ProtocolRequestId`] for additional information. - #[test] - fn request_id_collision() { + #[tokio::test] + async fn request_id_collision() { let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1"); let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1"); - let mut pool = LocalPool::new(); let mut swarm_1 = { let protocol_configs = vec![ @@ -1472,114 +1593,100 @@ mod tests { swarm_1.dial(listen_add_2).unwrap(); // Run swarm 2 in the background, receiving two requests. - pool.spawner() - .spawn_obj( - async move { - loop { - match swarm_2.select_next_some().await { - SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { - result.unwrap(); - }, - _ => {}, - } - } + tokio::spawn(async move { + loop { + match swarm_2.select_next_some().await { + SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { + result.unwrap(); + }, + _ => {}, } - .boxed() - .into(), - ) - .unwrap(); + } + }); // Handle both requests sent by swarm 1 to swarm 2 in the background. // // Make sure both requests overlap, by answering the first only after receiving the // second. - pool.spawner() - .spawn_obj( - async move { - let protocol_1_request = swarm_2_handler_1.next().await; - let protocol_2_request = swarm_2_handler_2.next().await; - - protocol_1_request - .unwrap() - .pending_response - .send(OutgoingResponse { - result: Ok(b"this is a response".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: None, - }) - .unwrap(); - protocol_2_request - .unwrap() - .pending_response - .send(OutgoingResponse { - result: Ok(b"this is a response".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: None, - }) - .unwrap(); - } - .boxed() - .into(), - ) - .unwrap(); + tokio::spawn(async move { + let protocol_1_request = swarm_2_handler_1.next().await; + let protocol_2_request = swarm_2_handler_2.next().await; + + protocol_1_request + .unwrap() + .pending_response + .send(OutgoingResponse { + result: Ok(b"this is a response".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }) + .unwrap(); + protocol_2_request + .unwrap() + .pending_response + .send(OutgoingResponse { + result: Ok(b"this is a response".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }) + .unwrap(); + }); // Have swarm 1 send two requests to swarm 2 and await responses. - pool.run_until(async move { - let mut response_receivers = None; - let mut num_responses = 0; - loop { - match swarm_1.select_next_some().await { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - let (sender_1, receiver_1) = oneshot::channel(); - let (sender_2, receiver_2) = oneshot::channel(); - swarm_1.behaviour_mut().send_request( - &peer_id, - protocol_name_1.clone(), - b"this is a request".to_vec(), - None, - sender_1, - IfDisconnected::ImmediateError, - ); - swarm_1.behaviour_mut().send_request( - &peer_id, - protocol_name_2.clone(), - b"this is a request".to_vec(), - None, - sender_2, - IfDisconnected::ImmediateError, - ); - assert!(response_receivers.is_none()); - response_receivers = Some((receiver_1, receiver_2)); - }, - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - num_responses += 1; - result.unwrap(); - if num_responses == 2 { - break - } - }, - _ => {}, - } + let mut response_receivers = None; + let mut num_responses = 0; + + loop { + match swarm_1.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let (sender_1, receiver_1) = oneshot::channel(); + let (sender_2, receiver_2) = oneshot::channel(); + swarm_1.behaviour_mut().send_request( + &peer_id, + protocol_name_1.clone(), + b"this is a request".to_vec(), + None, + sender_1, + IfDisconnected::ImmediateError, + ); + swarm_1.behaviour_mut().send_request( + &peer_id, + protocol_name_2.clone(), + b"this is a request".to_vec(), + None, + sender_2, + IfDisconnected::ImmediateError, + ); + assert!(response_receivers.is_none()); + response_receivers = Some((receiver_1, receiver_2)); + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + num_responses += 1; + result.unwrap(); + if num_responses == 2 { + break + } + }, + _ => {}, } - let (response_receiver_1, response_receiver_2) = response_receivers.unwrap(); - assert_eq!( - response_receiver_1.await.unwrap().unwrap(), - (b"this is a response".to_vec(), protocol_name_1) - ); - assert_eq!( - response_receiver_2.await.unwrap().unwrap(), - (b"this is a response".to_vec(), protocol_name_2) - ); - }); + } + let (response_receiver_1, response_receiver_2) = response_receivers.unwrap(); + assert_eq!( + response_receiver_1.await.unwrap().unwrap(), + (b"this is a response".to_vec(), protocol_name_1) + ); + assert_eq!( + response_receiver_2.await.unwrap().unwrap(), + (b"this is a response".to_vec(), protocol_name_2) + ); } - #[test] - fn request_fallback() { + #[tokio::test] + async fn request_fallback() { let protocol_name_1 = ProtocolName::from("/test/req-resp/2"); let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1"); let protocol_name_2 = ProtocolName::from("/test/another"); - let mut pool = LocalPool::new(); let protocol_config_1 = ProtocolConfig { name: protocol_name_1.clone(), @@ -1617,39 +1724,31 @@ mod tests { let mut protocol_config_2 = protocol_config_2.clone(); protocol_config_2.inbound_queue = Some(tx_2); - pool.spawner() - .spawn_obj( - async move { - for _ in 0..2 { - if let Some(rq) = rx_1.next().await { - let (fb_tx, fb_rx) = oneshot::channel(); - assert_eq!(rq.payload, b"request on protocol /test/req-resp/1"); - let _ = rq.pending_response.send(super::OutgoingResponse { - result: Ok( - b"this is a response on protocol /test/req-resp/1".to_vec() - ), - reputation_changes: Vec::new(), - sent_feedback: Some(fb_tx), - }); - fb_rx.await.unwrap(); - } - } - - if let Some(rq) = rx_2.next().await { - let (fb_tx, fb_rx) = oneshot::channel(); - assert_eq!(rq.payload, b"request on protocol /test/other"); - let _ = rq.pending_response.send(super::OutgoingResponse { - result: Ok(b"this is a response on protocol /test/other".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: Some(fb_tx), - }); - fb_rx.await.unwrap(); - } + tokio::spawn(async move { + for _ in 0..2 { + if let Some(rq) = rx_1.next().await { + let (fb_tx, fb_rx) = oneshot::channel(); + assert_eq!(rq.payload, b"request on protocol /test/req-resp/1"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this is a response on protocol /test/req-resp/1".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: Some(fb_tx), + }); + fb_rx.await.unwrap(); } - .boxed() - .into(), - ) - .unwrap(); + } + + if let Some(rq) = rx_2.next().await { + let (fb_tx, fb_rx) = oneshot::channel(); + assert_eq!(rq.payload, b"request on protocol /test/other"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this is a response on protocol /test/other".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: Some(fb_tx), + }); + fb_rx.await.unwrap(); + } + }); build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter()) }; @@ -1670,132 +1769,269 @@ mod tests { } // Running `older_swarm`` in the background. - pool.spawner() - .spawn_obj({ - async move { - loop { - _ = older_swarm.0.select_next_some().await; - } - } - .boxed() - .into() - }) - .unwrap(); + tokio::spawn(async move { + loop { + _ = older_swarm.0.select_next_some().await; + } + }); // Run the newer swarm. Attempt to make requests on all protocols. let (mut swarm, _) = new_swarm; let mut older_peer_id = None; - pool.run_until(async move { - let mut response_receiver = None; - // Try the new protocol with a fallback. - loop { - match swarm.select_next_some().await { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - older_peer_id = Some(peer_id); - let (sender, receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - &peer_id, - protocol_name_1.clone(), - b"request on protocol /test/req-resp/2".to_vec(), - Some(( - b"request on protocol /test/req-resp/1".to_vec(), - protocol_config_1_fallback.name.clone(), - )), - sender, - IfDisconnected::ImmediateError, - ); - response_receiver = Some(receiver); - }, - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - result.unwrap(); - break - }, - _ => {}, - } + let mut response_receiver = None; + // Try the new protocol with a fallback. + loop { + match swarm.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + older_peer_id = Some(peer_id); + let (sender, receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + &peer_id, + protocol_name_1.clone(), + b"request on protocol /test/req-resp/2".to_vec(), + Some(( + b"request on protocol /test/req-resp/1".to_vec(), + protocol_config_1_fallback.name.clone(), + )), + sender, + IfDisconnected::ImmediateError, + ); + response_receiver = Some(receiver); + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, } - assert_eq!( - response_receiver.unwrap().await.unwrap().unwrap(), - ( - b"this is a response on protocol /test/req-resp/1".to_vec(), - protocol_name_1_fallback.clone() - ) - ); - // Try the old protocol with a useless fallback. - let (sender, response_receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - older_peer_id.as_ref().unwrap(), - protocol_name_1_fallback.clone(), - b"request on protocol /test/req-resp/1".to_vec(), - Some(( - b"dummy request, will fail if processed".to_vec(), - protocol_config_1_fallback.name.clone(), - )), - sender, - IfDisconnected::ImmediateError, - ); - loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - result.unwrap(); - break - }, - _ => {}, - } + } + assert_eq!( + response_receiver.unwrap().await.unwrap().unwrap(), + ( + b"this is a response on protocol /test/req-resp/1".to_vec(), + protocol_name_1_fallback.clone() + ) + ); + // Try the old protocol with a useless fallback. + let (sender, response_receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + older_peer_id.as_ref().unwrap(), + protocol_name_1_fallback.clone(), + b"request on protocol /test/req-resp/1".to_vec(), + Some(( + b"dummy request, will fail if processed".to_vec(), + protocol_config_1_fallback.name.clone(), + )), + sender, + IfDisconnected::ImmediateError, + ); + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, } - assert_eq!( - response_receiver.await.unwrap().unwrap(), - ( - b"this is a response on protocol /test/req-resp/1".to_vec(), - protocol_name_1_fallback.clone() - ) - ); - // Try the new protocol with no fallback. Should fail. - let (sender, response_receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - older_peer_id.as_ref().unwrap(), - protocol_name_1.clone(), - b"request on protocol /test/req-resp-2".to_vec(), - None, - sender, - IfDisconnected::ImmediateError, - ); - loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - assert_matches!( - result.unwrap_err(), - RequestFailure::Network(OutboundFailure::UnsupportedProtocols) - ); - break - }, - _ => {}, - } + } + assert_eq!( + response_receiver.await.unwrap().unwrap(), + ( + b"this is a response on protocol /test/req-resp/1".to_vec(), + protocol_name_1_fallback.clone() + ) + ); + // Try the new protocol with no fallback. Should fail. + let (sender, response_receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + older_peer_id.as_ref().unwrap(), + protocol_name_1.clone(), + b"request on protocol /test/req-resp-2".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + assert_matches!( + result.unwrap_err(), + RequestFailure::Network(OutboundFailure::UnsupportedProtocols) + ); + break + }, + _ => {}, } - assert!(response_receiver.await.unwrap().is_err()); - // Try the other protocol with no fallback. - let (sender, response_receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - older_peer_id.as_ref().unwrap(), - protocol_name_2.clone(), - b"request on protocol /test/other".to_vec(), - None, - sender, - IfDisconnected::ImmediateError, - ); + } + assert!(response_receiver.await.unwrap().is_err()); + // Try the other protocol with no fallback. + let (sender, response_receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + older_peer_id.as_ref().unwrap(), + protocol_name_2.clone(), + b"request on protocol /test/other".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, + } + } + assert_eq!( + response_receiver.await.unwrap().unwrap(), + (b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone()) + ); + } + + /// This test ensures the `RequestResponsesBehaviour` propagates back the Request::Timeout error + /// even if the libp2p component hangs. + /// + /// For testing purposes, the communication happens on the `/test/req-resp/1` protocol. + /// + /// This is achieved by: + /// - Two swarms are connected, the first one is slow to respond and has the timeout set to 10 + /// seconds. The second swarm is configured with a timeout of 10 seconds in libp2p, however in + /// substrate this is set to 1 second. + /// + /// - The first swarm introduces a delay of 2 seconds before responding to the request. + /// + /// - The second swarm must enforce the 1 second timeout. + #[tokio::test] + async fn enforce_outbound_timeouts() { + const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); + const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(1); + + // These swarms only speaks protocol_name. + let protocol_name = ProtocolName::from("/test/req-resp/1"); + + let protocol_config = ProtocolConfig { + name: protocol_name.clone(), + fallback_names: Vec::new(), + max_request_size: 1024, + max_response_size: 1024 * 1024, + request_timeout: REQUEST_TIMEOUT, // <-- important for the test + inbound_queue: None, + }; + + // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. + let (mut first_swarm, _) = { + let (tx, mut rx) = async_channel::bounded::(64); + + tokio::spawn(async move { + if let Some(rq) = rx.next().await { + assert_eq!(rq.payload, b"this is a request"); + + // Sleep for more than `REQUEST_TIMEOUT_SHORT` and less than + // `REQUEST_TIMEOUT`. + tokio::time::sleep(REQUEST_TIMEOUT_SHORT * 2).await; + + // By the time the response is sent back, the second swarm + // received Timeout. + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"Second swarm already timedout".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }); + } + }); + + let mut protocol_config = protocol_config.clone(); + protocol_config.inbound_queue = Some(tx); + + build_swarm(iter::once(protocol_config)) + }; + + let (mut second_swarm, second_address) = { + let (tx, mut rx) = async_channel::bounded::(64); + + tokio::spawn(async move { + while let Some(rq) = rx.next().await { + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"This is the response".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }); + } + }); + let mut protocol_config = protocol_config.clone(); + protocol_config.inbound_queue = Some(tx); + + build_swarm(iter::once(protocol_config.clone())) + }; + // Modify the second swarm to have a shorter timeout. + second_swarm + .behaviour_mut() + .protocols + .get_mut(&protocol_name) + .unwrap() + .request_timeout = REQUEST_TIMEOUT_SHORT; + + // Ask first swarm to dial the second swarm. + { + Swarm::dial(&mut first_swarm, second_address).unwrap(); + } + + // Running the first swarm in the background until a `InboundRequest` event happens, + // which is a hint about the test having ended. + tokio::spawn(async move { loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - result.unwrap(); - break + let event = first_swarm.select_next_some().await; + match event { + SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { + assert!(result.is_ok()); + break; + }, + SwarmEvent::ConnectionClosed { .. } => { + break; }, _ => {}, } } - assert_eq!( - response_receiver.await.unwrap().unwrap(), - (b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone()) - ); }); + + // Run the second swarm. + // - on connection established send the request to the first swarm + // - expect to receive a timeout + let mut response_receiver = None; + loop { + let event = second_swarm.select_next_some().await; + + match event { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let (sender, receiver) = oneshot::channel(); + second_swarm.behaviour_mut().send_request( + &peer_id, + protocol_name.clone(), + b"this is a request".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + assert!(response_receiver.is_none()); + response_receiver = Some(receiver); + }, + SwarmEvent::ConnectionClosed { .. } => { + break; + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + assert!(result.is_err()); + break + }, + _ => {}, + } + } + + // Expect the timeout. + match response_receiver.unwrap().await.unwrap().unwrap_err() { + RequestFailure::Network(OutboundFailure::Timeout) => {}, + request_failure => panic!("Unexpected failure: {request_failure:?}"), + } } } From 58ffd90e688f452d24b895b2c36704abfd27e848 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 22 Jan 2025 19:45:38 +0200 Subject: [PATCH 2/4] Backport properly Signed-off-by: Alexandru Vasile --- .../client/network/src/request_responses.rs | 127 ++++++------------ 1 file changed, 38 insertions(+), 89 deletions(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index a686fbeec62a..27f81733300f 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -571,17 +571,6 @@ impl NetworkBehaviour for RequestResponsesBehaviour { addr: &Multiaddr, role_override: Endpoint, ) -> Result, ConnectionDenied> { -<<<<<<< HEAD - let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| { - if let Ok(handler) = - r.handle_established_outbound_connection(connection_id, peer, addr, role_override) - { - Some((p.to_string(), handler)) - } else { - None - } - }); -======= let iter = self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| { if let Ok(handler) = behaviour.handle_established_outbound_connection( @@ -589,14 +578,12 @@ impl NetworkBehaviour for RequestResponsesBehaviour { peer, addr, role_override, - port_use, ) { Some((p.to_string(), handler)) } else { None } }); ->>>>>>> fd64a1e7 (net/libp2p: Enforce outbound request-response timeout limits (#7222)) Ok(MultiHandler::try_from_iter(iter).expect( "Protocols are in a HashMap and there can be at most one handler per protocol name, \ @@ -604,12 +591,14 @@ impl NetworkBehaviour for RequestResponsesBehaviour { )) } -<<<<<<< HEAD fn on_swarm_event(&mut self, event: FromSwarm) { match event { FromSwarm::ConnectionEstablished(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::ConnectionEstablished(e)); + for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() { + NetworkBehaviour::on_swarm_event( + behaviour, + FromSwarm::ConnectionEstablished(e), + ); }, FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, @@ -619,8 +608,10 @@ impl NetworkBehaviour for RequestResponsesBehaviour { remaining_established, }) => for (p_name, p_handler) in handler.into_iter() { - if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) { - proto.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { + if let Some(ProtocolDetails { behaviour, .. }) = + self.protocols.get_mut(p_name.as_str()) + { + behaviour.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, endpoint, @@ -636,54 +627,55 @@ impl NetworkBehaviour for RequestResponsesBehaviour { } }, FromSwarm::DialFailure(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::DialFailure(e)); + for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() { + NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::DialFailure(e)); }, FromSwarm::ListenerClosed(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenerClosed(e)); + for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() { + NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ListenerClosed(e)); }, FromSwarm::ListenFailure(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenFailure(e)); + for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() { + NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ListenFailure(e)); }, FromSwarm::ListenerError(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenerError(e)); + for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() { + NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ListenerError(e)); }, FromSwarm::ExternalAddrExpired(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::ExternalAddrExpired(e)); + for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() { + NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ExternalAddrExpired(e)); }, FromSwarm::NewListener(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::NewListener(e)); + for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() { + NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::NewListener(e)); }, FromSwarm::ExpiredListenAddr(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::ExpiredListenAddr(e)); + for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() { + NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::ExpiredListenAddr(e)); }, FromSwarm::NewExternalAddrCandidate(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::NewExternalAddrCandidate(e)); + for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() { + NetworkBehaviour::on_swarm_event( + behaviour, + FromSwarm::NewExternalAddrCandidate(e), + ); }, FromSwarm::ExternalAddrConfirmed(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::ExternalAddrConfirmed(e)); + for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() { + NetworkBehaviour::on_swarm_event( + behaviour, + FromSwarm::ExternalAddrConfirmed(e), + ); }, FromSwarm::AddressChange(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::AddressChange(e)); + for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() { + NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::AddressChange(e)); }, FromSwarm::NewListenAddr(e) => - for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::NewListenAddr(e)); + for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() { + NetworkBehaviour::on_swarm_event(behaviour, FromSwarm::NewListenAddr(e)); }, -======= - fn on_swarm_event(&mut self, event: FromSwarm) { - for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() { - behaviour.on_swarm_event(event); ->>>>>>> fd64a1e7 (net/libp2p: Enforce outbound request-response timeout limits (#7222)) } } @@ -805,14 +797,9 @@ impl NetworkBehaviour for RequestResponsesBehaviour { let mut fallback_requests = vec![]; // Poll request-responses protocols. -<<<<<<< HEAD - for (protocol, (ref mut behaviour, ref mut resp_builder)) in &mut self.protocols { - 'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx, params) { -======= for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols { - 'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx) { ->>>>>>> fd64a1e7 (net/libp2p: Enforce outbound request-response timeout limits (#7222)) + 'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx, params) { let ev = match ev { // Main events we are interested in. ToSwarm::GenerateEvent(ev) => ev, @@ -1292,14 +1279,10 @@ mod tests { transport, behaviour, keypair.public().to_peer_id(), -<<<<<<< HEAD - SwarmConfig::with_executor(TokioExecutor(runtime)), -======= SwarmConfig::with_executor(TokioExecutor {}) // This is taken care of by notification protocols in non-test environment // It is very slow in test environment for some reason, hence larger timeout .with_idle_connection_timeout(Duration::from_secs(10)), ->>>>>>> fd64a1e7 (net/libp2p: Enforce outbound request-response timeout limits (#7222)) ); let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); @@ -1440,33 +1423,7 @@ mod tests { // Running `swarm[0]` in the background until a `InboundRequest` event happens, // which is a hint about the test having ended. let (mut swarm, _) = swarms.remove(0); -<<<<<<< HEAD - pool.spawner() - .spawn_obj({ - async move { - loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { - assert!(result.is_ok()); - break - }, - _ => {}, - } - } - } - .boxed() - .into() - }) - .unwrap(); - - // Remove and run the remaining swarm. - let (mut swarm, _) = swarms.remove(0); - pool.run_until(async move { - let mut response_receiver = None; - -======= tokio::spawn(async move { ->>>>>>> fd64a1e7 (net/libp2p: Enforce outbound request-response timeout limits (#7222)) loop { match swarm.select_next_some().await { SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { @@ -1478,14 +1435,6 @@ mod tests { _ => {}, } } -<<<<<<< HEAD - - match response_receiver.unwrap().await.unwrap().unwrap_err() { - RequestFailure::Network(OutboundFailure::ConnectionClosed) => {}, - _ => panic!(), - } -======= ->>>>>>> fd64a1e7 (net/libp2p: Enforce outbound request-response timeout limits (#7222)) }); // Remove and run the remaining swarm. From 86826141704b00460667553d10db6e3e37690c32 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 22 Jan 2025 19:51:34 +0200 Subject: [PATCH 3/4] req-resp: Adjust testing Signed-off-by: Alexandru Vasile --- substrate/client/network/src/request_responses.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 27f81733300f..3db3a2dcf798 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -1466,7 +1466,7 @@ mod tests { } match response_receiver.unwrap().await.unwrap().unwrap_err() { - RequestFailure::Network(OutboundFailure::Io(_)) => {}, + RequestFailure::Network(OutboundFailure::ConnectionClosed) => {}, request_failure => panic!("Unexpected failure: {request_failure:?}"), } } From 37f2e13182d71d75dd92f482c624f33e9f38ec57 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 22 Jan 2025 19:55:25 +0200 Subject: [PATCH 4/4] req-resp: Fix max_response_size_exceeded conflicts Signed-off-by: Alexandru Vasile --- substrate/client/network/src/request_responses.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 3db3a2dcf798..3a390f70a4d5 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -1428,8 +1428,6 @@ mod tests { match swarm.select_next_some().await { SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { assert!(result.is_ok()); - }, - SwarmEvent::ConnectionClosed { .. } => { break; }, _ => {},