Skip to content

Commit

Permalink
Simplifies and document PollBehavior
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus-consoli committed Mar 23, 2024
1 parent 6465651 commit 5574975
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 74 deletions.
139 changes: 71 additions & 68 deletions src/collections/inner_group.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use core::{
future::Future,
marker::PhantomData,
ops::ControlFlow,
pin::Pin,
task::{Context, Poll, Waker},
};

use alloc::{collections::BTreeSet, fmt};
use futures_core::Stream;
use slab::Slab;
use smallvec::{smallvec, SmallVec};

Expand Down Expand Up @@ -146,19 +148,14 @@ impl<A, B> fmt::Debug for InnerGroup<A, B> {
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Key(pub usize);

// ----
// prototyping

use theory::*;

impl<A, B> InnerGroup<A, B>
where
B: PollBehavior<Polling = A>,
B: PollBehavior<A>,
{
pub fn poll_next_inner(
mut self: Pin<&mut Self>,
cx: &Context<'_>,
) -> Poll<Option<(Key, B::Poll)>> {
) -> Poll<Option<(Key, B::Output)>> {
let this = unsafe { self.as_mut().get_unchecked_mut() };
// short-circuit if we have no items to iterate over
if this.is_empty() {
Expand Down Expand Up @@ -189,26 +186,26 @@ where

let pollable = unsafe { Pin::new_unchecked(&mut this.items[index]) };
match B::poll(pollable, &mut cx) {
Poll::Ready(ControlFlow::Break((result, PollAgain::Stop))) => {
ControlFlow::Break((result, PollAgain::Stop)) => {
for item in removal_queue {
this.remove(Key(item));
}
this.remove(Key(index));
return Poll::Ready(Some((Key(index), result)));
}
Poll::Ready(ControlFlow::Break((result, PollAgain::Poll))) => {
ControlFlow::Break((result, PollAgain::Poll)) => {
this.states[index].set_pending();
this.wakers.readiness().set_ready(index);

ret = Poll::Ready(Some((Key(index), result)));
break;
}
Poll::Ready(ControlFlow::Continue(_)) => {
ControlFlow::Continue(PollAgain::Stop) => {
done_count += 1;
removal_queue.push(index);
continue;
}
Poll::Pending => continue,
ControlFlow::Continue(PollAgain::Poll) => continue,
}
}
for item in removal_queue {
Expand All @@ -223,68 +220,74 @@ where
}
}

pub mod theory {
use core::future::Future;
use core::marker::PhantomData;
use core::ops::ControlFlow;
use core::pin::Pin;
use core::task::{Context, Poll};

use futures_core::Stream;

pub enum PollAgain {
Poll,
Stop,
}

pub trait PollBehavior {
type Item;
type Polling;
type Poll;
/// Used to tell the callee of [`PollBehavior::poll`]
/// whether the `poll`'ed item should be polled again.
pub(crate) enum PollAgain {
/// Keep polling
Poll,
/// Stop polling
Stop,
}

fn poll(
this: Pin<&mut Self::Polling>,
cx: &mut Context<'_>,
) -> Poll<ControlFlow<(Self::Poll, PollAgain)>>;
}
pub(crate) trait PollBehavior<P> {
/// The output type of the polled item
type Output;

/// Poll the underlying item and decides how the iteration should proceed
///
/// # Return value
/// The returned value coordinates two key aspects of the group iteration:
/// - whether the group should keep iterating over the next items;
/// - `ControlFlow::Continue(_)` to inform that the group
/// should proceed to the next item
/// - `ControlFlow::Break(_)` to inform that the group
/// should stop iterating
/// - whether the group should poll the same item again;
/// - [`PollAgain::Poll`] to inform the group to
/// mark that the item should be polled again
/// - [`PollAgain::Stop`] to inform the group to
/// stop polling this item
fn poll(
this: Pin<&mut P>,
cx: &mut Context<'_>,
) -> ControlFlow<(Self::Output, PollAgain), PollAgain>;
}

#[derive(Debug)]
pub struct PollFuture<F>(PhantomData<F>);

impl<F: Future> PollBehavior for PollFuture<F> {
type Item = F::Output;
type Polling = F;
type Poll = Self::Item;

fn poll(
this: Pin<&mut Self::Polling>,
cx: &mut Context<'_>,
) -> Poll<ControlFlow<(Self::Poll, PollAgain)>> {
if let Poll::Ready(item) = this.poll(cx) {
Poll::Ready(ControlFlow::Break((item, PollAgain::Stop)))
} else {
Poll::Pending
}
pub struct PollFuture;

impl<F: Future> PollBehavior<F> for PollFuture {
type Output = F::Output;

fn poll(
future: Pin<&mut F>,
cx: &mut Context<'_>,
) -> ControlFlow<(Self::Output, PollAgain), PollAgain> {
if let Poll::Ready(output) = future.poll(cx) {
// return the futures output and inform the group to not poll it again
ControlFlow::Break((output, PollAgain::Stop))
} else {
// future is not ready yet, keep polling
ControlFlow::Continue(PollAgain::Poll)
}
}
}

#[derive(Debug)]
pub struct PollStream<S>(PhantomData<S>);

impl<S: Stream> PollBehavior for PollStream<S> {
type Item = S::Item;
type Polling = S;
type Poll = Self::Item;

fn poll(
this: Pin<&mut Self::Polling>,
cx: &mut Context<'_>,
) -> Poll<ControlFlow<(Self::Poll, PollAgain)>> {
match this.poll_next(cx) {
Poll::Ready(Some(item)) => Poll::Ready(ControlFlow::Break((item, PollAgain::Poll))),
Poll::Ready(None) => Poll::Ready(ControlFlow::Continue(())),
Poll::Pending => Poll::Pending,
}
pub struct PollStream;

impl<S: Stream> PollBehavior<S> for PollStream {
type Output = S::Item;

fn poll(
stream: Pin<&mut S>,
cx: &mut Context<'_>,
) -> ControlFlow<(Self::Output, PollAgain), PollAgain> {
match stream.poll_next(cx) {
// stop the iteration, keep polling this stream
Poll::Ready(Some(item)) => ControlFlow::Break((item, PollAgain::Poll)),
// continue the iteration, stop polling this stream
Poll::Ready(None) => ControlFlow::Continue(PollAgain::Stop),
// continue the iteration, continue polling this stream
Poll::Pending => ControlFlow::Continue(PollAgain::Poll),
}
}
}
5 changes: 2 additions & 3 deletions src/future/future_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use core::task::{Context, Poll};
use futures_core::stream::Stream;
use futures_core::Future;

use crate::collections::inner_group::theory::PollFuture;
use crate::collections::inner_group::{InnerGroup, Key};
use crate::collections::inner_group::{InnerGroup, Key, PollFuture};

/// A growable group of futures which act as a single unit.
///
Expand Down Expand Up @@ -60,7 +59,7 @@ use crate::collections::inner_group::{InnerGroup, Key};
#[pin_project::pin_project]
pub struct FutureGroup<F> {
#[pin]
inner: InnerGroup<F, PollFuture<F>>,
inner: InnerGroup<F, PollFuture>,
}

impl<F> FutureGroup<F> {
Expand Down
5 changes: 2 additions & 3 deletions src/stream/stream_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use core::pin::Pin;
use core::task::{Context, Poll};
use futures_core::Stream;

use crate::collections::inner_group::theory::PollStream;
use crate::collections::inner_group::{InnerGroup, Key};
use crate::collections::inner_group::{InnerGroup, Key, PollStream};

/// A growable group of streams which act as a single unit.
///
Expand Down Expand Up @@ -59,7 +58,7 @@ use crate::collections::inner_group::{InnerGroup, Key};
#[pin_project::pin_project]
pub struct StreamGroup<S> {
#[pin]
inner: InnerGroup<S, PollStream<S>>,
inner: InnerGroup<S, PollStream>,
}

impl<S> StreamGroup<S> {
Expand Down

0 comments on commit 5574975

Please sign in to comment.