From c4347d6b7a96be9368a3e57be3877bffdf5203e6 Mon Sep 17 00:00:00 2001 From: Yosh Date: Fri, 22 Sep 2023 13:37:39 +0200 Subject: [PATCH 1/9] add tokio test --- Cargo.toml | 1 + tests/test.rs | 92 +++++++++++++++------------------------------------ 2 files changed, 28 insertions(+), 65 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 306e52e..b5339f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,3 +42,4 @@ futures-lite = "1.12.0" futures-time = "3.0.0" lending-stream = "1.0.0" rand = "0.8.5" +tokio = { version = "1.32.0", features = ["macros", "time", "rt-multi-thread"] } diff --git a/tests/test.rs b/tests/test.rs index 968a9eb..fbdf5ea 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -1,70 +1,32 @@ -// use futures_concurrency::prelude::*; -// use futures_core::Stream; -// use futures_lite::future::block_on; -// use futures_lite::prelude::*; -// use pin_project::pin_project; +use futures_concurrency::future::{Join, TryJoin}; +use std::future::ready; +use tokio::time::{sleep, Duration}; -// use std::cell::RefCell; -// use std::pin::Pin; -// use std::rc::Rc; -// use std::task::{Context, Poll}; +async fn process(iter: impl IntoIterator, fail: bool) -> Result, Vec<()>> { + if fail { + return Err(vec![]); + } else { + sleep(Duration::from_secs(5)).await; + } -// pub(crate) fn merge_test(max: usize) { -// block_on(async { -// let counter = Rc::new(RefCell::new(max)); -// let futures: Vec<_> = (1..=max) -// .rev() -// .map(|n| Countdown::new(n, counter.clone())) -// .collect(); -// let mut s = futures.merge(); + Ok(iter + .into_iter() + .map(|i| ready(*i)) + .collect::>() + .join() + .await) +} -// let mut counter = 0; -// while let Some(_) = s.next().await { -// counter += 1; -// } -// assert_eq!(counter, max); -// }) -// } +#[tokio::test] +async fn test() -> Result<(), Vec<()>> { + let v = (0..10).collect::>(); -// /// A future which will _eventually_ be ready, but needs to be polled N times before it is. -// #[pin_project] -// struct Countdown { -// success_count: Rc>, -// target_count: usize, -// done: bool, -// } + ( + process(v.iter().take(5), true), + process(v.iter().take(0), false), + ) + .try_join() + .await?; -// impl Countdown { -// fn new(count: usize, success_count: Rc>) -> Self { -// Self { -// success_count, -// target_count: count, -// done: false, -// } -// } -// } -// impl Stream for Countdown { -// type Item = (); - -// fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { -// let this = self.project(); -// if *this.done { -// Poll::Ready(None) -// } else if *this.success_count.borrow() == *this.target_count { -// *this.success_count.borrow_mut() -= 1; -// *this.done = true; -// Poll::Ready(Some(())) -// } else { -// Poll::Pending -// } -// } -// } - -// #[cfg(test)] -// mod test { -// use super::*; -// #[test] -// fn smoke() { -// merge_test(4); -// } -// } + Ok(()) +} From e35ee847b9964a2b9e8175ed3b58c81c60a1b87c Mon Sep 17 00:00:00 2001 From: Yosh Date: Fri, 22 Sep 2023 13:38:14 +0200 Subject: [PATCH 2/9] fix ub for try_join --- src/future/try_join/tuple.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/future/try_join/tuple.rs b/src/future/try_join/tuple.rs index 2a8e941..cde96b0 100644 --- a/src/future/try_join/tuple.rs +++ b/src/future/try_join/tuple.rs @@ -33,7 +33,7 @@ macro_rules! unsafe_poll { .map_unchecked_mut(|t| t.deref_mut()) .poll(&mut $cx) } { - $this.state[$fut_idx].set_ready(); + $this.state[$fut_idx].set_none(); *$this.completed += 1; // SAFETY: the future state has been changed to "ready" which // means we'll no longer poll the future, so it's safe to drop From e38d05f541fc7fb1ee763072aae2e01db4dbb09f Mon Sep 17 00:00:00 2001 From: Yosh Date: Fri, 22 Sep 2023 14:53:53 +0200 Subject: [PATCH 3/9] set all none --- src/future/join/tuple.rs | 2 +- src/future/race_ok/tuple/mod.rs | 2 +- src/future/try_join/tuple.rs | 4 ++-- src/utils/poll_state/array.rs | 12 ------------ 4 files changed, 4 insertions(+), 16 deletions(-) diff --git a/src/future/join/tuple.rs b/src/future/join/tuple.rs index 52e3001..28db118 100644 --- a/src/future/join/tuple.rs +++ b/src/future/join/tuple.rs @@ -230,7 +230,7 @@ macro_rules! impl_join_tuple { unsafe { ($($F.assume_init(),)+) } }; - this.state.set_all_completed(); + this.state.set_all_none(); return Poll::Ready(out); } diff --git a/src/future/race_ok/tuple/mod.rs b/src/future/race_ok/tuple/mod.rs index e7f47a6..0b3bb04 100644 --- a/src/future/race_ok/tuple/mod.rs +++ b/src/future/race_ok/tuple/mod.rs @@ -122,7 +122,7 @@ macro_rules! impl_race_ok_tuple { let all_completed = *this.completed == LEN; if all_completed { // mark all error states as consumed before we return it - this.errors_states.set_all_completed(); + this.errors_states.set_all_none(); let mut errors = array::from_fn(|_| MaybeUninit::uninit()); mem::swap(&mut errors, this.errors); diff --git a/src/future/try_join/tuple.rs b/src/future/try_join/tuple.rs index cde96b0..f04b8e0 100644 --- a/src/future/try_join/tuple.rs +++ b/src/future/try_join/tuple.rs @@ -33,7 +33,7 @@ macro_rules! unsafe_poll { .map_unchecked_mut(|t| t.deref_mut()) .poll(&mut $cx) } { - $this.state[$fut_idx].set_none(); + $this.state[$fut_idx].set_ready(); *$this.completed += 1; // SAFETY: the future state has been changed to "ready" which // means we'll no longer poll the future, so it's safe to drop @@ -248,7 +248,7 @@ macro_rules! impl_try_join_tuple { unsafe { ($($F.assume_init(),)+) } }; - this.state.set_all_completed(); + this.state.set_all_none(); *this.consumed = true; return Poll::Ready(Ok(out)); diff --git a/src/utils/poll_state/array.rs b/src/utils/poll_state/array.rs index eee7bca..61d0af6 100644 --- a/src/utils/poll_state/array.rs +++ b/src/utils/poll_state/array.rs @@ -22,18 +22,6 @@ impl PollArray { } } - /// Mark all items as "completed" - #[inline] - pub(crate) fn set_all_completed(&mut self) { - self.iter_mut().for_each(|state| { - debug_assert!( - state.is_ready(), - "Future should have reached a `Ready` state" - ); - state.set_none(); - }) - } - /// Mark all items as "pending" #[inline] pub(crate) fn set_all_pending(&mut self) { From 3402826743c49d148f087cab10c2442692c899c9 Mon Sep 17 00:00:00 2001 From: Yosh Date: Fri, 22 Sep 2023 15:39:53 +0200 Subject: [PATCH 4/9] fix UB in array::try_join --- src/future/try_join/array.rs | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/src/future/try_join/array.rs b/src/future/try_join/array.rs index 3aee1e2..06995fd 100644 --- a/src/future/try_join/array.rs +++ b/src/future/try_join/array.rs @@ -116,18 +116,32 @@ where .map_unchecked_mut(|t| t.deref_mut()) .poll(&mut cx) } { - this.state[i].set_ready(); *this.pending -= 1; - // SAFETY: the future state has been changed to "ready" which - // means we'll no longer poll the future, so it's safe to drop - unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) }; // Check the value, short-circuit on error. match value { - Ok(value) => this.items.write(i, value), + Ok(value) => { + this.items.write(i, value); + + // SAFETY: We're marking the state as "ready", which + // means the future has been consumed, and data is + // now available to be consumed. The future will no + // longer be used after this point so it's safe to drop. + this.state[i].set_ready(); + unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) }; + } Err(err) => { // The future should no longer be polled after we're done here *this.consumed = true; + + // SAFETY: We're about to return the error value + // from the future, and drop the entire future. + // We're marking the future as consumed, and then + // proceeding to drop all other futures and + // initiatlized values in the destructor. + this.state[i].set_none(); + unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) }; + return Poll::Ready(Err(err)); } } @@ -142,16 +156,11 @@ where if *this.pending == 0 { // Mark all data as "consumed" before we take it *this.consumed = true; - for state in this.state.iter_mut() { - debug_assert!( - state.is_ready(), - "Future should have reached a `Ready` state" - ); - state.set_none(); - } - // SAFETY: we've checked with the state that all of our outputs have been + // SAFETY: we check with the state that all of our outputs have been // filled, which means we're ready to take the data and assume it's initialized. + debug_assert!(this.state.iter().all(|entry| entry.is_ready())); + this.state.set_all_none(); Poll::Ready(Ok(unsafe { this.items.take() })) } else { Poll::Pending From 7922cd9035c9f8fba3e69693a64daf3369a06d5b Mon Sep 17 00:00:00 2001 From: Yosh Date: Fri, 22 Sep 2023 15:43:02 +0200 Subject: [PATCH 5/9] Add all try_join tests for tokio --- tests/test.rs | 32 -------------------------------- tests/try_join_tokio.rs | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 32 deletions(-) delete mode 100644 tests/test.rs create mode 100644 tests/try_join_tokio.rs diff --git a/tests/test.rs b/tests/test.rs deleted file mode 100644 index fbdf5ea..0000000 --- a/tests/test.rs +++ /dev/null @@ -1,32 +0,0 @@ -use futures_concurrency::future::{Join, TryJoin}; -use std::future::ready; -use tokio::time::{sleep, Duration}; - -async fn process(iter: impl IntoIterator, fail: bool) -> Result, Vec<()>> { - if fail { - return Err(vec![]); - } else { - sleep(Duration::from_secs(5)).await; - } - - Ok(iter - .into_iter() - .map(|i| ready(*i)) - .collect::>() - .join() - .await) -} - -#[tokio::test] -async fn test() -> Result<(), Vec<()>> { - let v = (0..10).collect::>(); - - ( - process(v.iter().take(5), true), - process(v.iter().take(0), false), - ) - .try_join() - .await?; - - Ok(()) -} diff --git a/tests/try_join_tokio.rs b/tests/try_join_tokio.rs new file mode 100644 index 0000000..06471a7 --- /dev/null +++ b/tests/try_join_tokio.rs @@ -0,0 +1,39 @@ +use futures_concurrency::prelude::*; +use futures_core::Future; +use std::{future::ready, pin::Pin}; +use tokio::time::{sleep, Duration}; + +pub type BoxFuture<'a, T> = Pin + Send + 'a>>; + +async fn process_not_fail() -> Result, ()> { + sleep(Duration::from_millis(100)).await; + Ok(vec![ready(1), ready(2)].join().await) +} + +async fn process_fail() -> Result, ()> { + Err(()) +} + +#[tokio::test] +async fn array() { + let a: BoxFuture<'static, _> = Box::pin(process_fail()); + let b: BoxFuture<'static, _> = Box::pin(process_not_fail()); + let res = [a, b].try_join().await; + assert!(res.is_err()); +} + +#[tokio::test] +async fn vec() { + let a: BoxFuture<'static, _> = Box::pin(process_fail()); + let b: BoxFuture<'static, _> = Box::pin(process_not_fail()); + let res = vec![a, b].try_join().await; + assert!(res.is_err()); +} + +#[tokio::test] +async fn tuple() { + let a = process_fail(); + let b = process_not_fail(); + let res = (a, b).try_join().await; + assert!(res.is_err()); +} From ae044d74ae87de47f0d29ca8c24e65a663245113 Mon Sep 17 00:00:00 2001 From: Yosh Date: Fri, 22 Sep 2023 15:44:33 +0200 Subject: [PATCH 6/9] fix try-join for vec --- src/future/try_join/vec.rs | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/src/future/try_join/vec.rs b/src/future/try_join/vec.rs index 203a7ce..7d52a83 100644 --- a/src/future/try_join/vec.rs +++ b/src/future/try_join/vec.rs @@ -117,18 +117,32 @@ where .map_unchecked_mut(|t| t.deref_mut()) .poll(&mut cx) } { - this.state[i].set_ready(); *this.pending -= 1; - // SAFETY: the future state has been changed to "ready" which - // means we'll no longer poll the future, so it's safe to drop - unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) }; // Check the value, short-circuit on error. match value { - Ok(value) => this.items.write(i, value), + Ok(value) => { + this.items.write(i, value); + + // SAFETY: We're marking the state as "ready", which + // means the future has been consumed, and data is + // now available to be consumed. The future will no + // longer be used after this point so it's safe to drop. + this.state[i].set_ready(); + unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) }; + } Err(err) => { // The future should no longer be polled after we're done here *this.consumed = true; + + // SAFETY: We're about to return the error value + // from the future, and drop the entire future. + // We're marking the future as consumed, and then + // proceeding to drop all other futures and + // initiatlized values in the destructor. + this.state[i].set_none(); + unsafe { ManuallyDrop::drop(fut.get_unchecked_mut()) }; + return Poll::Ready(Err(err)); } } From d6d3831f6ed22bb2e8e3dd152b5755a804b6f26d Mon Sep 17 00:00:00 2001 From: Yosh Date: Fri, 22 Sep 2023 15:53:04 +0200 Subject: [PATCH 7/9] Fix TryJoin for tuple --- src/future/try_join/tuple.rs | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/src/future/try_join/tuple.rs b/src/future/try_join/tuple.rs index f04b8e0..2990594 100644 --- a/src/future/try_join/tuple.rs +++ b/src/future/try_join/tuple.rs @@ -33,21 +33,35 @@ macro_rules! unsafe_poll { .map_unchecked_mut(|t| t.deref_mut()) .poll(&mut $cx) } { - $this.state[$fut_idx].set_ready(); *$this.completed += 1; - // SAFETY: the future state has been changed to "ready" which - // means we'll no longer poll the future, so it's safe to drop - unsafe { ManuallyDrop::drop($futures.$fut_name.as_mut().get_unchecked_mut()) }; // Check the value, short-circuit on error. match value { - Ok(value) => $this.outputs.$fut_idx.write(value), + Ok(value) => { + $this.outputs.$fut_idx.write(value); + + // SAFETY: We're marking the state as "ready", which + // means the future has been consumed, and data is + // now available to be consumed. The future will no + // longer be used after this point so it's safe to drop. + $this.state[$fut_idx].set_ready(); + unsafe { ManuallyDrop::drop($futures.$fut_name.as_mut().get_unchecked_mut()) }; + } Err(err) => { // The future should no longer be polled after we're done here *$this.consumed = true; + + // SAFETY: We're about to return the error value + // from the future, and drop the entire future. + // We're marking the future as consumed, and then + // proceeding to drop all other futures and + // initiatlized values in the destructor. + $this.state[$fut_idx].set_none(); + unsafe { ManuallyDrop::drop($futures.$fut_name.as_mut().get_unchecked_mut()) }; + return Poll::Ready(Err(err)); } - }; + } } } unsafe_poll!(@inner $iteration, $this, $futures, $cx, $($F)* | $($rest)*); From 0e1589aeacd7e9d8314f95ec07d4119605025c5a Mon Sep 17 00:00:00 2001 From: Yosh Date: Fri, 22 Sep 2023 15:53:46 +0200 Subject: [PATCH 8/9] Add an explainer to the try_join regression test --- tests/{try_join_tokio.rs => regression-155.rs} | 6 ++++++ 1 file changed, 6 insertions(+) rename tests/{try_join_tokio.rs => regression-155.rs} (76%) diff --git a/tests/try_join_tokio.rs b/tests/regression-155.rs similarity index 76% rename from tests/try_join_tokio.rs rename to tests/regression-155.rs index 06471a7..8b25d7f 100644 --- a/tests/try_join_tokio.rs +++ b/tests/regression-155.rs @@ -1,3 +1,9 @@ +//! Regression test for: https://github.com/yoshuawuyts/futures-concurrency/issues/155 +//! +//! We accidentally were marking a value as "ready" in `try_join`on the error +//! path. This meant that when we returned, the destructor assumed a value was +//! initialized when it wasn't, causing it to dereference uninitialized memory. + use futures_concurrency::prelude::*; use futures_core::Future; use std::{future::ready, pin::Pin}; From 12c8d4aa0bb8346bb905cd05ce94c230d6166f96 Mon Sep 17 00:00:00 2001 From: Yosh Date: Fri, 22 Sep 2023 16:12:11 +0200 Subject: [PATCH 9/9] fix clippy warnings --- src/stream/zip/array.rs | 1 + src/utils/array.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/src/stream/zip/array.rs b/src/stream/zip/array.rs index fa69b01..a760ff4 100644 --- a/src/stream/zip/array.rs +++ b/src/stream/zip/array.rs @@ -186,6 +186,7 @@ unsafe fn array_assume_init(array: [MaybeUninit; N]) -> [T // * `MaybeUninit` does not drop, so there are no double-frees // And thus the conversion is safe let ret = unsafe { (&array as *const _ as *const [T; N]).read() }; + #[allow(clippy::forget_non_drop)] mem::forget(array); ret } diff --git a/src/utils/array.rs b/src/utils/array.rs index 8eeaef5..5ced861 100644 --- a/src/utils/array.rs +++ b/src/utils/array.rs @@ -17,6 +17,7 @@ pub(crate) unsafe fn array_assume_init(array: [MaybeUninit let ret = unsafe { (&array as *const _ as *const [T; N]).read() }; // FIXME: required to avoid `~const Destruct` bound + #[allow(clippy::forget_non_drop)] mem::forget(array); ret }