From 856b436dabf683f2b5f0d1923f89e082c30733cf Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Sat, 17 Feb 2024 21:33:00 +0530 Subject: [PATCH] Use bytes::BytesMut to avoid cloning during unmasking --- examples/echo_server.rs | 5 ++++- src/frame.rs | 13 ++++--------- src/lib.rs | 14 ++++++-------- 3 files changed, 14 insertions(+), 18 deletions(-) diff --git a/examples/echo_server.rs b/examples/echo_server.rs index aefe739..daad135 100644 --- a/examples/echo_server.rs +++ b/examples/echo_server.rs @@ -25,7 +25,10 @@ use hyper::Response; use tokio::net::TcpListener; async fn handle_client(fut: upgrade::UpgradeFut) -> Result<(), WebSocketError> { - let mut ws = fastwebsockets::FragmentCollector::new(fut.await?); + let mut ws = fut.await?; + ws.set_auto_apply_mask(false); + + let mut ws = fastwebsockets::FragmentCollector::new(ws); loop { let frame = ws.read_frame().await?; diff --git a/src/frame.rs b/src/frame.rs index 251c7dd..9ac133d 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -14,7 +14,7 @@ use tokio::io::AsyncWriteExt; -use bytes::Bytes; +use bytes::BytesMut; use core::ops::Deref; use crate::WebSocketError; @@ -45,7 +45,7 @@ pub enum Payload<'a> { BorrowedMut(&'a mut [u8]), Borrowed(&'a [u8]), Owned(Vec), - Bytes(Bytes), + Bytes(BytesMut), } impl<'a> core::fmt::Debug for Payload<'a> { @@ -97,6 +97,7 @@ impl From> for Vec { } impl Payload<'_> { + #[inline(always)] pub fn to_mut(&mut self) -> &mut [u8] { match self { Payload::Borrowed(borrowed) => { @@ -108,13 +109,7 @@ impl Payload<'_> { } Payload::BorrowedMut(borrowed) => borrowed, Payload::Owned(ref mut owned) => owned, - Payload::Bytes(b) => { - *self = Payload::Owned(b.to_vec()); - match self { - Payload::Owned(owned) => owned, - _ => unreachable!(), - } - } + Payload::Bytes(b) => b.as_mut(), } } } diff --git a/src/lib.rs b/src/lib.rs index 4ee251e..2f25d9c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -542,8 +542,12 @@ impl<'f, S> WebSocket { } } +const MAX_HEADER_SIZE: usize = 14; + impl ReadHalf { pub fn after_handshake(role: Role) -> Self { + let mut buffer = BytesMut::with_capacity(8192); + Self { role, auto_apply_mask: true, @@ -551,7 +555,7 @@ impl ReadHalf { auto_pong: true, writev_threshold: 1024, max_message_size: 64 << 20, - buffer: BytesMut::with_capacity(8192), + buffer, } } @@ -561,7 +565,6 @@ impl ReadHalf { /// has been closed. /// /// XXX: Do not expose this method to the public API. - /// Lifetime requirements for safe recv buffer use are not enforced. pub(crate) async fn read_frame_inner<'f, S>( &mut self, stream: &mut S, @@ -639,11 +642,6 @@ impl ReadHalf { }}; } - static MAX_HEADER_SIZE: usize = 14; - - // websocket max header size - self.buffer.reserve(MAX_HEADER_SIZE); - // Read the first two bytes while self.buffer.remaining() < 2 { eof!(stream.read_buf(&mut self.buffer).await?); @@ -717,7 +715,7 @@ impl ReadHalf { // if we read too much it will stay in the buffer, for the next call to this method let payload = self.buffer.split_to(payload_len); - let frame = Frame::new(fin, opcode, mask, Payload::Bytes(payload.freeze())); + let frame = Frame::new(fin, opcode, mask, Payload::Bytes(payload)); Ok(frame) } }