From 8abb32513d16c7c90a0ddf4a2fed89ef606f3d92 Mon Sep 17 00:00:00 2001 From: Yosh Date: Sun, 17 Mar 2024 01:21:33 +0100 Subject: [PATCH] move IntoConcurrentStream to a new location --- src/concurrent_stream/enumerate.rs | 3 +- src/concurrent_stream/for_each.rs | 2 +- ..._concurrent_iterator.rs => from_stream.rs} | 29 ++++++------------- src/concurrent_stream/mod.rs | 25 ++++++++++++++-- src/concurrent_stream/take.rs | 2 +- src/concurrent_stream/try_for_each.rs | 2 +- src/lib.rs | 2 ++ src/stream/stream_ext.rs | 13 ++++++++- 8 files changed, 50 insertions(+), 28 deletions(-) rename src/concurrent_stream/{into_concurrent_iterator.rs => from_stream.rs} (85%) diff --git a/src/concurrent_stream/enumerate.rs b/src/concurrent_stream/enumerate.rs index ff1eee1..3106371 100644 --- a/src/concurrent_stream/enumerate.rs +++ b/src/concurrent_stream/enumerate.rs @@ -120,7 +120,8 @@ where #[cfg(test)] mod test { - use crate::concurrent_stream::{ConcurrentStream, IntoConcurrentStream}; + // use crate::concurrent_stream::{ConcurrentStream, IntoConcurrentStream}; + use crate::prelude::*; use futures_lite::stream; use futures_lite::StreamExt; use std::num::NonZeroUsize; diff --git a/src/concurrent_stream/for_each.rs b/src/concurrent_stream/for_each.rs index 970f23d..72506d1 100644 --- a/src/concurrent_stream/for_each.rs +++ b/src/concurrent_stream/for_each.rs @@ -157,7 +157,7 @@ where #[cfg(test)] mod test { use super::*; - use crate::concurrent_stream::{ConcurrentStream, IntoConcurrentStream}; + use crate::prelude::*; use futures_lite::stream; use std::sync::Arc; diff --git a/src/concurrent_stream/into_concurrent_iterator.rs b/src/concurrent_stream/from_stream.rs similarity index 85% rename from src/concurrent_stream/into_concurrent_iterator.rs rename to src/concurrent_stream/from_stream.rs index d783922..36ee3c9 100644 --- a/src/concurrent_stream/into_concurrent_iterator.rs +++ b/src/concurrent_stream/from_stream.rs @@ -12,7 +12,13 @@ use super::{ConcurrentStream, Consumer}; #[derive(Debug)] pub struct FromStream { #[pin] - iter: S, + stream: S, +} + +impl FromStream { + pub(crate) fn new(stream: S) -> Self { + Self { stream } + } } impl ConcurrentStream for FromStream @@ -26,7 +32,7 @@ where where C: Consumer, { - let mut iter = pin!(self.iter); + let mut iter = pin!(self.stream); // Concurrently progress the consumer as well as the stream. Whenever // there is an item from the stream available, we submit it to the @@ -83,7 +89,7 @@ where } fn size_hint(&self) -> (usize, Option) { - self.iter.size_hint() + self.stream.size_hint() } } @@ -91,20 +97,3 @@ enum State { Progress(super::ConsumerState), Item(T), } - -/// Convert into a concurrent stream -pub trait IntoConcurrentStream { - /// The type of concurrent stream we're returning. - type ConcurrentStream: ConcurrentStream; - - /// Convert `self` into a concurrent stream. - fn co(self) -> Self::ConcurrentStream; -} - -impl IntoConcurrentStream for S { - type ConcurrentStream = FromStream; - - fn co(self) -> Self::ConcurrentStream { - FromStream { iter: self } - } -} diff --git a/src/concurrent_stream/mod.rs b/src/concurrent_stream/mod.rs index b85202f..b3851ac 100644 --- a/src/concurrent_stream/mod.rs +++ b/src/concurrent_stream/mod.rs @@ -3,7 +3,7 @@ mod drain; mod enumerate; mod for_each; -mod into_concurrent_iterator; +mod from_stream; mod limit; mod map; mod take; @@ -15,7 +15,7 @@ use std::num::NonZeroUsize; use try_for_each::TryForEachConsumer; pub use enumerate::Enumerate; -pub use into_concurrent_iterator::{FromStream, IntoConcurrentStream}; +pub use from_stream::FromStream; pub use limit::Limit; pub use map::Map; pub use take::Take; @@ -152,7 +152,7 @@ pub trait ConcurrentStream { /// The state of the consumer, used to communicate back to the source. #[derive(Debug)] -enum ConsumerState { +pub enum ConsumerState { /// The consumer is done making progress, and the `finish` method should be called. Break, /// The consumer is ready to keep making progress. @@ -162,9 +162,28 @@ enum ConsumerState { Empty, } +/// Convert into a concurrent stream +pub trait IntoConcurrentStream { + /// The type of concurrent stream we're returning. + type ConcurrentStream: ConcurrentStream; + + /// Convert `self` into a concurrent stream. + fn into_concurrent_stream(self) -> Self::ConcurrentStream; +} + +impl IntoConcurrentStream for S { + type ConcurrentStream = S; + + fn into_concurrent_stream(self) -> Self::ConcurrentStream { + self + } +} + #[cfg(test)] mod test { use super::*; + + use crate::prelude::*; use futures_lite::prelude::*; use futures_lite::stream; diff --git a/src/concurrent_stream/take.rs b/src/concurrent_stream/take.rs index 72bef9b..0851b44 100644 --- a/src/concurrent_stream/take.rs +++ b/src/concurrent_stream/take.rs @@ -81,7 +81,7 @@ where #[cfg(test)] mod test { - use crate::concurrent_stream::{ConcurrentStream, IntoConcurrentStream}; + use crate::prelude::*; use futures_lite::stream; #[test] diff --git a/src/concurrent_stream/try_for_each.rs b/src/concurrent_stream/try_for_each.rs index 6156c95..dab16d9 100644 --- a/src/concurrent_stream/try_for_each.rs +++ b/src/concurrent_stream/try_for_each.rs @@ -179,7 +179,7 @@ where #[cfg(test)] mod test { use super::*; - use crate::concurrent_stream::{ConcurrentStream, IntoConcurrentStream}; + use crate::prelude::*; use futures_lite::stream; use std::{io, sync::Arc}; diff --git a/src/lib.rs b/src/lib.rs index 3200d11..3c6fc11 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -77,6 +77,8 @@ pub mod prelude { pub use super::stream::IntoStream as _; pub use super::stream::Merge as _; pub use super::stream::Zip as _; + + pub use super::concurrent_stream::ConcurrentStream; } pub mod concurrent_stream; diff --git a/src/stream/stream_ext.rs b/src/stream/stream_ext.rs index 3321735..c9e44df 100644 --- a/src/stream/stream_ext.rs +++ b/src/stream/stream_ext.rs @@ -1,4 +1,7 @@ -use crate::stream::{IntoStream, Merge}; +use crate::{ + concurrent_stream::FromStream, + stream::{IntoStream, Merge}, +}; use futures_core::Stream; use super::{chain::tuple::Chain2, merge::tuple::Merge2, zip::tuple::Zip2, Chain, Zip}; @@ -22,6 +25,14 @@ pub trait StreamExt: Stream { where Self: Stream + Sized, S2: IntoStream; + + /// Convert into a concurrent stream. + fn co(self) -> FromStream + where + Self: Sized, + { + FromStream::new(self) + } } impl StreamExt for S1