diff --git a/deepgram/__init__.py b/deepgram/__init__.py index 4976afcf..4721eae9 100644 --- a/deepgram/__init__.py +++ b/deepgram/__init__.py @@ -96,12 +96,12 @@ from .client import ( SpeakOptions, SpeakRESTOptions, - # SpeakWebSocketOptions, + SpeakWSOptions, # FileSource, SpeakRestSource, SpeakSource, ) -from .client import SpeakWebSocketEvents +from .client import SpeakWebSocketEvents, SpeakWebSocketMessage ## speak REST from .client import ( @@ -115,21 +115,22 @@ SpeakRESTResponse, ) -# ## speak WebSocket -# from .client import ( -# SpeakWebSocketClient, -# AsyncSpeakWebSocketClient, -# ) -# from .client import ( -# SpeakWebSocketResponse, -# # OpenResponse, -# # MetadataResponse, -# FlushedResponse, -# # CloseResponse, -# # UnhandledResponse, -# WarningResponse, -# # ErrorResponse, -# ) +## speak WebSocket +from .client import ( + SpeakWebSocketClient, + AsyncSpeakWebSocketClient, + SpeakWSClient, + AsyncSpeakWSClient, +) +from .client import ( + # OpenResponse, + # MetadataResponse, + FlushedResponse, + # CloseResponse, + # UnhandledResponse, + WarningResponse, + # ErrorResponse, +) # manage from .client import ManageClient, AsyncManageClient @@ -173,10 +174,26 @@ ) # utilities +# pylint: disable=wrong-import-position from .audio import Microphone from .audio import ( - LOGGING, - CHANNELS, - RATE, - CHUNK, + INPUT_LOGGING, + INPUT_CHANNELS, + INPUT_RATE, + INPUT_CHUNK, ) + +LOGGING = INPUT_LOGGING +CHANNELS = INPUT_CHANNELS +RATE = INPUT_RATE +CHUNK = INPUT_CHUNK + +from .audio import Speaker +from .audio import ( + OUTPUT_LOGGING, + OUTPUT_CHANNELS, + OUTPUT_RATE, + OUTPUT_CHUNK, +) + +# pylint: enable=wrong-import-position diff --git a/deepgram/audio/__init__.py b/deepgram/audio/__init__.py index 80c52ca9..ca0d84a7 100644 --- a/deepgram/audio/__init__.py +++ b/deepgram/audio/__init__.py @@ -3,4 +3,17 @@ # SPDX-License-Identifier: MIT from .microphone import Microphone -from .microphone import LOGGING, CHANNELS, RATE, CHUNK +from .microphone import ( + LOGGING as INPUT_LOGGING, + CHANNELS as INPUT_CHANNELS, + RATE as INPUT_RATE, + CHUNK as INPUT_CHUNK, +) + +from .speaker import Speaker +from .speaker import ( + LOGGING as OUTPUT_LOGGING, + CHANNELS as OUTPUT_CHANNELS, + RATE as OUTPUT_RATE, + CHUNK as OUTPUT_CHUNK, +) diff --git a/deepgram/audio/microphone/constants.py b/deepgram/audio/microphone/constants.py index 6570fd4d..4aed8106 100644 --- a/deepgram/audio/microphone/constants.py +++ b/deepgram/audio/microphone/constants.py @@ -5,7 +5,6 @@ from ...utils import verboselogs # Constants for microphone - LOGGING = verboselogs.WARNING CHANNELS = 1 RATE = 16000 diff --git a/deepgram/audio/microphone/microphone.py b/deepgram/audio/microphone/microphone.py index 179a41b0..11649da8 100644 --- a/deepgram/audio/microphone/microphone.py +++ b/deepgram/audio/microphone/microphone.py @@ -5,7 +5,7 @@ import inspect import asyncio import threading -from typing import Optional, Callable, TYPE_CHECKING +from typing import Optional, Callable, Union, TYPE_CHECKING import logging from ...utils import verboselogs @@ -21,10 +21,10 @@ class Microphone: # pylint: disable=too-many-instance-attributes """ _logger: verboselogs.VerboseLogger - _exit: threading.Event _audio: "pyaudio.PyAudio" _stream: "pyaudio.Stream" + _chunk: int _rate: int _format: int @@ -34,9 +34,10 @@ class Microphone: # pylint: disable=too-many-instance-attributes _asyncio_loop: asyncio.AbstractEventLoop _asyncio_thread: threading.Thread + _exit: threading.Event - _push_callback_org: object - _push_callback: object + _push_callback_org: Optional[Callable] = None + _push_callback: Optional[Callable] = None def __init__( self, @@ -53,6 +54,7 @@ def __init__( self._logger = verboselogs.VerboseLogger(__name__) self._logger.addHandler(logging.StreamHandler()) self._logger.setLevel(verbose) + self._exit = threading.Event() self._audio = pyaudio.PyAudio() @@ -71,9 +73,16 @@ def _start_asyncio_loop(self) -> None: def is_active(self) -> bool: """ - returns True if the stream is active, False otherwise + is_active - returns the state of the stream + + Args: + None + + Returns: + True if the stream is active, False otherwise """ self._logger.debug("Microphone.is_active ENTER") + if self._stream is None: self._logger.error("stream is None") self._logger.debug("Microphone.is_active LEAVE") @@ -87,13 +96,23 @@ def is_active(self) -> bool: def set_callback(self, push_callback: Callable) -> None: """ - Set the callback function to be called when data is received. + set_callback - sets the callback function to be called when data is received. + + Args: + push_callback (Callable): The callback function to be called when data is received. + This should be the websocket send function. + + Returns: + None """ self._push_callback_org = push_callback def start(self) -> bool: """ - starts the microphone stream + starts - starts the microphone stream + + Returns: + bool: True if the stream was started, False otherwise """ self._logger.debug("Microphone.start ENTER") @@ -101,10 +120,10 @@ def start(self) -> bool: self._logger.info("channels: %d", self._channels) self._logger.info("rate: %d", self._rate) self._logger.info("chunk: %d", self._chunk) - self._logger.info("input_device_id: %d", self._input_device_index) + # self._logger.info("input_device_id: %d", self._input_device_index) if self._push_callback_org is None: - self._logger.error("start() failed. No callback set.") + self._logger.error("start failed. No callback set.") self._logger.debug("Microphone.start LEAVE") return False @@ -114,9 +133,13 @@ def start(self) -> bool: self._asyncio_thread = threading.Thread(target=self._start_asyncio_loop) self._asyncio_thread.start() - self._push_callback = lambda data: asyncio.run_coroutine_threadsafe( - self._push_callback_org(data), self._asyncio_loop - ).result() + self._push_callback = lambda data: ( + asyncio.run_coroutine_threadsafe( + self._push_callback_org(data), self._asyncio_loop + ).result() + if self._push_callback_org + else None + ) else: self._logger.verbose("regular threaded callback") self._push_callback = self._push_callback_org @@ -134,7 +157,7 @@ def start(self) -> bool: self._exit.clear() self._stream.start_stream() - self._logger.notice("start() succeeded") + self._logger.notice("start succeeded") self._logger.debug("Microphone.start LEAVE") return True @@ -176,41 +199,50 @@ def _callback( def mute(self) -> bool: """ - Mutes the microphone stream + mute - mutes the microphone stream + + Returns: + bool: True if the stream was muted, False otherwise """ self._logger.debug("Microphone.mute ENTER") if self._stream is None: - self._logger.error("mute() failed. Library not initialized.") + self._logger.error("mute failed. Library not initialized.") self._logger.debug("Microphone.mute LEAVE") return False self._is_muted = True - self._logger.notice("mute() succeeded") + self._logger.notice("mute succeeded") self._logger.debug("Microphone.mute LEAVE") return True def unmute(self) -> bool: """ - Unmutes the microphone stream + unmute - unmutes the microphone stream + + Returns: + bool: True if the stream was unmuted, False otherwise """ self._logger.debug("Microphone.unmute ENTER") if self._stream is None: - self._logger.error("unmute() failed. Library not initialized.") + self._logger.error("unmute failed. Library not initialized.") self._logger.debug("Microphone.unmute LEAVE") return False self._is_muted = False - self._logger.notice("unmute() succeeded") + self._logger.notice("unmute succeeded") self._logger.debug("Microphone.unmute LEAVE") return True def finish(self) -> bool: """ - Stops the microphone stream + finish - stops the microphone stream + + Returns: + bool: True if the stream was stopped, False otherwise """ self._logger.debug("Microphone.finish ENTER") @@ -219,19 +251,24 @@ def finish(self) -> bool: # Stop the stream. if self._stream is not None: + self._logger.notice("stopping stream...") self._stream.stop_stream() self._stream.close() self._stream = None # type: ignore + self._logger.notice("stream stopped") # clean up the thread if ( - inspect.iscoroutinefunction(self._push_callback_org) - and self._asyncio_thread is not None + # inspect.iscoroutinefunction(self._push_callback_org) + # and + self._asyncio_thread + is not None ): + self._logger.notice("stopping asyncio loop...") self._asyncio_loop.call_soon_threadsafe(self._asyncio_loop.stop) self._asyncio_thread.join() self._asyncio_thread = None # type: ignore - self._logger.notice("stream/recv thread joined") + self._logger.notice("_asyncio_thread joined") self._logger.notice("finish succeeded") self._logger.debug("Microphone.finish LEAVE") diff --git a/deepgram/audio/speaker/__init__.py b/deepgram/audio/speaker/__init__.py new file mode 100644 index 00000000..f3ae8ca1 --- /dev/null +++ b/deepgram/audio/speaker/__init__.py @@ -0,0 +1,6 @@ +# Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from .speaker import Speaker +from .constants import LOGGING, CHANNELS, RATE, CHUNK diff --git a/deepgram/audio/speaker/constants.py b/deepgram/audio/speaker/constants.py new file mode 100644 index 00000000..8aa50820 --- /dev/null +++ b/deepgram/audio/speaker/constants.py @@ -0,0 +1,12 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from ...utils import verboselogs + +# Constants for microphone +LOGGING = verboselogs.WARNING +TIMEOUT = 0.050 +CHANNELS = 1 +RATE = 48000 +CHUNK = 8194 diff --git a/deepgram/audio/speaker/errors.py b/deepgram/audio/speaker/errors.py new file mode 100644 index 00000000..4d9b95ef --- /dev/null +++ b/deepgram/audio/speaker/errors.py @@ -0,0 +1,21 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + + +# exceptions for speaker +class DeepgramSpeakerError(Exception): + """ + Exception raised for known errors related to Speaker library. + + Attributes: + message (str): The error message describing the exception. + """ + + def __init__(self, message: str): + super().__init__(message) + self.name = "DeepgramSpeakerError" + self.message = message + + def __str__(self): + return f"{self.name}: {self.message}" diff --git a/deepgram/audio/speaker/speaker.py b/deepgram/audio/speaker/speaker.py new file mode 100644 index 00000000..55c1bbc5 --- /dev/null +++ b/deepgram/audio/speaker/speaker.py @@ -0,0 +1,306 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +import asyncio +import inspect +import queue +import threading +from typing import Optional, Callable, Union, TYPE_CHECKING +import logging + +from ...utils import verboselogs +from .constants import LOGGING, CHANNELS, RATE, CHUNK, TIMEOUT + +if TYPE_CHECKING: + import pyaudio + + +class Speaker: # pylint: disable=too-many-instance-attributes + """ + This implements a speaker for local audio output. This uses PyAudio under the hood. + """ + + _logger: verboselogs.VerboseLogger + + _audio: "pyaudio.PyAudio" + _stream: "pyaudio.Stream" + + _chunk: int + _rate: int + _channels: int + _output_device_index: Optional[int] + + _queue: queue.Queue + _exit: threading.Event + + _thread: threading.Thread + # _asyncio_loop: asyncio.AbstractEventLoop + # _asyncio_thread: threading.Thread + _receiver_thread: threading.Thread + + _loop: asyncio.AbstractEventLoop + + _push_callback_org: Optional[Callable] = None + _push_callback: Optional[Callable] = None + _pull_callback_org: Optional[Callable] = None + _pull_callback: Optional[Callable] = None + + def __init__( + self, + pull_callback: Optional[Callable] = None, + push_callback: Optional[Callable] = None, + verbose: int = LOGGING, + rate: int = RATE, + chunk: int = CHUNK, + channels: int = CHANNELS, + output_device_index: Optional[int] = None, + ): + # dynamic import of pyaudio as not to force the requirements on the SDK (and users) + import pyaudio # pylint: disable=import-outside-toplevel + + self._logger = verboselogs.VerboseLogger(__name__) + self._logger.addHandler(logging.StreamHandler()) + self._logger.setLevel(verbose) + + self._exit = threading.Event() + self._queue = queue.Queue() + + self._audio = pyaudio.PyAudio() + self._chunk = chunk + self._rate = rate + self._format = pyaudio.paInt16 + self._channels = channels + self._output_device_index = output_device_index + + self._push_callback_org = push_callback + self._pull_callback_org = pull_callback + + def set_push_callback(self, push_callback: Callable) -> None: + """ + set_push_callback - sets the callback function to be called when data is sent. + + Args: + push_callback (Callable): The callback function to be called when data is send. + This should be the websocket handle message function. + + Returns: + None + """ + self._push_callback_org = push_callback + + def set_pull_callback(self, pull_callback: Callable) -> None: + """ + set_pull_callback - sets the callback function to be called when data is received. + + Args: + pull_callback (Callable): The callback function to be called when data is received. + This should be the websocket recv function. + + Returns: + None + """ + self._pull_callback_org = pull_callback + + # def _start_asyncio_loop(self) -> None: + # self._asyncio_loop = asyncio.new_event_loop() + # self._asyncio_loop.run_forever() + + def start(self, active_loop: Optional[asyncio.AbstractEventLoop] = None) -> bool: + """ + starts - starts the Speaker stream + + Args: + socket (Union[SyncClientConnection, AsyncClientConnection]): The socket to receive audio data from. + + Returns: + bool: True if the stream was started, False otherwise + """ + self._logger.debug("Speaker.start ENTER") + + self._logger.info("format: %s", self._format) + self._logger.info("channels: %d", self._channels) + self._logger.info("rate: %d", self._rate) + self._logger.info("chunk: %d", self._chunk) + # self._logger.info("output_device_id: %d", self._output_device_index) + + # Automatically get the current running event loop + if inspect.iscoroutinefunction(self._push_callback_org) and active_loop is None: + self._logger.verbose("get default running asyncio loop") + self._loop = asyncio.get_running_loop() + + self._exit.clear() + self._queue = queue.Queue() + + self._stream = self._audio.open( + format=self._format, + channels=self._channels, + rate=self._rate, + input=False, + output=True, + frames_per_buffer=self._chunk, + output_device_index=self._output_device_index, + ) + + self._push_callback = self._push_callback_org + self._pull_callback = self._pull_callback_org + + # if inspect.iscoroutinefunction( + # self._push_callback_org + # ) or inspect.iscoroutinefunction(self._pull_callback_org): + # self._logger.verbose("Starting asyncio loop...") + # self._asyncio_thread = threading.Thread(target=self._start_asyncio_loop) + # self._asyncio_thread.start() + + # # determine if the push_callback is a coroutine + # if inspect.iscoroutinefunction(self._push_callback_org): + # self._logger.verbose("async/await push callback") + # self._push_callback = lambda data: asyncio.run_coroutine_threadsafe( + # self._push_callback_org(data), self._asyncio_loop + # ).result() + # else: + # self._logger.verbose("threaded push callback") + # self._push_callback = self._push_callback_org + + # if inspect.iscoroutinefunction(self._pull_callback_org): + # self._logger.verbose("async/await pull callback") + # self._pull_callback = lambda: asyncio.run_coroutine_threadsafe( + # self._pull_callback_org(), self._asyncio_loop + # ).result() + # else: + # self._logger.verbose("threaded pull callback") + # self._pull_callback = self._pull_callback_org + + # start the play thread + self._thread = threading.Thread( + target=self._play, args=(self._queue, self._stream, self._exit), daemon=True + ) + self._thread.start() + + # Start the stream + self._stream.start_stream() + + # Start the receiver thread within the start function + self._logger.verbose("Starting receiver thread...") + self._receiver_thread = threading.Thread(target=self._start_receiver) + self._receiver_thread.start() + + self._logger.notice("start succeeded") + self._logger.debug("Speaker.start LEAVE") + + return True + + def _start_receiver(self): + # Check if the socket is an asyncio WebSocket + if inspect.iscoroutinefunction(self._pull_callback_org): + self._logger.verbose("Starting asyncio receiver...") + asyncio.run_coroutine_threadsafe(self._start_asyncio_receiver(), self._loop) + else: + self._logger.verbose("Starting threaded receiver...") + self._start_threaded_receiver() + + async def _start_asyncio_receiver(self): + try: + while True: + if self._exit.is_set(): + self._logger.verbose("Exiting receiver thread...") + break + + message = await self._pull_callback() + if message is None: + self._logger.verbose("No message received...") + continue + + if isinstance(message, str): + self._logger.verbose("Received control message...") + await self._push_callback(message) + elif isinstance(message, bytes): + self._logger.verbose("Received audio data...") + self.add_audio_to_queue(message) + except Exception as e: # pylint: disable=broad-except + self._logger.error("_start_asyncio_receiver exception: %s", str(e)) + + def _start_threaded_receiver(self): + try: + while True: + if self._exit.is_set(): + self._logger.verbose("Exiting receiver thread...") + break + + message = self._pull_callback() + if message is None: + self._logger.verbose("No message received...") + continue + + if isinstance(message, str): + self._logger.verbose("Received control message...") + self._push_callback(message) + elif isinstance(message, bytes): + self._logger.verbose("Received audio data...") + self.add_audio_to_queue(message) + except Exception as e: # pylint: disable=broad-except + self._logger.notice("_start_threaded_receiver exception: %s", str(e)) + + def add_audio_to_queue(self, data: bytes) -> None: + """ + add_audio_to_queue - adds audio data to the Speaker queue + + Args: + data (bytes): The audio data to add to the queue + """ + self._queue.put(data) + + def finish(self) -> bool: + """ + finish - stops the Speaker stream + + Returns: + bool: True if the stream was stopped, False otherwise + """ + self._logger.debug("Speaker.finish ENTER") + + self._logger.notice("signal exit") + self._exit.set() + + if self._stream is not None: + self._logger.notice("stopping stream...") + self._stream.stop_stream() + self._stream.close() + self._stream = None # type: ignore + self._logger.notice("stream stopped") + + self._thread.join() + self._thread = None # type: ignore + + # if self._asyncio_thread is not None: + # self._logger.notice("stopping asyncio loop...") + # self._asyncio_loop.call_soon_threadsafe(self._asyncio_loop.stop) + # self._asyncio_thread.join() + # self._asyncio_thread = None # type: ignore + # self._logger.notice("_asyncio_thread joined") + + if self._receiver_thread is not None: + self._logger.notice("stopping asyncio loop...") + self._receiver_thread.join() + self._receiver_thread = None # type: ignore + self._logger.notice("_receiver_thread joined") + + self._queue = None # type: ignore + + self._logger.notice("finish succeeded") + self._logger.debug("Speaker.finish LEAVE") + + return True + + def _play(self, audio_out, stream, stop): + """ + _play - plays audio data from the Speaker queue callback for portaudio + """ + while not stop.is_set(): + try: + data = audio_out.get(True, TIMEOUT) + stream.write(data) + except queue.Empty: + pass + except Exception as e: # pylint: disable=broad-except + self._logger.error("_play exception: %s", str(e)) diff --git a/deepgram/client.py b/deepgram/client.py index 39cc1361..70abf512 100644 --- a/deepgram/client.py +++ b/deepgram/client.py @@ -100,12 +100,12 @@ from .clients import ( SpeakOptions, SpeakRESTOptions, - # SpeakWebSocketOptions, + SpeakWSOptions, # FileSource, SpeakRestSource, SpeakSource, ) -from .clients import SpeakWebSocketEvents +from .clients import SpeakWebSocketEvents, SpeakWebSocketMessage ## speak REST from .clients import ( @@ -119,21 +119,22 @@ SpeakRESTResponse, ) -# ## speak WebSocket -# from .clients import ( -# SpeakWebSocketClient, -# AsyncSpeakWebSocketClient, -# ) -# from .clients import ( -# SpeakWebSocketResponse, -# # OpenResponse, -# # MetadataResponse, -# FlushedResponse, -# # CloseResponse, -# # UnhandledResponse, -# WarningResponse, -# # ErrorResponse, -# ) +## speak WebSocket +from .clients import ( + SpeakWebSocketClient, + AsyncSpeakWebSocketClient, + SpeakWSClient, + AsyncSpeakWSClient, +) +from .clients import ( + # OpenResponse, + # MetadataResponse, + FlushedResponse, + # CloseResponse, + # UnhandledResponse, + WarningResponse, + # ErrorResponse, +) # manage client classes/input from .clients import ManageClient, AsyncManageClient diff --git a/deepgram/clients/__init__.py b/deepgram/clients/__init__.py index 1b3012f2..69d7e895 100644 --- a/deepgram/clients/__init__.py +++ b/deepgram/clients/__init__.py @@ -105,13 +105,13 @@ from .speak import ( SpeakOptions, SpeakRESTOptions, - # SpeakWebSocketOptions, + SpeakWSOptions, # FileSource, SpeakRestSource, SpeakSource, ) -from .speak import SpeakWebSocketEvents +from .speak import SpeakWebSocketEvents, SpeakWebSocketMessage ## text-to-speech REST from .speak import ( @@ -125,21 +125,22 @@ SpeakRESTResponse, ) -# ## text-to-speech WebSocket -# from .speak import ( -# SpeakWebSocketClient, -# AsyncSpeakWebSocketClient, -# ) -# from .speak import ( -# SpeakWebSocketResponse, -# # OpenResponse, -# # MetadataResponse, -# FlushedResponse, -# # CloseResponse, -# # UnhandledResponse, -# WarningResponse, -# # ErrorResponse, -# ) +## text-to-speech WebSocket +from .speak import ( + SpeakWebSocketClient, + AsyncSpeakWebSocketClient, + SpeakWSClient, + AsyncSpeakWSClient, +) +from .speak import ( + # OpenResponse, + # MetadataResponse, + FlushedResponse, + # CloseResponse, + # UnhandledResponse, + WarningResponse, + # ErrorResponse, +) # manage from .manage import ManageClient, AsyncManageClient diff --git a/deepgram/clients/listen/v1/websocket/async_client.py b/deepgram/clients/listen/v1/websocket/async_client.py index 87603e7a..550bbf2f 100644 --- a/deepgram/clients/listen/v1/websocket/async_client.py +++ b/deepgram/clients/listen/v1/websocket/async_client.py @@ -53,11 +53,10 @@ class AsyncListenWebSocketClient: # pylint: disable=too-many-instance-attribute _socket: WebSocketClientProtocol _event_handlers: Dict[LiveTranscriptionEvents, list] - _last_datagram: Optional[datetime] = None - _listen_thread: Union[asyncio.Task, None] _keep_alive_thread: Union[asyncio.Task, None] _flush_thread: Union[asyncio.Task, None] + _last_datagram: Optional[datetime] = None _kwargs: Optional[Dict] = None _addons: Optional[Dict] = None @@ -79,11 +78,10 @@ def __init__(self, config: DeepgramClientOptions): self._keep_alive_thread = None self._flush_thread = None - # exit + # events self._exit_event = asyncio.Event() - # auto flush - self._flush_event = asyncio.Event() + # init handlers self._event_handlers = { event: [] for event in LiveTranscriptionEvents.__members__.values() } @@ -174,7 +172,7 @@ async def start( self._logger.notice("keepalive is disabled") # flush thread - if self._config.is_auto_flush_enabled(): + if self._config.is_auto_flush_reply_enabled(): self._logger.notice("autoflush is enabled") self._flush_thread = asyncio.create_task(self._flush()) else: @@ -219,7 +217,7 @@ async def start( raise return False - def is_connected(self) -> bool: + async def is_connected(self) -> bool: """ Returns the connection status of the WebSocket. """ @@ -311,7 +309,7 @@ async def _listening(self) -> None: self._logger.verbose("LiveResultResponse: %s", msg_result) # auto flush - if self._config.is_inspecting_messages(): + if self._config.is_inspecting_listen(): inspect_res = await self._inspect(msg_result) if not inspect_res: self._logger.error("inspect_res failed") @@ -400,6 +398,8 @@ async def _listening(self) -> None: self._logger.debug("AsyncListenWebSocketClient._listening LEAVE") return + # we need to explicitly call self._signal_exit() here because we are hanging on a recv() + # note: this is different than the speak websocket client self._logger.error( "ConnectionClosed in AsyncListenWebSocketClient._listening with code %s: %s", e.code, @@ -508,11 +508,13 @@ async def _keep_alive(self) -> None: return except websockets.exceptions.ConnectionClosed as e: - if e.code == 1000: + if e.code in [1000, 1001]: self._logger.notice(f"_keep_alive({e.code}) exiting gracefully") self._logger.debug("AsyncListenWebSocketClient._keep_alive LEAVE") return + # we need to explicitly call self._signal_exit() here because we are hanging on a recv() + # note: this is different than the speak websocket client self._logger.error( "ConnectionClosed in AsyncListenWebSocketClient._keep_alive with code %s: %s", e.code, @@ -635,11 +637,13 @@ async def _flush(self) -> None: return except websockets.exceptions.ConnectionClosed as e: - if e.code == 1000: + if e.code in [1000, 1001]: self._logger.notice(f"_flush({e.code}) exiting gracefully") self._logger.debug("AsyncListenWebSocketClient._flush LEAVE") return + # we need to explicitly call self._signal_exit() here because we are hanging on a recv() + # note: this is different than the speak websocket client self._logger.error( "ConnectionClosed in AsyncListenWebSocketClient._flush with code %s: %s", e.code, @@ -731,6 +735,11 @@ async def send(self, data: Union[str, bytes]) -> bool: self._logger.debug("AsyncListenWebSocketClient.send LEAVE") return False + if not await self.is_connected(): + self._logger.notice("is_connected is False") + self._logger.debug("AsyncListenWebSocketClient.send LEAVE") + return False + if self._socket is not None: try: await self._socket.send(data) @@ -741,7 +750,7 @@ async def send(self, data: Union[str, bytes]) -> bool: raise return True except websockets.exceptions.ConnectionClosed as e: - if e.code == 1000: + if e.code in [1000, 1001]: self._logger.notice(f"send({e.code}) exiting gracefully") self._logger.debug("AsyncListenWebSocketClient.send LEAVE") if self._config.options.get("termination_exception_send") == "true": @@ -897,7 +906,7 @@ async def _signal_exit(self) -> None: except websockets.exceptions.ConnectionClosedOK as e: self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code) except websockets.exceptions.ConnectionClosed as e: - self._logger.notice("_signal_exit - ConnectionClosed: %s", e.code) + self._logger.error("_signal_exit - ConnectionClosed: %s", e.code) except websockets.exceptions.WebSocketException as e: self._logger.error("_signal_exit - WebSocketException: %s", str(e)) except Exception as e: # pylint: disable=broad-except @@ -931,6 +940,11 @@ async def _signal_exit(self) -> None: self._socket = None # type: ignore async def _inspect(self, msg_result: LiveResultResponse) -> bool: + # auto flush_inspect is generically used to track any messages you might want to snoop on + # place additional logic here to inspect messages of interest + + # for auto flush functionality + # set the last datagram sentence = msg_result.channel.alternatives[0].transcript if len(sentence) == 0: return True diff --git a/deepgram/clients/listen/v1/websocket/client.py b/deepgram/clients/listen/v1/websocket/client.py index 8f074800..0684149c 100644 --- a/deepgram/clients/listen/v1/websocket/client.py +++ b/deepgram/clients/listen/v1/websocket/client.py @@ -56,11 +56,10 @@ class ListenWebSocketClient: # pylint: disable=too-many-instance-attributes _lock_flush: threading.Lock _event_handlers: Dict[LiveTranscriptionEvents, list] - _last_datagram: Optional[datetime] = None - _listen_thread: Union[threading.Thread, None] _keep_alive_thread: Union[threading.Thread, None] _flush_thread: Union[threading.Thread, None] + _last_datagram: Optional[datetime] = None _kwargs: Optional[Dict] = None _addons: Optional[Dict] = None @@ -88,9 +87,9 @@ def __init__(self, config: DeepgramClientOptions): # auto flush self._last_datagram = None - self._flush_event = threading.Event() self._lock_flush = threading.Lock() + # init handlers self._event_handlers = { event: [] for event in LiveTranscriptionEvents.__members__.values() } @@ -178,7 +177,7 @@ def start( self._logger.notice("keepalive is disabled") # flush thread - if self._config.is_auto_flush_enabled(): + if self._config.is_auto_flush_reply_enabled(): self._logger.notice("autoflush is enabled") self._flush_thread = threading.Thread(target=self._flush) self._flush_thread.start() @@ -295,7 +294,7 @@ def _listening( self._logger.verbose("LiveResultResponse: %s", msg_result) # auto flush - if self._config.is_inspecting_messages(): + if self._config.is_inspecting_listen(): inspect_res = self._inspect(msg_result) if not inspect_res: self._logger.error("inspect_res failed") @@ -379,11 +378,13 @@ def _listening( return except websockets.exceptions.ConnectionClosed as e: - if e.code == 1000: + if e.code in [1000, 1001]: self._logger.notice(f"_listening({e.code}) exiting gracefully") self._logger.debug("ListenWebSocketClient._listening LEAVE") return + # we need to explicitly call self._signal_exit() here because we are hanging on a recv() + # note: this is different than the speak websocket client self._logger.error( "ConnectionClosed in ListenWebSocketClient._listening with code %s: %s", e.code, @@ -486,11 +487,13 @@ def _keep_alive(self) -> None: return except websockets.exceptions.ConnectionClosed as e: - if e.code == 1000: + if e.code in [1000, 1001]: self._logger.notice(f"_keep_alive({e.code}) exiting gracefully") self._logger.debug("ListenWebSocketClient._keep_alive LEAVE") return + # we need to explicitly call self._signal_exit() here because we are hanging on a recv() + # note: this is different than the speak websocket client self._logger.error( "ConnectionClosed in ListenWebSocketClient._keep_alive with code %s: %s", e.code, @@ -575,9 +578,10 @@ def _flush(self) -> None: return delta_in_ms = float(delta_in_ms_str) + _flush_event = threading.Event() while True: try: - self._flush_event.wait(timeout=HALF_SECOND) + _flush_event.wait(timeout=HALF_SECOND) if self._exit_event.is_set(): self._logger.notice("_flush exiting gracefully") @@ -611,11 +615,13 @@ def _flush(self) -> None: return except websockets.exceptions.ConnectionClosed as e: - if e.code == 1000: + if e.code in [1000, 1001]: self._logger.notice(f"_flush({e.code}) exiting gracefully") self._logger.debug("ListenWebSocketClient._flush LEAVE") return + # we need to explicitly call self._signal_exit() here because we are hanging on a recv() + # note: this is different than the speak websocket client self._logger.error( "ConnectionClosed in ListenWebSocketClient._flush with code %s: %s", e.code, @@ -698,18 +704,23 @@ def send(self, data: Union[str, bytes]) -> bool: self._logger.debug("ListenWebSocketClient.send LEAVE") return False + if not self.is_connected(): + self._logger.notice("is_connected is False") + self._logger.debug("ListenWebSocketClient.send LEAVE") + return False + if self._socket is not None: with self._lock_send: try: self._socket.send(data) except websockets.exceptions.ConnectionClosedOK as e: self._logger.notice(f"send() exiting gracefully: {e.code}") - self._logger.debug("ListenWebSocketClient._keep_alive LEAVE") + self._logger.debug("ListenWebSocketClient.send LEAVE") if self._config.options.get("termination_exception_send") == "true": raise return True except websockets.exceptions.ConnectionClosed as e: - if e.code == 1000: + if e.code in [1000, 1001]: self._logger.notice(f"send({e.code}) exiting gracefully") self._logger.debug("ListenWebSocketClient.send LEAVE") if ( @@ -890,6 +901,11 @@ def _signal_exit(self) -> None: self._socket = None # type: ignore def _inspect(self, msg_result: LiveResultResponse) -> bool: + # auto flush_inspect is generically used to track any messages you might want to snoop on + # place additional logic here to inspect messages of interest + + # for auto flush functionality + # set the last datagram sentence = msg_result.channel.alternatives[0].transcript if len(sentence) == 0: return True diff --git a/deepgram/clients/speak/__init__.py b/deepgram/clients/speak/__init__.py index 07d8f42e..012e1461 100644 --- a/deepgram/clients/speak/__init__.py +++ b/deepgram/clients/speak/__init__.py @@ -2,20 +2,22 @@ # Use of this source code is governed by a MIT license that can be found in the LICENSE file. # SPDX-License-Identifier: MIT -from .enums import SpeakWebSocketEvents +from .enums import SpeakWebSocketEvents, SpeakWebSocketMessage from ...options import DeepgramClientOptions, ClientOptionsFromEnv from .client import ( SpeakClient, # backward compat SpeakRESTClient, AsyncSpeakRESTClient, - # SpeakWebSocketClient, - # AsyncSpeakWebSocketClient, + SpeakWebSocketClient, + AsyncSpeakWebSocketClient, + SpeakWSClient, + AsyncSpeakWSClient, ) from .client import ( SpeakOptions, SpeakRESTOptions, - # SpeakWebSocketOptions, + SpeakWSOptions, FileSource, SpeakRestSource, SpeakSource, @@ -23,12 +25,11 @@ from .client import ( SpeakResponse, # backward compat SpeakRESTResponse, - # SpeakWebSocketResponse, - # OpenResponse, - # MetadataResponse, - # FlushedResponse, - # CloseResponse, - # UnhandledResponse, - # WarningResponse, - # ErrorResponse, + OpenResponse, + MetadataResponse, + FlushedResponse, + CloseResponse, + UnhandledResponse, + WarningResponse, + ErrorResponse, ) diff --git a/deepgram/clients/speak/client.py b/deepgram/clients/speak/client.py index fc5e902e..e53c9514 100644 --- a/deepgram/clients/speak/client.py +++ b/deepgram/clients/speak/client.py @@ -5,27 +5,28 @@ from .v1 import ( SpeakRESTClient as SpeakRESTClientLatest, AsyncSpeakRESTClient as AsyncSpeakRESTClientLatest, - # SpeakWebSocketClient as SpeakWebSocketClientLatest, - # AsyncSpeakWebSocketClient as AsyncSpeakWebSocketClientLatest, + SpeakWebSocketClient as SpeakWebSocketClientLatest, + AsyncSpeakWebSocketClient as AsyncSpeakWebSocketClientLatest, + SpeakWSClient as SpeakWSClientLatest, + AsyncSpeakWSClient as AsyncSpeakWSClientLatest, ) from .v1 import ( SpeakOptions as SpeakOptionsLatest, SpeakRESTOptions as SpeakRESTOptionsLatest, - # SpeakWebSocketOptions as SpeakWebSocketOptionsLatest, + SpeakWSOptions as SpeakWSOptionsLatest, FileSource as FileSourceLatest, SpeakRestSource as SpeakRestSourceLatest, SpeakSource as SpeakSourceLatest, ) from .v1 import ( SpeakRESTResponse as SpeakRESTResponseLatest, - # SpeakWebSocketResponse as SpeakWebSocketResponseLatest, - # OpenResponse as OpenResponseLatest, - # MetadataResponse as MetadataResponseLatest, - # FlushedResponse as FlushedResponseLatest, - # CloseResponse as CloseResponseLatest, - # UnhandledResponse as UnhandledResponseLatest, - # WarningResponse as WarningResponseLatest, - # ErrorResponse as ErrorResponseLatest, + OpenResponse as OpenResponseLatest, + MetadataResponse as MetadataResponseLatest, + FlushedResponse as FlushedResponseLatest, + CloseResponse as CloseResponseLatest, + UnhandledResponse as UnhandledResponseLatest, + WarningResponse as WarningResponseLatest, + ErrorResponse as ErrorResponseLatest, ) # The client.py points to the current supported version in the SDK. @@ -35,21 +36,20 @@ # input SpeakOptions = SpeakOptionsLatest SpeakRESTOptions = SpeakRESTOptionsLatest -# SpeakWebSocketOptions = SpeakWebSocketOptionsLatest +SpeakWSOptions = SpeakWSOptionsLatest SpeakRestSource = SpeakRestSourceLatest FileSource = FileSourceLatest SpeakSource = SpeakSourceLatest # output SpeakRESTResponse = SpeakRESTResponseLatest -# SpeakWebSocketResponse = SpeakWebSocketResponseLatest -# OpenResponse = OpenResponseLatest -# MetadataResponse = MetadataResponseLatest -# FlushedResponse = FlushedResponseLatest -# CloseResponse = CloseResponseLatest -# UnhandledResponse = UnhandledResponseLatest -# WarningResponse = WarningResponseLatest -# ErrorResponse = ErrorResponseLatest +OpenResponse = OpenResponseLatest +MetadataResponse = MetadataResponseLatest +FlushedResponse = FlushedResponseLatest +CloseResponse = CloseResponseLatest +UnhandledResponse = UnhandledResponseLatest +WarningResponse = WarningResponseLatest +ErrorResponse = ErrorResponseLatest # backward compatibility @@ -59,5 +59,7 @@ # clients SpeakRESTClient = SpeakRESTClientLatest AsyncSpeakRESTClient = AsyncSpeakRESTClientLatest -# SpeakWebSocketClient = SpeakWebSocketClientLatest -# AsyncSpeakWebSocketClient = AsyncSpeakWebSocketClientLatest +SpeakWSClient = SpeakWSClientLatest +AsyncSpeakWSClient = AsyncSpeakWSClientLatest +SpeakWebSocketClient = SpeakWebSocketClientLatest +AsyncSpeakWebSocketClient = AsyncSpeakWebSocketClientLatest diff --git a/deepgram/clients/speak/enums.py b/deepgram/clients/speak/enums.py index 17007aaf..abffe980 100644 --- a/deepgram/clients/speak/enums.py +++ b/deepgram/clients/speak/enums.py @@ -7,6 +7,17 @@ # Constants mapping to events from the Deepgram API +class SpeakWebSocketMessage(StrEnum): + """ + Enumerates the possible message types that can be received from the Deepgram API + """ + + Speak: str = "Speak" + Flush: str = "Flush" + Reset: str = "Reset" + Close: str = "Close" + + class SpeakWebSocketEvents(StrEnum): """ Enumerates the possible events that can be received from the Deepgram API @@ -16,7 +27,7 @@ class SpeakWebSocketEvents(StrEnum): Close: str = "Close" AudioData: str = "AudioData" Metadata: str = "Metadata" - Flush: str = "Flush" + Flush: str = "Flushed" Unhandled: str = "Unhandled" Error: str = "Error" Warning: str = "Warning" diff --git a/deepgram/clients/speak/v1/__init__.py b/deepgram/clients/speak/v1/__init__.py index 97e63d7a..1e6be776 100644 --- a/deepgram/clients/speak/v1/__init__.py +++ b/deepgram/clients/speak/v1/__init__.py @@ -2,19 +2,16 @@ # Use of this source code is governed by a MIT license that can be found in the LICENSE file. # SPDX-License-Identifier: MIT -from .options import ( - SpeakOptions, -) from .rest import ( SpeakRESTOptions, + SpeakOptions, SpeakRestSource, SpeakSource, FileSource, ) - -# from .websocket import ( -# SpeakWebSocketOptions, -# ) +from .websocket import ( + SpeakWSOptions, +) from ....options import DeepgramClientOptions, ClientOptionsFromEnv # rest @@ -22,15 +19,19 @@ from .rest import SpeakRESTResponse -# # websocket -# from .websocket import SpeakWebSocketClient, AsyncSpeakWebSocketClient -# from .websocket import ( -# SpeakWebSocketResponse, -# OpenResponse, -# MetadataResponse, -# FlushedResponse, -# CloseResponse, -# UnhandledResponse, -# WarningResponse, -# ErrorResponse, -# ) +# websocket +from .websocket import ( + SpeakWebSocketClient, + AsyncSpeakWebSocketClient, + SpeakWSClient, + AsyncSpeakWSClient, +) +from .websocket import ( + OpenResponse, + MetadataResponse, + FlushedResponse, + CloseResponse, + UnhandledResponse, + WarningResponse, + ErrorResponse, +) diff --git a/deepgram/clients/speak/v1/rest/__init__.py b/deepgram/clients/speak/v1/rest/__init__.py index 7024eaa9..3ac3b59c 100644 --- a/deepgram/clients/speak/v1/rest/__init__.py +++ b/deepgram/clients/speak/v1/rest/__init__.py @@ -7,6 +7,7 @@ from .response import SpeakRESTResponse from .options import ( SpeakRESTOptions, + SpeakOptions, SpeakSource, SpeakRestSource, FileSource, diff --git a/deepgram/clients/speak/v1/rest/options.py b/deepgram/clients/speak/v1/rest/options.py index 62e5ba31..db1c1c39 100644 --- a/deepgram/clients/speak/v1/rest/options.py +++ b/deepgram/clients/speak/v1/rest/options.py @@ -3,13 +3,69 @@ # SPDX-License-Identifier: MIT from io import BufferedReader -from typing import Union +from typing import Union, Optional +import logging +from dataclasses import dataclass, field +from dataclasses_json import config as dataclass_config, DataClassJsonMixin + +from .....utils import verboselogs from ....common import FileSource -from ..options import SpeakOptions -SpeakRESTOptions = SpeakOptions +@dataclass +class SpeakRESTOptions(DataClassJsonMixin): + """ + Contains all the options for the SpeakOptions. + + Reference: + https://developers.deepgram.com/reference/text-to-speech-api + """ + + model: Optional[str] = field( + default="aura-asteria-en", + metadata=dataclass_config(exclude=lambda f: f is None), + ) + encoding: Optional[str] = field( + default=None, metadata=dataclass_config(exclude=lambda f: f is None) + ) + container: Optional[str] = field( + default=None, metadata=dataclass_config(exclude=lambda f: f is None) + ) + sample_rate: Optional[int] = field( + default=None, metadata=dataclass_config(exclude=lambda f: f is None) + ) + bit_rate: Optional[int] = field( + default=None, metadata=dataclass_config(exclude=lambda f: f is None) + ) + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) + + def check(self): + """ + Check the SpeakOptions for any missing or invalid values. + """ + logger = verboselogs.VerboseLogger(__name__) + logger.addHandler(logging.StreamHandler()) + prev = logger.level + logger.setLevel(verboselogs.ERROR) + + # no op at the moment + + logger.setLevel(prev) + + return True + + +SpeakOptions = SpeakRESTOptions SpeakSource = Union[FileSource, BufferedReader] SpeakRestSource = SpeakSource diff --git a/deepgram/clients/speak/v1/websocket/__init__.py b/deepgram/clients/speak/v1/websocket/__init__.py new file mode 100644 index 00000000..c0faa8d4 --- /dev/null +++ b/deepgram/clients/speak/v1/websocket/__init__.py @@ -0,0 +1,16 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from .client import SpeakWebSocketClient, SpeakWSClient +from .async_client import AsyncSpeakWebSocketClient, AsyncSpeakWSClient +from .response import ( + OpenResponse, + MetadataResponse, + FlushedResponse, + CloseResponse, + UnhandledResponse, + WarningResponse, + ErrorResponse, +) +from .options import SpeakWSOptions diff --git a/deepgram/clients/speak/v1/websocket/async_client.py b/deepgram/clients/speak/v1/websocket/async_client.py new file mode 100644 index 00000000..a9a1d824 --- /dev/null +++ b/deepgram/clients/speak/v1/websocket/async_client.py @@ -0,0 +1,898 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +import asyncio +import json +import logging +from typing import Dict, Union, Optional, cast, Any +from datetime import datetime +import threading + +import websockets +from websockets.client import WebSocketClientProtocol + +from .....utils import verboselogs +from .....options import DeepgramClientOptions +from ...enums import SpeakWebSocketEvents, SpeakWebSocketMessage +from .helpers import convert_to_websocket_url, append_query_params +from ...errors import DeepgramError + +from .response import ( + OpenResponse, + MetadataResponse, + FlushedResponse, + CloseResponse, + WarningResponse, + ErrorResponse, + UnhandledResponse, +) +from .options import SpeakWSOptions + +from .....audio.speaker import Speaker + +ONE_SECOND = 1 +HALF_SECOND = 0.5 +DEEPGRAM_INTERVAL = 5 +PING_INTERVAL = 20 + + +class AsyncSpeakWSClient: # pylint: disable=too-many-instance-attributes + """ + Client for interacting with Deepgram's text-to-speech services over WebSockets. + + This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events. + + Args: + config (DeepgramClientOptions): all the options for the client. + """ + + _logger: verboselogs.VerboseLogger + _config: DeepgramClientOptions + _endpoint: str + _websocket_url: str + + _socket: WebSocketClientProtocol + _event_handlers: Dict[SpeakWebSocketEvents, list] + + _listen_thread: Union[asyncio.Task, None] + _flush_thread: Union[asyncio.Task, None] + _last_datagram: Optional[datetime] = None + _flush_count: int + + _kwargs: Optional[Dict] = None + _addons: Optional[Dict] = None + _options: Optional[Dict] = None + _headers: Optional[Dict] = None + + _speaker: Optional[Speaker] = None + + def __init__(self, config: DeepgramClientOptions): + self._logger = verboselogs.VerboseLogger(__name__) + self._logger.addHandler(logging.StreamHandler()) + self._logger.setLevel(config.verbose) + + self._config = config + self._endpoint = "v1/speak" + + self._listen_thread = None + self._flush_thread = None + + # events + self._exit_event = asyncio.Event() + + # flush + self._last_datagram = None + self._flush_count = 0 + + # init handlers + self._event_handlers = { + event: [] for event in SpeakWebSocketEvents.__members__.values() + } + self._websocket_url = convert_to_websocket_url(self._config.url, self._endpoint) + + if self._config.options.get("speaker_playback") == "true": + self._logger.info("speaker_playback is enabled") + self._speaker = Speaker() + + # pylint: disable=too-many-branches,too-many-statements + async def start( + self, + options: Optional[Union[SpeakWSOptions, Dict]] = None, + addons: Optional[Dict] = None, + headers: Optional[Dict] = None, + members: Optional[Dict] = None, + **kwargs, + ) -> bool: + """ + Starts the WebSocket connection for text-to-speech synthesis. + """ + self._logger.debug("AsyncSpeakStreamClient.start ENTER") + self._logger.info("options: %s", options) + self._logger.info("addons: %s", addons) + self._logger.info("headers: %s", headers) + self._logger.info("members: %s", members) + self._logger.info("kwargs: %s", kwargs) + + if isinstance(options, SpeakWSOptions) and not options.check(): + self._logger.error("options.check failed") + self._logger.debug("SpeakStreamClient.start LEAVE") + raise DeepgramError("Fatal text-to-speech options error") + + self._addons = addons + self._headers = headers + + # add "members" as members of the class + if members is not None: + self.__dict__.update(members) + + # set kwargs as members of the class + if kwargs is not None: + self._kwargs = kwargs + else: + self._kwargs = {} + + if isinstance(options, SpeakWSOptions): + self._logger.info("SpeakWSOptions switching class -> dict") + self._options = options.to_dict() + elif options is not None: + self._options = options + else: + self._options = {} + + combined_options = self._options + if self._addons is not None: + self._logger.info("merging addons to options") + combined_options.update(self._addons) + self._logger.info("new options: %s", combined_options) + self._logger.debug("combined_options: %s", combined_options) + + combined_headers = self._config.headers + if self._headers is not None: + self._logger.info("merging headers to options") + combined_headers.update(self._headers) + self._logger.info("new headers: %s", combined_headers) + self._logger.debug("combined_headers: %s", combined_headers) + + url_with_params = append_query_params(self._websocket_url, combined_options) + + try: + self._socket = await websockets.connect( + url_with_params, extra_headers=combined_headers + ) + self._exit_event.clear() + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # listen thread + if self._speaker is not None: + self._logger.notice("speaker_playback is enabled") + self._speaker.set_pull_callback(self._socket.recv) + self._speaker.set_push_callback(self._process_message) + self._speaker.start() + else: + self._logger.notice("create _listening thread") + self._listen_thread = asyncio.create_task(self._listening()) + + # flush thread + if self._config.is_auto_flush_speak_enabled(): + self._logger.notice("autoflush is enabled") + self._flush_thread = asyncio.create_task(self._flush()) + else: + self._logger.notice("autoflush is disabled") + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # push open event + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Open), + OpenResponse(type=SpeakWebSocketEvents.Open), + ) + + self._logger.notice("start succeeded") + self._logger.debug("AsyncSpeakStreamClient.start LEAVE") + return True + + except websockets.ConnectionClosed as e: + self._logger.notice( + "ConnectionClosed in AsyncSpeakStreamClient.start: %s", e + ) + self._logger.debug("AsyncSpeakStreamClient.start LEAVE") + if self._config.options.get("termination_exception_connect") == "true": + raise + return False + except websockets.exceptions.WebSocketException as e: + self._logger.error( + "WebSocketException in AsyncSpeakStreamClient.start: %s", e + ) + self._logger.debug("AsyncSpeakStreamClient.start LEAVE") + if self._config.options.get("termination_exception_connect") == "true": + raise + return False + except Exception as e: # pylint: disable=broad-except + self._logger.error( + "WebSocketException in AsyncSpeakStreamClient.start: %s", e + ) + self._logger.debug("AsyncSpeakStreamClient.start LEAVE") + if self._config.options.get("termination_exception_connect") == "true": + raise + return False + + # pylint: enable=too-many-branches,too-many-statements + + async def is_connected(self) -> bool: + """ + Returns the connection status of the WebSocket. + """ + return self._socket is not None + + def on(self, event: SpeakWebSocketEvents, handler) -> None: + """ + Registers event handlers for specific events. + """ + self._logger.info("event subscribed: %s", event) + if event in SpeakWebSocketEvents.__members__.values() and callable(handler): + if handler not in self._event_handlers[event]: + self._event_handlers[event].append(handler) + + # triggers the registered event handlers for a specific event + async def _emit(self, event: SpeakWebSocketEvents, *args, **kwargs) -> None: + """ + Emits events to the registered event handlers. + """ + self._logger.debug("AsyncSpeakStreamClient._emit ENTER") + self._logger.debug("callback handlers for: %s", event) + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + tasks = [] + for handler in self._event_handlers[event]: + task = asyncio.create_task(handler(self, *args, **kwargs)) + tasks.append(task) + + if tasks: + self._logger.debug("waiting for tasks to finish...") + await asyncio.gather(*tasks, return_exceptions=True) + tasks.clear() + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + self._logger.debug("AsyncSpeakStreamClient._emit LEAVE") + + # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches + async def _listening(self) -> None: + """ + Listens for messages from the WebSocket connection. + """ + self._logger.debug("AsyncSpeakStreamClient._listening ENTER") + + while True: + try: + if self._exit_event.is_set(): + self._logger.notice("_listening exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + return + + if self._socket is None: + self._logger.warning("socket is empty") + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + return + + message = await self._socket.recv() + + if message is None: + self._logger.spam("message is None") + continue + + if isinstance(message, bytes): + self._logger.debug("Binary data received") + + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.AudioData), + data=message, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + else: + self._logger.debug("Text data received") + await self._process_message(message) + + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"_listening({e.code}) exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + return + + except websockets.exceptions.ConnectionClosed as e: + if e.code in [1000, 1001]: + self._logger.notice(f"_listening({e.code}) exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + return + + # no need to call self._signal_exit() here because we are already closed + # note: this is different than the listen websocket client + self._logger.notice( + "ConnectionClosed in AsyncSpeakStreamClient._listening with code %s: %s", + e.code, + e.reason, + ) + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except websockets.exceptions.WebSocketException as e: + self._logger.error( + "WebSocketException in AsyncSpeakStreamClient._listening: %s", e + ) + ws_error: ErrorResponse = ErrorResponse( + "WebSocketException in AsyncSpeakStreamClient._listening", + f"{e}", + "WebSocketException", + ) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Error), + error=ws_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await self._signal_exit() + + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except Exception as e: # pylint: disable=broad-except + self._logger.error( + "Exception in AsyncSpeakStreamClient._listening: %s", e + ) + e_error: ErrorResponse = ErrorResponse( + "Exception in AsyncSpeakStreamClient._listening", + f"{e}", + "Exception", + ) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Error), + error=e_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await self._signal_exit() + + self._logger.debug("AsyncSpeakStreamClient._listening LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + async def _process_message(self, message: str) -> None: + self._logger.debug("AsyncSpeakStreamClient._process_message ENTER") + + if len(message) == 0: + self._logger.debug("message is empty") + self._logger.debug("AsyncSpeakStreamClient._process_message LEAVE") + return + + try: + data = json.loads(message) + response_type = data.get("type") + self._logger.debug("response_type: %s, data: %s", response_type, data) + + match response_type: + case SpeakWebSocketEvents.Open: + open_result: OpenResponse = OpenResponse.from_json(message) + self._logger.verbose("OpenResponse: %s", open_result) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Open), + open=open_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Metadata: + meta_result: MetadataResponse = MetadataResponse.from_json(message) + self._logger.verbose("MetadataResponse: %s", meta_result) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Metadata), + metadata=meta_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Flush: + fl_result: FlushedResponse = FlushedResponse.from_json(message) + self._logger.verbose("FlushedResponse: %s", fl_result) + + # auto flush + if self._config.is_inspecting_speak(): + self._flush_count -= 1 + self._logger.debug( + "Decrement AutoFlush count: %d", + self._flush_count, + ) + + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Flush), + flushed=fl_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Close: + close_result: CloseResponse = CloseResponse.from_json(message) + self._logger.verbose("CloseResponse: %s", close_result) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Close), + close=close_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Warning: + war_warning: WarningResponse = WarningResponse.from_json(message) + self._logger.verbose("WarningResponse: %s", war_warning) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Warning), + warning=war_warning, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Error: + err_error: ErrorResponse = ErrorResponse.from_json(message) + self._logger.verbose("ErrorResponse: %s", err_error) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Error), + error=err_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case _: + self._logger.warning( + "Unknown Message: response_type: %s, data: %s", + response_type, + data, + ) + unhandled_error: UnhandledResponse = UnhandledResponse( + type=SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled), + raw=message, + ) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled), + unhandled=unhandled_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + self._logger.notice("_process_message Succeeded") + self._logger.debug("AsyncSpeakStreamClient._process_message LEAVE") + + except Exception as e: # pylint: disable=broad-except + self._logger.error( + "Exception in AsyncSpeakStreamClient._process_message: %s", e + ) + e_error: ErrorResponse = ErrorResponse( + "Exception in AsyncSpeakStreamClient._process_message", + f"{e}", + "Exception", + ) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Error), + error=e_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await self._signal_exit() + + self._logger.debug("AsyncSpeakStreamClient._process_message LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + # pylint: disable=too-many-return-statements + + ## pylint: disable=too-many-return-statements,too-many-statements + async def _flush(self) -> None: + self._logger.debug("AsyncSpeakStreamClient._flush ENTER") + + delta_in_ms_str = self._config.options.get("auto_flush_speak_delta") + if delta_in_ms_str is None: + self._logger.error("auto_flush_speak_delta is None") + self._logger.debug("AsyncSpeakStreamClient._flush LEAVE") + return + delta_in_ms = float(delta_in_ms_str) + + while True: + try: + await asyncio.sleep(HALF_SECOND) + + if self._exit_event.is_set(): + self._logger.notice("_flush exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient._flush LEAVE") + return + + if self._socket is None: + self._logger.notice("socket is None, exiting flush") + self._logger.debug("AsyncSpeakStreamClient._flush LEAVE") + return + + if self._last_datagram is None: + self._logger.debug("AutoFlush last_datagram is None") + continue + + delta = datetime.now() - self._last_datagram + diff_in_ms = delta.total_seconds() * 1000 + self._logger.debug("AutoFlush delta: %f", diff_in_ms) + if diff_in_ms < delta_in_ms: + self._logger.debug("AutoFlush delta is less than threshold") + continue + + await self.flush() + + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"_flush({e.code}) exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient._flush LEAVE") + return + + except websockets.exceptions.ConnectionClosed as e: + if e.code in [1000, 1001]: + self._logger.notice(f"_flush({e.code}) exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient._flush LEAVE") + return + + # no need to call self._signal_exit() here because we are already closed + # note: this is different than the listen websocket client + self._logger.notice( + "ConnectionClosed in AsyncSpeakStreamClient._listening with code %s: %s", + e.code, + e.reason, + ) + self._logger.debug("AsyncSpeakStreamClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except websockets.exceptions.WebSocketException as e: + self._logger.error( + "WebSocketException in AsyncSpeakStreamClient._flush: %s", e + ) + ws_error: ErrorResponse = ErrorResponse( + "WebSocketException in AsyncSpeakStreamClient._flush", + f"{e}", + "Exception", + ) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Error), + error=ws_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await self._signal_exit() + + self._logger.debug("AsyncSpeakStreamClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except Exception as e: # pylint: disable=broad-except + self._logger.error("Exception in AsyncSpeakStreamClient._flush: %s", e) + e_error: ErrorResponse = ErrorResponse( + "Exception in AsyncSpeakStreamClient._flush", + f"{e}", + "Exception", + ) + self._logger.error( + "Exception in AsyncSpeakStreamClient._flush: %s", str(e) + ) + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Error), + error=e_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + await self._signal_exit() + + self._logger.debug("AsyncSpeakStreamClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + # pylint: enable=too-many-return-statements + + async def send_text(self, text_input: str) -> bool: + """ + Sends text to the WebSocket connection to generate audio. + + Args: + text_input (str): The raw text to be synthesized. This function will automatically wrap + the text in a JSON object of type "Speak" with the key "text". + + Returns: + bool: True if the text was successfully sent, False otherwise. + """ + return await self.send_raw(json.dumps({"type": "Speak", "text": text_input})) + + async def send(self, text_input: str) -> bool: + """ + Alias for send_text. Please see send_text for more information. + """ + return await self.send_text(text_input) + + # pylint: disable=unused-argument + async def send_control( + self, msg_type: Union[SpeakWebSocketMessage, str], data: Optional[str] = "" + ) -> bool: + """ + Sends a control message consisting of type SpeakWebSocketEvents over the WebSocket connection. + + Args: + msg_type (SpeakWebSocketEvents): The type of control message to send. + (Optional) data (str): The data to send with the control message. + + Returns: + bool: True if the control message was successfully sent, False otherwise. + """ + control_msg = json.dumps({"type": msg_type}) + return await self.send_raw(control_msg) + + # pylint: enable=unused-argument + + # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements + async def send_raw(self, msg: str) -> bool: + """ + Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object. + + Args: + msg (str): The raw message to send over the WebSocket connection. + + Returns: + bool: True if the message was successfully sent, False otherwise. + """ + self._logger.spam("AsyncSpeakStreamClient.send_raw ENTER") + + if self._exit_event.is_set(): + self._logger.notice("send_raw exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient.send_raw LEAVE") + return False + + if not await self.is_connected(): + self._logger.notice("is_connected is False") + self._logger.debug("AsyncListenWebSocketClient.send LEAVE") + return False + + if self._config.is_inspecting_speak(): + try: + _tmp_json = json.loads(msg) + if "type" in _tmp_json: + self._logger.debug( + "Inspecting Message: Sending %s", _tmp_json["type"] + ) + match _tmp_json["type"]: + case SpeakWebSocketMessage.Speak: + inspect_res = await self._inspect() + if not inspect_res: + self._logger.error("inspect_res failed") + case SpeakWebSocketMessage.Flush: + self._last_datagram = None + self._flush_count += 1 + self._logger.debug( + "Increment Flush count: %d", self._flush_count + ) + except Exception as e: # pylint: disable=broad-except + self._logger.error("send_raw() failed - Exception: %s", str(e)) + + if self._socket is not None: + try: + await self._socket.send(msg) + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"send_raw() exiting gracefully: {e.code}") + self._logger.debug("AsyncSpeakStreamClient.send_raw LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return True + except websockets.exceptions.ConnectionClosed as e: + if e.code in [1000, 1001]: + self._logger.notice(f"send_raw({e.code}) exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient.send_raw LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return True + + self._logger.error("send_raw() failed - ConnectionClosed: %s", str(e)) + self._logger.spam("AsyncSpeakStreamClient.send_raw LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + except websockets.exceptions.WebSocketException as e: + self._logger.error("send_raw() failed - WebSocketException: %s", str(e)) + self._logger.spam("AsyncSpeakStreamClient.send_raw LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + except Exception as e: # pylint: disable=broad-except + self._logger.error("send_raw() failed - Exception: %s", str(e)) + self._logger.spam("AsyncSpeakStreamClient.send_raw LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + + self._logger.spam("send_raw() succeeded") + self._logger.spam("AsyncSpeakStreamClient.send_raw LEAVE") + return True + + self._logger.spam("send_raw() failed. socket is None") + self._logger.spam("AsyncSpeakStreamClient.send_raw LEAVE") + return False + + # pylint: enable=too-many-return-statements,too-many-branches + + async def flush(self) -> bool: + """ + Flushes the current buffer and returns generated audio + """ + self._logger.spam("AsyncSpeakStreamClient.flush ENTER") + + if self._exit_event.is_set(): + self._logger.notice("flush exiting gracefully") + self._logger.debug("AsyncSpeakStreamClient.flush LEAVE") + return False + + if self._socket is None: + self._logger.notice("socket is not intialized") + self._logger.debug("AsyncSpeakStreamClient.flush LEAVE") + return False + + self._logger.notice("Sending Flush...") + ret = await self.send_control(SpeakWebSocketMessage.Flush) + + if not ret: + self._logger.error("flush failed") + self._logger.spam("AsyncSpeakStreamClient.flush LEAVE") + return False + + self._logger.notice("flush succeeded") + self._logger.spam("AsyncSpeakStreamClient.flush LEAVE") + + return True + + async def finish(self) -> bool: + """ + Closes the WebSocket connection gracefully. + """ + self._logger.debug("AsyncSpeakStreamClient.finish ENTER") + + # signal exit + await self._signal_exit() + + # stop the threads + self._logger.verbose("cancelling tasks...") + try: + # Before cancelling, check if the tasks were created + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("before running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + tasks = [] + + if self._speaker is not None: + self._logger.notice("stopping speaker...") + self._speaker.finish() + self._speaker = None + self._logger.notice("speaker stopped") + + if self._flush_thread is not None: + self._logger.notice("cancelling _flush_thread...") + self._flush_thread.cancel() + tasks.append(self._flush_thread) + self._logger.notice("_flush_thread cancelled") + + if self._listen_thread is not None: + self._logger.notice("cancelling _listen_thread...") + self._listen_thread.cancel() + tasks.append(self._listen_thread) + self._logger.notice("_listen_thread cancelled") + + # Use asyncio.gather to wait for tasks to be cancelled + await asyncio.wait_for( + asyncio.gather(*tasks), timeout=10 + ) # Prevent indefinite waiting + self._logger.notice("threads joined") + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + self._logger.notice("finish succeeded") + self._logger.spam("AsyncSpeakStreamClient.finish LEAVE") + return True + + except asyncio.CancelledError as e: + self._logger.error("tasks cancelled error: %s", e) + self._logger.debug("AsyncSpeakStreamClient.finish LEAVE") + return False + + except asyncio.TimeoutError as e: + self._logger.error("tasks cancellation timed out: %s", e) + self._logger.debug("AsyncSpeakStreamClient.finish LEAVE") + return False + + async def _signal_exit(self) -> None: + # send close event + self._logger.verbose("closing socket...") + if self._socket is not None: + self._logger.verbose("send CloseStream...") + try: + # if the socket connection is closed, the following line might throw an error + # need to explicitly use _socket.send (dont use self.send_raw) + await self._socket.send(json.dumps({"type": "CloseStream"})) + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code) + except websockets.exceptions.ConnectionClosed as e: + self._logger.error("_signal_exit - ConnectionClosed: %s", e.code) + except websockets.exceptions.WebSocketException as e: + self._logger.error("_signal_exit - WebSocketException: %s", str(e)) + except Exception as e: # pylint: disable=broad-except + self._logger.error("_signal_exit - Exception: %s", str(e)) + + # close the socket + if self._socket is not None: + await self._socket.close() + + # push close event + try: + await self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Close), + close=CloseResponse(type=SpeakWebSocketEvents.Close), + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + except Exception as e: # pylint: disable=broad-except + self._logger.error("_emit - Exception: %s", e) + + # wait for task to send + await asyncio.sleep(0.5) + + # signal exit + self._exit_event.set() + + # closes the WebSocket connection gracefully + self._logger.verbose("clean up socket...") + if self._socket is not None: + self._logger.verbose("socket.wait_closed...") + try: + await self._socket.close() + except websockets.exceptions.WebSocketException as e: + self._logger.error("socket.wait_closed failed: %s", e) + + self._socket = None # type: ignore + + async def _inspect(self) -> bool: + # auto flush_inspect is generically used to track any messages you might want to snoop on + # place additional logic here to inspect messages of interest + + # for auto flush functionality + # set the last datagram + self._last_datagram = datetime.now() + self._logger.debug( + "AutoFlush last received: %s", + str(self._last_datagram), + ) + + return True + + +class AsyncSpeakWebSocketClient(AsyncSpeakWSClient): + """ + AsyncSpeakWebSocketClient is an alias for AsyncSpeakWSClient. + """ diff --git a/deepgram/clients/speak/v1/websocket/client.py b/deepgram/clients/speak/v1/websocket/client.py new file mode 100644 index 00000000..4bf4db29 --- /dev/null +++ b/deepgram/clients/speak/v1/websocket/client.py @@ -0,0 +1,854 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +import json +import time +import logging +from typing import Dict, Union, Optional, cast, Any +from datetime import datetime +import threading + +from websockets.sync.client import connect, ClientConnection +import websockets + +from .....utils import verboselogs +from .....options import DeepgramClientOptions +from ...enums import SpeakWebSocketEvents, SpeakWebSocketMessage +from .helpers import convert_to_websocket_url, append_query_params +from ...errors import DeepgramError + +from .response import ( + OpenResponse, + MetadataResponse, + FlushedResponse, + CloseResponse, + WarningResponse, + ErrorResponse, + UnhandledResponse, +) +from .options import SpeakWSOptions + +from .....audio.speaker import Speaker + +ONE_SECOND = 1 +HALF_SECOND = 0.5 +DEEPGRAM_INTERVAL = 5 +PING_INTERVAL = 20 + + +class SpeakWSClient: # pylint: disable=too-many-instance-attributes + """ + Client for interacting with Deepgram's text-to-speech services over WebSockets. + + This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events. + + Args: + config (DeepgramClientOptions): all the options for the client. + """ + + _logger: verboselogs.VerboseLogger + _config: DeepgramClientOptions + _endpoint: str + _websocket_url: str + + _socket: ClientConnection + _exit_event: threading.Event + _lock_send: threading.Lock + _event_handlers: Dict[SpeakWebSocketEvents, list] + + _listen_thread: Union[threading.Thread, None] + _flush_thread: Union[threading.Thread, None] + _lock_flush: threading.Lock + _last_datagram: Optional[datetime] = None + _flush_count: int + + _kwargs: Optional[Dict] = None + _addons: Optional[Dict] = None + _options: Optional[Dict] = None + _headers: Optional[Dict] = None + + _speaker: Optional[Speaker] = None + + def __init__(self, config: DeepgramClientOptions): + self._logger = verboselogs.VerboseLogger(__name__) + self._logger.addHandler(logging.StreamHandler()) + self._logger.setLevel(config.verbose) + + self._config = config + self._endpoint = "v1/speak" + self._lock_send = threading.Lock() + self._lock_flush = threading.Lock() + + self._listen_thread = None + self._flush_thread = None + + # exit + self._exit_event = threading.Event() + + # flush + self._last_datagram = None + self._flush_count = 0 + + # init handlers + self._event_handlers = { + event: [] for event in SpeakWebSocketEvents.__members__.values() + } + self._websocket_url = convert_to_websocket_url(self._config.url, self._endpoint) + + if self._config.options.get("speaker_playback") == "true": + self._logger.info("speaker_playback is enabled") + self._speaker = Speaker() + + # pylint: disable=too-many-statements,too-many-branches + def start( + self, + options: Optional[Union[SpeakWSOptions, Dict]] = None, + addons: Optional[Dict] = None, + headers: Optional[Dict] = None, + members: Optional[Dict] = None, + **kwargs, + ) -> bool: + """ + Starts the WebSocket connection for text-to-speech synthesis. + """ + self._logger.debug("SpeakStreamClient.start ENTER") + self._logger.info("options: %s", options) + self._logger.info("addons: %s", addons) + self._logger.info("headers: %s", headers) + self._logger.info("members: %s", members) + self._logger.info("kwargs: %s", kwargs) + + if isinstance(options, SpeakWSOptions) and not options.check(): + self._logger.error("options.check failed") + self._logger.debug("SpeakStreamClient.start LEAVE") + raise DeepgramError("Fatal text-to-speech options error") + + self._addons = addons + self._headers = headers + + # add "members" as members of the class + if members is not None: + self.__dict__.update(members) + + # set kwargs as members of the class + if kwargs is not None: + self._kwargs = kwargs + else: + self._kwargs = {} + + if isinstance(options, SpeakWSOptions): + self._logger.info("SpeakWSOptions switching class -> dict") + self._options = options.to_dict() + elif options is not None: + self._options = options + else: + self._options = {} + + combined_options = self._options + if self._addons is not None: + self._logger.info("merging addons to options") + combined_options.update(self._addons) + self._logger.info("new options: %s", combined_options) + self._logger.debug("combined_options: %s", combined_options) + + combined_headers = self._config.headers + if self._headers is not None: + self._logger.info("merging headers to options") + combined_headers.update(self._headers) + self._logger.info("new headers: %s", combined_headers) + self._logger.debug("combined_headers: %s", combined_headers) + + url_with_params = append_query_params(self._websocket_url, combined_options) + try: + self._socket = connect(url_with_params, additional_headers=combined_headers) + self._exit_event.clear() + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # listening thread + if self._speaker is not None: + self._logger.notice("speaker_playback is enabled") + self._speaker.set_pull_callback(self._socket.recv) + self._speaker.set_push_callback(self._process_message) + self._speaker.start() + else: + self._logger.notice("create _listening thread") + self._listen_thread = threading.Thread(target=self._listening) + self._listen_thread.start() + + # flush thread + if self._config.is_auto_flush_speak_enabled(): + self._logger.notice("autoflush is enabled") + self._flush_thread = threading.Thread(target=self._flush) + self._flush_thread.start() + else: + self._logger.notice("autoflush is disabled") + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("after running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # push open event + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Open), + OpenResponse(type=SpeakWebSocketEvents.Open), + ) + + self._logger.notice("start succeeded") + self._logger.debug("SpeakStreamClient.start LEAVE") + return True + + except websockets.ConnectionClosed as e: + self._logger.error("ConnectionClosed in SpeakStreamClient.start: %s", e) + self._logger.debug("SpeakStreamClient.start LEAVE") + if self._config.options.get("termination_exception_connect") == "true": + raise e + return False + except websockets.exceptions.WebSocketException as e: + self._logger.error("WebSocketException in SpeakStreamClient.start: %s", e) + self._logger.debug("SpeakStreamClient.start LEAVE") + if self._config.options.get("termination_exception_connect") == "true": + raise e + return False + except Exception as e: # pylint: disable=broad-except + self._logger.error("WebSocketException in SpeakStreamClient.start: %s", e) + self._logger.debug("SpeakStreamClient.start LEAVE") + if self._config.options.get("termination_exception_connect") == "true": + raise e + return False + + def is_connected(self) -> bool: + """ + Returns the connection status of the WebSocket. + """ + return self._socket is not None + + # pylint: enable=too-many-statements,too-many-branches + + def on( + self, event: SpeakWebSocketEvents, handler + ) -> None: # registers event handlers for specific events + """ + Registers event handlers for specific events. + """ + self._logger.info("event subscribed: %s", event) + if event in SpeakWebSocketEvents.__members__.values() and callable(handler): + self._event_handlers[event].append(handler) + + def _emit(self, event: SpeakWebSocketEvents, *args, **kwargs) -> None: + """ + Emits events to the registered event handlers. + """ + self._logger.debug("callback handlers for: %s", event) + for handler in self._event_handlers[event]: + handler(self, *args, **kwargs) + + # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches + def _listening( + self, + ) -> None: + """ + Listens for messages from the WebSocket connection. + """ + self._logger.debug("SpeakStreamClient._listening ENTER") + + while True: + try: + if self._exit_event.is_set(): + self._logger.notice("_listening exiting gracefully") + self._logger.debug("SpeakStreamClient._listening LEAVE") + return + + if self._socket is None: + self._logger.warning("socket is empty") + self._logger.debug("SpeakStreamClient._listening LEAVE") + return + + message = self._socket.recv() + + if message is None: + self._logger.info("message is empty") + continue + + if isinstance(message, bytes): + self._logger.debug("Binary data received") + + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.AudioData), + data=message, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + else: + self._logger.debug("Text data received") + self._process_message(message) + + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"_listening({e.code}) exiting gracefully") + self._logger.debug("SpeakStreamClient._listening LEAVE") + return + + except websockets.exceptions.ConnectionClosed as e: + if e.code in [1000, 1001]: + self._logger.notice(f"_listening({e.code}) exiting gracefully") + self._logger.debug("SpeakStreamClient._listening LEAVE") + return + + # no need to call self._signal_exit() here because we are already closed + # note: this is different than the listen websocket client + self._logger.notice( + "ConnectionClosed in SpeakStreamClient._listening with code %s: %s", + e.code, + e.reason, + ) + self._logger.debug("SpeakStreamClient._listening LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except websockets.exceptions.WebSocketException as e: + self._logger.error( + "WebSocketException in SpeakStreamClient._listening with: %s", e + ) + ws_error: ErrorResponse = ErrorResponse( + "WebSocketException in SpeakStreamClient._listening", + f"{e}", + "WebSocketException", + ) + self._emit(SpeakWebSocketEvents(SpeakWebSocketEvents.Error), ws_error) + + # signal exit and close + self._signal_exit() + + self._logger.debug("SpeakStreamClient._listening LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except Exception as e: # pylint: disable=broad-except + self._logger.error("Exception in SpeakStreamClient._listening: %s", e) + e_error: ErrorResponse = ErrorResponse( + "Exception in SpeakStreamClient._listening", + f"{e}", + "Exception", + ) + self._logger.error( + "Exception in SpeakStreamClient._listening: %s", str(e) + ) + self._emit(SpeakWebSocketEvents(SpeakWebSocketEvents.Error), e_error) + + # signal exit and close + self._signal_exit() + + self._logger.debug("SpeakStreamClient._listening LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + def _process_message(self, message: str) -> None: + try: + self._logger.debug("SpeakStreamClient._process_message ENTER") + + if len(message) == 0: + self._logger.debug("message is empty") + self._logger.debug("SpeakStreamClient._process_message LEAVE") + return + + data = json.loads(message) + response_type = data.get("type") + self._logger.debug("response_type: %s, data: %s", response_type, data) + + match response_type: + case SpeakWebSocketEvents.Open: + open_result: OpenResponse = OpenResponse.from_json(message) + self._logger.verbose("OpenResponse: %s", open_result) + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Open), + open=open_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Metadata: + meta_result: MetadataResponse = MetadataResponse.from_json(message) + self._logger.verbose("MetadataResponse: %s", meta_result) + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Metadata), + metadata=meta_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Flush: + fl_result: FlushedResponse = FlushedResponse.from_json(message) + self._logger.verbose("FlushedResponse: %s", fl_result) + + # auto flush + if self._config.is_inspecting_speak(): + with self._lock_flush: + self._flush_count -= 1 + self._logger.debug( + "Decrement Flush count: %d", + self._flush_count, + ) + + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Flush), + flushed=fl_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Close: + close_result: CloseResponse = CloseResponse.from_json(message) + self._logger.verbose("CloseResponse: %s", close_result) + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Close), + close=close_result, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Warning: + war_warning: WarningResponse = WarningResponse.from_json(message) + self._logger.verbose("WarningResponse: %s", war_warning) + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Warning), + warning=war_warning, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case SpeakWebSocketEvents.Error: + err_error: ErrorResponse = ErrorResponse.from_json(message) + self._logger.verbose("ErrorResponse: %s", err_error) + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Error), + error=err_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + case _: + self._logger.warning( + "Unknown Message: response_type: %s, data: %s", + response_type, + data, + ) + unhandled_error: UnhandledResponse = UnhandledResponse( + type=SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled), + raw=message, + ) + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled), + unhandled=unhandled_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + self._logger.notice("_process_message Succeeded") + self._logger.debug("SpeakStreamClient._process_message LEAVE") + + except Exception as e: # pylint: disable=broad-except + self._logger.error("Exception in SpeakStreamClient._listening: %s", e) + e_error: ErrorResponse = ErrorResponse( + "Exception in SpeakStreamClient._listening", + f"{e}", + "Exception", + ) + self._logger.error("Exception in SpeakStreamClient._listening: %s", str(e)) + self._emit(SpeakWebSocketEvents(SpeakWebSocketEvents.Error), e_error) + + # signal exit and close + self._signal_exit() + + self._logger.debug("SpeakStreamClient._listening LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + # pylint: enable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches + + # pylint: disable=too-many-return-statements,too-many-statements,too-many-branches + def _flush(self) -> None: + self._logger.debug("SpeakStreamClient._flush ENTER") + + delta_in_ms_str = self._config.options.get("auto_flush_speak_delta") + if delta_in_ms_str is None: + self._logger.error("auto_flush_speak_delta is None") + self._logger.debug("SpeakStreamClient._flush LEAVE") + return + delta_in_ms = float(delta_in_ms_str) + + counter = 0 + while True: + try: + counter += 1 + self._exit_event.wait(timeout=HALF_SECOND) + + if self._exit_event.is_set(): + self._logger.notice("_flush exiting gracefully") + self._logger.debug("ListenWebSocketClient._flush LEAVE") + return + + if self._socket is None: + self._logger.notice("socket is None, exiting keep_alive") + self._logger.debug("ListenWebSocketClient._flush LEAVE") + return + + if self._last_datagram is None: + self._logger.debug("AutoFlush last_datagram is None") + continue + + with self._lock_flush: + delta = datetime.now() - self._last_datagram + diff_in_ms = delta.total_seconds() * 1000 + self._logger.debug("AutoFlush delta: %f", diff_in_ms) + if diff_in_ms < delta_in_ms: + self._logger.debug("AutoFlush delta is less than threshold") + continue + + self.flush() + + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"_flush({e.code}) exiting gracefully") + self._logger.debug("SpeakStreamClient._flush LEAVE") + return + + except websockets.exceptions.ConnectionClosed as e: + if e.code in [1000, 1001]: + self._logger.notice(f"_flush({e.code}) exiting gracefully") + self._logger.debug("SpeakStreamClient._flush LEAVE") + return + + # no need to call self._signal_exit() here because we are already closed + # note: this is different than the listen websocket client + self._logger.notice( + "ConnectionClosed in SpeakStreamClient._flush with code %s: %s", + e.code, + e.reason, + ) + self._logger.debug("SpeakStreamClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except websockets.exceptions.WebSocketException as e: + self._logger.error( + "WebSocketException in SpeakStreamClient._flush: %s", e + ) + ws_error: ErrorResponse = ErrorResponse( + "WebSocketException in SpeakStreamClient._flush", + f"{e}", + "Exception", + ) + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Error), + error=ws_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + self._signal_exit() + + self._logger.debug("SpeakStreamClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + except Exception as e: # pylint: disable=broad-except + self._logger.error("Exception in SpeakStreamClient._flush: %s", e) + e_error: ErrorResponse = ErrorResponse( + "Exception in SpeakStreamClient._flush", + f"{e}", + "Exception", + ) + self._logger.error("Exception in SpeakStreamClient._flush: %s", str(e)) + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Error), + error=e_error, + **dict(cast(Dict[Any, Any], self._kwargs)), + ) + + # signal exit and close + self._signal_exit() + + self._logger.debug("SpeakStreamClient._flush LEAVE") + + if self._config.options.get("termination_exception") == "true": + raise + return + + # pylint: enable=too-many-return-statements,too-many-statements,too-many-branches + + def send_text(self, text_input: str) -> bool: + """ + Sends text to the WebSocket connection to generate audio. + + Args: + text_input (str): The raw text to be synthesized. This function will automatically wrap + the text in a JSON object of type "Speak" with the key "text". + + Returns: + bool: True if the text was successfully sent, False otherwise. + """ + return self.send_raw(json.dumps({"type": "Speak", "text": text_input})) + + def send(self, text_input: str) -> bool: + """ + Alias for send_text. Please see send_text for more information. + """ + return self.send_text(text_input) + + # pylint: disable=unused-argument + def send_control( + self, msg_type: Union[SpeakWebSocketMessage, str], data: Optional[str] = "" + ) -> bool: + """ + Sends a control message consisting of type SpeakWebSocketEvents over the WebSocket connection. + + Args: + msg_type (SpeakWebSocketEvents): The type of control message to send. + (Optional) data (str): The data to send with the control message. + + Returns: + bool: True if the control message was successfully sent, False otherwise. + """ + control_msg = json.dumps({"type": msg_type}) + return self.send_raw(control_msg) + + # pylint: enable=unused-argument + + # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements + def send_raw(self, msg: str) -> bool: + """ + Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object. + + Args: + msg (str): The raw message to send over the WebSocket connection. + + Returns: + bool: True if the message was successfully sent, False otherwise. + """ + self._logger.spam("SpeakStreamClient.send_raw ENTER") + + if self._exit_event.is_set(): + self._logger.notice("send exiting gracefully") + self._logger.debug("SpeakStreamClient.send LEAVE") + return False + + if not self.is_connected(): + self._logger.notice("is_connected is False") + self._logger.debug("AsyncListenWebSocketClient.send LEAVE") + return False + + if self._config.is_inspecting_speak(): + try: + _tmp_json = json.loads(msg) + if "type" in _tmp_json: + self._logger.debug( + "Inspecting Message: Sending %s", _tmp_json["type"] + ) + match _tmp_json["type"]: + case SpeakWebSocketMessage.Speak: + inspect_res = self._inspect() + if not inspect_res: + self._logger.error("inspect_res failed") + case SpeakWebSocketMessage.Flush: + with self._lock_flush: + self._last_datagram = None + self._flush_count += 1 + self._logger.debug( + "Increment Flush count: %d", self._flush_count + ) + except Exception as e: # pylint: disable=broad-except + self._logger.error("send_raw() failed - Exception: %s", str(e)) + + if self._socket is not None: + with self._lock_send: + try: + self._socket.send(msg) + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice(f"send_raw() exiting gracefully: {e.code}") + self._logger.debug("SpeakStreamClient.send_raw LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return True + except websockets.exceptions.ConnectionClosed as e: + if e.code in [1000, 1001]: + self._logger.notice(f"send_raw({e.code}) exiting gracefully") + self._logger.debug("SpeakStreamClient.send_raw LEAVE") + if ( + self._config.options.get("termination_exception_send") + == "true" + ): + raise + return True + self._logger.error( + "send_raw() failed - ConnectionClosed: %s", str(e) + ) + self._logger.spam("SpeakStreamClient.send_raw LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + except websockets.exceptions.WebSocketException as e: + self._logger.error( + "send_raw() failed - WebSocketException: %s", str(e) + ) + self._logger.spam("SpeakStreamClient.send_raw LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + except Exception as e: # pylint: disable=broad-except + self._logger.error("send_raw() failed - Exception: %s", str(e)) + self._logger.spam("SpeakStreamClient.send_raw LEAVE") + if self._config.options.get("termination_exception_send") == "true": + raise + return False + + self._logger.spam("send_raw() succeeded") + self._logger.spam("SpeakStreamClient.send_raw LEAVE") + return True + + self._logger.spam("send_raw() failed. socket is None") + self._logger.spam("SpeakStreamClient.send_raw LEAVE") + return False + + # pylint: enable=too-many-return-statements,too-many-branches,too-many-statements + + def flush(self) -> bool: + """ + Flushes the current buffer and returns generated audio + """ + self._logger.spam("SpeakStreamClient.flush ENTER") + + if self._exit_event.is_set(): + self._logger.notice("flush exiting gracefully") + self._logger.debug("SpeakStreamClient.flush LEAVE") + return False + + if self._socket is None: + self._logger.notice("socket is not intialized") + self._logger.debug("SpeakStreamClient.flush LEAVE") + return False + + self._logger.notice("Sending Flush...") + ret = self.send_control(SpeakWebSocketMessage.Flush) + + if not ret: + self._logger.error("flush failed") + self._logger.spam("SpeakStreamClient.flush LEAVE") + return False + + self._logger.notice("flush succeeded") + self._logger.spam("SpeakStreamClient.flush LEAVE") + + return True + + # closes the WebSocket connection gracefully + def finish(self) -> bool: + """ + Closes the WebSocket connection gracefully. + """ + self._logger.spam("SpeakStreamClient.finish ENTER") + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("before running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + # signal exit + self._signal_exit() + + # stop the threads + if self._speaker is not None: + self._logger.verbose("stopping speaker...") + self._speaker.finish() + self._speaker = None + self._logger.notice("speaker stopped") + + if self._flush_thread is not None: + self._logger.verbose("cancelling _flush_thread...") + self._flush_thread.join() + self._flush_thread = None + self._logger.notice("_flush_thread joined") + + if self._listen_thread is not None: + self._logger.verbose("cancelling _listen_thread...") + self._listen_thread.join() + self._listen_thread = None + self._logger.notice("_listen_thread joined") + + # debug the threads + for thread in threading.enumerate(): + self._logger.debug("before running thread: %s", thread.name) + self._logger.debug("number of active threads: %s", threading.active_count()) + + self._logger.notice("finish succeeded") + self._logger.spam("SpeakStreamClient.finish LEAVE") + return True + + # signals the WebSocket connection to exit + def _signal_exit(self) -> None: + # closes the WebSocket connection gracefully + self._logger.notice("closing socket...") + if self._socket is not None: + self._logger.notice("sending Close...") + try: + # if the socket connection is closed, the following line might throw an error + # need to explicitly use _socket.send (dont use self.send_raw) + self._socket.send(json.dumps({"type": "CloseStream"})) + except websockets.exceptions.ConnectionClosedOK as e: + self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code) + except websockets.exceptions.ConnectionClosed as e: + self._logger.notice("_signal_exit - ConnectionClosed: %s", e.code) + except websockets.exceptions.WebSocketException as e: + self._logger.error("_signal_exit - WebSocketException: %s", str(e)) + except Exception as e: # pylint: disable=broad-except + self._logger.error("_signal_exit - Exception: %s", str(e)) + + # close the socket + if self._socket is not None: + self._socket.close() + + # push close event + try: + self._emit( + SpeakWebSocketEvents(SpeakWebSocketEvents.Close), + CloseResponse(type=SpeakWebSocketEvents.Close), + ) + except Exception as e: # pylint: disable=broad-except + self._logger.error("_signal_exit - Exception: %s", e) + + # wait for task to send + time.sleep(0.5) + + # signal exit + self._exit_event.set() + + # closes the WebSocket connection gracefully + self._logger.verbose("clean up socket...") + if self._socket is not None: + self._logger.verbose("socket.wait_closed...") + try: + self._socket.close() + except websockets.exceptions.WebSocketException as e: + self._logger.error("socket.wait_closed failed: %s", e) + + def _inspect(self) -> bool: + # auto flush_inspect is generically used to track any messages you might want to snoop on + # place additional logic here to inspect messages of interest + + # for auto flush functionality + # set the last datagram + with self._lock_flush: + self._last_datagram = datetime.now() + self._logger.debug( + "AutoFlush last received: %s", + str(self._last_datagram), + ) + + return True + + +class SpeakWebSocketClient(SpeakWSClient): + """ + AsyncSpeakWebSocketClient is an alias for AsyncSpeakWSClient. + """ diff --git a/deepgram/clients/speak/v1/websocket/helpers.py b/deepgram/clients/speak/v1/websocket/helpers.py new file mode 100644 index 00000000..c7429acd --- /dev/null +++ b/deepgram/clients/speak/v1/websocket/helpers.py @@ -0,0 +1,53 @@ +# Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +from urllib.parse import urlparse, urlunparse, parse_qs, urlencode +from typing import Dict, Optional +import re + + +# This function appends query parameters to a URL +def append_query_params(url: str, params: Optional[Dict] = None): + """ + Appends query parameters to a URL + """ + parsed_url = urlparse(url) + query_params = parse_qs(parsed_url.query) + + if params is not None: + for key, value in params.items(): + if value is None: + continue + if isinstance(value, bool): + value = str(value).lower() + if isinstance(value, list): + for item in value: + query_params[key] = query_params.get(key, []) + [str(item)] + else: + query_params[key] = [str(value)] + + updated_query_string = urlencode(query_params, doseq=True) + updated_url = parsed_url._replace(query=updated_query_string).geturl() + return updated_url + + +# This function converts a URL to a WebSocket URL +def convert_to_websocket_url(base_url: str, endpoint: str): + """ + Converts a URL to a WebSocket URL + """ + use_ssl = True # Default to true + if re.match(r"^https?://", base_url, re.IGNORECASE): + if "http://" in base_url: + use_ssl = False # Override to false if http:// is found + base_url = base_url.replace("https://", "").replace("http://", "") + if not re.match(r"^wss?://", base_url, re.IGNORECASE): + if use_ssl: + base_url = "wss://" + base_url + else: + base_url = "ws://" + base_url + parsed_url = urlparse(base_url) + domain = parsed_url.netloc + websocket_url = urlunparse((parsed_url.scheme, domain, endpoint, "", "", "")) + return websocket_url diff --git a/deepgram/clients/speak/v1/options.py b/deepgram/clients/speak/v1/websocket/options.py similarity index 83% rename from deepgram/clients/speak/v1/options.py rename to deepgram/clients/speak/v1/websocket/options.py index 0435986c..04a5cf33 100644 --- a/deepgram/clients/speak/v1/options.py +++ b/deepgram/clients/speak/v1/websocket/options.py @@ -9,17 +9,17 @@ from dataclasses import dataclass, field from dataclasses_json import config as dataclass_config, DataClassJsonMixin -from ....utils import verboselogs -from ...common import FileSource +from .....utils import verboselogs +from ....common import FileSource @dataclass -class SpeakOptions(DataClassJsonMixin): +class SpeakWSOptions(DataClassJsonMixin): """ Contains all the options for the SpeakOptions. Reference: - https://developers.deepgram.com/reference/text-to-speech-preview-api + https://developers.deepgram.com/reference/transform-text-to-speech-websocket """ model: Optional[str] = field( @@ -29,9 +29,9 @@ class SpeakOptions(DataClassJsonMixin): encoding: Optional[str] = field( default=None, metadata=dataclass_config(exclude=lambda f: f is None) ) - container: Optional[str] = field( - default=None, metadata=dataclass_config(exclude=lambda f: f is None) - ) + # container: Optional[str] = field( + # default=None, metadata=dataclass_config(exclude=lambda f: f is None) + # ) sample_rate: Optional[int] = field( default=None, metadata=dataclass_config(exclude=lambda f: f is None) ) @@ -65,7 +65,5 @@ def check(self): return True -SpeakRESTOptions = SpeakOptions - SpeakSource = Union[FileSource, BufferedReader] SpeakRestSource = SpeakSource diff --git a/deepgram/clients/speak/v1/websocket/response.py b/deepgram/clients/speak/v1/websocket/response.py new file mode 100644 index 00000000..60b5db22 --- /dev/null +++ b/deepgram/clients/speak/v1/websocket/response.py @@ -0,0 +1,154 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + + +from typing import Optional +import io + +from dataclasses import dataclass, field +from dataclasses_json import config as dataclass_config, DataClassJsonMixin + +# Speak Response Types: + + +@dataclass +class OpenResponse(DataClassJsonMixin): + """ + Open Message from the Deepgram Platform + """ + + type: str = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) + + +@dataclass +class MetadataResponse(DataClassJsonMixin): + """ + Metadata object + """ + + request_id: str = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) + + +@dataclass +class FlushedResponse(DataClassJsonMixin): + """ + Flushed Message from the Deepgram Platform + """ + + type: str = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) + + +@dataclass +class CloseResponse(DataClassJsonMixin): + """ + Close Message from the Deepgram Platform + """ + + type: str = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) + + +@dataclass +class ErrorResponse(DataClassJsonMixin): + """ + Error Message from the Deepgram Platform + """ + + description: str = "" + message: str = "" + type: str = "" + variant: Optional[str] = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) + + +@dataclass +class WarningResponse(DataClassJsonMixin): + """ + Warning Message from the Deepgram Platform + """ + + warn_code: str = "" + warn_msg: str = "" + type: str = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) + + +# Unhandled Message + + +@dataclass +class UnhandledResponse(DataClassJsonMixin): + """ + Unhandled Message from the Deepgram Platform + """ + + type: str = "" + raw: str = "" + + def __getitem__(self, key): + _dict = self.to_dict() + return _dict[key] + + def __setitem__(self, key, val): + self.__dict__[key] = val + + def __str__(self) -> str: + return self.to_json(indent=4) diff --git a/deepgram/options.py b/deepgram/options.py index 35ae8489..e92e955c 100644 --- a/deepgram/options.py +++ b/deepgram/options.py @@ -14,7 +14,7 @@ from .errors import DeepgramApiKeyError -class DeepgramClientOptions: +class DeepgramClientOptions: # pylint: disable=too-many-instance-attributes """ Represents options for configuring a Deepgram client. @@ -29,7 +29,8 @@ class DeepgramClientOptions: """ _logger: verboselogs.VerboseLogger - _inspect: bool = False + _inspect_listen: bool = False + _inspect_speak: bool = False def __init__( self, @@ -60,8 +61,10 @@ def __init__( options = {} self.options = options - if self.is_auto_flush_enabled(): - self._inspect = True + if self.is_auto_flush_reply_enabled(): + self._inspect_listen = True + if self.is_auto_flush_speak_enabled(): + self._inspect_speak = True def set_apikey(self, api_key: str): """ @@ -100,18 +103,37 @@ def is_keep_alive_enabled(self) -> bool: "keep_alive", False ) - def is_auto_flush_enabled(self) -> bool: + def is_auto_flush_reply_enabled(self) -> bool: """ - is_auto_flush_enabled: Returns True if the client is configured to auto-flush. + is_auto_flush_reply_enabled: Returns True if the client is configured to auto-flush for listen. """ - auto_flush_delta = float(self.options.get("auto_flush_reply_delta", 0)) - return isinstance(auto_flush_delta, numbers.Number) and auto_flush_delta > 0 + auto_flush_reply_delta = float(self.options.get("auto_flush_reply_delta", 0)) + return ( + isinstance(auto_flush_reply_delta, numbers.Number) + and auto_flush_reply_delta > 0 + ) + + def is_auto_flush_speak_enabled(self) -> bool: + """ + is_auto_flush_speak_enabled: Returns True if the client is configured to auto-flush for speak. + """ + auto_flush_speak_delta = float(self.options.get("auto_flush_speak_delta", 0)) + return ( + isinstance(auto_flush_speak_delta, numbers.Number) + and auto_flush_speak_delta > 0 + ) + + def is_inspecting_listen(self) -> bool: + """ + is_inspecting_listen: Returns True if the client is inspecting listen messages. + """ + return self._inspect_listen - def is_inspecting_messages(self) -> bool: + def is_inspecting_speak(self) -> bool: """ - is_inspecting_messages: Returns True if the client is inspecting messages. + is_inspecting_speak: Returns True if the client is inspecting speak messages. """ - return self._inspect + return self._inspect_speak class ClientOptionsFromEnv( diff --git a/examples/requirements-examples.txt b/examples/requirements-examples.txt index 9ce82b93..8b772e0a 100644 --- a/examples/requirements-examples.txt +++ b/examples/requirements-examples.txt @@ -5,5 +5,6 @@ python-dotenv # streaming libs pyaudio +playsound3==2.2.1 sounddevice==0.4.7 -numpy==2.0.1 \ No newline at end of file +numpy==2.0.1 diff --git a/examples/speech-to-text/rest/url/main.py b/examples/speech-to-text/rest/url/main.py index 85ebe16d..7831bf94 100644 --- a/examples/speech-to-text/rest/url/main.py +++ b/examples/speech-to-text/rest/url/main.py @@ -22,8 +22,8 @@ def main(): try: - # STEP 1 Create a Deepgram client using the API key from environment variables - deepgram: DeepgramClient = DeepgramClient("", ClientOptionsFromEnv()) + # STEP 1 Create a Deepgram client using the DEEPGRAM_API_KEY from your environment variables + deepgram: DeepgramClient = DeepgramClient() # STEP 2 Call the transcribe_url method on the rest class options: PrerecordedOptions = PrerecordedOptions( diff --git a/examples/text-to-speech/websocket/async_complete/main.py b/examples/text-to-speech/websocket/async_complete/main.py new file mode 100644 index 00000000..26f1af5e --- /dev/null +++ b/examples/text-to-speech/websocket/async_complete/main.py @@ -0,0 +1,101 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +import asyncio +import time +from deepgram.utils import verboselogs + +from deepgram import ( + DeepgramClient, + DeepgramClientOptions, + SpeakWebSocketEvents, + SpeakWSOptions, +) + +TTS_TEXT = "Hello, this is a text to speech example using Deepgram." + + +async def main(): + try: + # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM + config: DeepgramClientOptions = DeepgramClientOptions( + options={"auto_flush_speak_delta": "500", "speaker_playback": "true"}, + # verbose=verboselogs.SPAM, + ) + deepgram: DeepgramClient = DeepgramClient("", config) + + # Create a websocket connection to Deepgram + dg_connection = deepgram.speak.asyncwebsocket.v("1") + + async def on_open(self, open, **kwargs): + print(f"\n\n{open}\n\n") + + async def on_binary_data(self, data, **kwargs): + print("Received binary data") + print("You can do something with the binary data here") + print("OR") + print( + "If you want to simply play the audio, set speaker_playback to true in the options for DeepgramClientOptions" + ) + + async def on_metadata(self, metadata, **kwargs): + print(f"\n\n{metadata}\n\n") + + async def on_flush(self, flushed, **kwargs): + print(f"\n\n{flushed}\n\n") + + async def on_close(self, close, **kwargs): + print(f"\n\n{close}\n\n") + + async def on_warning(self, warning, **kwargs): + print(f"\n\n{warning}\n\n") + + async def on_error(self, error, **kwargs): + print(f"\n\n{error}\n\n") + + async def on_unhandled(self, unhandled, **kwargs): + print(f"\n\n{unhandled}\n\n") + + dg_connection.on(SpeakWebSocketEvents.Open, on_open) + dg_connection.on(SpeakWebSocketEvents.AudioData, on_binary_data) + dg_connection.on(SpeakWebSocketEvents.Metadata, on_metadata) + dg_connection.on(SpeakWebSocketEvents.Flush, on_flush) + dg_connection.on(SpeakWebSocketEvents.Close, on_close) + dg_connection.on(SpeakWebSocketEvents.Error, on_error) + dg_connection.on(SpeakWebSocketEvents.Warning, on_warning) + dg_connection.on(SpeakWebSocketEvents.Unhandled, on_unhandled) + + # connect to websocket + options = SpeakWSOptions( + model="aura-asteria-en", + encoding="linear16", + sample_rate=48000, + ) + + print("\n\nPress Enter to stop...\n\n") + if await dg_connection.start(options) is False: + print("Failed to start connection") + return + + # send the text to Deepgram + await dg_connection.send_text(TTS_TEXT) + # if auto_flush_speak_delta is not used, you must flush the connection by calling flush() + await dg_connection.flush() + + # Indicate that we've finished + await asyncio.sleep(7) + + # Close the connection + await dg_connection.finish() + + print("Finished") + + except ValueError as e: + print(f"Invalid value encountered: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/text-to-speech/websocket/complete/main.py b/examples/text-to-speech/websocket/complete/main.py new file mode 100644 index 00000000..90f35981 --- /dev/null +++ b/examples/text-to-speech/websocket/complete/main.py @@ -0,0 +1,103 @@ +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + +import time +from deepgram.utils import verboselogs + + +from deepgram import ( + DeepgramClient, + DeepgramClientOptions, + SpeakWebSocketEvents, + SpeakWSOptions, +) + +TTS_TEXT = "Hello, this is a text to speech example using Deepgram." + + +def main(): + try: + # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM + config: DeepgramClientOptions = DeepgramClientOptions( + options={"auto_flush_speak_delta": "500", "speaker_playback": "true"}, + # verbose=verboselogs.SPAM, + ) + deepgram: DeepgramClient = DeepgramClient("", config) + + # Create a websocket connection to Deepgram + dg_connection = deepgram.speak.websocket.v("1") + + def on_open(self, open, **kwargs): + print(f"\n\n{open}\n\n") + + def on_binary_data(self, data, **kwargs): + print("Received binary data") + print("You can do something with the binary data here") + print("OR") + print( + "If you want to simply play the audio, set speaker_playback to true in the options for DeepgramClientOptions" + ) + + def on_metadata(self, metadata, **kwargs): + print(f"\n\n{metadata}\n\n") + + def on_flush(self, flushed, **kwargs): + print(f"\n\n{flushed}\n\n") + + def on_close(self, close, **kwargs): + print(f"\n\n{close}\n\n") + + def on_warning(self, warning, **kwargs): + print(f"\n\n{warning}\n\n") + + def on_error(self, error, **kwargs): + print(f"\n\n{error}\n\n") + + def on_unhandled(self, unhandled, **kwargs): + print(f"\n\n{unhandled}\n\n") + + dg_connection.on(SpeakWebSocketEvents.Open, on_open) + dg_connection.on(SpeakWebSocketEvents.AudioData, on_binary_data) + dg_connection.on(SpeakWebSocketEvents.Metadata, on_metadata) + dg_connection.on(SpeakWebSocketEvents.Flush, on_flush) + dg_connection.on(SpeakWebSocketEvents.Close, on_close) + dg_connection.on(SpeakWebSocketEvents.Error, on_error) + dg_connection.on(SpeakWebSocketEvents.Warning, on_warning) + dg_connection.on(SpeakWebSocketEvents.Unhandled, on_unhandled) + + # connect to websocket + options = SpeakWSOptions( + model="aura-asteria-en", + encoding="linear16", + sample_rate=48000, + ) + + print("\n\nPress Enter to stop...\n\n") + if dg_connection.start(options) is False: + print("Failed to start connection") + return + + # send the text to Deepgram + dg_connection.send_text(TTS_TEXT) + # if auto_flush_speak_delta is not used, you must flush the connection by calling flush() + dg_connection.flush() + + # Indicate that we've finished + time.sleep(5) + print("\n\nPress Enter to stop...\n\n") + input() + + # Close the connection + dg_connection.finish() + + print("Finished") + + except ValueError as e: + print(f"Invalid value encountered: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + + +if __name__ == "__main__": + main() diff --git a/examples/text-to-speech/websocket/simple/main.py b/examples/text-to-speech/websocket/simple/main.py index 2d1fbe46..b5a5629b 100644 --- a/examples/text-to-speech/websocket/simple/main.py +++ b/examples/text-to-speech/websocket/simple/main.py @@ -1,42 +1,91 @@ -import json -import os +# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. +# Use of this source code is governed by a MIT license that can be found in the LICENSE file. +# SPDX-License-Identifier: MIT + import time +from deepgram.utils import verboselogs +import wave -from websockets.sync.client import connect +from deepgram import ( + DeepgramClient, + DeepgramClientOptions, + SpeakWebSocketEvents, + SpeakWSOptions, +) -DEFAULT_URL = "wss://api.beta.deepgram.com/v1/speak?encoding=linear16&container=none&sample_rate=48000" -DEFAULT_TOKEN = os.environ.get("DEEPGRAM_API_KEY", None) +AUDIO_FILE = "output.wav" +TTS_TEXT = "Hello, this is a text to speech example using Deepgram. How are you doing today? I am fine thanks for asking." def main(): - _socket = connect( - DEFAULT_URL, additional_headers={"Authorization": f"Token {DEFAULT_TOKEN}"} - ) - - _story = "Hello world." - msg = json.dumps({"type": "TextInput", "text": _story}) - _socket.send(msg) - msg = json.dumps({"type": "Flush"}) - _socket.send(msg) - - # first byte - start_time = time.time() - message = _socket.recv() - end_time = time.time() - time_to_first_byte = end_time - start_time - print( - f"Connection time to first byte: {time_to_first_byte * 1000} milliseconds:\n\n{message}" - ) - - # first audio byte - message = _socket.recv() - end_time = time.time() - time_to_first_byte = end_time - start_time - print( - f"First input time to first audio byte: {time_to_first_byte * 1000} milliseconds:\n\n{message[:20]}..." - ) - - _socket.close() + try: + # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM + # config: DeepgramClientOptions = DeepgramClientOptions( + # # options={"auto_flush_speak_delta": "500", "speaker_playback": "true"}, + # verbose=verboselogs.SPAM, + # ) + # deepgram: DeepgramClient = DeepgramClient("", config) + # otherwise, use default config + deepgram: DeepgramClient = DeepgramClient() + + # Create a websocket connection to Deepgram + dg_connection = deepgram.speak.websocket.v("1") + + def on_open(self, open, **kwargs): + print(f"\n\n{open}\n\n") + + def on_binary_data(self, data, **kwargs): + print("Received binary data") + with open(AUDIO_FILE, "ab") as f: + f.write(data) + f.flush() + + def on_close(self, close, **kwargs): + print(f"\n\n{close}\n\n") + + dg_connection.on(SpeakWebSocketEvents.Open, on_open) + dg_connection.on(SpeakWebSocketEvents.AudioData, on_binary_data) + dg_connection.on(SpeakWebSocketEvents.Close, on_close) + + # Generate a generic WAV container header + # since we don't support containerized audio, we need to generate a header + header = wave.open(AUDIO_FILE, "wb") + header.setnchannels(1) # Mono audio + header.setsampwidth(2) # 16-bit audio + header.setframerate(16000) # Sample rate of 16000 Hz + header.close() + + # connect to websocket + options = SpeakWSOptions( + model="aura-asteria-en", + encoding="linear16", + sample_rate=16000, + ) + + print("\n\nPress Enter to stop...\n\n") + if dg_connection.start(options) is False: + print("Failed to start connection") + return + + # send the text to Deepgram + dg_connection.send_text(TTS_TEXT) + # if auto_flush_speak_delta is not used, you must flush the connection by calling flush() + dg_connection.flush() + + # Indicate that we've finished + time.sleep(7) + print("\n\nPress Enter to stop...\n\n") + input() + + # Close the connection + dg_connection.finish() + + print("Finished") + + except ValueError as e: + print(f"Invalid value encountered: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") if __name__ == "__main__":