diff --git a/src/concurrent_stream/mod.rs b/src/concurrent_stream/mod.rs index 08135f6..233bf11 100644 --- a/src/concurrent_stream/mod.rs +++ b/src/concurrent_stream/mod.rs @@ -1,17 +1,4 @@ //! Concurrent execution of streams -//! -//! Pitch: we don't want to ever work with a `Stream` for -//! concurrency; what we really want is a `ConcurrentAsyncIterator` trait which -//! does all the right things for us. -//! -//! # todos -//! -//! - [x] base function impl based on `StreamGroup` -//! - [ ] write integration tests to validate the impl -//! - [ ] split it out into its own trait, and `for_each` into its own method -//! - [ ] split `limit` out into its own method -//! - [ ] implement a `map` method -//! - [ ] implement a `collect` method use crate::future::{FutureGroup, Race}; use futures_lite::{Stream, StreamExt}; @@ -37,12 +24,14 @@ where let mut group = FutureGroup::new(); loop { + // ORDERING: this is single-threaded so `Relaxed` is ok match count.load(Ordering::Relaxed) { // 1. This is our base case: there are no items in the group, so we // first have to wait for an item to become available from the // stream. 0 => match stream.next().await { Some(item) => { + // ORDERING: this is single-threaded so `Relaxed` is ok count.fetch_add(1, Ordering::Relaxed); let fut = CustomFut::new(&f, item, count.clone()); group.insert(fut); @@ -67,6 +56,7 @@ where }; match (a, b).race().await { State::ItemReady(Some(item)) => { + // ORDERING: this is single-threaded so `Relaxed` is ok count.fetch_add(1, Ordering::Relaxed); let fut = CustomFut::new(&f, item, count.clone()); group.insert(fut); @@ -121,6 +111,7 @@ where fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { let this = self.project(); (this.f)(this.item.take().unwrap()); + // ORDERING: this is single-threaded so `Relaxed` is ok this.count.fetch_sub(1, Ordering::Relaxed); Poll::Ready(()) }