Skip to content

Commit

Permalink
Merge pull request #117 from wishawa/pr-1-benchmark-shuffling
Browse files Browse the repository at this point in the history
shuffle vec/array/tuple futures/streams for benchmark
  • Loading branch information
yoshuawuyts authored Feb 9, 2023
2 parents 3d574ab + c777f28 commit c234bb3
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 232 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ futures-lite = "1.12.0"
criterion = { version = "0.3", features = ["async", "async_futures", "html_reports"] }
async-std = { version = "1.12.0", features = ["attributes"] }
futures-time = "3.0.0"
rand = "0.8.5"
232 changes: 0 additions & 232 deletions benches/utils.rs

This file was deleted.

112 changes: 112 additions & 0 deletions benches/utils/countdown_futures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use futures_core::Future;

use std::cell::{Cell, RefCell};
use std::collections::BinaryHeap;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};

use super::{shuffle, PrioritizedWaker, State};

pub fn futures_vec(len: usize) -> Vec<CountdownFuture> {
let wakers = Rc::new(RefCell::new(BinaryHeap::new()));
let completed = Rc::new(Cell::new(0));
let mut futures: Vec<_> = (0..len)
.map(|n| CountdownFuture::new(n, len, wakers.clone(), completed.clone()))
.collect();
shuffle(&mut futures);
futures
}

pub fn futures_array<const N: usize>() -> [CountdownFuture; N] {
let wakers = Rc::new(RefCell::new(BinaryHeap::new()));
let completed = Rc::new(Cell::new(0));
let mut futures =
std::array::from_fn(|n| CountdownFuture::new(n, N, wakers.clone(), completed.clone()));
shuffle(&mut futures);
futures
}

pub fn futures_tuple() -> (
CountdownFuture,
CountdownFuture,
CountdownFuture,
CountdownFuture,
CountdownFuture,
CountdownFuture,
CountdownFuture,
CountdownFuture,
CountdownFuture,
CountdownFuture,
) {
let [f0, f1, f2, f3, f4, f5, f6, f7, f8, f9] = futures_array::<10>();
(f0, f1, f2, f3, f4, f5, f6, f7, f8, f9)
}

/// A future which will _eventually_ be ready, but needs to be polled N times before it is.
pub struct CountdownFuture {
state: State,
wakers: Rc<RefCell<BinaryHeap<PrioritizedWaker>>>,
index: usize,
max_count: usize,
completed_count: Rc<Cell<usize>>,
}

impl CountdownFuture {
pub fn new(
index: usize,
max_count: usize,
wakers: Rc<RefCell<BinaryHeap<PrioritizedWaker>>>,
completed_count: Rc<Cell<usize>>,
) -> Self {
Self {
state: State::Init,
wakers,
max_count,
index,
completed_count,
}
}
}
impl Future for CountdownFuture {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// If we are the last stream to be polled, skip strait to the Polled state.
if self.wakers.borrow().len() + 1 == self.max_count {
self.state = State::Polled;
}

match self.state {
State::Init => {
// Push our waker onto the stack so we get woken again someday.
self.wakers
.borrow_mut()
.push(PrioritizedWaker(self.index, cx.waker().clone()));
self.state = State::Polled;
Poll::Pending
}
State::Polled => {
// Wake up the next one
let _ = self
.wakers
.borrow_mut()
.pop()
.map(|PrioritizedWaker(_, waker)| waker.wake());

if self.completed_count.get() == self.index {
self.state = State::Done;
self.completed_count.set(self.completed_count.get() + 1);
Poll::Ready(())
} else {
// We're not done yet, so schedule another wakeup
self.wakers
.borrow_mut()
.push(PrioritizedWaker(self.index, cx.waker().clone()));
Poll::Pending
}
}
State::Done => Poll::Ready(()),
}
}
}
Loading

0 comments on commit c234bb3

Please sign in to comment.