diff --git a/src/collections/inner_group.rs b/src/collections/inner_group.rs index 94bf76e..257f3cb 100644 --- a/src/collections/inner_group.rs +++ b/src/collections/inner_group.rs @@ -14,18 +14,18 @@ use crate::utils::{PollVec, WakerVec}; const GROUP_GROWTH_FACTOR: usize = 2; #[pin_project::pin_project] -pub struct InnerGroup { +pub struct InnerGroup { #[pin] - pub items: Slab, - pub wakers: WakerVec, - pub states: PollVec, - pub keys: BTreeSet, + items: Slab, + wakers: WakerVec, + states: PollVec, + keys: BTreeSet, cap: usize, len: usize, - //_poll_behavior: PhantomData, + _poll_behavior: PhantomData, } -impl InnerGroup { +impl InnerGroup { pub fn with_capacity(cap: usize) -> Self { Self { items: Slab::with_capacity(cap), @@ -34,7 +34,7 @@ impl InnerGroup { keys: BTreeSet::new(), cap, len: 0, - // _poll_behavior: PhantomData, + _poll_behavior: PhantomData, } } @@ -71,6 +71,8 @@ impl InnerGroup { } pub fn insert_pinned(mut self: Pin<&mut Self>, item: A) -> Key { + // todo: less unsafe + if !self.has_capacity() { let r = unsafe { &mut self.as_mut().get_unchecked_mut() }; r.resize((r.cap + 1) * GROUP_GROWTH_FACTOR); @@ -122,23 +124,20 @@ impl InnerGroup { pub fn can_progress_index(&self, index: usize) -> bool { self.states[index].is_pending() && self.wakers.readiness().clear_ready(index) } -} -/// Keyed operations -impl InnerGroup { // move to other impl block pub fn contains_key(&self, key: Key) -> bool { self.items.contains(key.0) } } -impl Default for InnerGroup { +impl Default for InnerGroup { fn default() -> Self { Self::with_capacity(0) } } -impl fmt::Debug for InnerGroup { +impl fmt::Debug for InnerGroup { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("InnerGroup") .field("cap", &self.cap) @@ -156,60 +155,53 @@ pub struct Key(pub usize); use theory::*; -#[derive(Debug)] -#[pin_project::pin_project] -pub struct Group { - #[pin] - pub inner: InnerGroup, - pub _poll_behavior: PhantomData, -} - -impl Group +impl InnerGroup where B: PollBehavior, { - pub fn poll_next_inner(self: Pin<&mut Self>, cx: &Context<'_>) -> Poll> { - let mut this = self.project(); - let group = unsafe { this.inner.as_mut().get_unchecked_mut() }; - + pub fn poll_next_inner( + mut self: Pin<&mut Self>, + cx: &Context<'_>, + ) -> Poll> { + let this = unsafe { self.as_mut().get_unchecked_mut() }; // short-circuit if we have no items to iterate over - if group.is_empty() { + if this.is_empty() { return Poll::Ready(None); } // set the top-level waker and check readiness - group.set_top_waker(cx.waker()); - if !group.any_ready() { + this.set_top_waker(cx.waker()); + if !this.any_ready() { // nothing is ready yet return Poll::Pending; } let mut done_count = 0; - let group_len = group.len(); + let group_len = this.len(); let mut removal_queue: SmallVec<[_; 10]> = smallvec![]; let mut ret = Poll::Pending; - for index in group.keys.iter().cloned() { - if !group.can_progress_index(index) { + for index in this.keys.iter().cloned() { + if !this.can_progress_index(index) { continue; } // obtain the intermediate waker - let mut cx = Context::from_waker(group.wakers.get(index).unwrap()); + let mut cx = Context::from_waker(this.wakers.get(index).unwrap()); - let pollable = unsafe { Pin::new_unchecked(&mut group.items[index]) }; + let pollable = unsafe { Pin::new_unchecked(&mut this.items[index]) }; match B::poll(pollable, &mut cx) { Poll::Ready(ControlFlow::Break((result, PollAgain::Stop))) => { for item in removal_queue { - group.remove(Key(item)); + this.remove(Key(item)); } - group.remove(Key(index)); + this.remove(Key(index)); return Poll::Ready(Some((Key(index), result))); } Poll::Ready(ControlFlow::Break((result, PollAgain::Poll))) => { - group.states[index].set_pending(); - group.wakers.readiness().set_ready(index); + this.states[index].set_pending(); + this.wakers.readiness().set_ready(index); ret = Poll::Ready(Some((Key(index), result))); break; @@ -223,7 +215,7 @@ where } } for item in removal_queue { - group.remove(Key(item)); + this.remove(Key(item)); } if done_count == group_len { @@ -260,7 +252,7 @@ pub mod theory { } #[derive(Debug)] - pub struct PollFuture(PhantomData); + pub struct PollFuture(PhantomData); impl PollBehavior for PollFuture { type Item = F::Output; @@ -280,21 +272,19 @@ pub mod theory { } #[derive(Debug)] - pub struct PollStream(PhantomData); + pub struct PollStream(PhantomData); impl PollBehavior for PollStream { type Item = S::Item; type Polling = S; - type Poll = Option; + type Poll = Self::Item; fn poll( this: Pin<&mut Self::Polling>, cx: &mut Context<'_>, ) -> Poll> { match this.poll_next(cx) { - Poll::Ready(Some(item)) => { - Poll::Ready(ControlFlow::Break((Some(item), PollAgain::Poll))) - } + Poll::Ready(Some(item)) => Poll::Ready(ControlFlow::Break((item, PollAgain::Poll))), Poll::Ready(None) => Poll::Ready(ControlFlow::Continue(())), Poll::Pending => Poll::Pending, } diff --git a/src/future/future_group.rs b/src/future/future_group.rs index d97be2b..5a6b0bc 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -5,7 +5,7 @@ use futures_core::stream::Stream; use futures_core::Future; use crate::collections::inner_group::theory::PollFuture; -use crate::collections::inner_group::{Group, InnerGroup, Key}; +use crate::collections::inner_group::{InnerGroup, Key}; /// A growable group of futures which act as a single unit. /// @@ -58,12 +58,12 @@ use crate::collections::inner_group::{Group, InnerGroup, Key}; #[must_use = "`FutureGroup` does nothing if not iterated over"] #[derive(Debug)] #[pin_project::pin_project] -pub struct FutureGroup { +pub struct FutureGroup { #[pin] - inner: Group>, + inner: InnerGroup>, } -impl FutureGroup { +impl FutureGroup { /// Create a new instance of `FutureGroup`. /// /// # Example @@ -90,10 +90,7 @@ impl FutureGroup { /// ``` pub fn with_capacity(capacity: usize) -> Self { Self { - inner: Group { - inner: InnerGroup::with_capacity(capacity), - _poll_behavior: core::marker::PhantomData, - }, + inner: InnerGroup::with_capacity(capacity), } } @@ -112,7 +109,7 @@ impl FutureGroup { /// assert_eq!(group.len(), 1); /// ``` pub fn len(&self) -> usize { - self.inner.inner.len() + self.inner.len() } /// Return the capacity of the `FutureGroup`. @@ -128,7 +125,7 @@ impl FutureGroup { /// # let group: FutureGroup> = group; /// ``` pub fn capacity(&self) -> usize { - self.inner.inner.capacity() + self.inner.capacity() } /// Returns true if there are no futures currently active in the group. @@ -145,7 +142,7 @@ impl FutureGroup { /// assert!(!group.is_empty()); /// ``` pub fn is_empty(&self) -> bool { - self.inner.inner.is_empty() + self.inner.is_empty() } /// Removes a stream from the group. Returns whether the value was present in @@ -167,7 +164,7 @@ impl FutureGroup { /// ``` pub fn remove(&mut self, key: Key) -> bool { // TODO(consoli): is it useful to return the removed future here? - self.inner.inner.remove(key).is_some() + self.inner.remove(key).is_some() } /// Returns `true` if the `FutureGroup` contains a value for the specified key. @@ -187,7 +184,7 @@ impl FutureGroup { /// # }) /// ``` pub fn contains_key(&mut self, key: Key) -> bool { - self.inner.inner.contains_key(key) + self.inner.contains_key(key) } } @@ -207,7 +204,7 @@ impl FutureGroup { where F: Future, { - self.inner.inner.insert(future) + self.inner.insert(future) } /// Insert a value into a pinned `FutureGroup` @@ -217,16 +214,8 @@ impl FutureGroup { /// point of this crate is that we abstract the futures poll machinery away /// from end-users. pub(crate) fn insert_pinned(self: Pin<&mut Self>, future: F) -> Key { - let mut this = self.project(); - let inner = unsafe { - this.inner - .as_mut() - .map_unchecked_mut(|inner| &mut inner.inner) - }; - inner.insert_pinned(future) - - // let inner = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.inner) }; - // inner.insert_pinned(future) + let this = self.project(); + this.inner.insert_pinned(future) } /// Create a stream which also yields the key of each item. @@ -256,60 +245,12 @@ impl FutureGroup { } } -// impl FutureGroup { -// fn poll_next_inner( -// self: Pin<&mut Self>, -// cx: &Context<'_>, -// ) -> Poll::Output)>> { -// let mut this = self.project(); -// let inner = unsafe { &mut this.inner.as_mut().get_unchecked_mut() }; - -// // Short-circuit if we have no futures to iterate over -// if inner.is_empty() { -// return Poll::Ready(None); -// } - -// // Set the top-level waker and check readiness -// inner.set_top_waker(cx.waker()); -// if !inner.any_ready() { -// // Nothing is ready yet -// return Poll::Pending; -// } - -// for index in inner.keys.iter().cloned() { -// // verify if the `index`th future can be polled -// if !inner.can_progress_index(index) { -// continue; -// } - -// // Obtain the intermediate waker. -// let mut cx = Context::from_waker(inner.wakers.get(index).unwrap()); - -// // SAFETY: this future here is a projection from the futures -// // vec, which we're reading from. -// let future = unsafe { Pin::new_unchecked(&mut inner.items[index]) }; -// if let Poll::Ready(item) = future.poll(&mut cx) { -// let key = Key(index); -// // Set the return type for the function -// let ret = Poll::Ready(Some((key, item))); - -// // Remove all associated data with the future -// // The only data we can't remove directly is the key entry. -// inner.remove(key); - -// return ret; -// } -// } -// Poll::Pending -// } -// } - impl Stream for FutureGroup { type Item = ::Output; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let pinned = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.inner) }; - match pinned.poll_next_inner(cx) { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + match this.inner.poll_next_inner(cx) { Poll::Ready(Some((_key, item))) => Poll::Ready(Some(item)), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, @@ -355,14 +296,8 @@ impl Stream for Keyed { type Item = (Key, ::Output); fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // let mut this = self.project(); - // this.group.as_mut().poll_next_inner(cx) let mut this = self.project(); - let inner = unsafe { - this.group - .as_mut() - .map_unchecked_mut(|inner| &mut inner.inner) - }; + let inner = unsafe { this.group.as_mut().map_unchecked_mut(|t| &mut t.inner) }; inner.poll_next_inner(cx) } } diff --git a/src/stream/stream_group.rs b/src/stream/stream_group.rs index 558153a..628f768 100644 --- a/src/stream/stream_group.rs +++ b/src/stream/stream_group.rs @@ -3,8 +3,8 @@ use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::task::{Context, Poll}; use futures_core::Stream; -use smallvec::{smallvec, SmallVec}; +use crate::collections::inner_group::theory::PollStream; use crate::collections::inner_group::{InnerGroup, Key}; /// A growable group of streams which act as a single unit. @@ -59,7 +59,7 @@ use crate::collections::inner_group::{InnerGroup, Key}; #[pin_project::pin_project] pub struct StreamGroup { #[pin] - inner: InnerGroup, + inner: InnerGroup>, } impl StreamGroup { @@ -230,94 +230,12 @@ impl StreamGroup { } } -impl StreamGroup { - fn poll_next_inner( - self: Pin<&mut Self>, - cx: &Context<'_>, - ) -> Poll::Item)>> { - let mut this = self.project(); - let inner = unsafe { &mut this.inner.as_mut().get_unchecked_mut() }; - - // Short-circuit if we have no streams to iterate over - if inner.is_empty() { - return Poll::Ready(None); - } - - // Set the top-level waker and check readiness - inner.set_top_waker(cx.waker()); - if !inner.any_ready() { - // Nothing is ready yet - return Poll::Pending; - } - - // Setup our stream state - let mut done_count = 0; - let stream_count = inner.len(); - let mut removal_queue: SmallVec<[usize; 10]> = smallvec![]; - - for index in inner.keys.iter().cloned() { - if !inner.can_progress_index(index) { - continue; - } - - // Obtain the intermediate waker. - let mut cx = Context::from_waker(inner.wakers.get(index).unwrap()); - - // SAFETY: this stream here is a projection from the streams - // vec, which we're reading from. - let stream = unsafe { Pin::new_unchecked(&mut inner.items[index]) }; - match stream.poll_next(&mut cx) { - Poll::Ready(Some(item)) => { - let key = Key(index); - // Set the return type for the function - let ret = Poll::Ready(Some((key, item))); - - // We just obtained an item from this index, make sure - // we check it again on a next iteration - inner.states[index].set_pending(); - inner.wakers.readiness().set_ready(index); - - for key in removal_queue { - inner.remove(Key(key)); - } - - return ret; - } - Poll::Ready(None) => { - // A stream has ended, make note of that - done_count += 1; - - // Remove all associated data about the stream. - // The only data we can't remove directly is the key entry. - removal_queue.push(index); - continue; - } - // Keep looping if there is nothing for us to do - Poll::Pending => {} - }; - } - - // Now that we're no longer borrowing `this.keys` we can loop over - // which items we need to remove - for key in removal_queue { - inner.remove(Key(key)); - } - - // If all streams turned up with `Poll::Ready(None)` our - // stream should return that - if done_count == stream_count { - return Poll::Ready(None); - } - - Poll::Pending - } -} - impl Stream for StreamGroup { type Item = ::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.poll_next_inner(cx) { + let this = self.project(); + match this.inner.poll_next_inner(cx) { Poll::Ready(Some((_key, item))) => Poll::Ready(Some(item)), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, @@ -364,7 +282,9 @@ impl Stream for Keyed { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); - this.group.as_mut().poll_next_inner(cx) + // todo: unsafe + let inner = unsafe { this.group.as_mut().map_unchecked_mut(|t| &mut t.inner) }; + inner.poll_next_inner(cx) } }