Skip to content

Commit

Permalink
start defining the concurrent stream trait
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshuawuyts committed Feb 17, 2024
1 parent 857dafa commit 2cb026e
Showing 1 changed file with 72 additions and 0 deletions.
72 changes: 72 additions & 0 deletions src/concurrent_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,57 @@ use std::pin::pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Concurrently operate over the
trait ConcurrentStream {
type Item;
async fn drive<C: Consumer<Self::Item>>(self, consumer: C) -> C::Output;

async fn passthrough(self) -> Passthrough<Self>
where
Self: Sized,
{
Passthrough { inner: self }
}
}

struct Passthrough<CS: ConcurrentStream> {
inner: CS,
}

impl<CS: ConcurrentStream> ConcurrentStream for Passthrough<CS> {
type Item = CS::Item;

async fn drive<C: Consumer<Self::Item>>(self, consumer: C) -> C::Output {
self.inner
.drive(PassthroughConsumer { inner: consumer })
.await
}
}

struct PassthroughConsumer<C> {
inner: C,
}
impl<C, Item> Consumer<Item> for PassthroughConsumer<C>
where
C: Consumer<Item>,
{
type Output = C::Output;

fn consume(&mut self, item: Item) {
self.inner.consume(item);
}

fn complete(self) -> Self::Output {
self.inner.complete()
}
}

trait Consumer<Item> {
type Output;
fn consume(&mut self, item: Item);
fn complete(self) -> Self::Output;
}

/// Concurrently map the items coming out of a sequential stream, using `limit`
/// as the max concurrency.
///
Expand Down Expand Up @@ -85,6 +136,27 @@ where
}
}

#[pin_project::pin_project]
struct Source<S> {
#[pin]
iter: S,
}

impl<S> ConcurrentStream for Source<S>
where
S: Stream,
{
type Item = S::Item;

async fn drive<C: Consumer<Self::Item>>(self, mut consumer: C) -> C::Output {
let mut iter = pin!(self.iter);
while let Some(item) = iter.next().await {
consumer.consume(item);
}
consumer.complete()
}
}

async fn insert_fut<T, F, Fut>(f: F, item: T, count: Arc<AtomicUsize>)
where
F: Fn(T) -> Fut,
Expand Down

0 comments on commit 2cb026e

Please sign in to comment.