diff --git a/Cargo.toml b/Cargo.toml index 306e52e..b5339f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,3 +42,4 @@ futures-lite = "1.12.0" futures-time = "3.0.0" lending-stream = "1.0.0" rand = "0.8.5" +tokio = { version = "1.32.0", features = ["macros", "time", "rt-multi-thread"] } diff --git a/src/future/join/tuple.rs b/src/future/join/tuple.rs index 52e3001..28db118 100644 --- a/src/future/join/tuple.rs +++ b/src/future/join/tuple.rs @@ -230,7 +230,7 @@ macro_rules! impl_join_tuple { unsafe { ($($F.assume_init(),)+) } }; - this.state.set_all_completed(); + this.state.set_all_none(); return Poll::Ready(out); } diff --git a/src/future/race_ok/tuple/mod.rs b/src/future/race_ok/tuple/mod.rs index e7f47a6..0b3bb04 100644 --- a/src/future/race_ok/tuple/mod.rs +++ b/src/future/race_ok/tuple/mod.rs @@ -122,7 +122,7 @@ macro_rules! impl_race_ok_tuple { let all_completed = *this.completed == LEN; if all_completed { // mark all error states as consumed before we return it - this.errors_states.set_all_completed(); + this.errors_states.set_all_none(); let mut errors = array::from_fn(|_| MaybeUninit::uninit()); mem::swap(&mut errors, this.errors); diff --git a/src/future/try_join/array.rs b/src/future/try_join/array.rs index 3aee1e2..06995fd 100644 --- a/src/future/try_join/array.rs +++ b/src/future/try_join/array.rs @@ -116,18 +116,32 @@ where .map_unchecked_mut(|t| t.deref_mut()) .poll(&mut cx) } { - this.state[i].set_ready(); *this.pending -= 1; - // SAFETY: the future state has been changed to "ready" which - // means we'll no longer poll the future, so it's safe to drop - unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) }; // Check the value, short-circuit on error. match value { - Ok(value) => this.items.write(i, value), + Ok(value) => { + this.items.write(i, value); + + // SAFETY: We're marking the state as "ready", which + // means the future has been consumed, and data is + // now available to be consumed. The future will no + // longer be used after this point so it's safe to drop. + this.state[i].set_ready(); + unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) }; + } Err(err) => { // The future should no longer be polled after we're done here *this.consumed = true; + + // SAFETY: We're about to return the error value + // from the future, and drop the entire future. + // We're marking the future as consumed, and then + // proceeding to drop all other futures and + // initiatlized values in the destructor. + this.state[i].set_none(); + unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) }; + return Poll::Ready(Err(err)); } } @@ -142,16 +156,11 @@ where if *this.pending == 0 { // Mark all data as "consumed" before we take it *this.consumed = true; - for state in this.state.iter_mut() { - debug_assert!( - state.is_ready(), - "Future should have reached a `Ready` state" - ); - state.set_none(); - } - // SAFETY: we've checked with the state that all of our outputs have been + // SAFETY: we check with the state that all of our outputs have been // filled, which means we're ready to take the data and assume it's initialized. + debug_assert!(this.state.iter().all(|entry| entry.is_ready())); + this.state.set_all_none(); Poll::Ready(Ok(unsafe { this.items.take() })) } else { Poll::Pending diff --git a/src/future/try_join/tuple.rs b/src/future/try_join/tuple.rs index 2a8e941..2990594 100644 --- a/src/future/try_join/tuple.rs +++ b/src/future/try_join/tuple.rs @@ -33,21 +33,35 @@ macro_rules! unsafe_poll { .map_unchecked_mut(|t| t.deref_mut()) .poll(&mut $cx) } { - $this.state[$fut_idx].set_ready(); *$this.completed += 1; - // SAFETY: the future state has been changed to "ready" which - // means we'll no longer poll the future, so it's safe to drop - unsafe { ManuallyDrop::drop($futures.$fut_name.as_mut().get_unchecked_mut()) }; // Check the value, short-circuit on error. match value { - Ok(value) => $this.outputs.$fut_idx.write(value), + Ok(value) => { + $this.outputs.$fut_idx.write(value); + + // SAFETY: We're marking the state as "ready", which + // means the future has been consumed, and data is + // now available to be consumed. The future will no + // longer be used after this point so it's safe to drop. + $this.state[$fut_idx].set_ready(); + unsafe { ManuallyDrop::drop($futures.$fut_name.as_mut().get_unchecked_mut()) }; + } Err(err) => { // The future should no longer be polled after we're done here *$this.consumed = true; + + // SAFETY: We're about to return the error value + // from the future, and drop the entire future. + // We're marking the future as consumed, and then + // proceeding to drop all other futures and + // initiatlized values in the destructor. + $this.state[$fut_idx].set_none(); + unsafe { ManuallyDrop::drop($futures.$fut_name.as_mut().get_unchecked_mut()) }; + return Poll::Ready(Err(err)); } - }; + } } } unsafe_poll!(@inner $iteration, $this, $futures, $cx, $($F)* | $($rest)*); @@ -248,7 +262,7 @@ macro_rules! impl_try_join_tuple { unsafe { ($($F.assume_init(),)+) } }; - this.state.set_all_completed(); + this.state.set_all_none(); *this.consumed = true; return Poll::Ready(Ok(out)); diff --git a/src/future/try_join/vec.rs b/src/future/try_join/vec.rs index 203a7ce..7d52a83 100644 --- a/src/future/try_join/vec.rs +++ b/src/future/try_join/vec.rs @@ -117,18 +117,32 @@ where .map_unchecked_mut(|t| t.deref_mut()) .poll(&mut cx) } { - this.state[i].set_ready(); *this.pending -= 1; - // SAFETY: the future state has been changed to "ready" which - // means we'll no longer poll the future, so it's safe to drop - unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) }; // Check the value, short-circuit on error. match value { - Ok(value) => this.items.write(i, value), + Ok(value) => { + this.items.write(i, value); + + // SAFETY: We're marking the state as "ready", which + // means the future has been consumed, and data is + // now available to be consumed. The future will no + // longer be used after this point so it's safe to drop. + this.state[i].set_ready(); + unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) }; + } Err(err) => { // The future should no longer be polled after we're done here *this.consumed = true; + + // SAFETY: We're about to return the error value + // from the future, and drop the entire future. + // We're marking the future as consumed, and then + // proceeding to drop all other futures and + // initiatlized values in the destructor. + this.state[i].set_none(); + unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) }; + return Poll::Ready(Err(err)); } } diff --git a/src/stream/zip/array.rs b/src/stream/zip/array.rs index fa69b01..a760ff4 100644 --- a/src/stream/zip/array.rs +++ b/src/stream/zip/array.rs @@ -186,6 +186,7 @@ unsafe fn array_assume_init(array: [MaybeUninit; N]) -> [T // * `MaybeUninit` does not drop, so there are no double-frees // And thus the conversion is safe let ret = unsafe { (&array as *const _ as *const [T; N]).read() }; + #[allow(clippy::forget_non_drop)] mem::forget(array); ret } diff --git a/src/utils/array.rs b/src/utils/array.rs index 8eeaef5..5ced861 100644 --- a/src/utils/array.rs +++ b/src/utils/array.rs @@ -17,6 +17,7 @@ pub(crate) unsafe fn array_assume_init(array: [MaybeUninit let ret = unsafe { (&array as *const _ as *const [T; N]).read() }; // FIXME: required to avoid `~const Destruct` bound + #[allow(clippy::forget_non_drop)] mem::forget(array); ret } diff --git a/src/utils/poll_state/array.rs b/src/utils/poll_state/array.rs index eee7bca..61d0af6 100644 --- a/src/utils/poll_state/array.rs +++ b/src/utils/poll_state/array.rs @@ -22,18 +22,6 @@ impl PollArray { } } - /// Mark all items as "completed" - #[inline] - pub(crate) fn set_all_completed(&mut self) { - self.iter_mut().for_each(|state| { - debug_assert!( - state.is_ready(), - "Future should have reached a `Ready` state" - ); - state.set_none(); - }) - } - /// Mark all items as "pending" #[inline] pub(crate) fn set_all_pending(&mut self) { diff --git a/tests/regression-155.rs b/tests/regression-155.rs new file mode 100644 index 0000000..8b25d7f --- /dev/null +++ b/tests/regression-155.rs @@ -0,0 +1,45 @@ +//! Regression test for: https://github.com/yoshuawuyts/futures-concurrency/issues/155 +//! +//! We accidentally were marking a value as "ready" in `try_join`on the error +//! path. This meant that when we returned, the destructor assumed a value was +//! initialized when it wasn't, causing it to dereference uninitialized memory. + +use futures_concurrency::prelude::*; +use futures_core::Future; +use std::{future::ready, pin::Pin}; +use tokio::time::{sleep, Duration}; + +pub type BoxFuture<'a, T> = Pin + Send + 'a>>; + +async fn process_not_fail() -> Result, ()> { + sleep(Duration::from_millis(100)).await; + Ok(vec![ready(1), ready(2)].join().await) +} + +async fn process_fail() -> Result, ()> { + Err(()) +} + +#[tokio::test] +async fn array() { + let a: BoxFuture<'static, _> = Box::pin(process_fail()); + let b: BoxFuture<'static, _> = Box::pin(process_not_fail()); + let res = [a, b].try_join().await; + assert!(res.is_err()); +} + +#[tokio::test] +async fn vec() { + let a: BoxFuture<'static, _> = Box::pin(process_fail()); + let b: BoxFuture<'static, _> = Box::pin(process_not_fail()); + let res = vec![a, b].try_join().await; + assert!(res.is_err()); +} + +#[tokio::test] +async fn tuple() { + let a = process_fail(); + let b = process_not_fail(); + let res = (a, b).try_join().await; + assert!(res.is_err()); +} diff --git a/tests/test.rs b/tests/test.rs deleted file mode 100644 index 968a9eb..0000000 --- a/tests/test.rs +++ /dev/null @@ -1,70 +0,0 @@ -// use futures_concurrency::prelude::*; -// use futures_core::Stream; -// use futures_lite::future::block_on; -// use futures_lite::prelude::*; -// use pin_project::pin_project; - -// use std::cell::RefCell; -// use std::pin::Pin; -// use std::rc::Rc; -// use std::task::{Context, Poll}; - -// pub(crate) fn merge_test(max: usize) { -// block_on(async { -// let counter = Rc::new(RefCell::new(max)); -// let futures: Vec<_> = (1..=max) -// .rev() -// .map(|n| Countdown::new(n, counter.clone())) -// .collect(); -// let mut s = futures.merge(); - -// let mut counter = 0; -// while let Some(_) = s.next().await { -// counter += 1; -// } -// assert_eq!(counter, max); -// }) -// } - -// /// A future which will _eventually_ be ready, but needs to be polled N times before it is. -// #[pin_project] -// struct Countdown { -// success_count: Rc>, -// target_count: usize, -// done: bool, -// } - -// impl Countdown { -// fn new(count: usize, success_count: Rc>) -> Self { -// Self { -// success_count, -// target_count: count, -// done: false, -// } -// } -// } -// impl Stream for Countdown { -// type Item = (); - -// fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { -// let this = self.project(); -// if *this.done { -// Poll::Ready(None) -// } else if *this.success_count.borrow() == *this.target_count { -// *this.success_count.borrow_mut() -= 1; -// *this.done = true; -// Poll::Ready(Some(())) -// } else { -// Poll::Pending -// } -// } -// } - -// #[cfg(test)] -// mod test { -// use super::*; -// #[test] -// fn smoke() { -// merge_test(4); -// } -// }