diff --git a/Cargo.toml b/Cargo.toml index a141570..e1a9ced 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ futures-lite = "1.12.0" pin-project = "1.0.8" slab = { version = "0.4.8", optional = true } smallvec = { version = "1.11.0", optional = true } +futures-buffered = "0.2.6" [dev-dependencies] async-io = "2.3.2" diff --git a/src/concurrent_stream/for_each.rs b/src/concurrent_stream/for_each.rs index 07564a2..ea24aeb 100644 --- a/src/concurrent_stream/for_each.rs +++ b/src/concurrent_stream/for_each.rs @@ -1,5 +1,5 @@ use super::{Consumer, ConsumerState}; -use crate::future::FutureGroup; +use futures_buffered::FuturesUnordered; use futures_lite::StreamExt; use pin_project::pin_project; @@ -22,7 +22,7 @@ where // NOTE: we can remove the `Arc` here if we're willing to make this struct self-referential count: Arc, #[pin] - group: FutureGroup>, + group: FuturesUnordered>, limit: usize, f: F, _phantom: PhantomData<(T, FutB)>, @@ -44,7 +44,7 @@ where f, _phantom: PhantomData, count: Arc::new(AtomicUsize::new(0)), - group: FutureGroup::new(), + group: FuturesUnordered::new(), } } } @@ -69,7 +69,7 @@ where // Space was available! - insert the item for posterity this.count.fetch_add(1, Ordering::Relaxed); let fut = ForEachFut::new(this.f.clone(), future, this.count.clone()); - this.group.as_mut().insert_pinned(fut); + this.group.as_mut().push(fut); ConsumerState::Continue } diff --git a/src/concurrent_stream/from_concurrent_stream.rs b/src/concurrent_stream/from_concurrent_stream.rs index a84e2f0..ab704e9 100644 --- a/src/concurrent_stream/from_concurrent_stream.rs +++ b/src/concurrent_stream/from_concurrent_stream.rs @@ -1,9 +1,9 @@ use super::{ConcurrentStream, Consumer, ConsumerState, IntoConcurrentStream}; -use crate::future::FutureGroup; #[cfg(all(feature = "alloc", not(feature = "std")))] use alloc::vec::Vec; use core::future::Future; use core::pin::Pin; +use futures_buffered::FuturesUnordered; use futures_lite::StreamExt; use pin_project::pin_project; @@ -32,14 +32,14 @@ impl FromConcurrentStream for Vec { #[pin_project] pub(crate) struct VecConsumer<'a, Fut: Future> { #[pin] - group: FutureGroup, + group: FuturesUnordered, output: &'a mut Vec, } impl<'a, Fut: Future> VecConsumer<'a, Fut> { pub(crate) fn new(output: &'a mut Vec) -> Self { Self { - group: FutureGroup::new(), + group: FuturesUnordered::new(), output, } } @@ -54,7 +54,7 @@ where async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState { let mut this = self.project(); // unbounded concurrency, so we just goooo - this.group.as_mut().insert_pinned(future); + this.group.as_mut().push(future); ConsumerState::Continue } diff --git a/src/concurrent_stream/try_for_each.rs b/src/concurrent_stream/try_for_each.rs index 8696e36..63b135d 100644 --- a/src/concurrent_stream/try_for_each.rs +++ b/src/concurrent_stream/try_for_each.rs @@ -1,6 +1,6 @@ use crate::concurrent_stream::ConsumerState; -use crate::future::FutureGroup; use crate::private::Try; +use futures_buffered::FuturesUnordered; use futures_lite::StreamExt; use pin_project::pin_project; @@ -26,7 +26,7 @@ where count: Arc, // TODO: remove the `Pin` from this signature by requiring this struct is pinned #[pin] - group: FutureGroup>, + group: FuturesUnordered>, limit: usize, residual: Option, f: F, @@ -50,7 +50,7 @@ where f, residual: None, count: Arc::new(AtomicUsize::new(0)), - group: FutureGroup::new(), + group: FuturesUnordered::new(), _phantom: PhantomData, } } @@ -93,7 +93,7 @@ where // Space was available! - insert the item for posterity this.count.fetch_add(1, Ordering::Relaxed); let fut = TryForEachFut::new(this.f.clone(), future, this.count.clone()); - this.group.as_mut().insert_pinned(fut); + this.group.as_mut().push(fut); ConsumerState::Continue } diff --git a/src/future/future_group.rs b/src/future/future_group.rs index 4d000c2..70528d9 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -274,6 +274,7 @@ impl FutureGroup { Key(index) } + #[allow(unused)] /// Insert a value into a pinned `FutureGroup` /// /// This method is private because it serves as an implementation detail for