From 3f95dbe48ab9b2eaf0b5ff160e89459e5877d080 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts <2467194+yoshuawuyts@users.noreply.github.com> Date: Tue, 15 Nov 2022 15:32:26 +0100 Subject: [PATCH] `impl Zip for Array` --- src/stream/zip/array.rs | 192 ++++++++++++++++++++++++++++++++++ src/utils/wakers/readiness.rs | 8 ++ 2 files changed, 200 insertions(+) diff --git a/src/stream/zip/array.rs b/src/stream/zip/array.rs index 8b13789..9790a2e 100644 --- a/src/stream/zip/array.rs +++ b/src/stream/zip/array.rs @@ -1 +1,193 @@ +use super::Zip as ZipTrait; +use crate::stream::IntoStream; +use crate::utils::{self, PollState, WakerList}; +use core::array; +use core::fmt; +use core::mem::MaybeUninit; +use core::pin::Pin; +use core::task::{Context, Poll}; +use std::mem; + +use futures_core::Stream; +use pin_project::{pin_project, pinned_drop}; + +/// ‘Zips up’ two streams into a single stream of pairs. +/// +/// This `struct` is created by the [`merge`] method on the [`Zip`] trait. See its +/// documentation for more. +/// +/// [`zip`]: trait.Zip.html#method.zip +/// [`Zip`]: trait.Zip.html +#[pin_project(PinnedDrop)] +pub struct Zip +where + S: Stream, +{ + #[pin] + streams: [S; N], + output: [MaybeUninit<::Item>; N], + wakers: WakerList, + state: [PollState; N], + done: bool, +} + +impl Zip +where + S: Stream, +{ + pub(crate) fn new(streams: [S; N]) -> Self { + Self { + streams, + output: array::from_fn(|_| MaybeUninit::uninit()), + state: array::from_fn(|_| PollState::default()), + wakers: WakerList::new(N), + done: false, + } + } +} + +impl fmt::Debug for Zip +where + S: Stream + fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_list().entries(self.streams.iter()).finish() + } +} + +impl Stream for Zip +where + S: Stream, +{ + type Item = [S::Item; N]; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + assert!(!*this.done, "Stream should not be polled after completion"); + + let mut readiness = this.wakers.readiness().lock().unwrap(); + readiness.set_waker(cx.waker()); + for index in 0..N { + if !readiness.any_ready() { + // Nothing is ready yet + return Poll::Pending; + } else if this.state[index].is_done() { + // We already have data stored for this stream + continue; + } else if !readiness.clear_ready(index) { + // This waker isn't ready yet + continue; + } + + // unlock readiness so we don't deadlock when polling + drop(readiness); + + // Obtain the intermediate waker. + let mut cx = Context::from_waker(this.wakers.get(index).unwrap()); + + let stream = utils::get_pin_mut(this.streams.as_mut(), index).unwrap(); + match stream.poll_next(&mut cx) { + Poll::Ready(Some(item)) => { + this.output[index] = MaybeUninit::new(item); + this.state[index] = PollState::Done; + + let all_ready = this.state.iter().all(|state| state.is_done()); + if all_ready { + // Reset the future's state. + readiness = this.wakers.readiness().lock().unwrap(); + readiness.set_all_ready(); + this.state.fill(PollState::Pending); + + // Take the output + // + // SAFETY: we just validated all our data is populated, meaning + // we can assume this is initialized. + let mut output = array::from_fn(|_| MaybeUninit::uninit()); + mem::swap(this.output, &mut output); + let output = unsafe { array_assume_init(output) }; + return Poll::Ready(Some(output)); + } + } + Poll::Ready(None) => { + // If one stream returns `None`, we can no longer return + // pairs - meaning the stream is over. + *this.done = true; + return Poll::Ready(None); + } + Poll::Pending => {} + } + + // Lock readiness so we can use it again + readiness = this.wakers.readiness().lock().unwrap(); + } + Poll::Pending + } +} + +/// Drop the already initialized values on cancellation. +#[pinned_drop] +impl PinnedDrop for Zip +where + S: Stream, +{ + fn drop(self: Pin<&mut Self>) { + let this = self.project(); + + for (state, output) in this.state.iter_mut().zip(this.output.iter_mut()) { + if state.is_done() { + // SAFETY: we've just filtered down to *only* the initialized values. + // We can assume they're initialized, and this is where we drop them. + unsafe { output.assume_init_drop() }; + } + } + } +} + +impl ZipTrait for [S; N] +where + S: IntoStream, +{ + type Item = as Stream>::Item; + type Stream = Zip; + + fn zip(self) -> Self::Stream { + Zip::new(self.map(|i| i.into_stream())) + } +} + +#[cfg(test)] +mod tests { + use crate::stream::Zip; + use futures_lite::future::block_on; + use futures_lite::prelude::*; + use futures_lite::stream; + + #[test] + fn merge_tuple_4() { + block_on(async { + let a = stream::repeat(1).take(2); + let b = stream::repeat(2).take(2); + let c = stream::repeat(3).take(2); + let mut s = Zip::zip([a, b, c]); + + assert_eq!(s.next().await, Some([1, 2, 3])); + assert_eq!(s.next().await, Some([1, 2, 3])); + assert_eq!(s.next().await, None); + }) + } +} + +// Inlined version of the unstable `MaybeUninit::array_assume_init` feature. +// FIXME: replace with `utils::array_assume_init` +unsafe fn array_assume_init(array: [MaybeUninit; N]) -> [T; N] { + // SAFETY: + // * The caller guarantees that all elements of the array are initialized + // * `MaybeUninit` and T are guaranteed to have the same layout + // * `MaybeUninit` does not drop, so there are no double-frees + // And thus the conversion is safe + let ret = unsafe { (&array as *const _ as *const [T; N]).read() }; + mem::forget(array); + ret +} diff --git a/src/utils/wakers/readiness.rs b/src/utils/wakers/readiness.rs index 1cd9a93..b6618c8 100644 --- a/src/utils/wakers/readiness.rs +++ b/src/utils/wakers/readiness.rs @@ -7,6 +7,7 @@ use crate::utils; #[derive(Debug)] pub(crate) struct Readiness { count: usize, + max_count: usize, ready: BitVec, parent_waker: Option, } @@ -16,6 +17,7 @@ impl Readiness { pub(crate) fn new(count: usize) -> Self { Self { count, + max_count: count, ready: bitvec![true as usize; count], parent_waker: None, } @@ -33,6 +35,12 @@ impl Readiness { } } + /// Set all markers to ready. + pub(crate) fn set_all_ready(&mut self) { + self.ready.fill(true); + self.count = self.max_count; + } + /// Returns whether the task id was previously ready pub(crate) fn clear_ready(&mut self, id: usize) -> bool { if self.ready[id] {