Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop futures as soon as they're done for array::join #138

Merged
merged 6 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 34 additions & 7 deletions src/future/join/array.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::Join as JoinTrait;
use crate::utils::{self, PollArray, WakerArray};
use crate::utils::{self, array_to_maybe_uninit, PollArray, WakerArray};

use core::array;
use core::fmt;
Expand Down Expand Up @@ -29,7 +29,7 @@ where
wakers: WakerArray<N>,
state: PollArray<N>,
#[pin]
futures: [Fut; N],
futures: [MaybeUninit<Fut>; N],
yoshuawuyts marked this conversation as resolved.
Show resolved Hide resolved
}

impl<Fut, const N: usize> Join<Fut, N>
Expand All @@ -44,7 +44,7 @@ where
items: array::from_fn(|_| MaybeUninit::uninit()),
wakers: WakerArray::new(),
state: PollArray::new(),
futures,
futures: array_to_maybe_uninit(futures),
}
}
}
Expand Down Expand Up @@ -94,20 +94,32 @@ where
}

// Poll all ready futures
for (i, fut) in utils::iter_pin_mut(this.futures.as_mut()).enumerate() {
for (i, mut fut) in utils::iter_pin_mut(this.futures.as_mut()).enumerate() {
if this.state[i].is_pending() && readiness.clear_ready(i) {
// unlock readiness so we don't deadlock when polling
drop(readiness);

// Obtain the intermediate waker.
let mut cx = Context::from_waker(this.wakers.get(i).unwrap());

if let Poll::Ready(value) = fut.poll(&mut cx) {
// Poll the future
// SAFETY: we checked the future state was "pending"
if let Poll::Ready(value) = unsafe {
fut.as_mut()
.map_unchecked_mut(|t| t.assume_init_mut())
.poll(&mut cx)
} {
this.items[i] = MaybeUninit::new(value);
this.state[i].set_ready();
*this.pending -= 1;
}

// If the state was changed from "pending" to "ready", drop the future.
if this.state[i].is_ready() {
yoshuawuyts marked this conversation as resolved.
Show resolved Hide resolved
// SAFETY: we're done with the future, drop in-place
unsafe { fut.get_unchecked_mut().assume_init_drop() };
}

// Lock readiness so we can use it again
readiness = this.wakers.readiness().lock().unwrap();
}
Expand Down Expand Up @@ -145,9 +157,9 @@ where
Fut: Future,
{
fn drop(self: Pin<&mut Self>) {
let this = self.project();
let mut this = self.project();

// Get the indexes of the initialized values.
// Get the indexes of the initialized output values.
let indexes = this
.state
.iter_mut()
Expand All @@ -161,6 +173,21 @@ where
// We can assume they're initialized, and this is where we drop them.
unsafe { this.items[i].assume_init_drop() };
}

// Get the indexes of the pending futures.
let indexes = this
.state
.iter_mut()
.enumerate()
.filter(|(_, state)| state.is_pending())
.map(|(i, _)| i);

// Drop each future at the index.
for i in indexes {
// SAFETY: we've just filtered down to *only* the pending futures,
// which have not yet been dropped.
unsafe { this.futures.as_mut().get_unchecked_mut()[i].assume_init_drop() };
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/utils/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,11 @@ pub(crate) unsafe fn array_assume_init<T, const N: usize>(array: [MaybeUninit<T>
mem::forget(array);
ret
}

/// Cast an array of `T` to an array of `MaybeUninit<T>`
pub(crate) fn array_to_maybe_uninit<T, const N: usize>(arr: [T; N]) -> [MaybeUninit<T>; N] {
// Implementation copied from: https://doc.rust-lang.org/src/core/mem/maybe_uninit.rs.html#1292
let arr = MaybeUninit::new(arr);
// SAFETY: T and MaybeUninit<T> have the same layout
unsafe { mem::transmute_copy(&mem::ManuallyDrop::new(arr)) }
}
2 changes: 1 addition & 1 deletion src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod poll_state;
mod tuple;
mod wakers;

pub(crate) use array::array_assume_init;
pub(crate) use array::{array_assume_init, array_to_maybe_uninit};
pub(crate) use indexer::Indexer;
pub(crate) use pin::{get_pin_mut, get_pin_mut_from_vec, iter_pin_mut, iter_pin_mut_vec};
pub(crate) use poll_state::MaybeDone;
Expand Down