
Commit

Merge pull request #2498 from subspace/connection-limits-gemini-3g-backport

Gemini 3g backport: improve the default connection limit management for DSN
nazar-pc authored Feb 1, 2024
2 parents 5014213 + ba2bce9 commit 6c2302f
Showing 9 changed files with 55 additions and 392 deletions.
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/single_disk_farm/plotting.rs
@@ -43,7 +43,7 @@ const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500);
 /// Size of the cache of archived segments for the purposes of faster sector expiration checks.
 const ARCHIVED_SEGMENTS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1000).expect("Not zero; qed");
 /// Get piece retry attempts number.
-const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(4).expect("Not zero; qed");
+const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(7).expect("Not zero; qed");
 
 /// Details about sector currently being plotted
 #[derive(Debug, Clone, Encode, Decode)]
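
The retry bump above is easiest to read next to the kind of loop that consumes such a constant. Below is a minimal, hypothetical sketch of a retry helper — `with_retries` is illustrative only, not the farmer's actual piece getter:

    use std::future::Future;
    use std::num::NonZeroU16;

    /// Hypothetical helper: retry an async operation up to `attempts` times,
    /// returning the first successful result.
    async fn with_retries<T, F, Fut>(attempts: NonZeroU16, mut op: F) -> Option<T>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Option<T>>,
    {
        for _attempt in 0..attempts.get() {
            if let Some(value) = op().await {
                return Some(value);
            }
        }
        None
    }

With the constant raised from 4 to 7, a piece download gets up to seven attempts before plotting gives up on it.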
2 changes: 1 addition & 1 deletion crates/subspace-networking/src/constructor.rs
@@ -60,7 +60,7 @@ const SPECIAL_CONNECTED_PEERS_PROTOCOL_LOG_TARGET: &str = "special-connected-pee
 /// It must be set for large plots.
 const SWARM_MAX_NEGOTIATING_INBOUND_STREAMS: usize = 100000;
 /// How long will connection be allowed to be open without any usage
-const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
+const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(3);
 /// The default maximum established incoming connection number for the swarm.
 const SWARM_MAX_ESTABLISHED_INCOMING_CONNECTIONS: u32 = 100;
 /// The default maximum established outgoing connection number for the swarm.
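
These constants feed libp2p's swarm configuration. A rough sketch of how such limits are typically assembled with libp2p's `connection_limits` behaviour — the outgoing constant and the wiring are assumptions here, since the diff only shows the incoming limit and the idle timeout:

    use libp2p::connection_limits::ConnectionLimits;
    use std::time::Duration;

    // Passed to the swarm's idle-connection timeout elsewhere in the constructor.
    const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(3);
    const SWARM_MAX_ESTABLISHED_INCOMING_CONNECTIONS: u32 = 100;
    // Assumed counterpart constant; the diff cuts off before it.
    const SWARM_MAX_ESTABLISHED_OUTGOING_CONNECTIONS: u32 = 100;

    /// Sketch: build a connection limits object from the constants above.
    fn connection_limits() -> ConnectionLimits {
        ConnectionLimits::default()
            .with_max_established_incoming(Some(SWARM_MAX_ESTABLISHED_INCOMING_CONNECTIONS))
            .with_max_established_outgoing(Some(SWARM_MAX_ESTABLISHED_OUTGOING_CONNECTIONS))
    }

Dropping the idle timeout from 10 s to 3 s releases unused connection slots faster, which matters more once the rate limiter (see node_runner.rs below) stops growing with the peer count.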
36 changes: 25 additions & 11 deletions crates/subspace-networking/src/node.rs
@@ -3,7 +3,6 @@ use crate::protocols::request_response::request_response_factory;
 pub use crate::shared::NewPeerInfo;
 use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared};
 use crate::utils::multihash::Multihash;
-use crate::utils::rate_limiter::RateLimiterPermit;
 use crate::utils::HandlerFn;
 use bytes::Bytes;
 use event_listener_primitives::HandlerId;
@@ -18,6 +17,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 use thiserror::Error;
+use tokio::sync::OwnedSemaphorePermit;
 use tracing::{debug, error, trace};
 
 /// Topic subscription, will unsubscribe when last instance is dropped for a particular topic.
@@ -29,7 +29,7 @@ pub struct TopicSubscription {
     command_sender: Option<mpsc::Sender<Command>>,
     #[pin]
     receiver: mpsc::UnboundedReceiver<Bytes>,
-    _permit: RateLimiterPermit,
+    _permit: OwnedSemaphorePermit,
 }
 
 impl Stream for TopicSubscription {
@@ -266,7 +266,7 @@ impl Node {
         &self,
         key: Multihash,
     ) -> Result<impl Stream<Item = PeerRecord>, GetValueError> {
-        let permit = self.shared.rate_limiter.acquire_kademlia_permit().await;
+        let permit = self.shared.rate_limiter.acquire_permit().await;
         let (result_sender, result_receiver) = mpsc::unbounded();
 
         self.shared
@@ -289,7 +289,7 @@
         key: Multihash,
         value: Vec<u8>,
     ) -> Result<impl Stream<Item = ()>, PutValueError> {
-        let permit = self.shared.rate_limiter.acquire_kademlia_permit().await;
+        let permit = self.shared.rate_limiter.acquire_permit().await;
         let (result_sender, result_receiver) = mpsc::unbounded();
 
         self.shared
@@ -309,7 +309,7 @@
 
     /// Subscribe to some topic on the DSN.
     pub async fn subscribe(&self, topic: Sha256Topic) -> Result<TopicSubscription, SubscribeError> {
-        let permit = self.shared.rate_limiter.acquire_regular_permit().await;
+        let permit = self.shared.rate_limiter.acquire_permit().await;
         let (result_sender, result_receiver) = oneshot::channel();
 
         self.shared
@@ -337,7 +337,7 @@
 
     /// Publish a message to some topic on the DSN.
    pub async fn publish(&self, topic: Sha256Topic, message: Vec<u8>) -> Result<(), PublishError> {
-        let _permit = self.shared.rate_limiter.acquire_regular_permit().await;
+        let _permit = self.shared.rate_limiter.acquire_permit().await;
         let (result_sender, result_receiver) = oneshot::channel();
 
         self.shared
@@ -363,7 +363,7 @@
         Request: GenericRequest,
     {
         let _permit = if acquire_permit {
-            Some(self.shared.rate_limiter.acquire_kademlia_permit().await)
+            Some(self.shared.rate_limiter.acquire_permit().await)
         } else {
             None
         };
@@ -401,7 +401,21 @@
         &self,
         key: Multihash,
     ) -> Result<impl Stream<Item = PeerId>, GetClosestPeersError> {
-        let permit = self.shared.rate_limiter.acquire_kademlia_permit().await;
+        self.get_closest_peers_internal(key, true).await
+    }
+
+    /// Get closest peers by multihash key using Kademlia DHT.
+    async fn get_closest_peers_internal(
+        &self,
+        key: Multihash,
+        acquire_permit: bool,
+    ) -> Result<impl Stream<Item = PeerId>, GetClosestPeersError> {
+        let permit = if acquire_permit {
+            Some(self.shared.rate_limiter.acquire_permit().await)
+        } else {
+            None
+        };
+
         trace!(?key, "Starting 'GetClosestPeers' request.");
 
         let (result_sender, result_receiver) = mpsc::unbounded();
@@ -434,7 +448,7 @@
         acquire_permit: bool,
     ) -> Result<impl Stream<Item = PeerId>, GetProvidersError> {
         let permit = if acquire_permit {
-            Some(self.shared.rate_limiter.acquire_kademlia_permit().await)
+            Some(self.shared.rate_limiter.acquire_permit().await)
         } else {
             None
         };
@@ -568,7 +582,7 @@
 
     /// Returns the request batch handle with common "connection permit" slot from the shared pool.
     pub async fn get_requests_batch_handle(&self) -> NodeRequestsBatchHandle {
-        let _permit = self.shared.rate_limiter.acquire_kademlia_permit().await;
+        let _permit = self.shared.rate_limiter.acquire_permit().await;
 
         NodeRequestsBatchHandle {
             _permit,
@@ -583,7 +597,7 @@
 /// we don't need to obtain separate semaphore permits for the operations.
 pub struct NodeRequestsBatchHandle {
     node: Node,
-    _permit: RateLimiterPermit,
+    _permit: OwnedSemaphorePermit,
 }
 
 impl NodeRequestsBatchHandle {
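
The new `get_closest_peers_internal` mirrors the existing `get_providers` pattern: the public entry point always pays for a permit, while internal callers that already hold one (such as a requests batch) pass `acquire_permit = false`. A condensed, self-contained sketch of that pattern, with the crate's types simplified:

    use std::sync::Arc;
    use tokio::sync::{OwnedSemaphorePermit, Semaphore};

    /// Simplified stand-in for the crate's rate limiter.
    struct RateLimiter {
        semaphore: Arc<Semaphore>,
    }

    impl RateLimiter {
        async fn acquire_permit(&self) -> OwnedSemaphorePermit {
            Arc::clone(&self.semaphore)
                .acquire_owned()
                .await
                .expect("Semaphore is never closed")
        }

        // Internal variants take `acquire_permit: bool` so callers that
        // already hold a batch permit can skip the queue entirely.
        async fn maybe_acquire(&self, acquire_permit: bool) -> Option<OwnedSemaphorePermit> {
            if acquire_permit {
                Some(self.acquire_permit().await)
            } else {
                None
            }
        }
    }

Because `NodeRequestsBatchHandle` holds its `OwnedSemaphorePermit` for its whole lifetime, every request issued through the batch rides on that single slot instead of contending for new ones.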
42 changes: 9 additions & 33 deletions crates/subspace-networking/src/node_runner.rs
@@ -3,16 +3,15 @@ use crate::behavior::persistent_parameters::{
     PEERS_ADDRESSES_BATCH_SIZE,
 };
 use crate::behavior::{Behavior, Event};
-use crate::constructor;
 use crate::constructor::temporary_bans::TemporaryBans;
 use crate::constructor::{ConnectedPeersHandler, LocalOnlyRecordStore};
 use crate::protocols::peer_info::{Event as PeerInfoEvent, PeerInfoSuccess};
 use crate::protocols::request_response::request_response_factory::{
     Event as RequestResponseEvent, IfDisconnected,
 };
-use crate::shared::{Command, CreatedSubscription, NewPeerInfo, PeerDiscovered, Shared};
-use crate::utils::rate_limiter::RateLimiterPermit;
+use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared};
 use crate::utils::{is_global_address_or_dns, strip_peer_id, PeerAddress};
+use crate::{constructor, NewPeerInfo};
 use async_mutex::Mutex as AsyncMutex;
 use bytes::Bytes;
 use event_listener_primitives::HandlerId;
@@ -41,44 +40,38 @@ use std::collections::{HashMap, HashSet};
 use std::fmt;
 use std::fmt::Debug;
 use std::net::IpAddr;
-use std::num::NonZeroUsize;
 use std::pin::Pin;
 use std::sync::atomic::Ordering;
 use std::sync::{Arc, Weak};
 use std::time::Duration;
+use tokio::sync::OwnedSemaphorePermit;
 use tokio::task::yield_now;
 use tokio::time::Sleep;
 use tracing::{debug, error, trace, warn};
 
 // Defines a batch size for peer addresses from Kademlia buckets.
 const KADEMLIA_PEERS_ADDRESSES_BATCH_SIZE: usize = 20;
 
-/// How many peers should node be connected to before boosting turns on.
-///
-/// 1 means boosting starts with second peer.
-const CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD: NonZeroUsize =
-    NonZeroUsize::new(5).expect("Not zero; qed");
-
 enum QueryResultSender {
     Value {
         sender: mpsc::UnboundedSender<PeerRecord>,
         // Just holding onto permit while data structure is not dropped
-        _permit: RateLimiterPermit,
+        _permit: OwnedSemaphorePermit,
     },
     ClosestPeers {
         sender: mpsc::UnboundedSender<PeerId>,
         // Just holding onto permit while data structure is not dropped
-        _permit: RateLimiterPermit,
+        _permit: Option<OwnedSemaphorePermit>,
     },
     Providers {
         sender: mpsc::UnboundedSender<PeerId>,
         // Just holding onto permit while data structure is not dropped
-        _permit: Option<RateLimiterPermit>,
+        _permit: Option<OwnedSemaphorePermit>,
     },
     PutValue {
         sender: mpsc::UnboundedSender<()>,
         // Just holding onto permit while data structure is not dropped
-        _permit: RateLimiterPermit,
+        _permit: OwnedSemaphorePermit,
     },
     Bootstrap {
         sender: mpsc::UnboundedSender<()>,
@@ -534,15 +527,7 @@ where
                 .num_established_peer_connections
                 .fetch_add(1, Ordering::SeqCst)
                 + 1;
-            if num_established_peer_connections > CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD.get() {
-                // The peer count exceeded the threshold, bump up the quota.
-                if let Err(error) = shared.rate_limiter.expand_kademlia_semaphore() {
-                    warn!(%error, "Failed to expand Kademlia concurrent tasks");
-                }
-                if let Err(error) = shared.rate_limiter.expand_regular_semaphore() {
-                    warn!(%error, "Failed to expand regular concurrent tasks");
-                }
-            }
+
             shared
                 .handlers
                 .num_established_peer_connections_change
@@ -577,16 +562,7 @@
                 .num_established_peer_connections
                 .fetch_sub(1, Ordering::SeqCst)
                 - 1;
-            if num_established_peer_connections == CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD.get()
-            {
-                // The previous peer count was over the threshold, reclaim the quota.
-                if let Err(error) = shared.rate_limiter.shrink_kademlia_semaphore() {
-                    warn!(%error, "Failed to shrink Kademlia concurrent tasks");
-                }
-                if let Err(error) = shared.rate_limiter.shrink_regular_semaphore() {
-                    warn!(%error, "Failed to shrink regular concurrent tasks");
-                }
-            }
+
             shared
                 .handlers
                 .num_established_peer_connections_change
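
Both removed blocks belonged to the dynamic "boost" scheme: the limiter expanded its Kademlia and regular semaphores once the node passed a peer-count threshold and shrank them back as peers disconnected. After this change a single fixed-capacity semaphore backs everything, and rate limiting reduces to plain permit acquisition and release. A runnable sketch of the simplified model (the capacity of 16 is illustrative, not the crate's value):

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    #[tokio::main]
    async fn main() {
        // One fixed pool of permits shared by all DSN operations.
        let pool = Arc::new(Semaphore::new(16)); // illustrative capacity

        // A query holds its permit for its whole lifetime...
        let permit = Arc::clone(&pool)
            .acquire_owned()
            .await
            .expect("Semaphore is never closed");

        // ...and releases the slot simply by being dropped when the query ends.
        drop(permit);
        assert_eq!(pool.available_permits(), 16);
    }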
11 changes: 6 additions & 5 deletions crates/subspace-networking/src/shared.rs
@@ -4,7 +4,7 @@
 use crate::protocols::peer_info::PeerInfo;
 use crate::protocols::request_response::request_response_factory::RequestFailure;
 use crate::utils::multihash::Multihash;
-use crate::utils::rate_limiter::{RateLimiter, RateLimiterPermit};
+use crate::utils::rate_limiter::RateLimiter;
 use crate::utils::Handler;
 use bytes::Bytes;
 use futures::channel::{mpsc, oneshot};
@@ -14,6 +14,7 @@ use libp2p::{Multiaddr, PeerId};
 use parking_lot::Mutex;
 use std::sync::atomic::AtomicUsize;
 use std::sync::Arc;
+use tokio::sync::OwnedSemaphorePermit;
 
 /// Represents Kademlia events (RoutablePeer, PendingRoutablePeer, UnroutablePeer).
 #[derive(Clone, Debug)]
@@ -56,13 +57,13 @@ pub(crate) enum Command {
     GetValue {
         key: Multihash,
         result_sender: mpsc::UnboundedSender<PeerRecord>,
-        permit: RateLimiterPermit,
+        permit: OwnedSemaphorePermit,
     },
     PutValue {
         key: Multihash,
         value: Vec<u8>,
         result_sender: mpsc::UnboundedSender<()>,
-        permit: RateLimiterPermit,
+        permit: OwnedSemaphorePermit,
     },
     Subscribe {
         topic: Sha256Topic,
@@ -80,7 +81,7 @@
     GetClosestPeers {
         key: Multihash,
         result_sender: mpsc::UnboundedSender<PeerId>,
-        permit: RateLimiterPermit,
+        permit: Option<OwnedSemaphorePermit>,
     },
     GenericRequest {
         peer_id: PeerId,
@@ -91,7 +92,7 @@
     GetProviders {
         key: Multihash,
         result_sender: mpsc::UnboundedSender<PeerId>,
-        permit: Option<RateLimiterPermit>,
+        permit: Option<OwnedSemaphorePermit>,
     },
     BanPeer {
         peer_id: PeerId,
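
The shared.rs changes show where permits live between acquisition and release: they travel inside `Command` and stay alive until the node runner drops the query state. A trimmed sketch, with fields reduced and the `Option` marking commands that may run permit-less inside a batch:

    use futures::channel::mpsc;
    use libp2p::PeerId;
    use tokio::sync::OwnedSemaphorePermit;

    /// Trimmed sketch of the command enum: the permit rides along with the
    /// command and is only dropped once the node runner finishes the query.
    enum Command {
        GetClosestPeers {
            result_sender: mpsc::UnboundedSender<PeerId>,
            // `None` when issued from a requests batch that already holds
            // one shared permit for all of its operations.
            permit: Option<OwnedSemaphorePermit>,
        },
    }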
(The remaining 4 changed files did not load in this view.)
