From 25286bd15a1ee1f560cdbc867e774cc8d76c66db Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts <2467194+yoshuawuyts@users.noreply.github.com> Date: Fri, 11 Nov 2022 19:04:55 +0100 Subject: [PATCH] remove allocations from vec::merge --- src/stream/merge/vec.rs | 31 ++++++++------------ src/utils/mod.rs | 2 +- src/utils/waker.rs | 63 +++++++++++++++++++++++++++++++++-------- 3 files changed, 64 insertions(+), 32 deletions(-) diff --git a/src/stream/merge/vec.rs b/src/stream/merge/vec.rs index 5e18d94..c3e69d8 100644 --- a/src/stream/merge/vec.rs +++ b/src/stream/merge/vec.rs @@ -1,11 +1,10 @@ use super::Merge as MergeTrait; use crate::stream::IntoStream; -use crate::utils::{self, Fuse, RandomGenerator, Readiness, StreamWaker}; +use crate::utils::{self, Fuse, RandomGenerator, WakerList}; use core::fmt; use futures_core::Stream; use std::pin::Pin; -use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; /// A stream that merges multiple streams into a single stream. @@ -23,9 +22,8 @@ where #[pin] streams: Vec>, rng: RandomGenerator, - readiness: Arc>, complete: usize, - wakers: Vec, + wakers: WakerList, } impl Merge @@ -33,14 +31,8 @@ where S: Stream, { pub(crate) fn new(streams: Vec) -> Self { - let readiness = Arc::new(Mutex::new(Readiness::new(streams.len()))); - let wakers = (0..streams.len()) - .map(|i| StreamWaker::new(i, readiness.clone())) - .collect(); - Self { - wakers, - readiness, + wakers: WakerList::new(streams.len()), streams: streams.into_iter().map(Fuse::new).collect(), rng: RandomGenerator::new(), complete: 0, @@ -66,12 +58,16 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); + if !this.wakers.has_parent() { + this.wakers.set_parent(cx.waker()); + } + // Iterate over our streams one-by-one. If a stream yields a value, // we exit early. By default we'll return `Poll::Ready(None)`, but // this changes if we encounter a `Poll::Pending`. let mut index = this.rng.generate(this.streams.len() as u32) as usize; - let mut readiness = this.readiness.lock().unwrap(); + let mut readiness = this.wakers.readiness().lock().unwrap(); loop { if !readiness.any_ready() { // Nothing is ready yet @@ -87,17 +83,14 @@ where // unlock readiness so we don't deadlock when polling drop(readiness); - // Construct an intermediate waker. - let mut waker = this.wakers[index].clone(); - waker.set_parent_waker(cx.waker().clone()); - let waker = Arc::new(waker).into(); - let mut cx = Context::from_waker(&waker); + // Obtain the intermediate waker. + let mut cx = Context::from_waker(this.wakers.get(index).unwrap()); let stream = utils::get_pin_mut_from_vec(this.streams.as_mut(), index).unwrap(); match stream.poll_next(&mut cx) { Poll::Ready(Some(item)) => { // Mark ourselves as ready again because we need to poll for the next item. - this.readiness.lock().unwrap().set_ready(index); + this.wakers.readiness().lock().unwrap().set_ready(index); return Poll::Ready(Some(item)); } Poll::Ready(None) => { @@ -110,7 +103,7 @@ where } // Lock readiness so we can use it again - readiness = this.readiness.lock().unwrap(); + readiness = this.wakers.readiness().lock().unwrap(); } } } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 6cccb0b..60e9d5f 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -19,7 +19,7 @@ pub(crate) use pin::{get_pin_mut, get_pin_mut_from_vec, iter_pin_mut, iter_pin_m pub(crate) use poll_state::PollState; pub(crate) use rng::RandomGenerator; pub(crate) use tuple::{gen_conditions, permutations, tuple_len}; -pub(crate) use waker::{Readiness, StreamWaker}; +pub(crate) use waker::{InlineWaker, Readiness, WakerList}; #[cfg(test)] mod dummy_waker; diff --git a/src/utils/waker.rs b/src/utils/waker.rs index 3c84395..e18d51f 100644 --- a/src/utils/waker.rs +++ b/src/utils/waker.rs @@ -53,32 +53,71 @@ impl Readiness { } } +/// An efficient waker which delegates wake events. #[derive(Debug, Clone)] -pub(crate) struct StreamWaker { +pub(crate) struct InlineWaker { id: usize, readiness: Arc>, - parent_waker: Option, + parent_waker: Waker, } -impl StreamWaker { - pub(crate) fn new(id: usize, readiness: Arc>) -> Self { +impl InlineWaker { + pub(crate) fn new(id: usize, readiness: Arc>, parent_waker: Waker) -> Self { Self { id, readiness, - parent_waker: None, + parent_waker, } } - - pub(crate) fn set_parent_waker(&mut self, parent: Waker) { - self.parent_waker = Some(parent); - } } -impl Wake for StreamWaker { +impl Wake for InlineWaker { fn wake(self: std::sync::Arc) { if !self.readiness.lock().unwrap().set_ready(self.id) { - let parent = self.parent_waker.as_ref().expect("No parent waker was set"); - parent.wake_by_ref() + self.parent_waker.wake_by_ref() + } + } +} + +/// A collection of wakers. +pub(crate) struct WakerList { + wakers: Vec, + has_parent: bool, + readiness: Arc>, + len: usize, +} + +impl WakerList { + pub(crate) fn new(len: usize) -> Self { + Self { + has_parent: false, + wakers: vec![], + readiness: Arc::new(Mutex::new(Readiness::new(len))), + len, } } + + pub(crate) fn has_parent(&self) -> bool { + self.has_parent + } + + pub(crate) fn set_parent(&mut self, parent: &Waker) { + self.wakers = (0..self.len) + .map(|i| Arc::new(InlineWaker::new(i, self.readiness.clone(), parent.clone())).into()) + .collect(); + + self.has_parent = true; + } + + pub(crate) fn get(&self, index: usize) -> Option<&Waker> { + debug_assert!( + self.has_parent, + "no parent waker set. Did you forget to call `WakerList::set_parent?" + ); + self.wakers.get(index) + } + + pub(crate) fn readiness(&self) -> &Mutex { + self.readiness.as_ref() + } }