diff --git a/CHANGELOG.md b/CHANGELOG.md index ba6d0f3..30213d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) and [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/). +## [Unreleased] + +### Changed +- [remote] Configure websocket message size limits to prevent memory exhaustion + ## [0.6.1] - 2024-12-13 ### Added diff --git a/Cargo.lock b/Cargo.lock index 20f28db..5684980 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1800,7 +1800,7 @@ dependencies = [ "shell-escape", "stream-download", "sysinfo", - "thiserror 2.0.6", + "thiserror 2.0.7", "time", "tokio", "tokio-tungstenite", @@ -1998,7 +1998,7 @@ dependencies = [ "rustc-hash 2.1.0", "rustls", "socket2", - "thiserror 2.0.6", + "thiserror 2.0.7", "tokio", "tracing", ] @@ -2017,7 +2017,7 @@ dependencies = [ "rustls", "rustls-pki-types", "slab", - "thiserror 2.0.6", + "thiserror 2.0.7", "tinyvec", "tracing", "web-time", @@ -2531,7 +2531,7 @@ dependencies = [ "reqwest", "tap", "tempfile", - "thiserror 2.0.6", + "thiserror 2.0.7", "tokio", "tokio-util", "tracing", @@ -2705,11 +2705,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.6" +version = "2.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fec2a1820ebd077e2b90c4df007bebf344cd394098a13c563957d0afc83ea47" +checksum = "93605438cbd668185516ab499d589afb7ee1859ea3d5fc8f6b0755e1c7443767" dependencies = [ - "thiserror-impl 2.0.6", + "thiserror-impl 2.0.7", ] [[package]] @@ -2725,9 +2725,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.6" +version = "2.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d65750cab40f4ff1929fb1ba509e9914eb756131cef4210da8d5d700d26f6312" +checksum = "e1d8749b4531af2117677a5fcd12b1348a3fe2b81e36e61ffeac5c4aa3273e36" dependencies = [ "proc-macro2", "quote", @@ -2830,9 +2830,9 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.24.0" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +checksum = "c28562dd8aea311048ed1ab9372a6b9a59977e1b308afb87c985c1f2b3206938" dependencies = [ "futures-util", "log", @@ -2936,9 +2936,9 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "tungstenite" -version = "0.24.0" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +checksum = "326eb16466ed89221beef69dbc94f517ee888bae959895472133924a25f7070e" dependencies = [ "byteorder", "bytes", @@ -2950,7 +2950,7 @@ dependencies = [ "rustls", "rustls-pki-types", "sha1", - "thiserror 1.0.69", + "thiserror 2.0.7", "utf-8", ] diff --git a/Cargo.toml b/Cargo.toml index 34d5a03..32f050b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,7 +82,7 @@ tokio = { version = "1", features = [ "sync", "time", ] } -tokio-tungstenite = { version = "0.24", features = ["rustls-tls-native-roots"] } +tokio-tungstenite = { version = "0.25", features = ["rustls-tls-native-roots"] } tokio-util = "0.7" toml = "0.8" url = { version = "2.5", features = ["serde"] } diff --git a/src/remote.rs b/src/remote.rs index 188ed7d..13d0a8a 100644 --- a/src/remote.rs +++ b/src/remote.rs @@ -64,7 +64,9 @@ use log::Level; use semver; use tokio_tungstenite::{ tungstenite::{ - client::ClientRequestBuilder, protocol::frame::Frame, Message as WebsocketMessage, + client::ClientRequestBuilder, + protocol::{frame::Frame, WebSocketConfig}, + Message as WebsocketMessage, }, MaybeTlsStream, WebSocketStream, }; @@ -275,8 +277,17 @@ impl Client { /// Maximum time between sending heartbeats. const WATCHDOG_TX_TIMEOUT: Duration = Duration::from_secs(5); - /// Maximum allowed websocket message size in bytes. - const MESSAGE_SIZE_MAX: usize = 8192; + /// Maximum allowed websocket frame size (payload) in bytes. + /// Set to 32KB (message size / 4) to balance between chunking and overhead. + const FRAME_SIZE_MAX: usize = Self::MESSAGE_SIZE_MAX / 4; + + /// Maximum allowed websocket message size (payload plus headers) in bytes. + /// Set to 128KB (message buffer / 2) to provide backpressure and prevent OOM. + const MESSAGE_SIZE_MAX: usize = Self::MESSAGE_BUFFER_MAX / 2; + + /// Maximum size of the websocket write buffer in bytes. + /// Set to 256KB to provide adequate buffering while preventing memory exhaustion. + const MESSAGE_BUFFER_MAX: usize = 2 * 128 * 1024; /// Creates a new client instance. /// @@ -462,7 +473,12 @@ impl Client { /// Starts the client and handles control messages. /// - /// Establishes websocket connection, authenticates, and begins processing: + /// Establishes websocket connection with configured limits: + /// * Frame size: 32KB maximum + /// * Message size: 128KB maximum + /// * Write buffer: 128KB maximum + /// + /// Authenticates and begins processing: /// * Controller discovery /// * Command messages /// * Playback state updates @@ -509,12 +525,20 @@ impl Client { } } + let config = Some( + WebSocketConfig::default() + .max_write_buffer_size(Self::MESSAGE_BUFFER_MAX) + .max_message_size(Some(Self::MESSAGE_SIZE_MAX)) + .max_frame_size(Some(Self::FRAME_SIZE_MAX)), + ); + let (ws_stream, _) = if let Some(proxy) = proxy::Http::from_env() { info!("using proxy: {proxy}"); let tcp_stream = proxy.connect_async(&uri).await?; - tokio_tungstenite::client_async_tls(request, tcp_stream).await? + tokio_tungstenite::client_async_tls_with_config(request, tcp_stream, config, None) + .await? } else { - tokio_tungstenite::connect_async(request).await? + tokio_tungstenite::connect_async_with_config(request, config, false).await? }; let (websocket_tx, mut websocket_rx) = ws_stream.split(); @@ -1803,7 +1827,7 @@ impl Client { async fn handle_message(&mut self, message: &WebsocketMessage) -> ControlFlow { match message { WebsocketMessage::Text(message) => { - match serde_json::from_str::(message) { + match serde_json::from_str::(message.as_str()) { Ok(message) => { match message.clone() { Message::Receive { contents, .. } => { @@ -2016,7 +2040,7 @@ impl Client { } let json = serde_json::to_string(&message)?; - let frame = WebsocketMessage::Text(json); + let frame = WebsocketMessage::Text(json.into()); self.send_frame(frame).await }