Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add new benchmark for FutureGroup #179

Merged
merged 3 commits into from
Jun 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ criterion = { version = "0.3", features = [
] }
futures = "0.3.25"
futures-time = "3.0.0"
itertools = "0.12.1"
lending-stream = "1.0.0"
rand = "0.8.5"
tokio = { version = "1.32.0", features = ["macros", "time", "rt-multi-thread"] }
123 changes: 120 additions & 3 deletions benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,30 @@ mod stream_group {
}
}
mod future_group {
use std::fmt::{Debug, Display};
use std::time::{Duration, Instant};

use criterion::async_executor::FuturesExecutor;
use criterion::{black_box, criterion_group, BatchSize, BenchmarkId, Criterion};
use futures::channel::oneshot;
use futures::never::Never;
use futures::stream::FuturesUnordered;
use futures_concurrency::future::FutureGroup;
use futures_lite::future::yield_now;
use futures_lite::prelude::*;
use itertools::Itertools;
use rand::{seq::SliceRandom, SeedableRng};

use crate::utils::{make_future_group, make_futures_unordered};
criterion_group! {
    name = future_group_benches;
    // Any expression evaluating to a `Criterion` object is accepted here;
    // we just use the default configuration.
    config = Criterion::default();
    targets = future_group_throughput_bench, future_group_latency_bench
}

fn future_group_bench(c: &mut Criterion) {
let mut group = c.benchmark_group("future_group");
fn future_group_throughput_bench(c: &mut Criterion) {
soooch marked this conversation as resolved.
Show resolved Hide resolved
let mut group = c.benchmark_group("future_group_poll_throughput");
for i in [10, 100, 1000].iter() {
group.bench_with_input(BenchmarkId::new("FutureGroup", i), i, |b, i| {
let setup = || make_future_group(*i);
Expand Down Expand Up @@ -116,6 +124,115 @@ mod future_group {
}
group.finish();
}

/// This benchmark measures the latency between when futures become ready
/// and when their outputs appear on the [`FutureGroup`] stream.
///
/// To test this, we:
/// - insert N pending futures to the [`FutureGroup`].
/// - until the [`FutureGroup`] is empty, we set some fraction of the
/// pending futures to ready, then record how long it takes for their
/// outputs to be produced from [`FutureGroup`]'s `Stream` impl.
/// - we sum the recorded durations for each of these rounds.
fn future_group_latency_bench(c: &mut Criterion) {
    // Benchmark parameters: the initial number of pending futures inserted
    // into the group, and the fraction of them made ready in each round.
    #[derive(Debug, Clone, Copy)]
    struct Params {
        init_size: usize,
        pct_ready_per_round: f64,
    }

    // Display delegates to Debug so a `Params` value can be used directly
    // as the parameter string in a criterion `BenchmarkId`.
    impl Display for Params {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            Debug::fmt(&self, f)
        }
    }

    /// Shared measurement routine, generic over the group type `G` so the
    /// exact same workload runs against both `FutureGroup` and
    /// `FuturesUnordered`. Returns the total time spent draining newly
    /// ready futures across `iters` iterations, for use with criterion's
    /// `iter_custom`.
    async fn routine<G>(
        iters: u64,
        Params {
            init_size,
            pct_ready_per_round,
        }: Params,
    ) -> Duration
    where
        G: Default + Stream + Unpin + Extend<oneshot::Receiver<Never>>,
    {
        // At least one future completes per round, even when the requested
        // fraction of `init_size` rounds down to zero.
        let ready_per_round = ((init_size as f64) * pct_ready_per_round).max(1.) as usize;

        let mut total_runtime = Duration::ZERO;

        for _ in 0..iters {
            // construct a set of oneshot::Receiver futures. These become
            // ready once their Sender ends are dropped.
            let (mut senders, mut group) = (0..init_size)
                .map(|_| oneshot::channel())
                .unzip::<_, _, Vec<_>, G>();

            // shuffle our senders so from the FutureGroup's perspective,
            // futures become ready in arbitrary order. Seeded RNG keeps
            // the ordering identical across iterations and runs.
            let mut rng = rand::rngs::StdRng::seed_from_u64(42);
            senders.shuffle(&mut rng);

            // poll once to set up all the wakers
            assert!(futures::poll!(group.next()).is_pending());

            while !senders.is_empty() {
                // The final round may have fewer than `ready_per_round`
                // senders left.
                let num_completing = ready_per_round.min(senders.len());

                // drop some Senders. The corresponding Receiver futures
                // will become ready in the FutureGroup
                assert_eq!(senders.drain(..num_completing).count(), num_completing);

                // this isn't necessary now, but if we were using the tokio
                // runtime/oneshots, coming up for air regularly prevents
                // the budget system from inserting false pendings. this
                // also more closely emulates what would happen in a real
                // system (sender task yields to let executor poll receiver
                // task). though that shouldn't make a difference.
                yield_now().await;

                // measure the time it takes for all newly ready futures to
                // be produced from the FutureGroup stream.
                let recv_start = Instant::now();
                assert_eq!(
                    (&mut group).take(num_completing).count().await,
                    num_completing
                );
                total_runtime += recv_start.elapsed();
            }
        }

        total_runtime
    }

    let mut group = c.benchmark_group("future_group_poll_latency");
    // Run every (initial size, readiness fraction) combination against
    // both group implementations.
    for params in [10, 100, 1000]
        .into_iter()
        .cartesian_product([0.0001, 0.2, 1.0])
        .map(|(init_size, pct_ready_per_round)| Params {
            init_size,
            pct_ready_per_round,
        })
    {
        group.bench_with_input(
            BenchmarkId::new("FutureGroup", params),
            &params,
            |b, &params| {
                b.to_async(FuturesExecutor)
                    .iter_custom(|iters| routine::<FutureGroup<_>>(iters, params))
            },
        );
        group.bench_with_input(
            BenchmarkId::new("FuturesUnordered", params),
            &params,
            |b, &params| {
                b.to_async(FuturesExecutor)
                    .iter_custom(|iters| routine::<FuturesUnordered<_>>(iters, params))
            },
        );
    }
    group.finish();
}
}

mod merge {
Expand Down
23 changes: 18 additions & 5 deletions src/future/future_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ use crate::utils::{PollState, PollVec, WakerVec};
/// ```

#[must_use = "`FutureGroup` does nothing if not iterated over"]
#[derive(Default)]
#[pin_project::pin_project]
pub struct FutureGroup<F> {
#[pin]
Expand All @@ -80,6 +79,12 @@ impl<T: Debug> Debug for FutureGroup<T> {
}
}

impl<T> Default for FutureGroup<T> {
fn default() -> Self {
Self::new()
}
}

impl<F> FutureGroup<F> {
/// Create a new instance of `FutureGroup`.
///
Expand Down Expand Up @@ -415,14 +420,22 @@ impl<F: Future> Stream for FutureGroup<F> {
}
}

impl<F: Future> FromIterator<F> for FutureGroup<F> {
fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
impl<F: Future> Extend<F> for FutureGroup<F> {
fn extend<T: IntoIterator<Item = F>>(&mut self, iter: T) {
let iter = iter.into_iter();
let len = iter.size_hint().1.unwrap_or_default();
let mut this = Self::with_capacity(len);
self.reserve(len);

for future in iter {
this.insert(future);
self.insert(future);
}
}
}

impl<F: Future> FromIterator<F> for FutureGroup<F> {
    /// Collects an iterator of futures into a freshly created group,
    /// delegating the insertion work to the `Extend` impl.
    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
        let mut group = Self::new();
        group.extend(iter);
        group
    }
}
Expand Down
Loading