Skip to content

Commit

Permalink
fix alloc and no_std builds
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshuawuyts committed Mar 17, 2024
1 parent 2e1aa2d commit 2a9e714
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 44 deletions.
7 changes: 4 additions & 3 deletions src/concurrent_stream/drain.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use super::{Consumer, ConsumerState};
use crate::future::FutureGroup;
use futures_lite::StreamExt;

use super::{Consumer, ConsumerState};
use alloc::boxed::Box;
use core::future::Future;
use std::pin::Pin;
use core::pin::Pin;
use futures_lite::StreamExt;

pub(crate) struct Drain<Fut: Future> {
group: Pin<Box<FutureGroup<Fut>>>,
Expand Down
11 changes: 5 additions & 6 deletions src/concurrent_stream/enumerate.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use super::{ConcurrentStream, Consumer};
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use core::future::Future;
use core::num::NonZeroUsize;
use core::pin::Pin;
use core::task::{ready, Context, Poll};

/// A concurrent iterator that yields the current count and the element during iteration.
///
Expand Down Expand Up @@ -39,7 +38,7 @@ impl<CS: ConcurrentStream> ConcurrentStream for Enumerate<CS> {
.await
}

fn concurrency_limit(&self) -> Option<std::num::NonZeroUsize> {
fn concurrency_limit(&self) -> Option<NonZeroUsize> {
self.inner.concurrency_limit()
}

Expand Down
16 changes: 8 additions & 8 deletions src/concurrent_stream/for_each.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use crate::future::FutureGroup;
use futures_lite::StreamExt;

use super::{Consumer, ConsumerState};
use std::future::Future;
use std::marker::PhantomData;
use std::num::NonZeroUsize;

use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{ready, Context, Poll};
use alloc::boxed::Box;
use alloc::sync::Arc;
use core::future::Future;
use core::marker::PhantomData;
use core::num::NonZeroUsize;
use core::pin::Pin;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::{ready, Context, Poll};

// OK: validated! - all bounds should check out
pub(crate) struct ForEachConsumer<FutT, T, F, FutB>
Expand Down
12 changes: 6 additions & 6 deletions src/concurrent_stream/from_stream.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use super::{ConcurrentStream, Consumer};
use crate::concurrent_stream::ConsumerState;
use crate::prelude::*;
use futures_lite::{Stream, StreamExt};
use std::future::{ready, Ready};

use std::pin::pin;

use super::{ConcurrentStream, Consumer};
use core::future::{ready, Ready};
use core::num::NonZeroUsize;
use core::pin::pin;
use futures_lite::{Stream, StreamExt};

/// A concurrent for each implementation from a `Stream`
#[pin_project::pin_project]
Expand Down Expand Up @@ -84,7 +84,7 @@ where
consumer.finish().await
}

fn concurrency_limit(&self) -> Option<std::num::NonZeroUsize> {
fn concurrency_limit(&self) -> Option<NonZeroUsize> {
None
}

Expand Down
5 changes: 3 additions & 2 deletions src/concurrent_stream/limit.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{ConcurrentStream, Consumer};
use std::{future::Future, num::NonZeroUsize};
use core::future::Future;
use core::num::NonZeroUsize;

/// A concurrent iterator that limits the amount of concurrency applied.
///
Expand Down Expand Up @@ -33,7 +34,7 @@ impl<CS: ConcurrentStream> ConcurrentStream for Limit<CS> {

// NOTE: this is the only interesting bit in this module. When a limit is
// set, this now starts using it.
fn concurrency_limit(&self) -> Option<std::num::NonZeroUsize> {
fn concurrency_limit(&self) -> Option<NonZeroUsize> {
self.limit
}

Expand Down
5 changes: 3 additions & 2 deletions src/concurrent_stream/map.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{ConcurrentStream, Consumer};
use std::{
use core::num::NonZeroUsize;
use core::{
future::Future,
marker::PhantomData,
pin::Pin,
Expand Down Expand Up @@ -61,7 +62,7 @@ where
self.inner.drive(consumer).await
}

fn concurrency_limit(&self) -> Option<std::num::NonZeroUsize> {
fn concurrency_limit(&self) -> Option<NonZeroUsize> {
self.inner.concurrency_limit()
}

Expand Down
6 changes: 3 additions & 3 deletions src/concurrent_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ mod map;
mod take;
mod try_for_each;

use core::future::Future;
use core::num::NonZeroUsize;
use for_each::ForEachConsumer;
use std::future::Future;
use std::num::NonZeroUsize;
use try_for_each::TryForEachConsumer;

pub use enumerate::Enumerate;
Expand Down Expand Up @@ -96,7 +96,7 @@ pub trait ConcurrentStream {
}

/// Obtain a simple pass-through adapter.
fn limit(self, limit: Option<std::num::NonZeroUsize>) -> Limit<Self>
fn limit(self, limit: Option<NonZeroUsize>) -> Limit<Self>
where
Self: Sized,
{
Expand Down
5 changes: 3 additions & 2 deletions src/concurrent_stream/take.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{ConcurrentStream, Consumer, ConsumerState};
use std::future::Future;
use core::future::Future;
use core::num::NonZeroUsize;

/// A concurrent iterator that only iterates over the first `n` iterations of `iter`.
///
Expand Down Expand Up @@ -39,7 +40,7 @@ impl<CS: ConcurrentStream> ConcurrentStream for Take<CS> {

// NOTE: this is the only interesting bit in this module. When a limit is
// set, this now starts using it.
fn concurrency_limit(&self) -> Option<std::num::NonZeroUsize> {
fn concurrency_limit(&self) -> Option<NonZeroUsize> {
self.inner.concurrency_limit()
}

Expand Down
16 changes: 8 additions & 8 deletions src/concurrent_stream/try_for_each.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use crate::future::FutureGroup;
use futures_lite::StreamExt;

use super::Consumer;
use std::future::Future;
use std::marker::PhantomData;
use std::num::NonZeroUsize;

use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{ready, Context, Poll};
use alloc::boxed::Box;
use alloc::sync::Arc;
use core::future::Future;
use core::marker::PhantomData;
use core::num::NonZeroUsize;
use core::pin::Pin;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::{ready, Context, Poll};

// OK: validated! - all bounds should check out
pub(crate) struct TryForEachConsumer<FutT, T, F, FutB, E>
Expand Down
8 changes: 4 additions & 4 deletions src/stream/stream_ext.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::{
concurrent_stream::FromStream,
stream::{IntoStream, Merge},
};
use crate::stream::{IntoStream, Merge};
use futures_core::Stream;

#[cfg(feature = "alloc")]
use crate::concurrent_stream::FromStream;

use super::{chain::tuple::Chain2, merge::tuple::Merge2, zip::tuple::Zip2, Chain, Zip};

/// An extension trait for the `Stream` trait.
Expand Down

0 comments on commit 2a9e714

Please sign in to comment.