-
Notifications
You must be signed in to change notification settings - Fork 76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: split WebSocket #48
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for putting a cargo feature to gate it 👍 good idea
@mmastrac Code looks good but this example server is not working: use fastwebsockets::upgrade;
use fastwebsockets::OpCode;
use fastwebsockets::WebSocketError;
use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::Body;
use hyper::Request;
use hyper::Response;
use tokio::net::TcpListener;
async fn handle_client(fut: upgrade::UpgradeFut) -> Result<(), WebSocketError> {
let ws = fut.await?;
let (mut rx, mut tx) = ws.split(|ws| tokio::io::split(ws));
loop {
// Empty send_fn is fine because the benchmark does not create obligated writes.
let frame = rx.read_frame(&mut move |_| async { Ok::<_, WebSocketError>(()) }).await?;
match frame.opcode {
OpCode::Close => break,
OpCode::Text | OpCode::Binary => {
tx.write_frame(frame).await?;
}
_ => {}
}
}
Ok(())
}
async fn server_upgrade(
mut req: Request<Body>,
) -> Result<Response<Body>, WebSocketError> {
let (response, fut) = upgrade::upgrade(&mut req)?;
tokio::task::spawn(async move {
if let Err(e) = tokio::task::unconstrained(handle_client(fut)).await {
eprintln!("Error in websocket connection: {}", e);
}
});
Ok(response)
}
fn main() -> Result<(), WebSocketError> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
rt.block_on(async move {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server started, listening on {}", "127.0.0.1:8080");
loop {
let (stream, _) = listener.accept().await?;
println!("Client connected");
tokio::spawn(async move {
let conn_fut = Http::new()
.serve_connection(stream, service_fn(server_upgrade))
.with_upgrades();
if let Err(e) = conn_fut.await {
println!("An error occurred: {:?}", e);
}
});
}
})
} Running load_test:
|
I can repro that problem here. Digging into it. |
@littledivy Ah. It's because there wasn't a fragment collector on the read half. I will add some documentation around this. |
Ran the two benchmarks and it looks pretty good: Split:
Unsplit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Fixes #40. Working on this in concert with denoland/deno#20579 and #49
This API is not final
To create a split WebSocket, you can either call:
WebSocket::split
, passing a function that can split a stream into read and write halves (tokio::io::split
is suitable for this, as well asTcpStream::into_split
https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.into_split), orafter_handshake_split
, passing read and write streams.From the
WebSocketRead
andWebSocketWrite
halves, you'll need to ensure that obligated writes fromWebSocketRead::read_frame
are passed towrite_frame
. This is done by passing a closure that returns a future.Note that a
FragmentCollectorRead
may be created from aWebSocketRead
in the same way thatFragmentCollector
may be created from aWebSocket
.In the case of Deno's split socket, the read process looks like this: