
feature(gossipsub): switch internal async-channel #570

Open · wants to merge 1 commit into base: sigp-gossipsub
6 changes: 5 additions & 1 deletion protocols/gossipsub/CHANGELOG.md
@@ -1,4 +1,8 @@
-## 0.48.1
+## 0.49
+
+- Replace the internal `async-channel` used to dispatch messages from the `NetworkBehaviour` to the
+  `ConnectionHandler` with an internal priority queue. See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/XXXX)
+
 - Allow whitelisting topics for metrics to ensure metrics are recorded correctly for these topics.
   See [PR 5895](https://github.com/libp2p/rust-libp2p/pull/5895)
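For orientation before the diff: the PR replaces the per-peer `async-channel` pair with a single shared queue. Below is a minimal sketch of what such a shared, bounded queue could look like. The names `Queue`, `try_push`, and `len` mirror the diff; the internals (an `Arc<Mutex<BinaryHeap>>`) and the integer item type are assumptions for illustration, not the PR's actual implementation.

```rust
use std::{
    collections::BinaryHeap,
    sync::{Arc, Mutex},
};

/// Hypothetical stand-in for the PR's `Queue`: bounded, priority-ordered,
/// and clonable so that behaviour and handlers share the same storage.
#[derive(Clone)]
struct Queue<T: Ord> {
    inner: Arc<Mutex<BinaryHeap<T>>>,
    capacity: usize,
}

impl<T: Ord> Queue<T> {
    fn new(capacity: usize) -> Self {
        Queue {
            inner: Arc::new(Mutex::new(BinaryHeap::new())),
            capacity,
        }
    }

    /// Hands the item back when the queue is full, matching the
    /// `Err(rpc)` behaviour seen in `send_message` below.
    fn try_push(&self, item: T) -> Result<(), T> {
        let mut heap = self.inner.lock().unwrap();
        if heap.len() >= self.capacity {
            return Err(item);
        }
        heap.push(item);
        Ok(())
    }

    fn len(&self) -> usize {
        self.inner.lock().unwrap().len()
    }
}

fn main() {
    let queue = Queue::new(2);
    let handler_side = queue.clone(); // same underlying heap, not a copy
    assert!(queue.try_push(1).is_ok());
    assert!(handler_side.try_push(2).is_ok());
    assert_eq!(queue.try_push(3), Err(3)); // full: the item is returned
    assert_eq!(handler_side.len(), 2);
}
```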
69 changes: 39 additions & 30 deletions protocols/gossipsub/src/behaviour.rs
@@ -19,10 +19,12 @@
 // DEALINGS IN THE SOFTWARE.
 
 use std::{
-    cmp::{max, Ordering, Ordering::Equal},
+    cmp::{
+        max,
+        Ordering::{self, Equal},
+    },
     collections::{BTreeSet, HashMap, HashSet, VecDeque},
-    fmt,
-    fmt::Debug,
+    fmt::{self, Debug},
     net::IpAddr,
     task::{Context, Poll},
     time::Duration,
@@ -57,7 +59,7 @@ use crate::{
     metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty},
     peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason},
     protocol::SIGNING_PREFIX,
-    rpc::Sender,
+    queue::Queue,
     rpc_proto::proto,
     subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter},
     time_cache::DuplicateCache,
@@ -746,6 +748,7 @@
         if self.send_message(
             *peer_id,
             RpcOut::Publish {
+                message_id: msg_id.clone(),
                 message: raw_message.clone(),
                 timeout: Delay::new(self.config.publish_queue_duration()),
             },
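The new `message_id` field is what later allows queued rpcs to be matched against incoming IDONTWANT ids (see the `retain_mut` hunk near the end). A rough sketch of the extended variants follows, using local stand-ins so it compiles on its own; the crate's real `RpcOut`, `MessageId`, `RawMessage`, and timer types differ.

```rust
// Local stand-ins so the sketch is self-contained; the crate's types differ.
struct MessageId(Vec<u8>);
struct RawMessage;
struct Delay; // the diff uses a real timer future here

// Publish and Forward now carry the id of the message they wrap, so
// entries sitting in the queue can be identified (and dropped) later.
enum RpcOut {
    Publish { message_id: MessageId, message: RawMessage, timeout: Delay },
    Forward { message_id: MessageId, message: RawMessage, timeout: Delay },
    // Control variants (Graft, Prune, Subscribe, ...) elided.
}

fn main() {}
```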
@@ -1348,6 +1351,7 @@
         self.send_message(
             *peer_id,
             RpcOut::Forward {
+                message_id: id.clone(),
                 message: msg,
                 timeout: Delay::new(self.config.forward_queue_duration()),
             },
@@ -2046,9 +2050,8 @@
         // before we add all the gossip from this heartbeat in order to gain a true measure of
         // steady-state size of the queues.
         if let Some(m) = &mut self.metrics {
-            for sender_queue in self.connected_peers.values().map(|v| &v.sender) {
-                m.observe_priority_queue_size(sender_queue.priority_queue_len());
-                m.observe_non_priority_queue_size(sender_queue.non_priority_queue_len());
+            for sender_queue in self.connected_peers.values().map(|v| &v.messages) {
+                m.observe_priority_queue_size(sender_queue.len());
             }
         }

Review comment (Member): This PR removes the priority/non-priority logic.

@@ -2711,6 +2714,7 @@
         self.send_message(
             *peer_id,
             RpcOut::Forward {
+                message_id: msg_id.clone(),
                 message: message.clone(),
                 timeout: Delay::new(self.config.forward_queue_duration()),
             },
@@ -2834,7 +2838,7 @@
         };
 
         // Try sending the message to the connection handler.
-        match peer.sender.send_message(rpc) {
+        match peer.messages.try_push(rpc) {
             Ok(()) => true,
             Err(rpc) => {
                 // Sending failed because the channel is full.
@@ -2858,7 +2862,7 @@
                 | RpcOut::Prune(_)
                 | RpcOut::Subscribe(_)
                 | RpcOut::Unsubscribe(_) => {
-                    unreachable!("Channel for highpriority control messages is unbounded and should always be open.")
+                    failed_messages.priority += 1;
                 }
             }

Review comment (Member): This is no longer the case, because we've merged the priority and non-priority logic.
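As the review comment notes, the unbounded-channel invariant is gone: the single merged queue is bounded for every variant, so control messages can now be rejected too, which is why the `unreachable!` becomes a counter bump. A self-contained sketch of that accounting pattern, with simplified stand-ins for `RpcOut`, `FailedMessages`, and the queue:

```rust
enum RpcOut {
    Publish { id: u64 },
    Forward { id: u64 },
    Graft, // stands in for all control variants
}

#[derive(Default)]
struct FailedMessages {
    publish: u64,
    forward: u64,
    priority: u64,
}

struct BoundedQueue {
    items: Vec<RpcOut>,
    capacity: usize,
}

impl BoundedQueue {
    // When full, hand the rpc back so the caller can account for it.
    fn try_push(&mut self, rpc: RpcOut) -> Result<(), RpcOut> {
        if self.items.len() >= self.capacity {
            return Err(rpc);
        }
        self.items.push(rpc);
        Ok(())
    }
}

fn main() {
    let mut queue = BoundedQueue { items: Vec::new(), capacity: 0 };
    let mut failed = FailedMessages::default();

    for rpc in [RpcOut::Publish { id: 1 }, RpcOut::Graft] {
        if let Err(rejected) = queue.try_push(rpc) {
            match rejected {
                RpcOut::Publish { .. } => failed.publish += 1,
                RpcOut::Forward { .. } => failed.forward += 1,
                // Previously an `unreachable!`; with a single bounded
                // queue, control messages can fail too.
                RpcOut::Graft => failed.priority += 1,
            }
        }
    }

    assert_eq!(failed.publish, 1);
    assert_eq!(failed.priority, 1);
    assert_eq!(failed.forward, 0);
}
```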

@@ -3096,16 +3100,16 @@
             .or_insert(PeerConnections {
                 kind: PeerKind::Floodsub,
                 connections: vec![],
-                sender: Sender::new(self.config.connection_handler_queue_len()),
                 topics: Default::default(),
                 dont_send: LinkedHashMap::new(),
+                messages: Queue::new(self.config.connection_handler_queue_len()),
             });
         // Add the new connection
         connected_peer.connections.push(connection_id);

         Ok(Handler::new(
             self.config.protocol_config(),
-            connected_peer.sender.new_receiver(),
+            connected_peer.messages.clone(),
         ))
     }

Review comment (Member): Suggested change, adding a comment above `Ok(Handler::new(`:
+        // This clones a reference to the Queue so any new handlers reference the same underlying queue. No data is actually cloned here.
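To illustrate the suggested comment: cloning an `Arc`-backed handle only bumps a reference count, so the behaviour and each handler see the same queued rpcs. A minimal demonstration with plain standard-library types (the PR's `Queue` is assumed to work the same way):

```rust
use std::sync::{Arc, Mutex};

fn main() {
    let behaviour_side = Arc::new(Mutex::new(vec!["rpc-1"]));
    let handler_side = behaviour_side.clone(); // new handle, same storage

    handler_side.lock().unwrap().push("rpc-2");

    // A push through one handle is visible through the other.
    assert_eq!(behaviour_side.lock().unwrap().len(), 2);
}
```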

@@ -3123,16 +3127,16 @@
             .or_insert(PeerConnections {
                 kind: PeerKind::Floodsub,
                 connections: vec![],
-                sender: Sender::new(self.config.connection_handler_queue_len()),
                 topics: Default::default(),
                 dont_send: LinkedHashMap::new(),
+                messages: Queue::new(self.config.connection_handler_queue_len()),
             });
         // Add the new connection
         connected_peer.connections.push(connection_id);

         Ok(Handler::new(
             self.config.protocol_config(),
-            connected_peer.sender.new_receiver(),
+            connected_peer.messages.clone(),
         ))
     }

@@ -3173,7 +3177,7 @@
                 }
             }
         }
-        HandlerEvent::MessageDropped(rpc) => {
+        HandlerEvent::MessagesDropped(rpcs) => {
             // Account for this in the scoring logic
             if let Some((peer_score, _, _)) = &mut self.peer_score {
                 peer_score.failed_message_slow_peer(&propagation_source);
@@ -3182,26 +3186,21 @@
             // Keep track of expired messages for the application layer.
             let failed_messages = self.failed_messages.entry(propagation_source).or_default();
             failed_messages.timeout += 1;
-            match rpc {
-                RpcOut::Publish { .. } => {
-                    failed_messages.publish += 1;
-                }
-                RpcOut::Forward { .. } => {
-                    failed_messages.forward += 1;
-                }
-                _ => {}
-            }
-
-            // Record metrics on the failure.
-            if let Some(metrics) = self.metrics.as_mut() {
+            for rpc in rpcs {
                 match rpc {
                     RpcOut::Publish { message, .. } => {
-                        metrics.publish_msg_dropped(&message.topic);
-                        metrics.timeout_msg_dropped(&message.topic);
+                        failed_messages.publish += 1;
+                        if let Some(metrics) = self.metrics.as_mut() {
+                            metrics.publish_msg_dropped(&message.topic);
+                            metrics.timeout_msg_dropped(&message.topic);
+                        }
                     }
                     RpcOut::Forward { message, .. } => {
-                        metrics.forward_msg_dropped(&message.topic);
-                        metrics.timeout_msg_dropped(&message.topic);
+                        failed_messages.forward += 1;
+                        if let Some(metrics) = self.metrics.as_mut() {
+                            metrics.forward_msg_dropped(&message.topic);
+                            metrics.timeout_msg_dropped(&message.topic);
+                        }
                     }
                     _ => {}
                 }
@@ -3306,6 +3305,16 @@
             if let Some(metrics) = self.metrics.as_mut() {
                 metrics.register_idontwant(message_ids.len());
             }
+
+            // Remove messages from the queue.
+            peer.messages.retain_mut(|rpc| match rpc {
+                RpcOut::Publish { message_id, .. }
+                | RpcOut::Forward { message_id, .. } => {
+                    message_ids.contains(message_id)
+                }
+                _ => true,
+            });
+
             for message_id in message_ids {
                 peer.dont_send.insert(message_id, Instant::now());
                 // Don't exceed capacity.

Review comment (Member): I think this needs the negation.
Suggested change:
-                    message_ids.contains(message_id)
+                    !message_ids.contains(message_id)
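The point of the review comment above: `retain_mut` keeps the elements for which the closure returns `true`, so as written the queue would keep only the messages the peer just said it does not want. A small standard-library demonstration (plain `&str` ids and `Vec::retain` stand in for the real `MessageId` type and the queue):

```rust
use std::collections::HashSet;

fn main() {
    // Ids received in an IDONTWANT control message.
    let message_ids: HashSet<&str> = HashSet::from(["m1", "m3"]);
    let queue = vec!["m1", "m2", "m3"];

    // As written in the diff: keeps ONLY the unwanted messages.
    let mut wrong = queue.clone();
    wrong.retain(|id| message_ids.contains(id));
    assert_eq!(wrong, vec!["m1", "m3"]);

    // With the suggested negation: drops the unwanted messages.
    let mut right = queue.clone();
    right.retain(|id| !message_ids.contains(id));
    assert_eq!(right, vec!["m2"]);
}
```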