From dd9dd139c01dfd0ce9f880cf482db00f9e6fdcbe Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sun, 1 Dec 2024 22:54:11 +0100 Subject: [PATCH] feat: improve resource management and fix timing issues Overhaul key subsystems to be more robust around resource handling and timing-sensitive operations: Gateway/Tokens: - Use UNIX epoch instead of current time for expired token state - Fix inverted timestamp comparisons in expiration checks - Add debug logging for token refresh operations Player: - Add is_started() method to track device state - Prevent ALSA from acquiring device until playback starts - Improve cleanup ordering in stop() - Add comprehensive docs for device lifecycle - Guard player task with device state check Remote Control: - Fix race condition in controller connection handshake - Add timeout for user token refresh operations - Improve session ID handling - Order connection setup operations correctly - Better error documentation --- CHANGELOG.md | 7 +++- src/gateway.rs | 15 +++----- src/main.rs | 2 +- src/player.rs | 47 +++++++++++++++++++---- src/remote.rs | 100 ++++++++++++++++++++++++++++++++----------------- src/tokens.rs | 2 +- 6 files changed, 119 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 47c32d6..b7ec1a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,17 +10,22 @@ and [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/). ### Added - [docs] Comprehensive documentation for all public APIs and internals - [error] Add `downcast()` method to access underlying error types -- [player] Explicit audio device lifecycle with `start()` and `stop()` +- [player] Explicit audio device lifecycle with `start()`, `stop()` and `is_started()` - [uuid] `uuid` module providing a fast UUID v4 generator ### Changed - [docs] Clarify that Deezer Connect control only works from mobile devices +- [gateway] Use UNIX epoch instead of current time for expired token state - [main] Use kernel instead of distribution version on Linux systems - [remote] Start/stop audio device on controller connect/disconnect +- [remote] Improve connection handshake ordering and timeout handling ### Fixed - [protocol] Use epsilon comparison for `Percentage` equality checks - [remote] Improve queue refresh handling +- [player] Prevent from acquiring output device before playback starts +- [remote] Fix race condition in controller connection setup +- [tokens] Fix token expiration check ## [0.3.0] - 2024-11-28 diff --git a/src/gateway.rs b/src/gateway.rs index ed58061..0ec4969 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -350,23 +350,19 @@ impl Gateway { /// * Current time is past expiration time #[must_use] pub fn is_expired(&self) -> bool { - if let Some(data) = &self.user_data { - return data.user.options.expiration_timestamp >= SystemTime::now(); - } - - true + self.expires_at() <= SystemTime::now() } /// Returns when the current session will expire. /// - /// Returns current time if no session is active. + /// Returns UNIX epoch if no session is active. #[must_use] pub fn expires_at(&self) -> SystemTime { if let Some(data) = &self.user_data { return data.user.options.expiration_timestamp; } - SystemTime::now() + SystemTime::UNIX_EPOCH } /// Updates the cached user data. @@ -541,6 +537,7 @@ impl Gateway { /// * Too many devices are registered pub async fn user_token(&mut self) -> Result { if self.is_expired() { + debug!("refreshing user token"); self.refresh().await?; } @@ -561,8 +558,8 @@ impl Gateway { pub fn flush_user_token(&mut self) { // Force refreshing user data, but do not set `user_data` to `None` so // so we can continue using the `api_token` it contains. - if let Some(ref mut data) = self.user_data { - data.user.options.expiration_timestamp = SystemTime::now(); + if let Some(data) = self.user_data.as_mut() { + data.user.options.expiration_timestamp = SystemTime::UNIX_EPOCH; } } diff --git a/src/main.rs b/src/main.rs index 304fc37..89b0a54 100644 --- a/src/main.rs +++ b/src/main.rs @@ -336,7 +336,7 @@ async fn run(args: Args) -> Result<()> { "linux" => sysinfo::System::kernel_version(), _ => sysinfo::System::os_version(), } - .unwrap_or_else(|| String::from("0")); + .unwrap_or("0".to_string()); if os_name.is_empty() || os_name.contains(illegal_chars) || os_version.is_empty() diff --git a/src/player.rs b/src/player.rs index 8caf57b..3802e55 100644 --- a/src/player.rs +++ b/src/player.rs @@ -1,3 +1,4 @@ +//! Audio playback and track management. //! //! This module handles: //! * Audio device configuration @@ -6,7 +7,16 @@ //! * Volume normalization //! * Event notifications //! -//! # Audio Pipeline +//! # Device Management +//! +//! The audio device is handled in three phases: +//! 1. Selection during construction (`new()`) +//! 2. Opening on demand (`start()`) +//! 3. Closing when done (`stop()`) +//! +//! This design prevents ALSA from acquiring the device until it's actually needed. +//! +//! //! # Audio Pipeline //! //! The playback pipeline consists of: //! 1. Track download and decryption @@ -222,8 +232,8 @@ impl Player { /// ``` /// All parts are optional. Use empty string for system default. /// - /// Note: This only selects the audio device but does not open it. - /// Call `start()` to open the device before playback. + /// Note: This only stores the device specification without opening it, + /// preventing ALSA from acquiring the device until `start()` is called. /// /// # Errors /// @@ -296,6 +306,7 @@ impl Player { /// * Device is not found /// * Sample rate is invalid /// * Sample format is not supported + /// * Device cannot be acquired (e.g., in use by another application) fn get_device(device: &str) -> Result<(rodio::Device, rodio::SupportedStreamConfig)> { // The device string has the following format: // "[][|][|][|]" (case-insensitive) @@ -438,14 +449,15 @@ impl Player { /// Note: This method is automatically called when the player is dropped, /// ensuring proper cleanup of audio device resources. pub fn stop(&mut self) { - debug!("closing output device"); - // Don't care if the sink is already dropped: we're already "stopped". - let _ = self.sink_mut().map(|sink| sink.stop()); + if let Ok(sink) = self.sink_mut() { + debug!("closing output device"); + sink.stop(); + } - self.sink = None; self.sources = None; self.stream = None; + self.sink = None; } /// The list of supported sample rates. @@ -1217,6 +1229,27 @@ impl Player { pub fn set_media_url(&mut self, url: Url) { self.media_url = url; } + + /// Returns whether the audio device is open. + /// + /// True if `start()` has been called and the device was successfully opened. + /// False if device has not been opened or has been closed with `stop()`. + /// + /// # Example + /// ``` + /// let mut player = Player::new(&config, "").await?; + /// assert!(!player.is_started()); + /// + /// player.start()?; + /// assert!(player.is_started()); + /// + /// player.stop(); + /// assert!(!player.is_started()); + /// ``` + #[must_use] + pub fn is_started(&self) -> bool { + self.sink.is_some() + } } impl Drop for Player { diff --git a/src/remote.rs b/src/remote.rs index b54db3c..cec42d4 100644 --- a/src/remote.rs +++ b/src/remote.rs @@ -243,14 +243,18 @@ impl Client { /// Maximum allowed websocket message size in bytes. const MESSAGE_SIZE_MAX: usize = 8192; - /// TODO + /// Creates a new client instance. /// - /// # Errors + /// # Arguments /// - /// Will return `Err` if: - /// - the `app_version` in `config` is not in [`SemVer`] format + /// * `config` - Configuration including device and authentication settings + /// * `player` - Audio playback manager instance /// - /// [SemVer]: https://semver.org/ + /// # Errors + /// + /// Returns error if: + /// * Application version in config is not valid `SemVer` + /// * Gateway client creation fails pub fn new(config: &Config, player: Player) -> Result { // Construct version in the form of `Mmmppp` where: // - `M` is the major version @@ -420,13 +424,20 @@ impl Client { self.player.set_media_url(self.gateway.media_url()); } - /// TODO + /// Starts the client and handles control messages. + /// + /// Establishes websocket connection, authenticates, and begins processing: + /// * Controller discovery + /// * Command messages + /// * Playback state updates + /// * Connection maintenance /// /// # Errors /// - /// Will return `Err` if: - /// - the websocket could not be connected to - /// - sending or receiving messages failed + /// Returns error if: + /// * Authentication fails + /// * Websocket connection fails + /// * Message handling fails critically pub async fn start(&mut self) -> Result<()> { if let Credentials::Login { email, password } = &self.credentials.clone() { info!("logging in with email and password"); @@ -544,7 +555,7 @@ impl Client { } } - Err(e) = self.player.run() => break Err(e), + Err(e) = self.player.run(), if self.player.is_started() => break Err(e), Some(event) = self.event_rx.recv() => { self.handle_event(event).await; @@ -662,9 +673,14 @@ impl Client { } } - /// Checks if current queue is Flow (personalized radio). + /// Checks whether the current queue is a Flow (personalized radio) queue. + /// + /// Examines the queue context to determine if it represents a personalized radio stream. + /// + /// # Returns /// - /// Examines queue context for Flow indicators. + /// * `true` - Current queue is a Flow queue + /// * `false` - Current queue is not Flow or no queue exists fn is_flow(&self) -> bool { self.queue.as_ref().is_some_and(|queue| { queue @@ -679,20 +695,18 @@ impl Client { }) } - /// Resets receive watchdog timer. + /// Resets the receive watchdog timer. /// - /// Called when messages are received from controller to - /// prevent connection timeout. + /// Called when messages are received from the controller to prevent connection timeout. fn reset_watchdog_rx(&mut self) { if let Some(deadline) = from_now(Self::WATCHDOG_RX_TIMEOUT) { self.watchdog_rx.as_mut().reset(deadline); } } - /// Resets transmit watchdog timer. + /// Resets the transmit watchdog timer. /// - /// Called when messages are sent to controller to - /// maintain heartbeat timing. + /// Called when messages are sent to the controller to maintain heartbeat timing. fn reset_watchdog_tx(&mut self) { if let Some(deadline) = from_now(Self::WATCHDOG_TX_TIMEOUT) { self.watchdog_tx.as_mut().reset(deadline); @@ -701,13 +715,18 @@ impl Client { /// Resets the playback reporting timer. /// - /// Schedules next progress report according to reporting interval. + /// Schedules the next progress report according to the reporting interval. fn reset_reporting_timer(&mut self) { if let Some(deadline) = from_now(Self::REPORTING_INTERVAL) { self.reporting_timer.as_mut().reset(deadline); } } + /// Stops the client and cleans up resources. + /// + /// * Disconnects from controller if connected + /// * Processes remaining events + /// * Unsubscribes from channels pub async fn stop(&mut self) { if self.is_connected() { if let Err(e) = self.disconnect().await { @@ -814,6 +833,15 @@ impl Client { } } + /// Disconnects from the current controller. + /// + /// Sends a close message to the controller and resets connection state. + /// + /// # Errors + /// + /// Returns error if: + /// * No active controller connection exists + /// * Message send fails async fn disconnect(&mut self) -> Result<()> { if let Some(controller) = self.controller() { let close = Body::Close { @@ -1008,30 +1036,31 @@ impl Client { self.discovery_state = DiscoveryState::Taken; } + // The unique session ID is used when reporting playback. + self.connection_state = ConnectionState::Connected { + controller: from, + session_id: crate::Uuid::fast_v4().into(), + }; + + info!("connected to {controller}"); + if let Err(e) = self.event_tx.send(Event::Connected) { + error!("failed to send connected event: {e}"); + } + // Refreshed the user token on every reconnection in order to reload the user // configuration, like normalization and audio quality. - let (user_token, time_to_live) = self.user_token().await?; + let (user_token, time_to_live) = + tokio::time::timeout(Self::NETWORK_TIMEOUT, self.user_token()).await??; self.user_token = Some(user_token); - self.set_player_settings(); // Inform the select loop about the new time to live. if let Err(e) = self.time_to_live_tx.send(time_to_live).await { error!("failed to send user token time to live: {e}"); } - // The unique session ID is used when reporting playback. - self.connection_state = ConnectionState::Connected { - controller: from, - session_id: *crate::Uuid::fast_v4(), - }; - - info!("connected to {controller}"); + self.set_player_settings(); self.player.start()?; - if let Err(e) = self.event_tx.send(Event::Connected) { - error!("failed to send connected event: {e}"); - } - return Ok(()); } @@ -1067,9 +1096,9 @@ impl Client { )) } - /// Resets connection and discovery state. + /// Resets connection and discovery states. /// - /// Called when connection terminates to: + /// Called when a connection terminates to: /// * Clear controller association /// * Reset connection state /// * Reset discovery state @@ -1078,13 +1107,14 @@ impl Client { fn reset_states(&mut self) { if let Some(controller) = self.controller() { info!("disconnected from {controller}"); - self.player.stop(); if let Err(e) = self.event_tx.send(Event::Disconnected) { error!("failed to send disconnected event: {e}"); } } + self.player.stop(); + // Force the user token to be reloaded on the next connection. self.gateway.flush_user_token(); diff --git a/src/tokens.rs b/src/tokens.rs index 1f8d08b..64647e9 100644 --- a/src/tokens.rs +++ b/src/tokens.rs @@ -163,7 +163,7 @@ impl UserToken { /// ``` #[must_use] pub fn is_expired(&self) -> bool { - SystemTime::now() >= self.expires_at + self.expires_at <= SystemTime::now() } }