feat(bitswap): add validate_peer callback to bitswap request manager (#3716)

Co-authored-by: Hubert <[email protected]>
hanabi1224 and LesnyRumcajs authored Nov 28, 2023
1 parent 9ebf9f8 commit 19ffaa8
Showing 7 changed files with 99 additions and 45 deletions.
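Before the per-file diff, a minimal self-contained sketch of the new callback as it is wired up in service.rs further down. The ValidatePeerCallback alias itself lives in src/libp2p_bitswap/request_manager.rs, which is not expanded on this page, so its definition is assumed here to be a Send + Sync predicate over a PeerId; the PeerId and PeerManager stand-ins below are illustrative only, with get_peer_head_epoch mirroring the accessor this commit adds.

use std::sync::Arc;

// Illustrative stand-in for libp2p::PeerId.
#[derive(Clone, Copy, Debug)]
struct PeerId(u64);

// Assumed shape of the alias exported by libp2p_bitswap::request_manager:
// a thread-safe predicate deciding whether a peer is worth querying.
type ValidatePeerCallback = dyn Fn(PeerId) -> bool + Send + Sync;

// Stand-in for the real PeerManager, reduced to the accessor added below.
struct PeerManager;

impl PeerManager {
    fn get_peer_head_epoch(&self, _peer: &PeerId) -> Option<i64> {
        Some(100) // pretend the peer advertised a head at epoch 100
    }
}

fn main() {
    let peer_manager = Arc::new(PeerManager);
    let epoch: Option<i64> = Some(42); // epoch of the block being fetched, if known

    // Same construction as in service.rs: with an epoch, only peers whose
    // advertised head epoch is at least that epoch pass the check; without
    // one, no validator is attached and every peer stays a candidate.
    let peer_validator: Option<Arc<ValidatePeerCallback>> = if let Some(epoch) = epoch {
        let peer_manager = Arc::clone(&peer_manager);
        Some(Arc::new(move |peer| {
            peer_manager
                .get_peer_head_epoch(&peer)
                .map(|head| head >= epoch)
                .unwrap_or_default()
        }))
    } else {
        None
    };

    let accepted = peer_validator.as_deref().map(|v| v(PeerId(1))).unwrap_or(true);
    println!("peer accepted: {accepted}");
}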
15 changes: 7 additions & 8 deletions src/chain_sync/chain_muxer.rs
@@ -255,7 +255,7 @@ where
) {
// Query the heaviest TipSet from the store
let heaviest = chain_store.heaviest_tipset();
- if network.peer_manager().is_peer_new(&peer_id).await {
+ if network.peer_manager().is_peer_new(&peer_id) {
// Since the peer is new, send them a hello request
let request = HelloRequest {
heaviest_tip_set: heaviest.cids(),
@@ -278,17 +278,17 @@ where
// Update the peer metadata based on the response
match response {
Some(_) => {
- network.peer_manager().log_success(peer_id, dur).await;
+ network.peer_manager().log_success(peer_id, dur);
}
None => {
- network.peer_manager().log_failure(peer_id, dur).await;
+ network.peer_manager().log_failure(peer_id, dur);
}
}
}
}

async fn handle_peer_disconnected_event(network: SyncNetworkContext<DB>, peer_id: PeerId) {
- network.peer_manager().remove_peer(&peer_id).await;
+ network.peer_manager().remove_peer(&peer_id);
}

async fn gossipsub_block_to_full_tipset(
@@ -313,14 +313,14 @@ where
let bls_messages: Vec<_> = block
.bls_messages
.into_iter()
- .map(|m| network.bitswap_get::<Message>(m))
+ .map(|m| network.bitswap_get::<Message>(m, Some(epoch)))
.collect();

// Get secp_messages in the store or over Bitswap
let secp_messages: Vec<_> = block
.secpk_messages
.into_iter()
- .map(|m| network.bitswap_get::<SignedMessage>(m))
+ .map(|m| network.bitswap_get::<SignedMessage>(m, Some(epoch)))
.collect();

let (bls_messages, secp_messages) =
@@ -511,8 +511,7 @@ where
// Update the peer head
network
.peer_manager()
- .update_peer_head(source, Arc::new(tipset.clone().into_tipset()))
- .await;
+ .update_peer_head(source, Arc::new(tipset.clone().into_tipset()));
metrics::PEER_TIPSET_EPOCH
.with_label_values(&[source.to_string().as_str()])
.set(tipset.epoch());
14 changes: 8 additions & 6 deletions src/chain_sync/network_context.rs
@@ -178,6 +178,7 @@ where
pub async fn bitswap_get<TMessage: DeserializeOwned>(
&self,
content: Cid,
+ epoch: Option<i64>,
) -> Result<TMessage, String> {
// Check if what we are fetching over Bitswap already exists in the
// database. If it does, return it, else fetch over the network.
@@ -191,6 +192,7 @@ where
.send_async(NetworkMessage::BitswapRequest {
cid: content,
response_channel: tx,
+ epoch,
})
.await
.map_err(|_| "failed to send bitswap request, network receiver dropped")?;
@@ -246,7 +248,7 @@ where
None => {
// No specific peer set, send requests to a shuffled set of top peers until
// a request succeeds.
- let peers = self.peer_manager.top_peers_shuffled().await;
+ let peers = self.peer_manager.top_peers_shuffled();

let mut batch = RaceBatch::new(MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS);
for peer_id in peers.into_iter() {
@@ -306,7 +308,7 @@ where

// Log success for the global request with the latency from before sending.
match SystemTime::now().duration_since(global_pre_time) {
- Ok(t) => self.peer_manager.log_global_success(t).await,
+ Ok(t) => self.peer_manager.log_global_success(t),
Err(e) => {
warn!("logged time less than before request: {}", e);
}
@@ -350,7 +352,7 @@ where
match res {
Ok(Ok(Ok(bs_res))) => {
// Successful response
- peer_manager.log_success(peer_id, res_duration).await;
+ peer_manager.log_success(peer_id, res_duration);
debug!("Succeeded: ChainExchange Request to {peer_id}");
Ok(bs_res)
}
@@ -360,12 +362,12 @@ where
RequestResponseError::ConnectionClosed
| RequestResponseError::DialFailure
| RequestResponseError::UnsupportedProtocols => {
- peer_manager.mark_peer_bad(peer_id).await;
+ peer_manager.mark_peer_bad(peer_id);
}
// Ignore dropping peer on timeout for now. Can't be confident yet that the
// specified timeout is adequate time.
RequestResponseError::Timeout | RequestResponseError::Io(_) => {
- peer_manager.log_failure(peer_id, res_duration).await;
+ peer_manager.log_failure(peer_id, res_duration);
}
}
debug!("Failed: ChainExchange Request to {peer_id}");
@@ -374,7 +376,7 @@ where
Ok(Err(_)) | Err(_) => {
// Sender channel internally dropped or timeout, both should log failure which
// will negatively score the peer, but not drop yet.
- peer_manager.log_failure(peer_id, res_duration).await;
+ peer_manager.log_failure(peer_id, res_duration);
debug!("Timeout: ChainExchange Request to {peer_id}");
Err(format!("Chain exchange request to {peer_id} timed out"))
}
50 changes: 29 additions & 21 deletions src/libp2p/peer_manager.rs
@@ -10,8 +10,8 @@ use std::{
use crate::blocks::Tipset;
use ahash::{HashMap, HashSet};
use flume::{Receiver, Sender};
+ use parking_lot::RwLock;
use rand::seq::SliceRandom;
- use tokio::sync::RwLock;
use tracing::{debug, trace, warn};

use crate::libp2p::*;
@@ -74,7 +74,7 @@ pub struct PeerManager {
/// Peer operation receiver
peer_ops_rx: Receiver<PeerOperation>,
/// Peer ban list, key is peer id, value is expiration time
- peer_ban_list: RwLock<HashMap<PeerId, Option<Instant>>>,
+ peer_ban_list: tokio::sync::RwLock<HashMap<PeerId, Option<Instant>>>,
}

impl Default for PeerManager {
@@ -93,8 +93,8 @@ impl Default for PeerManager {
impl PeerManager {
/// Updates peer's heaviest tipset. If the peer does not exist in the set, a
/// new `PeerInfo` will be generated.
- pub async fn update_peer_head(&self, peer_id: PeerId, ts: Arc<Tipset>) {
- let mut peers = self.peers.write().await;
+ pub fn update_peer_head(&self, peer_id: PeerId, ts: Arc<Tipset>) {
+ let mut peers = self.peers.write();
trace!("Updating head for PeerId {}", &peer_id);
if let Some(pi) = peers.full_peers.get_mut(&peer_id) {
pi.head = Some(ts);
@@ -104,17 +104,26 @@ impl PeerManager {
}
}

+ /// Gets the head epoch of a peer
+ pub fn get_peer_head_epoch(&self, peer_id: &PeerId) -> Option<i64> {
+ let peers = self.peers.read();
+ peers
+ .full_peers
+ .get(peer_id)
+ .and_then(|pi| pi.head.as_ref().map(|ts| ts.epoch()))
+ }

/// Returns true if peer is not marked as bad or not already in set.
- pub async fn is_peer_new(&self, peer_id: &PeerId) -> bool {
- let peers = self.peers.read().await;
+ pub fn is_peer_new(&self, peer_id: &PeerId) -> bool {
+ let peers = self.peers.read();
!peers.bad_peers.contains(peer_id) && !peers.full_peers.contains_key(peer_id)
}

/// Sort peers based on a score function with the success rate and latency
/// of requests.
- pub(in crate::libp2p) async fn sorted_peers(&self) -> Vec<PeerId> {
- let peer_lk = self.peers.read().await;
- let average_time = self.avg_global_time.read().await;
+ pub(in crate::libp2p) fn sorted_peers(&self) -> Vec<PeerId> {
+ let peer_lk = self.peers.read();
+ let average_time = self.avg_global_time.read();
let mut peers: Vec<_> = peer_lk
.full_peers
.iter()
@@ -138,10 +147,9 @@

/// Return shuffled slice of ordered peers from the peer manager. Ordering
/// is based on failure rate and latency of the peer.
- pub async fn top_peers_shuffled(&self) -> Vec<PeerId> {
+ pub fn top_peers_shuffled(&self) -> Vec<PeerId> {
let mut peers: Vec<_> = self
.sorted_peers()
- .await
.into_iter()
.take(SHUFFLE_PEERS_PREFIX)
.collect();
@@ -154,9 +162,9 @@

/// Logs a global request success. This just updates the average for the
/// peer manager.
- pub async fn log_global_success(&self, dur: Duration) {
+ pub fn log_global_success(&self, dur: Duration) {
debug!("logging global success");
- let mut avg_global = self.avg_global_time.write().await;
+ let mut avg_global = self.avg_global_time.write();
if *avg_global == Duration::default() {
*avg_global = dur;
} else if dur < *avg_global {
@@ -170,9 +178,9 @@

/// Logs a success for the given peer, and updates the average request
/// duration.
- pub async fn log_success(&self, peer: PeerId, dur: Duration) {
+ pub fn log_success(&self, peer: PeerId, dur: Duration) {
debug!("logging success for {:?}", peer);
- let mut peers = self.peers.write().await;
+ let mut peers = self.peers.write();
// Attempt to remove the peer and decrement bad peer count
if peers.bad_peers.remove(&peer) {
metrics::BAD_PEERS.dec();
@@ -188,9 +196,9 @@

/// Logs a failure for the given peer, and updates the average request
/// duration.
- pub async fn log_failure(&self, peer: PeerId, dur: Duration) {
+ pub fn log_failure(&self, peer: PeerId, dur: Duration) {
debug!("logging failure for {:?}", peer);
- let mut peers = self.peers.write().await;
+ let mut peers = self.peers.write();
if !peers.bad_peers.contains(&peer) {
metrics::PEER_FAILURE_TOTAL.inc();
if !peers.full_peers.contains_key(&peer) {
@@ -204,8 +212,8 @@

/// Removes a peer from the set and returns true if the value was present
/// previously
- pub async fn mark_peer_bad(&self, peer_id: PeerId) -> bool {
- let mut peers = self.peers.write().await;
+ pub fn mark_peer_bad(&self, peer_id: PeerId) -> bool {
+ let mut peers = self.peers.write();
let removed = remove_peer(&mut peers, &peer_id);
if removed {
metrics::FULL_PEERS.dec();
@@ -221,8 +229,8 @@
}

/// Remove peer from managed set, does not mark as bad
- pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
- let mut peers = self.peers.write().await;
+ pub fn remove_peer(&self, peer_id: &PeerId) -> bool {
+ let mut peers = self.peers.write();
debug!("removed peer {}", peer_id);
let removed = remove_peer(&mut peers, peer_id);
if removed {
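A minimal sketch (assuming the parking_lot crate, as now imported above) of the locking change behind the dropped .awaits in this file: parking_lot::RwLock is acquired synchronously, so the peer-manager accessors can become plain fns, while peer_ban_list switches to an explicit tokio::sync::RwLock, presumably because it still has to be held across await points.

use std::sync::Arc;

use parking_lot::RwLock;

fn main() {
    // Synchronous lock: no .await, so guards should be held only briefly to
    // avoid blocking the async tasks that call into the peer manager.
    let peers = Arc::new(RwLock::new(vec!["peer-a".to_string()]));

    {
        let mut guard = peers.write(); // tokio's RwLock would need .write().await here
        guard.push("peer-b".to_string());
    } // write guard dropped at the end of the scope

    println!("known peers: {}", peers.read().len());
}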
31 changes: 27 additions & 4 deletions src/libp2p/service.rs
@@ -7,7 +7,8 @@ use std::{
};

use crate::libp2p_bitswap::{
- request_manager::BitswapRequestManager, BitswapStoreRead, BitswapStoreReadWrite,
+ request_manager::{BitswapRequestManager, ValidatePeerCallback},
+ BitswapStoreRead, BitswapStoreReadWrite,
};
use crate::message::SignedMessage;
use crate::{blocks::GossipBlock, rpc_api::net_api::NetInfoResult};
@@ -158,6 +159,7 @@ pub enum NetworkMessage {
BitswapRequest {
cid: Cid,
response_channel: flume::Sender<bool>,
+ epoch: Option<i64>,
},
JSONRPCRequest {
method: NetRPCMethods,
@@ -320,7 +322,8 @@
self.cs.clone(),
bitswap_request_manager.clone(),
message,
- &self.network_sender_out).await;
+ &self.network_sender_out,
+ &self.peer_manager).await;
}
None => { break; }
},
@@ -383,6 +386,7 @@ async fn handle_network_message(
bitswap_request_manager: Arc<BitswapRequestManager>,
message: NetworkMessage,
network_sender_out: &Sender<NetworkEvent>,
+ peer_manager: &Arc<PeerManager>,
) {
match message {
NetworkMessage::PubsubMessage { topic, message } => {
@@ -425,8 +429,27 @@
NetworkMessage::BitswapRequest {
cid,
response_channel,
+ epoch,
} => {
- bitswap_request_manager.get_block(store, cid, BITSWAP_TIMEOUT, Some(response_channel));
+ let peer_validator: Option<Arc<ValidatePeerCallback>> = if let Some(epoch) = epoch {
+ let peer_manager = Arc::clone(peer_manager);
+ Some(Arc::new(move |peer| {
+ peer_manager
+ .get_peer_head_epoch(&peer)
+ .map(|peer_head_epoch| peer_head_epoch >= epoch)
+ .unwrap_or_default()
+ }))
+ } else {
+ None
+ };
+
+ bitswap_request_manager.get_block(
+ store,
+ cid,
+ BITSWAP_TIMEOUT,
+ Some(response_channel),
+ peer_validator,
+ );
}
NetworkMessage::JSONRPCRequest { method } => {
match method {
@@ -648,7 +671,7 @@ async fn handle_hello_event(
error: _,
} => {
hello.on_outbound_failure(&request_id);
- peer_manager.mark_peer_bad(peer).await;
+ peer_manager.mark_peer_bad(peer);
}
request_response::Event::InboundFailure { .. } => {}
request_response::Event::ResponseSent { .. } => (),
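The consuming side of the callback sits in src/libp2p_bitswap/request_manager.rs, one of the seven changed files but not shown on this page. Below is a sketch of the presumed peer-selection step, under the assumption that get_block filters its candidate peers with the callback before sending bitswap requests; filter_candidate_peers and the stand-in types are hypothetical.

use std::sync::Arc;

// Illustrative stand-ins; the real types live in Forest's libp2p_bitswap module.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct PeerId(u64);

type ValidatePeerCallback = dyn Fn(PeerId) -> bool + Send + Sync;

// Hypothetical selection step: keep only the peers the callback accepts,
// falling back to every connected peer when no validator is supplied.
fn filter_candidate_peers(
    connected: &[PeerId],
    validator: Option<&ValidatePeerCallback>,
) -> Vec<PeerId> {
    connected
        .iter()
        .copied()
        .filter(|peer| validator.map(|accept| accept(*peer)).unwrap_or(true))
        .collect()
}

fn main() {
    let connected = [PeerId(1), PeerId(2), PeerId(3)];
    // Accept only even ids, standing in for the epoch check built in service.rs.
    let validator: Arc<ValidatePeerCallback> = Arc::new(|peer| peer.0 % 2 == 0);
    let candidates = filter_candidate_peers(&connected, Some(&*validator));
    assert_eq!(candidates, vec![PeerId(2)]);
}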
