diff --git a/Cargo.toml b/Cargo.toml index 3e75f50..7b78324 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,3 +35,4 @@ futures-lite = "1.12.0" criterion = { version = "0.3", features = ["async", "async_futures", "html_reports"] } async-std = { version = "1.12.0", features = ["attributes"] } futures-time = "3.0.0" +rand = "0.8.5" diff --git a/benches/utils.rs b/benches/utils.rs deleted file mode 100644 index b04e4d1..0000000 --- a/benches/utils.rs +++ /dev/null @@ -1,232 +0,0 @@ -#![allow(unused)] - -use futures_core::Stream; -use futures_lite::prelude::*; -use pin_project::pin_project; - -use std::cell::RefCell; -use std::collections::VecDeque; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll, Waker}; - -pub fn futures_vec(len: usize) -> Vec { - let wakers = Rc::new(RefCell::new(VecDeque::new())); - let completed = Rc::new(RefCell::new(0)); - let futures: Vec<_> = (0..len) - .map(|n| CountdownFuture::new(n, len, wakers.clone(), completed.clone())) - .collect(); - futures -} - -pub fn futures_array() -> [CountdownFuture; N] { - let wakers = Rc::new(RefCell::new(VecDeque::new())); - let completed = Rc::new(RefCell::new(0)); - std::array::from_fn(|n| CountdownFuture::new(n, N, wakers.clone(), completed.clone())) -} - -pub fn futures_tuple() -> ( - CountdownFuture, - CountdownFuture, - CountdownFuture, - CountdownFuture, - CountdownFuture, - CountdownFuture, - CountdownFuture, - CountdownFuture, - CountdownFuture, - CountdownFuture, -) { - let len = 10; - let wakers = Rc::new(RefCell::new(VecDeque::new())); - let completed = Rc::new(RefCell::new(0)); - ( - CountdownFuture::new(0, len, wakers.clone(), completed.clone()), - CountdownFuture::new(1, len, wakers.clone(), completed.clone()), - CountdownFuture::new(2, len, wakers.clone(), completed.clone()), - CountdownFuture::new(3, len, wakers.clone(), completed.clone()), - CountdownFuture::new(4, len, wakers.clone(), completed.clone()), - CountdownFuture::new(5, len, wakers.clone(), completed.clone()), - CountdownFuture::new(6, len, wakers.clone(), completed.clone()), - CountdownFuture::new(7, len, wakers.clone(), completed.clone()), - CountdownFuture::new(8, len, wakers.clone(), completed.clone()), - CountdownFuture::new(9, len, wakers, completed), - ) -} - -pub fn streams_vec(len: usize) -> Vec { - let wakers = Rc::new(RefCell::new(VecDeque::new())); - let completed = Rc::new(RefCell::new(0)); - let streams: Vec<_> = (0..len) - .map(|n| CountdownStream::new(n, len, wakers.clone(), completed.clone())) - .collect(); - streams -} - -pub fn streams_array() -> [CountdownStream; N] { - let wakers = Rc::new(RefCell::new(VecDeque::new())); - let completed = Rc::new(RefCell::new(0)); - std::array::from_fn(|n| CountdownStream::new(n, N, wakers.clone(), completed.clone())) -} - -pub fn streams_tuple() -> ( - CountdownStream, - CountdownStream, - CountdownStream, - CountdownStream, - CountdownStream, - CountdownStream, - CountdownStream, - CountdownStream, - CountdownStream, - CountdownStream, -) { - let len = 10; - let wakers = Rc::new(RefCell::new(VecDeque::new())); - let completed = Rc::new(RefCell::new(0)); - ( - CountdownStream::new(0, len, wakers.clone(), completed.clone()), - CountdownStream::new(1, len, wakers.clone(), completed.clone()), - CountdownStream::new(2, len, wakers.clone(), completed.clone()), - CountdownStream::new(3, len, wakers.clone(), completed.clone()), - CountdownStream::new(4, len, wakers.clone(), completed.clone()), - CountdownStream::new(5, len, wakers.clone(), completed.clone()), - CountdownStream::new(6, len, wakers.clone(), completed.clone()), - CountdownStream::new(7, len, wakers.clone(), completed.clone()), - CountdownStream::new(8, len, wakers.clone(), completed.clone()), - CountdownStream::new(9, len, wakers, completed), - ) -} - -#[derive(Clone, Copy)] -enum State { - Init, - Polled, - Done, -} - -/// A stream which will _eventually_ be ready, but needs to be polled N times before it is. -#[pin_project] -pub struct CountdownStream { - state: State, - wakers: Rc>>, - index: usize, - max_count: usize, - completed_count: Rc>, -} - -impl CountdownStream { - pub fn new( - index: usize, - max_count: usize, - wakers: Rc>>, - completed_count: Rc>, - ) -> Self { - Self { - state: State::Init, - wakers, - max_count, - index, - completed_count, - } - } -} -impl Stream for CountdownStream { - type Item = (); - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - - // If we are the last stream to be polled, skip strait to the Polled state. - if this.wakers.borrow().len() + 1 == *this.max_count { - *this.state = State::Polled; - } - - match this.state { - State::Init => { - // Push our waker onto the stack so we get woken again someday. - this.wakers.borrow_mut().push_back(cx.waker().clone()); - *this.state = State::Polled; - Poll::Pending - } - State::Polled => { - // Wake up the next one - let _ = this.wakers.borrow_mut().pop_front().map(Waker::wake); - - if *this.completed_count.borrow() == *this.index { - *this.state = State::Done; - *this.completed_count.borrow_mut() += 1; - Poll::Ready(Some(())) - } else { - // We're not done yet, so schedule another wakeup - this.wakers.borrow_mut().push_back(cx.waker().clone()); - Poll::Pending - } - } - State::Done => Poll::Ready(None), - } - } -} - -/// A future which will _eventually_ be ready, but needs to be polled N times before it is. -#[pin_project] -pub struct CountdownFuture { - state: State, - wakers: Rc>>, - index: usize, - max_count: usize, - completed_count: Rc>, -} - -impl CountdownFuture { - pub fn new( - index: usize, - max_count: usize, - wakers: Rc>>, - completed_count: Rc>, - ) -> Self { - Self { - state: State::Init, - wakers, - max_count, - index, - completed_count, - } - } -} -impl Future for CountdownFuture { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - // If we are the last stream to be polled, skip strait to the Polled state. - if this.wakers.borrow().len() + 1 == *this.max_count { - *this.state = State::Polled; - } - - match this.state { - State::Init => { - // Push our waker onto the stack so we get woken again someday. - this.wakers.borrow_mut().push_back(cx.waker().clone()); - *this.state = State::Polled; - Poll::Pending - } - State::Polled => { - // Wake up the next one - let _ = this.wakers.borrow_mut().pop_front().map(Waker::wake); - - if *this.completed_count.borrow() == *this.index { - *this.state = State::Done; - *this.completed_count.borrow_mut() += 1; - Poll::Ready(()) - } else { - // We're not done yet, so schedule another wakeup - this.wakers.borrow_mut().push_back(cx.waker().clone()); - Poll::Pending - } - } - State::Done => Poll::Ready(()), - } - } -} diff --git a/benches/utils/countdown_futures.rs b/benches/utils/countdown_futures.rs new file mode 100644 index 0000000..fb8b88b --- /dev/null +++ b/benches/utils/countdown_futures.rs @@ -0,0 +1,112 @@ +use futures_core::Future; + +use std::cell::{Cell, RefCell}; +use std::collections::BinaryHeap; +use std::pin::Pin; +use std::rc::Rc; +use std::task::{Context, Poll}; + +use super::{shuffle, PrioritizedWaker, State}; + +pub fn futures_vec(len: usize) -> Vec { + let wakers = Rc::new(RefCell::new(BinaryHeap::new())); + let completed = Rc::new(Cell::new(0)); + let mut futures: Vec<_> = (0..len) + .map(|n| CountdownFuture::new(n, len, wakers.clone(), completed.clone())) + .collect(); + shuffle(&mut futures); + futures +} + +pub fn futures_array() -> [CountdownFuture; N] { + let wakers = Rc::new(RefCell::new(BinaryHeap::new())); + let completed = Rc::new(Cell::new(0)); + let mut futures = + std::array::from_fn(|n| CountdownFuture::new(n, N, wakers.clone(), completed.clone())); + shuffle(&mut futures); + futures +} + +pub fn futures_tuple() -> ( + CountdownFuture, + CountdownFuture, + CountdownFuture, + CountdownFuture, + CountdownFuture, + CountdownFuture, + CountdownFuture, + CountdownFuture, + CountdownFuture, + CountdownFuture, +) { + let [f0, f1, f2, f3, f4, f5, f6, f7, f8, f9] = futures_array::<10>(); + (f0, f1, f2, f3, f4, f5, f6, f7, f8, f9) +} + +/// A future which will _eventually_ be ready, but needs to be polled N times before it is. +pub struct CountdownFuture { + state: State, + wakers: Rc>>, + index: usize, + max_count: usize, + completed_count: Rc>, +} + +impl CountdownFuture { + pub fn new( + index: usize, + max_count: usize, + wakers: Rc>>, + completed_count: Rc>, + ) -> Self { + Self { + state: State::Init, + wakers, + max_count, + index, + completed_count, + } + } +} +impl Future for CountdownFuture { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // If we are the last stream to be polled, skip strait to the Polled state. + if self.wakers.borrow().len() + 1 == self.max_count { + self.state = State::Polled; + } + + match self.state { + State::Init => { + // Push our waker onto the stack so we get woken again someday. + self.wakers + .borrow_mut() + .push(PrioritizedWaker(self.index, cx.waker().clone())); + self.state = State::Polled; + Poll::Pending + } + State::Polled => { + // Wake up the next one + let _ = self + .wakers + .borrow_mut() + .pop() + .map(|PrioritizedWaker(_, waker)| waker.wake()); + + if self.completed_count.get() == self.index { + self.state = State::Done; + self.completed_count.set(self.completed_count.get() + 1); + Poll::Ready(()) + } else { + // We're not done yet, so schedule another wakeup + self.wakers + .borrow_mut() + .push(PrioritizedWaker(self.index, cx.waker().clone())); + Poll::Pending + } + } + State::Done => Poll::Ready(()), + } + } +} diff --git a/benches/utils/countdown_streams.rs b/benches/utils/countdown_streams.rs new file mode 100644 index 0000000..25bec68 --- /dev/null +++ b/benches/utils/countdown_streams.rs @@ -0,0 +1,112 @@ +use futures_core::Stream; + +use std::cell::{Cell, RefCell}; +use std::collections::BinaryHeap; +use std::pin::Pin; +use std::rc::Rc; +use std::task::{Context, Poll}; + +use super::{shuffle, PrioritizedWaker, State}; + +pub fn streams_vec(len: usize) -> Vec { + let wakers = Rc::new(RefCell::new(BinaryHeap::new())); + let completed = Rc::new(Cell::new(0)); + let mut streams: Vec<_> = (0..len) + .map(|n| CountdownStream::new(n, len, wakers.clone(), completed.clone())) + .collect(); + shuffle(&mut streams); + streams +} + +pub fn streams_array() -> [CountdownStream; N] { + let wakers = Rc::new(RefCell::new(BinaryHeap::new())); + let completed = Rc::new(Cell::new(0)); + let mut streams = + std::array::from_fn(|n| CountdownStream::new(n, N, wakers.clone(), completed.clone())); + shuffle(&mut streams); + streams +} + +pub fn streams_tuple() -> ( + CountdownStream, + CountdownStream, + CountdownStream, + CountdownStream, + CountdownStream, + CountdownStream, + CountdownStream, + CountdownStream, + CountdownStream, + CountdownStream, +) { + let [f0, f1, f2, f3, f4, f5, f6, f7, f8, f9] = streams_array::<10>(); + (f0, f1, f2, f3, f4, f5, f6, f7, f8, f9) +} + +/// A stream which will _eventually_ be ready, but needs to be polled N times before it is. +pub struct CountdownStream { + state: State, + wakers: Rc>>, + index: usize, + max_count: usize, + completed_count: Rc>, +} + +impl CountdownStream { + pub fn new( + index: usize, + max_count: usize, + wakers: Rc>>, + completed_count: Rc>, + ) -> Self { + Self { + state: State::Init, + wakers, + max_count, + index, + completed_count, + } + } +} +impl Stream for CountdownStream { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // If we are the last stream to be polled, skip strait to the Polled state. + if self.wakers.borrow().len() + 1 == self.max_count { + self.state = State::Polled; + } + + match &mut self.state { + State::Init => { + // Push our waker onto the stack so we get woken again someday. + self.wakers + .borrow_mut() + .push(PrioritizedWaker(self.index, cx.waker().clone())); + self.state = State::Polled; + Poll::Pending + } + State::Polled => { + // Wake up the next one + let _ = self + .wakers + .borrow_mut() + .pop() + .map(|PrioritizedWaker(_, waker)| waker.wake()); + + if self.completed_count.get() == self.index { + self.state = State::Done; + self.completed_count.set(self.completed_count.get() + 1); + Poll::Ready(Some(())) + } else { + // We're not done yet, so schedule another wakeup + self.wakers + .borrow_mut() + .push(PrioritizedWaker(self.index, cx.waker().clone())); + Poll::Pending + } + } + State::Done => Poll::Ready(None), + } + } +} diff --git a/benches/utils/mod.rs b/benches/utils/mod.rs new file mode 100644 index 0000000..971a5de --- /dev/null +++ b/benches/utils/mod.rs @@ -0,0 +1,46 @@ +mod countdown_futures; +mod countdown_streams; + +mod prioritized_waker { + use std::{cmp::Ordering, task::Waker}; + + // PrioritizedWaker(index, waker). + // Lowest index gets popped off the BinaryHeap first. + pub struct PrioritizedWaker(pub usize, pub Waker); + impl PartialEq for PrioritizedWaker { + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } + } + impl Eq for PrioritizedWaker { + fn assert_receiver_is_total_eq(&self) {} + } + impl PartialOrd for PrioritizedWaker { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + impl Ord for PrioritizedWaker { + fn cmp(&self, other: &Self) -> Ordering { + self.0.cmp(&other.0).reverse() + } + } +} +use prioritized_waker::PrioritizedWaker; + +#[derive(Clone, Copy)] +enum State { + Init, + Polled, + Done, +} + +fn shuffle(slice: &mut [T]) { + use rand::seq::SliceRandom; + use rand::SeedableRng; + let mut rng = rand::rngs::StdRng::seed_from_u64(42); + slice.shuffle(&mut rng); +} + +pub use countdown_futures::*; +pub use countdown_streams::*;