Skip to content

Commit

Permalink
Merge pull request #157 from yoshuawuyts/fix-ub
Browse files Browse the repository at this point in the history
Fix #155
  • Loading branch information
yoshuawuyts authored Sep 22, 2023
2 parents a2a7f6a + 12c8d4a commit 0c0c1bc
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 109 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ futures-lite = "1.12.0"
futures-time = "3.0.0"
lending-stream = "1.0.0"
rand = "0.8.5"
tokio = { version = "1.32.0", features = ["macros", "time", "rt-multi-thread"] }
2 changes: 1 addition & 1 deletion src/future/join/tuple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ macro_rules! impl_join_tuple {
unsafe { ($($F.assume_init(),)+) }
};

this.state.set_all_completed();
this.state.set_all_none();

return Poll::Ready(out);
}
Expand Down
2 changes: 1 addition & 1 deletion src/future/race_ok/tuple/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ macro_rules! impl_race_ok_tuple {
let all_completed = *this.completed == LEN;
if all_completed {
// mark all error states as consumed before we return it
this.errors_states.set_all_completed();
this.errors_states.set_all_none();

let mut errors = array::from_fn(|_| MaybeUninit::uninit());
mem::swap(&mut errors, this.errors);
Expand Down
35 changes: 22 additions & 13 deletions src/future/try_join/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,32 @@ where
.map_unchecked_mut(|t| t.deref_mut())
.poll(&mut cx)
} {
this.state[i].set_ready();
*this.pending -= 1;
// SAFETY: the future state has been changed to "ready" which
// means we'll no longer poll the future, so it's safe to drop
unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) };

// Check the value, short-circuit on error.
match value {
Ok(value) => this.items.write(i, value),
Ok(value) => {
this.items.write(i, value);

// SAFETY: We're marking the state as "ready", which
// means the future has been consumed, and data is
// now available to be consumed. The future will no
// longer be used after this point so it's safe to drop.
this.state[i].set_ready();
unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) };
}
Err(err) => {
// The future should no longer be polled after we're done here
*this.consumed = true;

// SAFETY: We're about to return the error value
// from the future, and drop the entire future.
// We're marking the future as consumed, and then
// proceeding to drop all other futures and
// initiatlized values in the destructor.
this.state[i].set_none();
unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) };

return Poll::Ready(Err(err));
}
}
Expand All @@ -142,16 +156,11 @@ where
if *this.pending == 0 {
// Mark all data as "consumed" before we take it
*this.consumed = true;
for state in this.state.iter_mut() {
debug_assert!(
state.is_ready(),
"Future should have reached a `Ready` state"
);
state.set_none();
}

// SAFETY: we've checked with the state that all of our outputs have been
// SAFETY: we check with the state that all of our outputs have been
// filled, which means we're ready to take the data and assume it's initialized.
debug_assert!(this.state.iter().all(|entry| entry.is_ready()));
this.state.set_all_none();
Poll::Ready(Ok(unsafe { this.items.take() }))
} else {
Poll::Pending
Expand Down
28 changes: 21 additions & 7 deletions src/future/try_join/tuple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,35 @@ macro_rules! unsafe_poll {
.map_unchecked_mut(|t| t.deref_mut())
.poll(&mut $cx)
} {
$this.state[$fut_idx].set_ready();
*$this.completed += 1;
// SAFETY: the future state has been changed to "ready" which
// means we'll no longer poll the future, so it's safe to drop
unsafe { ManuallyDrop::drop($futures.$fut_name.as_mut().get_unchecked_mut()) };

// Check the value, short-circuit on error.
match value {
Ok(value) => $this.outputs.$fut_idx.write(value),
Ok(value) => {
$this.outputs.$fut_idx.write(value);

// SAFETY: We're marking the state as "ready", which
// means the future has been consumed, and data is
// now available to be consumed. The future will no
// longer be used after this point so it's safe to drop.
$this.state[$fut_idx].set_ready();
unsafe { ManuallyDrop::drop($futures.$fut_name.as_mut().get_unchecked_mut()) };
}
Err(err) => {
// The future should no longer be polled after we're done here
*$this.consumed = true;

// SAFETY: We're about to return the error value
// from the future, and drop the entire future.
// We're marking the future as consumed, and then
// proceeding to drop all other futures and
// initiatlized values in the destructor.
$this.state[$fut_idx].set_none();
unsafe { ManuallyDrop::drop($futures.$fut_name.as_mut().get_unchecked_mut()) };

return Poll::Ready(Err(err));
}
};
}
}
}
unsafe_poll!(@inner $iteration, $this, $futures, $cx, $($F)* | $($rest)*);
Expand Down Expand Up @@ -248,7 +262,7 @@ macro_rules! impl_try_join_tuple {
unsafe { ($($F.assume_init(),)+) }
};

this.state.set_all_completed();
this.state.set_all_none();
*this.consumed = true;

return Poll::Ready(Ok(out));
Expand Down
24 changes: 19 additions & 5 deletions src/future/try_join/vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,32 @@ where
.map_unchecked_mut(|t| t.deref_mut())
.poll(&mut cx)
} {
this.state[i].set_ready();
*this.pending -= 1;
// SAFETY: the future state has been changed to "ready" which
// means we'll no longer poll the future, so it's safe to drop
unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) };

// Check the value, short-circuit on error.
match value {
Ok(value) => this.items.write(i, value),
Ok(value) => {
this.items.write(i, value);

// SAFETY: We're marking the state as "ready", which
// means the future has been consumed, and data is
// now available to be consumed. The future will no
// longer be used after this point so it's safe to drop.
this.state[i].set_ready();
unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) };
}
Err(err) => {
// The future should no longer be polled after we're done here
*this.consumed = true;

// SAFETY: We're about to return the error value
// from the future, and drop the entire future.
// We're marking the future as consumed, and then
// proceeding to drop all other futures and
// initiatlized values in the destructor.
this.state[i].set_none();
unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) };

return Poll::Ready(Err(err));
}
}
Expand Down
1 change: 1 addition & 0 deletions src/stream/zip/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ unsafe fn array_assume_init<T, const N: usize>(array: [MaybeUninit<T>; N]) -> [T
// * `MaybeUninit` does not drop, so there are no double-frees
// And thus the conversion is safe
let ret = unsafe { (&array as *const _ as *const [T; N]).read() };
#[allow(clippy::forget_non_drop)]
mem::forget(array);
ret
}
1 change: 1 addition & 0 deletions src/utils/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub(crate) unsafe fn array_assume_init<T, const N: usize>(array: [MaybeUninit<T>
let ret = unsafe { (&array as *const _ as *const [T; N]).read() };

// FIXME: required to avoid `~const Destruct` bound
#[allow(clippy::forget_non_drop)]
mem::forget(array);
ret
}
12 changes: 0 additions & 12 deletions src/utils/poll_state/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,6 @@ impl<const N: usize> PollArray<N> {
}
}

/// Mark all items as "completed"
#[inline]
pub(crate) fn set_all_completed(&mut self) {
self.iter_mut().for_each(|state| {
debug_assert!(
state.is_ready(),
"Future should have reached a `Ready` state"
);
state.set_none();
})
}

/// Mark all items as "pending"
#[inline]
pub(crate) fn set_all_pending(&mut self) {
Expand Down
45 changes: 45 additions & 0 deletions tests/regression-155.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
//! Regression test for: https://github.com/yoshuawuyts/futures-concurrency/issues/155
//!
//! We accidentally were marking a value as "ready" in `try_join`on the error
//! path. This meant that when we returned, the destructor assumed a value was
//! initialized when it wasn't, causing it to dereference uninitialized memory.
use futures_concurrency::prelude::*;
use futures_core::Future;
use std::{future::ready, pin::Pin};
use tokio::time::{sleep, Duration};

pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

async fn process_not_fail() -> Result<Vec<i32>, ()> {
sleep(Duration::from_millis(100)).await;
Ok(vec![ready(1), ready(2)].join().await)
}

async fn process_fail() -> Result<Vec<i32>, ()> {
Err(())
}

#[tokio::test]
async fn array() {
let a: BoxFuture<'static, _> = Box::pin(process_fail());
let b: BoxFuture<'static, _> = Box::pin(process_not_fail());
let res = [a, b].try_join().await;
assert!(res.is_err());
}

#[tokio::test]
async fn vec() {
let a: BoxFuture<'static, _> = Box::pin(process_fail());
let b: BoxFuture<'static, _> = Box::pin(process_not_fail());
let res = vec![a, b].try_join().await;
assert!(res.is_err());
}

#[tokio::test]
async fn tuple() {
let a = process_fail();
let b = process_not_fail();
let res = (a, b).try_join().await;
assert!(res.is_err());
}
70 changes: 0 additions & 70 deletions tests/test.rs

This file was deleted.

0 comments on commit 0c0c1bc

Please sign in to comment.