From af5edd38a8282f4a5b4d4dcbb13f2e9d2632f878 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Fri, 20 Oct 2023 14:04:48 -0700 Subject: [PATCH] ParallelJoin works but require dyn --- src/seq_join.rs | 59 ++++++++++++++----------------------------------- 1 file changed, 17 insertions(+), 42 deletions(-) diff --git a/src/seq_join.rs b/src/seq_join.rs index 583513499..27e90dcf2 100644 --- a/src/seq_join.rs +++ b/src/seq_join.rs @@ -6,13 +6,12 @@ use std::{ task::{Context, Poll}, }; use std::marker::PhantomData; +use async_trait::async_trait; use clap::builder::TypedValueParser; -use futures::{ - stream::{iter, Iter as StreamIter, TryCollect}, - Future, Stream, StreamExt, TryStreamExt, -}; +use futures::{stream::{iter, Iter as StreamIter, TryCollect}, Future, Stream, StreamExt, TryStreamExt, TryFuture}; use futures_util::future::TryJoinAll; +use futures_util::stream::FuturesOrdered; use pin_project::pin_project; use crate::exact::ExactSizeStream; @@ -103,38 +102,10 @@ where } } -struct ParallelFutures<'a, I, F> { - spawner: UnsafeSpawner<'a, F>, - iterable: I, -} - -#[pin_project] -pub struct ParallelFutures2<'a, F> - where F: futures::future::TryFuture, -F::Ok: Send + 'static, -F::Error: Send + 'static -{ - spawner: UnsafeSpawner<'a, Result>, - #[pin] - inner: TryJoinAll>>, -} - -impl Future for ParallelFutures2<'_, F> - - where F: futures::future::TryFuture, -F::Ok: Send + 'static, -F::Error: Send + 'static -{ - type Output = Result, F::Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().inner.poll(cx) - } -} - /// The `SeqJoin` trait wraps `seq_try_join_all`, providing the `active` parameter /// from the provided context so that the value can be made consistent. pub trait SeqJoin { + /// Perform a sequential join of the futures from the provided iterable. /// This uses [`seq_join`], with the current state of the associated object /// being used to determine the number of active items to track (see [`active_work`]). @@ -166,24 +137,28 @@ pub trait SeqJoin { } /// Join multiple tasks in parallel. Only do this if you can't use a sequential join. - fn parallel_join<'a, I, F, O, E>(&self, iterable: I) -> ParallelFutures2<'a, I::Item> + fn parallel_join(&self, iterable: I) -> Pin, E>> + Send>> where - I: IntoIterator, - // F: futures::future::TryFuture + Send, - F: Future> + Send + 'a, + I: IntoIterator + Send, + F: Future> + Send, O: Send + 'static, E: Send + 'static - // F::Ok: Send + 'static, - // F::Error: Send + 'static, { // let iterable = iterable.into_iter().map(|f| { // spawner.spawn(f) // }); + // let spawner = UnsafeSpawner::default(); + let mut futures = FuturesOrdered::default(); let spawner = UnsafeSpawner::default(); - ParallelFutures2 { - spawner, - inner: futures::future::try_join_all(iterable.into_iter().map(|f| spawner.spawn(f))), + for f in iterable.into_iter() { + futures.push_back(spawner.spawn(f.into_future())); } + + Box::pin(async move { futures.try_collect().await }) + // ParallelFutures2 { + // spawner, + // inner: futures::future::try_join_all(iterable.into_iter().map(|f| spawner.spawn(f))), + // } // #[allow(clippy::disallowed_methods)] // Just in this one place. // futures::future::try_join_all(iterable.into_iter() // .map(|f| tokio::spawn()))