From c6fd4472a9ae2d9c9879bdc32b167cae85d787e5 Mon Sep 17 00:00:00 2001 From: Alex Moon Date: Sun, 28 Jan 2024 15:12:33 -0500 Subject: [PATCH] Add no-std support --- .github/workflows/ci.yaml | 12 +++ Cargo.toml | 13 ++- benches/utils/countdown_streams.rs | 2 +- src/future/future_group.rs | 30 +++---- src/future/futures_ext.rs | 2 +- src/future/join/array.rs | 18 ++-- src/future/join/mod.rs | 1 + src/future/join/tuple.rs | 14 +-- src/future/join/vec.rs | 20 +++-- src/future/mod.rs | 2 + src/future/race/array.rs | 2 +- src/future/race/mod.rs | 1 + src/future/race/tuple.rs | 2 +- src/future/race/vec.rs | 4 +- src/future/race_ok/array/error.rs | 11 +-- src/future/race_ok/array/mod.rs | 20 ++--- src/future/race_ok/mod.rs | 1 + src/future/race_ok/tuple/error.rs | 24 +++++ src/future/race_ok/tuple/mod.rs | 11 +-- src/future/race_ok/vec/error.rs | 17 ++-- src/future/race_ok/vec/mod.rs | 25 +++--- src/future/try_join/array.rs | 20 ++--- src/future/try_join/mod.rs | 1 + src/future/try_join/tuple.rs | 33 ++++--- src/future/try_join/vec.rs | 22 ++--- src/lib.rs | 5 ++ src/stream/chain/mod.rs | 1 + src/stream/chain/vec.rs | 2 + src/stream/merge/array.rs | 27 +++--- src/stream/merge/mod.rs | 1 + src/stream/merge/tuple.rs | 22 ++--- src/stream/merge/vec.rs | 16 ++-- src/stream/mod.rs | 2 + src/stream/stream_group.rs | 30 +++---- src/stream/zip/array.rs | 9 +- src/stream/zip/mod.rs | 1 + src/stream/zip/tuple.rs | 6 +- src/stream/zip/vec.rs | 11 ++- src/utils/array.rs | 2 +- src/utils/channel.rs | 5 +- src/utils/futures/array.rs | 2 +- src/utils/futures/mod.rs | 2 + src/utils/futures/vec.rs | 3 +- src/utils/mod.rs | 25 ++++-- src/utils/output/array.rs | 2 +- src/utils/output/mod.rs | 2 + src/utils/output/vec.rs | 4 +- src/utils/pin.rs | 4 + src/utils/poll_state/array.rs | 2 +- src/utils/poll_state/mod.rs | 4 + src/utils/poll_state/vec.rs | 8 +- src/utils/wakers/array/mod.rs | 10 +++ src/utils/wakers/array/no_std.rs | 88 ++++++++++++++++++ src/utils/wakers/array/readiness_array.rs | 2 +- src/utils/wakers/array/waker.rs | 8 +- src/utils/wakers/array/waker_array.rs | 10 +-- src/utils/wakers/dummy.rs | 3 +- src/utils/wakers/mod.rs | 6 +- src/utils/wakers/vec/mod.rs | 10 +++ src/utils/wakers/vec/no_std.rs | 104 ++++++++++++++++++++++ src/utils/wakers/vec/readiness_vec.rs | 2 +- src/utils/wakers/vec/waker.rs | 8 +- src/utils/wakers/vec/waker_vec.rs | 11 +-- tests/regression-155.rs | 2 + 64 files changed, 523 insertions(+), 247 deletions(-) create mode 100644 src/utils/wakers/array/no_std.rs create mode 100644 src/utils/wakers/vec/no_std.rs diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 198f91f..c0da2db 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -34,6 +34,18 @@ jobs: command: check args: --all --bins --examples + - name: check no-std + uses: actions-rs/cargo@v1 + with: + command: check + args: --all --no-default-features + + - name: check alloc + uses: actions-rs/cargo@v1 + with: + command: check + args: --all --no-default-features --features alloc + - name: tests uses: actions-rs/cargo@v1 with: diff --git a/Cargo.toml b/Cargo.toml index ec91430..9d503ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,12 +27,17 @@ harness = false name = "compare" harness = false +[features] +default = ["std"] +std = ["alloc"] +alloc = ["bitvec/alloc", "dep:slab", "dep:smallvec"] + [dependencies] -bitvec = { version = "1.0.1", default-features = false, features = ["alloc"] } -futures-core = "0.3" +bitvec = { version = "1.0.1", default-features = false } +futures-core = { version = "0.3", default-features = false } pin-project = "1.0.8" -slab = "0.4.8" -smallvec = "1.11.0" +slab = { version = "0.4.8", optional = true } +smallvec = { version = "1.11.0", optional = true } [dev-dependencies] async-std = { version = "1.12.0", features = ["attributes"] } diff --git a/benches/utils/countdown_streams.rs b/benches/utils/countdown_streams.rs index 29dd91b..6922fc6 100644 --- a/benches/utils/countdown_streams.rs +++ b/benches/utils/countdown_streams.rs @@ -42,7 +42,7 @@ pub fn streams_array() -> [CountdownStream; N] { let wakers = Rc::new(RefCell::new(BinaryHeap::new())); let completed = Rc::new(Cell::new(0)); let mut streams = - std::array::from_fn(|n| CountdownStream::new(n, N, wakers.clone(), completed.clone())); + core::array::from_fn(|n| CountdownStream::new(n, N, wakers.clone(), completed.clone())); shuffle(&mut streams); streams } diff --git a/src/future/future_group.rs b/src/future/future_group.rs index 2d5d4bc..717c3df 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -1,11 +1,11 @@ +use alloc::collections::BTreeSet; +use core::fmt::{self, Debug}; +use core::ops::{Deref, DerefMut}; +use core::pin::Pin; +use core::task::{Context, Poll}; use futures_core::stream::Stream; use futures_core::Future; use slab::Slab; -use std::collections::BTreeSet; -use std::fmt::{self, Debug}; -use std::ops::{Deref, DerefMut}; -use std::pin::Pin; -use std::task::{Context, Poll}; use crate::utils::{PollState, PollVec, WakerVec}; @@ -237,7 +237,7 @@ impl FutureGroup { // Set the corresponding state self.states[index].set_pending(); - let mut readiness = self.wakers.readiness().lock().unwrap(); + let mut readiness = self.wakers.readiness(); readiness.set_ready(index); key @@ -273,7 +273,7 @@ impl FutureGroup { impl FutureGroup { fn poll_next_inner( self: Pin<&mut Self>, - cx: &std::task::Context<'_>, + cx: &Context<'_>, ) -> Poll::Output)>> { let mut this = self.project(); @@ -283,7 +283,7 @@ impl FutureGroup { } // Set the top-level waker and check readiness - let mut readiness = this.wakers.readiness().lock().unwrap(); + let mut readiness = this.wakers.readiness(); readiness.set_waker(cx.waker()); if !readiness.any_ready() { // Nothing is ready yet @@ -326,7 +326,7 @@ impl FutureGroup { }; // Lock readiness so we can use it again - readiness = this.wakers.readiness().lock().unwrap(); + readiness = this.wakers.readiness(); } } @@ -343,10 +343,7 @@ impl FutureGroup { impl Stream for FutureGroup { type Item = ::Output; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.poll_next_inner(cx) { Poll::Ready(Some((_key, item))) => Poll::Ready(Some(item)), Poll::Ready(None) => Poll::Ready(None), @@ -396,10 +393,7 @@ impl DerefMut for Keyed { impl Stream for Keyed { type Item = (Key, ::Output); - fn poll_next( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); this.group.as_mut().poll_next_inner(cx) } @@ -408,8 +402,8 @@ impl Stream for Keyed { #[cfg(test)] mod test { use super::FutureGroup; + use core::future; use futures_lite::prelude::*; - use std::future; #[test] fn smoke() { diff --git a/src/future/futures_ext.rs b/src/future/futures_ext.rs index 75fc6ae..2dd3e73 100644 --- a/src/future/futures_ext.rs +++ b/src/future/futures_ext.rs @@ -1,7 +1,7 @@ use crate::future::Join; use crate::future::Race; +use core::future::IntoFuture; use futures_core::Future; -use std::future::IntoFuture; use super::join::tuple::Join2; use super::race::tuple::Race2; diff --git a/src/future/join/array.rs b/src/future/join/array.rs index ca628a4..5c061bb 100644 --- a/src/future/join/array.rs +++ b/src/future/join/array.rs @@ -4,9 +4,9 @@ use crate::utils::{FutureArray, OutputArray, PollArray, WakerArray}; use core::fmt; use core::future::{Future, IntoFuture}; use core::mem::ManuallyDrop; +use core::ops::DerefMut; use core::pin::Pin; use core::task::{Context, Poll}; -use std::ops::DerefMut; use pin_project::{pin_project, pinned_drop}; @@ -93,7 +93,7 @@ where "Futures must not be polled after completing" ); - let mut readiness = this.wakers.readiness().lock().unwrap(); + let mut readiness = this.wakers.readiness(); readiness.set_waker(cx.waker()); if *this.pending != 0 && !readiness.any_ready() { // Nothing is ready yet @@ -125,7 +125,7 @@ where } // Lock readiness so we can use it again - readiness = this.wakers.readiness().lock().unwrap(); + readiness = this.wakers.readiness(); } } @@ -178,12 +178,8 @@ where #[cfg(test)] mod test { use super::*; - use crate::utils::DummyWaker; - use std::future; - use std::future::Future; - use std::sync::Arc; - use std::task::Context; + use core::future; #[test] fn smoke() { @@ -203,7 +199,13 @@ mod test { } #[test] + #[cfg(feature = "alloc")] fn debug() { + use crate::utils::DummyWaker; + use alloc::format; + use alloc::sync::Arc; + use core::task::Context; + let mut fut = [future::ready("hello"), future::ready("world")].join(); assert_eq!(format!("{:?}", fut), "[Pending, Pending]"); let mut fut = Pin::new(&mut fut); diff --git a/src/future/join/mod.rs b/src/future/join/mod.rs index 3bd1eef..aa479df 100644 --- a/src/future/join/mod.rs +++ b/src/future/join/mod.rs @@ -2,6 +2,7 @@ use core::future::Future; pub(crate) mod array; pub(crate) mod tuple; +#[cfg(feature = "alloc")] pub(crate) mod vec; /// Wait for all futures to complete. diff --git a/src/future/join/tuple.rs b/src/future/join/tuple.rs index 28db118..f828780 100644 --- a/src/future/join/tuple.rs +++ b/src/future/join/tuple.rs @@ -3,11 +3,10 @@ use crate::utils::{PollArray, WakerArray}; use core::fmt::{self, Debug}; use core::future::{Future, IntoFuture}; -use core::mem::MaybeUninit; +use core::mem::{ManuallyDrop, MaybeUninit}; +use core::ops::DerefMut; use core::pin::Pin; use core::task::{Context, Poll}; -use std::mem::ManuallyDrop; -use std::ops::DerefMut; use pin_project::{pin_project, pinned_drop}; @@ -136,7 +135,7 @@ macro_rules! impl_join_tuple { }; ($mod_name:ident $StructName:ident $($F:ident)+) => { mod $mod_name { - use std::mem::ManuallyDrop; + use core::mem::ManuallyDrop; #[pin_project::pin_project] pub(super) struct Futures<$($F,)+> {$( @@ -199,7 +198,7 @@ macro_rules! impl_join_tuple { let mut futures = this.futures.project(); - let mut readiness = this.wakers.readiness().lock().unwrap(); + let mut readiness = this.wakers.readiness(); readiness.set_waker(cx.waker()); for index in 0..LEN { @@ -234,7 +233,7 @@ macro_rules! impl_join_tuple { return Poll::Ready(out); } - readiness = this.wakers.readiness().lock().unwrap(); + readiness = this.wakers.readiness(); } Poll::Pending @@ -294,7 +293,7 @@ impl_join_tuple! { join12 Join12 A B C D E F G H I J K L } #[cfg(test)] mod test { use super::*; - use std::future; + use core::future; #[test] #[allow(clippy::unit_cmp)] @@ -332,6 +331,7 @@ mod test { } #[test] + #[cfg(feature = "std")] fn does_not_leak_memory() { use core::cell::RefCell; use futures_lite::future::pending; diff --git a/src/future/join/vec.rs b/src/future/join/vec.rs index 15b67bf..0bc0980 100644 --- a/src/future/join/vec.rs +++ b/src/future/join/vec.rs @@ -1,13 +1,13 @@ use super::Join as JoinTrait; use crate::utils::{FutureVec, OutputVec, PollVec, WakerVec}; +use alloc::vec::Vec; use core::fmt; use core::future::{Future, IntoFuture}; +use core::mem::ManuallyDrop; +use core::ops::DerefMut; use core::pin::Pin; use core::task::{Context, Poll}; -use std::mem::ManuallyDrop; -use std::ops::DerefMut; -use std::vec::Vec; use pin_project::{pin_project, pinned_drop}; @@ -85,7 +85,7 @@ where "Futures must not be polled after completing" ); - let mut readiness = this.wakers.readiness().lock().unwrap(); + let mut readiness = this.wakers.readiness(); readiness.set_waker(cx.waker()); if *this.pending != 0 && !readiness.any_ready() { // Nothing is ready yet @@ -119,7 +119,7 @@ where } // Lock readiness so we can use it again - readiness = this.wakers.readiness().lock().unwrap(); + readiness = this.wakers.readiness(); } } @@ -174,10 +174,12 @@ mod test { use super::*; use crate::utils::DummyWaker; - use std::future; - use std::future::Future; - use std::sync::Arc; - use std::task::Context; + use alloc::format; + use alloc::sync::Arc; + use alloc::vec; + use core::future; + use core::future::Future; + use core::task::Context; #[test] fn smoke() { diff --git a/src/future/mod.rs b/src/future/mod.rs index f181ad1..2936a23 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -69,6 +69,7 @@ //! complete, or return an `Err` if *no* futures complete successfully. //! #[doc(inline)] +#[cfg(feature = "alloc")] pub use future_group::FutureGroup; pub use futures_ext::FutureExt; pub use join::Join; @@ -77,6 +78,7 @@ pub use race_ok::RaceOk; pub use try_join::TryJoin; /// A growable group of futures which act as a single unit. +#[cfg(feature = "alloc")] pub mod future_group; mod futures_ext; diff --git a/src/future/race/array.rs b/src/future/race/array.rs index b7ce4f1..4af1066 100644 --- a/src/future/race/array.rs +++ b/src/future/race/array.rs @@ -81,7 +81,7 @@ where #[cfg(test)] mod test { use super::*; - use std::future; + use core::future; // NOTE: we should probably poll in random order. #[test] diff --git a/src/future/race/mod.rs b/src/future/race/mod.rs index 19ca85c..a28bbbd 100644 --- a/src/future/race/mod.rs +++ b/src/future/race/mod.rs @@ -2,6 +2,7 @@ use core::future::Future; pub(crate) mod array; pub(crate) mod tuple; +#[cfg(feature = "alloc")] pub(crate) mod vec; /// Wait for the first future to complete. diff --git a/src/future/race/tuple.rs b/src/future/race/tuple.rs index c0fb804..f0415c2 100644 --- a/src/future/race/tuple.rs +++ b/src/future/race/tuple.rs @@ -106,7 +106,7 @@ impl_race_tuple! { Race12 A B C D E F G H I J K L } #[cfg(test)] mod test { use super::*; - use std::future; + use core::future; #[test] fn race_1() { diff --git a/src/future/race/vec.rs b/src/future/race/vec.rs index 28e75f2..80be869 100644 --- a/src/future/race/vec.rs +++ b/src/future/race/vec.rs @@ -2,6 +2,7 @@ use crate::utils::{self, Indexer}; use super::Race as RaceTrait; +use alloc::vec::Vec; use core::fmt; use core::future::{Future, IntoFuture}; use core::pin::Pin; @@ -81,7 +82,8 @@ where #[cfg(test)] mod test { use super::*; - use std::future; + use alloc::vec; + use core::future; // NOTE: we should probably poll in random order. #[test] diff --git a/src/future/race_ok/array/error.rs b/src/future/race_ok/array/error.rs index 23e3d6e..10433c4 100644 --- a/src/future/race_ok/array/error.rs +++ b/src/future/race_ok/array/error.rs @@ -1,5 +1,6 @@ use core::fmt; use core::ops::{Deref, DerefMut}; +#[cfg(feature = "std")] use std::error::Error; /// A collection of errors. @@ -14,24 +15,19 @@ impl AggregateError { } } -impl fmt::Debug for AggregateError { +impl fmt::Debug for AggregateError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { writeln!(f, "{self}:")?; for (i, err) in self.inner.iter().enumerate() { writeln!(f, "- Error {}: {err}", i + 1)?; - let mut source = err.source(); - while let Some(err) = source { - writeln!(f, " ↳ Caused by: {err}")?; - source = err.source(); - } } Ok(()) } } -impl fmt::Display for AggregateError { +impl fmt::Display for AggregateError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{} errors occured", self.inner.len()) } @@ -51,4 +47,5 @@ impl DerefMut for AggregateError { } } +#[cfg(feature = "std")] impl std::error::Error for AggregateError {} diff --git a/src/future/race_ok/array/mod.rs b/src/future/race_ok/array/mod.rs index c064c3d..9772035 100644 --- a/src/future/race_ok/array/mod.rs +++ b/src/future/race_ok/array/mod.rs @@ -102,13 +102,12 @@ where #[cfg(test)] mod test { use super::*; - use std::future; - use std::io::{Error, ErrorKind}; + use core::future; #[test] fn all_ok() { futures_lite::future::block_on(async { - let res: Result<&str, AggregateError> = + let res: Result<&str, AggregateError<(), 2>> = [future::ready(Ok("hello")), future::ready(Ok("world"))] .race_ok() .await; @@ -119,9 +118,8 @@ mod test { #[test] fn one_err() { futures_lite::future::block_on(async { - let err = Error::new(ErrorKind::Other, "oh no"); - let res: Result<&str, AggregateError> = - [future::ready(Ok("hello")), future::ready(Err(err))] + let res: Result<&str, AggregateError<_, 2>> = + [future::ready(Ok("hello")), future::ready(Err("oh no"))] .race_ok() .await; assert_eq!(res.unwrap(), "hello"); @@ -131,15 +129,13 @@ mod test { #[test] fn all_err() { futures_lite::future::block_on(async { - let err1 = Error::new(ErrorKind::Other, "oops"); - let err2 = Error::new(ErrorKind::Other, "oh no"); - let res: Result<&str, AggregateError> = - [future::ready(Err(err1)), future::ready(Err(err2))] + let res: Result<&str, AggregateError<_, 2>> = + [future::ready(Err("oops")), future::ready(Err("oh no"))] .race_ok() .await; let errs = res.unwrap_err(); - assert_eq!(errs[0].to_string(), "oops"); - assert_eq!(errs[1].to_string(), "oh no"); + assert_eq!(errs[0], "oops"); + assert_eq!(errs[1], "oh no"); }); } } diff --git a/src/future/race_ok/mod.rs b/src/future/race_ok/mod.rs index 5bc3e99..beb957c 100644 --- a/src/future/race_ok/mod.rs +++ b/src/future/race_ok/mod.rs @@ -2,6 +2,7 @@ use core::future::Future; pub(crate) mod array; pub(crate) mod tuple; +#[cfg(feature = "alloc")] pub(crate) mod vec; /// Wait for the first successful future to complete. diff --git a/src/future/race_ok/tuple/error.rs b/src/future/race_ok/tuple/error.rs index 23e3d6e..d7f7b3b 100644 --- a/src/future/race_ok/tuple/error.rs +++ b/src/future/race_ok/tuple/error.rs @@ -1,5 +1,6 @@ use core::fmt; use core::ops::{Deref, DerefMut}; +#[cfg(feature = "std")] use std::error::Error; /// A collection of errors. @@ -14,6 +15,7 @@ impl AggregateError { } } +#[cfg(feature = "std")] impl fmt::Debug for AggregateError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { writeln!(f, "{self}:")?; @@ -31,12 +33,33 @@ impl fmt::Debug for AggregateError { } } +#[cfg(not(feature = "std"))] +impl fmt::Debug for AggregateError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f, "{self}:")?; + + for (i, err) in self.inner.iter().enumerate() { + writeln!(f, "- Error {}: {err}", i + 1)?; + } + + Ok(()) + } +} + +#[cfg(feature = "std")] impl fmt::Display for AggregateError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{} errors occured", self.inner.len()) } } +#[cfg(not(feature = "std"))] +impl fmt::Display for AggregateError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{} errors occured", self.inner.len()) + } +} + impl Deref for AggregateError { type Target = [E; N]; @@ -51,4 +74,5 @@ impl DerefMut for AggregateError { } } +#[cfg(feature = "std")] impl std::error::Error for AggregateError {} diff --git a/src/future/race_ok/tuple/mod.rs b/src/future/race_ok/tuple/mod.rs index 0b3bb04..054ec1d 100644 --- a/src/future/race_ok/tuple/mod.rs +++ b/src/future/race_ok/tuple/mod.rs @@ -178,14 +178,11 @@ impl_race_ok_tuple! { RaceOk12 A B C D E F G H I J K L } mod test { use super::*; use core::future; - use std::error::Error; - - type DynError = Box; #[test] fn race_ok_1() { futures_lite::future::block_on(async { - let a = async { Ok::<_, DynError>("world") }; + let a = async { Ok::<_, ()>("world") }; let res = (a,).race_ok().await; assert!(matches!(res, Ok("world"))); }); @@ -195,7 +192,7 @@ mod test { fn race_ok_2() { futures_lite::future::block_on(async { let a = future::pending(); - let b = async { Ok::<_, DynError>("world") }; + let b = async { Ok::<_, ()>("world") }; let res = (a, b).race_ok().await; assert!(matches!(res, Ok("world"))); }); @@ -205,8 +202,8 @@ mod test { fn race_ok_3() { futures_lite::future::block_on(async { let a = future::pending(); - let b = async { Ok::<_, DynError>("hello") }; - let c = async { Ok::<_, DynError>("world") }; + let b = async { Ok::<_, ()>("hello") }; + let c = async { Ok::<_, ()>("world") }; let result = (a, b, c).race_ok().await; assert!(matches!(result, Ok("hello") | Ok("world"))); }); diff --git a/src/future/race_ok/vec/error.rs b/src/future/race_ok/vec/error.rs index e5977d1..708b1fa 100644 --- a/src/future/race_ok/vec/error.rs +++ b/src/future/race_ok/vec/error.rs @@ -1,8 +1,9 @@ +use alloc::vec::Vec; use core::fmt; +use core::ops::Deref; +use core::ops::DerefMut; +#[cfg(feature = "std")] use std::error::Error; -use std::ops::Deref; -use std::ops::DerefMut; -use std::vec::Vec; /// A collection of errors. #[repr(transparent)] @@ -16,24 +17,19 @@ impl AggregateError { } } -impl fmt::Debug for AggregateError { +impl fmt::Debug for AggregateError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { writeln!(f, "{self}:")?; for (i, err) in self.inner.iter().enumerate() { writeln!(f, "- Error {}: {err}", i + 1)?; - let mut source = err.source(); - while let Some(err) = source { - writeln!(f, " ↳ Caused by: {err}")?; - source = err.source(); - } } Ok(()) } } -impl fmt::Display for AggregateError { +impl fmt::Display for AggregateError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{} errors occurred", self.inner.len()) } @@ -53,4 +49,5 @@ impl DerefMut for AggregateError { } } +#[cfg(feature = "std")] impl Error for AggregateError {} diff --git a/src/future/race_ok/vec/mod.rs b/src/future/race_ok/vec/mod.rs index 0a55b8d..8448de9 100644 --- a/src/future/race_ok/vec/mod.rs +++ b/src/future/race_ok/vec/mod.rs @@ -2,13 +2,13 @@ use super::RaceOk as RaceOkTrait; use crate::utils::iter_pin_mut; use crate::utils::MaybeDone; +use alloc::boxed::Box; +use alloc::vec::Vec; use core::fmt; use core::future::{Future, IntoFuture}; use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; -use std::boxed::Box; -use std::vec::Vec; pub use error::AggregateError; @@ -96,13 +96,13 @@ where mod test { use super::error::AggregateError; use super::*; - use std::future; - use std::io::{Error, ErrorKind}; + use alloc::vec; + use core::future; #[test] fn all_ok() { futures_lite::future::block_on(async { - let res: Result<&str, AggregateError> = + let res: Result<&str, AggregateError<()>> = vec![future::ready(Ok("hello")), future::ready(Ok("world"))] .race_ok() .await; @@ -113,9 +113,8 @@ mod test { #[test] fn one_err() { futures_lite::future::block_on(async { - let err = Error::new(ErrorKind::Other, "oh no"); - let res: Result<&str, AggregateError> = - vec![future::ready(Ok("hello")), future::ready(Err(err))] + let res: Result<&str, AggregateError<_>> = + vec![future::ready(Ok("hello")), future::ready(Err("oh no"))] .race_ok() .await; assert_eq!(res.unwrap(), "hello"); @@ -125,15 +124,13 @@ mod test { #[test] fn all_err() { futures_lite::future::block_on(async { - let err1 = Error::new(ErrorKind::Other, "oops"); - let err2 = Error::new(ErrorKind::Other, "oh no"); - let res: Result<&str, AggregateError> = - vec![future::ready(Err(err1)), future::ready(Err(err2))] + let res: Result<&str, AggregateError<_>> = + vec![future::ready(Err("oops")), future::ready(Err("oh no"))] .race_ok() .await; let errs = res.unwrap_err(); - assert_eq!(errs[0].to_string(), "oops"); - assert_eq!(errs[1].to_string(), "oh no"); + assert_eq!(errs[0], "oops"); + assert_eq!(errs[1], "oh no"); }); } } diff --git a/src/future/try_join/array.rs b/src/future/try_join/array.rs index 06995fd..cc0269f 100644 --- a/src/future/try_join/array.rs +++ b/src/future/try_join/array.rs @@ -3,10 +3,10 @@ use crate::utils::{FutureArray, OutputArray, PollArray, WakerArray}; use core::fmt; use core::future::{Future, IntoFuture}; +use core::mem::ManuallyDrop; +use core::ops::DerefMut; use core::pin::Pin; use core::task::{Context, Poll}; -use std::mem::ManuallyDrop; -use std::ops::DerefMut; use pin_project::{pin_project, pinned_drop}; @@ -93,7 +93,7 @@ where "Futures must not be polled after completing" ); - let mut readiness = this.wakers.readiness().lock().unwrap(); + let mut readiness = this.wakers.readiness(); readiness.set_waker(cx.waker()); if *this.pending != 0 && !readiness.any_ready() { // Nothing is ready yet @@ -148,7 +148,7 @@ where } // Lock readiness so we can use it again - readiness = this.wakers.readiness().lock().unwrap(); + readiness = this.wakers.readiness(); } } @@ -196,13 +196,12 @@ where #[cfg(test)] mod test { use super::*; - use std::future; - use std::io::{self, Error, ErrorKind}; + use core::future; #[test] fn all_ok() { futures_lite::future::block_on(async { - let res: io::Result<_> = [future::ready(Ok("hello")), future::ready(Ok("world"))] + let res: Result<_, ()> = [future::ready(Ok("hello")), future::ready(Ok("world"))] .try_join() .await; assert_eq!(res.unwrap(), ["hello", "world"]); @@ -212,7 +211,7 @@ mod test { #[test] fn empty() { futures_lite::future::block_on(async { - let data: [future::Ready>; 0] = []; + let data: [future::Ready>; 0] = []; let res = data.try_join().await; assert_eq!(res.unwrap(), []); }); @@ -221,11 +220,10 @@ mod test { #[test] fn one_err() { futures_lite::future::block_on(async { - let err = Error::new(ErrorKind::Other, "oh no"); - let res: io::Result<_> = [future::ready(Ok("hello")), future::ready(Err(err))] + let res: Result<_, _> = [future::ready(Ok("hello")), future::ready(Err("oh no"))] .try_join() .await; - assert_eq!(res.unwrap_err().to_string(), String::from("oh no")); + assert_eq!(res.unwrap_err(), "oh no"); }); } } diff --git a/src/future/try_join/mod.rs b/src/future/try_join/mod.rs index 4657460..df0093b 100644 --- a/src/future/try_join/mod.rs +++ b/src/future/try_join/mod.rs @@ -2,6 +2,7 @@ use core::future::Future; pub(crate) mod array; pub(crate) mod tuple; +#[cfg(feature = "alloc")] pub(crate) mod vec; /// Wait for all futures to complete successfully, or abort early on error. diff --git a/src/future/try_join/tuple.rs b/src/future/try_join/tuple.rs index 2990594..191089c 100644 --- a/src/future/try_join/tuple.rs +++ b/src/future/try_join/tuple.rs @@ -3,12 +3,12 @@ use crate::utils::{PollArray, WakerArray}; use core::fmt::{self, Debug}; use core::future::{Future, IntoFuture}; +use core::marker::PhantomData; +use core::mem::ManuallyDrop; use core::mem::MaybeUninit; +use core::ops::DerefMut; use core::pin::Pin; use core::task::{Context, Poll}; -use std::marker::PhantomData; -use std::mem::ManuallyDrop; -use std::ops::DerefMut; use pin_project::{pin_project, pinned_drop}; @@ -142,7 +142,7 @@ macro_rules! impl_try_join_tuple { } impl Future for $StructName { - type Output = Result<(), std::convert::Infallible>; + type Output = Result<(), core::convert::Infallible>; fn poll( self: Pin<&mut Self>, _cx: &mut Context<'_> @@ -153,7 +153,7 @@ macro_rules! impl_try_join_tuple { impl TryJoinTrait for () { type Output = (); - type Error = std::convert::Infallible; + type Error = core::convert::Infallible; type Future = $StructName; fn try_join(self) -> Self::Future { $StructName {} @@ -164,7 +164,7 @@ macro_rules! impl_try_join_tuple { // `Impl TryJoin for (F..)` ($mod_name:ident $StructName:ident $(($F:ident $T:ident))+) => { mod $mod_name { - use std::mem::ManuallyDrop; + use core::mem::ManuallyDrop; #[pin_project::pin_project] pub(super) struct Futures<$($F,)+> {$( @@ -231,7 +231,7 @@ macro_rules! impl_try_join_tuple { let mut futures = this.futures.project(); - let mut readiness = this.wakers.readiness().lock().unwrap(); + let mut readiness = this.wakers.readiness(); readiness.set_waker(cx.waker()); for index in 0..LEN { @@ -267,7 +267,7 @@ macro_rules! impl_try_join_tuple { return Poll::Ready(Ok(out)); } - readiness = this.wakers.readiness().lock().unwrap(); + readiness = this.wakers.readiness(); } Poll::Pending @@ -333,9 +333,8 @@ impl_try_join_tuple! { try_join_12 TryJoin12 (A ResA) (B ResB) (C ResC) (D ResD) mod test { use super::*; - use std::convert::Infallible; - use std::future; - use std::io::{self, Error, ErrorKind}; + use core::convert::Infallible; + use core::future; #[test] fn all_ok() { @@ -352,11 +351,10 @@ mod test { #[test] fn one_err() { futures_lite::future::block_on(async { - let err = Error::new(ErrorKind::Other, "oh no"); - let res: io::Result<(_, char)> = (future::ready(Ok("hello")), future::ready(Err(err))) + let res: Result<(_, char), ()> = (future::ready(Ok("hello")), future::ready(Err(()))) .try_join() .await; - assert_eq!(res.unwrap_err().to_string(), String::from("oh no")); + assert_eq!(res.unwrap_err(), ()); }) } @@ -377,6 +375,7 @@ mod test { } #[test] + #[cfg(feature = "std")] fn does_not_leak_memory() { use core::cell::RefCell; use futures_lite::future::pending; @@ -396,12 +395,12 @@ mod test { futures_lite::future::block_on(async { // this will trigger Miri if we don't drop the memory - let string = future::ready(io::Result::Ok("memory leak".to_owned())); + let string = future::ready(Result::Ok("memory leak".to_owned())); // this will not flip the thread_local flag if we don't drop the memory - let flip = future::ready(io::Result::Ok(FlipFlagAtDrop)); + let flip = future::ready(Result::Ok(FlipFlagAtDrop)); - let leak = (string, flip, pending::>()).try_join(); + let leak = (string, flip, pending::>()).try_join(); _ = futures_lite::future::poll_once(leak).await; }); diff --git a/src/future/try_join/vec.rs b/src/future/try_join/vec.rs index 7d52a83..837b466 100644 --- a/src/future/try_join/vec.rs +++ b/src/future/try_join/vec.rs @@ -1,12 +1,13 @@ use super::TryJoin as TryJoinTrait; use crate::utils::{FutureVec, OutputVec, PollVec, WakerVec}; +use alloc::vec::Vec; use core::fmt; use core::future::{Future, IntoFuture}; +use core::mem::ManuallyDrop; +use core::ops::DerefMut; use core::pin::Pin; use core::task::{Context, Poll}; -use std::mem::ManuallyDrop; -use std::ops::DerefMut; use pin_project::{pin_project, pinned_drop}; @@ -94,7 +95,7 @@ where "Futures must not be polled after completing" ); - let mut readiness = this.wakers.readiness().lock().unwrap(); + let mut readiness = this.wakers.readiness(); readiness.set_waker(cx.waker()); if *this.pending != 0 && !readiness.any_ready() { // Nothing is ready yet @@ -149,7 +150,7 @@ where } // Lock readiness so we can use it again - readiness = this.wakers.readiness().lock().unwrap(); + readiness = this.wakers.readiness(); } } @@ -202,13 +203,13 @@ where #[cfg(test)] mod test { use super::*; - use std::future; - use std::io::{self, Error, ErrorKind}; + use alloc::vec; + use core::future; #[test] fn all_ok() { futures_lite::future::block_on(async { - let res: io::Result<_> = vec![future::ready(Ok("hello")), future::ready(Ok("world"))] + let res: Result<_, ()> = vec![future::ready(Ok("hello")), future::ready(Ok("world"))] .try_join() .await; assert_eq!(res.unwrap(), ["hello", "world"]); @@ -218,7 +219,7 @@ mod test { #[test] fn empty() { futures_lite::future::block_on(async { - let data: Vec>> = vec![]; + let data: Vec>> = vec![]; let res = data.try_join().await; assert_eq!(res.unwrap(), vec![]); }); @@ -227,11 +228,10 @@ mod test { #[test] fn one_err() { futures_lite::future::block_on(async { - let err = Error::new(ErrorKind::Other, "oh no"); - let res: io::Result<_> = vec![future::ready(Ok("hello")), future::ready(Err(err))] + let res: Result<_, _> = vec![future::ready(Ok("hello")), future::ready(Err("oh no"))] .try_join() .await; - assert_eq!(res.unwrap_err().to_string(), String::from("oh no")); + assert_eq!(res.unwrap_err(), "oh no"); }); } } diff --git a/src/lib.rs b/src/lib.rs index e2b214a..0bfd9e8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,6 +61,10 @@ #![deny(missing_debug_implementations, nonstandard_style)] #![warn(missing_docs)] #![allow(non_snake_case)] +#![cfg_attr(not(feature = "std"), no_std)] + +#[cfg(feature = "alloc")] +extern crate alloc; mod utils; @@ -95,6 +99,7 @@ pub mod array { /// Helper functions and types for contiguous growable array type with heap-allocated contents, /// written `Vec`. +#[cfg(feature = "alloc")] pub mod vec { pub use crate::future::join::vec::Join; pub use crate::future::race::vec::Race; diff --git a/src/stream/chain/mod.rs b/src/stream/chain/mod.rs index 3dadf3a..8e7bab0 100644 --- a/src/stream/chain/mod.rs +++ b/src/stream/chain/mod.rs @@ -2,6 +2,7 @@ use futures_core::Stream; pub(crate) mod array; pub(crate) mod tuple; +#[cfg(feature = "alloc")] pub(crate) mod vec; /// Takes multiple streams and creates a new stream over all in sequence. diff --git a/src/stream/chain/vec.rs b/src/stream/chain/vec.rs index 76ce0f6..b8152ff 100644 --- a/src/stream/chain/vec.rs +++ b/src/stream/chain/vec.rs @@ -1,3 +1,4 @@ +use alloc::vec::Vec; use core::fmt; use core::pin::Pin; use core::task::{Context, Poll}; @@ -80,6 +81,7 @@ impl ChainTrait for Vec { #[cfg(test)] mod tests { use super::*; + use alloc::vec; use futures_lite::future::block_on; use futures_lite::prelude::*; use futures_lite::stream; diff --git a/src/stream/merge/array.rs b/src/stream/merge/array.rs index 2501b91..934da3b 100644 --- a/src/stream/merge/array.rs +++ b/src/stream/merge/array.rs @@ -3,9 +3,9 @@ use crate::stream::IntoStream; use crate::utils::{self, Indexer, PollArray, WakerArray}; use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; use futures_core::Stream; -use std::pin::Pin; -use std::task::{Context, Poll}; /// A stream that merges multiple streams into a single stream. /// @@ -62,7 +62,7 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); - let mut readiness = this.wakers.readiness().lock().unwrap(); + let mut readiness = this.wakers.readiness(); readiness.set_waker(cx.waker()); // Iterate over our streams one-by-one. If a stream yields a value, @@ -86,7 +86,7 @@ where match stream.poll_next(&mut cx) { Poll::Ready(Some(item)) => { // Mark ourselves as ready again because we need to poll for the next item. - this.wakers.readiness().lock().unwrap().set_ready(index); + this.wakers.readiness().set_ready(index); return Poll::Ready(Some(item)); } Poll::Ready(None) => { @@ -100,7 +100,7 @@ where } // Lock readiness so we can use it again - readiness = this.wakers.readiness().lock().unwrap(); + readiness = this.wakers.readiness(); } Poll::Pending @@ -121,19 +121,11 @@ where #[cfg(test)] mod tests { - use std::cell::RefCell; - use std::rc::Rc; - use super::*; - use crate::utils::channel::local_channel; - use futures::executor::LocalPool; - use futures::task::LocalSpawnExt; use futures_lite::future::block_on; use futures_lite::prelude::*; use futures_lite::stream; - use crate::future::join::Join; - #[test] fn merge_array_4() { block_on(async { @@ -170,7 +162,16 @@ mod tests { /// /// The purpose of this test is to make sure we have the waking logic working. #[test] + #[cfg(feature = "alloc")] fn merge_channels() { + use alloc::rc::Rc; + use core::cell::RefCell; + use futures::executor::LocalPool; + use futures::task::LocalSpawnExt; + + use crate::future::join::Join; + use crate::utils::channel::local_channel; + let mut pool = LocalPool::new(); let done = Rc::new(RefCell::new(false)); diff --git a/src/stream/merge/mod.rs b/src/stream/merge/mod.rs index 818c7d8..32663a3 100644 --- a/src/stream/merge/mod.rs +++ b/src/stream/merge/mod.rs @@ -2,6 +2,7 @@ use futures_core::Stream; pub(crate) mod array; pub(crate) mod tuple; +#[cfg(feature = "alloc")] pub(crate) mod vec; /// Combines multiple streams into a single stream of all their outputs. diff --git a/src/stream/merge/tuple.rs b/src/stream/merge/tuple.rs index 834de90..2ee119c 100644 --- a/src/stream/merge/tuple.rs +++ b/src/stream/merge/tuple.rs @@ -3,9 +3,9 @@ use crate::stream::IntoStream; use crate::utils::{self, PollArray, WakerArray}; use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; use futures_core::Stream; -use std::pin::Pin; -use std::task::{Context, Poll}; macro_rules! poll_stream { ($stream_idx:tt, $iteration:ident, $this:ident, $streams:ident . $stream_member:ident, $cx:ident, $len_streams:ident) => { @@ -13,12 +13,7 @@ macro_rules! poll_stream { match unsafe { Pin::new_unchecked(&mut $streams.$stream_member) }.poll_next(&mut $cx) { Poll::Ready(Some(item)) => { // Mark ourselves as ready again because we need to poll for the next item. - $this - .wakers - .readiness() - .lock() - .unwrap() - .set_ready($stream_idx); + $this.wakers.readiness().set_ready($stream_idx); return Poll::Ready(Some(item)); } Poll::Ready(None) => { @@ -118,7 +113,7 @@ macro_rules! impl_merge_tuple { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - let mut readiness = this.wakers.readiness().lock().unwrap(); + let mut readiness = this.wakers.readiness(); readiness.set_waker(cx.waker()); const LEN: u8 = $mod_name::LEN as u8; @@ -155,7 +150,7 @@ macro_rules! impl_merge_tuple { )+ // Lock readiness so we can use it again - readiness = this.wakers.readiness().lock().unwrap(); + readiness = this.wakers.readiness(); } Poll::Pending @@ -200,7 +195,6 @@ impl_merge_tuple! { merge12 Merge12 A B C D E F G H I J K L } #[cfg(test)] mod tests { use super::*; - use futures::task::LocalSpawnExt; use futures_lite::future::block_on; use futures_lite::prelude::*; use futures_lite::stream; @@ -284,11 +278,13 @@ mod tests { /// /// The purpose of this test is to make sure we have the waking logic working. #[test] + #[cfg(feature = "alloc")] fn merge_channels() { - use std::cell::RefCell; - use std::rc::Rc; + use alloc::rc::Rc; + use core::cell::RefCell; use futures::executor::LocalPool; + use futures::task::LocalSpawnExt; use crate::future::Join; use crate::utils::channel::local_channel; diff --git a/src/stream/merge/vec.rs b/src/stream/merge/vec.rs index 27508ee..ebe0a84 100644 --- a/src/stream/merge/vec.rs +++ b/src/stream/merge/vec.rs @@ -2,10 +2,11 @@ use super::Merge as MergeTrait; use crate::stream::IntoStream; use crate::utils::{self, Indexer, PollVec, WakerVec}; +use alloc::vec::Vec; use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; use futures_core::Stream; -use std::pin::Pin; -use std::task::{Context, Poll}; /// A stream that merges multiple streams into a single stream. /// @@ -63,7 +64,7 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); - let mut readiness = this.wakers.readiness().lock().unwrap(); + let mut readiness = this.wakers.readiness(); readiness.set_waker(cx.waker()); // Iterate over our streams one-by-one. If a stream yields a value, @@ -87,7 +88,7 @@ where match stream.poll_next(&mut cx) { Poll::Ready(Some(item)) => { // Mark ourselves as ready again because we need to poll for the next item. - this.wakers.readiness().lock().unwrap().set_ready(index); + this.wakers.readiness().set_ready(index); return Poll::Ready(Some(item)); } Poll::Ready(None) => { @@ -101,7 +102,7 @@ where } // Lock readiness so we can use it again - readiness = this.wakers.readiness().lock().unwrap(); + readiness = this.wakers.readiness(); } Poll::Pending @@ -122,8 +123,9 @@ where #[cfg(test)] mod tests { - use std::cell::RefCell; - use std::rc::Rc; + use alloc::rc::Rc; + use alloc::vec; + use core::cell::RefCell; use super::*; use crate::utils::channel::local_channel; diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 9bdc336..98c6bd6 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -52,10 +52,12 @@ pub use into_stream::IntoStream; pub use merge::Merge; pub use stream_ext::StreamExt; #[doc(inline)] +#[cfg(feature = "alloc")] pub use stream_group::StreamGroup; pub use zip::Zip; /// A growable group of streams which act as a single unit. +#[cfg(feature = "alloc")] pub mod stream_group; pub(crate) mod chain; diff --git a/src/stream/stream_group.rs b/src/stream/stream_group.rs index 0f81efd..1689255 100644 --- a/src/stream/stream_group.rs +++ b/src/stream/stream_group.rs @@ -1,11 +1,11 @@ +use alloc::collections::BTreeSet; +use core::fmt::{self, Debug}; +use core::ops::{Deref, DerefMut}; +use core::pin::Pin; +use core::task::{Context, Poll}; use futures_core::Stream; use slab::Slab; use smallvec::{smallvec, SmallVec}; -use std::collections::BTreeSet; -use std::fmt::{self, Debug}; -use std::ops::{Deref, DerefMut}; -use std::pin::Pin; -use std::task::{Context, Poll}; use crate::utils::{PollState, PollVec, WakerVec}; @@ -236,7 +236,7 @@ impl StreamGroup { // Set the corresponding state self.states[index].set_pending(); - let mut readiness = self.wakers.readiness().lock().unwrap(); + let mut readiness = self.wakers.readiness(); readiness.set_ready(index); key @@ -271,7 +271,7 @@ impl StreamGroup { impl StreamGroup { fn poll_next_inner( self: Pin<&mut Self>, - cx: &std::task::Context<'_>, + cx: &Context<'_>, ) -> Poll::Item)>> { let mut this = self.project(); @@ -281,7 +281,7 @@ impl StreamGroup { } // Set the top-level waker and check readiness - let mut readiness = this.wakers.readiness().lock().unwrap(); + let mut readiness = this.wakers.readiness(); readiness.set_waker(cx.waker()); if !readiness.any_ready() { // Nothing is ready yet @@ -317,7 +317,7 @@ impl StreamGroup { // We just obtained an item from this index, make sure // we check it again on a next iteration states[index] = PollState::Pending; - let mut readiness = this.wakers.readiness().lock().unwrap(); + let mut readiness = this.wakers.readiness(); readiness.set_ready(index); break; @@ -337,7 +337,7 @@ impl StreamGroup { }; // Lock readiness so we can use it again - readiness = this.wakers.readiness().lock().unwrap(); + readiness = this.wakers.readiness(); } } @@ -363,10 +363,7 @@ impl StreamGroup { impl Stream for StreamGroup { type Item = ::Item; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.poll_next_inner(cx) { Poll::Ready(Some((_key, item))) => Poll::Ready(Some(item)), Poll::Ready(None) => Poll::Ready(None), @@ -416,10 +413,7 @@ impl DerefMut for Keyed { impl Stream for Keyed { type Item = (Key, ::Item); - fn poll_next( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); this.group.as_mut().poll_next_inner(cx) } diff --git a/src/stream/zip/array.rs b/src/stream/zip/array.rs index a760ff4..f10ea8a 100644 --- a/src/stream/zip/array.rs +++ b/src/stream/zip/array.rs @@ -4,10 +4,9 @@ use crate::utils::{self, PollArray, WakerArray}; use core::array; use core::fmt; -use core::mem::MaybeUninit; +use core::mem::{self, MaybeUninit}; use core::pin::Pin; use core::task::{Context, Poll}; -use std::mem; use futures_core::Stream; use pin_project::{pin_project, pinned_drop}; @@ -67,7 +66,7 @@ where assert!(!*this.done, "Stream should not be polled after completion"); - let mut readiness = this.wakers.readiness().lock().unwrap(); + let mut readiness = this.wakers.readiness(); readiness.set_waker(cx.waker()); for index in 0..N { if !readiness.any_ready() { @@ -94,7 +93,7 @@ where let all_ready = this.state.iter().all(|state| state.is_ready()); if all_ready { // Reset the future's state. - readiness = this.wakers.readiness().lock().unwrap(); + readiness = this.wakers.readiness(); readiness.set_all_ready(); this.state.set_all_pending(); @@ -118,7 +117,7 @@ where } // Lock readiness so we can use it again - readiness = this.wakers.readiness().lock().unwrap(); + readiness = this.wakers.readiness(); } Poll::Pending } diff --git a/src/stream/zip/mod.rs b/src/stream/zip/mod.rs index 14b33b0..84876e3 100644 --- a/src/stream/zip/mod.rs +++ b/src/stream/zip/mod.rs @@ -2,6 +2,7 @@ use futures_core::Stream; pub(crate) mod array; pub(crate) mod tuple; +#[cfg(feature = "alloc")] pub(crate) mod vec; /// ‘Zips up’ multiple streams into a single stream of pairs. diff --git a/src/stream/zip/tuple.rs b/src/stream/zip/tuple.rs index 05d3f91..a376dcd 100644 --- a/src/stream/zip/tuple.rs +++ b/src/stream/zip/tuple.rs @@ -80,7 +80,7 @@ macro_rules! impl_zip_for_tuple { assert!(!*this.done, "Stream should not be polled after completion"); - let mut readiness = this.wakers.readiness().lock().unwrap(); + let mut readiness = this.wakers.readiness(); readiness.set_waker(cx.waker()); for index in 0..LEN { @@ -126,7 +126,7 @@ macro_rules! impl_zip_for_tuple { if all_ready { // Reset the future's state. - readiness = this.wakers.readiness().lock().unwrap(); + readiness = this.wakers.readiness(); readiness.set_all_ready(); this.state.set_all_pending(); @@ -147,7 +147,7 @@ macro_rules! impl_zip_for_tuple { } // Lock readiness so we can use it again - readiness = this.wakers.readiness().lock().unwrap(); + readiness = this.wakers.readiness(); } Poll::Pending diff --git a/src/stream/zip/vec.rs b/src/stream/zip/vec.rs index ba29084..420f8bb 100644 --- a/src/stream/zip/vec.rs +++ b/src/stream/zip/vec.rs @@ -2,11 +2,12 @@ use super::Zip as ZipTrait; use crate::stream::IntoStream; use crate::utils::{self, PollVec, WakerVec}; +use alloc::vec::Vec; use core::fmt; +use core::mem; use core::mem::MaybeUninit; use core::pin::Pin; use core::task::{Context, Poll}; -use std::mem; use futures_core::Stream; use pin_project::{pin_project, pinned_drop}; @@ -69,7 +70,7 @@ where assert!(!*this.done, "Stream should not be polled after completion"); - let mut readiness = this.wakers.readiness().lock().unwrap(); + let mut readiness = this.wakers.readiness(); readiness.set_waker(cx.waker()); for index in 0..*this.len { if !readiness.any_ready() { @@ -96,7 +97,7 @@ where let all_ready = this.state.iter().all(|state| state.is_ready()); if all_ready { // Reset the future's state. - readiness = this.wakers.readiness().lock().unwrap(); + readiness = this.wakers.readiness(); readiness.set_all_ready(); this.state.set_all_pending(); @@ -120,7 +121,7 @@ where } // Lock readiness so we can use it again - readiness = this.wakers.readiness().lock().unwrap(); + readiness = this.wakers.readiness(); } Poll::Pending } @@ -159,6 +160,8 @@ where #[cfg(test)] mod tests { + use alloc::vec; + use crate::stream::Zip; use futures_lite::future::block_on; use futures_lite::prelude::*; diff --git a/src/utils/array.rs b/src/utils/array.rs index 5ced861..f7c13c9 100644 --- a/src/utils/array.rs +++ b/src/utils/array.rs @@ -1,4 +1,4 @@ -use std::mem::{self, MaybeUninit}; +use core::mem::{self, MaybeUninit}; /// Extracts the values from an array of `MaybeUninit` containers. /// diff --git a/src/utils/channel.rs b/src/utils/channel.rs index 07081df..3b10680 100644 --- a/src/utils/channel.rs +++ b/src/utils/channel.rs @@ -1,8 +1,7 @@ -use std::{ +use alloc::{collections::VecDeque, rc::Rc}; +use core::{ cell::RefCell, - collections::VecDeque, pin::Pin, - rc::Rc, task::{Context, Poll, Waker}, }; diff --git a/src/utils/futures/array.rs b/src/utils/futures/array.rs index 111afd3..930e485 100644 --- a/src/utils/futures/array.rs +++ b/src/utils/futures/array.rs @@ -1,4 +1,4 @@ -use std::{ +use core::{ mem::{self, ManuallyDrop, MaybeUninit}, pin::Pin, }; diff --git a/src/utils/futures/mod.rs b/src/utils/futures/mod.rs index 5536ffe..5aa41cc 100644 --- a/src/utils/futures/mod.rs +++ b/src/utils/futures/mod.rs @@ -1,5 +1,7 @@ mod array; +#[cfg(feature = "alloc")] mod vec; pub(crate) use array::FutureArray; +#[cfg(feature = "alloc")] pub(crate) use vec::FutureVec; diff --git a/src/utils/futures/vec.rs b/src/utils/futures/vec.rs index cb03e94..459de99 100644 --- a/src/utils/futures/vec.rs +++ b/src/utils/futures/vec.rs @@ -1,4 +1,5 @@ -use std::{ +use alloc::vec::Vec; +use core::{ mem::{self, ManuallyDrop, MaybeUninit}, pin::Pin, }; diff --git a/src/utils/mod.rs b/src/utils/mod.rs index ed70f95..133e123 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -9,18 +9,27 @@ mod poll_state; mod tuple; mod wakers; -pub(crate) use self::futures::{FutureArray, FutureVec}; +pub(crate) use self::futures::FutureArray; +#[cfg(feature = "alloc")] +pub(crate) use self::futures::FutureVec; pub(crate) use array::array_assume_init; pub(crate) use indexer::Indexer; -pub(crate) use output::{OutputArray, OutputVec}; -pub(crate) use pin::{get_pin_mut, get_pin_mut_from_vec, iter_pin_mut, iter_pin_mut_vec}; -pub(crate) use poll_state::MaybeDone; -pub(crate) use poll_state::{PollArray, PollState, PollVec}; +pub(crate) use output::OutputArray; +#[cfg(feature = "alloc")] +pub(crate) use output::OutputVec; +pub(crate) use pin::{get_pin_mut, iter_pin_mut}; +#[cfg(feature = "alloc")] +pub(crate) use pin::{get_pin_mut_from_vec, iter_pin_mut_vec}; +pub(crate) use poll_state::PollArray; +#[cfg(feature = "alloc")] +pub(crate) use poll_state::{MaybeDone, PollState, PollVec}; pub(crate) use tuple::{gen_conditions, tuple_len}; -pub(crate) use wakers::{WakerArray, WakerVec}; +pub(crate) use wakers::WakerArray; +#[cfg(feature = "alloc")] +pub(crate) use wakers::WakerVec; -#[cfg(test)] +#[cfg(all(test, feature = "alloc"))] pub(crate) use wakers::DummyWaker; -#[cfg(test)] +#[cfg(all(test, feature = "alloc"))] pub(crate) mod channel; diff --git a/src/utils/output/array.rs b/src/utils/output/array.rs index dd37890..f764f9c 100644 --- a/src/utils/output/array.rs +++ b/src/utils/output/array.rs @@ -1,4 +1,4 @@ -use std::{ +use core::{ array, mem::{self, MaybeUninit}, }; diff --git a/src/utils/output/mod.rs b/src/utils/output/mod.rs index 5b19e84..f9c1e09 100644 --- a/src/utils/output/mod.rs +++ b/src/utils/output/mod.rs @@ -1,5 +1,7 @@ mod array; +#[cfg(feature = "alloc")] mod vec; pub(crate) use array::OutputArray; +#[cfg(feature = "alloc")] pub(crate) use vec::OutputVec; diff --git a/src/utils/output/vec.rs b/src/utils/output/vec.rs index a800001..bab6e49 100644 --- a/src/utils/output/vec.rs +++ b/src/utils/output/vec.rs @@ -1,4 +1,6 @@ -use std::mem::{self, MaybeUninit}; +use alloc::vec; +use alloc::vec::Vec; +use core::mem::{self, MaybeUninit}; /// A contiguous vector of uninitialized data. pub(crate) struct OutputVec { diff --git a/src/utils/pin.rs b/src/utils/pin.rs index 46c1f8d..cc427fe 100644 --- a/src/utils/pin.rs +++ b/src/utils/pin.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "alloc")] +use alloc::vec::Vec; use core::pin::Pin; use core::slice::SliceIndex; @@ -12,6 +14,7 @@ pub(crate) fn iter_pin_mut(slice: Pin<&mut [T]>) -> impl Iterator(slice: Pin<&mut Vec>) -> impl Iterator> { // SAFETY: `std` _could_ make this unsound if it were to decide Pin's // invariants aren't required to transmit through slices. Otherwise this has @@ -44,6 +47,7 @@ where // slices. // // From: https://github.com/rust-lang/rust/pull/78370/files +#[cfg(feature = "alloc")] pub(crate) fn get_pin_mut_from_vec( slice: Pin<&mut Vec>, index: I, diff --git a/src/utils/poll_state/array.rs b/src/utils/poll_state/array.rs index 61d0af6..e6d4387 100644 --- a/src/utils/poll_state/array.rs +++ b/src/utils/poll_state/array.rs @@ -1,4 +1,4 @@ -use std::ops::{Deref, DerefMut}; +use core::ops::{Deref, DerefMut}; use super::PollState; diff --git a/src/utils/poll_state/mod.rs b/src/utils/poll_state/mod.rs index 90a4d2c..28cbc20 100644 --- a/src/utils/poll_state/mod.rs +++ b/src/utils/poll_state/mod.rs @@ -1,11 +1,15 @@ #![allow(clippy::module_inception)] mod array; +#[cfg(feature = "alloc")] mod maybe_done; mod poll_state; +#[cfg(feature = "alloc")] mod vec; pub(crate) use array::PollArray; +#[cfg(feature = "alloc")] pub(crate) use maybe_done::MaybeDone; pub(crate) use poll_state::PollState; +#[cfg(feature = "alloc")] pub(crate) use vec::PollVec; diff --git a/src/utils/poll_state/vec.rs b/src/utils/poll_state/vec.rs index 4de8078..bf9523b 100644 --- a/src/utils/poll_state/vec.rs +++ b/src/utils/poll_state/vec.rs @@ -1,5 +1,5 @@ +use core::ops::{Deref, DerefMut}; use smallvec::{smallvec, SmallVec}; -use std::ops::{Deref, DerefMut}; use super::PollState; @@ -24,7 +24,7 @@ use super::PollState; /// len ^^^^^ /// Inline /// ``` -const MAX_INLINE_ENTRIES: usize = std::mem::size_of::() * 3 - 2; +const MAX_INLINE_ENTRIES: usize = core::mem::size_of::() * 3 - 2; #[derive(Default)] pub(crate) struct PollVec(SmallVec<[PollState; MAX_INLINE_ENTRIES]>); @@ -108,8 +108,8 @@ mod tests { fn type_size() { // PollVec is three words plus two bits assert_eq!( - std::mem::size_of::(), - std::mem::size_of::() * 4 + core::mem::size_of::(), + core::mem::size_of::() * 4 ); } diff --git a/src/utils/wakers/array/mod.rs b/src/utils/wakers/array/mod.rs index fc3b38a..cad55a4 100644 --- a/src/utils/wakers/array/mod.rs +++ b/src/utils/wakers/array/mod.rs @@ -1,7 +1,17 @@ +#[cfg(not(feature = "std"))] +mod no_std; +#[cfg(feature = "std")] mod readiness_array; +#[cfg(feature = "std")] mod waker; +#[cfg(feature = "std")] mod waker_array; +#[cfg(not(feature = "std"))] +pub(crate) use no_std::WakerArray; +#[cfg(feature = "std")] pub(crate) use readiness_array::ReadinessArray; +#[cfg(feature = "std")] pub(crate) use waker::InlineWakerArray; +#[cfg(feature = "std")] pub(crate) use waker_array::WakerArray; diff --git a/src/utils/wakers/array/no_std.rs b/src/utils/wakers/array/no_std.rs new file mode 100644 index 0000000..4113736 --- /dev/null +++ b/src/utils/wakers/array/no_std.rs @@ -0,0 +1,88 @@ +use core::ops::{Deref, DerefMut}; +use core::task::Waker; + +#[derive(Debug)] +pub(crate) struct ReadinessArray { + parent_waker: Option, +} + +impl ReadinessArray { + pub(crate) fn new() -> Self { + Self { parent_waker: None } + } + + /// Returns the old ready state for this id + pub(crate) fn set_ready(&mut self, _id: usize) -> bool { + false + } + + /// Set all markers to ready. + pub(crate) fn set_all_ready(&mut self) {} + + /// Returns whether the task id was previously ready + pub(crate) fn clear_ready(&mut self, _id: usize) -> bool { + true + } + + /// Returns `true` if any of the wakers are ready. + pub(crate) fn any_ready(&self) -> bool { + true + } + + /// Access the parent waker. + #[inline] + pub(crate) fn parent_waker(&self) -> Option<&Waker> { + self.parent_waker.as_ref() + } + + /// Set the parent `Waker`. This needs to be called at the start of every + /// `poll` function. + pub(crate) fn set_waker(&mut self, parent_waker: &Waker) { + match &mut self.parent_waker { + Some(prev) => prev.clone_from(parent_waker), + None => self.parent_waker = Some(parent_waker.clone()), + } + } +} + +pub(crate) struct ReadinessArrayRef<'a, const N: usize> { + inner: &'a mut ReadinessArray, +} + +impl<'a, const N: usize> Deref for ReadinessArrayRef<'a, N> { + type Target = ReadinessArray; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl<'a, const N: usize> DerefMut for ReadinessArrayRef<'a, N> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +/// A collection of wakers which delegate to an in-line waker. +pub(crate) struct WakerArray { + readiness: ReadinessArray, +} + +impl WakerArray { + /// Create a new instance of `WakerArray`. + pub(crate) fn new() -> Self { + let readiness = ReadinessArray::new(); + Self { readiness } + } + + pub(crate) fn get(&self, _index: usize) -> Option<&Waker> { + self.readiness.parent_waker() + } + + /// Access the `Readiness`. + pub(crate) fn readiness(&mut self) -> ReadinessArrayRef<'_, N> { + ReadinessArrayRef { + inner: &mut self.readiness, + } + } +} diff --git a/src/utils/wakers/array/readiness_array.rs b/src/utils/wakers/array/readiness_array.rs index dd50ae7..efcffce 100644 --- a/src/utils/wakers/array/readiness_array.rs +++ b/src/utils/wakers/array/readiness_array.rs @@ -1,4 +1,4 @@ -use std::task::Waker; +use core::task::Waker; /// Tracks which wakers are "ready" and should be polled. #[derive(Debug)] diff --git a/src/utils/wakers/array/waker.rs b/src/utils/wakers/array/waker.rs index 960dff0..2c06301 100644 --- a/src/utils/wakers/array/waker.rs +++ b/src/utils/wakers/array/waker.rs @@ -1,5 +1,6 @@ -use std::sync::{Arc, Mutex}; -use std::task::Wake; +use alloc::sync::Arc; +use alloc::task::Wake; +use std::sync::Mutex; use super::ReadinessArray; @@ -18,12 +19,11 @@ impl InlineWakerArray { } impl Wake for InlineWakerArray { - fn wake(self: std::sync::Arc) { + fn wake(self: Arc) { let mut readiness = self.readiness.lock().unwrap(); if !readiness.set_ready(self.id) { readiness .parent_waker() - .as_mut() .expect("`parent_waker` not available from `Readiness`. Did you forget to call `Readiness::set_waker`?") .wake_by_ref() } diff --git a/src/utils/wakers/array/waker_array.rs b/src/utils/wakers/array/waker_array.rs index 293155b..c6a3912 100644 --- a/src/utils/wakers/array/waker_array.rs +++ b/src/utils/wakers/array/waker_array.rs @@ -1,7 +1,7 @@ +use alloc::sync::Arc; use core::array; -use std::sync::Arc; -use std::sync::Mutex; -use std::task::Waker; +use core::task::Waker; +use std::sync::{Mutex, MutexGuard}; use super::{InlineWakerArray, ReadinessArray}; @@ -28,7 +28,7 @@ impl WakerArray { } /// Access the `Readiness`. - pub(crate) fn readiness(&self) -> &Mutex> { - self.readiness.as_ref() + pub(crate) fn readiness(&mut self) -> MutexGuard<'_, ReadinessArray> { + self.readiness.as_ref().lock().unwrap() } } diff --git a/src/utils/wakers/dummy.rs b/src/utils/wakers/dummy.rs index 0f454b0..8727b1a 100644 --- a/src/utils/wakers/dummy.rs +++ b/src/utils/wakers/dummy.rs @@ -1,4 +1,5 @@ -use std::{sync::Arc, task::Wake}; +use alloc::sync::Arc; +use alloc::task::Wake; pub(crate) struct DummyWaker(); impl Wake for DummyWaker { diff --git a/src/utils/wakers/mod.rs b/src/utils/wakers/mod.rs index d5c7f1d..089af6e 100644 --- a/src/utils/wakers/mod.rs +++ b/src/utils/wakers/mod.rs @@ -1,10 +1,12 @@ mod array; -#[cfg(test)] +#[cfg(all(test, feature = "alloc"))] mod dummy; +#[cfg(feature = "alloc")] mod vec; -#[cfg(test)] +#[cfg(all(test, feature = "alloc"))] pub(crate) use dummy::DummyWaker; pub(crate) use array::*; +#[cfg(feature = "alloc")] pub(crate) use vec::*; diff --git a/src/utils/wakers/vec/mod.rs b/src/utils/wakers/vec/mod.rs index 1cbdeb7..9e0e235 100644 --- a/src/utils/wakers/vec/mod.rs +++ b/src/utils/wakers/vec/mod.rs @@ -1,7 +1,17 @@ +#[cfg(not(feature = "std"))] +mod no_std; +#[cfg(feature = "std")] mod readiness_vec; +#[cfg(feature = "std")] mod waker; +#[cfg(feature = "std")] mod waker_vec; +#[cfg(not(feature = "std"))] +pub(crate) use no_std::WakerVec; +#[cfg(feature = "std")] pub(crate) use readiness_vec::ReadinessVec; +#[cfg(feature = "std")] pub(crate) use waker::InlineWakerVec; +#[cfg(feature = "std")] pub(crate) use waker_vec::WakerVec; diff --git a/src/utils/wakers/vec/no_std.rs b/src/utils/wakers/vec/no_std.rs new file mode 100644 index 0000000..7bb69eb --- /dev/null +++ b/src/utils/wakers/vec/no_std.rs @@ -0,0 +1,104 @@ +use core::ops::{Deref, DerefMut}; +use core::task::Waker; + +#[derive(Debug)] +pub(crate) struct ReadinessVec { + parent_waker: Option, +} + +impl ReadinessVec { + pub(crate) fn new() -> Self { + Self { parent_waker: None } + } + + /// Returns the old ready state for this id + pub(crate) fn set_ready(&mut self, _id: usize) -> bool { + false + } + + /// Set all markers to ready. + pub(crate) fn set_all_ready(&mut self) {} + + /// Returns whether the task id was previously ready + pub(crate) fn clear_ready(&mut self, _id: usize) -> bool { + true + } + + /// Returns `true` if any of the wakers are ready. + pub(crate) fn any_ready(&self) -> bool { + true + } + + /// Access the parent waker. + #[inline] + pub(crate) fn parent_waker(&self) -> Option<&Waker> { + self.parent_waker.as_ref() + } + + /// Set the parent `Waker`. This needs to be called at the start of every + /// `poll` function. + pub(crate) fn set_waker(&mut self, parent_waker: &Waker) { + match &mut self.parent_waker { + Some(prev) => prev.clone_from(parent_waker), + None => self.parent_waker = Some(parent_waker.clone()), + } + } + + /// Resize `readiness` to the new length. + /// + /// If new entries are created, they will be marked as 'ready'. + pub(crate) fn resize(&mut self, _len: usize) {} +} + +pub(crate) struct ReadinessVecRef<'a> { + inner: &'a mut ReadinessVec, +} + +impl<'a> Deref for ReadinessVecRef<'a> { + type Target = ReadinessVec; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl<'a> DerefMut for ReadinessVecRef<'a> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +/// A collection of wakers which delegate to an in-line waker. +pub(crate) struct WakerVec { + readiness: ReadinessVec, +} + +impl Default for WakerVec { + fn default() -> Self { + Self::new(0) + } +} + +impl WakerVec { + /// Create a new instance of `WakerArray`. + pub(crate) fn new(_len: usize) -> Self { + let readiness = ReadinessVec::new(); + Self { readiness } + } + + pub(crate) fn get(&self, _index: usize) -> Option<&Waker> { + self.readiness.parent_waker() + } + + /// Access the `Readiness`. + pub(crate) fn readiness(&mut self) -> ReadinessVecRef<'_> { + ReadinessVecRef { + inner: &mut self.readiness, + } + } + + /// Resize the `WakerVec` to the new size. + pub(crate) fn resize(&mut self, len: usize) { + self.readiness.resize(len); + } +} diff --git a/src/utils/wakers/vec/readiness_vec.rs b/src/utils/wakers/vec/readiness_vec.rs index 65fdc24..d7a8640 100644 --- a/src/utils/wakers/vec/readiness_vec.rs +++ b/src/utils/wakers/vec/readiness_vec.rs @@ -1,5 +1,5 @@ use bitvec::{bitvec, vec::BitVec}; -use std::task::Waker; +use core::task::Waker; /// Tracks which wakers are "ready" and should be polled. #[derive(Debug)] diff --git a/src/utils/wakers/vec/waker.rs b/src/utils/wakers/vec/waker.rs index cfd12ad..f331028 100644 --- a/src/utils/wakers/vec/waker.rs +++ b/src/utils/wakers/vec/waker.rs @@ -1,5 +1,6 @@ -use std::sync::{Arc, Mutex}; -use std::task::Wake; +use alloc::sync::Arc; +use alloc::task::Wake; +use std::sync::Mutex; use super::ReadinessVec; @@ -18,12 +19,11 @@ impl InlineWakerVec { } impl Wake for InlineWakerVec { - fn wake(self: std::sync::Arc) { + fn wake(self: Arc) { let mut readiness = self.readiness.lock().unwrap(); if !readiness.set_ready(self.id) { readiness .parent_waker() - .as_mut() .expect("`parent_waker` not available from `Readiness`. Did you forget to call `Readiness::set_waker`?") .wake_by_ref() } diff --git a/src/utils/wakers/vec/waker_vec.rs b/src/utils/wakers/vec/waker_vec.rs index dcc6c41..01f96f2 100644 --- a/src/utils/wakers/vec/waker_vec.rs +++ b/src/utils/wakers/vec/waker_vec.rs @@ -1,6 +1,7 @@ -use std::sync::Arc; -use std::sync::Mutex; -use std::task::Waker; +use alloc::sync::Arc; +use alloc::vec::Vec; +use core::task::Waker; +use std::sync::{Mutex, MutexGuard}; use super::{InlineWakerVec, ReadinessVec}; @@ -31,8 +32,8 @@ impl WakerVec { } /// Access the `Readiness`. - pub(crate) fn readiness(&self) -> &Mutex { - self.readiness.as_ref() + pub(crate) fn readiness(&self) -> MutexGuard<'_, ReadinessVec> { + self.readiness.lock().unwrap() } /// Resize the `WakerVec` to the new size. diff --git a/tests/regression-155.rs b/tests/regression-155.rs index 8b25d7f..3057bfe 100644 --- a/tests/regression-155.rs +++ b/tests/regression-155.rs @@ -4,6 +4,8 @@ //! path. This meant that when we returned, the destructor assumed a value was //! initialized when it wasn't, causing it to dereference uninitialized memory. +#![cfg(feature = "alloc")] + use futures_concurrency::prelude::*; use futures_core::Future; use std::{future::ready, pin::Pin};