From c71e9762835339037c1e9113b84643ce8d5e9da2 Mon Sep 17 00:00:00 2001 From: Yosh Date: Sun, 13 Aug 2023 02:00:17 +0200 Subject: [PATCH 1/5] Add `FutureGroup` --- src/future/future_group.rs | 408 +++++++++++++++++++++++++++++++++++++ src/future/mod.rs | 2 + 2 files changed, 410 insertions(+) create mode 100644 src/future/future_group.rs diff --git a/src/future/future_group.rs b/src/future/future_group.rs new file mode 100644 index 0000000..9ee305e --- /dev/null +++ b/src/future/future_group.rs @@ -0,0 +1,408 @@ +use futures_core::stream::Stream; +use futures_core::Future; +use slab::Slab; +use smallvec::{smallvec, SmallVec}; +use std::collections::BTreeSet; +use std::fmt::{self, Debug}; +use std::ops::{Deref, DerefMut}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::utils::{PollState, PollVec, WakerVec}; + +/// A growable group of futures which act as a single unit. +/// +/// In order go mutate the group during iteration, the future group should be +/// combined with a mechanism such as +/// [`lend_mut`](https://docs.rs/async-iterator/latest/async_iterator/trait.Iterator.html#method.lend_mut). +/// This is not yet provided by the `futures-concurrency` crate. +/// +/// # Example +/// +/// ```rust +/// use futures_concurrency::future::FutureGroup; +/// use futures_lite::StreamExt; +/// use std::future; +/// +/// # futures_lite::future::block_on(async { +/// let mut group = FutureGroup::new(); +/// group.insert(future::ready(2)); +/// group.insert(future::ready(4)); +/// +/// let mut out = 0; +/// while let Some(num) = group.next().await { +/// out += num; +/// } +/// assert_eq!(out, 6); +/// # }); +/// ``` +#[must_use = "`FutureGroup` does nothing if not iterated over"] +#[derive(Default)] +#[pin_project::pin_project] +pub struct FutureGroup { + #[pin] + futures: Slab, + wakers: WakerVec, + states: PollVec, + keys: BTreeSet, + key_removal_queue: SmallVec<[usize; 10]>, +} + +impl Debug for FutureGroup { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FutureGroup") + .field("slab", &"[..]") + .finish() + } +} + +impl FutureGroup { + /// Create a new instance of `FutureGroup`. + /// + /// # Example + /// + /// ```rust + /// use futures_concurrency::future::FutureGroup; + /// + /// let group = FutureGroup::new(); + /// # let group: FutureGroup = group; + /// ``` + pub fn new() -> Self { + Self::with_capacity(0) + } + + /// Create a new instance of `FutureGroup` with a given capacity. + /// + /// # Example + /// + /// ```rust + /// use futures_concurrency::future::FutureGroup; + /// + /// let group = FutureGroup::with_capacity(2); + /// # let group: FutureGroup = group; + /// ``` + pub fn with_capacity(capacity: usize) -> Self { + Self { + futures: Slab::with_capacity(capacity), + wakers: WakerVec::new(capacity), + states: PollVec::new(capacity), + keys: BTreeSet::new(), + key_removal_queue: smallvec![], + } + } + + /// Return the number of futures currently active in the group. + /// + /// # Example + /// + /// ```rust + /// use futures_concurrency::future::FutureGroup; + /// use futures_lite::StreamExt; + /// use std::future; + /// + /// let mut group = FutureGroup::with_capacity(2); + /// assert_eq!(group.len(), 0); + /// group.insert(future::ready(12)); + /// assert_eq!(group.len(), 1); + /// ``` + pub fn len(&self) -> usize { + self.futures.len() + } + + /// Return the capacity of the `FutureGroup`. + /// + /// # Example + /// + /// ```rust + /// use futures_concurrency::future::FutureGroup; + /// use futures_lite::stream; + /// + /// let group = FutureGroup::with_capacity(2); + /// assert_eq!(group.capacity(), 2); + /// # let group: FutureGroup = group; + /// ``` + pub fn capacity(&self) -> usize { + self.futures.capacity() + } + + /// Returns true if there are no futures currently active in the group. + /// + /// # Example + /// + /// ```rust + /// use futures_concurrency::future::FutureGroup; + /// use std::future; + /// + /// let mut group = FutureGroup::with_capacity(2); + /// assert!(group.is_empty()); + /// group.insert(future::ready(12)); + /// assert!(!group.is_empty()); + /// ``` + pub fn is_empty(&self) -> bool { + self.futures.is_empty() + } + + /// Insert a new future into the group. + /// + /// # Example + /// + /// ```rust + /// use futures_concurrency::future::FutureGroup; + /// use std::future; + /// + /// let mut group = FutureGroup::with_capacity(2); + /// group.insert(future::ready(12)); + /// ``` + pub fn insert(&mut self, stream: F) -> Key + where + F: Future, + { + let index = self.futures.insert(stream); + self.keys.insert(index); + let key = Key(index); + + // If our slab allocated more space we need to + // update our tracking structures along with it. + let max_len = self.capacity().max(index); + self.wakers.resize(max_len); + self.states.resize(max_len); + + // Set the corresponding state + self.states[index].set_pending(); + + key + } + + /// Removes a stream from the group. Returns whether the value was present in + /// the group. + /// + /// # Example + /// + /// ``` + /// use futures_concurrency::future::FutureGroup; + /// use std::future; + /// + /// # futures_lite::future::block_on(async { + /// let mut group = FutureGroup::new(); + /// let key = group.insert(future::ready(4)); + /// assert_eq!(group.len(), 1); + /// group.remove(key); + /// assert_eq!(group.len(), 0); + /// # }) + /// ``` + pub fn remove(&mut self, key: Key) -> bool { + let is_present = self.keys.remove(&key.0); + if is_present { + self.states[key.0].set_none(); + self.futures.remove(key.0); + } + is_present + } + + /// Returns `true` if the `FutureGroup` contains a value for the specified key. + /// + /// # Example + /// + /// ``` + /// use futures_concurrency::future::FutureGroup; + /// use std::future; + /// + /// # futures_lite::future::block_on(async { + /// let mut group = FutureGroup::new(); + /// let key = group.insert(future::ready(4)); + /// assert!(group.contains_key(key)); + /// group.remove(key); + /// assert!(!group.contains_key(key)); + /// # }) + /// ``` + pub fn contains_key(&mut self, key: Key) -> bool { + self.keys.contains(&key.0) + } +} + +impl FutureGroup { + /// Create a stream which also yields the key of each item. + /// + /// # Example + /// + /// ```rust + /// use futures_concurrency::future::FutureGroup; + /// use futures_lite::StreamExt; + /// use std::future; + /// + /// # futures_lite::future::block_on(async { + /// let mut group = FutureGroup::new(); + /// group.insert(future::ready(2)); + /// group.insert(future::ready(4)); + /// + /// let mut out = 0; + /// let mut group = group.keyed(); + /// while let Some((_key, num)) = group.next().await { + /// out += num; + /// } + /// assert_eq!(out, 6); + /// # }); + /// ``` + pub fn keyed(self) -> Keyed { + Keyed { group: self } + } +} + +impl FutureGroup { + fn poll_next_inner( + self: Pin<&mut Self>, + cx: &std::task::Context<'_>, + ) -> Poll::Output)>> { + let mut this = self.project(); + + // Short-circuit if we have no futures to iterate over + if this.futures.is_empty() { + return Poll::Ready(None); + } + + // Set the top-level waker and check readiness + let mut readiness = this.wakers.readiness().lock().unwrap(); + readiness.set_waker(cx.waker()); + if !readiness.any_ready() { + // Nothing is ready yet + return Poll::Pending; + } + + // Setup our futures state + let mut ret = Poll::Pending; + let states = this.states; + + // SAFETY: We unpin the future group so we can later individually access + // single futures. Either to read from them or to drop them. + let futures = unsafe { this.futures.as_mut().get_unchecked_mut() }; + + for index in this.keys.iter().cloned() { + if states[index].is_pending() && readiness.clear_ready(index) { + // unlock readiness so we don't deadlock when polling + drop(readiness); + + // Obtain the intermediate waker. + let mut cx = Context::from_waker(this.wakers.get(index).unwrap()); + + // SAFETY: this future here is a projection from the futures + // vec, which we're reading from. + let future = unsafe { Pin::new_unchecked(&mut futures[index]) }; + match future.poll(&mut cx) { + Poll::Ready(item) => { + // Set the return type for the function + ret = Poll::Ready(Some((Key(index), item))); + + // Remove all associated data with the future + // The only data we can't remove directly is the key entry. + states[index] = PollState::None; + futures.remove(index); + + break; + } + // Keep looping if there is nothing for us to do + Poll::Pending => {} + }; + + // Lock readiness so we can use it again + readiness = this.wakers.readiness().lock().unwrap(); + } + } + + // Now that we're no longer borrowing `this.keys` we can remove + // the current key from the set + if let Poll::Ready(Some((key, _))) = ret { + this.keys.remove(&key.0); + } + + ret + } +} + +impl Stream for FutureGroup { + type Item = ::Output; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + match self.poll_next_inner(cx) { + Poll::Ready(Some((_key, item))) => Poll::Ready(Some(item)), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +impl FromIterator for FutureGroup { + fn from_iter>(iter: T) -> Self { + let iter = iter.into_iter(); + let len = iter.size_hint().1.unwrap_or_default(); + let mut this = Self::with_capacity(len); + for future in iter { + this.insert(future); + } + this + } +} + +/// A key used to index into the `FutureGroup` type. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct Key(usize); + +/// Iterate over items in the futures group with their associated keys. +#[derive(Debug)] +#[pin_project::pin_project] +pub struct Keyed { + #[pin] + group: FutureGroup, +} + +impl Deref for Keyed { + type Target = FutureGroup; + + fn deref(&self) -> &Self::Target { + &self.group + } +} + +impl DerefMut for Keyed { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.group + } +} + +impl Stream for Keyed { + type Item = (Key, ::Output); + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let mut this = self.project(); + this.group.as_mut().poll_next_inner(cx) + } +} + +#[cfg(test)] +mod test { + use super::FutureGroup; + use futures_lite::prelude::*; + use std::future; + + #[test] + fn smoke() { + futures_lite::future::block_on(async { + let mut group = FutureGroup::new(); + group.insert(future::ready(2)); + group.insert(future::ready(4)); + + let mut out = 0; + while let Some(num) = group.next().await { + out += num; + } + assert_eq!(out, 6); + assert_eq!(group.len(), 0); + assert!(group.is_empty()); + }); + } +} diff --git a/src/future/mod.rs b/src/future/mod.rs index b41c17f..e37f14f 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -68,12 +68,14 @@ //! - `future::RaceOk`: wait for the first _successful_ future in the set to //! complete, or return an `Err` if *no* futures complete successfully. //! +pub use future_group::FutureGroup; pub use futures_ext::FutureExt; pub use join::Join; pub use race::Race; pub use race_ok::RaceOk; pub use try_join::TryJoin; +mod future_group; mod futures_ext; pub(crate) mod join; pub(crate) mod race; From e4bb66039708901368e05c29631169e301f11725 Mon Sep 17 00:00:00 2001 From: Yosh Date: Sun, 13 Aug 2023 02:06:32 +0200 Subject: [PATCH 2/5] remove unneeded `key_removal_queue` --- src/future/future_group.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/future/future_group.rs b/src/future/future_group.rs index 9ee305e..84239ca 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -1,7 +1,6 @@ use futures_core::stream::Stream; use futures_core::Future; use slab::Slab; -use smallvec::{smallvec, SmallVec}; use std::collections::BTreeSet; use std::fmt::{self, Debug}; use std::ops::{Deref, DerefMut}; @@ -45,7 +44,6 @@ pub struct FutureGroup { wakers: WakerVec, states: PollVec, keys: BTreeSet, - key_removal_queue: SmallVec<[usize; 10]>, } impl Debug for FutureGroup { @@ -87,7 +85,6 @@ impl FutureGroup { wakers: WakerVec::new(capacity), states: PollVec::new(capacity), keys: BTreeSet::new(), - key_removal_queue: smallvec![], } } From 19f5c23924adcbe2d738cae1a275dd7c3c87866a Mon Sep 17 00:00:00 2001 From: Yosh Date: Sun, 13 Aug 2023 02:09:53 +0200 Subject: [PATCH 3/5] Add benchmarks --- benches/bench.rs | 53 +++++++++++++++++++++++++++++- benches/utils/countdown_futures.rs | 19 +++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/benches/bench.rs b/benches/bench.rs index d2ab343..da2f970 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -11,7 +11,8 @@ fn main() { merge::merge_benches, join::join_benches, race::race_benches, - stream_group::stream_group_benches + stream_group::stream_group_benches, + future_group::future_group_benches, ); main() } @@ -66,6 +67,56 @@ mod stream_group { group.finish(); } } +mod future_group { + use criterion::async_executor::FuturesExecutor; + use criterion::{black_box, criterion_group, BatchSize, BenchmarkId, Criterion}; + use futures::stream::FuturesUnordered; + use futures_concurrency::future::FutureGroup; + use futures_lite::prelude::*; + + use crate::utils::{make_future_group, make_futures_unordered}; + criterion_group! { + name = future_group_benches; + // This can be any expression that returns a `Criterion` object. + config = Criterion::default(); + targets = future_group_bench + } + + fn future_group_bench(c: &mut Criterion) { + let mut group = c.benchmark_group("future_group"); + for i in [10, 100, 1000].iter() { + group.bench_with_input(BenchmarkId::new("FutureGroup", i), i, |b, i| { + let setup = || make_future_group(*i); + let routine = |mut group: FutureGroup<_>| async move { + let mut counter = 0; + black_box({ + while group.next().await.is_some() { + counter += 1; + } + assert_eq!(counter, *i); + }); + }; + b.to_async(FuturesExecutor) + .iter_batched(setup, routine, BatchSize::SmallInput) + }); + group.bench_with_input(BenchmarkId::new("FuturesUnordered", i), i, |b, i| { + let setup = || make_futures_unordered(*i); + let routine = |mut group: FuturesUnordered<_>| async move { + let mut counter = 0; + black_box({ + while group.next().await.is_some() { + counter += 1; + } + assert_eq!(counter, *i); + }); + }; + b.to_async(FuturesExecutor) + .iter_batched(setup, routine, BatchSize::SmallInput) + }); + } + group.finish(); + } +} mod merge { use criterion::async_executor::FuturesExecutor; diff --git a/benches/utils/countdown_futures.rs b/benches/utils/countdown_futures.rs index fcaca2f..c00093b 100644 --- a/benches/utils/countdown_futures.rs +++ b/benches/utils/countdown_futures.rs @@ -1,3 +1,4 @@ +use futures_concurrency::future::FutureGroup; use futures_core::Future; use std::cell::{Cell, RefCell}; @@ -28,6 +29,24 @@ pub fn futures_array() -> [CountdownFuture; N] { futures } +#[allow(unused)] +pub fn make_future_group(len: usize) -> FutureGroup { + let wakers = Rc::new(RefCell::new(BinaryHeap::new())); + let completed = Rc::new(Cell::new(0)); + (0..len) + .map(|n| CountdownFuture::new(n, len, wakers.clone(), completed.clone())) + .collect() +} + +#[allow(unused)] +pub fn make_futures_unordered(len: usize) -> futures::stream::FuturesUnordered { + let wakers = Rc::new(RefCell::new(BinaryHeap::new())); + let completed = Rc::new(Cell::new(0)); + (0..len) + .map(|n| CountdownFuture::new(n, len, wakers.clone(), completed.clone())) + .collect() +} + #[allow(unused)] pub fn futures_tuple() -> ( CountdownFuture, From b90a83e0db949171db4bd4394f36f884eebd2c97 Mon Sep 17 00:00:00 2001 From: Yosh Date: Sun, 13 Aug 2023 02:21:08 +0200 Subject: [PATCH 4/5] fix item export for `FutureGroup` --- src/future/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/future/mod.rs b/src/future/mod.rs index e37f14f..f181ad1 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -68,6 +68,7 @@ //! - `future::RaceOk`: wait for the first _successful_ future in the set to //! complete, or return an `Err` if *no* futures complete successfully. //! +#[doc(inline)] pub use future_group::FutureGroup; pub use futures_ext::FutureExt; pub use join::Join; @@ -75,7 +76,9 @@ pub use race::Race; pub use race_ok::RaceOk; pub use try_join::TryJoin; -mod future_group; +/// A growable group of futures which act as a single unit. +pub mod future_group; + mod futures_ext; pub(crate) mod join; pub(crate) mod race; From 74b26114dabf441bd149b1b66517756b652875f2 Mon Sep 17 00:00:00 2001 From: Yosh Date: Sun, 13 Aug 2023 02:21:32 +0200 Subject: [PATCH 5/5] set bounds on `{Future,Stream}Group::insert` --- src/future/future_group.rs | 62 +++++++++++++++++++------------------- src/stream/stream_group.rs | 62 +++++++++++++++++++------------------- 2 files changed, 62 insertions(+), 62 deletions(-) diff --git a/src/future/future_group.rs b/src/future/future_group.rs index 84239ca..4f5b9a4 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -139,37 +139,6 @@ impl FutureGroup { self.futures.is_empty() } - /// Insert a new future into the group. - /// - /// # Example - /// - /// ```rust - /// use futures_concurrency::future::FutureGroup; - /// use std::future; - /// - /// let mut group = FutureGroup::with_capacity(2); - /// group.insert(future::ready(12)); - /// ``` - pub fn insert(&mut self, stream: F) -> Key - where - F: Future, - { - let index = self.futures.insert(stream); - self.keys.insert(index); - let key = Key(index); - - // If our slab allocated more space we need to - // update our tracking structures along with it. - let max_len = self.capacity().max(index); - self.wakers.resize(max_len); - self.states.resize(max_len); - - // Set the corresponding state - self.states[index].set_pending(); - - key - } - /// Removes a stream from the group. Returns whether the value was present in /// the group. /// @@ -218,6 +187,37 @@ impl FutureGroup { } impl FutureGroup { + /// Insert a new future into the group. + /// + /// # Example + /// + /// ```rust + /// use futures_concurrency::future::FutureGroup; + /// use std::future; + /// + /// let mut group = FutureGroup::with_capacity(2); + /// group.insert(future::ready(12)); + /// ``` + pub fn insert(&mut self, stream: F) -> Key + where + F: Future, + { + let index = self.futures.insert(stream); + self.keys.insert(index); + let key = Key(index); + + // If our slab allocated more space we need to + // update our tracking structures along with it. + let max_len = self.capacity().max(index); + self.wakers.resize(max_len); + self.states.resize(max_len); + + // Set the corresponding state + self.states[index].set_pending(); + + key + } + /// Create a stream which also yields the key of each item. /// /// # Example diff --git a/src/stream/stream_group.rs b/src/stream/stream_group.rs index a45cdf7..d014d6b 100644 --- a/src/stream/stream_group.rs +++ b/src/stream/stream_group.rs @@ -139,37 +139,6 @@ impl StreamGroup { self.streams.is_empty() } - /// Insert a new future into the group. - /// - /// # Example - /// - /// ```rust - /// use futures_concurrency::stream::StreamGroup; - /// use futures_lite::stream; - /// - /// let mut group = StreamGroup::with_capacity(2); - /// group.insert(stream::once(12)); - /// ``` - pub fn insert(&mut self, stream: S) -> Key - where - S: Stream, - { - let index = self.streams.insert(stream); - self.keys.insert(index); - let key = Key(index); - - // If our slab allocated more space we need to - // update our tracking structures along with it. - let max_len = self.capacity().max(index); - self.wakers.resize(max_len); - self.states.resize(max_len); - - // Set the corresponding state - self.states[index].set_pending(); - - key - } - /// Removes a stream from the group. Returns whether the value was present in /// the group. /// @@ -218,6 +187,37 @@ impl StreamGroup { } impl StreamGroup { + /// Insert a new future into the group. + /// + /// # Example + /// + /// ```rust + /// use futures_concurrency::stream::StreamGroup; + /// use futures_lite::stream; + /// + /// let mut group = StreamGroup::with_capacity(2); + /// group.insert(stream::once(12)); + /// ``` + pub fn insert(&mut self, stream: S) -> Key + where + S: Stream, + { + let index = self.streams.insert(stream); + self.keys.insert(index); + let key = Key(index); + + // If our slab allocated more space we need to + // update our tracking structures along with it. + let max_len = self.capacity().max(index); + self.wakers.resize(max_len); + self.states.resize(max_len); + + // Set the corresponding state + self.states[index].set_pending(); + + key + } + /// Create a stream which also yields the key of each item. /// /// # Example