From d925dca2ce620e1cfa6b09b490192c85bf76f867 Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Wed, 30 Nov 2022 11:20:21 +0100 Subject: [PATCH] Implement perfect waking for array/vec Join Tries to implement #21 for array and vec Join. --- src/future/join/array.rs | 26 ++++++++++++++++++++++---- src/future/join/vec.rs | 33 ++++++++++++++++++++++++++------- 2 files changed, 48 insertions(+), 11 deletions(-) diff --git a/src/future/join/array.rs b/src/future/join/array.rs index 50529f5..9a6ec0c 100644 --- a/src/future/join/array.rs +++ b/src/future/join/array.rs @@ -1,5 +1,5 @@ use super::Join as JoinTrait; -use crate::utils::{self, PollArray}; +use crate::utils::{self, PollArray, WakerArray}; use core::array; use core::fmt; @@ -26,6 +26,7 @@ where consumed: bool, pending: usize, items: [MaybeUninit<::Output>; N], + wakers: WakerArray, state: PollArray, #[pin] futures: [Fut; N], @@ -41,6 +42,7 @@ where consumed: false, pending: N, items: array::from_fn(|_| MaybeUninit::uninit()), + wakers: WakerArray::new(), state: PollArray::new(), futures, } @@ -85,14 +87,30 @@ where "Futures must not be polled after completing" ); - // Poll all futures + let mut readiness = this.wakers.readiness().lock().unwrap(); + readiness.set_waker(cx.waker()); + if !readiness.any_ready() { + // Nothing is ready yet + return Poll::Pending; + } + + // Poll all ready futures for (i, fut) in utils::iter_pin_mut(this.futures.as_mut()).enumerate() { - if this.state[i].is_pending() { - if let Poll::Ready(value) = fut.poll(cx) { + if this.state[i].is_pending() && readiness.clear_ready(i) { + // unlock readiness so we don't deadlock when polling + drop(readiness); + + // Obtain the intermediate waker. + let mut cx = Context::from_waker(this.wakers.get(i).unwrap()); + + if let Poll::Ready(value) = fut.poll(&mut cx) { this.items[i] = MaybeUninit::new(value); this.state[i].set_ready(); *this.pending -= 1; } + + // Lock readiness so we can use it again + readiness = this.wakers.readiness().lock().unwrap(); } } diff --git a/src/future/join/vec.rs b/src/future/join/vec.rs index d11a48e..1817de6 100644 --- a/src/future/join/vec.rs +++ b/src/future/join/vec.rs @@ -1,5 +1,5 @@ use super::Join as JoinTrait; -use crate::utils::{iter_pin_mut_vec, PollVec}; +use crate::utils::{iter_pin_mut_vec, PollVec, WakerVec}; use core::fmt; use core::future::{Future, IntoFuture}; @@ -26,6 +26,7 @@ where consumed: bool, pending: usize, items: Vec::Output>>, + wakers: WakerVec, state: PollVec, #[pin] futures: Vec, @@ -36,13 +37,15 @@ where Fut: Future, { pub(crate) fn new(futures: Vec) -> Self { + let len = futures.len(); Join { consumed: false, - pending: futures.len(), + pending: len, items: std::iter::repeat_with(MaybeUninit::uninit) - .take(futures.len()) + .take(len) .collect(), - state: PollVec::new(futures.len()), + wakers: WakerVec::new(len), + state: PollVec::new(len), futures, } } @@ -84,16 +87,32 @@ where "Futures must not be polled after completing" ); - // Poll all futures + let mut readiness = this.wakers.readiness().lock().unwrap(); + readiness.set_waker(cx.waker()); + if !readiness.any_ready() { + // Nothing is ready yet + return Poll::Pending; + } + + // Poll all ready futures let futures = this.futures.as_mut(); let states = &mut this.state[..]; for (i, fut) in iter_pin_mut_vec(futures).enumerate() { - if states[i].is_pending() { - if let Poll::Ready(value) = fut.poll(cx) { + if states[i].is_pending() && readiness.clear_ready(i) { + // unlock readiness so we don't deadlock when polling + drop(readiness); + + // Obtain the intermediate waker. + let mut cx = Context::from_waker(this.wakers.get(i).unwrap()); + + if let Poll::Ready(value) = fut.poll(&mut cx) { this.items[i] = MaybeUninit::new(value); states[i].set_ready(); *this.pending -= 1; } + + // Lock readiness so we can use it again + readiness = this.wakers.readiness().lock().unwrap(); } }