diff --git a/Cargo.toml b/Cargo.toml index e1a9ced..60a382a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ criterion = { version = "0.3", features = [ ] } futures = "0.3.25" futures-time = "3.0.0" +itertools = "0.12.1" lending-stream = "1.0.0" rand = "0.8.5" tokio = { version = "1.32.0", features = ["macros", "time", "rt-multi-thread"] } diff --git a/benches/bench.rs b/benches/bench.rs index da2f970..bf335e8 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -68,22 +68,30 @@ mod stream_group { } } mod future_group { + use std::fmt::{Debug, Display}; + use std::time::{Duration, Instant}; + use criterion::async_executor::FuturesExecutor; use criterion::{black_box, criterion_group, BatchSize, BenchmarkId, Criterion}; + use futures::channel::oneshot; + use futures::never::Never; use futures::stream::FuturesUnordered; use futures_concurrency::future::FutureGroup; + use futures_lite::future::yield_now; use futures_lite::prelude::*; + use itertools::Itertools; + use rand::{seq::SliceRandom, SeedableRng}; use crate::utils::{make_future_group, make_futures_unordered}; criterion_group! { name = future_group_benches; // This can be any expression that returns a `Criterion` object. config = Criterion::default(); - targets = future_group_bench + targets = future_group_throughput_bench, future_group_latency_bench } - fn future_group_bench(c: &mut Criterion) { - let mut group = c.benchmark_group("future_group"); + fn future_group_throughput_bench(c: &mut Criterion) { + let mut group = c.benchmark_group("future_group_poll_throughput"); for i in [10, 100, 1000].iter() { group.bench_with_input(BenchmarkId::new("FutureGroup", i), i, |b, i| { let setup = || make_future_group(*i); @@ -116,6 +124,115 @@ mod future_group { } group.finish(); } + + /// This benchmark measures the latency between when futures become ready + /// and when their outputs appear on the [`FutureGroup`] stream. + /// + /// To test this, we: + /// - insert N pending futures to the [`FutureGroup`]. + /// - until the [`FutureGroup`] is empty, we set some fraction of the + /// pending futures to ready, then record how long it takes for their + /// outputs to be produced from [`FutureGroup`]'s `Stream` impl. + /// - we sum the recorded durations for each of these rounds. + fn future_group_latency_bench(c: &mut Criterion) { + #[derive(Debug, Clone, Copy)] + struct Params { + init_size: usize, + pct_ready_per_round: f64, + } + + impl Display for Params { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Debug::fmt(&self, f) + } + } + + async fn routine( + iters: u64, + Params { + init_size, + pct_ready_per_round, + }: Params, + ) -> Duration + where + G: Default + Stream + Unpin + Extend>, + { + let ready_per_round = ((init_size as f64) * pct_ready_per_round).max(1.) as usize; + + let mut total_runtime = Duration::ZERO; + + for _ in 0..iters { + // construct a set of oneshot::Receiver futures. These become + // ready once their Sender ends are dropped. + let (mut senders, mut group) = (0..init_size) + .map(|_| oneshot::channel()) + .unzip::<_, _, Vec<_>, G>(); + + // shuffle our senders so from the FutureGroup's perspective, + // futures become ready in arbitrary order. + let mut rng = rand::rngs::StdRng::seed_from_u64(42); + senders.shuffle(&mut rng); + + // poll once to set up all the wakers + assert!(futures::poll!(group.next()).is_pending()); + + while !senders.is_empty() { + let num_completing = ready_per_round.min(senders.len()); + + // drop some Senders. The corresponding Receiver futures + // will become ready in the FutureGroup + assert_eq!(senders.drain(..num_completing).count(), num_completing); + + // this isn't necessary now, but if we were using the tokio + // runtime/oneshots, coming up for air regularly prevents + // the budget system from inserting false pendings. this + // also more closely emulates what would happen in a real + // system (sender task yields to let executor poll receiver + // task). though that shouldn't make a difference. + yield_now().await; + + // measure the time it takes for all newly ready futures to + // be produced from the FutureGroup stream. + let recv_start = Instant::now(); + assert_eq!( + (&mut group).take(num_completing).count().await, + num_completing + ); + total_runtime += recv_start.elapsed(); + } + } + + total_runtime + } + + let mut group = c.benchmark_group("future_group_poll_latency"); + for params in [10, 100, 1000] + .into_iter() + .cartesian_product([0.0001, 0.2, 1.0]) + .map(|(init_size, pct_ready_per_round)| Params { + init_size, + pct_ready_per_round, + }) + { + group.bench_with_input( + BenchmarkId::new("FutureGroup", params), + ¶ms, + |b, ¶ms| { + b.to_async(FuturesExecutor) + .iter_custom(|iters| routine::>(iters, params)) + }, + ); + group.bench_with_input( + BenchmarkId::new("FuturesUnordered", params), + ¶ms, + |b, ¶ms| { + b.to_async(FuturesExecutor) + .iter_custom(|iters| routine::>(iters, params)) + }, + ); + } + group.finish(); + } } mod merge { diff --git a/src/future/future_group.rs b/src/future/future_group.rs index 70528d9..fa731e3 100644 --- a/src/future/future_group.rs +++ b/src/future/future_group.rs @@ -59,7 +59,6 @@ use crate::utils::{PollState, PollVec, WakerVec}; /// ``` #[must_use = "`FutureGroup` does nothing if not iterated over"] -#[derive(Default)] #[pin_project::pin_project] pub struct FutureGroup { #[pin] @@ -80,6 +79,12 @@ impl Debug for FutureGroup { } } +impl Default for FutureGroup { + fn default() -> Self { + Self::new() + } +} + impl FutureGroup { /// Create a new instance of `FutureGroup`. /// @@ -416,14 +421,22 @@ impl Stream for FutureGroup { } } -impl FromIterator for FutureGroup { - fn from_iter>(iter: T) -> Self { +impl Extend for FutureGroup { + fn extend>(&mut self, iter: T) { let iter = iter.into_iter(); let len = iter.size_hint().1.unwrap_or_default(); - let mut this = Self::with_capacity(len); + self.reserve(len); + for future in iter { - this.insert(future); + self.insert(future); } + } +} + +impl FromIterator for FutureGroup { + fn from_iter>(iter: T) -> Self { + let mut this = Self::new(); + this.extend(iter); this } }