Skip to content

Commit

Permalink
remove allocations from vec::merge
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshuawuyts committed Nov 14, 2022
1 parent f262b48 commit 25286bd
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 32 deletions.
31 changes: 12 additions & 19 deletions src/stream/merge/vec.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use super::Merge as MergeTrait;
use crate::stream::IntoStream;
use crate::utils::{self, Fuse, RandomGenerator, Readiness, StreamWaker};
use crate::utils::{self, Fuse, RandomGenerator, WakerList};

use core::fmt;
use futures_core::Stream;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

/// A stream that merges multiple streams into a single stream.
Expand All @@ -23,24 +22,17 @@ where
#[pin]
streams: Vec<Fuse<S>>,
rng: RandomGenerator,
readiness: Arc<Mutex<Readiness>>,
complete: usize,
wakers: Vec<StreamWaker>,
wakers: WakerList,
}

impl<S> Merge<S>
where
S: Stream,
{
pub(crate) fn new(streams: Vec<S>) -> Self {
let readiness = Arc::new(Mutex::new(Readiness::new(streams.len())));
let wakers = (0..streams.len())
.map(|i| StreamWaker::new(i, readiness.clone()))
.collect();

Self {
wakers,
readiness,
wakers: WakerList::new(streams.len()),
streams: streams.into_iter().map(Fuse::new).collect(),
rng: RandomGenerator::new(),
complete: 0,
Expand All @@ -66,12 +58,16 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();

if !this.wakers.has_parent() {
this.wakers.set_parent(cx.waker());
}

// Iterate over our streams one-by-one. If a stream yields a value,
// we exit early. By default we'll return `Poll::Ready(None)`, but
// this changes if we encounter a `Poll::Pending`.
let mut index = this.rng.generate(this.streams.len() as u32) as usize;

let mut readiness = this.readiness.lock().unwrap();
let mut readiness = this.wakers.readiness().lock().unwrap();
loop {
if !readiness.any_ready() {
// Nothing is ready yet
Expand All @@ -87,17 +83,14 @@ where
// unlock readiness so we don't deadlock when polling
drop(readiness);

// Construct an intermediate waker.
let mut waker = this.wakers[index].clone();
waker.set_parent_waker(cx.waker().clone());
let waker = Arc::new(waker).into();
let mut cx = Context::from_waker(&waker);
// Obtain the intermediate waker.
let mut cx = Context::from_waker(this.wakers.get(index).unwrap());

let stream = utils::get_pin_mut_from_vec(this.streams.as_mut(), index).unwrap();
match stream.poll_next(&mut cx) {
Poll::Ready(Some(item)) => {
// Mark ourselves as ready again because we need to poll for the next item.
this.readiness.lock().unwrap().set_ready(index);
this.wakers.readiness().lock().unwrap().set_ready(index);
return Poll::Ready(Some(item));
}
Poll::Ready(None) => {
Expand All @@ -110,7 +103,7 @@ where
}

// Lock readiness so we can use it again
readiness = this.readiness.lock().unwrap();
readiness = this.wakers.readiness().lock().unwrap();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub(crate) use pin::{get_pin_mut, get_pin_mut_from_vec, iter_pin_mut, iter_pin_m
pub(crate) use poll_state::PollState;
pub(crate) use rng::RandomGenerator;
pub(crate) use tuple::{gen_conditions, permutations, tuple_len};
pub(crate) use waker::{Readiness, StreamWaker};
pub(crate) use waker::{InlineWaker, Readiness, WakerList};

#[cfg(test)]
mod dummy_waker;
Expand Down
63 changes: 51 additions & 12 deletions src/utils/waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,32 +53,71 @@ impl Readiness {
}
}

/// An efficient waker which delegates wake events.
#[derive(Debug, Clone)]
pub(crate) struct StreamWaker {
pub(crate) struct InlineWaker {
id: usize,
readiness: Arc<Mutex<Readiness>>,
parent_waker: Option<Waker>,
parent_waker: Waker,
}

impl StreamWaker {
pub(crate) fn new(id: usize, readiness: Arc<Mutex<Readiness>>) -> Self {
impl InlineWaker {
pub(crate) fn new(id: usize, readiness: Arc<Mutex<Readiness>>, parent_waker: Waker) -> Self {
Self {
id,
readiness,
parent_waker: None,
parent_waker,
}
}

pub(crate) fn set_parent_waker(&mut self, parent: Waker) {
self.parent_waker = Some(parent);
}
}

impl Wake for StreamWaker {
impl Wake for InlineWaker {
fn wake(self: std::sync::Arc<Self>) {
if !self.readiness.lock().unwrap().set_ready(self.id) {
let parent = self.parent_waker.as_ref().expect("No parent waker was set");
parent.wake_by_ref()
self.parent_waker.wake_by_ref()
}
}
}

/// A collection of wakers.
pub(crate) struct WakerList {
wakers: Vec<Waker>,
has_parent: bool,
readiness: Arc<Mutex<Readiness>>,
len: usize,
}

impl WakerList {
pub(crate) fn new(len: usize) -> Self {
Self {
has_parent: false,
wakers: vec![],
readiness: Arc::new(Mutex::new(Readiness::new(len))),
len,
}
}

pub(crate) fn has_parent(&self) -> bool {
self.has_parent
}

pub(crate) fn set_parent(&mut self, parent: &Waker) {
self.wakers = (0..self.len)
.map(|i| Arc::new(InlineWaker::new(i, self.readiness.clone(), parent.clone())).into())
.collect();

self.has_parent = true;
}

pub(crate) fn get(&self, index: usize) -> Option<&Waker> {
debug_assert!(
self.has_parent,
"no parent waker set. Did you forget to call `WakerList::set_parent?"
);
self.wakers.get(index)
}

pub(crate) fn readiness(&self) -> &Mutex<Readiness> {
self.readiness.as_ref()
}
}

0 comments on commit 25286bd

Please sign in to comment.