Skip to content

Commit

Permalink
notes on ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshuawuyts committed Feb 16, 2024
1 parent a7b6c08 commit d63da7a
Showing 1 changed file with 4 additions and 13 deletions.
17 changes: 4 additions & 13 deletions src/concurrent_stream/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,4 @@
//! Concurrent execution of streams
//!
//! Pitch: we don't want to ever work with a `Stream<Item = impl Future>` for
//! concurrency; what we really want is a `ConcurrentAsyncIterator` trait which
//! does all the right things for us.
//!
//! # todos
//!
//! - [x] base function impl based on `StreamGroup`
//! - [ ] write integration tests to validate the impl
//! - [ ] split it out into its own trait, and `for_each` into its own method
//! - [ ] split `limit` out into its own method
//! - [ ] implement a `map` method
//! - [ ] implement a `collect` method
use crate::future::{FutureGroup, Race};
use futures_lite::{Stream, StreamExt};
Expand All @@ -37,12 +24,14 @@ where
let mut group = FutureGroup::new();

loop {
// ORDERING: this is single-threaded so `Relaxed` is ok
match count.load(Ordering::Relaxed) {
// 1. This is our base case: there are no items in the group, so we
// first have to wait for an item to become available from the
// stream.
0 => match stream.next().await {
Some(item) => {
// ORDERING: this is single-threaded so `Relaxed` is ok
count.fetch_add(1, Ordering::Relaxed);
let fut = CustomFut::new(&f, item, count.clone());
group.insert(fut);
Expand All @@ -67,6 +56,7 @@ where
};
match (a, b).race().await {
State::ItemReady(Some(item)) => {
// ORDERING: this is single-threaded so `Relaxed` is ok
count.fetch_add(1, Ordering::Relaxed);
let fut = CustomFut::new(&f, item, count.clone());
group.insert(fut);
Expand Down Expand Up @@ -121,6 +111,7 @@ where
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
(this.f)(this.item.take().unwrap());
// ORDERING: this is single-threaded so `Relaxed` is ok
this.count.fetch_sub(1, Ordering::Relaxed);
Poll::Ready(())
}
Expand Down

0 comments on commit d63da7a

Please sign in to comment.