Skip to content

Commit

Permalink
Merge pull request #57 from yoshuawuyts/no-alloc-vec-merge
Browse files Browse the repository at this point in the history
Implement "perfect" waking for `impl Merge for Vec` (2/2)
  • Loading branch information
eholk authored Nov 14, 2022
2 parents f262b48 + 45784eb commit ff31355
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 56 deletions.
28 changes: 9 additions & 19 deletions src/stream/merge/vec.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use super::Merge as MergeTrait;
use crate::stream::IntoStream;
use crate::utils::{self, Fuse, RandomGenerator, Readiness, StreamWaker};
use crate::utils::{self, Fuse, RandomGenerator, WakerList};

use core::fmt;
use futures_core::Stream;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

/// A stream that merges multiple streams into a single stream.
Expand All @@ -23,24 +22,17 @@ where
#[pin]
streams: Vec<Fuse<S>>,
rng: RandomGenerator,
readiness: Arc<Mutex<Readiness>>,
complete: usize,
wakers: Vec<StreamWaker>,
wakers: WakerList,
}

impl<S> Merge<S>
where
S: Stream,
{
pub(crate) fn new(streams: Vec<S>) -> Self {
let readiness = Arc::new(Mutex::new(Readiness::new(streams.len())));
let wakers = (0..streams.len())
.map(|i| StreamWaker::new(i, readiness.clone()))
.collect();

Self {
wakers,
readiness,
wakers: WakerList::new(streams.len()),
streams: streams.into_iter().map(Fuse::new).collect(),
rng: RandomGenerator::new(),
complete: 0,
Expand Down Expand Up @@ -71,7 +63,8 @@ where
// this changes if we encounter a `Poll::Pending`.
let mut index = this.rng.generate(this.streams.len() as u32) as usize;

let mut readiness = this.readiness.lock().unwrap();
let mut readiness = this.wakers.readiness().lock().unwrap();
readiness.set_waker(cx.waker());
loop {
if !readiness.any_ready() {
// Nothing is ready yet
Expand All @@ -87,17 +80,14 @@ where
// unlock readiness so we don't deadlock when polling
drop(readiness);

// Construct an intermediate waker.
let mut waker = this.wakers[index].clone();
waker.set_parent_waker(cx.waker().clone());
let waker = Arc::new(waker).into();
let mut cx = Context::from_waker(&waker);
// Obtain the intermediate waker.
let mut cx = Context::from_waker(this.wakers.get(index).unwrap());

let stream = utils::get_pin_mut_from_vec(this.streams.as_mut(), index).unwrap();
match stream.poll_next(&mut cx) {
Poll::Ready(Some(item)) => {
// Mark ourselves as ready again because we need to poll for the next item.
this.readiness.lock().unwrap().set_ready(index);
this.wakers.readiness().lock().unwrap().set_ready(index);
return Poll::Ready(Some(item));
}
Poll::Ready(None) => {
Expand All @@ -110,7 +100,7 @@ where
}

// Lock readiness so we can use it again
readiness = this.readiness.lock().unwrap();
readiness = this.wakers.readiness().lock().unwrap();
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ mod pin;
mod poll_state;
mod rng;
mod tuple;
mod waker;
mod wakers;

pub(crate) use fuse::Fuse;
pub(crate) use maybe_done::MaybeDone;
pub(crate) use pin::{get_pin_mut, get_pin_mut_from_vec, iter_pin_mut, iter_pin_mut_vec};
pub(crate) use poll_state::PollState;
pub(crate) use rng::RandomGenerator;
pub(crate) use tuple::{gen_conditions, permutations, tuple_len};
pub(crate) use waker::{Readiness, StreamWaker};
pub(crate) use wakers::{InlineWaker, Readiness, WakerList};

#[cfg(test)]
mod dummy_waker;
Expand Down
36 changes: 36 additions & 0 deletions src/utils/wakers/inline_waker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use crate::utils;
use std::sync;
use std::sync::Arc;
use std::sync::Mutex;
use std::task;
use std::task::Wake;
use std::task::Waker;

use super::Readiness;

/// An efficient waker which delegates wake events.
#[derive(Debug, Clone)]
pub(crate) struct InlineWaker {
pub(crate) id: usize,
pub(crate) readiness: Arc<Mutex<Readiness>>,
}

impl InlineWaker {
/// Create a new instance of `InlineWaker`.
pub(crate) fn new(id: usize, readiness: Arc<Mutex<Readiness>>) -> Self {
Self { id, readiness }
}
}

impl Wake for InlineWaker {
fn wake(self: std::sync::Arc<Self>) {
let mut readiness = self.readiness.lock().unwrap();
if !readiness.set_ready(self.id) {
readiness
.parent_waker()
.as_mut()
.expect("`parent_waker` not available from `Readiness`. Did you forget to call `Readiness::set_waker`?")
.wake_by_ref()
}
}
}
7 changes: 7 additions & 0 deletions src/utils/wakers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod inline_waker;
mod readiness;
mod waker_list;

pub(crate) use inline_waker::InlineWaker;
pub(crate) use readiness::Readiness;
pub(crate) use waker_list::WakerList;
50 changes: 15 additions & 35 deletions src/utils/waker.rs → src/utils/wakers/readiness.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
use crate::stream::IntoStream;
use crate::utils::{self, Fuse, RandomGenerator};
use bitvec::{bitvec, vec::BitVec};
use std::task::Waker;

use bitvec::bitvec;
use bitvec::vec::BitVec;
use core::fmt;
use futures_core::Stream;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use crate::utils;

/// Tracks which wakers are "ready" and should be polled.
#[derive(Debug)]
pub(crate) struct Readiness {
count: usize,
ready: BitVec,
parent_waker: Option<Waker>,
}

impl Readiness {
Expand All @@ -21,6 +17,7 @@ impl Readiness {
Self {
count,
ready: bitvec![true as usize; count],
parent_waker: None,
}
}

Expand Down Expand Up @@ -48,37 +45,20 @@ impl Readiness {
}
}

/// Returns `true` if any of the wakers are ready.
pub(crate) fn any_ready(&self) -> bool {
self.count > 0
}
}

#[derive(Debug, Clone)]
pub(crate) struct StreamWaker {
id: usize,
readiness: Arc<Mutex<Readiness>>,
parent_waker: Option<Waker>,
}

impl StreamWaker {
pub(crate) fn new(id: usize, readiness: Arc<Mutex<Readiness>>) -> Self {
Self {
id,
readiness,
parent_waker: None,
}
}

pub(crate) fn set_parent_waker(&mut self, parent: Waker) {
self.parent_waker = Some(parent);
/// Access the parent waker.
#[inline]
pub(crate) fn parent_waker(&self) -> Option<&Waker> {
self.parent_waker.as_ref()
}
}

impl Wake for StreamWaker {
fn wake(self: std::sync::Arc<Self>) {
if !self.readiness.lock().unwrap().set_ready(self.id) {
let parent = self.parent_waker.as_ref().expect("No parent waker was set");
parent.wake_by_ref()
}
/// Set the parent `Waker`. This needs to be called at the start of every
/// `poll` function.
pub(crate) fn set_waker(&mut self, parent_waker: &Waker) {
self.parent_waker = Some(parent_waker.clone());
}
}
34 changes: 34 additions & 0 deletions src/utils/wakers/waker_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Waker;

use super::{InlineWaker, Readiness};
use crate::utils;

/// A collection of wakers which delegate to an in-line waker.
pub(crate) struct WakerList {
wakers: Vec<Waker>,
readiness: Arc<Mutex<Readiness>>,
}

impl WakerList {
/// Create a new instance of `WakerList`.
pub(crate) fn new(len: usize) -> Self {
let readiness = Arc::new(Mutex::new(Readiness::new(len)));
Self {
wakers: (0..len)
.map(|i| Arc::new(InlineWaker::new(i, readiness.clone())).into())
.collect(),
readiness,
}
}

pub(crate) fn get(&self, index: usize) -> Option<&Waker> {
self.wakers.get(index)
}

/// Access the `Readiness`.
pub(crate) fn readiness(&self) -> &Mutex<Readiness> {
self.readiness.as_ref()
}
}

0 comments on commit ff31355

Please sign in to comment.