From 166e70b9297f1701a7354d703a4e81b26f550f74 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Thu, 16 May 2024 21:27:36 +0100 Subject: [PATCH 1/3] fix pin violation in much of concurrentstream --- Cargo.toml | 1 + src/concurrent_stream/for_each.rs | 8 ++--- .../from_concurrent_stream.rs | 8 ++--- src/concurrent_stream/try_for_each.rs | 8 ++--- src/future/future_group.rs | 31 ------------------- 5 files changed, 13 insertions(+), 43 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a141570..e1a9ced 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ futures-lite = "1.12.0" pin-project = "1.0.8" slab = { version = "0.4.8", optional = true } smallvec = { version = "1.11.0", optional = true } +futures-buffered = "0.2.6" [dev-dependencies] async-io = "2.3.2" diff --git a/src/concurrent_stream/for_each.rs b/src/concurrent_stream/for_each.rs index 07564a2..ea24aeb 100644 --- a/src/concurrent_stream/for_each.rs +++ b/src/concurrent_stream/for_each.rs @@ -1,5 +1,5 @@ use super::{Consumer, ConsumerState}; -use crate::future::FutureGroup; +use futures_buffered::FuturesUnordered; use futures_lite::StreamExt; use pin_project::pin_project; @@ -22,7 +22,7 @@ where // NOTE: we can remove the `Arc` here if we're willing to make this struct self-referential count: Arc, #[pin] - group: FutureGroup>, + group: FuturesUnordered>, limit: usize, f: F, _phantom: PhantomData<(T, FutB)>, @@ -44,7 +44,7 @@ where f, _phantom: PhantomData, count: Arc::new(AtomicUsize::new(0)), - group: FutureGroup::new(), + group: FuturesUnordered::new(), } } } @@ -69,7 +69,7 @@ where // Space was available! - insert the item for posterity this.count.fetch_add(1, Ordering::Relaxed); let fut = ForEachFut::new(this.f.clone(), future, this.count.clone()); - this.group.as_mut().insert_pinned(fut); + this.group.as_mut().push(fut); ConsumerState::Continue } diff --git a/src/concurrent_stream/from_concurrent_stream.rs b/src/concurrent_stream/from_concurrent_stream.rs index a84e2f0..ac8c106 100644 --- a/src/concurrent_stream/from_concurrent_stream.rs +++ b/src/concurrent_stream/from_concurrent_stream.rs @@ -1,5 +1,5 @@ use super::{ConcurrentStream, Consumer, ConsumerState, IntoConcurrentStream}; -use crate::future::FutureGroup; +use futures_buffered::FuturesUnordered; #[cfg(all(feature = "alloc", not(feature = "std")))] use alloc::vec::Vec; use core::future::Future; @@ -32,14 +32,14 @@ impl FromConcurrentStream for Vec { #[pin_project] pub(crate) struct VecConsumer<'a, Fut: Future> { #[pin] - group: FutureGroup, + group: FuturesUnordered, output: &'a mut Vec, } impl<'a, Fut: Future> VecConsumer<'a, Fut> { pub(crate) fn new(output: &'a mut Vec) -> Self { Self { - group: FutureGroup::new(), + group: FuturesUnordered::new(), output, } } @@ -54,7 +54,7 @@ where async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState { let mut this = self.project(); // unbounded concurrency, so we just goooo - this.group.as_mut().insert_pinned(future); + this.group.as_mut().push(future); ConsumerState::Continue } diff --git a/src/concurrent_stream/try_for_each.rs b/src/concurrent_stream/try_for_each.rs index 8696e36..13e87ff 100644 --- a/src/concurrent_stream/try_for_each.rs +++ b/src/concurrent_stream/try_for_each.rs @@ -1,5 +1,5 @@ use crate::concurrent_stream::ConsumerState; -use crate::future::FutureGroup; +use futures_buffered::FuturesUnordered; use crate::private::Try; use futures_lite::StreamExt; use pin_project::pin_project; @@ -26,7 +26,7 @@ where count: Arc, // TODO: remove the `Pin` from this signature by requiring this struct is pinned #[pin] - group: FutureGroup>, + group: FuturesUnordered>, limit: usize, residual: Option, f: F, @@ -50,7 +50,7 @@ where f, residual: None, count: Arc::new(AtomicUsize::new(0)), - group: FutureGroup::new(), + group: FuturesUnordered::new(), _phantom: PhantomData, } } @@ -93,7 +93,7 @@ where // Space was available! - insert the item for posterity this.count.fetch_add(1, Ordering::Relaxed); let fut = TryForEachFut::new(this.f.clone(), future, this.count.clone()); - this.group.as_mut().insert_pinned(fut); + this.group.as_mut().push(fut); ConsumerState::Continue } diff --git a/src/future/future_group.rs b/src/future/future_group.rs index 4d000c2..7e75934 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -274,37 +274,6 @@ impl FutureGroup { Key(index) } - /// Insert a value into a pinned `FutureGroup` - /// - /// This method is private because it serves as an implementation detail for - /// `ConcurrentStream`. We should never expose this publicly, as the entire - /// point of this crate is that we abstract the futures poll machinery away - /// from end-users. - pub(crate) fn insert_pinned(self: Pin<&mut Self>, future: F) -> Key - where - F: Future, - { - let mut this = self.project(); - // SAFETY: inserting a value into the futures slab does not ever move - // any of the existing values. - let index = unsafe { this.futures.as_mut().get_unchecked_mut() }.insert(future); - this.keys.insert(index); - let key = Key(index); - - // If our slab allocated more space we need to - // update our tracking structures along with it. - let max_len = this.futures.as_ref().capacity().max(index); - this.wakers.resize(max_len); - this.states.resize(max_len); - - // Set the corresponding state - this.states[index].set_pending(); - let mut readiness = this.wakers.readiness(); - readiness.set_ready(index); - - key - } - /// Create a stream which also yields the key of each item. /// /// # Example From 2be8ce6b7cf3c7c4f8d6da4318e728f205586a67 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Thu, 16 May 2024 21:42:46 +0100 Subject: [PATCH 2/3] fmt --- src/concurrent_stream/from_concurrent_stream.rs | 2 +- src/concurrent_stream/try_for_each.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/concurrent_stream/from_concurrent_stream.rs b/src/concurrent_stream/from_concurrent_stream.rs index ac8c106..ab704e9 100644 --- a/src/concurrent_stream/from_concurrent_stream.rs +++ b/src/concurrent_stream/from_concurrent_stream.rs @@ -1,9 +1,9 @@ use super::{ConcurrentStream, Consumer, ConsumerState, IntoConcurrentStream}; -use futures_buffered::FuturesUnordered; #[cfg(all(feature = "alloc", not(feature = "std")))] use alloc::vec::Vec; use core::future::Future; use core::pin::Pin; +use futures_buffered::FuturesUnordered; use futures_lite::StreamExt; use pin_project::pin_project; diff --git a/src/concurrent_stream/try_for_each.rs b/src/concurrent_stream/try_for_each.rs index 13e87ff..63b135d 100644 --- a/src/concurrent_stream/try_for_each.rs +++ b/src/concurrent_stream/try_for_each.rs @@ -1,6 +1,6 @@ use crate::concurrent_stream::ConsumerState; -use futures_buffered::FuturesUnordered; use crate::private::Try; +use futures_buffered::FuturesUnordered; use futures_lite::StreamExt; use pin_project::pin_project; From 00601b9421226382d486d1674fe6727f03bcf3e3 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Sat, 18 May 2024 10:30:12 +0100 Subject: [PATCH 3/3] revert function deletion --- src/future/future_group.rs | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/future/future_group.rs b/src/future/future_group.rs index 7e75934..70528d9 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -274,6 +274,38 @@ impl FutureGroup { Key(index) } + #[allow(unused)] + /// Insert a value into a pinned `FutureGroup` + /// + /// This method is private because it serves as an implementation detail for + /// `ConcurrentStream`. We should never expose this publicly, as the entire + /// point of this crate is that we abstract the futures poll machinery away + /// from end-users. + pub(crate) fn insert_pinned(self: Pin<&mut Self>, future: F) -> Key + where + F: Future, + { + let mut this = self.project(); + // SAFETY: inserting a value into the futures slab does not ever move + // any of the existing values. + let index = unsafe { this.futures.as_mut().get_unchecked_mut() }.insert(future); + this.keys.insert(index); + let key = Key(index); + + // If our slab allocated more space we need to + // update our tracking structures along with it. + let max_len = this.futures.as_ref().capacity().max(index); + this.wakers.resize(max_len); + this.states.resize(max_len); + + // Set the corresponding state + this.states[index].set_pending(); + let mut readiness = this.wakers.readiness(); + readiness.set_ready(index); + + key + } + /// Create a stream which also yields the key of each item. /// /// # Example