From de8a750f9bbb7bfb56d7d931bda5f46ea4bff64c Mon Sep 17 00:00:00 2001
From: Suchir Kavi
Date: Mon, 15 Apr 2024 21:44:25 -0700
Subject: [PATCH 1/3] add `FutureGroup::extend`

Reimplement `FutureGroup::from_iter` in terms of `extend`.
---
 src/future/future_group.rs | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/src/future/future_group.rs b/src/future/future_group.rs
index 4d000c2..97058ac 100644
--- a/src/future/future_group.rs
+++ b/src/future/future_group.rs
@@ -415,14 +415,22 @@ impl<F: Future> Stream for FutureGroup<F> {
     }
 }
 
-impl<F: Future> FromIterator<F> for FutureGroup<F> {
-    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
+impl<F: Future> Extend<F> for FutureGroup<F> {
+    fn extend<T: IntoIterator<Item = F>>(&mut self, iter: T) {
         let iter = iter.into_iter();
         let len = iter.size_hint().1.unwrap_or_default();
-        let mut this = Self::with_capacity(len);
+        self.reserve(len);
+
         for future in iter {
-            this.insert(future);
+            self.insert(future);
         }
+    }
+}
+
+impl<F: Future> FromIterator<F> for FutureGroup<F> {
+    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
+        let mut this = Self::new();
+        this.extend(iter);
         this
     }
 }

From ee748713c76af21195c72d2317dd507c348308b2 Mon Sep 17 00:00:00 2001
From: Suchir Kavi
Date: Mon, 15 Apr 2024 21:46:26 -0700
Subject: [PATCH 2/3] Add manual `Default` impl for `FutureGroup`

Removes the restriction that `F: Default`.
---
 src/future/future_group.rs | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/future/future_group.rs b/src/future/future_group.rs
index 97058ac..bcb4fa4 100644
--- a/src/future/future_group.rs
+++ b/src/future/future_group.rs
@@ -59,7 +59,6 @@ use crate::utils::{PollState, PollVec, WakerVec};
 /// ```
 #[must_use = "`FutureGroup` does nothing if not iterated over"]
-#[derive(Default)]
 #[pin_project::pin_project]
 pub struct FutureGroup<F> {
     #[pin]
@@ -80,6 +79,12 @@ impl<F> Debug for FutureGroup<F> {
     }
 }
 
+impl<F> Default for FutureGroup<F> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl<F> FutureGroup<F> {
     /// Create a new instance of `FutureGroup`.
     ///
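Taken together, patches 1 and 2 give `FutureGroup` the usual collection-building surface: `FromIterator` is now a thin wrapper around the new `Extend` impl (which calls `reserve` once based on the iterator's size hint), and `Default` works for any `F` rather than only `F: Default`. A rough usage sketch of that surface follows; it is not part of the patches themselves and assumes `futures-concurrency` with these changes applied, plus `futures-lite` for `StreamExt`:

```rust
use std::future::{self, Ready};

use futures_concurrency::future::FutureGroup;
use futures_lite::StreamExt;

fn main() {
    futures_lite::future::block_on(async {
        // `FromIterator`: build a group straight from an iterator of futures.
        let mut group: FutureGroup<Ready<u32>> = (0..3u32).map(future::ready).collect();

        // `Extend`: push more futures into the existing group in one call.
        group.extend((3..6u32).map(future::ready));

        // `Default`: construct an empty group even though `Ready<u32>` itself
        // has no `Default` impl; only the container needs a default value now.
        let empty: FutureGroup<Ready<u32>> = FutureGroup::default();
        assert_eq!(empty.len(), 0);

        // `FutureGroup` is a `Stream` over the futures' outputs.
        let mut sum = 0;
        while let Some(n) = group.next().await {
            sum += n;
        }
        assert_eq!(sum, 15);
    });
}
```
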
From d960f75d5aab28e97ba021df2f51c0062bfb1aab Mon Sep 17 00:00:00 2001
From: Suchir Kavi
Date: Mon, 15 Apr 2024 22:01:35 -0700
Subject: [PATCH 3/3] add new benchmark for `FutureGroup`

This benchmark aims to measure the latency between futures becoming ready
and the stream producing their outputs.
---
 Cargo.toml       |   1 +
 benches/bench.rs | 123 +++++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 121 insertions(+), 3 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index a141570..318dce2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -49,6 +49,7 @@ criterion = { version = "0.3", features = [
 ] }
 futures = "0.3.25"
 futures-time = "3.0.0"
+itertools = "0.12.1"
 lending-stream = "1.0.0"
 rand = "0.8.5"
 tokio = { version = "1.32.0", features = ["macros", "time", "rt-multi-thread"] }
diff --git a/benches/bench.rs b/benches/bench.rs
index da2f970..bf335e8 100644
--- a/benches/bench.rs
+++ b/benches/bench.rs
@@ -68,22 +68,30 @@ mod stream_group {
     }
 }
 mod future_group {
+    use std::fmt::{Debug, Display};
+    use std::time::{Duration, Instant};
+
     use criterion::async_executor::FuturesExecutor;
     use criterion::{black_box, criterion_group, BatchSize, BenchmarkId, Criterion};
+    use futures::channel::oneshot;
+    use futures::never::Never;
     use futures::stream::FuturesUnordered;
     use futures_concurrency::future::FutureGroup;
+    use futures_lite::future::yield_now;
     use futures_lite::prelude::*;
+    use itertools::Itertools;
+    use rand::{seq::SliceRandom, SeedableRng};
 
     use crate::utils::{make_future_group, make_futures_unordered};
 
     criterion_group! {
         name = future_group_benches;
         // This can be any expression that returns a `Criterion` object.
         config = Criterion::default();
-        targets = future_group_bench
+        targets = future_group_throughput_bench, future_group_latency_bench
     }
 
-    fn future_group_bench(c: &mut Criterion) {
-        let mut group = c.benchmark_group("future_group");
+    fn future_group_throughput_bench(c: &mut Criterion) {
+        let mut group = c.benchmark_group("future_group_poll_throughput");
         for i in [10, 100, 1000].iter() {
             group.bench_with_input(BenchmarkId::new("FutureGroup", i), i, |b, i| {
                 let setup = || make_future_group(*i);
@@ -116,6 +124,115 @@ mod future_group {
         }
         group.finish();
     }
+
+    /// This benchmark measures the latency between when futures become ready
+    /// and when their outputs appear on the [`FutureGroup`] stream.
+    ///
+    /// To test this, we:
+    /// - insert N pending futures into the [`FutureGroup`].
+    /// - until the [`FutureGroup`] is empty, we set some fraction of the
+    ///   pending futures to ready, then record how long it takes for their
+    ///   outputs to be produced from [`FutureGroup`]'s `Stream` impl.
+    /// - we sum the recorded durations for each of these rounds.
+    fn future_group_latency_bench(c: &mut Criterion) {
+        #[derive(Debug, Clone, Copy)]
+        struct Params {
+            init_size: usize,
+            pct_ready_per_round: f64,
+        }
+
+        impl Display for Params {
+            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                Debug::fmt(&self, f)
+            }
+        }
+
+        async fn routine<G>(
+            iters: u64,
+            Params {
+                init_size,
+                pct_ready_per_round,
+            }: Params,
+        ) -> Duration
+        where
+            G: Default + Stream + Unpin + Extend<oneshot::Receiver<Never>>,
+        {
+            let ready_per_round = ((init_size as f64) * pct_ready_per_round).max(1.) as usize;
+
+            let mut total_runtime = Duration::ZERO;
+
+            for _ in 0..iters {
+                // construct a set of oneshot::Receiver futures. These become
+                // ready once their Sender ends are dropped.
+                let (mut senders, mut group) = (0..init_size)
+                    .map(|_| oneshot::channel())
+                    .unzip::<_, _, Vec<_>, G>();
+
+                // shuffle our senders so that, from the FutureGroup's
+                // perspective, futures become ready in arbitrary order.
+                let mut rng = rand::rngs::StdRng::seed_from_u64(42);
+                senders.shuffle(&mut rng);
+
+                // poll once to set up all the wakers
+                assert!(futures::poll!(group.next()).is_pending());
+
+                while !senders.is_empty() {
+                    let num_completing = ready_per_round.min(senders.len());
+
+                    // drop some Senders. The corresponding Receiver futures
+                    // will become ready in the FutureGroup.
+                    assert_eq!(senders.drain(..num_completing).count(), num_completing);
+
+                    // this isn't necessary now, but if we were using the tokio
+                    // runtime/oneshots, coming up for air regularly would keep
+                    // the budget system from inserting false pendings. it also
+                    // more closely emulates what would happen in a real system
+                    // (the sender task yields so the executor can poll the
+                    // receiver task), though that shouldn't make a difference.
+                    yield_now().await;
+
+                    // measure the time it takes for all newly ready futures to
+                    // be produced from the FutureGroup stream.
+                    let recv_start = Instant::now();
+                    assert_eq!(
+                        (&mut group).take(num_completing).count().await,
+                        num_completing
+                    );
+                    total_runtime += recv_start.elapsed();
+                }
+            }
+
+            total_runtime
+        }
+
+        let mut group = c.benchmark_group("future_group_poll_latency");
+        for params in [10, 100, 1000]
+            .into_iter()
+            .cartesian_product([0.0001, 0.2, 1.0])
+            .map(|(init_size, pct_ready_per_round)| Params {
+                init_size,
+                pct_ready_per_round,
+            })
+        {
+            group.bench_with_input(
+                BenchmarkId::new("FutureGroup", params),
+                &params,
+                |b, &params| {
+                    b.to_async(FuturesExecutor)
+                        .iter_custom(|iters| routine::<FutureGroup<_>>(iters, params))
+                },
+            );
+            group.bench_with_input(
+                BenchmarkId::new("FuturesUnordered", params),
+                &params,
+                |b, &params| {
+                    b.to_async(FuturesExecutor)
+                        .iter_custom(|iters| routine::<FuturesUnordered<_>>(iters, params))
+                },
+            );
+        }
+        group.finish();
+    }
 }
 
 mod merge {
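A note on the measurement strategy in patch 3: Criterion's `iter_custom` hands the routine the number of iterations to run and trusts it to return the total measured `Duration`. That is what lets the latency benchmark time only the window between dropping the `Sender`s and draining the newly ready outputs, while keeping channel creation and shuffling out of the reported numbers. A stripped-down sketch of that contract follows; the names are illustrative, not part of the patch, and it assumes Criterion's async `FuturesExecutor` support that the existing benches already use:

```rust
use std::time::{Duration, Instant};

use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion};

// Illustrative only: `iter_custom` passes the iteration count to the routine,
// and the routine reports back the total elapsed time it chose to measure.
fn bench_measured_window_only(c: &mut Criterion) {
    c.bench_function("measured_window_only", |b| {
        b.to_async(FuturesExecutor).iter_custom(|iters| async move {
            let mut total = Duration::ZERO;
            for _ in 0..iters {
                // ...per-iteration setup goes here, excluded from the timing...
                let start = Instant::now();
                // ...only the work between `start` and `elapsed()` is measured...
                total += start.elapsed();
            }
            total
        })
    });
}

criterion_group!(latency_sketch, bench_measured_window_only);
criterion_main!(latency_sketch);
```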