Skip to content

Commit

Permalink
Merge pull request #570 from kinode-dao/release-candidate
Browse files Browse the repository at this point in the history
hotfix: in net, use AtomicU64 in Peers to respect fd_manager limits
  • Loading branch information
dr-frmr authored Oct 11, 2024
2 parents 76c312a + 2c6c3e2 commit ed41dd4
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 36 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "kinode_lib"
authors = ["KinodeDAO"]
version = "0.9.5"
version = "0.9.6"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org"
Expand Down
2 changes: 1 addition & 1 deletion kinode/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "kinode"
authors = ["KinodeDAO"]
version = "0.9.5"
version = "0.9.6"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org"
Expand Down
49 changes: 32 additions & 17 deletions kinode/src/net/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,40 @@ use tokio::sync::mpsc;

/// if target is a peer, queue to be routed
/// otherwise, create peer and initiate routing
pub async fn send_to_peer(ext: &IdentityExt, data: &NetData, km: KernelMessage) {
pub async fn send_to_peer(ext: &IdentityExt, data: &NetData, mut km: KernelMessage) {
if let Some(mut peer) = data.peers.get_mut(&km.target.node) {
peer.sender.send(km).expect("net: peer sender was dropped");
peer.set_last_message();
} else {
let Some(peer_id) = data.pki.get(&km.target.node) else {
return utils::error_offline(km, &ext.network_error_tx).await;
};
let (mut peer, peer_rx) = Peer::new(peer_id.clone(), false);
// send message to be routed
peer.send(km);
data.peers.insert(peer_id.name.clone(), peer).await;
tokio::spawn(connect_to_peer(
ext.clone(),
data.clone(),
peer_id.clone(),
peer_rx,
));
match peer.send(km) {
Ok(()) => {
peer.set_last_message();
return;
}
Err(e_km) => {
// peer connection was closed, remove it and try to reconnect
data.peers.remove(&peer.identity.name).await;
km = e_km.0;
}
}
}
let Some(peer_id) = data.pki.get(&km.target.node) else {
return utils::error_offline(km, &ext.network_error_tx).await;
};
let (mut peer, peer_rx) = Peer::new(peer_id.clone(), false);
// send message to be routed
match peer.send(km) {
Ok(()) => {
peer.set_last_message();
}
Err(e_km) => {
return utils::error_offline(e_km.0, &ext.network_error_tx).await;
}
};
data.peers.insert(peer_id.name.clone(), peer).await;
tokio::spawn(connect_to_peer(
ext.clone(),
data.clone(),
peer_id.clone(),
peer_rx,
));
}

/// based on peer's identity, either use one of their
Expand Down
10 changes: 5 additions & 5 deletions kinode/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ pub async fn networking(
peers,
pending_passthroughs,
active_passthroughs,
max_peers,
max_passthroughs,
fds_limit: 10, // small hardcoded limit that gets replaced by fd_manager soon after boot
};
Expand Down Expand Up @@ -212,7 +211,7 @@ async fn handle_local_request(
printout.push_str(&format!(
"we have connections with {} peers ({} max):\r\n",
data.peers.peers().len(),
data.max_peers,
data.peers.max_peers(),
));

let now = std::time::SystemTime::now()
Expand Down Expand Up @@ -342,16 +341,17 @@ async fn handle_fdman(km: &KernelMessage, request_body: &[u8], data: &mut NetDat
match req {
lib::core::FdManagerRequest::FdsLimit(fds_limit) => {
data.fds_limit = fds_limit;
if data.max_peers > fds_limit {
data.max_peers = fds_limit;
}
data.peers.set_max_peers(fds_limit);
// TODO combine with max_peers check
// only update passthrough limit if it's higher than the new fds limit
// most nodes have passthroughs disabled, meaning this will keep it at 0
if data.max_passthroughs > fds_limit {
data.max_passthroughs = fds_limit;
}
// TODO cull passthroughs too
if data.peers.peers().len() >= data.fds_limit as usize {
let diff = data.peers.peers().len() - data.fds_limit as usize;
println!("net: culling {diff} peer(s)\r\n");
data.peers.cull(diff).await;
}
}
Expand Down
29 changes: 21 additions & 8 deletions kinode/src/net/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use {
dashmap::DashMap,
ring::signature::Ed25519KeyPair,
serde::{Deserialize, Serialize},
std::sync::atomic::AtomicU64,
std::sync::Arc,
tokio::net::TcpStream,
tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender},
Expand Down Expand Up @@ -57,15 +58,15 @@ pub struct RoutingRequest {

#[derive(Clone)]
pub struct Peers {
max_peers: u64,
max_peers: Arc<AtomicU64>,
send_to_loop: MessageSender,
peers: Arc<DashMap<String, Peer>>,
}

impl Peers {
pub fn new(max_peers: u64, send_to_loop: MessageSender) -> Self {
Self {
max_peers,
max_peers: Arc::new(max_peers.into()),
send_to_loop,
peers: Arc::new(DashMap::new()),
}
Expand All @@ -75,6 +76,15 @@ impl Peers {
&self.peers
}

/// Returns the current maximum number of peers.
///
/// The limit is kept in a shared atomic, so this reads the value most
/// recently stored via `set_max_peers`. The load uses `Relaxed` ordering:
/// it is atomic but imposes no ordering on surrounding memory operations.
pub fn max_peers(&self) -> u64 {
self.max_peers.load(std::sync::atomic::Ordering::Relaxed)
}

/// Replaces the maximum number of peers with `max_peers`.
///
/// Takes `&self` because the limit is stored in an atomic, so no
/// exclusive borrow is needed to update it. The store uses `Relaxed`
/// ordering. This only updates the limit; it does not remove any peers
/// that are already present.
pub fn set_max_peers(&self, max_peers: u64) {
self.max_peers
.store(max_peers, std::sync::atomic::Ordering::Relaxed);
}

pub fn get(&self, name: &str) -> Option<dashmap::mapref::one::Ref<'_, String, Peer>> {
self.peers.get(name)
}
Expand All @@ -94,15 +104,15 @@ impl Peers {
/// remove the one with the oldest last_message.
pub async fn insert(&self, name: String, peer: Peer) {
self.peers.insert(name, peer);
if self.peers.len() > self.max_peers as usize {
if self.peers.len() as u64 > self.max_peers.load(std::sync::atomic::Ordering::Relaxed) {
let oldest = self
.peers
.iter()
.min_by_key(|p| p.last_message)
.unwrap()
.key()
.clone();
self.peers.remove(&oldest);
self.remove(&oldest).await;
crate::fd_manager::send_fd_manager_hit_fds_limit(
&Address::new("our", NET_PROCESS_ID.clone()),
&self.send_to_loop,
Expand All @@ -122,7 +132,7 @@ impl Peers {
sorted_peers.sort_by_key(|p| p.last_message);
to_remove.extend(sorted_peers.iter().take(n));
for peer in to_remove {
self.peers.remove(&peer.identity.name);
self.remove(&peer.identity.name).await;
}
crate::fd_manager::send_fd_manager_hit_fds_limit(
&Address::new("our", NET_PROCESS_ID.clone()),
Expand Down Expand Up @@ -189,9 +199,13 @@ impl Peer {
}

/// Send a message to the peer.
pub fn send(&mut self, km: KernelMessage) {
self.sender.send(km).expect("net: peer sender was dropped");
pub fn send(
&mut self,
km: KernelMessage,
) -> Result<(), tokio::sync::mpsc::error::SendError<KernelMessage>> {
self.sender.send(km)?;
self.set_last_message();
Ok(())
}

/// Update the last message time to now.
Expand Down Expand Up @@ -222,7 +236,6 @@ pub struct NetData {
pub pending_passthroughs: PendingPassthroughs,
/// only used by routers
pub active_passthroughs: ActivePassthroughs,
pub max_peers: u64,
pub max_passthroughs: u64,
pub fds_limit: u64,
}
2 changes: 1 addition & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "lib"
authors = ["KinodeDAO"]
version = "0.9.5"
version = "0.9.6"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org"
Expand Down

0 comments on commit ed41dd4

Please sign in to comment.