
net/libp2p: Enforce outbound request-response timeout limits #7222

Merged — 24 commits, merged Jan 22, 2025. (Diff shown below: changes from 3 commits.)
- `9f8ca21` net/req-resp: Introduce a new `ProtocolDetails` to hold protocol details (lexnv, Jan 17, 2025)
- `190fe89` net/req-resp: Evict timeout requests (lexnv, Jan 17, 2025)
- `c3b01da` Add prdoc (lexnv, Jan 17, 2025)
- `48f0127` net/req-resp: Fix cargo docs (lexnv, Jan 17, 2025)
- `f8e89f4` net/req-resp: Adjust logs (lexnv, Jan 17, 2025)
- `050c166` Adjust PRdoc (lexnv, Jan 17, 2025)
- `13d0094` req-resp/tests: Add test to ensure timeouts are forced (lexnv, Jan 20, 2025)
- `e862829` req-resp: Remove debug asserts as libp2p might respond later (lexnv, Jan 20, 2025)
- `1ec4657` req-resp/tests: Refactor basic_request_response_works to use tokio (lexnv, Jan 20, 2025)
- `caa335d` req-resp/tests: Refactor max_response_size_exceeded to use tokio (lexnv, Jan 20, 2025)
- `f992c9c` req-resp/tests: Refactor request_id_collision to use tokio (lexnv, Jan 20, 2025)
- `25891cc` req-resp/tests: Refactor request_fallback to use tokio (lexnv, Jan 20, 2025)
- `06e3b5c` Merge remote-tracking branch 'origin/master' into lexnv/enforce-timeouts (lexnv, Jan 20, 2025)
- `4fff0f5` Merge branch 'master' into lexnv/enforce-timeouts (lexnv, Jan 20, 2025)
- `d505321` req-resp: Fix old libp2p warn to distinguish between response and (lexnv, Jan 20, 2025)
- `7cdd47a` Merge remote-tracking branch 'origin/lexnv/enforce-timeouts' into lex… (lexnv, Jan 20, 2025)
- `1e5edb2` Merge remote-tracking branch 'origin/master' into lexnv/enforce-timeouts (lexnv, Jan 20, 2025)
- `7a7ee48` req-resp: Log elapsed time since request started (lexnv, Jan 20, 2025)
- `b640010` Update substrate/client/network/src/request_responses.rs (lexnv, Jan 21, 2025)
- `053d8ca` req-resp: Introduce Option to wrap the oneshot::Sender (lexnv, Jan 21, 2025)
- `4d33dfc` req-resp/tests: Lower test time to 2s by stopping first swarm + reduce (lexnv, Jan 22, 2025)
- `296156d` req-resp/tests: Fix clippy (lexnv, Jan 22, 2025)
- `d13f9d7` req-resp: Downgrade warnings to debug to not pollute validator logs (lexnv, Jan 22, 2025)
- `79ee2d8` Merge branch 'master' into lexnv/enforce-timeouts (lexnv, Jan 22, 2025)
19 changes: 19 additions & 0 deletions prdoc/pr_7222.prdoc
@@ -0,0 +1,19 @@
title: Enforce libp2p outbound request-response timeout limits

doc:
- audience: Node Dev
description: |
This PR enforces that outbound requests are finished within the specified protocol timeout.
The stable2412 version running libp2p 0.52.4 contains a bug which does not track request timeouts properly:
https://github.com/libp2p/rust-libp2p/pull/5429.

  The issue was detected while submitting libp2p -> litep2p requests on Kusama.
  This check ensures that pending outbound requests have not timed out.
  Although the issue has been fixed in libp2p, there might be other cases where this can happen.
For example: https://github.com/libp2p/rust-libp2p/pull/5417.

For more context see: https://github.com/paritytech/polkadot-sdk/issues/7076#issuecomment-2596085096.

crates:
- name: sc-network
bump: patch
165 changes: 123 additions & 42 deletions substrate/client/network/src/request_responses.rs
@@ -64,6 +64,9 @@ use std::{

pub use libp2p::request_response::{Config, InboundRequestId, OutboundRequestId};

/// Periodically check if requests are taking too long.
const PERIODIC_REQUEST_CHECK: Duration = Duration::from_secs(2);

/// Possible failures occurring in the context of sending an outbound request and receiving the
/// response.
#[derive(Debug, Clone, thiserror::Error)]
@@ -336,16 +339,20 @@ impl<RequestId> From<(ProtocolName, RequestId)> for ProtocolRequestId<RequestId>
}
}

/// Details of a request-response protocol.
struct ProtocolDetails {
behaviour: Behaviour<GenericCodec>,
inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
request_timeout: Duration,
}

/// Implementation of `NetworkBehaviour` that provides support for request-response protocols.
pub struct RequestResponsesBehaviour {
/// The multiple sub-protocols, by name.
///
/// Contains the underlying libp2p request-response [`Behaviour`], plus an optional
/// "response builder" used to build responses for incoming requests.
protocols: HashMap<
ProtocolName,
(Behaviour<GenericCodec>, Option<async_channel::Sender<IncomingRequest>>),
>,
protocols: HashMap<ProtocolName, ProtocolDetails>,

/// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply.
pending_requests: HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,
@@ -365,6 +372,14 @@ pub struct RequestResponsesBehaviour {

/// Primarily used to get a reputation of a node.
peer_store: Arc<dyn PeerStoreProvider>,

/// Interval to check that the requests are not taking too long.
///
/// We had issues in the past where libp2p did not produce a timeout event in due time.
///
/// For more details, see:
/// - https://github.com/paritytech/polkadot-sdk/issues/7076#issuecomment-2596085096
periodic_request_check: tokio::time::Interval,
}

/// Generated by the response builder and waiting to be processed.
@@ -393,7 +408,7 @@ impl RequestResponsesBehaviour {
ProtocolSupport::Outbound
};

let rq_rp = Behaviour::with_codec(
let behaviour = Behaviour::with_codec(
GenericCodec {
max_request_size: protocol.max_request_size,
max_response_size: protocol.max_response_size,
@@ -405,7 +420,11 @@
);

match protocols.entry(protocol.name) {
Entry::Vacant(e) => e.insert((rq_rp, protocol.inbound_queue)),
Entry::Vacant(e) => e.insert(ProtocolDetails {
behaviour,
inbound_queue: protocol.inbound_queue,
request_timeout: protocol.request_timeout,
}),
Entry::Occupied(e) => return Err(RegisterError::DuplicateProtocol(e.key().clone())),
};
}
@@ -417,6 +436,7 @@ impl RequestResponsesBehaviour {
pending_responses_arrival_time: Default::default(),
send_feedback: Default::default(),
peer_store,
periodic_request_check: tokio::time::interval(PERIODIC_REQUEST_CHECK),
})
}

@@ -437,9 +457,11 @@
) {
log::trace!(target: "sub-libp2p", "send request to {target} ({protocol_name:?}), {} bytes", request.len());

if let Some((protocol, _)) = self.protocols.get_mut(protocol_name.deref()) {
if let Some(ProtocolDetails { behaviour, .. }) =
self.protocols.get_mut(protocol_name.deref())
{
Self::send_request_inner(
protocol,
behaviour,
&mut self.pending_requests,
target,
protocol_name,
@@ -521,18 +543,19 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| {
if let Ok(handler) = r.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
) {
Some((p.to_string(), handler))
} else {
None
}
});
let iter =
self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
if let Ok(handler) = behaviour.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
) {
Some((p.to_string(), handler))
} else {
None
}
});

Ok(MultiHandler::try_from_iter(iter).expect(
"Protocols are in a HashMap and there can be at most one handler per protocol name, \
@@ -548,19 +571,20 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
role_override: Endpoint,
port_use: PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
let iter = self.protocols.iter_mut().filter_map(|(p, (r, _))| {
if let Ok(handler) = r.handle_established_outbound_connection(
connection_id,
peer,
addr,
role_override,
port_use,
) {
Some((p.to_string(), handler))
} else {
None
}
});
let iter =
self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
if let Ok(handler) = behaviour.handle_established_outbound_connection(
connection_id,
peer,
addr,
role_override,
port_use,
) {
Some((p.to_string(), handler))
} else {
None
}
});

Ok(MultiHandler::try_from_iter(iter).expect(
"Protocols are in a HashMap and there can be at most one handler per protocol name, \
@@ -569,8 +593,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
}

fn on_swarm_event(&mut self, event: FromSwarm) {
for (protocol, _) in self.protocols.values_mut() {
protocol.on_swarm_event(event);
for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
behaviour.on_swarm_event(event);
}
}

@@ -581,8 +605,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
event: THandlerOutEvent<Self>,
) {
let p_name = event.0;
if let Some((proto, _)) = self.protocols.get_mut(p_name.as_str()) {
return proto.on_connection_handler_event(peer_id, connection_id, event.1)
if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(p_name.as_str()) {
return behaviour.on_connection_handler_event(peer_id, connection_id, event.1)
} else {
log::warn!(
target: "sub-libp2p",
@@ -594,6 +618,60 @@ impl NetworkBehaviour for RequestResponsesBehaviour {

fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
'poll_all: loop {
// Poll the periodic request check.
if let Poll::Ready(_) = self.periodic_request_check.poll_tick(cx) {
// First pass is needed to collect timed out requests and
// submit `OutboundFailure::Timeout` on the moved oneshot sender.
let timedout_requests: Vec<_> = self
.pending_requests
.iter()
.filter_map(|(id, req)| {
Review comment (Member): Why not `retain`? Then we don't need collect & remove.

Reply (lexnv, Contributor Author): There's a tiny borrow-checker issue coming from the sender consuming `self`: `retain` will give us `&mut req`, and the `req.response` `oneshot::Sender` consumes `self` when we send out `send(Err(RequestFailure::Network(OutboundFailure::Timeout)))`. We could still avoid the extra alloc by adding an `Option` over the sender; will have a look if it complicates the code, thanks.

let Some(ProtocolDetails { request_timeout, .. }) =
self.protocols.get(&id.protocol)
else {
log::warn!(
target: "sub-libp2p",
"Request with id {:?} has no protocol registered.",
id.request_id,
);
return Some(id.clone())
};

let elapsed = req.started_at.elapsed();
if elapsed > *request_timeout {
Some(id.clone())
} else {
None
}
})
.collect();

if !timedout_requests.is_empty() {
log::warn!(
Review comment (Member): So this could only be triggered by a bug in libp2p/litep2p?

Reply (lexnv, Contributor Author): Yep, this could be triggered only by a bug in libp2p (libp2p/rust-libp2p#5429). I expected that the latest libp2p version would solve this issue; however, I could reproduce it on our libp2p Kusama validator (and locally):

2025-01-21 05:43:14.885  WARN tokio-runtime-worker sub-libp2p: Requests [(ProtocolRequestId { protocol: OnHeap("/b0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe/sync/2"), request_id: OutboundRequestId(53207) }, Some(20.479579184s))] force detected as timeout.

2025-01-21 05:43:18.178  WARN tokio-runtime-worker sub-libp2p: Received `RequestResponseEvent::OutboundFailure` with unexpected request id OutboundRequestId(53207) error Timeout from PeerId("12D3KooWJZuPc7KPtJcbYurX7brn3booPwbK53ucJhhPmuc1zZPt")

In this case libp2p produces a timeout event ~4s after we have detected the request as timed out. I'll also have a look at our poll implementation; maybe there's something there.

target: "sub-libp2p",
"Requests {timedout_requests:?} force detected as timeout.",
);
}

for id in timedout_requests {
let Some(req) = self.pending_requests.remove(&id) else {
continue;
};

if req
.response_tx
.send(Err(RequestFailure::Network(OutboundFailure::Timeout)))
.is_err()
{
log::debug!(
target: "sub-libp2p",
"Request with id {id:?} force detected as timeout. At the same time local \
node is no longer interested in the result.",
);
}
}
}

// Poll to see if any response is ready to be sent back.
while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
let RequestProcessingOutcome {
@@ -610,10 +688,12 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
};

if let Ok(payload) = result {
if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) {
if let Some(ProtocolDetails { behaviour, .. }) =
self.protocols.get_mut(&*protocol_name)
{
log::trace!(target: "sub-libp2p", "send response to {peer} ({protocol_name:?}), {} bytes", payload.len());

if protocol.send_response(inner_channel, Ok(payload)).is_err() {
if behaviour.send_response(inner_channel, Ok(payload)).is_err() {
// Note: Failure is handled further below when receiving
// `InboundFailure` event from request-response [`Behaviour`].
log::debug!(
@@ -641,7 +721,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
let mut fallback_requests = vec![];

// Poll request-responses protocols.
for (protocol, (ref mut behaviour, ref mut resp_builder)) in &mut self.protocols {
for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols
{
'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx) {
let ev = match ev {
// Main events we are interested in.
@@ -696,7 +777,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {

// Submit the request to the "response builder" passed by the user at
// initialization.
if let Some(resp_builder) = resp_builder {
if let Some(resp_builder) = inbound_queue {
// If the response builder is too busy, silently drop `tx`. This
// will be reported by the corresponding request-response
// [`Behaviour`] through an `InboundFailure::Omission` event.
@@ -904,7 +985,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {

// Send out fallback requests.
for (peer, protocol, request, pending_response) in fallback_requests.drain(..) {
if let Some((behaviour, _)) = self.protocols.get_mut(&protocol) {
if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(&protocol) {
Self::send_request_inner(
behaviour,
&mut self.pending_requests,