Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshuawuyts committed Feb 17, 2024
1 parent 5551b1c commit 857dafa
Showing 1 changed file with 52 additions and 4 deletions.
56 changes: 52 additions & 4 deletions src/concurrent_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,21 @@ use std::sync::Arc;
///
/// This implementation does not suffer from the "concurrent iterator" issue,
/// because it will always make forward progress.
pub async fn concurrent_for_each<I, F, Fut>(stream: I, f: F, limit: usize)
pub async fn concurrent_for_each<I, F, Fut>(mut stream: I, f: F, limit: usize)
where
I: Stream + Unpin,
F: Fn(I::Item) -> Fut,
Fut: Future<Output = ()>,
{
let mut stream = stream.fuse();
let mut is_done = false;
let count = Arc::new(AtomicUsize::new(0));
let mut group = pin!(FutureGroup::new());

loop {
if is_done {
group.next().await;
}

// ORDERING: this is single-threaded so `Relaxed` is ok
match count.load(Ordering::Relaxed) {
// 1. This is our base case: there are no items in the group, so we
Expand All @@ -36,7 +40,9 @@ where
let fut = insert_fut(&f, item, count.clone());
group.as_mut().insert_pinned(fut);
}
None => return,
None => {
return;
}
},

// 2. Here our group still has capacity remaining, so we want to
Expand All @@ -61,7 +67,9 @@ where
let fut = insert_fut(&f, item, count.clone());
group.as_mut().insert_pinned(fut);
}
State::ItemReady(None) => {} // do nothing, stream is done
State::ItemReady(None) => {
is_done = true;
}
State::GroupDone => {} // do nothing, group just finished an item - we get to loop again
}
}
Expand Down Expand Up @@ -91,3 +99,43 @@ enum State<T> {
ItemReady(Option<T>),
GroupDone,
}

#[cfg(test)]
mod test {
use super::*;
use futures_lite::stream;

#[test]
fn concurrency_one() {
futures_lite::future::block_on(async {
let count = Arc::new(AtomicUsize::new(0));
let s = stream::repeat(1).take(2);
let limit = 1;
let map = |n| {
let count = count.clone();
async move {
count.fetch_add(n, Ordering::Relaxed);
}
};
concurrent_for_each(s, map, limit).await;
assert_eq!(count.load(Ordering::Relaxed), 2);
});
}

#[test]
fn concurrency_three() {
futures_lite::future::block_on(async {
let count = Arc::new(AtomicUsize::new(0));
let s = stream::repeat(1).take(10);
let limit = 3;
let map = |n| {
let count = count.clone();
async move {
count.fetch_add(n, Ordering::Relaxed);
}
};
concurrent_for_each(s, map, limit).await;
assert_eq!(count.load(Ordering::Relaxed), 10);
});
}
}

0 comments on commit 857dafa

Please sign in to comment.