Skip to content

Commit

Permalink
FutureGroup: update latency bench
Browse files Browse the repository at this point in the history
replace round size randomness with round size parameterization
add documentation
add a yield
  • Loading branch information
soooch committed Apr 18, 2024
1 parent 95fb461 commit 501689c
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ criterion = { version = "0.3", features = [
] }
futures = "0.3.25"
futures-time = "3.0.0"
itertools = "0.12.1"
lending-stream = "1.0.0"
rand = "0.8.5"
tokio = { version = "1.32.0", features = ["macros", "time", "rt-multi-thread"] }
97 changes: 76 additions & 21 deletions benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ mod stream_group {
}
}
mod future_group {
use std::fmt::{Debug, Display};
use std::time::{Duration, Instant};

use criterion::async_executor::FuturesExecutor;
Expand All @@ -76,8 +77,10 @@ mod future_group {
use futures::never::Never;
use futures::stream::FuturesUnordered;
use futures_concurrency::future::FutureGroup;
use futures_lite::future::yield_now;
use futures_lite::prelude::*;
use rand::{seq::SliceRandom, Rng, SeedableRng};
use itertools::Itertools;
use rand::{seq::SliceRandom, SeedableRng};

use crate::utils::{make_future_group, make_futures_unordered};
criterion_group! {
Expand Down Expand Up @@ -122,38 +125,75 @@ mod future_group {
group.finish();
}

/// This benchmark aims to measure how long it takes for the results of
/// futures that have become ready to be produced by the [`FutureGroup`]
/// stream.
///
/// To test this, we:
/// - insert N pending futures to the [`FutureGroup`].
/// - until the [`FutureGroup`] is empty, we set some fraction of the
/// pending futures to ready, then record how long it takes for all newly
/// ready futures to be produced from [`FutureGroup`]'s `Stream` impl.
/// - we sum the recorded durations for each of these rounds.
fn future_group_latency_bench(c: &mut Criterion) {
async fn routine<G>(iters: u64, size: usize) -> Duration
#[derive(Debug, Clone, Copy)]
struct Params {
init_size: usize,
pct_ready_per_round: f64,
}

impl Display for Params {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Debug::fmt(&self, f)
}
}

async fn routine<G>(
iters: u64,
Params {
init_size,
pct_ready_per_round,
}: Params,
) -> Duration
where
G: Default + Stream + Unpin + Extend<oneshot::Receiver<Never>>,
{
// fraction of `size` which represents the maximum number of
// futures which may all become ready between polls.
const MAX_READY_RATIO_PER_POLL: f64 = 0.1;
let max_ready_num_per_poll =
((size as f64) * MAX_READY_RATIO_PER_POLL).max(1.) as usize;
let ready_per_round = ((init_size as f64) * pct_ready_per_round).max(1.) as usize;

let mut total_runtime = Duration::ZERO;

for _ in 0..iters {
let (mut senders, mut group) = (0..size)
// construct a set of oneshot::Receiver futures. These become
// ready once their Sender ends are dropped.
let (mut senders, mut group) = (0..init_size)
.map(|_| oneshot::channel())
.unzip::<_, _, Vec<_>, G>();

// shuffle our senders so from the FutureGroup's perspective,
// futures become ready in arbitrary order.
let mut rng = rand::rngs::StdRng::seed_from_u64(42);

// senders will complete in arbitrary order
senders.shuffle(&mut rng);

// poll once to set up all the wakers
assert!(futures::poll!(group.next()).is_pending());

while !senders.is_empty() {
let completion_limit = max_ready_num_per_poll.min(senders.len());
let num_completing = rng.gen_range(0..=completion_limit);
let num_completing = ready_per_round.min(senders.len());

// drop some Senders. The corresponding Receiver futures
// will become ready in the FutureGroup
assert_eq!(senders.drain(..num_completing).count(), num_completing);

// this isn't necessary now, but if we were using the tokio
// runtime/oneshots, coming up for air regularly prevents
// the budget system from inserting false pendings. this
// also more closely emulates what would happen in a real
// system (sender task yields to let executor poll receiver
// task). though that shouldn't make a difference.
yield_now().await;

// measure the time it takes for all newly ready futures to
// be produced from the FutureGroup stream.
let recv_start = Instant::now();
assert_eq!(
(&mut group).take(num_completing).count().await,
Expand All @@ -167,15 +207,30 @@ mod future_group {
}

let mut group = c.benchmark_group("future_group_poll_latency");
for i in &[10, 100, 1000] {
group.bench_with_input(BenchmarkId::new("FutureGroup", i), i, |b, &i| {
b.to_async(FuturesExecutor)
.iter_custom(|iters| routine::<FutureGroup<_>>(iters, i))
});
group.bench_with_input(BenchmarkId::new("FuturesUnordered", i), i, |b, &i| {
b.to_async(FuturesExecutor)
.iter_custom(|iters| routine::<FuturesUnordered<_>>(iters, i))
});
for params in [10, 100, 1000]
.into_iter()
.cartesian_product([0.0001, 0.2, 1.0])
.map(|(init_size, pct_ready_per_round)| Params {
init_size,
pct_ready_per_round,
})
{
group.bench_with_input(
BenchmarkId::new("FutureGroup", params),
&params,
|b, &params| {
b.to_async(FuturesExecutor)
.iter_custom(|iters| routine::<FutureGroup<_>>(iters, params))
},
);
group.bench_with_input(
BenchmarkId::new("FuturesUnordered", params),
&params,
|b, &params| {
b.to_async(FuturesExecutor)
.iter_custom(|iters| routine::<FuturesUnordered<_>>(iters, params))
},
);
}
group.finish();
}
Expand Down

0 comments on commit 501689c

Please sign in to comment.