Skip to content

Commit

Permalink
remove passthrough and add struct docs
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshuawuyts committed Mar 17, 2024
1 parent f66b58b commit 49055be
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 70 deletions.
7 changes: 7 additions & 0 deletions src/concurrent_stream/enumerate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ use std::{
task::{ready, Context, Poll},
};

/// A concurrent iterator that yields the current count and the element during iteration.
///
/// This `struct` is created by the [`enumerate`] method on [`ConcurrentStream`]. See its
/// documentation for more.
///
/// [`enumerate`]: ConcurrentStream::enumerate
/// [`ConcurrentStream`]: trait.ConcurrentStream.html
#[derive(Debug)]
pub struct Enumerate<CS: ConcurrentStream> {
inner: CS,
Expand Down
7 changes: 7 additions & 0 deletions src/concurrent_stream/limit.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
use super::{ConcurrentStream, Consumer};
use std::{future::Future, num::NonZeroUsize};

/// A concurrent iterator that limits the amount of concurrency applied.
///
/// This `struct` is created by the [`limit`] method on [`ConcurrentStream`]. See its
/// documentation for more.
///
/// [`limit`]: ConcurrentStream::limit
/// [`ConcurrentStream`]: trait.ConcurrentStream.html
#[derive(Debug)]
pub struct Limit<CS: ConcurrentStream> {
inner: CS,
Expand Down
25 changes: 13 additions & 12 deletions src/concurrent_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ mod for_each;
mod into_concurrent_iterator;
mod limit;
mod map;
mod passthrough;
mod take;
mod try_for_each;

use enumerate::Enumerate;
use for_each::ForEachConsumer;
use limit::Limit;
use passthrough::Passthrough;
use std::future::Future;
use std::num::NonZeroUsize;
use try_for_each::TryForEachConsumer;

pub use enumerate::Enumerate;
pub use into_concurrent_iterator::{FromStream, IntoConcurrentStream};
pub use limit::Limit;
pub use map::Map;
pub use take::Take;

use self::drain::Drain;

Expand Down Expand Up @@ -87,14 +87,6 @@ pub trait ConcurrentStream {
Enumerate::new(self)
}

/// Obtain a simple pass-through adapter.
fn passthrough(self) -> Passthrough<Self>
where
Self: Sized,
{
Passthrough::new(self)
}

/// Iterate over each item in sequence
async fn drain(self)
where
Expand All @@ -111,6 +103,15 @@ pub trait ConcurrentStream {
Limit::new(self, limit)
}

/// Creates a stream that yields the first `n`` elements, or fewer if the
/// underlying iterator ends sooner.
fn take(self, limit: usize) -> Take<Self>
where
Self: Sized,
{
Take::new(self, limit)
}

/// Convert items from one type into another
fn map<F, FutB, B>(self, f: F) -> Map<Self, F, Self::Future, Self::Item, FutB, B>
where
Expand Down
58 changes: 0 additions & 58 deletions src/concurrent_stream/passthrough.rs

This file was deleted.

102 changes: 102 additions & 0 deletions src/concurrent_stream/take.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use super::{ConcurrentStream, Consumer, ConsumerState};
use std::future::Future;

/// A concurrent iterator that only iterates over the first `n` iterations of `iter`.
///
/// This `struct` is created by the [`take`] method on [`ConcurrentStream`]. See its
/// documentation for more.
///
/// [`take`]: ConcurrentStream::take
/// [`ConcurrentStream`]: trait.ConcurrentStream.html
#[derive(Debug)]
pub struct Take<CS: ConcurrentStream> {
inner: CS,
limit: usize,
}

impl<CS: ConcurrentStream> Take<CS> {
pub(crate) fn new(inner: CS, limit: usize) -> Self {
Self { inner, limit }
}
}

impl<CS: ConcurrentStream> ConcurrentStream for Take<CS> {
type Item = CS::Item;
type Future = CS::Future;

async fn drive<C>(self, consumer: C) -> C::Output
where
C: Consumer<Self::Item, Self::Future>,
{
self.inner
.drive(TakeConsumer {
inner: consumer,
count: 0,
limit: self.limit,
})
.await
}

// NOTE: this is the only interesting bit in this module. When a limit is
// set, this now starts using it.
fn concurrency_limit(&self) -> Option<std::num::NonZeroUsize> {
self.inner.concurrency_limit()
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}

struct TakeConsumer<C> {
inner: C,
count: usize,
limit: usize,
}
impl<C, Item, Fut> Consumer<Item, Fut> for TakeConsumer<C>
where
Fut: Future<Output = Item>,
C: Consumer<Item, Fut>,
{
type Output = C::Output;

async fn send(&mut self, future: Fut) -> ConsumerState {
self.count += 1;
let state = self.inner.send(future).await;
if self.count >= self.limit {
ConsumerState::Break
} else {
state
}
}

async fn progress(&mut self) -> ConsumerState {
self.inner.progress().await
}

async fn finish(self) -> Self::Output {
self.inner.finish().await
}
}

#[cfg(test)]
mod test {
use crate::concurrent_stream::{ConcurrentStream, IntoConcurrentStream};
use futures_lite::stream;

#[test]
fn enumerate() {
futures_lite::future::block_on(async {
let mut n = 0;
stream::iter(std::iter::from_fn(|| {
let v = n;
n += 1;
Some(v)
}))
.co()
.take(5)
.for_each(|n| async move { assert!(dbg!(n) < 5) })
.await;
});
}
}

0 comments on commit 49055be

Please sign in to comment.