Skip to content

Commit

Permalink
vec::merge: set waker on each call
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshuawuyts committed Nov 14, 2022
1 parent 25286bd commit 45784eb
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 129 deletions.
5 changes: 1 addition & 4 deletions src/stream/merge/vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,13 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();

if !this.wakers.has_parent() {
this.wakers.set_parent(cx.waker());
}

// Iterate over our streams one-by-one. If a stream yields a value,
// we exit early. By default we'll return `Poll::Ready(None)`, but
// this changes if we encounter a `Poll::Pending`.
let mut index = this.rng.generate(this.streams.len() as u32) as usize;

let mut readiness = this.wakers.readiness().lock().unwrap();
readiness.set_waker(cx.waker());
loop {
if !readiness.any_ready() {
// Nothing is ready yet
Expand Down
4 changes: 2 additions & 2 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ mod pin;
mod poll_state;
mod rng;
mod tuple;
mod waker;
mod wakers;

pub(crate) use fuse::Fuse;
pub(crate) use maybe_done::MaybeDone;
pub(crate) use pin::{get_pin_mut, get_pin_mut_from_vec, iter_pin_mut, iter_pin_mut_vec};
pub(crate) use poll_state::PollState;
pub(crate) use rng::RandomGenerator;
pub(crate) use tuple::{gen_conditions, permutations, tuple_len};
pub(crate) use waker::{InlineWaker, Readiness, WakerList};
pub(crate) use wakers::{InlineWaker, Readiness, WakerList};

#[cfg(test)]
mod dummy_waker;
Expand Down
123 changes: 0 additions & 123 deletions src/utils/waker.rs

This file was deleted.

36 changes: 36 additions & 0 deletions src/utils/wakers/inline_waker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use crate::utils;
use std::sync;
use std::sync::Arc;
use std::sync::Mutex;
use std::task;
use std::task::Wake;
use std::task::Waker;

use super::Readiness;

/// An efficient waker which delegates wake events.
#[derive(Debug, Clone)]
pub(crate) struct InlineWaker {
pub(crate) id: usize,
pub(crate) readiness: Arc<Mutex<Readiness>>,
}

impl InlineWaker {
/// Create a new instance of `InlineWaker`.
pub(crate) fn new(id: usize, readiness: Arc<Mutex<Readiness>>) -> Self {
Self { id, readiness }
}
}

impl Wake for InlineWaker {
fn wake(self: std::sync::Arc<Self>) {
let mut readiness = self.readiness.lock().unwrap();
if !readiness.set_ready(self.id) {
readiness
.parent_waker()
.as_mut()
.expect("`parent_waker` not available from `Readiness`. Did you forget to call `Readiness::set_waker`?")
.wake_by_ref()
}
}
}
7 changes: 7 additions & 0 deletions src/utils/wakers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod inline_waker;
mod readiness;
mod waker_list;

pub(crate) use inline_waker::InlineWaker;
pub(crate) use readiness::Readiness;
pub(crate) use waker_list::WakerList;
64 changes: 64 additions & 0 deletions src/utils/wakers/readiness.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use bitvec::{bitvec, vec::BitVec};
use std::task::Waker;

use crate::utils;

/// Tracks which wakers are "ready" and should be polled.
#[derive(Debug)]
pub(crate) struct Readiness {
count: usize,
ready: BitVec,
parent_waker: Option<Waker>,
}

impl Readiness {
/// Create a new instance of readiness.
pub(crate) fn new(count: usize) -> Self {
Self {
count,
ready: bitvec![true as usize; count],
parent_waker: None,
}
}

/// Returns the old ready state for this id
pub(crate) fn set_ready(&mut self, id: usize) -> bool {
if !self.ready[id] {
self.count += 1;
self.ready.set(id, true);

false
} else {
true
}
}

/// Returns whether the task id was previously ready
pub(crate) fn clear_ready(&mut self, id: usize) -> bool {
if self.ready[id] {
self.count -= 1;
self.ready.set(id, false);

true
} else {
false
}
}

/// Returns `true` if any of the wakers are ready.
pub(crate) fn any_ready(&self) -> bool {
self.count > 0
}

/// Access the parent waker.
#[inline]
pub(crate) fn parent_waker(&self) -> Option<&Waker> {
self.parent_waker.as_ref()
}

/// Set the parent `Waker`. This needs to be called at the start of every
/// `poll` function.
pub(crate) fn set_waker(&mut self, parent_waker: &Waker) {
self.parent_waker = Some(parent_waker.clone());
}
}
34 changes: 34 additions & 0 deletions src/utils/wakers/waker_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Waker;

use super::{InlineWaker, Readiness};
use crate::utils;

/// A collection of wakers which delegate to an in-line waker.
pub(crate) struct WakerList {
wakers: Vec<Waker>,
readiness: Arc<Mutex<Readiness>>,
}

impl WakerList {
/// Create a new instance of `WakerList`.
pub(crate) fn new(len: usize) -> Self {
let readiness = Arc::new(Mutex::new(Readiness::new(len)));
Self {
wakers: (0..len)
.map(|i| Arc::new(InlineWaker::new(i, readiness.clone())).into())
.collect(),
readiness,
}
}

pub(crate) fn get(&self, index: usize) -> Option<&Waker> {
self.wakers.get(index)
}

/// Access the `Readiness`.
pub(crate) fn readiness(&self) -> &Mutex<Readiness> {
self.readiness.as_ref()
}
}

0 comments on commit 45784eb

Please sign in to comment.