Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update docs for docs.rs #174

Merged
merged 8 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,68 @@
</h3>
</div>

Performant, portable, structured concurrency operations for async Rust. It
works with any runtime, does not erase lifetimes, always handles
cancellation, and always returns output to the caller.

`futures-concurrency` provides concurrency operations for both groups of futures
and streams. Both for bounded and unbounded sets of futures and streams. In both
cases performance should be on par with, if not exceed conventional executor
implementations.

## Examples

**Await multiple futures of different types**
```rust
use futures_concurrency::prelude::*;
use std::future;

let a = future::ready(1u8);
let b = future::ready("hello");
let c = future::ready(3u16);
assert_eq!((a, b, c).join().await, (1, "hello", 3));
```

**Concurrently process items in a stream**

```rust
use futures_concurrency::prelude::*;

let v: Vec<_> = vec!["chashu", "nori"]
.into_co_stream()
.map(|msg| async move { format!("hello {msg}") })
.collect()
.await;

assert_eq!(v, &["hello chashu", "hello nori"]);
```

**Access stack data outside the futures' scope**

_Adapted from [`std::thread::scope`](https://doc.rust-lang.org/std/thread/fn.scope.html)._

```rust
use futures_concurrency::prelude::*;

let mut container = vec![1, 2, 3];
let mut num = 0;

let a = async {
println!("hello from the first future");
dbg!(&container);
};

let b = async {
println!("hello from the second future");
num += container[0] + container[2];
};

println!("hello from the main future");
let _ = (a, b).join().await;
container.push(4);
assert_eq!(num, container.len());
```

## Installation
```sh
$ cargo add futures-concurrency
Expand Down
2 changes: 1 addition & 1 deletion src/collections/vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! You will rarely need to interact with this module directly unless you need
//! to name one of the iterator types.
//!
//! [std::vec]: https://doc.rust-lang.org/stable/std/vec/use core::future::Ready;
//! [std::vec]: https://doc.rust-lang.org/std/vec/index.html

use crate::concurrent_stream::{self, FromStream};
use crate::prelude::*;
Expand Down
36 changes: 36 additions & 0 deletions src/concurrent_stream/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,40 @@
//! Concurrent execution of streams
//!
//! # Examples
//!
//! **Concurrently process items in a collection**
//!
//! ```rust
//! use futures_concurrency::prelude::*;
//!
//! # futures::executor::block_on(async {
//! let v: Vec<_> = vec!["chashu", "nori"]
//! .into_co_stream()
//! .map(|msg| async move { format!("hello {msg}") })
//! .collect()
//! .await;
//!
//! assert_eq!(v, &["hello chashu", "hello nori"]);
//! # });
//! ```
//!
//! **Concurrently process items in a stream**
//!
//! ```rust
//! use futures_concurrency::prelude::*;
//! use futures_lite::stream;
//!
//! # futures::executor::block_on(async {
//! let v: Vec<_> = stream::repeat("chashu")
//! .co()
//! .take(2)
//! .map(|msg| async move { format!("hello {msg}") })
//! .collect()
//! .await;
//!
//! assert_eq!(v, &["hello chashu", "hello chashu"]);
//! # });
//! ```

mod enumerate;
mod for_each;
Expand Down
178 changes: 134 additions & 44 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,62 +1,152 @@
//! Concurrency extensions for [`Future`][core::future::Future] and `Stream`
//! (also known as [`AsyncIterator`][core::async_iter::AsyncIterator]).
//! Performant, portable, structured concurrency operations for async Rust. It
//! works with any runtime, does not erase lifetimes, always handles
//! cancellation, and always returns output to the caller.
//!
//! Companion library for the "Futures Concurrency" blog post
//! series:
//! - [Futures Concurrency I: Introduction](https://blog.yoshuawuyts.com/futures-concurrency/)
//! - [Futures Concurrency II: A Trait Approach](https://blog.yoshuawuyts.com/futures-concurrency-2/)
//! - [Futures Concurrency III: `select!`](https://blog.yoshuawuyts.com/futures-concurrency-3/)
//! - [Futures Concurrency IV: Join Semantics](https://blog.yoshuawuyts.com/futures-concurrency-4/)
//! `futures-concurrency` provides concurrency operations for both groups of futures
//! and streams. Both for bounded and unbounded sets of futures and streams. In both
//! cases performance should be on par with, if not exceed conventional executor
//! implementations.
//!
//! The purpose of this library is to serve as a staging ground for what
//! eventually may become the futures concurrency methods provided by the
//! stdlib. See the [`future`] and [`stream`] submodules for more.
//! # Examples
//!
//! # Operations
//! **Await multiple futures of different types**
//! ```rust
//! use futures_concurrency::prelude::*;
//! use std::future;
//!
//! This library provides the following operations on arrays, vecs, and tuples:
//! # futures::executor::block_on(async {
//! let a = future::ready(1u8);
//! let b = future::ready("hello");
//! let c = future::ready(3u16);
//! assert_eq!((a, b, c).join().await, (1, "hello", 3));
//! # });
//! ```
//!
//! - [`future::Join`]: Wait for all futures to complete.
//! - [`future::TryJoin`]: Wait for all futures to complete successfully, or abort early on error.
//! - [`future::Race`]: Wait for the first future to complete.
//! - [`future::RaceOk`]: Wait for the first successful future to complete.
//! - [`stream::Chain`]: Takes multiple streams and creates a new stream over all in sequence.
//! - [`stream::Merge`]: Combines multiple streams into a single stream of all their outputs.
//! - [`stream::Zip`]: ‘Zips up’ multiple streams into a single stream of pairs.
//! **Concurrently process items in a collection**
//!
//! # Examples
//! ```rust
//! use futures_concurrency::prelude::*;
//!
//! # futures::executor::block_on(async {
//! let v: Vec<_> = vec!["chashu", "nori"]
//! .into_co_stream()
//! .map(|msg| async move { format!("hello {msg}") })
//! .collect()
//! .await;
//!
//! assert_eq!(v, &["hello chashu", "hello nori"]);
//! # });
//! ```
//!
//! **Access stack data outside the futures' scope**
//!
//! _Adapted from [`std::thread::scope`](https://doc.rust-lang.org/std/thread/fn.scope.html)._
//!
//! Concurrently await multiple heterogenous futures:
//! ```rust
//! use futures_concurrency::prelude::*;
//! use futures_lite::future::block_on;
//! use std::future;
//!
//! block_on(async {
//! let a = future::ready(1u8);
//! let b = future::ready("hello");
//! let c = future::ready(3u16);
//! assert_eq!((a, b, c).join().await, (1, "hello", 3));
//! })
//! # futures::executor::block_on(async {
//! let mut container = vec![1, 2, 3];
//! let mut num = 0;
//!
//! let a = async {
//! println!("hello from the first future");
//! dbg!(&container);
//! };
//!
//! let b = async {
//! println!("hello from the second future");
//! num += container[0] + container[2];
//! };
//!
//! println!("hello from the main future");
//! let _ = (a, b).join().await;
//! container.push(4);
//! assert_eq!(num, container.len());
//! # });
//! ```
//!
//! # Limitations
//! # Operations
//!
//! ## Futures
//!
//! For futures which return a regular type `T` only the `join` and `race`
//! operations are available. `join` waits for all futures to complete, while `race`
//! will wait for the first future to complete. However for futures which return a
//! `Try<Output = T>` two additional operations are available. The following table
//! describes the behavior of concurrency operations for fallible futures:
//!
//! | | **Wait for all outputs** | **Wait for first output** |
//! | -------------------------- | :----------------------- | :------------------------ |
//! | **Continue on error** | `Future::join` | `Future::race_ok` |
//! | **Short-circuit on error** | `Future::try_join` | `Future::race` |
//!
//! The following futures implementations are provided by `futures-concurrency`:
//! - [`FutureGroup`][future::FutureGroup]: A growable group of futures which operate as a single unit.
//! - `tuple`: [`join`][future::Join#impl-Join-for-(A,+B)], [`try_join`][future::TryJoin#impl-TryJoin-for-(A,+B)], [`race`][future::Race#impl-Race-for-(A,+B)], [`race_ok`][future::RaceOk#impl-RaceOk-for-(A,+B)]
//! - `array`: [`join`][future::Join#impl-Join-for-\[Fut;+N\]], [`try_join`][future::TryJoin#impl-TryJoin-for-\[Fut;+N\]], [`race`][future::Race#impl-Race-for-\[Fut;+N\]], [`race_ok`][future::RaceOk#impl-RaceOk-for-\[Fut;+N\]]
//! - `Vec`: [`join`][future::Join#impl-Join-for-Vec<Fut>], [`try_join`][future::TryJoin#impl-TryJoin-for-Vec<Fut>], [`race`][future::Race#impl-Race-for-Vec<Fut>], [`race_ok`][future::RaceOk#impl-RaceOk-for-Vec<Fut>]
//!
//! ## Streams
//!
//! Streams yield outputs one-by-one, which means that deciding to stop iterating is
//! the same for fallible and infallible streams. The operations provided for
//! streams can be categorized based on whether their inputs can be concurrently
//! evaluated, and whether their outputs can be concurrently processed.
//!
//! Specifically in the case of `merge`, it takes `N` streams in, and yields items
//! one-by-one as soon as any are available. This enables the output of individual
//! streams to be concurrently processed by further operations later on.
//!
//! | | __Sequential output processing__ | __Concurrent output processing__ |
//! | ------------------------------- | -------------------------------- | -------------------------------- |
//! | __Sequential input evaluation__ | `Stream::chain` | *not yet available* ‡ |
//! | __Concurrent input evaluation__ | `Stream::zip` | `Stream::merge` |
//!
//! ‡: _This could be addressed by a hypothetical `Stream::unzip` operation,
//! however because we aspire for semantic compatibility with `std::iter::Iterator`
//! in our operations, the path to adding it is currently unclear_.
//!
//! The following streams implementations are provided by `futures-concurrency`:
//!
//! - [`StreamGroup`][stream::StreamGroup]: A growable group of streams which operate as a single unit.
//! - [`ConcurrentStream`][concurrent_stream::ConcurrentStream]: A trait for asynchronous streams which can concurrently process items.
//! - `tuple`: [`chain`][stream::Chain#impl-Chain-for-(A,+B)], [`merge`][stream::Merge#impl-Merge-for-(A,+B)], [`zip`][stream::Zip#impl-Zip-for-(A,+B)]
//! - `array`: [`chain`][stream::Chain#impl-Chain-for-\[Fut;+N\]], [`merge`][stream::Merge#impl-Merge-for-\[Fut;+N\]], [`zip`][stream::Zip#impl-Zip-for-\[Fut;+N\]]
//! - `Vec`: [`chain`][stream::Chain#impl-Chain-for-Vec<Fut>], [`merge`][stream::Merge#impl-Merge-for-Vec<Fut>], [`zip`][stream::Zip#impl-Zip-for-Vec<Fut>]
//!
//! Because of orphan rules this library can't implement everything the stdlib
//! can. The missing implementations are:
//! # Runtime Support
//!
//! - `impl<T> IntoFuture for Vec<T>`
//! - `impl<T, const N: usize> IntoFuture for [T; N]`
//! - `impl<T..> IntoFuture for (T..)`
//! - `impl<T> IntoAsyncIterator for Vec<T>`
//! - `impl<T, const N: usize> IntoAsyncIterator for [T; N]`
//! - `impl<T..> IntoAsyncIterator for (T..)`
//! `futures-concurrency` does not depend on any runtime executor being present.
//! This enables it to work out of the box with any async runtime, including:
//! `tokio`, `async-std`, `smol`, `glommio`, and `monoio`. It also supports
//! `#[no_std]` environments, allowing it to be used with embedded async
//! runtimes such as `embassy`.
//!
//! This would enable containers of futures to directly be `.await`ed to get
//! `merge` semantics. Or containers of async iterators to be passed directly to
//! `for..await in` loops to be iterated over using `merge` semantics. This would
//! remove the need to think of "merge" as a verb, and would enable treating
//! sets of futures concurrently.
//! # Feature Flags
//!
//! The `std` feature flag is enabled by default. To target `alloc` or `no_std`
//! environments, you can enable the following configuration:
//!
//! ```toml
//! [dependencies]
//! # no_std
//! futures-concurrency = { version = "7.5.0", default-features = false }
//!
//! # alloc
//! futures-concurrency = { version = "7.5.0", default-features = false, features = ["alloc"] }
//! ```
//!
//! # Further Reading
//!
//! `futures-concurrency` has been developed over the span of several years. It is
//! primarily maintained by Yosh Wuyts, a member of the Rust Async WG. You can read
//! more about the development and ideas behind `futures-concurrency` here:
//!
//! - [Futures Concurrency I: Introduction](https://blog.yoshuawuyts.com/futures-concurrency/)
//! - [Futures Concurrency II: A Trait Approach](https://blog.yoshuawuyts.com/futures-concurrency-2/)
//! - [Futures Concurrency III: `select!`](https://blog.yoshuawuyts.com/futures-concurrency-3/)
//! - [Futures Concurrency IV: Join Semantics](https://blog.yoshuawuyts.com/futures-concurrency-4/)

#![deny(missing_debug_implementations, nonstandard_style)]
#![warn(missing_docs)]
Expand Down
Loading