From f17d53bda3ca5a21081ab7faeccc2d454f83bfae Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts <2467194+yoshuawuyts@users.noreply.github.com> Date: Wed, 9 Nov 2022 16:25:24 +0100 Subject: [PATCH 1/7] init bench --- Cargo.toml | 5 +++ benches/bench.rs | 85 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+) create mode 100644 benches/bench.rs diff --git a/Cargo.toml b/Cargo.toml index e25b402..7e2e12b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,9 +13,14 @@ authors = [ "Yoshua Wuyts " ] +[[bench]] +name = "bench" +harness = false + [dependencies] futures-core = "0.3" pin-project = "1.0.8" [dev-dependencies] futures-lite = "1.12.0" +criterion = "0.3" diff --git a/benches/bench.rs b/benches/bench.rs new file mode 100644 index 0000000..e74c256 --- /dev/null +++ b/benches/bench.rs @@ -0,0 +1,85 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use futures_concurrency::prelude::*; +use futures_core::Stream; +use futures_lite::future::block_on; +use futures_lite::prelude::*; +use pin_project::pin_project; + +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +fn criterion_benchmark(c: &mut Criterion) { + c.bench_function("merge 10", |b| b.iter(|| merge_test(black_box(10)))); + // c.bench_function("merge 100", |b| b.iter(|| merge_futures(black_box(100)))); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); + +pub(crate) fn merge_test(max: usize) { + block_on(async { + let futures: Vec<_> = (0..max).rev().map(Countdown::new).collect(); + let mut s = futures.merge(); + + let mut counter = 0; + while let Some(_) = s.next().await { + counter += 1; + } + assert_eq!(counter, max); + }) +} + +/// A future which will _eventually_ be ready, but needs to be polled N times before it is. +#[pin_project] +struct Countdown { + count: usize, + done: bool, +} + +impl Countdown { + fn new(count: usize) -> Self { + Self { count, done: false } + } +} +impl Stream for Countdown { + type Item = (); + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + if *this.done { + Poll::Ready(None) + } else if *this.count == 0 { + *this.done = true; + Poll::Ready(Some(())) + } else { + *this.count -= 1; + Poll::Pending + } + } +} + +impl Future for Countdown { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + let this = self.project(); + if *this.done { + panic!("futures should not be polled after completing"); + } else if *this.count == 0 { + *this.done = true; + Poll::Ready(()) + } else { + *this.count -= 1; + Poll::Pending + } + } +} + +#[cfg(test)] +mod test { + #[test] + fn smoke() { + merge_test(3); + } +} From 38cf64338550659e27e330460497e5a541c065bd Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts <2467194+yoshuawuyts@users.noreply.github.com> Date: Wed, 9 Nov 2022 16:26:09 +0100 Subject: [PATCH 2/7] remove fairness from `impl Merge for Vec` the method applied here was so slow that merging > 3 streams would grind the system to a halt --- src/stream/merge/vec.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/merge/vec.rs b/src/stream/merge/vec.rs index 3b71bb3..729028f 100644 --- a/src/stream/merge/vec.rs +++ b/src/stream/merge/vec.rs @@ -58,8 +58,8 @@ where // Randomize the indexes into our streams array. This ensures that when // multiple streams are ready at the same time, we don't accidentally // exhaust one stream before another. - let mut indexes: Vec<_> = (0..this.streams.len()).into_iter().collect(); - indexes.sort_by_cached_key(|_| utils::random(1000)); + let indexes: Vec<_> = (0..this.streams.len()).collect(); + // indexes.sort_by_cached_key(|_| utils::random(1000)); // Iterate over our streams one-by-one. If a stream yields a value, // we exit early. By default we'll return `Poll::Ready(None)`, but From 0631d41db4060369ab259519f0695850bec083e7 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts <2467194+yoshuawuyts@users.noreply.github.com> Date: Wed, 9 Nov 2022 16:27:32 +0100 Subject: [PATCH 3/7] add `Merge` tests for vec and array --- src/stream/merge/array.rs | 25 +++++++++++++++++++++++++ src/stream/merge/vec.rs | 25 +++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/src/stream/merge/array.rs b/src/stream/merge/array.rs index e1db43f..563d156 100644 --- a/src/stream/merge/array.rs +++ b/src/stream/merge/array.rs @@ -95,3 +95,28 @@ where Merge::new(self.map(|i| i.into_stream())) } } + +#[cfg(test)] +mod tests { + use super::*; + use futures_lite::future::block_on; + use futures_lite::prelude::*; + use futures_lite::stream; + + #[test] + fn merge_tuple_4() { + block_on(async { + let a = stream::once(1); + let b = stream::once(2); + let c = stream::once(3); + let d = stream::once(4); + let mut s = [a, b, c, d].merge(); + + let mut counter = 0; + while let Some(n) = s.next().await { + counter += n; + } + assert_eq!(counter, 10); + }) + } +} diff --git a/src/stream/merge/vec.rs b/src/stream/merge/vec.rs index 729028f..5c40e47 100644 --- a/src/stream/merge/vec.rs +++ b/src/stream/merge/vec.rs @@ -88,3 +88,28 @@ where Merge::new(self.into_iter().map(|i| i.into_stream()).collect()) } } + +#[cfg(test)] +mod tests { + use super::*; + use futures_lite::future::block_on; + use futures_lite::prelude::*; + use futures_lite::stream; + + #[test] + fn merge_tuple_4() { + block_on(async { + let a = stream::once(1); + let b = stream::once(2); + let c = stream::once(3); + let d = stream::once(4); + let mut s = vec![a, b, c, d].merge(); + + let mut counter = 0; + while let Some(n) = s.next().await { + counter += n; + } + assert_eq!(counter, 10); + }) + } +} From 4093be95057d60e09b3c82a8f7252a00c4c5fab3 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts <2467194+yoshuawuyts@users.noreply.github.com> Date: Wed, 9 Nov 2022 16:55:16 +0100 Subject: [PATCH 4/7] implement structured RNG --- src/utils/mod.rs | 2 +- src/utils/rng.rs | 29 +++++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 7e5d511..78637d6 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -15,7 +15,7 @@ pub(crate) use fuse::Fuse; pub(crate) use maybe_done::MaybeDone; pub(crate) use pin::{get_pin_mut, get_pin_mut_from_vec, iter_pin_mut, iter_pin_mut_vec}; pub(crate) use poll_state::PollState; -pub(crate) use rng::random; +pub(crate) use rng::{random, RandomGenerator}; #[cfg(test)] mod dummy_waker; diff --git a/src/utils/rng.rs b/src/utils/rng.rs index 7c5402a..610917f 100644 --- a/src/utils/rng.rs +++ b/src/utils/rng.rs @@ -31,3 +31,32 @@ pub(crate) fn random(n: u32) -> u32 { ((u64::from(x.0)).wrapping_mul(u64::from(n)) >> 32) as u32 }) } + +/// Generates a random number in `0..n`. +pub(crate) struct RandomGenerator(Wrapping); + +impl RandomGenerator { + pub(crate) fn new(n: u32) -> Self { + // Take the address of a local value as seed. + let mut x = 0i32; + let r = &mut x; + let addr = r as *mut i32 as usize; + Self(Wrapping(addr as u32)) + } + pub(crate) fn random(&mut self, n: u32) -> u32 { + // This is the 32-bit variant of Xorshift. + // + // Source: https://en.wikipedia.org/wiki/Xorshift + let mut x = self.0; + x ^= x << 13; + x ^= x >> 17; + x ^= x << 5; + self.0 = x; + + // This is a fast alternative to `x % n`. + // + // Author: Daniel Lemire + // Source: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ + ((u64::from(x.0)).wrapping_mul(u64::from(n)) >> 32) as u32 + } +} From b419787f1b94146e51ada26207166834af07d589 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts <2467194+yoshuawuyts@users.noreply.github.com> Date: Wed, 9 Nov 2022 16:55:32 +0100 Subject: [PATCH 5/7] remove unnecessary indexes --- src/stream/merge/vec.rs | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/src/stream/merge/vec.rs b/src/stream/merge/vec.rs index 5c40e47..679fd28 100644 --- a/src/stream/merge/vec.rs +++ b/src/stream/merge/vec.rs @@ -52,20 +52,11 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); - // Randomize the indexes into our streams array. This ensures that when - // multiple streams are ready at the same time, we don't accidentally - // exhaust one stream before another. - // Randomize the indexes into our streams array. This ensures that when - // multiple streams are ready at the same time, we don't accidentally - // exhaust one stream before another. - let indexes: Vec<_> = (0..this.streams.len()).collect(); - // indexes.sort_by_cached_key(|_| utils::random(1000)); - // Iterate over our streams one-by-one. If a stream yields a value, // we exit early. By default we'll return `Poll::Ready(None)`, but // this changes if we encounter a `Poll::Pending`. let mut res = Poll::Ready(None); - for index in indexes { + for index in 0..this.streams.len() { let stream = utils::get_pin_mut_from_vec(this.streams.as_mut(), index).unwrap(); match stream.poll_next(cx) { Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), From d8b3ba2a3d0babee0aeb8e8a47f7fb15d52b44b6 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts <2467194+yoshuawuyts@users.noreply.github.com> Date: Wed, 9 Nov 2022 16:55:45 +0100 Subject: [PATCH 6/7] add `merge 100`, `merge 1000` benches --- benches/bench.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/benches/bench.rs b/benches/bench.rs index e74c256..5b36f20 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -11,7 +11,8 @@ use std::task::{Context, Poll}; fn criterion_benchmark(c: &mut Criterion) { c.bench_function("merge 10", |b| b.iter(|| merge_test(black_box(10)))); - // c.bench_function("merge 100", |b| b.iter(|| merge_futures(black_box(100)))); + c.bench_function("merge 100", |b| b.iter(|| merge_test(black_box(100)))); + c.bench_function("merge 1000", |b| b.iter(|| merge_test(black_box(1000)))); } criterion_group!(benches, criterion_benchmark); From 16e32f79f58ae976bbc1423137fac3b19eb36c70 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts <2467194+yoshuawuyts@users.noreply.github.com> Date: Wed, 9 Nov 2022 18:14:55 +0100 Subject: [PATCH 7/7] fix benchmark strat and restore fairness --- benches/bench.rs | 49 ++++++++++------------------- src/stream/merge/vec.rs | 6 +++- src/utils/rng.rs | 2 +- tests/test.rs | 69 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 92 insertions(+), 34 deletions(-) diff --git a/benches/bench.rs b/benches/bench.rs index 5b36f20..b0d8b92 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -5,8 +5,9 @@ use futures_lite::future::block_on; use futures_lite::prelude::*; use pin_project::pin_project; -use std::future::Future; +use std::cell::RefCell; use std::pin::Pin; +use std::rc::Rc; use std::task::{Context, Poll}; fn criterion_benchmark(c: &mut Criterion) { @@ -20,7 +21,11 @@ criterion_main!(benches); pub(crate) fn merge_test(max: usize) { block_on(async { - let futures: Vec<_> = (0..max).rev().map(Countdown::new).collect(); + let counter = Rc::new(RefCell::new(max)); + let futures: Vec<_> = (1..=max) + .rev() + .map(|n| Countdown::new(n, counter.clone())) + .collect(); let mut s = futures.merge(); let mut counter = 0; @@ -34,13 +39,18 @@ pub(crate) fn merge_test(max: usize) { /// A future which will _eventually_ be ready, but needs to be polled N times before it is. #[pin_project] struct Countdown { - count: usize, + success_count: Rc>, + target_count: usize, done: bool, } impl Countdown { - fn new(count: usize) -> Self { - Self { count, done: false } + fn new(count: usize, success_count: Rc>) -> Self { + Self { + success_count, + target_count: count, + done: false, + } } } impl Stream for Countdown { @@ -50,37 +60,12 @@ impl Stream for Countdown { let this = self.project(); if *this.done { Poll::Ready(None) - } else if *this.count == 0 { + } else if *this.success_count.borrow() == *this.target_count { + *this.success_count.borrow_mut() -= 1; *this.done = true; Poll::Ready(Some(())) } else { - *this.count -= 1; - Poll::Pending - } - } -} - -impl Future for Countdown { - type Output = (); - - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - let this = self.project(); - if *this.done { - panic!("futures should not be polled after completing"); - } else if *this.count == 0 { - *this.done = true; - Poll::Ready(()) - } else { - *this.count -= 1; Poll::Pending } } } - -#[cfg(test)] -mod test { - #[test] - fn smoke() { - merge_test(3); - } -} diff --git a/src/stream/merge/vec.rs b/src/stream/merge/vec.rs index 679fd28..2283038 100644 --- a/src/stream/merge/vec.rs +++ b/src/stream/merge/vec.rs @@ -1,6 +1,6 @@ use super::Merge as MergeTrait; use crate::stream::IntoStream; -use crate::utils::{self, Fuse}; +use crate::utils::{self, Fuse, RandomGenerator}; use core::fmt; use futures_core::Stream; @@ -21,6 +21,7 @@ where { #[pin] streams: Vec>, + rng: RandomGenerator, } impl Merge @@ -30,6 +31,7 @@ where pub(crate) fn new(streams: Vec) -> Self { Self { streams: streams.into_iter().map(Fuse::new).collect(), + rng: RandomGenerator::new(), } } } @@ -55,8 +57,10 @@ where // Iterate over our streams one-by-one. If a stream yields a value, // we exit early. By default we'll return `Poll::Ready(None)`, but // this changes if we encounter a `Poll::Pending`. + let random = this.rng.random(this.streams.len() as u32) as usize; let mut res = Poll::Ready(None); for index in 0..this.streams.len() { + let index = (random + index).wrapping_rem(this.streams.len()); let stream = utils::get_pin_mut_from_vec(this.streams.as_mut(), index).unwrap(); match stream.poll_next(cx) { Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), diff --git a/src/utils/rng.rs b/src/utils/rng.rs index 610917f..1d6abfc 100644 --- a/src/utils/rng.rs +++ b/src/utils/rng.rs @@ -36,7 +36,7 @@ pub(crate) fn random(n: u32) -> u32 { pub(crate) struct RandomGenerator(Wrapping); impl RandomGenerator { - pub(crate) fn new(n: u32) -> Self { + pub(crate) fn new() -> Self { // Take the address of a local value as seed. let mut x = 0i32; let r = &mut x; diff --git a/tests/test.rs b/tests/test.rs index 8b13789..968a9eb 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -1 +1,70 @@ +// use futures_concurrency::prelude::*; +// use futures_core::Stream; +// use futures_lite::future::block_on; +// use futures_lite::prelude::*; +// use pin_project::pin_project; +// use std::cell::RefCell; +// use std::pin::Pin; +// use std::rc::Rc; +// use std::task::{Context, Poll}; + +// pub(crate) fn merge_test(max: usize) { +// block_on(async { +// let counter = Rc::new(RefCell::new(max)); +// let futures: Vec<_> = (1..=max) +// .rev() +// .map(|n| Countdown::new(n, counter.clone())) +// .collect(); +// let mut s = futures.merge(); + +// let mut counter = 0; +// while let Some(_) = s.next().await { +// counter += 1; +// } +// assert_eq!(counter, max); +// }) +// } + +// /// A future which will _eventually_ be ready, but needs to be polled N times before it is. +// #[pin_project] +// struct Countdown { +// success_count: Rc>, +// target_count: usize, +// done: bool, +// } + +// impl Countdown { +// fn new(count: usize, success_count: Rc>) -> Self { +// Self { +// success_count, +// target_count: count, +// done: false, +// } +// } +// } +// impl Stream for Countdown { +// type Item = (); + +// fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { +// let this = self.project(); +// if *this.done { +// Poll::Ready(None) +// } else if *this.success_count.borrow() == *this.target_count { +// *this.success_count.borrow_mut() -= 1; +// *this.done = true; +// Poll::Ready(Some(())) +// } else { +// Poll::Pending +// } +// } +// } + +// #[cfg(test)] +// mod test { +// use super::*; +// #[test] +// fn smoke() { +// merge_test(4); +// } +// }