Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #155 #157

Merged
merged 9 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ futures-lite = "1.12.0"
futures-time = "3.0.0"
lending-stream = "1.0.0"
rand = "0.8.5"
tokio = { version = "1.32.0", features = ["macros", "time", "rt-multi-thread"] }
2 changes: 1 addition & 1 deletion src/future/join/tuple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ macro_rules! impl_join_tuple {
unsafe { ($($F.assume_init(),)+) }
};

this.state.set_all_completed();
this.state.set_all_none();

return Poll::Ready(out);
}
Expand Down
2 changes: 1 addition & 1 deletion src/future/race_ok/tuple/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ macro_rules! impl_race_ok_tuple {
let all_completed = *this.completed == LEN;
if all_completed {
// mark all error states as consumed before we return it
this.errors_states.set_all_completed();
this.errors_states.set_all_none();

let mut errors = array::from_fn(|_| MaybeUninit::uninit());
mem::swap(&mut errors, this.errors);
Expand Down
35 changes: 22 additions & 13 deletions src/future/try_join/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,32 @@ where
.map_unchecked_mut(|t| t.deref_mut())
.poll(&mut cx)
} {
this.state[i].set_ready();
*this.pending -= 1;
// SAFETY: the future state has been changed to "ready" which
// means we'll no longer poll the future, so it's safe to drop
unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) };

// Check the value, short-circuit on error.
match value {
Ok(value) => this.items.write(i, value),
Ok(value) => {
this.items.write(i, value);

// SAFETY: We're marking the state as "ready", which
// means the future has been consumed, and data is
// now available to be consumed. The future will no
// longer be used after this point so it's safe to drop.
this.state[i].set_ready();
unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) };
}
Err(err) => {
// The future should no longer be polled after we're done here
*this.consumed = true;

// SAFETY: We're about to return the error value
// from the future, and drop the entire future.
// We're marking the future as consumed, and then
// proceeding to drop all other futures and
// initiatlized values in the destructor.
this.state[i].set_none();
unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) };

return Poll::Ready(Err(err));
}
}
Expand All @@ -142,16 +156,11 @@ where
if *this.pending == 0 {
// Mark all data as "consumed" before we take it
*this.consumed = true;
for state in this.state.iter_mut() {
debug_assert!(
state.is_ready(),
"Future should have reached a `Ready` state"
);
state.set_none();
}

// SAFETY: we've checked with the state that all of our outputs have been
// SAFETY: we check with the state that all of our outputs have been
// filled, which means we're ready to take the data and assume it's initialized.
debug_assert!(this.state.iter().all(|entry| entry.is_ready()));
this.state.set_all_none();
Poll::Ready(Ok(unsafe { this.items.take() }))
} else {
Poll::Pending
Expand Down
28 changes: 21 additions & 7 deletions src/future/try_join/tuple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,35 @@ macro_rules! unsafe_poll {
.map_unchecked_mut(|t| t.deref_mut())
.poll(&mut $cx)
} {
$this.state[$fut_idx].set_ready();
*$this.completed += 1;
// SAFETY: the future state has been changed to "ready" which
// means we'll no longer poll the future, so it's safe to drop
unsafe { ManuallyDrop::drop($futures.$fut_name.as_mut().get_unchecked_mut()) };

// Check the value, short-circuit on error.
match value {
Ok(value) => $this.outputs.$fut_idx.write(value),
Ok(value) => {
$this.outputs.$fut_idx.write(value);

// SAFETY: We're marking the state as "ready", which
// means the future has been consumed, and data is
// now available to be consumed. The future will no
// longer be used after this point so it's safe to drop.
$this.state[$fut_idx].set_ready();
unsafe { ManuallyDrop::drop($futures.$fut_name.as_mut().get_unchecked_mut()) };
}
Err(err) => {
// The future should no longer be polled after we're done here
*$this.consumed = true;

// SAFETY: We're about to return the error value
// from the future, and drop the entire future.
// We're marking the future as consumed, and then
// proceeding to drop all other futures and
// initiatlized values in the destructor.
$this.state[$fut_idx].set_none();
unsafe { ManuallyDrop::drop($futures.$fut_name.as_mut().get_unchecked_mut()) };

return Poll::Ready(Err(err));
}
};
}
}
}
unsafe_poll!(@inner $iteration, $this, $futures, $cx, $($F)* | $($rest)*);
Expand Down Expand Up @@ -248,7 +262,7 @@ macro_rules! impl_try_join_tuple {
unsafe { ($($F.assume_init(),)+) }
};

this.state.set_all_completed();
this.state.set_all_none();
*this.consumed = true;

return Poll::Ready(Ok(out));
Expand Down
24 changes: 19 additions & 5 deletions src/future/try_join/vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,32 @@ where
.map_unchecked_mut(|t| t.deref_mut())
.poll(&mut cx)
} {
this.state[i].set_ready();
*this.pending -= 1;
// SAFETY: the future state has been changed to "ready" which
// means we'll no longer poll the future, so it's safe to drop
unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) };

// Check the value, short-circuit on error.
match value {
Ok(value) => this.items.write(i, value),
Ok(value) => {
this.items.write(i, value);

// SAFETY: We're marking the state as "ready", which
// means the future has been consumed, and data is
// now available to be consumed. The future will no
// longer be used after this point so it's safe to drop.
this.state[i].set_ready();
unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) };
}
Err(err) => {
// The future should no longer be polled after we're done here
*this.consumed = true;

// SAFETY: We're about to return the error value
// from the future, and drop the entire future.
// We're marking the future as consumed, and then
// proceeding to drop all other futures and
// initiatlized values in the destructor.
this.state[i].set_none();
unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) };

return Poll::Ready(Err(err));
}
}
Expand Down
1 change: 1 addition & 0 deletions src/stream/zip/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ unsafe fn array_assume_init<T, const N: usize>(array: [MaybeUninit<T>; N]) -> [T
// * `MaybeUninit` does not drop, so there are no double-frees
// And thus the conversion is safe
let ret = unsafe { (&array as *const _ as *const [T; N]).read() };
#[allow(clippy::forget_non_drop)]
mem::forget(array);
ret
}
1 change: 1 addition & 0 deletions src/utils/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub(crate) unsafe fn array_assume_init<T, const N: usize>(array: [MaybeUninit<T>
let ret = unsafe { (&array as *const _ as *const [T; N]).read() };

// FIXME: required to avoid `~const Destruct` bound
#[allow(clippy::forget_non_drop)]
mem::forget(array);
ret
}
12 changes: 0 additions & 12 deletions src/utils/poll_state/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,6 @@ impl<const N: usize> PollArray<N> {
}
}

/// Mark all items as "completed"
#[inline]
pub(crate) fn set_all_completed(&mut self) {
self.iter_mut().for_each(|state| {
debug_assert!(
state.is_ready(),
"Future should have reached a `Ready` state"
);
state.set_none();
})
}

/// Mark all items as "pending"
#[inline]
pub(crate) fn set_all_pending(&mut self) {
Expand Down
45 changes: 45 additions & 0 deletions tests/regression-155.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
//! Regression test for: https://github.com/yoshuawuyts/futures-concurrency/issues/155
//!
//! We accidentally were marking a value as "ready" in `try_join`on the error
//! path. This meant that when we returned, the destructor assumed a value was
//! initialized when it wasn't, causing it to dereference uninitialized memory.

use futures_concurrency::prelude::*;
use futures_core::Future;
use std::{future::ready, pin::Pin};
use tokio::time::{sleep, Duration};

pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

async fn process_not_fail() -> Result<Vec<i32>, ()> {
sleep(Duration::from_millis(100)).await;
Ok(vec![ready(1), ready(2)].join().await)
}

async fn process_fail() -> Result<Vec<i32>, ()> {
Err(())
}

#[tokio::test]
async fn array() {
let a: BoxFuture<'static, _> = Box::pin(process_fail());
let b: BoxFuture<'static, _> = Box::pin(process_not_fail());
let res = [a, b].try_join().await;
assert!(res.is_err());
}

#[tokio::test]
async fn vec() {
let a: BoxFuture<'static, _> = Box::pin(process_fail());
let b: BoxFuture<'static, _> = Box::pin(process_not_fail());
let res = vec![a, b].try_join().await;
assert!(res.is_err());
}

#[tokio::test]
async fn tuple() {
let a = process_fail();
let b = process_not_fail();
let res = (a, b).try_join().await;
assert!(res.is_err());
}
70 changes: 0 additions & 70 deletions tests/test.rs

This file was deleted.