From 1618bf9164e9a74cde281d5472a8cbfa3c87e66f Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Fri, 25 Feb 2022 15:06:38 +0100 Subject: [PATCH 01/25] Explicitly end underlying connection --- bee-network/bee-gossip/src/network/host.rs | 9 +++++++++ bee-network/bee-gossip/src/service/host.rs | 6 ++++++ 2 files changed, 15 insertions(+) diff --git a/bee-network/bee-gossip/src/network/host.rs b/bee-network/bee-gossip/src/network/host.rs index fb25ba2784..d7ae9cd2bd 100644 --- a/bee-network/bee-gossip/src/network/host.rs +++ b/bee-network/bee-gossip/src/network/host.rs @@ -182,6 +182,9 @@ async fn process_internal_command(internal_command: Command, swarm: &mut Swarm { + hang_up(swarm, peer_id).await + } _ => {} } } @@ -219,3 +222,9 @@ async fn dial_peer(swarm: &mut Swarm, peer_id: PeerId, peerlist: Ok(()) } + +async fn hang_up(swarm: &mut Swarm, peer_id: PeerId) { + debug!("Hanging up on: {}.", alias!(peer_id)); + + let _ = Swarm::disconnect_peer_id(swarm, peer_id); +} diff --git a/bee-network/bee-gossip/src/service/host.rs b/bee-network/bee-gossip/src/service/host.rs index f23a559895..bd9bcd537e 100644 --- a/bee-network/bee-gossip/src/service/host.rs +++ b/bee-network/bee-gossip/src/service/host.rs @@ -367,6 +367,12 @@ async fn process_internal_event( // We no longer need to hold the lock. drop(peerlist); + // Make sure to end the underlying connection. + senders + .internal_commands + .send(Command::DisconnectPeer { peer_id }) + .map_err(|_| Error::SendingEventFailed)?; + senders .events .send(Event::PeerDisconnected { peer_id }) From 441a5823eab537465e2a7816d87a088588ba959b Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Fri, 25 Feb 2022 15:55:42 +0100 Subject: [PATCH 02/25] Fire 'ProtocolStopped' event in any connection-end case --- bee-network/bee-gossip/src/service/event.rs | 4 ++-- bee-network/bee-gossip/src/service/host.rs | 2 +- .../src/swarm/protocols/iota_gossip/io.rs | 16 ++++++---------- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/bee-network/bee-gossip/src/service/event.rs b/bee-network/bee-gossip/src/service/event.rs index 368ae1bf78..6ceb8f82a0 100644 --- a/bee-network/bee-gossip/src/service/event.rs +++ b/bee-network/bee-gossip/src/service/event.rs @@ -123,8 +123,8 @@ pub enum InternalEvent { substream: Box, }, - /// The gossip protocol has been dropped with a peer. - ProtocolDropped { peer_id: PeerId }, + /// The gossip protocol with a peer was stopped. + ProtocolStopped { peer_id: PeerId }, } /// Allows the user to receive [`Event`]s published by the network layer. diff --git a/bee-network/bee-gossip/src/service/host.rs b/bee-network/bee-gossip/src/service/host.rs index bd9bcd537e..7d25ebf184 100644 --- a/bee-network/bee-gossip/src/service/host.rs +++ b/bee-network/bee-gossip/src/service/host.rs @@ -354,7 +354,7 @@ async fn process_internal_event( .map_err(|_| Error::SendingEventFailed)?; } - InternalEvent::ProtocolDropped { peer_id } => { + InternalEvent::ProtocolStopped { peer_id } => { let mut peerlist = peerlist.0.write().await; // Try to disconnect, but ignore errors in-case the peer was disconnected already. diff --git a/bee-network/bee-gossip/src/swarm/protocols/iota_gossip/io.rs b/bee-network/bee-gossip/src/swarm/protocols/iota_gossip/io.rs index 6f24ae9c1f..b69a1e62f6 100644 --- a/bee-network/bee-gossip/src/swarm/protocols/iota_gossip/io.rs +++ b/bee-network/bee-gossip/src/swarm/protocols/iota_gossip/io.rs @@ -53,15 +53,13 @@ pub fn start_incoming_processor( // from all once connected peers, hence if the following send fails, then it // must be considered a bug. - // The remote peer dropped the connection. - internal_event_sender - .send(InternalEvent::ProtocolDropped { peer_id }) - .expect("The service must not shutdown as long as there are gossip tasks running."); - break; } } + // Ignore send errors. + let _ = internal_event_sender.send(InternalEvent::ProtocolStopped { peer_id }); + // Reasons why this task might end: // (1) The remote dropped the TCP connection. // (2) The local dropped the gossip_in receiver channel. @@ -91,11 +89,6 @@ pub fn start_outgoing_processor( // NB: The network service will not shut down before it has received the `ConnectionDropped` event // from all once connected peers, hence if the following send fails, then it // must be considered a bug. - - internal_event_sender - .send(InternalEvent::ProtocolDropped { peer_id }) - .expect("The service must not shutdown as long as there are gossip tasks running."); - break; } @@ -107,6 +100,9 @@ pub fn start_outgoing_processor( } } + // Ignore send errors. + let _ = internal_event_sender.send(InternalEvent::ProtocolStopped { peer_id }); + // Reasons why this task might end: // (1) The local send the shutdown message (len = 0) // (2) The remote dropped the TCP connection. From ed696a787c4ee8785f4ac92360f7c3b31aeb6785 Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Fri, 25 Feb 2022 17:20:18 +0100 Subject: [PATCH 03/25] Count dial attempts and signal 'PeerUnreachable' --- bee-network/bee-gossip/src/alias.rs | 4 +-- bee-network/bee-gossip/src/network/host.rs | 23 +++++++++---- bee-network/bee-gossip/src/peer/list.rs | 37 +++++++++++++++++---- bee-network/bee-gossip/src/service/event.rs | 7 ++++ bee-network/bee-gossip/src/service/host.rs | 14 ++++++-- bee-protocol/src/workers/peer/manager.rs | 6 ++++ 6 files changed, 74 insertions(+), 17 deletions(-) diff --git a/bee-network/bee-gossip/src/alias.rs b/bee-network/bee-gossip/src/alias.rs index 0cfe19d478..5db7002fbb 100644 --- a/bee-network/bee-gossip/src/alias.rs +++ b/bee-network/bee-gossip/src/alias.rs @@ -7,9 +7,9 @@ #[macro_export] macro_rules! alias { ($peer_id:expr) => { - &$peer_id.to_base58()[46..] + &$peer_id.to_base58()[..16] }; ($peer_id:expr, $len:expr) => { - &$peer_id.to_base58()[(52 - $len)..] + &$peer_id.to_base58()[..$len] }; } diff --git a/bee-network/bee-gossip/src/network/host.rs b/bee-network/bee-gossip/src/network/host.rs index d7ae9cd2bd..ded812ac59 100644 --- a/bee-network/bee-gossip/src/network/host.rs +++ b/bee-network/bee-gossip/src/network/host.rs @@ -182,9 +182,7 @@ async fn process_internal_command(internal_command: Command, swarm: &mut Swarm { - hang_up(swarm, peer_id).await - } + Command::DisconnectPeer { peer_id } => hang_up(swarm, peer_id).await, _ => {} } } @@ -212,9 +210,22 @@ async fn dial_peer(swarm: &mut Swarm, peer_id: PeerId, peerlist: // We just checked, that the peer is fine to be dialed. let PeerInfo { address: addr, alias, .. - } = peerlist.0.read().await.info(&peer_id).unwrap(); - - debug!("Dialing peer: {} ({}).", alias, alias!(peer_id)); + } = peerlist.0.read().await.info(&peer_id).expect("peer must exist"); + + let dial_attempt = peerlist + .0 + .write() + .await + .log_dial_attempt(&peer_id) + .expect("peer must exist") + .expect("peer must not be connected"); + + debug!( + "Dialing peer: {} ({}) attempt: #{}.", + alias, + alias!(peer_id), + dial_attempt + ); // TODO: We also use `Swarm::dial_addr` here (instead of `Swarm::dial`) for now. See if it's better to change // that. diff --git a/bee-network/bee-gossip/src/peer/list.rs b/bee-network/bee-gossip/src/peer/list.rs index 7429387701..d4cebce1ad 100644 --- a/bee-network/bee-gossip/src/peer/list.rs +++ b/bee-network/bee-gossip/src/peer/list.rs @@ -63,7 +63,7 @@ impl PeerList { alias: peer.alias.unwrap_or_else(|| alias!(peer_id).to_owned()), relation: PeerRelation::Known, }, - PeerState::Disconnected, + PeerState::default(), ), ) })); @@ -83,7 +83,7 @@ impl PeerList { } // Since we already checked that such a `peer_id` is not yet present, the returned value is always `None`. - let _ = self.peers.insert(peer_id, (peer_info, PeerState::Disconnected)); + let _ = self.peers.insert(peer_id, (peer_info, PeerState::default())); Ok(()) } @@ -115,6 +115,29 @@ impl PeerList { .map(|(peer_info, _)| peer_info.clone()) } + pub fn dial_attempts(&self, peer_id: &PeerId) -> Result, Error> { + let (_, state) = self.peers.get(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?; + + if let PeerState::Disconnected { dial_attempts } = state { + Ok(Some(*dial_attempts)) + } else { + Ok(None) + } + } + + // Note: no-op if peer is already connected. + pub fn log_dial_attempt(&mut self, peer_id: &PeerId) -> Result, Error> { + let (_, state) = self.peers.get_mut(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?; + + if let PeerState::Disconnected { dial_attempts } = state { + *dial_attempts += 1; + + Ok(Some(*dial_attempts)) + } else { + Ok(None) + } + } + pub fn len(&self) -> usize { self.peers.len() } @@ -471,19 +494,19 @@ mod tests { #[derive(Clone, Debug)] pub enum PeerState { - Disconnected, + Disconnected { dial_attempts: usize }, Connected(GossipSender), } impl Default for PeerState { fn default() -> Self { - Self::Disconnected + Self::Disconnected { dial_attempts: 0 } } } impl PeerState { pub fn is_disconnected(&self) -> bool { - matches!(self, Self::Disconnected) + matches!(self, Self::Disconnected { dial_attempts: _ }) } pub fn is_connected(&self) -> bool { @@ -497,7 +520,7 @@ impl PeerState { pub fn set_disconnected(&mut self) -> Option { match take(self) { - Self::Disconnected => None, + Self::Disconnected { .. } => None, Self::Connected(sender) => Some(sender), } } @@ -517,7 +540,7 @@ mod peerstate_tests { #[test] fn peer_state_change() { - let mut peerstate = PeerState::Disconnected; + let mut peerstate = PeerState::Disconnected { dial_attempts: 0 }; let (tx, _rx) = channel(); peerstate.set_connected(tx); diff --git a/bee-network/bee-gossip/src/service/event.rs b/bee-network/bee-gossip/src/service/event.rs index 6ceb8f82a0..94c7743932 100644 --- a/bee-network/bee-gossip/src/service/event.rs +++ b/bee-network/bee-gossip/src/service/event.rs @@ -100,6 +100,13 @@ pub enum Event { /// The peer's id. peer_id: PeerId, }, + /// A peer didn't answer our repeated calls. + PeerUnreachable { + /// The peer's id. + peer_id: PeerId, + /// The relation we have with that peer. + peer_info: PeerInfo, + }, } /// Describes the internal events. diff --git a/bee-network/bee-gossip/src/service/host.rs b/bee-network/bee-gossip/src/service/host.rs index 7d25ebf184..bf4da9b7a5 100644 --- a/bee-network/bee-gossip/src/service/host.rs +++ b/bee-network/bee-gossip/src/service/host.rs @@ -32,6 +32,7 @@ use tokio::time::{self, Duration, Instant}; use tokio_stream::wrappers::{IntervalStream, UnboundedReceiverStream}; const MAX_PEER_STATE_CHECKER_DELAY_MILLIS: u64 = 2000; +const MAX_DIAL_ATTEMPTS: usize = 3; pub struct ServiceHostConfig { pub local_keys: identity::Keypair, @@ -240,10 +241,19 @@ async fn peerstate_checker(shutdown: Shutdown, senders: Senders, peerlist: PeerL // Automatically try to reconnect known **and** discovered peers. The removal of discovered peers is a decision // that needs to be made in the autopeering service. - for (peer_id, info) in peerlist_reader.filter_info(|info, state| { + for (peer_id, peer_info) in peerlist_reader.filter_info(|info, state| { (info.relation.is_known() || info.relation.is_discovered()) && state.is_disconnected() }) { - debug!("Trying to reconnect to: {} ({}).", info.alias, alias!(peer_id)); + let dial_attempts = peerlist_reader.dial_attempts(&peer_id).expect("peer does exist").expect("peer is not connected"); + + if dial_attempts >= MAX_DIAL_ATTEMPTS { + log::debug!("Peer {} is unreachable.", peer_id); + + let _ = senders.events.send(Event::PeerUnreachable { peer_id, peer_info }); + continue; + } + + debug!("Trying to reconnect to: {} ({}).", peer_info.alias, alias!(peer_id)); // Ignore if the command fails. We can always retry the next time. let _ = senders.internal_commands.send(Command::DialPeer { peer_id }); diff --git a/bee-protocol/src/workers/peer/manager.rs b/bee-protocol/src/workers/peer/manager.rs index de806b5ac6..ff3a58666a 100644 --- a/bee-protocol/src/workers/peer/manager.rs +++ b/bee-protocol/src/workers/peer/manager.rs @@ -187,6 +187,12 @@ where info!("Disconnected peer {}.", peer.0.alias()); }) .unwrap_or_default(), + NetworkEvent::PeerUnreachable { peer_id, peer_info } => { + if peer_info.relation.is_discovered() { + // todo: tell the autopeering to remove that neighbor + log::error!("Tell autopeering to remove {}", alias!(peer_id)); + } + } _ => (), // Ignore all other events for now } } From d1e453b894f954594cfabb4838743dff0e325210 Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Fri, 25 Feb 2022 18:28:54 +0100 Subject: [PATCH 04/25] Temporarly show peer count for gossip-peerlist and protocol-peer_ manager --- bee-network/bee-gossip/src/service/host.rs | 8 ++++++-- bee-protocol/src/workers/peer/manager.rs | 1 + bee-protocol/src/workers/peer/manager_res.rs | 4 ++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/bee-network/bee-gossip/src/service/host.rs b/bee-network/bee-gossip/src/service/host.rs index bf4da9b7a5..aeabd91d14 100644 --- a/bee-network/bee-gossip/src/service/host.rs +++ b/bee-network/bee-gossip/src/service/host.rs @@ -216,6 +216,9 @@ async fn peerstate_checker(shutdown: Shutdown, senders: Senders, peerlist: PeerL while interval.next().await.is_some() { let peerlist_reader = peerlist.0.read().await; + // How many peers we know about. + let num_all = peerlist_reader.len(); + // To how many known peers are we currently connected. let num_known = peerlist_reader.filter_count(|info, _| info.relation.is_known()); let num_connected_known = @@ -230,13 +233,14 @@ async fn peerstate_checker(shutdown: Shutdown, senders: Senders, peerlist: PeerL peerlist_reader.filter_count(|info, state| info.relation.is_discovered() && state.is_connected()); info!( - "Connected peers: known {}/{} unknown {}/{} discovered {}/{}.", + "Connected peers: known {}/{} unknown {}/{} discovered {}/{}, All peers: {}.", num_connected_known, num_known, num_connected_unknown, global::max_unknown_peers(), num_connected_discovered, - global::max_discovered_peers() + global::max_discovered_peers(), + num_all, ); // Automatically try to reconnect known **and** discovered peers. The removal of discovered peers is a decision diff --git a/bee-protocol/src/workers/peer/manager.rs b/bee-protocol/src/workers/peer/manager.rs index ff3a58666a..559b128e4b 100644 --- a/bee-protocol/src/workers/peer/manager.rs +++ b/bee-protocol/src/workers/peer/manager.rs @@ -110,6 +110,7 @@ where while let Some(event) = receiver.next().await { trace!("Received event {:?}.", event); + log::warn!{"peer_manager.len()={}", peer_manager.len()}; match event { NetworkEvent::PeerAdded { peer_id, info } => { diff --git a/bee-protocol/src/workers/peer/manager_res.rs b/bee-protocol/src/workers/peer/manager_res.rs index 6956cc5637..1d971460e8 100644 --- a/bee-protocol/src/workers/peer/manager_res.rs +++ b/bee-protocol/src/workers/peer/manager_res.rs @@ -181,4 +181,8 @@ impl PeerManager { .filter(|(_, (peer, ctx))| (ctx.is_some() && peer.is_synced())) .count() as u8 } + + pub fn len(&self) -> usize { + self.inner.read().peers.len() + } } From f9970d92aa024e4b7ccc8e45cb108275904b951f Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Fri, 25 Feb 2022 18:57:06 +0100 Subject: [PATCH 05/25] Display error message when sending the shutdown signal to an autopeering task fails --- bee-network/bee-autopeering/src/task.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bee-network/bee-autopeering/src/task.rs b/bee-network/bee-autopeering/src/task.rs index 858b9b1870..b37675d440 100644 --- a/bee-network/bee-autopeering/src/task.rs +++ b/bee-network/bee-autopeering/src/task.rs @@ -117,7 +117,10 @@ impl TaskManager { let shutdown_tx = shutdown_senders.remove(&task_name).unwrap(); log::trace!("Shutting down: {}", task_name); - shutdown_tx.send(()).expect("error sending shutdown signal"); + + if shutdown_tx.send(()).is_err() { + log::error!("Error sending shutdown signal to task {task_name}. This is a bug."); + } } // Wait for all tasks to shutdown down in a certain order and maximum amount of time. From a53300acd0f98d0b62098a084df9b1d27832f02f Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Fri, 25 Feb 2022 23:03:42 +0100 Subject: [PATCH 06/25] Remove unreachable discovered peers from peer manager --- bee-protocol/src/workers/peer/manager.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/bee-protocol/src/workers/peer/manager.rs b/bee-protocol/src/workers/peer/manager.rs index 559b128e4b..a4d7fb2833 100644 --- a/bee-protocol/src/workers/peer/manager.rs +++ b/bee-protocol/src/workers/peer/manager.rs @@ -190,7 +190,11 @@ where .unwrap_or_default(), NetworkEvent::PeerUnreachable { peer_id, peer_info } => { if peer_info.relation.is_discovered() { - // todo: tell the autopeering to remove that neighbor + // Remove that discovered peer from the manager. + if let Some(peer) = peer_manager.remove(&peer_id) { + info!("Removed peer {}.", peer.0.alias()); + } + // Todo: tell the autopeering to remove that neighbor. log::error!("Tell autopeering to remove {}", alias!(peer_id)); } } From b6997d24f7763b6613b5e740e1f8e6ce2690814e Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Fri, 25 Feb 2022 23:34:29 +0100 Subject: [PATCH 07/25] Remove unreachable discovered peers from gossip and peer manager --- bee-network/bee-gossip/src/service/host.rs | 5 ++++- bee-protocol/src/workers/peer/manager.rs | 21 ++++++++++++--------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/bee-network/bee-gossip/src/service/host.rs b/bee-network/bee-gossip/src/service/host.rs index aeabd91d14..acc7b65e0e 100644 --- a/bee-network/bee-gossip/src/service/host.rs +++ b/bee-network/bee-gossip/src/service/host.rs @@ -248,7 +248,10 @@ async fn peerstate_checker(shutdown: Shutdown, senders: Senders, peerlist: PeerL for (peer_id, peer_info) in peerlist_reader.filter_info(|info, state| { (info.relation.is_known() || info.relation.is_discovered()) && state.is_disconnected() }) { - let dial_attempts = peerlist_reader.dial_attempts(&peer_id).expect("peer does exist").expect("peer is not connected"); + let dial_attempts = peerlist_reader + .dial_attempts(&peer_id) + .expect("peer does exist") + .expect("peer is not connected"); if dial_attempts >= MAX_DIAL_ATTEMPTS { log::debug!("Peer {} is unreachable.", peer_id); diff --git a/bee-protocol/src/workers/peer/manager.rs b/bee-protocol/src/workers/peer/manager.rs index a4d7fb2833..4697b7bb1b 100644 --- a/bee-protocol/src/workers/peer/manager.rs +++ b/bee-protocol/src/workers/peer/manager.rs @@ -63,7 +63,7 @@ where let tangle = node.resource::>(); let requested_milestones = node.resource::(); let metrics = node.resource::(); - let network_command_tx = node.resource::(); + let gossip_command_tx = node.resource::(); let hasher = node.worker::().unwrap().tx.clone(); let message_responder = node.worker::().unwrap().tx.clone(); @@ -87,13 +87,13 @@ where match event { AutopeeringEvent::IncomingPeering { peer, .. } => { - handle_new_peering(peer, &network_name, &network_command_tx); + handle_new_peering(peer, &network_name, &gossip_command_tx); } AutopeeringEvent::OutgoingPeering { peer, .. } => { - handle_new_peering(peer, &network_name, &network_command_tx); + handle_new_peering(peer, &network_name, &gossip_command_tx); } AutopeeringEvent::PeeringDropped { peer_id } => { - handle_peering_dropped(peer_id, &network_command_tx); + handle_peering_dropped(peer_id, &gossip_command_tx); } _ => {} } @@ -103,6 +103,8 @@ where }); } + let gossip_command_tx = node.resource::(); + node.spawn::(|shutdown| async move { info!("Network handler running."); @@ -110,7 +112,7 @@ where while let Some(event) = receiver.next().await { trace!("Received event {:?}.", event); - log::warn!{"peer_manager.len()={}", peer_manager.len()}; + log::warn! {"peer_manager.len()={}", peer_manager.len()}; match event { NetworkEvent::PeerAdded { peer_id, info } => { @@ -190,10 +192,11 @@ where .unwrap_or_default(), NetworkEvent::PeerUnreachable { peer_id, peer_info } => { if peer_info.relation.is_discovered() { - // Remove that discovered peer from the manager. - if let Some(peer) = peer_manager.remove(&peer_id) { - info!("Removed peer {}.", peer.0.alias()); - } + // Remove that discovered peer. + gossip_command_tx + .send(Command::RemovePeer { peer_id }) + .expect("send gossip command"); + // Todo: tell the autopeering to remove that neighbor. log::error!("Tell autopeering to remove {}", alias!(peer_id)); } From 9a14c5c0c3baca29ae81b461317092b96e05d2ca Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Sun, 27 Feb 2022 19:33:21 +0100 Subject: [PATCH 08/25] Publish PeerUnreachable for peers that don't identify properly; Change some logs; --- bee-network/bee-gossip/src/service/event.rs | 6 +++++ bee-network/bee-gossip/src/service/host.rs | 12 +++++++++- bee-network/bee-gossip/src/swarm/behaviour.rs | 6 ++++- bee-protocol/src/workers/peer/manager.rs | 24 +++++++++++-------- 4 files changed, 36 insertions(+), 12 deletions(-) diff --git a/bee-network/bee-gossip/src/service/event.rs b/bee-network/bee-gossip/src/service/event.rs index 94c7743932..d08e4ef0d5 100644 --- a/bee-network/bee-gossip/src/service/event.rs +++ b/bee-network/bee-gossip/src/service/event.rs @@ -132,6 +132,12 @@ pub enum InternalEvent { /// The gossip protocol with a peer was stopped. ProtocolStopped { peer_id: PeerId }, + + /// A peer didn't answer our repeated calls. + PeerUnreachable { + /// The peer's id. + peer_id: PeerId, + }, } /// Allows the user to receive [`Event`]s published by the network layer. diff --git a/bee-network/bee-gossip/src/service/host.rs b/bee-network/bee-gossip/src/service/host.rs index acc7b65e0e..4fd5efbf3b 100644 --- a/bee-network/bee-gossip/src/service/host.rs +++ b/bee-network/bee-gossip/src/service/host.rs @@ -172,7 +172,8 @@ async fn command_processor(shutdown: Shutdown, commands: CommandReceiver, sender while let Some(command) = commands.next().await { if let Err(e) = process_command(command, &senders, &peerlist).await { - error!("Error processing command. Cause: {}", e); + // Note: commands are allowed to fail as the user may not be up-to-date. + debug!("Command could not be executed. Cause: {}", e); continue; } } @@ -480,6 +481,15 @@ async fn process_internal_event( debug!("{}", accepted.unwrap_err()); } } + + InternalEvent::PeerUnreachable { peer_id } => { + if let Ok(peer_info) = peerlist.0.read().await.info(&peer_id) { + senders + .events + .send(Event::PeerUnreachable { peer_id, peer_info }) + .map_err(|_| Error::SendingEventFailed)?; + } + } } Ok(()) diff --git a/bee-network/bee-gossip/src/swarm/behaviour.rs b/bee-network/bee-gossip/src/swarm/behaviour.rs index bef12a331e..8b3ec7f817 100644 --- a/bee-network/bee-gossip/src/swarm/behaviour.rs +++ b/bee-network/bee-gossip/src/swarm/behaviour.rs @@ -58,7 +58,11 @@ impl NetworkBehaviourEventProcess for SwarmBehaviour { trace!("Pushed Identify request to {}.", alias!(peer_id)); } IdentifyEvent::Error { peer_id, error } => { - warn!("Identification error with {}: Cause: {:?}.", alias!(peer_id), error); + trace!("Identification error with {}: Cause: {:?}.", alias!(peer_id), error); + + // Panic: + // Sending must not fail. + self.internal_sender.send(InternalEvent::PeerUnreachable { peer_id }).expect("send internal event"); } } } diff --git a/bee-protocol/src/workers/peer/manager.rs b/bee-protocol/src/workers/peer/manager.rs index 4697b7bb1b..04b5107952 100644 --- a/bee-protocol/src/workers/peer/manager.rs +++ b/bee-protocol/src/workers/peer/manager.rs @@ -198,7 +198,7 @@ where .expect("send gossip command"); // Todo: tell the autopeering to remove that neighbor. - log::error!("Tell autopeering to remove {}", alias!(peer_id)); + log::warn!("TODO: tell autopeering to remove {} from neighborhood.", alias!(peer_id)); } } _ => (), // Ignore all other events for now @@ -216,21 +216,25 @@ fn handle_new_peering(peer: bee_autopeering::Peer, network_name: &str, command_t if let Some(multiaddr) = peer.service_multiaddr(network_name) { let peer_id = peer.peer_id().libp2p_peer_id(); - command_tx - .send(Command::AddPeer { - peer_id, - alias: Some(alias!(peer_id).to_string()), - multiaddr, - relation: PeerRelation::Discovered, - }) - .expect("error sending network command"); + // Panic: + // Sending commands must not fail. + command_tx + .send(Command::AddPeer { + peer_id, + alias: Some(alias!(peer_id).to_string()), + multiaddr, + relation: PeerRelation::Discovered, + }) + .expect("send command to gossip layer"); } } fn handle_peering_dropped(peer_id: bee_autopeering::PeerId, command_tx: &NetworkCommandSender) { let peer_id = peer_id.libp2p_peer_id(); + // Panic: + // Sending commands must not fail. command_tx .send(Command::RemovePeer { peer_id }) - .expect("error sending network command"); + .expect("send command to gossip layer"); } From d875e8aa7451b79881ecfa93246cecf8227a2994 Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Mon, 28 Feb 2022 11:57:00 +0100 Subject: [PATCH 09/25] Reset dial attempt counter only if peer successfully identified itself --- bee-network/bee-gossip/src/network/host.rs | 12 +- bee-network/bee-gossip/src/peer/list.rs | 114 +++++++++--------- bee-network/bee-gossip/src/service/event.rs | 6 + bee-network/bee-gossip/src/service/host.rs | 50 +++++--- bee-network/bee-gossip/src/swarm/behaviour.rs | 12 +- bee-protocol/src/workers/peer/manager.rs | 25 ++-- 6 files changed, 127 insertions(+), 92 deletions(-) diff --git a/bee-network/bee-gossip/src/network/host.rs b/bee-network/bee-gossip/src/network/host.rs index ded812ac59..9943cd5b34 100644 --- a/bee-network/bee-gossip/src/network/host.rs +++ b/bee-network/bee-gossip/src/network/host.rs @@ -212,13 +212,17 @@ async fn dial_peer(swarm: &mut Swarm, peer_id: PeerId, peerlist: address: addr, alias, .. } = peerlist.0.read().await.info(&peer_id).expect("peer must exist"); - let dial_attempt = peerlist + let mut dial_attempt = 0; + + peerlist .0 .write() .await - .log_dial_attempt(&peer_id) - .expect("peer must exist") - .expect("peer must not be connected"); + .update_metrics(&peer_id, |m| { + m.num_dials += 1; + dial_attempt = m.num_dials; + }) + .expect("peer must exist"); debug!( "Dialing peer: {} ({}) attempt: #{}.", diff --git a/bee-network/bee-gossip/src/peer/list.rs b/bee-network/bee-gossip/src/peer/list.rs index d4cebce1ad..42bc3ad806 100644 --- a/bee-network/bee-gossip/src/peer/list.rs +++ b/bee-network/bee-gossip/src/peer/list.rs @@ -33,7 +33,7 @@ impl PeerListWrapper { pub struct PeerList { local_id: PeerId, local_addrs: HashSet, - peers: HashMap, + peers: HashMap, banned_peers: HashSet, banned_addrs: HashSet, } @@ -64,6 +64,7 @@ impl PeerList { relation: PeerRelation::Known, }, PeerState::default(), + PeerMetrics::default(), ), ) })); @@ -83,7 +84,9 @@ impl PeerList { } // Since we already checked that such a `peer_id` is not yet present, the returned value is always `None`. - let _ = self.peers.insert(peer_id, (peer_info, PeerState::default())); + let _ = self + .peers + .insert(peer_id, (peer_info, PeerState::default(), PeerMetrics::default())); Ok(()) } @@ -99,7 +102,7 @@ impl PeerList { } pub fn remove(&mut self, peer_id: &PeerId) -> Result { - let (info, _) = self.peers.remove(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?; + let (info, _, _) = self.peers.remove(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?; Ok(info) } @@ -112,30 +115,14 @@ impl PeerList { self.peers .get(peer_id) .ok_or(Error::PeerNotPresent(*peer_id)) - .map(|(peer_info, _)| peer_info.clone()) - } - - pub fn dial_attempts(&self, peer_id: &PeerId) -> Result, Error> { - let (_, state) = self.peers.get(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?; - - if let PeerState::Disconnected { dial_attempts } = state { - Ok(Some(*dial_attempts)) - } else { - Ok(None) - } + .map(|(info, _, _)| info.clone()) } - // Note: no-op if peer is already connected. - pub fn log_dial_attempt(&mut self, peer_id: &PeerId) -> Result, Error> { - let (_, state) = self.peers.get_mut(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?; - - if let PeerState::Disconnected { dial_attempts } = state { - *dial_attempts += 1; - - Ok(Some(*dial_attempts)) - } else { - Ok(None) - } + pub fn metrics(&self, peer_id: &PeerId) -> Result { + self.peers + .get(peer_id) + .ok_or(Error::PeerNotPresent(*peer_id)) + .map(|(_, _, metrics)| metrics.clone()) } pub fn len(&self) -> usize { @@ -146,7 +133,7 @@ impl PeerList { where U: FnMut(&mut PeerInfo), { - let (info, _) = self.peers.get_mut(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?; + let (info, _, _) = self.peers.get_mut(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?; update(info); @@ -157,28 +144,39 @@ impl PeerList { where U: FnMut(&mut PeerState) -> Option, { - let (_, state) = self.peers.get_mut(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?; + let (_, state, _) = self.peers.get_mut(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?; Ok(update(state)) } + pub fn update_metrics(&mut self, peer_id: &PeerId, mut update: U) -> Result<(), Error> + where + U: FnMut(&mut PeerMetrics), + { + let (_, _, metrics) = self.peers.get_mut(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?; + + update(metrics); + + Ok(()) + } + pub fn satisfies

(&self, peer_id: &PeerId, predicate: P) -> Result where - P: Fn(&PeerInfo, &PeerState) -> bool, + P: Fn(&PeerInfo, &PeerState, &PeerMetrics) -> bool, { self.peers .get(peer_id) .ok_or(Error::PeerNotPresent(*peer_id)) - .map(|(info, state)| predicate(info, state)) + .map(|(info, state, metrics)| predicate(info, state, metrics)) } - pub fn filter_info<'a, P: 'a>(&'a self, predicate: P) -> impl Iterator + '_ + pub fn filter<'a, P: 'a>(&'a self, predicate: P) -> impl Iterator + '_ where - P: Fn(&PeerInfo, &PeerState) -> bool, + P: Fn(&PeerInfo, &PeerState, &PeerMetrics) -> bool, { - self.peers.iter().filter_map(move |(peer_id, (info, state))| { - if predicate(info, state) { - Some((*peer_id, info.clone())) + self.peers.iter().filter_map(move |(peer_id, (info, state, metrics))| { + if predicate(info, state, metrics) { + Some((*peer_id, info.clone(), metrics.clone())) } else { None } @@ -187,19 +185,19 @@ impl PeerList { pub fn filter_count

(&self, predicate: P) -> usize where - P: Fn(&PeerInfo, &PeerState) -> bool, + P: Fn(&PeerInfo, &PeerState, &PeerMetrics) -> bool, { self.peers.iter().fold( 0, - |count, (_, (info, state))| { - if predicate(info, state) { count + 1 } else { count } + |count, (_, (info, state, metrics))| { + if predicate(info, state, metrics) { count + 1 } else { count } }, ) } pub fn filter_remove

(&mut self, peer_id: &PeerId, predicate: P) -> bool where - P: Fn(&PeerInfo, &PeerState) -> bool, + P: Fn(&PeerInfo, &PeerState, &PeerMetrics) -> bool, { // NB: Since we drop a potential reference to `&(PeerInfo, PeerState)` this code won't create a deadlock in case // we refactor `PeerList` in a way that `.remove` would only take a `&self`. @@ -207,7 +205,7 @@ impl PeerList { if self .peers .get(peer_id) - .filter(|(info, state)| predicate(info, state)) + .filter(|(info, state, metrics)| predicate(info, state, metrics)) .is_some() { // Should always return `true`, because we know it's there. @@ -284,16 +282,16 @@ impl PeerList { } else if self.banned_addrs.contains(peer_addr) { Err(Error::AddressIsBanned(peer_addr.clone())) } else if self - .satisfies(peer_id, |_, state| state.is_connected()) + .satisfies(peer_id, |_, state, _| state.is_connected()) .unwrap_or(false) { Err(Error::PeerIsConnected(*peer_id)) } else if !self.contains(peer_id) - && self.filter_count(|info, _| info.relation.is_unknown()) >= global::max_unknown_peers() + && self.filter_count(|info, _, _| info.relation.is_unknown()) >= global::max_unknown_peers() { Err(Error::ExceedsUnknownPeerLimit(global::max_unknown_peers())) } else if !self.contains(peer_id) - && self.filter_count(|info, _| info.relation.is_discovered()) >= global::max_discovered_peers() + && self.filter_count(|info, _, _| info.relation.is_discovered()) >= global::max_discovered_peers() { Err(Error::ExceedsDiscoveredPeerLimit(global::max_discovered_peers())) } else { @@ -319,24 +317,24 @@ impl PeerList { } else if self.banned_peers.contains(peer_id) { Err(Error::PeerIsBanned(*peer_id)) } else if self - .satisfies(peer_id, |_, state| state.is_connected()) + .satisfies(peer_id, |_, state, _| state.is_connected()) .unwrap_or(false) { Err(Error::PeerIsConnected(*peer_id)) } else { - let (peer_info, _) = self.peers.get(peer_id).unwrap(); + let (peer_info, _, _) = self.peers.get(peer_id).unwrap(); if self.local_addrs.contains(&peer_info.address) { Err(Error::AddressIsLocal(peer_info.address.clone())) } else if self.banned_addrs.contains(&peer_info.address) { Err(Error::AddressIsBanned(peer_info.address.clone())) } else if peer_info.relation.is_unknown() - && self.filter_count(|info, status| info.relation.is_unknown() && status.is_connected()) + && self.filter_count(|info, status, _| info.relation.is_unknown() && status.is_connected()) >= global::max_unknown_peers() { Err(Error::ExceedsUnknownPeerLimit(global::max_unknown_peers())) } else if peer_info.relation.is_discovered() - && self.filter_count(|info, status| info.relation.is_discovered() && status.is_connected()) + && self.filter_count(|info, status, _| info.relation.is_discovered() && status.is_connected()) >= global::max_discovered_peers() { Err(Error::ExceedsDiscoveredPeerLimit(global::max_discovered_peers())) @@ -365,9 +363,9 @@ impl PeerList { } fn find_peer_if_connected(&self, addr: &Multiaddr) -> Option { - self.filter_info(|info, state| state.is_connected() && info.address == *addr) + self.filter(|info, state, _| state.is_connected() && info.address == *addr) .next() - .map(|(peer_id, _)| peer_id) + .map(|(peer_id, _, _)| peer_id) } } @@ -451,10 +449,10 @@ mod tests { .unwrap(); assert_eq!(1, pl.len()); - pl.filter_remove(&peer_id, |info, _| info.relation.is_unknown()); + pl.filter_remove(&peer_id, |info, _, _| info.relation.is_unknown()); assert_eq!(1, pl.len()); - pl.filter_remove(&peer_id, |info, _| info.relation.is_known()); + pl.filter_remove(&peer_id, |info, _, _| info.relation.is_known()); assert_eq!(0, pl.len()); } @@ -494,19 +492,25 @@ mod tests { #[derive(Clone, Debug)] pub enum PeerState { - Disconnected { dial_attempts: usize }, + Disconnected, Connected(GossipSender), } +#[derive(Clone, Debug, Default)] +pub struct PeerMetrics { + pub(crate) num_dials: usize, + pub(crate) identified: Option, +} + impl Default for PeerState { fn default() -> Self { - Self::Disconnected { dial_attempts: 0 } + Self::Disconnected } } impl PeerState { pub fn is_disconnected(&self) -> bool { - matches!(self, Self::Disconnected { dial_attempts: _ }) + matches!(self, Self::Disconnected) } pub fn is_connected(&self) -> bool { @@ -520,7 +524,7 @@ impl PeerState { pub fn set_disconnected(&mut self) -> Option { match take(self) { - Self::Disconnected { .. } => None, + Self::Disconnected => None, Self::Connected(sender) => Some(sender), } } @@ -540,7 +544,7 @@ mod peerstate_tests { #[test] fn peer_state_change() { - let mut peerstate = PeerState::Disconnected { dial_attempts: 0 }; + let mut peerstate = PeerState::Disconnected; let (tx, _rx) = channel(); peerstate.set_connected(tx); diff --git a/bee-network/bee-gossip/src/service/event.rs b/bee-network/bee-gossip/src/service/event.rs index d08e4ef0d5..ca64e3bdc8 100644 --- a/bee-network/bee-gossip/src/service/event.rs +++ b/bee-network/bee-gossip/src/service/event.rs @@ -138,6 +138,12 @@ pub enum InternalEvent { /// The peer's id. peer_id: PeerId, }, + + /// A peer has identified itself via the `libp2p` Identify protocol. + PeerIdentified { + /// The peer's id. + peer_id: PeerId, + }, } /// Allows the user to receive [`Event`]s published by the network layer. diff --git a/bee-network/bee-gossip/src/service/host.rs b/bee-network/bee-gossip/src/service/host.rs index 4fd5efbf3b..5b85ce65d0 100644 --- a/bee-network/bee-gossip/src/service/host.rs +++ b/bee-network/bee-gossip/src/service/host.rs @@ -1,6 +1,8 @@ // Copyright 2020-2021 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 +use std::time::{SystemTime, UNIX_EPOCH}; + use super::{ command::{Command, CommandReceiver, CommandSender}, error::Error, @@ -32,7 +34,7 @@ use tokio::time::{self, Duration, Instant}; use tokio_stream::wrappers::{IntervalStream, UnboundedReceiverStream}; const MAX_PEER_STATE_CHECKER_DELAY_MILLIS: u64 = 2000; -const MAX_DIAL_ATTEMPTS: usize = 3; +const MAX_DIALS: usize = 3; pub struct ServiceHostConfig { pub local_keys: identity::Keypair, @@ -215,46 +217,42 @@ async fn peerstate_checker(shutdown: Shutdown, senders: Senders, peerlist: PeerL // Check, if there are any disconnected known peers, and schedule a reconnect attempt for each // of those. while interval.next().await.is_some() { - let peerlist_reader = peerlist.0.read().await; - - // How many peers we know about. - let num_all = peerlist_reader.len(); + let read = peerlist.0.read().await; // To how many known peers are we currently connected. - let num_known = peerlist_reader.filter_count(|info, _| info.relation.is_known()); + let num_known = read.filter_count(|info, _, _| info.relation.is_known()); let num_connected_known = - peerlist_reader.filter_count(|info, state| info.relation.is_known() && state.is_connected()); + read.filter_count(|info, state, _| info.relation.is_known() && state.is_connected()); // To how many unknown peers are we currently connected. let num_connected_unknown = - peerlist_reader.filter_count(|info, state| info.relation.is_unknown() && state.is_connected()); + read.filter_count(|info, state, _| info.relation.is_unknown() && state.is_connected()); // To how many discovered peers are we currently connected. let num_connected_discovered = - peerlist_reader.filter_count(|info, state| info.relation.is_discovered() && state.is_connected()); + read.filter_count(|info, state, _| info.relation.is_discovered() && state.is_connected()); + + // How many peers we know of but are currently disconnected. + let num_disconnected = + read.filter_count(|_, state, _| state.is_disconnected()); info!( - "Connected peers: known {}/{} unknown {}/{} discovered {}/{}, All peers: {}.", + "Connected peers: known {}/{} unknown {}/{} discovered {}/{} - Disconnected peers: {}.", num_connected_known, num_known, num_connected_unknown, global::max_unknown_peers(), num_connected_discovered, global::max_discovered_peers(), - num_all, + num_disconnected, ); // Automatically try to reconnect known **and** discovered peers. The removal of discovered peers is a decision // that needs to be made in the autopeering service. - for (peer_id, peer_info) in peerlist_reader.filter_info(|info, state| { + for (peer_id, peer_info, peer_metrics) in read.filter(|info, state, _| { (info.relation.is_known() || info.relation.is_discovered()) && state.is_disconnected() }) { - let dial_attempts = peerlist_reader - .dial_attempts(&peer_id) - .expect("peer does exist") - .expect("peer is not connected"); - - if dial_attempts >= MAX_DIAL_ATTEMPTS { + if peer_metrics.num_dials >= MAX_DIALS { log::debug!("Peer {} is unreachable.", peer_id); let _ = senders.events.send(Event::PeerUnreachable { peer_id, peer_info }); @@ -380,7 +378,7 @@ async fn process_internal_event( // Only remove unknown peers. // NOTE: discovered peers should be removed manually via command if the autopeering protocol suggests it. - let _ = peerlist.filter_remove(&peer_id, |peer_info, _| peer_info.relation.is_unknown()); + let _ = peerlist.filter_remove(&peer_id, |peer_info, _, _| peer_info.relation.is_unknown()); // We no longer need to hold the lock. drop(peerlist); @@ -490,6 +488,20 @@ async fn process_internal_event( .map_err(|_| Error::SendingEventFailed)?; } } + + InternalEvent::PeerIdentified { peer_id } => { + let _ = peerlist.0.write().await.update_metrics(&peer_id, |m| { + // Reset dial count. + m.num_dials = 0; + // Update Identify timestamp. + m.identified = Some( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system time") + .as_secs(), + ); + }); + } } Ok(()) diff --git a/bee-network/bee-gossip/src/swarm/behaviour.rs b/bee-network/bee-gossip/src/swarm/behaviour.rs index 8b3ec7f817..847756d19f 100644 --- a/bee-network/bee-gossip/src/swarm/behaviour.rs +++ b/bee-network/bee-gossip/src/swarm/behaviour.rs @@ -44,12 +44,16 @@ impl NetworkBehaviourEventProcess for SwarmBehaviour { match event { IdentifyEvent::Received { peer_id, info } => { trace!( - "Received Identify request from {}. Observed address: {:?}.", + "Received Identify response from {}. Observed address: {:?}.", alias!(peer_id), info.observed_addr, ); - // TODO: log supported protocols by the peer (info.protocols) + // Panic: + // Sending must not fail. + self.internal_sender + .send(InternalEvent::PeerIdentified { peer_id }) + .expect("send internal event"); } IdentifyEvent::Sent { peer_id } => { trace!("Sent Identify request to {}.", alias!(peer_id)); @@ -62,7 +66,9 @@ impl NetworkBehaviourEventProcess for SwarmBehaviour { // Panic: // Sending must not fail. - self.internal_sender.send(InternalEvent::PeerUnreachable { peer_id }).expect("send internal event"); + self.internal_sender + .send(InternalEvent::PeerUnreachable { peer_id }) + .expect("send internal event"); } } } diff --git a/bee-protocol/src/workers/peer/manager.rs b/bee-protocol/src/workers/peer/manager.rs index 04b5107952..862bcdcb21 100644 --- a/bee-protocol/src/workers/peer/manager.rs +++ b/bee-protocol/src/workers/peer/manager.rs @@ -198,7 +198,10 @@ where .expect("send gossip command"); // Todo: tell the autopeering to remove that neighbor. - log::warn!("TODO: tell autopeering to remove {} from neighborhood.", alias!(peer_id)); + log::warn!( + "TODO: tell autopeering to remove {} from neighborhood.", + alias!(peer_id) + ); } } _ => (), // Ignore all other events for now @@ -216,16 +219,16 @@ fn handle_new_peering(peer: bee_autopeering::Peer, network_name: &str, command_t if let Some(multiaddr) = peer.service_multiaddr(network_name) { let peer_id = peer.peer_id().libp2p_peer_id(); - // Panic: - // Sending commands must not fail. - command_tx - .send(Command::AddPeer { - peer_id, - alias: Some(alias!(peer_id).to_string()), - multiaddr, - relation: PeerRelation::Discovered, - }) - .expect("send command to gossip layer"); + // Panic: + // Sending commands must not fail. + command_tx + .send(Command::AddPeer { + peer_id, + alias: Some(alias!(peer_id).to_string()), + multiaddr, + relation: PeerRelation::Discovered, + }) + .expect("send command to gossip layer"); } } From 6d4a3a86bf54c46e320280b197509641499525c7 Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Mon, 28 Feb 2022 12:13:58 +0100 Subject: [PATCH 10/25] Remove temporary logs --- bee-protocol/src/workers/peer/manager.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/bee-protocol/src/workers/peer/manager.rs b/bee-protocol/src/workers/peer/manager.rs index 862bcdcb21..268e1cb83c 100644 --- a/bee-protocol/src/workers/peer/manager.rs +++ b/bee-protocol/src/workers/peer/manager.rs @@ -112,7 +112,6 @@ where while let Some(event) = receiver.next().await { trace!("Received event {:?}.", event); - log::warn! {"peer_manager.len()={}", peer_manager.len()}; match event { NetworkEvent::PeerAdded { peer_id, info } => { @@ -197,11 +196,7 @@ where .send(Command::RemovePeer { peer_id }) .expect("send gossip command"); - // Todo: tell the autopeering to remove that neighbor. - log::warn!( - "TODO: tell autopeering to remove {} from neighborhood.", - alias!(peer_id) - ); + // Todo: tell the autopeering to remove that peer from the neighborhood. } } _ => (), // Ignore all other events for now From 30d831afe4f48fa1c633f731ca598b954f6c5fc0 Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Mon, 28 Feb 2022 12:43:58 +0100 Subject: [PATCH 11/25] Alias macro starts with the significant part of peer id --- bee-network/bee-gossip/src/alias.rs | 4 ++-- bee-network/bee-gossip/src/tests/alias.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/bee-network/bee-gossip/src/alias.rs b/bee-network/bee-gossip/src/alias.rs index 5db7002fbb..84c9b69b11 100644 --- a/bee-network/bee-gossip/src/alias.rs +++ b/bee-network/bee-gossip/src/alias.rs @@ -7,9 +7,9 @@ #[macro_export] macro_rules! alias { ($peer_id:expr) => { - &$peer_id.to_base58()[..16] + &$peer_id.to_base58()[7..23] }; ($peer_id:expr, $len:expr) => { - &$peer_id.to_base58()[..$len] + &$peer_id.to_base58()[7..7+$len] }; } diff --git a/bee-network/bee-gossip/src/tests/alias.rs b/bee-network/bee-gossip/src/tests/alias.rs index 64ee5f9d52..b92a3d37b2 100644 --- a/bee-network/bee-gossip/src/tests/alias.rs +++ b/bee-network/bee-gossip/src/tests/alias.rs @@ -9,12 +9,12 @@ use crate::alias; fn alias_default() { let peer_id = gen_constant_peer_id(); let alias = alias!(peer_id); - assert_eq!(alias, "eF27st"); + assert_eq!(alias, "WJWEKvSFbben74C7"); } #[test] fn alias_custom() { let peer_id = gen_constant_peer_id(); let alias = alias!(peer_id, 10); - assert_eq!(alias, "WSUEeF27st"); + assert_eq!(alias, "WJWEKvSFbb"); } From 1a06f38e397d30299ffcde1961c338eb4047a057 Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Mon, 28 Feb 2022 13:02:03 +0100 Subject: [PATCH 12/25] Send PeerRemoved event for disconnected unknown peers --- bee-network/bee-gossip/src/alias.rs | 2 +- bee-network/bee-gossip/src/network/host.rs | 4 +- bee-network/bee-gossip/src/peer/list.rs | 45 +++++++++++----------- bee-network/bee-gossip/src/service/host.rs | 20 ++++++---- 4 files changed, 39 insertions(+), 32 deletions(-) diff --git a/bee-network/bee-gossip/src/alias.rs b/bee-network/bee-gossip/src/alias.rs index 84c9b69b11..6dcb919285 100644 --- a/bee-network/bee-gossip/src/alias.rs +++ b/bee-network/bee-gossip/src/alias.rs @@ -10,6 +10,6 @@ macro_rules! alias { &$peer_id.to_base58()[7..23] }; ($peer_id:expr, $len:expr) => { - &$peer_id.to_base58()[7..7+$len] + &$peer_id.to_base58()[7..7 + $len] }; } diff --git a/bee-network/bee-gossip/src/network/host.rs b/bee-network/bee-gossip/src/network/host.rs index 9943cd5b34..b342612ba5 100644 --- a/bee-network/bee-gossip/src/network/host.rs +++ b/bee-network/bee-gossip/src/network/host.rs @@ -147,7 +147,7 @@ async fn process_swarm_event( .0 .write() .await - .insert_local_addr(address) + .add_local_addr(address) .expect("insert_local_addr"); } SwarmEvent::ConnectionEstablished { peer_id, .. } => { @@ -213,7 +213,7 @@ async fn dial_peer(swarm: &mut Swarm, peer_id: PeerId, peerlist: } = peerlist.0.read().await.info(&peer_id).expect("peer must exist"); let mut dial_attempt = 0; - + peerlist .0 .write() diff --git a/bee-network/bee-gossip/src/peer/list.rs b/bee-network/bee-gossip/src/peer/list.rs index 42bc3ad806..715b67089e 100644 --- a/bee-network/bee-gossip/src/peer/list.rs +++ b/bee-network/bee-gossip/src/peer/list.rs @@ -78,7 +78,7 @@ impl PeerList { } } - pub fn insert_peer(&mut self, peer_id: PeerId, peer_info: PeerInfo) -> Result<(), (PeerId, PeerInfo, Error)> { + pub fn add(&mut self, peer_id: PeerId, peer_info: PeerInfo) -> Result<(), (PeerId, PeerInfo, Error)> { if self.contains(&peer_id) { return Err((peer_id, peer_info, Error::PeerIsDuplicate(peer_id))); } @@ -91,16 +91,6 @@ impl PeerList { Ok(()) } - pub fn insert_local_addr(&mut self, addr: Multiaddr) -> Result<(), (Multiaddr, Error)> { - if self.local_addrs.contains(&addr) { - return Err((addr.clone(), Error::AddressIsDuplicate(addr))); - } - - let _ = self.local_addrs.insert(addr); - - Ok(()) - } - pub fn remove(&mut self, peer_id: &PeerId) -> Result { let (info, _, _) = self.peers.remove(peer_id).ok_or(Error::PeerNotPresent(*peer_id))?; @@ -129,6 +119,16 @@ impl PeerList { self.peers.len() } + pub fn add_local_addr(&mut self, addr: Multiaddr) -> Result<(), (Multiaddr, Error)> { + if self.local_addrs.contains(&addr) { + return Err((addr.clone(), Error::AddressIsDuplicate(addr))); + } + + let _ = self.local_addrs.insert(addr); + + Ok(()) + } + pub fn update_info(&mut self, peer_id: &PeerId, mut update: U) -> Result<(), Error> where U: FnMut(&mut PeerInfo), @@ -187,12 +187,13 @@ impl PeerList { where P: Fn(&PeerInfo, &PeerState, &PeerMetrics) -> bool, { - self.peers.iter().fold( - 0, - |count, (_, (info, state, metrics))| { - if predicate(info, state, metrics) { count + 1 } else { count } - }, - ) + self.peers.iter().fold(0, |count, (_, (info, state, metrics))| { + if predicate(info, state, metrics) { + count + 1 + } else { + count + } + }) } pub fn filter_remove

(&mut self, peer_id: &PeerId, predicate: P) -> bool @@ -388,7 +389,7 @@ mod tests { for i in 1..=3 { assert!( - pl.insert_peer( + pl.add( gen_random_peer_id(), gen_deterministic_peer_info(i, PeerRelation::Known) ) @@ -405,11 +406,11 @@ mod tests { let peer_id = gen_constant_peer_id(); - assert!(pl.insert_peer(peer_id, gen_constant_peer_info()).is_ok()); + assert!(pl.add(peer_id, gen_constant_peer_info()).is_ok()); // Do not allow inserting the same peer id twice. assert!(matches!( - pl.insert_peer(peer_id, gen_constant_peer_info()), + pl.add(peer_id, gen_constant_peer_info()), Err((_, _, Error::PeerIsDuplicate(_))) )); } @@ -434,7 +435,7 @@ mod tests { let mut pl = PeerList::new(local_id); - pl.insert_peer(peer_id, peer_info.clone()).unwrap(); + pl.add(peer_id, peer_info.clone()).unwrap(); pl.accepts_incoming_peer(&peer_id, &peer_info.address).unwrap(); } @@ -445,7 +446,7 @@ mod tests { let peer_id = gen_random_peer_id(); - pl.insert_peer(peer_id, gen_deterministic_peer_info(0, PeerRelation::Known)) + pl.add(peer_id, gen_deterministic_peer_info(0, PeerRelation::Known)) .unwrap(); assert_eq!(1, pl.len()); diff --git a/bee-network/bee-gossip/src/service/host.rs b/bee-network/bee-gossip/src/service/host.rs index 5b85ce65d0..d083449582 100644 --- a/bee-network/bee-gossip/src/service/host.rs +++ b/bee-network/bee-gossip/src/service/host.rs @@ -221,8 +221,7 @@ async fn peerstate_checker(shutdown: Shutdown, senders: Senders, peerlist: PeerL // To how many known peers are we currently connected. let num_known = read.filter_count(|info, _, _| info.relation.is_known()); - let num_connected_known = - read.filter_count(|info, state, _| info.relation.is_known() && state.is_connected()); + let num_connected_known = read.filter_count(|info, state, _| info.relation.is_known() && state.is_connected()); // To how many unknown peers are we currently connected. let num_connected_unknown = @@ -233,8 +232,7 @@ async fn peerstate_checker(shutdown: Shutdown, senders: Senders, peerlist: PeerL read.filter_count(|info, state, _| info.relation.is_discovered() && state.is_connected()); // How many peers we know of but are currently disconnected. - let num_disconnected = - read.filter_count(|_, state, _| state.is_disconnected()); + let num_disconnected = read.filter_count(|_, state, _| state.is_disconnected()); info!( "Connected peers: known {}/{} unknown {}/{} discovered {}/{} - Disconnected peers: {}.", @@ -378,7 +376,7 @@ async fn process_internal_event( // Only remove unknown peers. // NOTE: discovered peers should be removed manually via command if the autopeering protocol suggests it. - let _ = peerlist.filter_remove(&peer_id, |peer_info, _, _| peer_info.relation.is_unknown()); + let was_removed = peerlist.filter_remove(&peer_id, |peer_info, _, _| peer_info.relation.is_unknown()); // We no longer need to hold the lock. drop(peerlist); @@ -393,6 +391,14 @@ async fn process_internal_event( .events .send(Event::PeerDisconnected { peer_id }) .map_err(|_| Error::SendingEventFailed)?; + + if was_removed { + log::warn!("Removed unknown {peer_id}"); + senders + .events + .send(Event::PeerRemoved { peer_id }) + .map_err(|_| Error::SendingEventFailed)?; + } } InternalEvent::ProtocolEstablished { @@ -418,7 +424,7 @@ async fn process_internal_event( alias: alias!(peer_id).to_string(), relation: PeerRelation::Unknown, }; - peerlist.insert_peer(peer_id, peer_info).map_err(|(_, _, e)| e)?; + peerlist.add(peer_id, peer_info).map_err(|(_, _, e)| e)?; peer_added = true; } @@ -524,7 +530,7 @@ async fn add_peer( let mut peerlist = peerlist.0.write().await; // If the insert fails for some reason, we get the peer data back, so it can be reused. - match peerlist.insert_peer(peer_id, peer_info) { + match peerlist.add(peer_id, peer_info) { Ok(()) => { // Panic: // We just added the peer_id so unwrapping here is fine. From 5d187bca1de6754604cba78d79423e170d8b0f0b Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Mon, 28 Feb 2022 13:15:49 +0100 Subject: [PATCH 13/25] Alias macro starts with the significant part of peer id (for real) --- bee-network/bee-gossip/src/alias.rs | 4 ++-- bee-network/bee-gossip/src/tests/alias.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/bee-network/bee-gossip/src/alias.rs b/bee-network/bee-gossip/src/alias.rs index 6dcb919285..aa66f2e732 100644 --- a/bee-network/bee-gossip/src/alias.rs +++ b/bee-network/bee-gossip/src/alias.rs @@ -7,9 +7,9 @@ #[macro_export] macro_rules! alias { ($peer_id:expr) => { - &$peer_id.to_base58()[7..23] + &$peer_id.to_base58()[8..24] }; ($peer_id:expr, $len:expr) => { - &$peer_id.to_base58()[7..7 + $len] + &$peer_id.to_base58()[8..8 + $len] }; } diff --git a/bee-network/bee-gossip/src/tests/alias.rs b/bee-network/bee-gossip/src/tests/alias.rs index b92a3d37b2..18526c124a 100644 --- a/bee-network/bee-gossip/src/tests/alias.rs +++ b/bee-network/bee-gossip/src/tests/alias.rs @@ -9,12 +9,12 @@ use crate::alias; fn alias_default() { let peer_id = gen_constant_peer_id(); let alias = alias!(peer_id); - assert_eq!(alias, "WJWEKvSFbben74C7"); + assert_eq!(alias, "JWEKvSFbben74C7H"); } #[test] fn alias_custom() { let peer_id = gen_constant_peer_id(); let alias = alias!(peer_id, 10); - assert_eq!(alias, "WJWEKvSFbb"); + assert_eq!(alias, "JWEKvSFbbe"); } From e75ae7606549b5007a87c8daa500ab2194a02122 Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Mon, 28 Feb 2022 13:18:03 +0100 Subject: [PATCH 14/25] Align autopeering peer id display --- bee-network/bee-autopeering/src/peer/peer_id.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bee-network/bee-autopeering/src/peer/peer_id.rs b/bee-network/bee-autopeering/src/peer/peer_id.rs index 06aa0e75dd..b6cb1010ff 100644 --- a/bee-network/bee-autopeering/src/peer/peer_id.rs +++ b/bee-network/bee-autopeering/src/peer/peer_id.rs @@ -98,7 +98,7 @@ impl fmt::Debug for PeerId { impl fmt::Display for PeerId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.libp2p_peer_id().to_base58()[..DISPLAY_LENGTH].fmt(f) + self.libp2p_peer_id().to_base58()[8..8+DISPLAY_LENGTH].fmt(f) } } From 49eaf73c7a3207183a988361220d423f720f3a04 Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Mon, 28 Feb 2022 14:37:49 +0100 Subject: [PATCH 15/25] Ignore error if adding a local address fails --- bee-network/bee-gossip/src/network/host.rs | 6 +++--- bee-network/bee-gossip/src/peer/list.rs | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/bee-network/bee-gossip/src/network/host.rs b/bee-network/bee-gossip/src/network/host.rs index b342612ba5..a6ffd7c234 100644 --- a/bee-network/bee-gossip/src/network/host.rs +++ b/bee-network/bee-gossip/src/network/host.rs @@ -143,12 +143,12 @@ async fn process_swarm_event( }) .expect("send error"); - peerlist + // Note: We don't care if the inserted address is a duplicate. + let _ = peerlist .0 .write() .await - .add_local_addr(address) - .expect("insert_local_addr"); + .add_local_addr(address); } SwarmEvent::ConnectionEstablished { peer_id, .. } => { debug!("Swarm event: connection established with {}.", alias!(peer_id)); diff --git a/bee-network/bee-gossip/src/peer/list.rs b/bee-network/bee-gossip/src/peer/list.rs index 715b67089e..89c669e287 100644 --- a/bee-network/bee-gossip/src/peer/list.rs +++ b/bee-network/bee-gossip/src/peer/list.rs @@ -119,6 +119,7 @@ impl PeerList { self.peers.len() } + /// Note: Returns an error if the address trying to be added is a duplicate. pub fn add_local_addr(&mut self, addr: Multiaddr) -> Result<(), (Multiaddr, Error)> { if self.local_addrs.contains(&addr) { return Err((addr.clone(), Error::AddressIsDuplicate(addr))); From bc336d9935fd3625f1bfdf579ebaf5418ec603e1 Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Mon, 28 Feb 2022 14:44:18 +0100 Subject: [PATCH 16/25] Change some log levels --- bee-network/bee-gossip/src/service/host.rs | 3 ++- bee-network/bee-gossip/src/swarm/behaviour.rs | 21 +++++++------------ 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/bee-network/bee-gossip/src/service/host.rs b/bee-network/bee-gossip/src/service/host.rs index d083449582..fc9d0cbd87 100644 --- a/bee-network/bee-gossip/src/service/host.rs +++ b/bee-network/bee-gossip/src/service/host.rs @@ -393,7 +393,8 @@ async fn process_internal_event( .map_err(|_| Error::SendingEventFailed)?; if was_removed { - log::warn!("Removed unknown {peer_id}"); + log::trace!("Removed unknown peer: {peer_id}"); + senders .events .send(Event::PeerRemoved { peer_id }) diff --git a/bee-network/bee-gossip/src/swarm/behaviour.rs b/bee-network/bee-gossip/src/swarm/behaviour.rs index 847756d19f..94c2b0fb77 100644 --- a/bee-network/bee-gossip/src/swarm/behaviour.rs +++ b/bee-network/bee-gossip/src/swarm/behaviour.rs @@ -62,7 +62,7 @@ impl NetworkBehaviourEventProcess for SwarmBehaviour { trace!("Pushed Identify request to {}.", alias!(peer_id)); } IdentifyEvent::Error { peer_id, error } => { - trace!("Identification error with {}: Cause: {:?}.", alias!(peer_id), error); + debug!("Identification error with {}: Cause: {:?}.", alias!(peer_id), error); // Panic: // Sending must not fail. @@ -78,10 +78,10 @@ impl NetworkBehaviourEventProcess for SwarmBehaviour { fn inject_event(&mut self, event: IotaGossipEvent) { match event { IotaGossipEvent::ReceivedUpgradeRequest { from } => { - debug!("Received IOTA gossip request from {}.", alias!(from)); + trace!("Received IOTA gossip request from {}.", alias!(from)); } IotaGossipEvent::SentUpgradeRequest { to } => { - debug!("Sent IOTA gossip request to {}.", alias!(to)); + trace!("Sent IOTA gossip request to {}.", alias!(to)); } IotaGossipEvent::UpgradeCompleted { peer_id, @@ -89,24 +89,17 @@ impl NetworkBehaviourEventProcess for SwarmBehaviour { origin, substream, } => { - debug!("Successfully negotiated IOTA gossip protocol with {}.", alias!(peer_id)); + trace!("Successfully negotiated IOTA gossip protocol with {}.", alias!(peer_id)); - if let Err(e) = self.internal_sender.send(InternalEvent::ProtocolEstablished { + self.internal_sender.send(InternalEvent::ProtocolEstablished { peer_id, peer_addr, origin, substream, - }) { - warn!( - "Send event error for {} after successfully established IOTA gossip protocol. Cause: {}", - peer_id, e - ); - - // TODO: stop processors in that case. - } + }).expect("send internal event"); } IotaGossipEvent::UpgradeError { peer_id, error } => { - warn!( + debug!( "IOTA gossip upgrade error with {}: Cause: {:?}.", alias!(peer_id), error From 4d231f714ec08810e851ae2f09cfa4aa1a52b1fb Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Mon, 28 Feb 2022 14:51:00 +0100 Subject: [PATCH 17/25] Format; Introduce display offset constant, Update autopeering changelog --- bee-network/bee-autopeering/CHANGELOG.md | 6 ++++++ bee-network/bee-autopeering/src/peer/peer_id.rs | 3 ++- bee-network/bee-gossip/src/network/host.rs | 6 +----- bee-network/bee-gossip/src/swarm/behaviour.rs | 14 ++++++++------ 4 files changed, 17 insertions(+), 12 deletions(-) diff --git a/bee-network/bee-autopeering/CHANGELOG.md b/bee-network/bee-autopeering/CHANGELOG.md index 32ca192a72..a58adfaecc 100644 --- a/bee-network/bee-autopeering/CHANGELOG.md +++ b/bee-network/bee-autopeering/CHANGELOG.md @@ -19,6 +19,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security --> +## 0.5.0 - 2022-02-28 + +### Changed + +- Moved `PeerId` display format to start of significant part; + ## 0.4.0 - 2022-02-11 ### Added diff --git a/bee-network/bee-autopeering/src/peer/peer_id.rs b/bee-network/bee-autopeering/src/peer/peer_id.rs index b6cb1010ff..588de12bc1 100644 --- a/bee-network/bee-autopeering/src/peer/peer_id.rs +++ b/bee-network/bee-autopeering/src/peer/peer_id.rs @@ -18,6 +18,7 @@ use std::{ }; const DISPLAY_LENGTH: usize = 16; +const DISPLAY_OFFSET: usize = 8; /// Represents the unique identity of a peer in the network. #[derive(Copy, Clone)] @@ -98,7 +99,7 @@ impl fmt::Debug for PeerId { impl fmt::Display for PeerId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.libp2p_peer_id().to_base58()[8..8+DISPLAY_LENGTH].fmt(f) + self.libp2p_peer_id().to_base58()[DISPLAY_OFFSET..DISPLAY_OFFSET + DISPLAY_LENGTH].fmt(f) } } diff --git a/bee-network/bee-gossip/src/network/host.rs b/bee-network/bee-gossip/src/network/host.rs index a6ffd7c234..54dccc019f 100644 --- a/bee-network/bee-gossip/src/network/host.rs +++ b/bee-network/bee-gossip/src/network/host.rs @@ -144,11 +144,7 @@ async fn process_swarm_event( .expect("send error"); // Note: We don't care if the inserted address is a duplicate. - let _ = peerlist - .0 - .write() - .await - .add_local_addr(address); + let _ = peerlist.0.write().await.add_local_addr(address); } SwarmEvent::ConnectionEstablished { peer_id, .. } => { debug!("Swarm event: connection established with {}.", alias!(peer_id)); diff --git a/bee-network/bee-gossip/src/swarm/behaviour.rs b/bee-network/bee-gossip/src/swarm/behaviour.rs index 94c2b0fb77..c6798fae0c 100644 --- a/bee-network/bee-gossip/src/swarm/behaviour.rs +++ b/bee-network/bee-gossip/src/swarm/behaviour.rs @@ -91,12 +91,14 @@ impl NetworkBehaviourEventProcess for SwarmBehaviour { } => { trace!("Successfully negotiated IOTA gossip protocol with {}.", alias!(peer_id)); - self.internal_sender.send(InternalEvent::ProtocolEstablished { - peer_id, - peer_addr, - origin, - substream, - }).expect("send internal event"); + self.internal_sender + .send(InternalEvent::ProtocolEstablished { + peer_id, + peer_addr, + origin, + substream, + }) + .expect("send internal event"); } IotaGossipEvent::UpgradeError { peer_id, error } => { debug!( From aa74ff356787f750e3c0d81ba80c2f1727983c0e Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Mon, 28 Feb 2022 18:48:42 +0100 Subject: [PATCH 18/25] Update changelogs; bump versions --- Cargo.lock | 6 +++--- bee-api/bee-rest-api/Cargo.toml | 2 +- bee-network/bee-autopeering/CHANGELOG.md | 4 ++-- bee-network/bee-autopeering/Cargo.toml | 2 +- bee-network/bee-gossip/CHANGELOG.md | 15 +++++++++++++++ bee-network/bee-gossip/Cargo.toml | 2 +- bee-network/bee-gossip/src/swarm/behaviour.rs | 4 ++-- bee-node/Cargo.toml | 2 +- bee-protocol/CHANGELOG.md | 8 +++++++- bee-protocol/Cargo.toml | 4 ++-- 10 files changed, 35 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5184cd51d9..9f216a3dee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -186,7 +186,7 @@ checksum = "cf9ff0bbfd639f15c74af777d81383cf53efb7c93613f6cab67c6c11e05bbf8b" [[package]] name = "bee-autopeering" -version = "0.4.0" +version = "0.4.1" dependencies = [ "async-trait", "base64 0.13.0", @@ -247,7 +247,7 @@ dependencies = [ [[package]] name = "bee-gossip" -version = "0.4.0" +version = "0.5.0" dependencies = [ "async-trait", "bee-runtime", @@ -374,7 +374,7 @@ dependencies = [ [[package]] name = "bee-protocol" -version = "0.2.0" +version = "0.2.1" dependencies = [ "async-channel", "async-priority-queue", diff --git a/bee-api/bee-rest-api/Cargo.toml b/bee-api/bee-rest-api/Cargo.toml index 37f29f68da..6c96b8c8a3 100644 --- a/bee-api/bee-rest-api/Cargo.toml +++ b/bee-api/bee-rest-api/Cargo.toml @@ -12,7 +12,7 @@ homepage = "https://www.iota.org" [dependencies] bee-common = { version = "0.6.0", path = "../../bee-common/bee-common", default-features = false, optional = true } -bee-gossip = { version = "0.4.0", path = "../../bee-network/bee-gossip", default-features = false, optional = true } +bee-gossip = { version = "0.5.0", path = "../../bee-network/bee-gossip", default-features = false, optional = true } bee-ledger = { version = "0.6.1", path = "../../bee-ledger", default-features = false } bee-message = { version = "0.1.6", path = "../../bee-message", default-features = false } bee-pow = { version = "0.2.0", path = "../../bee-pow", default-features = false } diff --git a/bee-network/bee-autopeering/CHANGELOG.md b/bee-network/bee-autopeering/CHANGELOG.md index a58adfaecc..3abdf66945 100644 --- a/bee-network/bee-autopeering/CHANGELOG.md +++ b/bee-network/bee-autopeering/CHANGELOG.md @@ -19,11 +19,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security --> -## 0.5.0 - 2022-02-28 +## 0.4.1 - 2022-02-28 ### Changed -- Moved `PeerId` display format to start of significant part; +- Displayed representation of `PeerId`s to start at the beginning of its significant part; ## 0.4.0 - 2022-02-11 diff --git a/bee-network/bee-autopeering/Cargo.toml b/bee-network/bee-autopeering/Cargo.toml index a17e2ac364..2579a87938 100644 --- a/bee-network/bee-autopeering/Cargo.toml +++ b/bee-network/bee-autopeering/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bee-autopeering" -version = "0.4.0" +version = "0.4.1" authors = [ "IOTA Stiftung" ] edition = "2021" description = "Allows peers in the same IOTA network to automatically discover each other." diff --git a/bee-network/bee-gossip/CHANGELOG.md b/bee-network/bee-gossip/CHANGELOG.md index 7fcd86951d..8ffff4698c 100644 --- a/bee-network/bee-gossip/CHANGELOG.md +++ b/bee-network/bee-gossip/CHANGELOG.md @@ -19,6 +19,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security --> +## 0.5.0 - 2022-02-28 + +### Added + +- `PeerMetrics` type, that keeps a count of dial attempts and idenfication timestamp; +- `PeerUnreachable` event: fired after a certain number of unsuccessful dial attempts; + +### Changed + +- `PeerList` type keeps metrics for each peer; + +### Fixed + +- Missing `PeerRemoved` event for disconnected unknown peers; + ## 0.4.0 - 2022-01-20 ### Fixed diff --git a/bee-network/bee-gossip/Cargo.toml b/bee-network/bee-gossip/Cargo.toml index 4e150f8546..7fadc96d50 100644 --- a/bee-network/bee-gossip/Cargo.toml +++ b/bee-network/bee-gossip/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bee-gossip" -version = "0.4.0" +version = "0.5.0" authors = [ "IOTA Stiftung" ] edition = "2021" description = "Allows peers in the same IOTA network to exchange gossip messages with each other." diff --git a/bee-network/bee-gossip/src/swarm/behaviour.rs b/bee-network/bee-gossip/src/swarm/behaviour.rs index c6798fae0c..c849c6a8a1 100644 --- a/bee-network/bee-gossip/src/swarm/behaviour.rs +++ b/bee-network/bee-gossip/src/swarm/behaviour.rs @@ -44,9 +44,9 @@ impl NetworkBehaviourEventProcess for SwarmBehaviour { match event { IdentifyEvent::Received { peer_id, info } => { trace!( - "Received Identify response from {}. Observed address: {:?}.", + "Received Identify response from {}: {:?}.", alias!(peer_id), - info.observed_addr, + info, ); // Panic: diff --git a/bee-node/Cargo.toml b/bee-node/Cargo.toml index 0e454fb4c5..a4ca5c3221 100644 --- a/bee-node/Cargo.toml +++ b/bee-node/Cargo.toml @@ -13,7 +13,7 @@ homepage = "https://www.iota.org" [dependencies] bee-autopeering = { version = "0.4.0", path = "../bee-network/bee-autopeering", default-features = false, features = [ "rocksdb" ] } bee-common = { version = "0.6.0", path = "../bee-common/bee-common", default-features = false } -bee-gossip = { version = "0.4.0", path = "../bee-network/bee-gossip", default-features = false, features = [ "full" ] } +bee-gossip = { version = "0.5.0", path = "../bee-network/bee-gossip", default-features = false, features = [ "full" ] } bee-ledger = { version = "0.6.1", path = "../bee-ledger", default-features = false, features = [ "workers" ] } bee-message = { version = "0.1.6", path = "../bee-message", default-features = false } bee-protocol = { version = "0.2.0", path = "../bee-protocol", default-features = false, features = [ "workers" ] } diff --git a/bee-protocol/CHANGELOG.md b/bee-protocol/CHANGELOG.md index 5771f66b43..7961a68dda 100644 --- a/bee-protocol/CHANGELOG.md +++ b/bee-protocol/CHANGELOG.md @@ -19,7 +19,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security --> -## 0.2.0 - 2022-XX-XX +## 0.2.1 - 2022-02-28 + +### Changed + +- Peer manager handles new bee-gossip `PeerUnreachable` event; + +## 0.2.0 - 2022-01-27 ### Added diff --git a/bee-protocol/Cargo.toml b/bee-protocol/Cargo.toml index 4841fbdd43..720aa2320b 100644 --- a/bee-protocol/Cargo.toml +++ b/bee-protocol/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bee-protocol" -version = "0.2.0" +version = "0.2.1" authors = [ "IOTA Stiftung" ] edition = "2021" description = "All types and workers enabling the IOTA protocol" @@ -13,7 +13,7 @@ homepage = "https://www.iota.org" [dependencies] bee-autopeering = { version = "0.4.0", path = "../bee-network/bee-autopeering", default-features = false } bee-common = { version = "0.6.0", path = "../bee-common/bee-common", default-features = false, optional = true } -bee-gossip = { version = "0.4.0", path = "../bee-network/bee-gossip", default-features = false } +bee-gossip = { version = "0.5.0", path = "../bee-network/bee-gossip", default-features = false } bee-ledger = { version = "0.6.0", path = "../bee-ledger", default-features = false, features = [ "workers" ], optional = true } bee-message = { version = "0.1.6", path = "../bee-message", default-features = false, features = [ "serde" ] } bee-pow = { version = "0.2.0", path = "../bee-pow", default-features = false } From f89f6c2dc17f5388bc9b52368036885e21b32edc Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Tue, 1 Mar 2022 11:25:21 +0100 Subject: [PATCH 19/25] Nits --- bee-network/bee-gossip/src/service/event.rs | 3 ++- bee-network/bee-gossip/src/swarm/behaviour.rs | 6 +----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/bee-network/bee-gossip/src/service/event.rs b/bee-network/bee-gossip/src/service/event.rs index ca64e3bdc8..406aa1b037 100644 --- a/bee-network/bee-gossip/src/service/event.rs +++ b/bee-network/bee-gossip/src/service/event.rs @@ -100,11 +100,12 @@ pub enum Event { /// The peer's id. peer_id: PeerId, }, + /// A peer didn't answer our repeated calls. PeerUnreachable { /// The peer's id. peer_id: PeerId, - /// The relation we have with that peer. + /// The peer's info. peer_info: PeerInfo, }, } diff --git a/bee-network/bee-gossip/src/swarm/behaviour.rs b/bee-network/bee-gossip/src/swarm/behaviour.rs index c849c6a8a1..f5329496d3 100644 --- a/bee-network/bee-gossip/src/swarm/behaviour.rs +++ b/bee-network/bee-gossip/src/swarm/behaviour.rs @@ -43,11 +43,7 @@ impl NetworkBehaviourEventProcess for SwarmBehaviour { fn inject_event(&mut self, event: IdentifyEvent) { match event { IdentifyEvent::Received { peer_id, info } => { - trace!( - "Received Identify response from {}: {:?}.", - alias!(peer_id), - info, - ); + trace!("Received Identify response from {}: {:?}.", alias!(peer_id), info,); // Panic: // Sending must not fail. From f2c96723b6c0c0a3c25d3141b3ec443728772013 Mon Sep 17 00:00:00 2001 From: /alex/ Date: Tue, 1 Mar 2022 17:04:36 +0100 Subject: [PATCH 20/25] Fix typos and German comma in changelog MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jochen Görtler --- bee-network/bee-gossip/CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bee-network/bee-gossip/CHANGELOG.md b/bee-network/bee-gossip/CHANGELOG.md index 8ffff4698c..977c3e0720 100644 --- a/bee-network/bee-gossip/CHANGELOG.md +++ b/bee-network/bee-gossip/CHANGELOG.md @@ -23,8 +23,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- `PeerMetrics` type, that keeps a count of dial attempts and idenfication timestamp; -- `PeerUnreachable` event: fired after a certain number of unsuccessful dial attempts; +- `PeerMetrics` type that keeps a count of dial attempts and identification timestamp; +- `PeerUnreachable` event that is fired after a certain number of unsuccessful dial attempts; ### Changed From 693a82653f15679fcfc2428100f153f2809c1d65 Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Tue, 1 Mar 2022 17:29:25 +0100 Subject: [PATCH 21/25] Fix docs --- bee-network/bee-gossip/src/service/event.rs | 5 ++++- bee-network/bee-gossip/src/swarm/behaviour.rs | 8 ++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/bee-network/bee-gossip/src/service/event.rs b/bee-network/bee-gossip/src/service/event.rs index 406aa1b037..fb4ef2f538 100644 --- a/bee-network/bee-gossip/src/service/event.rs +++ b/bee-network/bee-gossip/src/service/event.rs @@ -132,7 +132,10 @@ pub enum InternalEvent { }, /// The gossip protocol with a peer was stopped. - ProtocolStopped { peer_id: PeerId }, + ProtocolStopped { + /// The peer's id. + peer_id: PeerId + }, /// A peer didn't answer our repeated calls. PeerUnreachable { diff --git a/bee-network/bee-gossip/src/swarm/behaviour.rs b/bee-network/bee-gossip/src/swarm/behaviour.rs index f5329496d3..f65281e3bb 100644 --- a/bee-network/bee-gossip/src/swarm/behaviour.rs +++ b/bee-network/bee-gossip/src/swarm/behaviour.rs @@ -45,8 +45,8 @@ impl NetworkBehaviourEventProcess for SwarmBehaviour { IdentifyEvent::Received { peer_id, info } => { trace!("Received Identify response from {}: {:?}.", alias!(peer_id), info,); - // Panic: - // Sending must not fail. + // Panic: we made sure that the sender (network host) is always dropped before the receiver (service host) + // through the worker dependencies, hence this can never panic. self.internal_sender .send(InternalEvent::PeerIdentified { peer_id }) .expect("send internal event"); @@ -60,8 +60,8 @@ impl NetworkBehaviourEventProcess for SwarmBehaviour { IdentifyEvent::Error { peer_id, error } => { debug!("Identification error with {}: Cause: {:?}.", alias!(peer_id), error); - // Panic: - // Sending must not fail. + // Panic: we made sure that the sender (network host) is always dropped before the receiver (service host) + // through the worker dependencies, hence this can never panic. self.internal_sender .send(InternalEvent::PeerUnreachable { peer_id }) .expect("send internal event"); From 74c09b7f9c566690842ad452a3f09b93d5e9496b Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Tue, 1 Mar 2022 18:23:04 +0100 Subject: [PATCH 22/25] Address review comments --- Cargo.lock | 2 +- bee-api/bee-rest-api/CHANGELOG.md | 6 +- bee-api/bee-rest-api/Cargo.toml | 2 +- bee-network/bee-autopeering/src/task.rs | 2 +- bee-network/bee-gossip/src/service/event.rs | 4 +- bee-network/bee-gossip/src/service/host.rs | 32 ++++--- bee-network/bee-gossip/src/swarm/behaviour.rs | 8 +- .../src/swarm/protocols/iota_gossip/io.rs | 89 +++++++++---------- 8 files changed, 77 insertions(+), 68 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9f216a3dee..5cce54d180 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -406,7 +406,7 @@ dependencies = [ [[package]] name = "bee-rest-api" -version = "0.2.0" +version = "0.2.1" dependencies = [ "async-trait", "bech32", diff --git a/bee-api/bee-rest-api/CHANGELOG.md b/bee-api/bee-rest-api/CHANGELOG.md index 3b6d57d84a..08c6b11b9a 100644 --- a/bee-api/bee-rest-api/CHANGELOG.md +++ b/bee-api/bee-rest-api/CHANGELOG.md @@ -19,7 +19,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security --> -## 0.2.0 - 2022-XX-XX +## 0.2.1 - 2022-02-28 + +- Update `bee-gossip` dependency to 0.5.0; + +## 0.2.0 - 2022-01-28 ### Added diff --git a/bee-api/bee-rest-api/Cargo.toml b/bee-api/bee-rest-api/Cargo.toml index 6c96b8c8a3..ad1432711d 100644 --- a/bee-api/bee-rest-api/Cargo.toml +++ b/bee-api/bee-rest-api/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bee-rest-api" -version = "0.2.0" +version = "0.2.1" authors = [ "IOTA Stiftung" ] edition = "2021" description = "The default REST API implementation for the IOTA Bee node software." diff --git a/bee-network/bee-autopeering/src/task.rs b/bee-network/bee-autopeering/src/task.rs index b37675d440..793c0aac5a 100644 --- a/bee-network/bee-autopeering/src/task.rs +++ b/bee-network/bee-autopeering/src/task.rs @@ -119,7 +119,7 @@ impl TaskManager { log::trace!("Shutting down: {}", task_name); if shutdown_tx.send(()).is_err() { - log::error!("Error sending shutdown signal to task {task_name}. This is a bug."); + log::error!("Error sending shutdown signal to task {task_name}."); } } diff --git a/bee-network/bee-gossip/src/service/event.rs b/bee-network/bee-gossip/src/service/event.rs index fb4ef2f538..dff453fcff 100644 --- a/bee-network/bee-gossip/src/service/event.rs +++ b/bee-network/bee-gossip/src/service/event.rs @@ -132,9 +132,9 @@ pub enum InternalEvent { }, /// The gossip protocol with a peer was stopped. - ProtocolStopped { + ProtocolStopped { /// The peer's id. - peer_id: PeerId + peer_id: PeerId, }, /// A peer didn't answer our repeated calls. diff --git a/bee-network/bee-gossip/src/service/host.rs b/bee-network/bee-gossip/src/service/host.rs index fc9d0cbd87..a7ec74ecb1 100644 --- a/bee-network/bee-gossip/src/service/host.rs +++ b/bee-network/bee-gossip/src/service/host.rs @@ -1,8 +1,6 @@ // Copyright 2020-2021 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use std::time::{SystemTime, UNIX_EPOCH}; - use super::{ command::{Command, CommandReceiver, CommandSender}, error::Error, @@ -33,6 +31,8 @@ use rand::Rng; use tokio::time::{self, Duration, Instant}; use tokio_stream::wrappers::{IntervalStream, UnboundedReceiverStream}; +use std::time::{SystemTime, UNIX_EPOCH}; + const MAX_PEER_STATE_CHECKER_DELAY_MILLIS: u64 = 2000; const MAX_DIALS: usize = 3; @@ -436,17 +436,27 @@ async fn process_internal_event( // Spin up separate buffered reader and writer to efficiently process the gossip with that peer. let (r, w) = substream.split(); - let reader = BufReader::with_capacity(IO_BUFFER_LEN, r); - let writer = BufWriter::with_capacity(IO_BUFFER_LEN, w); + let inbound_gossip_rx = BufReader::with_capacity(IO_BUFFER_LEN, r); + let outbound_gossip_tx = BufWriter::with_capacity(IO_BUFFER_LEN, w); - let (incoming_tx, incoming_rx) = iota_gossip::channel(); - let (outgoing_tx, outgoing_rx) = iota_gossip::channel(); + let (inbound_gossip_tx, gossip_in) = iota_gossip::channel(); + let (gossip_out, outbound_gossip_rx) = iota_gossip::channel(); - iota_gossip::start_incoming_processor(peer_id, reader, incoming_tx, senders.internal_events.clone()); - iota_gossip::start_outgoing_processor(peer_id, writer, outgoing_rx, senders.internal_events.clone()); + iota_gossip::start_inbound_gossip_handler( + peer_id, + inbound_gossip_rx, + inbound_gossip_tx, + senders.internal_events.clone(), + ); + iota_gossip::start_outbound_gossip_handler( + peer_id, + outbound_gossip_tx, + outbound_gossip_rx, + senders.internal_events.clone(), + ); // We store a clone of the gossip send channel in order to send a shutdown signal. - let _ = peerlist.update_state(&peer_id, |state| state.set_connected(outgoing_tx.clone())); + let _ = peerlist.update_state(&peer_id, |state| state.set_connected(gossip_out.clone())); // We no longer need to hold the lock. drop(peerlist); @@ -476,8 +486,8 @@ async fn process_internal_event( .send(Event::PeerConnected { peer_id, info: peer_info, - gossip_in: incoming_rx, - gossip_out: outgoing_tx, + gossip_in, + gossip_out, }) .map_err(|_| Error::SendingEventFailed)?; } else { diff --git a/bee-network/bee-gossip/src/swarm/behaviour.rs b/bee-network/bee-gossip/src/swarm/behaviour.rs index f65281e3bb..8ef4d4ae90 100644 --- a/bee-network/bee-gossip/src/swarm/behaviour.rs +++ b/bee-network/bee-gossip/src/swarm/behaviour.rs @@ -45,8 +45,8 @@ impl NetworkBehaviourEventProcess for SwarmBehaviour { IdentifyEvent::Received { peer_id, info } => { trace!("Received Identify response from {}: {:?}.", alias!(peer_id), info,); - // Panic: we made sure that the sender (network host) is always dropped before the receiver (service host) - // through the worker dependencies, hence this can never panic. + // Panic: we made sure that the sender (network host) is always dropped before the receiver (service + // host) through the worker dependencies, hence this can never panic. self.internal_sender .send(InternalEvent::PeerIdentified { peer_id }) .expect("send internal event"); @@ -60,8 +60,8 @@ impl NetworkBehaviourEventProcess for SwarmBehaviour { IdentifyEvent::Error { peer_id, error } => { debug!("Identification error with {}: Cause: {:?}.", alias!(peer_id), error); - // Panic: we made sure that the sender (network host) is always dropped before the receiver (service host) - // through the worker dependencies, hence this can never panic. + // Panic: we made sure that the sender (network host) is always dropped before the receiver (service + // host) through the worker dependencies, hence this can never panic. self.internal_sender .send(InternalEvent::PeerUnreachable { peer_id }) .expect("send internal event"); diff --git a/bee-network/bee-gossip/src/swarm/protocols/iota_gossip/io.rs b/bee-network/bee-gossip/src/swarm/protocols/iota_gossip/io.rs index b69a1e62f6..9de32ac552 100644 --- a/bee-network/bee-gossip/src/swarm/protocols/iota_gossip/io.rs +++ b/bee-network/bee-gossip/src/swarm/protocols/iota_gossip/io.rs @@ -28,85 +28,80 @@ pub fn channel() -> (GossipSender, GossipReceiver) { (sender, UnboundedReceiverStream::new(receiver)) } -pub fn start_incoming_processor( +pub fn start_inbound_gossip_handler( peer_id: PeerId, - mut reader: BufReader>>, - incoming_tx: GossipSender, - internal_event_sender: InternalEventSender, + mut inbound_gossip_rx: BufReader>>, + inbound_gossip_tx: GossipSender, + internal_event_tx: InternalEventSender, ) { tokio::spawn(async move { - let mut msg_buf = vec![0u8; MSG_BUFFER_LEN]; + let mut buf = vec![0u8; MSG_BUFFER_LEN]; loop { - if let Some(len) = (&mut reader).read(&mut msg_buf).await.ok().filter(|len| *len > 0) { - if incoming_tx.send(msg_buf[..len].to_vec()).is_err() { - debug!("gossip-in: receiver dropped locally."); + if let Some(len) = (&mut inbound_gossip_rx) + .read(&mut buf) + .await + .ok() + .filter(|len| *len > 0) + { + if inbound_gossip_tx.send(buf[..len].to_vec()).is_err() { + debug!("Terminating gossip protocol with {}.", alias!(peer_id)); - // The receiver of this channel was dropped, maybe due to a shutdown. There is nothing we can do - // to salvage this situation, hence we drop the connection. break; } } else { - debug!("gossip-in: stream closed remotely."); + debug!("Peer {} terminated gossip protocol.", alias!(peer_id)); - // NB: The network service will not shut down before it has received the `ProtocolDropped` event - // from all once connected peers, hence if the following send fails, then it - // must be considered a bug. + // Panic: we made sure that the sender (network host) is always dropped before the receiver (service + // host) through the worker dependencies, hence this can never panic. + internal_event_tx + .send(InternalEvent::ProtocolStopped { peer_id }) + .expect("send internal event"); break; } } - // Ignore send errors. - let _ = internal_event_sender.send(InternalEvent::ProtocolStopped { peer_id }); - - // Reasons why this task might end: - // (1) The remote dropped the TCP connection. - // (2) The local dropped the gossip_in receiver channel. - - debug!("gossip-in: exiting gossip-in processor for {}.", alias!(peer_id)); + trace!("Dropping gossip stream reader for {}.", alias!(peer_id)); }); } -pub fn start_outgoing_processor( +pub fn start_outbound_gossip_handler( peer_id: PeerId, - mut writer: BufWriter>>, - outgoing_rx: GossipReceiver, - internal_event_sender: InternalEventSender, + mut outbound_gossip_tx: BufWriter>>, + outbound_gossip_rx: GossipReceiver, + internal_event_tx: InternalEventSender, ) { tokio::spawn(async move { - let mut outgoing_gossip_receiver = outgoing_rx.fuse(); + let mut outbound_gossip_rx = outbound_gossip_rx.fuse(); // If the gossip sender dropped we end the connection. - while let Some(message) = outgoing_gossip_receiver.next().await { - // NB: Instead of polling another shutdown channel, we use an empty message + while let Some(message) = outbound_gossip_rx.next().await { + // Note: Instead of polling another shutdown channel, we use an empty message // to signal that we want to end the connection. We use this "trick" whenever the network // receives the `DisconnectPeer` command to enforce that the connection will be dropped. - if message.is_empty() { - debug!("gossip-out: received shutdown message."); + debug!( + "Terminating gossip protocol with {} (received shutdown signal).", + alias!(peer_id) + ); + + // Panic: we made sure that the sender (network host) is always dropped before the receiver (service + // host) through the worker dependencies, hence this can never panic. + internal_event_tx + .send(InternalEvent::ProtocolStopped { peer_id }) + .expect("send internal event"); - // NB: The network service will not shut down before it has received the `ConnectionDropped` event - // from all once connected peers, hence if the following send fails, then it - // must be considered a bug. break; - } + } else if (&mut outbound_gossip_tx).write_all(&message).await.is_err() + || (&mut outbound_gossip_tx).flush().await.is_err() + { + debug!("Peer {} terminated gossip protocol.", alias!(peer_id)); - // If sending to the stream fails we end the connection. - // TODO: buffer for x milliseconds before flushing. - if (&mut writer).write_all(&message).await.is_err() || (&mut writer).flush().await.is_err() { - debug!("gossip-out: stream closed remotely"); break; } } - // Ignore send errors. - let _ = internal_event_sender.send(InternalEvent::ProtocolStopped { peer_id }); - - // Reasons why this task might end: - // (1) The local send the shutdown message (len = 0) - // (2) The remote dropped the TCP connection. - - debug!("gossip-out: exiting gossip-out processor for {}.", alias!(peer_id)); + trace!("Dropping gossip stream writer for {}.", alias!(peer_id)); }); } From e65444b9e572f280a4ee6d0f8073e19323d320e5 Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Wed, 2 Mar 2022 12:08:45 +0100 Subject: [PATCH 23/25] Simplify 'add_local_addr' method --- bee-network/bee-gossip/src/peer/list.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/bee-network/bee-gossip/src/peer/list.rs b/bee-network/bee-gossip/src/peer/list.rs index 89c669e287..03815b6af1 100644 --- a/bee-network/bee-gossip/src/peer/list.rs +++ b/bee-network/bee-gossip/src/peer/list.rs @@ -121,13 +121,11 @@ impl PeerList { /// Note: Returns an error if the address trying to be added is a duplicate. pub fn add_local_addr(&mut self, addr: Multiaddr) -> Result<(), (Multiaddr, Error)> { - if self.local_addrs.contains(&addr) { - return Err((addr.clone(), Error::AddressIsDuplicate(addr))); + if self.local_addrs.insert(addr.clone()) { + Ok(()) + } else { + Err((addr.clone(), Error::AddressIsDuplicate(addr))) } - - let _ = self.local_addrs.insert(addr); - - Ok(()) } pub fn update_info(&mut self, peer_id: &PeerId, mut update: U) -> Result<(), Error> From 0016bab0f9ff87b5f0fc6ac95066f7c5ba4c1765 Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Wed, 2 Mar 2022 12:19:11 +0100 Subject: [PATCH 24/25] Add explanation why sending commands to gossip layer cannot fail --- bee-protocol/src/workers/peer/manager.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/bee-protocol/src/workers/peer/manager.rs b/bee-protocol/src/workers/peer/manager.rs index 268e1cb83c..b49c18216e 100644 --- a/bee-protocol/src/workers/peer/manager.rs +++ b/bee-protocol/src/workers/peer/manager.rs @@ -192,11 +192,13 @@ where NetworkEvent::PeerUnreachable { peer_id, peer_info } => { if peer_info.relation.is_discovered() { // Remove that discovered peer. + + // Panic: sending commands cannot fail: same explanation as in other sender usages. gossip_command_tx .send(Command::RemovePeer { peer_id }) .expect("send gossip command"); - // Todo: tell the autopeering to remove that peer from the neighborhood. + // TODO: tell the autopeering to remove that peer from the neighborhood. } } _ => (), // Ignore all other events for now @@ -210,13 +212,14 @@ where } } -fn handle_new_peering(peer: bee_autopeering::Peer, network_name: &str, command_tx: &NetworkCommandSender) { +fn handle_new_peering(peer: bee_autopeering::Peer, network_name: &str, gossip_command_tx: &NetworkCommandSender) { if let Some(multiaddr) = peer.service_multiaddr(network_name) { let peer_id = peer.peer_id().libp2p_peer_id(); - // Panic: - // Sending commands must not fail. - command_tx + // Panic: sending commands cannot fail due to worker dependencies: because the "Peer Manager" depends on + // the `bee-gossip` "ServiceHost", it is guaranteed that the receiver of this channel is not dropped + // before the sender. + gossip_command_tx .send(Command::AddPeer { peer_id, alias: Some(alias!(peer_id).to_string()), @@ -227,12 +230,11 @@ fn handle_new_peering(peer: bee_autopeering::Peer, network_name: &str, command_t } } -fn handle_peering_dropped(peer_id: bee_autopeering::PeerId, command_tx: &NetworkCommandSender) { +fn handle_peering_dropped(peer_id: bee_autopeering::PeerId, gossip_command_tx: &NetworkCommandSender) { let peer_id = peer_id.libp2p_peer_id(); - // Panic: - // Sending commands must not fail. - command_tx + // Panic: sending commands cannot fail: same explanation as in other sender usages. + gossip_command_tx .send(Command::RemovePeer { peer_id }) .expect("send command to gossip layer"); } From a0502accc450558e741827d1aff72e9d1ff91f73 Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Wed, 2 Mar 2022 12:52:23 +0100 Subject: [PATCH 25/25] Rename to 'identified_at' --- bee-network/bee-gossip/src/peer/list.rs | 2 +- bee-network/bee-gossip/src/service/host.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bee-network/bee-gossip/src/peer/list.rs b/bee-network/bee-gossip/src/peer/list.rs index 03815b6af1..9a5aed68aa 100644 --- a/bee-network/bee-gossip/src/peer/list.rs +++ b/bee-network/bee-gossip/src/peer/list.rs @@ -499,7 +499,7 @@ pub enum PeerState { #[derive(Clone, Debug, Default)] pub struct PeerMetrics { pub(crate) num_dials: usize, - pub(crate) identified: Option, + pub(crate) identified_at: Option, } impl Default for PeerState { diff --git a/bee-network/bee-gossip/src/service/host.rs b/bee-network/bee-gossip/src/service/host.rs index a7ec74ecb1..9f5dca129a 100644 --- a/bee-network/bee-gossip/src/service/host.rs +++ b/bee-network/bee-gossip/src/service/host.rs @@ -511,7 +511,7 @@ async fn process_internal_event( // Reset dial count. m.num_dials = 0; // Update Identify timestamp. - m.identified = Some( + m.identified_at = Some( SystemTime::now() .duration_since(UNIX_EPOCH) .expect("system time")