From 0882378abf59dc0c42f5bd2da33846accf6a08c5 Mon Sep 17 00:00:00 2001 From: Yosh Date: Sun, 11 Jun 2023 21:59:58 +0200 Subject: [PATCH 1/6] Update array.rs --- src/future/join/array.rs | 46 ++++++++++++++++++++++++++++++++++------ 1 file changed, 40 insertions(+), 6 deletions(-) diff --git a/src/future/join/array.rs b/src/future/join/array.rs index e96a395..c4bde86 100644 --- a/src/future/join/array.rs +++ b/src/future/join/array.rs @@ -10,6 +10,14 @@ use core::task::{Context, Poll}; use pin_project::{pin_project, pinned_drop}; +/// Cast an array of `T` to an array of `MaybeUninit` +fn array_to_maybe_uninit(arr: [T; N]) -> [MaybeUninit; N] { + // Implementation copied from: https://doc.rust-lang.org/src/core/mem/maybe_uninit.rs.html#1292 + let arr = MaybeUninit::new(arr); + // SAFETY: T and MaybeUninit have the same layout + unsafe { mem::transmute_copy(&mem::ManuallyDrop::new(arr)) } +} + /// A future which waits for two similarly-typed futures to complete. /// /// This `struct` is created by the [`join`] method on the [`Join`] trait. See @@ -29,7 +37,7 @@ where wakers: WakerArray, state: PollArray, #[pin] - futures: [Fut; N], + futures: [MaybeUninit; N], } impl Join @@ -44,7 +52,7 @@ where items: array::from_fn(|_| MaybeUninit::uninit()), wakers: WakerArray::new(), state: PollArray::new(), - futures, + futures: array_to_maybe_uninit(futures), } } } @@ -94,7 +102,7 @@ where } // Poll all ready futures - for (i, fut) in utils::iter_pin_mut(this.futures.as_mut()).enumerate() { + for (i, mut fut) in utils::iter_pin_mut(this.futures.as_mut()).enumerate() { if this.state[i].is_pending() && readiness.clear_ready(i) { // unlock readiness so we don't deadlock when polling drop(readiness); @@ -102,12 +110,23 @@ where // Obtain the intermediate waker. let mut cx = Context::from_waker(this.wakers.get(i).unwrap()); - if let Poll::Ready(value) = fut.poll(&mut cx) { + // Poll the future + if let Poll::Ready(value) = unsafe { + fut.as_mut() + .map_unchecked_mut(|t| t.assume_init_mut()) + .poll(&mut cx) + } { this.items[i] = MaybeUninit::new(value); this.state[i].set_ready(); *this.pending -= 1; } + // If the state was changed from "pending" to "ready", drop the future. + if this.state[i].is_ready() { + // SAFETY: we're done with the future, drop in-place + unsafe { fut.get_unchecked_mut().assume_init_drop() }; + } + // Lock readiness so we can use it again readiness = this.wakers.readiness().lock().unwrap(); } @@ -145,9 +164,9 @@ where Fut: Future, { fn drop(self: Pin<&mut Self>) { - let this = self.project(); + let mut this = self.project(); - // Get the indexes of the initialized values. + // Get the indexes of the initialized output values. let indexes = this .state .iter_mut() @@ -161,6 +180,21 @@ where // We can assume they're initialized, and this is where we drop them. unsafe { this.items[i].assume_init_drop() }; } + + // Get the indexes of the pending futures. + let indexes = this + .state + .iter_mut() + .enumerate() + .filter(|(_, state)| state.is_pending()) + .map(|(i, _)| i); + + // Drop each future at the index. + for i in indexes { + // SAFETY: we've just filtered down to *only* the pending futures, + // which have not yet been dropped. + unsafe { this.futures.as_mut().get_unchecked_mut()[i].assume_init_drop() }; + } } } From dee07a4326af088cbe26207ab09e9edd7491ef62 Mon Sep 17 00:00:00 2001 From: Yosh Date: Sun, 11 Jun 2023 22:01:21 +0200 Subject: [PATCH 2/6] move "array_to_maybe_uninit" to utils --- src/future/join/array.rs | 10 +--------- src/utils/array.rs | 8 ++++++++ src/utils/mod.rs | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/future/join/array.rs b/src/future/join/array.rs index c4bde86..ecd1bbe 100644 --- a/src/future/join/array.rs +++ b/src/future/join/array.rs @@ -1,5 +1,5 @@ use super::Join as JoinTrait; -use crate::utils::{self, PollArray, WakerArray}; +use crate::utils::{self, array_to_maybe_uninit, PollArray, WakerArray}; use core::array; use core::fmt; @@ -10,14 +10,6 @@ use core::task::{Context, Poll}; use pin_project::{pin_project, pinned_drop}; -/// Cast an array of `T` to an array of `MaybeUninit` -fn array_to_maybe_uninit(arr: [T; N]) -> [MaybeUninit; N] { - // Implementation copied from: https://doc.rust-lang.org/src/core/mem/maybe_uninit.rs.html#1292 - let arr = MaybeUninit::new(arr); - // SAFETY: T and MaybeUninit have the same layout - unsafe { mem::transmute_copy(&mem::ManuallyDrop::new(arr)) } -} - /// A future which waits for two similarly-typed futures to complete. /// /// This `struct` is created by the [`join`] method on the [`Join`] trait. See diff --git a/src/utils/array.rs b/src/utils/array.rs index 8eeaef5..2fb19c7 100644 --- a/src/utils/array.rs +++ b/src/utils/array.rs @@ -20,3 +20,11 @@ pub(crate) unsafe fn array_assume_init(array: [MaybeUninit mem::forget(array); ret } + +/// Cast an array of `T` to an array of `MaybeUninit` +pub(crate) fn array_to_maybe_uninit(arr: [T; N]) -> [MaybeUninit; N] { + // Implementation copied from: https://doc.rust-lang.org/src/core/mem/maybe_uninit.rs.html#1292 + let arr = MaybeUninit::new(arr); + // SAFETY: T and MaybeUninit have the same layout + unsafe { mem::transmute_copy(&mem::ManuallyDrop::new(arr)) } +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index ac5d38e..d945d41 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -7,7 +7,7 @@ mod poll_state; mod tuple; mod wakers; -pub(crate) use array::array_assume_init; +pub(crate) use array::{array_assume_init, array_to_maybe_uninit}; pub(crate) use indexer::Indexer; pub(crate) use pin::{get_pin_mut, get_pin_mut_from_vec, iter_pin_mut, iter_pin_mut_vec}; pub(crate) use poll_state::MaybeDone; From 8a16d9200efc871b274adb9834b0f4f901713716 Mon Sep 17 00:00:00 2001 From: Yosh Date: Sun, 11 Jun 2023 22:10:38 +0200 Subject: [PATCH 3/6] add additional safety note --- src/future/join/array.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/future/join/array.rs b/src/future/join/array.rs index ecd1bbe..d0c373e 100644 --- a/src/future/join/array.rs +++ b/src/future/join/array.rs @@ -103,6 +103,7 @@ where let mut cx = Context::from_waker(this.wakers.get(i).unwrap()); // Poll the future + // SAFETY: we checked the future state was "pending" if let Poll::Ready(value) = unsafe { fut.as_mut() .map_unchecked_mut(|t| t.assume_init_mut()) From 6070083caa0dd2aba27b93629920c5fcba6bf6b5 Mon Sep 17 00:00:00 2001 From: Yosh Date: Mon, 12 Jun 2023 02:05:27 +0200 Subject: [PATCH 4/6] use ManuallyDrop --- src/future/join/array.rs | 18 +++++++++++------- src/utils/array.rs | 4 ++-- src/utils/mod.rs | 2 +- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/future/join/array.rs b/src/future/join/array.rs index d0c373e..912c213 100644 --- a/src/future/join/array.rs +++ b/src/future/join/array.rs @@ -1,12 +1,13 @@ use super::Join as JoinTrait; -use crate::utils::{self, array_to_maybe_uninit, PollArray, WakerArray}; +use crate::utils::{self, array_to_manually_drop, PollArray, WakerArray}; use core::array; use core::fmt; use core::future::{Future, IntoFuture}; -use core::mem::{self, MaybeUninit}; +use core::mem::{self, ManuallyDrop, MaybeUninit}; use core::pin::Pin; use core::task::{Context, Poll}; +use std::ops::DerefMut; use pin_project::{pin_project, pinned_drop}; @@ -29,7 +30,7 @@ where wakers: WakerArray, state: PollArray, #[pin] - futures: [MaybeUninit; N], + futures: [ManuallyDrop; N], } impl Join @@ -44,7 +45,7 @@ where items: array::from_fn(|_| MaybeUninit::uninit()), wakers: WakerArray::new(), state: PollArray::new(), - futures: array_to_maybe_uninit(futures), + futures: array_to_manually_drop(futures), } } } @@ -106,7 +107,7 @@ where // SAFETY: we checked the future state was "pending" if let Poll::Ready(value) = unsafe { fut.as_mut() - .map_unchecked_mut(|t| t.assume_init_mut()) + .map_unchecked_mut(|t| t.deref_mut()) .poll(&mut cx) } { this.items[i] = MaybeUninit::new(value); @@ -117,7 +118,7 @@ where // If the state was changed from "pending" to "ready", drop the future. if this.state[i].is_ready() { // SAFETY: we're done with the future, drop in-place - unsafe { fut.get_unchecked_mut().assume_init_drop() }; + unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) }; } // Lock readiness so we can use it again @@ -186,7 +187,10 @@ where for i in indexes { // SAFETY: we've just filtered down to *only* the pending futures, // which have not yet been dropped. - unsafe { this.futures.as_mut().get_unchecked_mut()[i].assume_init_drop() }; + unsafe { + let futures = this.futures.as_mut().get_unchecked_mut(); + ManuallyDrop::drop(&mut futures[i]); + }; } } } diff --git a/src/utils/array.rs b/src/utils/array.rs index 2fb19c7..35fd0b0 100644 --- a/src/utils/array.rs +++ b/src/utils/array.rs @@ -1,4 +1,4 @@ -use std::mem::{self, MaybeUninit}; +use std::mem::{self, ManuallyDrop, MaybeUninit}; /// Extracts the values from an array of `MaybeUninit` containers. /// @@ -22,7 +22,7 @@ pub(crate) unsafe fn array_assume_init(array: [MaybeUninit } /// Cast an array of `T` to an array of `MaybeUninit` -pub(crate) fn array_to_maybe_uninit(arr: [T; N]) -> [MaybeUninit; N] { +pub(crate) fn array_to_manually_drop(arr: [T; N]) -> [ManuallyDrop; N] { // Implementation copied from: https://doc.rust-lang.org/src/core/mem/maybe_uninit.rs.html#1292 let arr = MaybeUninit::new(arr); // SAFETY: T and MaybeUninit have the same layout diff --git a/src/utils/mod.rs b/src/utils/mod.rs index d945d41..33b91cf 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -7,7 +7,7 @@ mod poll_state; mod tuple; mod wakers; -pub(crate) use array::{array_assume_init, array_to_maybe_uninit}; +pub(crate) use array::{array_assume_init, array_to_manually_drop}; pub(crate) use indexer::Indexer; pub(crate) use pin::{get_pin_mut, get_pin_mut_from_vec, iter_pin_mut, iter_pin_mut_vec}; pub(crate) use poll_state::MaybeDone; From 0d27cd839e541cfd75596f2a3b0de79b4e85acac Mon Sep 17 00:00:00 2001 From: Yosh Date: Mon, 12 Jun 2023 02:09:36 +0200 Subject: [PATCH 5/6] slim down code branches --- src/future/join/array.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/future/join/array.rs b/src/future/join/array.rs index 912c213..d397adb 100644 --- a/src/future/join/array.rs +++ b/src/future/join/array.rs @@ -104,7 +104,7 @@ where let mut cx = Context::from_waker(this.wakers.get(i).unwrap()); // Poll the future - // SAFETY: we checked the future state was "pending" + // SAFETY: the future's state was "pending", so it's safe to poll if let Poll::Ready(value) = unsafe { fut.as_mut() .map_unchecked_mut(|t| t.deref_mut()) @@ -113,11 +113,8 @@ where this.items[i] = MaybeUninit::new(value); this.state[i].set_ready(); *this.pending -= 1; - } - - // If the state was changed from "pending" to "ready", drop the future. - if this.state[i].is_ready() { - // SAFETY: we're done with the future, drop in-place + // SAFETY: the future state has been changed to "ready" which + // means we'll no longer poll the future, so it's safe to drop unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) }; } From 0bd7d99d20b2865826f9bf2ee433fd57bd644a92 Mon Sep 17 00:00:00 2001 From: Yosh Date: Mon, 12 Jun 2023 02:57:52 +0200 Subject: [PATCH 6/6] Move `ManuallyDrop` to a separate structure --- src/future/join/array.rs | 22 +++++++++++-------- src/utils/array.rs | 10 +-------- src/utils/futures/array.rs | 44 ++++++++++++++++++++++++++++++++++++++ src/utils/futures/mod.rs | 3 +++ src/utils/mod.rs | 4 +++- 5 files changed, 64 insertions(+), 19 deletions(-) create mode 100644 src/utils/futures/array.rs create mode 100644 src/utils/futures/mod.rs diff --git a/src/future/join/array.rs b/src/future/join/array.rs index d397adb..800b222 100644 --- a/src/future/join/array.rs +++ b/src/future/join/array.rs @@ -1,5 +1,5 @@ use super::Join as JoinTrait; -use crate::utils::{self, array_to_manually_drop, PollArray, WakerArray}; +use crate::utils::{self, FutureArray, PollArray, WakerArray}; use core::array; use core::fmt; @@ -24,13 +24,20 @@ pub struct Join where Fut: Future, { + /// A boolean which holds whether the future has completed consumed: bool, + /// The number of futures which are currently still in-flight pending: usize, + /// The output data, to be returned after the future completes items: [MaybeUninit<::Output>; N], + /// A structure holding the waker passed to the future, and the various + /// sub-wakers passed to the contained futures. wakers: WakerArray, + /// The individual poll state of each future. state: PollArray, #[pin] - futures: [ManuallyDrop; N], + /// The array of futures passed to the structure. + futures: FutureArray, } impl Join @@ -45,7 +52,7 @@ where items: array::from_fn(|_| MaybeUninit::uninit()), wakers: WakerArray::new(), state: PollArray::new(), - futures: array_to_manually_drop(futures), + futures: FutureArray::new(futures), } } } @@ -80,7 +87,7 @@ where #[inline] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); + let this = self.project(); assert!( !*this.consumed, @@ -95,7 +102,7 @@ where } // Poll all ready futures - for (i, mut fut) in utils::iter_pin_mut(this.futures.as_mut()).enumerate() { + for (i, mut fut) in this.futures.iter().enumerate() { if this.state[i].is_pending() && readiness.clear_ready(i) { // unlock readiness so we don't deadlock when polling drop(readiness); @@ -184,10 +191,7 @@ where for i in indexes { // SAFETY: we've just filtered down to *only* the pending futures, // which have not yet been dropped. - unsafe { - let futures = this.futures.as_mut().get_unchecked_mut(); - ManuallyDrop::drop(&mut futures[i]); - }; + unsafe { this.futures.as_mut().drop(i) }; } } } diff --git a/src/utils/array.rs b/src/utils/array.rs index 35fd0b0..8eeaef5 100644 --- a/src/utils/array.rs +++ b/src/utils/array.rs @@ -1,4 +1,4 @@ -use std::mem::{self, ManuallyDrop, MaybeUninit}; +use std::mem::{self, MaybeUninit}; /// Extracts the values from an array of `MaybeUninit` containers. /// @@ -20,11 +20,3 @@ pub(crate) unsafe fn array_assume_init(array: [MaybeUninit mem::forget(array); ret } - -/// Cast an array of `T` to an array of `MaybeUninit` -pub(crate) fn array_to_manually_drop(arr: [T; N]) -> [ManuallyDrop; N] { - // Implementation copied from: https://doc.rust-lang.org/src/core/mem/maybe_uninit.rs.html#1292 - let arr = MaybeUninit::new(arr); - // SAFETY: T and MaybeUninit have the same layout - unsafe { mem::transmute_copy(&mem::ManuallyDrop::new(arr)) } -} diff --git a/src/utils/futures/array.rs b/src/utils/futures/array.rs new file mode 100644 index 0000000..111afd3 --- /dev/null +++ b/src/utils/futures/array.rs @@ -0,0 +1,44 @@ +use std::{ + mem::{self, ManuallyDrop, MaybeUninit}, + pin::Pin, +}; + +/// An array of futures which can be dropped in-place, intended to be +/// constructed once and then accessed through pin projections. +pub(crate) struct FutureArray { + futures: [ManuallyDrop; N], +} + +impl FutureArray { + /// Create a new instance of `FutureArray` + pub(crate) fn new(futures: [T; N]) -> Self { + // Implementation copied from: https://doc.rust-lang.org/src/core/mem/maybe_uninit.rs.html#1292 + let futures = MaybeUninit::new(futures); + // SAFETY: T and MaybeUninit have the same layout + let futures = unsafe { mem::transmute_copy(&mem::ManuallyDrop::new(futures)) }; + Self { futures } + } + + /// Create an iterator of pinned references. + pub(crate) fn iter(self: Pin<&mut Self>) -> impl Iterator>> { + // SAFETY: `std` _could_ make this unsound if it were to decide Pin's + // invariants aren't required to transmit through slices. Otherwise this has + // the same safety as a normal field pin projection. + unsafe { self.get_unchecked_mut() } + .futures + .iter_mut() + .map(|t| unsafe { Pin::new_unchecked(t) }) + } + + /// Drop a future at the given index. + /// + /// # Safety + /// + /// The future is held in a `ManuallyDrop`, so no double-dropping, etc + pub(crate) unsafe fn drop(mut self: Pin<&mut Self>, idx: usize) { + unsafe { + let futures = self.as_mut().get_unchecked_mut().futures.as_mut(); + ManuallyDrop::drop(&mut futures[idx]); + }; + } +} diff --git a/src/utils/futures/mod.rs b/src/utils/futures/mod.rs new file mode 100644 index 0000000..8658e34 --- /dev/null +++ b/src/utils/futures/mod.rs @@ -0,0 +1,3 @@ +mod array; + +pub(crate) use array::FutureArray; diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 33b91cf..b8c6073 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,13 +1,15 @@ //! Utilities to implement the different futures of this crate. mod array; +mod futures; mod indexer; mod pin; mod poll_state; mod tuple; mod wakers; -pub(crate) use array::{array_assume_init, array_to_manually_drop}; +pub(crate) use self::futures::FutureArray; +pub(crate) use array::array_assume_init; pub(crate) use indexer::Indexer; pub(crate) use pin::{get_pin_mut, get_pin_mut_from_vec, iter_pin_mut, iter_pin_mut_vec}; pub(crate) use poll_state::MaybeDone;