From f7645c817484227100bd68e3532003cafa86a85d Mon Sep 17 00:00:00 2001 From: Yosh Date: Mon, 14 Aug 2023 13:22:08 +0200 Subject: [PATCH 1/3] fix readiness bug in Group types --- src/future/future_group.rs | 2 ++ src/stream/stream_group.rs | 27 +++++++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/src/future/future_group.rs b/src/future/future_group.rs index 4f5b9a4..232464f 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -214,6 +214,8 @@ impl FutureGroup { // Set the corresponding state self.states[index].set_pending(); + let mut readiness = self.wakers.readiness().lock().unwrap(); + readiness.set_ready(index); key } diff --git a/src/stream/stream_group.rs b/src/stream/stream_group.rs index d014d6b..a6f6f9d 100644 --- a/src/stream/stream_group.rs +++ b/src/stream/stream_group.rs @@ -34,6 +34,31 @@ use crate::utils::{PollState, PollVec, WakerVec}; /// assert_eq!(out, 6); /// # }); /// ``` +/// +/// **Update the group on every iteration** +/// +/// ```rust +/// use futures_concurrency::stream::StreamGroup; +/// use lending_stream::prelude::*; +/// use futures_lite::stream; +/// +/// # futures_lite::future::block_on(async { +/// let mut group = StreamGroup::new(); +/// group.insert(stream::once(4)); + +/// let mut index = 3; +/// let mut out = 0; +/// let mut group = group.lend_mut(); +/// while let Some((group, num)) = group.next().await { +/// if index != 0 { +/// group.insert(stream::once(index)); +/// index -= 1; +/// } +/// out += num; +/// } +/// assert_eq!(out, 10); +/// # }); +/// ``` #[must_use = "`StreamGroup` does nothing if not iterated over"] #[derive(Default)] #[pin_project::pin_project] @@ -214,6 +239,8 @@ impl StreamGroup { // Set the corresponding state self.states[index].set_pending(); + let mut readiness = self.wakers.readiness().lock().unwrap(); + readiness.set_ready(index); key } From 6754ba0f463fd6199ece7e5f3c60072bd3181e0f Mon Sep 17 00:00:00 2001 From: Yosh Date: Mon, 14 Aug 2023 13:22:29 +0200 Subject: [PATCH 2/3] Add lending examples to group types --- Cargo.toml | 1 + src/future/future_group.rs | 33 ++++++++++++++++++++++++++++----- src/stream/stream_group.rs | 7 ++----- 3 files changed, 31 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1e5e511..9d40352 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,4 +40,5 @@ futures-lite = "1.12.0" criterion = { version = "0.3", features = ["async", "async_futures", "html_reports"] } async-std = { version = "1.12.0", features = ["attributes"] } futures-time = "3.0.0" +lending-stream = "1.0.0" rand = "0.8.5" diff --git a/src/future/future_group.rs b/src/future/future_group.rs index 232464f..2d5d4bc 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -11,13 +11,10 @@ use crate::utils::{PollState, PollVec, WakerVec}; /// A growable group of futures which act as a single unit. /// -/// In order go mutate the group during iteration, the future group should be -/// combined with a mechanism such as -/// [`lend_mut`](https://docs.rs/async-iterator/latest/async_iterator/trait.Iterator.html#method.lend_mut). -/// This is not yet provided by the `futures-concurrency` crate. -/// /// # Example /// +/// **Basic example** +/// /// ```rust /// use futures_concurrency::future::FutureGroup; /// use futures_lite::StreamExt; @@ -35,6 +32,32 @@ use crate::utils::{PollState, PollVec, WakerVec}; /// assert_eq!(out, 6); /// # }); /// ``` +/// +/// **Update the group on every iteration** +/// +/// ``` +/// use futures_concurrency::future::FutureGroup; +/// use lending_stream::prelude::*; +/// use std::future; +/// +/// # fn main() { futures_lite::future::block_on(async { +/// let mut group = FutureGroup::new(); +/// group.insert(future::ready(4)); +/// +/// let mut index = 3; +/// let mut out = 0; +/// let mut group = group.lend_mut(); +/// while let Some((group, num)) = group.next().await { +/// if index != 0 { +/// group.insert(future::ready(index)); +/// index -= 1; +/// } +/// out += num; +/// } +/// assert_eq!(out, 10); +/// # });} +/// ``` + #[must_use = "`FutureGroup` does nothing if not iterated over"] #[derive(Default)] #[pin_project::pin_project] diff --git a/src/stream/stream_group.rs b/src/stream/stream_group.rs index a6f6f9d..0f81efd 100644 --- a/src/stream/stream_group.rs +++ b/src/stream/stream_group.rs @@ -11,13 +11,10 @@ use crate::utils::{PollState, PollVec, WakerVec}; /// A growable group of streams which act as a single unit. /// -/// In order go mutate the group during iteration, the stream should be -/// combined with a mechanism such as -/// [`lend_mut`](https://docs.rs/async-iterator/latest/async_iterator/trait.Iterator.html#method.lend_mut). -/// This is not yet provided by the `futures-concurrency` crate. -/// /// # Example /// +/// **Basic example** +/// /// ```rust /// use futures_concurrency::stream::StreamGroup; /// use futures_lite::{stream, StreamExt}; From 96f10884230b3c171fdbe8662afd975aa5c182e7 Mon Sep 17 00:00:00 2001 From: Yosh Date: Mon, 14 Aug 2023 13:22:45 +0200 Subject: [PATCH 3/3] alphabetize deps in cargo.toml --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9d40352..c729377 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,10 +35,10 @@ slab = "0.4.8" smallvec = "1.11.0" [dev-dependencies] +async-std = { version = "1.12.0", features = ["attributes"] } +criterion = { version = "0.3", features = ["async", "async_futures", "html_reports"] } futures = "0.3.25" futures-lite = "1.12.0" -criterion = { version = "0.3", features = ["async", "async_futures", "html_reports"] } -async-std = { version = "1.12.0", features = ["attributes"] } futures-time = "3.0.0" lending-stream = "1.0.0" rand = "0.8.5"