diff --git a/src/collections/inner_group.rs b/src/collections/inner_group.rs index 0434068..8aa5b77 100644 --- a/src/collections/inner_group.rs +++ b/src/collections/inner_group.rs @@ -3,7 +3,7 @@ use core::{ marker::PhantomData, ops::ControlFlow, pin::Pin, - task::{Context, Poll, Waker}, + task::{Context, Poll}, }; use alloc::{collections::BTreeSet, fmt}; @@ -75,24 +75,10 @@ impl InnerGroup { } pub fn insert_pinned(mut self: Pin<&mut Self>, item: A) -> Key { - // todo: less unsafe - - if !self.has_capacity() { - let r = unsafe { &mut self.as_mut().get_unchecked_mut() }; - r.reserve(grow_group_capacity(r.cap)); - } - - let mut this = self.project(); - let items = unsafe { &mut this.items.as_mut().get_unchecked_mut() }; - let index = items.insert(item); - this.keys.insert(index); - - // set the corresponding state - this.states[index].set_pending(); - this.wakers.readiness().set_ready(index); - - *this.len += 1; - Key(index) + // SAFETY: inserting a value into the slab does not ever move + // any of the existing values + let this = unsafe { &mut self.as_mut().get_unchecked_mut() }; + this.insert(item) } pub fn remove(&mut self, key: Key) -> Option { @@ -119,10 +105,6 @@ impl InnerGroup { self.cap = new_cap; } - pub fn set_top_waker(&mut self, waker: &Waker) { - self.wakers.readiness().set_waker(waker); - } - // move to other impl block pub fn contains_key(&self, key: Key) -> bool { self.items.contains(key.0) @@ -156,14 +138,17 @@ where mut self: Pin<&mut Self>, cx: &Context<'_>, ) -> Poll> { - let this = unsafe { self.as_mut().get_unchecked_mut() }; // short-circuit if we have no items to iterate over - if this.is_empty() { + if self.is_empty() { return Poll::Ready(None); } + // SAFETY: inserting and removing items from the slab does not ever + // move any of the existing values + let this = unsafe { self.as_mut().get_unchecked_mut() }; + // set the top-level waker and check readiness - this.set_top_waker(cx.waker()); + this.wakers.readiness().set_waker(cx.waker()); if !this.wakers.readiness().any_ready() { // nothing is ready yet return Poll::Pending; @@ -184,11 +169,12 @@ where // obtain the intermediate waker let mut cx = Context::from_waker(this.wakers.get(index).unwrap()); + // SAFETY: this item here is a projection from the slab, which we're reading from let pollable = unsafe { Pin::new_unchecked(&mut this.items[index]) }; match B::poll(pollable, &mut cx) { ControlFlow::Break((result, PollAgain::Stop)) => { for item in removal_queue { - this.remove(Key(item)); + this.remove(item); } this.remove(Key(index)); return Poll::Ready(Some((Key(index), result))); @@ -202,14 +188,14 @@ where } ControlFlow::Continue(PollAgain::Stop) => { done_count += 1; - removal_queue.push(index); + removal_queue.push(Key(index)); continue; } ControlFlow::Continue(PollAgain::Poll) => continue, } } for item in removal_queue { - this.remove(Key(item)); + this.remove(item); } if done_count == group_len { @@ -253,7 +239,7 @@ pub(crate) trait PollBehavior { ) -> ControlFlow<(Self::Output, PollAgain), PollAgain>; } -pub struct PollFuture; +pub(crate) struct PollFuture; impl PollBehavior for PollFuture { type Output = F::Output; @@ -272,7 +258,7 @@ impl PollBehavior for PollFuture { } } -pub struct PollStream; +pub(crate) struct PollStream; impl PollBehavior for PollStream { type Output = S::Item;
{ ) -> ControlFlow<(Self::Output, PollAgain), PollAgain>; } -pub struct PollFuture; +pub(crate) struct PollFuture; impl PollBehavior for PollFuture { type Output = F::Output; @@ -272,7 +258,7 @@ impl PollBehavior for PollFuture { } } -pub struct PollStream; +pub(crate) struct PollStream; impl PollBehavior for PollStream { type Output = S::Item;