Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stream and try_stream functions #74

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 24 additions & 36 deletions async-stream-impl/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use proc_macro::TokenStream;
use proc_macro2::{Group, TokenStream as TokenStream2, TokenTree};
use proc_macro2::{Group, Ident, Span, TokenStream as TokenStream2, TokenTree};
use quote::quote;
use syn::parse::{Parse, ParseStream, Parser, Result};
use syn::visit_mut::VisitMut;

struct Scrub<'a> {
/// Whether the stream is a try stream.
is_try: bool,
/// The identifier of the local `Stream<T>` variable.
stream: &'a Ident,
/// The unit expression, `()`.
unit: Box<syn::Expr>,
has_yielded: bool,
Expand All @@ -24,9 +24,9 @@ fn parse_input(input: TokenStream) -> syn::Result<(TokenStream2, Vec<syn::Stmt>)
}

impl<'a> Scrub<'a> {
fn new(is_try: bool, crate_path: &'a TokenStream2) -> Self {
fn new(stream: &'a Ident, crate_path: &'a TokenStream2) -> Self {
Self {
is_try,
stream,
unit: syn::parse_quote!(()),
has_yielded: false,
crate_path,
Expand Down Expand Up @@ -118,28 +118,10 @@ impl VisitMut for Scrub<'_> {

let value_expr = yield_expr.expr.as_ref().unwrap_or(&self.unit);

// let ident = &self.yielder;

*i = if self.is_try {
syn::parse_quote! { __yield_tx.send(::core::result::Result::Ok(#value_expr)).await }
} else {
syn::parse_quote! { __yield_tx.send(#value_expr).await }
};
}
syn::Expr::Try(try_expr) => {
syn::visit_mut::visit_expr_try_mut(self, try_expr);
// let ident = &self.yielder;
let e = &try_expr.expr;

*i = syn::parse_quote! {
match #e {
::core::result::Result::Ok(v) => v,
::core::result::Result::Err(e) => {
__yield_tx.send(::core::result::Result::Err(e.into())).await;
return;
}
}
};
let stream = &self.stream;
*i = syn::Expr::Verbatim(quote! {
#stream.yield_item(#value_expr).await
})
}
syn::Expr::Closure(_) | syn::Expr::Async(_) => {
// Don't transform inner closures or async blocks.
Expand Down Expand Up @@ -212,7 +194,9 @@ pub fn stream_inner(input: TokenStream) -> TokenStream {
Err(e) => return e.to_compile_error().into(),
};

let mut scrub = Scrub::new(false, &crate_path);
let stream = Ident::new("stream", Span::mixed_site());

let mut scrub = Scrub::new(&stream, &crate_path);

for stmt in &mut stmts {
scrub.visit_stmt_mut(stmt);
Expand All @@ -222,13 +206,12 @@ pub fn stream_inner(input: TokenStream) -> TokenStream {
None
} else {
Some(quote!(if false {
__yield_tx.send(()).await;
#stream.yield_item(()).await;
}))
};

quote!({
let (mut __yield_tx, __yield_rx) = #crate_path::yielder::pair();
#crate_path::AsyncStream::new(__yield_rx, async move {
#crate_path::stream(move |mut #stream| async move {
#dummy_yield
#(#stmts)*
})
Expand All @@ -246,7 +229,9 @@ pub fn try_stream_inner(input: TokenStream) -> TokenStream {
Err(e) => return e.to_compile_error().into(),
};

let mut scrub = Scrub::new(true, &crate_path);
let stream = Ident::new("stream", Span::mixed_site());

let mut scrub = Scrub::new(&stream, &crate_path);

for stmt in &mut stmts {
scrub.visit_stmt_mut(stmt);
Expand All @@ -256,15 +241,18 @@ pub fn try_stream_inner(input: TokenStream) -> TokenStream {
None
} else {
Some(quote!(if false {
__yield_tx.send(()).await;
#stream.yield_item(()).await;
}))
};

quote!({
let (mut __yield_tx, __yield_rx) = #crate_path::yielder::pair();
#crate_path::AsyncStream::new(__yield_rx, async move {
#crate_path::try_stream(move |mut #stream| async move {
#dummy_yield
#(#stmts)*
let () = {
#(#stmts)*
};
#[allow(unreachable_code)]
Ok(())
})
})
.into()
Expand Down
11 changes: 9 additions & 2 deletions async-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,21 @@ authors = ["Carl Lerche <[email protected]>"]
description = "Asynchronous streams using async & await notation"
repository = "https://github.com/tokio-rs/async-stream"

[features]
macro = ["async-stream-impl"]

[dependencies]
async-stream-impl = { version = "=0.3.3", path = "../async-stream-impl" }
async-stream-impl = { version = "=0.3.3", path = "../async-stream-impl", optional = true }
futures-core = "0.3"
pin-project-lite = "0.2"
pin-project-lite = "0.2.8"

[dev-dependencies]
futures-util = "0.3"
rustversion = "1"
tokio = { version = "1", features = ["full"] }
tokio-test = "0.4"
trybuild = "1"

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "doc_nightly"]
127 changes: 75 additions & 52 deletions async-stream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@

Asynchronous stream of elements.

Provides two macros, `stream!` and `try_stream!`, allowing the caller to
Provides two functions, `stream` and `try_stream`, allowing the caller to
define asynchronous streams of elements. These are implemented using `async`
& `await` notation. This crate works without unstable features.

The `stream!` macro returns an anonymous type implementing the [`Stream`]
The `stream` function returns an anonymous type implementing the [`Stream`]
trait. The `Item` associated type is the type of the values yielded from the
stream. The `try_stream!` also returns an anonymous type implementing the
stream. The `try_stream` also returns an anonymous type implementing the
[`Stream`] trait, but the `Item` associated type is `Result<T, Error>`. The
`try_stream!` macro supports using `?` notation as part of the
`try_stream` function supports using `?` notation as part of the
implementation.

## Usage
## Function Usage

A basic stream yielding numbers. Values are yielded using the `yield`
keyword. The stream block must return `()`.
Expand All @@ -26,11 +26,11 @@ use futures_util::stream::StreamExt;

#[tokio::main]
async fn main() {
let s = stream! {
let s = stream(|mut stream| async move {
for i in 0..3 {
yield i;
stream.yield_item(i).await;
}
};
});

pin_mut!(s); // needed for iteration

Expand All @@ -50,11 +50,11 @@ use futures_util::pin_mut;
use futures_util::stream::StreamExt;

fn zero_to_three() -> impl Stream<Item = u32> {
stream! {
stream(|mut stream| async move {
for i in 0..3 {
yield i;
stream.yield_item(i).await;
}
}
})
}

#[tokio::main]
Expand All @@ -68,8 +68,57 @@ async fn main() {
}
```

Streams may be implemented in terms of other streams - `async-stream` provides `for await`
syntax to assist with this:
Rust try notation (`?`) can be used with the `try_stream` function. The
`Item` of the returned stream is `Result` with `Ok` being the value yielded
and `Err` the error type returned by `?`.

```rust
use tokio::net::{TcpListener, TcpStream};

use async_stream::try_stream;
use futures_core::stream::Stream;

use std::io;
use std::net::SocketAddr;

fn bind_and_accept(addr: SocketAddr)
-> impl Stream<Item = io::Result<TcpStream>>
{
try_stream(move |mut stream| async move {
let mut listener = TcpListener::bind(addr).await?;

loop {
let (tcp_stream, addr) = listener.accept().await?;
println!("received on {:?}", addr);
stream.yield_item(tcp_stream).await;
}
})
}
```

## Macro usage

When the `macro` Cargo feature flag is enabled, this crate additionally
provides two macros, `stream!` and `try_stream!`, that are identical to
their equivalent functions but have a slightly nicer syntax where you can
write `yield x` instead of `stream.yield_item(x).await`. For example:

```rust
use async_stream::stream;

use futures_core::stream::Stream;

fn zero_to_three() -> impl Stream<Item = u32> {
stream! {
for i in 0..3 {
yield i;
}
}
}
```

Streams may be implemented in terms of other streams - the macros provide
`for await` syntax to assist with this:

```rust
use async_stream::stream;
Expand Down Expand Up @@ -107,52 +156,26 @@ async fn main() {
}
```

Rust try notation (`?`) can be used with the `try_stream!` macro. The `Item`
of the returned stream is `Result` with `Ok` being the value yielded and
`Err` the error type returned by `?`.

```rust
use tokio::net::{TcpListener, TcpStream};

use async_stream::try_stream;
use futures_core::stream::Stream;

use std::io;
use std::net::SocketAddr;

fn bind_and_accept(addr: SocketAddr)
-> impl Stream<Item = io::Result<TcpStream>>
{
try_stream! {
let mut listener = TcpListener::bind(addr).await?;

loop {
let (stream, addr) = listener.accept().await?;
println!("received on {:?}", addr);
yield stream;
}
}
}
```

## Implementation

The `stream!` and `try_stream!` macros are implemented using proc macros.
The macro searches the syntax tree for instances of `sender.send($expr)` and
transforms them into `sender.send($expr).await`.

The stream uses a lightweight sender to send values from the stream
The streams use a lightweight sender to send values from the stream
implementation to the caller. When entering the stream, an `Option<T>` is
stored on the stack. A pointer to the cell is stored in a thread local and
`poll` is called on the async block. When `poll` returns.
`sender.send(value)` stores the value that cell and yields back to the
caller.
stored on the stack. A pointer to the slot is stored in a thread local and
`poll` is called on the future. When `yield` is called, the value is stored
in there and the stream yields back to the caller.

[`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html
The `stream!` and `try_stream!` macros are implemented using proc macros.
The macro searches the syntax tree for instances of `yield $expr` and
transforms them into `stream.yield_item($expr).await`.

## Supported Rust Versions

`async-stream` is built against the latest stable release. The minimum supported version is 1.45 due to [function-like procedural macros in expression, pattern, and statement positions](https://blog.rust-lang.org/2020/07/16/Rust-1.45.0.html#stabilizing-function-like-procedural-macros-in-expressions-patterns-and-statements).
`async-stream` is built against the latest stable release. The minimum
supported version is 1.45 due to [function-like procedural macros in
expression, pattern, and statement positions][1.45].

[`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html
[1.45]: https://blog.rust-lang.org/2020/07/16/Rust-1.45.0.html#stabilizing-function-like-procedural-macros-in-expressions-patterns-and-statements

## License

Expand Down
6 changes: 3 additions & 3 deletions async-stream/examples/tcp_accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use tokio::net::TcpListener;
async fn main() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();

let incoming = stream! {
let incoming = stream(|mut stream| async move {
loop {
let (socket, _) = listener.accept().await.unwrap();
yield socket;
stream.yield_item(socket).await;
}
};
});
pin_mut!(incoming);

while let Some(v) = incoming.next().await {
Expand Down
Loading