From 49055bedb65fb22a6aab1836aafeedf020962150 Mon Sep 17 00:00:00 2001 From: Yosh Date: Sun, 17 Mar 2024 01:08:56 +0100 Subject: [PATCH] remove passthrough and add struct docs --- src/concurrent_stream/enumerate.rs | 7 ++ src/concurrent_stream/limit.rs | 7 ++ src/concurrent_stream/mod.rs | 25 +++---- src/concurrent_stream/passthrough.rs | 58 --------------- src/concurrent_stream/take.rs | 102 +++++++++++++++++++++++++++ 5 files changed, 129 insertions(+), 70 deletions(-) delete mode 100644 src/concurrent_stream/passthrough.rs create mode 100644 src/concurrent_stream/take.rs diff --git a/src/concurrent_stream/enumerate.rs b/src/concurrent_stream/enumerate.rs index 8529a75..ff1eee1 100644 --- a/src/concurrent_stream/enumerate.rs +++ b/src/concurrent_stream/enumerate.rs @@ -5,6 +5,13 @@ use std::{ task::{ready, Context, Poll}, }; +/// A concurrent iterator that yields the current count and the element during iteration. +/// +/// This `struct` is created by the [`enumerate`] method on [`ConcurrentStream`]. See its +/// documentation for more. +/// +/// [`enumerate`]: ConcurrentStream::enumerate +/// [`ConcurrentStream`]: trait.ConcurrentStream.html #[derive(Debug)] pub struct Enumerate { inner: CS, diff --git a/src/concurrent_stream/limit.rs b/src/concurrent_stream/limit.rs index 07cba1f..b421d05 100644 --- a/src/concurrent_stream/limit.rs +++ b/src/concurrent_stream/limit.rs @@ -1,6 +1,13 @@ use super::{ConcurrentStream, Consumer}; use std::{future::Future, num::NonZeroUsize}; +/// A concurrent iterator that limits the amount of concurrency applied. +/// +/// This `struct` is created by the [`limit`] method on [`ConcurrentStream`]. See its +/// documentation for more. +/// +/// [`limit`]: ConcurrentStream::limit +/// [`ConcurrentStream`]: trait.ConcurrentStream.html #[derive(Debug)] pub struct Limit { inner: CS, diff --git a/src/concurrent_stream/mod.rs b/src/concurrent_stream/mod.rs index eb83303..b85202f 100644 --- a/src/concurrent_stream/mod.rs +++ b/src/concurrent_stream/mod.rs @@ -6,19 +6,19 @@ mod for_each; mod into_concurrent_iterator; mod limit; mod map; -mod passthrough; +mod take; mod try_for_each; -use enumerate::Enumerate; use for_each::ForEachConsumer; -use limit::Limit; -use passthrough::Passthrough; use std::future::Future; use std::num::NonZeroUsize; use try_for_each::TryForEachConsumer; +pub use enumerate::Enumerate; pub use into_concurrent_iterator::{FromStream, IntoConcurrentStream}; +pub use limit::Limit; pub use map::Map; +pub use take::Take; use self::drain::Drain; @@ -87,14 +87,6 @@ pub trait ConcurrentStream { Enumerate::new(self) } - /// Obtain a simple pass-through adapter. - fn passthrough(self) -> Passthrough - where - Self: Sized, - { - Passthrough::new(self) - } - /// Iterate over each item in sequence async fn drain(self) where @@ -111,6 +103,15 @@ pub trait ConcurrentStream { Limit::new(self, limit) } + /// Creates a stream that yields the first `n`` elements, or fewer if the + /// underlying iterator ends sooner. + fn take(self, limit: usize) -> Take + where + Self: Sized, + { + Take::new(self, limit) + } + /// Convert items from one type into another fn map(self, f: F) -> Map where diff --git a/src/concurrent_stream/passthrough.rs b/src/concurrent_stream/passthrough.rs deleted file mode 100644 index d5ae2fa..0000000 --- a/src/concurrent_stream/passthrough.rs +++ /dev/null @@ -1,58 +0,0 @@ -use super::{ConcurrentStream, Consumer}; -use std::future::Future; - -#[derive(Debug)] -pub struct Passthrough { - inner: CS, -} - -impl Passthrough { - pub(crate) fn new(inner: CS) -> Self { - Self { inner } - } -} - -impl ConcurrentStream for Passthrough { - type Item = CS::Item; - type Future = CS::Future; - - async fn drive(self, consumer: C) -> C::Output - where - C: Consumer, - { - self.inner - .drive(PassthroughConsumer { inner: consumer }) - .await - } - - fn concurrency_limit(&self) -> Option { - self.inner.concurrency_limit() - } - - fn size_hint(&self) -> (usize, Option) { - self.inner.size_hint() - } -} - -struct PassthroughConsumer { - inner: C, -} -impl Consumer for PassthroughConsumer -where - Fut: Future, - C: Consumer, -{ - type Output = C::Output; - - async fn send(&mut self, future: Fut) -> super::ConsumerState { - self.inner.send(future).await - } - - async fn progress(&mut self) -> super::ConsumerState { - self.inner.progress().await - } - - async fn finish(self) -> Self::Output { - self.inner.finish().await - } -} diff --git a/src/concurrent_stream/take.rs b/src/concurrent_stream/take.rs new file mode 100644 index 0000000..72bef9b --- /dev/null +++ b/src/concurrent_stream/take.rs @@ -0,0 +1,102 @@ +use super::{ConcurrentStream, Consumer, ConsumerState}; +use std::future::Future; + +/// A concurrent iterator that only iterates over the first `n` iterations of `iter`. +/// +/// This `struct` is created by the [`take`] method on [`ConcurrentStream`]. See its +/// documentation for more. +/// +/// [`take`]: ConcurrentStream::take +/// [`ConcurrentStream`]: trait.ConcurrentStream.html +#[derive(Debug)] +pub struct Take { + inner: CS, + limit: usize, +} + +impl Take { + pub(crate) fn new(inner: CS, limit: usize) -> Self { + Self { inner, limit } + } +} + +impl ConcurrentStream for Take { + type Item = CS::Item; + type Future = CS::Future; + + async fn drive(self, consumer: C) -> C::Output + where + C: Consumer, + { + self.inner + .drive(TakeConsumer { + inner: consumer, + count: 0, + limit: self.limit, + }) + .await + } + + // NOTE: this is the only interesting bit in this module. When a limit is + // set, this now starts using it. + fn concurrency_limit(&self) -> Option { + self.inner.concurrency_limit() + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} + +struct TakeConsumer { + inner: C, + count: usize, + limit: usize, +} +impl Consumer for TakeConsumer +where + Fut: Future, + C: Consumer, +{ + type Output = C::Output; + + async fn send(&mut self, future: Fut) -> ConsumerState { + self.count += 1; + let state = self.inner.send(future).await; + if self.count >= self.limit { + ConsumerState::Break + } else { + state + } + } + + async fn progress(&mut self) -> ConsumerState { + self.inner.progress().await + } + + async fn finish(self) -> Self::Output { + self.inner.finish().await + } +} + +#[cfg(test)] +mod test { + use crate::concurrent_stream::{ConcurrentStream, IntoConcurrentStream}; + use futures_lite::stream; + + #[test] + fn enumerate() { + futures_lite::future::block_on(async { + let mut n = 0; + stream::iter(std::iter::from_fn(|| { + let v = n; + n += 1; + Some(v) + })) + .co() + .take(5) + .for_each(|n| async move { assert!(dbg!(n) < 5) }) + .await; + }); + } +}