diff --git a/src/stream/merge/vec.rs b/src/stream/merge/vec.rs index c3e69d8..5cc1086 100644 --- a/src/stream/merge/vec.rs +++ b/src/stream/merge/vec.rs @@ -58,16 +58,13 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); - if !this.wakers.has_parent() { - this.wakers.set_parent(cx.waker()); - } - // Iterate over our streams one-by-one. If a stream yields a value, // we exit early. By default we'll return `Poll::Ready(None)`, but // this changes if we encounter a `Poll::Pending`. let mut index = this.rng.generate(this.streams.len() as u32) as usize; let mut readiness = this.wakers.readiness().lock().unwrap(); + readiness.set_waker(cx.waker()); loop { if !readiness.any_ready() { // Nothing is ready yet diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 60e9d5f..6e5e1cd 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -11,7 +11,7 @@ mod pin; mod poll_state; mod rng; mod tuple; -mod waker; +mod wakers; pub(crate) use fuse::Fuse; pub(crate) use maybe_done::MaybeDone; @@ -19,7 +19,7 @@ pub(crate) use pin::{get_pin_mut, get_pin_mut_from_vec, iter_pin_mut, iter_pin_m pub(crate) use poll_state::PollState; pub(crate) use rng::RandomGenerator; pub(crate) use tuple::{gen_conditions, permutations, tuple_len}; -pub(crate) use waker::{InlineWaker, Readiness, WakerList}; +pub(crate) use wakers::{InlineWaker, Readiness, WakerList}; #[cfg(test)] mod dummy_waker; diff --git a/src/utils/waker.rs b/src/utils/waker.rs deleted file mode 100644 index e18d51f..0000000 --- a/src/utils/waker.rs +++ /dev/null @@ -1,123 +0,0 @@ -use crate::stream::IntoStream; -use crate::utils::{self, Fuse, RandomGenerator}; - -use bitvec::bitvec; -use bitvec::vec::BitVec; -use core::fmt; -use futures_core::Stream; -use std::pin::Pin; -use std::sync::{Arc, Mutex}; -use std::task::{Context, Poll, Wake, Waker}; - -#[derive(Debug)] -pub(crate) struct Readiness { - count: usize, - ready: BitVec, -} - -impl Readiness { - /// Create a new instance of readiness. - pub(crate) fn new(count: usize) -> Self { - Self { - count, - ready: bitvec![true as usize; count], - } - } - - /// Returns the old ready state for this id - pub(crate) fn set_ready(&mut self, id: usize) -> bool { - if !self.ready[id] { - self.count += 1; - self.ready.set(id, true); - - false - } else { - true - } - } - - /// Returns whether the task id was previously ready - pub(crate) fn clear_ready(&mut self, id: usize) -> bool { - if self.ready[id] { - self.count -= 1; - self.ready.set(id, false); - - true - } else { - false - } - } - - pub(crate) fn any_ready(&self) -> bool { - self.count > 0 - } -} - -/// An efficient waker which delegates wake events. -#[derive(Debug, Clone)] -pub(crate) struct InlineWaker { - id: usize, - readiness: Arc>, - parent_waker: Waker, -} - -impl InlineWaker { - pub(crate) fn new(id: usize, readiness: Arc>, parent_waker: Waker) -> Self { - Self { - id, - readiness, - parent_waker, - } - } -} - -impl Wake for InlineWaker { - fn wake(self: std::sync::Arc) { - if !self.readiness.lock().unwrap().set_ready(self.id) { - self.parent_waker.wake_by_ref() - } - } -} - -/// A collection of wakers. -pub(crate) struct WakerList { - wakers: Vec, - has_parent: bool, - readiness: Arc>, - len: usize, -} - -impl WakerList { - pub(crate) fn new(len: usize) -> Self { - Self { - has_parent: false, - wakers: vec![], - readiness: Arc::new(Mutex::new(Readiness::new(len))), - len, - } - } - - pub(crate) fn has_parent(&self) -> bool { - self.has_parent - } - - pub(crate) fn set_parent(&mut self, parent: &Waker) { - self.wakers = (0..self.len) - .map(|i| Arc::new(InlineWaker::new(i, self.readiness.clone(), parent.clone())).into()) - .collect(); - - self.has_parent = true; - } - - pub(crate) fn get(&self, index: usize) -> Option<&Waker> { - debug_assert!( - self.has_parent, - "no parent waker set. Did you forget to call `WakerList::set_parent?" - ); - self.wakers.get(index) - } - - pub(crate) fn readiness(&self) -> &Mutex { - self.readiness.as_ref() - } -} diff --git a/src/utils/wakers/inline_waker.rs b/src/utils/wakers/inline_waker.rs new file mode 100644 index 0000000..70e9c5c --- /dev/null +++ b/src/utils/wakers/inline_waker.rs @@ -0,0 +1,36 @@ +use crate::utils; +use std::sync; +use std::sync::Arc; +use std::sync::Mutex; +use std::task; +use std::task::Wake; +use std::task::Waker; + +use super::Readiness; + +/// An efficient waker which delegates wake events. +#[derive(Debug, Clone)] +pub(crate) struct InlineWaker { + pub(crate) id: usize, + pub(crate) readiness: Arc>, +} + +impl InlineWaker { + /// Create a new instance of `InlineWaker`. + pub(crate) fn new(id: usize, readiness: Arc>) -> Self { + Self { id, readiness } + } +} + +impl Wake for InlineWaker { + fn wake(self: std::sync::Arc) { + let mut readiness = self.readiness.lock().unwrap(); + if !readiness.set_ready(self.id) { + readiness + .parent_waker() + .as_mut() + .expect("`parent_waker` not available from `Readiness`. Did you forget to call `Readiness::set_waker`?") + .wake_by_ref() + } + } +} diff --git a/src/utils/wakers/mod.rs b/src/utils/wakers/mod.rs new file mode 100644 index 0000000..952766d --- /dev/null +++ b/src/utils/wakers/mod.rs @@ -0,0 +1,7 @@ +mod inline_waker; +mod readiness; +mod waker_list; + +pub(crate) use inline_waker::InlineWaker; +pub(crate) use readiness::Readiness; +pub(crate) use waker_list::WakerList; diff --git a/src/utils/wakers/readiness.rs b/src/utils/wakers/readiness.rs new file mode 100644 index 0000000..1cd9a93 --- /dev/null +++ b/src/utils/wakers/readiness.rs @@ -0,0 +1,64 @@ +use bitvec::{bitvec, vec::BitVec}; +use std::task::Waker; + +use crate::utils; + +/// Tracks which wakers are "ready" and should be polled. +#[derive(Debug)] +pub(crate) struct Readiness { + count: usize, + ready: BitVec, + parent_waker: Option, +} + +impl Readiness { + /// Create a new instance of readiness. + pub(crate) fn new(count: usize) -> Self { + Self { + count, + ready: bitvec![true as usize; count], + parent_waker: None, + } + } + + /// Returns the old ready state for this id + pub(crate) fn set_ready(&mut self, id: usize) -> bool { + if !self.ready[id] { + self.count += 1; + self.ready.set(id, true); + + false + } else { + true + } + } + + /// Returns whether the task id was previously ready + pub(crate) fn clear_ready(&mut self, id: usize) -> bool { + if self.ready[id] { + self.count -= 1; + self.ready.set(id, false); + + true + } else { + false + } + } + + /// Returns `true` if any of the wakers are ready. + pub(crate) fn any_ready(&self) -> bool { + self.count > 0 + } + + /// Access the parent waker. + #[inline] + pub(crate) fn parent_waker(&self) -> Option<&Waker> { + self.parent_waker.as_ref() + } + + /// Set the parent `Waker`. This needs to be called at the start of every + /// `poll` function. + pub(crate) fn set_waker(&mut self, parent_waker: &Waker) { + self.parent_waker = Some(parent_waker.clone()); + } +} diff --git a/src/utils/wakers/waker_list.rs b/src/utils/wakers/waker_list.rs new file mode 100644 index 0000000..dc305e5 --- /dev/null +++ b/src/utils/wakers/waker_list.rs @@ -0,0 +1,34 @@ +use std::sync::Arc; +use std::sync::Mutex; +use std::task::Waker; + +use super::{InlineWaker, Readiness}; +use crate::utils; + +/// A collection of wakers which delegate to an in-line waker. +pub(crate) struct WakerList { + wakers: Vec, + readiness: Arc>, +} + +impl WakerList { + /// Create a new instance of `WakerList`. + pub(crate) fn new(len: usize) -> Self { + let readiness = Arc::new(Mutex::new(Readiness::new(len))); + Self { + wakers: (0..len) + .map(|i| Arc::new(InlineWaker::new(i, readiness.clone())).into()) + .collect(), + readiness, + } + } + + pub(crate) fn get(&self, index: usize) -> Option<&Waker> { + self.wakers.get(index) + } + + /// Access the `Readiness`. + pub(crate) fn readiness(&self) -> &Mutex { + self.readiness.as_ref() + } +}