From b29307d9419ec1bbebe3bf3df64efcdf33cbef2b Mon Sep 17 00:00:00 2001 From: Wisha Wa Date: Thu, 29 Dec 2022 06:48:52 +0000 Subject: [PATCH 1/2] shuffle vec/array/tuple futures/streams for benchmark (with a fixed seed) --- Cargo.toml | 1 + benches/utils.rs | 232 ----------------------------- benches/utils/countdown_futures.rs | 116 +++++++++++++++ benches/utils/countdown_streams.rs | 116 +++++++++++++++ benches/utils/mod.rs | 46 ++++++ 5 files changed, 279 insertions(+), 232 deletions(-) delete mode 100644 benches/utils.rs create mode 100644 benches/utils/countdown_futures.rs create mode 100644 benches/utils/countdown_streams.rs create mode 100644 benches/utils/mod.rs diff --git a/Cargo.toml b/Cargo.toml index f75584c..25f411f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,3 +35,4 @@ futures-lite = "1.12.0" criterion = { version = "0.3", features = ["async", "async_futures", "html_reports"] } async-std = { version = "1.12.0", features = ["attributes"] } futures-time = "3.0.0" +rand = "0.8.5" diff --git a/benches/utils.rs b/benches/utils.rs deleted file mode 100644 index b04e4d1..0000000 --- a/benches/utils.rs +++ /dev/null @@ -1,232 +0,0 @@ -#![allow(unused)] - -use futures_core::Stream; -use futures_lite::prelude::*; -use pin_project::pin_project; - -use std::cell::RefCell; -use std::collections::VecDeque; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll, Waker}; - -pub fn futures_vec(len: usize) -> Vec { - let wakers = Rc::new(RefCell::new(VecDeque::new())); - let completed = Rc::new(RefCell::new(0)); - let futures: Vec<_> = (0..len) - .map(|n| CountdownFuture::new(n, len, wakers.clone(), completed.clone())) - .collect(); - futures -} - -pub fn futures_array() -> [CountdownFuture; N] { - let wakers = Rc::new(RefCell::new(VecDeque::new())); - let completed = Rc::new(RefCell::new(0)); - std::array::from_fn(|n| CountdownFuture::new(n, N, wakers.clone(), completed.clone())) -} - -pub fn futures_tuple() -> ( - CountdownFuture, - CountdownFuture, - CountdownFuture, - CountdownFuture, - CountdownFuture, - CountdownFuture, - CountdownFuture, - CountdownFuture, - CountdownFuture, - CountdownFuture, -) { - let len = 10; - let wakers = Rc::new(RefCell::new(VecDeque::new())); - let completed = Rc::new(RefCell::new(0)); - ( - CountdownFuture::new(0, len, wakers.clone(), completed.clone()), - CountdownFuture::new(1, len, wakers.clone(), completed.clone()), - CountdownFuture::new(2, len, wakers.clone(), completed.clone()), - CountdownFuture::new(3, len, wakers.clone(), completed.clone()), - CountdownFuture::new(4, len, wakers.clone(), completed.clone()), - CountdownFuture::new(5, len, wakers.clone(), completed.clone()), - CountdownFuture::new(6, len, wakers.clone(), completed.clone()), - CountdownFuture::new(7, len, wakers.clone(), completed.clone()), - CountdownFuture::new(8, len, wakers.clone(), completed.clone()), - CountdownFuture::new(9, len, wakers, completed), - ) -} - -pub fn streams_vec(len: usize) -> Vec { - let wakers = Rc::new(RefCell::new(VecDeque::new())); - let completed = Rc::new(RefCell::new(0)); - let streams: Vec<_> = (0..len) - .map(|n| CountdownStream::new(n, len, wakers.clone(), completed.clone())) - .collect(); - streams -} - -pub fn streams_array() -> [CountdownStream; N] { - let wakers = Rc::new(RefCell::new(VecDeque::new())); - let completed = Rc::new(RefCell::new(0)); - std::array::from_fn(|n| CountdownStream::new(n, N, wakers.clone(), completed.clone())) -} - -pub fn streams_tuple() -> ( - CountdownStream, - CountdownStream, - CountdownStream, - CountdownStream, - CountdownStream, - CountdownStream, - CountdownStream, - CountdownStream, - CountdownStream, - CountdownStream, -) { - let len = 10; - let wakers = Rc::new(RefCell::new(VecDeque::new())); - let completed = Rc::new(RefCell::new(0)); - ( - CountdownStream::new(0, len, wakers.clone(), completed.clone()), - CountdownStream::new(1, len, wakers.clone(), completed.clone()), - CountdownStream::new(2, len, wakers.clone(), completed.clone()), - CountdownStream::new(3, len, wakers.clone(), completed.clone()), - CountdownStream::new(4, len, wakers.clone(), completed.clone()), - CountdownStream::new(5, len, wakers.clone(), completed.clone()), - CountdownStream::new(6, len, wakers.clone(), completed.clone()), - CountdownStream::new(7, len, wakers.clone(), completed.clone()), - CountdownStream::new(8, len, wakers.clone(), completed.clone()), - CountdownStream::new(9, len, wakers, completed), - ) -} - -#[derive(Clone, Copy)] -enum State { - Init, - Polled, - Done, -} - -/// A stream which will _eventually_ be ready, but needs to be polled N times before it is. -#[pin_project] -pub struct CountdownStream { - state: State, - wakers: Rc>>, - index: usize, - max_count: usize, - completed_count: Rc>, -} - -impl CountdownStream { - pub fn new( - index: usize, - max_count: usize, - wakers: Rc>>, - completed_count: Rc>, - ) -> Self { - Self { - state: State::Init, - wakers, - max_count, - index, - completed_count, - } - } -} -impl Stream for CountdownStream { - type Item = (); - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - - // If we are the last stream to be polled, skip strait to the Polled state. - if this.wakers.borrow().len() + 1 == *this.max_count { - *this.state = State::Polled; - } - - match this.state { - State::Init => { - // Push our waker onto the stack so we get woken again someday. - this.wakers.borrow_mut().push_back(cx.waker().clone()); - *this.state = State::Polled; - Poll::Pending - } - State::Polled => { - // Wake up the next one - let _ = this.wakers.borrow_mut().pop_front().map(Waker::wake); - - if *this.completed_count.borrow() == *this.index { - *this.state = State::Done; - *this.completed_count.borrow_mut() += 1; - Poll::Ready(Some(())) - } else { - // We're not done yet, so schedule another wakeup - this.wakers.borrow_mut().push_back(cx.waker().clone()); - Poll::Pending - } - } - State::Done => Poll::Ready(None), - } - } -} - -/// A future which will _eventually_ be ready, but needs to be polled N times before it is. -#[pin_project] -pub struct CountdownFuture { - state: State, - wakers: Rc>>, - index: usize, - max_count: usize, - completed_count: Rc>, -} - -impl CountdownFuture { - pub fn new( - index: usize, - max_count: usize, - wakers: Rc>>, - completed_count: Rc>, - ) -> Self { - Self { - state: State::Init, - wakers, - max_count, - index, - completed_count, - } - } -} -impl Future for CountdownFuture { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - // If we are the last stream to be polled, skip strait to the Polled state. - if this.wakers.borrow().len() + 1 == *this.max_count { - *this.state = State::Polled; - } - - match this.state { - State::Init => { - // Push our waker onto the stack so we get woken again someday. - this.wakers.borrow_mut().push_back(cx.waker().clone()); - *this.state = State::Polled; - Poll::Pending - } - State::Polled => { - // Wake up the next one - let _ = this.wakers.borrow_mut().pop_front().map(Waker::wake); - - if *this.completed_count.borrow() == *this.index { - *this.state = State::Done; - *this.completed_count.borrow_mut() += 1; - Poll::Ready(()) - } else { - // We're not done yet, so schedule another wakeup - this.wakers.borrow_mut().push_back(cx.waker().clone()); - Poll::Pending - } - } - State::Done => Poll::Ready(()), - } - } -} diff --git a/benches/utils/countdown_futures.rs b/benches/utils/countdown_futures.rs new file mode 100644 index 0000000..c324281 --- /dev/null +++ b/benches/utils/countdown_futures.rs @@ -0,0 +1,116 @@ +use futures_core::Future; +use pin_project::pin_project; + +use std::cell::{Cell, RefCell}; +use std::collections::BinaryHeap; +use std::pin::Pin; +use std::rc::Rc; +use std::task::{Context, Poll}; + +use super::{shuffle, PrioritizedWaker, State}; + +pub fn futures_vec(len: usize) -> Vec { + let wakers = Rc::new(RefCell::new(BinaryHeap::new())); + let completed = Rc::new(Cell::new(0)); + let mut futures: Vec<_> = (0..len) + .map(|n| CountdownFuture::new(n, len, wakers.clone(), completed.clone())) + .collect(); + shuffle(&mut futures); + futures +} + +pub fn futures_array() -> [CountdownFuture; N] { + let wakers = Rc::new(RefCell::new(BinaryHeap::new())); + let completed = Rc::new(Cell::new(0)); + let mut futures = + std::array::from_fn(|n| CountdownFuture::new(n, N, wakers.clone(), completed.clone())); + shuffle(&mut futures); + futures +} + +pub fn futures_tuple() -> ( + CountdownFuture, + CountdownFuture, + CountdownFuture, + CountdownFuture, + CountdownFuture, + CountdownFuture, + CountdownFuture, + CountdownFuture, + CountdownFuture, + CountdownFuture, +) { + let [f0, f1, f2, f3, f4, f5, f6, f7, f8, f9] = futures_array::<10>(); + (f0, f1, f2, f3, f4, f5, f6, f7, f8, f9) +} + +/// A future which will _eventually_ be ready, but needs to be polled N times before it is. +#[pin_project] +pub struct CountdownFuture { + state: State, + wakers: Rc>>, + index: usize, + max_count: usize, + completed_count: Rc>, +} + +impl CountdownFuture { + pub fn new( + index: usize, + max_count: usize, + wakers: Rc>>, + completed_count: Rc>, + ) -> Self { + Self { + state: State::Init, + wakers, + max_count, + index, + completed_count, + } + } +} +impl Future for CountdownFuture { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + // If we are the last stream to be polled, skip strait to the Polled state. + if this.wakers.borrow().len() + 1 == *this.max_count { + *this.state = State::Polled; + } + + match this.state { + State::Init => { + // Push our waker onto the stack so we get woken again someday. + this.wakers + .borrow_mut() + .push(PrioritizedWaker(*this.index, cx.waker().clone())); + *this.state = State::Polled; + Poll::Pending + } + State::Polled => { + // Wake up the next one + let _ = this + .wakers + .borrow_mut() + .pop() + .map(|PrioritizedWaker(_, waker)| waker.wake()); + + if this.completed_count.get() == *this.index { + *this.state = State::Done; + this.completed_count.set(this.completed_count.get() + 1); + Poll::Ready(()) + } else { + // We're not done yet, so schedule another wakeup + this.wakers + .borrow_mut() + .push(PrioritizedWaker(*this.index, cx.waker().clone())); + Poll::Pending + } + } + State::Done => Poll::Ready(()), + } + } +} diff --git a/benches/utils/countdown_streams.rs b/benches/utils/countdown_streams.rs new file mode 100644 index 0000000..0fa1035 --- /dev/null +++ b/benches/utils/countdown_streams.rs @@ -0,0 +1,116 @@ +use futures_core::Stream; +use pin_project::pin_project; + +use std::cell::{Cell, RefCell}; +use std::collections::BinaryHeap; +use std::pin::Pin; +use std::rc::Rc; +use std::task::{Context, Poll}; + +use super::{shuffle, PrioritizedWaker, State}; + +pub fn streams_vec(len: usize) -> Vec { + let wakers = Rc::new(RefCell::new(BinaryHeap::new())); + let completed = Rc::new(Cell::new(0)); + let mut streams: Vec<_> = (0..len) + .map(|n| CountdownStream::new(n, len, wakers.clone(), completed.clone())) + .collect(); + shuffle(&mut streams); + streams +} + +pub fn streams_array() -> [CountdownStream; N] { + let wakers = Rc::new(RefCell::new(BinaryHeap::new())); + let completed = Rc::new(Cell::new(0)); + let mut streams = + std::array::from_fn(|n| CountdownStream::new(n, N, wakers.clone(), completed.clone())); + shuffle(&mut streams); + streams +} + +pub fn streams_tuple() -> ( + CountdownStream, + CountdownStream, + CountdownStream, + CountdownStream, + CountdownStream, + CountdownStream, + CountdownStream, + CountdownStream, + CountdownStream, + CountdownStream, +) { + let [f0, f1, f2, f3, f4, f5, f6, f7, f8, f9] = streams_array::<10>(); + (f0, f1, f2, f3, f4, f5, f6, f7, f8, f9) +} + +/// A stream which will _eventually_ be ready, but needs to be polled N times before it is. +#[pin_project] +pub struct CountdownStream { + state: State, + wakers: Rc>>, + index: usize, + max_count: usize, + completed_count: Rc>, +} + +impl CountdownStream { + pub fn new( + index: usize, + max_count: usize, + wakers: Rc>>, + completed_count: Rc>, + ) -> Self { + Self { + state: State::Init, + wakers, + max_count, + index, + completed_count, + } + } +} +impl Stream for CountdownStream { + type Item = (); + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + // If we are the last stream to be polled, skip strait to the Polled state. + if this.wakers.borrow().len() + 1 == *this.max_count { + *this.state = State::Polled; + } + + match this.state { + State::Init => { + // Push our waker onto the stack so we get woken again someday. + this.wakers + .borrow_mut() + .push(PrioritizedWaker(*this.index, cx.waker().clone())); + *this.state = State::Polled; + Poll::Pending + } + State::Polled => { + // Wake up the next one + let _ = this + .wakers + .borrow_mut() + .pop() + .map(|PrioritizedWaker(_, waker)| waker.wake()); + + if this.completed_count.get() == *this.index { + *this.state = State::Done; + this.completed_count.set(this.completed_count.get() + 1); + Poll::Ready(Some(())) + } else { + // We're not done yet, so schedule another wakeup + this.wakers + .borrow_mut() + .push(PrioritizedWaker(*this.index, cx.waker().clone())); + Poll::Pending + } + } + State::Done => Poll::Ready(None), + } + } +} diff --git a/benches/utils/mod.rs b/benches/utils/mod.rs new file mode 100644 index 0000000..971a5de --- /dev/null +++ b/benches/utils/mod.rs @@ -0,0 +1,46 @@ +mod countdown_futures; +mod countdown_streams; + +mod prioritized_waker { + use std::{cmp::Ordering, task::Waker}; + + // PrioritizedWaker(index, waker). + // Lowest index gets popped off the BinaryHeap first. + pub struct PrioritizedWaker(pub usize, pub Waker); + impl PartialEq for PrioritizedWaker { + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } + } + impl Eq for PrioritizedWaker { + fn assert_receiver_is_total_eq(&self) {} + } + impl PartialOrd for PrioritizedWaker { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + impl Ord for PrioritizedWaker { + fn cmp(&self, other: &Self) -> Ordering { + self.0.cmp(&other.0).reverse() + } + } +} +use prioritized_waker::PrioritizedWaker; + +#[derive(Clone, Copy)] +enum State { + Init, + Polled, + Done, +} + +fn shuffle(slice: &mut [T]) { + use rand::seq::SliceRandom; + use rand::SeedableRng; + let mut rng = rand::rngs::StdRng::seed_from_u64(42); + slice.shuffle(&mut rng); +} + +pub use countdown_futures::*; +pub use countdown_streams::*; From c777f28d767f09c88235874e00f89c62158d2def Mon Sep 17 00:00:00 2001 From: Wisha Wa Date: Thu, 2 Feb 2023 17:22:31 +0000 Subject: [PATCH 2/2] remove unneccessary pin_project in countdown future/stream --- benches/utils/countdown_futures.rs | 30 +++++++++++++----------------- benches/utils/countdown_streams.rs | 30 +++++++++++++----------------- 2 files changed, 26 insertions(+), 34 deletions(-) diff --git a/benches/utils/countdown_futures.rs b/benches/utils/countdown_futures.rs index c324281..fb8b88b 100644 --- a/benches/utils/countdown_futures.rs +++ b/benches/utils/countdown_futures.rs @@ -1,5 +1,4 @@ use futures_core::Future; -use pin_project::pin_project; use std::cell::{Cell, RefCell}; use std::collections::BinaryHeap; @@ -45,7 +44,6 @@ pub fn futures_tuple() -> ( } /// A future which will _eventually_ be ready, but needs to be polled N times before it is. -#[pin_project] pub struct CountdownFuture { state: State, wakers: Rc>>, @@ -73,40 +71,38 @@ impl CountdownFuture { impl Future for CountdownFuture { type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // If we are the last stream to be polled, skip strait to the Polled state. - if this.wakers.borrow().len() + 1 == *this.max_count { - *this.state = State::Polled; + if self.wakers.borrow().len() + 1 == self.max_count { + self.state = State::Polled; } - match this.state { + match self.state { State::Init => { // Push our waker onto the stack so we get woken again someday. - this.wakers + self.wakers .borrow_mut() - .push(PrioritizedWaker(*this.index, cx.waker().clone())); - *this.state = State::Polled; + .push(PrioritizedWaker(self.index, cx.waker().clone())); + self.state = State::Polled; Poll::Pending } State::Polled => { // Wake up the next one - let _ = this + let _ = self .wakers .borrow_mut() .pop() .map(|PrioritizedWaker(_, waker)| waker.wake()); - if this.completed_count.get() == *this.index { - *this.state = State::Done; - this.completed_count.set(this.completed_count.get() + 1); + if self.completed_count.get() == self.index { + self.state = State::Done; + self.completed_count.set(self.completed_count.get() + 1); Poll::Ready(()) } else { // We're not done yet, so schedule another wakeup - this.wakers + self.wakers .borrow_mut() - .push(PrioritizedWaker(*this.index, cx.waker().clone())); + .push(PrioritizedWaker(self.index, cx.waker().clone())); Poll::Pending } } diff --git a/benches/utils/countdown_streams.rs b/benches/utils/countdown_streams.rs index 0fa1035..25bec68 100644 --- a/benches/utils/countdown_streams.rs +++ b/benches/utils/countdown_streams.rs @@ -1,5 +1,4 @@ use futures_core::Stream; -use pin_project::pin_project; use std::cell::{Cell, RefCell}; use std::collections::BinaryHeap; @@ -45,7 +44,6 @@ pub fn streams_tuple() -> ( } /// A stream which will _eventually_ be ready, but needs to be polled N times before it is. -#[pin_project] pub struct CountdownStream { state: State, wakers: Rc>>, @@ -73,40 +71,38 @@ impl CountdownStream { impl Stream for CountdownStream { type Item = (); - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // If we are the last stream to be polled, skip strait to the Polled state. - if this.wakers.borrow().len() + 1 == *this.max_count { - *this.state = State::Polled; + if self.wakers.borrow().len() + 1 == self.max_count { + self.state = State::Polled; } - match this.state { + match &mut self.state { State::Init => { // Push our waker onto the stack so we get woken again someday. - this.wakers + self.wakers .borrow_mut() - .push(PrioritizedWaker(*this.index, cx.waker().clone())); - *this.state = State::Polled; + .push(PrioritizedWaker(self.index, cx.waker().clone())); + self.state = State::Polled; Poll::Pending } State::Polled => { // Wake up the next one - let _ = this + let _ = self .wakers .borrow_mut() .pop() .map(|PrioritizedWaker(_, waker)| waker.wake()); - if this.completed_count.get() == *this.index { - *this.state = State::Done; - this.completed_count.set(this.completed_count.get() + 1); + if self.completed_count.get() == self.index { + self.state = State::Done; + self.completed_count.set(self.completed_count.get() + 1); Poll::Ready(Some(())) } else { // We're not done yet, so schedule another wakeup - this.wakers + self.wakers .borrow_mut() - .push(PrioritizedWaker(*this.index, cx.waker().clone())); + .push(PrioritizedWaker(self.index, cx.waker().clone())); Poll::Pending } }