From 3b05ec67463950bd1d4a78ed349623947d8e3da8 Mon Sep 17 00:00:00 2001 From: Nathan Graule Date: Mon, 11 Nov 2024 09:32:03 +0100 Subject: [PATCH] feat(all): duplex audio wrapper for input/output stream pair --- Cargo.lock | 7 ++ Cargo.toml | 1 + examples/duplex.rs | 50 ++++++++++ examples/sine_wave.rs | 42 +------- examples/util/mod.rs | 1 + examples/util/sine.rs | 41 ++++++++ src/audio_buffer.rs | 15 +++ src/backends/mod.rs | 2 +- src/backends/wasapi/stream.rs | 7 +- src/duplex.rs | 180 ++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 11 files changed, 304 insertions(+), 43 deletions(-) create mode 100644 examples/duplex.rs create mode 100644 examples/util/sine.rs create mode 100644 src/duplex.rs diff --git a/Cargo.lock b/Cargo.lock index 958b766..b031c46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -301,6 +301,7 @@ dependencies = [ "log", "ndarray", "oneshot", + "rtrb", "thiserror", "windows", ] @@ -527,6 +528,12 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +[[package]] +name = "rtrb" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3f94e84c073f3b85d4012b44722fa8842b9986d741590d4f2636ad0a5b14143" + [[package]] name = "rustc-hash" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index 436641b..6d52082 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ log = "0.4.22" ndarray = "0.15.6" oneshot = "0.1.8" thiserror = "1.0.63" +rtrb = "0.3.1" [dev-dependencies] anyhow = "1.0.86" diff --git a/examples/duplex.rs b/examples/duplex.rs new file mode 100644 index 0000000..6e3ad61 --- /dev/null +++ b/examples/duplex.rs @@ -0,0 +1,50 @@ +use crate::util::sine::SineWave; +use anyhow::Result; +use interflow::duplex::AudioDuplexCallback; +use interflow::prelude::*; + +mod util; + +fn main() -> Result<()> { + let input = default_input_device(); + let output = default_output_device(); + let mut input_config = input.default_input_config().unwrap(); + input_config.buffer_size_range = (Some(128), Some(512)); + let mut output_config = output.default_output_config().unwrap(); + output_config.buffer_size_range = (Some(128), Some(512)); + let stream = + duplex::create_duplex_stream(input, input_config, output, output_config, RingMod::new()) + .unwrap(); + println!("Press Enter to stop"); + std::io::stdin().read_line(&mut String::new())?; + stream.eject().unwrap(); + Ok(()) +} + +struct RingMod { + carrier: SineWave, +} + +impl RingMod { + fn new() -> Self { + Self { + carrier: SineWave::new(440.0), + } + } +} + +impl AudioDuplexCallback for RingMod { + fn on_audio_data( + &mut self, + context: AudioCallbackContext, + input: AudioInput, + mut output: AudioOutput, + ) { + let sr = context.stream_config.samplerate as f32; + for i in 0..output.buffer.num_samples() { + let inp = input.buffer.get_frame(i)[0]; + let c = self.carrier.next_sample(sr); + output.buffer.set_mono(i, inp * c); + } + } +} diff --git a/examples/sine_wave.rs b/examples/sine_wave.rs index 3b4dcd7..ee29d7a 100644 --- a/examples/sine_wave.rs +++ b/examples/sine_wave.rs @@ -1,19 +1,16 @@ -use std::f32::consts::TAU; - use anyhow::Result; - +use util::sine::SineWave; use interflow::prelude::*; +mod util; + fn main() -> Result<()> { env_logger::init(); let device = default_output_device(); println!("Using device {}", device.name()); let stream = device - .default_output_stream(SineWave { - frequency: 440., - phase: 0., - }) + .default_output_stream(SineWave::new(440.0)) .unwrap(); println!("Press Enter to stop"); std::io::stdin().read_line(&mut String::new())?; @@ -21,34 +18,3 @@ fn main() -> Result<()> { Ok(()) } -struct SineWave { - frequency: f32, - phase: f32, -} - -impl AudioOutputCallback for SineWave { - fn on_output_data(&mut self, context: AudioCallbackContext, mut output: AudioOutput) { - eprintln!( - "Callback called, timestamp: {:2.3} s", - context.timestamp.as_seconds() - ); - let sr = context.timestamp.samplerate as f32; - for i in 0..output.buffer.num_samples() { - output.buffer.set_mono(i, self.next_sample(sr)); - } - // Reduce amplitude to not blow up speakers and ears - output.buffer.change_amplitude(0.125); - } -} - -impl SineWave { - pub fn next_sample(&mut self, samplerate: f32) -> f32 { - let step = samplerate.recip() * self.frequency; - let y = (TAU * self.phase).sin(); - self.phase += step; - if self.phase > 1. { - self.phase -= 1.; - } - y - } -} diff --git a/examples/util/mod.rs b/examples/util/mod.rs index 3b5980b..4a61895 100644 --- a/examples/util/mod.rs +++ b/examples/util/mod.rs @@ -1 +1,2 @@ pub mod enumerate; +pub mod sine; diff --git a/examples/util/sine.rs b/examples/util/sine.rs new file mode 100644 index 0000000..c297109 --- /dev/null +++ b/examples/util/sine.rs @@ -0,0 +1,41 @@ +use interflow::{AudioCallbackContext, AudioOutput, AudioOutputCallback}; +use std::f32::consts::TAU; + +pub struct SineWave { + pub frequency: f32, + pub phase: f32, +} + +impl AudioOutputCallback for SineWave { + fn on_output_data(&mut self, context: AudioCallbackContext, mut output: AudioOutput) { + eprintln!( + "Callback called, timestamp: {:2.3} s", + context.timestamp.as_seconds() + ); + let sr = context.timestamp.samplerate as f32; + for i in 0..output.buffer.num_samples() { + output.buffer.set_mono(i, self.next_sample(sr)); + } + // Reduce amplitude to not blow up speakers and ears + output.buffer.change_amplitude(0.125); + } +} + +impl SineWave { + pub fn new(frequency: f32) -> Self { + Self { + frequency, + phase: 0.0, + } + } + + pub fn next_sample(&mut self, samplerate: f32) -> f32 { + let step = samplerate.recip() * self.frequency; + let y = (TAU * self.phase).sin(); + self.phase += step; + if self.phase > 1. { + self.phase -= 1.; + } + y + } +} \ No newline at end of file diff --git a/src/audio_buffer.rs b/src/audio_buffer.rs index 240f95b..0410a5b 100644 --- a/src/audio_buffer.rs +++ b/src/audio_buffer.rs @@ -286,6 +286,21 @@ impl AudioBufferBase where S::Elem: Clone, { + /// Returns a mutable view over each channel of the frame at the given index. + /// + /// # Arguments + /// + /// * `sample`: Sample index for the frame to return. + /// + /// # Panics + /// + /// Panics if the sample index is out of range. + /// + /// returns: ArrayBase::Elem>, Dim<[usize; 1]>> + pub fn get_frame_mut(&mut self, sample: usize) -> ArrayViewMut1 { + self.storage.column_mut(sample) + } + /// Sets audio data of a single frame, that is all channels at the specified sample index. /// Panics when the sample is out of range. pub fn set_frame<'a>(&mut self, sample: usize, data: impl AsArray<'a, S::Elem, Ix1>) diff --git a/src/backends/mod.rs b/src/backends/mod.rs index baccd85..74bf030 100644 --- a/src/backends/mod.rs +++ b/src/backends/mod.rs @@ -34,7 +34,7 @@ pub mod wasapi; /// | **Platform** | **Driver** | /// |:------------:|:----------:| /// | Linux | ALSA | -/// | macOS | CoreAudio | +/// | macOS | CoreAudio | /// | Windows | WASAPI | #[cfg(any(os_alsa, os_coreaudio, os_wasapi))] #[allow(clippy::needless_return)] diff --git a/src/backends/wasapi/stream.rs b/src/backends/wasapi/stream.rs index 783c591..f8e2d80 100644 --- a/src/backends/wasapi/stream.rs +++ b/src/backends/wasapi/stream.rs @@ -264,16 +264,15 @@ impl AudioThread Result<(), error::WasapiError> { - let frames_available = unsafe { - self.interface.GetNextPacketSize()? as usize - }; + let frames_available = unsafe { self.interface.GetNextPacketSize()? as usize }; if frames_available == 0 { return Ok(()); } let Some(mut buffer) = AudioCaptureBuffer::::from_client( &self.interface, self.stream_config.channels.count(), - )? else { + )? + else { eprintln!("Null buffer from WASAPI"); return Ok(()); }; diff --git a/src/duplex.rs b/src/duplex.rs new file mode 100644 index 0000000..f56a8fb --- /dev/null +++ b/src/duplex.rs @@ -0,0 +1,180 @@ +use crate::audio_buffer::AudioBuffer; +use crate::channel_map::Bitset; +use crate::{ + AudioCallbackContext, AudioInput, AudioInputCallback, AudioInputDevice, AudioOutput, + AudioOutputCallback, AudioOutputDevice, AudioStreamHandle, SendEverywhereButOnWeb, + StreamConfig, +}; +use ndarray::{ArrayView1, ArrayViewMut1}; +use std::error::Error; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use thiserror::Error; + +pub trait AudioDuplexCallback: 'static + SendEverywhereButOnWeb { + fn on_audio_data( + &mut self, + context: AudioCallbackContext, + input: AudioInput, + output: AudioOutput, + ); +} + +pub struct DuplexStream { + input_stream: Box>, + output_stream: Box, Error = Error>>, +} +pub struct InputProxy { + buffer: rtrb::Producer, + output_sample_rate: Arc, +} + +impl AudioInputCallback for InputProxy { + fn on_input_data(&mut self, context: AudioCallbackContext, input: AudioInput) { + if self.buffer.slots() < input.buffer.num_samples() * input.buffer.num_channels() { + eprintln!("Not enough slots to buffer input"); + } + let mut scratch = [0f32; 32]; + let rate = self.output_sample_rate.load(Ordering::SeqCst) as f64 + / context.stream_config.samplerate; + let out_len = (input.buffer.num_samples() as f64 * rate) as usize; + let mut scratch = + ArrayViewMut1::from(&mut scratch[..context.stream_config.channels.count()]); + let rate_recip = rate.recip(); + for i in 0..out_len { + let in_ix = i as f64 / rate_recip; + let i = in_ix.floor() as usize; + let j = i + 1; + if j == out_len { + scratch.assign(&input.buffer.get_frame(i)); + } else { + lerp( + in_ix.fract() as _, + input.buffer.get_frame(i), + input.buffer.get_frame(j), + scratch.view_mut(), + ); + } + for sample in scratch.iter().copied() { + let _ = self.buffer.push(sample); + } + } + } +} + +fn lerp(x: f32, a: ArrayView1, b: ArrayView1, mut out: ArrayViewMut1) { + assert_eq!(out.len(), a.len()); + assert_eq!(out.len(), b.len()); + for i in 0..out.len() { + out[i] = lerpf(x, a[i], b[i]); + } +} + +fn lerpf(x: f32, a: f32, b: f32) -> f32 { + a + (b - a) * x +} + +#[derive(Debug, Error)] +#[error(transparent)] +pub enum DuplexCallbackError { + InputError(InputError), + OutputError(OutputError), + Other(Box), +} + +pub struct DuplexCallback { + input: rtrb::Consumer, + callback: Callback, + storage: AudioBuffer, + output_sample_rate: Arc, +} + +impl DuplexCallback { + pub fn into_inner(self) -> Result> { + Ok(self.callback) + } +} + +impl AudioOutputCallback for DuplexCallback { + fn on_output_data(&mut self, context: AudioCallbackContext, output: AudioOutput) { + self.output_sample_rate + .store(context.stream_config.samplerate as _, Ordering::SeqCst); + let num_channels = self.storage.num_channels(); + for i in 0..output.buffer.num_samples() { + let mut frame = self.storage.get_frame_mut(i); + for ch in 0..num_channels { + frame[ch] = self.input.pop().unwrap_or(0.0); + } + } + let input = AudioInput { + timestamp: context.timestamp, + buffer: self.storage.slice(..output.buffer.num_samples()), + }; + self.callback.on_audio_data(context, input, output); + } +} + +#[derive(Debug)] +pub struct DuplexStreamHandle { + input_handle: InputHandle, + output_handle: OutputHandle, +} + +impl< + Callback, + InputHandle: AudioStreamHandle, + OutputHandle: AudioStreamHandle>, + > AudioStreamHandle for DuplexStreamHandle +{ + type Error = DuplexCallbackError; + + fn eject(self) -> Result { + self.input_handle.eject().map_err(DuplexCallbackError::InputError)?; + let duplex_callback = self.output_handle.eject().map_err(DuplexCallbackError::OutputError)?; + Ok(duplex_callback.into_inner().map_err(DuplexCallbackError::Other)?) + } +} + +pub fn create_duplex_stream< + InputDevice: AudioInputDevice, + OutputDevice: AudioOutputDevice, + Callback: AudioDuplexCallback, +>( + input_device: InputDevice, + input_config: StreamConfig, + output_device: OutputDevice, + output_config: StreamConfig, + callback: Callback, +) -> Result< + DuplexStreamHandle< + InputDevice::StreamHandle, + OutputDevice::StreamHandle>, + >, + DuplexCallbackError, +> { + let (producer, consumer) = rtrb::RingBuffer::new(input_config.samplerate as _); + let output_sample_rate = Arc::new(AtomicU64::new(0)); + let input_handle = input_device.create_input_stream( + input_config, + InputProxy { + buffer: producer, + output_sample_rate: output_sample_rate.clone(), + }, + ).map_err(DuplexCallbackError::InputError)?; + let output_handle = output_device.create_output_stream( + output_config, + DuplexCallback { + input: consumer, + callback, + storage: AudioBuffer::zeroed( + input_config.channels.count(), + input_config.samplerate as _, + ), + output_sample_rate, + }, + ).map_err(DuplexCallbackError::OutputError)?; + Ok(DuplexStreamHandle { + input_handle, + output_handle, + }) +} diff --git a/src/lib.rs b/src/lib.rs index 9693d73..106b0c1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ pub mod backends; pub mod channel_map; pub mod prelude; pub mod timestamp; +pub mod duplex; /// Audio drivers provide access to the inputs and outputs of physical devices. /// Several drivers might provide the same accesses, some sharing it with other applications,