From 95fb46183e445d898a5ef9394d35dfe5dec77a6f Mon Sep 17 00:00:00 2001 From: Suchir Kavi Date: Mon, 15 Apr 2024 22:01:35 -0700 Subject: [PATCH] add new benchmark for `FutureGroup` this benchmark aims to measure latency between futures becoming ready and the stream producing them. --- benches/bench.rs | 69 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 66 insertions(+), 3 deletions(-) diff --git a/benches/bench.rs b/benches/bench.rs index da2f970..a1167bc 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -68,22 +68,27 @@ mod stream_group { } } mod future_group { + use std::time::{Duration, Instant}; + use criterion::async_executor::FuturesExecutor; use criterion::{black_box, criterion_group, BatchSize, BenchmarkId, Criterion}; + use futures::channel::oneshot; + use futures::never::Never; use futures::stream::FuturesUnordered; use futures_concurrency::future::FutureGroup; use futures_lite::prelude::*; + use rand::{seq::SliceRandom, Rng, SeedableRng}; use crate::utils::{make_future_group, make_futures_unordered}; criterion_group! { name = future_group_benches; // This can be any expression that returns a `Criterion` object. 
config = Criterion::default(); - targets = future_group_bench + targets = future_group_throughput_bench, future_group_latency_bench } - fn future_group_bench(c: &mut Criterion) { - let mut group = c.benchmark_group("future_group"); + fn future_group_throughput_bench(c: &mut Criterion) { + let mut group = c.benchmark_group("future_group_poll_throughput"); for i in [10, 100, 1000].iter() { group.bench_with_input(BenchmarkId::new("FutureGroup", i), i, |b, i| { let setup = || make_future_group(*i); @@ -116,6 +121,64 @@ mod future_group { } group.finish(); } + + fn future_group_latency_bench(c: &mut Criterion) { + async fn routine<G>(iters: u64, size: usize) -> Duration + where + G: Default + Stream + Unpin + Extend<oneshot::Receiver<Never>>, + { + // fraction of `size` which represents the maximum number of + // futures which may all become ready between polls. + const MAX_READY_RATIO_PER_POLL: f64 = 0.1; + let max_ready_num_per_poll = + ((size as f64) * MAX_READY_RATIO_PER_POLL).max(1.) as usize; + + let mut total_runtime = Duration::ZERO; + + for _ in 0..iters { + let (mut senders, mut group) = (0..size) + .map(|_| oneshot::channel()) + .unzip::<_, _, Vec<_>, G>(); + + let mut rng = rand::rngs::StdRng::seed_from_u64(42); + + // senders will complete in arbitrary order + senders.shuffle(&mut rng); + + // poll once to set up all the wakers + assert!(futures::poll!(group.next()).is_pending()); + + while !senders.is_empty() { + let completion_limit = max_ready_num_per_poll.min(senders.len()); + let num_completing = rng.gen_range(0..=completion_limit); + + assert_eq!(senders.drain(..num_completing).count(), num_completing); + + let recv_start = Instant::now(); + assert_eq!( + (&mut group).take(num_completing).count().await, + num_completing + ); + total_runtime += recv_start.elapsed(); + } + } + + total_runtime + } + + let mut group = c.benchmark_group("future_group_poll_latency"); + for i in &[10, 100, 1000] { + group.bench_with_input(BenchmarkId::new("FutureGroup", i), i, |b, &i| { + 
b.to_async(FuturesExecutor) .iter_custom(|iters| routine::<FutureGroup<_>>(iters, i)) }); group.bench_with_input(BenchmarkId::new("FuturesUnordered", i), i, |b, &i| { + b.to_async(FuturesExecutor) + .iter_custom(|iters| routine::<FuturesUnordered<_>>(iters, i)) + }); + } + group.finish(); + } } mod merge {