Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop futures as soon as they're done for vec::join #141

Merged
merged 1 commit into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions src/future/join/vec.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use super::Join as JoinTrait;
use crate::utils::{iter_pin_mut_vec, OutputVec, PollVec, WakerVec};
use crate::utils::{FutureVec, OutputVec, PollVec, WakerVec};

use core::fmt;
use core::future::{Future, IntoFuture};
use core::pin::Pin;
use core::task::{Context, Poll};
use std::mem::ManuallyDrop;
use std::ops::DerefMut;
use std::vec::Vec;

use pin_project::{pin_project, pinned_drop};
Expand All @@ -28,7 +30,7 @@ where
wakers: WakerVec,
state: PollVec,
#[pin]
futures: Vec<Fut>,
futures: FutureVec<Fut>,
}

impl<Fut> Join<Fut>
Expand All @@ -43,7 +45,7 @@ where
items: OutputVec::uninit(len),
wakers: WakerVec::new(len),
state: PollVec::new(len),
futures,
futures: FutureVec::new(futures),
}
}
}
Expand Down Expand Up @@ -93,18 +95,27 @@ where
// Poll all ready futures
let futures = this.futures.as_mut();
let states = &mut this.state[..];
for (i, fut) in iter_pin_mut_vec(futures).enumerate() {
for (i, mut fut) in futures.iter().enumerate() {
if states[i].is_pending() && readiness.clear_ready(i) {
// unlock readiness so we don't deadlock when polling
drop(readiness);

// Obtain the intermediate waker.
let mut cx = Context::from_waker(this.wakers.get(i).unwrap());

if let Poll::Ready(value) = fut.poll(&mut cx) {
// Poll the future
// SAFETY: the future's state was "pending", so it's safe to poll
if let Poll::Ready(value) = unsafe {
fut.as_mut()
.map_unchecked_mut(|t| t.deref_mut())
.poll(&mut cx)
} {
this.items.write(i, value);
states[i].set_ready();
*this.pending -= 1;
// SAFETY: the future state has been changed to "ready" which
// means we'll no longer poll the future, so it's safe to drop
unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) };
}

// Lock readiness so we can use it again
Expand Down Expand Up @@ -140,14 +151,21 @@ where
Fut: Future,
{
fn drop(self: Pin<&mut Self>) {
let this = self.project();
let mut this = self.project();

// Drop all initialized values.
for i in this.state.ready_indexes() {
// SAFETY: we've just filtered down to *only* the initialized values.
// We can assume they're initialized, and this is where we drop them.
unsafe { this.items.drop(i) };
}

// Drop all pending futures.
for i in this.state.pending_indexes() {
// SAFETY: we've just filtered down to *only* the pending futures,
// which have not yet been dropped.
unsafe { this.futures.as_mut().drop(i) };
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/utils/futures/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod array;
mod vec;

pub(crate) use array::FutureArray;
pub(crate) use vec::FutureVec;
44 changes: 44 additions & 0 deletions src/utils/futures/vec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use std::{
mem::{self, ManuallyDrop, MaybeUninit},
pin::Pin,
};

/// An array of futures which can be dropped in-place, intended to be
/// constructed once and then accessed through pin projections.
pub(crate) struct FutureVec<T> {
futures: Vec<ManuallyDrop<T>>,
}

impl<T> FutureVec<T> {
/// Create a new instance of `FutureVec`
pub(crate) fn new(futures: Vec<T>) -> Self {
// Implementation copied from: https://doc.rust-lang.org/src/core/mem/maybe_uninit.rs.html#1292
let futures = MaybeUninit::new(futures);
// SAFETY: T and MaybeUninit<T> have the same layout
let futures = unsafe { mem::transmute_copy(&mem::ManuallyDrop::new(futures)) };
Self { futures }
}

/// Create an iterator of pinned references.
pub(crate) fn iter(self: Pin<&mut Self>) -> impl Iterator<Item = Pin<&mut ManuallyDrop<T>>> {
// SAFETY: `std` _could_ make this unsound if it were to decide Pin's
// invariants aren't required to transmit through slices. Otherwise this has
// the same safety as a normal field pin projection.
unsafe { self.get_unchecked_mut() }
.futures
.iter_mut()
.map(|t| unsafe { Pin::new_unchecked(t) })
}

/// Drop a future at the given index.
///
/// # Safety
///
/// The future is held in a `ManuallyDrop`, so no double-dropping, etc
pub(crate) unsafe fn drop(mut self: Pin<&mut Self>, idx: usize) {
unsafe {
let futures = self.as_mut().get_unchecked_mut().futures.as_mut_slice();
ManuallyDrop::drop(&mut futures[idx]);
};
}
}
2 changes: 1 addition & 1 deletion src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod poll_state;
mod tuple;
mod wakers;

pub(crate) use self::futures::FutureArray;
pub(crate) use self::futures::{FutureArray, FutureVec};
pub(crate) use array::array_assume_init;
pub(crate) use indexer::Indexer;
pub(crate) use output::{OutputArray, OutputVec};
Expand Down