From c374c89561e5a6abca4bd0b3d280bcb99008af99 Mon Sep 17 00:00:00 2001 From: Matheus Consoli Date: Fri, 22 Mar 2024 23:31:27 -0300 Subject: [PATCH] FutureGroup working with Group abstraction --- src/collections/inner_group.rs | 163 ++++++++++++++++++++++++++++++++- src/future/future_group.rs | 153 +++++++++++++++++-------------- 2 files changed, 246 insertions(+), 70 deletions(-) diff --git a/src/collections/inner_group.rs b/src/collections/inner_group.rs index 414684c..94bf76e 100644 --- a/src/collections/inner_group.rs +++ b/src/collections/inner_group.rs @@ -1,7 +1,13 @@ -use core::{pin::Pin, task::Waker}; +use core::{ + marker::PhantomData, + ops::ControlFlow, + pin::Pin, + task::{Context, Poll, Waker}, +}; use alloc::{collections::BTreeSet, fmt}; use slab::Slab; +use smallvec::{smallvec, SmallVec}; use crate::utils::{PollVec, WakerVec}; @@ -16,6 +22,7 @@ pub struct InnerGroup { pub keys: BTreeSet, cap: usize, len: usize, + //_poll_behavior: PhantomData, } impl InnerGroup { @@ -27,6 +34,7 @@ impl InnerGroup { keys: BTreeSet::new(), cap, len: 0, + // _poll_behavior: PhantomData, } } @@ -132,7 +140,7 @@ impl Default for InnerGroup { impl fmt::Debug for InnerGroup { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("GroupInner") + f.debug_struct("InnerGroup") .field("cap", &self.cap) .field("len", &self.len) .finish() @@ -142,3 +150,154 @@ impl fmt::Debug for InnerGroup { /// A key used to index into the `FutureGroup` type. #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Key(pub usize); + +// ---- +// prototyping + +use theory::*; + +#[derive(Debug)] +#[pin_project::pin_project] +pub struct Group { + #[pin] + pub inner: InnerGroup, + pub _poll_behavior: PhantomData, +} + +impl Group +where + B: PollBehavior, +{ + pub fn poll_next_inner(self: Pin<&mut Self>, cx: &Context<'_>) -> Poll> { + let mut this = self.project(); + let group = unsafe { this.inner.as_mut().get_unchecked_mut() }; + + // short-circuit if we have no items to iterate over + if group.is_empty() { + return Poll::Ready(None); + } + + // set the top-level waker and check readiness + group.set_top_waker(cx.waker()); + if !group.any_ready() { + // nothing is ready yet + return Poll::Pending; + } + + let mut done_count = 0; + let group_len = group.len(); + let mut removal_queue: SmallVec<[_; 10]> = smallvec![]; + + let mut ret = Poll::Pending; + + for index in group.keys.iter().cloned() { + if !group.can_progress_index(index) { + continue; + } + + // obtain the intermediate waker + let mut cx = Context::from_waker(group.wakers.get(index).unwrap()); + + let pollable = unsafe { Pin::new_unchecked(&mut group.items[index]) }; + match B::poll(pollable, &mut cx) { + Poll::Ready(ControlFlow::Break((result, PollAgain::Stop))) => { + for item in removal_queue { + group.remove(Key(item)); + } + group.remove(Key(index)); + return Poll::Ready(Some((Key(index), result))); + } + Poll::Ready(ControlFlow::Break((result, PollAgain::Poll))) => { + group.states[index].set_pending(); + group.wakers.readiness().set_ready(index); + + ret = Poll::Ready(Some((Key(index), result))); + break; + } + Poll::Ready(ControlFlow::Continue(_)) => { + done_count += 1; + removal_queue.push(index); + continue; + } + Poll::Pending => continue, + } + } + for item in removal_queue { + group.remove(Key(item)); + } + + if done_count == group_len { + return Poll::Ready(None); + } + + ret + } +} + +pub mod theory { + use core::future::Future; + use core::marker::PhantomData; + use core::ops::ControlFlow; + use core::pin::Pin; + use core::task::{Context, Poll}; + + use futures_core::Stream; + + pub enum PollAgain { + Poll, + Stop, + } + + pub trait PollBehavior { + type Item; + type Polling; + type Poll; + + fn poll( + this: Pin<&mut Self::Polling>, + cx: &mut Context<'_>, + ) -> Poll>; + } + + #[derive(Debug)] + pub struct PollFuture(PhantomData); + + impl PollBehavior for PollFuture { + type Item = F::Output; + type Polling = F; + type Poll = Self::Item; + + fn poll( + this: Pin<&mut Self::Polling>, + cx: &mut Context<'_>, + ) -> Poll> { + if let Poll::Ready(item) = this.poll(cx) { + Poll::Ready(ControlFlow::Break((item, PollAgain::Stop))) + } else { + Poll::Pending + } + } + } + + #[derive(Debug)] + pub struct PollStream(PhantomData); + + impl PollBehavior for PollStream { + type Item = S::Item; + type Polling = S; + type Poll = Option; + + fn poll( + this: Pin<&mut Self::Polling>, + cx: &mut Context<'_>, + ) -> Poll> { + match this.poll_next(cx) { + Poll::Ready(Some(item)) => { + Poll::Ready(ControlFlow::Break((Some(item), PollAgain::Poll))) + } + Poll::Ready(None) => Poll::Ready(ControlFlow::Continue(())), + Poll::Pending => Poll::Pending, + } + } + } +} diff --git a/src/future/future_group.rs b/src/future/future_group.rs index e32e4ff..d97be2b 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -4,7 +4,8 @@ use core::task::{Context, Poll}; use futures_core::stream::Stream; use futures_core::Future; -use crate::collections::inner_group::{InnerGroup, Key}; +use crate::collections::inner_group::theory::PollFuture; +use crate::collections::inner_group::{Group, InnerGroup, Key}; /// A growable group of futures which act as a single unit. /// @@ -55,14 +56,14 @@ use crate::collections::inner_group::{InnerGroup, Key}; /// # });} /// ``` #[must_use = "`FutureGroup` does nothing if not iterated over"] -#[derive(Default, Debug)] +#[derive(Debug)] #[pin_project::pin_project] -pub struct FutureGroup { +pub struct FutureGroup { #[pin] - inner: InnerGroup, + inner: Group>, } -impl FutureGroup { +impl FutureGroup { /// Create a new instance of `FutureGroup`. /// /// # Example @@ -71,7 +72,7 @@ impl FutureGroup { /// use futures_concurrency::future::FutureGroup; /// /// let group = FutureGroup::new(); - /// # let group: FutureGroup = group; + /// # let group: FutureGroup> = group; /// ``` pub fn new() -> Self { Self::with_capacity(0) @@ -85,11 +86,14 @@ impl FutureGroup { /// use futures_concurrency::future::FutureGroup; /// /// let group = FutureGroup::with_capacity(2); - /// # let group: FutureGroup = group; + /// # let group: FutureGroup> = group; /// ``` pub fn with_capacity(capacity: usize) -> Self { Self { - inner: InnerGroup::with_capacity(capacity), + inner: Group { + inner: InnerGroup::with_capacity(capacity), + _poll_behavior: core::marker::PhantomData, + }, } } @@ -108,7 +112,7 @@ impl FutureGroup { /// assert_eq!(group.len(), 1); /// ``` pub fn len(&self) -> usize { - self.inner.len() + self.inner.inner.len() } /// Return the capacity of the `FutureGroup`. @@ -121,10 +125,10 @@ impl FutureGroup { /// /// let group = FutureGroup::with_capacity(2); /// assert_eq!(group.capacity(), 2); - /// # let group: FutureGroup = group; + /// # let group: FutureGroup> = group; /// ``` pub fn capacity(&self) -> usize { - self.inner.capacity() + self.inner.inner.capacity() } /// Returns true if there are no futures currently active in the group. @@ -141,7 +145,7 @@ impl FutureGroup { /// assert!(!group.is_empty()); /// ``` pub fn is_empty(&self) -> bool { - self.inner.is_empty() + self.inner.inner.is_empty() } /// Removes a stream from the group. Returns whether the value was present in @@ -163,7 +167,7 @@ impl FutureGroup { /// ``` pub fn remove(&mut self, key: Key) -> bool { // TODO(consoli): is it useful to return the removed future here? - self.inner.remove(key).is_some() + self.inner.inner.remove(key).is_some() } /// Returns `true` if the `FutureGroup` contains a value for the specified key. @@ -183,7 +187,7 @@ impl FutureGroup { /// # }) /// ``` pub fn contains_key(&mut self, key: Key) -> bool { - self.inner.contains_key(key) + self.inner.inner.contains_key(key) } } @@ -203,7 +207,7 @@ impl FutureGroup { where F: Future, { - self.inner.insert(future) + self.inner.inner.insert(future) } /// Insert a value into a pinned `FutureGroup` @@ -213,8 +217,13 @@ impl FutureGroup { /// point of this crate is that we abstract the futures poll machinery away /// from end-users. pub(crate) fn insert_pinned(self: Pin<&mut Self>, future: F) -> Key { - let this = self.project(); - this.inner.insert_pinned(future) + let mut this = self.project(); + let inner = unsafe { + this.inner + .as_mut() + .map_unchecked_mut(|inner| &mut inner.inner) + }; + inner.insert_pinned(future) // let inner = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.inner) }; // inner.insert_pinned(future) @@ -247,59 +256,60 @@ impl FutureGroup { } } -impl FutureGroup { - fn poll_next_inner( - self: Pin<&mut Self>, - cx: &Context<'_>, - ) -> Poll::Output)>> { - let mut this = self.project(); - let inner = unsafe { &mut this.inner.as_mut().get_unchecked_mut() }; - - // Short-circuit if we have no futures to iterate over - if inner.is_empty() { - return Poll::Ready(None); - } - - // Set the top-level waker and check readiness - inner.set_top_waker(cx.waker()); - if !inner.any_ready() { - // Nothing is ready yet - return Poll::Pending; - } - - for index in inner.keys.iter().cloned() { - // verify if the `index`th future can be polled - if !inner.can_progress_index(index) { - continue; - } - - // Obtain the intermediate waker. - let mut cx = Context::from_waker(inner.wakers.get(index).unwrap()); - - // SAFETY: this future here is a projection from the futures - // vec, which we're reading from. - let future = unsafe { Pin::new_unchecked(&mut inner.items[index]) }; - if let Poll::Ready(item) = future.poll(&mut cx) { - let key = Key(index); - // Set the return type for the function - let ret = Poll::Ready(Some((key, item))); - - // Remove all associated data with the future - // The only data we can't remove directly is the key entry. - inner.remove(key); - - return ret; - } - } - Poll::Pending - } -} +// impl FutureGroup { +// fn poll_next_inner( +// self: Pin<&mut Self>, +// cx: &Context<'_>, +// ) -> Poll::Output)>> { +// let mut this = self.project(); +// let inner = unsafe { &mut this.inner.as_mut().get_unchecked_mut() }; + +// // Short-circuit if we have no futures to iterate over +// if inner.is_empty() { +// return Poll::Ready(None); +// } + +// // Set the top-level waker and check readiness +// inner.set_top_waker(cx.waker()); +// if !inner.any_ready() { +// // Nothing is ready yet +// return Poll::Pending; +// } + +// for index in inner.keys.iter().cloned() { +// // verify if the `index`th future can be polled +// if !inner.can_progress_index(index) { +// continue; +// } + +// // Obtain the intermediate waker. +// let mut cx = Context::from_waker(inner.wakers.get(index).unwrap()); + +// // SAFETY: this future here is a projection from the futures +// // vec, which we're reading from. +// let future = unsafe { Pin::new_unchecked(&mut inner.items[index]) }; +// if let Poll::Ready(item) = future.poll(&mut cx) { +// let key = Key(index); +// // Set the return type for the function +// let ret = Poll::Ready(Some((key, item))); + +// // Remove all associated data with the future +// // The only data we can't remove directly is the key entry. +// inner.remove(key); + +// return ret; +// } +// } +// Poll::Pending +// } +// } impl Stream for FutureGroup { type Item = ::Output; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.poll_next_inner(cx) { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let pinned = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.inner) }; + match pinned.poll_next_inner(cx) { Poll::Ready(Some((_key, item))) => Poll::Ready(Some(item)), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, @@ -345,8 +355,15 @@ impl Stream for Keyed { type Item = (Key, ::Output); fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // let mut this = self.project(); + // this.group.as_mut().poll_next_inner(cx) let mut this = self.project(); - this.group.as_mut().poll_next_inner(cx) + let inner = unsafe { + this.group + .as_mut() + .map_unchecked_mut(|inner| &mut inner.inner) + }; + inner.poll_next_inner(cx) } } @@ -359,7 +376,7 @@ mod test { #[test] fn smoke() { futures_lite::future::block_on(async { - let mut group = FutureGroup::with_capacity(0); + let mut group: FutureGroup> = FutureGroup::with_capacity(0); group.insert(future::ready(2)); group.insert(future::ready(4));