From d28bede9ad3c3d0efca54831d826635b6cca801e Mon Sep 17 00:00:00 2001 From: yhx-12243 Date: Sun, 18 Aug 2024 20:28:05 -0400 Subject: [PATCH 1/2] refactor: `TryFuture`s are `Future`s now, `into_future` is no longer need --- futures-core/src/future.rs | 5 +- futures-util/src/future/mod.rs | 4 +- .../src/future/try_future/into_future.rs | 36 ------------ futures-util/src/future/try_future/mod.rs | 55 ++++--------------- futures-util/src/future/try_join_all.rs | 9 ++- .../stream/try_stream/try_buffer_unordered.rs | 5 +- .../src/stream/try_stream/try_buffered.rs | 5 +- futures/tests/auto_traits.rs | 7 --- 8 files changed, 26 insertions(+), 100 deletions(-) delete mode 100644 futures-util/src/future/try_future/into_future.rs diff --git a/futures-core/src/future.rs b/futures-core/src/future.rs index 7540cd027e..c9409ae039 100644 --- a/futures-core/src/future.rs +++ b/futures-core/src/future.rs @@ -55,7 +55,10 @@ mod private_try_future { /// A convenience for futures that return `Result` values that includes /// a variety of adapters tailored to such futures. -pub trait TryFuture: Future + private_try_future::Sealed { +pub trait TryFuture: + Future::Ok, ::Error>> + + private_try_future::Sealed +{ /// The type of successful values yielded by this future type Ok; diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index 1280ce9864..10eff2ce5b 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -40,8 +40,8 @@ pub use self::future::{Shared, WeakShared}; mod try_future; pub use self::try_future::{ - AndThen, ErrInto, InspectErr, InspectOk, IntoFuture, MapErr, MapOk, MapOkOrElse, OkInto, - OrElse, TryFlatten, TryFlattenStream, TryFutureExt, UnwrapOrElse, + AndThen, ErrInto, InspectErr, InspectOk, MapErr, MapOk, MapOkOrElse, OkInto, OrElse, + TryFlatten, TryFlattenStream, TryFutureExt, UnwrapOrElse, }; #[cfg(feature = "sink")] diff --git a/futures-util/src/future/try_future/into_future.rs b/futures-util/src/future/try_future/into_future.rs deleted file mode 100644 index 9f093d0e2e..0000000000 --- a/futures-util/src/future/try_future/into_future.rs +++ /dev/null @@ -1,36 +0,0 @@ -use core::pin::Pin; -use futures_core::future::{FusedFuture, Future, TryFuture}; -use futures_core::task::{Context, Poll}; -use pin_project_lite::pin_project; - -pin_project! { - /// Future for the [`into_future`](super::TryFutureExt::into_future) method. - #[derive(Debug)] - #[must_use = "futures do nothing unless you `.await` or poll them"] - pub struct IntoFuture { - #[pin] - future: Fut, - } -} - -impl IntoFuture { - #[inline] - pub(crate) fn new(future: Fut) -> Self { - Self { future } - } -} - -impl FusedFuture for IntoFuture { - fn is_terminated(&self) -> bool { - self.future.is_terminated() - } -} - -impl Future for IntoFuture { - type Output = Result; - - #[inline] - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().future.try_poll(cx) - } -} diff --git a/futures-util/src/future/try_future/mod.rs b/futures-util/src/future/try_future/mod.rs index 4ae544aa91..8e30860039 100644 --- a/futures-util/src/future/try_future/mod.rs +++ b/futures-util/src/future/try_future/mod.rs @@ -23,7 +23,6 @@ use crate::future::{assert_future, Inspect, Map}; use crate::stream::assert_stream; // Combinators -mod into_future; mod try_flatten; mod try_flatten_err; @@ -89,45 +88,43 @@ delegate_all!( delegate_all!( /// Future for the [`inspect_ok`](super::TryFutureExt::inspect_ok) method. InspectOk( - Inspect, InspectOkFn> - ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Inspect::new(IntoFuture::new(x), inspect_ok_fn(f))] + Inspect> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Inspect::new(x, inspect_ok_fn(f))] ); delegate_all!( /// Future for the [`inspect_err`](super::TryFutureExt::inspect_err) method. InspectErr( - Inspect, InspectErrFn> - ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Inspect::new(IntoFuture::new(x), inspect_err_fn(f))] + Inspect> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Inspect::new(x, inspect_err_fn(f))] ); -pub use self::into_future::IntoFuture; - delegate_all!( /// Future for the [`map_ok`](TryFutureExt::map_ok) method. MapOk( - Map, MapOkFn> - ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Map::new(IntoFuture::new(x), map_ok_fn(f))] + Map> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Map::new(x, map_ok_fn(f))] ); delegate_all!( /// Future for the [`map_err`](TryFutureExt::map_err) method. MapErr( - Map, MapErrFn> - ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Map::new(IntoFuture::new(x), map_err_fn(f))] + Map> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Map::new(x, map_err_fn(f))] ); delegate_all!( /// Future for the [`map_ok_or_else`](TryFutureExt::map_ok_or_else) method. MapOkOrElse( - Map, MapOkOrElseFn> - ): Debug + Future + FusedFuture + New[|x: Fut, f: F, g: G| Map::new(IntoFuture::new(x), map_ok_or_else_fn(f, g))] + Map> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F, g: G| Map::new(x, map_ok_or_else_fn(f, g))] ); delegate_all!( /// Future for the [`unwrap_or_else`](TryFutureExt::unwrap_or_else) method. UnwrapOrElse( - Map, UnwrapOrElseFn> - ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Map::new(IntoFuture::new(x), unwrap_or_else_fn(f))] + Map> + ): Debug + Future + FusedFuture + New[|x: Fut, f: F| Map::new(x, unwrap_or_else_fn(f))] ); impl TryFutureExt for Fut {} @@ -585,34 +582,6 @@ pub trait TryFutureExt: TryFuture { Compat::new(self) } - /// Wraps a [`TryFuture`] into a type that implements - /// [`Future`](std::future::Future). - /// - /// [`TryFuture`]s currently do not implement the - /// [`Future`](std::future::Future) trait due to limitations of the - /// compiler. - /// - /// # Examples - /// - /// ``` - /// use futures::future::{Future, TryFuture, TryFutureExt}; - /// - /// # type T = i32; - /// # type E = (); - /// fn make_try_future() -> impl TryFuture { // ... } - /// # async { Ok::(1) } - /// # } - /// fn take_future(future: impl Future>) { /* ... */ } - /// - /// take_future(make_try_future().into_future()); - /// ``` - fn into_future(self) -> IntoFuture - where - Self: Sized, - { - assert_future::, _>(IntoFuture::new(self)) - } - /// A convenience method for calling [`TryFuture::try_poll`] on [`Unpin`] /// future types. fn try_poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll> diff --git a/futures-util/src/future/try_join_all.rs b/futures-util/src/future/try_join_all.rs index 2d6a2a02cb..cbb2f5a072 100644 --- a/futures-util/src/future/try_join_all.rs +++ b/futures-util/src/future/try_join_all.rs @@ -10,11 +10,10 @@ use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; -use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone}; +use super::{assert_future, join_all, TryFuture, TryMaybeDone}; #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt}; -use crate::TryFutureExt; enum FinalState { Pending, @@ -36,11 +35,11 @@ where F: TryFuture, { Small { - elems: Pin>]>>, + elems: Pin]>>, }, #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] Big { - fut: TryCollect>, Vec>, + fut: TryCollect, Vec>, }, } @@ -119,7 +118,7 @@ where I: IntoIterator, I::Item: TryFuture, { - let iter = iter.into_iter().map(TryFutureExt::into_future); + let iter = iter.into_iter(); #[cfg(target_os = "none")] #[cfg_attr(target_os = "none", cfg(not(target_has_atomic = "ptr")))] diff --git a/futures-util/src/stream/try_stream/try_buffer_unordered.rs b/futures-util/src/stream/try_stream/try_buffer_unordered.rs index 21b00b20ac..1d3c38b1d9 100644 --- a/futures-util/src/stream/try_stream/try_buffer_unordered.rs +++ b/futures-util/src/stream/try_stream/try_buffer_unordered.rs @@ -1,4 +1,3 @@ -use crate::future::{IntoFuture, TryFutureExt}; use crate::stream::{Fuse, FuturesUnordered, IntoStream, StreamExt}; use core::num::NonZeroUsize; use core::pin::Pin; @@ -19,7 +18,7 @@ pin_project! { { #[pin] stream: Fuse>, - in_progress_queue: FuturesUnordered>, + in_progress_queue: FuturesUnordered, max: Option, } } @@ -54,7 +53,7 @@ where // our queue of futures. Propagate errors from the stream immediately. while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) { match this.stream.as_mut().poll_next(cx)? { - Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()), + Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut), Poll::Ready(None) | Poll::Pending => break, } } diff --git a/futures-util/src/stream/try_stream/try_buffered.rs b/futures-util/src/stream/try_stream/try_buffered.rs index 83e9093fbc..6b9dd0f7ec 100644 --- a/futures-util/src/stream/try_stream/try_buffered.rs +++ b/futures-util/src/stream/try_stream/try_buffered.rs @@ -1,4 +1,3 @@ -use crate::future::{IntoFuture, TryFutureExt}; use crate::stream::{Fuse, FuturesOrdered, IntoStream, StreamExt}; use core::num::NonZeroUsize; use core::pin::Pin; @@ -20,7 +19,7 @@ pin_project! { { #[pin] stream: Fuse>, - in_progress_queue: FuturesOrdered>, + in_progress_queue: FuturesOrdered, max: Option, } } @@ -55,7 +54,7 @@ where // our queue of futures. Propagate errors from the stream immediately. while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) { match this.stream.as_mut().poll_next(cx)? { - Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut.into_future()), + Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut), Poll::Ready(None) | Poll::Pending => break, } } diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index 8d15fa28ef..75ed663f9e 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -337,13 +337,6 @@ mod future { assert_impl!(InspectOk: Unpin); assert_not_impl!(InspectOk: Unpin); - assert_impl!(IntoFuture: Send); - assert_not_impl!(IntoFuture: Send); - assert_impl!(IntoFuture: Sync); - assert_not_impl!(IntoFuture: Sync); - assert_impl!(IntoFuture: Unpin); - assert_not_impl!(IntoFuture: Unpin); - assert_impl!(IntoStream: Send); assert_not_impl!(IntoStream: Send); assert_impl!(IntoStream: Sync); From 9ddeb5b681a63845f1d71ecacc62a3bb34205d2b Mon Sep 17 00:00:00 2001 From: yhx-12243 Date: Sun, 18 Aug 2024 20:59:33 -0400 Subject: [PATCH 2/2] refactor: remove `into_stream` and `IntoStream` --- futures-core/src/stream.rs | 5 +- futures-util/src/stream/mod.rs | 4 +- .../src/stream/try_stream/into_stream.rs | 52 ------------------- futures-util/src/stream/try_stream/mod.rs | 47 +++-------------- .../stream/try_stream/try_buffer_unordered.rs | 8 +-- .../src/stream/try_stream/try_buffered.rs | 8 +-- .../src/stream/try_stream/try_chunks.rs | 12 ++--- .../try_stream/try_flatten_unordered.rs | 6 +-- .../src/stream/try_stream/try_forward.rs | 6 +-- .../src/stream/try_stream/try_ready_chunks.rs | 8 +-- futures/tests/auto_traits.rs | 7 --- 11 files changed, 35 insertions(+), 128 deletions(-) delete mode 100644 futures-util/src/stream/try_stream/into_stream.rs diff --git a/futures-core/src/stream.rs b/futures-core/src/stream.rs index ad5350b795..79aa5cba5e 100644 --- a/futures-core/src/stream.rs +++ b/futures-core/src/stream.rs @@ -164,7 +164,10 @@ mod private_try_stream { /// A convenience for streams that return `Result` values that includes /// a variety of adapters tailored to such futures. -pub trait TryStream: Stream + private_try_stream::Sealed { +pub trait TryStream: + Stream::Ok, ::Error>> + + private_try_stream::Sealed +{ /// The type of successful values yielded by this future type Ok; diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 789e1ad221..23d28bf445 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -52,8 +52,8 @@ pub use self::stream::{ReuniteError, SplitSink, SplitStream}; mod try_stream; pub use self::try_stream::{ - try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse, TryAll, - TryAny, TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryNext, TrySkipWhile, + try_unfold, AndThen, ErrInto, InspectErr, InspectOk, MapErr, MapOk, OrElse, TryAll, TryAny, + TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryNext, TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold, }; diff --git a/futures-util/src/stream/try_stream/into_stream.rs b/futures-util/src/stream/try_stream/into_stream.rs deleted file mode 100644 index 2126258af7..0000000000 --- a/futures-util/src/stream/try_stream/into_stream.rs +++ /dev/null @@ -1,52 +0,0 @@ -use core::pin::Pin; -use futures_core::stream::{FusedStream, Stream, TryStream}; -use futures_core::task::{Context, Poll}; -#[cfg(feature = "sink")] -use futures_sink::Sink; -use pin_project_lite::pin_project; - -pin_project! { - /// Stream for the [`into_stream`](super::TryStreamExt::into_stream) method. - #[derive(Debug)] - #[must_use = "streams do nothing unless polled"] - pub struct IntoStream { - #[pin] - stream: St, - } -} - -impl IntoStream { - #[inline] - pub(super) fn new(stream: St) -> Self { - Self { stream } - } - - delegate_access_inner!(stream, St, ()); -} - -impl FusedStream for IntoStream { - fn is_terminated(&self) -> bool { - self.stream.is_terminated() - } -} - -impl Stream for IntoStream { - type Item = Result; - - #[inline] - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().stream.try_poll_next(cx) - } - - fn size_hint(&self) -> (usize, Option) { - self.stream.size_hint() - } -} - -// Forwarding impl of Sink from the underlying stream -#[cfg(feature = "sink")] -impl, Item> Sink for IntoStream { - type Error = S::Error; - - delegate_sink!(stream, Item); -} diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index fe9317d7a0..970ee0c64e 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -36,32 +36,29 @@ delegate_all!( delegate_all!( /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method. InspectOk( - Inspect, InspectOkFn> - ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))] + Inspect> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| Inspect::new(x, inspect_ok_fn(f))] ); delegate_all!( /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method. InspectErr( - Inspect, InspectErrFn> - ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))] + Inspect> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| Inspect::new(x, inspect_err_fn(f))] ); -mod into_stream; -pub use self::into_stream::IntoStream; - delegate_all!( /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method. MapOk( - Map, MapOkFn> - ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))] + Map> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| Map::new(x, map_ok_fn(f))] ); delegate_all!( /// Stream for the [`map_err`](super::TryStreamExt::map_err) method. MapErr( - Map, MapErrFn> - ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))] + Map> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| Map::new(x, map_err_fn(f))] ); mod or_else; @@ -352,34 +349,6 @@ pub trait TryStreamExt: TryStream { assert_stream::, _>(InspectErr::new(self, f)) } - /// Wraps a [`TryStream`] into a type that implements - /// [`Stream`](futures_core::stream::Stream) - /// - /// [`TryStream`]s currently do not implement the - /// [`Stream`](futures_core::stream::Stream) trait because of limitations - /// of the compiler. - /// - /// # Examples - /// - /// ``` - /// use futures::stream::{Stream, TryStream, TryStreamExt}; - /// - /// # type T = i32; - /// # type E = (); - /// fn make_try_stream() -> impl TryStream { // ... } - /// # futures::stream::empty() - /// # } - /// fn take_stream(stream: impl Stream>) { /* ... */ } - /// - /// take_stream(make_try_stream().into_stream()); - /// ``` - fn into_stream(self) -> IntoStream - where - Self: Sized, - { - assert_stream::, _>(IntoStream::new(self)) - } - /// Creates a future that attempts to resolve the next item in the stream. /// If an error is encountered before the next item, the error is returned /// instead. diff --git a/futures-util/src/stream/try_stream/try_buffer_unordered.rs b/futures-util/src/stream/try_stream/try_buffer_unordered.rs index 1d3c38b1d9..af6994912e 100644 --- a/futures-util/src/stream/try_stream/try_buffer_unordered.rs +++ b/futures-util/src/stream/try_stream/try_buffer_unordered.rs @@ -1,4 +1,4 @@ -use crate::stream::{Fuse, FuturesUnordered, IntoStream, StreamExt}; +use crate::stream::{Fuse, FuturesUnordered, StreamExt}; use core::num::NonZeroUsize; use core::pin::Pin; use futures_core::future::TryFuture; @@ -17,7 +17,7 @@ pin_project! { where St: TryStream { #[pin] - stream: Fuse>, + stream: Fuse, in_progress_queue: FuturesUnordered, max: Option, } @@ -30,13 +30,13 @@ where { pub(super) fn new(stream: St, n: Option) -> Self { Self { - stream: IntoStream::new(stream).fuse(), + stream: stream.fuse(), in_progress_queue: FuturesUnordered::new(), max: n.and_then(NonZeroUsize::new), } } - delegate_access_inner!(stream, St, (. .)); + delegate_access_inner!(stream, St, (.)); } impl Stream for TryBufferUnordered diff --git a/futures-util/src/stream/try_stream/try_buffered.rs b/futures-util/src/stream/try_stream/try_buffered.rs index 6b9dd0f7ec..4c98615eb0 100644 --- a/futures-util/src/stream/try_stream/try_buffered.rs +++ b/futures-util/src/stream/try_stream/try_buffered.rs @@ -1,4 +1,4 @@ -use crate::stream::{Fuse, FuturesOrdered, IntoStream, StreamExt}; +use crate::stream::{Fuse, FuturesOrdered, StreamExt}; use core::num::NonZeroUsize; use core::pin::Pin; use futures_core::future::TryFuture; @@ -18,7 +18,7 @@ pin_project! { St::Ok: TryFuture, { #[pin] - stream: Fuse>, + stream: Fuse, in_progress_queue: FuturesOrdered, max: Option, } @@ -31,13 +31,13 @@ where { pub(super) fn new(stream: St, n: Option) -> Self { Self { - stream: IntoStream::new(stream).fuse(), + stream: stream.fuse(), in_progress_queue: FuturesOrdered::new(), max: n.and_then(NonZeroUsize::new), } } - delegate_access_inner!(stream, St, (. .)); + delegate_access_inner!(stream, St, (.)); } impl Stream for TryBuffered diff --git a/futures-util/src/stream/try_stream/try_chunks.rs b/futures-util/src/stream/try_stream/try_chunks.rs index ec53f4bd11..a6184ee44a 100644 --- a/futures-util/src/stream/try_stream/try_chunks.rs +++ b/futures-util/src/stream/try_stream/try_chunks.rs @@ -1,4 +1,4 @@ -use crate::stream::{Fuse, IntoStream, StreamExt}; +use crate::stream::{Fuse, StreamExt}; use alloc::vec::Vec; use core::pin::Pin; @@ -16,7 +16,7 @@ pin_project! { #[must_use = "streams do nothing unless polled"] pub struct TryChunks { #[pin] - stream: Fuse>, + stream: Fuse, items: Vec, cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 } @@ -26,11 +26,7 @@ impl TryChunks { pub(super) fn new(stream: St, capacity: usize) -> Self { assert!(capacity > 0); - Self { - stream: IntoStream::new(stream).fuse(), - items: Vec::with_capacity(capacity), - cap: capacity, - } + Self { stream: stream.fuse(), items: Vec::with_capacity(capacity), cap: capacity } } fn take(self: Pin<&mut Self>) -> Vec { @@ -38,7 +34,7 @@ impl TryChunks { mem::replace(self.project().items, Vec::with_capacity(cap)) } - delegate_access_inner!(stream, St, (. .)); + delegate_access_inner!(stream, St, (.)); } type TryChunksStreamError = TryChunksError<::Ok, ::Error>; diff --git a/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/futures-util/src/stream/try_stream/try_flatten_unordered.rs index a74dfc451d..9987b0dc3a 100644 --- a/futures-util/src/stream/try_stream/try_flatten_unordered.rs +++ b/futures-util/src/stream/try_stream/try_flatten_unordered.rs @@ -13,8 +13,6 @@ use crate::future::Either; use crate::stream::stream::flatten_unordered::{ FlattenUnorderedWithFlowController, FlowController, FlowStep, }; -use crate::stream::IntoStream; -use crate::TryStreamExt; delegate_all!( /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method. @@ -128,7 +126,7 @@ where { // Item is either an inner stream or a stream containing a single error. // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s. - type Item = Either, SingleStreamResult>; + type Item = Either>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let item = ready!(self.project().stream.try_poll_next(cx)); @@ -136,7 +134,7 @@ where let out = match item { Some(res) => match res { // Emit successful inner stream as is - Ok(stream) => Either::Left(stream.into_stream()), + Ok(stream) => Either::Left(stream), // Wrap an error into a stream containing a single item err @ Err(_) => { let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into); diff --git a/futures-util/src/stream/try_stream/try_forward.rs b/futures-util/src/stream/try_stream/try_forward.rs index 52c28afa34..74cc33d4e9 100644 --- a/futures-util/src/stream/try_stream/try_forward.rs +++ b/futures-util/src/stream/try_stream/try_forward.rs @@ -1,4 +1,4 @@ -use crate::stream::{Fuse, IntoStream, Stream, TryStream}; +use crate::stream::{Fuse, Stream, TryStream}; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::ready; @@ -15,14 +15,14 @@ pin_project! { #[pin] sink: Option, #[pin] - stream: Fuse>, + stream: Fuse, buffered_item: Option, } } impl TryForward { pub(crate) fn new(stream: St, sink: Si) -> Self { - Self { sink: Some(sink), stream: Fuse::new(IntoStream::new(stream)), buffered_item: None } + Self { sink: Some(sink), stream: Fuse::new(stream), buffered_item: None } } } diff --git a/futures-util/src/stream/try_stream/try_ready_chunks.rs b/futures-util/src/stream/try_stream/try_ready_chunks.rs index 8b1470ea26..8910790935 100644 --- a/futures-util/src/stream/try_stream/try_ready_chunks.rs +++ b/futures-util/src/stream/try_stream/try_ready_chunks.rs @@ -1,4 +1,4 @@ -use crate::stream::{Fuse, IntoStream, StreamExt}; +use crate::stream::{Fuse, StreamExt}; use alloc::vec::Vec; use core::fmt; @@ -15,7 +15,7 @@ pin_project! { #[must_use = "streams do nothing unless polled"] pub struct TryReadyChunks { #[pin] - stream: Fuse>, + stream: Fuse, cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 } } @@ -24,10 +24,10 @@ impl TryReadyChunks { pub(super) fn new(stream: St, capacity: usize) -> Self { assert!(capacity > 0); - Self { stream: IntoStream::new(stream).fuse(), cap: capacity } + Self { stream: stream.fuse(), cap: capacity } } - delegate_access_inner!(stream, St, (. .)); + delegate_access_inner!(stream, St, (.)); } type TryReadyChunksStreamError = diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index 75ed663f9e..43ba31a4f6 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -1361,13 +1361,6 @@ mod stream { // IntoAsyncRead requires `St: Unpin` // assert_not_impl!(IntoAsyncRead, io::Error>>: Unpin); - assert_impl!(IntoStream<()>: Send); - assert_not_impl!(IntoStream<*const ()>: Send); - assert_impl!(IntoStream<()>: Sync); - assert_not_impl!(IntoStream<*const ()>: Sync); - assert_impl!(IntoStream<()>: Unpin); - assert_not_impl!(IntoStream: Unpin); - assert_impl!(Iter<()>: Send); assert_not_impl!(Iter<*const ()>: Send); assert_impl!(Iter<()>: Sync);