Skip to content

Commit

Permalink
add new benchmark for FutureGroup
Browse files Browse the repository at this point in the history
This benchmark measures the latency between a future becoming
ready and the stream actually yielding it.
  • Loading branch information
soooch committed Apr 23, 2024
1 parent ee74871 commit bf34520
Showing 1 changed file with 66 additions and 3 deletions.
69 changes: 66 additions & 3 deletions benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,27 @@ mod stream_group {
}
}
mod future_group {
use std::time::{Duration, Instant};

use criterion::async_executor::FuturesExecutor;
use criterion::{black_box, criterion_group, BatchSize, BenchmarkId, Criterion};
use futures::channel::oneshot;
use futures::never::Never;
use futures::stream::FuturesUnordered;
use futures_concurrency::future::FutureGroup;
use futures_lite::prelude::*;
use rand::{seq::SliceRandom, Rng, SeedableRng};

use crate::utils::{make_future_group, make_futures_unordered};
criterion_group! {
name = future_group_benches;
// This can be any expression that returns a `Criterion` object.
config = Criterion::default();
targets = future_group_bench
targets = future_group_throughput_bench, future_group_latency_bench
}

fn future_group_bench(c: &mut Criterion) {
let mut group = c.benchmark_group("future_group");
fn future_group_throughput_bench(c: &mut Criterion) {
let mut group = c.benchmark_group("future_group_poll_throughput");
for i in [10, 100, 1000].iter() {
group.bench_with_input(BenchmarkId::new("FutureGroup", i), i, |b, i| {
let setup = || make_future_group(*i);
Expand Down Expand Up @@ -116,6 +121,64 @@ mod future_group {
}
group.finish();
}

/// Benchmarks the latency between a future becoming ready and the stream
/// under test (`FutureGroup` vs. `FuturesUnordered`) actually yielding it,
/// for group sizes 10, 100, and 1000.
fn future_group_latency_bench(c: &mut Criterion) {
    /// Custom timing routine used with criterion's `iter_custom`:
    /// completes oneshot senders in randomly-sized batches and accumulates
    /// only the time spent receiving the already-ready futures from the
    /// stream `G`.
    ///
    /// `iters` is the criterion-requested iteration count; `size` is the
    /// number of futures placed in the group. Returns the total time spent
    /// in the receive phase across all iterations.
    async fn routine<G>(iters: u64, size: usize) -> Duration
    where
        G: Default + Stream + Unpin + Extend<oneshot::Receiver<Never>>,
    {
        // fraction of `size` which represents the maximum number of
        // futures which may all become ready between polls.
        const MAX_READY_RATIO_PER_POLL: f64 = 0.1;
        // Clamp to at least 1 so small sizes (e.g. 10 * 0.1) still make
        // progress each round.
        let max_ready_num_per_poll =
            ((size as f64) * MAX_READY_RATIO_PER_POLL).max(1.) as usize;

        let mut total_runtime = Duration::ZERO;

        for _ in 0..iters {
            // Build `size` oneshot channels: the receivers are collected
            // into the stream under test, while the senders stay local so
            // this routine controls exactly when each future becomes ready.
            let (mut senders, mut group) = (0..size)
                .map(|_| oneshot::channel())
                .unzip::<_, _, Vec<_>, G>();

            // Fixed seed keeps the completion order and batch sizes
            // reproducible across runs and between the two stream types.
            let mut rng = rand::rngs::StdRng::seed_from_u64(42);

            // senders will complete in arbitrary order
            senders.shuffle(&mut rng);

            // poll once to set up all the wakers
            assert!(futures::poll!(group.next()).is_pending());

            while !senders.is_empty() {
                // Pick a random batch size (possibly 0) bounded by the
                // per-poll ready limit and by how many senders remain.
                let completion_limit = max_ready_num_per_poll.min(senders.len());
                let num_completing = rng.gen_range(0..=completion_limit);

                // Draining drops the senders; dropping a oneshot::Sender
                // resolves its Receiver, i.e. makes the future ready.
                // `count()` forces the lazy drain and double-checks the size.
                assert_eq!(senders.drain(..num_completing).count(), num_completing);

                // Time only the stream-side work: how long the group takes
                // to yield the futures that are already ready.
                let recv_start = Instant::now();
                assert_eq!(
                    (&mut group).take(num_completing).count().await,
                    num_completing
                );
                total_runtime += recv_start.elapsed();
            }
        }

        total_runtime
    }

    let mut group = c.benchmark_group("future_group_poll_latency");
    for i in &[10, 100, 1000] {
        // Same routine, same seed, two stream implementations — so the
        // reported numbers are directly comparable per size.
        group.bench_with_input(BenchmarkId::new("FutureGroup", i), i, |b, &i| {
            b.to_async(FuturesExecutor)
                .iter_custom(|iters| routine::<FutureGroup<_>>(iters, i))
        });
        group.bench_with_input(BenchmarkId::new("FuturesUnordered", i), i, |b, &i| {
            b.to_async(FuturesExecutor)
                .iter_custom(|iters| routine::<FuturesUnordered<_>>(iters, i))
        });
    }
    group.finish();
}
}

mod merge {
Expand Down

0 comments on commit bf34520

Please sign in to comment.