From a818140a4512be8926f542384cdf994d85c43e70 Mon Sep 17 00:00:00 2001 From: Alexander Schmidt Date: Fri, 5 Nov 2021 16:29:00 +0100 Subject: [PATCH] Fix warnings, clippy, format --- bee-autopeering/examples/node.rs | 15 +-- bee-autopeering/src/command.rs | 14 +- bee-autopeering/src/config.rs | 3 +- bee-autopeering/src/delay.rs | 22 ++-- bee-autopeering/src/discovery/manager.rs | 150 ++++++++++------------ bee-autopeering/src/discovery/messages.rs | 8 ++ bee-autopeering/src/discovery/query.rs | 34 ++--- bee-autopeering/src/event.rs | 6 +- bee-autopeering/src/init.rs | 43 ++----- bee-autopeering/src/lib.rs | 15 ++- bee-autopeering/src/local/mod.rs | 14 +- bee-autopeering/src/local/salt.rs | 4 + bee-autopeering/src/local/services.rs | 10 +- bee-autopeering/src/multiaddr.rs | 24 ++-- bee-autopeering/src/packet.rs | 2 + bee-autopeering/src/peer/mod.rs | 34 +++-- bee-autopeering/src/peer/peer_id.rs | 23 ++-- bee-autopeering/src/peer/peerlist.rs | 66 +++++++--- bee-autopeering/src/peer/peerstore.rs | 50 +++----- bee-autopeering/src/peering/filter.rs | 22 ++-- bee-autopeering/src/peering/manager.rs | 130 +++++++++---------- bee-autopeering/src/peering/mod.rs | 2 - bee-autopeering/src/peering/neighbor.rs | 41 +++--- bee-autopeering/src/peering/update.rs | 19 ++- bee-autopeering/src/request.rs | 19 +-- bee-autopeering/src/server.rs | 10 +- bee-autopeering/src/task.rs | 24 ++-- bee-autopeering/src/time.rs | 2 - 28 files changed, 370 insertions(+), 436 deletions(-) diff --git a/bee-autopeering/examples/node.rs b/bee-autopeering/examples/node.rs index 1f79b95ab5..fecc33cf8e 100644 --- a/bee-autopeering/examples/node.rs +++ b/bee-autopeering/examples/node.rs @@ -5,8 +5,7 @@ use bee_autopeering::{ init, - peerstore::InMemoryPeerStore, - peerstore::{SledPeerStore, SledPeerStoreConfig}, + peerstore::{InMemoryPeerStore, SledPeerStore, SledPeerStoreConfig}, AutopeeringConfig, Event, Local, NeighborValidator, Peer, ServiceProtocol, AUTOPEERING_SERVICE_NAME, }; @@ -36,10 +35,10 @@ fn read_config() -> AutopeeringConfig { // "bindAddress": "0.0.0.0:14627", // "entryNodes": [ // "/dns/lucamoser.ch/udp/14826/autopeering/4H6WV54tB29u8xCcEaMGQMn37LFvM1ynNpp27TTXaqNM", - // "/dns/entry-hornet-0.h.chrysalis-mainnet.iotaledger.net/udp/14626/autopeering/iotaPHdAn7eueBnXtikZMwhfPXaeGJGXDt4RBuLuGgb", - // "/dns/entry-hornet-1.h.chrysalis-mainnet.iotaledger.net/udp/14626/autopeering/iotaJJqMd5CQvv1A61coSQCYW9PNT1QKPs7xh2Qg5K2", - // "/dns/entry-mainnet.tanglebay.com/udp/14626/autopeering/iot4By1FD4pFLrGJ6AAe7YEeSu9RbW9xnPUmxMdQenC" - // ], + // "/dns/entry-hornet-0.h.chrysalis-mainnet.iotaledger.net/udp/14626/autopeering/ + // iotaPHdAn7eueBnXtikZMwhfPXaeGJGXDt4RBuLuGgb", "/dns/entry-hornet-1.h.chrysalis-mainnet.iotaledger. + // net/udp/14626/autopeering/iotaJJqMd5CQvv1A61coSQCYW9PNT1QKPs7xh2Qg5K2", "/dns/entry-mainnet. 
+ // tanglebay.com/udp/14626/autopeering/iot4By1FD4pFLrGJ6AAe7YEeSu9RbW9xnPUmxMdQenC" ], // "entryNodesPreferIPv6": false, // "runAsEntryNode": false // }"#; @@ -48,8 +47,8 @@ fn read_config() -> AutopeeringConfig { // { // "bindAddress": "0.0.0.0:14627", // "entryNodes": [ - // "/dns/entry-hornet-0.h.chrysalis-mainnet.iotaledger.net/udp/14626/autopeering/iotaPHdAn7eueBnXtikZMwhfPXaeGJGXDt4RBuLuGgb" - // ], + // "/dns/entry-hornet-0.h.chrysalis-mainnet.iotaledger.net/udp/14626/autopeering/ + // iotaPHdAn7eueBnXtikZMwhfPXaeGJGXDt4RBuLuGgb" ], // "entryNodesPreferIPv6": false, // "runAsEntryNode": false // }"#; diff --git a/bee-autopeering/src/command.rs b/bee-autopeering/src/command.rs index 2777e7b0bb..ea40f5d058 100644 --- a/bee-autopeering/src/command.rs +++ b/bee-autopeering/src/command.rs @@ -5,12 +5,18 @@ use crate::peer::peer_id::PeerId; use tokio::sync::mpsc; +// TODO: revisit dead code +#[allow(dead_code)] #[derive(Debug)] pub(crate) enum Command { - SendVerificationRequest { peer_id: PeerId }, - SendDiscoveryRequest { peer_id: PeerId }, - SendVerificationRequests, - SendDiscoveryRequests, + // Send a verfication request to that peer. + Verify { peer_id: PeerId }, + // Send a discovery request to that peer. + Query { peer_id: PeerId }, + // Send a peering request to that peer. + Peer { peer_id: PeerId }, + // Send a drop-peering request to that peer. + Drop { peer_id: PeerId }, } pub(crate) type CommandRx = mpsc::UnboundedReceiver; diff --git a/bee-autopeering/src/config.rs b/bee-autopeering/src/config.rs index c8c33ce75c..95c428f48e 100644 --- a/bee-autopeering/src/config.rs +++ b/bee-autopeering/src/config.rs @@ -10,14 +10,13 @@ use serde::{Deserialize, Serialize}; use std::net::SocketAddr; #[rustfmt::skip] +// # Example // ```json // "autopeering": { // "bindAddress": "0.0.0.0:14626", // "entryNodes": [ -// "/dns/lucamoser.ch/udp/14826/autopeering/4H6WV54tB29u8xCcEaMGQMn37LFvM1ynNpp27TTXaqNM", // "/dns/entry-hornet-0.h.chrysalis-mainnet.iotaledger.net/udp/14626/autopeering/iotaPHdAn7eueBnXtikZMwhfPXaeGJGXDt4RBuLuGgb", // "/dns/entry-hornet-1.h.chrysalis-mainnet.iotaledger.net/udp/14626/autopeering/iotaJJqMd5CQvv1A61coSQCYW9PNT1QKPs7xh2Qg5K2", -// "/dns/entry-mainnet.tanglebay.com/udp/14626/autopeering/iot4By1FD4pFLrGJ6AAe7YEeSu9RbW9xnPUmxMdQenC" // ], // "entryNodesPreferIPv6": false, // "runAsEntryNode": false, diff --git a/bee-autopeering/src/delay.rs b/bee-autopeering/src/delay.rs index e680aa5b2f..f35d7819b7 100644 --- a/bee-autopeering/src/delay.rs +++ b/bee-autopeering/src/delay.rs @@ -1,18 +1,17 @@ // Copyright 2021 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use crate::{task::ShutdownRx, time}; - use rand::{thread_rng, Rng as _}; use std::{ - future::Future, sync::atomic::{AtomicU64, Ordering}, time::{Duration, Instant}, }; pub(crate) type Delay = Duration; +// TODO: revisit dead code +#[allow(dead_code)] #[derive(Default)] pub(crate) struct DelayFactoryBuilder { max_count: Option, @@ -21,6 +20,8 @@ pub(crate) struct DelayFactoryBuilder { mode: DelayFactoryMode, } +// TODO: revisit dead code +#[allow(dead_code)] impl DelayFactoryBuilder { pub fn new(mode: DelayFactoryMode) -> Self { Self { @@ -77,12 +78,11 @@ impl Iterator for DelayFactory { type Item = Delay; fn next(&mut self) -> Option { - if self.curr_count >= self.max_count { - None - } else if Instant::now() - .checked_duration_since(self.timestamp) - .expect("error duration since") - > self.timeout + if self.curr_count >= self.max_count + || Instant::now() + .checked_duration_since(self.timestamp) + 
.expect("error duration since") + > self.timeout { None } else { @@ -97,7 +97,7 @@ impl Iterator for DelayFactory { }; self.curr_count += 1; - if self.jitter != 1.0 { + if (self.jitter - 1.0).abs() > f32::EPSILON { next_interval_millis = thread_rng().gen_range(((next_interval_millis as f32 * self.jitter) as u64)..next_interval_millis) } @@ -107,6 +107,8 @@ impl Iterator for DelayFactory { } } +// TODO: revisit dead code +#[allow(dead_code)] /// The different "Modi operandi" for the [`DelayFactory`]. pub(crate) enum DelayFactoryMode { /// The factory produces a series of 0-delays. diff --git a/bee-autopeering/src/discovery/manager.rs b/bee-autopeering/src/discovery/manager.rs index a0f3d1f23f..6652dc48e4 100644 --- a/bee-autopeering/src/discovery/manager.rs +++ b/bee-autopeering/src/discovery/manager.rs @@ -25,23 +25,29 @@ use crate::{ time::{self, SECOND}, }; -use crypto::signatures::ed25519::PublicKey; use rand::{seq::index, Rng as _}; -use tokio::sync::oneshot; -use std::net::{IpAddr, SocketAddr}; +use std::net::SocketAddr; // Time interval after which the next peer is reverified. pub(crate) const DEFAULT_REVERIFY_INTERVAL_SECS: u64 = 10 * SECOND; // Time interval after which peers are queried for new peers. pub(crate) const DEFAULT_QUERY_INTERVAL_SECS: u64 = 60 * SECOND; // The default delay between requests to a single peer. +// TODO: revisit dead code +#[allow(dead_code)] const BACKOFF_INTERVALL_MILLISECS: u64 = 500; // A factor that determines the range from which a concrete delay is picked randomly. +// TODO: revisit dead code +#[allow(dead_code)] const JITTER: f32 = 0.5; // A factor that determines the intervall lengths between repeated requests to a peer. +// TODO: revisit dead code +#[allow(dead_code)] const EXPONENTIAL_BACKOFF_FACTOR: f32 = 1.5; // The number of times a request is repeated in case the peer doesn't reply. +// TODO: revisit dead code +#[allow(dead_code)] const MAX_RETRIES: usize = 2; // Is the time until a peer verification expires (12 hours). pub(crate) const VERIFICATION_EXPIRATION_SECS: u64 = 12 * time::HOUR; @@ -50,6 +56,8 @@ const MAX_PEERS_IN_RESPONSE: usize = 6; // Is the minimum number of verifications required to be selected in DiscoveryResponse. const MIN_VERIFIED_IN_RESPONSE: usize = 1; // Is the maximum number of services a peer can support. +// TODO: revisit dead code +#[allow(dead_code)] const MAX_SERVICES: usize = 5; // pub(crate) type DiscoveryHandler = Box) + Send + Sync + 'static>; @@ -57,9 +65,13 @@ const MAX_SERVICES: usize = 5; pub(crate) struct DiscoveryManagerConfig { pub(crate) entry_nodes: Vec, pub(crate) entry_nodes_prefer_ipv6: bool, + // TODO: revisit dead code + #[allow(dead_code)] pub(crate) run_as_entry_node: bool, pub(crate) version: u32, pub(crate) network_id: u32, + // TODO: revisit dead code + #[allow(dead_code)] pub(crate) bind_addr: SocketAddr, } @@ -98,6 +110,7 @@ pub(crate) struct DiscoveryManager { } impl DiscoveryManager { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( config: DiscoveryManagerConfig, local: Local, @@ -138,26 +151,21 @@ impl DiscoveryManager { let DiscoveryManagerConfig { mut entry_nodes, entry_nodes_prefer_ipv6, - run_as_entry_node, + run_as_entry_node: _, version, network_id, - bind_addr, + bind_addr: _, } = config; - let ServerSocket { - mut server_rx, - server_tx, - } = socket; + let ServerSocket { server_rx, server_tx } = socket; // Add previously discovered peers from the peer store. 
- add_peers_from_store(&peerstore, &local, &active_peers, &replacements); + add_peers_from_store(&peerstore, &active_peers, &replacements); // Add master peers from the config. add_master_peers( &mut entry_nodes, entry_nodes_prefer_ipv6, - &request_mngr, - &server_tx, &local, &master_peers, &active_peers, @@ -169,7 +177,6 @@ impl DiscoveryManager { server_tx: server_tx.clone(), server_rx, local: local.clone(), - peerstore: peerstore.clone(), version, network_id, request_mngr: request_mngr.clone(), @@ -182,25 +189,22 @@ impl DiscoveryManager { let discovery_send_handler = DiscoverySendHandler { server_tx, - local, - peerstore, request_mngr, active_peers, command_rx, }; - task_mngr.run::>(discovery_recv_handler); - task_mngr.run::>(discovery_send_handler); + task_mngr.run::(discovery_recv_handler); + task_mngr.run::(discovery_send_handler); command_tx } } -struct DiscoveryRecvHandler { +struct DiscoveryRecvHandler { server_rx: ServerRx, server_tx: ServerTx, local: Local, - peerstore: S, version: u32, network_id: u32, request_mngr: RequestManager, @@ -209,17 +213,15 @@ struct DiscoveryRecvHandler { replacements: ReplacementList, } -struct DiscoverySendHandler { +struct DiscoverySendHandler { server_tx: ServerTx, - local: Local, - peerstore: S, request_mngr: RequestManager, active_peers: ActivePeersList, command_rx: CommandRx, } #[async_trait::async_trait] -impl Runnable for DiscoveryRecvHandler { +impl Runnable for DiscoveryRecvHandler { const NAME: &'static str = "DiscoveryRecvHandler"; const SHUTDOWN_PRIORITY: u8 = 4; @@ -349,7 +351,7 @@ impl Runnable for DiscoveryRecvHandler { } #[async_trait::async_trait] -impl Runnable for DiscoverySendHandler { +impl Runnable for DiscoverySendHandler { const NAME: &'static str = "DiscoverySendHandler"; const SHUTDOWN_PRIORITY: u8 = 5; @@ -358,8 +360,6 @@ impl Runnable for DiscoverySendHandler { async fn run(self, mut shutdown_rx: Self::ShutdownSignal) { let DiscoverySendHandler { server_tx, - local, - peerstore, request_mngr, active_peers, mut command_rx, @@ -368,25 +368,18 @@ impl Runnable for DiscoverySendHandler { 'recv: loop { tokio::select! 
{ _ = &mut shutdown_rx => { - break; + break 'recv; } o = command_rx.recv() => { if let Some(command) = o { match command { - Command::SendVerificationRequest { peer_id } => { + Command::Verify{ peer_id } => { send_verification_request_to_peer(&peer_id, &active_peers, &request_mngr, &server_tx, None); } - Command::SendDiscoveryRequest { peer_id } => { + Command::Query{ peer_id } => { send_discovery_request_to_peer(&peer_id, &active_peers, &request_mngr, &server_tx, None); } - Command::SendVerificationRequests => { - // TODO: For testing purposes only => remove it at some point - send_verification_request_to_all_active(&active_peers, &request_mngr, &server_tx); - } - Command::SendDiscoveryRequests => { - // TODO: For testing purposes only => remove it at some point - send_discovery_request_to_all_verified(&active_peers, &request_mngr, &server_tx); - } + _ => {} } } } @@ -395,12 +388,7 @@ impl Runnable for DiscoverySendHandler { } } -fn add_peers_from_store( - peerstore: &S, - local: &Local, - active_peers: &ActivePeersList, - replacements: &ReplacementList, -) { +fn add_peers_from_store(peerstore: &S, active_peers: &ActivePeersList, replacements: &ReplacementList) { let mut num_added = 0; let mut write = active_peers.write(); @@ -422,8 +410,6 @@ fn add_peers_from_store( async fn add_master_peers( entry_nodes: &mut Vec, entry_nodes_prefer_ipv6: bool, - request_mngr: &RequestManager, - server_tx: &ServerTx, local: &Local, master_peers: &MasterPeersList, active_peers: &ActivePeersList, @@ -431,7 +417,7 @@ async fn add_master_peers( ) { let mut num_added = 0; - for mut entry_addr in entry_nodes { + for entry_addr in entry_nodes { let entry_socketaddr = match entry_addr.address_kind() { AddressKind::Ip4 | AddressKind::Ip6 => { // Unwrap: for those address kinds the returned option is always `Some`. @@ -464,12 +450,12 @@ async fn add_master_peers( } }; - let mut peer = Peer::new(entry_socketaddr.ip(), entry_addr.public_key().clone()); + let mut peer = Peer::new(entry_socketaddr.ip(), *entry_addr.public_key()); peer.add_service(AUTOPEERING_SERVICE_NAME, ServiceProtocol::Udp, entry_socketaddr.port()); - let peer_id = peer.peer_id().clone(); + let peer_id = *peer.peer_id(); - master_peers.write().insert(peer_id.clone()); + master_peers.write().insert(peer_id); // Also add it as a regular peer. if add_peer::(peer, local, active_peers, replacements) { @@ -536,9 +522,7 @@ pub(crate) fn remove_peer_from_active_list( // ``` if removed_peer.metrics().verified_count() > 0 { event_tx - .send(Event::PeerDeleted { - peer_id: peer_id.clone(), - }) + .send(Event::PeerDeleted { peer_id: *peer_id }) .expect("error sending `PeerDeleted` event"); } @@ -581,11 +565,15 @@ pub(crate) struct RecvContext<'a> { #[derive(Debug, Clone, Copy)] pub(crate) enum ValidationError { // The protocol version must match. + // TODO: revisit dead code + #[allow(dead_code)] VersionMismatch { expected: u32, received: u32, }, // The network id must match. + // TODO: revisit dead code + #[allow(dead_code)] NetworkIdMismatch { expected: u32, received: u32, @@ -599,6 +587,8 @@ pub(crate) enum ValidationError { // The peer must have an autopeering service. NoAutopeeringService, // The service port must match with the detected port. 
+ // TODO: revisit dead code + #[allow(dead_code)] ServicePortMismatch { expected: ServicePort, received: ServicePort, @@ -642,7 +632,7 @@ fn validate_verification_response( use ValidationError::*; if let Some(reqv) = request_mngr.write().pull::(peer_id) { - if verif_res.request_hash() == &reqv.request_hash { + if verif_res.request_hash() == reqv.request_hash { let res_services = verif_res.services(); if let Some(res_peering) = res_services.get(AUTOPEERING_SERVICE_NAME) { if res_peering.port() == source_socket_addr.port() { @@ -710,10 +700,9 @@ fn handle_verification_request(verif_req: VerificationRequest, ctx: RecvContext) // Is this a known peer? if peer::is_known(ctx.peer_id, ctx.local, ctx.active_peers, ctx.replacements) { // Update verification request timestamp - ctx.active_peers - .write() - .find_mut(ctx.peer_id) - .map(|peer| peer.metrics_mut().set_last_verif_request_timestamp()); + if let Some(peer) = ctx.active_peers.write().find_mut(ctx.peer_id) { + peer.metrics_mut().set_last_verif_request_timestamp(); + } if peer::is_verified(ctx.peer_id, ctx.active_peers) { return; @@ -737,14 +726,13 @@ fn handle_verification_response(verif_res: VerificationResponse, verif_reqval: R // * Update its services; // * Fire the "peer discovered" event; if verified_count == 1 { - ctx.active_peers - .write() - .find_mut(ctx.peer_id) - .map(|p| p.peer_mut().set_services(verif_res.services().clone())); - - ctx.event_tx.send(Event::PeerDiscovered { - peer_id: ctx.peer_id.clone(), - }); + if let Some(peer) = ctx.active_peers.write().find_mut(ctx.peer_id) { + peer.peer_mut().set_services(verif_res.services().clone()) + } + + ctx.event_tx + .send(Event::PeerDiscovered { peer_id: *ctx.peer_id }) + .expect("error publishing peer-discovered event"); } } @@ -760,7 +748,7 @@ fn handle_verification_response(verif_res: VerificationResponse, verif_reqval: R } } -fn handle_discovery_request(disc_req: DiscoveryRequest, ctx: RecvContext) { +fn handle_discovery_request(_disc_req: DiscoveryRequest, ctx: RecvContext) { log::trace!("Handling discovery request."); let request_hash = msg_hash(MessageType::DiscoveryRequest, ctx.msg_bytes).to_vec(); @@ -791,8 +779,6 @@ fn handle_discovery_response(disc_res: DiscoveryResponse, disc_reqval: RequestVa // Add discovered peers to the peer list and peer store. for peer in disc_res.into_peers() { - let peer_id = peer.peer_id().clone(); - // Note: we only fire `PeerDiscovered` if it can be verified. 
if add_peer::(peer, ctx.local, ctx.active_peers, ctx.replacements) { num_added += 1; @@ -846,7 +832,9 @@ pub(crate) async fn begin_verification( } } +// TODO: revisit dead code // TEMP: for testing purposes +#[allow(dead_code)] pub(crate) fn send_verification_request_to_all_active( active_peers: &ActivePeersList, request_mngr: &RequestManager, @@ -906,7 +894,7 @@ pub(crate) fn send_verification_request_to_addr( let verif_req = request_mngr .write() - .new_verification_request(peer_id.clone(), peer_addr.ip(), response_tx); + .new_verification_request(*peer_id, peer_addr.ip(), response_tx); let msg_bytes = verif_req .to_protobuf() @@ -1021,7 +1009,7 @@ pub(crate) fn send_discovery_request_to_addr( ) { log::trace!("Sending discovery request to: {:?}", peer_id); - let disc_req = request_mngr.write().new_discovery_request(peer_id.clone(), response_tx); + let disc_req = request_mngr.write().new_discovery_request(*peer_id, response_tx); let msg_bytes = disc_req .to_protobuf() @@ -1037,7 +1025,9 @@ pub(crate) fn send_discovery_request_to_addr( .expect("error sending discovery request to server"); } +// TODO: revisit dead code // TEMP: for testing purposes +#[allow(dead_code)] pub(crate) fn send_discovery_request_to_all_verified( active_peers: &ActivePeersList, request_mngr: &RequestManager, @@ -1056,7 +1046,7 @@ pub(crate) fn send_discovery_request_to_all_verified( for peer_id in &peers { if peer::is_verified(peer_id, active_peers) { - send_discovery_request_to_peer(&peer_id, active_peers, request_mngr, server_tx, None); + send_discovery_request_to_peer(peer_id, active_peers, request_mngr, server_tx, None); } } } @@ -1067,7 +1057,7 @@ pub(crate) fn send_discovery_request_to_all_verified( fn choose_n_random_peers_from_active_list( active_peers: &ActivePeersList, - mut n: usize, + n: usize, min_verified_count: usize, ) -> Vec { let len = active_peers.read().len(); @@ -1106,23 +1096,15 @@ fn choose_n_random_peers_from_active_list( } } -// pub(crate) fn get_master_peers(master_peers: &MasterPeersList, peerstore: &S) -> Vec { -// let mut peers = Vec::with_capacity(master_peers.read().len()); -// peers.extend( -// master_peers -// .read() -// .iter() -// .filter_map(|peer_id| peerstore.fetch_peer(peer_id)), -// ); -// peers -// } - // Hive.go: returns the verified peer with the given ID, or nil if no such peer exists. // --- +// TODO: revisit dead code +// --- /// Returns `Some(Peer)`, if either the corresponding peer is still verified, or - if it isn't - /// can be successfully reverified. /// /// Note: The function is blocking. +#[allow(dead_code)] pub(crate) async fn get_verified_peer( peer_id: &PeerId, active_peers: &ActivePeersList, @@ -1162,5 +1144,5 @@ pub(crate) fn get_verified_peers(active_peers: &ActivePeersList) -> Vec IpAddr { self.target_addr } @@ -77,6 +79,7 @@ impl VerificationRequest { }) } + #[allow(clippy::wrong_self_convention)] pub fn to_protobuf(&self) -> Result { let ping = proto::Ping { version: self.version, @@ -132,7 +135,9 @@ impl VerificationResponse { &self.services } + // TODO: revisit dead code /// When sent contains the external addr of the remote peer, when received the external addr of the local peer. 
+ #[allow(dead_code)] pub(crate) fn target_addr(&self) -> IpAddr { self.target_addr } @@ -203,6 +208,7 @@ impl DiscoveryRequest { }) } + #[allow(clippy::wrong_self_convention)] pub(crate) fn to_protobuf(&self) -> Result { let discover_request = proto::DiscoveryRequest { timestamp: self.timestamp as i64, @@ -240,6 +246,8 @@ impl DiscoveryResponse { &self.request_hash } + // TODO: revisit dead code + #[allow(dead_code)] pub(crate) fn peers(&self) -> &[Peer] { &self.peers } diff --git a/bee-autopeering/src/discovery/query.rs b/bee-autopeering/src/discovery/query.rs index b04a98fdf2..516a94b0de 100644 --- a/bee-autopeering/src/discovery/query.rs +++ b/bee-autopeering/src/discovery/query.rs @@ -2,13 +2,11 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - command::{Command, CommandTx}, discovery::manager, event::EventTx, local::Local, peer::{ peerlist::{ActivePeer, ActivePeersList, MasterPeersList, ReplacementList}, - peerstore::PeerStore, PeerId, }, request::RequestManager, @@ -17,12 +15,11 @@ use crate::{ }; use rand::{thread_rng, Rng}; -use tokio::sync::mpsc::Receiver; - -use std::{collections::VecDeque, time::Duration}; #[derive(Clone)] pub(crate) struct QueryContext { + // TODO: revisit dead code + #[allow(dead_code)] pub(crate) local: Local, pub(crate) request_mngr: RequestManager, pub(crate) master_peers: MasterPeersList, @@ -74,7 +71,7 @@ fn peer_to_reverify(active_peers: &ActivePeersList) -> Option { if active_peers.read().is_empty() { None } else { - active_peers.read().get_oldest().cloned().map(|p| p.peer_id().clone()) + active_peers.read().get_oldest().cloned().map(|p| *p.peer_id()) } } @@ -122,10 +119,7 @@ fn select_peers_to_query(active_peers: &ActivePeersList) -> Vec { // If we have less than 3 verified peers, then we use those for the query. if verif_peers.len() < 3 { - verif_peers - .into_iter() - .map(|ap| ap.peer_id().clone()) - .collect::>() + verif_peers.into_iter().map(|ap| *ap.peer_id()).collect::>() } else { // Note: this macro is useful to remove some noise from the pattern matching rules. macro_rules! num { @@ -149,7 +143,7 @@ fn select_peers_to_query(active_peers: &ActivePeersList) -> Vec { a + b + c } - let latest = verif_peers.remove(0).peer_id().clone(); + let latest = *verif_peers.remove(0).peer_id(); // Note: This loop finds the three "heaviest" peers with one iteration over an unsorted vec of verified peers. let heaviest3 = verif_peers.into_iter().fold( @@ -192,7 +186,7 @@ fn select_peers_to_query(active_peers: &ActivePeersList) -> Vec { ); let r = thread_rng().gen_range(0..len(&heaviest3)); - let heaviest = match r { + let heaviest = *match r { 0 => heaviest3.0, 1 => heaviest3.1, 2 => heaviest3.2, @@ -200,8 +194,7 @@ fn select_peers_to_query(active_peers: &ActivePeersList) -> Vec { } // Panic: we made sure that the unwrap is always possible. .unwrap() - .peer_id() - .clone(); + .peer_id(); vec![latest, heaviest] } @@ -214,10 +207,7 @@ mod tests { fn create_peerlist_of_size(n: usize) -> ActivePeersList { // Create a set of active peer entries. - let mut entries = (0..n) - .map(|i| Peer::new_test_peer(i as u8)) - .map(|p| ActivePeer::new(p)) - .collect::>(); + let entries = (0..n as u8).map(Peer::new_test_peer).map(ActivePeer::new); // Create a peerlist, and insert the peer entries setting the `last_new_peers` metric // equal to its peerlist index. We also need to set the `verified_count` to at least 1. @@ -254,9 +244,7 @@ mod tests { let peerlist = create_peerlist_of_size(3); macro_rules! 
equal { - ($a:expr, $b:expr) => {{ - $a == peerlist.read().get($b).unwrap().peer_id() - }}; + ($a:expr, $b:expr) => {{ $a == peerlist.read().get($b).unwrap().peer_id() }}; } let selected = select_peers_to_query(&peerlist); @@ -271,9 +259,7 @@ mod tests { let peerlist = create_peerlist_of_size(10); macro_rules! equal { - ($a:expr, $b:expr) => {{ - $a == peerlist.read().get($b).unwrap().peer_id() - }}; + ($a:expr, $b:expr) => {{ $a == peerlist.read().get($b).unwrap().peer_id() }}; } // 0 1 2 3 4 ... 7 8 9 (index) diff --git a/bee-autopeering/src/event.rs b/bee-autopeering/src/event.rs index 60584e6cb5..aa87fb14c1 100644 --- a/bee-autopeering/src/event.rs +++ b/bee-autopeering/src/event.rs @@ -5,7 +5,7 @@ use crate::{ peer::{Peer, PeerId}, - peering::{manager::Status, neighbor::Distance}, + peering::neighbor::Distance, }; use tokio::sync::mpsc; @@ -76,8 +76,8 @@ impl fmt::Display for Event { "Salts updated => outbound: {}/ inbound: {}.", public_salt_lifetime, private_salt_lifetime, ), - OutgoingPeering { peer, distance } => write!(f, "Peered: {} (outgoing).", peer.peer_id()), - IncomingPeering { peer, distance } => write!(f, "Peered: {} (incoming).", peer.peer_id()), + OutgoingPeering { peer, .. } => write!(f, "Peered: {} (outgoing).", peer.peer_id()), + IncomingPeering { peer, .. } => write!(f, "Peered: {} (incoming).", peer.peer_id()), PeeringDropped { peer_id } => write!(f, "Dropped: {}.", peer_id), } } diff --git a/bee-autopeering/src/init.rs b/bee-autopeering/src/init.rs index c3f91824c8..80903ed0e7 100644 --- a/bee-autopeering/src/init.rs +++ b/bee-autopeering/src/init.rs @@ -4,58 +4,39 @@ //! Autopeering initialization. use crate::{ - command::{Command, CommandTx}, config::AutopeeringConfig, - delay::{self, DelayFactoryBuilder, DelayFactoryMode}, + delay, discovery::{ manager::{ DiscoveryManager, DiscoveryManagerConfig, DEFAULT_QUERY_INTERVAL_SECS, DEFAULT_REVERIFY_INTERVAL_SECS, }, - messages::VerificationRequest, query::{self, QueryContext}, }, event::{self, EventRx}, hash, - local::{ - self, - salt::{Salt, SALT_LIFETIME_SECS}, - services::{ServiceMap, AUTOPEERING_SERVICE_NAME}, - Local, - }, + local::Local, multiaddr, - packet::{IncomingPacket, MessageType, OutgoingPacket}, + packet::IncomingPacket, peer::{ - self, - peerlist::{self, ActivePeersList, MasterPeersList, ReplacementList}, - peerstore::{self, InMemoryPeerStore, PeerStore}, - PeerId, + peerlist::{ActivePeersList, MasterPeersList, ReplacementList}, + peerstore::PeerStore, }, - peering::update::UpdateContext, peering::{ filter::NeighborFilter, manager::{ InboundNeighborhood, OutboundNeighborhood, PeeringManager, PeeringManagerConfig, SaltUpdateContext, SALT_UPDATE_SECS, }, - update::{self, OPEN_OUTBOUND_NBH_UPDATE_SECS}, + update::{self, UpdateContext, OPEN_OUTBOUND_NBH_UPDATE_SECS}, NeighborValidator, }, request::{self, RequestManager, EXPIRED_REQUEST_REMOVAL_INTERVAL_CHECK_SECS}, - server::{server_chan, IncomingPacketSenders, Server, ServerConfig, ServerSocket, ServerTx}, - task::{self, ShutdownRx, TaskManager, MAX_SHUTDOWN_PRIORITY}, + server::{server_chan, IncomingPacketSenders, Server, ServerConfig, ServerSocket}, + task::{TaskManager, MAX_SHUTDOWN_PRIORITY}, time, }; -use std::{ - collections::HashSet, - error, - future::Future, - iter, - net::SocketAddr, - ops::DerefMut as _, - sync::Arc, - time::{Duration, SystemTime}, -}; +use std::{error, future::Future, iter, time::Duration}; const NUM_TASKS: usize = 9; @@ -100,7 +81,7 @@ where log::info!("Protocol_version: {}", version); log::info!( "Public key: {}", - 
multiaddr::from_pubkey_to_base58(&local.read().public_key()) + multiaddr::from_pubkey_to_base58(local.read().public_key()) ); log::info!("Current time: {}", time::datetime_now()); log::info!("Private salt: {}", private_salt); @@ -157,7 +138,7 @@ where // Create neighborhoods and neighbor candidate filter. let inbound_nbh = InboundNeighborhood::new(); let outbound_nbh = OutboundNeighborhood::new(); - let nb_filter = NeighborFilter::new(local.read().peer_id().clone(), neighbor_validator); + let nb_filter = NeighborFilter::new(*local.read().peer_id(), neighbor_validator); // Create the autopeering manager handling the peering request/response protocol. let peering_config = PeeringManagerConfig::new(&config, version, network_id); @@ -170,7 +151,7 @@ where request_mngr.clone(), active_peers.clone(), event_tx.clone(), - command_tx.clone(), + command_tx, inbound_nbh.clone(), outbound_nbh.clone(), nb_filter.clone(), diff --git a/bee-autopeering/src/lib.rs b/bee-autopeering/src/lib.rs index 9bc349b4c2..a2f82ef2b0 100644 --- a/bee-autopeering/src/lib.rs +++ b/bee-autopeering/src/lib.rs @@ -4,15 +4,18 @@ //! A system that allows peers in the same IOTA network to automatically discover each other. //! ## Example //! -//! In order to integrate the Autopeering functionality in your node implementation you need to provide the following things to its `init` function: +//! In order to integrate the Autopeering functionality in your node implementation you need to provide the following +//! things to its `init` function: //! * an `AutopeeringConfig`; //! * a protocol version (`u32`); //! * a network name, e.g. "chrysalis-mainnet"; -//! * a `Local` entity (either randomly created or from an `Ed25519` keypair), that additionally announces one or more services; +//! * a `Local` entity (either randomly created or from an `Ed25519` keypair), that additionally announces one or more +//! services; //! * a shutdown signal (`Future`); -//! * a peer store, e.g. the `InMemoryPeerStore` (non-persistent) or the `SledPeerStore` (persistent), or a custom peer store implementing the `PeerStore` trait; +//! * a peer store, e.g. the `InMemoryPeerStore` (non-persistent) or the `SledPeerStore` (persistent), or a custom peer +//! store implementing the `PeerStore` trait; //! -//!```no_run +//! ```no_run //! use bee_autopeering::{ //! init, //! peerstore::{SledPeerStore, SledPeerStoreConfig}, @@ -106,7 +109,6 @@ //! 
``` #![deny(missing_docs)] -#![allow(warnings)] mod command; mod delay; @@ -132,9 +134,8 @@ pub mod init; pub use config::AutopeeringConfig; pub use event::Event; pub use init::init; -pub use local::services::AUTOPEERING_SERVICE_NAME; pub use local::{ - services::{ServiceMap, ServiceName, ServiceProtocol}, + services::{ServiceMap, ServiceName, ServiceProtocol, AUTOPEERING_SERVICE_NAME}, Local, }; pub use peer::{peer_id, peer_id::PeerId, peerstore, Peer}; diff --git a/bee-autopeering/src/local/mod.rs b/bee-autopeering/src/local/mod.rs index cdaaca21d8..492a0b39ab 100644 --- a/bee-autopeering/src/local/mod.rs +++ b/bee-autopeering/src/local/mod.rs @@ -7,25 +7,15 @@ pub mod services; use salt::Salt; use services::{ServiceMap, ServiceProtocol}; -use crate::{ - delay::DelayFactory, - event::{Event, EventTx}, - hash, - peer::{peer_id::PeerId, peerstore::PeerStore}, - task::{Repeat, ShutdownRx}, - time, -}; +use crate::peer::peer_id::PeerId; use crypto::signatures::ed25519::{PublicKey, SecretKey as PrivateKey, Signature, SECRET_KEY_LENGTH}; -use libp2p_core::identity::{self, ed25519::Keypair}; +use libp2p_core::identity::ed25519::Keypair; use std::{ convert::TryInto, fmt, - hash::{Hash, Hasher}, - iter, sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}, - time::Duration, }; use self::salt::SALT_LIFETIME_SECS; diff --git a/bee-autopeering/src/local/salt.rs b/bee-autopeering/src/local/salt.rs index b401ee4a5f..730fbebf4b 100644 --- a/bee-autopeering/src/local/salt.rs +++ b/bee-autopeering/src/local/salt.rs @@ -51,10 +51,14 @@ impl Salt { self.expiration_time } + // TODO: revisit dead code + #[allow(dead_code)] pub(crate) fn from_protobuf(bytes: &[u8]) -> Result { Ok(proto::Salt::decode(bytes)?.into()) } + // TODO: revisit dead code + #[allow(dead_code)] pub(crate) fn to_protobuf(&self) -> Result { let salt = proto::Salt { bytes: self.bytes.to_vec(), diff --git a/bee-autopeering/src/local/services.rs b/bee-autopeering/src/local/services.rs index ca2017c921..0d4d4599cf 100644 --- a/bee-autopeering/src/local/services.rs +++ b/bee-autopeering/src/local/services.rs @@ -1,12 +1,11 @@ // Copyright 2021 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use crate::{multiaddr::AddressKind, proto}; +use crate::proto; -use libp2p_core::multiaddr::Protocol; use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, convert::TryFrom, fmt, io, net::IpAddr, str::FromStr}; +use std::{collections::HashMap, fmt, io, str::FromStr}; /// Represents the name of a service. pub type ServiceName = String; @@ -20,7 +19,9 @@ pub const AUTOPEERING_SERVICE_NAME: &str = "peering"; pub struct ServiceMap(HashMap); impl ServiceMap { + // TODO: revisit dead code /// Creates a new empty service map. 
+ #[allow(dead_code)] pub(crate) fn new() -> Self { Self::default() } @@ -88,7 +89,6 @@ impl fmt::Display for ServiceMap { .map(|(service_name, service)| format!("{}/{}/{}", service_name, service.protocol, service.port)) .collect::>() .join(";") - .to_string() ) } } @@ -100,6 +100,8 @@ pub struct Service { } impl Service { + // TODO: revisit dead code + #[allow(dead_code)] pub fn protocol(&self) -> ServiceProtocol { self.protocol } diff --git a/bee-autopeering/src/multiaddr.rs b/bee-autopeering/src/multiaddr.rs index 16a61ce783..c9a2c10ea5 100644 --- a/bee-autopeering/src/multiaddr.rs +++ b/bee-autopeering/src/multiaddr.rs @@ -7,7 +7,7 @@ use serde::{ de::{self, Visitor}, Deserialize, Serialize, Serializer, }; -use tokio::net::{lookup_host, ToSocketAddrs}; +use tokio::net::lookup_host; use std::{ fmt, @@ -165,19 +165,15 @@ impl FromStr for AutopeeringMultiaddr { type Err = Error; fn from_str(s: &str) -> Result { - let index = s - .find(AUTOPEERING_MULTIADDR_PROTOCOL_NAME) - .ok_or(Error::InvalidAutopeeringMultiaddr)?; - let parts = s .split_terminator(&format!("/{}/", AUTOPEERING_MULTIADDR_PROTOCOL_NAME)) .collect::>(); if parts.len() != 2 { - return Err(Error::InvalidAutopeeringMultiaddr); + return Err(Error::AutopeeringMultiaddr); } - let address = parts[0].parse().map_err(|_| Error::InvalidHostAddressPart)?; + let address = parts[0].parse().map_err(|_| Error::HostAddressPart)?; let public_key = from_base58_to_pubkey(parts[1]); let resolved_addrs = Vec::new(); @@ -205,6 +201,13 @@ impl<'de> Visitor<'de> for AutopeeringMultiaddrVisitor { Ok(value.parse().expect("failed to parse autopeering multiaddr")) } + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + Ok(value.parse().expect("failed to parse autopeering multiaddr")) + } + fn visit_borrowed_str(self, value: &str) -> Result where E: de::Error, @@ -230,17 +233,16 @@ pub(crate) fn from_base58_to_pubkey(base58_pubkey: impl AsRef) -> PublicKey #[derive(Debug)] pub enum Error { /// Returned, if the host address part wasn't a valid multi address. - InvalidHostAddressPart, + HostAddressPart, /// Returned, if the public key part wasn't a base58 encoded ed25519 public key. - InvalidPubKeyPart, + PubKeyPart, /// Returned, if it's not a valid autopeering multi address. - InvalidAutopeeringMultiaddr, + AutopeeringMultiaddr, } #[cfg(test)] mod tests { use super::*; - use std::fmt; // NOTE: example taken from Hornet! // diff --git a/bee-autopeering/src/packet.rs b/bee-autopeering/src/packet.rs index 034b9bf360..00358701d0 100644 --- a/bee-autopeering/src/packet.rs +++ b/bee-autopeering/src/packet.rs @@ -51,7 +51,9 @@ impl Packet { }) } + // TODO: revisit dead code /// Returns the type of this packet. 
+ #[allow(dead_code)] pub(crate) fn message_type(&self) -> Result { num::FromPrimitive::from_u32(self.0.r#type) .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "unknown message type identifier")) diff --git a/bee-autopeering/src/peer/mod.rs b/bee-autopeering/src/peer/mod.rs index 0b59841079..39c48862e4 100644 --- a/bee-autopeering/src/peer/mod.rs +++ b/bee-autopeering/src/peer/mod.rs @@ -12,26 +12,19 @@ pub use peerstore::PeerStore; use peerlist::{ActivePeersList, ReplacementList}; use crate::{ - command::{Command, CommandTx}, - discovery::manager::{self, VERIFICATION_EXPIRATION_SECS}, - local::services::{ServiceMap, ServiceProtocol}, - local::Local, + local::{ + services::{ServiceMap, ServiceProtocol}, + Local, + }, proto, - request::RequestManager, - server::ServerTx, - time::{self, Timestamp}, }; use bytes::BytesMut; use crypto::signatures::ed25519::PublicKey; use prost::{DecodeError, EncodeError, Message}; -use serde::{de::Visitor, ser::SerializeStruct, Deserialize, Serialize, Serializer}; +use serde::{de::Visitor, ser::SerializeStruct, Deserialize, Serialize}; -use std::{ - convert::TryInto, - fmt, - net::{IpAddr, Ipv4Addr}, -}; +use std::{convert::TryInto, fmt, net::IpAddr}; /// Represents a peer. #[derive(Clone)] @@ -61,7 +54,7 @@ impl Peer { /// Returns the public key of this peer. pub fn public_key(&self) -> &PublicKey { - &self.peer_id.public_key() + self.peer_id.public_key() } /// Returns the address of this peer. @@ -180,9 +173,9 @@ impl AsRef for Peer { } } -impl Into for Peer { - fn into(self) -> sled::IVec { - let bytes = bincode::serialize(&self).expect("serialization error"); +impl From for sled::IVec { + fn from(peer: Peer) -> Self { + let bytes = bincode::serialize(&peer).expect("serialization error"); sled::IVec::from_iter(bytes.into_iter()) } } @@ -233,7 +226,7 @@ pub(crate) fn is_known( replacements: &ReplacementList, ) -> bool { // The master list doesn't need to be queried, because those always a subset of the active peers. - peer_id == local.read().peer_id() || active_peers.read().contains(&peer_id) || replacements.read().contains(peer_id) + peer_id == local.read().peer_id() || active_peers.read().contains(peer_id) || replacements.read().contains(peer_id) } // Hive.go: whether the peer has recently done an endpoint proof @@ -250,9 +243,12 @@ pub(crate) fn is_verified(peer_id: &PeerId, active_peers: &ActivePeersList) -> b // Hive.go: whether the given peer has recently verified the local peer // --- +// TODO: revisit dead code +// --- /// Returns whether the corresponding peer sent a (still valid) verification request. /// /// Also returns `false`, if the provided `peer_id` is not found in the active peer list. 
+#[allow(dead_code)] pub(crate) fn has_verified(peer_id: &PeerId, active_peers: &ActivePeersList) -> bool { active_peers .read() @@ -267,7 +263,7 @@ pub(crate) fn has_verified(peer_id: &PeerId, active_peers: &ActivePeersList) -> /// * Updates the "last_verification_response" timestamp; /// * Increments the "verified" counter; pub(crate) fn set_front_and_update(peer_id: &PeerId, active_peers: &ActivePeersList) -> Option { - if let Some(p) = active_peers.write().set_newest_and_get_mut(&peer_id) { + if let Some(p) = active_peers.write().set_newest_and_get_mut(peer_id) { let metrics = p.metrics_mut(); metrics.set_last_verif_response_timestamp(); let new_count = metrics.increment_verified_count(); diff --git a/bee-autopeering/src/peer/peer_id.rs b/bee-autopeering/src/peer/peer_id.rs index 17460a3c67..3da77640ab 100644 --- a/bee-autopeering/src/peer/peer_id.rs +++ b/bee-autopeering/src/peer/peer_id.rs @@ -3,17 +3,14 @@ //! Peer identities. -use crate::{hash, local::salt::Salt}; +use crate::hash; -use crypto::signatures::ed25519::{PublicKey, SecretKey as PrivateKey, Signature, PUBLIC_KEY_LENGTH}; -use ring::signature::KeyPair; -use serde::{de::Visitor, ser::SerializeStruct, Deserialize, Serialize, Serializer}; +use crypto::signatures::ed25519::{PublicKey, SecretKey as PrivateKey}; +use serde::{de::Visitor, ser::SerializeStruct, Deserialize, Serialize}; use std::{ - convert::TryInto, fmt, hash::{Hash, Hasher}, - sync::{Arc, RwLock}, }; const DISPLAY_LENGTH: usize = 16; @@ -112,19 +109,19 @@ impl AsRef<[u8]> for PeerId { } } -impl Into for PeerId { - fn into(self) -> sled::IVec { - let bytes = self.public_key.to_bytes(); +impl From for sled::IVec { + fn from(peer: PeerId) -> Self { + let bytes = peer.public_key.to_bytes(); sled::IVec::from_iter(bytes.into_iter()) } } -impl Into for PeerId { - fn into(self) -> libp2p_core::PeerId { +impl From for libp2p_core::PeerId { + fn from(peer_id: PeerId) -> Self { let PeerId { - id_bytes: id, + id_bytes: _, public_key, - } = self; + } = peer_id; let public_key = libp2p_core::PublicKey::Ed25519( libp2p_core::identity::ed25519::PublicKey::decode(public_key.as_ref()) diff --git a/bee-autopeering/src/peer/peerlist.rs b/bee-autopeering/src/peer/peerlist.rs index a3623d0064..834b4bae64 100644 --- a/bee-autopeering/src/peer/peerlist.rs +++ b/bee-autopeering/src/peer/peerlist.rs @@ -1,23 +1,18 @@ // Copyright 2021 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use super::{peer_id::PeerId, peerstore::PeerStore, Peer}; +use super::{peer_id::PeerId, Peer}; use crate::{ - command::{Command, CommandTx}, - discovery::{self, manager::VERIFICATION_EXPIRATION_SECS}, - request::RequestManager, - server::ServerTx, - task::{Repeat, Runnable, ShutdownRx}, + discovery::manager::VERIFICATION_EXPIRATION_SECS, time::{self, Timestamp}, }; -use serde::{de::Visitor, ser::SerializeStruct, Deserialize, Serialize, Serializer}; +use serde::{de::Visitor, ser::SerializeStruct, Deserialize, Serialize}; use std::{ - collections::{vec_deque, HashSet, VecDeque}, + collections::{HashSet, VecDeque}, fmt, - ops::{Deref, DerefMut}, sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}, }; @@ -94,9 +89,9 @@ impl AsRef for ActivePeer { } } -impl Into for ActivePeer { - fn into(self) -> sled::IVec { - let bytes = bincode::serialize(&self).expect("serialization error"); +impl From for sled::IVec { + fn from(peer: ActivePeer) -> Self { + let bytes = bincode::serialize(&peer).expect("serialization error"); sled::IVec::from_iter(bytes.into_iter()) } } @@ -174,6 +169,8 @@ pub(crate) struct 
MasterPeersList { } impl MasterPeersList { + // TODO: revisit dead code + #[allow(dead_code)] pub(crate) fn new(peers: MasterPeersListInner) -> Self { Self { inner: Arc::new(RwLock::new(peers)), @@ -202,7 +199,9 @@ pub(crate) struct PeerMetrics { } impl PeerMetrics { - pub(crate) fn new(peer_id: PeerId) -> Self { + // TODO: revisit dead code + #[allow(dead_code)] + pub(crate) fn new() -> Self { Self { verified_count: 0, last_new_peers: 0, @@ -233,6 +232,8 @@ impl PeerMetrics { self.last_new_peers = last_new_peers; } + // TODO: revisit dead code + #[allow(dead_code)] pub(crate) fn last_verif_request_timestamp(&self) -> Timestamp { self.last_verif_request } @@ -241,6 +242,8 @@ impl PeerMetrics { self.last_verif_request = time::unix_now_secs(); } + // TODO: revisit dead code + #[allow(dead_code)] pub(crate) fn last_verif_response_timestamp(&self) -> Timestamp { self.last_verif_response } @@ -253,6 +256,8 @@ impl PeerMetrics { time::since(self.last_verif_response).expect("system clock error") < VERIFICATION_EXPIRATION_SECS } + // TODO: revisit dead code + #[allow(dead_code)] pub(crate) fn has_verified(&self) -> bool { time::since(self.last_verif_request).expect("system clock error") < VERIFICATION_EXPIRATION_SECS } @@ -274,6 +279,8 @@ impl fmt::Debug for PeerMetrics { pub(crate) struct PeerRing(VecDeque
<P>
); impl, const N: usize> PeerRing { + // TODO: revisit dead code + #[allow(dead_code)] pub(crate) fn new() -> Self { Self::default() } @@ -338,6 +345,8 @@ impl, const N: usize> PeerRing { self.0.get_mut(index) } + // TODO: revisit dead code + #[allow(dead_code)] pub(crate) fn get_newest(&self) -> Option<&P> { self.0.get(0) } @@ -346,9 +355,11 @@ impl, const N: usize> PeerRing { self.0.get_mut(0) } + // TODO: revisit dead code /// Moves `peer_id` to the front of the list. /// /// Returns `false` if the `peer_id` is not found in the list, and thus, cannot be made the newest. + #[allow(dead_code)] pub(crate) fn set_newest(&mut self, peer_id: &PeerId) -> bool { if let Some(mid) = self.find_index(peer_id) { if mid > 0 { @@ -361,6 +372,8 @@ impl, const N: usize> PeerRing { } // needs to be atomic + // TODO: revisit dead code + #[allow(dead_code)] pub(crate) fn set_newest_and_get(&mut self, peer_id: &PeerId) -> Option<&P> { if let Some(mid) = self.find_index(peer_id) { if mid > 0 { @@ -388,14 +401,6 @@ impl, const N: usize> PeerRing { self.0.get(self.0.len() - 1) } - pub(crate) fn rotate_backwards(&mut self) { - self.0.rotate_left(1); - } - - pub(crate) fn rotate_forwards(&mut self) { - self.0.rotate_right(1); - } - pub(crate) fn len(&self) -> usize { self.0.len() } @@ -408,8 +413,10 @@ impl, const N: usize> PeerRing { self.0.is_empty() } + // TODO: revisit dead code // TODO: mark as 'const fn' once stable. // Compiler error hints to issue #57563 . + #[allow(dead_code)] pub(crate) fn max_size(&self) -> usize { N } @@ -424,3 +431,20 @@ impl Default for PeerRing { Self(VecDeque::with_capacity(N)) } } + +#[cfg(test)] +mod tests { + use super::*; + + impl, const N: usize> PeerRing { + // TODO: revisit dead code + #[allow(dead_code)] + pub(crate) fn rotate_backwards(&mut self) { + self.0.rotate_left(1); + } + + pub(crate) fn rotate_forwards(&mut self) { + self.0.rotate_right(1); + } + } +} diff --git a/bee-autopeering/src/peer/peerstore.rs b/bee-autopeering/src/peer/peerstore.rs index 78e1c85bd7..77895d6972 100644 --- a/bee-autopeering/src/peer/peerstore.rs +++ b/bee-autopeering/src/peer/peerstore.rs @@ -5,29 +5,15 @@ use super::{ peer_id::PeerId, - peerlist::{ActivePeer, ActivePeersList, PeerMetrics, ReplacementList}, + peerlist::{ActivePeer, ActivePeersList, ReplacementList}, Peer, }; -use crate::{ - delay::DelayFactory, - local::services::AUTOPEERING_SERVICE_NAME, - packet::{MessageType, OutgoingPacket}, - request::RequestManager, - server::ServerTx, - task::ShutdownRx, - time::{self, Timestamp}, -}; - -use sled::{Batch, Db, IVec}; +use sled::{Batch, Db}; use std::{ collections::HashMap, - iter, - net::SocketAddr, - path::{Path, PathBuf}, sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}, - time::Duration, }; const ACTIVE_PEERS_TREE: &str = "active_peers"; @@ -88,7 +74,7 @@ impl InMemoryPeerStore { impl PeerStore for InMemoryPeerStore { type Config = (); - fn new(config: Self::Config) -> Self { + fn new(_: Self::Config) -> Self { Self { inner: Default::default(), } @@ -99,28 +85,28 @@ impl PeerStore for InMemoryPeerStore { let mut write = self.write(); let _ = write.replacements.remove(peer_id); - let _ = write.active_peers.insert(peer_id.clone(), peer); + let _ = write.active_peers.insert(*peer_id, peer); } fn store_all_active(&self, peers: &ActivePeersList) { let read = peers.read(); let mut write = self.write(); for (peer_id, peer) in read.iter().map(|p| (p.peer_id(), p)) { - let _ = write.active_peers.insert(peer_id.clone(), peer.clone()); + let _ = write.active_peers.insert(*peer_id, 
peer.clone()); } } fn store_replacement(&self, peer: Peer) { let peer_id = peer.peer_id(); let _ = self.write().active_peers.remove(peer_id); - let _ = self.write().replacements.insert(peer_id.clone(), peer); + let _ = self.write().replacements.insert(*peer_id, peer); } fn store_all_replacements(&self, peers: &ReplacementList) { let read = peers.read(); let mut write = self.write(); for (peer_id, peer) in read.iter().map(|p| (p.peer_id(), p)) { - let _ = write.replacements.insert(peer_id.clone(), peer.clone()); + let _ = write.replacements.insert(*peer_id, peer.clone()); } } fn contains(&self, peer_id: &PeerId) -> bool { @@ -165,15 +151,15 @@ impl PeerStore for SledPeerStore { fn new(config: Self::Config) -> Self { let db = config.open().expect("error opening peerstore"); - db.open_tree("active_peers"); - db.open_tree("replacements"); + db.open_tree("active_peers").expect("error opening tree"); + db.open_tree("replacements").expect("error opening tree"); Self { db } } fn store_active(&self, active_peer: ActivePeer) { let tree = self.db.open_tree(ACTIVE_PEERS_TREE).expect("error opening tree"); - let key = active_peer.peer_id().clone(); + let key = *active_peer.peer_id(); tree.insert(key, active_peer).expect("insert error"); } @@ -184,15 +170,15 @@ impl PeerStore for SledPeerStore { active_peers .read() .iter() - .for_each(|p| batch.insert(p.peer_id().clone(), p.clone())); + .for_each(|p| batch.insert(*p.peer_id(), p.clone())); tree.apply_batch(batch).expect("error applying batch"); } fn store_replacement(&self, peer: Peer) { let tree = self.db.open_tree(REPLACEMENTS_TREE).expect("error opening tree"); - let key = peer.peer_id().clone(); + let key = *peer.peer_id(); - tree.insert(key, peer); + tree.insert(key, peer).expect("error inserting peer"); } fn store_all_replacements(&self, replacements: &ReplacementList) { let replacements_tree = self.db.open_tree(REPLACEMENTS_TREE).expect("error opening tree"); @@ -201,7 +187,7 @@ impl PeerStore for SledPeerStore { replacements .read() .iter() - .for_each(|p| batch.insert(p.peer_id().clone(), p.clone())); + .for_each(|p| batch.insert(*p.peer_id(), p.clone())); replacements_tree.apply_batch(batch).expect("error applying batch"); } @@ -211,11 +197,7 @@ impl PeerStore for SledPeerStore { true } else { let tree = self.db.open_tree(REPLACEMENTS_TREE).expect("error opening tree"); - if tree.contains_key(peer_id).expect("db error") { - true - } else { - false - } + tree.contains_key(peer_id).expect("db error") } } fn fetch_active(&self, peer_id: &PeerId) -> Option { @@ -244,7 +226,7 @@ impl PeerStore for SledPeerStore { .map(|(_, ivec)| Peer::from(ivec)) .collect::>() } - fn delete(&self, peer_id: &PeerId) -> bool { + fn delete(&self, _: &PeerId) -> bool { unimplemented!("no need for single entry removal at the moment") } fn delete_all(&self) { diff --git a/bee-autopeering/src/peering/filter.rs b/bee-autopeering/src/peering/filter.rs index 47ba8b1b19..ec94af77bd 100644 --- a/bee-autopeering/src/peering/filter.rs +++ b/bee-autopeering/src/peering/filter.rs @@ -2,12 +2,12 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - peer::{peer_id::PeerId, peerlist::ActivePeer, Peer}, + peer::{peer_id::PeerId, Peer}, NeighborValidator, }; use std::{ - collections::{hash_set, HashSet}, + collections::HashSet, iter, sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}, }; @@ -64,7 +64,9 @@ impl NeighborFilterInner { self.rejected.extend(peers) } + // TODO: revisit dead code /// Removes a single currently rejected peer id. 
+ #[allow(dead_code)] pub(crate) fn remove(&mut self, peer_id: &PeerId) { let _ = self.rejected.remove(peer_id); } @@ -79,14 +81,10 @@ impl NeighborFilterInner { let peer = candidate.as_ref(); let peer_id = peer.peer_id(); - if peer_id == &self.local_id { - false - } else if self.rejected.contains(peer_id) { - false - } else if !self.validator.is_valid(peer) { + if peer_id == &self.local_id || self.rejected.contains(peer_id) { false } else { - true + self.validator.is_valid(peer) } } @@ -95,7 +93,9 @@ impl NeighborFilterInner { candidates.iter().filter(|c| self.ok(*c)).collect::>() } + // TODO: revisit dead code /// Returns an iterator over the rejected peer ids (including the local id). + #[allow(dead_code)] pub(crate) fn iter(&self) -> impl Iterator { iter::once(&self.local_id).chain(self.rejected.iter()) } @@ -103,8 +103,10 @@ impl NeighborFilterInner { #[cfg(test)] mod tests { - use crate::local::services::{ServiceProtocol, AUTOPEERING_SERVICE_NAME}; - use crate::peer::Peer; + use crate::{ + local::services::{ServiceProtocol, AUTOPEERING_SERVICE_NAME}, + peer::Peer, + }; use super::*; diff --git a/bee-autopeering/src/peering/manager.rs b/bee-autopeering/src/peering/manager.rs index 076b48e5cb..cdbbea7a09 100644 --- a/bee-autopeering/src/peering/manager.rs +++ b/bee-autopeering/src/peering/manager.rs @@ -10,29 +10,23 @@ use super::{ use crate::{ command::CommandTx, config::AutopeeringConfig, - delay::ManualDelayFactory, - discovery, event::{Event, EventTx}, - hash, local::{ salt::{self, Salt, SALT_LIFETIME_SECS}, services::AUTOPEERING_SERVICE_NAME, Local, }, packet::{msg_hash, IncomingPacket, MessageType, OutgoingPacket}, - peer::{self, peer_id::PeerId, peerlist::ActivePeersList, peerstore::PeerStore, Peer}, - peering::{ - neighbor::{salt_distance, Neighbor}, - update::OUTBOUND_NBH_UPDATE_INTERVAL, - }, + peer::{self, peer_id::PeerId, peerlist::ActivePeersList, Peer}, + peering::neighbor::{salt_distance, Neighbor}, request::{self, RequestManager, RequestValue, ResponseTx, RESPONSE_TIMEOUT}, server::{ServerSocket, ServerTx}, task::{Repeat, Runnable, ShutdownRx}, - time::{MINUTE, SECOND}, + time::SECOND, NeighborValidator, }; -use std::{fmt, iter, net::SocketAddr, time::Duration, vec}; +use std::{net::SocketAddr, time::Duration}; /// Salt update interval. pub(crate) const SALT_UPDATE_SECS: Duration = Duration::from_secs(SALT_LIFETIME_SECS.as_secs() - SECOND); @@ -45,6 +39,8 @@ pub(crate) type OutboundNeighborhood = Neighborhood; /// Represents the answer of a `PeeringRequest`. Can be either `true` (peering accepted), or `false` (peering denied). pub type Status = bool; +// TODO: revisit dead code +#[allow(dead_code)] #[derive(Debug, thiserror::Error)] pub(crate) enum Error { #[error("response timeout")] @@ -57,6 +53,8 @@ pub(crate) enum Error { InvalidMessage, } +// TODO: revisit dead code +#[allow(dead_code)] pub(crate) struct PeeringManagerConfig { pub(crate) version: u32, pub(crate) network_id: u32, @@ -75,6 +73,8 @@ impl PeeringManagerConfig { pub(crate) struct PeeringManager { // The peering config. + // TODO: revisit dead code + #[allow(dead_code)] config: PeeringManagerConfig, // The local peer. 
local: Local, @@ -95,6 +95,7 @@ pub(crate) struct PeeringManager { } impl PeeringManager { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( config: PeeringManagerConfig, local: Local, @@ -102,7 +103,8 @@ impl PeeringManager { request_mngr: RequestManager, active_peers: ActivePeersList, event_tx: EventTx, - command_tx: CommandTx, + // TODO: revisit dead code + _command_tx: CommandTx, inbound_nbh: InboundNeighborhood, outbound_nbh: OutboundNeighborhood, nb_filter: NeighborFilter, @@ -130,7 +132,8 @@ impl Runnable for PeeringManager { async fn run(self, mut shutdown_rx: Self::ShutdownSignal) { let PeeringManager { - config, + // TODO: revisit dead code + config: _, local, socket, request_mngr, @@ -141,12 +144,6 @@ impl Runnable for PeeringManager { nb_filter, } = self; - let PeeringManagerConfig { - version, - network_id, - source_addr, - } = config; - let ServerSocket { mut server_rx, server_tx, @@ -291,7 +288,7 @@ fn validate_peering_response(peer_res: &PeeringResponse, ctx: &RecvContext) -> R use ValidationError::*; if let Some(reqv) = ctx.request_mngr.write().pull::(ctx.peer_id) { - if peer_res.request_hash() != &reqv.request_hash { + if peer_res.request_hash() != reqv.request_hash { Err(IncorrectRequestHash) } else { Ok(reqv) @@ -322,6 +319,9 @@ fn handle_peering_request( ) { log::trace!("Handling peering request."); + // TODO: revisit peer salt + let _peer_salt = peer_req.salt(); + let mut status = false; if peer::is_verified(ctx.peer_id, ctx.active_peers) { @@ -358,7 +358,7 @@ fn handle_peering_request( status = true; // Update the neighbor filter. - nb_filter.write().add(peer.peer_id().clone()); + nb_filter.write().add(*peer.peer_id()); // Fire `IncomingPeering` event. publish_peering_event::( @@ -381,14 +381,7 @@ fn handle_peering_request( } // In any case send a response. - send_peering_response_to_addr( - ctx.peer_addr, - ctx.peer_id, - ctx.msg_bytes, - ctx.server_tx, - ctx.local, - status, - ); + send_peering_response_to_addr(ctx.peer_addr, ctx.peer_id, ctx.msg_bytes, ctx.server_tx, status); } fn handle_peering_response( @@ -402,7 +395,7 @@ fn handle_peering_response( let mut status = peer_res.status(); if status { - log::debug!("Peering by {} accepted.", ctx.peer_id); + log::debug!("Peering accepted by {}.", ctx.peer_id); let peer = ctx .active_peers @@ -421,7 +414,7 @@ fn handle_peering_response( // Fire `OutgoingPeering` event with status = `false`. publish_peering_event::( peer.clone(), - false, + status, ctx.local, ctx.event_tx, ctx.inbound_nbh, @@ -430,23 +423,14 @@ fn handle_peering_response( // Drop that peer. send_drop_peering_request_to_peer(peer, ctx.server_tx, ctx.event_tx, ctx.inbound_nbh, ctx.outbound_nbh); + } else if ctx.outbound_nbh.write().insert_neighbor(peer.clone(), ctx.local) { + // Update the neighbor filter. + nb_filter.write().add(*peer.peer_id()); + + // Fire `OutgoingPeering` event with status = `true`. + publish_peering_event::(peer, status, ctx.local, ctx.event_tx, ctx.inbound_nbh, ctx.outbound_nbh); } else { - if ctx.outbound_nbh.write().insert_neighbor(peer.clone(), ctx.local) { - // Update the neighbor filter. - nb_filter.write().add(peer.peer_id().clone()); - - // Fire `OutgoingPeering` event with status = `true`. 
- publish_peering_event::( - peer, - status, - ctx.local, - ctx.event_tx, - ctx.inbound_nbh, - ctx.outbound_nbh, - ); - } else { - log::debug!("Failed to add neighbor to outbound neighborhood after successful peering request"); - } + log::debug!("Failed to add neighbor to outbound neighborhood after successful peering request"); } } else { log::debug!("Peering by {} denied.", ctx.peer_id); @@ -470,7 +454,7 @@ fn handle_drop_request( if let Some(nb) = ctx.outbound_nbh.write().remove_neighbor(ctx.peer_id) { removed_nb.replace(nb); - nb_filter.write().add(ctx.peer_id.clone()); + nb_filter.write().add(*ctx.peer_id); // TODO: trigger immediate outbound neighborhood update; currently we wait for the next interval } @@ -478,7 +462,7 @@ fn handle_drop_request( if removed_nb.is_some() { send_drop_peering_request_to_addr( ctx.peer_addr, - ctx.peer_id.clone(), + *ctx.peer_id, ctx.server_tx, ctx.event_tx, ctx.inbound_nbh, @@ -560,9 +544,7 @@ pub(crate) fn send_peering_request_to_addr( ) { log::trace!("Sending peering request to: {}", peer_id); - let peer_req = request_mngr - .write() - .new_peering_request(peer_id.clone(), response_tx, local); + let peer_req = request_mngr.write().new_peering_request(*peer_id, response_tx, local); let msg_bytes = peer_req.to_protobuf().expect("error encoding peering request").to_vec(); @@ -581,7 +563,6 @@ pub(crate) fn send_peering_response_to_addr( peer_id: &PeerId, msg_bytes: &[u8], tx: &ServerTx, - local: &Local, status: bool, ) { log::trace!("Sending peering response to: {}", peer_id); @@ -636,11 +617,13 @@ pub(crate) fn send_drop_peering_request_to_addr( .expect("error encoding drop request") .to_vec(); - server_tx.send(OutgoingPacket { - msg_type: MessageType::DropRequest, - msg_bytes, - peer_addr, - }); + server_tx + .send(OutgoingPacket { + msg_type: MessageType::DropRequest, + msg_bytes, + peer_addr, + }) + .expect("error sending drop-peering request to server"); publish_drop_peering_event(peer_id, event_tx, inbound_nbh, outbound_nbh); } @@ -675,11 +658,13 @@ pub(crate) fn publish_peering_event( } }); - event_tx.send(if IS_INBOUND { - Event::IncomingPeering { peer, distance } - } else { - Event::OutgoingPeering { peer, distance } - }); + event_tx + .send(if IS_INBOUND { + Event::IncomingPeering { peer, distance } + } else { + Event::OutgoingPeering { peer, distance } + }) + .expect("error publishing incoming/outgoing peering event"); } fn publish_drop_peering_event( @@ -784,8 +769,8 @@ fn update_salts( nb_filter.write().reset(); } else { // Update the distances with the new salts. - inbound_nbh.write().update_distances(&local); - outbound_nbh.write().update_distances(&local); + inbound_nbh.write().update_distances(local); + outbound_nbh.write().update_distances(local); } log::debug!( @@ -795,10 +780,12 @@ fn update_salts( ); // Publish 'SaltUpdated' event. - event_tx.send(Event::SaltUpdated { - public_salt_lifetime, - private_salt_lifetime, - }); + event_tx + .send(Event::SaltUpdated { + public_salt_lifetime, + private_salt_lifetime, + }) + .expect("error publishing salt-updated event"); } /// Adds a neighbor to a neighborhood. Possibly even replaces the so far furthest neighbor. @@ -826,11 +813,12 @@ pub(crate) fn add_or_replace_neighbor( } } +// TODO: revisit dead code /// Reinitializes the neighbor filter with the current neighborhoods. /// /// Call this function whenever one of the neighborhoods changes. 
+#[allow(dead_code)]
 pub(crate) fn refresh_neighbor_filter(
-    local: &Local,
     neighbor_filter: &NeighborFilter,
     inbound_nbh: &InboundNeighborhood,
     outbound_nbh: &OutboundNeighborhood,
@@ -843,7 +831,7 @@ pub(crate) fn refresh_neighbor_filter(
             .read()
             .iter()
             .map(|p| p.peer_id())
-            .cloned()
-            .chain(outbound_nbh.read().iter().map(|p| p.peer_id()).cloned()),
+            .copied()
+            .chain(outbound_nbh.read().iter().map(|p| p.peer_id()).copied()),
     );
 }
diff --git a/bee-autopeering/src/peering/mod.rs b/bee-autopeering/src/peering/mod.rs
index b5f7a1a1ed..f5a5e94893 100644
--- a/bee-autopeering/src/peering/mod.rs
+++ b/bee-autopeering/src/peering/mod.rs
@@ -9,5 +9,3 @@ pub(crate) mod update;
 
 pub use manager::Status;
 pub use neighbor::{Distance, NeighborValidator};
-
-pub(crate) use manager::{PeeringManager, PeeringManagerConfig};
diff --git a/bee-autopeering/src/peering/neighbor.rs b/bee-autopeering/src/peering/neighbor.rs
index ef0093dc73..106aaef3e8 100644
--- a/bee-autopeering/src/peering/neighbor.rs
+++ b/bee-autopeering/src/peering/neighbor.rs
@@ -4,26 +4,22 @@
 use crate::{
     hash,
     local::{salt::Salt, Local},
-    peer::{peer_id::PeerId, peerlist::ActivePeer, Peer},
-    task::{ShutdownRx, MAX_SHUTDOWN_PRIORITY},
+    peer::{peer_id::PeerId, Peer},
 };
 
 use prost::bytes::{Buf, Bytes};
-use tokio::sync::mpsc::Receiver;
 
 use std::{
-    cmp,
-    collections::{BTreeMap, BTreeSet},
-    fmt,
-    ops::Deref,
+    cmp, fmt,
     sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
-    time::Duration,
     vec,
 };
 
 /// The distance between the local entity and a neighbor.
 pub type Distance = u32;
 
+// TODO: revisit dead code
+#[allow(dead_code)]
 pub(crate) const MAX_DISTANCE: Distance = 4294967295;
 pub(crate) const SIZE_INBOUND: usize = 4;
 pub(crate) const SIZE_OUTBOUND: usize = 4;
@@ -61,6 +57,8 @@ impl Neighbor {
         self.peer
     }
 
+    // TODO: revisit dead code
+    #[allow(dead_code)]
     pub(crate) fn into_distance(self) -> Distance {
         self.distance
     }
@@ -80,7 +78,7 @@ impl PartialEq for Neighbor {
 }
 impl PartialOrd for Neighbor {
     fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
-        Some(self.cmp(&other))
+        Some(self.cmp(other))
     }
 }
 impl Ord for Neighbor {
@@ -120,6 +118,8 @@ pub(crate) struct NeighborhoodInner<const N: usize, const INBOUND: bool> {
 }
 
 impl<const N: usize, const INBOUND: bool> NeighborhoodInner<N, INBOUND> {
+    // TODO: revisit dead code
+    #[allow(dead_code)]
     pub(crate) fn new() -> Self {
         Self::default()
     }
@@ -127,7 +127,7 @@ impl<const N: usize, const INBOUND: bool> NeighborhoodInner<N, INBOUND> {
     pub(crate) fn insert_neighbor(&mut self, peer: Peer, local: &Local) -> bool {
         // If the peer already exists remove it.
         // NOTE: It's a bit less efficient doing it like this, but the code requires less branching this way.
-        let _ = self.remove_neighbor(&peer.peer_id());
+        let _ = self.remove_neighbor(peer.peer_id());
         if self.neighbors.len() >= N {
             return false;
         }
@@ -199,7 +199,7 @@
     pub(crate) fn remove_furthest(&mut self) -> Option<Neighbor> {
         // Note: Both methods require unique access to `self`, so we need to clone the peer id.
-        if let Some(peer_id) = self.find_furthest().map(|d| d.peer().peer_id().clone()) {
+        if let Some(peer_id) = self.find_furthest().map(|d| *d.peer().peer_id()) {
             self.remove_neighbor(&peer_id)
         } else {
             None
         }
@@ -207,16 +207,7 @@
     }
 
     pub(crate) fn update_distances(&mut self, local: &Local) {
-        let (local_id, private_salt, public_salt) = {
-            let guard = local.read();
-
-            let peer_id_ = guard.peer_id().clone();
-            let private_salt_ = guard.private_salt().expect("missing private salt").clone();
-            let public_salt_ = guard.public_salt().expect("missing public salt").clone();
-
-            (peer_id_, private_salt_, public_salt_)
-        };
-
+        let local_id = *local.read().peer_id();
         let salt = if INBOUND {
             local.read().private_salt().expect("missing private salt").clone()
         } else {
@@ -224,7 +215,7 @@
         };
 
         self.neighbors.iter_mut().for_each(|pd| {
-            pd.distance = salt_distance(&local_id, &pd.peer().peer_id(), &salt);
+            pd.distance = salt_distance(&local_id, pd.peer().peer_id(), &salt);
         });
     }
 
@@ -232,6 +223,8 @@
         self.neighbors.len() == N
     }
 
+    // TODO: revisit dead code
+    #[allow(dead_code)]
     pub(crate) fn is_empty(&self) -> bool {
         self.neighbors.is_empty()
     }
@@ -296,8 +289,6 @@ fn xor<const N: usize>(a: [u8; N], b: [u8; N]) -> [u8; N] {
 
 #[cfg(test)]
 mod tests {
-    use crypto::signatures::ed25519::SecretKey as PrivateKey;
-
     use super::*;
 
     fn distance(peer1: &PeerId, peer2: &PeerId) -> Distance {
@@ -310,7 +301,7 @@
     #[test]
     fn neighborhood_size_limit() {
         let local = Local::new();
-        let mut outbound_nh = Neighborhood::<2, false>::new();
+        let outbound_nh = Neighborhood::<2, false>::new();
         for i in 0u8..5 {
             outbound_nh.write().insert_neighbor(Peer::new_test_peer(i), &local);
         }
diff --git a/bee-autopeering/src/peering/update.rs b/bee-autopeering/src/peering/update.rs
index abab64778f..203a89ce42 100644
--- a/bee-autopeering/src/peering/update.rs
+++ b/bee-autopeering/src/peering/update.rs
@@ -11,9 +11,8 @@ use crate::{
     delay::ManualDelayFactory,
     discovery::manager::get_verified_peers,
     event::EventTx,
-    local::{salt, Local},
-    peer::{peerlist::ActivePeersList, PeerStore},
-    peering::manager::{publish_peering_event, send_drop_peering_request_to_peer},
+    local::Local,
+    peer::peerlist::ActivePeersList,
     request::RequestManager,
     server::ServerTx,
     task::Repeat,
@@ -24,8 +23,10 @@
 use std::time::Duration;
 
 /// Outbound neighborhood update interval if there are remaining slots.
+#[allow(clippy::identity_op)]
 pub(crate) const OPEN_OUTBOUND_NBH_UPDATE_SECS: Duration = Duration::from_secs(1 * SECOND);
 
 /// Outbound neighborhood update interval if there are no remaining slots.
+#[allow(clippy::identity_op)]
 const FULL_OUTBOUND_NBH_UPDATE_SECS: Duration = Duration::from_secs(1 * MINUTE);
 
 pub(crate) static OUTBOUND_NBH_UPDATE_INTERVAL: ManualDelayFactory =
@@ -37,9 +38,13 @@ pub(crate) struct UpdateContext {
     pub(crate) request_mngr: RequestManager,
     pub(crate) active_peers: ActivePeersList,
     pub(crate) nb_filter: NeighborFilter,
+    // TODO: revisit dead code
+    #[allow(dead_code)]
     pub(crate) inbound_nbh: InboundNeighborhood,
     pub(crate) outbound_nbh: OutboundNeighborhood,
     pub(crate) server_tx: ServerTx,
+    // TODO: revisit dead code
+    #[allow(dead_code)]
     pub(crate) event_tx: EventTx,
 }
@@ -49,7 +54,7 @@ pub(crate) fn do_update() -> Repeat<UpdateContext> {
 }
 
 fn update_outbound(ctx: &UpdateContext) {
-    let local_id = ctx.local.read().peer_id().clone();
+    let local_id = *ctx.local.read().peer_id();
     let local_salt = ctx.local.read().public_salt().expect("missing public salt").clone();
 
     // TODO: write `get_verified_peers_sorted` which collects verified peers into a BTreeSet
@@ -57,7 +62,7 @@
         .into_iter()
         .map(|p| {
             let peer = p.into_peer();
-            let peer_id = peer.peer_id().clone();
+            let peer_id = *peer.peer_id();
             Neighbor::new(peer, salt_distance(&local_id, &peer_id, &local_salt))
         })
         .collect::<Vec<_>>();
@@ -100,10 +105,10 @@
             if status {
                 set_outbound_update_interval(&ctx_.outbound_nbh, &ctx_.local);
             } else {
-                ctx_.nb_filter.write().add(candidate.peer_id().clone());
+                ctx_.nb_filter.write().add(*candidate.peer_id());
             }
         } else {
-            ctx_.nb_filter.write().add(candidate.peer_id().clone());
+            ctx_.nb_filter.write().add(*candidate.peer_id());
         }
     });
 }
diff --git a/bee-autopeering/src/request.rs b/bee-autopeering/src/request.rs
index e9f8658ae2..19d31bc8ce 100644
--- a/bee-autopeering/src/request.rs
+++ b/bee-autopeering/src/request.rs
@@ -2,30 +2,25 @@
 // SPDX-License-Identifier: Apache-2.0
 
 use crate::{
-    delay::DelayFactory,
-    discovery::messages::{DiscoveryRequest, DiscoveryResponse, VerificationRequest, VerificationResponse},
+    discovery::messages::{DiscoveryRequest, VerificationRequest},
     hash,
-    local::{salt::Salt, Local},
+    local::Local,
     packet::{msg_hash, MessageType},
-    peer::{peer_id::PeerId, peerstore::PeerStore},
+    peer::peer_id::PeerId,
     peering::messages::PeeringRequest,
-    server::ServerTx,
-    task::{Repeat, ShutdownRx},
+    task::Repeat,
     time::{self, Timestamp},
 };
 
-use num::CheckedAdd;
 use tokio::sync::oneshot;
 
 pub(crate) use oneshot::channel as response_chan;
 
 use std::{
-    any::{Any, TypeId},
+    any::TypeId,
     collections::HashMap,
     fmt::Debug,
-    iter,
     net::{IpAddr, SocketAddr},
-    ops::DerefMut,
     sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
     time::Duration,
 };
@@ -49,7 +44,7 @@ pub(crate) struct RequestKey {
 }
 
 pub(crate) struct RequestValue {
-    pub(crate) request_hash: [u8; hash::SHA256_LEN],
+    pub(crate) request_hash: RequestHash,
     pub(crate) expiration_time: u64,
     pub(crate) response_tx: Option<ResponseTx>,
 }
@@ -185,7 +180,7 @@ impl RequestManagerInner {
     pub(crate) fn pull<R: 'static>(&mut self, peer_id: &PeerId) -> Option<RequestValue> {
         // TODO: prevent the clone?
         let key = RequestKey {
-            peer_id: peer_id.clone(),
+            peer_id: *peer_id,
             request_id: TypeId::of::<R>(),
         };
 
diff --git a/bee-autopeering/src/server.rs b/bee-autopeering/src/server.rs
index 36a4212297..b247b21dfe 100644
--- a/bee-autopeering/src/server.rs
+++ b/bee-autopeering/src/server.rs
@@ -4,7 +4,6 @@
 use crate::{
     config::AutopeeringConfig,
     local::Local,
-    multiaddr,
     packet::{
         IncomingPacket, MessageType, OutgoingPacket, Packet, DISCOVERY_MSG_TYPE_RANGE, MAX_PACKET_SIZE,
         PEERING_MSG_TYPE_RANGE,
@@ -13,10 +12,7 @@ use crate::{
     task::{Runnable, ShutdownRx, TaskManager},
 };
 
-use tokio::{
-    net::UdpSocket,
-    sync::mpsc::{self, error::SendError},
-};
+use tokio::{net::UdpSocket, sync::mpsc};
 
 use std::{net::SocketAddr, sync::Arc};
 
@@ -243,7 +239,7 @@ impl Runnable for OutgoingPacketHandler {
                         let marshalled_bytes = marshal(msg_type, &msg_bytes);
                         let signature = local.read().sign(&marshalled_bytes);
 
-                        let packet = Packet::new(msg_type, &marshalled_bytes, &local.read().public_key(), signature);
+                        let packet = Packet::new(msg_type, &marshalled_bytes, local.read().public_key(), signature);
 
                         let bytes = packet.to_protobuf().expect("error encoding outgoing packet");
 
@@ -268,7 +264,7 @@ impl Runnable for OutgoingPacketHandler {
 pub(crate) fn marshal(msg_type: MessageType, msg_bytes: &[u8]) -> Vec<u8> {
     let mut marshalled_bytes = vec![0u8; msg_bytes.len() + 1];
     marshalled_bytes[0] = msg_type as u8;
-    marshalled_bytes[1..].copy_from_slice(&msg_bytes);
+    marshalled_bytes[1..].copy_from_slice(msg_bytes);
     marshalled_bytes
 }
diff --git a/bee-autopeering/src/task.rs b/bee-autopeering/src/task.rs
index f86fca2353..f191e959aa 100644
--- a/bee-autopeering/src/task.rs
+++ b/bee-autopeering/src/task.rs
@@ -12,23 +12,18 @@ use crate::{
 use priority_queue::PriorityQueue;
 use tokio::{sync::oneshot, task::JoinHandle, time};
 
-use std::{
-    any::Any,
-    collections::{HashMap, HashSet},
-    future::Future,
-    hash::Hasher,
-    sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
-    time::Duration,
-};
+use std::{collections::HashMap, future::Future, time::Duration};
 
 pub(crate) const MAX_SHUTDOWN_PRIORITY: u8 = 255;
+// TODO: revisit dead code
+#[allow(dead_code)]
 pub(crate) const MIN_SHUTDOWN_PRIORITY: u8 = 0;
 const SHUTDOWN_TIMEOUT_SECS: Duration = Duration::from_secs(5 * SECOND);
 
 pub(crate) type ShutdownRx = oneshot::Receiver<()>;
 type ShutdownTx = oneshot::Sender<()>;
-pub(crate) type Repeat<T> = Box<for<'a> Fn(&'a T) + Send>;
+pub(crate) type Repeat<T> = Box<dyn for<'a> Fn(&'a T) + Send>;
 
 /// Represents types driving an event loop.
 #[async_trait::async_trait]
@@ -79,7 +74,9 @@ impl TaskManager {
         self.shutdown_order.push(R::NAME.into(), R::SHUTDOWN_PRIORITY);
     }
 
+    // TODO: revisit dead code
     /// Returns a shutdown signal handle registrees can use to stop their event loops.
+    #[allow(dead_code)]
     pub(crate) fn shutdown_rx(&mut self, name: &str, priority: u8) -> ShutdownRx {
         let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
         self.shutdown_senders.insert(name.into(), shutdown_tx);
@@ -100,7 +97,7 @@ impl TaskManager {
         self.shutdown_senders.insert(name.into(), shutdown_tx);
 
         let handle = tokio::spawn(async move {
-            while let Some(duration) = delay.next() {
+            for duration in &mut delay {
                 tokio::select! {
                     _ = &mut shutdown_rx => break,
                     _ = time::sleep(duration) => cmd(&ctx),
@@ -129,7 +126,7 @@ impl TaskManager {
 
         // Send the shutdown signal to all receivers.
        // TODO: clone necessary?
         let mut shutdown_order_clone = shutdown_order.clone();
-        while let Some((task_name, prio)) = shutdown_order_clone.pop() {
+        while let Some((task_name, _)) = shutdown_order_clone.pop() {
             let shutdown_tx = shutdown_senders.remove(&task_name).unwrap();
 
             log::trace!("Shutting down: {}", task_name);
@@ -138,7 +135,7 @@
 
         // Wait for all tasks to shut down in a certain order and within a maximum amount of time.
         time::timeout(SHUTDOWN_TIMEOUT_SECS, async {
-            while let Some((task_name, prio)) = shutdown_order.pop() {
+            while let Some((task_name, _)) = shutdown_order.pop() {
                 // Panic: unwrapping is fine, because we are in control of the data.
                 let task_handle = runnable_handles.remove(&task_name).unwrap();
@@ -152,7 +149,8 @@ impl TaskManager {
                 }
             }
         })
-        .await;
+        .await
+        .expect("error awaiting shutdown");
 
         log::debug!("Flushing data to peerstore...");
 
diff --git a/bee-autopeering/src/time.rs b/bee-autopeering/src/time.rs
index bc5c599770..0f4f41cbd2 100644
--- a/bee-autopeering/src/time.rs
+++ b/bee-autopeering/src/time.rs
@@ -3,8 +3,6 @@
 
 use std::time::{SystemTime, UNIX_EPOCH};
 
-pub(crate) use tokio::time::sleep;
-
 pub(crate) type Timestamp = u64;
 pub(crate) type Timespan = u64;
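
A note on the pattern exercised by the `neighborhood_size_limit` test above: the neighborhood capacity is a const generic parameter (`Neighborhood::<2, false>` pins size and direction at compile time), and `insert_neighbor` reports whether a free slot was available. Below is a minimal, self-contained sketch of that behavior, assuming simplified stand-in types (plain `u8` peer ids, no `INBOUND` parameter, and no interior locking; these are illustration-only assumptions, not the crate's actual definitions):

struct Neighborhood<const N: usize> {
    // Stand-in for the crate's list of `Neighbor` entries.
    neighbors: Vec<u8>,
}

impl<const N: usize> Neighborhood<N> {
    fn new() -> Self {
        Self { neighbors: Vec::new() }
    }

    // Mirrors the patched `insert_neighbor` above: re-inserting an existing
    // peer first removes it, and insertion fails once all `N` slots are taken.
    fn insert_neighbor(&mut self, peer: u8) -> bool {
        self.neighbors.retain(|&p| p != peer);
        if self.neighbors.len() >= N {
            return false;
        }
        self.neighbors.push(peer);
        true
    }
}

fn main() {
    // As in the test: with N = 2, only the first two of five inserts succeed.
    let mut outbound_nh = Neighborhood::<2>::new();
    let accepted = (0u8..5).filter(|&i| outbound_nh.insert_neighbor(i)).count();
    assert_eq!(accepted, 2);
}

Making the capacity (and, in the crate, the inbound/outbound flag) part of the type means a mis-sized or mis-directed neighborhood is a compile-time type error rather than a runtime configuration bug.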