Skip to content

Commit

Permalink
allow concurrent stream to operate on an async closure
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshuawuyts committed Feb 16, 2024
1 parent d63da7a commit 5551b1c
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 44 deletions.
63 changes: 19 additions & 44 deletions src/concurrent_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,24 @@ use crate::future::{FutureGroup, Race};
use futures_lite::{Stream, StreamExt};
use std::clone::Clone;
use std::future::Future;
use std::pin::{pin, Pin};
use std::pin::pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

/// Concurrently map the items coming out of a sequential stream, using `limit`
/// as the max concurrency.
///
/// This implementation does not suffer from the "concurrent iterator" issue,
/// because it will always make forward progress.
pub async fn concurrent_for_each<I, F>(stream: I, f: F, limit: usize)
pub async fn concurrent_for_each<I, F, Fut>(stream: I, f: F, limit: usize)
where
I: Stream + Unpin,
F: Fn(I::Item),
F: Fn(I::Item) -> Fut,
Fut: Future<Output = ()>,
{
let mut stream = stream.fuse();
let count = Arc::new(AtomicUsize::new(0));
let mut group = FutureGroup::new();
let mut group = pin!(FutureGroup::new());

loop {
// ORDERING: this is single-threaded so `Relaxed` is ok
Expand All @@ -33,8 +33,8 @@ where
Some(item) => {
// ORDERING: this is single-threaded so `Relaxed` is ok
count.fetch_add(1, Ordering::Relaxed);
let fut = CustomFut::new(&f, item, count.clone());
group.insert(fut);
let fut = insert_fut(&f, item, count.clone());
group.as_mut().insert_pinned(fut);
}
None => return,
},
Expand All @@ -58,8 +58,8 @@ where
State::ItemReady(Some(item)) => {
// ORDERING: this is single-threaded so `Relaxed` is ok
count.fetch_add(1, Ordering::Relaxed);
let fut = CustomFut::new(&f, item, count.clone());
group.insert(fut);
let fut = insert_fut(&f, item, count.clone());
group.as_mut().insert_pinned(fut);
}
State::ItemReady(None) => {} // do nothing, stream is done
State::GroupDone => {} // do nothing, group just finished an item - we get to loop again
Expand All @@ -77,42 +77,17 @@ where
}
}

enum State<T> {
ItemReady(Option<T>),
GroupDone,
}

/// This is a custom future implementation to ensure that our internal
/// `FutureGroup` impl `: Unpin`. We need to give it a concrete type, with a
/// concrete `Map` impl, and guarantee it implements `Unpin`.
#[pin_project::pin_project]
struct CustomFut<F, T> {
f: F,
item: Option<T>,
count: Arc<AtomicUsize>, // lmao, don't judge me
}

impl<F, T> CustomFut<F, T> {
fn new(f: F, item: T, count: Arc<AtomicUsize>) -> Self {
Self {
f,
item: Some(item),
count,
}
}
}

impl<F, T> Future for CustomFut<F, T>
async fn insert_fut<T, F, Fut>(f: F, item: T, count: Arc<AtomicUsize>)
where
F: Fn(T),
F: Fn(T) -> Fut,
Fut: Future<Output = ()>,
{
type Output = ();
(f)(item).await;
// ORDERING: this is single-threaded so `Relaxed` is ok
count.fetch_sub(1, Ordering::Relaxed);
}

fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
(this.f)(this.item.take().unwrap());
// ORDERING: this is single-threaded so `Relaxed` is ok
this.count.fetch_sub(1, Ordering::Relaxed);
Poll::Ready(())
}
enum State<T> {
ItemReady(Option<T>),
GroupDone,
}
31 changes: 31 additions & 0 deletions src/future/future_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,37 @@ impl<F: Future> FutureGroup<F> {
key
}

/// Insert a value into a pinned `FutureGroup`
///
/// This method is private because it serves as an implementation detail for
/// `ConcurrentStream`. We should never expose this publicly, as the entire
/// point of this crate is that we abstract the futures poll machinery away
/// from end-users.
pub(crate) fn insert_pinned(self: Pin<&mut Self>, stream: F) -> Key
where
F: Future,
{
let mut this = self.project();
// SAFETY: inserting a value into the futures slab does not ever move
// any of the existing values.
let index = unsafe { this.futures.as_mut().get_unchecked_mut() }.insert(stream);
this.keys.insert(index);
let key = Key(index);

// If our slab allocated more space we need to
// update our tracking structures along with it.
let max_len = this.futures.as_ref().capacity().max(index);
this.wakers.resize(max_len);
this.states.resize(max_len);

// Set the corresponding state
this.states[index].set_pending();
let mut readiness = this.wakers.readiness().lock().unwrap();
readiness.set_ready(index);

key
}

/// Create a stream which also yields the key of each item.
///
/// # Example
Expand Down

0 comments on commit 5551b1c

Please sign in to comment.