From d223a56ce7345de7c2fc12f507e2766c703997ed Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Fri, 24 May 2024 16:35:05 +0300 Subject: [PATCH 1/5] fix(request-response)!: Report outbound failure when max concurrent streams is reached --- protocols/request-response/src/handler.rs | 13 +++- protocols/request-response/src/lib.rs | 14 ++++ .../request-response/tests/error_reporting.rs | 65 ++++++++++++++++++- 3 files changed, 88 insertions(+), 4 deletions(-) diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 2d45e0d7dc3..4553634b265 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -159,6 +159,9 @@ where } }; + // Inbound connections are reported to the upper layer from within the worker task, + // so by failing to schedule the worker means the upper layer will never know + // about the inbound request. Because of that we do not report any inbound failure. if self .worker_streams .try_push(RequestId::Inbound(request_id), recv.boxed()) @@ -204,7 +207,8 @@ where .try_push(RequestId::Outbound(request_id), send.boxed()) .is_err() { - tracing::warn!("Dropping outbound stream because we are at capacity") + self.pending_events + .push_back(Event::OutboundMaxStreamsReached(request_id)); } } @@ -276,6 +280,9 @@ where /// A response to an inbound request was omitted as a result /// of dropping the response `sender` of an inbound `Request`. ResponseOmission(InboundRequestId), + /// An outbound request could not be sent because maximum + /// concurrent streams reached. + OutboundMaxStreamsReached(OutboundRequestId), /// An outbound request timed out while sending the request /// or waiting for the response. OutboundTimeout(OutboundRequestId), @@ -320,6 +327,10 @@ impl fmt::Debug for Event { .debug_tuple("Event::ResponseOmission") .field(request_id) .finish(), + Event::OutboundMaxStreamsReached(request_id) => f + .debug_tuple("Event::OutboundMaxStreamsReached") + .field(request_id) + .finish(), Event::OutboundTimeout(request_id) => f .debug_tuple("Event::OutboundTimeout") .field(request_id) diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 4362b3255ad..490a70ca691 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -169,6 +169,8 @@ pub enum Event { pub enum OutboundFailure { /// The request could not be sent because a dialing attempt failed. DialFailure, + /// The request could not be sent because maximum concurrent streams reached. + MaxStreamsReached, /// The request timed out before a response was received. /// /// It is not known whether the request may have been @@ -189,6 +191,7 @@ impl fmt::Display for OutboundFailure { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { OutboundFailure::DialFailure => write!(f, "Failed to dial the requested peer"), + OutboundFailure::MaxStreamsReached => write!(f, "Maximum concurrent streams reached"), OutboundFailure::Timeout => write!(f, "Timeout while waiting for a response"), OutboundFailure::ConnectionClosed => { write!(f, "Connection was closed before a response was received") @@ -877,6 +880,17 @@ where error: InboundFailure::ResponseOmission, })); } + handler::Event::OutboundMaxStreamsReached(request_id) => { + let removed = self.remove_pending_outbound_response(&peer, connection, request_id); + debug_assert!(removed, "Expect request_id to be pending before failing."); + + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { + peer, + request_id, + error: OutboundFailure::MaxStreamsReached, + })); + } handler::Event::OutboundTimeout(request_id) => { let removed = self.remove_pending_outbound_response(&peer, connection, request_id); debug_assert!( diff --git a/protocols/request-response/tests/error_reporting.rs b/protocols/request-response/tests/error_reporting.rs index 2dc82b2e0c5..f5870427677 100644 --- a/protocols/request-response/tests/error_reporting.rs +++ b/protocols/request-response/tests/error_reporting.rs @@ -161,6 +161,56 @@ async fn report_outbound_timeout_on_read_response() { futures::future::select(server_task, client_task).await; } +#[async_std::test] +async fn report_outbound_failure_on_max_streams() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + + // `swarm2` will be able to handle only 1 stream per time + let swarm2_config = request_response::Config::default().with_max_concurrent_streams(1); + + let (peer1_id, mut swarm1) = new_swarm(); + let (peer2_id, mut swarm2) = new_swarm_with_config(swarm2_config); + + swarm1.listen().with_memory_addr_external().await; + swarm2.connect(&mut swarm1).await; + + let swarm1_task = async move { + let _req_id = swarm1 + .behaviour_mut() + .send_request(&peer2_id, Action::FailOnMaxStreams); + + // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead + wait_no_events(&mut swarm1).await; + }; + + // Expects OutboundFailure::MaxStreamsReached failure + let swarm2_task = async move { + let (peer, _inbound_req_id, action, _resp_channel) = + wait_request(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(action, Action::FailOnMaxStreams); + + // A task for sending back a response is already scheduled so max concurrent + // streams is reached and no new tasks can be sheduled. + // + // We produce the failure by creating new request before we response. + let outbound_req_id = swarm2 + .behaviour_mut() + .send_request(&peer1_id, Action::FailOnMaxStreams); + + let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); + assert_eq!(peer, peer1_id); + assert_eq!(req_id_done, outbound_req_id); + assert!(matches!(error, OutboundFailure::MaxStreamsReached)); + }; + + let swarm1_task = pin!(swarm1_task); + let swarm2_task = pin!(swarm2_task); + futures::future::select(swarm1_task, swarm2_task).await; +} + #[async_std::test] async fn report_inbound_failure_on_read_request() { let _ = tracing_subscriber::fmt() @@ -332,6 +382,7 @@ enum Action { FailOnWriteRequest, FailOnWriteResponse, TimeoutOnWriteResponse, + FailOnMaxStreams, } impl From for u8 { @@ -343,6 +394,7 @@ impl From for u8 { Action::FailOnWriteRequest => 3, Action::FailOnWriteResponse => 4, Action::TimeoutOnWriteResponse => 5, + Action::FailOnMaxStreams => 6, } } } @@ -358,6 +410,7 @@ impl TryFrom for Action { 3 => Ok(Action::FailOnWriteRequest), 4 => Ok(Action::FailOnWriteResponse), 5 => Ok(Action::TimeoutOnWriteResponse), + 6 => Ok(Action::FailOnMaxStreams), _ => Err(io::Error::new(io::ErrorKind::Other, "invalid action")), } } @@ -468,11 +521,10 @@ impl Codec for TestCodec { } } -fn new_swarm_with_timeout( - timeout: Duration, +fn new_swarm_with_config( + cfg: request_response::Config, ) -> (PeerId, Swarm>) { let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full)); - let cfg = request_response::Config::default().with_request_timeout(timeout); let swarm = Swarm::new_ephemeral(|_| request_response::Behaviour::::new(protocols, cfg)); @@ -481,6 +533,13 @@ fn new_swarm_with_timeout( (peed_id, swarm) } +fn new_swarm_with_timeout( + timeout: Duration, +) -> (PeerId, Swarm>) { + let cfg = request_response::Config::default().with_request_timeout(timeout); + new_swarm_with_config(cfg) +} + fn new_swarm() -> (PeerId, Swarm>) { new_swarm_with_timeout(Duration::from_millis(100)) } From e3c3bba8f46e571ab86680a2084380ee5af10d74 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Fri, 24 May 2024 16:47:27 +0300 Subject: [PATCH 2/5] fix changelog and version --- Cargo.lock | 2 +- Cargo.toml | 2 +- protocols/request-response/CHANGELOG.md | 5 +++++ protocols/request-response/Cargo.toml | 2 +- 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 05742c9a961..fdfaf30c0a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3269,7 +3269,7 @@ dependencies = [ [[package]] name = "libp2p-request-response" -version = "0.26.2" +version = "0.27.0" dependencies = [ "anyhow", "async-std", diff --git a/Cargo.toml b/Cargo.toml index 805661b26d7..ee49dd57732 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,7 +99,7 @@ libp2p-pnet = { version = "0.24.0", path = "transports/pnet" } libp2p-quic = { version = "0.10.3", path = "transports/quic" } libp2p-relay = { version = "0.17.2", path = "protocols/relay" } libp2p-rendezvous = { version = "0.14.0", path = "protocols/rendezvous" } -libp2p-request-response = { version = "0.26.2", path = "protocols/request-response" } +libp2p-request-response = { version = "0.27.0", path = "protocols/request-response" } libp2p-server = { version = "0.12.7", path = "misc/server" } libp2p-stream = { version = "0.1.0-alpha.1", path = "protocols/stream" } libp2p-swarm = { version = "0.44.2", path = "swarm" } diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 92417508786..b415f61b42f 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.27.0 + +- Report outbound failure when max concurrent streams is reached. + See [PR 5417](https://github.com/libp2p/rust-libp2p/pull/5417). + ## 0.26.2 - Deprecate `Behaviour::add_address` in favor of `Swarm::add_peer_address`. diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 1eb8c1ae95f..321bd2a5264 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-request-response" edition = "2021" rust-version = { workspace = true } description = "Generic Request/Response Protocols" -version = "0.26.2" +version = "0.27.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" From f9a42a124b69d10baa45755858927a9359084af0 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Fri, 24 May 2024 19:09:56 +0300 Subject: [PATCH 3/5] add timeout for swarm2_config in test case --- protocols/request-response/tests/error_reporting.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/protocols/request-response/tests/error_reporting.rs b/protocols/request-response/tests/error_reporting.rs index f5870427677..cb5e27e9c74 100644 --- a/protocols/request-response/tests/error_reporting.rs +++ b/protocols/request-response/tests/error_reporting.rs @@ -168,7 +168,9 @@ async fn report_outbound_failure_on_max_streams() { .try_init(); // `swarm2` will be able to handle only 1 stream per time - let swarm2_config = request_response::Config::default().with_max_concurrent_streams(1); + let swarm2_config = request_response::Config::default() + .with_request_timeout(Duration::from_millis(100)) + .with_max_concurrent_streams(1); let (peer1_id, mut swarm1) = new_swarm(); let (peer2_id, mut swarm2) = new_swarm_with_config(swarm2_config); From 8deab2526bca0f653bc79c3f8914c0bc202870e4 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Wed, 29 May 2024 13:37:52 +0300 Subject: [PATCH 4/5] report error as OutboundError::Io --- Cargo.lock | 2 +- Cargo.toml | 2 +- protocols/request-response/CHANGELOG.md | 4 ++-- protocols/request-response/Cargo.toml | 2 +- protocols/request-response/src/handler.rs | 19 +++++++------------ protocols/request-response/src/lib.rs | 14 -------------- .../request-response/tests/error_reporting.rs | 4 ++-- 7 files changed, 14 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2720e66a2f9..2203b20ba8d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3269,7 +3269,7 @@ dependencies = [ [[package]] name = "libp2p-request-response" -version = "0.27.0" +version = "0.26.3" dependencies = [ "anyhow", "async-std", diff --git a/Cargo.toml b/Cargo.toml index f2df7ff3d41..fdaf149dbae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,7 +99,7 @@ libp2p-pnet = { version = "0.24.0", path = "transports/pnet" } libp2p-quic = { version = "0.10.3", path = "transports/quic" } libp2p-relay = { version = "0.17.2", path = "protocols/relay" } libp2p-rendezvous = { version = "0.14.0", path = "protocols/rendezvous" } -libp2p-request-response = { version = "0.27.0", path = "protocols/request-response" } +libp2p-request-response = { version = "0.26.3", path = "protocols/request-response" } libp2p-server = { version = "0.12.7", path = "misc/server" } libp2p-stream = { version = "0.1.0-alpha.1", path = "protocols/stream" } libp2p-swarm = { version = "0.44.2", path = "swarm" } diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index b415f61b42f..425a4d8ddcb 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -1,6 +1,6 @@ -## 0.27.0 +## 0.26.3 -- Report outbound failure when max concurrent streams is reached. +- Report failure when streams are at capacity. See [PR 5417](https://github.com/libp2p/rust-libp2p/pull/5417). ## 0.26.2 diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 321bd2a5264..d621e477bfb 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-request-response" edition = "2021" rust-version = { workspace = true } description = "Generic Request/Response Protocols" -version = "0.27.0" +version = "0.26.3" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 4553634b265..3f75643611e 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -159,9 +159,9 @@ where } }; - // Inbound connections are reported to the upper layer from within the worker task, - // so by failing to schedule the worker means the upper layer will never know - // about the inbound request. Because of that we do not report any inbound failure. + // Inbound connections are reported to the upper layer from within the above task, + // so by failing to schedule it, it means the upper layer will never know about the + // inbound request. Because of that we do not report any inbound failure. if self .worker_streams .try_push(RequestId::Inbound(request_id), recv.boxed()) @@ -207,8 +207,10 @@ where .try_push(RequestId::Outbound(request_id), send.boxed()) .is_err() { - self.pending_events - .push_back(Event::OutboundMaxStreamsReached(request_id)); + self.pending_events.push_back(Event::OutboundStreamFailed { + request_id: message.request_id, + error: io::Error::new(io::ErrorKind::Other, "max sub-streams reached"), + }); } } @@ -280,9 +282,6 @@ where /// A response to an inbound request was omitted as a result /// of dropping the response `sender` of an inbound `Request`. ResponseOmission(InboundRequestId), - /// An outbound request could not be sent because maximum - /// concurrent streams reached. - OutboundMaxStreamsReached(OutboundRequestId), /// An outbound request timed out while sending the request /// or waiting for the response. OutboundTimeout(OutboundRequestId), @@ -327,10 +326,6 @@ impl fmt::Debug for Event { .debug_tuple("Event::ResponseOmission") .field(request_id) .finish(), - Event::OutboundMaxStreamsReached(request_id) => f - .debug_tuple("Event::OutboundMaxStreamsReached") - .field(request_id) - .finish(), Event::OutboundTimeout(request_id) => f .debug_tuple("Event::OutboundTimeout") .field(request_id) diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 490a70ca691..4362b3255ad 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -169,8 +169,6 @@ pub enum Event { pub enum OutboundFailure { /// The request could not be sent because a dialing attempt failed. DialFailure, - /// The request could not be sent because maximum concurrent streams reached. - MaxStreamsReached, /// The request timed out before a response was received. /// /// It is not known whether the request may have been @@ -191,7 +189,6 @@ impl fmt::Display for OutboundFailure { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { OutboundFailure::DialFailure => write!(f, "Failed to dial the requested peer"), - OutboundFailure::MaxStreamsReached => write!(f, "Maximum concurrent streams reached"), OutboundFailure::Timeout => write!(f, "Timeout while waiting for a response"), OutboundFailure::ConnectionClosed => { write!(f, "Connection was closed before a response was received") @@ -880,17 +877,6 @@ where error: InboundFailure::ResponseOmission, })); } - handler::Event::OutboundMaxStreamsReached(request_id) => { - let removed = self.remove_pending_outbound_response(&peer, connection, request_id); - debug_assert!(removed, "Expect request_id to be pending before failing."); - - self.pending_events - .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { - peer, - request_id, - error: OutboundFailure::MaxStreamsReached, - })); - } handler::Event::OutboundTimeout(request_id) => { let removed = self.remove_pending_outbound_response(&peer, connection, request_id); debug_assert!( diff --git a/protocols/request-response/tests/error_reporting.rs b/protocols/request-response/tests/error_reporting.rs index cb5e27e9c74..7440495ba7b 100644 --- a/protocols/request-response/tests/error_reporting.rs +++ b/protocols/request-response/tests/error_reporting.rs @@ -187,7 +187,7 @@ async fn report_outbound_failure_on_max_streams() { wait_no_events(&mut swarm1).await; }; - // Expects OutboundFailure::MaxStreamsReached failure + // Expects OutboundFailure::Io failure let swarm2_task = async move { let (peer, _inbound_req_id, action, _resp_channel) = wait_request(&mut swarm2).await.unwrap(); @@ -205,7 +205,7 @@ async fn report_outbound_failure_on_max_streams() { let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap(); assert_eq!(peer, peer1_id); assert_eq!(req_id_done, outbound_req_id); - assert!(matches!(error, OutboundFailure::MaxStreamsReached)); + assert!(matches!(error, OutboundFailure::Io(_))); }; let swarm1_task = pin!(swarm1_task); From f23fee5ee7588e409e8c07e14656bdf19ab8977f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 3 Jun 2024 19:24:38 +0100 Subject: [PATCH 5/5] Apply suggestions from code review --- protocols/request-response/tests/error_reporting.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/protocols/request-response/tests/error_reporting.rs b/protocols/request-response/tests/error_reporting.rs index 7440495ba7b..e78bba926e2 100644 --- a/protocols/request-response/tests/error_reporting.rs +++ b/protocols/request-response/tests/error_reporting.rs @@ -167,7 +167,7 @@ async fn report_outbound_failure_on_max_streams() { .with_env_filter(EnvFilter::from_default_env()) .try_init(); - // `swarm2` will be able to handle only 1 stream per time + // `swarm2` will be able to handle only 1 stream per time. let swarm2_config = request_response::Config::default() .with_request_timeout(Duration::from_millis(100)) .with_max_concurrent_streams(1); @@ -183,11 +183,11 @@ async fn report_outbound_failure_on_max_streams() { .behaviour_mut() .send_request(&peer2_id, Action::FailOnMaxStreams); - // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead + // Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead. wait_no_events(&mut swarm1).await; }; - // Expects OutboundFailure::Io failure + // Expects OutboundFailure::Io failure. let swarm2_task = async move { let (peer, _inbound_req_id, action, _resp_channel) = wait_request(&mut swarm2).await.unwrap();