From 557497515e353860a264c77dcc02db1d22fb3d6b Mon Sep 17 00:00:00 2001 From: Matheus Consoli Date: Sat, 23 Mar 2024 15:47:14 -0300 Subject: [PATCH] Simplifies and document `PollBehavior` --- src/collections/inner_group.rs | 139 +++++++++++++++++---------------- src/future/future_group.rs | 5 +- src/stream/stream_group.rs | 5 +- 3 files changed, 75 insertions(+), 74 deletions(-) diff --git a/src/collections/inner_group.rs b/src/collections/inner_group.rs index 554b66b..0434068 100644 --- a/src/collections/inner_group.rs +++ b/src/collections/inner_group.rs @@ -1,4 +1,5 @@ use core::{ + future::Future, marker::PhantomData, ops::ControlFlow, pin::Pin, @@ -6,6 +7,7 @@ use core::{ }; use alloc::{collections::BTreeSet, fmt}; +use futures_core::Stream; use slab::Slab; use smallvec::{smallvec, SmallVec}; @@ -146,19 +148,14 @@ impl fmt::Debug for InnerGroup { #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Key(pub usize); -// ---- -// prototyping - -use theory::*; - impl InnerGroup where - B: PollBehavior, + B: PollBehavior, { pub fn poll_next_inner( mut self: Pin<&mut Self>, cx: &Context<'_>, - ) -> Poll> { + ) -> Poll> { let this = unsafe { self.as_mut().get_unchecked_mut() }; // short-circuit if we have no items to iterate over if this.is_empty() { @@ -189,26 +186,26 @@ where let pollable = unsafe { Pin::new_unchecked(&mut this.items[index]) }; match B::poll(pollable, &mut cx) { - Poll::Ready(ControlFlow::Break((result, PollAgain::Stop))) => { + ControlFlow::Break((result, PollAgain::Stop)) => { for item in removal_queue { this.remove(Key(item)); } this.remove(Key(index)); return Poll::Ready(Some((Key(index), result))); } - Poll::Ready(ControlFlow::Break((result, PollAgain::Poll))) => { + ControlFlow::Break((result, PollAgain::Poll)) => { this.states[index].set_pending(); this.wakers.readiness().set_ready(index); ret = Poll::Ready(Some((Key(index), result))); break; } - Poll::Ready(ControlFlow::Continue(_)) => { + ControlFlow::Continue(PollAgain::Stop) => { done_count += 1; removal_queue.push(index); continue; } - Poll::Pending => continue, + ControlFlow::Continue(PollAgain::Poll) => continue, } } for item in removal_queue { @@ -223,68 +220,74 @@ where } } -pub mod theory { - use core::future::Future; - use core::marker::PhantomData; - use core::ops::ControlFlow; - use core::pin::Pin; - use core::task::{Context, Poll}; - - use futures_core::Stream; - - pub enum PollAgain { - Poll, - Stop, - } - - pub trait PollBehavior { - type Item; - type Polling; - type Poll; +/// Used to tell the callee of [`PollBehavior::poll`] +/// whether the `poll`'ed item should be polled again. +pub(crate) enum PollAgain { + /// Keep polling + Poll, + /// Stop polling + Stop, +} - fn poll( - this: Pin<&mut Self::Polling>, - cx: &mut Context<'_>, - ) -> Poll>; - } +pub(crate) trait PollBehavior

{ + /// The output type of the polled item + type Output; + + /// Poll the underlying item and decides how the iteration should proceed + /// + /// # Return value + /// The returned value coordinates two key aspects of the group iteration: + /// - whether the group should keep iterating over the next items; + /// - `ControlFlow::Continue(_)` to inform that the group + /// should proceed to the next item + /// - `ControlFlow::Break(_)` to inform that the group + /// should stop iterating + /// - whether the group should poll the same item again; + /// - [`PollAgain::Poll`] to inform the group to + /// mark that the item should be polled again + /// - [`PollAgain::Stop`] to inform the group to + /// stop polling this item + fn poll( + this: Pin<&mut P>, + cx: &mut Context<'_>, + ) -> ControlFlow<(Self::Output, PollAgain), PollAgain>; +} - #[derive(Debug)] - pub struct PollFuture(PhantomData); - - impl PollBehavior for PollFuture { - type Item = F::Output; - type Polling = F; - type Poll = Self::Item; - - fn poll( - this: Pin<&mut Self::Polling>, - cx: &mut Context<'_>, - ) -> Poll> { - if let Poll::Ready(item) = this.poll(cx) { - Poll::Ready(ControlFlow::Break((item, PollAgain::Stop))) - } else { - Poll::Pending - } +pub struct PollFuture; + +impl PollBehavior for PollFuture { + type Output = F::Output; + + fn poll( + future: Pin<&mut F>, + cx: &mut Context<'_>, + ) -> ControlFlow<(Self::Output, PollAgain), PollAgain> { + if let Poll::Ready(output) = future.poll(cx) { + // return the futures output and inform the group to not poll it again + ControlFlow::Break((output, PollAgain::Stop)) + } else { + // future is not ready yet, keep polling + ControlFlow::Continue(PollAgain::Poll) } } +} - #[derive(Debug)] - pub struct PollStream(PhantomData); - - impl PollBehavior for PollStream { - type Item = S::Item; - type Polling = S; - type Poll = Self::Item; - - fn poll( - this: Pin<&mut Self::Polling>, - cx: &mut Context<'_>, - ) -> Poll> { - match this.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(ControlFlow::Break((item, PollAgain::Poll))), - Poll::Ready(None) => Poll::Ready(ControlFlow::Continue(())), - Poll::Pending => Poll::Pending, - } +pub struct PollStream; + +impl PollBehavior for PollStream { + type Output = S::Item; + + fn poll( + stream: Pin<&mut S>, + cx: &mut Context<'_>, + ) -> ControlFlow<(Self::Output, PollAgain), PollAgain> { + match stream.poll_next(cx) { + // stop the iteration, keep polling this stream + Poll::Ready(Some(item)) => ControlFlow::Break((item, PollAgain::Poll)), + // continue the iteration, stop polling this stream + Poll::Ready(None) => ControlFlow::Continue(PollAgain::Stop), + // continue the iteration, continue polling this stream + Poll::Pending => ControlFlow::Continue(PollAgain::Poll), } } } diff --git a/src/future/future_group.rs b/src/future/future_group.rs index 27955a8..23fdb66 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -4,8 +4,7 @@ use core::task::{Context, Poll}; use futures_core::stream::Stream; use futures_core::Future; -use crate::collections::inner_group::theory::PollFuture; -use crate::collections::inner_group::{InnerGroup, Key}; +use crate::collections::inner_group::{InnerGroup, Key, PollFuture}; /// A growable group of futures which act as a single unit. /// @@ -60,7 +59,7 @@ use crate::collections::inner_group::{InnerGroup, Key}; #[pin_project::pin_project] pub struct FutureGroup { #[pin] - inner: InnerGroup>, + inner: InnerGroup, } impl FutureGroup { diff --git a/src/stream/stream_group.rs b/src/stream/stream_group.rs index b6d4773..364c70a 100644 --- a/src/stream/stream_group.rs +++ b/src/stream/stream_group.rs @@ -4,8 +4,7 @@ use core::pin::Pin; use core::task::{Context, Poll}; use futures_core::Stream; -use crate::collections::inner_group::theory::PollStream; -use crate::collections::inner_group::{InnerGroup, Key}; +use crate::collections::inner_group::{InnerGroup, Key, PollStream}; /// A growable group of streams which act as a single unit. /// @@ -59,7 +58,7 @@ use crate::collections::inner_group::{InnerGroup, Key}; #[pin_project::pin_project] pub struct StreamGroup { #[pin] - inner: InnerGroup>, + inner: InnerGroup, } impl StreamGroup {