diff --git a/src/future/join/vec.rs b/src/future/join/vec.rs index e578a82..a995987 100644 --- a/src/future/join/vec.rs +++ b/src/future/join/vec.rs @@ -1,10 +1,12 @@ use super::Join as JoinTrait; -use crate::utils::{iter_pin_mut_vec, OutputVec, PollVec, WakerVec}; +use crate::utils::{FutureVec, OutputVec, PollVec, WakerVec}; use core::fmt; use core::future::{Future, IntoFuture}; use core::pin::Pin; use core::task::{Context, Poll}; +use std::mem::ManuallyDrop; +use std::ops::DerefMut; use std::vec::Vec; use pin_project::{pin_project, pinned_drop}; @@ -28,7 +30,7 @@ where wakers: WakerVec, state: PollVec, #[pin] - futures: Vec, + futures: FutureVec, } impl Join @@ -43,7 +45,7 @@ where items: OutputVec::uninit(len), wakers: WakerVec::new(len), state: PollVec::new(len), - futures, + futures: FutureVec::new(futures), } } } @@ -93,7 +95,7 @@ where // Poll all ready futures let futures = this.futures.as_mut(); let states = &mut this.state[..]; - for (i, fut) in iter_pin_mut_vec(futures).enumerate() { + for (i, mut fut) in futures.iter().enumerate() { if states[i].is_pending() && readiness.clear_ready(i) { // unlock readiness so we don't deadlock when polling drop(readiness); @@ -101,10 +103,19 @@ where // Obtain the intermediate waker. let mut cx = Context::from_waker(this.wakers.get(i).unwrap()); - if let Poll::Ready(value) = fut.poll(&mut cx) { + // Poll the future + // SAFETY: the future's state was "pending", so it's safe to poll + if let Poll::Ready(value) = unsafe { + fut.as_mut() + .map_unchecked_mut(|t| t.deref_mut()) + .poll(&mut cx) + } { this.items.write(i, value); states[i].set_ready(); *this.pending -= 1; + // SAFETY: the future state has been changed to "ready" which + // means we'll no longer poll the future, so it's safe to drop + unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) }; } // Lock readiness so we can use it again @@ -140,7 +151,7 @@ where Fut: Future, { fn drop(self: Pin<&mut Self>) { - let this = self.project(); + let mut this = self.project(); // Drop all initialized values. for i in this.state.ready_indexes() { @@ -148,6 +159,13 @@ where // We can assume they're initialized, and this is where we drop them. unsafe { this.items.drop(i) }; } + + // Drop all pending futures. + for i in this.state.pending_indexes() { + // SAFETY: we've just filtered down to *only* the pending futures, + // which have not yet been dropped. + unsafe { this.futures.as_mut().drop(i) }; + } } } diff --git a/src/utils/futures/mod.rs b/src/utils/futures/mod.rs index 8658e34..5536ffe 100644 --- a/src/utils/futures/mod.rs +++ b/src/utils/futures/mod.rs @@ -1,3 +1,5 @@ mod array; +mod vec; pub(crate) use array::FutureArray; +pub(crate) use vec::FutureVec; diff --git a/src/utils/futures/vec.rs b/src/utils/futures/vec.rs new file mode 100644 index 0000000..cb03e94 --- /dev/null +++ b/src/utils/futures/vec.rs @@ -0,0 +1,44 @@ +use std::{ + mem::{self, ManuallyDrop, MaybeUninit}, + pin::Pin, +}; + +/// An array of futures which can be dropped in-place, intended to be +/// constructed once and then accessed through pin projections. +pub(crate) struct FutureVec { + futures: Vec>, +} + +impl FutureVec { + /// Create a new instance of `FutureVec` + pub(crate) fn new(futures: Vec) -> Self { + // Implementation copied from: https://doc.rust-lang.org/src/core/mem/maybe_uninit.rs.html#1292 + let futures = MaybeUninit::new(futures); + // SAFETY: T and MaybeUninit have the same layout + let futures = unsafe { mem::transmute_copy(&mem::ManuallyDrop::new(futures)) }; + Self { futures } + } + + /// Create an iterator of pinned references. + pub(crate) fn iter(self: Pin<&mut Self>) -> impl Iterator>> { + // SAFETY: `std` _could_ make this unsound if it were to decide Pin's + // invariants aren't required to transmit through slices. Otherwise this has + // the same safety as a normal field pin projection. + unsafe { self.get_unchecked_mut() } + .futures + .iter_mut() + .map(|t| unsafe { Pin::new_unchecked(t) }) + } + + /// Drop a future at the given index. + /// + /// # Safety + /// + /// The future is held in a `ManuallyDrop`, so no double-dropping, etc + pub(crate) unsafe fn drop(mut self: Pin<&mut Self>, idx: usize) { + unsafe { + let futures = self.as_mut().get_unchecked_mut().futures.as_mut_slice(); + ManuallyDrop::drop(&mut futures[idx]); + }; + } +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 16d7e1c..ed70f95 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -9,7 +9,7 @@ mod poll_state; mod tuple; mod wakers; -pub(crate) use self::futures::FutureArray; +pub(crate) use self::futures::{FutureArray, FutureVec}; pub(crate) use array::array_assume_init; pub(crate) use indexer::Indexer; pub(crate) use output::{OutputArray, OutputVec};