Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(gossipsub): Attempt to publish to at least mesh_n peers #5578

Merged
merged 6 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ libp2p-core = { version = "0.42.0", path = "core" }
libp2p-dcutr = { version = "0.12.0", path = "protocols/dcutr" }
libp2p-dns = { version = "0.42.0", path = "transports/dns" }
libp2p-floodsub = { version = "0.45.0", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.47.0", path = "protocols/gossipsub" }
libp2p-gossipsub = { version = "0.47.1", path = "protocols/gossipsub" }
libp2p-identify = { version = "0.45.0", path = "protocols/identify" }
libp2p-identity = { version = "0.2.9" }
libp2p-kad = { version = "0.46.0", path = "protocols/kad" }
Expand Down
5 changes: 5 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
## 0.47.0
jxs marked this conversation as resolved.
Show resolved Hide resolved

- Attempt to publish to at least mesh_n peers when flood publish is disabled.
See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/XXXX).
jxs marked this conversation as resolved.
Show resolved Hide resolved

## 0.47.0

<!-- Update to libp2p-swarm v0.45.0 -->
- Add ConnectionError to FromSwarm::ConnectionClosed.
See [PR 5485](https://github.com/libp2p/rust-libp2p/pull/5485).
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-gossipsub"
edition = "2021"
rust-version = { workspace = true }
description = "Gossipsub protocol for libp2p"
version = "0.47.0"
version = "0.47.1"
authors = ["Age Manning <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
394 changes: 162 additions & 232 deletions protocols/gossipsub/src/behaviour.rs

Large diffs are not rendered by default.

101 changes: 54 additions & 47 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,9 @@ fn test_unsubscribe() {

for topic_hash in &topic_hashes {
assert!(
gs.topic_peers.contains_key(topic_hash),
gs.connected_peers
.values()
.any(|p| p.topics.contains(topic_hash)),
"Topic_peers contain a topic entry"
);
assert!(
Expand Down Expand Up @@ -629,8 +631,11 @@ fn test_publish_without_flood_publishing() {

// all peers should be subscribed to the topic
assert_eq!(
gs.topic_peers.get(&topic_hashes[0]).map(|p| p.len()),
Some(20),
gs.connected_peers
.values()
.filter(|p| p.topics.contains(&topic_hashes[0]))
.count(),
20,
"Peers should be subscribed to the topic"
);

Expand Down Expand Up @@ -669,8 +674,8 @@ fn test_publish_without_flood_publishing() {
let config: Config = Config::default();
assert_eq!(
publishes.len(),
config.mesh_n_low(),
"Should send a publish message to all known peers"
config.mesh_n(),
"Should send a publish message to at least mesh_n peers"
);

assert!(
Expand Down Expand Up @@ -809,9 +814,9 @@ fn test_inject_connected() {

// should add the new peers to `peer_topics` with an empty vec as a gossipsub node
for peer in peers {
let known_topics = gs.peer_topics.get(&peer).unwrap();
let peer = gs.connected_peers.get(&peer).unwrap();
assert!(
known_topics == &topic_hashes.iter().cloned().collect(),
peer.topics == topic_hashes.iter().cloned().collect(),
"The topics for each node should all topics"
);
}
Expand Down Expand Up @@ -860,24 +865,39 @@ fn test_handle_received_subscriptions() {

// verify the result

let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone();
let peer = gs.connected_peers.get(&peers[0]).unwrap();
assert!(
peer_topics == topic_hashes.iter().take(3).cloned().collect(),
peer.topics
== topic_hashes
.iter()
.take(3)
.cloned()
.collect::<BTreeSet<_>>(),
"First peer should be subscribed to three topics"
);
let peer_topics = gs.peer_topics.get(&peers[1]).unwrap().clone();
let peer1 = gs.connected_peers.get(&peers[1]).unwrap();
assert!(
peer_topics == topic_hashes.iter().take(3).cloned().collect(),
peer1.topics
== topic_hashes
.iter()
.take(3)
.cloned()
.collect::<BTreeSet<_>>(),
"Second peer should be subscribed to three topics"
);

assert!(
!gs.peer_topics.contains_key(&unknown_peer),
!gs.connected_peers.contains_key(&unknown_peer),
"Unknown peer should not have been added"
);

for topic_hash in topic_hashes[..3].iter() {
let topic_peers = gs.topic_peers.get(topic_hash).unwrap().clone();
let topic_peers = gs
.connected_peers
.iter()
.filter(|(_, p)| p.topics.contains(topic_hash))
.map(|(peer_id, _)| *peer_id)
.collect::<BTreeSet<PeerId>>();
assert!(
topic_peers == peers[..2].iter().cloned().collect(),
"Two peers should be added to the first three topics"
Expand All @@ -894,13 +914,21 @@ fn test_handle_received_subscriptions() {
&peers[0],
);

let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone();
assert!(
peer_topics == topic_hashes[1..3].iter().cloned().collect(),
let peer = gs.connected_peers.get(&peers[0]).unwrap().clone();
assert_eq!(
peer.topics,
topic_hashes[1..3].iter().cloned().collect::<BTreeSet<_>>(),
"Peer should be subscribed to two topics"
);

let topic_peers = gs.topic_peers.get(&topic_hashes[0]).unwrap().clone(); // only gossipsub at the moment
// only gossipsub at the moment
let topic_peers = gs
.connected_peers
.iter()
.filter(|(_, p)| p.topics.contains(&topic_hashes[0]))
.map(|(peer_id, _)| *peer_id)
.collect::<BTreeSet<PeerId>>();

assert!(
topic_peers == peers[1..2].iter().cloned().collect(),
"Only the second peers should be in the first topic"
Expand All @@ -924,9 +952,8 @@ fn test_get_random_peers() {
for _ in 0..20 {
peers.push(PeerId::random())
}

gs.topic_peers
.insert(topic_hash.clone(), peers.iter().cloned().collect());
let mut topics = BTreeSet::new();
topics.insert(topic_hash.clone());

gs.connected_peers = peers
.iter()
Expand All @@ -936,52 +963,32 @@ fn test_get_random_peers() {
PeerConnections {
kind: PeerKind::Gossipsubv1_1,
connections: vec![ConnectionId::new_unchecked(0)],
topics: topics.clone(),
},
)
})
.collect();

let random_peers =
get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 5, |_| {
true
});
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 5, |_| true);
assert_eq!(random_peers.len(), 5, "Expected 5 peers to be returned");
let random_peers = get_random_peers(
&gs.topic_peers,
&gs.connected_peers,
&topic_hash,
30,
|_| true,
);
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 30, |_| true);
assert!(random_peers.len() == 20, "Expected 20 peers to be returned");
assert!(
random_peers == peers.iter().cloned().collect(),
"Expected no shuffling"
);
let random_peers = get_random_peers(
&gs.topic_peers,
&gs.connected_peers,
&topic_hash,
20,
|_| true,
);
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 20, |_| true);
assert!(random_peers.len() == 20, "Expected 20 peers to be returned");
assert!(
random_peers == peers.iter().cloned().collect(),
"Expected no shuffling"
);
let random_peers =
get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 0, |_| {
true
});
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 0, |_| true);
assert!(random_peers.is_empty(), "Expected 0 peers to be returned");
// test the filter
let random_peers =
get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 5, |_| {
false
});
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 5, |_| false);
assert!(random_peers.is_empty(), "Expected 0 peers to be returned");
let random_peers = get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 10, {
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 10, {
|peer| peers.contains(peer)
});
assert!(random_peers.len() == 10, "Expected 10 peers to be returned");
Expand Down
15 changes: 10 additions & 5 deletions protocols/gossipsub/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,12 +355,17 @@ impl Metrics {
}
}

/// Register how many peers do we known are subscribed to this topic.
pub(crate) fn set_topic_peers(&mut self, topic: &TopicHash, count: usize) {
/// Increase the number of peers that are subscribed to this topic.
pub(crate) fn inc_topic_peers(&mut self, topic: &TopicHash) {
if self.register_topic(topic).is_ok() {
self.topic_peers_count
.get_or_create(topic)
.set(count as i64);
self.topic_peers_count.get_or_create(topic).inc();
}
}

/// Decrease the number of peers that are subscribed to this topic.
pub(crate) fn dec_topic_peers(&mut self, topic: &TopicHash) {
if self.register_topic(topic).is_ok() {
self.topic_peers_count.get_or_create(topic).dec();
}
}

Expand Down
4 changes: 3 additions & 1 deletion protocols/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use libp2p_identity::PeerId;
use libp2p_swarm::ConnectionId;
use prometheus_client::encoding::EncodeLabelValue;
use quick_protobuf::MessageWrite;
use std::fmt;
use std::fmt::Debug;
use std::{collections::BTreeSet, fmt};

use crate::rpc_proto::proto;
#[cfg(feature = "serde")]
Expand Down Expand Up @@ -77,6 +77,8 @@ pub(crate) struct PeerConnections {
pub(crate) kind: PeerKind,
/// Its current connections.
pub(crate) connections: Vec<ConnectionId>,
/// Subscribed topics.
pub(crate) topics: BTreeSet<TopicHash>,
}

/// Describes the types of peers that can exist in the gossipsub context.
Expand Down
Loading