Skip to content

Commit

Permalink
move IntoConcurrentStream to a new location
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshuawuyts committed Mar 17, 2024
1 parent 49055be commit 8abb325
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 28 deletions.
3 changes: 2 additions & 1 deletion src/concurrent_stream/enumerate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ where

#[cfg(test)]
mod test {
use crate::concurrent_stream::{ConcurrentStream, IntoConcurrentStream};
// use crate::concurrent_stream::{ConcurrentStream, IntoConcurrentStream};
use crate::prelude::*;
use futures_lite::stream;
use futures_lite::StreamExt;
use std::num::NonZeroUsize;
Expand Down
2 changes: 1 addition & 1 deletion src/concurrent_stream/for_each.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ where
#[cfg(test)]
mod test {
use super::*;
use crate::concurrent_stream::{ConcurrentStream, IntoConcurrentStream};
use crate::prelude::*;
use futures_lite::stream;
use std::sync::Arc;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ use super::{ConcurrentStream, Consumer};
#[derive(Debug)]
pub struct FromStream<S: Stream> {
#[pin]
iter: S,
stream: S,
}

impl<S: Stream> FromStream<S> {
pub(crate) fn new(stream: S) -> Self {
Self { stream }
}
}

impl<S> ConcurrentStream for FromStream<S>
Expand All @@ -26,7 +32,7 @@ where
where
C: Consumer<Self::Item, Self::Future>,
{
let mut iter = pin!(self.iter);
let mut iter = pin!(self.stream);

// Concurrently progress the consumer as well as the stream. Whenever
// there is an item from the stream available, we submit it to the
Expand Down Expand Up @@ -83,28 +89,11 @@ where
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.iter.size_hint()
self.stream.size_hint()
}
}

enum State<T> {
Progress(super::ConsumerState),
Item(T),
}

/// Convert into a concurrent stream
pub trait IntoConcurrentStream {
/// The type of concurrent stream we're returning.
type ConcurrentStream: ConcurrentStream;

/// Convert `self` into a concurrent stream.
fn co(self) -> Self::ConcurrentStream;
}

impl<S: Stream> IntoConcurrentStream for S {
type ConcurrentStream = FromStream<S>;

fn co(self) -> Self::ConcurrentStream {
FromStream { iter: self }
}
}
25 changes: 22 additions & 3 deletions src/concurrent_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
mod drain;
mod enumerate;
mod for_each;
mod into_concurrent_iterator;
mod from_stream;
mod limit;
mod map;
mod take;
Expand All @@ -15,7 +15,7 @@ use std::num::NonZeroUsize;
use try_for_each::TryForEachConsumer;

pub use enumerate::Enumerate;
pub use into_concurrent_iterator::{FromStream, IntoConcurrentStream};
pub use from_stream::FromStream;
pub use limit::Limit;
pub use map::Map;
pub use take::Take;
Expand Down Expand Up @@ -152,7 +152,7 @@ pub trait ConcurrentStream {

/// The state of the consumer, used to communicate back to the source.
#[derive(Debug)]
enum ConsumerState {
pub enum ConsumerState {
/// The consumer is done making progress, and the `finish` method should be called.
Break,
/// The consumer is ready to keep making progress.
Expand All @@ -162,9 +162,28 @@ enum ConsumerState {
Empty,
}

/// Convert into a concurrent stream
pub trait IntoConcurrentStream {
/// The type of concurrent stream we're returning.
type ConcurrentStream: ConcurrentStream;

/// Convert `self` into a concurrent stream.
fn into_concurrent_stream(self) -> Self::ConcurrentStream;
}

impl<S: ConcurrentStream> IntoConcurrentStream for S {
type ConcurrentStream = S;

fn into_concurrent_stream(self) -> Self::ConcurrentStream {
self
}
}

#[cfg(test)]
mod test {
use super::*;

use crate::prelude::*;
use futures_lite::prelude::*;
use futures_lite::stream;

Expand Down
2 changes: 1 addition & 1 deletion src/concurrent_stream/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ where

#[cfg(test)]
mod test {
use crate::concurrent_stream::{ConcurrentStream, IntoConcurrentStream};
use crate::prelude::*;
use futures_lite::stream;

#[test]
Expand Down
2 changes: 1 addition & 1 deletion src/concurrent_stream/try_for_each.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ where
#[cfg(test)]
mod test {
use super::*;
use crate::concurrent_stream::{ConcurrentStream, IntoConcurrentStream};
use crate::prelude::*;
use futures_lite::stream;
use std::{io, sync::Arc};

Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ pub mod prelude {
pub use super::stream::IntoStream as _;
pub use super::stream::Merge as _;
pub use super::stream::Zip as _;

pub use super::concurrent_stream::ConcurrentStream;
}

pub mod concurrent_stream;
Expand Down
13 changes: 12 additions & 1 deletion src/stream/stream_ext.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::stream::{IntoStream, Merge};
use crate::{
concurrent_stream::FromStream,
stream::{IntoStream, Merge},
};
use futures_core::Stream;

use super::{chain::tuple::Chain2, merge::tuple::Merge2, zip::tuple::Zip2, Chain, Zip};
Expand All @@ -22,6 +25,14 @@ pub trait StreamExt: Stream {
where
Self: Stream<Item = T> + Sized,
S2: IntoStream<Item = T>;

/// Convert into a concurrent stream.
fn co(self) -> FromStream<Self>
where
Self: Sized,
{
FromStream::new(self)
}
}

impl<S1> StreamExt for S1
Expand Down

0 comments on commit 8abb325

Please sign in to comment.