From 234a227ebb5d98c490230c8b477249956f0b0b5f Mon Sep 17 00:00:00 2001 From: Yosh Date: Sat, 6 Apr 2024 13:50:36 +0200 Subject: [PATCH 1/8] Update landing page docs --- src/collections/vec.rs | 2 +- src/lib.rs | 167 ++++++++++++++++++++++++++++++----------- 2 files changed, 124 insertions(+), 45 deletions(-) diff --git a/src/collections/vec.rs b/src/collections/vec.rs index d8371b4..10cdcaa 100644 --- a/src/collections/vec.rs +++ b/src/collections/vec.rs @@ -3,7 +3,7 @@ //! You will rarely need to interact with this module directly unless you need //! to name one of the iterator types. //! -//! [std::vec]: https://doc.rust-lang.org/stable/std/vec/use core::future::Ready; +//! [std::vec]: https://doc.rust-lang.org/std/vec/index.html use crate::concurrent_stream::{self, FromStream}; use crate::prelude::*; diff --git a/src/lib.rs b/src/lib.rs index 4812bde..5407181 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,62 +1,141 @@ -//! Concurrency extensions for [`Future`][core::future::Future] and `Stream` -//! (also known as [`AsyncIterator`][core::async_iter::AsyncIterator]). +//! Performant, portable, structured concurrency operations for async Rust. It works with any runtime, does not erase lifetimes, always handles cancellation, and always returns output to the caller. //! -//! Companion library for the "Futures Concurrency" blog post -//! series: -//! - [Futures Concurrency I: Introduction](https://blog.yoshuawuyts.com/futures-concurrency/) -//! - [Futures Concurrency II: A Trait Approach](https://blog.yoshuawuyts.com/futures-concurrency-2/) -//! - [Futures Concurrency III: `select!`](https://blog.yoshuawuyts.com/futures-concurrency-3/) -//! - [Futures Concurrency IV: Join Semantics](https://blog.yoshuawuyts.com/futures-concurrency-4/) +//! `futures-concurrency` provides concurrency operations for both groups of futures +//! and streams. Both for bounded and unbounded sets of futures and streams. In both +//! cases performance should be on par with, if not exceed conventional executor +//! implementations. +//! +//! ## Examples +//! +//! **Await multiple futures of different types** +//! ```rust +//! use futures_concurrency::prelude::*; +//! use std::future; +//! +//! let a = future::ready(1u8); +//! let b = future::ready("hello"); +//! let c = future::ready(3u16); +//! assert_eq!((a, b, c).join().await, (1, "hello", 3)); +//! ``` //! -//! The purpose of this library is to serve as a staging ground for what -//! eventually may become the futures concurrency methods provided by the -//! stdlib. See the [`future`] and [`stream`] submodules for more. +//! **Concurrently process items in a stream** //! -//! # Operations +//! ```rust +//! use futures_concurrency::prelude::*; +//! use futures_lite::stream; //! -//! This library provides the following operations on arrays, vecs, and tuples: +//! let v: Vec<_> = vec!["chashu", "nori"] +//! .into_co_stream() +//! .map(|msg| async move { format!("hello {msg}") }) +//! .collect() +//! .await; +//! +//! assert_eq!(v, &["hello chashu", "hello nori"]); +//! ``` //! -//! - [`future::Join`]: Wait for all futures to complete. -//! - [`future::TryJoin`]: Wait for all futures to complete successfully, or abort early on error. -//! - [`future::Race`]: Wait for the first future to complete. -//! - [`future::RaceOk`]: Wait for the first successful future to complete. -//! - [`stream::Chain`]: Takes multiple streams and creates a new stream over all in sequence. -//! - [`stream::Merge`]: Combines multiple streams into a single stream of all their outputs. -//! - [`stream::Zip`]: ‘Zips up’ multiple streams into a single stream of pairs. +//! **Access stack data outside the futures' scope** //! -//! # Examples +//! _Adapted from [`std::thread::scope`](https://doc.rust-lang.org/std/thread/fn.scope.html)._ //! -//! Concurrently await multiple heterogenous futures: //! ```rust //! use futures_concurrency::prelude::*; -//! use futures_lite::future::block_on; -//! use std::future; //! -//! block_on(async { -//! let a = future::ready(1u8); -//! let b = future::ready("hello"); -//! let c = future::ready(3u16); -//! assert_eq!((a, b, c).join().await, (1, "hello", 3)); -//! }) +//! let mut container = vec![1, 2, 3]; +//! let mut num = 0; +//! +//! let a = async { +//! println!("hello from the first future"); +//! dbg!(&container); +//! }; +//! +//! let b = async { +//! println!("hello from the second future"); +//! num += container[0] + container[2]; +//! }; +//! +//! println!("hello from the main future"); +//! let _ = (a, b).join().await; +//! container.push(4); +//! assert_eq!(x, container.len()); //! ``` //! -//! # Limitations +//! ## Operations +//! +//! ### Futures //! -//! Because of orphan rules this library can't implement everything the stdlib -//! can. The missing implementations are: +//! For futures which return a regular type `T` only the `join` and `race` +//! operations are available. `join` waits for all futures to complete, while `race` +//! will wait for the first future to complete. However for futures which return a +//! `Try` two additional operations are available. The following table +//! describes the behavior of concurrency operations for fallible futures: //! -//! - `impl IntoFuture for Vec` -//! - `impl IntoFuture for [T; N]` -//! - `impl IntoFuture for (T..)` -//! - `impl IntoAsyncIterator for Vec` -//! - `impl IntoAsyncIterator for [T; N]` -//! - `impl IntoAsyncIterator for (T..)` +//! | | **Wait for all outputs** | **Wait for first output** | +//! | -------------------------- | :----------------------- | :------------------------ | +//! | **Continue on error** | `Future::join` | `Future::race_ok` | +//! | **Short-circuit on error** | `Future::try_join` | `Future::race` | //! -//! This would enable containers of futures to directly be `.await`ed to get -//! `merge` semantics. Or containers of async iterators to be passed directly to -//! `for..await in` loops to be iterated over using `merge` semantics. This would -//! remove the need to think of "merge" as a verb, and would enable treating -//! sets of futures concurrently. +//! The following futures implementations are provided by `futures-concurrency`: +//! - [`FutureGroup`][future::FutureGroup]: A growable group of futures which operate as a single unit. +//! - `tuple`: [`join`][future::Join#impl-Join-for-(A,+B)], [`try_join`][future::TryJoin#impl-TryJoin-for-(A,+B)], [`race`][future::Race#impl-Race-for-(A,+B)], [`race_ok`][future::RaceOk#impl-RaceOk-for-(A,+B)] +//! - `array`: [`join`][future::Join#impl-Join-for-\[Fut;+N\]], [`try_join`][future::TryJoin#impl-TryJoin-for-\[Fut;+N\]], [`race`][future::Race#impl-Race-for-\[Fut;+N\]], [`race_ok`][future::RaceOk#impl-RaceOk-for-\[Fut;+N\]] +//! - `Vec`: [`join`][future::Join#impl-Join-for-Vec], [`try_join`][future::TryJoin#impl-TryJoin-for-Vec], [`race`][future::Race#impl-Race-for-Vec], [`race_ok`][future::RaceOk#impl-RaceOk-for-Vec] +//! +//! ### Streams +//! +//! Streams yield outputs one-by-one, which means that deciding to stop iterating is +//! the same for fallible and infallible streams. The operations provided for +//! streams can be categorized based on whether their inputs can be concurrently +//! processed, and whether their outputs can be concurrently processed. +//! +//! Specifically in the case of `merge`, it takes `N` streams in, and yields items +//! one-by-one as soon as any are available. This enables the output of individual +//! streams to be concurrently processed by further operations later on. +//! +//! | | __Sequential processing__ | __Concurrent processing__ | +//! | ------------------------ | --------------------- | --------------------- | +//! | __Sequential execution__ | `Stream::chain` | *not yet available* ‡ | +//! | __Concurrent execution__ | `Stream::zip` | `Stream::merge` | +//! +//! ‡: _This could be addressed by a hypothetical `Stream::unzip` operation, +//! however because we aspire for semantic compatibility with `std::iter::Iterator` +//! in our operations, the path to adding it is currently unclear_. +//! +//! The following streams implementations are provided by `futures-concurrency`: +//! +//! - [`StreamGroup`][stream::StreamGroup]: A growable group of streams which operate as a single unit. +//! - [`ConcurrentStream`][concurrent_stream::ConcurrentStream]: An asynchronous stream which can concurrently process items. +//! - `tuple`: [`chain`][stream::Chain#impl-Chain-for-(A,+B)], [`merge`][stream::Merge#impl-Merge-for-(A,+B)], [`zip`][stream::Zip#impl-Zip-for-(A,+B)] +//! - `array`: [`chain`][stream::Chain#impl-Chain-for-\[Fut;+N\]], [`merge`][stream::Merge#impl-Merge-for-\[Fut;+N\]], [`zip`][stream::Zip#impl-Zip-for-\[Fut;+N\]] +//! - `Vec`: [`chain`][stream::Chain#impl-Chain-for-Vec], [`merge`][stream::Merge#impl-Merge-for-Vec], [`zip`][stream::Zip#impl-Zip-for-Vec] +//! +//! ## Runtime Support +//! +//! `futures-concurrency` does not depend on any runtime executor being present. This enables it to work out of the box with any async runtime, including: `tokio`, `async-std`, `smol`, `glommio`, and `monoio`. It also supports `#[no_std]` environments, allowing it to be used with embedded async runtimes such as `embassy`. +//! +//! ## Feature Flags +//! +//! The `std` feature flag is enabled by default. To target `alloc` or `no_std` +//! environments, you can enable the following configuration: +//! +//! ```toml +//! [dependencies] +//! # no_std +//! futures-concurrency = { version = "7.5.0", default-features = false } +//! +//! # alloc +//! futures-concurrency = { version = "7.5.0", default-features = false, features = ["alloc"] } +//! ``` +//! +//! ## Further Reading +//! +//! `futures-concurrency` has been developed over the span of several years. It is +//! primarily maintained by Yosh Wuyts, a member of the Rust Async WG. You can read +//! more about the development and ideas behind `futures-concurrency` here: +//! +//! - [Futures Concurrency I: Introduction](https://blog.yoshuawuyts.com/futures-concurrency/) +//! - [Futures Concurrency II: A Trait Approach](https://blog.yoshuawuyts.com/futures-concurrency-2/) +//! - [Futures Concurrency III: `select!`](https://blog.yoshuawuyts.com/futures-concurrency-3/) +//! - [Futures Concurrency IV: Join Semantics](https://blog.yoshuawuyts.com/futures-concurrency-4/) #![deny(missing_debug_implementations, nonstandard_style)] #![warn(missing_docs)] From 616ced3d91e8ee141c595330d0a035d0543e7689 Mon Sep 17 00:00:00 2001 From: Yosh Date: Sun, 7 Apr 2024 00:08:04 +0200 Subject: [PATCH 2/8] fix clippy --- src/lib.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 5407181..48450da 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,12 +25,12 @@ //! use futures_lite::stream; //! //! let v: Vec<_> = vec!["chashu", "nori"] -//! .into_co_stream() -//! .map(|msg| async move { format!("hello {msg}") }) -//! .collect() -//! .await; +//! .into_co_stream() +//! .map(|msg| async move { format!("hello {msg}") }) +//! .collect() +//! .await; //! -//! assert_eq!(v, &["hello chashu", "hello nori"]); +//! assert_eq!(v, &["hello chashu", "hello nori"]); //! ``` //! //! **Access stack data outside the futures' scope** @@ -44,13 +44,13 @@ //! let mut num = 0; //! //! let a = async { -//! println!("hello from the first future"); -//! dbg!(&container); +//! println!("hello from the first future"); +//! dbg!(&container); //! }; //! //! let b = async { -//! println!("hello from the second future"); -//! num += container[0] + container[2]; +//! println!("hello from the second future"); +//! num += container[0] + container[2]; //! }; //! //! println!("hello from the main future"); From e80e4444cea79f900cf7f304bdfeed3d2cd4488a Mon Sep 17 00:00:00 2001 From: Yosh Date: Sun, 7 Apr 2024 00:14:15 +0200 Subject: [PATCH 3/8] fix doctests --- src/lib.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 48450da..3a5b74b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,10 +12,12 @@ //! use futures_concurrency::prelude::*; //! use std::future; //! +//! # futures::executor::block_on(async { //! let a = future::ready(1u8); //! let b = future::ready("hello"); //! let c = future::ready(3u16); //! assert_eq!((a, b, c).join().await, (1, "hello", 3)); +//! # }); //! ``` //! //! **Concurrently process items in a stream** @@ -24,6 +26,7 @@ //! use futures_concurrency::prelude::*; //! use futures_lite::stream; //! +//! # futures::executor::block_on(async { //! let v: Vec<_> = vec!["chashu", "nori"] //! .into_co_stream() //! .map(|msg| async move { format!("hello {msg}") }) @@ -31,6 +34,7 @@ //! .await; //! //! assert_eq!(v, &["hello chashu", "hello nori"]); +//! # }); //! ``` //! //! **Access stack data outside the futures' scope** @@ -40,6 +44,7 @@ //! ```rust //! use futures_concurrency::prelude::*; //! +//! # futures::executor::block_on(async { //! let mut container = vec![1, 2, 3]; //! let mut num = 0; //! @@ -56,7 +61,8 @@ //! println!("hello from the main future"); //! let _ = (a, b).join().await; //! container.push(4); -//! assert_eq!(x, container.len()); +//! assert_eq!(num, container.len()); +//! # }); //! ``` //! //! ## Operations From 6dde6dadab69a4cad9f1fbb1e522c7f0d31021d3 Mon Sep 17 00:00:00 2001 From: Yosh Date: Sun, 7 Apr 2024 00:14:54 +0200 Subject: [PATCH 4/8] multiline lib.rs docs --- src/lib.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3a5b74b..98b9eaa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,6 @@ -//! Performant, portable, structured concurrency operations for async Rust. It works with any runtime, does not erase lifetimes, always handles cancellation, and always returns output to the caller. +//! Performant, portable, structured concurrency operations for async Rust. It +//! works with any runtime, does not erase lifetimes, always handles +//! cancellation, and always returns output to the caller. //! //! `futures-concurrency` provides concurrency operations for both groups of futures //! and streams. Both for bounded and unbounded sets of futures and streams. In both @@ -116,7 +118,11 @@ //! //! ## Runtime Support //! -//! `futures-concurrency` does not depend on any runtime executor being present. This enables it to work out of the box with any async runtime, including: `tokio`, `async-std`, `smol`, `glommio`, and `monoio`. It also supports `#[no_std]` environments, allowing it to be used with embedded async runtimes such as `embassy`. +//! `futures-concurrency` does not depend on any runtime executor being present. +//! This enables it to work out of the box with any async runtime, including: +//! `tokio`, `async-std`, `smol`, `glommio`, and `monoio`. It also supports +//! `#[no_std]` environments, allowing it to be used with embedded async +//! runtimes such as `embassy`. //! //! ## Feature Flags //! From 8f4e77e1c22a3f4d339bf59999574da840254a18 Mon Sep 17 00:00:00 2001 From: Yosh Date: Sun, 7 Apr 2024 00:22:30 +0200 Subject: [PATCH 5/8] add examples to README --- README.md | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 14 ++++++------ 2 files changed, 71 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 4352a0a..c21da78 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,70 @@ +Performant, portable, structured concurrency operations for async Rust. It +works with any runtime, does not erase lifetimes, always handles +cancellation, and always returns output to the caller. + +`futures-concurrency` provides concurrency operations for both groups of futures +and streams. Both for bounded and unbounded sets of futures and streams. In both +cases performance should be on par with, if not exceed conventional executor +implementations. + +## Examples + +**Await multiple futures of different types** +```rust +use futures_concurrency::prelude::*; +use std::future; + +let a = future::ready(1u8); +let b = future::ready("hello"); +let c = future::ready(3u16); +assert_eq!((a, b, c).join().await, (1, "hello", 3)); +``` + +**Concurrently process items in a stream** + +```rust +use futures_concurrency::prelude::*; +use futures_lite::stream; + +# futures::executor::block_on(async { +let v: Vec<_> = vec!["chashu", "nori"] + .into_co_stream() + .map(|msg| async move { format!("hello {msg}") }) + .collect() + .await; + +assert_eq!(v, &["hello chashu", "hello nori"]); +``` + +**Access stack data outside the futures' scope** + +_Adapted from [`std::thread::scope`](https://doc.rust-lang.org/std/thread/fn.scope.html)._ + +```rust +use futures_concurrency::prelude::*; + +let mut container = vec![1, 2, 3]; +let mut num = 0; + +let a = async { + println!("hello from the first future"); + dbg!(&container); +}; + +let b = async { + println!("hello from the second future"); + num += container[0] + container[2]; +}; + +println!("hello from the main future"); +let _ = (a, b).join().await; +container.push(4); +assert_eq!(num, container.len()); +``` + ## Installation ```sh $ cargo add futures-concurrency diff --git a/src/lib.rs b/src/lib.rs index 98b9eaa..6743d30 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,7 +7,7 @@ //! cases performance should be on par with, if not exceed conventional executor //! implementations. //! -//! ## Examples +//! # Examples //! //! **Await multiple futures of different types** //! ```rust @@ -67,9 +67,9 @@ //! # }); //! ``` //! -//! ## Operations +//! # Operations //! -//! ### Futures +//! ## Futures //! //! For futures which return a regular type `T` only the `join` and `race` //! operations are available. `join` waits for all futures to complete, while `race` @@ -88,7 +88,7 @@ //! - `array`: [`join`][future::Join#impl-Join-for-\[Fut;+N\]], [`try_join`][future::TryJoin#impl-TryJoin-for-\[Fut;+N\]], [`race`][future::Race#impl-Race-for-\[Fut;+N\]], [`race_ok`][future::RaceOk#impl-RaceOk-for-\[Fut;+N\]] //! - `Vec`: [`join`][future::Join#impl-Join-for-Vec], [`try_join`][future::TryJoin#impl-TryJoin-for-Vec], [`race`][future::Race#impl-Race-for-Vec], [`race_ok`][future::RaceOk#impl-RaceOk-for-Vec] //! -//! ### Streams +//! ## Streams //! //! Streams yield outputs one-by-one, which means that deciding to stop iterating is //! the same for fallible and infallible streams. The operations provided for @@ -116,7 +116,7 @@ //! - `array`: [`chain`][stream::Chain#impl-Chain-for-\[Fut;+N\]], [`merge`][stream::Merge#impl-Merge-for-\[Fut;+N\]], [`zip`][stream::Zip#impl-Zip-for-\[Fut;+N\]] //! - `Vec`: [`chain`][stream::Chain#impl-Chain-for-Vec], [`merge`][stream::Merge#impl-Merge-for-Vec], [`zip`][stream::Zip#impl-Zip-for-Vec] //! -//! ## Runtime Support +//! # Runtime Support //! //! `futures-concurrency` does not depend on any runtime executor being present. //! This enables it to work out of the box with any async runtime, including: @@ -124,7 +124,7 @@ //! `#[no_std]` environments, allowing it to be used with embedded async //! runtimes such as `embassy`. //! -//! ## Feature Flags +//! # Feature Flags //! //! The `std` feature flag is enabled by default. To target `alloc` or `no_std` //! environments, you can enable the following configuration: @@ -138,7 +138,7 @@ //! futures-concurrency = { version = "7.5.0", default-features = false, features = ["alloc"] } //! ``` //! -//! ## Further Reading +//! # Further Reading //! //! `futures-concurrency` has been developed over the span of several years. It is //! primarily maintained by Yosh Wuyts, a member of the Rust Async WG. You can read From 095ec15f6b50d8c85eaaa112c51f334e658473b1 Mon Sep 17 00:00:00 2001 From: Yosh Date: Sun, 7 Apr 2024 00:29:14 +0200 Subject: [PATCH 6/8] remove unused code from examples --- README.md | 2 -- src/lib.rs | 1 - 2 files changed, 3 deletions(-) diff --git a/README.md b/README.md index c21da78..3e4a2cc 100644 --- a/README.md +++ b/README.md @@ -67,9 +67,7 @@ assert_eq!((a, b, c).join().await, (1, "hello", 3)); ```rust use futures_concurrency::prelude::*; -use futures_lite::stream; -# futures::executor::block_on(async { let v: Vec<_> = vec!["chashu", "nori"] .into_co_stream() .map(|msg| async move { format!("hello {msg}") }) diff --git a/src/lib.rs b/src/lib.rs index 6743d30..ad3a6ca 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,6 @@ //! //! ```rust //! use futures_concurrency::prelude::*; -//! use futures_lite::stream; //! //! # futures::executor::block_on(async { //! let v: Vec<_> = vec!["chashu", "nori"] From 47c965e4aeabde9ab1fda15e2029d551bdad9917 Mon Sep 17 00:00:00 2001 From: Yosh Date: Sun, 7 Apr 2024 17:40:37 +0200 Subject: [PATCH 7/8] clarify streaming concurrency --- src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ad3a6ca..9fc2637 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -92,16 +92,16 @@ //! Streams yield outputs one-by-one, which means that deciding to stop iterating is //! the same for fallible and infallible streams. The operations provided for //! streams can be categorized based on whether their inputs can be concurrently -//! processed, and whether their outputs can be concurrently processed. +//! evaluated, and whether their outputs can be concurrently processed. //! //! Specifically in the case of `merge`, it takes `N` streams in, and yields items //! one-by-one as soon as any are available. This enables the output of individual //! streams to be concurrently processed by further operations later on. //! -//! | | __Sequential processing__ | __Concurrent processing__ | -//! | ------------------------ | --------------------- | --------------------- | -//! | __Sequential execution__ | `Stream::chain` | *not yet available* ‡ | -//! | __Concurrent execution__ | `Stream::zip` | `Stream::merge` | +//! | | __Sequential output processing__ | __Concurrent output processing__ | +//! | ------------------------------- | -------------------------------- | -------------------------------- | +//! | __Sequential input evaluation__ | `Stream::chain` | *not yet available* ‡ | +//! | __Concurrent input evaluation__ | `Stream::zip` | `Stream::merge` | //! //! ‡: _This could be addressed by a hypothetical `Stream::unzip` operation, //! however because we aspire for semantic compatibility with `std::iter::Iterator` From 220cb6e96ba4b4df9147e41cfe5d001bd0a204de Mon Sep 17 00:00:00 2001 From: Yosh Date: Fri, 12 Apr 2024 01:40:23 +0200 Subject: [PATCH 8/8] Implement feedback from review Co-Authored-By: Consoli --- src/concurrent_stream/mod.rs | 36 ++++++++++++++++++++++++++++++++++++ src/lib.rs | 4 ++-- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/src/concurrent_stream/mod.rs b/src/concurrent_stream/mod.rs index 1ac985f..46de45b 100644 --- a/src/concurrent_stream/mod.rs +++ b/src/concurrent_stream/mod.rs @@ -1,4 +1,40 @@ //! Concurrent execution of streams +//! +//! # Examples +//! +//! **Concurrently process items in a collection** +//! +//! ```rust +//! use futures_concurrency::prelude::*; +//! +//! # futures::executor::block_on(async { +//! let v: Vec<_> = vec!["chashu", "nori"] +//! .into_co_stream() +//! .map(|msg| async move { format!("hello {msg}") }) +//! .collect() +//! .await; +//! +//! assert_eq!(v, &["hello chashu", "hello nori"]); +//! # }); +//! ``` +//! +//! **Concurrently process items in a stream** +//! +//! ```rust +//! use futures_concurrency::prelude::*; +//! use futures_lite::stream; +//! +//! # futures::executor::block_on(async { +//! let v: Vec<_> = stream::repeat("chashu") +//! .co() +//! .take(2) +//! .map(|msg| async move { format!("hello {msg}") }) +//! .collect() +//! .await; +//! +//! assert_eq!(v, &["hello chashu", "hello chashu"]); +//! # }); +//! ``` mod enumerate; mod for_each; diff --git a/src/lib.rs b/src/lib.rs index 9fc2637..d13f6aa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,7 +22,7 @@ //! # }); //! ``` //! -//! **Concurrently process items in a stream** +//! **Concurrently process items in a collection** //! //! ```rust //! use futures_concurrency::prelude::*; @@ -110,7 +110,7 @@ //! The following streams implementations are provided by `futures-concurrency`: //! //! - [`StreamGroup`][stream::StreamGroup]: A growable group of streams which operate as a single unit. -//! - [`ConcurrentStream`][concurrent_stream::ConcurrentStream]: An asynchronous stream which can concurrently process items. +//! - [`ConcurrentStream`][concurrent_stream::ConcurrentStream]: A trait for asynchronous streams which can concurrently process items. //! - `tuple`: [`chain`][stream::Chain#impl-Chain-for-(A,+B)], [`merge`][stream::Merge#impl-Merge-for-(A,+B)], [`zip`][stream::Zip#impl-Zip-for-(A,+B)] //! - `array`: [`chain`][stream::Chain#impl-Chain-for-\[Fut;+N\]], [`merge`][stream::Merge#impl-Merge-for-\[Fut;+N\]], [`zip`][stream::Zip#impl-Zip-for-\[Fut;+N\]] //! - `Vec`: [`chain`][stream::Chain#impl-Chain-for-Vec], [`merge`][stream::Merge#impl-Merge-for-Vec], [`zip`][stream::Zip#impl-Zip-for-Vec]