Skip to content

Commit

Permalink
Move StreamGroup to use InnerGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus-consoli committed Mar 23, 2024
1 parent c374c89 commit 899f32b
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 214 deletions.
80 changes: 35 additions & 45 deletions src/collections/inner_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ use crate::utils::{PollVec, WakerVec};
const GROUP_GROWTH_FACTOR: usize = 2;

#[pin_project::pin_project]
pub struct InnerGroup<A> {
pub struct InnerGroup<A, B> {
#[pin]
pub items: Slab<A>,
pub wakers: WakerVec,
pub states: PollVec,
pub keys: BTreeSet<usize>,
items: Slab<A>,
wakers: WakerVec,
states: PollVec,
keys: BTreeSet<usize>,
cap: usize,
len: usize,
//_poll_behavior: PhantomData<B>,
_poll_behavior: PhantomData<B>,
}

impl<A> InnerGroup<A> {
impl<A, B> InnerGroup<A, B> {
pub fn with_capacity(cap: usize) -> Self {
Self {
items: Slab::with_capacity(cap),
Expand All @@ -34,7 +34,7 @@ impl<A> InnerGroup<A> {
keys: BTreeSet::new(),
cap,
len: 0,
// _poll_behavior: PhantomData,
_poll_behavior: PhantomData,
}
}

Expand Down Expand Up @@ -71,6 +71,8 @@ impl<A> InnerGroup<A> {
}

pub fn insert_pinned(mut self: Pin<&mut Self>, item: A) -> Key {
// todo: less unsafe

if !self.has_capacity() {
let r = unsafe { &mut self.as_mut().get_unchecked_mut() };
r.resize((r.cap + 1) * GROUP_GROWTH_FACTOR);
Expand Down Expand Up @@ -122,23 +124,20 @@ impl<A> InnerGroup<A> {
pub fn can_progress_index(&self, index: usize) -> bool {
self.states[index].is_pending() && self.wakers.readiness().clear_ready(index)
}
}

/// Keyed operations
impl<A> InnerGroup<A> {
// move to other impl block
pub fn contains_key(&self, key: Key) -> bool {
self.items.contains(key.0)
}
}

impl<A> Default for InnerGroup<A> {
impl<A, B> Default for InnerGroup<A, B> {
fn default() -> Self {
Self::with_capacity(0)
}
}

impl<A> fmt::Debug for InnerGroup<A> {
impl<A, B> fmt::Debug for InnerGroup<A, B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("InnerGroup")
.field("cap", &self.cap)
Expand All @@ -156,60 +155,53 @@ pub struct Key(pub usize);

use theory::*;

#[derive(Debug)]
#[pin_project::pin_project]
pub struct Group<A, B> {
#[pin]
pub inner: InnerGroup<A>,
pub _poll_behavior: PhantomData<B>,
}

impl<A, B> Group<A, B>
impl<A, B> InnerGroup<A, B>
where
B: PollBehavior<Polling = A>,
{
pub fn poll_next_inner(self: Pin<&mut Self>, cx: &Context<'_>) -> Poll<Option<(Key, B::Poll)>> {
let mut this = self.project();
let group = unsafe { this.inner.as_mut().get_unchecked_mut() };

pub fn poll_next_inner(
mut self: Pin<&mut Self>,
cx: &Context<'_>,
) -> Poll<Option<(Key, B::Poll)>> {
let this = unsafe { self.as_mut().get_unchecked_mut() };
// short-circuit if we have no items to iterate over
if group.is_empty() {
if this.is_empty() {
return Poll::Ready(None);
}

// set the top-level waker and check readiness
group.set_top_waker(cx.waker());
if !group.any_ready() {
this.set_top_waker(cx.waker());
if !this.any_ready() {
// nothing is ready yet
return Poll::Pending;
}

let mut done_count = 0;
let group_len = group.len();
let group_len = this.len();
let mut removal_queue: SmallVec<[_; 10]> = smallvec![];

let mut ret = Poll::Pending;

for index in group.keys.iter().cloned() {
if !group.can_progress_index(index) {
for index in this.keys.iter().cloned() {
if !this.can_progress_index(index) {
continue;
}

// obtain the intermediate waker
let mut cx = Context::from_waker(group.wakers.get(index).unwrap());
let mut cx = Context::from_waker(this.wakers.get(index).unwrap());

let pollable = unsafe { Pin::new_unchecked(&mut group.items[index]) };
let pollable = unsafe { Pin::new_unchecked(&mut this.items[index]) };
match B::poll(pollable, &mut cx) {
Poll::Ready(ControlFlow::Break((result, PollAgain::Stop))) => {
for item in removal_queue {
group.remove(Key(item));
this.remove(Key(item));
}
group.remove(Key(index));
this.remove(Key(index));
return Poll::Ready(Some((Key(index), result)));
}
Poll::Ready(ControlFlow::Break((result, PollAgain::Poll))) => {
group.states[index].set_pending();
group.wakers.readiness().set_ready(index);
this.states[index].set_pending();
this.wakers.readiness().set_ready(index);

ret = Poll::Ready(Some((Key(index), result)));
break;
Expand All @@ -223,7 +215,7 @@ where
}
}
for item in removal_queue {
group.remove(Key(item));
this.remove(Key(item));
}

if done_count == group_len {
Expand Down Expand Up @@ -260,7 +252,7 @@ pub mod theory {
}

#[derive(Debug)]
pub struct PollFuture<F: Future>(PhantomData<F>);
pub struct PollFuture<F>(PhantomData<F>);

impl<F: Future> PollBehavior for PollFuture<F> {
type Item = F::Output;
Expand All @@ -280,21 +272,19 @@ pub mod theory {
}

#[derive(Debug)]
pub struct PollStream<S: Stream>(PhantomData<S>);
pub struct PollStream<S>(PhantomData<S>);

impl<S: Stream> PollBehavior for PollStream<S> {
type Item = S::Item;
type Polling = S;
type Poll = Option<Self::Item>;
type Poll = Self::Item;

fn poll(
this: Pin<&mut Self::Polling>,
cx: &mut Context<'_>,
) -> Poll<ControlFlow<(Self::Poll, PollAgain)>> {
match this.poll_next(cx) {
Poll::Ready(Some(item)) => {
Poll::Ready(ControlFlow::Break((Some(item), PollAgain::Poll)))
}
Poll::Ready(Some(item)) => Poll::Ready(ControlFlow::Break((item, PollAgain::Poll))),
Poll::Ready(None) => Poll::Ready(ControlFlow::Continue(())),
Poll::Pending => Poll::Pending,
}
Expand Down
99 changes: 17 additions & 82 deletions src/future/future_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures_core::stream::Stream;
use futures_core::Future;

use crate::collections::inner_group::theory::PollFuture;
use crate::collections::inner_group::{Group, InnerGroup, Key};
use crate::collections::inner_group::{InnerGroup, Key};

/// A growable group of futures which act as a single unit.
///
Expand Down Expand Up @@ -58,12 +58,12 @@ use crate::collections::inner_group::{Group, InnerGroup, Key};
#[must_use = "`FutureGroup` does nothing if not iterated over"]
#[derive(Debug)]
#[pin_project::pin_project]
pub struct FutureGroup<F: Future> {
pub struct FutureGroup<F> {
#[pin]
inner: Group<F, PollFuture<F>>,
inner: InnerGroup<F, PollFuture<F>>,
}

impl<F: Future> FutureGroup<F> {
impl<F> FutureGroup<F> {
/// Create a new instance of `FutureGroup`.
///
/// # Example
Expand All @@ -90,10 +90,7 @@ impl<F: Future> FutureGroup<F> {
/// ```
pub fn with_capacity(capacity: usize) -> Self {
Self {
inner: Group {
inner: InnerGroup::with_capacity(capacity),
_poll_behavior: core::marker::PhantomData,
},
inner: InnerGroup::with_capacity(capacity),
}
}

Expand All @@ -112,7 +109,7 @@ impl<F: Future> FutureGroup<F> {
/// assert_eq!(group.len(), 1);
/// ```
pub fn len(&self) -> usize {
self.inner.inner.len()
self.inner.len()
}

/// Return the capacity of the `FutureGroup`.
Expand All @@ -128,7 +125,7 @@ impl<F: Future> FutureGroup<F> {
/// # let group: FutureGroup<core::future::Ready<usize>> = group;
/// ```
pub fn capacity(&self) -> usize {
self.inner.inner.capacity()
self.inner.capacity()
}

/// Returns true if there are no futures currently active in the group.
Expand All @@ -145,7 +142,7 @@ impl<F: Future> FutureGroup<F> {
/// assert!(!group.is_empty());
/// ```
pub fn is_empty(&self) -> bool {
self.inner.inner.is_empty()
self.inner.is_empty()
}

/// Removes a stream from the group. Returns whether the value was present in
Expand All @@ -167,7 +164,7 @@ impl<F: Future> FutureGroup<F> {
/// ```
pub fn remove(&mut self, key: Key) -> bool {
// TODO(consoli): is it useful to return the removed future here?
self.inner.inner.remove(key).is_some()
self.inner.remove(key).is_some()
}

/// Returns `true` if the `FutureGroup` contains a value for the specified key.
Expand All @@ -187,7 +184,7 @@ impl<F: Future> FutureGroup<F> {
/// # })
/// ```
pub fn contains_key(&mut self, key: Key) -> bool {
self.inner.inner.contains_key(key)
self.inner.contains_key(key)
}
}

Expand All @@ -207,7 +204,7 @@ impl<F: Future> FutureGroup<F> {
where
F: Future,
{
self.inner.inner.insert(future)
self.inner.insert(future)
}

/// Insert a value into a pinned `FutureGroup`
Expand All @@ -217,16 +214,8 @@ impl<F: Future> FutureGroup<F> {
/// point of this crate is that we abstract the futures poll machinery away
/// from end-users.
pub(crate) fn insert_pinned(self: Pin<&mut Self>, future: F) -> Key {
let mut this = self.project();
let inner = unsafe {
this.inner
.as_mut()
.map_unchecked_mut(|inner| &mut inner.inner)
};
inner.insert_pinned(future)

// let inner = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.inner) };
// inner.insert_pinned(future)
let this = self.project();
this.inner.insert_pinned(future)
}

/// Create a stream which also yields the key of each item.
Expand Down Expand Up @@ -256,60 +245,12 @@ impl<F: Future> FutureGroup<F> {
}
}

// impl<F: Future> FutureGroup<F> {
// fn poll_next_inner(
// self: Pin<&mut Self>,
// cx: &Context<'_>,
// ) -> Poll<Option<(Key, <F as Future>::Output)>> {
// let mut this = self.project();
// let inner = unsafe { &mut this.inner.as_mut().get_unchecked_mut() };

// // Short-circuit if we have no futures to iterate over
// if inner.is_empty() {
// return Poll::Ready(None);
// }

// // Set the top-level waker and check readiness
// inner.set_top_waker(cx.waker());
// if !inner.any_ready() {
// // Nothing is ready yet
// return Poll::Pending;
// }

// for index in inner.keys.iter().cloned() {
// // verify if the `index`th future can be polled
// if !inner.can_progress_index(index) {
// continue;
// }

// // Obtain the intermediate waker.
// let mut cx = Context::from_waker(inner.wakers.get(index).unwrap());

// // SAFETY: this future here is a projection from the futures
// // vec, which we're reading from.
// let future = unsafe { Pin::new_unchecked(&mut inner.items[index]) };
// if let Poll::Ready(item) = future.poll(&mut cx) {
// let key = Key(index);
// // Set the return type for the function
// let ret = Poll::Ready(Some((key, item)));

// // Remove all associated data with the future
// // The only data we can't remove directly is the key entry.
// inner.remove(key);

// return ret;
// }
// }
// Poll::Pending
// }
// }

impl<F: Future> Stream for FutureGroup<F> {
type Item = <F as Future>::Output;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let pinned = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.inner) };
match pinned.poll_next_inner(cx) {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
match this.inner.poll_next_inner(cx) {
Poll::Ready(Some((_key, item))) => Poll::Ready(Some(item)),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
Expand Down Expand Up @@ -355,14 +296,8 @@ impl<F: Future> Stream for Keyed<F> {
type Item = (Key, <F as Future>::Output);

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// let mut this = self.project();
// this.group.as_mut().poll_next_inner(cx)
let mut this = self.project();
let inner = unsafe {
this.group
.as_mut()
.map_unchecked_mut(|inner| &mut inner.inner)
};
let inner = unsafe { this.group.as_mut().map_unchecked_mut(|t| &mut t.inner) };
inner.poll_next_inner(cx)
}
}
Expand Down
Loading

0 comments on commit 899f32b

Please sign in to comment.