From 5b122ca913fea7b9e8b0f2e0bb268af13997e19a Mon Sep 17 00:00:00 2001 From: David vonThenen <12752197+dvonthenen@users.noreply.github.com> Date: Thu, 14 Dec 2023 18:08:04 -0800 Subject: [PATCH] Enable Inheritance, Pass Self in Callbacks, Make Async/Sync Client StartUp Same --- deepgram/client.py | 1 + deepgram/clients/listen.py | 10 +++- deepgram/clients/live/client.py | 2 + deepgram/clients/live/v1/async_client.py | 59 +++++++++++++------- deepgram/clients/live/v1/client.py | 56 +++++++++++++------ examples/streaming/async_http/main.py | 71 ++++++++++++++---------- examples/streaming/http/main.py | 26 ++++++++- examples/streaming/microphone/main.py | 34 ++++++------ 8 files changed, 173 insertions(+), 86 deletions(-) diff --git a/deepgram/client.py b/deepgram/client.py index 653946b0..036a0e67 100644 --- a/deepgram/client.py +++ b/deepgram/client.py @@ -14,6 +14,7 @@ AsyncPreRecordedClient, PrerecordedOptions, LiveOptions, + LiveTranscriptionEvents, ) from .clients.onprem.client import OnPremClient from .clients.onprem.v1.async_client import AsyncOnPremClient diff --git a/deepgram/clients/listen.py b/deepgram/clients/listen.py index ab16bff8..1f1054da 100644 --- a/deepgram/clients/listen.py +++ b/deepgram/clients/listen.py @@ -12,7 +12,15 @@ AsyncPreRecordedClient, PrerecordedOptions, ) -from .live.client import LiveClient, AsyncLiveClient, LiveOptions +from .live.client import ( + LiveClient, + AsyncLiveClient, + LiveOptions, + LiveResultResponse, + MetadataResponse, + ErrorResponse, + LiveTranscriptionEvents, +) from .errors import DeepgramModuleError diff --git a/deepgram/clients/live/client.py b/deepgram/clients/live/client.py index e362c098..1e6cf76a 100644 --- a/deepgram/clients/live/client.py +++ b/deepgram/clients/live/client.py @@ -5,6 +5,8 @@ from .v1.client import LiveClient as LiveClientLatest from .v1.async_client import AsyncLiveClient as AsyncLiveClientLatest from .v1.options import LiveOptions as LiveOptionsLatest +from .enums import LiveTranscriptionEvents +from .v1.response import LiveResultResponse, MetadataResponse, ErrorResponse """ The vX/client.py points to the current supported version in the SDK. diff --git a/deepgram/clients/live/v1/async_client.py b/deepgram/clients/live/v1/async_client.py index 6fcc445c..f55e8d43 100644 --- a/deepgram/clients/live/v1/async_client.py +++ b/deepgram/clients/live/v1/async_client.py @@ -11,6 +11,7 @@ from ..helpers import convert_to_websocket_url, append_query_params from ..errors import DeepgramError +from .response import LiveResultResponse, MetadataResponse, ErrorResponse from .options import LiveOptions @@ -38,14 +39,14 @@ def __init__(self, config: DeepgramClientOptions): self._event_handlers = {event: [] for event in LiveTranscriptionEvents} self.websocket_url = convert_to_websocket_url(self.config.url, self.endpoint) - async def __call__(self, options: LiveOptions = None): - """ - Establishes a WebSocket connection for live transcription. - """ - self.logger.debug("AsyncLiveClient.__call__ ENTER") - self.logger.info("options: %s", options) + async def start(self, options: LiveOptions = None, **kwargs): + self.logger.debug("AsyncLiveClient.start ENTER") + self.logger.info("kwargs: %s", options) + self.logger.info("options: %s", kwargs) self.options = options + self.kwargs = kwargs + if isinstance(options, LiveOptions): self.logger.info("LiveOptions switching class -> json") self.options = self.options.to_dict() @@ -55,13 +56,13 @@ async def __call__(self, options: LiveOptions = None): self._socket = await _socket_connect(url_with_params, self.config.headers) asyncio.create_task(self._start()) - self.logger.notice("__call__ succeeded") - self.logger.debug("AsyncLiveClient.__call__ LEAVE") + self.logger.notice("start succeeded") + self.logger.debug("AsyncLiveClient.start LEAVE") return self except websockets.ConnectionClosed as e: await self._emit(LiveTranscriptionEvents.Close, e.code) self.logger.notice("exception: websockets.ConnectionClosed") - self.logger.debug("AsyncLiveClient.__call__ LEAVE") + self.logger.debug("AsyncLiveClient.start LEAVE") def on(self, event, handler): """ @@ -74,7 +75,7 @@ async def _emit( self, event, *args, **kwargs ): # triggers the registered event handlers for a specific event for handler in self._event_handlers[event]: - handler(*args, **kwargs) + handler(self, *args, **kwargs) async def _start(self) -> None: self.logger.debug("AsyncLiveClient._start ENTER") @@ -85,25 +86,45 @@ async def _start(self) -> None: response_type = data.get("type") match response_type: case LiveTranscriptionEvents.Transcript.value: - self.logger.verbose( + self.logger.debug( "response_type: %s, data: %s", response_type, data ) - await self._emit(LiveTranscriptionEvents.Transcript, data) - case LiveTranscriptionEvents.Error.value: - self.logger.verbose( - "response_type: %s, data: %s", response_type, data + result = LiveResultResponse.from_json(message) + await self._emit( + LiveTranscriptionEvents.Transcript, + result=result, + kwargs=self.kwargs, ) - await self._emit(LiveTranscriptionEvents.Error, data) case LiveTranscriptionEvents.Metadata.value: - self.logger.verbose( + self.logger.debug( "response_type: %s, data: %s", response_type, data ) - await self._emit(LiveTranscriptionEvents.Metadata, data) + result = ErrorResponse.from_json(message) + await self._emit( + LiveTranscriptionEvents.Metadata, + metadata=result, + kwargs=self.kwargs, + ) + case LiveTranscriptionEvents.Error.value: + self.logger.debug( + "response_type: %s, data: %s", response_type, data + ) + result = MetadataResponse.from_json(message) + await self._emit( + LiveTranscriptionEvents.Error, + error=result, + kwargs=self.kwargs, + ) case _: self.logger.error( "response_type: %s, data: %s", response_type, data ) - await self._emit(LiveTranscriptionEvents.Error, data) + error = ErrorResponse( + type="UnhandledMessage", + description="Unknown message type", + message=f"Unhandle message type: {response_type}", + ) + await self._emit(LiveTranscriptionEvents.Error, error=error) except json.JSONDecodeError as e: await self._emit(LiveTranscriptionEvents.Error, e.code) self.logger.error("exception: json.JSONDecodeError: %s", str(e)) diff --git a/deepgram/clients/live/v1/client.py b/deepgram/clients/live/v1/client.py index daf97e06..1802153e 100644 --- a/deepgram/clients/live/v1/client.py +++ b/deepgram/clients/live/v1/client.py @@ -43,14 +43,17 @@ def __init__(self, config: DeepgramClientOptions): self._event_handlers = {event: [] for event in LiveTranscriptionEvents} self.websocket_url = convert_to_websocket_url(self.config.url, self.endpoint) - def start(self, options: LiveOptions = None): + def start(self, options: LiveOptions = None, **kwargs): """ Starts the WebSocket connection for live transcription. """ self.logger.debug("LiveClient.start ENTER") - self.logger.info("options: %s", options) + self.logger.info("kwargs: %s", options) + self.logger.info("options: %s", kwargs) self.options = options + self.kwargs = kwargs + if isinstance(options, LiveOptions): self.logger.info("LiveOptions switching class -> json") self.options = self.options.to_dict() @@ -89,11 +92,9 @@ def on(self, event, handler): if event in LiveTranscriptionEvents and callable(handler): self._event_handlers[event].append(handler) - def _emit( - self, event, *args, **kwargs - ): + def _emit(self, event, *args, **kwargs): for handler in self._event_handlers[event]: - handler(*args, **kwargs) + handler(self, *args, **kwargs) def _listening(self) -> None: self.logger.debug("LiveClient._listening ENTER") @@ -119,22 +120,45 @@ def _listening(self) -> None: match response_type: case LiveTranscriptionEvents.Transcript.value: + self.logger.debug( + "response_type: %s, data: %s", response_type, data + ) result = LiveResultResponse.from_json(message) - self._emit(LiveTranscriptionEvents.Transcript, result=result) + self._emit( + LiveTranscriptionEvents.Transcript, + result=result, + kwargs=self.kwargs, + ) case LiveTranscriptionEvents.Metadata.value: + self.logger.debug( + "response_type: %s, data: %s", response_type, data + ) result = MetadataResponse.from_json(message) - self._emit(LiveTranscriptionEvents.Metadata, metadata=result) + self._emit( + LiveTranscriptionEvents.Metadata, + metadata=result, + kwargs=self.kwargs, + ) case LiveTranscriptionEvents.Error.value: + self.logger.debug( + "response_type: %s, data: %s", response_type, data + ) result = ErrorResponse.from_json(message) - self._emit(LiveTranscriptionEvents.Error, error=result) + self._emit( + LiveTranscriptionEvents.Error, + error=result, + kwargs=self.kwargs, + ) case _: - error: ErrorResponse = { - "type": "UnhandledMessage", - "description": "Unknown message type", - "message": f"Unhandle message type: {response_type}", - "variant": "", - } - self._emit(LiveTranscriptionEvents.Error, error) + self.logger.error( + "response_type: %s, data: %s", response_type, data + ) + error = ErrorResponse( + type="UnhandledMessage", + description="Unknown message type", + message=f"Unhandle message type: {response_type}", + ) + self._emit(LiveTranscriptionEvents.Error, error=error) except Exception as e: if e.code == 1000: diff --git a/examples/streaming/async_http/main.py b/examples/streaming/async_http/main.py index c1eec989..cf22653e 100644 --- a/examples/streaming/async_http/main.py +++ b/examples/streaming/async_http/main.py @@ -20,44 +20,55 @@ # URL for the realtime streaming audio you would like to transcribe URL = "http://stream.live.vc.bbcmedia.co.uk/bbc_world_service" -deepgram_api_key = os.getenv("DG_API_KEY") - async def main(): - deepgram = DeepgramClient(deepgram_api_key) + deepgram = DeepgramClient() # Create a websocket connection to Deepgram try: - dg_connection = await deepgram.listen.asynclive.v("1")(options) + dg_connection = deepgram.listen.asynclive.v("1") + + def on_message(self, result, **kwargs): + if result is None: + return + sentence = result.channel.alternatives[0].transcript + if len(sentence) == 0: + return + print(f"speaker: {sentence}") + + def on_metadata(self, metadata, **kwargs): + if metadata is None: + return + print(f"\n\n{metadata}\n\n") + + def on_error(self, error, **kwargs): + if error is None: + return + print(f"\n\n{error}\n\n") + + dg_connection.on(LiveTranscriptionEvents.Transcript, on_message) + dg_connection.on(LiveTranscriptionEvents.Metadata, on_metadata) + dg_connection.on(LiveTranscriptionEvents.Error, on_error) + + # connect to websocket + await dg_connection.start(options) + + # Send streaming audio from the URL to Deepgram + async with aiohttp.ClientSession() as session: + async with session.get(URL) as audio: + while True: + data = await audio.content.readany() + # send audio data through the socket + await dg_connection.send(data) + # If no data is being sent from the live stream, then break out of the loop. + if not data: + break + + # Indicate that we've finished sending data by sending the {"type": "CloseStream"} + await dg_connection.finish() except Exception as e: print(f"Could not open socket: {e}") return - # Listen for transcripts received from Deepgram and write them to the console - dg_connection.on(LiveTranscriptionEvents.Transcript, print) - - # Listen for metadata received from Deepgram and write to the console - dg_connection.on(LiveTranscriptionEvents.Metadata, print) - - # Listen for the connection to close - dg_connection.on( - LiveTranscriptionEvents.Close, - lambda c: print(f"Connection closed with code {c}."), - ) - - # Send streaming audio from the URL to Deepgram - async with aiohttp.ClientSession() as session: - async with session.get(URL) as audio: - while True: - data = await audio.content.readany() - # send audio data through the socket - await dg_connection.send(data) - # If no data is being sent from the live stream, then break out of the loop. - if not data: - break - - # Indicate that we've finished sending data by sending the {"type": "CloseStream"} - await dg_connection.finish() - asyncio.run(main()) diff --git a/examples/streaming/http/main.py b/examples/streaming/http/main.py index 81e8550c..b93e52ce 100644 --- a/examples/streaming/http/main.py +++ b/examples/streaming/http/main.py @@ -11,11 +11,10 @@ load_dotenv() -options = LiveOptions(model="nova", interim_results=False, language="en-US") - # URL for the realtime streaming audio you would like to transcribe URL = "http://stream.live.vc.bbcmedia.co.uk/bbc_world_service" + def main(): try: deepgram = DeepgramClient() @@ -40,12 +39,33 @@ def on_error(error=None): # Create a websocket connection to Deepgram dg_connection = deepgram.listen.live.v("1") - dg_connection.start(options) + + def on_message(self, result, **kwargs): + if result is None: + return + sentence = result.channel.alternatives[0].transcript + if len(sentence) == 0: + return + print(f"speaker: {sentence}") + + def on_metadata(self, metadata, **kwargs): + if metadata is None: + return + print(f"\n\n{metadata}\n\n") + + def on_error(self, error, **kwargs): + if error is None: + return + print(f"\n\n{error}\n\n") dg_connection.on(LiveTranscriptionEvents.Transcript, on_message) dg_connection.on(LiveTranscriptionEvents.Metadata, on_metadata) dg_connection.on(LiveTranscriptionEvents.Error, on_error) + # connect to websocket + options = LiveOptions(model="nova", interim_results=False, language="en-US") + dg_connection.start(options) + lock_exit = threading.Lock() exit = False diff --git a/examples/streaming/microphone/main.py b/examples/streaming/microphone/main.py index adb5adad..94e6105c 100644 --- a/examples/streaming/microphone/main.py +++ b/examples/streaming/microphone/main.py @@ -16,6 +16,7 @@ load_dotenv() + def main(): try: # example of setting up a client config @@ -27,16 +28,9 @@ def main(): # otherwise, use default config deepgram = DeepgramClient() - # Create a websocket connection to Deepgram - options = LiveOptions( - punctuate=True, - language="en-US", - encoding="linear16", - channels=1, - sample_rate=16000, - ) + dg_connection = deepgram.listen.live.v("1") - def on_message(result=None): + def on_message(self, result, **kwargs): if result is None: return sentence = result.channel.alternatives[0].transcript @@ -44,24 +38,30 @@ def on_message(result=None): return print(f"speaker: {sentence}") - def on_metadata(metadata=None): + def on_metadata(self, metadata, **kwargs): if metadata is None: return - print(f"\n{metadata}\n") + print(f"\n\n{metadata}\n\n") - def on_error(error=None): + def on_error(self, error, **kwargs): if error is None: return - print(f"\n{error}\n") - - dg_connection = deepgram.listen.live.v("1") - dg_connection.start(options) + print(f"\n\n{error}\n\n") dg_connection.on(LiveTranscriptionEvents.Transcript, on_message) dg_connection.on(LiveTranscriptionEvents.Metadata, on_metadata) dg_connection.on(LiveTranscriptionEvents.Error, on_error) - # create microphone + options = LiveOptions( + punctuate=True, + language="en-US", + encoding="linear16", + channels=1, + sample_rate=16000, + ) + dg_connection.start(options) + + # Open a microphone stream microphone = Microphone(dg_connection.send) # start microphone