diff --git a/Cargo.toml b/Cargo.toml index 1e5e511..c729377 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,9 +35,10 @@ slab = "0.4.8" smallvec = "1.11.0" [dev-dependencies] +async-std = { version = "1.12.0", features = ["attributes"] } +criterion = { version = "0.3", features = ["async", "async_futures", "html_reports"] } futures = "0.3.25" futures-lite = "1.12.0" -criterion = { version = "0.3", features = ["async", "async_futures", "html_reports"] } -async-std = { version = "1.12.0", features = ["attributes"] } futures-time = "3.0.0" +lending-stream = "1.0.0" rand = "0.8.5" diff --git a/src/future/future_group.rs b/src/future/future_group.rs index 4f5b9a4..2d5d4bc 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -11,13 +11,10 @@ use crate::utils::{PollState, PollVec, WakerVec}; /// A growable group of futures which act as a single unit. /// -/// In order go mutate the group during iteration, the future group should be -/// combined with a mechanism such as -/// [`lend_mut`](https://docs.rs/async-iterator/latest/async_iterator/trait.Iterator.html#method.lend_mut). -/// This is not yet provided by the `futures-concurrency` crate. -/// /// # Example /// +/// **Basic example** +/// /// ```rust /// use futures_concurrency::future::FutureGroup; /// use futures_lite::StreamExt; @@ -35,6 +32,32 @@ use crate::utils::{PollState, PollVec, WakerVec}; /// assert_eq!(out, 6); /// # }); /// ``` +/// +/// **Update the group on every iteration** +/// +/// ``` +/// use futures_concurrency::future::FutureGroup; +/// use lending_stream::prelude::*; +/// use std::future; +/// +/// # fn main() { futures_lite::future::block_on(async { +/// let mut group = FutureGroup::new(); +/// group.insert(future::ready(4)); +/// +/// let mut index = 3; +/// let mut out = 0; +/// let mut group = group.lend_mut(); +/// while let Some((group, num)) = group.next().await { +/// if index != 0 { +/// group.insert(future::ready(index)); +/// index -= 1; +/// } +/// out += num; +/// } +/// assert_eq!(out, 10); +/// # });} +/// ``` + #[must_use = "`FutureGroup` does nothing if not iterated over"] #[derive(Default)] #[pin_project::pin_project] @@ -214,6 +237,8 @@ impl FutureGroup { // Set the corresponding state self.states[index].set_pending(); + let mut readiness = self.wakers.readiness().lock().unwrap(); + readiness.set_ready(index); key } diff --git a/src/stream/stream_group.rs b/src/stream/stream_group.rs index d014d6b..0f81efd 100644 --- a/src/stream/stream_group.rs +++ b/src/stream/stream_group.rs @@ -11,13 +11,10 @@ use crate::utils::{PollState, PollVec, WakerVec}; /// A growable group of streams which act as a single unit. /// -/// In order go mutate the group during iteration, the stream should be -/// combined with a mechanism such as -/// [`lend_mut`](https://docs.rs/async-iterator/latest/async_iterator/trait.Iterator.html#method.lend_mut). -/// This is not yet provided by the `futures-concurrency` crate. -/// /// # Example /// +/// **Basic example** +/// /// ```rust /// use futures_concurrency::stream::StreamGroup; /// use futures_lite::{stream, StreamExt}; @@ -34,6 +31,31 @@ use crate::utils::{PollState, PollVec, WakerVec}; /// assert_eq!(out, 6); /// # }); /// ``` +/// +/// **Update the group on every iteration** +/// +/// ```rust +/// use futures_concurrency::stream::StreamGroup; +/// use lending_stream::prelude::*; +/// use futures_lite::stream; +/// +/// # futures_lite::future::block_on(async { +/// let mut group = StreamGroup::new(); +/// group.insert(stream::once(4)); + +/// let mut index = 3; +/// let mut out = 0; +/// let mut group = group.lend_mut(); +/// while let Some((group, num)) = group.next().await { +/// if index != 0 { +/// group.insert(stream::once(index)); +/// index -= 1; +/// } +/// out += num; +/// } +/// assert_eq!(out, 10); +/// # }); +/// ``` #[must_use = "`StreamGroup` does nothing if not iterated over"] #[derive(Default)] #[pin_project::pin_project] @@ -214,6 +236,8 @@ impl StreamGroup { // Set the corresponding state self.states[index].set_pending(); + let mut readiness = self.wakers.readiness().lock().unwrap(); + readiness.set_ready(index); key }