Skip to content

Commit

Permalink
implement collect
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshuawuyts committed Mar 17, 2024
1 parent 2a9e714 commit 1ce109e
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 17 deletions.
100 changes: 100 additions & 0 deletions src/concurrent_stream/convert.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use super::{ConcurrentStream, Consumer, ConsumerState};
use crate::future::FutureGroup;
use core::future::Future;
use core::pin::Pin;
use futures_lite::StreamExt;

/// Conversion into a [`ConcurrentStream`]
pub trait IntoConcurrentStream {
/// The type of the elements being iterated over.
type Item;
/// Which kind of iterator are we turning this into?
type ConcurrentStream: ConcurrentStream<Item = Self::Item>;

/// Convert `self` into a concurrent iterator.
fn into_concurrent_stream(self) -> Self::ConcurrentStream;
}

impl<S: ConcurrentStream> IntoConcurrentStream for S {
type Item = S::Item;
type ConcurrentStream = S;

fn into_concurrent_stream(self) -> Self::ConcurrentStream {
self
}
}

/// Conversion from a [`ConcurrentStream`]
#[allow(async_fn_in_trait)]
pub trait FromConcurrentStream<A>: Sized {
/// Creates a value from a concurrent iterator.
async fn from_concurrent_stream<T>(iter: T) -> Self
where
T: IntoConcurrentStream<Item = A>;
}

impl<T> FromConcurrentStream<T> for Vec<T> {

Check failure on line 36 in src/concurrent_stream/convert.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

cannot find type `Vec` in this scope
async fn from_concurrent_stream<S>(iter: S) -> Self
where
S: IntoConcurrentStream<Item = T>,
{
let stream = iter.into_concurrent_stream();
let mut output = Vec::with_capacity(stream.size_hint().1.unwrap_or_default());

Check failure on line 42 in src/concurrent_stream/convert.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

failed to resolve: use of undeclared type `Vec`
stream.drive(VecConsumer::new(&mut output)).await;
output
}
}

// TODO: replace this with a generalized `fold` operation
pub(crate) struct VecConsumer<'a, Fut: Future> {
group: Pin<Box<FutureGroup<Fut>>>,

Check failure on line 50 in src/concurrent_stream/convert.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

cannot find type `Box` in this scope
output: &'a mut Vec<Fut::Output>,

Check failure on line 51 in src/concurrent_stream/convert.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

cannot find type `Vec` in this scope
}

impl<'a, Fut: Future> VecConsumer<'a, Fut> {
pub(crate) fn new(output: &'a mut Vec<Fut::Output>) -> Self {

Check failure on line 55 in src/concurrent_stream/convert.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

cannot find type `Vec` in this scope
Self {
group: Box::pin(FutureGroup::new()),

Check failure on line 57 in src/concurrent_stream/convert.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-latest, stable)

failed to resolve: use of undeclared type `Box`
output,
}
}
}

impl<'a, Item, Fut> Consumer<Item, Fut> for VecConsumer<'a, Fut>
where
Fut: Future<Output = Item>,
{
type Output = ();

async fn send(&mut self, future: Fut) -> super::ConsumerState {
// unbounded concurrency, so we just goooo
self.group.as_mut().insert_pinned(future);
ConsumerState::Continue
}

async fn progress(&mut self) -> super::ConsumerState {
while let Some(item) = self.group.next().await {
self.output.push(item);
}
ConsumerState::Empty
}
async fn finish(mut self) -> Self::Output {
while let Some(item) = self.group.next().await {
self.output.push(item);
}
}
}

#[cfg(test)]
mod test {
use crate::prelude::*;
use futures_lite::stream;

#[test]
fn collect() {
futures_lite::future::block_on(async {
let v: Vec<_> = stream::repeat(1).co().take(5).collect().await;
assert_eq!(v, &[1, 1, 1, 1, 1]);
});
}
}
28 changes: 11 additions & 17 deletions src/concurrent_stream/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Concurrent execution of streams
mod convert;
mod drain;
mod enumerate;
mod for_each;
Expand All @@ -14,6 +15,7 @@ use core::num::NonZeroUsize;
use for_each::ForEachConsumer;
use try_for_each::TryForEachConsumer;

pub use convert::{FromConcurrentStream, IntoConcurrentStream};
pub use enumerate::Enumerate;
pub use from_stream::FromStream;
pub use limit::Limit;
Expand Down Expand Up @@ -148,6 +150,15 @@ pub trait ConcurrentStream {
let limit = self.concurrency_limit();
self.drive(TryForEachConsumer::new(limit, f)).await
}

/// Transforms an iterator into a collection.
async fn collect<B>(self) -> B
where
B: FromConcurrentStream<Self::Item>,
Self: Sized,
{
B::from_concurrent_stream(self).await
}
}

/// The state of the consumer, used to communicate back to the source.
Expand All @@ -162,23 +173,6 @@ pub enum ConsumerState {
Empty,
}

/// Convert into a concurrent stream
pub trait IntoConcurrentStream {
/// The type of concurrent stream we're returning.
type ConcurrentStream: ConcurrentStream;

/// Convert `self` into a concurrent stream.
fn into_concurrent_stream(self) -> Self::ConcurrentStream;
}

impl<S: ConcurrentStream> IntoConcurrentStream for S {
type ConcurrentStream = S;

fn into_concurrent_stream(self) -> Self::ConcurrentStream {
self
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down

0 comments on commit 1ce109e

Please sign in to comment.