Skip to content

Commit

Permalink
fix(request-response): Avoid hanging at capacity and on dial IO errors
Browse files Browse the repository at this point in the history
  • Loading branch information
oblique committed May 25, 2024
1 parent 94fef37 commit 7fea44b
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ libp2p-pnet = { version = "0.24.0", path = "transports/pnet" }
libp2p-quic = { version = "0.10.3", path = "transports/quic" }
libp2p-relay = { version = "0.17.2", path = "protocols/relay" }
libp2p-rendezvous = { version = "0.14.0", path = "protocols/rendezvous" }
libp2p-request-response = { version = "0.26.2", path = "protocols/request-response" }
libp2p-request-response = { version = "0.26.3", path = "protocols/request-response" }
libp2p-server = { version = "0.12.7", path = "misc/server" }
libp2p-stream = { version = "0.1.0-alpha.1", path = "protocols/stream" }
libp2p-swarm = { version = "0.44.2", path = "swarm" }
Expand Down
4 changes: 4 additions & 0 deletions protocols/request-response/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.26.3

- Avoid hanging at capacity and dial IO errors.

## 0.26.2

- Deprecate `Behaviour::add_address` in favor of `Swarm::add_peer_address`.
Expand Down
2 changes: 1 addition & 1 deletion protocols/request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-request-response"
edition = "2021"
rust-version = { workspace = true }
description = "Generic Request/Response Protocols"
version = "0.26.2"
version = "0.26.3"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
72 changes: 52 additions & 20 deletions protocols/request-response/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::{InboundRequestId, OutboundRequestId, EMPTY_QUEUE_SHRINK_THRESHOLD};

use futures::channel::mpsc;
use futures::{channel::oneshot, prelude::*};
use instant::Instant;
use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
ListenUpgradeError,
Expand Down Expand Up @@ -57,6 +58,10 @@ where
inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
/// The request/response message codec.
codec: TCodec,

request_timeout: Duration,
max_concurrent_streams: usize,

/// Queue of events to emit in `poll()`.
pending_events: VecDeque<Event<TCodec>>,
/// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`.
Expand Down Expand Up @@ -94,22 +99,24 @@ where
pub(super) fn new(
inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
codec: TCodec,
substream_timeout: Duration,
request_timeout: Duration,
inbound_request_id: Arc<AtomicU64>,
max_concurrent_streams: usize,
) -> Self {
let (inbound_sender, inbound_receiver) = mpsc::channel(0);
Self {
inbound_protocols,
codec,
request_timeout,
max_concurrent_streams,
pending_outbound: VecDeque::new(),
requested_outbound: Default::default(),
inbound_receiver,
inbound_sender,
pending_events: VecDeque::new(),
inbound_request_id,
worker_streams: futures_bounded::FuturesMap::new(
substream_timeout,
request_timeout,
max_concurrent_streams,
),
}
Expand Down Expand Up @@ -159,6 +166,9 @@ where
}
};

// Inbound connections are reported to the upper layer from within the worker task,
// so by failing to schedule the worker means the upper layer will never know
// about the inbound request. Because of that we do not report any inbound failure.
if self
.worker_streams
.try_push(RequestId::Inbound(request_id), recv.boxed())
Expand All @@ -183,6 +193,21 @@ where
.pop_front()
.expect("negotiated a stream without a pending message");

// If timeout is already reached then there is no need to proceed further.
if message.time.elapsed() >= self.request_timeout {
self.pending_events
.push_back(Event::OutboundTimeout(message.request_id));
return;
}

// If we are at capacity, reschedule request later on.
//
// TODO(oblique): Implement `futures_bounded::FuturesMap::is_full`
if self.worker_streams.len() == self.max_concurrent_streams {
self.requested_outbound.push_back(message);
return;
}

let mut codec = self.codec.clone();
let request_id = message.request_id;

Expand All @@ -199,13 +224,10 @@ where
})
};

if self
.worker_streams
self.worker_streams
.try_push(RequestId::Outbound(request_id), send.boxed())
.is_err()
{
tracing::warn!("Dropping outbound stream because we are at capacity")
}
.ok()
.expect("worker_streams at capacity");
}

fn on_dial_upgrade_error(
Expand Down Expand Up @@ -350,6 +372,7 @@ pub struct OutboundMessage<TCodec: Codec> {
pub(crate) request_id: OutboundRequestId,
pub(crate) request: TCodec::Request,
pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>,
pub(crate) time: Instant,
}

impl<TCodec> fmt::Debug for OutboundMessage<TCodec>
Expand Down Expand Up @@ -441,20 +464,29 @@ where
}));
}

// Emit outbound requests.
if let Some(request) = self.pending_outbound.pop_front() {
let protocols = request.protocols.clone();
self.requested_outbound.push_back(request);

return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(Protocol { protocols }, ()),
});
}
// Emit outbound requests if we are not at capacity.
if self.worker_streams.len() < self.max_concurrent_streams {
if let Some(request) = self.pending_outbound.pop_front() {
// If timeout is already reached then there is no need to proceed further.
if request.time.elapsed() >= self.request_timeout {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
Event::OutboundTimeout(request.request_id),
));
}

let protocols = request.protocols.clone();
self.requested_outbound.push_back(request);

return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(Protocol { protocols }, ()),
});
}

debug_assert!(self.pending_outbound.is_empty());
debug_assert!(self.pending_outbound.is_empty());

if self.pending_outbound.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
self.pending_outbound.shrink_to_fit();
if self.pending_outbound.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
self.pending_outbound.shrink_to_fit();
}
}

Poll::Pending
Expand Down
2 changes: 2 additions & 0 deletions protocols/request-response/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub use handler::ProtocolSupport;
use crate::handler::OutboundMessage;
use futures::channel::oneshot;
use handler::Handler;
use instant::Instant;
use libp2p_core::{ConnectedPoint, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::{
Expand Down Expand Up @@ -428,6 +429,7 @@ where
request_id,
request,
protocols: self.outbound_protocols.clone(),
time: Instant::now(),
};

if let Some(request) = self.try_send_request(peer, request) {
Expand Down

0 comments on commit 7fea44b

Please sign in to comment.