Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove MaybeDone from impl Join for Vec #29

Merged
merged 2 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions src/future/into_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@ impl<Fut: Future> IntoFuture for Vec<Fut> {
type IntoFuture = crate::future::join::vec::Join<Fut>;

fn into_future(self) -> Self::IntoFuture {
let elems = self
.into_iter()
.map(|fut| MaybeDone::new(core::future::IntoFuture::into_future(fut)))
.collect::<Box<_>>()
.into();
crate::future::join::vec::Join::new(elems)
use crate::future::join::vec::Join;
Join::new(self.into_iter().collect())
}
}

Expand Down
138 changes: 103 additions & 35 deletions src/future/join/vec.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,58 @@
use super::Join as JoinTrait;
use crate::utils::iter_pin_mut;
use crate::utils::MaybeDone;
use crate::utils::{iter_pin_mut_vec, Metadata};

use core::fmt;
use core::future::{Future, IntoFuture};
use core::mem;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::boxed::Box;
use std::mem::{self, MaybeUninit};
use std::vec::Vec;

impl<Fut> JoinTrait for Vec<Fut>
where
Fut: IntoFuture,
{
type Output = Vec<Fut::Output>;
type Future = Join<Fut::IntoFuture>;

fn join(self) -> Self::Future {
let elems = self
.into_iter()
.map(|fut| MaybeDone::new(fut.into_future()))
.collect::<Box<_>>()
.into();
Join::new(elems)
}
}
use pin_project::{pin_project, pinned_drop};

/// Waits for two similarly-typed futures to complete.
///
/// Awaits multiple futures simultaneously, returning the output of the
/// futures once both complete.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project(PinnedDrop)]
pub struct Join<Fut>
where
Fut: Future,
{
elems: Pin<Box<[MaybeDone<Fut>]>>,
#[pin]
futures: Vec<Fut>,
items: Vec<MaybeUninit<<Fut as Future>::Output>>,
metadata: Vec<Metadata>,
}

impl<Fut> Join<Fut>
where
Fut: Future,
{
pub(crate) fn new(elems: Pin<Box<[MaybeDone<Fut>]>>) -> Self {
Self { elems }
pub(crate) fn new(futures: Vec<Fut>) -> Self {
Join {
items: std::iter::repeat_with(|| MaybeUninit::uninit())
.take(futures.len())
.collect(),
metadata: std::iter::successors(Some(0), |prev| Some(prev + 1))
.take(futures.len())
.map(Metadata::new)
.collect(),
yoshuawuyts marked this conversation as resolved.
Show resolved Hide resolved
futures,
}
}
}

impl<Fut> JoinTrait for Vec<Fut>
where
Fut: IntoFuture,
{
type Output = Vec<Fut::Output>;
type Future = Join<Fut::IntoFuture>;

fn join(self) -> Self::Future {
Join::new(self.into_iter().map(IntoFuture::into_future).collect())
}
}

Expand All @@ -54,7 +62,8 @@ where
Fut::Output: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Join").field("elems", &self.elems).finish()
// TODO: fix debug output
f.debug_struct("Join").finish()
}
}

Expand All @@ -64,23 +73,82 @@ where
{
type Output = Vec<Fut::Output>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut all_done = true;
// SAFETY: see https://github.com/rust-lang/rust/issues/104108,
// projecting through slices is fine now, but it's not yet guaranteed to
// work. We need to guarantee structural pinning works as expected for it to
// be provably sound.
yoshuawuyts marked this conversation as resolved.
Show resolved Hide resolved
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

// Poll all futures
let futures = this.futures.as_mut();
for (i, fut) in iter_pin_mut_vec(futures).enumerate() {
if this.metadata[i].is_done() {
continue;
}

for elem in iter_pin_mut(self.elems.as_mut()) {
if elem.poll(cx).is_pending() {
all_done = false;
if let Poll::Ready(value) = fut.poll(cx) {
this.items[i] = MaybeUninit::new(value);
this.metadata[i].set_done();
}
}

if all_done {
let mut elems = mem::replace(&mut self.elems, Box::pin([]));
let result = iter_pin_mut(elems.as_mut())
.map(|e| e.take().unwrap())
.collect();
Poll::Ready(result)
// Check whether we're all done now or need to keep going.
if this.metadata.iter().all(|meta| meta.is_done()) {
yoshuawuyts marked this conversation as resolved.
Show resolved Hide resolved
// Mark all data as "taken" before we actually take it.
this.metadata.iter_mut().for_each(|meta| meta.set_taken());

// SAFETY: we've checked with the metadata that all of our outputs have been
// filled, which means we're ready to take the data and assume it's initialized.
let items = unsafe {
let items = mem::take(this.items);
mem::transmute::<_, Vec<Fut::Output>>(items)
yoshuawuyts marked this conversation as resolved.
Show resolved Hide resolved
};
Poll::Ready(items)
} else {
Poll::Pending
}
}
}

/// Drop the already initialized values on cancellation.
#[pinned_drop]
impl<Fut> PinnedDrop for Join<Fut>
where
Fut: Future,
{
fn drop(self: Pin<&mut Self>) {
let this = self.project();

// Get the indexes of the initialized values.
let indexes = this
.metadata
.iter_mut()
.filter(|meta| meta.is_done())
.map(|meta| meta.index());

// Drop each value at the index.
for i in indexes {
// SAFETY: we've just filtered down to *only* the initialized values.
// We can assume they're initialized, and this is where we drop them.
unsafe { this.items[i].assume_init_drop() };
}
}
}

#[cfg(test)]
mod test {
use super::*;
use std::future;

// NOTE: we should probably poll in random order.
#[test]
fn no_fairness() {
futures_lite::future::block_on(async {
let res = vec![future::ready("hello"), future::ready("world")]
.join()
.await;
assert_eq!(res, vec!["hello", "world"]);
});
}
}
105 changes: 105 additions & 0 deletions src/utils/metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/// Enumerate the current poll state.
#[derive(Debug, Clone, Copy)]
pub(crate) enum PollState {
/// Actively polling the underlying future.
Active,
/// Data has been written to the output structure
/// and the future should no longer be polled.
Written,
/// Data has been taken from the output structure,
/// and we no longer need to reason about it.
Taken,
}

impl PollState {
/// Returns `true` if the poll state is [`Active`].
///
/// [`Active`]: PollState::Active
#[must_use]
fn is_active(&self) -> bool {
matches!(self, Self::Active)
}

/// Returns `true` if the poll state is [`Done`].
yoshuawuyts marked this conversation as resolved.
Show resolved Hide resolved
///
/// [`Done`]: PollState::Done
#[must_use]
fn is_done(&self) -> bool {
matches!(self, Self::Written)
}

/// Returns `true` if the poll state is [`Taken`].
///
/// [`Taken`]: PollState::Taken
#[must_use]
pub(crate) fn is_taken(&self) -> bool {
matches!(self, Self::Taken)
}
}

#[derive(Debug)]
pub(crate) struct Metadata {
index: usize,
yoshuawuyts marked this conversation as resolved.
Show resolved Hide resolved
poll_state: PollState,
}

impl Metadata {
/// Create a new instance of `Metadata`, positioned at a certain index.
pub(crate) fn new(index: usize) -> Self {
Self {
index,
poll_state: PollState::Active,
}
}

/// Get the index of the metadata.
pub(crate) fn index(&self) -> usize {
self.index
}

/// Get the current poll state.
pub(crate) fn poll_state(&self) -> PollState {
self.poll_state
}

/// Set the current poll state.
pub(crate) fn set_poll_state(&mut self, poll_state: PollState) {
self.poll_state = poll_state;
}

/// Set the current poll state to `Active`.
pub(crate) fn set_active(&mut self) {
self.poll_state = PollState::Active;
}

/// Set the current poll state to `Done`.
pub(crate) fn set_done(&mut self) {
self.poll_state = PollState::Written;
}

/// Set the current poll state to `Taken`.
pub(crate) fn set_taken(&mut self) {
self.poll_state = PollState::Taken;
}

/// Returns `true` if the poll state is [`Active`].
///
/// [`Active`]: PollState::Active
pub(crate) fn is_active(&self) -> bool {
self.poll_state.is_active()
}

/// Returns `true` if the poll state is [`Done`].
///
/// [`Done`]: PollState::Done
pub(crate) fn is_done(&self) -> bool {
self.poll_state.is_done()
}

/// Returns `true` if the poll state is [`Taken`].
///
/// [`Taken`]: PollState::Taken
pub(crate) fn is_taken(&self) -> bool {
self.poll_state.is_taken()
}
}
4 changes: 3 additions & 1 deletion src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@

mod fuse;
mod maybe_done;
mod metadata;
mod pin;
mod rng;

pub(crate) use fuse::Fuse;
pub(crate) use maybe_done::MaybeDone;
pub(crate) use pin::{get_pin_mut, get_pin_mut_from_vec, iter_pin_mut};
pub(crate) use metadata::{Metadata, PollState};
pub(crate) use pin::{get_pin_mut, get_pin_mut_from_vec, iter_pin_mut, iter_pin_mut_vec};
pub(crate) use rng::random;
19 changes: 9 additions & 10 deletions src/utils/pin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@ pub(crate) fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<
.map(|t| unsafe { Pin::new_unchecked(t) })
}

// From: Yosh made this one up, hehehe
// #[cfg(feature = "unstable")]
// pub(crate) fn pin_project_array<T, const N: usize>(slice: Pin<&mut [T; N]>) -> [Pin<&mut T>; N] {
// // SAFETY: `std` _could_ make this unsound if it were to decide Pin's
// // invariants aren't required to transmit through arrays. Otherwise this has
// // the same safety as a normal field pin projection.
// unsafe { slice.get_unchecked_mut() }
// .each_mut()
// .map(|t| unsafe { Pin::new_unchecked(t) })
// }
// From: `futures_rs::join_all!` -- https://github.com/rust-lang/futures-rs/blob/b48eb2e9a9485ef7388edc2f177094a27e08e28b/futures-util/src/future/join_all.rs#L18-L23
pub(crate) fn iter_pin_mut_vec<T>(slice: Pin<&mut Vec<T>>) -> impl Iterator<Item = Pin<&mut T>> {
yoshuawuyts marked this conversation as resolved.
Show resolved Hide resolved
// SAFETY: `std` _could_ make this unsound if it were to decide Pin's
// invariants aren't required to transmit through slices. Otherwise this has
// the same safety as a normal field pin projection.
unsafe { slice.get_unchecked_mut() }
.iter_mut()
.map(|t| unsafe { Pin::new_unchecked(t) })
}

/// Returns a pinned mutable reference to an element or subslice depending on the
/// type of index (see `get`) or `None` if the index is out of bounds.
Expand Down