Skip to content

Commit

Permalink
Merge pull request #99 from poliorcetics/remove-need-for-tuple-for-each
Browse files Browse the repository at this point in the history
Remove need for `tuple_for_each!`, `tuple_len!` and simplify `gen_conditions!` a lot
  • Loading branch information
matheus-consoli authored Nov 19, 2022
2 parents 00de15d + 0732adf commit e3f61cd
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 192 deletions.
26 changes: 14 additions & 12 deletions benches/bench.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(clippy::let_unit_value, clippy::unit_cmp)]

mod utils;

criterion::criterion_main!(merge::merge_benches, join::join_benches, race::race_benches);
Expand Down Expand Up @@ -35,19 +37,19 @@ mod merge {

fn array_merge_bench(c: &mut Criterion) {
c.bench_function("array::merge 10", |b| {
b.to_async(FuturesExecutor).iter(|| array_merge::<10>())
b.to_async(FuturesExecutor).iter(array_merge::<10>)
});
c.bench_function("array::merge 100", |b| {
b.to_async(FuturesExecutor).iter(|| array_merge::<100>())
b.to_async(FuturesExecutor).iter(array_merge::<100>)
});
c.bench_function("array::merge 1000", |b| {
b.to_async(FuturesExecutor).iter(|| array_merge::<1000>())
b.to_async(FuturesExecutor).iter(array_merge::<1000>)
});
}

fn tuple_merge_bench(c: &mut Criterion) {
c.bench_function("tuple::merge 10", |b| {
b.to_async(FuturesExecutor).iter(|| tuple_merge())
b.to_async(FuturesExecutor).iter(tuple_merge)
});
}

Expand Down Expand Up @@ -116,19 +118,19 @@ mod join {

fn array_join_bench(c: &mut Criterion) {
c.bench_function("array::join 10", |b| {
b.to_async(FuturesExecutor).iter(|| array_join::<10>())
b.to_async(FuturesExecutor).iter(array_join::<10>)
});
c.bench_function("array::join 100", |b| {
b.to_async(FuturesExecutor).iter(|| array_join::<100>())
b.to_async(FuturesExecutor).iter(array_join::<100>)
});
c.bench_function("array::join 1000", |b| {
b.to_async(FuturesExecutor).iter(|| array_join::<1000>())
b.to_async(FuturesExecutor).iter(array_join::<1000>)
});
}

fn tuple_join_bench(c: &mut Criterion) {
c.bench_function("tuple::join 10", |b| {
b.to_async(FuturesExecutor).iter(|| tuple_join())
b.to_async(FuturesExecutor).iter(tuple_join)
});
}

Expand Down Expand Up @@ -183,19 +185,19 @@ mod race {

fn array_race_bench(c: &mut Criterion) {
c.bench_function("array::race 10", |b| {
b.to_async(FuturesExecutor).iter(|| array_race::<10>())
b.to_async(FuturesExecutor).iter(array_race::<10>)
});
c.bench_function("array::race 100", |b| {
b.to_async(FuturesExecutor).iter(|| array_race::<100>())
b.to_async(FuturesExecutor).iter(array_race::<100>)
});
c.bench_function("array::race 1000", |b| {
b.to_async(FuturesExecutor).iter(|| array_race::<1000>())
b.to_async(FuturesExecutor).iter(array_race::<1000>)
});
}

fn tuple_race_bench(c: &mut Criterion) {
c.bench_function("tuple::race 10", |b| {
b.to_async(FuturesExecutor).iter(|| tuple_race())
b.to_async(FuturesExecutor).iter(tuple_race)
});
}

Expand Down
4 changes: 2 additions & 2 deletions benches/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub fn futures_tuple() -> (
CountdownFuture::new(6, len, wakers.clone(), completed.clone()),
CountdownFuture::new(7, len, wakers.clone(), completed.clone()),
CountdownFuture::new(8, len, wakers.clone(), completed.clone()),
CountdownFuture::new(9, len, wakers.clone(), completed.clone()),
CountdownFuture::new(9, len, wakers, completed),
)
}

Expand Down Expand Up @@ -94,7 +94,7 @@ pub fn streams_tuple() -> (
CountdownStream::new(6, len, wakers.clone(), completed.clone()),
CountdownStream::new(7, len, wakers.clone(), completed.clone()),
CountdownStream::new(8, len, wakers.clone(), completed.clone()),
CountdownStream::new(9, len, wakers.clone(), completed.clone()),
CountdownStream::new(9, len, wakers, completed),
)
}

Expand Down
1 change: 1 addition & 0 deletions src/future/join/tuple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ mod test {
use std::future;

#[test]
#[allow(clippy::unit_cmp)]
fn join_0() {
futures_lite::future::block_on(async {
assert_eq!(().join().await, ());
Expand Down
11 changes: 8 additions & 3 deletions src/future/race/tuple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,23 @@ macro_rules! impl_race_tuple {
let mut this = self.project();
assert!(!*this.done, "Futures must not be polled after completing");

const LEN: u32 = utils::tuple_len!($($F,)*);
#[repr(u32)]
enum Indexes {
$($F),*
}

const LEN: u32 = [$(Indexes::$F),*].len() as u32;
const PERMUTATIONS: u32 = utils::permutations(LEN);
let r = this.rng.generate(PERMUTATIONS);

for i in 0..LEN {
utils::gen_conditions!(LEN, i, r, this, cx, poll, {
utils::gen_conditions!(LEN, i, r, this, cx, poll, $((Indexes::$F as u32; $F, {
Poll::Ready(output) => {
*this.done = true;
return Poll::Ready(output);
},
_ => continue,
}, $($F,)*);
}))*);
}

Poll::Pending
Expand Down
39 changes: 25 additions & 14 deletions src/stream/merge/tuple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use std::pin::Pin;
use std::task::{Context, Poll};

macro_rules! poll_stream {
($stream_idx:tt, $iteration:ident, $this:ident, $streams:ident, $cx:ident, $len_streams:ident) => {
($stream_idx:tt, $iteration:ident, $this:ident, $streams:ident . $stream_member:ident, $cx:ident, $len_streams:ident) => {
if $stream_idx == $iteration {
match unsafe { Pin::new_unchecked(&mut $streams.$stream_idx) }.poll_next(&mut $cx) {
match unsafe { Pin::new_unchecked(&mut $streams.$stream_member) }.poll_next(&mut $cx) {
Poll::Ready(Some(item)) => {
// Mark ourselves as ready again because we need to poll for the next item.
$this
Expand Down Expand Up @@ -70,9 +70,13 @@ macro_rules! impl_merge_tuple {
};
($mod_name:ident $StructName:ident $($F:ident)+) => {
mod $mod_name {
#[derive(Debug)]
#[pin_project::pin_project]
pub(super) struct Streams<$($F,)+>($(#[pin] pub(super) $F,)+);
pub(super) struct Streams<$($F,)+> { $(#[pin] pub(super) $F: $F),+ }

#[repr(usize)]
pub(super) enum Indexes { $($F),+ }

pub(super) const LEN: usize = [$(Indexes::$F),+].len();
}

/// A stream that merges multiple streams into a single stream.
Expand All @@ -89,10 +93,9 @@ macro_rules! impl_merge_tuple {
)* {
#[pin] streams: $mod_name::Streams<$($F,)+>,
rng: utils::RandomGenerator,
wakers: WakerArray<{utils::tuple_len!($($F,)+)}>,
state: PollArray<{utils::tuple_len!($($F,)+)}>,
wakers: WakerArray<{$mod_name::LEN}>,
state: PollArray<{$mod_name::LEN}>,
completed: u8,
done: bool,
}

impl<T, $($F),*> fmt::Debug for $StructName<T, $($F),*>
Expand All @@ -102,7 +105,7 @@ macro_rules! impl_merge_tuple {
)* {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("Merge")
.field(&self.streams)
$( .field(&self.streams.$F) )* // Hides implementation detail of Streams struct
.finish()
}
}
Expand All @@ -119,7 +122,7 @@ macro_rules! impl_merge_tuple {
let mut readiness = this.wakers.readiness().lock().unwrap();
readiness.set_waker(cx.waker());

const LEN: u8 = utils::tuple_len!($($F,)*);
const LEN: u8 = $mod_name::LEN as u8;
let r = this.rng.generate(LEN as u32) as u8;

let mut streams = this.streams.project();
Expand All @@ -141,8 +144,17 @@ macro_rules! impl_merge_tuple {
// Obtain the intermediate waker.
let mut cx = Context::from_waker(this.wakers.get(index).unwrap());

// poll the `streams.{index}` stream
utils::tuple_for_each!(poll_stream (index, this, streams, cx, LEN) $($F)*);
$(
let stream_index = $mod_name::Indexes::$F as usize;
poll_stream!(
stream_index,
index,
this,
streams . $F,
cx,
LEN
);
)+

// Lock readiness so we can use it again
readiness = this.wakers.readiness().lock().unwrap();
Expand All @@ -162,12 +174,11 @@ macro_rules! impl_merge_tuple {
fn merge(self) -> Self::Stream {
let ($($F,)*): ($($F,)*) = self;
$StructName {
streams: $mod_name::Streams($($F.into_stream(),)+),
streams: $mod_name::Streams { $($F: $F.into_stream()),+ },
rng: utils::RandomGenerator::new(),
wakers: WakerArray::new(),
state: PollArray::new(),
completed: 0,
done: false,
}
}
}
Expand Down Expand Up @@ -202,7 +213,7 @@ mod tests {
let mut s = ().merge();

let mut called = false;
while let Some(_) = s.next().await {
while s.next().await.is_some() {
called = true;
}
assert!(!called);
Expand Down
2 changes: 1 addition & 1 deletion src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub(crate) use pin::{get_pin_mut, get_pin_mut_from_vec, iter_pin_mut, iter_pin_m
pub(crate) use poll_state::MaybeDone;
pub(crate) use poll_state::{PollArray, PollState, PollVec};
pub(crate) use rng::RandomGenerator;
pub(crate) use tuple::{gen_conditions, permutations, tuple_for_each, tuple_len};
pub(crate) use tuple::{gen_conditions, permutations};
pub(crate) use wakers::{InlineWakerArray, WakerArray, WakerVec};

#[cfg(test)]
Expand Down
Loading

0 comments on commit e3f61cd

Please sign in to comment.