diff --git a/prdoc/pr_7222.prdoc b/prdoc/pr_7222.prdoc new file mode 100644 index 000000000000..40b89b0a1820 --- /dev/null +++ b/prdoc/pr_7222.prdoc @@ -0,0 +1,19 @@ +title: Enforce libp2p outbound request-response timeout limits + +doc: + - audience: Node Dev + description: | + This PR enforces that outbound requests are finished within the specified protocol timeout. + The stable2412 version running libp2p 0.52.4 contains a bug which does not track request timeouts properly + https://github.com/libp2p/rust-libp2p/pull/5429. + + The issue has been detected while submitting libp2p to litep2p requests in Kusama. + This aims to check that pending outbound requests have not timed out. + Although the issue has been fixed in libp2p, there might be other cases where this may happen. + For example, https://github.com/libp2p/rust-libp2p/pull/5417. + + For more context see https://github.com/paritytech/polkadot-sdk/issues/7076#issuecomment-2596085096. + +crates: +- name: sc-network + bump: patch diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 5fe34c781378..e21773632ed7 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -64,6 +64,9 @@ use std::{ pub use libp2p::request_response::{Config, InboundRequestId, OutboundRequestId}; +/// Periodically check if requests are taking too long. +const PERIODIC_REQUEST_CHECK: Duration = Duration::from_secs(2); + /// Possible failures occurring in the context of sending an outbound request and receiving the /// response. #[derive(Debug, Clone, thiserror::Error)] @@ -251,8 +254,14 @@ pub struct OutgoingResponse { /// Information stored about a pending request. struct PendingRequest { + /// The time when the request was sent to the libp2p request-response protocol. started_at: Instant, - response_tx: oneshot::Sender, ProtocolName), RequestFailure>>, + /// The channel to send the response back to the caller. + /// + /// This is wrapped in an `Option` to allow for the channel to be taken out + /// on force-detected timeouts. + response_tx: Option, ProtocolName), RequestFailure>>>, + /// Fallback request to send if the primary request fails. fallback_request: Option<(Vec, ProtocolName)>, } @@ -336,16 +345,20 @@ impl From<(ProtocolName, RequestId)> for ProtocolRequestId } } +/// Details of a request-response protocol. +struct ProtocolDetails { + behaviour: Behaviour, + inbound_queue: Option>, + request_timeout: Duration, +} + /// Implementation of `NetworkBehaviour` that provides support for request-response protocols. pub struct RequestResponsesBehaviour { /// The multiple sub-protocols, by name. /// /// Contains the underlying libp2p request-response [`Behaviour`], plus an optional /// "response builder" used to build responses for incoming requests. - protocols: HashMap< - ProtocolName, - (Behaviour, Option>), - >, + protocols: HashMap, /// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply. pending_requests: HashMap, PendingRequest>, @@ -365,6 +378,14 @@ pub struct RequestResponsesBehaviour { /// Primarily used to get a reputation of a node. peer_store: Arc, + + /// Interval to check that the requests are not taking too long. + /// + /// We had issues in the past where libp2p did not produce a timeout event in due time. + /// + /// For more details, see: + /// - + periodic_request_check: tokio::time::Interval, } /// Generated by the response builder and waiting to be processed. @@ -393,7 +414,7 @@ impl RequestResponsesBehaviour { ProtocolSupport::Outbound }; - let rq_rp = Behaviour::with_codec( + let behaviour = Behaviour::with_codec( GenericCodec { max_request_size: protocol.max_request_size, max_response_size: protocol.max_response_size, @@ -405,7 +426,11 @@ impl RequestResponsesBehaviour { ); match protocols.entry(protocol.name) { - Entry::Vacant(e) => e.insert((rq_rp, protocol.inbound_queue)), + Entry::Vacant(e) => e.insert(ProtocolDetails { + behaviour, + inbound_queue: protocol.inbound_queue, + request_timeout: protocol.request_timeout, + }), Entry::Occupied(e) => return Err(RegisterError::DuplicateProtocol(e.key().clone())), }; } @@ -417,6 +442,7 @@ impl RequestResponsesBehaviour { pending_responses_arrival_time: Default::default(), send_feedback: Default::default(), peer_store, + periodic_request_check: tokio::time::interval(PERIODIC_REQUEST_CHECK), }) } @@ -437,9 +463,11 @@ impl RequestResponsesBehaviour { ) { log::trace!(target: "sub-libp2p", "send request to {target} ({protocol_name:?}), {} bytes", request.len()); - if let Some((protocol, _)) = self.protocols.get_mut(protocol_name.deref()) { + if let Some(ProtocolDetails { behaviour, .. }) = + self.protocols.get_mut(protocol_name.deref()) + { Self::send_request_inner( - protocol, + behaviour, &mut self.pending_requests, target, protocol_name, @@ -474,7 +502,7 @@ impl RequestResponsesBehaviour { (protocol_name.to_string().into(), request_id).into(), PendingRequest { started_at: Instant::now(), - response_tx: pending_response, + response_tx: Some(pending_response), fallback_request, }, ); @@ -521,18 +549,19 @@ impl NetworkBehaviour for RequestResponsesBehaviour { local_addr: &Multiaddr, remote_addr: &Multiaddr, ) -> Result, ConnectionDenied> { - let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| { - if let Ok(handler) = r.handle_established_inbound_connection( - connection_id, - peer, - local_addr, - remote_addr, - ) { - Some((p.to_string(), handler)) - } else { - None - } - }); + let iter = + self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| { + if let Ok(handler) = behaviour.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) { + Some((p.to_string(), handler)) + } else { + None + } + }); Ok(MultiHandler::try_from_iter(iter).expect( "Protocols are in a HashMap and there can be at most one handler per protocol name, \ @@ -548,19 +577,20 @@ impl NetworkBehaviour for RequestResponsesBehaviour { role_override: Endpoint, port_use: PortUse, ) -> Result, ConnectionDenied> { - let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| { - if let Ok(handler) = r.handle_established_outbound_connection( - connection_id, - peer, - addr, - role_override, - port_use, - ) { - Some((p.to_string(), handler)) - } else { - None - } - }); + let iter = + self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| { + if let Ok(handler) = behaviour.handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + port_use, + ) { + Some((p.to_string(), handler)) + } else { + None + } + }); Ok(MultiHandler::try_from_iter(iter).expect( "Protocols are in a HashMap and there can be at most one handler per protocol name, \ @@ -569,8 +599,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour { } fn on_swarm_event(&mut self, event: FromSwarm) { - for (protocol, _) in self.protocols.values_mut() { - protocol.on_swarm_event(event); + for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() { + behaviour.on_swarm_event(event); } } @@ -581,8 +611,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour { event: THandlerOutEvent, ) { let p_name = event.0; - if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) { - return proto.on_connection_handler_event(peer_id, connection_id, event.1) + if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(p_name.as_str()) { + return behaviour.on_connection_handler_event(peer_id, connection_id, event.1) } else { log::warn!( target: "sub-libp2p", @@ -594,6 +624,51 @@ impl NetworkBehaviour for RequestResponsesBehaviour { fn poll(&mut self, cx: &mut Context) -> Poll>> { 'poll_all: loop { + // Poll the periodic request check. + if self.periodic_request_check.poll_tick(cx).is_ready() { + self.pending_requests.retain(|id, req| { + let Some(ProtocolDetails { request_timeout, .. }) = + self.protocols.get(&id.protocol) + else { + log::warn!( + target: "sub-libp2p", + "Request {id:?} has no protocol registered.", + ); + + if let Some(response_tx) = req.response_tx.take() { + if response_tx.send(Err(RequestFailure::UnknownProtocol)).is_err() { + log::debug!( + target: "sub-libp2p", + "Request {id:?} has no protocol registered. At the same time local node is no longer interested in the result.", + ); + } + } + return false + }; + + let elapsed = req.started_at.elapsed(); + if elapsed > *request_timeout { + log::debug!( + target: "sub-libp2p", + "Request {id:?} force detected as timeout.", + ); + + if let Some(response_tx) = req.response_tx.take() { + if response_tx.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).is_err() { + log::debug!( + target: "sub-libp2p", + "Request {id:?} force detected as timeout. At the same time local node is no longer interested in the result.", + ); + } + } + + false + } else { + true + } + }); + } + // Poll to see if any response is ready to be sent back. while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) { let RequestProcessingOutcome { @@ -610,10 +685,12 @@ impl NetworkBehaviour for RequestResponsesBehaviour { }; if let Ok(payload) = result { - if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) { + if let Some(ProtocolDetails { behaviour, .. }) = + self.protocols.get_mut(&*protocol_name) + { log::trace!(target: "sub-libp2p", "send response to {peer} ({protocol_name:?}), {} bytes", payload.len()); - if protocol.send_response(inner_channel, Ok(payload)).is_err() { + if behaviour.send_response(inner_channel, Ok(payload)).is_err() { // Note: Failure is handled further below when receiving // `InboundFailure` event from request-response [`Behaviour`]. log::debug!( @@ -641,7 +718,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour { let mut fallback_requests = vec![]; // Poll request-responses protocols. - for (protocol, (ref mut behaviour, ref mut resp_builder)) in &mut self.protocols { + for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols + { 'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx) { let ev = match ev { // Main events we are interested in. @@ -696,7 +774,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // Submit the request to the "response builder" passed by the user at // initialization. - if let Some(resp_builder) = resp_builder { + if let Some(resp_builder) = inbound_queue { // If the response builder is too busy, silently drop `tx`. This // will be reported by the corresponding request-response // [`Behaviour`] through an `InboundFailure::Omission` event. @@ -744,7 +822,11 @@ impl NetworkBehaviour for RequestResponsesBehaviour { .pending_requests .remove(&(protocol.clone(), request_id).into()) { - Some(PendingRequest { started_at, response_tx, .. }) => { + Some(PendingRequest { + started_at, + response_tx: Some(response_tx), + .. + }) => { log::trace!( target: "sub-libp2p", "received response from {peer} ({protocol:?}), {} bytes", @@ -760,13 +842,13 @@ impl NetworkBehaviour for RequestResponsesBehaviour { .map_err(|_| RequestFailure::Obsolete); (started_at, delivered) }, - None => { - log::warn!( + _ => { + log::debug!( target: "sub-libp2p", - "Received `RequestResponseEvent::Message` with unexpected request id {:?}", + "Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}", request_id, + peer, ); - debug_assert!(false); continue }, }; @@ -795,7 +877,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { { Some(PendingRequest { started_at, - response_tx, + response_tx: Some(response_tx), fallback_request, }) => { // Try using the fallback request if the protocol was not @@ -833,13 +915,14 @@ impl NetworkBehaviour for RequestResponsesBehaviour { } started_at }, - None => { - log::warn!( + _ => { + log::debug!( target: "sub-libp2p", - "Received `RequestResponseEvent::Message` with unexpected request id {:?}", + "Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}", request_id, + error, + peer ); - debug_assert!(false); continue }, }; @@ -904,7 +987,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // Send out fallback requests. for (peer, protocol, request, pending_response) in fallback_requests.drain(..) { - if let Some((behaviour, _)) = self.protocols.get_mut(&protocol) { + if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(&protocol) { Self::send_request_inner( behaviour, &mut self.pending_requests, @@ -1073,7 +1156,7 @@ mod tests { use crate::mock::MockPeerStore; use assert_matches::assert_matches; - use futures::{channel::oneshot, executor::LocalPool, task::Spawn}; + use futures::channel::oneshot; use libp2p::{ core::{ transport::{MemoryTransport, Transport}, @@ -1086,10 +1169,10 @@ mod tests { }; use std::{iter, time::Duration}; - struct TokioExecutor(tokio::runtime::Runtime); + struct TokioExecutor; impl Executor for TokioExecutor { fn exec(&self, f: Pin + Send>>) { - let _ = self.0.spawn(f); + tokio::spawn(f); } } @@ -1106,13 +1189,11 @@ mod tests { let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap(); - let runtime = tokio::runtime::Runtime::new().unwrap(); - let mut swarm = Swarm::new( transport, behaviour, keypair.public().to_peer_id(), - SwarmConfig::with_executor(TokioExecutor(runtime)) + SwarmConfig::with_executor(TokioExecutor {}) // This is taken care of by notification protocols in non-test environment // It is very slow in test environment for some reason, hence larger timeout .with_idle_connection_timeout(Duration::from_secs(10)), @@ -1125,34 +1206,27 @@ mod tests { (swarm, listen_addr) } - #[test] - fn basic_request_response_works() { + #[tokio::test] + async fn basic_request_response_works() { let protocol_name = ProtocolName::from("/test/req-resp/1"); - let mut pool = LocalPool::new(); // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. let mut swarms = (0..2) .map(|_| { let (tx, mut rx) = async_channel::bounded::(64); - pool.spawner() - .spawn_obj( - async move { - while let Some(rq) = rx.next().await { - let (fb_tx, fb_rx) = oneshot::channel(); - assert_eq!(rq.payload, b"this is a request"); - let _ = rq.pending_response.send(super::OutgoingResponse { - result: Ok(b"this is a response".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: Some(fb_tx), - }); - fb_rx.await.unwrap(); - } - } - .boxed() - .into(), - ) - .unwrap(); + tokio::spawn(async move { + while let Some(rq) = rx.next().await { + let (fb_tx, fb_rx) = oneshot::channel(); + assert_eq!(rq.payload, b"this is a request"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this is a response".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: Some(fb_tx), + }); + fb_rx.await.unwrap(); + } + }); let protocol_config = ProtocolConfig { name: protocol_name.clone(), @@ -1176,84 +1250,69 @@ mod tests { let (mut swarm, _) = swarms.remove(0); // Running `swarm[0]` in the background. - pool.spawner() - .spawn_obj({ - async move { - loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { - result.unwrap(); - }, - _ => {}, - } - } - } - .boxed() - .into() - }) - .unwrap(); - - // Remove and run the remaining swarm. - let (mut swarm, _) = swarms.remove(0); - pool.run_until(async move { - let mut response_receiver = None; - + tokio::spawn(async move { loop { match swarm.select_next_some().await { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - let (sender, receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - &peer_id, - protocol_name.clone(), - b"this is a request".to_vec(), - None, - sender, - IfDisconnected::ImmediateError, - ); - assert!(response_receiver.is_none()); - response_receiver = Some(receiver); - }, - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { result.unwrap(); - break }, _ => {}, } } - - assert_eq!( - response_receiver.unwrap().await.unwrap().unwrap(), - (b"this is a response".to_vec(), protocol_name) - ); }); + + // Remove and run the remaining swarm. + let (mut swarm, _) = swarms.remove(0); + let mut response_receiver = None; + + loop { + match swarm.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let (sender, receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + &peer_id, + protocol_name.clone(), + b"this is a request".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + assert!(response_receiver.is_none()); + response_receiver = Some(receiver); + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, + } + } + + assert_eq!( + response_receiver.unwrap().await.unwrap().unwrap(), + (b"this is a response".to_vec(), protocol_name) + ); } - #[test] - fn max_response_size_exceeded() { + #[tokio::test] + async fn max_response_size_exceeded() { let protocol_name = ProtocolName::from("/test/req-resp/1"); - let mut pool = LocalPool::new(); // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. let mut swarms = (0..2) .map(|_| { let (tx, mut rx) = async_channel::bounded::(64); - pool.spawner() - .spawn_obj( - async move { - while let Some(rq) = rx.next().await { - assert_eq!(rq.payload, b"this is a request"); - let _ = rq.pending_response.send(super::OutgoingResponse { - result: Ok(b"this response exceeds the limit".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: None, - }); - } - } - .boxed() - .into(), - ) - .unwrap(); + tokio::spawn(async move { + while let Some(rq) = rx.next().await { + assert_eq!(rq.payload, b"this is a request"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this response exceeds the limit".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }); + } + }); let protocol_config = ProtocolConfig { name: protocol_name.clone(), @@ -1278,59 +1337,52 @@ mod tests { // Running `swarm[0]` in the background until a `InboundRequest` event happens, // which is a hint about the test having ended. let (mut swarm, _) = swarms.remove(0); - pool.spawner() - .spawn_obj({ - async move { - loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { - assert!(result.is_ok()); - }, - SwarmEvent::ConnectionClosed { .. } => { - break; - }, - _ => {}, - } - } - } - .boxed() - .into() - }) - .unwrap(); - - // Remove and run the remaining swarm. - let (mut swarm, _) = swarms.remove(0); - pool.run_until(async move { - let mut response_receiver = None; - + tokio::spawn(async move { loop { match swarm.select_next_some().await { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - let (sender, receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - &peer_id, - protocol_name.clone(), - b"this is a request".to_vec(), - None, - sender, - IfDisconnected::ImmediateError, - ); - assert!(response_receiver.is_none()); - response_receiver = Some(receiver); + SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { + assert!(result.is_ok()); }, - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - assert!(result.is_err()); - break + SwarmEvent::ConnectionClosed { .. } => { + break; }, _ => {}, } } + }); + + // Remove and run the remaining swarm. + let (mut swarm, _) = swarms.remove(0); + + let mut response_receiver = None; - match response_receiver.unwrap().await.unwrap().unwrap_err() { - RequestFailure::Network(OutboundFailure::Io(_)) => {}, - request_failure => panic!("Unexpected failure: {request_failure:?}"), + loop { + match swarm.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let (sender, receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + &peer_id, + protocol_name.clone(), + b"this is a request".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + assert!(response_receiver.is_none()); + response_receiver = Some(receiver); + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + assert!(result.is_err()); + break + }, + _ => {}, } - }); + } + + match response_receiver.unwrap().await.unwrap().unwrap_err() { + RequestFailure::Network(OutboundFailure::Io(_)) => {}, + request_failure => panic!("Unexpected failure: {request_failure:?}"), + } } /// A `RequestId` is a unique identifier among either all inbound or all outbound requests for @@ -1343,11 +1395,10 @@ mod tests { /// without a `RequestId` collision. /// /// See [`ProtocolRequestId`] for additional information. - #[test] - fn request_id_collision() { + #[tokio::test] + async fn request_id_collision() { let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1"); let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1"); - let mut pool = LocalPool::new(); let mut swarm_1 = { let protocol_configs = vec![ @@ -1405,114 +1456,100 @@ mod tests { swarm_1.dial(listen_add_2).unwrap(); // Run swarm 2 in the background, receiving two requests. - pool.spawner() - .spawn_obj( - async move { - loop { - match swarm_2.select_next_some().await { - SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { - result.unwrap(); - }, - _ => {}, - } - } + tokio::spawn(async move { + loop { + match swarm_2.select_next_some().await { + SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { + result.unwrap(); + }, + _ => {}, } - .boxed() - .into(), - ) - .unwrap(); + } + }); // Handle both requests sent by swarm 1 to swarm 2 in the background. // // Make sure both requests overlap, by answering the first only after receiving the // second. - pool.spawner() - .spawn_obj( - async move { - let protocol_1_request = swarm_2_handler_1.next().await; - let protocol_2_request = swarm_2_handler_2.next().await; - - protocol_1_request - .unwrap() - .pending_response - .send(OutgoingResponse { - result: Ok(b"this is a response".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: None, - }) - .unwrap(); - protocol_2_request - .unwrap() - .pending_response - .send(OutgoingResponse { - result: Ok(b"this is a response".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: None, - }) - .unwrap(); - } - .boxed() - .into(), - ) - .unwrap(); + tokio::spawn(async move { + let protocol_1_request = swarm_2_handler_1.next().await; + let protocol_2_request = swarm_2_handler_2.next().await; + + protocol_1_request + .unwrap() + .pending_response + .send(OutgoingResponse { + result: Ok(b"this is a response".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }) + .unwrap(); + protocol_2_request + .unwrap() + .pending_response + .send(OutgoingResponse { + result: Ok(b"this is a response".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }) + .unwrap(); + }); // Have swarm 1 send two requests to swarm 2 and await responses. - pool.run_until(async move { - let mut response_receivers = None; - let mut num_responses = 0; - loop { - match swarm_1.select_next_some().await { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - let (sender_1, receiver_1) = oneshot::channel(); - let (sender_2, receiver_2) = oneshot::channel(); - swarm_1.behaviour_mut().send_request( - &peer_id, - protocol_name_1.clone(), - b"this is a request".to_vec(), - None, - sender_1, - IfDisconnected::ImmediateError, - ); - swarm_1.behaviour_mut().send_request( - &peer_id, - protocol_name_2.clone(), - b"this is a request".to_vec(), - None, - sender_2, - IfDisconnected::ImmediateError, - ); - assert!(response_receivers.is_none()); - response_receivers = Some((receiver_1, receiver_2)); - }, - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - num_responses += 1; - result.unwrap(); - if num_responses == 2 { - break - } - }, - _ => {}, - } + let mut response_receivers = None; + let mut num_responses = 0; + + loop { + match swarm_1.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let (sender_1, receiver_1) = oneshot::channel(); + let (sender_2, receiver_2) = oneshot::channel(); + swarm_1.behaviour_mut().send_request( + &peer_id, + protocol_name_1.clone(), + b"this is a request".to_vec(), + None, + sender_1, + IfDisconnected::ImmediateError, + ); + swarm_1.behaviour_mut().send_request( + &peer_id, + protocol_name_2.clone(), + b"this is a request".to_vec(), + None, + sender_2, + IfDisconnected::ImmediateError, + ); + assert!(response_receivers.is_none()); + response_receivers = Some((receiver_1, receiver_2)); + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + num_responses += 1; + result.unwrap(); + if num_responses == 2 { + break + } + }, + _ => {}, } - let (response_receiver_1, response_receiver_2) = response_receivers.unwrap(); - assert_eq!( - response_receiver_1.await.unwrap().unwrap(), - (b"this is a response".to_vec(), protocol_name_1) - ); - assert_eq!( - response_receiver_2.await.unwrap().unwrap(), - (b"this is a response".to_vec(), protocol_name_2) - ); - }); + } + let (response_receiver_1, response_receiver_2) = response_receivers.unwrap(); + assert_eq!( + response_receiver_1.await.unwrap().unwrap(), + (b"this is a response".to_vec(), protocol_name_1) + ); + assert_eq!( + response_receiver_2.await.unwrap().unwrap(), + (b"this is a response".to_vec(), protocol_name_2) + ); } - #[test] - fn request_fallback() { + #[tokio::test] + async fn request_fallback() { let protocol_name_1 = ProtocolName::from("/test/req-resp/2"); let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1"); let protocol_name_2 = ProtocolName::from("/test/another"); - let mut pool = LocalPool::new(); let protocol_config_1 = ProtocolConfig { name: protocol_name_1.clone(), @@ -1550,39 +1587,31 @@ mod tests { let mut protocol_config_2 = protocol_config_2.clone(); protocol_config_2.inbound_queue = Some(tx_2); - pool.spawner() - .spawn_obj( - async move { - for _ in 0..2 { - if let Some(rq) = rx_1.next().await { - let (fb_tx, fb_rx) = oneshot::channel(); - assert_eq!(rq.payload, b"request on protocol /test/req-resp/1"); - let _ = rq.pending_response.send(super::OutgoingResponse { - result: Ok( - b"this is a response on protocol /test/req-resp/1".to_vec() - ), - reputation_changes: Vec::new(), - sent_feedback: Some(fb_tx), - }); - fb_rx.await.unwrap(); - } - } - - if let Some(rq) = rx_2.next().await { - let (fb_tx, fb_rx) = oneshot::channel(); - assert_eq!(rq.payload, b"request on protocol /test/other"); - let _ = rq.pending_response.send(super::OutgoingResponse { - result: Ok(b"this is a response on protocol /test/other".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: Some(fb_tx), - }); - fb_rx.await.unwrap(); - } + tokio::spawn(async move { + for _ in 0..2 { + if let Some(rq) = rx_1.next().await { + let (fb_tx, fb_rx) = oneshot::channel(); + assert_eq!(rq.payload, b"request on protocol /test/req-resp/1"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this is a response on protocol /test/req-resp/1".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: Some(fb_tx), + }); + fb_rx.await.unwrap(); } - .boxed() - .into(), - ) - .unwrap(); + } + + if let Some(rq) = rx_2.next().await { + let (fb_tx, fb_rx) = oneshot::channel(); + assert_eq!(rq.payload, b"request on protocol /test/other"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this is a response on protocol /test/other".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: Some(fb_tx), + }); + fb_rx.await.unwrap(); + } + }); build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter()) }; @@ -1603,132 +1632,269 @@ mod tests { } // Running `older_swarm`` in the background. - pool.spawner() - .spawn_obj({ - async move { - loop { - _ = older_swarm.0.select_next_some().await; - } - } - .boxed() - .into() - }) - .unwrap(); + tokio::spawn(async move { + loop { + _ = older_swarm.0.select_next_some().await; + } + }); // Run the newer swarm. Attempt to make requests on all protocols. let (mut swarm, _) = new_swarm; let mut older_peer_id = None; - pool.run_until(async move { - let mut response_receiver = None; - // Try the new protocol with a fallback. - loop { - match swarm.select_next_some().await { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - older_peer_id = Some(peer_id); - let (sender, receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - &peer_id, - protocol_name_1.clone(), - b"request on protocol /test/req-resp/2".to_vec(), - Some(( - b"request on protocol /test/req-resp/1".to_vec(), - protocol_config_1_fallback.name.clone(), - )), - sender, - IfDisconnected::ImmediateError, - ); - response_receiver = Some(receiver); - }, - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - result.unwrap(); - break - }, - _ => {}, - } + let mut response_receiver = None; + // Try the new protocol with a fallback. + loop { + match swarm.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + older_peer_id = Some(peer_id); + let (sender, receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + &peer_id, + protocol_name_1.clone(), + b"request on protocol /test/req-resp/2".to_vec(), + Some(( + b"request on protocol /test/req-resp/1".to_vec(), + protocol_config_1_fallback.name.clone(), + )), + sender, + IfDisconnected::ImmediateError, + ); + response_receiver = Some(receiver); + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, } - assert_eq!( - response_receiver.unwrap().await.unwrap().unwrap(), - ( - b"this is a response on protocol /test/req-resp/1".to_vec(), - protocol_name_1_fallback.clone() - ) - ); - // Try the old protocol with a useless fallback. - let (sender, response_receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - older_peer_id.as_ref().unwrap(), - protocol_name_1_fallback.clone(), - b"request on protocol /test/req-resp/1".to_vec(), - Some(( - b"dummy request, will fail if processed".to_vec(), - protocol_config_1_fallback.name.clone(), - )), - sender, - IfDisconnected::ImmediateError, - ); - loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - result.unwrap(); - break - }, - _ => {}, - } + } + assert_eq!( + response_receiver.unwrap().await.unwrap().unwrap(), + ( + b"this is a response on protocol /test/req-resp/1".to_vec(), + protocol_name_1_fallback.clone() + ) + ); + // Try the old protocol with a useless fallback. + let (sender, response_receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + older_peer_id.as_ref().unwrap(), + protocol_name_1_fallback.clone(), + b"request on protocol /test/req-resp/1".to_vec(), + Some(( + b"dummy request, will fail if processed".to_vec(), + protocol_config_1_fallback.name.clone(), + )), + sender, + IfDisconnected::ImmediateError, + ); + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, } - assert_eq!( - response_receiver.await.unwrap().unwrap(), - ( - b"this is a response on protocol /test/req-resp/1".to_vec(), - protocol_name_1_fallback.clone() - ) - ); - // Try the new protocol with no fallback. Should fail. - let (sender, response_receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - older_peer_id.as_ref().unwrap(), - protocol_name_1.clone(), - b"request on protocol /test/req-resp-2".to_vec(), - None, - sender, - IfDisconnected::ImmediateError, - ); - loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - assert_matches!( - result.unwrap_err(), - RequestFailure::Network(OutboundFailure::UnsupportedProtocols) - ); - break - }, - _ => {}, - } + } + assert_eq!( + response_receiver.await.unwrap().unwrap(), + ( + b"this is a response on protocol /test/req-resp/1".to_vec(), + protocol_name_1_fallback.clone() + ) + ); + // Try the new protocol with no fallback. Should fail. + let (sender, response_receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + older_peer_id.as_ref().unwrap(), + protocol_name_1.clone(), + b"request on protocol /test/req-resp-2".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + assert_matches!( + result.unwrap_err(), + RequestFailure::Network(OutboundFailure::UnsupportedProtocols) + ); + break + }, + _ => {}, } - assert!(response_receiver.await.unwrap().is_err()); - // Try the other protocol with no fallback. - let (sender, response_receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - older_peer_id.as_ref().unwrap(), - protocol_name_2.clone(), - b"request on protocol /test/other".to_vec(), - None, - sender, - IfDisconnected::ImmediateError, - ); + } + assert!(response_receiver.await.unwrap().is_err()); + // Try the other protocol with no fallback. + let (sender, response_receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + older_peer_id.as_ref().unwrap(), + protocol_name_2.clone(), + b"request on protocol /test/other".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, + } + } + assert_eq!( + response_receiver.await.unwrap().unwrap(), + (b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone()) + ); + } + + /// This test ensures the `RequestResponsesBehaviour` propagates back the Request::Timeout error + /// even if the libp2p component hangs. + /// + /// For testing purposes, the communication happens on the `/test/req-resp/1` protocol. + /// + /// This is achieved by: + /// - Two swarms are connected, the first one is slow to respond and has the timeout set to 10 + /// seconds. The second swarm is configured with a timeout of 10 seconds in libp2p, however in + /// substrate this is set to 1 second. + /// + /// - The first swarm introduces a delay of 2 seconds before responding to the request. + /// + /// - The second swarm must enforce the 1 second timeout. + #[tokio::test] + async fn enforce_outbound_timeouts() { + const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); + const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(1); + + // These swarms only speaks protocol_name. + let protocol_name = ProtocolName::from("/test/req-resp/1"); + + let protocol_config = ProtocolConfig { + name: protocol_name.clone(), + fallback_names: Vec::new(), + max_request_size: 1024, + max_response_size: 1024 * 1024, + request_timeout: REQUEST_TIMEOUT, // <-- important for the test + inbound_queue: None, + }; + + // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. + let (mut first_swarm, _) = { + let (tx, mut rx) = async_channel::bounded::(64); + + tokio::spawn(async move { + if let Some(rq) = rx.next().await { + assert_eq!(rq.payload, b"this is a request"); + + // Sleep for more than `REQUEST_TIMEOUT_SHORT` and less than + // `REQUEST_TIMEOUT`. + tokio::time::sleep(REQUEST_TIMEOUT_SHORT * 2).await; + + // By the time the response is sent back, the second swarm + // received Timeout. + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"Second swarm already timedout".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }); + } + }); + + let mut protocol_config = protocol_config.clone(); + protocol_config.inbound_queue = Some(tx); + + build_swarm(iter::once(protocol_config)) + }; + + let (mut second_swarm, second_address) = { + let (tx, mut rx) = async_channel::bounded::(64); + + tokio::spawn(async move { + while let Some(rq) = rx.next().await { + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"This is the response".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }); + } + }); + let mut protocol_config = protocol_config.clone(); + protocol_config.inbound_queue = Some(tx); + + build_swarm(iter::once(protocol_config.clone())) + }; + // Modify the second swarm to have a shorter timeout. + second_swarm + .behaviour_mut() + .protocols + .get_mut(&protocol_name) + .unwrap() + .request_timeout = REQUEST_TIMEOUT_SHORT; + + // Ask first swarm to dial the second swarm. + { + Swarm::dial(&mut first_swarm, second_address).unwrap(); + } + + // Running the first swarm in the background until a `InboundRequest` event happens, + // which is a hint about the test having ended. + tokio::spawn(async move { loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - result.unwrap(); - break + let event = first_swarm.select_next_some().await; + match event { + SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { + assert!(result.is_ok()); + break; + }, + SwarmEvent::ConnectionClosed { .. } => { + break; }, _ => {}, } } - assert_eq!( - response_receiver.await.unwrap().unwrap(), - (b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone()) - ); }); + + // Run the second swarm. + // - on connection established send the request to the first swarm + // - expect to receive a timeout + let mut response_receiver = None; + loop { + let event = second_swarm.select_next_some().await; + + match event { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let (sender, receiver) = oneshot::channel(); + second_swarm.behaviour_mut().send_request( + &peer_id, + protocol_name.clone(), + b"this is a request".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + assert!(response_receiver.is_none()); + response_receiver = Some(receiver); + }, + SwarmEvent::ConnectionClosed { .. } => { + break; + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + assert!(result.is_err()); + break + }, + _ => {}, + } + } + + // Expect the timeout. + match response_receiver.unwrap().await.unwrap().unwrap_err() { + RequestFailure::Network(OutboundFailure::Timeout) => {}, + request_failure => panic!("Unexpected failure: {request_failure:?}"), + } } }