Skip to content

Commit

Permalink
inline pollstates for array::merge
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshuawuyts committed Nov 14, 2022
1 parent f262b48 commit 550f938
Showing 1 changed file with 19 additions and 21 deletions.
40 changes: 19 additions & 21 deletions src/stream/merge/array.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::Merge as MergeTrait;
use crate::stream::IntoStream;
use crate::utils::{self, Fuse};
use crate::utils::{self, PollState};

use core::array;
use core::fmt;
use futures_core::Stream;
use std::pin::Pin;
Expand All @@ -20,8 +21,10 @@ where
S: Stream,
{
#[pin]
streams: [Fuse<S>; N],
streams: [S; N],
rng: utils::RandomGenerator,
poll_state: [PollState; N],
done: bool,
}

impl<S, const N: usize> Merge<S, N>
Expand All @@ -30,8 +33,10 @@ where
{
pub(crate) fn new(streams: [S; N]) -> Self {
Self {
streams: streams.map(Fuse::new),
streams,
rng: utils::RandomGenerator::new(),
poll_state: array::from_fn(|_| PollState::default()),
done: false,
}
}
}
Expand All @@ -54,31 +59,24 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();

// Randomize the indexes into our streams array. This ensures that when
// multiple streams are ready at the same time, we don't accidentally
// exhaust one stream before another.
let mut arr: [usize; N] = {
// this is an inlined version of `core::array::from_fn`
// TODO: replace this with `core::array::from_fn` when it becomes stable
let cb = |n| n;
let mut idx = 0;
[(); N].map(|_| {
let res = cb(idx);
idx += 1;
res
})
};
arr.sort_by_cached_key(|_| this.rng.generate(1000));

// Iterate over our streams one-by-one. If a stream yields a value,
// we exit early. By default we'll return `Poll::Ready(None)`, but
// this changes if we encounter a `Poll::Pending`.
let mut res = Poll::Ready(None);
for index in arr {
let index = this.rng.generate(N as u32) as usize;

for index in (0..N).map(|pos| (index + pos).wrapping_rem(N)) {
if this.poll_state[index].is_consumed() {
continue;
}

let stream = utils::get_pin_mut(this.streams.as_mut(), index).unwrap();
match stream.poll_next(cx) {
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
Poll::Ready(None) => continue,
Poll::Ready(None) => {
this.poll_state[index] = PollState::Consumed;
continue;
}
Poll::Pending => res = Poll::Pending,
}
}
Expand Down

0 comments on commit 550f938

Please sign in to comment.