diff --git a/CHANGELOG.md b/CHANGELOG.md index 058e93d0..cdd65798 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,13 +4,15 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [0.6.0] - 2024-07-23 +## [Unreleased](https://github.com/deepgram-devs/deepgram-rust-sdk/compare/0.6.0...HEAD) + +## [0.6.0](https://github.com/deepgram-devs/deepgram-rust-sdk/compare/0.5.0...0.6.0) - 2024-08-08 ### Migrating from 0.4.0 -> 0.6.0 -Module Imports +#### Module Imports -``` +```rust use deepgram::{ --- transcription::prerecorded::{ +++ common::{ @@ -21,10 +23,49 @@ use deepgram::{ }; ``` -Streaming Changes +#### Streaming Changes -Now you can pass Options using stream_request_with_options +We have exposed a low-level, message-based interface to the websocket API: + +```rust +use futures::select; + +let mut handle = dg + .transcription() + .stream_request() + .handle() + .await?; + +loop { + select! { + _ = tokio::time::sleep(Duration::from_secs(3)) => handle.keep_alive().await, + _ = handle.send_data(data_chunk()).fuse() => {} + response = handle.receive().fuse() => { + match response { + Some(response) => println!("{response:?}"), + None => break, + } + } + } +} +handle.close_stream().await; ``` + +No need to call `.start()` to begin streaming data. + +```rust +let mut results = dg + .transcription() + .stream_request_with_options(Some(&options)) + .file(PATH_TO_FILE, AUDIO_CHUNK_SIZE, Duration::from_millis(16)) +--- .await +--- .start() + .await; +``` + +Now you can pass Options using stream_request_with_options + +```rust let options = Options::builder() .smart_format(true) .language(Language::en_US) @@ -35,8 +76,6 @@ let mut results = dg .stream_request_with_options(Some(&options)) .file(PATH_TO_FILE, AUDIO_CHUNK_SIZE, Duration::from_millis(16)) .await? - .start() - .await?; ``` Some Enums have changed and may need to be updated @@ -48,6 +87,7 @@ Some Enums have changed and may need to be updated - Add Speech to Text - Reorganize Code + ### Streaming Features - endpointing - utterance_end_ms @@ -86,13 +126,14 @@ Some Enums have changed and may need to be updated - custom_topics - custom_topic_mode -## [0.5.0] +## [0.5.0](https://github.com/deepgram-devs/deepgram-rust-sdk/compare/0.4.0...0.5.0) - 2024-07-08 + - Deprecate tiers and add explicit support for all currently available models. - Expand language enum to include all currently-supported languages. - Add (default on) feature flags for live and prerecorded transcription. - Support arbitrary query params in transcription options. -## [0.4.0] - 2023-11-01 +## [0.4.0](https://github.com/deepgram-devs/deepgram-rust-sdk/compare/0.3.0...0.4.0) - 2023-11-01 ### Added - `detect_language` option. @@ -101,7 +142,7 @@ Some Enums have changed and may need to be updated - Remove generic from `Deepgram` struct. - Upgrade dependencies: `tungstenite`, `tokio-tungstenite`, `reqwest`. -## [0.3.0] +## [0.3.0](https://github.com/deepgram-devs/deepgram-rust-sdk/compare/0.2.1...0.3.0) - 2023-07-26 ### Added - Derive `Serialize` for all response types. @@ -113,6 +154,3 @@ Some Enums have changed and may need to be updated ### Changed - Use Rustls instead of OpenSSL. -[Unreleased]: https://github.com/deepgram-devs/deepgram-rust-sdk/compare/0.4.0...HEAD -[0.4.0]: https://github.com/deepgram-devs/deepgram-rust-sdk/compare/0.3.0...0.4.0 -[0.3.0]: https://github.com/deepgram-devs/deepgram-rust-sdk/compare/0.2.1...0.3.0 diff --git a/Cargo.toml b/Cargo.toml index 4c2c91a5..50fc389d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,8 @@ uuid = { version = "1", features = ["serde"] } # Dependencies below are specified only to satisfy minimal-versions. proc-macro2 = "1.0.60" pkg-config = { version = "0.3.30", optional = true } +sha256 = "1.5.0" +anyhow = "1.0.86" [dev-dependencies] cpal = "0.13" diff --git a/examples/README.md b/examples/README.md index 4dec1ffa..76dca538 100644 --- a/examples/README.md +++ b/examples/README.md @@ -3,7 +3,7 @@ ### Setting Env Vars ```sh -export FILENAME=./examples/audio/Bueller-Life-moves-pretty-fast.mp3 +export FILENAME=./examples/audio/bueller.wav ``` ### Running the examples @@ -17,5 +17,21 @@ cargo run --example simple_stream ``` ```sh -cargo run --example advanced_stream -``` \ No newline at end of file +cargo run --example callback +``` + +```sh +cargo run --example make_prerecorded_request_builder +``` + +```sh +cargo run --example microphone_stream +``` + +```sh +cargo run --example text_to_speech_to_file +``` + +```sh +cargo run --example text_to_speech_to_stream +``` diff --git a/examples/speak/rest/text_to_speech_to_file.rs b/examples/speak/rest/text_to_speech_to_file.rs index 5040a108..ba42fb8d 100644 --- a/examples/speak/rest/text_to_speech_to_file.rs +++ b/examples/speak/rest/text_to_speech_to_file.rs @@ -10,7 +10,7 @@ async fn main() -> Result<(), DeepgramError> { let deepgram_api_key = env::var("DEEPGRAM_API_KEY").expect("DEEPGRAM_API_KEY environmental variable"); - let dg_client = Deepgram::new(&deepgram_api_key); + let dg_client = Deepgram::new(&deepgram_api_key)?; let options = Options::builder() .model(Model::AuraAsteriaEn) diff --git a/examples/speak/rest/text_to_speech_to_stream.rs b/examples/speak/rest/text_to_speech_to_stream.rs index f019fde3..c2060080 100644 --- a/examples/speak/rest/text_to_speech_to_stream.rs +++ b/examples/speak/rest/text_to_speech_to_stream.rs @@ -90,7 +90,7 @@ async fn main() -> Result<(), DeepgramError> { let deepgram_api_key = env::var("DEEPGRAM_API_KEY").expect("DEEPGRAM_API_KEY environmental variable"); - let dg_client = Deepgram::new(&deepgram_api_key); + let dg_client = Deepgram::new(&deepgram_api_key)?; let sample_rate = 16000; let channels = 1; diff --git a/examples/transcription/rest/callback.rs b/examples/transcription/rest/callback.rs index 17fb72d0..33303317 100644 --- a/examples/transcription/rest/callback.rs +++ b/examples/transcription/rest/callback.rs @@ -15,7 +15,7 @@ async fn main() -> Result<(), DeepgramError> { let deepgram_api_key = env::var("DEEPGRAM_API_KEY").expect("DEEPGRAM_API_KEY environmental variable"); - let dg_client = Deepgram::new(&deepgram_api_key); + let dg_client = Deepgram::new(&deepgram_api_key)?; let source = AudioSource::from_url(AUDIO_URL); diff --git a/examples/transcription/rest/make_prerecorded_request_builder.rs b/examples/transcription/rest/make_prerecorded_request_builder.rs index dbc18fbb..d6776d09 100644 --- a/examples/transcription/rest/make_prerecorded_request_builder.rs +++ b/examples/transcription/rest/make_prerecorded_request_builder.rs @@ -6,17 +6,17 @@ use deepgram::{ batch_response::Response, options::{Language, Options}, }, - Deepgram, + Deepgram, DeepgramError, }; static AUDIO_URL: &str = "https://static.deepgram.com/examples/Bueller-Life-moves-pretty-fast.wav"; #[tokio::main] -async fn main() -> reqwest::Result<()> { +async fn main() -> Result<(), DeepgramError> { let deepgram_api_key = env::var("DEEPGRAM_API_KEY").expect("DEEPGRAM_API_KEY environmental variable"); - let dg_client = Deepgram::new(&deepgram_api_key); + let dg_client = Deepgram::new(&deepgram_api_key)?; let source = AudioSource::from_url(AUDIO_URL); diff --git a/examples/transcription/rest/prerecorded_from_file.rs b/examples/transcription/rest/prerecorded_from_file.rs index 33117489..a40b3216 100644 --- a/examples/transcription/rest/prerecorded_from_file.rs +++ b/examples/transcription/rest/prerecorded_from_file.rs @@ -16,7 +16,7 @@ async fn main() -> Result<(), DeepgramError> { let deepgram_api_key = env::var("DEEPGRAM_API_KEY").expect("DEEPGRAM_API_KEY environmental variable"); - let dg_client = Deepgram::new(&deepgram_api_key); + let dg_client = Deepgram::new(&deepgram_api_key)?; let file = File::open(PATH_TO_FILE).await.unwrap(); diff --git a/examples/transcription/rest/prerecorded_from_url.rs b/examples/transcription/rest/prerecorded_from_url.rs index 220d8b91..506fba86 100644 --- a/examples/transcription/rest/prerecorded_from_url.rs +++ b/examples/transcription/rest/prerecorded_from_url.rs @@ -15,7 +15,7 @@ async fn main() -> Result<(), DeepgramError> { let deepgram_api_key = env::var("DEEPGRAM_API_KEY").expect("DEEPGRAM_API_KEY environmental variable"); - let dg_client = Deepgram::new(&deepgram_api_key); + let dg_client = Deepgram::new(&deepgram_api_key)?; let source = AudioSource::from_url(AUDIO_URL); diff --git a/examples/transcription/websocket/microphone_stream.rs b/examples/transcription/websocket/microphone_stream.rs index 5380f7ca..ab86fe7f 100644 --- a/examples/transcription/websocket/microphone_stream.rs +++ b/examples/transcription/websocket/microphone_stream.rs @@ -90,9 +90,12 @@ fn microphone_as_stream() -> FuturesReceiver> { #[tokio::main] async fn main() -> Result<(), DeepgramError> { - let dg = Deepgram::new(env::var("DEEPGRAM_API_KEY").unwrap()); + let deepgram_api_key = + env::var("DEEPGRAM_API_KEY").expect("DEEPGRAM_API_KEY environmental variable"); - let mut results = dg + let dg_client = Deepgram::new(&deepgram_api_key)?; + + let mut results = dg_client .transcription() .stream_request() .keep_alive() @@ -102,7 +105,6 @@ async fn main() -> Result<(), DeepgramError> { // TODO Specific to my machine, not general enough example. .channels(2) .stream(microphone_as_stream()) - .start() .await?; while let Some(result) = results.next().await { diff --git a/examples/transcription/websocket/simple_stream.rs b/examples/transcription/websocket/simple_stream.rs index 51333153..5aa6f8a4 100644 --- a/examples/transcription/websocket/simple_stream.rs +++ b/examples/transcription/websocket/simple_stream.rs @@ -10,17 +10,21 @@ use deepgram::{ static PATH_TO_FILE: &str = "examples/audio/bueller.wav"; static AUDIO_CHUNK_SIZE: usize = 3174; +static FRAME_DELAY: Duration = Duration::from_millis(16); #[tokio::main] async fn main() -> Result<(), DeepgramError> { - let dg = Deepgram::new(env::var("DEEPGRAM_API_KEY").unwrap()); + let deepgram_api_key = + env::var("DEEPGRAM_API_KEY").expect("DEEPGRAM_API_KEY environmental variable"); + + let dg_client = Deepgram::new(&deepgram_api_key)?; let options = Options::builder() .smart_format(true) .language(Language::en_US) .build(); - let mut results = dg + let mut results = dg_client .transcription() .stream_request_with_options(options) .keep_alive() @@ -32,9 +36,7 @@ async fn main() -> Result<(), DeepgramError> { .utterance_end_ms(1000) .vad_events(true) .no_delay(true) - .file(PATH_TO_FILE, AUDIO_CHUNK_SIZE, Duration::from_millis(16)) - .await? - .start() + .file(PATH_TO_FILE, AUDIO_CHUNK_SIZE, FRAME_DELAY) .await?; while let Some(result) = results.next().await { diff --git a/src/common/options.rs b/src/common/options.rs index 0bf9bb8e..5e0755da 100644 --- a/src/common/options.rs +++ b/src/common/options.rs @@ -50,6 +50,11 @@ pub struct Options { callback_method: Option, } +impl Default for Options { + fn default() -> Self { + Options::builder().build() + } +} /// Detect Language value /// /// See the [Deepgram Detect Language feature docs][docs] for more info. @@ -1018,10 +1023,10 @@ impl OptionsBuilder { /// # /// # static AUDIO_URL: &str = "https://static.deepgram.com/examples/Bueller-Life-moves-pretty-fast.wav"; /// # - /// # fn main() -> Result<(), reqwest::Error> { + /// # fn main() -> Result<(), deepgram::DeepgramError> { /// # let deepgram_api_key = env::var("DEEPGRAM_API_KEY").unwrap_or_default(); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// let dg_transcription = dg_client.transcription(); /// /// let options1 = Options::builder() @@ -2365,7 +2370,7 @@ mod serialize_options_tests { fn check_serialization(options: &Options, expected: &str) { let deepgram_api_key = env::var("DEEPGRAM_API_KEY").unwrap_or_default(); - let dg_client = Deepgram::new(deepgram_api_key); + let dg_client = Deepgram::new(deepgram_api_key).unwrap(); let request = dg_client .transcription() diff --git a/src/lib.rs b/src/lib.rs index fce3c6dc..a1d9834c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -151,6 +151,27 @@ pub enum DeepgramError { /// Something went wrong during serialization/deserialization. #[error("Something went wrong during query serialization: {0}")] UrlencodedError(#[from] serde_urlencoded::ser::Error), + + /// The data stream produced an error + #[error("The data stream produced an error: {0}")] + StreamError(#[from] Box), + + /// The provided base url is not valid + #[error("The provided base url is not valid")] + InvalidUrl, + + /// A websocket close from was received indicating an error + #[error("websocket close frame received with error content: code: {code}, reason: {reason}")] + WebsocketClose { + /// The numerical code indicating the reason for the error + code: u16, + /// A textual description of the error reason + reason: String, + }, + + /// An unexpected error occurred in the client + #[error("an unepected error occurred in the deepgram client: {0}")] + InternalClientError(anyhow::Error), } #[cfg_attr(not(feature = "listen"), allow(unused))] @@ -165,12 +186,15 @@ impl Deepgram { /// /// [console]: https://console.deepgram.com/ /// - /// # Panics + /// # Errors /// - /// Panics under the same conditions as [`reqwest::Client::new`]. - pub fn new>(api_key: K) -> Self { + /// Errors under the same conditions as [`reqwest::ClientBuilder::build`]. + pub fn new>(api_key: K) -> Result { let api_key = Some(api_key.as_ref().to_owned()); - Self::inner_constructor(DEEPGRAM_BASE_URL.try_into().unwrap(), api_key) + // This cannot panic because we are converting a static value + // that is known-good. + let base_url = DEEPGRAM_BASE_URL.try_into().unwrap(); + Self::inner_constructor(base_url, api_key) } /// Construct a new Deepgram client with the specified base URL. @@ -200,16 +224,16 @@ impl Deepgram { /// ); /// ``` /// - /// # Panics + /// # Errors /// - /// Panics under the same conditions as [`reqwest::Client::new`], or if `base_url` + /// Errors under the same conditions as [`reqwest::Client::new`], or if `base_url` /// is not a valid URL. - pub fn with_base_url(base_url: U) -> Self + pub fn with_base_url(base_url: U) -> Result where U: TryInto, U::Error: std::fmt::Debug, { - let base_url = base_url.try_into().expect("base_url must be a valid Url"); + let base_url = base_url.try_into().map_err(|_| DeepgramError::InvalidUrl)?; Self::inner_constructor(base_url, None) } @@ -233,24 +257,24 @@ impl Deepgram { /// let deepgram = Deepgram::with_base_url_and_api_key( /// "http://localhost:8080", /// "apikey12345", - /// ); + /// ).unwrap(); /// ``` /// - /// # Panics + /// # Errors /// - /// Panics under the same conditions as [`reqwest::Client::new`], or if `base_url` + /// Errors under the same conditions as [`reqwest::ClientBuilder::build`], or if `base_url` /// is not a valid URL. - pub fn with_base_url_and_api_key(base_url: U, api_key: K) -> Self + pub fn with_base_url_and_api_key(base_url: U, api_key: K) -> Result where U: TryInto, U::Error: std::fmt::Debug, K: AsRef, { - let base_url = base_url.try_into().expect("base_url must be a valid Url"); + let base_url = base_url.try_into().map_err(|_| DeepgramError::InvalidUrl)?; Self::inner_constructor(base_url, Some(api_key.as_ref().to_owned())) } - fn inner_constructor(base_url: Url, api_key: Option) -> Self { + fn inner_constructor(base_url: Url, api_key: Option) -> Result { static USER_AGENT: &str = concat!( env!("CARGO_PKG_NAME"), "/", @@ -258,27 +282,27 @@ impl Deepgram { " rust", ); + if base_url.cannot_be_a_base() { + return Err(DeepgramError::InvalidUrl); + } let authorization_header = { let mut header = HeaderMap::new(); if let Some(api_key) = &api_key { - header.insert( - "Authorization", - HeaderValue::from_str(&format!("Token {}", api_key)).expect("Invalid API key"), - ); + if let Ok(value) = HeaderValue::from_str(&format!("Token {}", api_key)) { + header.insert("Authorization", value); + } } header }; - Deepgram { + Ok(Deepgram { api_key: api_key.map(RedactedString), base_url, client: reqwest::Client::builder() .user_agent(USER_AGENT) .default_headers(authorization_header) - .build() - // Even though `reqwest::Client::new` is not used here, it will always panic under the same conditions - .expect("See reqwest::Client::new docs for cause of panic"), - } + .build()?, + }) } } diff --git a/src/listen/rest.rs b/src/listen/rest.rs index d9e174b8..28963700 100644 --- a/src/listen/rest.rs +++ b/src/listen/rest.rs @@ -43,7 +43,7 @@ impl Transcription<'_> { /// # let deepgram_api_key = /// # env::var("DEEPGRAM_API_KEY").expect("DEEPGRAM_API_KEY environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// let source = AudioSource::from_url(AUDIO_URL); /// @@ -97,7 +97,7 @@ impl Transcription<'_> { /// # let deepgram_api_key = /// # env::var("DEEPGRAM_API_KEY").expect("DEEPGRAM_API_KEY environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// let source = AudioSource::from_url(AUDIO_URL); /// @@ -153,12 +153,12 @@ impl Transcription<'_> { /// # static AUDIO_URL: &str = "https://static.deepgram.com/examples/Bueller-Life-moves-pretty-fast.wav"; /// # /// # #[tokio::main] - /// # async fn main() -> reqwest::Result<()> { + /// # async fn main() -> Result<(), DeepgramError> { /// # /// # let deepgram_api_key = /// # env::var("DEEPGRAM_API_KEY").expect("DEEPGRAM_API_KEY environmental variable"); /// # - /// # let dg_client = Deepgram::new(&deepgram_api_key); + /// # let dg_client = Deepgram::new(&deepgram_api_key)?; /// # /// # let source = AudioSource::from_url(AUDIO_URL); /// # @@ -221,12 +221,12 @@ impl Transcription<'_> { /// # static AUDIO_URL: &str = "https://static.deepgram.com/examples/Bueller-Life-moves-pretty-fast.wav"; /// # /// # #[tokio::main] - /// # async fn main() -> reqwest::Result<()> { + /// # async fn main() -> Result<(), DeepgramError> { /// # /// # let deepgram_api_key = /// # env::var("DEEPGRAM_API_KEY").expect("DEEPGRAM_API_KEY environmental variable"); /// # - /// # let dg_client = Deepgram::new(&deepgram_api_key); + /// # let dg_client = Deepgram::new(&deepgram_api_key)?; /// # /// # let source = AudioSource::from_url(AUDIO_URL); /// # @@ -275,7 +275,7 @@ mod tests { #[test] fn listen_url() { - let dg = Deepgram::new("token"); + let dg = Deepgram::new("token").unwrap(); assert_eq!( &dg.transcription().listen_url().to_string(), "https://api.deepgram.com/v1/listen" @@ -284,7 +284,7 @@ mod tests { #[test] fn listen_url_custom_host() { - let dg = Deepgram::with_base_url("http://localhost:8888/abc/"); + let dg = Deepgram::with_base_url("http://localhost:8888/abc/").unwrap(); assert_eq!( &dg.transcription().listen_url().to_string(), "http://localhost:8888/abc/v1/listen" diff --git a/src/listen/websocket.rs b/src/listen/websocket.rs index 91e58461..2bd2fbf8 100644 --- a/src/listen/websocket.rs +++ b/src/listen/websocket.rs @@ -10,30 +10,34 @@ use std::{ error::Error, - fmt::Debug, - marker::PhantomData, + fmt, + ops::Deref, path::Path, pin::Pin, - sync::Arc, task::{Context, Poll}, time::Duration, }; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use futures::{ - channel::mpsc::{self, Receiver}, + channel::mpsc::{self, Receiver, Sender}, + future::{pending, FutureExt}, + select_biased, stream::StreamExt, SinkExt, Stream, }; use http::Request; use pin_project::pin_project; use serde_urlencoded; -use tokio::{fs::File, sync::Mutex, time}; -use tokio_tungstenite::tungstenite::protocol::Message; -use tokio_util::io::ReaderStream; -use tungstenite::handshake::client; +use tokio::fs::File; +use tokio_tungstenite::{tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream}; +use tungstenite::{ + handshake::client, + protocol::frame::coding::{Data, OpCode}, +}; use url::Url; +use self::file_chunker::FileChunker; use crate::{ common::{ options::{Encoding, Endpointing, Options}, @@ -44,8 +48,8 @@ use crate::{ static LIVE_LISTEN_URL_PATH: &str = "v1/listen"; -#[derive(Debug)] -pub struct StreamRequestBuilder<'a> { +#[derive(Clone, Debug)] +pub struct WebsocketBuilder<'a> { deepgram: &'a Deepgram, options: Options, encoding: Option, @@ -60,21 +64,73 @@ pub struct StreamRequestBuilder<'a> { keep_alive: Option, } -#[pin_project] -struct FileChunker { - chunk_size: usize, - buf: BytesMut, - #[pin] - file: ReaderStream, -} - impl Transcription<'_> { - pub fn stream_request(&self) -> StreamRequestBuilder<'_> { - self.stream_request_with_options(Options::builder().build()) + /// Begin to configure a websocket request with common options + /// set to their default values. + /// + /// Once configured, the connection can be initiated with any of + /// [`WebsocketBuilder::file`], [`WebsocketBuilder::stream`], or + /// [`WebsocketBuilder::handle`]. + /// + /// ``` + /// use deepgram::{ + /// Deepgram, + /// DeepgramError, + /// common::options::{ + /// DetectLanguage, + /// Encoding, + /// Model, + /// Options, + /// }, + /// listen::websocket::WebsocketBuilder, + /// }; + /// + /// let dg = Deepgram::new(std::env::var("DEEPGRAM_API_TOKEN").unwrap_or_default()).unwrap(); + /// let transcription = dg.transcription(); + /// let builder: WebsocketBuilder<'_> = transcription + /// .stream_request() + /// .no_delay(true); + /// ``` + pub fn stream_request(&self) -> WebsocketBuilder<'_> { + self.stream_request_with_options(Options::default()) } - pub fn stream_request_with_options(&self, options: Options) -> StreamRequestBuilder<'_> { - StreamRequestBuilder { + /// Construct a websocket request with common options + /// specified in [`Options`]. + /// + /// Once configured, the connection can be initiated with any of + /// [`WebsocketBuilder::file`], [`WebsocketBuilder::stream`], or + /// [`WebsocketBuilder::handle`]. + /// + /// ``` + /// use deepgram::{ + /// Deepgram, + /// DeepgramError, + /// common::options::{ + /// DetectLanguage, + /// Encoding, + /// Model, + /// Options, + /// }, + /// }; + /// + /// let dg = Deepgram::new(std::env::var("DEEPGRAM_API_TOKEN").unwrap_or_default()).unwrap(); + /// let transcription = dg.transcription(); + /// let options = Options::builder() + /// .model(Model::Nova2) + /// .detect_language(DetectLanguage::Enabled) + /// .build(); + /// let builder = transcription + /// .stream_request_with_options( + /// options, + /// ) + /// .no_delay(true); + /// + /// assert_eq!(&builder.urlencoded().unwrap(), "model=nova-2&detect_language=true&no_delay=true") + /// ``` + + pub fn stream_request_with_options(&self, options: Options) -> WebsocketBuilder<'_> { + WebsocketBuilder { deepgram: self.0, options, encoding: None, @@ -91,59 +147,22 @@ impl Transcription<'_> { } fn listen_stream_url(&self) -> Url { - let mut url = self.0.base_url.join(LIVE_LISTEN_URL_PATH).unwrap(); + // base + let mut url = + self.0.base_url.join(LIVE_LISTEN_URL_PATH).expect( + "base_url is checked to be a valid base_url when constructing Deepgram client", + ); + match url.scheme() { - "http" | "ws" => url.set_scheme("ws").unwrap(), - "https" | "wss" => url.set_scheme("wss").unwrap(), - _ => panic!("base_url must have a scheme of http, https, ws, or wss"), + "http" | "ws" => url.set_scheme("ws").expect("a valid conversion according to the .set_scheme docs"), + "https" | "wss" => url.set_scheme("wss").expect("a valid conversion according to the .set_scheme docs"), + _ => unreachable!("base_url is validated to have a scheme of http, https, ws, or wss when constructing Deepgram client"), } url } } -impl FileChunker { - fn new(file: File, chunk_size: usize) -> Self { - FileChunker { - chunk_size, - buf: BytesMut::with_capacity(2 * chunk_size), - file: ReaderStream::new(file), - } - } -} - -impl Stream for FileChunker { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let mut this = self.project(); - - while this.buf.len() < *this.chunk_size { - match Pin::new(&mut this.file).poll_next(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(next) => match next.transpose() { - Err(e) => return Poll::Ready(Some(Err(DeepgramError::from(e)))), - Ok(None) => { - if this.buf.is_empty() { - return Poll::Ready(None); - } else { - return Poll::Ready(Some(Ok(this - .buf - .split_to(this.buf.len()) - .freeze()))); - } - } - Ok(Some(next)) => { - this.buf.extend_from_slice(&next); - } - }, - } - } - - Poll::Ready(Some(Ok(this.buf.split_to(*this.chunk_size).freeze()))) - } -} - -impl<'a> StreamRequestBuilder<'a> { +impl<'a> WebsocketBuilder<'a> { /// Return the options in urlencoded format. If serialization would /// fail, this will also return an error. /// @@ -160,11 +179,8 @@ impl<'a> StreamRequestBuilder<'a> { /// Options, /// }, /// }; - /// # let mut need_token = std::env::var("DEEPGRAM_API_TOKEN").is_err(); - /// # if need_token { - /// # std::env::set_var("DEEPGRAM_API_TOKEN", "abc") - /// # } - /// let dg = Deepgram::new(std::env::var("DEEPGRAM_API_TOKEN").unwrap()); + /// + /// let dg = Deepgram::new(std::env::var("DEEPGRAM_API_TOKEN").unwrap_or_default()).unwrap(); /// let transcription = dg.transcription(); /// let options = Options::builder() /// .model(Model::Nova2) @@ -176,10 +192,6 @@ impl<'a> StreamRequestBuilder<'a> { /// ) /// .no_delay(true); /// - /// # if need_token { - /// # std::env::remove_var("DEEPGRAM_API_TOKEN"); - /// # } - /// /// assert_eq!(&builder.urlencoded().unwrap(), "model=nova-2&detect_language=true&no_delay=true") /// ``` /// @@ -305,59 +317,325 @@ impl<'a> StreamRequestBuilder<'a> { } } -impl<'a> StreamRequestBuilder<'a> { +impl<'a> WebsocketBuilder<'a> { pub async fn file( self, filename: impl AsRef, frame_size: usize, frame_delay: Duration, - ) -> Result< - StreamRequest<'a, Receiver>, DeepgramError>, - DeepgramError, - > { + ) -> Result { let file = File::open(filename).await?; let mut chunker = FileChunker::new(file, frame_size); - let (mut tx, rx) = mpsc::channel(1); + let (tx, rx) = tokio::sync::mpsc::channel(1); + let rx_stream = tokio_stream::wrappers::ReceiverStream::new(rx); let task = async move { while let Some(frame) = chunker.next().await { tokio::time::sleep(frame_delay).await; // This unwrap() is safe because application logic dictates that the Receiver won't // be dropped before the Sender. - tx.send(frame).await.unwrap(); + if tx.send(frame).await.is_err() { + break; + } } }; tokio::spawn(task); - Ok(self.stream(rx)) + self.stream(rx_stream).await } - pub fn stream(self, stream: S) -> StreamRequest<'a, S, E> { - StreamRequest { - stream, - builder: self, - _err: PhantomData, + + pub async fn stream(self, stream: S) -> Result + where + S: Stream> + Send + Unpin + 'static, + E: Error + Send + Sync + 'static, + { + let handle = self.handle().await?; + + let (tx, rx) = mpsc::channel(1); + let mut is_done = false; + tokio::task::spawn(async move { + let mut handle = handle; + let mut tx = tx; + let mut stream = stream; + loop { + select_biased! { + // Receiving messages from WebsocketHandle + response = handle.receive().fuse() => { + // eprintln!(" got response"); + match response { + Some(Ok(response)) if matches!(response, StreamResponse::TerminalResponse { .. }) => { + // eprintln!( " got terminal response"); + if tx.send(Ok(response)).await.is_err() { + // Receiver has been dropped. + break; + } + } + Some(response) => { + if tx.send(response).await.is_err() { + // Receiver has been dropped. + break; + } + } + None => { + // eprintln!(" got none from handle"); + tx.close_channel(); + // No more responses + break; + } + } + } + // Receiving audio data from stream. + chunk = stream.next().fuse() => { + match chunk { + Some(Ok(audio)) => if let Err(err) = handle.send_data(audio.to_vec()).await { + // eprintln!(" got audio"); + if tx.send(Err(err)).await.is_err() { + break; + } + }, + Some(Err(err)) => { + // eprintln!(" got error"); + if tx.send(Err(DeepgramError::from(Box::new(err) as Box))).await.is_err() { + break; + } + } + None => { + if is_done { + + continue; + } + if let Err(err) = handle.finalize().await { + if tx.send(Err(err)).await.is_err() { + break; + } + } + + if let Err(err) = handle.close_stream().await { + if tx.send(Err(err)).await.is_err() { + break; + } + } + is_done = true; + } + } + } + + } + } + }); + Ok(TranscriptionStream { rx, done: false }) + } + + /// A low level interface to the Deepgram websocket transcription API. + pub async fn handle(self) -> Result { + WebsocketHandle::new(self).await + } +} + +macro_rules! send_message { + ($stream:expr, $response_tx:expr, $msg:expr) => { + if let Err(err) = $stream.send($msg).await { + if $response_tx.send(Err(err.into())).await.is_err() { + // Responses are no longer being received; close the stream. + break; + } } + }; +} +async fn run_worker( + ws_stream: WebSocketStream>, + mut message_tx: Sender, + mut message_rx: Receiver, + mut response_tx: Sender>, + keep_alive: bool, +) -> Result<()> { + // We use Vec for partial frames because we don't know if a fragment of a string is valid utf-8. + let mut partial_frame: Vec = Vec::new(); + let (mut ws_stream_send, mut ws_stream_recv) = ws_stream.split(); + let mut is_open: bool = true; + let mut last_sent_message = tokio::time::Instant::now(); + loop { + // eprintln!(" loop"); + let sleep = tokio::time::sleep_until(last_sent_message + Duration::from_secs(3)); + // Primary event loop. + select_biased! { + _ = sleep.fuse() => { + // eprintln!(" sleep"); + if keep_alive && is_open { + message_tx.send(WsMessage::ControlMessage(ControlMessage::KeepAlive)).await.expect("we hold the receiver, so we know it hasn't been dropped"); + last_sent_message = tokio::time::Instant::now(); + } else { + pending::<()>().await; + } + } + response = ws_stream_recv.next().fuse() => { + match response { + Some(Ok(Message::Text(response))) => { + // eprintln!(" received dg response"); + match serde_json::from_str(&response) { + Ok(response) => { + if (response_tx.send(Ok(response)).await).is_err() { + // Responses are no longer being received; close the stream. + break; + } + } + Err(err) =>{ + if (response_tx.send(Err(err.into())).await).is_err() { + // Responses are no longer being received; close the stream. + break; + } + } + } + } + Some(Ok(Message::Ping(value))) => { + // We don't really care if the server receives the pong. + let _ = ws_stream_send.send(Message::Pong(value)).await; + } + Some(Ok(Message::Close(None))) => { + // eprintln!(" received websocket close"); + return Ok(()); + } + Some(Ok(Message::Close(Some(closeframe)))) => { + // eprintln!(" received websocket close"); + return Err(DeepgramError::WebsocketClose { + code: closeframe.code.into(), + reason: closeframe.reason.into_owned(), + }); + } + + Some(Ok(Message::Frame(frame))) => { + match frame.header().opcode + { + OpCode::Data(Data::Text) => { + partial_frame.extend(frame.payload()); + } + OpCode::Data(Data::Continue) => { + // We know we're continuing a text frame because otherwise + // partial_frame would be empty. + if !partial_frame.is_empty() { + partial_frame.extend(frame.payload()) + } + } + _ => { + // Ignore other partial frames. + } + } + if frame.header().is_final { + let response = std::mem::take(&mut partial_frame); + let response = serde_json::from_slice(&response).map_err(|err| err.into()); + if (response_tx.send(response).await).is_err() { + // Responses are no longer being received; close the stream. + break + } + } + } + Some(Ok(Message::Binary(_) | Message::Pong(_))) => { + // We don't expect binary messages or pongs from the API. + // They can be safely ignored. + } + + Some(Err(err)) => { + if (response_tx.send(Err(err.into())).await).is_err() { + // Responses are no longer being received; close the stream. + break; + } + + } + None => { + // Upstream is closed + // eprintln!(" received None"); + return Ok(()) + } + } + } + message = message_rx.next().fuse() => { + // eprintln!(" received message: {message:?}, {is_open:?}"); + if is_open { + match message { + Some(WsMessage::Audio(audio))=> { + send_message!(ws_stream_send, response_tx, Message::Binary(audio.0)); + last_sent_message = tokio::time::Instant::now(); + + } + Some(WsMessage::ControlMessage(msg)) => { + send_message!(ws_stream_send, response_tx, Message::Text( + serde_json::to_string(&msg).unwrap_or_default() + )); + last_sent_message = tokio::time::Instant::now(); + if msg == ControlMessage::CloseStream { + is_open = false; + } + } + None => { + // Input stream is shut down. Keep processing responses. + send_message!(ws_stream_send, response_tx, Message::Text( + serde_json::to_string(&ControlMessage::CloseStream).unwrap_or_default() + )); + is_open = false; + } + } + } + } + }; + } + // eprintln!(" post loop"); + if let Err(err) = ws_stream_send + .send(Message::Text( + serde_json::to_string(&ControlMessage::CloseStream).unwrap_or_default(), + )) + .await + { + // If the response channel is closed, there's nothing to be done about it now. + let _ = response_tx.send(Err(err.into())).await; + } + response_tx.close_channel(); + // Waiting for message_tx to be dropped before exiting + while message_rx.next().await.is_some() { + // Receiving messages after closing down. Ignore them. + } + // eprintln!(" exit"); + Ok(()) +} + +#[derive(Debug, Clone, PartialEq, Eq)] +enum WsMessage { + Audio(Audio), + ControlMessage(ControlMessage), +} + +#[derive(Clone, PartialEq, Eq)] +struct Audio(Vec); + +impl fmt::Debug for Audio { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("audio") + .field(&format!( + "<{} bytes (sha256:{})>", + self.0.len(), + &sha256::digest(&self.0)[..12] + )) + .finish() + } +} + +impl Deref for Audio { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + &self.0 } } #[derive(Debug)] -pub struct StreamRequest<'a, S, E> { - stream: S, - builder: StreamRequestBuilder<'a>, - _err: PhantomData, +pub struct WebsocketHandle { + message_tx: Sender, + response_rx: Receiver>, } -impl StreamRequest<'_, S, E> -where - S: Stream> + Send + Unpin + 'static, - E: Error + Debug + Send + Unpin + 'static, -{ - pub async fn start(self) -> Result>> { - let url = self.builder.as_url()?; - let mut source = self - .stream - .map(|res| res.map(|bytes| Message::binary(Vec::from(bytes.as_ref())))); +impl<'a> WebsocketHandle { + async fn new(builder: WebsocketBuilder<'a>) -> Result { + let url = builder.as_url()?; let request = { - let builder = Request::builder() + let http_builder = Request::builder() .method("GET") .uri(url.to_string()) .header("sec-websocket-key", client::generate_key()) @@ -366,98 +644,186 @@ where .header("upgrade", "websocket") .header("sec-websocket-version", "13"); - let builder = if let Some(api_key) = self.builder.deepgram.api_key.as_deref() { - builder.header("authorization", format!("token {}", api_key)) + let builder = if let Some(api_key) = builder.deepgram.api_key.as_deref() { + http_builder.header("authorization", format!("Token {}", api_key)) } else { - builder + http_builder }; builder.body(())? }; + let (ws_stream, _) = tokio_tungstenite::connect_async(request).await?; - let (write, mut read) = ws_stream.split(); - let write = Arc::new(Mutex::new(write)); - let (mut tx, rx) = mpsc::channel::>(1); - - // Spawn the keep-alive task - if self.builder.keep_alive.unwrap_or(false) { - { - let write_clone = Arc::clone(&write); - tokio::spawn(async move { - let mut interval = time::interval(Duration::from_secs(10)); - loop { - interval.tick().await; - let keep_alive_message = - Message::Text("{\"type\": \"KeepAlive\"}".to_string()); - let mut write = write_clone.lock().await; - if let Err(e) = write.send(keep_alive_message).await { - eprintln!("Error Sending Keep Alive: {:?}", e); - break; - } - } - }) - }; + let (message_tx, message_rx) = mpsc::channel(256); + let (response_tx, response_rx) = mpsc::channel(256); + + tokio::task::spawn({ + let message_tx = message_tx.clone(); + run_worker( + ws_stream, + message_tx, + message_rx, + response_tx, + builder.keep_alive.unwrap_or(false), + ) + }); + + Ok(WebsocketHandle { + message_tx, + response_rx, + }) + } + + pub async fn send_data(&mut self, data: Vec) -> Result<()> { + let audio = Audio(data); + // eprintln!(" sending audio: {audio:?}"); + + self.message_tx + .send(WsMessage::Audio(audio)) + .await + .map_err(|err| DeepgramError::InternalClientError(err.into()))?; + Ok(()) + } + + /// Send a Finalize message to the Deepgram API to force the server to process + /// all the audio it has already received. + pub async fn finalize(&mut self) -> Result<()> { + self.send_control_message(ControlMessage::Finalize).await + } + + /// Send a KeepAlive message to the Deepgram API to ensure the connection + /// isn't closed due to long idle times. + pub async fn keep_alive(&mut self) -> Result<()> { + self.send_control_message(ControlMessage::KeepAlive).await + } + + /// Close the websocket stream. No more data should be sent after this is called. + pub async fn close_stream(&mut self) -> Result<()> { + if !self.message_tx.is_closed() { + self.send_control_message(ControlMessage::CloseStream) + .await?; + self.message_tx.close_channel(); } + Ok(()) + } - let write_clone = Arc::clone(&write); - let send_task = async move { - while let Some(frame) = source.next().await { - match frame { - Ok(frame) => { - let mut write = write_clone.lock().await; - if let Err(e) = write.send(frame).await { - println!("Error sending frame: {:?}", e); - break; - } - } - Err(e) => { - println!("Error receiving from source: {:?}", e); - break; - } - } - } + async fn send_control_message(&mut self, message: ControlMessage) -> Result<()> { + // eprintln!(" sending control message: {message:?}"); + self.message_tx + .send(WsMessage::ControlMessage(message.clone())) + .await + .map_err(|err| { + // eprintln!(" error sending control message: {message:?}"); + DeepgramError::InternalClientError(err.into()) + })?; + // eprintln!(" sent control message"); + Ok(()) + } + + #[allow(clippy::let_and_return)] + pub async fn receive(&mut self) -> Option> { + let resp = self.response_rx.next().await; + // eprintln!(" receiving response: {resp:?}"); + resp + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)] +#[serde(tag = "type")] +enum ControlMessage { + Finalize, + KeepAlive, + CloseStream, +} + +#[derive(Debug)] +#[pin_project] +pub struct TranscriptionStream { + #[pin] + rx: Receiver>, + done: bool, +} - let mut write = write_clone.lock().await; - if let Err(e) = write.send(Message::binary([])).await { - println!("Error sending final frame: {:?}", e); +impl Stream for TranscriptionStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.rx.poll_next(cx) + } +} + +mod file_chunker { + use bytes::{Bytes, BytesMut}; + use futures::Stream; + use pin_project::pin_project; + use std::{ + pin::Pin, + task::{Context, Poll}, + }; + use tokio::fs::File; + use tokio_util::io::ReaderStream; + + use crate::{DeepgramError, Result}; + + #[pin_project] + pub(super) struct FileChunker { + chunk_size: usize, + buf: BytesMut, + #[pin] + file: ReaderStream, + } + + impl FileChunker { + pub(super) fn new(file: File, chunk_size: usize) -> Self { + FileChunker { + chunk_size, + buf: BytesMut::with_capacity(2 * chunk_size), + file: ReaderStream::new(file), } - }; + } + } - let recv_task = async move { - loop { - match read.next().await { - None => break, - Some(Ok(msg)) => { - if let Message::Text(txt) = msg { - let resp = serde_json::from_str(&txt).map_err(DeepgramError::from); - tx.send(resp) - .await - // This unwrap is probably not safe. - .unwrap(); + impl Stream for FileChunker { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut this = self.project(); + + while this.buf.len() < *this.chunk_size { + match Pin::new(&mut this.file).poll_next(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(next) => match next.transpose() { + Err(e) => return Poll::Ready(Some(Err(DeepgramError::from(e)))), + Ok(None) => { + if this.buf.is_empty() { + return Poll::Ready(None); + } else { + return Poll::Ready(Some(Ok(this + .buf + .split_to(this.buf.len()) + .freeze()))); + } } - } - Some(e) => { - let _ = dbg!(e); - break; - } + Ok(Some(next)) => { + this.buf.extend_from_slice(&next); + } + }, } } - }; - tokio::spawn(async move { - tokio::join!(send_task, recv_task); - }); - - Ok(rx) + Poll::Ready(Some(Ok(this.buf.split_to(*this.chunk_size).freeze()))) + } } } #[cfg(test)] mod tests { + use super::ControlMessage; use crate::common::options::Options; #[test] fn test_stream_url() { - let dg = crate::Deepgram::new("token"); + let dg = crate::Deepgram::new("token").unwrap(); assert_eq!( dg.transcription().listen_stream_url().to_string(), "wss://api.deepgram.com/v1/listen", @@ -466,7 +832,8 @@ mod tests { #[test] fn test_stream_url_custom_host() { - let dg = crate::Deepgram::with_base_url_and_api_key("http://localhost:8080", "token"); + let dg = + crate::Deepgram::with_base_url_and_api_key("http://localhost:8080", "token").unwrap(); assert_eq!( dg.transcription().listen_stream_url().to_string(), "ws://localhost:8080/v1/listen", @@ -475,10 +842,18 @@ mod tests { #[test] fn query_escaping() { - let dg = crate::Deepgram::new("token"); + let dg = crate::Deepgram::new("token").unwrap(); let opts = Options::builder().custom_topics(["A&R"]).build(); let transcription = dg.transcription(); let builder = transcription.stream_request_with_options(opts.clone()); assert_eq!(builder.urlencoded().unwrap(), opts.urlencoded().unwrap()) } + + #[test] + fn control_message_format() { + assert_eq!( + &serde_json::to_string(&ControlMessage::CloseStream).unwrap(), + r#"{"type":"CloseStream"}"# + ); + } } diff --git a/src/manage/billing.rs b/src/manage/billing.rs index 6b5c3391..28e4acad 100644 --- a/src/manage/billing.rs +++ b/src/manage/billing.rs @@ -56,7 +56,7 @@ impl Billing<'_> { /// # let project_id = /// # env::var("DEEPGRAM_PROJECT_ID").expect("DEEPGRAM_PROJECT_ID environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// let balances = dg_client /// .billing() @@ -98,7 +98,7 @@ impl Billing<'_> { /// # let balance_id = /// # env::var("DEEPGRAM_BALANCE_ID").expect("DEEPGRAM_BALANCE_ID environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// let balance = dg_client /// .billing() diff --git a/src/manage/invitations.rs b/src/manage/invitations.rs index 03e45b6b..7e730021 100644 --- a/src/manage/invitations.rs +++ b/src/manage/invitations.rs @@ -56,7 +56,7 @@ impl Invitations<'_> { /// # let project_id = /// # env::var("DEEPGRAM_PROJECT_ID").expect("DEEPGRAM_PROJECT_ID environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// dg_client /// .invitations() diff --git a/src/manage/keys.rs b/src/manage/keys.rs index 5b8fcfda..e28bc654 100644 --- a/src/manage/keys.rs +++ b/src/manage/keys.rs @@ -63,7 +63,7 @@ impl Keys<'_> { /// # let project_id = /// # env::var("DEEPGRAM_PROJECT_ID").expect("DEEPGRAM_PROJECT_ID environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// let keys = dg_client /// .keys() @@ -102,7 +102,7 @@ impl Keys<'_> { /// # /// # let key_id = env::var("DEEPGRAM_KEY_ID").expect("DEEPGRAM_KEY_ID environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// let key = dg_client /// .keys() @@ -144,7 +144,7 @@ impl Keys<'_> { /// # /// # let key_id = env::var("DEEPGRAM_KEY_ID").expect("DEEPGRAM_KEY_ID environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// let options = Options::builder("New Key", ["member"]).build(); /// let new_key = dg_client @@ -189,7 +189,7 @@ impl Keys<'_> { /// # /// # let key_id = env::var("DEEPGRAM_KEY_ID").expect("DEEPGRAM_KEY_ID environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// dg_client /// .keys() diff --git a/src/manage/members.rs b/src/manage/members.rs index b5e02b1e..061d9de5 100644 --- a/src/manage/members.rs +++ b/src/manage/members.rs @@ -56,7 +56,7 @@ impl Members<'_> { /// # let project_id = /// # env::var("DEEPGRAM_PROJECT_ID").expect("DEEPGRAM_PROJECT_ID environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// let members = dg_client /// .members() @@ -99,7 +99,7 @@ impl Members<'_> { /// # let member_id = /// # env::var("DEEPGRAM_MEMBER_ID").expect("DEEPGRAM_MEMBER_ID environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// dg_client /// .members() diff --git a/src/manage/projects.rs b/src/manage/projects.rs index 979a39ca..e566694d 100644 --- a/src/manage/projects.rs +++ b/src/manage/projects.rs @@ -59,7 +59,7 @@ impl Projects<'_> { /// # let deepgram_api_key = /// # env::var("DEEPGRAM_API_KEY").expect("DEEPGRAM_API_KEY environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// let projects = dg_client /// .projects() @@ -96,7 +96,7 @@ impl Projects<'_> { /// # let project_id = /// # env::var("DEEPGRAM_PROJECT_ID").expect("DEEPGRAM_PROJECT_ID environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// let project = dg_client /// .projects() @@ -133,7 +133,7 @@ impl Projects<'_> { /// # let project_id = /// # env::var("DEEPGRAM_PROJECT_ID").expect("DEEPGRAM_PROJECT_ID environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// let options = Options::builder() /// .name("The Transcribinator") @@ -180,7 +180,7 @@ impl Projects<'_> { /// # let project_id = /// # env::var("DEEPGRAM_PROJECT_ID").expect("DEEPGRAM_PROJECT_ID environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// dg_client /// .projects() diff --git a/src/manage/scopes.rs b/src/manage/scopes.rs index 2ee86604..5634b197 100644 --- a/src/manage/scopes.rs +++ b/src/manage/scopes.rs @@ -61,7 +61,7 @@ impl Scopes<'_> { /// # let member_id = /// # env::var("DEEPGRAM_MEMBER_ID").expect("DEEPGRAM_MEMBER_ID environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// let scopes = dg_client /// .scopes() @@ -108,7 +108,7 @@ impl Scopes<'_> { /// # let member_id = /// # env::var("DEEPGRAM_MEMBER_ID").expect("DEEPGRAM_MEMBER_ID environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// dg_client /// .scopes() diff --git a/src/manage/usage.rs b/src/manage/usage.rs index 8b5d110c..497e1a4d 100644 --- a/src/manage/usage.rs +++ b/src/manage/usage.rs @@ -62,7 +62,7 @@ impl Usage<'_> { /// # let project_id = /// # env::var("DEEPGRAM_PROJECT_ID").expect("DEEPGRAM_PROJECT_ID environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// let options = list_requests_options::Options::builder().build(); /// let requests = dg_client @@ -118,7 +118,7 @@ impl Usage<'_> { /// # let request_id = /// # env::var("DEEPGRAM_REQUEST_ID").expect("DEEPGRAM_REQUEST_ID environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// let request = dg_client /// .usage() @@ -161,7 +161,7 @@ impl Usage<'_> { /// # let project_id = /// # env::var("DEEPGRAM_PROJECT_ID").expect("DEEPGRAM_PROJECT_ID environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// let options = get_usage_options::Options::builder().build(); /// let summary = dg_client @@ -211,7 +211,7 @@ impl Usage<'_> { /// # let project_id = /// # env::var("DEEPGRAM_PROJECT_ID").expect("DEEPGRAM_PROJECT_ID environmental variable"); /// # - /// let dg_client = Deepgram::new(&deepgram_api_key); + /// let dg_client = Deepgram::new(&deepgram_api_key)?; /// /// let options = get_fields_options::Options::builder().build(); /// let summary = dg_client diff --git a/src/speak/rest.rs b/src/speak/rest.rs index d72df283..7a41429a 100644 --- a/src/speak/rest.rs +++ b/src/speak/rest.rs @@ -146,7 +146,7 @@ mod tests { #[test] fn listen_url() { - let dg = Deepgram::new("token"); + let dg = Deepgram::new("token").unwrap(); assert_eq!( &dg.text_to_speech().speak_url().to_string(), "https://api.deepgram.com/v1/speak"