From 1117fac022f01f800f71038aace2ba2c5540919a Mon Sep 17 00:00:00 2001 From: Matheus Consoli Date: Fri, 22 Mar 2024 16:01:15 -0300 Subject: [PATCH 01/12] Isolate GroupInner from FutureGroup --- src/collections/group_inner.rs | 144 +++++++++++++++++++++++++++ src/collections/mod.rs | 1 + src/future/future_group.rs | 173 +++++++++------------------------ src/utils/poll_state/vec.rs | 2 +- 4 files changed, 191 insertions(+), 129 deletions(-) create mode 100644 src/collections/group_inner.rs diff --git a/src/collections/group_inner.rs b/src/collections/group_inner.rs new file mode 100644 index 0000000..99d18e2 --- /dev/null +++ b/src/collections/group_inner.rs @@ -0,0 +1,144 @@ +use core::{pin::Pin, task::Waker}; + +use alloc::{collections::BTreeSet, fmt}; +use slab::Slab; + +use crate::utils::{PollVec, WakerVec}; + +const GROUP_GROWTH_FACTOR: usize = 2; + +#[pin_project::pin_project] +pub struct GroupInner { + #[pin] + pub items: Slab, + pub wakers: WakerVec, + pub states: PollVec, + pub keys: BTreeSet, + cap: usize, + len: usize, +} + +impl GroupInner { + pub fn with_capacity(cap: usize) -> Self { + Self { + items: Slab::with_capacity(cap), + wakers: WakerVec::new(cap), + states: PollVec::new(cap), + keys: BTreeSet::new(), + cap, + len: 0, + } + } + + pub fn len(&self) -> usize { + self.len + } + + pub fn capacity(&self) -> usize { + self.cap + } + + pub fn has_capacity(&self) -> bool { + self.len < self.cap + } + + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + pub fn insert(&mut self, item: A) -> Key { + if !self.has_capacity() { + self.resize(self.cap + 1 * GROUP_GROWTH_FACTOR); + } + + let index = self.items.insert(item); + self.keys.insert(index); + + // set the corresponding state + self.states[index].set_pending(); + self.wakers.readiness().set_ready(index); + + self.len += 1; + Key(index) + } + + pub fn insert_pinned(mut self: Pin<&mut Self>, item: A) -> Key { + if !self.has_capacity() { + let r = unsafe { &mut self.as_mut().get_unchecked_mut() }; + r.resize(r.cap + 1 * GROUP_GROWTH_FACTOR); + } + + let mut this = self.project(); + let items = unsafe { &mut this.items.as_mut().get_unchecked_mut() }; + let index = items.insert(item); + this.keys.insert(index); + + // set the corresponding state + this.states[index].set_pending(); + this.wakers.readiness().set_ready(index); + + *this.len += 1; + Key(index) + } + + pub fn remove(&mut self, key: Key) -> Option { + let is_present = self.keys.remove(&key.0); + if !is_present { + return None; + } + self.states[key.0].set_none(); + let item = self.items.remove(key.0); + self.len -= 1; + Some(item) + } + + // todo: rename to reserve + pub fn resize(&mut self, cap: usize) { + if self.len + cap < self.cap { + return; + } + self.wakers.resize(cap); + self.states.resize(cap); + self.items.reserve_exact(cap); + self.cap = cap; + } + + pub fn any_ready(&self) -> bool { + self.wakers.readiness().any_ready() + } + + pub fn set_top_waker(&mut self, waker: &Waker) { + self.wakers.readiness().set_waker(waker); + } + + pub fn can_progress_index(&self, index: usize) -> bool { + self.states[index].is_pending() && self.wakers.readiness().clear_ready(index) + } +} + +/// Keyed operations +impl GroupInner { + // move to other impl block + pub fn contains_key(&self, key: Key) -> bool { + self.items.contains(key.0) + } +} + +impl Default for GroupInner { + fn default() -> Self { + Self::with_capacity(0) + } +} + +impl fmt::Debug for GroupInner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("GroupInner") + .field("cap", &self.cap) + .field("len", &self.len) + .finish() + } +} + +/// A key used to index into the `FutureGroup` type. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct Key(pub usize); diff --git a/src/collections/mod.rs b/src/collections/mod.rs index 33d9377..a4f46f8 100644 --- a/src/collections/mod.rs +++ b/src/collections/mod.rs @@ -1,2 +1,3 @@ +pub(crate) mod group_inner; #[cfg(feature = "alloc")] pub mod vec; diff --git a/src/future/future_group.rs b/src/future/future_group.rs index e327f2b..7a96017 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -1,13 +1,10 @@ -use alloc::collections::BTreeSet; -use core::fmt::{self, Debug}; use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::task::{Context, Poll}; use futures_core::stream::Stream; use futures_core::Future; -use slab::Slab; -use crate::utils::{PollState, PollVec, WakerVec}; +use crate::collections::group_inner::{GroupInner, Key}; /// A growable group of futures which act as a single unit. /// @@ -57,26 +54,12 @@ use crate::utils::{PollState, PollVec, WakerVec}; /// assert_eq!(out, 10); /// # });} /// ``` - #[must_use = "`FutureGroup` does nothing if not iterated over"] -#[derive(Default)] +#[derive(Default, Debug)] #[pin_project::pin_project] pub struct FutureGroup { #[pin] - futures: Slab, - wakers: WakerVec, - states: PollVec, - keys: BTreeSet, -} - -impl Debug for FutureGroup { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("FutureGroup") - .field("slab", &"[..]") - .field("len", &self.futures.len()) - .field("capacity", &self.futures.capacity()) - .finish() - } + inner: GroupInner, } impl FutureGroup { @@ -106,10 +89,7 @@ impl FutureGroup { /// ``` pub fn with_capacity(capacity: usize) -> Self { Self { - futures: Slab::with_capacity(capacity), - wakers: WakerVec::new(capacity), - states: PollVec::new(capacity), - keys: BTreeSet::new(), + inner: GroupInner::with_capacity(capacity), } } @@ -128,7 +108,7 @@ impl FutureGroup { /// assert_eq!(group.len(), 1); /// ``` pub fn len(&self) -> usize { - self.futures.len() + self.inner.len() } /// Return the capacity of the `FutureGroup`. @@ -144,7 +124,7 @@ impl FutureGroup { /// # let group: FutureGroup = group; /// ``` pub fn capacity(&self) -> usize { - self.futures.capacity() + self.inner.capacity() } /// Returns true if there are no futures currently active in the group. @@ -161,7 +141,7 @@ impl FutureGroup { /// assert!(!group.is_empty()); /// ``` pub fn is_empty(&self) -> bool { - self.futures.is_empty() + self.inner.is_empty() } /// Removes a stream from the group. Returns whether the value was present in @@ -182,12 +162,8 @@ impl FutureGroup { /// # }) /// ``` pub fn remove(&mut self, key: Key) -> bool { - let is_present = self.keys.remove(&key.0); - if is_present { - self.states[key.0].set_none(); - self.futures.remove(key.0); - } - is_present + // TODO(consoli): is it useful to return the removed future here? + self.inner.remove(key).is_some() } /// Returns `true` if the `FutureGroup` contains a value for the specified key. @@ -207,7 +183,7 @@ impl FutureGroup { /// # }) /// ``` pub fn contains_key(&mut self, key: Key) -> bool { - self.keys.contains(&key.0) + self.inner.contains_key(key) } } @@ -223,26 +199,11 @@ impl FutureGroup { /// let mut group = FutureGroup::with_capacity(2); /// group.insert(future::ready(12)); /// ``` - pub fn insert(&mut self, stream: F) -> Key + pub fn insert(&mut self, future: F) -> Key where F: Future, { - let index = self.futures.insert(stream); - self.keys.insert(index); - let key = Key(index); - - // If our slab allocated more space we need to - // update our tracking structures along with it. - let max_len = self.capacity().max(index); - self.wakers.resize(max_len); - self.states.resize(max_len); - - // Set the corresponding state - self.states[index].set_pending(); - let mut readiness = self.wakers.readiness(); - readiness.set_ready(index); - - key + self.inner.insert(future) } /// Insert a value into a pinned `FutureGroup` @@ -251,29 +212,12 @@ impl FutureGroup { /// `ConcurrentStream`. We should never expose this publicly, as the entire /// point of this crate is that we abstract the futures poll machinery away /// from end-users. - pub(crate) fn insert_pinned(self: Pin<&mut Self>, stream: F) -> Key - where - F: Future, - { - let mut this = self.project(); - // SAFETY: inserting a value into the futures slab does not ever move - // any of the existing values. - let index = unsafe { this.futures.as_mut().get_unchecked_mut() }.insert(stream); - this.keys.insert(index); - let key = Key(index); - - // If our slab allocated more space we need to - // update our tracking structures along with it. - let max_len = this.futures.as_ref().capacity().max(index); - this.wakers.resize(max_len); - this.states.resize(max_len); - - // Set the corresponding state - this.states[index].set_pending(); - let mut readiness = this.wakers.readiness(); - readiness.set_ready(index); - - key + pub(crate) fn insert_pinned(self: Pin<&mut Self>, future: F) -> Key { + let this = self.project(); + this.inner.insert_pinned(future) + + // let inner = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.inner) }; + // inner.insert_pinned(future) } /// Create a stream which also yields the key of each item. @@ -309,68 +253,45 @@ impl FutureGroup { cx: &Context<'_>, ) -> Poll::Output)>> { let mut this = self.project(); + let inner = unsafe { &mut this.inner.as_mut().get_unchecked_mut() }; // Short-circuit if we have no futures to iterate over - if this.futures.is_empty() { + if inner.is_empty() { return Poll::Ready(None); } // Set the top-level waker and check readiness - let mut readiness = this.wakers.readiness(); - readiness.set_waker(cx.waker()); - if !readiness.any_ready() { + inner.set_top_waker(cx.waker()); + if !inner.any_ready() { // Nothing is ready yet return Poll::Pending; } - // Setup our futures state - let mut ret = Poll::Pending; - let states = this.states; - - // SAFETY: We unpin the future group so we can later individually access - // single futures. Either to read from them or to drop them. - let futures = unsafe { this.futures.as_mut().get_unchecked_mut() }; - - for index in this.keys.iter().cloned() { - if states[index].is_pending() && readiness.clear_ready(index) { - // unlock readiness so we don't deadlock when polling - #[allow(clippy::drop_non_drop)] - drop(readiness); - - // Obtain the intermediate waker. - let mut cx = Context::from_waker(this.wakers.get(index).unwrap()); - - // SAFETY: this future here is a projection from the futures - // vec, which we're reading from. - let future = unsafe { Pin::new_unchecked(&mut futures[index]) }; - match future.poll(&mut cx) { - Poll::Ready(item) => { - // Set the return type for the function - ret = Poll::Ready(Some((Key(index), item))); - - // Remove all associated data with the future - // The only data we can't remove directly is the key entry. - states[index] = PollState::None; - futures.remove(index); - - break; - } - // Keep looping if there is nothing for us to do - Poll::Pending => {} - }; - - // Lock readiness so we can use it again - readiness = this.wakers.readiness(); + for index in inner.keys.iter().cloned() { + // verify if the `index`th future can be polled + if !inner.can_progress_index(index) { + continue; } - } - // Now that we're no longer borrowing `this.keys` we can remove - // the current key from the set - if let Poll::Ready(Some((key, _))) = ret { - this.keys.remove(&key.0); - } + // Obtain the intermediate waker. + let mut cx = Context::from_waker(inner.wakers.get(index).unwrap()); + + // SAFETY: this future here is a projection from the futures + // vec, which we're reading from. + let future = unsafe { Pin::new_unchecked(&mut inner.items[index]) }; + if let Poll::Ready(item) = future.poll(&mut cx) { + let key = Key(index); + // Set the return type for the function + let ret = Poll::Ready(Some((key, item))); - ret + // Remove all associated data with the future + // The only data we can't remove directly is the key entry. + inner.remove(key); + + return ret; + } + } + Poll::Pending } } @@ -398,10 +319,6 @@ impl FromIterator for FutureGroup { } } -/// A key used to index into the `FutureGroup` type. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct Key(usize); - /// Iterate over items in the futures group with their associated keys. #[derive(Debug)] #[pin_project::pin_project] @@ -442,7 +359,7 @@ mod test { #[test] fn smoke() { futures_lite::future::block_on(async { - let mut group = FutureGroup::new(); + let mut group = FutureGroup::with_capacity(0); group.insert(future::ready(2)); group.insert(future::ready(4)); diff --git a/src/utils/poll_state/vec.rs b/src/utils/poll_state/vec.rs index bf9523b..f754f23 100644 --- a/src/utils/poll_state/vec.rs +++ b/src/utils/poll_state/vec.rs @@ -26,7 +26,7 @@ use super::PollState; /// ``` const MAX_INLINE_ENTRIES: usize = core::mem::size_of::() * 3 - 2; -#[derive(Default)] +#[derive(Default, Debug)] pub(crate) struct PollVec(SmallVec<[PollState; MAX_INLINE_ENTRIES]>); impl PollVec { From 9505c43230186856be51ceb34ccdd37d15f4d1cd Mon Sep 17 00:00:00 2001 From: Matheus Consoli Date: Fri, 22 Mar 2024 17:06:24 -0300 Subject: [PATCH 02/12] StreamGroup use GroupInner --- src/collections/group_inner.rs | 4 +- src/stream/stream_group.rs | 172 ++++++++++++--------------------- src/utils/mod.rs | 2 +- 3 files changed, 63 insertions(+), 115 deletions(-) diff --git a/src/collections/group_inner.rs b/src/collections/group_inner.rs index 99d18e2..dae86c7 100644 --- a/src/collections/group_inner.rs +++ b/src/collections/group_inner.rs @@ -48,7 +48,7 @@ impl GroupInner { pub fn insert(&mut self, item: A) -> Key { if !self.has_capacity() { - self.resize(self.cap + 1 * GROUP_GROWTH_FACTOR); + self.resize((self.cap + 1) * GROUP_GROWTH_FACTOR); } let index = self.items.insert(item); @@ -65,7 +65,7 @@ impl GroupInner { pub fn insert_pinned(mut self: Pin<&mut Self>, item: A) -> Key { if !self.has_capacity() { let r = unsafe { &mut self.as_mut().get_unchecked_mut() }; - r.resize(r.cap + 1 * GROUP_GROWTH_FACTOR); + r.resize((r.cap + 1) * GROUP_GROWTH_FACTOR); } let mut this = self.project(); diff --git a/src/stream/stream_group.rs b/src/stream/stream_group.rs index f0bcab7..d8e616c 100644 --- a/src/stream/stream_group.rs +++ b/src/stream/stream_group.rs @@ -1,13 +1,11 @@ -use alloc::collections::BTreeSet; -use core::fmt::{self, Debug}; +use core::fmt::Debug; use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::task::{Context, Poll}; use futures_core::Stream; -use slab::Slab; use smallvec::{smallvec, SmallVec}; -use crate::utils::{PollState, PollVec, WakerVec}; +use crate::collections::group_inner::{GroupInner, Key}; /// A growable group of streams which act as a single unit. /// @@ -57,23 +55,11 @@ use crate::utils::{PollState, PollVec, WakerVec}; /// # }); /// ``` #[must_use = "`StreamGroup` does nothing if not iterated over"] -#[derive(Default)] +#[derive(Default, Debug)] #[pin_project::pin_project] pub struct StreamGroup { #[pin] - streams: Slab, - wakers: WakerVec, - states: PollVec, - keys: BTreeSet, - key_removal_queue: SmallVec<[usize; 10]>, -} - -impl Debug for StreamGroup { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("StreamGroup") - .field("slab", &"[..]") - .finish() - } + inner: GroupInner, } impl StreamGroup { @@ -103,11 +89,7 @@ impl StreamGroup { /// ``` pub fn with_capacity(capacity: usize) -> Self { Self { - streams: Slab::with_capacity(capacity), - wakers: WakerVec::new(capacity), - states: PollVec::new(capacity), - keys: BTreeSet::new(), - key_removal_queue: smallvec![], + inner: GroupInner::with_capacity(capacity), } } @@ -125,7 +107,7 @@ impl StreamGroup { /// assert_eq!(group.len(), 1); /// ``` pub fn len(&self) -> usize { - self.streams.len() + self.inner.len() } /// Return the capacity of the `StreamGroup`. @@ -141,7 +123,7 @@ impl StreamGroup { /// # let group: StreamGroup = group; /// ``` pub fn capacity(&self) -> usize { - self.streams.capacity() + self.inner.capacity() } /// Returns true if there are no futures currently active in the group. @@ -158,7 +140,7 @@ impl StreamGroup { /// assert!(!group.is_empty()); /// ``` pub fn is_empty(&self) -> bool { - self.streams.is_empty() + self.inner.is_empty() } /// Removes a stream from the group. Returns whether the value was present in @@ -179,12 +161,7 @@ impl StreamGroup { /// # }) /// ``` pub fn remove(&mut self, key: Key) -> bool { - let is_present = self.keys.remove(&key.0); - if is_present { - self.states[key.0].set_none(); - self.streams.remove(key.0); - } - is_present + self.inner.remove(key).is_some() } /// Returns `true` if the `StreamGroup` contains a value for the specified key. @@ -204,7 +181,7 @@ impl StreamGroup { /// # }) /// ``` pub fn contains_key(&mut self, key: Key) -> bool { - self.keys.contains(&key.0) + self.inner.contains_key(key) } } @@ -224,22 +201,7 @@ impl StreamGroup { where S: Stream, { - let index = self.streams.insert(stream); - self.keys.insert(index); - let key = Key(index); - - // If our slab allocated more space we need to - // update our tracking structures along with it. - let max_len = self.capacity().max(index); - self.wakers.resize(max_len); - self.states.resize(max_len); - - // Set the corresponding state - self.states[index].set_pending(); - let mut readiness = self.wakers.readiness(); - readiness.set_ready(index); - - key + self.inner.insert(stream) } /// Create a stream which also yields the key of each item. @@ -274,90 +236,80 @@ impl StreamGroup { cx: &Context<'_>, ) -> Poll::Item)>> { let mut this = self.project(); + let inner = unsafe { &mut this.inner.as_mut().get_unchecked_mut() }; // Short-circuit if we have no streams to iterate over - if this.streams.is_empty() { + if inner.is_empty() { return Poll::Ready(None); } // Set the top-level waker and check readiness - let mut readiness = this.wakers.readiness(); - readiness.set_waker(cx.waker()); - if !readiness.any_ready() { + inner.set_top_waker(cx.waker()); + if !inner.any_ready() { // Nothing is ready yet return Poll::Pending; } // Setup our stream state - let mut ret = Poll::Pending; let mut done_count = 0; - let stream_count = this.streams.len(); - let states = this.states; - - // SAFETY: We unpin the stream set so we can later individually access - // single streams. Either to read from them or to drop them. - let streams = unsafe { this.streams.as_mut().get_unchecked_mut() }; - - for index in this.keys.iter().cloned() { - if states[index].is_pending() && readiness.clear_ready(index) { - // unlock readiness so we don't deadlock when polling - #[allow(clippy::drop_non_drop)] - drop(readiness); - - // Obtain the intermediate waker. - let mut cx = Context::from_waker(this.wakers.get(index).unwrap()); - - // SAFETY: this stream here is a projection from the streams - // vec, which we're reading from. - let stream = unsafe { Pin::new_unchecked(&mut streams[index]) }; - match stream.poll_next(&mut cx) { - Poll::Ready(Some(item)) => { - // Set the return type for the function - ret = Poll::Ready(Some((Key(index), item))); - - // We just obtained an item from this index, make sure - // we check it again on a next iteration - states[index] = PollState::Pending; - let mut readiness = this.wakers.readiness(); - readiness.set_ready(index); - - break; - } - Poll::Ready(None) => { - // A stream has ended, make note of that - done_count += 1; - - // Remove all associated data about the stream. - // The only data we can't remove directly is the key entry. - states[index] = PollState::None; - streams.remove(index); - this.key_removal_queue.push(index); - } - // Keep looping if there is nothing for us to do - Poll::Pending => {} - }; + let stream_count = inner.len(); + let mut removal_queue: SmallVec<[usize; 10]> = smallvec![]; - // Lock readiness so we can use it again - readiness = this.wakers.readiness(); + for index in inner.keys.iter().cloned() { + if !inner.can_progress_index(index) { + continue; } + + // Obtain the intermediate waker. + let mut cx = Context::from_waker(inner.wakers.get(index).unwrap()); + + // SAFETY: this stream here is a projection from the streams + // vec, which we're reading from. + let stream = unsafe { Pin::new_unchecked(&mut inner.items[index]) }; + match stream.poll_next(&mut cx) { + Poll::Ready(Some(item)) => { + let key = Key(index); + // Set the return type for the function + let ret = Poll::Ready(Some((key, item))); + + // We just obtained an item from this index, make sure + // we check it again on a next iteration + inner.states[index].set_pending(); + inner.wakers.readiness().set_ready(index); + + for key in removal_queue { + inner.remove(Key(key)); + } + + return ret; + } + Poll::Ready(None) => { + // A stream has ended, make note of that + done_count += 1; + + // Remove all associated data about the stream. + // The only data we can't remove directly is the key entry. + removal_queue.push(index); + continue; + } + // Keep looping if there is nothing for us to do + Poll::Pending => {} + }; } // Now that we're no longer borrowing `this.keys` we can loop over // which items we need to remove - if !this.key_removal_queue.is_empty() { - for key in this.key_removal_queue.iter() { - this.keys.remove(key); - } - this.key_removal_queue.clear(); + for key in removal_queue { + inner.remove(Key(key)); } // If all streams turned up with `Poll::Ready(None)` our // stream should return that if done_count == stream_count { - ret = Poll::Ready(None); + return Poll::Ready(None); } - ret + Poll::Pending } } @@ -385,10 +337,6 @@ impl FromIterator for StreamGroup { } } -/// A key used to index into the `StreamGroup` type. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct Key(usize); - /// Iterate over items in the stream group with their associated keys. #[derive(Debug)] #[pin_project::pin_project] diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 29bcf29..21e97ef 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -28,7 +28,7 @@ pub(crate) use pin::{get_pin_mut, iter_pin_mut}; pub(crate) use pin::{get_pin_mut_from_vec, iter_pin_mut_vec}; pub(crate) use poll_state::PollArray; #[cfg(feature = "alloc")] -pub(crate) use poll_state::{MaybeDone, PollState, PollVec}; +pub(crate) use poll_state::{MaybeDone, PollVec}; pub(crate) use tuple::{gen_conditions, tuple_len}; pub(crate) use wakers::WakerArray; #[cfg(feature = "alloc")] From 8753ddeac30407e0685410a4fd9cb077d5a39e2f Mon Sep 17 00:00:00 2001 From: Matheus Consoli Date: Fri, 22 Mar 2024 19:52:52 -0300 Subject: [PATCH 03/12] Rename `GroupInner` to `InnerGroup` --- src/collections/{group_inner.rs => inner_group.rs} | 10 +++++----- src/collections/mod.rs | 2 +- src/future/future_group.rs | 6 +++--- src/stream/stream_group.rs | 6 +++--- 4 files changed, 12 insertions(+), 12 deletions(-) rename src/collections/{group_inner.rs => inner_group.rs} (95%) diff --git a/src/collections/group_inner.rs b/src/collections/inner_group.rs similarity index 95% rename from src/collections/group_inner.rs rename to src/collections/inner_group.rs index dae86c7..414684c 100644 --- a/src/collections/group_inner.rs +++ b/src/collections/inner_group.rs @@ -8,7 +8,7 @@ use crate::utils::{PollVec, WakerVec}; const GROUP_GROWTH_FACTOR: usize = 2; #[pin_project::pin_project] -pub struct GroupInner { +pub struct InnerGroup { #[pin] pub items: Slab, pub wakers: WakerVec, @@ -18,7 +18,7 @@ pub struct GroupInner { len: usize, } -impl GroupInner { +impl InnerGroup { pub fn with_capacity(cap: usize) -> Self { Self { items: Slab::with_capacity(cap), @@ -117,20 +117,20 @@ impl GroupInner { } /// Keyed operations -impl GroupInner { +impl InnerGroup { // move to other impl block pub fn contains_key(&self, key: Key) -> bool { self.items.contains(key.0) } } -impl Default for GroupInner { +impl Default for InnerGroup { fn default() -> Self { Self::with_capacity(0) } } -impl fmt::Debug for GroupInner { +impl fmt::Debug for InnerGroup { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("GroupInner") .field("cap", &self.cap) diff --git a/src/collections/mod.rs b/src/collections/mod.rs index a4f46f8..37fb814 100644 --- a/src/collections/mod.rs +++ b/src/collections/mod.rs @@ -1,3 +1,3 @@ -pub(crate) mod group_inner; +pub(crate) mod inner_group; #[cfg(feature = "alloc")] pub mod vec; diff --git a/src/future/future_group.rs b/src/future/future_group.rs index 7a96017..e32e4ff 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -4,7 +4,7 @@ use core::task::{Context, Poll}; use futures_core::stream::Stream; use futures_core::Future; -use crate::collections::group_inner::{GroupInner, Key}; +use crate::collections::inner_group::{InnerGroup, Key}; /// A growable group of futures which act as a single unit. /// @@ -59,7 +59,7 @@ use crate::collections::group_inner::{GroupInner, Key}; #[pin_project::pin_project] pub struct FutureGroup { #[pin] - inner: GroupInner, + inner: InnerGroup, } impl FutureGroup { @@ -89,7 +89,7 @@ impl FutureGroup { /// ``` pub fn with_capacity(capacity: usize) -> Self { Self { - inner: GroupInner::with_capacity(capacity), + inner: InnerGroup::with_capacity(capacity), } } diff --git a/src/stream/stream_group.rs b/src/stream/stream_group.rs index d8e616c..558153a 100644 --- a/src/stream/stream_group.rs +++ b/src/stream/stream_group.rs @@ -5,7 +5,7 @@ use core::task::{Context, Poll}; use futures_core::Stream; use smallvec::{smallvec, SmallVec}; -use crate::collections::group_inner::{GroupInner, Key}; +use crate::collections::inner_group::{InnerGroup, Key}; /// A growable group of streams which act as a single unit. /// @@ -59,7 +59,7 @@ use crate::collections::group_inner::{GroupInner, Key}; #[pin_project::pin_project] pub struct StreamGroup { #[pin] - inner: GroupInner, + inner: InnerGroup, } impl StreamGroup { @@ -89,7 +89,7 @@ impl StreamGroup { /// ``` pub fn with_capacity(capacity: usize) -> Self { Self { - inner: GroupInner::with_capacity(capacity), + inner: InnerGroup::with_capacity(capacity), } } From c374c89561e5a6abca4bd0b3d280bcb99008af99 Mon Sep 17 00:00:00 2001 From: Matheus Consoli Date: Fri, 22 Mar 2024 23:31:27 -0300 Subject: [PATCH 04/12] FutureGroup working with Group abstraction --- src/collections/inner_group.rs | 163 ++++++++++++++++++++++++++++++++- src/future/future_group.rs | 153 +++++++++++++++++-------------- 2 files changed, 246 insertions(+), 70 deletions(-) diff --git a/src/collections/inner_group.rs b/src/collections/inner_group.rs index 414684c..94bf76e 100644 --- a/src/collections/inner_group.rs +++ b/src/collections/inner_group.rs @@ -1,7 +1,13 @@ -use core::{pin::Pin, task::Waker}; +use core::{ + marker::PhantomData, + ops::ControlFlow, + pin::Pin, + task::{Context, Poll, Waker}, +}; use alloc::{collections::BTreeSet, fmt}; use slab::Slab; +use smallvec::{smallvec, SmallVec}; use crate::utils::{PollVec, WakerVec}; @@ -16,6 +22,7 @@ pub struct InnerGroup { pub keys: BTreeSet, cap: usize, len: usize, + //_poll_behavior: PhantomData, } impl InnerGroup { @@ -27,6 +34,7 @@ impl InnerGroup { keys: BTreeSet::new(), cap, len: 0, + // _poll_behavior: PhantomData, } } @@ -132,7 +140,7 @@ impl Default for InnerGroup { impl fmt::Debug for InnerGroup { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("GroupInner") + f.debug_struct("InnerGroup") .field("cap", &self.cap) .field("len", &self.len) .finish() @@ -142,3 +150,154 @@ impl fmt::Debug for InnerGroup { /// A key used to index into the `FutureGroup` type. #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Key(pub usize); + +// ---- +// prototyping + +use theory::*; + +#[derive(Debug)] +#[pin_project::pin_project] +pub struct Group { + #[pin] + pub inner: InnerGroup, + pub _poll_behavior: PhantomData, +} + +impl Group +where + B: PollBehavior, +{ + pub fn poll_next_inner(self: Pin<&mut Self>, cx: &Context<'_>) -> Poll> { + let mut this = self.project(); + let group = unsafe { this.inner.as_mut().get_unchecked_mut() }; + + // short-circuit if we have no items to iterate over + if group.is_empty() { + return Poll::Ready(None); + } + + // set the top-level waker and check readiness + group.set_top_waker(cx.waker()); + if !group.any_ready() { + // nothing is ready yet + return Poll::Pending; + } + + let mut done_count = 0; + let group_len = group.len(); + let mut removal_queue: SmallVec<[_; 10]> = smallvec![]; + + let mut ret = Poll::Pending; + + for index in group.keys.iter().cloned() { + if !group.can_progress_index(index) { + continue; + } + + // obtain the intermediate waker + let mut cx = Context::from_waker(group.wakers.get(index).unwrap()); + + let pollable = unsafe { Pin::new_unchecked(&mut group.items[index]) }; + match B::poll(pollable, &mut cx) { + Poll::Ready(ControlFlow::Break((result, PollAgain::Stop))) => { + for item in removal_queue { + group.remove(Key(item)); + } + group.remove(Key(index)); + return Poll::Ready(Some((Key(index), result))); + } + Poll::Ready(ControlFlow::Break((result, PollAgain::Poll))) => { + group.states[index].set_pending(); + group.wakers.readiness().set_ready(index); + + ret = Poll::Ready(Some((Key(index), result))); + break; + } + Poll::Ready(ControlFlow::Continue(_)) => { + done_count += 1; + removal_queue.push(index); + continue; + } + Poll::Pending => continue, + } + } + for item in removal_queue { + group.remove(Key(item)); + } + + if done_count == group_len { + return Poll::Ready(None); + } + + ret + } +} + +pub mod theory { + use core::future::Future; + use core::marker::PhantomData; + use core::ops::ControlFlow; + use core::pin::Pin; + use core::task::{Context, Poll}; + + use futures_core::Stream; + + pub enum PollAgain { + Poll, + Stop, + } + + pub trait PollBehavior { + type Item; + type Polling; + type Poll; + + fn poll( + this: Pin<&mut Self::Polling>, + cx: &mut Context<'_>, + ) -> Poll>; + } + + #[derive(Debug)] + pub struct PollFuture(PhantomData); + + impl PollBehavior for PollFuture { + type Item = F::Output; + type Polling = F; + type Poll = Self::Item; + + fn poll( + this: Pin<&mut Self::Polling>, + cx: &mut Context<'_>, + ) -> Poll> { + if let Poll::Ready(item) = this.poll(cx) { + Poll::Ready(ControlFlow::Break((item, PollAgain::Stop))) + } else { + Poll::Pending + } + } + } + + #[derive(Debug)] + pub struct PollStream(PhantomData); + + impl PollBehavior for PollStream { + type Item = S::Item; + type Polling = S; + type Poll = Option; + + fn poll( + this: Pin<&mut Self::Polling>, + cx: &mut Context<'_>, + ) -> Poll> { + match this.poll_next(cx) { + Poll::Ready(Some(item)) => { + Poll::Ready(ControlFlow::Break((Some(item), PollAgain::Poll))) + } + Poll::Ready(None) => Poll::Ready(ControlFlow::Continue(())), + Poll::Pending => Poll::Pending, + } + } + } +} diff --git a/src/future/future_group.rs b/src/future/future_group.rs index e32e4ff..d97be2b 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -4,7 +4,8 @@ use core::task::{Context, Poll}; use futures_core::stream::Stream; use futures_core::Future; -use crate::collections::inner_group::{InnerGroup, Key}; +use crate::collections::inner_group::theory::PollFuture; +use crate::collections::inner_group::{Group, InnerGroup, Key}; /// A growable group of futures which act as a single unit. /// @@ -55,14 +56,14 @@ use crate::collections::inner_group::{InnerGroup, Key}; /// # });} /// ``` #[must_use = "`FutureGroup` does nothing if not iterated over"] -#[derive(Default, Debug)] +#[derive(Debug)] #[pin_project::pin_project] -pub struct FutureGroup { +pub struct FutureGroup { #[pin] - inner: InnerGroup, + inner: Group>, } -impl FutureGroup { +impl FutureGroup { /// Create a new instance of `FutureGroup`. /// /// # Example @@ -71,7 +72,7 @@ impl FutureGroup { /// use futures_concurrency::future::FutureGroup; /// /// let group = FutureGroup::new(); - /// # let group: FutureGroup = group; + /// # let group: FutureGroup> = group; /// ``` pub fn new() -> Self { Self::with_capacity(0) @@ -85,11 +86,14 @@ impl FutureGroup { /// use futures_concurrency::future::FutureGroup; /// /// let group = FutureGroup::with_capacity(2); - /// # let group: FutureGroup = group; + /// # let group: FutureGroup> = group; /// ``` pub fn with_capacity(capacity: usize) -> Self { Self { - inner: InnerGroup::with_capacity(capacity), + inner: Group { + inner: InnerGroup::with_capacity(capacity), + _poll_behavior: core::marker::PhantomData, + }, } } @@ -108,7 +112,7 @@ impl FutureGroup { /// assert_eq!(group.len(), 1); /// ``` pub fn len(&self) -> usize { - self.inner.len() + self.inner.inner.len() } /// Return the capacity of the `FutureGroup`. @@ -121,10 +125,10 @@ impl FutureGroup { /// /// let group = FutureGroup::with_capacity(2); /// assert_eq!(group.capacity(), 2); - /// # let group: FutureGroup = group; + /// # let group: FutureGroup> = group; /// ``` pub fn capacity(&self) -> usize { - self.inner.capacity() + self.inner.inner.capacity() } /// Returns true if there are no futures currently active in the group. @@ -141,7 +145,7 @@ impl FutureGroup { /// assert!(!group.is_empty()); /// ``` pub fn is_empty(&self) -> bool { - self.inner.is_empty() + self.inner.inner.is_empty() } /// Removes a stream from the group. Returns whether the value was present in @@ -163,7 +167,7 @@ impl FutureGroup { /// ``` pub fn remove(&mut self, key: Key) -> bool { // TODO(consoli): is it useful to return the removed future here? - self.inner.remove(key).is_some() + self.inner.inner.remove(key).is_some() } /// Returns `true` if the `FutureGroup` contains a value for the specified key. @@ -183,7 +187,7 @@ impl FutureGroup { /// # }) /// ``` pub fn contains_key(&mut self, key: Key) -> bool { - self.inner.contains_key(key) + self.inner.inner.contains_key(key) } } @@ -203,7 +207,7 @@ impl FutureGroup { where F: Future, { - self.inner.insert(future) + self.inner.inner.insert(future) } /// Insert a value into a pinned `FutureGroup` @@ -213,8 +217,13 @@ impl FutureGroup { /// point of this crate is that we abstract the futures poll machinery away /// from end-users. pub(crate) fn insert_pinned(self: Pin<&mut Self>, future: F) -> Key { - let this = self.project(); - this.inner.insert_pinned(future) + let mut this = self.project(); + let inner = unsafe { + this.inner + .as_mut() + .map_unchecked_mut(|inner| &mut inner.inner) + }; + inner.insert_pinned(future) // let inner = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.inner) }; // inner.insert_pinned(future) @@ -247,59 +256,60 @@ impl FutureGroup { } } -impl FutureGroup { - fn poll_next_inner( - self: Pin<&mut Self>, - cx: &Context<'_>, - ) -> Poll::Output)>> { - let mut this = self.project(); - let inner = unsafe { &mut this.inner.as_mut().get_unchecked_mut() }; - - // Short-circuit if we have no futures to iterate over - if inner.is_empty() { - return Poll::Ready(None); - } - - // Set the top-level waker and check readiness - inner.set_top_waker(cx.waker()); - if !inner.any_ready() { - // Nothing is ready yet - return Poll::Pending; - } - - for index in inner.keys.iter().cloned() { - // verify if the `index`th future can be polled - if !inner.can_progress_index(index) { - continue; - } - - // Obtain the intermediate waker. - let mut cx = Context::from_waker(inner.wakers.get(index).unwrap()); - - // SAFETY: this future here is a projection from the futures - // vec, which we're reading from. - let future = unsafe { Pin::new_unchecked(&mut inner.items[index]) }; - if let Poll::Ready(item) = future.poll(&mut cx) { - let key = Key(index); - // Set the return type for the function - let ret = Poll::Ready(Some((key, item))); - - // Remove all associated data with the future - // The only data we can't remove directly is the key entry. - inner.remove(key); - - return ret; - } - } - Poll::Pending - } -} +// impl FutureGroup { +// fn poll_next_inner( +// self: Pin<&mut Self>, +// cx: &Context<'_>, +// ) -> Poll::Output)>> { +// let mut this = self.project(); +// let inner = unsafe { &mut this.inner.as_mut().get_unchecked_mut() }; + +// // Short-circuit if we have no futures to iterate over +// if inner.is_empty() { +// return Poll::Ready(None); +// } + +// // Set the top-level waker and check readiness +// inner.set_top_waker(cx.waker()); +// if !inner.any_ready() { +// // Nothing is ready yet +// return Poll::Pending; +// } + +// for index in inner.keys.iter().cloned() { +// // verify if the `index`th future can be polled +// if !inner.can_progress_index(index) { +// continue; +// } + +// // Obtain the intermediate waker. +// let mut cx = Context::from_waker(inner.wakers.get(index).unwrap()); + +// // SAFETY: this future here is a projection from the futures +// // vec, which we're reading from. +// let future = unsafe { Pin::new_unchecked(&mut inner.items[index]) }; +// if let Poll::Ready(item) = future.poll(&mut cx) { +// let key = Key(index); +// // Set the return type for the function +// let ret = Poll::Ready(Some((key, item))); + +// // Remove all associated data with the future +// // The only data we can't remove directly is the key entry. +// inner.remove(key); + +// return ret; +// } +// } +// Poll::Pending +// } +// } impl Stream for FutureGroup { type Item = ::Output; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.poll_next_inner(cx) { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let pinned = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.inner) }; + match pinned.poll_next_inner(cx) { Poll::Ready(Some((_key, item))) => Poll::Ready(Some(item)), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, @@ -345,8 +355,15 @@ impl Stream for Keyed { type Item = (Key, ::Output); fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // let mut this = self.project(); + // this.group.as_mut().poll_next_inner(cx) let mut this = self.project(); - this.group.as_mut().poll_next_inner(cx) + let inner = unsafe { + this.group + .as_mut() + .map_unchecked_mut(|inner| &mut inner.inner) + }; + inner.poll_next_inner(cx) } } @@ -359,7 +376,7 @@ mod test { #[test] fn smoke() { futures_lite::future::block_on(async { - let mut group = FutureGroup::with_capacity(0); + let mut group: FutureGroup> = FutureGroup::with_capacity(0); group.insert(future::ready(2)); group.insert(future::ready(4)); From 899f32bafe27639e82af67ed9a15156d7599cb77 Mon Sep 17 00:00:00 2001 From: Matheus Consoli Date: Sat, 23 Mar 2024 11:10:29 -0300 Subject: [PATCH 05/12] Move `StreamGroup` to use `InnerGroup` --- src/collections/inner_group.rs | 80 ++++++++++++--------------- src/future/future_group.rs | 99 ++++++---------------------------- src/stream/stream_group.rs | 94 +++----------------------------- 3 files changed, 59 insertions(+), 214 deletions(-) diff --git a/src/collections/inner_group.rs b/src/collections/inner_group.rs index 94bf76e..257f3cb 100644 --- a/src/collections/inner_group.rs +++ b/src/collections/inner_group.rs @@ -14,18 +14,18 @@ use crate::utils::{PollVec, WakerVec}; const GROUP_GROWTH_FACTOR: usize = 2; #[pin_project::pin_project] -pub struct InnerGroup { +pub struct InnerGroup { #[pin] - pub items: Slab, - pub wakers: WakerVec, - pub states: PollVec, - pub keys: BTreeSet, + items: Slab, + wakers: WakerVec, + states: PollVec, + keys: BTreeSet, cap: usize, len: usize, - //_poll_behavior: PhantomData, + _poll_behavior: PhantomData, } -impl InnerGroup { +impl InnerGroup { pub fn with_capacity(cap: usize) -> Self { Self { items: Slab::with_capacity(cap), @@ -34,7 +34,7 @@ impl InnerGroup { keys: BTreeSet::new(), cap, len: 0, - // _poll_behavior: PhantomData, + _poll_behavior: PhantomData, } } @@ -71,6 +71,8 @@ impl InnerGroup { } pub fn insert_pinned(mut self: Pin<&mut Self>, item: A) -> Key { + // todo: less unsafe + if !self.has_capacity() { let r = unsafe { &mut self.as_mut().get_unchecked_mut() }; r.resize((r.cap + 1) * GROUP_GROWTH_FACTOR); @@ -122,23 +124,20 @@ impl InnerGroup { pub fn can_progress_index(&self, index: usize) -> bool { self.states[index].is_pending() && self.wakers.readiness().clear_ready(index) } -} -/// Keyed operations -impl InnerGroup { // move to other impl block pub fn contains_key(&self, key: Key) -> bool { self.items.contains(key.0) } } -impl Default for InnerGroup { +impl Default for InnerGroup { fn default() -> Self { Self::with_capacity(0) } } -impl fmt::Debug for InnerGroup { +impl fmt::Debug for InnerGroup { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("InnerGroup") .field("cap", &self.cap) @@ -156,60 +155,53 @@ pub struct Key(pub usize); use theory::*; -#[derive(Debug)] -#[pin_project::pin_project] -pub struct Group { - #[pin] - pub inner: InnerGroup, - pub _poll_behavior: PhantomData, -} - -impl Group +impl InnerGroup where B: PollBehavior, { - pub fn poll_next_inner(self: Pin<&mut Self>, cx: &Context<'_>) -> Poll> { - let mut this = self.project(); - let group = unsafe { this.inner.as_mut().get_unchecked_mut() }; - + pub fn poll_next_inner( + mut self: Pin<&mut Self>, + cx: &Context<'_>, + ) -> Poll> { + let this = unsafe { self.as_mut().get_unchecked_mut() }; // short-circuit if we have no items to iterate over - if group.is_empty() { + if this.is_empty() { return Poll::Ready(None); } // set the top-level waker and check readiness - group.set_top_waker(cx.waker()); - if !group.any_ready() { + this.set_top_waker(cx.waker()); + if !this.any_ready() { // nothing is ready yet return Poll::Pending; } let mut done_count = 0; - let group_len = group.len(); + let group_len = this.len(); let mut removal_queue: SmallVec<[_; 10]> = smallvec![]; let mut ret = Poll::Pending; - for index in group.keys.iter().cloned() { - if !group.can_progress_index(index) { + for index in this.keys.iter().cloned() { + if !this.can_progress_index(index) { continue; } // obtain the intermediate waker - let mut cx = Context::from_waker(group.wakers.get(index).unwrap()); + let mut cx = Context::from_waker(this.wakers.get(index).unwrap()); - let pollable = unsafe { Pin::new_unchecked(&mut group.items[index]) }; + let pollable = unsafe { Pin::new_unchecked(&mut this.items[index]) }; match B::poll(pollable, &mut cx) { Poll::Ready(ControlFlow::Break((result, PollAgain::Stop))) => { for item in removal_queue { - group.remove(Key(item)); + this.remove(Key(item)); } - group.remove(Key(index)); + this.remove(Key(index)); return Poll::Ready(Some((Key(index), result))); } Poll::Ready(ControlFlow::Break((result, PollAgain::Poll))) => { - group.states[index].set_pending(); - group.wakers.readiness().set_ready(index); + this.states[index].set_pending(); + this.wakers.readiness().set_ready(index); ret = Poll::Ready(Some((Key(index), result))); break; @@ -223,7 +215,7 @@ where } } for item in removal_queue { - group.remove(Key(item)); + this.remove(Key(item)); } if done_count == group_len { @@ -260,7 +252,7 @@ pub mod theory { } #[derive(Debug)] - pub struct PollFuture(PhantomData); + pub struct PollFuture(PhantomData); impl PollBehavior for PollFuture { type Item = F::Output; @@ -280,21 +272,19 @@ pub mod theory { } #[derive(Debug)] - pub struct PollStream(PhantomData); + pub struct PollStream(PhantomData); impl PollBehavior for PollStream { type Item = S::Item; type Polling = S; - type Poll = Option; + type Poll = Self::Item; fn poll( this: Pin<&mut Self::Polling>, cx: &mut Context<'_>, ) -> Poll> { match this.poll_next(cx) { - Poll::Ready(Some(item)) => { - Poll::Ready(ControlFlow::Break((Some(item), PollAgain::Poll))) - } + Poll::Ready(Some(item)) => Poll::Ready(ControlFlow::Break((item, PollAgain::Poll))), Poll::Ready(None) => Poll::Ready(ControlFlow::Continue(())), Poll::Pending => Poll::Pending, } diff --git a/src/future/future_group.rs b/src/future/future_group.rs index d97be2b..5a6b0bc 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -5,7 +5,7 @@ use futures_core::stream::Stream; use futures_core::Future; use crate::collections::inner_group::theory::PollFuture; -use crate::collections::inner_group::{Group, InnerGroup, Key}; +use crate::collections::inner_group::{InnerGroup, Key}; /// A growable group of futures which act as a single unit. /// @@ -58,12 +58,12 @@ use crate::collections::inner_group::{Group, InnerGroup, Key}; #[must_use = "`FutureGroup` does nothing if not iterated over"] #[derive(Debug)] #[pin_project::pin_project] -pub struct FutureGroup { +pub struct FutureGroup { #[pin] - inner: Group>, + inner: InnerGroup>, } -impl FutureGroup { +impl FutureGroup { /// Create a new instance of `FutureGroup`. /// /// # Example @@ -90,10 +90,7 @@ impl FutureGroup { /// ``` pub fn with_capacity(capacity: usize) -> Self { Self { - inner: Group { - inner: InnerGroup::with_capacity(capacity), - _poll_behavior: core::marker::PhantomData, - }, + inner: InnerGroup::with_capacity(capacity), } } @@ -112,7 +109,7 @@ impl FutureGroup { /// assert_eq!(group.len(), 1); /// ``` pub fn len(&self) -> usize { - self.inner.inner.len() + self.inner.len() } /// Return the capacity of the `FutureGroup`. @@ -128,7 +125,7 @@ impl FutureGroup { /// # let group: FutureGroup> = group; /// ``` pub fn capacity(&self) -> usize { - self.inner.inner.capacity() + self.inner.capacity() } /// Returns true if there are no futures currently active in the group. @@ -145,7 +142,7 @@ impl FutureGroup { /// assert!(!group.is_empty()); /// ``` pub fn is_empty(&self) -> bool { - self.inner.inner.is_empty() + self.inner.is_empty() } /// Removes a stream from the group. Returns whether the value was present in @@ -167,7 +164,7 @@ impl FutureGroup { /// ``` pub fn remove(&mut self, key: Key) -> bool { // TODO(consoli): is it useful to return the removed future here? - self.inner.inner.remove(key).is_some() + self.inner.remove(key).is_some() } /// Returns `true` if the `FutureGroup` contains a value for the specified key. @@ -187,7 +184,7 @@ impl FutureGroup { /// # }) /// ``` pub fn contains_key(&mut self, key: Key) -> bool { - self.inner.inner.contains_key(key) + self.inner.contains_key(key) } } @@ -207,7 +204,7 @@ impl FutureGroup { where F: Future, { - self.inner.inner.insert(future) + self.inner.insert(future) } /// Insert a value into a pinned `FutureGroup` @@ -217,16 +214,8 @@ impl FutureGroup { /// point of this crate is that we abstract the futures poll machinery away /// from end-users. pub(crate) fn insert_pinned(self: Pin<&mut Self>, future: F) -> Key { - let mut this = self.project(); - let inner = unsafe { - this.inner - .as_mut() - .map_unchecked_mut(|inner| &mut inner.inner) - }; - inner.insert_pinned(future) - - // let inner = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.inner) }; - // inner.insert_pinned(future) + let this = self.project(); + this.inner.insert_pinned(future) } /// Create a stream which also yields the key of each item. @@ -256,60 +245,12 @@ impl FutureGroup { } } -// impl FutureGroup { -// fn poll_next_inner( -// self: Pin<&mut Self>, -// cx: &Context<'_>, -// ) -> Poll::Output)>> { -// let mut this = self.project(); -// let inner = unsafe { &mut this.inner.as_mut().get_unchecked_mut() }; - -// // Short-circuit if we have no futures to iterate over -// if inner.is_empty() { -// return Poll::Ready(None); -// } - -// // Set the top-level waker and check readiness -// inner.set_top_waker(cx.waker()); -// if !inner.any_ready() { -// // Nothing is ready yet -// return Poll::Pending; -// } - -// for index in inner.keys.iter().cloned() { -// // verify if the `index`th future can be polled -// if !inner.can_progress_index(index) { -// continue; -// } - -// // Obtain the intermediate waker. -// let mut cx = Context::from_waker(inner.wakers.get(index).unwrap()); - -// // SAFETY: this future here is a projection from the futures -// // vec, which we're reading from. -// let future = unsafe { Pin::new_unchecked(&mut inner.items[index]) }; -// if let Poll::Ready(item) = future.poll(&mut cx) { -// let key = Key(index); -// // Set the return type for the function -// let ret = Poll::Ready(Some((key, item))); - -// // Remove all associated data with the future -// // The only data we can't remove directly is the key entry. -// inner.remove(key); - -// return ret; -// } -// } -// Poll::Pending -// } -// } - impl Stream for FutureGroup { type Item = ::Output; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let pinned = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.inner) }; - match pinned.poll_next_inner(cx) { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + match this.inner.poll_next_inner(cx) { Poll::Ready(Some((_key, item))) => Poll::Ready(Some(item)), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, @@ -355,14 +296,8 @@ impl Stream for Keyed { type Item = (Key, ::Output); fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // let mut this = self.project(); - // this.group.as_mut().poll_next_inner(cx) let mut this = self.project(); - let inner = unsafe { - this.group - .as_mut() - .map_unchecked_mut(|inner| &mut inner.inner) - }; + let inner = unsafe { this.group.as_mut().map_unchecked_mut(|t| &mut t.inner) }; inner.poll_next_inner(cx) } } diff --git a/src/stream/stream_group.rs b/src/stream/stream_group.rs index 558153a..628f768 100644 --- a/src/stream/stream_group.rs +++ b/src/stream/stream_group.rs @@ -3,8 +3,8 @@ use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::task::{Context, Poll}; use futures_core::Stream; -use smallvec::{smallvec, SmallVec}; +use crate::collections::inner_group::theory::PollStream; use crate::collections::inner_group::{InnerGroup, Key}; /// A growable group of streams which act as a single unit. @@ -59,7 +59,7 @@ use crate::collections::inner_group::{InnerGroup, Key}; #[pin_project::pin_project] pub struct StreamGroup { #[pin] - inner: InnerGroup, + inner: InnerGroup>, } impl StreamGroup { @@ -230,94 +230,12 @@ impl StreamGroup { } } -impl StreamGroup { - fn poll_next_inner( - self: Pin<&mut Self>, - cx: &Context<'_>, - ) -> Poll::Item)>> { - let mut this = self.project(); - let inner = unsafe { &mut this.inner.as_mut().get_unchecked_mut() }; - - // Short-circuit if we have no streams to iterate over - if inner.is_empty() { - return Poll::Ready(None); - } - - // Set the top-level waker and check readiness - inner.set_top_waker(cx.waker()); - if !inner.any_ready() { - // Nothing is ready yet - return Poll::Pending; - } - - // Setup our stream state - let mut done_count = 0; - let stream_count = inner.len(); - let mut removal_queue: SmallVec<[usize; 10]> = smallvec![]; - - for index in inner.keys.iter().cloned() { - if !inner.can_progress_index(index) { - continue; - } - - // Obtain the intermediate waker. - let mut cx = Context::from_waker(inner.wakers.get(index).unwrap()); - - // SAFETY: this stream here is a projection from the streams - // vec, which we're reading from. - let stream = unsafe { Pin::new_unchecked(&mut inner.items[index]) }; - match stream.poll_next(&mut cx) { - Poll::Ready(Some(item)) => { - let key = Key(index); - // Set the return type for the function - let ret = Poll::Ready(Some((key, item))); - - // We just obtained an item from this index, make sure - // we check it again on a next iteration - inner.states[index].set_pending(); - inner.wakers.readiness().set_ready(index); - - for key in removal_queue { - inner.remove(Key(key)); - } - - return ret; - } - Poll::Ready(None) => { - // A stream has ended, make note of that - done_count += 1; - - // Remove all associated data about the stream. - // The only data we can't remove directly is the key entry. - removal_queue.push(index); - continue; - } - // Keep looping if there is nothing for us to do - Poll::Pending => {} - }; - } - - // Now that we're no longer borrowing `this.keys` we can loop over - // which items we need to remove - for key in removal_queue { - inner.remove(Key(key)); - } - - // If all streams turned up with `Poll::Ready(None)` our - // stream should return that - if done_count == stream_count { - return Poll::Ready(None); - } - - Poll::Pending - } -} - impl Stream for StreamGroup { type Item = ::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.poll_next_inner(cx) { + let this = self.project(); + match this.inner.poll_next_inner(cx) { Poll::Ready(Some((_key, item))) => Poll::Ready(Some(item)), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, @@ -364,7 +282,9 @@ impl Stream for Keyed { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); - this.group.as_mut().poll_next_inner(cx) + // todo: unsafe + let inner = unsafe { this.group.as_mut().map_unchecked_mut(|t| &mut t.inner) }; + inner.poll_next_inner(cx) } } From d9715ff831587af74867141c463338f5ccccb8f2 Mon Sep 17 00:00:00 2001 From: Matheus Consoli Date: Sat, 23 Mar 2024 12:05:26 -0300 Subject: [PATCH 06/12] Fix `feature(alloc)` for `InnerGroup` --- src/collections/inner_group.rs | 13 +++---------- src/collections/mod.rs | 1 + src/future/future_group.rs | 8 +++++++- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/collections/inner_group.rs b/src/collections/inner_group.rs index 257f3cb..136e008 100644 --- a/src/collections/inner_group.rs +++ b/src/collections/inner_group.rs @@ -113,18 +113,10 @@ impl InnerGroup { self.cap = cap; } - pub fn any_ready(&self) -> bool { - self.wakers.readiness().any_ready() - } - pub fn set_top_waker(&mut self, waker: &Waker) { self.wakers.readiness().set_waker(waker); } - pub fn can_progress_index(&self, index: usize) -> bool { - self.states[index].is_pending() && self.wakers.readiness().clear_ready(index) - } - // move to other impl block pub fn contains_key(&self, key: Key) -> bool { self.items.contains(key.0) @@ -171,7 +163,7 @@ where // set the top-level waker and check readiness this.set_top_waker(cx.waker()); - if !this.any_ready() { + if !this.wakers.readiness().any_ready() { // nothing is ready yet return Poll::Pending; } @@ -183,7 +175,8 @@ where let mut ret = Poll::Pending; for index in this.keys.iter().cloned() { - if !this.can_progress_index(index) { + // can we make progress for this item? + if !(this.states[index].is_pending() && this.wakers.readiness().clear_ready(index)) { continue; } diff --git a/src/collections/mod.rs b/src/collections/mod.rs index 37fb814..cf6ca7e 100644 --- a/src/collections/mod.rs +++ b/src/collections/mod.rs @@ -1,3 +1,4 @@ +#[cfg(feature = "alloc")] pub(crate) mod inner_group; #[cfg(feature = "alloc")] pub mod vec; diff --git a/src/future/future_group.rs b/src/future/future_group.rs index 5a6b0bc..415faa0 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -75,7 +75,7 @@ impl FutureGroup { /// # let group: FutureGroup> = group; /// ``` pub fn new() -> Self { - Self::with_capacity(0) + Self::default() } /// Create a new instance of `FutureGroup` with a given capacity. @@ -188,6 +188,12 @@ impl FutureGroup { } } +impl Default for FutureGroup { + fn default() -> Self { + Self::with_capacity(0) + } +} + impl FutureGroup { /// Insert a new future into the group. /// From a6313c5fe88076b283e898a4a4fe28e256ae6e98 Mon Sep 17 00:00:00 2001 From: Matheus Consoli Date: Sat, 23 Mar 2024 13:01:13 -0300 Subject: [PATCH 07/12] Add `reserve` method to `FutureGroup` and `StreamGroup` --- src/collections/inner_group.rs | 24 ++++++++++++++---------- src/future/future_group.rs | 19 +++++++++++++++++++ src/stream/stream_group.rs | 19 +++++++++++++++++++ 3 files changed, 52 insertions(+), 10 deletions(-) diff --git a/src/collections/inner_group.rs b/src/collections/inner_group.rs index 136e008..554b66b 100644 --- a/src/collections/inner_group.rs +++ b/src/collections/inner_group.rs @@ -11,7 +11,9 @@ use smallvec::{smallvec, SmallVec}; use crate::utils::{PollVec, WakerVec}; -const GROUP_GROWTH_FACTOR: usize = 2; +const fn grow_group_capacity(cap: usize) -> usize { + cap * 2 + 1 +} #[pin_project::pin_project] pub struct InnerGroup { @@ -56,7 +58,7 @@ impl InnerGroup { pub fn insert(&mut self, item: A) -> Key { if !self.has_capacity() { - self.resize((self.cap + 1) * GROUP_GROWTH_FACTOR); + self.reserve(grow_group_capacity(self.cap)); } let index = self.items.insert(item); @@ -75,7 +77,7 @@ impl InnerGroup { if !self.has_capacity() { let r = unsafe { &mut self.as_mut().get_unchecked_mut() }; - r.resize((r.cap + 1) * GROUP_GROWTH_FACTOR); + r.reserve(grow_group_capacity(r.cap)); } let mut this = self.project(); @@ -102,15 +104,17 @@ impl InnerGroup { Some(item) } - // todo: rename to reserve - pub fn resize(&mut self, cap: usize) { - if self.len + cap < self.cap { + /// Reserve `additional` capacity for new items + /// Does nothing if the capacity is already sufficient + pub fn reserve(&mut self, additional: usize) { + if self.len + additional < self.cap { return; } - self.wakers.resize(cap); - self.states.resize(cap); - self.items.reserve_exact(cap); - self.cap = cap; + let new_cap = self.cap + additional; + self.wakers.resize(new_cap); + self.states.resize(new_cap); + self.items.reserve_exact(new_cap); + self.cap = new_cap; } pub fn set_top_waker(&mut self, waker: &Waker) { diff --git a/src/future/future_group.rs b/src/future/future_group.rs index 415faa0..5f3c46c 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -186,6 +186,25 @@ impl FutureGroup { pub fn contains_key(&mut self, key: Key) -> bool { self.inner.contains_key(key) } + + /// Reserves capacity for `additional` more futures to be inserted. + /// Does nothing if the capacity is already sufficient. + /// + /// # Example + /// + /// ```rust + /// use futures_concurrency::future::FutureGroup; + /// use std::future; + /// # futures_lite::future::block_on(async { + /// let mut group = FutureGroup::with_capacity(0); + /// assert_eq!(group.capacity(), 0); + /// group.reserve(10); + /// assert_eq!(group.capacity(), 10); + /// # }) + /// ``` + pub fn reserve(&mut self, additional: usize) { + self.inner.reserve(additional); + } } impl Default for FutureGroup { diff --git a/src/stream/stream_group.rs b/src/stream/stream_group.rs index 628f768..58136e1 100644 --- a/src/stream/stream_group.rs +++ b/src/stream/stream_group.rs @@ -183,6 +183,25 @@ impl StreamGroup { pub fn contains_key(&mut self, key: Key) -> bool { self.inner.contains_key(key) } + + /// Reserves capacity for `additional` more streams to be inserted. + /// Does nothing if the capacity is already sufficient. + /// + /// # Example + /// + /// ```rust + /// use futures_concurrency::future::StreamGroup; + /// use std::future; + /// # futures_lite::future::block_on(async { + /// let mut group = StreamGroup::with_capacity(0); + /// assert_eq!(group.capacity(), 0); + /// group.reserve(10); + /// assert_eq!(group.capacity(), 10); + /// # }) + /// ``` + pub fn reserve(&mut self, additional: usize) { + self.inner.reserve(additional); + } } impl StreamGroup { From 6465651be8ae80af4868496ff08462b3f24d2cd3 Mon Sep 17 00:00:00 2001 From: Matheus Consoli Date: Sat, 23 Mar 2024 13:09:26 -0300 Subject: [PATCH 08/12] Fix `reserve` doc tests --- src/future/future_group.rs | 4 ++-- src/stream/stream_group.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/future/future_group.rs b/src/future/future_group.rs index 5f3c46c..27955a8 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -194,9 +194,9 @@ impl FutureGroup { /// /// ```rust /// use futures_concurrency::future::FutureGroup; - /// use std::future; + /// use std::future::Ready; /// # futures_lite::future::block_on(async { - /// let mut group = FutureGroup::with_capacity(0); + /// let mut group: FutureGroup> = FutureGroup::with_capacity(0); /// assert_eq!(group.capacity(), 0); /// group.reserve(10); /// assert_eq!(group.capacity(), 10); diff --git a/src/stream/stream_group.rs b/src/stream/stream_group.rs index 58136e1..b6d4773 100644 --- a/src/stream/stream_group.rs +++ b/src/stream/stream_group.rs @@ -190,10 +190,10 @@ impl StreamGroup { /// # Example /// /// ```rust - /// use futures_concurrency::future::StreamGroup; - /// use std::future; + /// use futures_concurrency::stream::StreamGroup; + /// use futures_lite::stream::Once; /// # futures_lite::future::block_on(async { - /// let mut group = StreamGroup::with_capacity(0); + /// let mut group: StreamGroup> = StreamGroup::with_capacity(0); /// assert_eq!(group.capacity(), 0); /// group.reserve(10); /// assert_eq!(group.capacity(), 10); From 557497515e353860a264c77dcc02db1d22fb3d6b Mon Sep 17 00:00:00 2001 From: Matheus Consoli Date: Sat, 23 Mar 2024 15:47:14 -0300 Subject: [PATCH 09/12] Simplifies and document `PollBehavior` --- src/collections/inner_group.rs | 139 +++++++++++++++++---------------- src/future/future_group.rs | 5 +- src/stream/stream_group.rs | 5 +- 3 files changed, 75 insertions(+), 74 deletions(-) diff --git a/src/collections/inner_group.rs b/src/collections/inner_group.rs index 554b66b..0434068 100644 --- a/src/collections/inner_group.rs +++ b/src/collections/inner_group.rs @@ -1,4 +1,5 @@ use core::{ + future::Future, marker::PhantomData, ops::ControlFlow, pin::Pin, @@ -6,6 +7,7 @@ use core::{ }; use alloc::{collections::BTreeSet, fmt}; +use futures_core::Stream; use slab::Slab; use smallvec::{smallvec, SmallVec}; @@ -146,19 +148,14 @@ impl fmt::Debug for InnerGroup { #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Key(pub usize); -// ---- -// prototyping - -use theory::*; - impl InnerGroup where - B: PollBehavior, + B: PollBehavior, { pub fn poll_next_inner( mut self: Pin<&mut Self>, cx: &Context<'_>, - ) -> Poll> { + ) -> Poll> { let this = unsafe { self.as_mut().get_unchecked_mut() }; // short-circuit if we have no items to iterate over if this.is_empty() { @@ -189,26 +186,26 @@ where let pollable = unsafe { Pin::new_unchecked(&mut this.items[index]) }; match B::poll(pollable, &mut cx) { - Poll::Ready(ControlFlow::Break((result, PollAgain::Stop))) => { + ControlFlow::Break((result, PollAgain::Stop)) => { for item in removal_queue { this.remove(Key(item)); } this.remove(Key(index)); return Poll::Ready(Some((Key(index), result))); } - Poll::Ready(ControlFlow::Break((result, PollAgain::Poll))) => { + ControlFlow::Break((result, PollAgain::Poll)) => { this.states[index].set_pending(); this.wakers.readiness().set_ready(index); ret = Poll::Ready(Some((Key(index), result))); break; } - Poll::Ready(ControlFlow::Continue(_)) => { + ControlFlow::Continue(PollAgain::Stop) => { done_count += 1; removal_queue.push(index); continue; } - Poll::Pending => continue, + ControlFlow::Continue(PollAgain::Poll) => continue, } } for item in removal_queue { @@ -223,68 +220,74 @@ where } } -pub mod theory { - use core::future::Future; - use core::marker::PhantomData; - use core::ops::ControlFlow; - use core::pin::Pin; - use core::task::{Context, Poll}; - - use futures_core::Stream; - - pub enum PollAgain { - Poll, - Stop, - } - - pub trait PollBehavior { - type Item; - type Polling; - type Poll; +/// Used to tell the callee of [`PollBehavior::poll`] +/// whether the `poll`'ed item should be polled again. +pub(crate) enum PollAgain { + /// Keep polling + Poll, + /// Stop polling + Stop, +} - fn poll( - this: Pin<&mut Self::Polling>, - cx: &mut Context<'_>, - ) -> Poll>; - } +pub(crate) trait PollBehavior

{ + /// The output type of the polled item + type Output; + + /// Poll the underlying item and decides how the iteration should proceed + /// + /// # Return value + /// The returned value coordinates two key aspects of the group iteration: + /// - whether the group should keep iterating over the next items; + /// - `ControlFlow::Continue(_)` to inform that the group + /// should proceed to the next item + /// - `ControlFlow::Break(_)` to inform that the group + /// should stop iterating + /// - whether the group should poll the same item again; + /// - [`PollAgain::Poll`] to inform the group to + /// mark that the item should be polled again + /// - [`PollAgain::Stop`] to inform the group to + /// stop polling this item + fn poll( + this: Pin<&mut P>, + cx: &mut Context<'_>, + ) -> ControlFlow<(Self::Output, PollAgain), PollAgain>; +} - #[derive(Debug)] - pub struct PollFuture(PhantomData); - - impl PollBehavior for PollFuture { - type Item = F::Output; - type Polling = F; - type Poll = Self::Item; - - fn poll( - this: Pin<&mut Self::Polling>, - cx: &mut Context<'_>, - ) -> Poll> { - if let Poll::Ready(item) = this.poll(cx) { - Poll::Ready(ControlFlow::Break((item, PollAgain::Stop))) - } else { - Poll::Pending - } +pub struct PollFuture; + +impl PollBehavior for PollFuture { + type Output = F::Output; + + fn poll( + future: Pin<&mut F>, + cx: &mut Context<'_>, + ) -> ControlFlow<(Self::Output, PollAgain), PollAgain> { + if let Poll::Ready(output) = future.poll(cx) { + // return the futures output and inform the group to not poll it again + ControlFlow::Break((output, PollAgain::Stop)) + } else { + // future is not ready yet, keep polling + ControlFlow::Continue(PollAgain::Poll) } } +} - #[derive(Debug)] - pub struct PollStream(PhantomData); - - impl PollBehavior for PollStream { - type Item = S::Item; - type Polling = S; - type Poll = Self::Item; - - fn poll( - this: Pin<&mut Self::Polling>, - cx: &mut Context<'_>, - ) -> Poll> { - match this.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(ControlFlow::Break((item, PollAgain::Poll))), - Poll::Ready(None) => Poll::Ready(ControlFlow::Continue(())), - Poll::Pending => Poll::Pending, - } +pub struct PollStream; + +impl PollBehavior for PollStream { + type Output = S::Item; + + fn poll( + stream: Pin<&mut S>, + cx: &mut Context<'_>, + ) -> ControlFlow<(Self::Output, PollAgain), PollAgain> { + match stream.poll_next(cx) { + // stop the iteration, keep polling this stream + Poll::Ready(Some(item)) => ControlFlow::Break((item, PollAgain::Poll)), + // continue the iteration, stop polling this stream + Poll::Ready(None) => ControlFlow::Continue(PollAgain::Stop), + // continue the iteration, continue polling this stream + Poll::Pending => ControlFlow::Continue(PollAgain::Poll), } } } diff --git a/src/future/future_group.rs b/src/future/future_group.rs index 27955a8..23fdb66 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -4,8 +4,7 @@ use core::task::{Context, Poll}; use futures_core::stream::Stream; use futures_core::Future; -use crate::collections::inner_group::theory::PollFuture; -use crate::collections::inner_group::{InnerGroup, Key}; +use crate::collections::inner_group::{InnerGroup, Key, PollFuture}; /// A growable group of futures which act as a single unit. /// @@ -60,7 +59,7 @@ use crate::collections::inner_group::{InnerGroup, Key}; #[pin_project::pin_project] pub struct FutureGroup { #[pin] - inner: InnerGroup>, + inner: InnerGroup, } impl FutureGroup { diff --git a/src/stream/stream_group.rs b/src/stream/stream_group.rs index b6d4773..364c70a 100644 --- a/src/stream/stream_group.rs +++ b/src/stream/stream_group.rs @@ -4,8 +4,7 @@ use core::pin::Pin; use core::task::{Context, Poll}; use futures_core::Stream; -use crate::collections::inner_group::theory::PollStream; -use crate::collections::inner_group::{InnerGroup, Key}; +use crate::collections::inner_group::{InnerGroup, Key, PollStream}; /// A growable group of streams which act as a single unit. /// @@ -59,7 +58,7 @@ use crate::collections::inner_group::{InnerGroup, Key}; #[pin_project::pin_project] pub struct StreamGroup { #[pin] - inner: InnerGroup>, + inner: InnerGroup, } impl StreamGroup { From f7273aad3165416e2c2adccb5779e556b5b1ae47 Mon Sep 17 00:00:00 2001 From: Matheus Consoli Date: Sun, 24 Mar 2024 13:01:17 -0300 Subject: [PATCH 10/12] Safety documentation --- src/collections/inner_group.rs | 48 ++++++++++++---------------------- 1 file changed, 17 insertions(+), 31 deletions(-) diff --git a/src/collections/inner_group.rs b/src/collections/inner_group.rs index 0434068..8aa5b77 100644 --- a/src/collections/inner_group.rs +++ b/src/collections/inner_group.rs @@ -3,7 +3,7 @@ use core::{ marker::PhantomData, ops::ControlFlow, pin::Pin, - task::{Context, Poll, Waker}, + task::{Context, Poll}, }; use alloc::{collections::BTreeSet, fmt}; @@ -75,24 +75,10 @@ impl InnerGroup { } pub fn insert_pinned(mut self: Pin<&mut Self>, item: A) -> Key { - // todo: less unsafe - - if !self.has_capacity() { - let r = unsafe { &mut self.as_mut().get_unchecked_mut() }; - r.reserve(grow_group_capacity(r.cap)); - } - - let mut this = self.project(); - let items = unsafe { &mut this.items.as_mut().get_unchecked_mut() }; - let index = items.insert(item); - this.keys.insert(index); - - // set the corresponding state - this.states[index].set_pending(); - this.wakers.readiness().set_ready(index); - - *this.len += 1; - Key(index) + // SAFETY: inserting a value into the slab does not ever move + // any of the existing values + let this = unsafe { &mut self.as_mut().get_unchecked_mut() }; + this.insert(item) } pub fn remove(&mut self, key: Key) -> Option { @@ -119,10 +105,6 @@ impl InnerGroup { self.cap = new_cap; } - pub fn set_top_waker(&mut self, waker: &Waker) { - self.wakers.readiness().set_waker(waker); - } - // move to other impl block pub fn contains_key(&self, key: Key) -> bool { self.items.contains(key.0) @@ -156,14 +138,17 @@ where mut self: Pin<&mut Self>, cx: &Context<'_>, ) -> Poll> { - let this = unsafe { self.as_mut().get_unchecked_mut() }; // short-circuit if we have no items to iterate over - if this.is_empty() { + if self.is_empty() { return Poll::Ready(None); } + // SAFETY: inserting and removing items from the slab does not ever + // move any of the existing values + let this = unsafe { self.as_mut().get_unchecked_mut() }; + // set the top-level waker and check readiness - this.set_top_waker(cx.waker()); + this.wakers.readiness().set_waker(cx.waker()); if !this.wakers.readiness().any_ready() { // nothing is ready yet return Poll::Pending; @@ -184,11 +169,12 @@ where // obtain the intermediate waker let mut cx = Context::from_waker(this.wakers.get(index).unwrap()); + // SAFETY: this item here is a projection from the slab, which we're reading from let pollable = unsafe { Pin::new_unchecked(&mut this.items[index]) }; match B::poll(pollable, &mut cx) { ControlFlow::Break((result, PollAgain::Stop)) => { for item in removal_queue { - this.remove(Key(item)); + this.remove(item); } this.remove(Key(index)); return Poll::Ready(Some((Key(index), result))); @@ -202,14 +188,14 @@ where } ControlFlow::Continue(PollAgain::Stop) => { done_count += 1; - removal_queue.push(index); + removal_queue.push(Key(index)); continue; } ControlFlow::Continue(PollAgain::Poll) => continue, } } for item in removal_queue { - this.remove(Key(item)); + this.remove(item); } if done_count == group_len { @@ -253,7 +239,7 @@ pub(crate) trait PollBehavior

{ ) -> ControlFlow<(Self::Output, PollAgain), PollAgain>; } -pub struct PollFuture; +pub(crate) struct PollFuture; impl PollBehavior for PollFuture { type Output = F::Output; @@ -272,7 +258,7 @@ impl PollBehavior for PollFuture { } } -pub struct PollStream; +pub(crate) struct PollStream; impl PollBehavior for PollStream { type Output = S::Item; From 7850457182e273191523d2c5cb8cd5ac04285606 Mon Sep 17 00:00:00 2001 From: Matheus Consoli Date: Sun, 24 Mar 2024 14:14:57 -0300 Subject: [PATCH 11/12] Document safety for Keyed Stream implementation --- src/stream/stream_group.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/stream/stream_group.rs b/src/stream/stream_group.rs index 364c70a..4cd28ce 100644 --- a/src/stream/stream_group.rs +++ b/src/stream/stream_group.rs @@ -300,7 +300,8 @@ impl Stream for Keyed { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); - // todo: unsafe + // SAFETY: pin-projecting to the inner group is safe because we trust + // it that it's correctly pinned let inner = unsafe { this.group.as_mut().map_unchecked_mut(|t| &mut t.inner) }; inner.poll_next_inner(cx) } From 397e5123823d39720f68dc234e22d56ea3f0f527 Mon Sep 17 00:00:00 2001 From: Matheus Consoli Date: Sun, 24 Mar 2024 14:36:40 -0300 Subject: [PATCH 12/12] Impl Debug and Default for Groups --- src/future/future_group.rs | 24 +++++++++++++++++------- src/stream/stream_group.rs | 19 +++++++++++++++++-- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/src/future/future_group.rs b/src/future/future_group.rs index 23fdb66..9e1e9a0 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -1,3 +1,4 @@ +use core::fmt; use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::task::{Context, Poll}; @@ -55,13 +56,28 @@ use crate::collections::inner_group::{InnerGroup, Key, PollFuture}; /// # });} /// ``` #[must_use = "`FutureGroup` does nothing if not iterated over"] -#[derive(Debug)] #[pin_project::pin_project] pub struct FutureGroup { #[pin] inner: InnerGroup, } +impl Default for FutureGroup { + fn default() -> Self { + Self::with_capacity(0) + } +} + +impl fmt::Debug for FutureGroup { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FutureGroup") + .field("slab", &"[..]") + .field("len", &self.inner.len()) + .field("capacity", &self.inner.capacity()) + .finish() + } +} + impl FutureGroup { /// Create a new instance of `FutureGroup`. /// @@ -206,12 +222,6 @@ impl FutureGroup { } } -impl Default for FutureGroup { - fn default() -> Self { - Self::with_capacity(0) - } -} - impl FutureGroup { /// Insert a new future into the group. /// diff --git a/src/stream/stream_group.rs b/src/stream/stream_group.rs index 4cd28ce..c6e70dd 100644 --- a/src/stream/stream_group.rs +++ b/src/stream/stream_group.rs @@ -1,4 +1,4 @@ -use core::fmt::Debug; +use core::fmt; use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::task::{Context, Poll}; @@ -54,13 +54,28 @@ use crate::collections::inner_group::{InnerGroup, Key, PollStream}; /// # }); /// ``` #[must_use = "`StreamGroup` does nothing if not iterated over"] -#[derive(Default, Debug)] #[pin_project::pin_project] pub struct StreamGroup { #[pin] inner: InnerGroup, } +impl Default for StreamGroup { + fn default() -> Self { + Self::with_capacity(0) + } +} + +impl fmt::Debug for StreamGroup { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("StreamGroup") + .field("slab", &"[..]") + .field("len", &self.inner.len()) + .field("capacity", &self.inner.capacity()) + .finish() + } +} + impl StreamGroup { /// Create a new instance of `StreamGroup`. ///