Gemini 3g backport: improve the default connection limit management for DSN #2498

Merged
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/single_disk_farm/plotting.rs
@@ -43,7 +43,7 @@ const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500);
/// Size of the cache of archived segments for the purposes of faster sector expiration checks.
const ARCHIVED_SEGMENTS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1000).expect("Not zero; qed");
/// Number of retry attempts for the piece getter.
-const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(4).expect("Not zero; qed");
+const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(7).expect("Not zero; qed");

/// Details about sector currently being plotted
#[derive(Debug, Clone, Encode, Decode)]
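For context, a NonZeroU16 constant like PIECE_GETTER_RETRY_NUMBER typically just bounds a retry loop around the piece getter. A minimal, hypothetical sketch of such a wrapper; the helper name and generic signature below are illustrative only, not the farmer's actual API:

use std::future::Future;
use std::num::NonZeroU16;

// Hypothetical helper: retry an async operation up to `retries` times (for example
// PIECE_GETTER_RETRY_NUMBER), returning the first success or the last error.
async fn with_retries<F, Fut, T, E>(retries: NonZeroU16, mut operation: F) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut last_error = None;
    for _attempt in 0..retries.get() {
        match operation().await {
            Ok(value) => return Ok(value),
            Err(error) => last_error = Some(error),
        }
    }
    Err(last_error.expect("retries is non-zero, so the loop ran at least once; qed"))
}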
2 changes: 1 addition & 1 deletion crates/subspace-networking/src/constructor.rs
@@ -60,7 +60,7 @@ const SPECIAL_CONNECTED_PEERS_PROTOCOL_LOG_TARGET: &str = "special-connected-pee
/// It must be set for large plots.
const SWARM_MAX_NEGOTIATING_INBOUND_STREAMS: usize = 100000;
/// How long will connection be allowed to be open without any usage
-const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
+const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(3);
/// The default maximum established incoming connection number for the swarm.
const SWARM_MAX_ESTABLISHED_INCOMING_CONNECTIONS: u32 = 100;
/// The default maximum established outgoing connection number for the swarm.
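Lowering IDLE_CONNECTION_TIMEOUT from 10 s to 3 s makes the swarm close connections that carry no active streams sooner, so they stop counting against the default connection limits. A sketch of where such a constant is typically applied, assuming a recent rust-libp2p whose swarm::Config exposes with_tokio_executor and with_idle_connection_timeout; the actual wiring in this crate's constructor may differ:

use std::time::Duration;
use libp2p::swarm::Config as SwarmConfig;

const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(3);

// Connections with no open substreams are closed once they have been idle this long.
fn swarm_config() -> SwarmConfig {
    SwarmConfig::with_tokio_executor().with_idle_connection_timeout(IDLE_CONNECTION_TIMEOUT)
}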
36 changes: 25 additions & 11 deletions crates/subspace-networking/src/node.rs
@@ -3,7 +3,6 @@ use crate::protocols::request_response::request_response_factory;
pub use crate::shared::NewPeerInfo;
use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared};
use crate::utils::multihash::Multihash;
-use crate::utils::rate_limiter::RateLimiterPermit;
use crate::utils::HandlerFn;
use bytes::Bytes;
use event_listener_primitives::HandlerId;
@@ -18,6 +17,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use thiserror::Error;
+use tokio::sync::OwnedSemaphorePermit;
use tracing::{debug, error, trace};

/// Topic subscription, will unsubscribe when last instance is dropped for a particular topic.
@@ -29,7 +29,7 @@ pub struct TopicSubscription {
command_sender: Option<mpsc::Sender<Command>>,
#[pin]
receiver: mpsc::UnboundedReceiver<Bytes>,
-_permit: RateLimiterPermit,
+_permit: OwnedSemaphorePermit,
}

impl Stream for TopicSubscription {
@@ -266,7 +266,7 @@ impl Node {
&self,
key: Multihash,
) -> Result<impl Stream<Item = PeerRecord>, GetValueError> {
-let permit = self.shared.rate_limiter.acquire_kademlia_permit().await;
+let permit = self.shared.rate_limiter.acquire_permit().await;
let (result_sender, result_receiver) = mpsc::unbounded();

self.shared
@@ -289,7 +289,7 @@
key: Multihash,
value: Vec<u8>,
) -> Result<impl Stream<Item = ()>, PutValueError> {
-let permit = self.shared.rate_limiter.acquire_kademlia_permit().await;
+let permit = self.shared.rate_limiter.acquire_permit().await;
let (result_sender, result_receiver) = mpsc::unbounded();

self.shared
@@ -309,7 +309,7 @@

/// Subscribe to some topic on the DSN.
pub async fn subscribe(&self, topic: Sha256Topic) -> Result<TopicSubscription, SubscribeError> {
-let permit = self.shared.rate_limiter.acquire_regular_permit().await;
+let permit = self.shared.rate_limiter.acquire_permit().await;
let (result_sender, result_receiver) = oneshot::channel();

self.shared
@@ -337,7 +337,7 @@

/// Publish a message to some topic on the DSN.
pub async fn publish(&self, topic: Sha256Topic, message: Vec<u8>) -> Result<(), PublishError> {
-let _permit = self.shared.rate_limiter.acquire_regular_permit().await;
+let _permit = self.shared.rate_limiter.acquire_permit().await;
let (result_sender, result_receiver) = oneshot::channel();

self.shared
@@ -363,7 +363,7 @@
Request: GenericRequest,
{
let _permit = if acquire_permit {
-Some(self.shared.rate_limiter.acquire_kademlia_permit().await)
+Some(self.shared.rate_limiter.acquire_permit().await)
} else {
None
};
@@ -401,7 +401,21 @@ impl Node {
&self,
key: Multihash,
) -> Result<impl Stream<Item = PeerId>, GetClosestPeersError> {
-let permit = self.shared.rate_limiter.acquire_kademlia_permit().await;
+self.get_closest_peers_internal(key, true).await
+}
+
+/// Get closest peers by multihash key using Kademlia DHT.
+async fn get_closest_peers_internal(
+&self,
+key: Multihash,
+acquire_permit: bool,
+) -> Result<impl Stream<Item = PeerId>, GetClosestPeersError> {
+let permit = if acquire_permit {
+Some(self.shared.rate_limiter.acquire_permit().await)
+} else {
+None
+};

trace!(?key, "Starting 'GetClosestPeers' request.");

let (result_sender, result_receiver) = mpsc::unbounded();
@@ -434,7 +448,7 @@
acquire_permit: bool,
) -> Result<impl Stream<Item = PeerId>, GetProvidersError> {
let permit = if acquire_permit {
-Some(self.shared.rate_limiter.acquire_kademlia_permit().await)
+Some(self.shared.rate_limiter.acquire_permit().await)
} else {
None
};
@@ -568,7 +582,7 @@

/// Returns the request batch handle with common "connection permit" slot from the shared pool.
pub async fn get_requests_batch_handle(&self) -> NodeRequestsBatchHandle {
-let _permit = self.shared.rate_limiter.acquire_kademlia_permit().await;
+let _permit = self.shared.rate_limiter.acquire_permit().await;

NodeRequestsBatchHandle {
_permit,
@@ -583,7 +597,7 @@
/// we don't need to obtain separate semaphore permits for the operations.
pub struct NodeRequestsBatchHandle {
node: Node,
-_permit: RateLimiterPermit,
+_permit: OwnedSemaphorePermit,
}

impl NodeRequestsBatchHandle {
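The node.rs changes collapse the old acquire_kademlia_permit / acquire_regular_permit split into a single acquire_permit call that hands out a tokio OwnedSemaphorePermit, which callers can stash in long-lived structures (TopicSubscription, query result channels, batch handles) and release simply by dropping. A minimal sketch of what such a unified rate limiter can look like; the struct layout and constructor below are assumptions for illustration, not necessarily the crate's actual rate_limiter module:

use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

// One shared semaphore replaces the separate Kademlia and "regular" pools.
pub(crate) struct RateLimiter {
    permits: Arc<Semaphore>,
}

impl RateLimiter {
    pub(crate) fn new(max_concurrent_tasks: usize) -> Self {
        Self {
            permits: Arc::new(Semaphore::new(max_concurrent_tasks)),
        }
    }

    /// Waits for a free slot and returns an owned permit; the slot is returned to the
    /// pool automatically when the permit is dropped.
    pub(crate) async fn acquire_permit(&self) -> OwnedSemaphorePermit {
        Arc::clone(&self.permits)
            .acquire_owned()
            .await
            .expect("Semaphore is never closed; qed")
    }
}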
42 changes: 9 additions & 33 deletions crates/subspace-networking/src/node_runner.rs
@@ -3,16 +3,15 @@ use crate::behavior::persistent_parameters::{
PEERS_ADDRESSES_BATCH_SIZE,
};
use crate::behavior::{Behavior, Event};
-use crate::constructor;
use crate::constructor::temporary_bans::TemporaryBans;
use crate::constructor::{ConnectedPeersHandler, LocalOnlyRecordStore};
use crate::protocols::peer_info::{Event as PeerInfoEvent, PeerInfoSuccess};
use crate::protocols::request_response::request_response_factory::{
Event as RequestResponseEvent, IfDisconnected,
};
-use crate::shared::{Command, CreatedSubscription, NewPeerInfo, PeerDiscovered, Shared};
-use crate::utils::rate_limiter::RateLimiterPermit;
+use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared};
use crate::utils::{is_global_address_or_dns, strip_peer_id, PeerAddress};
+use crate::{constructor, NewPeerInfo};
use async_mutex::Mutex as AsyncMutex;
use bytes::Bytes;
use event_listener_primitives::HandlerId;
@@ -41,44 +40,38 @@ use std::collections::{HashMap, HashSet};
use std::fmt;
use std::fmt::Debug;
use std::net::IpAddr;
-use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use std::time::Duration;
+use tokio::sync::OwnedSemaphorePermit;
use tokio::task::yield_now;
use tokio::time::Sleep;
use tracing::{debug, error, trace, warn};

// Defines a batch size for peer addresses from Kademlia buckets.
const KADEMLIA_PEERS_ADDRESSES_BATCH_SIZE: usize = 20;

-/// How many peers should node be connected to before boosting turns on.
-///
-/// 1 means boosting starts with second peer.
-const CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD: NonZeroUsize =
-NonZeroUsize::new(5).expect("Not zero; qed");

enum QueryResultSender {
Value {
sender: mpsc::UnboundedSender<PeerRecord>,
// Just holding onto permit while data structure is not dropped
-_permit: RateLimiterPermit,
+_permit: OwnedSemaphorePermit,
},
ClosestPeers {
sender: mpsc::UnboundedSender<PeerId>,
// Just holding onto permit while data structure is not dropped
-_permit: RateLimiterPermit,
+_permit: Option<OwnedSemaphorePermit>,
},
Providers {
sender: mpsc::UnboundedSender<PeerId>,
// Just holding onto permit while data structure is not dropped
-_permit: Option<RateLimiterPermit>,
+_permit: Option<OwnedSemaphorePermit>,
},
PutValue {
sender: mpsc::UnboundedSender<()>,
// Just holding onto permit while data structure is not dropped
-_permit: RateLimiterPermit,
+_permit: OwnedSemaphorePermit,
},
Bootstrap {
sender: mpsc::UnboundedSender<()>,
@@ -534,15 +527,7 @@ where
.num_established_peer_connections
.fetch_add(1, Ordering::SeqCst)
+ 1;
-if num_established_peer_connections > CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD.get() {
-// The peer count exceeded the threshold, bump up the quota.
-if let Err(error) = shared.rate_limiter.expand_kademlia_semaphore() {
-warn!(%error, "Failed to expand Kademlia concurrent tasks");
-}
-if let Err(error) = shared.rate_limiter.expand_regular_semaphore() {
-warn!(%error, "Failed to expand regular concurrent tasks");
-}
-}

shared
.handlers
.num_established_peer_connections_change
@@ -577,16 +562,7 @@
.num_established_peer_connections
.fetch_sub(1, Ordering::SeqCst)
- 1;
-if num_established_peer_connections == CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD.get()
-{
-// The previous peer count was over the threshold, reclaim the quota.
-if let Err(error) = shared.rate_limiter.shrink_kademlia_semaphore() {
-warn!(%error, "Failed to shrink Kademlia concurrent tasks");
-}
-if let Err(error) = shared.rate_limiter.shrink_regular_semaphore() {
-warn!(%error, "Failed to shrink regular concurrent tasks");
-}
-}

shared
.handlers
.num_established_peer_connections_change
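With the boost/shrink logic gone, backpressure relies entirely on OwnedSemaphorePermit's drop semantics: whatever structure holds the permit (here the QueryResultSender variants) keeps the slot occupied until it is dropped, and no capacity bookkeeping happens when peers connect or disconnect. A self-contained illustration of that behaviour using tokio directly:

use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

#[tokio::main]
async fn main() {
    let pool = Arc::new(Semaphore::new(2));

    // Acquiring takes one slot out of the shared pool...
    let permit: OwnedSemaphorePermit = Arc::clone(&pool).acquire_owned().await.unwrap();
    assert_eq!(pool.available_permits(), 1);

    // ...and dropping the permit (e.g. when a QueryResultSender is dropped) returns it.
    drop(permit);
    assert_eq!(pool.available_permits(), 2);
}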
11 changes: 6 additions & 5 deletions crates/subspace-networking/src/shared.rs
@@ -4,7 +4,7 @@
use crate::protocols::peer_info::PeerInfo;
use crate::protocols::request_response::request_response_factory::RequestFailure;
use crate::utils::multihash::Multihash;
-use crate::utils::rate_limiter::{RateLimiter, RateLimiterPermit};
+use crate::utils::rate_limiter::RateLimiter;
use crate::utils::Handler;
use bytes::Bytes;
use futures::channel::{mpsc, oneshot};
@@ -14,6 +14,7 @@ use libp2p::{Multiaddr, PeerId};
use parking_lot::Mutex;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
+use tokio::sync::OwnedSemaphorePermit;

/// Represents Kademlia events (RoutablePeer, PendingRoutablePeer, UnroutablePeer).
#[derive(Clone, Debug)]
@@ -56,13 +57,13 @@ pub(crate) enum Command {
GetValue {
key: Multihash,
result_sender: mpsc::UnboundedSender<PeerRecord>,
-permit: RateLimiterPermit,
+permit: OwnedSemaphorePermit,
},
PutValue {
key: Multihash,
value: Vec<u8>,
result_sender: mpsc::UnboundedSender<()>,
-permit: RateLimiterPermit,
+permit: OwnedSemaphorePermit,
},
Subscribe {
topic: Sha256Topic,
@@ -80,7 +81,7 @@ pub(crate) enum Command {
GetClosestPeers {
key: Multihash,
result_sender: mpsc::UnboundedSender<PeerId>,
-permit: RateLimiterPermit,
+permit: Option<OwnedSemaphorePermit>,
},
GenericRequest {
peer_id: PeerId,
Expand All @@ -91,7 +92,7 @@ pub(crate) enum Command {
GetProviders {
key: Multihash,
result_sender: mpsc::UnboundedSender<PeerId>,
-permit: Option<RateLimiterPermit>,
+permit: Option<OwnedSemaphorePermit>,
},
BanPeer {
peer_id: PeerId,