From 2cb026e123113ce56446f99f7e715b9dca59ae16 Mon Sep 17 00:00:00 2001 From: Yosh Date: Sat, 17 Feb 2024 02:28:44 +0100 Subject: [PATCH] start defining the concurrent stream trait --- src/concurrent_stream/mod.rs | 72 ++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/src/concurrent_stream/mod.rs b/src/concurrent_stream/mod.rs index 41b0fbd..eb9d2b6 100644 --- a/src/concurrent_stream/mod.rs +++ b/src/concurrent_stream/mod.rs @@ -8,6 +8,57 @@ use std::pin::pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +/// Concurrently operate over the +trait ConcurrentStream { + type Item; + async fn drive>(self, consumer: C) -> C::Output; + + async fn passthrough(self) -> Passthrough + where + Self: Sized, + { + Passthrough { inner: self } + } +} + +struct Passthrough { + inner: CS, +} + +impl ConcurrentStream for Passthrough { + type Item = CS::Item; + + async fn drive>(self, consumer: C) -> C::Output { + self.inner + .drive(PassthroughConsumer { inner: consumer }) + .await + } +} + +struct PassthroughConsumer { + inner: C, +} +impl Consumer for PassthroughConsumer +where + C: Consumer, +{ + type Output = C::Output; + + fn consume(&mut self, item: Item) { + self.inner.consume(item); + } + + fn complete(self) -> Self::Output { + self.inner.complete() + } +} + +trait Consumer { + type Output; + fn consume(&mut self, item: Item); + fn complete(self) -> Self::Output; +} + /// Concurrently map the items coming out of a sequential stream, using `limit` /// as the max concurrency. /// @@ -85,6 +136,27 @@ where } } +#[pin_project::pin_project] +struct Source { + #[pin] + iter: S, +} + +impl ConcurrentStream for Source +where + S: Stream, +{ + type Item = S::Item; + + async fn drive>(self, mut consumer: C) -> C::Output { + let mut iter = pin!(self.iter); + while let Some(item) = iter.next().await { + consumer.consume(item); + } + consumer.complete() + } +} + async fn insert_fut(f: F, item: T, count: Arc) where F: Fn(T) -> Fut,