Skip to content

Commit

Permalink
FutureGroup working with Group abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus-consoli committed Mar 23, 2024
1 parent 8753dde commit c374c89
Show file tree
Hide file tree
Showing 2 changed files with 246 additions and 70 deletions.
163 changes: 161 additions & 2 deletions src/collections/inner_group.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use core::{pin::Pin, task::Waker};
use core::{
marker::PhantomData,
ops::ControlFlow,
pin::Pin,
task::{Context, Poll, Waker},
};

use alloc::{collections::BTreeSet, fmt};

Check failure on line 8 in src/collections/inner_group.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 8 in src/collections/inner_group.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

unresolved import `alloc`
use slab::Slab;

Check failure on line 9 in src/collections/inner_group.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

unresolved import `slab`
use smallvec::{smallvec, SmallVec};

Check failure on line 10 in src/collections/inner_group.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

unresolved import `smallvec`

use crate::utils::{PollVec, WakerVec};

Check failure on line 12 in src/collections/inner_group.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

unresolved imports `crate::utils::PollVec`, `crate::utils::WakerVec`

Expand All @@ -16,6 +22,7 @@ pub struct InnerGroup<A> {
pub keys: BTreeSet<usize>,
cap: usize,
len: usize,
//_poll_behavior: PhantomData<B>,
}

impl<A> InnerGroup<A> {
Expand All @@ -27,6 +34,7 @@ impl<A> InnerGroup<A> {
keys: BTreeSet::new(),
cap,
len: 0,
// _poll_behavior: PhantomData,
}
}

Expand Down Expand Up @@ -132,7 +140,7 @@ impl<A> Default for InnerGroup<A> {

impl<A> fmt::Debug for InnerGroup<A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("GroupInner")
f.debug_struct("InnerGroup")
.field("cap", &self.cap)
.field("len", &self.len)
.finish()
Expand All @@ -142,3 +150,154 @@ impl<A> fmt::Debug for InnerGroup<A> {
/// A key used to index into the `FutureGroup` type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Key(pub usize);

// ----
// prototyping

use theory::*;

#[derive(Debug)]
#[pin_project::pin_project]
pub struct Group<A, B> {
#[pin]
pub inner: InnerGroup<A>,
pub _poll_behavior: PhantomData<B>,
}

impl<A, B> Group<A, B>
where
B: PollBehavior<Polling = A>,
{
pub fn poll_next_inner(self: Pin<&mut Self>, cx: &Context<'_>) -> Poll<Option<(Key, B::Poll)>> {
let mut this = self.project();
let group = unsafe { this.inner.as_mut().get_unchecked_mut() };

// short-circuit if we have no items to iterate over
if group.is_empty() {
return Poll::Ready(None);
}

// set the top-level waker and check readiness
group.set_top_waker(cx.waker());
if !group.any_ready() {
// nothing is ready yet
return Poll::Pending;
}

let mut done_count = 0;
let group_len = group.len();
let mut removal_queue: SmallVec<[_; 10]> = smallvec![];

let mut ret = Poll::Pending;

for index in group.keys.iter().cloned() {
if !group.can_progress_index(index) {
continue;
}

// obtain the intermediate waker
let mut cx = Context::from_waker(group.wakers.get(index).unwrap());

let pollable = unsafe { Pin::new_unchecked(&mut group.items[index]) };
match B::poll(pollable, &mut cx) {
Poll::Ready(ControlFlow::Break((result, PollAgain::Stop))) => {
for item in removal_queue {
group.remove(Key(item));
}
group.remove(Key(index));
return Poll::Ready(Some((Key(index), result)));
}
Poll::Ready(ControlFlow::Break((result, PollAgain::Poll))) => {
group.states[index].set_pending();
group.wakers.readiness().set_ready(index);

ret = Poll::Ready(Some((Key(index), result)));
break;
}
Poll::Ready(ControlFlow::Continue(_)) => {
done_count += 1;
removal_queue.push(index);
continue;
}
Poll::Pending => continue,
}
}
for item in removal_queue {
group.remove(Key(item));
}

if done_count == group_len {
return Poll::Ready(None);
}

ret
}
}

pub mod theory {
use core::future::Future;
use core::marker::PhantomData;
use core::ops::ControlFlow;
use core::pin::Pin;
use core::task::{Context, Poll};

use futures_core::Stream;

pub enum PollAgain {
Poll,
Stop,
}

pub trait PollBehavior {
type Item;
type Polling;
type Poll;

fn poll(
this: Pin<&mut Self::Polling>,
cx: &mut Context<'_>,
) -> Poll<ControlFlow<(Self::Poll, PollAgain)>>;
}

#[derive(Debug)]
pub struct PollFuture<F: Future>(PhantomData<F>);

impl<F: Future> PollBehavior for PollFuture<F> {
type Item = F::Output;
type Polling = F;
type Poll = Self::Item;

fn poll(
this: Pin<&mut Self::Polling>,
cx: &mut Context<'_>,
) -> Poll<ControlFlow<(Self::Poll, PollAgain)>> {
if let Poll::Ready(item) = this.poll(cx) {
Poll::Ready(ControlFlow::Break((item, PollAgain::Stop)))
} else {
Poll::Pending
}
}
}

#[derive(Debug)]
pub struct PollStream<S: Stream>(PhantomData<S>);

impl<S: Stream> PollBehavior for PollStream<S> {
type Item = S::Item;
type Polling = S;
type Poll = Option<Self::Item>;

fn poll(
this: Pin<&mut Self::Polling>,
cx: &mut Context<'_>,
) -> Poll<ControlFlow<(Self::Poll, PollAgain)>> {
match this.poll_next(cx) {
Poll::Ready(Some(item)) => {
Poll::Ready(ControlFlow::Break((Some(item), PollAgain::Poll)))
}
Poll::Ready(None) => Poll::Ready(ControlFlow::Continue(())),
Poll::Pending => Poll::Pending,
}
}
}
}
Loading

0 comments on commit c374c89

Please sign in to comment.