Skip to content

Commit

Permalink
feat: improve resource management and fix timing issues
Browse files Browse the repository at this point in the history
Overhaul key subsystems to be more robust around resource handling
and timing-sensitive operations:

Gateway/Tokens:
- Use UNIX epoch instead of current time for expired token state
- Fix inverted timestamp comparisons in expiration checks
- Add debug logging for token refresh operations

Player:
- Add is_started() method to track device state
- Prevent ALSA from acquiring device until playback starts
- Improve cleanup ordering in stop()
- Add comprehensive docs for device lifecycle
- Guard player task with device state check

Remote Control:
- Fix race condition in controller connection handshake
- Add timeout for user token refresh operations
- Improve session ID handling
- Order connection setup operations correctly
- Better error documentation
  • Loading branch information
roderickvd committed Dec 1, 2024
1 parent c00aece commit dd9dd13
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 54 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,22 @@ and [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/).
### Added
- [docs] Comprehensive documentation for all public APIs and internals
- [error] Add `downcast()` method to access underlying error types
- [player] Explicit audio device lifecycle with `start()` and `stop()`
- [player] Explicit audio device lifecycle with `start()`, `stop()` and `is_started()`
- [uuid] `uuid` module providing a fast UUID v4 generator

### Changed
- [docs] Clarify that Deezer Connect control only works from mobile devices
- [gateway] Use UNIX epoch instead of current time for expired token state
- [main] Use kernel instead of distribution version on Linux systems
- [remote] Start/stop audio device on controller connect/disconnect
- [remote] Improve connection handshake ordering and timeout handling

### Fixed
- [protocol] Use epsilon comparison for `Percentage` equality checks
- [remote] Improve queue refresh handling
- [player] Prevent from acquiring output device before playback starts
- [remote] Fix race condition in controller connection setup
- [tokens] Fix token expiration check

## [0.3.0] - 2024-11-28

Expand Down
15 changes: 6 additions & 9 deletions src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,23 +350,19 @@ impl Gateway {
/// * Current time is past expiration time
#[must_use]
pub fn is_expired(&self) -> bool {
if let Some(data) = &self.user_data {
return data.user.options.expiration_timestamp >= SystemTime::now();
}

true
self.expires_at() <= SystemTime::now()
}

/// Returns when the current session will expire.
///
/// Returns current time if no session is active.
/// Returns UNIX epoch if no session is active.
#[must_use]
pub fn expires_at(&self) -> SystemTime {
if let Some(data) = &self.user_data {
return data.user.options.expiration_timestamp;
}

SystemTime::now()
SystemTime::UNIX_EPOCH
}

/// Updates the cached user data.
Expand Down Expand Up @@ -541,6 +537,7 @@ impl Gateway {
/// * Too many devices are registered
pub async fn user_token(&mut self) -> Result<UserToken> {
if self.is_expired() {
debug!("refreshing user token");
self.refresh().await?;
}

Expand All @@ -561,8 +558,8 @@ impl Gateway {
pub fn flush_user_token(&mut self) {
// Force refreshing user data, but do not set `user_data` to `None` so
// so we can continue using the `api_token` it contains.
if let Some(ref mut data) = self.user_data {
data.user.options.expiration_timestamp = SystemTime::now();
if let Some(data) = self.user_data.as_mut() {
data.user.options.expiration_timestamp = SystemTime::UNIX_EPOCH;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ async fn run(args: Args) -> Result<()> {
"linux" => sysinfo::System::kernel_version(),
_ => sysinfo::System::os_version(),
}
.unwrap_or_else(|| String::from("0"));
.unwrap_or("0".to_string());
if os_name.is_empty()
|| os_name.contains(illegal_chars)
|| os_version.is_empty()
Expand Down
47 changes: 40 additions & 7 deletions src/player.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Audio playback and track management.
//!
//! This module handles:
//! * Audio device configuration
Expand All @@ -6,7 +7,16 @@
//! * Volume normalization
//! * Event notifications
//!
//! # Audio Pipeline
//! # Device Management
//!
//! The audio device is handled in three phases:
//! 1. Selection during construction (`new()`)
//! 2. Opening on demand (`start()`)
//! 3. Closing when done (`stop()`)
//!
//! This design prevents ALSA from acquiring the device until it's actually needed.
//!
//! //! # Audio Pipeline
//!
//! The playback pipeline consists of:
//! 1. Track download and decryption
Expand Down Expand Up @@ -222,8 +232,8 @@ impl Player {
/// ```
/// All parts are optional. Use empty string for system default.
///
/// Note: This only selects the audio device but does not open it.
/// Call `start()` to open the device before playback.
/// Note: This only stores the device specification without opening it,
/// preventing ALSA from acquiring the device until `start()` is called.
///
/// # Errors
///
Expand Down Expand Up @@ -296,6 +306,7 @@ impl Player {
/// * Device is not found
/// * Sample rate is invalid
/// * Sample format is not supported
/// * Device cannot be acquired (e.g., in use by another application)
fn get_device(device: &str) -> Result<(rodio::Device, rodio::SupportedStreamConfig)> {
// The device string has the following format:
// "[<host>][|<device>][|<sample rate>][|<sample format>]" (case-insensitive)
Expand Down Expand Up @@ -438,14 +449,15 @@ impl Player {
/// Note: This method is automatically called when the player is dropped,
/// ensuring proper cleanup of audio device resources.
pub fn stop(&mut self) {
debug!("closing output device");

// Don't care if the sink is already dropped: we're already "stopped".
let _ = self.sink_mut().map(|sink| sink.stop());
if let Ok(sink) = self.sink_mut() {
debug!("closing output device");
sink.stop();
}

self.sink = None;
self.sources = None;
self.stream = None;
self.sink = None;
}

/// The list of supported sample rates.
Expand Down Expand Up @@ -1217,6 +1229,27 @@ impl Player {
pub fn set_media_url(&mut self, url: Url) {
self.media_url = url;
}

/// Returns whether the audio device is open.
///
/// True if `start()` has been called and the device was successfully opened.
/// False if device has not been opened or has been closed with `stop()`.
///
/// # Example
/// ```
/// let mut player = Player::new(&config, "").await?;
/// assert!(!player.is_started());
///
/// player.start()?;
/// assert!(player.is_started());
///
/// player.stop();
/// assert!(!player.is_started());
/// ```
#[must_use]
pub fn is_started(&self) -> bool {
self.sink.is_some()
}
}

impl Drop for Player {
Expand Down
100 changes: 65 additions & 35 deletions src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,18 @@ impl Client {
/// Maximum allowed websocket message size in bytes.
const MESSAGE_SIZE_MAX: usize = 8192;

/// TODO
/// Creates a new client instance.
///
/// # Errors
/// # Arguments
///
/// Will return `Err` if:
/// - the `app_version` in `config` is not in [`SemVer`] format
/// * `config` - Configuration including device and authentication settings
/// * `player` - Audio playback manager instance
///
/// [SemVer]: https://semver.org/
/// # Errors
///
/// Returns error if:
/// * Application version in config is not valid `SemVer`
/// * Gateway client creation fails
pub fn new(config: &Config, player: Player) -> Result<Self> {
// Construct version in the form of `Mmmppp` where:
// - `M` is the major version
Expand Down Expand Up @@ -420,13 +424,20 @@ impl Client {
self.player.set_media_url(self.gateway.media_url());
}

/// TODO
/// Starts the client and handles control messages.
///
/// Establishes websocket connection, authenticates, and begins processing:
/// * Controller discovery
/// * Command messages
/// * Playback state updates
/// * Connection maintenance
///
/// # Errors
///
/// Will return `Err` if:
/// - the websocket could not be connected to
/// - sending or receiving messages failed
/// Returns error if:
/// * Authentication fails
/// * Websocket connection fails
/// * Message handling fails critically
pub async fn start(&mut self) -> Result<()> {
if let Credentials::Login { email, password } = &self.credentials.clone() {
info!("logging in with email and password");
Expand Down Expand Up @@ -544,7 +555,7 @@ impl Client {
}
}

Err(e) = self.player.run() => break Err(e),
Err(e) = self.player.run(), if self.player.is_started() => break Err(e),

Some(event) = self.event_rx.recv() => {
self.handle_event(event).await;
Expand Down Expand Up @@ -662,9 +673,14 @@ impl Client {
}
}

/// Checks if current queue is Flow (personalized radio).
/// Checks whether the current queue is a Flow (personalized radio) queue.
///
/// Examines the queue context to determine if it represents a personalized radio stream.
///
/// # Returns
///
/// Examines queue context for Flow indicators.
/// * `true` - Current queue is a Flow queue
/// * `false` - Current queue is not Flow or no queue exists
fn is_flow(&self) -> bool {
self.queue.as_ref().is_some_and(|queue| {
queue
Expand All @@ -679,20 +695,18 @@ impl Client {
})
}

/// Resets receive watchdog timer.
/// Resets the receive watchdog timer.
///
/// Called when messages are received from controller to
/// prevent connection timeout.
/// Called when messages are received from the controller to prevent connection timeout.
fn reset_watchdog_rx(&mut self) {
if let Some(deadline) = from_now(Self::WATCHDOG_RX_TIMEOUT) {
self.watchdog_rx.as_mut().reset(deadline);
}
}

/// Resets transmit watchdog timer.
/// Resets the transmit watchdog timer.
///
/// Called when messages are sent to controller to
/// maintain heartbeat timing.
/// Called when messages are sent to the controller to maintain heartbeat timing.
fn reset_watchdog_tx(&mut self) {
if let Some(deadline) = from_now(Self::WATCHDOG_TX_TIMEOUT) {
self.watchdog_tx.as_mut().reset(deadline);
Expand All @@ -701,13 +715,18 @@ impl Client {

/// Resets the playback reporting timer.
///
/// Schedules next progress report according to reporting interval.
/// Schedules the next progress report according to the reporting interval.
fn reset_reporting_timer(&mut self) {
if let Some(deadline) = from_now(Self::REPORTING_INTERVAL) {
self.reporting_timer.as_mut().reset(deadline);
}
}

/// Stops the client and cleans up resources.
///
/// * Disconnects from controller if connected
/// * Processes remaining events
/// * Unsubscribes from channels
pub async fn stop(&mut self) {
if self.is_connected() {
if let Err(e) = self.disconnect().await {
Expand Down Expand Up @@ -814,6 +833,15 @@ impl Client {
}
}

/// Disconnects from the current controller.
///
/// Sends a close message to the controller and resets connection state.
///
/// # Errors
///
/// Returns error if:
/// * No active controller connection exists
/// * Message send fails
async fn disconnect(&mut self) -> Result<()> {
if let Some(controller) = self.controller() {
let close = Body::Close {
Expand Down Expand Up @@ -1008,30 +1036,31 @@ impl Client {
self.discovery_state = DiscoveryState::Taken;
}

// The unique session ID is used when reporting playback.
self.connection_state = ConnectionState::Connected {
controller: from,
session_id: crate::Uuid::fast_v4().into(),
};

info!("connected to {controller}");
if let Err(e) = self.event_tx.send(Event::Connected) {
error!("failed to send connected event: {e}");
}

// Refreshed the user token on every reconnection in order to reload the user
// configuration, like normalization and audio quality.
let (user_token, time_to_live) = self.user_token().await?;
let (user_token, time_to_live) =
tokio::time::timeout(Self::NETWORK_TIMEOUT, self.user_token()).await??;
self.user_token = Some(user_token);
self.set_player_settings();

// Inform the select loop about the new time to live.
if let Err(e) = self.time_to_live_tx.send(time_to_live).await {
error!("failed to send user token time to live: {e}");
}

// The unique session ID is used when reporting playback.
self.connection_state = ConnectionState::Connected {
controller: from,
session_id: *crate::Uuid::fast_v4(),
};

info!("connected to {controller}");
self.set_player_settings();
self.player.start()?;

if let Err(e) = self.event_tx.send(Event::Connected) {
error!("failed to send connected event: {e}");
}

return Ok(());
}

Expand Down Expand Up @@ -1067,9 +1096,9 @@ impl Client {
))
}

/// Resets connection and discovery state.
/// Resets connection and discovery states.
///
/// Called when connection terminates to:
/// Called when a connection terminates to:
/// * Clear controller association
/// * Reset connection state
/// * Reset discovery state
Expand All @@ -1078,13 +1107,14 @@ impl Client {
fn reset_states(&mut self) {
if let Some(controller) = self.controller() {
info!("disconnected from {controller}");
self.player.stop();

if let Err(e) = self.event_tx.send(Event::Disconnected) {
error!("failed to send disconnected event: {e}");
}
}

self.player.stop();

// Force the user token to be reloaded on the next connection.
self.gateway.flush_user_token();

Expand Down
2 changes: 1 addition & 1 deletion src/tokens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl UserToken {
/// ```
#[must_use]
pub fn is_expired(&self) -> bool {
SystemTime::now() >= self.expires_at
self.expires_at <= SystemTime::now()
}
}

Expand Down

0 comments on commit dd9dd13

Please sign in to comment.