Skip to content

Commit

Permalink
Implement perfect waking for array/vec Join
Browse files Browse the repository at this point in the history
Tries to implement #21 for array and vec Join.
  • Loading branch information
Swatinem committed Nov 30, 2022
1 parent 058da68 commit d925dca
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 11 deletions.
26 changes: 22 additions & 4 deletions src/future/join/array.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::Join as JoinTrait;
use crate::utils::{self, PollArray};
use crate::utils::{self, PollArray, WakerArray};

use core::array;
use core::fmt;
Expand All @@ -26,6 +26,7 @@ where
consumed: bool,
pending: usize,
items: [MaybeUninit<<Fut as Future>::Output>; N],
wakers: WakerArray<N>,
state: PollArray<N>,
#[pin]
futures: [Fut; N],
Expand All @@ -41,6 +42,7 @@ where
consumed: false,
pending: N,
items: array::from_fn(|_| MaybeUninit::uninit()),
wakers: WakerArray::new(),
state: PollArray::new(),
futures,
}
Expand Down Expand Up @@ -85,14 +87,30 @@ where
"Futures must not be polled after completing"
);

// Poll all futures
let mut readiness = this.wakers.readiness().lock().unwrap();
readiness.set_waker(cx.waker());
if !readiness.any_ready() {
// Nothing is ready yet
return Poll::Pending;
}

// Poll all ready futures
for (i, fut) in utils::iter_pin_mut(this.futures.as_mut()).enumerate() {
if this.state[i].is_pending() {
if let Poll::Ready(value) = fut.poll(cx) {
if this.state[i].is_pending() && readiness.clear_ready(i) {
// unlock readiness so we don't deadlock when polling
drop(readiness);

// Obtain the intermediate waker.
let mut cx = Context::from_waker(this.wakers.get(i).unwrap());

if let Poll::Ready(value) = fut.poll(&mut cx) {
this.items[i] = MaybeUninit::new(value);
this.state[i].set_ready();
*this.pending -= 1;
}

// Lock readiness so we can use it again
readiness = this.wakers.readiness().lock().unwrap();
}
}

Expand Down
33 changes: 26 additions & 7 deletions src/future/join/vec.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::Join as JoinTrait;
use crate::utils::{iter_pin_mut_vec, PollVec};
use crate::utils::{iter_pin_mut_vec, PollVec, WakerVec};

use core::fmt;
use core::future::{Future, IntoFuture};
Expand All @@ -26,6 +26,7 @@ where
consumed: bool,
pending: usize,
items: Vec<MaybeUninit<<Fut as Future>::Output>>,
wakers: WakerVec,
state: PollVec,
#[pin]
futures: Vec<Fut>,
Expand All @@ -36,13 +37,15 @@ where
Fut: Future,
{
pub(crate) fn new(futures: Vec<Fut>) -> Self {
let len = futures.len();
Join {
consumed: false,
pending: futures.len(),
pending: len,
items: std::iter::repeat_with(MaybeUninit::uninit)
.take(futures.len())
.take(len)
.collect(),
state: PollVec::new(futures.len()),
wakers: WakerVec::new(len),
state: PollVec::new(len),
futures,
}
}
Expand Down Expand Up @@ -84,16 +87,32 @@ where
"Futures must not be polled after completing"
);

// Poll all futures
let mut readiness = this.wakers.readiness().lock().unwrap();
readiness.set_waker(cx.waker());
if !readiness.any_ready() {
// Nothing is ready yet
return Poll::Pending;
}

// Poll all ready futures
let futures = this.futures.as_mut();
let states = &mut this.state[..];
for (i, fut) in iter_pin_mut_vec(futures).enumerate() {
if states[i].is_pending() {
if let Poll::Ready(value) = fut.poll(cx) {
if states[i].is_pending() && readiness.clear_ready(i) {
// unlock readiness so we don't deadlock when polling
drop(readiness);

// Obtain the intermediate waker.
let mut cx = Context::from_waker(this.wakers.get(i).unwrap());

if let Poll::Ready(value) = fut.poll(&mut cx) {
this.items[i] = MaybeUninit::new(value);
states[i].set_ready();
*this.pending -= 1;
}

// Lock readiness so we can use it again
readiness = this.wakers.readiness().lock().unwrap();
}
}

Expand Down

0 comments on commit d925dca

Please sign in to comment.