diff --git a/Cargo.lock b/Cargo.lock
index 5184cd51d9..5cce54d180 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -186,7 +186,7 @@ checksum = "cf9ff0bbfd639f15c74af777d81383cf53efb7c93613f6cab67c6c11e05bbf8b"
 
 [[package]]
 name = "bee-autopeering"
-version = "0.4.0"
+version = "0.4.1"
 dependencies = [
  "async-trait",
  "base64 0.13.0",
@@ -247,7 +247,7 @@ dependencies = [
 
 [[package]]
 name = "bee-gossip"
-version = "0.4.0"
+version = "0.5.0"
 dependencies = [
  "async-trait",
  "bee-runtime",
@@ -374,7 +374,7 @@ dependencies = [
 
 [[package]]
 name = "bee-protocol"
-version = "0.2.0"
+version = "0.2.1"
 dependencies = [
  "async-channel",
  "async-priority-queue",
@@ -406,7 +406,7 @@ dependencies = [
 
 [[package]]
 name = "bee-rest-api"
-version = "0.2.0"
+version = "0.2.1"
 dependencies = [
  "async-trait",
  "bech32",
diff --git a/bee-api/bee-rest-api/CHANGELOG.md b/bee-api/bee-rest-api/CHANGELOG.md
index 3b6d57d84a..08c6b11b9a 100644
--- a/bee-api/bee-rest-api/CHANGELOG.md
+++ b/bee-api/bee-rest-api/CHANGELOG.md
@@ -19,7 +19,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Security -->
 
-## 0.2.0 - 2022-XX-XX
+## 0.2.1 - 2022-02-28
+
+- Update `bee-gossip` dependency to 0.5.0;
+
+## 0.2.0 - 2022-01-28
 
 ### Added
diff --git a/bee-api/bee-rest-api/Cargo.toml b/bee-api/bee-rest-api/Cargo.toml
index 37f29f68da..ad1432711d 100644
--- a/bee-api/bee-rest-api/Cargo.toml
+++ b/bee-api/bee-rest-api/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "bee-rest-api"
-version = "0.2.0"
+version = "0.2.1"
 authors = [ "IOTA Stiftung" ]
 edition = "2021"
 description = "The default REST API implementation for the IOTA Bee node software."
@@ -12,7 +12,7 @@ homepage = "https://www.iota.org"
 
 [dependencies]
 bee-common = { version = "0.6.0", path = "../../bee-common/bee-common", default-features = false, optional = true }
-bee-gossip = { version = "0.4.0", path = "../../bee-network/bee-gossip", default-features = false, optional = true }
+bee-gossip = { version = "0.5.0", path = "../../bee-network/bee-gossip", default-features = false, optional = true }
 bee-ledger = { version = "0.6.1", path = "../../bee-ledger", default-features = false }
 bee-message = { version = "0.1.6", path = "../../bee-message", default-features = false }
 bee-pow = { version = "0.2.0", path = "../../bee-pow", default-features = false }
diff --git a/bee-network/bee-autopeering/CHANGELOG.md b/bee-network/bee-autopeering/CHANGELOG.md
index 32ca192a72..3abdf66945 100644
--- a/bee-network/bee-autopeering/CHANGELOG.md
+++ b/bee-network/bee-autopeering/CHANGELOG.md
@@ -19,6 +19,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Security -->
 
+## 0.4.1 - 2022-02-28
+
+### Changed
+
+- Displayed representation of `PeerId`s to start at the beginning of their significant part;
+
 ## 0.4.0 - 2022-02-11
 
 ### Added
diff --git a/bee-network/bee-autopeering/Cargo.toml b/bee-network/bee-autopeering/Cargo.toml
index a17e2ac364..2579a87938 100644
--- a/bee-network/bee-autopeering/Cargo.toml
+++ b/bee-network/bee-autopeering/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "bee-autopeering"
-version = "0.4.0"
+version = "0.4.1"
 authors = [ "IOTA Stiftung" ]
 edition = "2021"
 description = "Allows peers in the same IOTA network to automatically discover each other."
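Context for the `0.4.1` changelog entry above (this note and the sketch are annotations, not part of the patch): the base58 rendering of every Ed25519 libp2p `PeerId` begins with the same multihash prefix `12D3KooW`, so the first eight characters of any such id carry no identifying information. Truncating from the front, as the old `[..DISPLAY_LENGTH]` slice did, therefore spent half of the 16-character alias on that constant prefix. The `DISPLAY_OFFSET` fix in `peer_id.rs` and the `alias!` macro change below both skip it. A minimal, self-contained sketch of the new windowing; the full id used here is hypothetical, padded only to stay consistent with the test vectors in `bee-gossip/src/tests/alias.rs`:

```rust
/// Skip the constant "12D3KooW" multihash prefix and show 16 significant characters.
fn short_alias(base58: &str) -> &str {
    const DISPLAY_OFFSET: usize = 8;
    const DISPLAY_LENGTH: usize = 16;
    &base58[DISPLAY_OFFSET..DISPLAY_OFFSET + DISPLAY_LENGTH]
}

fn main() {
    // Hypothetical 52-character Ed25519 peer id (base58), for illustration only.
    let id = "12D3KooWJWEKvSFbben74C7HRzt3nW7xCVmEJJAgiwWSUEeF27st";
    assert_eq!(short_alias(id), "JWEKvSFbben74C7H"); // what the new `alias!` shows
    assert_eq!(&id[46..], "eF27st"); // what the old `alias!` showed
}
```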
diff --git a/bee-network/bee-autopeering/src/peer/peer_id.rs b/bee-network/bee-autopeering/src/peer/peer_id.rs
index 06aa0e75dd..588de12bc1 100644
--- a/bee-network/bee-autopeering/src/peer/peer_id.rs
+++ b/bee-network/bee-autopeering/src/peer/peer_id.rs
@@ -18,6 +18,7 @@ use std::{
 };
 
 const DISPLAY_LENGTH: usize = 16;
+const DISPLAY_OFFSET: usize = 8;
 
 /// Represents the unique identity of a peer in the network.
 #[derive(Copy, Clone)]
@@ -98,7 +99,7 @@ impl fmt::Debug for PeerId {
 
 impl fmt::Display for PeerId {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        self.libp2p_peer_id().to_base58()[..DISPLAY_LENGTH].fmt(f)
+        self.libp2p_peer_id().to_base58()[DISPLAY_OFFSET..DISPLAY_OFFSET + DISPLAY_LENGTH].fmt(f)
     }
 }
diff --git a/bee-network/bee-autopeering/src/task.rs b/bee-network/bee-autopeering/src/task.rs
index 858b9b1870..793c0aac5a 100644
--- a/bee-network/bee-autopeering/src/task.rs
+++ b/bee-network/bee-autopeering/src/task.rs
@@ -117,7 +117,10 @@ impl TaskManager {
             let shutdown_tx = shutdown_senders.remove(&task_name).unwrap();
 
             log::trace!("Shutting down: {}", task_name);
-            shutdown_tx.send(()).expect("error sending shutdown signal");
+
+            if shutdown_tx.send(()).is_err() {
+                log::error!("Error sending shutdown signal to task {task_name}.");
+            }
         }
 
         // Wait for all tasks to shut down in a certain order and within a maximum amount of time.
diff --git a/bee-network/bee-gossip/CHANGELOG.md b/bee-network/bee-gossip/CHANGELOG.md
index 7fcd86951d..977c3e0720 100644
--- a/bee-network/bee-gossip/CHANGELOG.md
+++ b/bee-network/bee-gossip/CHANGELOG.md
@@ -19,6 +19,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Security -->
 
+## 0.5.0 - 2022-02-28
+
+### Added
+
+- `PeerMetrics` type that keeps a count of dial attempts and the last identification timestamp;
+- `PeerUnreachable` event that is fired after a certain number of unsuccessful dial attempts;
+
+### Changed
+
+- `PeerList` type keeps metrics for each peer;
+
+### Fixed
+
+- Missing `PeerRemoved` event for disconnected unknown peers;
+
 ## 0.4.0 - 2022-01-20
 
 ### Fixed
diff --git a/bee-network/bee-gossip/Cargo.toml b/bee-network/bee-gossip/Cargo.toml
index 4e150f8546..7fadc96d50 100644
--- a/bee-network/bee-gossip/Cargo.toml
+++ b/bee-network/bee-gossip/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "bee-gossip"
-version = "0.4.0"
+version = "0.5.0"
 authors = [ "IOTA Stiftung" ]
 edition = "2021"
 description = "Allows peers in the same IOTA network to exchange gossip messages with each other."
diff --git a/bee-network/bee-gossip/src/alias.rs b/bee-network/bee-gossip/src/alias.rs
index 0cfe19d478..aa66f2e732 100644
--- a/bee-network/bee-gossip/src/alias.rs
+++ b/bee-network/bee-gossip/src/alias.rs
@@ -7,9 +7,9 @@
 #[macro_export]
 macro_rules! alias {
     ($peer_id:expr) => {
-        &$peer_id.to_base58()[46..]
+        &$peer_id.to_base58()[8..24]
     };
     ($peer_id:expr, $len:expr) => {
-        &$peer_id.to_base58()[(52 - $len)..]
+        &$peer_id.to_base58()[8..8 + $len]
     };
 }
diff --git a/bee-network/bee-gossip/src/network/host.rs b/bee-network/bee-gossip/src/network/host.rs
index fb25ba2784..54dccc019f 100644
--- a/bee-network/bee-gossip/src/network/host.rs
+++ b/bee-network/bee-gossip/src/network/host.rs
@@ -143,12 +143,8 @@ async fn process_swarm_event(
             })
             .expect("send error");
 
-            peerlist
-                .0
-                .write()
-                .await
-                .insert_local_addr(address)
-                .expect("insert_local_addr");
+            // Note: We don't care if the inserted address is a duplicate.
+            let _ = peerlist.0.write().await.add_local_addr(address);
         }
         SwarmEvent::ConnectionEstablished { peer_id, .. } => {
             debug!("Swarm event: connection established with {}.", alias!(peer_id));
@@ -182,6 +178,7 @@ async fn process_internal_command(internal_command: Command, swarm: &mut Swarm<
+        Command::DisconnectPeer { peer_id } => hang_up(swarm, peer_id).await,
         _ => {}
     }
 }
@@ -209,9 +206,26 @@ async fn dial_peer(swarm: &mut Swarm<SwarmBehaviour>, peer_id: PeerId, peerlist:
     // We just checked that the peer is fine to be dialed.
     let PeerInfo {
         address: addr, alias, ..
-    } = peerlist.0.read().await.info(&peer_id).unwrap();
-
-    debug!("Dialing peer: {} ({}).", alias, alias!(peer_id));
+    } = peerlist.0.read().await.info(&peer_id).expect("peer must exist");
+
+    let mut dial_attempt = 0;
+
+    peerlist
+        .0
+        .write()
+        .await
+        .update_metrics(&peer_id, |m| {
+            m.num_dials += 1;
+            dial_attempt = m.num_dials;
+        })
+        .expect("peer must exist");
+
+    debug!(
+        "Dialing peer: {} ({}) attempt: #{}.",
+        alias,
+        alias!(peer_id),
+        dial_attempt
+    );
 
     // TODO: We also use `Swarm::dial_addr` here (instead of `Swarm::dial`) for now. See if it's better to change
     // that.
@@ -219,3 +233,9 @@
 
     Ok(())
 }
+
+async fn hang_up(swarm: &mut Swarm<SwarmBehaviour>, peer_id: PeerId) {
+    debug!("Hanging up on: {}.", alias!(peer_id));
+
+    let _ = Swarm::disconnect_peer_id(swarm, peer_id);
+}
diff --git a/bee-network/bee-gossip/src/peer/list.rs b/bee-network/bee-gossip/src/peer/list.rs
index 7429387701..9a5aed68aa 100644
--- a/bee-network/bee-gossip/src/peer/list.rs
+++ b/bee-network/bee-gossip/src/peer/list.rs
@@ -33,7 +33,7 @@ impl PeerListWrapper {
 pub struct PeerList {
     local_id: PeerId,
     local_addrs: HashSet<Multiaddr>,
-    peers: HashMap<PeerId, (PeerInfo, PeerState)>,
+    peers: HashMap<PeerId, (PeerInfo, PeerState, PeerMetrics)>,
     banned_peers: HashSet<PeerId>,
     banned_addrs: HashSet<Multiaddr>,
 }
@@ -63,7 +63,8 @@
                     alias: peer.alias.unwrap_or_else(|| alias!(peer_id).to_owned()),
                     relation: PeerRelation::Known,
                 },
-                PeerState::Disconnected,
+                PeerState::default(),
+                PeerMetrics::default(),
             ),
         )
         }));
@@ -77,29 +78,21 @@
         }
     }
 
-    pub fn insert_peer(&mut self, peer_id: PeerId, peer_info: PeerInfo) -> Result<(), (PeerId, PeerInfo, Error)> {
+    pub fn add(&mut self, peer_id: PeerId, peer_info: PeerInfo) -> Result<(), (PeerId, PeerInfo, Error)> {
         if self.contains(&peer_id) {
             return Err((peer_id, peer_info, Error::PeerIsDuplicate(peer_id)));
         }
 
         // Since we already checked that such a `peer_id` is not yet present, the returned value is always `None`.
-        let _ = self.peers.insert(peer_id, (peer_info, PeerState::Disconnected));
-
-        Ok(())
-    }
-
-    pub fn insert_local_addr(&mut self, addr: Multiaddr) -> Result<(), (Multiaddr, Error)> {
-        if self.local_addrs.contains(&addr) {
-            return Err((addr.clone(), Error::AddressIsDuplicate(addr)));
-        }
-
-        let _ = self.local_addrs.insert(addr);
+        let _ = self
+            .peers
+            .insert(peer_id, (peer_info, PeerState::default(), PeerMetrics::default()));
 
         Ok(())
     }
 
     pub fn remove(&mut self, peer_id: &PeerId) -> Result<PeerInfo, Error> {
-        let (info, _) = self.peers.remove(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?;
+        let (info, _, _) = self.peers.remove(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?;
 
         Ok(info)
     }
@@ -112,18 +105,34 @@
         self.peers
             .get(peer_id)
             .ok_or(Error::PeerNotPresent(*peer_id))
-            .map(|(peer_info, _)| peer_info.clone())
+            .map(|(info, _, _)| info.clone())
+    }
+
+    pub fn metrics(&self, peer_id: &PeerId) -> Result<PeerMetrics, Error> {
+        self.peers
+            .get(peer_id)
+            .ok_or(Error::PeerNotPresent(*peer_id))
+            .map(|(_, _, metrics)| metrics.clone())
     }
 
     pub fn len(&self) -> usize {
         self.peers.len()
     }
 
+    /// Note: Returns an error if the address being added is a duplicate.
+    pub fn add_local_addr(&mut self, addr: Multiaddr) -> Result<(), (Multiaddr, Error)> {
+        if self.local_addrs.insert(addr.clone()) {
+            Ok(())
+        } else {
+            Err((addr.clone(), Error::AddressIsDuplicate(addr)))
+        }
+    }
+
     pub fn update_info<U>(&mut self, peer_id: &PeerId, mut update: U) -> Result<(), Error>
     where
         U: FnMut(&mut PeerInfo),
     {
-        let (info, _) = self.peers.get_mut(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?;
+        let (info, _, _) = self.peers.get_mut(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?;
 
         update(info);
 
@@ -134,28 +143,39 @@
     where
         U: FnMut(&mut PeerState) -> Option<T>,
     {
-        let (_, state) = self.peers.get_mut(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?;
+        let (_, state, _) = self.peers.get_mut(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?;
 
         Ok(update(state))
     }
 
+    pub fn update_metrics<U>(&mut self, peer_id: &PeerId, mut update: U) -> Result<(), Error>
+    where
+        U: FnMut(&mut PeerMetrics),
+    {
+        let (_, _, metrics) = self.peers.get_mut(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?;
+
+        update(metrics);
+
+        Ok(())
+    }
+
     pub fn satisfies<P>(&self, peer_id: &PeerId, predicate: P) -> Result<bool, Error>
     where
-        P: Fn(&PeerInfo, &PeerState) -> bool,
+        P: Fn(&PeerInfo, &PeerState, &PeerMetrics) -> bool,
     {
         self.peers
             .get(peer_id)
             .ok_or(Error::PeerNotPresent(*peer_id))
-            .map(|(info, state)| predicate(info, state))
+            .map(|(info, state, metrics)| predicate(info, state, metrics))
     }
 
-    pub fn filter_info<'a, P: 'a>(&'a self, predicate: P) -> impl Iterator<Item = (PeerId, PeerInfo)> + '_
+    pub fn filter<'a, P: 'a>(&'a self, predicate: P) -> impl Iterator<Item = (PeerId, PeerInfo, PeerMetrics)> + '_
     where
-        P: Fn(&PeerInfo, &PeerState) -> bool,
+        P: Fn(&PeerInfo, &PeerState, &PeerMetrics) -> bool,
     {
-        self.peers.iter().filter_map(move |(peer_id, (info, state))| {
-            if predicate(info, state) {
-                Some((*peer_id, info.clone()))
+        self.peers.iter().filter_map(move |(peer_id, (info, state, metrics))| {
+            if predicate(info, state, metrics) {
+                Some((*peer_id, info.clone(), metrics.clone()))
             } else {
                 None
             }
@@ -164,19 +184,20 @@
     pub fn filter_count<P>(&self, predicate: P) -> usize
     where
-        P: Fn(&PeerInfo, &PeerState) -> bool,
+        P: Fn(&PeerInfo, &PeerState, &PeerMetrics) -> bool,
     {
-        self.peers.iter().fold(
-            0,
-            |count, (_, (info, state))| {
-                if predicate(info, state) { count + 1 } else { count }
-            },
-        )
+        self.peers.iter().fold(0, |count, (_, (info, state, metrics))| {
+            if predicate(info, state, metrics) {
+                count + 1
+            } else {
+                count
+            }
+        })
     }
 
     pub fn filter_remove<P>(&mut self, peer_id: &PeerId, predicate: P) -> bool
     where
-        P: Fn(&PeerInfo, &PeerState) -> bool,
+        P: Fn(&PeerInfo, &PeerState, &PeerMetrics) -> bool,
     {
         // NB: Since we drop a potential reference to `&(PeerInfo, PeerState)` this code won't create a deadlock in case
         // we refactor `PeerList` in a way that `.remove` would only take a `&self`.
@@ -184,7 +205,7 @@ impl PeerList {
         if self
             .peers
             .get(peer_id)
-            .filter(|(info, state)| predicate(info, state))
+            .filter(|(info, state, metrics)| predicate(info, state, metrics))
             .is_some()
         {
             // Should always return `true`, because we know it's there.
@@ -261,16 +282,16 @@ impl PeerList {
         } else if self.banned_addrs.contains(peer_addr) {
             Err(Error::AddressIsBanned(peer_addr.clone()))
         } else if self
-            .satisfies(peer_id, |_, state| state.is_connected())
+            .satisfies(peer_id, |_, state, _| state.is_connected())
             .unwrap_or(false)
         {
             Err(Error::PeerIsConnected(*peer_id))
         } else if !self.contains(peer_id)
-            && self.filter_count(|info, _| info.relation.is_unknown()) >= global::max_unknown_peers()
+            && self.filter_count(|info, _, _| info.relation.is_unknown()) >= global::max_unknown_peers()
         {
             Err(Error::ExceedsUnknownPeerLimit(global::max_unknown_peers()))
         } else if !self.contains(peer_id)
-            && self.filter_count(|info, _| info.relation.is_discovered()) >= global::max_discovered_peers()
+            && self.filter_count(|info, _, _| info.relation.is_discovered()) >= global::max_discovered_peers()
         {
             Err(Error::ExceedsDiscoveredPeerLimit(global::max_discovered_peers()))
         } else {
@@ -296,24 +317,24 @@ impl PeerList {
         } else if self.banned_peers.contains(peer_id) {
             Err(Error::PeerIsBanned(*peer_id))
         } else if self
-            .satisfies(peer_id, |_, state| state.is_connected())
+            .satisfies(peer_id, |_, state, _| state.is_connected())
            .unwrap_or(false)
        {
             Err(Error::PeerIsConnected(*peer_id))
         } else {
-            let (peer_info, _) = self.peers.get(peer_id).unwrap();
+            let (peer_info, _, _) = self.peers.get(peer_id).unwrap();
 
             if self.local_addrs.contains(&peer_info.address) {
                 Err(Error::AddressIsLocal(peer_info.address.clone()))
             } else if self.banned_addrs.contains(&peer_info.address) {
                 Err(Error::AddressIsBanned(peer_info.address.clone()))
             } else if peer_info.relation.is_unknown()
-                && self.filter_count(|info, status| info.relation.is_unknown() && status.is_connected())
+                && self.filter_count(|info, status, _| info.relation.is_unknown() && status.is_connected())
                     >= global::max_unknown_peers()
             {
                 Err(Error::ExceedsUnknownPeerLimit(global::max_unknown_peers()))
             } else if peer_info.relation.is_discovered()
-                && self.filter_count(|info, status| info.relation.is_discovered() && status.is_connected())
+                && self.filter_count(|info, status, _| info.relation.is_discovered() && status.is_connected())
                     >= global::max_discovered_peers()
             {
                 Err(Error::ExceedsDiscoveredPeerLimit(global::max_discovered_peers()))
@@ -342,9 +363,9 @@ impl PeerList {
     }
 
     fn find_peer_if_connected(&self, addr: &Multiaddr) -> Option<PeerId> {
-        self.filter_info(|info, state| state.is_connected() && info.address == *addr)
+        self.filter(|info, state, _| state.is_connected() && info.address == *addr)
             .next()
-            .map(|(peer_id, _)| peer_id)
+            .map(|(peer_id, _, _)| peer_id)
     }
 }
@@ -367,7 +388,7 @@ mod tests {
 
         for i in 1..=3 {
             assert!(
-                pl.insert_peer(
+                pl.add(
                     gen_random_peer_id(),
                     gen_deterministic_peer_info(i, PeerRelation::Known)
                 )
@@ -384,11 +405,11 @@ mod tests {
 
         let peer_id = gen_constant_peer_id();
 
-        assert!(pl.insert_peer(peer_id, gen_constant_peer_info()).is_ok());
+        assert!(pl.add(peer_id, gen_constant_peer_info()).is_ok());
 
         // Do not allow inserting the same peer id twice.
         assert!(matches!(
-            pl.insert_peer(peer_id, gen_constant_peer_info()),
+            pl.add(peer_id, gen_constant_peer_info()),
             Err((_, _, Error::PeerIsDuplicate(_)))
         ));
     }
@@ -413,7 +434,7 @@ mod tests {
 
         let mut pl = PeerList::new(local_id);
 
-        pl.insert_peer(peer_id, peer_info.clone()).unwrap();
+        pl.add(peer_id, peer_info.clone()).unwrap();
 
         pl.accepts_incoming_peer(&peer_id, &peer_info.address).unwrap();
     }
@@ -424,14 +445,14 @@ mod tests {
 
         let peer_id = gen_random_peer_id();
 
-        pl.insert_peer(peer_id, gen_deterministic_peer_info(0, PeerRelation::Known))
+        pl.add(peer_id, gen_deterministic_peer_info(0, PeerRelation::Known))
             .unwrap();
         assert_eq!(1, pl.len());
 
-        pl.filter_remove(&peer_id, |info, _| info.relation.is_unknown());
+        pl.filter_remove(&peer_id, |info, _, _| info.relation.is_unknown());
         assert_eq!(1, pl.len());
 
-        pl.filter_remove(&peer_id, |info, _| info.relation.is_known());
+        pl.filter_remove(&peer_id, |info, _, _| info.relation.is_known());
         assert_eq!(0, pl.len());
     }
@@ -475,6 +496,12 @@ pub enum PeerState {
     Connected(GossipSender),
 }
 
+#[derive(Clone, Debug, Default)]
+pub struct PeerMetrics {
+    pub(crate) num_dials: usize,
+    pub(crate) identified_at: Option<u64>,
+}
+
 impl Default for PeerState {
     fn default() -> Self {
         Self::Disconnected
diff --git a/bee-network/bee-gossip/src/service/event.rs b/bee-network/bee-gossip/src/service/event.rs
index 368ae1bf78..dff453fcff 100644
--- a/bee-network/bee-gossip/src/service/event.rs
+++ b/bee-network/bee-gossip/src/service/event.rs
@@ -100,6 +100,14 @@ pub enum Event {
         /// The peer's id.
         peer_id: PeerId,
     },
+
+    /// A peer didn't answer our repeated calls.
+    PeerUnreachable {
+        /// The peer's id.
+        peer_id: PeerId,
+        /// The peer's info.
+        peer_info: PeerInfo,
+    },
 }
 
 /// Describes the internal events.
@@ -123,8 +131,23 @@ pub enum InternalEvent {
         substream: Box,
     },
 
-    /// The gossip protocol has been dropped with a peer.
-    ProtocolDropped { peer_id: PeerId },
+    /// The gossip protocol with a peer was stopped.
+    ProtocolStopped {
+        /// The peer's id.
+        peer_id: PeerId,
+    },
+
+    /// A peer didn't answer our repeated calls.
+    PeerUnreachable {
+        /// The peer's id.
+        peer_id: PeerId,
+    },
+
+    /// A peer has identified itself via the `libp2p` Identify protocol.
+    PeerIdentified {
+        /// The peer's id.
+        peer_id: PeerId,
+    },
 }
 
 /// Allows the user to receive [`Event`]s published by the network layer.
diff --git a/bee-network/bee-gossip/src/service/host.rs b/bee-network/bee-gossip/src/service/host.rs
index f23a559895..9f5dca129a 100644
--- a/bee-network/bee-gossip/src/service/host.rs
+++ b/bee-network/bee-gossip/src/service/host.rs
@@ -31,7 +31,10 @@ use rand::Rng;
 use tokio::time::{self, Duration, Instant};
 use tokio_stream::wrappers::{IntervalStream, UnboundedReceiverStream};
 
+use std::time::{SystemTime, UNIX_EPOCH};
+
 const MAX_PEER_STATE_CHECKER_DELAY_MILLIS: u64 = 2000;
+const MAX_DIALS: usize = 3;
 
 pub struct ServiceHostConfig {
     pub local_keys: identity::Keypair,
@@ -171,7 +174,8 @@ async fn command_processor(shutdown: Shutdown, commands: CommandReceiver, sender
 
     while let Some(command) = commands.next().await {
         if let Err(e) = process_command(command, &senders, &peerlist).await {
-            error!("Error processing command. Cause: {}", e);
+            // Note: commands are allowed to fail, e.g. when the user operates on an outdated view of the peer list.
+            debug!("Command could not be executed. Cause: {}", e);
             continue;
         }
     }
@@ -213,37 +217,47 @@ async fn peerstate_checker(shutdown: Shutdown, senders: Senders, peerlist: PeerL
 
     // Check if there are any disconnected known peers, and schedule a reconnect attempt for each
     // of those.
     while interval.next().await.is_some() {
-        let peerlist_reader = peerlist.0.read().await;
+        let read = peerlist.0.read().await;
 
         // How many known peers we are currently connected to.
-        let num_known = peerlist_reader.filter_count(|info, _| info.relation.is_known());
-        let num_connected_known =
-            peerlist_reader.filter_count(|info, state| info.relation.is_known() && state.is_connected());
+        let num_known = read.filter_count(|info, _, _| info.relation.is_known());
+        let num_connected_known = read.filter_count(|info, state, _| info.relation.is_known() && state.is_connected());
 
         // How many unknown peers we are currently connected to.
         let num_connected_unknown =
-            peerlist_reader.filter_count(|info, state| info.relation.is_unknown() && state.is_connected());
+            read.filter_count(|info, state, _| info.relation.is_unknown() && state.is_connected());
 
         // How many discovered peers we are currently connected to.
         let num_connected_discovered =
-            peerlist_reader.filter_count(|info, state| info.relation.is_discovered() && state.is_connected());
+            read.filter_count(|info, state, _| info.relation.is_discovered() && state.is_connected());
+
+        // How many peers we know of but are currently disconnected.
+        let num_disconnected = read.filter_count(|_, state, _| state.is_disconnected());
 
         info!(
-            "Connected peers: known {}/{} unknown {}/{} discovered {}/{}.",
+            "Connected peers: known {}/{} unknown {}/{} discovered {}/{} - Disconnected peers: {}.",
             num_connected_known,
             num_known,
             num_connected_unknown,
             global::max_unknown_peers(),
             num_connected_discovered,
-            global::max_discovered_peers()
+            global::max_discovered_peers(),
+            num_disconnected,
         );
 
         // Automatically try to reconnect known **and** discovered peers. The removal of discovered peers is a decision
         // that needs to be made in the autopeering service.
-        for (peer_id, info) in peerlist_reader.filter_info(|info, state| {
+        for (peer_id, peer_info, peer_metrics) in read.filter(|info, state, _| {
             (info.relation.is_known() || info.relation.is_discovered()) && state.is_disconnected()
         }) {
-            debug!("Trying to reconnect to: {} ({}).", info.alias, alias!(peer_id));
+            if peer_metrics.num_dials >= MAX_DIALS {
+                log::debug!("Peer {} is unreachable.", peer_id);
+
+                let _ = senders.events.send(Event::PeerUnreachable { peer_id, peer_info });
+
+                continue;
+            }
+
+            debug!("Trying to reconnect to: {} ({}).", peer_info.alias, alias!(peer_id));
 
             // Ignore if the command fails. We can always retry the next time.
             let _ = senders.internal_commands.send(Command::DialPeer { peer_id });
@@ -354,7 +368,7 @@ async fn process_internal_event(
                 .map_err(|_| Error::SendingEventFailed)?;
         }
 
-        InternalEvent::ProtocolDropped { peer_id } => {
+        InternalEvent::ProtocolStopped { peer_id } => {
             let mut peerlist = peerlist.0.write().await;
 
             // Try to disconnect, but ignore errors in case the peer was disconnected already.
@@ -362,15 +376,30 @@
             // Only remove unknown peers.
             // NOTE: discovered peers should be removed manually via command if the autopeering protocol suggests it.
-            let _ = peerlist.filter_remove(&peer_id, |peer_info, _| peer_info.relation.is_unknown());
+            let was_removed = peerlist.filter_remove(&peer_id, |peer_info, _, _| peer_info.relation.is_unknown());
 
             // We no longer need to hold the lock.
             drop(peerlist);
 
+            // Make sure to end the underlying connection.
+            senders
+                .internal_commands
+                .send(Command::DisconnectPeer { peer_id })
+                .map_err(|_| Error::SendingEventFailed)?;
+
             senders
                 .events
                 .send(Event::PeerDisconnected { peer_id })
                 .map_err(|_| Error::SendingEventFailed)?;
+
+            if was_removed {
+                log::trace!("Removed unknown peer: {peer_id}");
+
+                senders
+                    .events
+                    .send(Event::PeerRemoved { peer_id })
+                    .map_err(|_| Error::SendingEventFailed)?;
+            }
         }
 
         InternalEvent::ProtocolEstablished {
@@ -396,7 +425,7 @@
                     alias: alias!(peer_id).to_string(),
                     relation: PeerRelation::Unknown,
                 };
-                peerlist.insert_peer(peer_id, peer_info).map_err(|(_, _, e)| e)?;
+                peerlist.add(peer_id, peer_info).map_err(|(_, _, e)| e)?;
 
                 peer_added = true;
             }
@@ -407,17 +436,27 @@
             // Spin up separate buffered reader and writer to efficiently process the gossip with that peer.
             let (r, w) = substream.split();
 
-            let reader = BufReader::with_capacity(IO_BUFFER_LEN, r);
-            let writer = BufWriter::with_capacity(IO_BUFFER_LEN, w);
+            let inbound_gossip_rx = BufReader::with_capacity(IO_BUFFER_LEN, r);
+            let outbound_gossip_tx = BufWriter::with_capacity(IO_BUFFER_LEN, w);
 
-            let (incoming_tx, incoming_rx) = iota_gossip::channel();
-            let (outgoing_tx, outgoing_rx) = iota_gossip::channel();
+            let (inbound_gossip_tx, gossip_in) = iota_gossip::channel();
+            let (gossip_out, outbound_gossip_rx) = iota_gossip::channel();
 
-            iota_gossip::start_incoming_processor(peer_id, reader, incoming_tx, senders.internal_events.clone());
-            iota_gossip::start_outgoing_processor(peer_id, writer, outgoing_rx, senders.internal_events.clone());
+            iota_gossip::start_inbound_gossip_handler(
+                peer_id,
+                inbound_gossip_rx,
+                inbound_gossip_tx,
+                senders.internal_events.clone(),
+            );
+            iota_gossip::start_outbound_gossip_handler(
+                peer_id,
+                outbound_gossip_tx,
+                outbound_gossip_rx,
+                senders.internal_events.clone(),
+            );
 
             // We store a clone of the gossip send channel in order to send a shutdown signal.
-            let _ = peerlist.update_state(&peer_id, |state| state.set_connected(outgoing_tx.clone()));
+            let _ = peerlist.update_state(&peer_id, |state| state.set_connected(gossip_out.clone()));
 
             // We no longer need to hold the lock.
             drop(peerlist);
@@ -447,8 +486,8 @@
                 .send(Event::PeerConnected {
                     peer_id,
                     info: peer_info,
-                    gossip_in: incoming_rx,
-                    gossip_out: outgoing_tx,
+                    gossip_in,
+                    gossip_out,
                 })
                 .map_err(|_| Error::SendingEventFailed)?;
         } else {
             debug!("{}", accepted.unwrap_err());
         }
     }
+
+        InternalEvent::PeerUnreachable { peer_id } => {
+            if let Ok(peer_info) = peerlist.0.read().await.info(&peer_id) {
+                senders
+                    .events
+                    .send(Event::PeerUnreachable { peer_id, peer_info })
+                    .map_err(|_| Error::SendingEventFailed)?;
+            }
+        }
+
+        InternalEvent::PeerIdentified { peer_id } => {
+            let _ = peerlist.0.write().await.update_metrics(&peer_id, |m| {
+                // Reset dial count.
+                m.num_dials = 0;
+                // Update Identify timestamp.
+                m.identified_at = Some(
+                    SystemTime::now()
+                        .duration_since(UNIX_EPOCH)
+                        .expect("system time")
+                        .as_secs(),
+                );
+            });
+        }
     }
 
     Ok(())
 }
@@ -479,7 +541,7 @@ async fn add_peer(
     let mut peerlist = peerlist.0.write().await;
 
     // If the insert fails for some reason, we get the peer data back, so it can be reused.
-    match peerlist.insert_peer(peer_id, peer_info) {
+    match peerlist.add(peer_id, peer_info) {
         Ok(()) => {
             // Panic:
             // We just added the peer_id so unwrapping here is fine.
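Taken together, the changes above implement a small dial budget (annotation, not part of the patch): `dial_peer` increments `num_dials` through `update_metrics` on every attempt, the `peerstate_checker` stops re-dialing after `MAX_DIALS` failed attempts and emits `Event::PeerUnreachable` instead, and a successful `libp2p` Identify exchange (`InternalEvent::PeerIdentified`) resets the counter. A condensed, self-contained sketch of that state machine, with names mirroring the diff:

```rust
/// Mirror of the `PeerMetrics` type added to `peer/list.rs`.
#[derive(Default)]
struct PeerMetrics {
    num_dials: usize,
    identified_at: Option<u64>, // seconds since `UNIX_EPOCH`
}

/// Mirror of the constant added to `service/host.rs`.
const MAX_DIALS: usize = 3;

enum Verdict {
    Dial,        // still within budget: issue `Command::DialPeer`
    Unreachable, // budget exhausted: emit `Event::PeerUnreachable`
}

fn reconnect_verdict(m: &PeerMetrics) -> Verdict {
    if m.num_dials >= MAX_DIALS {
        Verdict::Unreachable
    } else {
        Verdict::Dial
    }
}

/// A successful Identify exchange proves the peer is alive, so the budget resets.
fn on_identified(m: &mut PeerMetrics, now_secs: u64) {
    m.num_dials = 0;
    m.identified_at = Some(now_secs);
}

fn main() {
    let mut m = PeerMetrics::default();
    for _ in 0..MAX_DIALS {
        assert!(matches!(reconnect_verdict(&m), Verdict::Dial));
        m.num_dials += 1; // what `dial_peer` does via `update_metrics`
    }
    assert!(matches!(reconnect_verdict(&m), Verdict::Unreachable));
    on_identified(&mut m, 1_646_000_000); // peer answered: budget resets
    assert!(matches!(reconnect_verdict(&m), Verdict::Dial));
    assert_eq!(m.identified_at, Some(1_646_000_000));
}
```

Note the deliberate split: the swarm layer only reports facts (`PeerIdentified`, `PeerUnreachable` derived from Identify errors), while the service layer decides policy — when to stop dialing and which public event to publish.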
diff --git a/bee-network/bee-gossip/src/swarm/behaviour.rs b/bee-network/bee-gossip/src/swarm/behaviour.rs
index bef12a331e..8ef4d4ae90 100644
--- a/bee-network/bee-gossip/src/swarm/behaviour.rs
+++ b/bee-network/bee-gossip/src/swarm/behaviour.rs
@@ -43,13 +43,13 @@ impl NetworkBehaviourEventProcess<IdentifyEvent> for SwarmBehaviour {
     fn inject_event(&mut self, event: IdentifyEvent) {
         match event {
             IdentifyEvent::Received { peer_id, info } => {
-                trace!(
-                    "Received Identify request from {}. Observed address: {:?}.",
-                    alias!(peer_id),
-                    info.observed_addr,
-                );
+                trace!("Received Identify response from {}: {:?}.", alias!(peer_id), info);
 
-                // TODO: log supported protocols by the peer (info.protocols)
+                // Panic: we made sure that the sender (network host) is always dropped before the receiver (service
+                // host) through the worker dependencies, hence this can never panic.
+                self.internal_sender
+                    .send(InternalEvent::PeerIdentified { peer_id })
+                    .expect("send internal event");
             }
             IdentifyEvent::Sent { peer_id } => {
                 trace!("Sent Identify request to {}.", alias!(peer_id));
@@ -58,7 +58,13 @@ impl NetworkBehaviourEventProcess<IdentifyEvent> for SwarmBehaviour {
                 trace!("Pushed Identify request to {}.", alias!(peer_id));
             }
             IdentifyEvent::Error { peer_id, error } => {
-                warn!("Identification error with {}: Cause: {:?}.", alias!(peer_id), error);
+                debug!("Identification error with {}: Cause: {:?}.", alias!(peer_id), error);
+
+                // Panic: we made sure that the sender (network host) is always dropped before the receiver (service
+                // host) through the worker dependencies, hence this can never panic.
+                self.internal_sender
+                    .send(InternalEvent::PeerUnreachable { peer_id })
+                    .expect("send internal event");
             }
         }
     }
@@ -68,10 +74,10 @@ impl NetworkBehaviourEventProcess<IotaGossipEvent> for SwarmBehaviour {
     fn inject_event(&mut self, event: IotaGossipEvent) {
         match event {
             IotaGossipEvent::ReceivedUpgradeRequest { from } => {
-                debug!("Received IOTA gossip request from {}.", alias!(from));
+                trace!("Received IOTA gossip request from {}.", alias!(from));
             }
             IotaGossipEvent::SentUpgradeRequest { to } => {
-                debug!("Sent IOTA gossip request to {}.", alias!(to));
+                trace!("Sent IOTA gossip request to {}.", alias!(to));
             }
             IotaGossipEvent::UpgradeCompleted {
                 peer_id,
@@ -79,24 +85,19 @@ impl NetworkBehaviourEventProcess<IotaGossipEvent> for SwarmBehaviour {
                 origin,
                 substream,
             } => {
-                debug!("Successfully negotiated IOTA gossip protocol with {}.", alias!(peer_id));
-
-                if let Err(e) = self.internal_sender.send(InternalEvent::ProtocolEstablished {
-                    peer_id,
-                    peer_addr,
-                    origin,
-                    substream,
-                }) {
-                    warn!(
-                        "Send event error for {} after successfully established IOTA gossip protocol. Cause: {}",
-                        peer_id, e
-                    );
+                trace!("Successfully negotiated IOTA gossip protocol with {}.", alias!(peer_id));
 
-                    // TODO: stop processors in that case.
-                }
+                self.internal_sender
+                    .send(InternalEvent::ProtocolEstablished {
+                        peer_id,
+                        peer_addr,
+                        origin,
+                        substream,
+                    })
+                    .expect("send internal event");
             }
             IotaGossipEvent::UpgradeError { peer_id, error } => {
-                warn!(
+                debug!(
                     "IOTA gossip upgrade error with {}: Cause: {:?}.",
                     alias!(peer_id),
                     error
diff --git a/bee-network/bee-gossip/src/swarm/protocols/iota_gossip/io.rs b/bee-network/bee-gossip/src/swarm/protocols/iota_gossip/io.rs
index 6f24ae9c1f..9de32ac552 100644
--- a/bee-network/bee-gossip/src/swarm/protocols/iota_gossip/io.rs
+++ b/bee-network/bee-gossip/src/swarm/protocols/iota_gossip/io.rs
@@ -28,89 +28,80 @@ pub fn channel() -> (GossipSender, GossipReceiver) {
     (sender, UnboundedReceiverStream::new(receiver))
 }
 
-pub fn start_incoming_processor(
+pub fn start_inbound_gossip_handler(
     peer_id: PeerId,
-    mut reader: BufReader>>,
-    incoming_tx: GossipSender,
-    internal_event_sender: InternalEventSender,
+    mut inbound_gossip_rx: BufReader>>,
+    inbound_gossip_tx: GossipSender,
+    internal_event_tx: InternalEventSender,
 ) {
     tokio::spawn(async move {
-        let mut msg_buf = vec![0u8; MSG_BUFFER_LEN];
+        let mut buf = vec![0u8; MSG_BUFFER_LEN];
 
         loop {
-            if let Some(len) = (&mut reader).read(&mut msg_buf).await.ok().filter(|len| *len > 0) {
-                if incoming_tx.send(msg_buf[..len].to_vec()).is_err() {
-                    debug!("gossip-in: receiver dropped locally.");
+            if let Some(len) = (&mut inbound_gossip_rx)
+                .read(&mut buf)
+                .await
+                .ok()
+                .filter(|len| *len > 0)
+            {
+                if inbound_gossip_tx.send(buf[..len].to_vec()).is_err() {
+                    debug!("Terminating gossip protocol with {}.", alias!(peer_id));
 
-                    // The receiver of this channel was dropped, maybe due to a shutdown. There is nothing we can do
-                    // to salvage this situation, hence we drop the connection.
                     break;
                 }
             } else {
-                debug!("gossip-in: stream closed remotely.");
+                debug!("Peer {} terminated gossip protocol.", alias!(peer_id));
 
-                // NB: The network service will not shut down before it has received the `ProtocolDropped` event
-                // from all once connected peers, hence if the following send fails, then it
-                // must be considered a bug.
-
-                // The remote peer dropped the connection.
-                internal_event_sender
-                    .send(InternalEvent::ProtocolDropped { peer_id })
-                    .expect("The service must not shutdown as long as there are gossip tasks running.");
+                // Panic: we made sure that the sender (network host) is always dropped before the receiver (service
+                // host) through the worker dependencies, hence this can never panic.
+                internal_event_tx
+                    .send(InternalEvent::ProtocolStopped { peer_id })
+                    .expect("send internal event");
 
                 break;
             }
         }
 
-        // Reasons why this task might end:
-        // (1) The remote dropped the TCP connection.
-        // (2) The local dropped the gossip_in receiver channel.
-
-        debug!("gossip-in: exiting gossip-in processor for {}.", alias!(peer_id));
+        trace!("Dropping gossip stream reader for {}.", alias!(peer_id));
     });
 }
 
-pub fn start_outgoing_processor(
+pub fn start_outbound_gossip_handler(
     peer_id: PeerId,
-    mut writer: BufWriter>>,
-    outgoing_rx: GossipReceiver,
-    internal_event_sender: InternalEventSender,
+    mut outbound_gossip_tx: BufWriter>>,
+    outbound_gossip_rx: GossipReceiver,
+    internal_event_tx: InternalEventSender,
 ) {
     tokio::spawn(async move {
-        let mut outgoing_gossip_receiver = outgoing_rx.fuse();
+        let mut outbound_gossip_rx = outbound_gossip_rx.fuse();
 
         // If the gossip sender dropped we end the connection.
-        while let Some(message) = outgoing_gossip_receiver.next().await {
-            // NB: Instead of polling another shutdown channel, we use an empty message
+        while let Some(message) = outbound_gossip_rx.next().await {
+            // Note: Instead of polling another shutdown channel, we use an empty message
             // to signal that we want to end the connection. We use this "trick" whenever the network
             // receives the `DisconnectPeer` command to enforce that the connection will be dropped.
             if message.is_empty() {
-                debug!("gossip-out: received shutdown message.");
+                debug!(
+                    "Terminating gossip protocol with {} (received shutdown signal).",
+                    alias!(peer_id)
+                );
 
-                // NB: The network service will not shut down before it has received the `ConnectionDropped` event
-                // from all once connected peers, hence if the following send fails, then it
-                // must be considered a bug.
-
-                internal_event_sender
-                    .send(InternalEvent::ProtocolDropped { peer_id })
-                    .expect("The service must not shutdown as long as there are gossip tasks running.");
+                // Panic: we made sure that the sender (network host) is always dropped before the receiver (service
+                // host) through the worker dependencies, hence this can never panic.
+                internal_event_tx
+                    .send(InternalEvent::ProtocolStopped { peer_id })
+                    .expect("send internal event");
 
                 break;
-            }
+            } else if (&mut outbound_gossip_tx).write_all(&message).await.is_err()
+                || (&mut outbound_gossip_tx).flush().await.is_err()
+            {
+                debug!("Peer {} terminated gossip protocol.", alias!(peer_id));
 
-            // If sending to the stream fails we end the connection.
-            // TODO: buffer for x milliseconds before flushing.
-            if (&mut writer).write_all(&message).await.is_err() || (&mut writer).flush().await.is_err() {
-                debug!("gossip-out: stream closed remotely");
                 break;
             }
         }
 
-        // Reasons why this task might end:
-        // (1) The local send the shutdown message (len = 0)
-        // (2) The remote dropped the TCP connection.
- - debug!("gossip-out: exiting gossip-out processor for {}.", alias!(peer_id)); + trace!("Dropping gossip stream writer for {}.", alias!(peer_id)); }); } diff --git a/bee-network/bee-gossip/src/tests/alias.rs b/bee-network/bee-gossip/src/tests/alias.rs index 64ee5f9d52..18526c124a 100644 --- a/bee-network/bee-gossip/src/tests/alias.rs +++ b/bee-network/bee-gossip/src/tests/alias.rs @@ -9,12 +9,12 @@ use crate::alias; fn alias_default() { let peer_id = gen_constant_peer_id(); let alias = alias!(peer_id); - assert_eq!(alias, "eF27st"); + assert_eq!(alias, "JWEKvSFbben74C7H"); } #[test] fn alias_custom() { let peer_id = gen_constant_peer_id(); let alias = alias!(peer_id, 10); - assert_eq!(alias, "WSUEeF27st"); + assert_eq!(alias, "JWEKvSFbbe"); } diff --git a/bee-node/Cargo.toml b/bee-node/Cargo.toml index 0e454fb4c5..a4ca5c3221 100644 --- a/bee-node/Cargo.toml +++ b/bee-node/Cargo.toml @@ -13,7 +13,7 @@ homepage = "https://www.iota.org" [dependencies] bee-autopeering = { version = "0.4.0", path = "../bee-network/bee-autopeering", default-features = false, features = [ "rocksdb" ] } bee-common = { version = "0.6.0", path = "../bee-common/bee-common", default-features = false } -bee-gossip = { version = "0.4.0", path = "../bee-network/bee-gossip", default-features = false, features = [ "full" ] } +bee-gossip = { version = "0.5.0", path = "../bee-network/bee-gossip", default-features = false, features = [ "full" ] } bee-ledger = { version = "0.6.1", path = "../bee-ledger", default-features = false, features = [ "workers" ] } bee-message = { version = "0.1.6", path = "../bee-message", default-features = false } bee-protocol = { version = "0.2.0", path = "../bee-protocol", default-features = false, features = [ "workers" ] } diff --git a/bee-protocol/CHANGELOG.md b/bee-protocol/CHANGELOG.md index 5771f66b43..7961a68dda 100644 --- a/bee-protocol/CHANGELOG.md +++ b/bee-protocol/CHANGELOG.md @@ -19,7 +19,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security --> -## 0.2.0 - 2022-XX-XX +## 0.2.1 - 2022-02-28 + +### Changed + +- Peer manager handles new bee-gossip `PeerUnreachable` event; + +## 0.2.0 - 2022-01-27 ### Added diff --git a/bee-protocol/Cargo.toml b/bee-protocol/Cargo.toml index 4841fbdd43..720aa2320b 100644 --- a/bee-protocol/Cargo.toml +++ b/bee-protocol/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bee-protocol" -version = "0.2.0" +version = "0.2.1" authors = [ "IOTA Stiftung" ] edition = "2021" description = "All types and workers enabling the IOTA protocol" @@ -13,7 +13,7 @@ homepage = "https://www.iota.org" [dependencies] bee-autopeering = { version = "0.4.0", path = "../bee-network/bee-autopeering", default-features = false } bee-common = { version = "0.6.0", path = "../bee-common/bee-common", default-features = false, optional = true } -bee-gossip = { version = "0.4.0", path = "../bee-network/bee-gossip", default-features = false } +bee-gossip = { version = "0.5.0", path = "../bee-network/bee-gossip", default-features = false } bee-ledger = { version = "0.6.0", path = "../bee-ledger", default-features = false, features = [ "workers" ], optional = true } bee-message = { version = "0.1.6", path = "../bee-message", default-features = false, features = [ "serde" ] } bee-pow = { version = "0.2.0", path = "../bee-pow", default-features = false } diff --git a/bee-protocol/src/workers/peer/manager.rs b/bee-protocol/src/workers/peer/manager.rs index de806b5ac6..b49c18216e 100644 --- a/bee-protocol/src/workers/peer/manager.rs +++ 
b/bee-protocol/src/workers/peer/manager.rs @@ -63,7 +63,7 @@ where let tangle = node.resource::>(); let requested_milestones = node.resource::(); let metrics = node.resource::(); - let network_command_tx = node.resource::(); + let gossip_command_tx = node.resource::(); let hasher = node.worker::().unwrap().tx.clone(); let message_responder = node.worker::().unwrap().tx.clone(); @@ -87,13 +87,13 @@ where match event { AutopeeringEvent::IncomingPeering { peer, .. } => { - handle_new_peering(peer, &network_name, &network_command_tx); + handle_new_peering(peer, &network_name, &gossip_command_tx); } AutopeeringEvent::OutgoingPeering { peer, .. } => { - handle_new_peering(peer, &network_name, &network_command_tx); + handle_new_peering(peer, &network_name, &gossip_command_tx); } AutopeeringEvent::PeeringDropped { peer_id } => { - handle_peering_dropped(peer_id, &network_command_tx); + handle_peering_dropped(peer_id, &gossip_command_tx); } _ => {} } @@ -103,6 +103,8 @@ where }); } + let gossip_command_tx = node.resource::(); + node.spawn::(|shutdown| async move { info!("Network handler running."); @@ -187,6 +189,18 @@ where info!("Disconnected peer {}.", peer.0.alias()); }) .unwrap_or_default(), + NetworkEvent::PeerUnreachable { peer_id, peer_info } => { + if peer_info.relation.is_discovered() { + // Remove that discovered peer. + + // Panic: sending commands cannot fail: same explanation as in other sender usages. + gossip_command_tx + .send(Command::RemovePeer { peer_id }) + .expect("send gossip command"); + + // TODO: tell the autopeering to remove that peer from the neighborhood. + } + } _ => (), // Ignore all other events for now } } @@ -198,25 +212,29 @@ where } } -fn handle_new_peering(peer: bee_autopeering::Peer, network_name: &str, command_tx: &NetworkCommandSender) { +fn handle_new_peering(peer: bee_autopeering::Peer, network_name: &str, gossip_command_tx: &NetworkCommandSender) { if let Some(multiaddr) = peer.service_multiaddr(network_name) { let peer_id = peer.peer_id().libp2p_peer_id(); - command_tx + // Panic: sending commands cannot fail due to worker dependencies: because the "Peer Manager" depends on + // the `bee-gossip` "ServiceHost", it is guaranteed that the receiver of this channel is not dropped + // before the sender. + gossip_command_tx .send(Command::AddPeer { peer_id, alias: Some(alias!(peer_id).to_string()), multiaddr, relation: PeerRelation::Discovered, }) - .expect("error sending network command"); + .expect("send command to gossip layer"); } } -fn handle_peering_dropped(peer_id: bee_autopeering::PeerId, command_tx: &NetworkCommandSender) { +fn handle_peering_dropped(peer_id: bee_autopeering::PeerId, gossip_command_tx: &NetworkCommandSender) { let peer_id = peer_id.libp2p_peer_id(); - command_tx + // Panic: sending commands cannot fail: same explanation as in other sender usages. + gossip_command_tx .send(Command::RemovePeer { peer_id }) - .expect("error sending network command"); + .expect("send command to gossip layer"); } diff --git a/bee-protocol/src/workers/peer/manager_res.rs b/bee-protocol/src/workers/peer/manager_res.rs index 6956cc5637..1d971460e8 100644 --- a/bee-protocol/src/workers/peer/manager_res.rs +++ b/bee-protocol/src/workers/peer/manager_res.rs @@ -181,4 +181,8 @@ impl PeerManager { .filter(|(_, (peer, ctx))| (ctx.is_some() && peer.is_synced())) .count() as u8 } + + pub fn len(&self) -> usize { + self.inner.read().peers.len() + } }
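From a consumer's perspective, the division of labor ends up as: `bee-gossip` 0.5.0 reports `Event::PeerUnreachable`, and `bee-protocol`'s peer manager (above) drops the peer only when its relation is `Discovered`, so manually configured peers keep being re-dialed. A hedged sketch of handling the new event from outside the crate — the receiver is kept generic, and the crate-root re-exports of `Command`, `Event`, and `NetworkCommandSender` are assumptions rather than something this diff confirms:

```rust
use bee_gossip::{Command, Event, NetworkCommandSender};
use futures::{Stream, StreamExt};

/// Drive gossip events; unreachable *discovered* peers are removed entirely,
/// mirroring the handling added to `bee-protocol`'s peer manager.
async fn drive_events(mut events: impl Stream<Item = Event> + Unpin, command_tx: NetworkCommandSender) {
    while let Some(event) = events.next().await {
        match event {
            Event::PeerUnreachable { peer_id, peer_info } if peer_info.relation.is_discovered() => {
                // Ignore a send error here: if the service host is gone, the node is shutting down anyway.
                let _ = command_tx.send(Command::RemovePeer { peer_id });
            }
            _ => {} // known peers stay in the list and will be re-dialed
        }
    }
}
```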