Skip to content

Commit

Permalink
Merge pull request #187 from conradludgate/fix-pin-violation
Browse files Browse the repository at this point in the history
fix pin violation in much of concurrentstream
  • Loading branch information
yoshuawuyts authored Jun 9, 2024
2 parents 629ddf3 + 00601b9 commit 9246a94
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ futures-lite = "1.12.0"
pin-project = "1.0.8"
slab = { version = "0.4.8", optional = true }
smallvec = { version = "1.11.0", optional = true }
futures-buffered = "0.2.6"

[dev-dependencies]
async-io = "2.3.2"
Expand Down
8 changes: 4 additions & 4 deletions src/concurrent_stream/for_each.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{Consumer, ConsumerState};
use crate::future::FutureGroup;
use futures_buffered::FuturesUnordered;
use futures_lite::StreamExt;
use pin_project::pin_project;

Expand All @@ -22,7 +22,7 @@ where
// NOTE: we can remove the `Arc` here if we're willing to make this struct self-referential
count: Arc<AtomicUsize>,
#[pin]
group: FutureGroup<ForEachFut<F, FutT, T, FutB>>,
group: FuturesUnordered<ForEachFut<F, FutT, T, FutB>>,
limit: usize,
f: F,
_phantom: PhantomData<(T, FutB)>,
Expand All @@ -44,7 +44,7 @@ where
f,
_phantom: PhantomData,
count: Arc::new(AtomicUsize::new(0)),
group: FutureGroup::new(),
group: FuturesUnordered::new(),
}
}
}
Expand All @@ -69,7 +69,7 @@ where
// Space was available! - insert the item for posterity
this.count.fetch_add(1, Ordering::Relaxed);
let fut = ForEachFut::new(this.f.clone(), future, this.count.clone());
this.group.as_mut().insert_pinned(fut);
this.group.as_mut().push(fut);

ConsumerState::Continue
}
Expand Down
8 changes: 4 additions & 4 deletions src/concurrent_stream/from_concurrent_stream.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use super::{ConcurrentStream, Consumer, ConsumerState, IntoConcurrentStream};
use crate::future::FutureGroup;
#[cfg(all(feature = "alloc", not(feature = "std")))]
use alloc::vec::Vec;
use core::future::Future;
use core::pin::Pin;
use futures_buffered::FuturesUnordered;
use futures_lite::StreamExt;
use pin_project::pin_project;

Expand Down Expand Up @@ -32,14 +32,14 @@ impl<T> FromConcurrentStream<T> for Vec<T> {
#[pin_project]
pub(crate) struct VecConsumer<'a, Fut: Future> {
#[pin]
group: FutureGroup<Fut>,
group: FuturesUnordered<Fut>,
output: &'a mut Vec<Fut::Output>,
}

impl<'a, Fut: Future> VecConsumer<'a, Fut> {
pub(crate) fn new(output: &'a mut Vec<Fut::Output>) -> Self {
Self {
group: FutureGroup::new(),
group: FuturesUnordered::new(),
output,
}
}
Expand All @@ -54,7 +54,7 @@ where
async fn send(self: Pin<&mut Self>, future: Fut) -> super::ConsumerState {
let mut this = self.project();
// unbounded concurrency, so we just goooo
this.group.as_mut().insert_pinned(future);
this.group.as_mut().push(future);
ConsumerState::Continue
}

Expand Down
8 changes: 4 additions & 4 deletions src/concurrent_stream/try_for_each.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::concurrent_stream::ConsumerState;
use crate::future::FutureGroup;
use crate::private::Try;
use futures_buffered::FuturesUnordered;
use futures_lite::StreamExt;
use pin_project::pin_project;

Expand All @@ -26,7 +26,7 @@ where
count: Arc<AtomicUsize>,
// TODO: remove the `Pin<Box>` from this signature by requiring this struct is pinned
#[pin]
group: FutureGroup<TryForEachFut<F, FutT, T, FutB, B>>,
group: FuturesUnordered<TryForEachFut<F, FutT, T, FutB, B>>,
limit: usize,
residual: Option<B::Residual>,
f: F,
Expand All @@ -50,7 +50,7 @@ where
f,
residual: None,
count: Arc::new(AtomicUsize::new(0)),
group: FutureGroup::new(),
group: FuturesUnordered::new(),
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -93,7 +93,7 @@ where
// Space was available! - insert the item for posterity
this.count.fetch_add(1, Ordering::Relaxed);
let fut = TryForEachFut::new(this.f.clone(), future, this.count.clone());
this.group.as_mut().insert_pinned(fut);
this.group.as_mut().push(fut);
ConsumerState::Continue
}

Expand Down
1 change: 1 addition & 0 deletions src/future/future_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ impl<F: Future> FutureGroup<F> {
Key(index)
}

#[allow(unused)]
/// Insert a value into a pinned `FutureGroup`
///
/// This method is private because it serves as an implementation detail for
Expand Down

0 comments on commit 9246a94

Please sign in to comment.