Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Init bench #46

Merged
merged 7 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ authors = [
"Yoshua Wuyts <[email protected]>"
]

[[bench]]
name = "bench"
harness = false

[dependencies]
futures-core = "0.3"
pin-project = "1.0.8"

[dev-dependencies]
futures-lite = "1.12.0"
criterion = "0.3"
71 changes: 71 additions & 0 deletions benches/bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use futures_concurrency::prelude::*;
use futures_core::Stream;
use futures_lite::future::block_on;
use futures_lite::prelude::*;
use pin_project::pin_project;

use std::cell::RefCell;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};

fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("merge 10", |b| b.iter(|| merge_test(black_box(10))));
c.bench_function("merge 100", |b| b.iter(|| merge_test(black_box(100))));
c.bench_function("merge 1000", |b| b.iter(|| merge_test(black_box(1000))));
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

pub(crate) fn merge_test(max: usize) {
block_on(async {
let counter = Rc::new(RefCell::new(max));
let futures: Vec<_> = (1..=max)
.rev()
.map(|n| Countdown::new(n, counter.clone()))
.collect();
let mut s = futures.merge();

let mut counter = 0;
while let Some(_) = s.next().await {
counter += 1;
}
assert_eq!(counter, max);
})
}

/// A future which will _eventually_ be ready, but needs to be polled N times before it is.
#[pin_project]
struct Countdown {
success_count: Rc<RefCell<usize>>,
target_count: usize,
done: bool,
}

impl Countdown {
fn new(count: usize, success_count: Rc<RefCell<usize>>) -> Self {
Self {
success_count,
target_count: count,
done: false,
}
}
}
impl Stream for Countdown {
type Item = ();

fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
if *this.done {
Poll::Ready(None)
} else if *this.success_count.borrow() == *this.target_count {
*this.success_count.borrow_mut() -= 1;
*this.done = true;
Poll::Ready(Some(()))
} else {
Poll::Pending
}
}
}
25 changes: 25 additions & 0 deletions src/stream/merge/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,28 @@ where
Merge::new(self.map(|i| i.into_stream()))
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures_lite::future::block_on;
use futures_lite::prelude::*;
use futures_lite::stream;

#[test]
fn merge_tuple_4() {
block_on(async {
let a = stream::once(1);
let b = stream::once(2);
let c = stream::once(3);
let d = stream::once(4);
let mut s = [a, b, c, d].merge();

let mut counter = 0;
while let Some(n) = s.next().await {
counter += n;
}
assert_eq!(counter, 10);
})
}
}
42 changes: 31 additions & 11 deletions src/stream/merge/vec.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::Merge as MergeTrait;
use crate::stream::IntoStream;
use crate::utils::{self, Fuse};
use crate::utils::{self, Fuse, RandomGenerator};

use core::fmt;
use futures_core::Stream;
Expand All @@ -21,6 +21,7 @@ where
{
#[pin]
streams: Vec<Fuse<S>>,
rng: RandomGenerator,
}

impl<S> Merge<S>
Expand All @@ -30,6 +31,7 @@ where
pub(crate) fn new(streams: Vec<S>) -> Self {
Self {
streams: streams.into_iter().map(Fuse::new).collect(),
rng: RandomGenerator::new(),
}
}
}
Expand All @@ -52,20 +54,13 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();

// Randomize the indexes into our streams array. This ensures that when
// multiple streams are ready at the same time, we don't accidentally
// exhaust one stream before another.
// Randomize the indexes into our streams array. This ensures that when
// multiple streams are ready at the same time, we don't accidentally
// exhaust one stream before another.
let mut indexes: Vec<_> = (0..this.streams.len()).into_iter().collect();
indexes.sort_by_cached_key(|_| utils::random(1000));

// Iterate over our streams one-by-one. If a stream yields a value,
// we exit early. By default we'll return `Poll::Ready(None)`, but
// this changes if we encounter a `Poll::Pending`.
let random = this.rng.random(this.streams.len() as u32) as usize;
let mut res = Poll::Ready(None);
for index in indexes {
for index in 0..this.streams.len() {
let index = (random + index).wrapping_rem(this.streams.len());
let stream = utils::get_pin_mut_from_vec(this.streams.as_mut(), index).unwrap();
match stream.poll_next(cx) {
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
Expand All @@ -88,3 +83,28 @@ where
Merge::new(self.into_iter().map(|i| i.into_stream()).collect())
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures_lite::future::block_on;
use futures_lite::prelude::*;
use futures_lite::stream;

#[test]
fn merge_tuple_4() {
block_on(async {
let a = stream::once(1);
let b = stream::once(2);
let c = stream::once(3);
let d = stream::once(4);
let mut s = vec![a, b, c, d].merge();

let mut counter = 0;
while let Some(n) = s.next().await {
counter += n;
}
assert_eq!(counter, 10);
})
}
}
2 changes: 1 addition & 1 deletion src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub(crate) use fuse::Fuse;
pub(crate) use maybe_done::MaybeDone;
pub(crate) use pin::{get_pin_mut, get_pin_mut_from_vec, iter_pin_mut, iter_pin_mut_vec};
pub(crate) use poll_state::PollState;
pub(crate) use rng::random;
pub(crate) use rng::{random, RandomGenerator};

#[cfg(test)]
mod dummy_waker;
Expand Down
29 changes: 29 additions & 0 deletions src/utils/rng.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,32 @@ pub(crate) fn random(n: u32) -> u32 {
((u64::from(x.0)).wrapping_mul(u64::from(n)) >> 32) as u32
})
}

/// Generates a random number in `0..n`.
pub(crate) struct RandomGenerator(Wrapping<u32>);

impl RandomGenerator {
pub(crate) fn new() -> Self {
// Take the address of a local value as seed.
let mut x = 0i32;
let r = &mut x;
let addr = r as *mut i32 as usize;
Self(Wrapping(addr as u32))
}
pub(crate) fn random(&mut self, n: u32) -> u32 {
// This is the 32-bit variant of Xorshift.
//
// Source: https://en.wikipedia.org/wiki/Xorshift
let mut x = self.0;
x ^= x << 13;
x ^= x >> 17;
x ^= x << 5;
self.0 = x;

// This is a fast alternative to `x % n`.
//
// Author: Daniel Lemire
// Source: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
((u64::from(x.0)).wrapping_mul(u64::from(n)) >> 32) as u32
}
}
69 changes: 69 additions & 0 deletions tests/test.rs
Original file line number Diff line number Diff line change
@@ -1 +1,70 @@
// use futures_concurrency::prelude::*;
// use futures_core::Stream;
// use futures_lite::future::block_on;
// use futures_lite::prelude::*;
// use pin_project::pin_project;

// use std::cell::RefCell;
// use std::pin::Pin;
// use std::rc::Rc;
// use std::task::{Context, Poll};

// pub(crate) fn merge_test(max: usize) {
// block_on(async {
// let counter = Rc::new(RefCell::new(max));
// let futures: Vec<_> = (1..=max)
// .rev()
// .map(|n| Countdown::new(n, counter.clone()))
// .collect();
// let mut s = futures.merge();

// let mut counter = 0;
// while let Some(_) = s.next().await {
// counter += 1;
// }
// assert_eq!(counter, max);
// })
// }

// /// A future which will _eventually_ be ready, but needs to be polled N times before it is.
// #[pin_project]
// struct Countdown {
// success_count: Rc<RefCell<usize>>,
// target_count: usize,
// done: bool,
// }

// impl Countdown {
// fn new(count: usize, success_count: Rc<RefCell<usize>>) -> Self {
// Self {
// success_count,
// target_count: count,
// done: false,
// }
// }
// }
// impl Stream for Countdown {
// type Item = ();

// fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// let this = self.project();
// if *this.done {
// Poll::Ready(None)
// } else if *this.success_count.borrow() == *this.target_count {
// *this.success_count.borrow_mut() -= 1;
// *this.done = true;
// Poll::Ready(Some(()))
// } else {
// Poll::Pending
// }
// }
// }

// #[cfg(test)]
// mod test {
// use super::*;
// #[test]
// fn smoke() {
// merge_test(4);
// }
// }