>;
- }
+pub(crate) trait PollBehavior {
+ /// The output type of the polled item
+ type Output;
+
+ /// Poll the underlying item and decides how the iteration should proceed
+ ///
+ /// # Return value
+ /// The returned value coordinates two key aspects of the group iteration:
+ /// - whether the group should keep iterating over the next items;
+ /// - `ControlFlow::Continue(_)` to inform that the group
+ /// should proceed to the next item
+ /// - `ControlFlow::Break(_)` to inform that the group
+ /// should stop iterating
+ /// - whether the group should poll the same item again;
+ /// - [`PollAgain::Poll`] to inform the group to
+ /// mark that the item should be polled again
+ /// - [`PollAgain::Stop`] to inform the group to
+ /// stop polling this item
+ fn poll(
+ this: Pin<&mut P>,
+ cx: &mut Context<'_>,
+ ) -> ControlFlow<(Self::Output, PollAgain), PollAgain>;
+}
- #[derive(Debug)]
- pub struct PollFuture(PhantomData);
-
- impl PollBehavior for PollFuture {
- type Item = F::Output;
- type Polling = F;
- type Poll = Self::Item;
-
- fn poll(
- this: Pin<&mut Self::Polling>,
- cx: &mut Context<'_>,
- ) -> Poll> {
- if let Poll::Ready(item) = this.poll(cx) {
- Poll::Ready(ControlFlow::Break((item, PollAgain::Stop)))
- } else {
- Poll::Pending
- }
+pub struct PollFuture;
+
+impl PollBehavior for PollFuture {
+ type Output = F::Output;
+
+ fn poll(
+ future: Pin<&mut F>,
+ cx: &mut Context<'_>,
+ ) -> ControlFlow<(Self::Output, PollAgain), PollAgain> {
+ if let Poll::Ready(output) = future.poll(cx) {
+ // return the futures output and inform the group to not poll it again
+ ControlFlow::Break((output, PollAgain::Stop))
+ } else {
+ // future is not ready yet, keep polling
+ ControlFlow::Continue(PollAgain::Poll)
}
}
+}
- #[derive(Debug)]
- pub struct PollStream(PhantomData);
-
- impl PollBehavior for PollStream {
- type Item = S::Item;
- type Polling = S;
- type Poll = Self::Item;
-
- fn poll(
- this: Pin<&mut Self::Polling>,
- cx: &mut Context<'_>,
- ) -> Poll> {
- match this.poll_next(cx) {
- Poll::Ready(Some(item)) => Poll::Ready(ControlFlow::Break((item, PollAgain::Poll))),
- Poll::Ready(None) => Poll::Ready(ControlFlow::Continue(())),
- Poll::Pending => Poll::Pending,
- }
+pub struct PollStream;
+
+impl PollBehavior for PollStream {
+ type Output = S::Item;
+
+ fn poll(
+ stream: Pin<&mut S>,
+ cx: &mut Context<'_>,
+ ) -> ControlFlow<(Self::Output, PollAgain), PollAgain> {
+ match stream.poll_next(cx) {
+ // stop the iteration, keep polling this stream
+ Poll::Ready(Some(item)) => ControlFlow::Break((item, PollAgain::Poll)),
+ // continue the iteration, stop polling this stream
+ Poll::Ready(None) => ControlFlow::Continue(PollAgain::Stop),
+ // continue the iteration, continue polling this stream
+ Poll::Pending => ControlFlow::Continue(PollAgain::Poll),
}
}
}
diff --git a/src/future/future_group.rs b/src/future/future_group.rs
index 27955a8..23fdb66 100644
--- a/src/future/future_group.rs
+++ b/src/future/future_group.rs
@@ -4,8 +4,7 @@ use core::task::{Context, Poll};
use futures_core::stream::Stream;
use futures_core::Future;
-use crate::collections::inner_group::theory::PollFuture;
-use crate::collections::inner_group::{InnerGroup, Key};
+use crate::collections::inner_group::{InnerGroup, Key, PollFuture};
/// A growable group of futures which act as a single unit.
///
@@ -60,7 +59,7 @@ use crate::collections::inner_group::{InnerGroup, Key};
#[pin_project::pin_project]
pub struct FutureGroup {
#[pin]
- inner: InnerGroup>,
+ inner: InnerGroup,
}
impl FutureGroup {
diff --git a/src/stream/stream_group.rs b/src/stream/stream_group.rs
index b6d4773..364c70a 100644
--- a/src/stream/stream_group.rs
+++ b/src/stream/stream_group.rs
@@ -4,8 +4,7 @@ use core::pin::Pin;
use core::task::{Context, Poll};
use futures_core::Stream;
-use crate::collections::inner_group::theory::PollStream;
-use crate::collections::inner_group::{InnerGroup, Key};
+use crate::collections::inner_group::{InnerGroup, Key, PollStream};
/// A growable group of streams which act as a single unit.
///
@@ -59,7 +58,7 @@ use crate::collections::inner_group::{InnerGroup, Key};
#[pin_project::pin_project]
pub struct StreamGroup {
#[pin]
- inner: InnerGroup>,
+ inner: InnerGroup,
}
impl StreamGroup {