Skip to content

Commit

Permalink
Merge pull request #151 from yoshuawuyts/lending-stream-examples
Browse files Browse the repository at this point in the history
Lending stream examples
  • Loading branch information
yoshuawuyts authored Aug 14, 2023
2 parents edbadad + 96f1088 commit 5f25559
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 12 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ slab = "0.4.8"
smallvec = "1.11.0"

[dev-dependencies]
async-std = { version = "1.12.0", features = ["attributes"] }
criterion = { version = "0.3", features = ["async", "async_futures", "html_reports"] }
futures = "0.3.25"
futures-lite = "1.12.0"
criterion = { version = "0.3", features = ["async", "async_futures", "html_reports"] }
async-std = { version = "1.12.0", features = ["attributes"] }
futures-time = "3.0.0"
lending-stream = "1.0.0"
rand = "0.8.5"
35 changes: 30 additions & 5 deletions src/future/future_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@ use crate::utils::{PollState, PollVec, WakerVec};

/// A growable group of futures which act as a single unit.
///
/// In order go mutate the group during iteration, the future group should be
/// combined with a mechanism such as
/// [`lend_mut`](https://docs.rs/async-iterator/latest/async_iterator/trait.Iterator.html#method.lend_mut).
/// This is not yet provided by the `futures-concurrency` crate.
///
/// # Example
///
/// **Basic example**
///
/// ```rust
/// use futures_concurrency::future::FutureGroup;
/// use futures_lite::StreamExt;
Expand All @@ -35,6 +32,32 @@ use crate::utils::{PollState, PollVec, WakerVec};
/// assert_eq!(out, 6);
/// # });
/// ```
///
/// **Update the group on every iteration**
///
/// ```
/// use futures_concurrency::future::FutureGroup;
/// use lending_stream::prelude::*;
/// use std::future;
///
/// # fn main() { futures_lite::future::block_on(async {
/// let mut group = FutureGroup::new();
/// group.insert(future::ready(4));
///
/// let mut index = 3;
/// let mut out = 0;
/// let mut group = group.lend_mut();
/// while let Some((group, num)) = group.next().await {
/// if index != 0 {
/// group.insert(future::ready(index));
/// index -= 1;
/// }
/// out += num;
/// }
/// assert_eq!(out, 10);
/// # });}
/// ```
#[must_use = "`FutureGroup` does nothing if not iterated over"]
#[derive(Default)]
#[pin_project::pin_project]
Expand Down Expand Up @@ -214,6 +237,8 @@ impl<F: Future> FutureGroup<F> {

// Set the corresponding state
self.states[index].set_pending();
let mut readiness = self.wakers.readiness().lock().unwrap();
readiness.set_ready(index);

key
}
Expand Down
34 changes: 29 additions & 5 deletions src/stream/stream_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@ use crate::utils::{PollState, PollVec, WakerVec};

/// A growable group of streams which act as a single unit.
///
/// In order go mutate the group during iteration, the stream should be
/// combined with a mechanism such as
/// [`lend_mut`](https://docs.rs/async-iterator/latest/async_iterator/trait.Iterator.html#method.lend_mut).
/// This is not yet provided by the `futures-concurrency` crate.
///
/// # Example
///
/// **Basic example**
///
/// ```rust
/// use futures_concurrency::stream::StreamGroup;
/// use futures_lite::{stream, StreamExt};
Expand All @@ -34,6 +31,31 @@ use crate::utils::{PollState, PollVec, WakerVec};
/// assert_eq!(out, 6);
/// # });
/// ```
///
/// **Update the group on every iteration**
///
/// ```rust
/// use futures_concurrency::stream::StreamGroup;
/// use lending_stream::prelude::*;
/// use futures_lite::stream;
///
/// # futures_lite::future::block_on(async {
/// let mut group = StreamGroup::new();
/// group.insert(stream::once(4));
/// let mut index = 3;
/// let mut out = 0;
/// let mut group = group.lend_mut();
/// while let Some((group, num)) = group.next().await {
/// if index != 0 {
/// group.insert(stream::once(index));
/// index -= 1;
/// }
/// out += num;
/// }
/// assert_eq!(out, 10);
/// # });
/// ```
#[must_use = "`StreamGroup` does nothing if not iterated over"]
#[derive(Default)]
#[pin_project::pin_project]
Expand Down Expand Up @@ -214,6 +236,8 @@ impl<S: Stream> StreamGroup<S> {

// Set the corresponding state
self.states[index].set_pending();
let mut readiness = self.wakers.readiness().lock().unwrap();
readiness.set_ready(index);

key
}
Expand Down

0 comments on commit 5f25559

Please sign in to comment.