From 550f9383bbe8e135ce41bd14766f97905a05726d Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts <2467194+yoshuawuyts@users.noreply.github.com> Date: Mon, 14 Nov 2022 15:44:29 +0100 Subject: [PATCH] inline `pollstates` for `array::merge` --- src/stream/merge/array.rs | 40 +++++++++++++++++++-------------------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/src/stream/merge/array.rs b/src/stream/merge/array.rs index 1c8de5b..9020040 100644 --- a/src/stream/merge/array.rs +++ b/src/stream/merge/array.rs @@ -1,7 +1,8 @@ use super::Merge as MergeTrait; use crate::stream::IntoStream; -use crate::utils::{self, Fuse}; +use crate::utils::{self, PollState}; +use core::array; use core::fmt; use futures_core::Stream; use std::pin::Pin; @@ -20,8 +21,10 @@ where S: Stream, { #[pin] - streams: [Fuse; N], + streams: [S; N], rng: utils::RandomGenerator, + poll_state: [PollState; N], + done: bool, } impl Merge @@ -30,8 +33,10 @@ where { pub(crate) fn new(streams: [S; N]) -> Self { Self { - streams: streams.map(Fuse::new), + streams, rng: utils::RandomGenerator::new(), + poll_state: array::from_fn(|_| PollState::default()), + done: false, } } } @@ -54,31 +59,24 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); - // Randomize the indexes into our streams array. This ensures that when - // multiple streams are ready at the same time, we don't accidentally - // exhaust one stream before another. - let mut arr: [usize; N] = { - // this is an inlined version of `core::array::from_fn` - // TODO: replace this with `core::array::from_fn` when it becomes stable - let cb = |n| n; - let mut idx = 0; - [(); N].map(|_| { - let res = cb(idx); - idx += 1; - res - }) - }; - arr.sort_by_cached_key(|_| this.rng.generate(1000)); - // Iterate over our streams one-by-one. If a stream yields a value, // we exit early. By default we'll return `Poll::Ready(None)`, but // this changes if we encounter a `Poll::Pending`. let mut res = Poll::Ready(None); - for index in arr { + let index = this.rng.generate(N as u32) as usize; + + for index in (0..N).map(|pos| (index + pos).wrapping_rem(N)) { + if this.poll_state[index].is_consumed() { + continue; + } + let stream = utils::get_pin_mut(this.streams.as_mut(), index).unwrap(); match stream.poll_next(cx) { Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), - Poll::Ready(None) => continue, + Poll::Ready(None) => { + this.poll_state[index] = PollState::Consumed; + continue; + } Poll::Pending => res = Poll::Pending, } }