diff --git a/src/collections/inner_group.rs b/src/collections/inner_group.rs new file mode 100644 index 0000000..8aa5b77 --- /dev/null +++ b/src/collections/inner_group.rs @@ -0,0 +1,279 @@ +use core::{ + future::Future, + marker::PhantomData, + ops::ControlFlow, + pin::Pin, + task::{Context, Poll}, +}; + +use alloc::{collections::BTreeSet, fmt}; +use futures_core::Stream; +use slab::Slab; +use smallvec::{smallvec, SmallVec}; + +use crate::utils::{PollVec, WakerVec}; + +const fn grow_group_capacity(cap: usize) -> usize { + cap * 2 + 1 +} + +#[pin_project::pin_project] +pub struct InnerGroup { + #[pin] + items: Slab, + wakers: WakerVec, + states: PollVec, + keys: BTreeSet, + cap: usize, + len: usize, + _poll_behavior: PhantomData, +} + +impl InnerGroup { + pub fn with_capacity(cap: usize) -> Self { + Self { + items: Slab::with_capacity(cap), + wakers: WakerVec::new(cap), + states: PollVec::new(cap), + keys: BTreeSet::new(), + cap, + len: 0, + _poll_behavior: PhantomData, + } + } + + pub fn len(&self) -> usize { + self.len + } + + pub fn capacity(&self) -> usize { + self.cap + } + + pub fn has_capacity(&self) -> bool { + self.len < self.cap + } + + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + pub fn insert(&mut self, item: A) -> Key { + if !self.has_capacity() { + self.reserve(grow_group_capacity(self.cap)); + } + + let index = self.items.insert(item); + self.keys.insert(index); + + // set the corresponding state + self.states[index].set_pending(); + self.wakers.readiness().set_ready(index); + + self.len += 1; + Key(index) + } + + pub fn insert_pinned(mut self: Pin<&mut Self>, item: A) -> Key { + // SAFETY: inserting a value into the slab does not ever move + // any of the existing values + let this = unsafe { &mut self.as_mut().get_unchecked_mut() }; + this.insert(item) + } + + pub fn remove(&mut self, key: Key) -> Option { + let is_present = self.keys.remove(&key.0); + if !is_present { + return None; + } + self.states[key.0].set_none(); + let item = self.items.remove(key.0); + self.len -= 1; + Some(item) + } + + /// Reserve `additional` capacity for new items + /// Does nothing if the capacity is already sufficient + pub fn reserve(&mut self, additional: usize) { + if self.len + additional < self.cap { + return; + } + let new_cap = self.cap + additional; + self.wakers.resize(new_cap); + self.states.resize(new_cap); + self.items.reserve_exact(new_cap); + self.cap = new_cap; + } + + // move to other impl block + pub fn contains_key(&self, key: Key) -> bool { + self.items.contains(key.0) + } +} + +impl Default for InnerGroup { + fn default() -> Self { + Self::with_capacity(0) + } +} + +impl fmt::Debug for InnerGroup { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("InnerGroup") + .field("cap", &self.cap) + .field("len", &self.len) + .finish() + } +} + +/// A key used to index into the `FutureGroup` type. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct Key(pub usize); + +impl InnerGroup +where + B: PollBehavior, +{ + pub fn poll_next_inner( + mut self: Pin<&mut Self>, + cx: &Context<'_>, + ) -> Poll> { + // short-circuit if we have no items to iterate over + if self.is_empty() { + return Poll::Ready(None); + } + + // SAFETY: inserting and removing items from the slab does not ever + // move any of the existing values + let this = unsafe { self.as_mut().get_unchecked_mut() }; + + // set the top-level waker and check readiness + this.wakers.readiness().set_waker(cx.waker()); + if !this.wakers.readiness().any_ready() { + // nothing is ready yet + return Poll::Pending; + } + + let mut done_count = 0; + let group_len = this.len(); + let mut removal_queue: SmallVec<[_; 10]> = smallvec![]; + + let mut ret = Poll::Pending; + + for index in this.keys.iter().cloned() { + // can we make progress for this item? + if !(this.states[index].is_pending() && this.wakers.readiness().clear_ready(index)) { + continue; + } + + // obtain the intermediate waker + let mut cx = Context::from_waker(this.wakers.get(index).unwrap()); + + // SAFETY: this item here is a projection from the slab, which we're reading from + let pollable = unsafe { Pin::new_unchecked(&mut this.items[index]) }; + match B::poll(pollable, &mut cx) { + ControlFlow::Break((result, PollAgain::Stop)) => { + for item in removal_queue { + this.remove(item); + } + this.remove(Key(index)); + return Poll::Ready(Some((Key(index), result))); + } + ControlFlow::Break((result, PollAgain::Poll)) => { + this.states[index].set_pending(); + this.wakers.readiness().set_ready(index); + + ret = Poll::Ready(Some((Key(index), result))); + break; + } + ControlFlow::Continue(PollAgain::Stop) => { + done_count += 1; + removal_queue.push(Key(index)); + continue; + } + ControlFlow::Continue(PollAgain::Poll) => continue, + } + } + for item in removal_queue { + this.remove(item); + } + + if done_count == group_len { + return Poll::Ready(None); + } + + ret + } +} + +/// Used to tell the callee of [`PollBehavior::poll`] +/// whether the `poll`'ed item should be polled again. +pub(crate) enum PollAgain { + /// Keep polling + Poll, + /// Stop polling + Stop, +} + +pub(crate) trait PollBehavior

{ + /// The output type of the polled item + type Output; + + /// Poll the underlying item and decides how the iteration should proceed + /// + /// # Return value + /// The returned value coordinates two key aspects of the group iteration: + /// - whether the group should keep iterating over the next items; + /// - `ControlFlow::Continue(_)` to inform that the group + /// should proceed to the next item + /// - `ControlFlow::Break(_)` to inform that the group + /// should stop iterating + /// - whether the group should poll the same item again; + /// - [`PollAgain::Poll`] to inform the group to + /// mark that the item should be polled again + /// - [`PollAgain::Stop`] to inform the group to + /// stop polling this item + fn poll( + this: Pin<&mut P>, + cx: &mut Context<'_>, + ) -> ControlFlow<(Self::Output, PollAgain), PollAgain>; +} + +pub(crate) struct PollFuture; + +impl PollBehavior for PollFuture { + type Output = F::Output; + + fn poll( + future: Pin<&mut F>, + cx: &mut Context<'_>, + ) -> ControlFlow<(Self::Output, PollAgain), PollAgain> { + if let Poll::Ready(output) = future.poll(cx) { + // return the futures output and inform the group to not poll it again + ControlFlow::Break((output, PollAgain::Stop)) + } else { + // future is not ready yet, keep polling + ControlFlow::Continue(PollAgain::Poll) + } + } +} + +pub(crate) struct PollStream; + +impl PollBehavior for PollStream { + type Output = S::Item; + + fn poll( + stream: Pin<&mut S>, + cx: &mut Context<'_>, + ) -> ControlFlow<(Self::Output, PollAgain), PollAgain> { + match stream.poll_next(cx) { + // stop the iteration, keep polling this stream + Poll::Ready(Some(item)) => ControlFlow::Break((item, PollAgain::Poll)), + // continue the iteration, stop polling this stream + Poll::Ready(None) => ControlFlow::Continue(PollAgain::Stop), + // continue the iteration, continue polling this stream + Poll::Pending => ControlFlow::Continue(PollAgain::Poll), + } + } +} diff --git a/src/collections/mod.rs b/src/collections/mod.rs index 33d9377..cf6ca7e 100644 --- a/src/collections/mod.rs +++ b/src/collections/mod.rs @@ -1,2 +1,4 @@ #[cfg(feature = "alloc")] +pub(crate) mod inner_group; +#[cfg(feature = "alloc")] pub mod vec; diff --git a/src/future/future_group.rs b/src/future/future_group.rs index e327f2b..9e1e9a0 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -1,13 +1,11 @@ -use alloc::collections::BTreeSet; -use core::fmt::{self, Debug}; +use core::fmt; use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::task::{Context, Poll}; use futures_core::stream::Stream; use futures_core::Future; -use slab::Slab; -use crate::utils::{PollState, PollVec, WakerVec}; +use crate::collections::inner_group::{InnerGroup, Key, PollFuture}; /// A growable group of futures which act as a single unit. /// @@ -57,24 +55,25 @@ use crate::utils::{PollState, PollVec, WakerVec}; /// assert_eq!(out, 10); /// # });} /// ``` - #[must_use = "`FutureGroup` does nothing if not iterated over"] -#[derive(Default)] #[pin_project::pin_project] pub struct FutureGroup { #[pin] - futures: Slab, - wakers: WakerVec, - states: PollVec, - keys: BTreeSet, + inner: InnerGroup, } -impl Debug for FutureGroup { +impl Default for FutureGroup { + fn default() -> Self { + Self::with_capacity(0) + } +} + +impl fmt::Debug for FutureGroup { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FutureGroup") .field("slab", &"[..]") - .field("len", &self.futures.len()) - .field("capacity", &self.futures.capacity()) + .field("len", &self.inner.len()) + .field("capacity", &self.inner.capacity()) .finish() } } @@ -88,10 +87,10 @@ impl FutureGroup { /// use futures_concurrency::future::FutureGroup; /// /// let group = FutureGroup::new(); - /// # let group: FutureGroup = group; + /// # let group: FutureGroup> = group; /// ``` pub fn new() -> Self { - Self::with_capacity(0) + Self::default() } /// Create a new instance of `FutureGroup` with a given capacity. @@ -102,14 +101,11 @@ impl FutureGroup { /// use futures_concurrency::future::FutureGroup; /// /// let group = FutureGroup::with_capacity(2); - /// # let group: FutureGroup = group; + /// # let group: FutureGroup> = group; /// ``` pub fn with_capacity(capacity: usize) -> Self { Self { - futures: Slab::with_capacity(capacity), - wakers: WakerVec::new(capacity), - states: PollVec::new(capacity), - keys: BTreeSet::new(), + inner: InnerGroup::with_capacity(capacity), } } @@ -128,7 +124,7 @@ impl FutureGroup { /// assert_eq!(group.len(), 1); /// ``` pub fn len(&self) -> usize { - self.futures.len() + self.inner.len() } /// Return the capacity of the `FutureGroup`. @@ -141,10 +137,10 @@ impl FutureGroup { /// /// let group = FutureGroup::with_capacity(2); /// assert_eq!(group.capacity(), 2); - /// # let group: FutureGroup = group; + /// # let group: FutureGroup> = group; /// ``` pub fn capacity(&self) -> usize { - self.futures.capacity() + self.inner.capacity() } /// Returns true if there are no futures currently active in the group. @@ -161,7 +157,7 @@ impl FutureGroup { /// assert!(!group.is_empty()); /// ``` pub fn is_empty(&self) -> bool { - self.futures.is_empty() + self.inner.is_empty() } /// Removes a stream from the group. Returns whether the value was present in @@ -182,12 +178,8 @@ impl FutureGroup { /// # }) /// ``` pub fn remove(&mut self, key: Key) -> bool { - let is_present = self.keys.remove(&key.0); - if is_present { - self.states[key.0].set_none(); - self.futures.remove(key.0); - } - is_present + // TODO(consoli): is it useful to return the removed future here? + self.inner.remove(key).is_some() } /// Returns `true` if the `FutureGroup` contains a value for the specified key. @@ -207,7 +199,26 @@ impl FutureGroup { /// # }) /// ``` pub fn contains_key(&mut self, key: Key) -> bool { - self.keys.contains(&key.0) + self.inner.contains_key(key) + } + + /// Reserves capacity for `additional` more futures to be inserted. + /// Does nothing if the capacity is already sufficient. + /// + /// # Example + /// + /// ```rust + /// use futures_concurrency::future::FutureGroup; + /// use std::future::Ready; + /// # futures_lite::future::block_on(async { + /// let mut group: FutureGroup> = FutureGroup::with_capacity(0); + /// assert_eq!(group.capacity(), 0); + /// group.reserve(10); + /// assert_eq!(group.capacity(), 10); + /// # }) + /// ``` + pub fn reserve(&mut self, additional: usize) { + self.inner.reserve(additional); } } @@ -223,26 +234,11 @@ impl FutureGroup { /// let mut group = FutureGroup::with_capacity(2); /// group.insert(future::ready(12)); /// ``` - pub fn insert(&mut self, stream: F) -> Key + pub fn insert(&mut self, future: F) -> Key where F: Future, { - let index = self.futures.insert(stream); - self.keys.insert(index); - let key = Key(index); - - // If our slab allocated more space we need to - // update our tracking structures along with it. - let max_len = self.capacity().max(index); - self.wakers.resize(max_len); - self.states.resize(max_len); - - // Set the corresponding state - self.states[index].set_pending(); - let mut readiness = self.wakers.readiness(); - readiness.set_ready(index); - - key + self.inner.insert(future) } /// Insert a value into a pinned `FutureGroup` @@ -251,29 +247,9 @@ impl FutureGroup { /// `ConcurrentStream`. We should never expose this publicly, as the entire /// point of this crate is that we abstract the futures poll machinery away /// from end-users. - pub(crate) fn insert_pinned(self: Pin<&mut Self>, stream: F) -> Key - where - F: Future, - { - let mut this = self.project(); - // SAFETY: inserting a value into the futures slab does not ever move - // any of the existing values. - let index = unsafe { this.futures.as_mut().get_unchecked_mut() }.insert(stream); - this.keys.insert(index); - let key = Key(index); - - // If our slab allocated more space we need to - // update our tracking structures along with it. - let max_len = this.futures.as_ref().capacity().max(index); - this.wakers.resize(max_len); - this.states.resize(max_len); - - // Set the corresponding state - this.states[index].set_pending(); - let mut readiness = this.wakers.readiness(); - readiness.set_ready(index); - - key + pub(crate) fn insert_pinned(self: Pin<&mut Self>, future: F) -> Key { + let this = self.project(); + this.inner.insert_pinned(future) } /// Create a stream which also yields the key of each item. @@ -303,82 +279,12 @@ impl FutureGroup { } } -impl FutureGroup { - fn poll_next_inner( - self: Pin<&mut Self>, - cx: &Context<'_>, - ) -> Poll::Output)>> { - let mut this = self.project(); - - // Short-circuit if we have no futures to iterate over - if this.futures.is_empty() { - return Poll::Ready(None); - } - - // Set the top-level waker and check readiness - let mut readiness = this.wakers.readiness(); - readiness.set_waker(cx.waker()); - if !readiness.any_ready() { - // Nothing is ready yet - return Poll::Pending; - } - - // Setup our futures state - let mut ret = Poll::Pending; - let states = this.states; - - // SAFETY: We unpin the future group so we can later individually access - // single futures. Either to read from them or to drop them. - let futures = unsafe { this.futures.as_mut().get_unchecked_mut() }; - - for index in this.keys.iter().cloned() { - if states[index].is_pending() && readiness.clear_ready(index) { - // unlock readiness so we don't deadlock when polling - #[allow(clippy::drop_non_drop)] - drop(readiness); - - // Obtain the intermediate waker. - let mut cx = Context::from_waker(this.wakers.get(index).unwrap()); - - // SAFETY: this future here is a projection from the futures - // vec, which we're reading from. - let future = unsafe { Pin::new_unchecked(&mut futures[index]) }; - match future.poll(&mut cx) { - Poll::Ready(item) => { - // Set the return type for the function - ret = Poll::Ready(Some((Key(index), item))); - - // Remove all associated data with the future - // The only data we can't remove directly is the key entry. - states[index] = PollState::None; - futures.remove(index); - - break; - } - // Keep looping if there is nothing for us to do - Poll::Pending => {} - }; - - // Lock readiness so we can use it again - readiness = this.wakers.readiness(); - } - } - - // Now that we're no longer borrowing `this.keys` we can remove - // the current key from the set - if let Poll::Ready(Some((key, _))) = ret { - this.keys.remove(&key.0); - } - - ret - } -} - impl Stream for FutureGroup { type Item = ::Output; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.poll_next_inner(cx) { + let this = self.project(); + match this.inner.poll_next_inner(cx) { Poll::Ready(Some((_key, item))) => Poll::Ready(Some(item)), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, @@ -398,10 +304,6 @@ impl FromIterator for FutureGroup { } } -/// A key used to index into the `FutureGroup` type. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct Key(usize); - /// Iterate over items in the futures group with their associated keys. #[derive(Debug)] #[pin_project::pin_project] @@ -429,7 +331,8 @@ impl Stream for Keyed { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); - this.group.as_mut().poll_next_inner(cx) + let inner = unsafe { this.group.as_mut().map_unchecked_mut(|t| &mut t.inner) }; + inner.poll_next_inner(cx) } } @@ -442,7 +345,7 @@ mod test { #[test] fn smoke() { futures_lite::future::block_on(async { - let mut group = FutureGroup::new(); + let mut group: FutureGroup> = FutureGroup::with_capacity(0); group.insert(future::ready(2)); group.insert(future::ready(4)); diff --git a/src/stream/stream_group.rs b/src/stream/stream_group.rs index f0bcab7..c6e70dd 100644 --- a/src/stream/stream_group.rs +++ b/src/stream/stream_group.rs @@ -1,13 +1,10 @@ -use alloc::collections::BTreeSet; -use core::fmt::{self, Debug}; +use core::fmt; use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::task::{Context, Poll}; use futures_core::Stream; -use slab::Slab; -use smallvec::{smallvec, SmallVec}; -use crate::utils::{PollState, PollVec, WakerVec}; +use crate::collections::inner_group::{InnerGroup, Key, PollStream}; /// A growable group of streams which act as a single unit. /// @@ -57,21 +54,24 @@ use crate::utils::{PollState, PollVec, WakerVec}; /// # }); /// ``` #[must_use = "`StreamGroup` does nothing if not iterated over"] -#[derive(Default)] #[pin_project::pin_project] pub struct StreamGroup { #[pin] - streams: Slab, - wakers: WakerVec, - states: PollVec, - keys: BTreeSet, - key_removal_queue: SmallVec<[usize; 10]>, + inner: InnerGroup, } -impl Debug for StreamGroup { +impl Default for StreamGroup { + fn default() -> Self { + Self::with_capacity(0) + } +} + +impl fmt::Debug for StreamGroup { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("StreamGroup") .field("slab", &"[..]") + .field("len", &self.inner.len()) + .field("capacity", &self.inner.capacity()) .finish() } } @@ -103,11 +103,7 @@ impl StreamGroup { /// ``` pub fn with_capacity(capacity: usize) -> Self { Self { - streams: Slab::with_capacity(capacity), - wakers: WakerVec::new(capacity), - states: PollVec::new(capacity), - keys: BTreeSet::new(), - key_removal_queue: smallvec![], + inner: InnerGroup::with_capacity(capacity), } } @@ -125,7 +121,7 @@ impl StreamGroup { /// assert_eq!(group.len(), 1); /// ``` pub fn len(&self) -> usize { - self.streams.len() + self.inner.len() } /// Return the capacity of the `StreamGroup`. @@ -141,7 +137,7 @@ impl StreamGroup { /// # let group: StreamGroup = group; /// ``` pub fn capacity(&self) -> usize { - self.streams.capacity() + self.inner.capacity() } /// Returns true if there are no futures currently active in the group. @@ -158,7 +154,7 @@ impl StreamGroup { /// assert!(!group.is_empty()); /// ``` pub fn is_empty(&self) -> bool { - self.streams.is_empty() + self.inner.is_empty() } /// Removes a stream from the group. Returns whether the value was present in @@ -179,12 +175,7 @@ impl StreamGroup { /// # }) /// ``` pub fn remove(&mut self, key: Key) -> bool { - let is_present = self.keys.remove(&key.0); - if is_present { - self.states[key.0].set_none(); - self.streams.remove(key.0); - } - is_present + self.inner.remove(key).is_some() } /// Returns `true` if the `StreamGroup` contains a value for the specified key. @@ -204,7 +195,26 @@ impl StreamGroup { /// # }) /// ``` pub fn contains_key(&mut self, key: Key) -> bool { - self.keys.contains(&key.0) + self.inner.contains_key(key) + } + + /// Reserves capacity for `additional` more streams to be inserted. + /// Does nothing if the capacity is already sufficient. + /// + /// # Example + /// + /// ```rust + /// use futures_concurrency::stream::StreamGroup; + /// use futures_lite::stream::Once; + /// # futures_lite::future::block_on(async { + /// let mut group: StreamGroup> = StreamGroup::with_capacity(0); + /// assert_eq!(group.capacity(), 0); + /// group.reserve(10); + /// assert_eq!(group.capacity(), 10); + /// # }) + /// ``` + pub fn reserve(&mut self, additional: usize) { + self.inner.reserve(additional); } } @@ -224,22 +234,7 @@ impl StreamGroup { where S: Stream, { - let index = self.streams.insert(stream); - self.keys.insert(index); - let key = Key(index); - - // If our slab allocated more space we need to - // update our tracking structures along with it. - let max_len = self.capacity().max(index); - self.wakers.resize(max_len); - self.states.resize(max_len); - - // Set the corresponding state - self.states[index].set_pending(); - let mut readiness = self.wakers.readiness(); - readiness.set_ready(index); - - key + self.inner.insert(stream) } /// Create a stream which also yields the key of each item. @@ -268,104 +263,12 @@ impl StreamGroup { } } -impl StreamGroup { - fn poll_next_inner( - self: Pin<&mut Self>, - cx: &Context<'_>, - ) -> Poll::Item)>> { - let mut this = self.project(); - - // Short-circuit if we have no streams to iterate over - if this.streams.is_empty() { - return Poll::Ready(None); - } - - // Set the top-level waker and check readiness - let mut readiness = this.wakers.readiness(); - readiness.set_waker(cx.waker()); - if !readiness.any_ready() { - // Nothing is ready yet - return Poll::Pending; - } - - // Setup our stream state - let mut ret = Poll::Pending; - let mut done_count = 0; - let stream_count = this.streams.len(); - let states = this.states; - - // SAFETY: We unpin the stream set so we can later individually access - // single streams. Either to read from them or to drop them. - let streams = unsafe { this.streams.as_mut().get_unchecked_mut() }; - - for index in this.keys.iter().cloned() { - if states[index].is_pending() && readiness.clear_ready(index) { - // unlock readiness so we don't deadlock when polling - #[allow(clippy::drop_non_drop)] - drop(readiness); - - // Obtain the intermediate waker. - let mut cx = Context::from_waker(this.wakers.get(index).unwrap()); - - // SAFETY: this stream here is a projection from the streams - // vec, which we're reading from. - let stream = unsafe { Pin::new_unchecked(&mut streams[index]) }; - match stream.poll_next(&mut cx) { - Poll::Ready(Some(item)) => { - // Set the return type for the function - ret = Poll::Ready(Some((Key(index), item))); - - // We just obtained an item from this index, make sure - // we check it again on a next iteration - states[index] = PollState::Pending; - let mut readiness = this.wakers.readiness(); - readiness.set_ready(index); - - break; - } - Poll::Ready(None) => { - // A stream has ended, make note of that - done_count += 1; - - // Remove all associated data about the stream. - // The only data we can't remove directly is the key entry. - states[index] = PollState::None; - streams.remove(index); - this.key_removal_queue.push(index); - } - // Keep looping if there is nothing for us to do - Poll::Pending => {} - }; - - // Lock readiness so we can use it again - readiness = this.wakers.readiness(); - } - } - - // Now that we're no longer borrowing `this.keys` we can loop over - // which items we need to remove - if !this.key_removal_queue.is_empty() { - for key in this.key_removal_queue.iter() { - this.keys.remove(key); - } - this.key_removal_queue.clear(); - } - - // If all streams turned up with `Poll::Ready(None)` our - // stream should return that - if done_count == stream_count { - ret = Poll::Ready(None); - } - - ret - } -} - impl Stream for StreamGroup { type Item = ::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.poll_next_inner(cx) { + let this = self.project(); + match this.inner.poll_next_inner(cx) { Poll::Ready(Some((_key, item))) => Poll::Ready(Some(item)), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, @@ -385,10 +288,6 @@ impl FromIterator for StreamGroup { } } -/// A key used to index into the `StreamGroup` type. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct Key(usize); - /// Iterate over items in the stream group with their associated keys. #[derive(Debug)] #[pin_project::pin_project] @@ -416,7 +315,10 @@ impl Stream for Keyed { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); - this.group.as_mut().poll_next_inner(cx) + // SAFETY: pin-projecting to the inner group is safe because we trust + // it that it's correctly pinned + let inner = unsafe { this.group.as_mut().map_unchecked_mut(|t| &mut t.inner) }; + inner.poll_next_inner(cx) } } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 29bcf29..21e97ef 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -28,7 +28,7 @@ pub(crate) use pin::{get_pin_mut, iter_pin_mut}; pub(crate) use pin::{get_pin_mut_from_vec, iter_pin_mut_vec}; pub(crate) use poll_state::PollArray; #[cfg(feature = "alloc")] -pub(crate) use poll_state::{MaybeDone, PollState, PollVec}; +pub(crate) use poll_state::{MaybeDone, PollVec}; pub(crate) use tuple::{gen_conditions, tuple_len}; pub(crate) use wakers::WakerArray; #[cfg(feature = "alloc")] diff --git a/src/utils/poll_state/vec.rs b/src/utils/poll_state/vec.rs index bf9523b..f754f23 100644 --- a/src/utils/poll_state/vec.rs +++ b/src/utils/poll_state/vec.rs @@ -26,7 +26,7 @@ use super::PollState; /// ``` const MAX_INLINE_ENTRIES: usize = core::mem::size_of::() * 3 - 2; -#[derive(Default)] +#[derive(Default, Debug)] pub(crate) struct PollVec(SmallVec<[PollState; MAX_INLINE_ENTRIES]>); impl PollVec {