diff --git a/Cargo.toml b/Cargo.toml index 8788f27..f138dcc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ criterion = { version = "0.3", features = [ ] } futures = "0.3.25" futures-time = "3.0.0" +itertools = "0.12.1" lending-stream = "1.0.0" rand = "0.8.5" tokio = { version = "1.32.0", features = ["macros", "time", "rt-multi-thread"] } diff --git a/benches/bench.rs b/benches/bench.rs index a1167bc..45ce7d0 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -68,6 +68,7 @@ mod stream_group { } } mod future_group { + use std::fmt::{Debug, Display}; use std::time::{Duration, Instant}; use criterion::async_executor::FuturesExecutor; @@ -76,8 +77,10 @@ mod future_group { use futures::never::Never; use futures::stream::FuturesUnordered; use futures_concurrency::future::FutureGroup; + use futures_lite::future::yield_now; use futures_lite::prelude::*; - use rand::{seq::SliceRandom, Rng, SeedableRng}; + use itertools::Itertools; + use rand::{seq::SliceRandom, SeedableRng}; use crate::utils::{make_future_group, make_futures_unordered}; criterion_group! { @@ -122,38 +125,75 @@ mod future_group { group.finish(); } + /// This benchmark aims to measure how long it takes for the results of + /// futures that have become ready to be produced by the [`FutureGroup`] + /// stream. + /// + /// To test this, we: + /// - insert N pending futures to the [`FutureGroup`]. + /// - until the [`FutureGroup`] is empty, we set some fraction of the + /// pending futures to ready, then record how long it takes for all newly + /// ready futures to be produced from [`FutureGroup`]'s `Stream` impl. + /// - we sum the recorded durations for each of these rounds. fn future_group_latency_bench(c: &mut Criterion) { - async fn routine(iters: u64, size: usize) -> Duration + #[derive(Debug, Clone, Copy)] + struct Params { + init_size: usize, + pct_ready_per_round: f64, + } + + impl Display for Params { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Debug::fmt(&self, f) + } + } + + async fn routine( + iters: u64, + Params { + init_size, + pct_ready_per_round, + }: Params, + ) -> Duration where G: Default + Stream + Unpin + Extend>, { - // fraction of `size` which represents the maximum number of - // futures which may all become ready between polls. - const MAX_READY_RATIO_PER_POLL: f64 = 0.1; - let max_ready_num_per_poll = - ((size as f64) * MAX_READY_RATIO_PER_POLL).max(1.) as usize; + let ready_per_round = ((init_size as f64) * pct_ready_per_round).max(1.) as usize; let mut total_runtime = Duration::ZERO; for _ in 0..iters { - let (mut senders, mut group) = (0..size) + // construct a set of oneshot::Receiver futures. These become + // ready once their Sender ends are dropped. + let (mut senders, mut group) = (0..init_size) .map(|_| oneshot::channel()) .unzip::<_, _, Vec<_>, G>(); + // shuffle our senders so from the FutureGroup's perspective, + // futures become ready in arbitrary order. let mut rng = rand::rngs::StdRng::seed_from_u64(42); - - // senders will complete in arbitrary order senders.shuffle(&mut rng); // poll once to set up all the wakers assert!(futures::poll!(group.next()).is_pending()); while !senders.is_empty() { - let completion_limit = max_ready_num_per_poll.min(senders.len()); - let num_completing = rng.gen_range(0..=completion_limit); + let num_completing = ready_per_round.min(senders.len()); + // drop some Senders. The corresponding Receiver futures + // will become ready in the FutureGroup assert_eq!(senders.drain(..num_completing).count(), num_completing); + // this isn't necessary now, but if we were using the tokio + // runtime/oneshots, coming up for air regularly prevents + // the budget system from inserting false pendings. this + // also more closely emulates what would happen in a real + // system (sender task yields to let executor poll receiver + // task). though that shouldn't make a difference. + yield_now().await; + + // measure the time it takes for all newly ready futures to + // be produced from the FutureGroup stream. let recv_start = Instant::now(); assert_eq!( (&mut group).take(num_completing).count().await, @@ -167,15 +207,30 @@ mod future_group { } let mut group = c.benchmark_group("future_group_poll_latency"); - for i in &[10, 100, 1000] { - group.bench_with_input(BenchmarkId::new("FutureGroup", i), i, |b, &i| { - b.to_async(FuturesExecutor) - .iter_custom(|iters| routine::>(iters, i)) - }); - group.bench_with_input(BenchmarkId::new("FuturesUnordered", i), i, |b, &i| { - b.to_async(FuturesExecutor) - .iter_custom(|iters| routine::>(iters, i)) - }); + for params in [10, 100, 1000] + .into_iter() + .cartesian_product([0.0001, 0.2, 1.0]) + .map(|(init_size, pct_ready_per_round)| Params { + init_size, + pct_ready_per_round, + }) + { + group.bench_with_input( + BenchmarkId::new("FutureGroup", params), + ¶ms, + |b, ¶ms| { + b.to_async(FuturesExecutor) + .iter_custom(|iters| routine::>(iters, params)) + }, + ); + group.bench_with_input( + BenchmarkId::new("FuturesUnordered", params), + ¶ms, + |b, ¶ms| { + b.to_async(FuturesExecutor) + .iter_custom(|iters| routine::>(iters, params)) + }, + ); } group.finish(); }