Skip to content

Commit

Permalink
Merge branch 'Main' into h3-datagrams
Browse files Browse the repository at this point in the history
  • Loading branch information
Ruben2424 committed Dec 29, 2024
2 parents 302511d + d47e24a commit d4a2054
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 22 deletions.
26 changes: 23 additions & 3 deletions h3/src/client/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use crate::{
qpack,
quic::{self},
};
use std::convert::TryFrom;
use std::{
convert::TryFrom,
task::{Context, Poll},
};

/// Manage request bodies transfer, response and trailers.
///
Expand Down Expand Up @@ -150,11 +153,28 @@ where
self.inner.recv_data().await
}

/// Receive request body
pub fn poll_recv_data(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Option<impl Buf>, Error>> {
self.inner.poll_recv_data(cx)
}

/// Receive an optional set of trailers for the response.
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
pub async fn recv_trailers(&mut self) -> Result<Option<HeaderMap>, Error> {
let res = self.inner.recv_trailers().await;
if let Err(ref e) = res {
future::poll_fn(|cx| self.poll_recv_trailers(cx)).await
}

/// Poll receive an optional set of trailers for the response.
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
pub fn poll_recv_trailers(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Error>> {
let res = self.inner.poll_recv_trailers(cx);
if let Poll::Ready(Err(e)) = &res {
if e.is_header_too_big() {
self.inner.stream.stop_sending(Code::H3_REQUEST_CANCELLED);
}
Expand Down
46 changes: 28 additions & 18 deletions h3/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,14 +792,16 @@ where
future::poll_fn(|cx| self.poll_recv_data(cx)).await
}

/// Receive trailers
/// Poll receive trailers.
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
pub async fn recv_trailers(&mut self) -> Result<Option<HeaderMap>, Error> {
pub fn poll_recv_trailers(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Error>> {
let mut trailers = if let Some(encoded) = self.trailers.take() {
encoded
} else {
let frame = future::poll_fn(|cx| self.stream.poll_next(cx))
.await
let frame = futures_util::ready!(self.stream.poll_next(cx))
.map_err(|e| self.maybe_conn_err(e))?;
match frame {
Some(Frame::Headers(encoded)) => encoded,
Expand All @@ -826,20 +828,28 @@ where
//# The MAX_PUSH_ID frame is always sent on the control stream. Receipt
//# of a MAX_PUSH_ID frame on any other stream MUST be treated as a
//# connection error of type H3_FRAME_UNEXPECTED.
Some(_) => return Err(Code::H3_FRAME_UNEXPECTED.into()),
None => return Ok(None),
Some(_) => return Poll::Ready(Err(Code::H3_FRAME_UNEXPECTED.into())),
None => return Poll::Ready(Ok(None)),
}
};

if !self.stream.is_eos() {
// Get the trailing frame
let trailing_frame = future::poll_fn(|cx| self.stream.poll_next(cx))
.await
.map_err(|e| self.maybe_conn_err(e))?;

if trailing_frame.is_some() {
// if it's not unknown or reserved, fail.
return Err(Code::H3_FRAME_UNEXPECTED.into());
match self
.stream
.poll_next(cx)
.map_err(|e| self.maybe_conn_err(e))?
{
Poll::Ready(trailing_frame) => {
if trailing_frame.is_some() {
// if it's not unknown or reserved, fail.
return Poll::Ready(Err(Code::H3_FRAME_UNEXPECTED.into()));
}
}
Poll::Pending => {
// save the trailers and try again.
self.trailers = Some(trailers);
return Poll::Pending;
}
}
}

Expand All @@ -849,16 +859,16 @@ where
//# An HTTP/3 implementation MAY impose a limit on the maximum size of
//# the message header it will accept on an individual HTTP message.
Err(qpack::DecoderError::HeaderTooLong(cancel_size)) => {
return Err(Error::header_too_big(
return Poll::Ready(Err(Error::header_too_big(
cancel_size,
self.max_field_section_size,
))
)))
}
Ok(decoded) => decoded,
Err(e) => return Err(e.into()),
Err(e) => return Poll::Ready(Err(e.into())),
};

Ok(Some(Header::try_from(fields)?.into_fields()))
Poll::Ready(Ok(Some(Header::try_from(fields)?.into_fields())))
}

#[allow(missing_docs)]
Expand Down
12 changes: 11 additions & 1 deletion h3/src/server/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::{
};

use bytes::BytesMut;
use futures_util::future;
use http::{response, HeaderMap, Response};

use quic::StreamId;
Expand Down Expand Up @@ -78,7 +79,16 @@ where
/// Receive an optional set of trailers for the request
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
pub async fn recv_trailers(&mut self) -> Result<Option<HeaderMap>, Error> {
self.inner.recv_trailers().await
future::poll_fn(|cx| self.poll_recv_trailers(cx)).await
}

/// Poll for an optional set of trailers for the request
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
pub fn poll_recv_trailers(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Error>> {
self.inner.poll_recv_trailers(cx)
}

/// Tell the peer to stop sending into the underlying QUIC stream
Expand Down

0 comments on commit d4a2054

Please sign in to comment.