From 9f8ca213b2168a1eb1c0f852ac4bb44b5c266ad0 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 17 Jan 2025 11:57:00 +0200 Subject: [PATCH 01/19] net/req-resp: Introduce a new `ProtocolDetails` to hold protocol details Signed-off-by: Alexandru Vasile --- .../client/network/src/request_responses.rs | 99 +++++++++++-------- 1 file changed, 57 insertions(+), 42 deletions(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 5fe34c781378..c5c2e86fe4cd 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -336,16 +336,20 @@ impl From<(ProtocolName, RequestId)> for ProtocolRequestId } } +/// Details of a request-response protocol. +struct ProtocolDetails { + behaviour: Behaviour, + inbound_queue: Option>, + request_timeout: Duration, +} + /// Implementation of `NetworkBehaviour` that provides support for request-response protocols. pub struct RequestResponsesBehaviour { /// The multiple sub-protocols, by name. /// /// Contains the underlying libp2p request-response [`Behaviour`], plus an optional /// "response builder" used to build responses for incoming requests. - protocols: HashMap< - ProtocolName, - (Behaviour, Option>), - >, + protocols: HashMap, /// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply. pending_requests: HashMap, PendingRequest>, @@ -393,7 +397,7 @@ impl RequestResponsesBehaviour { ProtocolSupport::Outbound }; - let rq_rp = Behaviour::with_codec( + let behaviour = Behaviour::with_codec( GenericCodec { max_request_size: protocol.max_request_size, max_response_size: protocol.max_response_size, @@ -405,7 +409,11 @@ impl RequestResponsesBehaviour { ); match protocols.entry(protocol.name) { - Entry::Vacant(e) => e.insert((rq_rp, protocol.inbound_queue)), + Entry::Vacant(e) => e.insert(ProtocolDetails { + behaviour, + inbound_queue: protocol.inbound_queue, + request_timeout: protocol.request_timeout, + }), Entry::Occupied(e) => return Err(RegisterError::DuplicateProtocol(e.key().clone())), }; } @@ -437,9 +445,11 @@ impl RequestResponsesBehaviour { ) { log::trace!(target: "sub-libp2p", "send request to {target} ({protocol_name:?}), {} bytes", request.len()); - if let Some((protocol, _)) = self.protocols.get_mut(protocol_name.deref()) { + if let Some(ProtocolDetails { behaviour, .. }) = + self.protocols.get_mut(protocol_name.deref()) + { Self::send_request_inner( - protocol, + behaviour, &mut self.pending_requests, target, protocol_name, @@ -521,18 +531,19 @@ impl NetworkBehaviour for RequestResponsesBehaviour { local_addr: &Multiaddr, remote_addr: &Multiaddr, ) -> Result, ConnectionDenied> { - let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| { - if let Ok(handler) = r.handle_established_inbound_connection( - connection_id, - peer, - local_addr, - remote_addr, - ) { - Some((p.to_string(), handler)) - } else { - None - } - }); + let iter = + self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| { + if let Ok(handler) = behaviour.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) { + Some((p.to_string(), handler)) + } else { + None + } + }); Ok(MultiHandler::try_from_iter(iter).expect( "Protocols are in a HashMap and there can be at most one handler per protocol name, \ @@ -548,19 +559,20 @@ impl NetworkBehaviour for RequestResponsesBehaviour { role_override: Endpoint, port_use: PortUse, ) -> Result, ConnectionDenied> { - let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| { - if let Ok(handler) = r.handle_established_outbound_connection( - connection_id, - peer, - addr, - role_override, - port_use, - ) { - Some((p.to_string(), handler)) - } else { - None - } - }); + let iter = + self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| { + if let Ok(handler) = behaviour.handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + port_use, + ) { + Some((p.to_string(), handler)) + } else { + None + } + }); Ok(MultiHandler::try_from_iter(iter).expect( "Protocols are in a HashMap and there can be at most one handler per protocol name, \ @@ -569,8 +581,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour { } fn on_swarm_event(&mut self, event: FromSwarm) { - for (protocol, _) in self.protocols.values_mut() { - protocol.on_swarm_event(event); + for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() { + behaviour.on_swarm_event(event); } } @@ -581,8 +593,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour { event: THandlerOutEvent, ) { let p_name = event.0; - if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) { - return proto.on_connection_handler_event(peer_id, connection_id, event.1) + if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(p_name.as_str()) { + return behaviour.on_connection_handler_event(peer_id, connection_id, event.1) } else { log::warn!( target: "sub-libp2p", @@ -610,10 +622,12 @@ impl NetworkBehaviour for RequestResponsesBehaviour { }; if let Ok(payload) = result { - if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) { + if let Some(ProtocolDetails { behaviour, .. }) = + self.protocols.get_mut(&*protocol_name) + { log::trace!(target: "sub-libp2p", "send response to {peer} ({protocol_name:?}), {} bytes", payload.len()); - if protocol.send_response(inner_channel, Ok(payload)).is_err() { + if behaviour.send_response(inner_channel, Ok(payload)).is_err() { // Note: Failure is handled further below when receiving // `InboundFailure` event from request-response [`Behaviour`]. log::debug!( @@ -641,7 +655,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour { let mut fallback_requests = vec![]; // Poll request-responses protocols. - for (protocol, (ref mut behaviour, ref mut resp_builder)) in &mut self.protocols { + for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols + { 'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx) { let ev = match ev { // Main events we are interested in. @@ -696,7 +711,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // Submit the request to the "response builder" passed by the user at // initialization. - if let Some(resp_builder) = resp_builder { + if let Some(resp_builder) = inbound_queue { // If the response builder is too busy, silently drop `tx`. This // will be reported by the corresponding request-response // [`Behaviour`] through an `InboundFailure::Omission` event. @@ -904,7 +919,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // Send out fallback requests. for (peer, protocol, request, pending_response) in fallback_requests.drain(..) { - if let Some((behaviour, _)) = self.protocols.get_mut(&protocol) { + if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(&protocol) { Self::send_request_inner( behaviour, &mut self.pending_requests, From 190fe8989653a98a7754897486e6ea3db57cc2ae Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 17 Jan 2025 13:48:06 +0200 Subject: [PATCH 02/19] net/req-resp: Evict timeout requests Signed-off-by: Alexandru Vasile --- .../client/network/src/request_responses.rs | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index c5c2e86fe4cd..d0f28c6bb514 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -64,6 +64,9 @@ use std::{ pub use libp2p::request_response::{Config, InboundRequestId, OutboundRequestId}; +/// Periodically check if requests are taking too long. +const PERIODIC_REQUEST_CHECK: Duration = Duration::from_secs(2); + /// Possible failures occurring in the context of sending an outbound request and receiving the /// response. #[derive(Debug, Clone, thiserror::Error)] @@ -369,6 +372,14 @@ pub struct RequestResponsesBehaviour { /// Primarily used to get a reputation of a node. peer_store: Arc, + + /// Interval to check that the requests are not taking too long. + /// + /// We had issues in the past where libp2p did not produce a timeout event in due time. + /// + /// For more details, see: + /// - https://github.com/paritytech/polkadot-sdk/issues/7076#issuecomment-2596085096 + periodic_request_check: tokio::time::Interval, } /// Generated by the response builder and waiting to be processed. @@ -425,6 +436,7 @@ impl RequestResponsesBehaviour { pending_responses_arrival_time: Default::default(), send_feedback: Default::default(), peer_store, + periodic_request_check: tokio::time::interval(PERIODIC_REQUEST_CHECK), }) } @@ -606,6 +618,60 @@ impl NetworkBehaviour for RequestResponsesBehaviour { fn poll(&mut self, cx: &mut Context) -> Poll>> { 'poll_all: loop { + // Poll the periodic request check. + if let Poll::Ready(_) = self.periodic_request_check.poll_tick(cx) { + // First pass is needed to collect timed out requests and + // submit `OutboundFailure::Timeout` on the moved oneshot sender. + let timedout_requests: Vec<_> = self + .pending_requests + .iter() + .filter_map(|(id, req)| { + let Some(ProtocolDetails { request_timeout, .. }) = + self.protocols.get(&id.protocol) + else { + log::warn!( + target: "sub-libp2p", + "Request with id {:?} has no protocol registered.", + id.request_id, + ); + return Some(id.clone()) + }; + + let elapsed = req.started_at.elapsed(); + if elapsed > *request_timeout { + Some(id.clone()) + } else { + None + } + }) + .collect(); + + if !timedout_requests.is_empty() { + log::warn!( + target: "sub-libp2p", + "Requests {timedout_requests:?} force detected as timeout.", + ); + } + + for id in timedout_requests { + let Some(req) = self.pending_requests.remove(&id) else { + continue; + }; + + if req + .response_tx + .send(Err(RequestFailure::Network(OutboundFailure::Timeout))) + .is_err() + { + log::debug!( + target: "sub-libp2p", + "Request with id {id:?} force detected as timeout. At the same time local \ + node is no longer interested in the result.", + ); + } + } + } + // Poll to see if any response is ready to be sent back. while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) { let RequestProcessingOutcome { From c3b01dac9a1f582b7e5724534fb4325e72bcb11e Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 17 Jan 2025 14:01:54 +0200 Subject: [PATCH 03/19] Add prdoc Signed-off-by: Alexandru Vasile --- prdoc/pr_7222.prdoc | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 prdoc/pr_7222.prdoc diff --git a/prdoc/pr_7222.prdoc b/prdoc/pr_7222.prdoc new file mode 100644 index 000000000000..ab6b7d6069b4 --- /dev/null +++ b/prdoc/pr_7222.prdoc @@ -0,0 +1,19 @@ +title: Enforce libp2p outbound request-response timeout limits + +doc: + - audience: Node Dev + description: | + This PR enforces that outbound requests are finished within the specified protocol timeout. + The stable2412 version running libp2p 0.52.4 contains a bug which does not track request timeouts properly: + https://github.com/libp2p/rust-libp2p/pull/5429. + + The issue has been detected while submitting libp2p -> litep2p requests in kusama. + This aims to check that pending outbound requests have not timedout. + Although the issue has been fixed in libp2p, there might be other cases where this may happen. + For example: https://github.com/libp2p/rust-libp2p/pull/5417. + + For more context see: https://github.com/paritytech/polkadot-sdk/issues/7076#issuecomment-2596085096. + +crates: +- name: sc-network + bump: patch From 48f01271ed5c199744fc59d7d64654a602e76789 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 17 Jan 2025 17:14:24 +0200 Subject: [PATCH 04/19] net/req-resp: Fix cargo docs Signed-off-by: Alexandru Vasile --- substrate/client/network/src/request_responses.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index d0f28c6bb514..ebb5133c551f 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -378,7 +378,7 @@ pub struct RequestResponsesBehaviour { /// We had issues in the past where libp2p did not produce a timeout event in due time. /// /// For more details, see: - /// - https://github.com/paritytech/polkadot-sdk/issues/7076#issuecomment-2596085096 + /// - periodic_request_check: tokio::time::Interval, } From f8e89f4b579cce590f427b3826cdab5de66675cc Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 17 Jan 2025 17:18:03 +0200 Subject: [PATCH 05/19] net/req-resp: Adjust logs Signed-off-by: Alexandru Vasile --- substrate/client/network/src/request_responses.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index ebb5133c551f..e0cd7c5a67ac 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -631,8 +631,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { else { log::warn!( target: "sub-libp2p", - "Request with id {:?} has no protocol registered.", - id.request_id, + "Request {id:?} has no protocol registered.", ); return Some(id.clone()) }; @@ -665,7 +664,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { { log::debug!( target: "sub-libp2p", - "Request with id {id:?} force detected as timeout. At the same time local \ + "Request {id:?} force detected as timeout. At the same time local \ node is no longer interested in the result.", ); } From 050c166e5c5dfb9cc22d3cd4554b9b5d81d8de02 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 17 Jan 2025 17:30:41 +0200 Subject: [PATCH 06/19] Adjust PRdoc Signed-off-by: Alexandru Vasile --- prdoc/pr_7222.prdoc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/prdoc/pr_7222.prdoc b/prdoc/pr_7222.prdoc index ab6b7d6069b4..40b89b0a1820 100644 --- a/prdoc/pr_7222.prdoc +++ b/prdoc/pr_7222.prdoc @@ -4,15 +4,15 @@ doc: - audience: Node Dev description: | This PR enforces that outbound requests are finished within the specified protocol timeout. - The stable2412 version running libp2p 0.52.4 contains a bug which does not track request timeouts properly: + The stable2412 version running libp2p 0.52.4 contains a bug which does not track request timeouts properly https://github.com/libp2p/rust-libp2p/pull/5429. - The issue has been detected while submitting libp2p -> litep2p requests in kusama. - This aims to check that pending outbound requests have not timedout. + The issue has been detected while submitting libp2p to litep2p requests in Kusama. + This aims to check that pending outbound requests have not timed out. Although the issue has been fixed in libp2p, there might be other cases where this may happen. - For example: https://github.com/libp2p/rust-libp2p/pull/5417. + For example, https://github.com/libp2p/rust-libp2p/pull/5417. - For more context see: https://github.com/paritytech/polkadot-sdk/issues/7076#issuecomment-2596085096. + For more context see https://github.com/paritytech/polkadot-sdk/issues/7076#issuecomment-2596085096. crates: - name: sc-network From 13d00947e6a819432cca3c06593f116c9be05d71 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 20 Jan 2025 12:40:17 +0200 Subject: [PATCH 07/19] req-resp/tests: Add test to ensure timouts are forced Signed-off-by: Alexandru Vasile --- .../client/network/src/request_responses.rs | 156 +++++++++++++++++- 1 file changed, 149 insertions(+), 7 deletions(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index e0cd7c5a67ac..79a3fa957081 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -846,7 +846,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { "Received `RequestResponseEvent::Message` with unexpected request id {:?}", request_id, ); - debug_assert!(false); + // debug_assert!(false); continue }, }; @@ -919,7 +919,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { "Received `RequestResponseEvent::Message` with unexpected request id {:?}", request_id, ); - debug_assert!(false); + // debug_assert!(false); continue }, }; @@ -1166,10 +1166,10 @@ mod tests { }; use std::{iter, time::Duration}; - struct TokioExecutor(tokio::runtime::Runtime); + struct TokioExecutor; impl Executor for TokioExecutor { fn exec(&self, f: Pin + Send>>) { - let _ = self.0.spawn(f); + tokio::spawn(f); } } @@ -1186,13 +1186,11 @@ mod tests { let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap(); - let runtime = tokio::runtime::Runtime::new().unwrap(); - let mut swarm = Swarm::new( transport, behaviour, keypair.public().to_peer_id(), - SwarmConfig::with_executor(TokioExecutor(runtime)) + SwarmConfig::with_executor(TokioExecutor {}) // This is taken care of by notification protocols in non-test environment // It is very slow in test environment for some reason, hence larger timeout .with_idle_connection_timeout(Duration::from_secs(10)), @@ -1811,4 +1809,148 @@ mod tests { ); }); } + + /// This test ensures the `RequestResponsesBehaviour` propagates back the Request::Timeout error + /// even if the libp2p component hangs. + /// + /// For testing purposes, the communication happens on the `/test/req-resp/1` protocol. + /// + /// This is achieved by: + /// - Two swarms are connected, the first one is slow to respond and has the timeout set to 60 + /// seconds. The second swarm is configured with a timeout of 60 seconds in libp2p, however in + /// substrate this is set to 5 seconds. + /// + /// - The first swarm introduces a delay of 10 seconds before responding to the request. + /// + /// - The second swarm must enforce the 5 seconds timeout. + #[tokio::test] + async fn enforce_outbound_timeouts() { + const REQUEST_TIMEOUT: Duration = Duration::from_secs(60); + const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(5); + + // These swarms only speaks protocol_name. + let protocol_name = ProtocolName::from("/test/req-resp/1"); + + let protocol_config = ProtocolConfig { + name: protocol_name.clone(), + fallback_names: Vec::new(), + max_request_size: 1024, + max_response_size: 1024 * 1024, + request_timeout: REQUEST_TIMEOUT, // <-- important for the test + inbound_queue: None, + }; + + // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. + let (mut first_swarm, _) = { + let (tx, mut rx) = async_channel::bounded::(64); + + tokio::spawn(async move { + while let Some(rq) = rx.next().await { + assert_eq!(rq.payload, b"this is a request"); + + // Sleep for more than `REQUEST_TIMEOUT_SHORT` and less than + // `REQUEST_TIMEOUT`. + tokio::time::sleep(REQUEST_TIMEOUT_SHORT * 2).await; + + // By the time the response is sent back, the second swarm + // received Timeout. + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"Second swarm already timedout".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }); + } + }); + + let mut protocol_config = protocol_config.clone(); + protocol_config.inbound_queue = Some(tx); + + build_swarm(iter::once(protocol_config)) + }; + + let (mut second_swarm, second_address) = { + let (tx, mut rx) = async_channel::bounded::(64); + + tokio::spawn(async move { + while let Some(rq) = rx.next().await { + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"This is the response".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }); + } + }); + let mut protocol_config = protocol_config.clone(); + protocol_config.inbound_queue = Some(tx); + + build_swarm(iter::once(protocol_config.clone())) + }; + // Modify the second swarm to have a shorter timeout. + second_swarm + .behaviour_mut() + .protocols + .get_mut(&protocol_name) + .unwrap() + .request_timeout = REQUEST_TIMEOUT_SHORT; + + // Ask first swarm to dial the second swarm. + { + Swarm::dial(&mut first_swarm, second_address).unwrap(); + } + + // Running the first swarm in the background until a `InboundRequest` event happens, + // which is a hint about the test having ended. + tokio::spawn(async move { + loop { + let event = first_swarm.select_next_some().await; + match event { + SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { + assert!(result.is_ok()); + }, + SwarmEvent::ConnectionClosed { .. } => { + break; + }, + _ => {}, + } + } + }); + + // Run the second swarm. + // - on connection established send the request to the first swarm + // - expect to receive a timeout + let mut response_receiver = None; + loop { + let event = second_swarm.select_next_some().await; + + match event { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let (sender, receiver) = oneshot::channel(); + second_swarm.behaviour_mut().send_request( + &peer_id, + protocol_name.clone(), + b"this is a request".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + assert!(response_receiver.is_none()); + response_receiver = Some(receiver); + }, + SwarmEvent::ConnectionClosed { .. } => { + break; + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + assert!(result.is_err()); + break + }, + _ => {}, + } + } + + // Expect the timeout. + match response_receiver.unwrap().await.unwrap().unwrap_err() { + RequestFailure::Network(OutboundFailure::Timeout) => {}, + request_failure => panic!("Unexpected failure: {request_failure:?}"), + } + } } From e862829c058a9c9088b30034d7b4b9e9b302cdf4 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 20 Jan 2025 12:41:05 +0200 Subject: [PATCH 08/19] req-resp: Remove debug asserts as libp2p might respond later Signed-off-by: Alexandru Vasile --- substrate/client/network/src/request_responses.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 79a3fa957081..5931d05c43bf 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -846,7 +846,6 @@ impl NetworkBehaviour for RequestResponsesBehaviour { "Received `RequestResponseEvent::Message` with unexpected request id {:?}", request_id, ); - // debug_assert!(false); continue }, }; @@ -919,7 +918,6 @@ impl NetworkBehaviour for RequestResponsesBehaviour { "Received `RequestResponseEvent::Message` with unexpected request id {:?}", request_id, ); - // debug_assert!(false); continue }, }; From 1ec465728a8df8d564156c34b9d1dd94d4d2b9ad Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 20 Jan 2025 12:43:54 +0200 Subject: [PATCH 09/19] req-resp/tests: Refactor basic_request_response_works to use tokio Signed-off-by: Alexandru Vasile --- .../client/network/src/request_responses.rs | 111 ++++++++---------- 1 file changed, 48 insertions(+), 63 deletions(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 5931d05c43bf..5c18b3bd4bbb 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -1201,34 +1201,27 @@ mod tests { (swarm, listen_addr) } - #[test] - fn basic_request_response_works() { + #[tokio::test] + async fn basic_request_response_works() { let protocol_name = ProtocolName::from("/test/req-resp/1"); - let mut pool = LocalPool::new(); // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. let mut swarms = (0..2) .map(|_| { let (tx, mut rx) = async_channel::bounded::(64); - pool.spawner() - .spawn_obj( - async move { - while let Some(rq) = rx.next().await { - let (fb_tx, fb_rx) = oneshot::channel(); - assert_eq!(rq.payload, b"this is a request"); - let _ = rq.pending_response.send(super::OutgoingResponse { - result: Ok(b"this is a response".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: Some(fb_tx), - }); - fb_rx.await.unwrap(); - } - } - .boxed() - .into(), - ) - .unwrap(); + tokio::spawn(async move { + while let Some(rq) = rx.next().await { + let (fb_tx, fb_rx) = oneshot::channel(); + assert_eq!(rq.payload, b"this is a request"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this is a response".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: Some(fb_tx), + }); + fb_rx.await.unwrap(); + } + }); let protocol_config = ProtocolConfig { name: protocol_name.clone(), @@ -1252,56 +1245,48 @@ mod tests { let (mut swarm, _) = swarms.remove(0); // Running `swarm[0]` in the background. - pool.spawner() - .spawn_obj({ - async move { - loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { - result.unwrap(); - }, - _ => {}, - } - } - } - .boxed() - .into() - }) - .unwrap(); - - // Remove and run the remaining swarm. - let (mut swarm, _) = swarms.remove(0); - pool.run_until(async move { - let mut response_receiver = None; - + tokio::spawn(async move { loop { match swarm.select_next_some().await { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - let (sender, receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - &peer_id, - protocol_name.clone(), - b"this is a request".to_vec(), - None, - sender, - IfDisconnected::ImmediateError, - ); - assert!(response_receiver.is_none()); - response_receiver = Some(receiver); - }, - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { result.unwrap(); - break }, _ => {}, } } - - assert_eq!( - response_receiver.unwrap().await.unwrap().unwrap(), - (b"this is a response".to_vec(), protocol_name) - ); }); + + // Remove and run the remaining swarm. + let (mut swarm, _) = swarms.remove(0); + let mut response_receiver = None; + + loop { + match swarm.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let (sender, receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + &peer_id, + protocol_name.clone(), + b"this is a request".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + assert!(response_receiver.is_none()); + response_receiver = Some(receiver); + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, + } + } + + assert_eq!( + response_receiver.unwrap().await.unwrap().unwrap(), + (b"this is a response".to_vec(), protocol_name) + ); } #[test] From caa335dbb3b265188622e0deefaccf8060defd5e Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 20 Jan 2025 12:45:12 +0200 Subject: [PATCH 10/19] req-resp/tests: Refactor max_response_size_exceeded to use tokio Signed-off-by: Alexandru Vasile --- .../client/network/src/request_responses.rs | 112 ++++++++---------- 1 file changed, 49 insertions(+), 63 deletions(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 5c18b3bd4bbb..1375bd7460f9 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -1289,32 +1289,25 @@ mod tests { ); } - #[test] - fn max_response_size_exceeded() { + #[tokio::test] + async fn max_response_size_exceeded() { let protocol_name = ProtocolName::from("/test/req-resp/1"); - let mut pool = LocalPool::new(); // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. let mut swarms = (0..2) .map(|_| { let (tx, mut rx) = async_channel::bounded::(64); - pool.spawner() - .spawn_obj( - async move { - while let Some(rq) = rx.next().await { - assert_eq!(rq.payload, b"this is a request"); - let _ = rq.pending_response.send(super::OutgoingResponse { - result: Ok(b"this response exceeds the limit".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: None, - }); - } - } - .boxed() - .into(), - ) - .unwrap(); + tokio::spawn(async move { + while let Some(rq) = rx.next().await { + assert_eq!(rq.payload, b"this is a request"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this response exceeds the limit".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }); + } + }); let protocol_config = ProtocolConfig { name: protocol_name.clone(), @@ -1339,59 +1332,52 @@ mod tests { // Running `swarm[0]` in the background until a `InboundRequest` event happens, // which is a hint about the test having ended. let (mut swarm, _) = swarms.remove(0); - pool.spawner() - .spawn_obj({ - async move { - loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { - assert!(result.is_ok()); - }, - SwarmEvent::ConnectionClosed { .. } => { - break; - }, - _ => {}, - } - } - } - .boxed() - .into() - }) - .unwrap(); - - // Remove and run the remaining swarm. - let (mut swarm, _) = swarms.remove(0); - pool.run_until(async move { - let mut response_receiver = None; - + tokio::spawn(async move { loop { match swarm.select_next_some().await { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - let (sender, receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - &peer_id, - protocol_name.clone(), - b"this is a request".to_vec(), - None, - sender, - IfDisconnected::ImmediateError, - ); - assert!(response_receiver.is_none()); - response_receiver = Some(receiver); + SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { + assert!(result.is_ok()); }, - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - assert!(result.is_err()); - break + SwarmEvent::ConnectionClosed { .. } => { + break; }, _ => {}, } } + }); + + // Remove and run the remaining swarm. + let (mut swarm, _) = swarms.remove(0); + + let mut response_receiver = None; - match response_receiver.unwrap().await.unwrap().unwrap_err() { - RequestFailure::Network(OutboundFailure::Io(_)) => {}, - request_failure => panic!("Unexpected failure: {request_failure:?}"), + loop { + match swarm.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let (sender, receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + &peer_id, + protocol_name.clone(), + b"this is a request".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + assert!(response_receiver.is_none()); + response_receiver = Some(receiver); + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + assert!(result.is_err()); + break + }, + _ => {}, } - }); + } + + match response_receiver.unwrap().await.unwrap().unwrap_err() { + RequestFailure::Network(OutboundFailure::Io(_)) => {}, + request_failure => panic!("Unexpected failure: {request_failure:?}"), + } } /// A `RequestId` is a unique identifier among either all inbound or all outbound requests for From f992c9c0e8e84c81e2f26be3d627daaa43f9a5cf Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 20 Jan 2025 12:46:53 +0200 Subject: [PATCH 11/19] req-resp/tests: Refactor request_id_collision to use tokio Signed-off-by: Alexandru Vasile --- .../client/network/src/request_responses.rs | 172 ++++++++---------- 1 file changed, 79 insertions(+), 93 deletions(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 1375bd7460f9..6103336757e7 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -1390,11 +1390,10 @@ mod tests { /// without a `RequestId` collision. /// /// See [`ProtocolRequestId`] for additional information. - #[test] - fn request_id_collision() { + #[tokio::test] + async fn request_id_collision() { let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1"); let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1"); - let mut pool = LocalPool::new(); let mut swarm_1 = { let protocol_configs = vec![ @@ -1452,106 +1451,93 @@ mod tests { swarm_1.dial(listen_add_2).unwrap(); // Run swarm 2 in the background, receiving two requests. - pool.spawner() - .spawn_obj( - async move { - loop { - match swarm_2.select_next_some().await { - SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { - result.unwrap(); - }, - _ => {}, - } - } + tokio::spawn(async move { + loop { + match swarm_2.select_next_some().await { + SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { + result.unwrap(); + }, + _ => {}, } - .boxed() - .into(), - ) - .unwrap(); + } + }); // Handle both requests sent by swarm 1 to swarm 2 in the background. // // Make sure both requests overlap, by answering the first only after receiving the // second. - pool.spawner() - .spawn_obj( - async move { - let protocol_1_request = swarm_2_handler_1.next().await; - let protocol_2_request = swarm_2_handler_2.next().await; - - protocol_1_request - .unwrap() - .pending_response - .send(OutgoingResponse { - result: Ok(b"this is a response".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: None, - }) - .unwrap(); - protocol_2_request - .unwrap() - .pending_response - .send(OutgoingResponse { - result: Ok(b"this is a response".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: None, - }) - .unwrap(); - } - .boxed() - .into(), - ) - .unwrap(); + tokio::spawn(async move { + let protocol_1_request = swarm_2_handler_1.next().await; + let protocol_2_request = swarm_2_handler_2.next().await; + + protocol_1_request + .unwrap() + .pending_response + .send(OutgoingResponse { + result: Ok(b"this is a response".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }) + .unwrap(); + protocol_2_request + .unwrap() + .pending_response + .send(OutgoingResponse { + result: Ok(b"this is a response".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: None, + }) + .unwrap(); + }); // Have swarm 1 send two requests to swarm 2 and await responses. - pool.run_until(async move { - let mut response_receivers = None; - let mut num_responses = 0; - loop { - match swarm_1.select_next_some().await { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - let (sender_1, receiver_1) = oneshot::channel(); - let (sender_2, receiver_2) = oneshot::channel(); - swarm_1.behaviour_mut().send_request( - &peer_id, - protocol_name_1.clone(), - b"this is a request".to_vec(), - None, - sender_1, - IfDisconnected::ImmediateError, - ); - swarm_1.behaviour_mut().send_request( - &peer_id, - protocol_name_2.clone(), - b"this is a request".to_vec(), - None, - sender_2, - IfDisconnected::ImmediateError, - ); - assert!(response_receivers.is_none()); - response_receivers = Some((receiver_1, receiver_2)); - }, - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - num_responses += 1; - result.unwrap(); - if num_responses == 2 { - break - } - }, - _ => {}, - } + let mut response_receivers = None; + let mut num_responses = 0; + + loop { + match swarm_1.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let (sender_1, receiver_1) = oneshot::channel(); + let (sender_2, receiver_2) = oneshot::channel(); + swarm_1.behaviour_mut().send_request( + &peer_id, + protocol_name_1.clone(), + b"this is a request".to_vec(), + None, + sender_1, + IfDisconnected::ImmediateError, + ); + swarm_1.behaviour_mut().send_request( + &peer_id, + protocol_name_2.clone(), + b"this is a request".to_vec(), + None, + sender_2, + IfDisconnected::ImmediateError, + ); + assert!(response_receivers.is_none()); + response_receivers = Some((receiver_1, receiver_2)); + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + num_responses += 1; + result.unwrap(); + if num_responses == 2 { + break + } + }, + _ => {}, } - let (response_receiver_1, response_receiver_2) = response_receivers.unwrap(); - assert_eq!( - response_receiver_1.await.unwrap().unwrap(), - (b"this is a response".to_vec(), protocol_name_1) - ); - assert_eq!( - response_receiver_2.await.unwrap().unwrap(), - (b"this is a response".to_vec(), protocol_name_2) - ); - }); + } + let (response_receiver_1, response_receiver_2) = response_receivers.unwrap(); + assert_eq!( + response_receiver_1.await.unwrap().unwrap(), + (b"this is a response".to_vec(), protocol_name_1) + ); + assert_eq!( + response_receiver_2.await.unwrap().unwrap(), + (b"this is a response".to_vec(), protocol_name_2) + ); } #[test] From 25891cce6191b63e6772dc1a9e72998c092a70fb Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 20 Jan 2025 12:58:45 +0200 Subject: [PATCH 12/19] req-resp/tests: Refactor request_fallback to use tokio Signed-off-by: Alexandru Vasile --- .../client/network/src/request_responses.rs | 291 +++++++++--------- 1 file changed, 137 insertions(+), 154 deletions(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 6103336757e7..8e42047a1fce 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -1151,7 +1151,7 @@ mod tests { use crate::mock::MockPeerStore; use assert_matches::assert_matches; - use futures::{channel::oneshot, executor::LocalPool, task::Spawn}; + use futures::channel::oneshot; use libp2p::{ core::{ transport::{MemoryTransport, Transport}, @@ -1540,12 +1540,11 @@ mod tests { ); } - #[test] - fn request_fallback() { + #[tokio::test] + async fn request_fallback() { let protocol_name_1 = ProtocolName::from("/test/req-resp/2"); let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1"); let protocol_name_2 = ProtocolName::from("/test/another"); - let mut pool = LocalPool::new(); let protocol_config_1 = ProtocolConfig { name: protocol_name_1.clone(), @@ -1583,39 +1582,31 @@ mod tests { let mut protocol_config_2 = protocol_config_2.clone(); protocol_config_2.inbound_queue = Some(tx_2); - pool.spawner() - .spawn_obj( - async move { - for _ in 0..2 { - if let Some(rq) = rx_1.next().await { - let (fb_tx, fb_rx) = oneshot::channel(); - assert_eq!(rq.payload, b"request on protocol /test/req-resp/1"); - let _ = rq.pending_response.send(super::OutgoingResponse { - result: Ok( - b"this is a response on protocol /test/req-resp/1".to_vec() - ), - reputation_changes: Vec::new(), - sent_feedback: Some(fb_tx), - }); - fb_rx.await.unwrap(); - } - } - - if let Some(rq) = rx_2.next().await { - let (fb_tx, fb_rx) = oneshot::channel(); - assert_eq!(rq.payload, b"request on protocol /test/other"); - let _ = rq.pending_response.send(super::OutgoingResponse { - result: Ok(b"this is a response on protocol /test/other".to_vec()), - reputation_changes: Vec::new(), - sent_feedback: Some(fb_tx), - }); - fb_rx.await.unwrap(); - } + tokio::spawn(async move { + for _ in 0..2 { + if let Some(rq) = rx_1.next().await { + let (fb_tx, fb_rx) = oneshot::channel(); + assert_eq!(rq.payload, b"request on protocol /test/req-resp/1"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this is a response on protocol /test/req-resp/1".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: Some(fb_tx), + }); + fb_rx.await.unwrap(); } - .boxed() - .into(), - ) - .unwrap(); + } + + if let Some(rq) = rx_2.next().await { + let (fb_tx, fb_rx) = oneshot::channel(); + assert_eq!(rq.payload, b"request on protocol /test/other"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this is a response on protocol /test/other".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: Some(fb_tx), + }); + fb_rx.await.unwrap(); + } + }); build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter()) }; @@ -1636,133 +1627,125 @@ mod tests { } // Running `older_swarm`` in the background. - pool.spawner() - .spawn_obj({ - async move { - loop { - _ = older_swarm.0.select_next_some().await; - } - } - .boxed() - .into() - }) - .unwrap(); + tokio::spawn(async move { + loop { + _ = older_swarm.0.select_next_some().await; + } + }); // Run the newer swarm. Attempt to make requests on all protocols. let (mut swarm, _) = new_swarm; let mut older_peer_id = None; - pool.run_until(async move { - let mut response_receiver = None; - // Try the new protocol with a fallback. - loop { - match swarm.select_next_some().await { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - older_peer_id = Some(peer_id); - let (sender, receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - &peer_id, - protocol_name_1.clone(), - b"request on protocol /test/req-resp/2".to_vec(), - Some(( - b"request on protocol /test/req-resp/1".to_vec(), - protocol_config_1_fallback.name.clone(), - )), - sender, - IfDisconnected::ImmediateError, - ); - response_receiver = Some(receiver); - }, - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - result.unwrap(); - break - }, - _ => {}, - } + let mut response_receiver = None; + // Try the new protocol with a fallback. + loop { + match swarm.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + older_peer_id = Some(peer_id); + let (sender, receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + &peer_id, + protocol_name_1.clone(), + b"request on protocol /test/req-resp/2".to_vec(), + Some(( + b"request on protocol /test/req-resp/1".to_vec(), + protocol_config_1_fallback.name.clone(), + )), + sender, + IfDisconnected::ImmediateError, + ); + response_receiver = Some(receiver); + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, } - assert_eq!( - response_receiver.unwrap().await.unwrap().unwrap(), - ( - b"this is a response on protocol /test/req-resp/1".to_vec(), - protocol_name_1_fallback.clone() - ) - ); - // Try the old protocol with a useless fallback. - let (sender, response_receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - older_peer_id.as_ref().unwrap(), - protocol_name_1_fallback.clone(), - b"request on protocol /test/req-resp/1".to_vec(), - Some(( - b"dummy request, will fail if processed".to_vec(), - protocol_config_1_fallback.name.clone(), - )), - sender, - IfDisconnected::ImmediateError, - ); - loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - result.unwrap(); - break - }, - _ => {}, - } + } + assert_eq!( + response_receiver.unwrap().await.unwrap().unwrap(), + ( + b"this is a response on protocol /test/req-resp/1".to_vec(), + protocol_name_1_fallback.clone() + ) + ); + // Try the old protocol with a useless fallback. + let (sender, response_receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + older_peer_id.as_ref().unwrap(), + protocol_name_1_fallback.clone(), + b"request on protocol /test/req-resp/1".to_vec(), + Some(( + b"dummy request, will fail if processed".to_vec(), + protocol_config_1_fallback.name.clone(), + )), + sender, + IfDisconnected::ImmediateError, + ); + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, } - assert_eq!( - response_receiver.await.unwrap().unwrap(), - ( - b"this is a response on protocol /test/req-resp/1".to_vec(), - protocol_name_1_fallback.clone() - ) - ); - // Try the new protocol with no fallback. Should fail. - let (sender, response_receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - older_peer_id.as_ref().unwrap(), - protocol_name_1.clone(), - b"request on protocol /test/req-resp-2".to_vec(), - None, - sender, - IfDisconnected::ImmediateError, - ); - loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - assert_matches!( - result.unwrap_err(), - RequestFailure::Network(OutboundFailure::UnsupportedProtocols) - ); - break - }, - _ => {}, - } + } + assert_eq!( + response_receiver.await.unwrap().unwrap(), + ( + b"this is a response on protocol /test/req-resp/1".to_vec(), + protocol_name_1_fallback.clone() + ) + ); + // Try the new protocol with no fallback. Should fail. + let (sender, response_receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + older_peer_id.as_ref().unwrap(), + protocol_name_1.clone(), + b"request on protocol /test/req-resp-2".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + assert_matches!( + result.unwrap_err(), + RequestFailure::Network(OutboundFailure::UnsupportedProtocols) + ); + break + }, + _ => {}, } - assert!(response_receiver.await.unwrap().is_err()); - // Try the other protocol with no fallback. - let (sender, response_receiver) = oneshot::channel(); - swarm.behaviour_mut().send_request( - older_peer_id.as_ref().unwrap(), - protocol_name_2.clone(), - b"request on protocol /test/other".to_vec(), - None, - sender, - IfDisconnected::ImmediateError, - ); - loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { - result.unwrap(); - break - }, - _ => {}, - } + } + assert!(response_receiver.await.unwrap().is_err()); + // Try the other protocol with no fallback. + let (sender, response_receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + older_peer_id.as_ref().unwrap(), + protocol_name_2.clone(), + b"request on protocol /test/other".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, } - assert_eq!( - response_receiver.await.unwrap().unwrap(), - (b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone()) - ); - }); + } + assert_eq!( + response_receiver.await.unwrap().unwrap(), + (b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone()) + ); } /// This test ensures the `RequestResponsesBehaviour` propagates back the Request::Timeout error From d505321b3f0fe97bb4c732eed3ed8825030e39d9 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 20 Jan 2025 16:27:28 +0200 Subject: [PATCH 13/19] req-resp: Fix old libp2p warn to distinguish between response and failure Signed-off-by: Alexandru Vasile --- substrate/client/network/src/request_responses.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 8e42047a1fce..49c199548012 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -843,8 +843,9 @@ impl NetworkBehaviour for RequestResponsesBehaviour { None => { log::warn!( target: "sub-libp2p", - "Received `RequestResponseEvent::Message` with unexpected request id {:?}", + "Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}", request_id, + peer ); continue }, @@ -915,8 +916,10 @@ impl NetworkBehaviour for RequestResponsesBehaviour { None => { log::warn!( target: "sub-libp2p", - "Received `RequestResponseEvent::Message` with unexpected request id {:?}", + "Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}", request_id, + error, + peer ); continue }, From 7a7ee487ad8d6d350ec4791d706c91140ae59cad Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 20 Jan 2025 17:26:24 +0200 Subject: [PATCH 14/19] req-resp: Log elapsed time since request started Signed-off-by: Alexandru Vasile --- substrate/client/network/src/request_responses.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 49c199548012..6d15377c43dc 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -633,12 +633,12 @@ impl NetworkBehaviour for RequestResponsesBehaviour { target: "sub-libp2p", "Request {id:?} has no protocol registered.", ); - return Some(id.clone()) + return Some((id.clone(), None)) }; let elapsed = req.started_at.elapsed(); if elapsed > *request_timeout { - Some(id.clone()) + Some((id.clone(), Some(elapsed))) } else { None } @@ -652,7 +652,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { ); } - for id in timedout_requests { + for (id, _) in timedout_requests { let Some(req) = self.pending_requests.remove(&id) else { continue; }; From b6400104788d4b6bd9b700b1e1c7df6f9cb188d4 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Tue, 21 Jan 2025 11:49:22 +0200 Subject: [PATCH 15/19] Update substrate/client/network/src/request_responses.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- substrate/client/network/src/request_responses.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 6d15377c43dc..d1ff9abeb3bb 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -619,7 +619,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { fn poll(&mut self, cx: &mut Context) -> Poll>> { 'poll_all: loop { // Poll the periodic request check. - if let Poll::Ready(_) = self.periodic_request_check.poll_tick(cx) { + if self.periodic_request_check.poll_tick(cx).is_ready() { // First pass is needed to collect timed out requests and // submit `OutboundFailure::Timeout` on the moved oneshot sender. let timedout_requests: Vec<_> = self From 053d8ca6f8ba9f7ab1a80e6dfd38fb245ea60972 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 21 Jan 2025 18:09:54 +0200 Subject: [PATCH 16/19] req-resp: Introduce Option to wrap the oneshot::Sender Signed-off-by: Alexandru Vasile --- .../client/network/src/request_responses.rs | 102 +++++++++--------- 1 file changed, 52 insertions(+), 50 deletions(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index d1ff9abeb3bb..2cda8f666fee 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -254,8 +254,14 @@ pub struct OutgoingResponse { /// Information stored about a pending request. struct PendingRequest { + /// The time when the request was sent to the libp2p request-response protocol. started_at: Instant, - response_tx: oneshot::Sender, ProtocolName), RequestFailure>>, + /// The channel to send the response back to the caller. + /// + /// This is wrapped in an `Option` to allow for the channel to be taken out + /// on force-detected timeouts. + response_tx: Option, ProtocolName), RequestFailure>>>, + /// Fallback request to send if the primary request fails. fallback_request: Option<(Vec, ProtocolName)>, } @@ -496,7 +502,7 @@ impl RequestResponsesBehaviour { (protocol_name.to_string().into(), request_id).into(), PendingRequest { started_at: Instant::now(), - response_tx: pending_response, + response_tx: Some(pending_response), fallback_request, }, ); @@ -620,55 +626,47 @@ impl NetworkBehaviour for RequestResponsesBehaviour { 'poll_all: loop { // Poll the periodic request check. if self.periodic_request_check.poll_tick(cx).is_ready() { - // First pass is needed to collect timed out requests and - // submit `OutboundFailure::Timeout` on the moved oneshot sender. - let timedout_requests: Vec<_> = self - .pending_requests - .iter() - .filter_map(|(id, req)| { - let Some(ProtocolDetails { request_timeout, .. }) = - self.protocols.get(&id.protocol) - else { - log::warn!( - target: "sub-libp2p", - "Request {id:?} has no protocol registered.", - ); - return Some((id.clone(), None)) - }; - - let elapsed = req.started_at.elapsed(); - if elapsed > *request_timeout { - Some((id.clone(), Some(elapsed))) - } else { - None - } - }) - .collect(); - - if !timedout_requests.is_empty() { - log::warn!( - target: "sub-libp2p", - "Requests {timedout_requests:?} force detected as timeout.", - ); - } + self.pending_requests.retain(|id, req| { + let Some(ProtocolDetails { request_timeout, .. }) = + self.protocols.get(&id.protocol) + else { + log::warn!( + target: "sub-libp2p", + "Request {id:?} has no protocol registered.", + ); - for (id, _) in timedout_requests { - let Some(req) = self.pending_requests.remove(&id) else { - continue; + if let Some(response_tx) = req.response_tx.take() { + if response_tx.send(Err(RequestFailure::UnknownProtocol)).is_err() { + log::debug!( + target: "sub-libp2p", + "Request {id:?} has no protocol registered. At the same time local node is no longer interested in the result.", + ); + } + } + return false }; - if req - .response_tx - .send(Err(RequestFailure::Network(OutboundFailure::Timeout))) - .is_err() - { - log::debug!( + let elapsed = req.started_at.elapsed(); + if elapsed > *request_timeout { + log::warn!( target: "sub-libp2p", - "Request {id:?} force detected as timeout. At the same time local \ - node is no longer interested in the result.", + "Request {id:?} force detected as timeout.", ); + + if let Some(response_tx) = req.response_tx.take() { + if response_tx.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).is_err() { + log::debug!( + target: "sub-libp2p", + "Request {id:?} force detected as timeout. At the same time local node is no longer interested in the result.", + ); + } + } + + false + } else { + true } - } + }); } // Poll to see if any response is ready to be sent back. @@ -824,7 +822,11 @@ impl NetworkBehaviour for RequestResponsesBehaviour { .pending_requests .remove(&(protocol.clone(), request_id).into()) { - Some(PendingRequest { started_at, response_tx, .. }) => { + Some(PendingRequest { + started_at, + response_tx: Some(response_tx), + .. + }) => { log::trace!( target: "sub-libp2p", "received response from {peer} ({protocol:?}), {} bytes", @@ -840,12 +842,12 @@ impl NetworkBehaviour for RequestResponsesBehaviour { .map_err(|_| RequestFailure::Obsolete); (started_at, delivered) }, - None => { + _ => { log::warn!( target: "sub-libp2p", "Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}", request_id, - peer + peer, ); continue }, @@ -875,7 +877,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { { Some(PendingRequest { started_at, - response_tx, + response_tx: Some(response_tx), fallback_request, }) => { // Try using the fallback request if the protocol was not @@ -913,7 +915,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { } started_at }, - None => { + _ => { log::warn!( target: "sub-libp2p", "Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}", From 4d33dfcabe65de249631aa7853dab1f8b9a3bc06 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 22 Jan 2025 13:33:47 +0200 Subject: [PATCH 17/19] req-resp/tests: Lower test time to 2s by stopping first swarm + reduce timeouts Signed-off-by: Alexandru Vasile --- .../client/network/src/request_responses.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 2cda8f666fee..df0fa32a21a8 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -1759,17 +1759,17 @@ mod tests { /// For testing purposes, the communication happens on the `/test/req-resp/1` protocol. /// /// This is achieved by: - /// - Two swarms are connected, the first one is slow to respond and has the timeout set to 60 - /// seconds. The second swarm is configured with a timeout of 60 seconds in libp2p, however in - /// substrate this is set to 5 seconds. + /// - Two swarms are connected, the first one is slow to respond and has the timeout set to 10 + /// seconds. The second swarm is configured with a timeout of 10 seconds in libp2p, however in + /// substrate this is set to 1 second. /// - /// - The first swarm introduces a delay of 10 seconds before responding to the request. + /// - The first swarm introduces a delay of 2 seconds before responding to the request. /// - /// - The second swarm must enforce the 5 seconds timeout. + /// - The second swarm must enforce the 1 second timeout. #[tokio::test] async fn enforce_outbound_timeouts() { - const REQUEST_TIMEOUT: Duration = Duration::from_secs(60); - const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(5); + const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); + const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(1); // These swarms only speaks protocol_name. let protocol_name = ProtocolName::from("/test/req-resp/1"); @@ -1802,6 +1802,7 @@ mod tests { reputation_changes: Vec::new(), sent_feedback: None, }); + return; } }); @@ -1849,6 +1850,7 @@ mod tests { match event { SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => { assert!(result.is_ok()); + break; }, SwarmEvent::ConnectionClosed { .. } => { break; From 296156d63cc63859862c2c64d715b5a1d3c695ff Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 22 Jan 2025 14:22:10 +0200 Subject: [PATCH 18/19] req-resp/tests: Fix clippy Signed-off-by: Alexandru Vasile --- substrate/client/network/src/request_responses.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index df0fa32a21a8..661557111798 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -1788,7 +1788,7 @@ mod tests { let (tx, mut rx) = async_channel::bounded::(64); tokio::spawn(async move { - while let Some(rq) = rx.next().await { + if let Some(rq) = rx.next().await { assert_eq!(rq.payload, b"this is a request"); // Sleep for more than `REQUEST_TIMEOUT_SHORT` and less than @@ -1802,7 +1802,6 @@ mod tests { reputation_changes: Vec::new(), sent_feedback: None, }); - return; } }); From d13f9d7109f5197cb2c12484587a2a8681d6b2d1 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 22 Jan 2025 17:32:28 +0200 Subject: [PATCH 19/19] req-resp: Downgrade warnings to debug to not pollute validator logs Signed-off-by: Alexandru Vasile --- substrate/client/network/src/request_responses.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 661557111798..e21773632ed7 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -648,7 +648,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { let elapsed = req.started_at.elapsed(); if elapsed > *request_timeout { - log::warn!( + log::debug!( target: "sub-libp2p", "Request {id:?} force detected as timeout.", ); @@ -843,7 +843,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { (started_at, delivered) }, _ => { - log::warn!( + log::debug!( target: "sub-libp2p", "Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}", request_id, @@ -916,7 +916,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { started_at }, _ => { - log::warn!( + log::debug!( target: "sub-libp2p", "Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}", request_id,