From 95e639664577a97faeb04dc88651d9c9d3e2dbf0 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 30 Jan 2024 03:35:56 +0200 Subject: [PATCH 1/4] Decrease idle network connection timeout to 3 seconds --- crates/subspace-networking/src/constructor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/subspace-networking/src/constructor.rs b/crates/subspace-networking/src/constructor.rs index a2737302a2..99a323bea3 100644 --- a/crates/subspace-networking/src/constructor.rs +++ b/crates/subspace-networking/src/constructor.rs @@ -60,7 +60,7 @@ const SPECIAL_CONNECTED_PEERS_PROTOCOL_LOG_TARGET: &str = "special-connected-pee /// It must be set for large plots. const SWARM_MAX_NEGOTIATING_INBOUND_STREAMS: usize = 100000; /// How long will connection be allowed to be open without any usage -const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); +const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(3); /// The default maximum established incoming connection number for the swarm. const SWARM_MAX_ESTABLISHED_INCOMING_CONNECTIONS: u32 = 100; /// The default maximum established incoming connection number for the swarm. From fc087954be17c1cf1cee332f081920b5ae38c544 Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Wed, 31 Jan 2024 19:21:11 +0700 Subject: [PATCH 2/4] networking: Change rate limiter. - change CONNECTION_TIMEOUT_PARAMETER. - remove substream limits - use Tokio async semaphore # Conflicts: # crates/subspace-networking/src/node.rs # crates/subspace-networking/src/node_runner.rs # crates/subspace-networking/src/shared.rs --- crates/subspace-networking/src/node.rs | 36 ++++--- crates/subspace-networking/src/node_runner.rs | 42 ++------ crates/subspace-networking/src/shared.rs | 11 ++- .../src/utils/rate_limiter.rs | 97 +++---------------- 4 files changed, 52 insertions(+), 134 deletions(-) diff --git a/crates/subspace-networking/src/node.rs b/crates/subspace-networking/src/node.rs index bf552aa358..822d90d48e 100644 --- a/crates/subspace-networking/src/node.rs +++ b/crates/subspace-networking/src/node.rs @@ -3,7 +3,6 @@ use crate::protocols::request_response::request_response_factory; pub use crate::shared::NewPeerInfo; use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared}; use crate::utils::multihash::Multihash; -use crate::utils::rate_limiter::RateLimiterPermit; use crate::utils::HandlerFn; use bytes::Bytes; use event_listener_primitives::HandlerId; @@ -18,6 +17,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use thiserror::Error; +use tokio::sync::OwnedSemaphorePermit; use tracing::{debug, error, trace}; /// Topic subscription, will unsubscribe when last instance is dropped for a particular topic. @@ -29,7 +29,7 @@ pub struct TopicSubscription { command_sender: Option>, #[pin] receiver: mpsc::UnboundedReceiver, - _permit: RateLimiterPermit, + _permit: OwnedSemaphorePermit, } impl Stream for TopicSubscription { @@ -266,7 +266,7 @@ impl Node { &self, key: Multihash, ) -> Result, GetValueError> { - let permit = self.shared.rate_limiter.acquire_kademlia_permit().await; + let permit = self.shared.rate_limiter.acquire_permit().await; let (result_sender, result_receiver) = mpsc::unbounded(); self.shared @@ -289,7 +289,7 @@ impl Node { key: Multihash, value: Vec, ) -> Result, PutValueError> { - let permit = self.shared.rate_limiter.acquire_kademlia_permit().await; + let permit = self.shared.rate_limiter.acquire_permit().await; let (result_sender, result_receiver) = mpsc::unbounded(); self.shared @@ -309,7 +309,7 @@ impl Node { /// Subcribe to some topic on the DSN. pub async fn subscribe(&self, topic: Sha256Topic) -> Result { - let permit = self.shared.rate_limiter.acquire_regular_permit().await; + let permit = self.shared.rate_limiter.acquire_permit().await; let (result_sender, result_receiver) = oneshot::channel(); self.shared @@ -337,7 +337,7 @@ impl Node { /// Subcribe a messgo to some topic on the DSN. pub async fn publish(&self, topic: Sha256Topic, message: Vec) -> Result<(), PublishError> { - let _permit = self.shared.rate_limiter.acquire_regular_permit().await; + let _permit = self.shared.rate_limiter.acquire_permit().await; let (result_sender, result_receiver) = oneshot::channel(); self.shared @@ -363,7 +363,7 @@ impl Node { Request: GenericRequest, { let _permit = if acquire_permit { - Some(self.shared.rate_limiter.acquire_kademlia_permit().await) + Some(self.shared.rate_limiter.acquire_permit().await) } else { None }; @@ -401,7 +401,21 @@ impl Node { &self, key: Multihash, ) -> Result, GetClosestPeersError> { - let permit = self.shared.rate_limiter.acquire_kademlia_permit().await; + self.get_closest_peers_internal(key, true).await + } + + /// Get closest peers by multihash key using Kademlia DHT. + async fn get_closest_peers_internal( + &self, + key: Multihash, + acquire_permit: bool, + ) -> Result, GetClosestPeersError> { + let permit = if acquire_permit { + Some(self.shared.rate_limiter.acquire_permit().await) + } else { + None + }; + trace!(?key, "Starting 'GetClosestPeers' request."); let (result_sender, result_receiver) = mpsc::unbounded(); @@ -434,7 +448,7 @@ impl Node { acquire_permit: bool, ) -> Result, GetProvidersError> { let permit = if acquire_permit { - Some(self.shared.rate_limiter.acquire_kademlia_permit().await) + Some(self.shared.rate_limiter.acquire_permit().await) } else { None }; @@ -568,7 +582,7 @@ impl Node { /// Returns the request batch handle with common "connection permit" slot from the shared pool. pub async fn get_requests_batch_handle(&self) -> NodeRequestsBatchHandle { - let _permit = self.shared.rate_limiter.acquire_kademlia_permit().await; + let _permit = self.shared.rate_limiter.acquire_permit().await; NodeRequestsBatchHandle { _permit, @@ -583,7 +597,7 @@ impl Node { /// we don't need to obtain separate semaphore permits for the operations. pub struct NodeRequestsBatchHandle { node: Node, - _permit: RateLimiterPermit, + _permit: OwnedSemaphorePermit, } impl NodeRequestsBatchHandle { diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs index 16b8ca9ddc..c62e0c3df4 100644 --- a/crates/subspace-networking/src/node_runner.rs +++ b/crates/subspace-networking/src/node_runner.rs @@ -3,16 +3,15 @@ use crate::behavior::persistent_parameters::{ PEERS_ADDRESSES_BATCH_SIZE, }; use crate::behavior::{Behavior, Event}; -use crate::constructor; use crate::constructor::temporary_bans::TemporaryBans; use crate::constructor::{ConnectedPeersHandler, LocalOnlyRecordStore}; use crate::protocols::peer_info::{Event as PeerInfoEvent, PeerInfoSuccess}; use crate::protocols::request_response::request_response_factory::{ Event as RequestResponseEvent, IfDisconnected, }; -use crate::shared::{Command, CreatedSubscription, NewPeerInfo, PeerDiscovered, Shared}; -use crate::utils::rate_limiter::RateLimiterPermit; +use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared}; use crate::utils::{is_global_address_or_dns, strip_peer_id, PeerAddress}; +use crate::{constructor, NewPeerInfo}; use async_mutex::Mutex as AsyncMutex; use bytes::Bytes; use event_listener_primitives::HandlerId; @@ -41,11 +40,11 @@ use std::collections::{HashMap, HashSet}; use std::fmt; use std::fmt::Debug; use std::net::IpAddr; -use std::num::NonZeroUsize; use std::pin::Pin; use std::sync::atomic::Ordering; use std::sync::{Arc, Weak}; use std::time::Duration; +use tokio::sync::OwnedSemaphorePermit; use tokio::task::yield_now; use tokio::time::Sleep; use tracing::{debug, error, trace, warn}; @@ -53,32 +52,26 @@ use tracing::{debug, error, trace, warn}; // Defines a batch size for peer addresses from Kademlia buckets. const KADEMLIA_PEERS_ADDRESSES_BATCH_SIZE: usize = 20; -/// How many peers should node be connected to before boosting turns on. -/// -/// 1 means boosting starts with second peer. -const CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD: NonZeroUsize = - NonZeroUsize::new(5).expect("Not zero; qed"); - enum QueryResultSender { Value { sender: mpsc::UnboundedSender, // Just holding onto permit while data structure is not dropped - _permit: RateLimiterPermit, + _permit: OwnedSemaphorePermit, }, ClosestPeers { sender: mpsc::UnboundedSender, // Just holding onto permit while data structure is not dropped - _permit: RateLimiterPermit, + _permit: Option, }, Providers { sender: mpsc::UnboundedSender, // Just holding onto permit while data structure is not dropped - _permit: Option, + _permit: Option, }, PutValue { sender: mpsc::UnboundedSender<()>, // Just holding onto permit while data structure is not dropped - _permit: RateLimiterPermit, + _permit: OwnedSemaphorePermit, }, Bootstrap { sender: mpsc::UnboundedSender<()>, @@ -534,15 +527,7 @@ where .num_established_peer_connections .fetch_add(1, Ordering::SeqCst) + 1; - if num_established_peer_connections > CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD.get() { - // The peer count exceeded the threshold, bump up the quota. - if let Err(error) = shared.rate_limiter.expand_kademlia_semaphore() { - warn!(%error, "Failed to expand Kademlia concurrent tasks"); - } - if let Err(error) = shared.rate_limiter.expand_regular_semaphore() { - warn!(%error, "Failed to expand regular concurrent tasks"); - } - } + shared .handlers .num_established_peer_connections_change @@ -577,16 +562,7 @@ where .num_established_peer_connections .fetch_sub(1, Ordering::SeqCst) - 1; - if num_established_peer_connections == CONCURRENT_TASKS_BOOST_PEERS_THRESHOLD.get() - { - // The previous peer count was over the threshold, reclaim the quota. - if let Err(error) = shared.rate_limiter.shrink_kademlia_semaphore() { - warn!(%error, "Failed to shrink Kademlia concurrent tasks"); - } - if let Err(error) = shared.rate_limiter.shrink_regular_semaphore() { - warn!(%error, "Failed to shrink regular concurrent tasks"); - } - } + shared .handlers .num_established_peer_connections_change diff --git a/crates/subspace-networking/src/shared.rs b/crates/subspace-networking/src/shared.rs index a8ee3033d5..eee8389eb1 100644 --- a/crates/subspace-networking/src/shared.rs +++ b/crates/subspace-networking/src/shared.rs @@ -4,7 +4,7 @@ use crate::protocols::peer_info::PeerInfo; use crate::protocols::request_response::request_response_factory::RequestFailure; use crate::utils::multihash::Multihash; -use crate::utils::rate_limiter::{RateLimiter, RateLimiterPermit}; +use crate::utils::rate_limiter::RateLimiter; use crate::utils::Handler; use bytes::Bytes; use futures::channel::{mpsc, oneshot}; @@ -14,6 +14,7 @@ use libp2p::{Multiaddr, PeerId}; use parking_lot::Mutex; use std::sync::atomic::AtomicUsize; use std::sync::Arc; +use tokio::sync::OwnedSemaphorePermit; /// Represents Kademlia events (RoutablePeer, PendingRoutablePeer, UnroutablePeer). #[derive(Clone, Debug)] @@ -56,13 +57,13 @@ pub(crate) enum Command { GetValue { key: Multihash, result_sender: mpsc::UnboundedSender, - permit: RateLimiterPermit, + permit: OwnedSemaphorePermit, }, PutValue { key: Multihash, value: Vec, result_sender: mpsc::UnboundedSender<()>, - permit: RateLimiterPermit, + permit: OwnedSemaphorePermit, }, Subscribe { topic: Sha256Topic, @@ -80,7 +81,7 @@ pub(crate) enum Command { GetClosestPeers { key: Multihash, result_sender: mpsc::UnboundedSender, - permit: RateLimiterPermit, + permit: Option, }, GenericRequest { peer_id: PeerId, @@ -91,7 +92,7 @@ pub(crate) enum Command { GetProviders { key: Multihash, result_sender: mpsc::UnboundedSender, - permit: Option, + permit: Option, }, BanPeer { peer_id: PeerId, diff --git a/crates/subspace-networking/src/utils/rate_limiter.rs b/crates/subspace-networking/src/utils/rate_limiter.rs index 6f5d26f769..af34eb6fe2 100644 --- a/crates/subspace-networking/src/utils/rate_limiter.rs +++ b/crates/subspace-networking/src/utils/rate_limiter.rs @@ -2,55 +2,20 @@ pub(crate) mod resizable_semaphore; #[cfg(test)] mod tests; -use crate::utils::rate_limiter::resizable_semaphore::{ - ResizableSemaphore, ResizableSemaphorePermit, SemaphoreError, -}; use std::num::NonZeroUsize; -use tracing::{debug, trace}; - -/// Base limit for number of concurrent tasks initiated towards Kademlia. -/// -/// We restrict this so we can manage outgoing requests a bit better by cancelling low-priority -/// requests, but this value will be boosted depending on number of connected peers. -const KADEMLIA_BASE_CONCURRENT_TASKS: NonZeroUsize = NonZeroUsize::new(15).expect("Not zero; qed"); -/// Above base limit will be boosted by specified number for every peer connected starting with -/// second peer, such that it scaled with network connectivity, but the exact coefficient might need -/// to be tweaked in the future. -pub(crate) const KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER: usize = 15; -/// Base limit for number of any concurrent tasks except Kademlia. -/// -/// We configure total number of streams per connection to 256. Here we assume half of them might be -/// incoming and half outgoing, we also leave a small buffer of streams just in case. -/// -/// We restrict this so we don't exceed number of streams for single peer, but this value will be -/// boosted depending on number of connected peers. -const REGULAR_BASE_CONCURRENT_TASKS: NonZeroUsize = - NonZeroUsize::new(50 - KADEMLIA_BASE_CONCURRENT_TASKS.get()).expect("Not zero; qed"); -/// Above base limit will be boosted by specified number for every peer connected starting with -/// second peer, such that it scaled with network connectivity, but the exact coefficient might need -/// to be tweaked in the future. -pub(crate) const REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER: usize = 25; +use std::sync::Arc; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tracing::debug; /// Defines the minimum size of the "connection limit semaphore". const MINIMUM_CONNECTIONS_SEMAPHORE_SIZE: usize = 3; /// Empiric parameter for connection timeout and retry parameters (total retries and backoff time). -const CONNECTION_TIMEOUT_PARAMETER: usize = 9; - -#[derive(Debug)] -pub(crate) struct RateLimiterPermit { - /// Limits Kademlia substreams. - _substream_limit_permit: ResizableSemaphorePermit, - - /// Limits outgoing connections. - _connection_limit_permit: ResizableSemaphorePermit, -} +const CONNECTION_TIMEOUT_PARAMETER: usize = 2; #[derive(Debug)] pub(crate) struct RateLimiter { - kademlia_tasks_semaphore: ResizableSemaphore, - regular_tasks_semaphore: ResizableSemaphore, - connections_semaphore: ResizableSemaphore, + connections_semaphore: Arc, } impl RateLimiter { @@ -63,9 +28,7 @@ impl RateLimiter { debug!(%out_connections, %pending_out_connections, %permits, "Rate limiter was instantiated."); Self { - kademlia_tasks_semaphore: ResizableSemaphore::new(KADEMLIA_BASE_CONCURRENT_TASKS), - regular_tasks_semaphore: ResizableSemaphore::new(REGULAR_BASE_CONCURRENT_TASKS), - connections_semaphore: ResizableSemaphore::new(permits), + connections_semaphore: Arc::new(Semaphore::new(permits.get())), } } @@ -91,47 +54,11 @@ impl RateLimiter { .max(minimum_semaphore_size) } - pub(crate) async fn acquire_regular_permit(&self) -> RateLimiterPermit { - let connections_permit = self.connections_semaphore.acquire().await; - let substream_permit = self.regular_tasks_semaphore.acquire().await; - - RateLimiterPermit { - _connection_limit_permit: connections_permit, - _substream_limit_permit: substream_permit, - } - } - - pub(crate) fn expand_regular_semaphore(&self) -> Result<(), SemaphoreError> { - self.regular_tasks_semaphore - .expand(REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER) - .map(|old_capacity| trace!(%old_capacity, "Expand regular semaphore.")) - } - - pub(crate) fn shrink_regular_semaphore(&self) -> Result<(), SemaphoreError> { - self.regular_tasks_semaphore - .shrink(REGULAR_CONCURRENT_TASKS_BOOST_PER_PEER) - .map(|old_capacity| trace!(%old_capacity, "Shrink regular semaphore.")) - } - - pub(crate) async fn acquire_kademlia_permit(&self) -> RateLimiterPermit { - let connections_permit = self.connections_semaphore.acquire().await; - let substream_permit = self.kademlia_tasks_semaphore.acquire().await; - - RateLimiterPermit { - _connection_limit_permit: connections_permit, - _substream_limit_permit: substream_permit, - } - } - - pub(crate) fn expand_kademlia_semaphore(&self) -> Result<(), SemaphoreError> { - self.kademlia_tasks_semaphore - .expand(KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER) - .map(|old_capacity| trace!(%old_capacity, "Expand kademlia semaphore.")) - } - - pub(crate) fn shrink_kademlia_semaphore(&self) -> Result<(), SemaphoreError> { - self.kademlia_tasks_semaphore - .shrink(KADEMLIA_CONCURRENT_TASKS_BOOST_PER_PEER) - .map(|old_capacity| trace!(%old_capacity, "Shrink kademlia semaphore.")) + pub(crate) async fn acquire_permit(&self) -> OwnedSemaphorePermit { + self.connections_semaphore + .clone() + .acquire_owned() + .await + .expect("We never close semaphore.") } } From c02b3662544e50f465d0195aad322f59dd54c7b7 Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Thu, 1 Feb 2024 14:39:15 +0700 Subject: [PATCH 3/4] networking: Remove ResizableSemaphore. --- .../src/utils/rate_limiter.rs | 4 - .../utils/rate_limiter/resizable_semaphore.rs | 190 ------------------ .../src/utils/rate_limiter/tests.rs | 61 ------ 3 files changed, 255 deletions(-) delete mode 100644 crates/subspace-networking/src/utils/rate_limiter/resizable_semaphore.rs delete mode 100644 crates/subspace-networking/src/utils/rate_limiter/tests.rs diff --git a/crates/subspace-networking/src/utils/rate_limiter.rs b/crates/subspace-networking/src/utils/rate_limiter.rs index af34eb6fe2..76d019a936 100644 --- a/crates/subspace-networking/src/utils/rate_limiter.rs +++ b/crates/subspace-networking/src/utils/rate_limiter.rs @@ -1,7 +1,3 @@ -pub(crate) mod resizable_semaphore; -#[cfg(test)] -mod tests; - use std::num::NonZeroUsize; use std::sync::Arc; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; diff --git a/crates/subspace-networking/src/utils/rate_limiter/resizable_semaphore.rs b/crates/subspace-networking/src/utils/rate_limiter/resizable_semaphore.rs deleted file mode 100644 index 60673926aa..0000000000 --- a/crates/subspace-networking/src/utils/rate_limiter/resizable_semaphore.rs +++ /dev/null @@ -1,190 +0,0 @@ -use parking_lot::Mutex; -use std::num::NonZeroUsize; -use std::sync::Arc; -use thiserror::Error; -use tokio::sync::Notify; - -/// Errors happening during semaphore usage -#[derive(Debug, Error)] -pub(crate) enum SemaphoreError { - #[error("Invalid shrink: capacity {capacity}, delta {delta}")] - InvalidShrink { - /// The current capacity - capacity: usize, - /// How much to shrink - delta: usize, - }, - #[error("Invalid expand: capacity {capacity}, delta {delta}")] - InvalidExpand { - /// The current capacity - capacity: usize, - /// How much to expand - delta: usize, - }, -} - -/// The state shared between the semaphore and the outstanding permits. -#[derive(Debug)] -struct SemShared { - /// The tuple holds (current usage, current max capacity) - state: Mutex, - /// To signal waiters for permits to be available - notify: Notify, -} - -/// The semaphore state. -#[derive(Debug)] -struct SemState { - /// The current capacity - capacity: usize, - /// The current outstanding permits - usage: usize, -} - -impl SemState { - // Allocates a permit if available. - // Returns true if allocated, false otherwise. - fn alloc_one(&mut self) -> bool { - if self.usage < self.capacity { - self.usage += 1; - true - } else { - false - } - } - - // Returns a free permit to the free pool. - // Returns true if any waiters need to be notified. - fn free_one(&mut self) -> bool { - let prev_is_full = self.is_full(); - if let Some(dec) = self.usage.checked_sub(1) { - self.usage = dec; - } else { - unreachable!("Dropping semaphore twice is not possible"); - } - - // Notify if we did a full -> available transition. - prev_is_full && !self.is_full() - } - - // Expands the max capacity by delta. - // Returns true if any waiters need to be notified. - fn expand(&mut self, delta: usize) -> Result { - let prev_is_full = self.is_full(); - if let Some(capacity) = self.capacity.checked_add(delta) { - self.capacity = capacity; - // Notify if we did a full -> available transition. - Ok(prev_is_full && !self.is_full()) - } else { - Err(SemaphoreError::InvalidExpand { - capacity: self.capacity, - delta, - }) - } - } - - // Shrinks the max capacity by delta. - fn shrink(&mut self, delta: usize) -> Result<(), SemaphoreError> { - if let Some(capacity) = self.capacity.checked_sub(delta) { - self.capacity = capacity; - Ok(()) - } else { - Err(SemaphoreError::InvalidShrink { - capacity: self.capacity, - delta, - }) - } - } - - // Returns true if current usage exceeds capacity - fn is_full(&self) -> bool { - self.usage >= self.capacity - } -} - -/// Semaphore like implementation that allows both shrinking and expanding -/// the max permits. -#[derive(Clone, Debug)] -pub(crate) struct ResizableSemaphore(Arc); - -impl ResizableSemaphore { - pub(crate) fn new(capacity: NonZeroUsize) -> Self { - let shared = SemShared { - state: Mutex::new(SemState { - capacity: capacity.get(), - usage: 0, - }), - notify: Notify::new(), - }; - Self(Arc::new(shared)) - } - - // Acquires a permit. Waits until a permit is available. - pub(crate) async fn acquire(&self) -> ResizableSemaphorePermit { - loop { - let wait = { - let mut state = self.0.state.lock(); - if state.alloc_one() { - None - } else { - // This needs to be done under the lock to avoid race. - Some(self.0.notify.notified()) - } - }; - - match wait { - Some(notified) => notified.await, - None => break, - } - } - ResizableSemaphorePermit(self.0.clone()) - } - - // Acquires a permit, doesn't wait for permits to be available. - // Currently used only for tests. - #[cfg(test)] - pub(crate) fn try_acquire(&self) -> Option { - let mut state = self.0.state.lock(); - if state.alloc_one() { - Some(ResizableSemaphorePermit(self.0.clone())) - } else { - None - } - } - - // Expands the capacity by the specified amount. Returns old capacity. - pub(crate) fn expand(&self, delta: usize) -> Result { - let mut state = self.0.state.lock(); - let old_capacity = state.capacity; - - let notify_waiters = state.expand(delta)?; - if notify_waiters { - self.0.notify.notify_waiters(); - } - - Ok(old_capacity) - } - - // Shrinks the capacity by the specified amount. - pub(crate) fn shrink(&self, delta: usize) -> Result { - let mut state = self.0.state.lock(); - let old_capacity = state.capacity; - - state.shrink(delta)?; - - Ok(old_capacity) - } -} - -/// The semaphore permit. -#[derive(Clone, Debug)] -pub(crate) struct ResizableSemaphorePermit(Arc); - -impl Drop for ResizableSemaphorePermit { - fn drop(&mut self) { - let notify_waiters = self.0.state.lock().free_one(); - if notify_waiters { - self.0.notify.notify_waiters(); - } - } -} diff --git a/crates/subspace-networking/src/utils/rate_limiter/tests.rs b/crates/subspace-networking/src/utils/rate_limiter/tests.rs deleted file mode 100644 index fe51f3ce0b..0000000000 --- a/crates/subspace-networking/src/utils/rate_limiter/tests.rs +++ /dev/null @@ -1,61 +0,0 @@ -use crate::utils::rate_limiter::resizable_semaphore::ResizableSemaphore; -use std::num::NonZeroUsize; - -#[test] -fn test_resizable_semaphore_alloc() { - // Capacity = 3. We should be able to alloc only three permits. - let sem = ResizableSemaphore::new(NonZeroUsize::new(3).unwrap()); - let _permit_1 = sem.try_acquire().unwrap(); - let _permit_2 = sem.try_acquire().unwrap(); - let _permit_3 = sem.try_acquire().unwrap(); - assert!(sem.try_acquire().is_none()); -} - -#[test] -fn test_resizable_semaphore_expand() { - // Initial capacity = 3. - let sem = ResizableSemaphore::new(NonZeroUsize::new(3).unwrap()); - let _permit_1 = sem.try_acquire().unwrap(); - let _permit_2 = sem.try_acquire().unwrap(); - let _permit_3 = sem.try_acquire().unwrap(); - assert!(sem.try_acquire().is_none()); - - // Increase capacity of semaphore by 2, we should be able to alloc two more permits. - sem.expand(2).unwrap(); - // Can't expand with overflow - assert!(sem.expand(usize::MAX).is_err()); - let _permit_4 = sem.try_acquire().unwrap(); - let _permit_5 = sem.try_acquire().unwrap(); - assert!(sem.try_acquire().is_none()); -} - -#[test] -fn test_resizable_semaphore_shrink() { - // Initial capacity = 4, alloc 4 outstanding permits. - let sem = ResizableSemaphore::new(NonZeroUsize::new(4).unwrap()); - let permit_1 = sem.try_acquire().unwrap(); - let permit_2 = sem.try_acquire().unwrap(); - let permit_3 = sem.try_acquire().unwrap(); - let _permit_4 = sem.try_acquire().unwrap(); - assert!(sem.try_acquire().is_none()); - - // Shrink the capacity by 2, new capacity = 2. - sem.shrink(2).unwrap(); - // Can't shrink by more than capacity - assert!(sem.shrink(usize::MAX).is_err()); - - // Alloc should fail as outstanding permits(4) >= capacity(2). - assert!(sem.try_acquire().is_none()); - - // Free a permit, alloc should fail as outstanding permits(3) >= capacity(2). - drop(permit_2); - assert!(sem.try_acquire().is_none()); - - // Free another permit, alloc should fail as outstanding permits(2) >= capacity(2). - drop(permit_3); - assert!(sem.try_acquire().is_none()); - - // Free another permit, alloc should succeed as outstanding permits(1) < capacity(2). - drop(permit_1); - assert!(sem.try_acquire().is_some()); -} From ba2bce9a47f4c240d743e69ff022f6f41e993c2a Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Thu, 1 Feb 2024 15:57:13 +0700 Subject: [PATCH 4/4] Increase retries number for DSN requests. --- crates/subspace-farmer/src/single_disk_farm/plotting.rs | 2 +- crates/subspace-service/src/sync_from_dsn/import_blocks.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 7a4872a4ee..a17787063f 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -43,7 +43,7 @@ const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500); /// Size of the cache of archived segments for the purposes of faster sector expiration checks. const ARCHIVED_SEGMENTS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1000).expect("Not zero; qed"); /// Get piece retry attempts number. -const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(4).expect("Not zero; qed"); +const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(7).expect("Not zero; qed"); /// Details about sector currently being plotted #[derive(Debug, Clone, Encode, Decode)] diff --git a/crates/subspace-service/src/sync_from_dsn/import_blocks.rs b/crates/subspace-service/src/sync_from_dsn/import_blocks.rs index e73b6ddaed..6c0fb4e2ec 100644 --- a/crates/subspace-service/src/sync_from_dsn/import_blocks.rs +++ b/crates/subspace-service/src/sync_from_dsn/import_blocks.rs @@ -37,7 +37,7 @@ use tokio::sync::Semaphore; use tracing::warn; /// Get piece retry attempts number. -const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(4).expect("Not zero; qed"); +const PIECE_GETTER_RETRY_NUMBER: NonZeroU16 = NonZeroU16::new(7).expect("Not zero; qed"); /// How many blocks to queue before pausing and waiting for blocks to be imported, this is /// essentially used to ensure we use a bounded amount of RAM during sync process.