diff --git a/src/stream/merge/vec.rs b/src/stream/merge/vec.rs index 5e18d94..5cc1086 100644 --- a/src/stream/merge/vec.rs +++ b/src/stream/merge/vec.rs @@ -1,11 +1,10 @@ use super::Merge as MergeTrait; use crate::stream::IntoStream; -use crate::utils::{self, Fuse, RandomGenerator, Readiness, StreamWaker}; +use crate::utils::{self, Fuse, RandomGenerator, WakerList}; use core::fmt; use futures_core::Stream; use std::pin::Pin; -use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; /// A stream that merges multiple streams into a single stream. @@ -23,9 +22,8 @@ where #[pin] streams: Vec>, rng: RandomGenerator, - readiness: Arc>, complete: usize, - wakers: Vec, + wakers: WakerList, } impl Merge @@ -33,14 +31,8 @@ where S: Stream, { pub(crate) fn new(streams: Vec) -> Self { - let readiness = Arc::new(Mutex::new(Readiness::new(streams.len()))); - let wakers = (0..streams.len()) - .map(|i| StreamWaker::new(i, readiness.clone())) - .collect(); - Self { - wakers, - readiness, + wakers: WakerList::new(streams.len()), streams: streams.into_iter().map(Fuse::new).collect(), rng: RandomGenerator::new(), complete: 0, @@ -71,7 +63,8 @@ where // this changes if we encounter a `Poll::Pending`. let mut index = this.rng.generate(this.streams.len() as u32) as usize; - let mut readiness = this.readiness.lock().unwrap(); + let mut readiness = this.wakers.readiness().lock().unwrap(); + readiness.set_waker(cx.waker()); loop { if !readiness.any_ready() { // Nothing is ready yet @@ -87,17 +80,14 @@ where // unlock readiness so we don't deadlock when polling drop(readiness); - // Construct an intermediate waker. - let mut waker = this.wakers[index].clone(); - waker.set_parent_waker(cx.waker().clone()); - let waker = Arc::new(waker).into(); - let mut cx = Context::from_waker(&waker); + // Obtain the intermediate waker. + let mut cx = Context::from_waker(this.wakers.get(index).unwrap()); let stream = utils::get_pin_mut_from_vec(this.streams.as_mut(), index).unwrap(); match stream.poll_next(&mut cx) { Poll::Ready(Some(item)) => { // Mark ourselves as ready again because we need to poll for the next item. - this.readiness.lock().unwrap().set_ready(index); + this.wakers.readiness().lock().unwrap().set_ready(index); return Poll::Ready(Some(item)); } Poll::Ready(None) => { @@ -110,7 +100,7 @@ where } // Lock readiness so we can use it again - readiness = this.readiness.lock().unwrap(); + readiness = this.wakers.readiness().lock().unwrap(); } } } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 6cccb0b..6e5e1cd 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -11,7 +11,7 @@ mod pin; mod poll_state; mod rng; mod tuple; -mod waker; +mod wakers; pub(crate) use fuse::Fuse; pub(crate) use maybe_done::MaybeDone; @@ -19,7 +19,7 @@ pub(crate) use pin::{get_pin_mut, get_pin_mut_from_vec, iter_pin_mut, iter_pin_m pub(crate) use poll_state::PollState; pub(crate) use rng::RandomGenerator; pub(crate) use tuple::{gen_conditions, permutations, tuple_len}; -pub(crate) use waker::{Readiness, StreamWaker}; +pub(crate) use wakers::{InlineWaker, Readiness, WakerList}; #[cfg(test)] mod dummy_waker; diff --git a/src/utils/wakers/inline_waker.rs b/src/utils/wakers/inline_waker.rs new file mode 100644 index 0000000..70e9c5c --- /dev/null +++ b/src/utils/wakers/inline_waker.rs @@ -0,0 +1,36 @@ +use crate::utils; +use std::sync; +use std::sync::Arc; +use std::sync::Mutex; +use std::task; +use std::task::Wake; +use std::task::Waker; + +use super::Readiness; + +/// An efficient waker which delegates wake events. +#[derive(Debug, Clone)] +pub(crate) struct InlineWaker { + pub(crate) id: usize, + pub(crate) readiness: Arc>, +} + +impl InlineWaker { + /// Create a new instance of `InlineWaker`. + pub(crate) fn new(id: usize, readiness: Arc>) -> Self { + Self { id, readiness } + } +} + +impl Wake for InlineWaker { + fn wake(self: std::sync::Arc) { + let mut readiness = self.readiness.lock().unwrap(); + if !readiness.set_ready(self.id) { + readiness + .parent_waker() + .as_mut() + .expect("`parent_waker` not available from `Readiness`. Did you forget to call `Readiness::set_waker`?") + .wake_by_ref() + } + } +} diff --git a/src/utils/wakers/mod.rs b/src/utils/wakers/mod.rs new file mode 100644 index 0000000..952766d --- /dev/null +++ b/src/utils/wakers/mod.rs @@ -0,0 +1,7 @@ +mod inline_waker; +mod readiness; +mod waker_list; + +pub(crate) use inline_waker::InlineWaker; +pub(crate) use readiness::Readiness; +pub(crate) use waker_list::WakerList; diff --git a/src/utils/waker.rs b/src/utils/wakers/readiness.rs similarity index 52% rename from src/utils/waker.rs rename to src/utils/wakers/readiness.rs index 3c84395..1cd9a93 100644 --- a/src/utils/waker.rs +++ b/src/utils/wakers/readiness.rs @@ -1,18 +1,14 @@ -use crate::stream::IntoStream; -use crate::utils::{self, Fuse, RandomGenerator}; +use bitvec::{bitvec, vec::BitVec}; +use std::task::Waker; -use bitvec::bitvec; -use bitvec::vec::BitVec; -use core::fmt; -use futures_core::Stream; -use std::pin::Pin; -use std::sync::{Arc, Mutex}; -use std::task::{Context, Poll, Wake, Waker}; +use crate::utils; +/// Tracks which wakers are "ready" and should be polled. #[derive(Debug)] pub(crate) struct Readiness { count: usize, ready: BitVec, + parent_waker: Option, } impl Readiness { @@ -21,6 +17,7 @@ impl Readiness { Self { count, ready: bitvec![true as usize; count], + parent_waker: None, } } @@ -48,37 +45,20 @@ impl Readiness { } } + /// Returns `true` if any of the wakers are ready. pub(crate) fn any_ready(&self) -> bool { self.count > 0 } -} -#[derive(Debug, Clone)] -pub(crate) struct StreamWaker { - id: usize, - readiness: Arc>, - parent_waker: Option, -} - -impl StreamWaker { - pub(crate) fn new(id: usize, readiness: Arc>) -> Self { - Self { - id, - readiness, - parent_waker: None, - } - } - - pub(crate) fn set_parent_waker(&mut self, parent: Waker) { - self.parent_waker = Some(parent); + /// Access the parent waker. + #[inline] + pub(crate) fn parent_waker(&self) -> Option<&Waker> { + self.parent_waker.as_ref() } -} -impl Wake for StreamWaker { - fn wake(self: std::sync::Arc) { - if !self.readiness.lock().unwrap().set_ready(self.id) { - let parent = self.parent_waker.as_ref().expect("No parent waker was set"); - parent.wake_by_ref() - } + /// Set the parent `Waker`. This needs to be called at the start of every + /// `poll` function. + pub(crate) fn set_waker(&mut self, parent_waker: &Waker) { + self.parent_waker = Some(parent_waker.clone()); } } diff --git a/src/utils/wakers/waker_list.rs b/src/utils/wakers/waker_list.rs new file mode 100644 index 0000000..dc305e5 --- /dev/null +++ b/src/utils/wakers/waker_list.rs @@ -0,0 +1,34 @@ +use std::sync::Arc; +use std::sync::Mutex; +use std::task::Waker; + +use super::{InlineWaker, Readiness}; +use crate::utils; + +/// A collection of wakers which delegate to an in-line waker. +pub(crate) struct WakerList { + wakers: Vec, + readiness: Arc>, +} + +impl WakerList { + /// Create a new instance of `WakerList`. + pub(crate) fn new(len: usize) -> Self { + let readiness = Arc::new(Mutex::new(Readiness::new(len))); + Self { + wakers: (0..len) + .map(|i| Arc::new(InlineWaker::new(i, readiness.clone())).into()) + .collect(), + readiness, + } + } + + pub(crate) fn get(&self, index: usize) -> Option<&Waker> { + self.wakers.get(index) + } + + /// Access the `Readiness`. + pub(crate) fn readiness(&self) -> &Mutex { + self.readiness.as_ref() + } +}