Skip to content

Commit

Permalink
Merge pull request #179 from soooch/new-future-group-bench
Browse files Browse the repository at this point in the history
add new benchmark for `FutureGroup`
  • Loading branch information
yoshuawuyts authored Jun 9, 2024
2 parents 9246a94 + d960f75 commit a0b4003
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ criterion = { version = "0.3", features = [
] }
futures = "0.3.25"
futures-time = "3.0.0"
itertools = "0.12.1"
lending-stream = "1.0.0"
rand = "0.8.5"
tokio = { version = "1.32.0", features = ["macros", "time", "rt-multi-thread"] }
123 changes: 120 additions & 3 deletions benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,30 @@ mod stream_group {
}
}
mod future_group {
use std::fmt::{Debug, Display};
use std::time::{Duration, Instant};

use criterion::async_executor::FuturesExecutor;
use criterion::{black_box, criterion_group, BatchSize, BenchmarkId, Criterion};
use futures::channel::oneshot;
use futures::never::Never;
use futures::stream::FuturesUnordered;
use futures_concurrency::future::FutureGroup;
use futures_lite::future::yield_now;
use futures_lite::prelude::*;
use itertools::Itertools;
use rand::{seq::SliceRandom, SeedableRng};

use crate::utils::{make_future_group, make_futures_unordered};
criterion_group! {
name = future_group_benches;
// This can be any expression that returns a `Criterion` object.
config = Criterion::default();
targets = future_group_bench
targets = future_group_throughput_bench, future_group_latency_bench
}

fn future_group_bench(c: &mut Criterion) {
let mut group = c.benchmark_group("future_group");
fn future_group_throughput_bench(c: &mut Criterion) {
let mut group = c.benchmark_group("future_group_poll_throughput");
for i in [10, 100, 1000].iter() {
group.bench_with_input(BenchmarkId::new("FutureGroup", i), i, |b, i| {
let setup = || make_future_group(*i);
Expand Down Expand Up @@ -116,6 +124,115 @@ mod future_group {
}
group.finish();
}

/// This benchmark measures the latency between when futures become ready
/// and when their outputs appear on the [`FutureGroup`] stream.
///
/// To test this, we:
/// - insert N pending futures to the [`FutureGroup`].
/// - until the [`FutureGroup`] is empty, we set some fraction of the
/// pending futures to ready, then record how long it takes for their
/// outputs to be produced from [`FutureGroup`]'s `Stream` impl.
/// - we sum the recorded durations for each of these rounds.
fn future_group_latency_bench(c: &mut Criterion) {
#[derive(Debug, Clone, Copy)]
struct Params {
init_size: usize,
pct_ready_per_round: f64,
}

impl Display for Params {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Debug::fmt(&self, f)
}
}

async fn routine<G>(
iters: u64,
Params {
init_size,
pct_ready_per_round,
}: Params,
) -> Duration
where
G: Default + Stream + Unpin + Extend<oneshot::Receiver<Never>>,
{
let ready_per_round = ((init_size as f64) * pct_ready_per_round).max(1.) as usize;

let mut total_runtime = Duration::ZERO;

for _ in 0..iters {
// construct a set of oneshot::Receiver futures. These become
// ready once their Sender ends are dropped.
let (mut senders, mut group) = (0..init_size)
.map(|_| oneshot::channel())
.unzip::<_, _, Vec<_>, G>();

// shuffle our senders so from the FutureGroup's perspective,
// futures become ready in arbitrary order.
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
senders.shuffle(&mut rng);

// poll once to set up all the wakers
assert!(futures::poll!(group.next()).is_pending());

while !senders.is_empty() {
let num_completing = ready_per_round.min(senders.len());

// drop some Senders. The corresponding Receiver futures
// will become ready in the FutureGroup
assert_eq!(senders.drain(..num_completing).count(), num_completing);

// this isn't necessary now, but if we were using the tokio
// runtime/oneshots, coming up for air regularly prevents
// the budget system from inserting false pendings. this
// also more closely emulates what would happen in a real
// system (sender task yields to let executor poll receiver
// task). though that shouldn't make a difference.
yield_now().await;

// measure the time it takes for all newly ready futures to
// be produced from the FutureGroup stream.
let recv_start = Instant::now();
assert_eq!(
(&mut group).take(num_completing).count().await,
num_completing
);
total_runtime += recv_start.elapsed();
}
}

total_runtime
}

let mut group = c.benchmark_group("future_group_poll_latency");
for params in [10, 100, 1000]
.into_iter()
.cartesian_product([0.0001, 0.2, 1.0])
.map(|(init_size, pct_ready_per_round)| Params {
init_size,
pct_ready_per_round,
})
{
group.bench_with_input(
BenchmarkId::new("FutureGroup", params),
&params,
|b, &params| {
b.to_async(FuturesExecutor)
.iter_custom(|iters| routine::<FutureGroup<_>>(iters, params))
},
);
group.bench_with_input(
BenchmarkId::new("FuturesUnordered", params),
&params,
|b, &params| {
b.to_async(FuturesExecutor)
.iter_custom(|iters| routine::<FuturesUnordered<_>>(iters, params))
},
);
}
group.finish();
}
}

mod merge {
Expand Down
23 changes: 18 additions & 5 deletions src/future/future_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ use crate::utils::{PollState, PollVec, WakerVec};
/// ```
#[must_use = "`FutureGroup` does nothing if not iterated over"]
#[derive(Default)]
#[pin_project::pin_project]
pub struct FutureGroup<F> {
#[pin]
Expand All @@ -80,6 +79,12 @@ impl<T: Debug> Debug for FutureGroup<T> {
}
}

impl<T> Default for FutureGroup<T> {
fn default() -> Self {
Self::new()
}
}

impl<F> FutureGroup<F> {
/// Create a new instance of `FutureGroup`.
///
Expand Down Expand Up @@ -416,14 +421,22 @@ impl<F: Future> Stream for FutureGroup<F> {
}
}

impl<F: Future> FromIterator<F> for FutureGroup<F> {
fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
impl<F: Future> Extend<F> for FutureGroup<F> {
fn extend<T: IntoIterator<Item = F>>(&mut self, iter: T) {
let iter = iter.into_iter();
let len = iter.size_hint().1.unwrap_or_default();
let mut this = Self::with_capacity(len);
self.reserve(len);

for future in iter {
this.insert(future);
self.insert(future);
}
}
}

impl<F: Future> FromIterator<F> for FutureGroup<F> {
fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
let mut this = Self::new();
this.extend(iter);
this
}
}
Expand Down

0 comments on commit a0b4003

Please sign in to comment.