From 92d5308b7fd41e286018bcdcb514ab36db31f49d Mon Sep 17 00:00:00 2001 From: ruben Date: Wed, 5 Jun 2024 23:01:59 +0200 Subject: [PATCH 01/11] stuff --- examples/Cargo.toml | 6 +- examples/webtransport_server.rs | 2 +- h3-quinn/src/lib.rs | 138 ++++++++++++++++++++++---------- h3-webtransport/src/lib.rs | 10 +-- h3-webtransport/src/server.rs | 2 +- h3/src/client/connection.rs | 31 ++++--- h3/src/connection.rs | 59 ++++++-------- h3/src/quic.rs | 31 ++++--- h3/src/server/connection.rs | 32 +++++--- h3/src/tests/connection.rs | 10 ++- h3/src/tests/mod.rs | 6 +- 11 files changed, 197 insertions(+), 130 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 6cdf5feb..a6a904ed 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -47,6 +47,6 @@ path = "client.rs" name = "server" path = "server.rs" -[[example]] -name = "webtransport_server" -path = "webtransport_server.rs" +#[[example]] +#name = "webtransport_server" +#path = "webtransport_server.rs" diff --git a/examples/webtransport_server.rs b/examples/webtransport_server.rs index 9ecc7d15..180bc19e 100644 --- a/examples/webtransport_server.rs +++ b/examples/webtransport_server.rs @@ -156,7 +156,7 @@ async fn main() -> Result<(), Box> { Ok(()) } -async fn handle_connection(mut conn: Connection) -> Result<()> { +async fn handle_connection(mut conn: Connection, Bytes>) -> Result<()> { // 3. TODO: Conditionally, if the client indicated that this is a webtransport session, we should accept it here, else use regular h3. // if this is a webtransport session, then h3 needs to stop handing the datagrams, bidirectional streams, and unidirectional streams and give them // to the webtransport session. diff --git a/h3-quinn/src/lib.rs b/h3-quinn/src/lib.rs index 7819b383..09803964 100644 --- a/h3-quinn/src/lib.rs +++ b/h3-quinn/src/lib.rs @@ -16,46 +16,82 @@ use bytes::{Buf, Bytes, BytesMut}; use futures::{ ready, - stream::{self, BoxStream}, + stream::{self, select, BoxStream, Select}, StreamExt, }; -pub use quinn::{self, AcceptBi, AcceptUni, Endpoint, OpenBi, OpenUni, VarInt, WriteError}; +pub use quinn::{self, Endpoint, OpenBi, OpenUni, VarInt, WriteError}; use quinn::{ApplicationClose, ClosedStream, ReadDatagram}; use h3::{ ext::Datagram, - quic::{self, Error, StreamId, WriteBuf}, + quic::{self, Error, IncomingStreamType, StreamId, WriteBuf}, }; use tokio_util::sync::ReusableBoxFuture; /// A QUIC connection backed by Quinn /// /// Implements a [`quic::Connection`] backed by a [`quinn::Connection`]. -pub struct Connection { +pub struct Connection +where + B: Buf, +{ conn: quinn::Connection, - incoming_bi: BoxStream<'static, as Future>::Output>, opening_bi: Option as Future>::Output>>, - incoming_uni: BoxStream<'static, as Future>::Output>, opening_uni: Option as Future>::Output>>, datagrams: BoxStream<'static, as Future>::Output>, -} - -impl Connection { + incoming: Select< + BoxStream< + 'static, + Result, RecvStream, B>, quinn::ConnectionError>, + >, + BoxStream< + 'static, + Result, RecvStream, B>, quinn::ConnectionError>, + >, + >, +} + +impl Connection +where + B: Buf, +{ /// Create a [`Connection`] from a [`quinn::Connection`] pub fn new(conn: quinn::Connection) -> Self { + let incoming_uni = Box::pin(stream::unfold(conn.clone(), |conn| async { + Some(( + conn.accept_uni().await.map(|recv_stream| { + IncomingStreamType::, RecvStream, B>::Unidirectional( + RecvStream::new(recv_stream), + ) + }), + conn, + )) + })); + let incoming_bi = Box::pin(stream::unfold(conn.clone(), |conn| async { + Some(( + conn.accept_bi().await.map(|bidi_stream| { + IncomingStreamType::, RecvStream, B>::Bidirectional( + BidiStream { + send: SendStream::new(bidi_stream.0), + recv: RecvStream::new(bidi_stream.1), + }, + std::marker::PhantomData, + ) + }), + conn, + )) + })); + Self { conn: conn.clone(), - incoming_bi: Box::pin(stream::unfold(conn.clone(), |conn| async { - Some((conn.accept_bi().await, conn)) - })), + opening_bi: None, - incoming_uni: Box::pin(stream::unfold(conn.clone(), |conn| async { - Some((conn.accept_uni().await, conn)) - })), + opening_uni: None, datagrams: Box::pin(stream::unfold(conn, |conn| async { Some((conn.read_datagram().await, conn)) })), + incoming: select(incoming_bi, incoming_uni), } } } @@ -147,7 +183,7 @@ impl From for SendDatagramError { } } -impl quic::Connection for Connection +impl quic::Connection for Connection where B: Buf, { @@ -155,30 +191,30 @@ where type OpenStreams = OpenStreams; type AcceptError = ConnectionError; - fn poll_accept_bidi( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll, Self::AcceptError>> { - let (send, recv) = match ready!(self.incoming_bi.poll_next_unpin(cx)) { - Some(x) => x?, - None => return Poll::Ready(Ok(None)), - }; - Poll::Ready(Ok(Some(Self::BidiStream { - send: Self::SendStream::new(send), - recv: Self::RecvStream::new(recv), - }))) - } - - fn poll_accept_recv( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll, Self::AcceptError>> { - let recv = match ready!(self.incoming_uni.poll_next_unpin(cx)) { - Some(x) => x?, - None => return Poll::Ready(Ok(None)), - }; - Poll::Ready(Ok(Some(Self::RecvStream::new(recv)))) - } + // fn poll_accept_bidi( + // &mut self, + // cx: &mut task::Context<'_>, + // ) -> Poll, Self::AcceptError>> { + // let (send, recv) = match ready!(self.incoming_bi.poll_next_unpin(cx)) { + // Some(x) => x?, + // None => return Poll::Ready(Ok(None)), + // }; + // Poll::Ready(Ok(Some(Self::BidiStream { + // send: Self::SendStream::new(send), + // recv: Self::RecvStream::new(recv), + // }))) + // } + + // fn poll_accept_recv( + // &mut self, + // cx: &mut task::Context<'_>, + // ) -> Poll, Self::AcceptError>> { + // let recv = match ready!(self.incoming_uni.poll_next_unpin(cx)) { + // Some(x) => x?, + // None => return Poll::Ready(Ok(None)), + // }; + // Poll::Ready(Ok(Some(Self::RecvStream::new(recv)))) + // } fn opener(&self) -> Self::OpenStreams { OpenStreams { @@ -187,9 +223,22 @@ where opening_uni: None, } } + + fn poll_incoming( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll, Self::AcceptError>> + { + // put the two streams together + + match ready!(self.incoming.poll_next_unpin(cx)).unwrap() { + Ok(x) => Poll::Ready(Ok(x)), + Err(e) => Poll::Ready(Err(ConnectionError(e).into())), + } + } } -impl quic::OpenStreams for Connection +impl quic::OpenStreams for Connection where B: Buf, { @@ -237,7 +286,7 @@ where } } -impl quic::SendDatagramExt for Connection +impl quic::SendDatagramExt for Connection where B: Buf, { @@ -253,7 +302,10 @@ where } } -impl quic::RecvDatagramExt for Connection { +impl quic::RecvDatagramExt for Connection +where + B: Buf, +{ type Buf = Bytes; type Error = ConnectionError; diff --git a/h3-webtransport/src/lib.rs b/h3-webtransport/src/lib.rs index 85762d45..f3988d60 100644 --- a/h3-webtransport/src/lib.rs +++ b/h3-webtransport/src/lib.rs @@ -5,9 +5,9 @@ //! WebTransport over HTTP/3: #![deny(missing_docs)] -/// Server side WebTransport session support -pub mod server; -/// Webtransport stream types -pub mod stream; +// /// Server side WebTransport session support +//pub mod server; +// /// Webtransport stream types +//pub mod stream; -pub use h3::webtransport::SessionId; +//pub use h3::webtransport::SessionId; diff --git a/h3-webtransport/src/server.rs b/h3-webtransport/src/server.rs index 05970b61..2f016b14 100644 --- a/h3-webtransport/src/server.rs +++ b/h3-webtransport/src/server.rs @@ -410,7 +410,7 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut conn = self.conn.lock().unwrap(); - conn.inner.poll_accept_recv(cx)?; + conn.inner.poll_handle_receive_stream(cx)?; // Get the currently available streams let streams = conn.inner.accepted_streams_mut(); diff --git a/h3/src/client/connection.rs b/h3/src/client/connection.rs index 4fb53cd7..4b8b6f3e 100644 --- a/h3/src/client/connection.rs +++ b/h3/src/client/connection.rs @@ -358,6 +358,23 @@ where /// Maintain the connection state until it is closed pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + //= https://www.rfc-editor.org/rfc/rfc9114#section-6.1 + //# Clients MUST treat + //# receipt of a server-initiated bidirectional stream as a connection + //# error of type H3_STREAM_CREATION_ERROR unless such an extension has + //# been negotiated. + if let Poll::Ready(Ok(_)) = self.inner.poll_handle_incoming(cx) { + return Poll::Ready(Err(self.inner.close( + Code::H3_STREAM_CREATION_ERROR, + "client received a bidirectional stream", + ))); + } + + if self.inner.control_recv.is_none() { + panic!("control_recv is None"); + return Poll::Pending; + } + while let Poll::Ready(result) = self.inner.poll_control(cx) { match result { //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2 @@ -428,19 +445,7 @@ where } } } - - //= https://www.rfc-editor.org/rfc/rfc9114#section-6.1 - //# Clients MUST treat - //# receipt of a server-initiated bidirectional stream as a connection - //# error of type H3_STREAM_CREATION_ERROR unless such an extension has - //# been negotiated. - if self.inner.poll_accept_request(cx).is_ready() { - return Poll::Ready(Err(self.inner.close( - Code::H3_STREAM_CREATION_ERROR, - "client received a bidirectional stream", - ))); - } - + panic!("here"); Poll::Pending } } diff --git a/h3/src/connection.rs b/h3/src/connection.rs index 5f7b5d94..be801526 100644 --- a/h3/src/connection.rs +++ b/h3/src/connection.rs @@ -22,7 +22,7 @@ use crate::{ varint::VarInt, }, qpack, - quic::{self, SendStream}, + quic::{self, Connection, SendStream}, stream::{self, AcceptRecvStream, AcceptedRecvStream, BufRecvStream, UniStreamHeader}, webtransport::SessionId, }; @@ -107,7 +107,7 @@ where /// TODO: breaking encapsulation just to see if we can get this to work, will fix before merging pub conn: C, control_send: C::SendStream, - control_recv: Option>, + pub(crate) control_recv: Option>, decoder_send: Option, decoder_recv: Option>, encoder_send: Option, @@ -317,46 +317,39 @@ where } #[allow(missing_docs)] - pub fn poll_accept_request( + pub fn poll_handle_incoming( &mut self, cx: &mut Context<'_>, - ) -> Poll, Error>> { - { - let state = self.shared.read("poll_accept_request"); - if let Some(ref e) = state.error { - return Poll::Ready(Err(e.clone())); - } - } - - // Accept the request by accepting the next bidirectional stream - // .into().into() converts the impl QuicError into crate::error::Error. - // The `?` operator doesn't work here for some reason. - self.conn.poll_accept_bidi(cx).map_err(|e| e.into().into()) + ) -> Poll> { + if let Some(ref e) = self.shared.read("poll_handle_incoming").error { + return Poll::Ready(Err(e.clone())); + }; + let incoming_bidirectional_stream = loop { + match ready!(self.conn.poll_incoming(cx))? { + quic::IncomingStreamType::Bidirectional(bi_di_stream, _) => break bi_di_stream, + quic::IncomingStreamType::Unidirectional(recv_steam) => { + self.poll_handle_receive_stream(cx, recv_steam)?; + } + }; + }; + + Poll::Ready(Ok(incoming_bidirectional_stream)) } /// Polls incoming streams /// /// Accepted streams which are not control, decoder, or encoder streams are buffer in `accepted_recv_streams` - pub fn poll_accept_recv(&mut self, cx: &mut Context<'_>) -> Result<(), Error> { + pub fn poll_handle_receive_stream( + &mut self, + cx: &mut Context<'_>, + recv_stream: >::RecvStream, + ) -> Result<(), Error> { if let Some(ref e) = self.shared.read("poll_accept_request").error { return Err(e.clone()); } - // Get all currently pending streams - loop { - match self.conn.poll_accept_recv(cx)? { - Poll::Ready(Some(stream)) => self - .pending_recv_streams - .push(AcceptRecvStream::new(stream)), - Poll::Ready(None) => { - return Err(Code::H3_GENERAL_PROTOCOL_ERROR.with_reason( - "Connection closed unexpected", - crate::error::ErrorLevel::ConnectionError, - )) - } - Poll::Pending => break, - } - } + self.pending_recv_streams + .push(AcceptRecvStream::new(recv_stream)); let mut resolved = vec![]; @@ -432,12 +425,12 @@ where let recv = { // TODO - self.poll_accept_recv(cx)?; if let Some(v) = &mut self.control_recv { v } else { // Try later - return Poll::Pending; + //return Poll::Pending; + panic!("control stream not received"); } }; diff --git a/h3/src/quic.rs b/h3/src/quic.rs index 6e8722cb..f73c735f 100644 --- a/h3/src/quic.rs +++ b/h3/src/quic.rs @@ -3,6 +3,7 @@ //! This module includes traits and types meant to allow being generic over any //! QUIC implementation. +use std::marker::PhantomData; use std::task::{self, Poll}; use bytes::Buf; @@ -30,6 +31,19 @@ impl<'a, E: Error + 'a> From for Box { } } +/// Trait representing a incoming QUIC stream. +pub enum IncomingStreamType +where + B: Buf, + BiDi: SendStream + RecvStream, + UniDi: RecvStream, +{ + /// A bidirectional stream + Bidirectional(BiDi, PhantomData), + /// A unidirectional stream + Unidirectional(UniDi), +} + /// Trait representing a QUIC connection. pub trait Connection: OpenStreams { /// The type produced by `poll_accept_recv()` @@ -39,21 +53,14 @@ pub trait Connection: OpenStreams { /// Error type yielded by these trait methods type AcceptError: Into>; - /// Accept an incoming unidirectional stream - /// - /// Returning `None` implies the connection is closing or closed. - fn poll_accept_recv( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll, Self::AcceptError>>; - - /// Accept an incoming bidirectional stream + /// Polls the Connection for Incoming Streams /// - /// Returning `None` implies the connection is closing or closed. - fn poll_accept_bidi( + /// Returning ['IncomingStreamType'] for the next incoming stream. + /// Returning `Err` implies the connection is closing or closed. + fn poll_incoming( &mut self, cx: &mut task::Context<'_>, - ) -> Poll, Self::AcceptError>>; + ) -> Poll, Self::AcceptError>>; /// Get an object to open outgoing streams. fn opener(&self) -> Self::OpenStreams; diff --git a/h3/src/server/connection.rs b/h3/src/server/connection.rs index b28c8d6c..ca636250 100644 --- a/h3/src/server/connection.rs +++ b/h3/src/server/connection.rs @@ -302,20 +302,26 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll, Error>> { - let _ = self.poll_control(cx)?; let _ = self.poll_requests_completion(cx); loop { - match self.inner.poll_accept_request(cx) { + let incoming = self.inner.poll_handle_incoming(cx); + + if self.inner.control_recv.is_some() { + let _ = self.poll_control(cx)?; + } + //let _ = self.poll_control(cx)?; + + match incoming { Poll::Ready(Err(x)) => break Poll::Ready(Err(x)), - Poll::Ready(Ok(None)) => { - if self.poll_requests_completion(cx).is_ready() { - break Poll::Ready(Ok(None)); - } else { - // Wait for all the requests to be finished, request_end_recv will wake - // us on each request completion. - break Poll::Pending; - } - } + // Poll::Ready(Ok(None)) => { + // if self.poll_requests_completion(cx).is_ready() { + // break Poll::Ready(Ok(None)); + // } else { + // // Wait for all the requests to be finished, request_end_recv will wake + // // us on each request completion. + // break Poll::Pending; + // } + // } Poll::Pending => { if self.recv_closing.is_some() && self.poll_requests_completion(cx).is_ready() { // The connection is now idle. @@ -324,7 +330,7 @@ where return Poll::Pending; } } - Poll::Ready(Ok(Some(mut s))) => { + Poll::Ready(Ok(mut s)) => { // When the connection is in a graceful shutdown procedure, reject all // incoming requests not belonging to the grace interval. It's possible that // some acceptable request streams arrive after rejected requests. @@ -354,7 +360,7 @@ where pub(crate) fn poll_next_control( &mut self, cx: &mut Context<'_>, - ) -> Poll, Error>> { + ) -> Poll, Error>> { let frame = ready!(self.inner.poll_control(cx))?; match &frame { diff --git a/h3/src/tests/connection.rs b/h3/src/tests/connection.rs index 7d65b604..80e9786a 100644 --- a/h3/src/tests/connection.rs +++ b/h3/src/tests/connection.rs @@ -8,6 +8,7 @@ use bytes::{Buf, Bytes, BytesMut}; use futures_util::future; use http::{Request, Response, StatusCode}; use tokio::sync::oneshot::{self}; +use tracing::info; use crate::client::SendRequest; use crate::{client, server}; @@ -737,8 +738,9 @@ async fn graceful_shutdown_client() { driver.shutdown(0).await.unwrap(); assert_matches!( future::poll_fn(|cx| { - println!("client drive"); - driver.poll_close(cx) + info!("client drive"); + let x = driver.poll_close(cx); + x }) .await, Ok(()) @@ -750,8 +752,8 @@ async fn graceful_shutdown_client() { let mut incoming = server::Connection::new(conn).await.unwrap(); assert!(incoming.accept().await.unwrap().is_none()); }; - - tokio::join!(server_fut, client_fut); + + tokio::join!(server_fut, client_fut); } async fn request(mut send_request: T) -> Result, Error> diff --git a/h3/src/tests/mod.rs b/h3/src/tests/mod.rs index d34d234c..bf02ec02 100644 --- a/h3/src/tests/mod.rs +++ b/h3/src/tests/mod.rs @@ -18,9 +18,10 @@ use std::{ time::Duration, }; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use quinn::crypto::rustls::{QuicClientConfig, QuicServerConfig}; use rustls::pki_types::{CertificateDer, PrivateKeyDer}; +use tracing::level_filters::LevelFilter; use crate::quic; use h3_quinn::{quinn::TransportConfig, Connection}; @@ -30,6 +31,7 @@ pub fn init_tracing() { .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL) .with_test_writer() + .with_max_level(LevelFilter::DEBUG) .try_init(); } @@ -124,7 +126,7 @@ impl Pair { .unwrap() } - pub async fn client(&self) -> h3_quinn::Connection { + pub async fn client(&self) -> h3_quinn::Connection { Connection::new(self.client_inner().await) } } From 51389c26e56a82c8955413ac0659e33ce854ffe2 Mon Sep 17 00:00:00 2001 From: ruben Date: Sat, 15 Jun 2024 23:16:46 +0200 Subject: [PATCH 02/11] more stuff --- h3/src/client/connection.rs | 31 ++++++++++++++++--------------- h3/src/connection.rs | 6 +++--- h3/src/tests/connection.rs | 1 - 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/h3/src/client/connection.rs b/h3/src/client/connection.rs index 4b8b6f3e..eae612b0 100644 --- a/h3/src/client/connection.rs +++ b/h3/src/client/connection.rs @@ -358,21 +358,23 @@ where /// Maintain the connection state until it is closed pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { - //= https://www.rfc-editor.org/rfc/rfc9114#section-6.1 - //# Clients MUST treat - //# receipt of a server-initiated bidirectional stream as a connection - //# error of type H3_STREAM_CREATION_ERROR unless such an extension has - //# been negotiated. - if let Poll::Ready(Ok(_)) = self.inner.poll_handle_incoming(cx) { - return Poll::Ready(Err(self.inner.close( - Code::H3_STREAM_CREATION_ERROR, - "client received a bidirectional stream", - ))); - } + let incoming_result = self.inner.poll_handle_incoming(cx); - if self.inner.control_recv.is_none() { - panic!("control_recv is None"); - return Poll::Pending; + match incoming_result { + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + //= https://www.rfc-editor.org/rfc/rfc9114#section-6.1 + //# Clients MUST treat + //# receipt of a server-initiated bidirectional stream as a connection + //# error of type H3_STREAM_CREATION_ERROR unless such an extension has + //# been negotiated. + Poll::Ready(Ok(_)) => { + return Poll::Ready(Err(self.inner.close( + Code::H3_STREAM_CREATION_ERROR, + "client received a bidirectional stream", + ))) + } + // if pending, continue to poll the control stream + Poll::Pending => (), } while let Poll::Ready(result) = self.inner.poll_control(cx) { @@ -445,7 +447,6 @@ where } } } - panic!("here"); Poll::Pending } } diff --git a/h3/src/connection.rs b/h3/src/connection.rs index be801526..7f8b695c 100644 --- a/h3/src/connection.rs +++ b/h3/src/connection.rs @@ -332,7 +332,7 @@ where } }; }; - + Poll::Ready(Ok(incoming_bidirectional_stream)) } @@ -429,8 +429,8 @@ where v } else { // Try later - //return Poll::Pending; - panic!("control stream not received"); + return Poll::Pending; + // panic!("control stream not received"); } }; diff --git a/h3/src/tests/connection.rs b/h3/src/tests/connection.rs index 80e9786a..1721cf0a 100644 --- a/h3/src/tests/connection.rs +++ b/h3/src/tests/connection.rs @@ -738,7 +738,6 @@ async fn graceful_shutdown_client() { driver.shutdown(0).await.unwrap(); assert_matches!( future::poll_fn(|cx| { - info!("client drive"); let x = driver.poll_close(cx); x }) From 8e373b72b8bc90f4127374b1947f4857a086c9b9 Mon Sep 17 00:00:00 2001 From: ruben Date: Sun, 16 Jun 2024 00:21:42 +0200 Subject: [PATCH 03/11] handle errors correctly --- h3/src/client/connection.rs | 17 ++++++++--------- h3/src/connection.rs | 12 +++++++++++- h3/src/tests/connection.rs | 1 - 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/h3/src/client/connection.rs b/h3/src/client/connection.rs index eae612b0..01ec7e16 100644 --- a/h3/src/client/connection.rs +++ b/h3/src/client/connection.rs @@ -361,7 +361,13 @@ where let incoming_result = self.inner.poll_handle_incoming(cx); match incoming_result { - Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Ready(Err(e)) => { + let connection_error = self.inner.shared.set_error(e, "poll_close error"); + if connection_error.is_closed() { + return Poll::Ready(Ok(())); + } + return Poll::Ready(Err(connection_error)); + } //= https://www.rfc-editor.org/rfc/rfc9114#section-6.1 //# Clients MUST treat //# receipt of a server-initiated bidirectional stream as a connection @@ -432,14 +438,7 @@ where ))) } Err(e) => { - let connection_error = self.inner.shared.read("poll_close").error.clone(); - let connection_error = match connection_error { - Some(e) => e, - None => { - self.inner.shared.write("poll_close error").error = Some(e.clone()); - e - } - }; + let connection_error = self.inner.shared.set_error(e, "poll_close error"); if connection_error.is_closed() { return Poll::Ready(Ok(())); } diff --git a/h3/src/connection.rs b/h3/src/connection.rs index 7f8b695c..c75685be 100644 --- a/h3/src/connection.rs +++ b/h3/src/connection.rs @@ -50,6 +50,16 @@ impl SharedStateRef { pub fn write(&self, panic_msg: &'static str) -> RwLockWriteGuard { self.0.write().expect(panic_msg) } + + /// if a error is set, return it, + /// otherwise the passed error is stored and returned + pub fn set_error(&self, err: Error, panic_msg: &'static str) -> Error { + if let Some(err) = self.read(panic_msg).error.clone() { + return err; + } + self.write(panic_msg).error = Some(err.clone()); + err + } } impl Default for SharedStateRef { @@ -430,7 +440,7 @@ where } else { // Try later return Poll::Pending; - // panic!("control stream not received"); + // panic!("control stream not received"); } }; diff --git a/h3/src/tests/connection.rs b/h3/src/tests/connection.rs index 1721cf0a..c38247da 100644 --- a/h3/src/tests/connection.rs +++ b/h3/src/tests/connection.rs @@ -8,7 +8,6 @@ use bytes::{Buf, Bytes, BytesMut}; use futures_util::future; use http::{Request, Response, StatusCode}; use tokio::sync::oneshot::{self}; -use tracing::info; use crate::client::SendRequest; use crate::{client, server}; From 511c412ec17730fadd7d52bffa55daf2934f1ceb Mon Sep 17 00:00:00 2001 From: ruben Date: Sun, 16 Jun 2024 00:22:26 +0200 Subject: [PATCH 04/11] fmt --- h3/src/server/connection.rs | 2 +- h3/src/tests/connection.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/h3/src/server/connection.rs b/h3/src/server/connection.rs index ca636250..295e883c 100644 --- a/h3/src/server/connection.rs +++ b/h3/src/server/connection.rs @@ -360,7 +360,7 @@ where pub(crate) fn poll_next_control( &mut self, cx: &mut Context<'_>, - ) -> Poll, Error>> { + ) -> Poll, Error>> { let frame = ready!(self.inner.poll_control(cx))?; match &frame { diff --git a/h3/src/tests/connection.rs b/h3/src/tests/connection.rs index c38247da..36ed7c7c 100644 --- a/h3/src/tests/connection.rs +++ b/h3/src/tests/connection.rs @@ -750,8 +750,8 @@ async fn graceful_shutdown_client() { let mut incoming = server::Connection::new(conn).await.unwrap(); assert!(incoming.accept().await.unwrap().is_none()); }; - - tokio::join!(server_fut, client_fut); + + tokio::join!(server_fut, client_fut); } async fn request(mut send_request: T) -> Result, Error> From 5c368af9f9b16739eaf2dbcced7333a97bf559c6 Mon Sep 17 00:00:00 2001 From: ruben Date: Sun, 16 Jun 2024 10:39:57 +0200 Subject: [PATCH 05/11] progress --- examples/Cargo.toml | 6 +-- examples/webtransport_server.rs | 2 +- h3-webtransport/src/lib.rs | 10 ++--- h3-webtransport/src/server.rs | 78 ++++++++++++++++----------------- h3/src/server/connection.rs | 1 - 5 files changed, 48 insertions(+), 49 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index a6a904ed..6cdf5feb 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -47,6 +47,6 @@ path = "client.rs" name = "server" path = "server.rs" -#[[example]] -#name = "webtransport_server" -#path = "webtransport_server.rs" +[[example]] +name = "webtransport_server" +path = "webtransport_server.rs" diff --git a/examples/webtransport_server.rs b/examples/webtransport_server.rs index 180bc19e..acef7e01 100644 --- a/examples/webtransport_server.rs +++ b/examples/webtransport_server.rs @@ -321,7 +321,7 @@ where tokio::spawn( async move { log_result!(echo_stream(send, stream).await); }); } stream = session.accept_bi() => { - if let Some(server::AcceptedBi::BidiStream(_, stream)) = stream? { + if let Some(server::AcceptStream::BidiStream(_, stream)) = stream? { let (send, recv) = quic::BidiStream::split(stream); tokio::spawn( async move { log_result!(echo_stream(send, recv).await); }); } diff --git a/h3-webtransport/src/lib.rs b/h3-webtransport/src/lib.rs index f3988d60..85762d45 100644 --- a/h3-webtransport/src/lib.rs +++ b/h3-webtransport/src/lib.rs @@ -5,9 +5,9 @@ //! WebTransport over HTTP/3: #![deny(missing_docs)] -// /// Server side WebTransport session support -//pub mod server; -// /// Webtransport stream types -//pub mod stream; +/// Server side WebTransport session support +pub mod server; +/// Webtransport stream types +pub mod stream; -//pub use h3::webtransport::SessionId; +pub use h3::webtransport::SessionId; diff --git a/h3-webtransport/src/server.rs b/h3-webtransport/src/server.rs index 2f016b14..a6a32cda 100644 --- a/h3-webtransport/src/server.rs +++ b/h3-webtransport/src/server.rs @@ -151,15 +151,15 @@ where Ok(()) } - /// Accept an incoming unidirectional stream from the client, it reads the stream until EOF. - pub fn accept_uni(&self) -> AcceptUni { - AcceptUni { - conn: &self.server_conn, - } - } + // /// Accept an incoming unidirectional stream from the client, it reads the stream until EOF. + // pub fn accept_uni(&self) -> AcceptUni { + // AcceptUni { + // conn: &self.server_conn, + // } + // } /// Accepts an incoming bidirectional stream or request - pub async fn accept_bi(&self) -> Result>, Error> { + pub async fn accept_bi(&self) -> Result>, Error> { // Get the next stream // Accept the incoming stream let stream = poll_fn(|cx| { @@ -204,7 +204,7 @@ where // Take the stream out of the framed reader and split it in half like Paul Allen let stream = stream.into_inner(); - Ok(Some(AcceptedBi::BidiStream( + Ok(Some(AcceptStream::BidiStream( session_id, BidiStream::new(stream), ))) @@ -217,7 +217,7 @@ where }; if let Some(req) = req { let (req, resp) = req.resolve().await?; - Ok(Some(AcceptedBi::Request(req, resp))) + Ok(Some(AcceptStream::Request(req, resp))) } else { Ok(None) } @@ -351,7 +351,7 @@ where /// An accepted incoming bidirectional stream. /// /// Since -pub enum AcceptedBi, B: Buf> { +pub enum AcceptStream, B: Buf> { /// An incoming bidirectional stream BidiStream(SessionId, BidiStream), /// An incoming HTTP/3 request, passed through a webtransport session. @@ -392,35 +392,35 @@ where } } -/// Future for [`WebTransportSession::accept_uni`] -pub struct AcceptUni<'a, C, B> -where - C: quic::Connection, - B: Buf, -{ - conn: &'a Mutex>, -} - -impl<'a, C, B> Future for AcceptUni<'a, C, B> -where - C: quic::Connection, - B: Buf, -{ - type Output = Result)>, Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut conn = self.conn.lock().unwrap(); - conn.inner.poll_handle_receive_stream(cx)?; - - // Get the currently available streams - let streams = conn.inner.accepted_streams_mut(); - if let Some((id, stream)) = streams.wt_uni_streams.pop() { - return Poll::Ready(Ok(Some((id, RecvStream::new(stream))))); - } - - Poll::Pending - } -} +// /// Future for [`WebTransportSession::accept_uni`] +// pub struct AcceptUni<'a, C, B> +// where +// C: quic::Connection, +// B: Buf, +// { +// conn: &'a Mutex>, +// } + +// impl<'a, C, B> Future for AcceptUni<'a, C, B> +// where +// C: quic::Connection, +// B: Buf, +// { +// type Output = Result)>, Error>; + +// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { +// let mut conn = self.conn.lock().unwrap(); + +// // accept_bi polls the incoming connections for us and safes them in wt +// // Get the currently available streams +// let streams = conn.inner.accepted_streams_mut(); +// if let Some((id, stream)) = streams.wt_uni_streams.pop() { +// return Poll::Ready(Ok(Some((id, RecvStream::new(stream))))); +// } + +// Poll::Pending +// } +// } fn validate_wt_connect(request: &Request<()>) -> bool { let protocol = request.extensions().get::(); diff --git a/h3/src/server/connection.rs b/h3/src/server/connection.rs index 295e883c..32936c6a 100644 --- a/h3/src/server/connection.rs +++ b/h3/src/server/connection.rs @@ -309,7 +309,6 @@ where if self.inner.control_recv.is_some() { let _ = self.poll_control(cx)?; } - //let _ = self.poll_control(cx)?; match incoming { Poll::Ready(Err(x)) => break Poll::Ready(Err(x)), From a42f439796c3fe9fa9679dd89d1b6315c8bb43bf Mon Sep 17 00:00:00 2001 From: ruben Date: Sun, 16 Jun 2024 16:39:07 +0200 Subject: [PATCH 06/11] fix webtransport --- examples/webtransport_server.rs | 27 +++++++----- h3-webtransport/src/server.rs | 78 +++++++++++++++++++-------------- 2 files changed, 62 insertions(+), 43 deletions(-) diff --git a/examples/webtransport_server.rs b/examples/webtransport_server.rs index acef7e01..dc1edf26 100644 --- a/examples/webtransport_server.rs +++ b/examples/webtransport_server.rs @@ -61,6 +61,7 @@ async fn main() -> Result<(), Box> { .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL) .with_writer(std::io::stderr) + .with_max_level(tracing::Level::INFO) .init(); #[cfg(feature = "tree")] @@ -69,6 +70,7 @@ async fn main() -> Result<(), Box> { tracing_subscriber::registry() .with(tracing_subscriber::EnvFilter::from_default_env()) .with(tracing_tree::HierarchicalLayer::new(4).with_bracketed_fields(true)) + .with_max_level(tracing::Level::INFO) .init(); // process cli arguments @@ -314,17 +316,20 @@ where tracing::info!("Finished sending datagram"); } } - uni_stream = session.accept_uni() => { - let (id, stream) = uni_stream?.unwrap(); - - let send = session.open_uni(id).await?; - tokio::spawn( async move { log_result!(echo_stream(send, stream).await); }); - } - stream = session.accept_bi() => { - if let Some(server::AcceptStream::BidiStream(_, stream)) = stream? { - let (send, recv) = quic::BidiStream::split(stream); - tokio::spawn( async move { log_result!(echo_stream(send, recv).await); }); - } + stream = session.accept_streams() => { + match stream? { + Some(server::AcceptStream::BidiStream(_, stream)) =>{ + info!("Received bidirectional stream"); + let (send, recv) = quic::BidiStream::split(stream); + tokio::spawn( async move { log_result!(echo_stream(send, recv).await); }); + }, + Some(server::AcceptStream::UnidirectionalStream(id, stream)) => { + info!("Received unidirectional stream with id: {:?}", id); + let send = session.open_uni(id).await?; + tokio::spawn( async move { log_result!(echo_stream(send, stream).await); }); + }, + _ => (), + } } else => { break diff --git a/h3-webtransport/src/server.rs b/h3-webtransport/src/server.rs index a6a32cda..b6dac3b1 100644 --- a/h3-webtransport/src/server.rs +++ b/h3-webtransport/src/server.rs @@ -159,15 +159,47 @@ where // } /// Accepts an incoming bidirectional stream or request - pub async fn accept_bi(&self) -> Result>, Error> { + pub async fn accept_streams(&self) -> Result>, Error> { + { + let mut conn = self.server_conn.lock().unwrap(); + // if there are pending unidirectional streams, return them before the first await + if let Some((id, stream)) = conn.inner.accepted_streams_mut().wt_uni_streams.pop() { + return Ok(Some(AcceptStream::UnidirectionalStream::( + id, + RecvStream::new(stream), + ))); + } + } + // Get the next stream // Accept the incoming stream let stream = poll_fn(|cx| { let mut conn = self.server_conn.lock().unwrap(); - conn.poll_accept_request(cx) + let accepted_stream = conn.poll_accept_request(cx); + if matches!(accepted_stream, Poll::Pending) { + // if there are accepted uni streams, return them + if let Some((id, stream)) = conn.inner.accepted_streams_mut().wt_uni_streams.pop() { + return Poll::Ready(PollAcceptRequestResult::UnidirectionalStream::( + id, stream, + )); + } + return Poll::Pending; + } + let bidi_stream = ready!(accepted_stream); + Poll::Ready(PollAcceptRequestResult::BidirectionalStream(bidi_stream)) }) .await; + let stream = match stream { + PollAcceptRequestResult::UnidirectionalStream(id, stream) => { + return Ok(Some(AcceptStream::UnidirectionalStream( + id, + RecvStream::new(stream), + ))); + } + PollAcceptRequestResult::BidirectionalStream(stream) => stream, + }; + let mut stream = match stream { Ok(Some(s)) => FrameStream::new(BufRecvStream::new(s)), Ok(None) => { @@ -249,6 +281,15 @@ where } } +enum PollAcceptRequestResult +where + C: quic::Connection, + B: Buf, +{ + UnidirectionalStream(SessionId, BufRecvStream), + BidirectionalStream(Result, Error>), +} + /// Streams are opened, but the initial webtransport header has not been sent type PendingStreams = ( BidiStream<>::BidiStream, B>, @@ -354,6 +395,9 @@ where pub enum AcceptStream, B: Buf> { /// An incoming bidirectional stream BidiStream(SessionId, BidiStream), + /// An incoming unidirectional stream + UnidirectionalStream(SessionId, RecvStream), + /// An incoming HTTP/3 request, passed through a webtransport session. /// /// This makes it possible to respond to multiple CONNECT requests @@ -392,36 +436,6 @@ where } } -// /// Future for [`WebTransportSession::accept_uni`] -// pub struct AcceptUni<'a, C, B> -// where -// C: quic::Connection, -// B: Buf, -// { -// conn: &'a Mutex>, -// } - -// impl<'a, C, B> Future for AcceptUni<'a, C, B> -// where -// C: quic::Connection, -// B: Buf, -// { -// type Output = Result)>, Error>; - -// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { -// let mut conn = self.conn.lock().unwrap(); - -// // accept_bi polls the incoming connections for us and safes them in wt -// // Get the currently available streams -// let streams = conn.inner.accepted_streams_mut(); -// if let Some((id, stream)) = streams.wt_uni_streams.pop() { -// return Poll::Ready(Ok(Some((id, RecvStream::new(stream))))); -// } - -// Poll::Pending -// } -// } - fn validate_wt_connect(request: &Request<()>) -> bool { let protocol = request.extensions().get::(); matches!((request.method(), protocol), (&Method::CONNECT, Some(p)) if p == &Protocol::WEB_TRANSPORT) From 7658b62eaec5c08e3f584c0c76772ad2edccc312 Mon Sep 17 00:00:00 2001 From: ruben Date: Sun, 16 Jun 2024 16:47:47 +0200 Subject: [PATCH 07/11] remove debug and fmt --- examples/webtransport_server.rs | 2 +- h3-quinn/src/lib.rs | 25 ------------------------- h3-webtransport/src/server.rs | 7 ------- h3/src/connection.rs | 1 - h3/src/tests/connection.rs | 6 +----- 5 files changed, 2 insertions(+), 39 deletions(-) diff --git a/examples/webtransport_server.rs b/examples/webtransport_server.rs index dc1edf26..0698f091 100644 --- a/examples/webtransport_server.rs +++ b/examples/webtransport_server.rs @@ -329,7 +329,7 @@ where tokio::spawn( async move { log_result!(echo_stream(send, stream).await); }); }, _ => (), - } + } } else => { break diff --git a/h3-quinn/src/lib.rs b/h3-quinn/src/lib.rs index 09803964..7e7e95c7 100644 --- a/h3-quinn/src/lib.rs +++ b/h3-quinn/src/lib.rs @@ -191,31 +191,6 @@ where type OpenStreams = OpenStreams; type AcceptError = ConnectionError; - // fn poll_accept_bidi( - // &mut self, - // cx: &mut task::Context<'_>, - // ) -> Poll, Self::AcceptError>> { - // let (send, recv) = match ready!(self.incoming_bi.poll_next_unpin(cx)) { - // Some(x) => x?, - // None => return Poll::Ready(Ok(None)), - // }; - // Poll::Ready(Ok(Some(Self::BidiStream { - // send: Self::SendStream::new(send), - // recv: Self::RecvStream::new(recv), - // }))) - // } - - // fn poll_accept_recv( - // &mut self, - // cx: &mut task::Context<'_>, - // ) -> Poll, Self::AcceptError>> { - // let recv = match ready!(self.incoming_uni.poll_next_unpin(cx)) { - // Some(x) => x?, - // None => return Poll::Ready(Ok(None)), - // }; - // Poll::Ready(Ok(Some(Self::RecvStream::new(recv)))) - // } - fn opener(&self) -> Self::OpenStreams { OpenStreams { conn: self.conn.clone(), diff --git a/h3-webtransport/src/server.rs b/h3-webtransport/src/server.rs index b6dac3b1..0ae88afc 100644 --- a/h3-webtransport/src/server.rs +++ b/h3-webtransport/src/server.rs @@ -151,13 +151,6 @@ where Ok(()) } - // /// Accept an incoming unidirectional stream from the client, it reads the stream until EOF. - // pub fn accept_uni(&self) -> AcceptUni { - // AcceptUni { - // conn: &self.server_conn, - // } - // } - /// Accepts an incoming bidirectional stream or request pub async fn accept_streams(&self) -> Result>, Error> { { diff --git a/h3/src/connection.rs b/h3/src/connection.rs index c75685be..51a2c603 100644 --- a/h3/src/connection.rs +++ b/h3/src/connection.rs @@ -440,7 +440,6 @@ where } else { // Try later return Poll::Pending; - // panic!("control stream not received"); } }; diff --git a/h3/src/tests/connection.rs b/h3/src/tests/connection.rs index 36ed7c7c..e63acea9 100644 --- a/h3/src/tests/connection.rs +++ b/h3/src/tests/connection.rs @@ -736,11 +736,7 @@ async fn graceful_shutdown_client() { let (mut driver, mut _send_request) = client::new(pair.client().await).await.unwrap(); driver.shutdown(0).await.unwrap(); assert_matches!( - future::poll_fn(|cx| { - let x = driver.poll_close(cx); - x - }) - .await, + future::poll_fn(|cx| { driver.poll_close(cx) }).await, Ok(()) ); }; From ce099b73c26807ca5345fd87d5222f0b5b7b1528 Mon Sep 17 00:00:00 2001 From: ruben Date: Sun, 16 Jun 2024 17:33:43 +0200 Subject: [PATCH 08/11] fix docs --- h3-webtransport/src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/h3-webtransport/src/server.rs b/h3-webtransport/src/server.rs index 0ae88afc..840d44d5 100644 --- a/h3-webtransport/src/server.rs +++ b/h3-webtransport/src/server.rs @@ -382,7 +382,7 @@ where } } -/// An accepted incoming bidirectional stream. +/// An accepted incoming bidirectional or unidirectional stream. /// /// Since pub enum AcceptStream, B: Buf> { From bc0799caa9a7b3673a3aae824e96a0247cd06925 Mon Sep 17 00:00:00 2001 From: ruben Date: Sun, 11 Aug 2024 19:10:58 +0200 Subject: [PATCH 09/11] use patched quinn --- examples/Cargo.toml | 4 ++-- h3-quinn/Cargo.toml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 420fbcc6..062ace1b 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -14,11 +14,11 @@ h3 = { path = "../h3", features = ["tracing"] } h3-quinn = { path = "../h3-quinn", features = ["tracing"] } h3-webtransport = { path = "../h3-webtransport" } http = "1" -quinn = { version = "0.11", default-features = false, features = [ +quinn = { git = "https://github.com/Ruben2424/quinn.git", branch = "stop_sending1", features = [ "runtime-tokio", "rustls", "ring", -] } +], default-features = false } rcgen = { version = "0.13" } rustls = { version = "0.23", default-features = false, features = [ "logging", diff --git a/h3-quinn/Cargo.toml b/h3-quinn/Cargo.toml index bea10395..8c32f620 100644 --- a/h3-quinn/Cargo.toml +++ b/h3-quinn/Cargo.toml @@ -15,9 +15,9 @@ license = "MIT" [dependencies] h3 = { version = "0.0.6", path = "../h3" } bytes = "1" -quinn = { version = "0.11", default-features = false, features = [ +quinn = { git = "https://github.com/Ruben2424/quinn.git", branch = "stop_sending1", features = [ "futures-io", -] } +], default-features = false } tokio-util = { version = "0.7.9" } futures = { version = "0.3.28" } tokio = { version = "1", features = ["io-util"], default-features = false } From e374a8a0304680fb04a0fc7806d6c3cedf17ca5d Mon Sep 17 00:00:00 2001 From: ruben Date: Sun, 11 Aug 2024 19:41:13 +0200 Subject: [PATCH 10/11] warnings --- h3-quinn/src/lib.rs | 4 ++-- h3/Cargo.toml | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/h3-quinn/src/lib.rs b/h3-quinn/src/lib.rs index 45cd344a..5965328f 100644 --- a/h3-quinn/src/lib.rs +++ b/h3-quinn/src/lib.rs @@ -16,11 +16,11 @@ use bytes::{Buf, Bytes, BytesMut}; use futures::{ ready, - stream::{self, select, BoxStream, Select}, + stream::{self, select, Select}, Stream, StreamExt, }; pub use quinn::{self, Endpoint, OpenBi, OpenUni, VarInt, WriteError}; -use quinn::{AcceptBi, AcceptUni, ApplicationClose, ClosedStream, ReadDatagram}; +use quinn::{ApplicationClose, ClosedStream, ReadDatagram}; use h3::{ ext::Datagram, diff --git a/h3/Cargo.toml b/h3/Cargo.toml index 3de9cc48..bf7da3bd 100644 --- a/h3/Cargo.toml +++ b/h3/Cargo.toml @@ -40,6 +40,7 @@ quinn = { version = "0.11", default-features = false, features = [ "runtime-tokio", "rustls", "ring", + "futures-io", ] } quinn-proto = { version = "0.11", default-features = false } rcgen = "0.13" From f49aec57f77fffe531d6780593c8626f9f5c609c Mon Sep 17 00:00:00 2001 From: ruben Date: Sun, 11 Aug 2024 19:45:28 +0200 Subject: [PATCH 11/11] use fixed version of h3spec --- ci/h3spec.sh | 2 +- examples/Cargo.toml | 4 ++-- h3-quinn/Cargo.toml | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ci/h3spec.sh b/ci/h3spec.sh index d250dc37..73cc6ba0 100755 --- a/ci/h3spec.sh +++ b/ci/h3spec.sh @@ -3,7 +3,7 @@ LOGFILE=h3server.log if ! [ -e "h3spec-linux-x86_64" ] ; then # if we don't already have a h3spec executable, wget it from github - wget https://github.com/kazu-yamamoto/h3spec/releases/download/v0.1.10/h3spec-linux-x86_64 + wget https://github.com/kazu-yamamoto/h3spec/releases/download/v0.1.11/h3spec-linux-x86_64 chmod +x h3spec-linux-x86_64 fi diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 062ace1b..420fbcc6 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -14,11 +14,11 @@ h3 = { path = "../h3", features = ["tracing"] } h3-quinn = { path = "../h3-quinn", features = ["tracing"] } h3-webtransport = { path = "../h3-webtransport" } http = "1" -quinn = { git = "https://github.com/Ruben2424/quinn.git", branch = "stop_sending1", features = [ +quinn = { version = "0.11", default-features = false, features = [ "runtime-tokio", "rustls", "ring", -], default-features = false } +] } rcgen = { version = "0.13" } rustls = { version = "0.23", default-features = false, features = [ "logging", diff --git a/h3-quinn/Cargo.toml b/h3-quinn/Cargo.toml index 8c32f620..9a9250e0 100644 --- a/h3-quinn/Cargo.toml +++ b/h3-quinn/Cargo.toml @@ -13,11 +13,11 @@ categories = ["network-programming", "web-programming"] license = "MIT" [dependencies] +quinn = { version = "0.11", default-features = false, features = [ + "futures-io", +] } h3 = { version = "0.0.6", path = "../h3" } bytes = "1" -quinn = { git = "https://github.com/Ruben2424/quinn.git", branch = "stop_sending1", features = [ - "futures-io", -], default-features = false } tokio-util = { version = "0.7.9" } futures = { version = "0.3.28" } tokio = { version = "1", features = ["io-util"], default-features = false }