Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[LiveClient] Pass Self in Callbacks, Make Async/Sync Client StartUp Same #213

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deepgram/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
AsyncPreRecordedClient,
PrerecordedOptions,
LiveOptions,
LiveTranscriptionEvents,
)
from .clients.onprem.client import OnPremClient
from .clients.onprem.v1.async_client import AsyncOnPremClient
Expand Down
10 changes: 9 additions & 1 deletion deepgram/clients/listen.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,15 @@
AsyncPreRecordedClient,
PrerecordedOptions,
)
from .live.client import LiveClient, AsyncLiveClient, LiveOptions
from .live.client import (
LiveClient,
AsyncLiveClient,
LiveOptions,
LiveResultResponse,
MetadataResponse,
ErrorResponse,
LiveTranscriptionEvents,
)
from .errors import DeepgramModuleError


Expand Down
2 changes: 2 additions & 0 deletions deepgram/clients/live/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from .v1.client import LiveClient as LiveClientLatest
from .v1.async_client import AsyncLiveClient as AsyncLiveClientLatest
from .v1.options import LiveOptions as LiveOptionsLatest
from .enums import LiveTranscriptionEvents
from .v1.response import LiveResultResponse, MetadataResponse, ErrorResponse

"""
The vX/client.py points to the current supported version in the SDK.
Expand Down
59 changes: 40 additions & 19 deletions deepgram/clients/live/v1/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from ..helpers import convert_to_websocket_url, append_query_params
from ..errors import DeepgramError

from .response import LiveResultResponse, MetadataResponse, ErrorResponse
from .options import LiveOptions


Expand Down Expand Up @@ -38,14 +39,14 @@ def __init__(self, config: DeepgramClientOptions):
self._event_handlers = {event: [] for event in LiveTranscriptionEvents}
self.websocket_url = convert_to_websocket_url(self.config.url, self.endpoint)

async def __call__(self, options: LiveOptions = None):
"""
Establishes a WebSocket connection for live transcription.
"""
self.logger.debug("AsyncLiveClient.__call__ ENTER")
self.logger.info("options: %s", options)
async def start(self, options: LiveOptions = None, **kwargs):
self.logger.debug("AsyncLiveClient.start ENTER")
self.logger.info("kwargs: %s", options)
self.logger.info("options: %s", kwargs)

self.options = options
self.kwargs = kwargs

if isinstance(options, LiveOptions):
self.logger.info("LiveOptions switching class -> json")
self.options = self.options.to_dict()
Expand All @@ -55,13 +56,13 @@ async def __call__(self, options: LiveOptions = None):
self._socket = await _socket_connect(url_with_params, self.config.headers)
asyncio.create_task(self._start())

self.logger.notice("__call__ succeeded")
self.logger.debug("AsyncLiveClient.__call__ LEAVE")
self.logger.notice("start succeeded")
self.logger.debug("AsyncLiveClient.start LEAVE")
return self
except websockets.ConnectionClosed as e:
await self._emit(LiveTranscriptionEvents.Close, e.code)
self.logger.notice("exception: websockets.ConnectionClosed")
self.logger.debug("AsyncLiveClient.__call__ LEAVE")
self.logger.debug("AsyncLiveClient.start LEAVE")

def on(self, event, handler):
"""
Expand All @@ -74,7 +75,7 @@ async def _emit(
self, event, *args, **kwargs
): # triggers the registered event handlers for a specific event
for handler in self._event_handlers[event]:
handler(*args, **kwargs)
handler(self, *args, **kwargs)

async def _start(self) -> None:
self.logger.debug("AsyncLiveClient._start ENTER")
Expand All @@ -85,25 +86,45 @@ async def _start(self) -> None:
response_type = data.get("type")
match response_type:
case LiveTranscriptionEvents.Transcript.value:
self.logger.verbose(
self.logger.debug(
"response_type: %s, data: %s", response_type, data
)
await self._emit(LiveTranscriptionEvents.Transcript, data)
case LiveTranscriptionEvents.Error.value:
self.logger.verbose(
"response_type: %s, data: %s", response_type, data
result = LiveResultResponse.from_json(message)
await self._emit(
LiveTranscriptionEvents.Transcript,
result=result,
kwargs=self.kwargs,
)
await self._emit(LiveTranscriptionEvents.Error, data)
case LiveTranscriptionEvents.Metadata.value:
self.logger.verbose(
self.logger.debug(
"response_type: %s, data: %s", response_type, data
)
await self._emit(LiveTranscriptionEvents.Metadata, data)
result = ErrorResponse.from_json(message)
await self._emit(
LiveTranscriptionEvents.Metadata,
metadata=result,
kwargs=self.kwargs,
)
case LiveTranscriptionEvents.Error.value:
self.logger.debug(
"response_type: %s, data: %s", response_type, data
)
result = MetadataResponse.from_json(message)
await self._emit(
LiveTranscriptionEvents.Error,
error=result,
kwargs=self.kwargs,
)
case _:
self.logger.error(
"response_type: %s, data: %s", response_type, data
)
await self._emit(LiveTranscriptionEvents.Error, data)
error = ErrorResponse(
type="UnhandledMessage",
description="Unknown message type",
message=f"Unhandle message type: {response_type}",
)
await self._emit(LiveTranscriptionEvents.Error, error=error)
except json.JSONDecodeError as e:
await self._emit(LiveTranscriptionEvents.Error, e.code)
self.logger.error("exception: json.JSONDecodeError: %s", str(e))
Expand Down
56 changes: 40 additions & 16 deletions deepgram/clients/live/v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,17 @@ def __init__(self, config: DeepgramClientOptions):
self._event_handlers = {event: [] for event in LiveTranscriptionEvents}
self.websocket_url = convert_to_websocket_url(self.config.url, self.endpoint)

def start(self, options: LiveOptions = None):
def start(self, options: LiveOptions = None, **kwargs):
"""
Starts the WebSocket connection for live transcription.
"""
self.logger.debug("LiveClient.start ENTER")
self.logger.info("options: %s", options)
self.logger.info("kwargs: %s", options)
self.logger.info("options: %s", kwargs)

self.options = options
self.kwargs = kwargs

if isinstance(options, LiveOptions):
self.logger.info("LiveOptions switching class -> json")
self.options = self.options.to_dict()
Expand Down Expand Up @@ -89,11 +92,9 @@ def on(self, event, handler):
if event in LiveTranscriptionEvents and callable(handler):
self._event_handlers[event].append(handler)

def _emit(
self, event, *args, **kwargs
):
def _emit(self, event, *args, **kwargs):
for handler in self._event_handlers[event]:
handler(*args, **kwargs)
handler(self, *args, **kwargs)

def _listening(self) -> None:
self.logger.debug("LiveClient._listening ENTER")
Expand All @@ -119,22 +120,45 @@ def _listening(self) -> None:

match response_type:
case LiveTranscriptionEvents.Transcript.value:
self.logger.debug(
"response_type: %s, data: %s", response_type, data
)
result = LiveResultResponse.from_json(message)
self._emit(LiveTranscriptionEvents.Transcript, result=result)
self._emit(
LiveTranscriptionEvents.Transcript,
result=result,
kwargs=self.kwargs,
)
case LiveTranscriptionEvents.Metadata.value:
self.logger.debug(
"response_type: %s, data: %s", response_type, data
)
result = MetadataResponse.from_json(message)
self._emit(LiveTranscriptionEvents.Metadata, metadata=result)
self._emit(
LiveTranscriptionEvents.Metadata,
metadata=result,
kwargs=self.kwargs,
)
case LiveTranscriptionEvents.Error.value:
self.logger.debug(
"response_type: %s, data: %s", response_type, data
)
result = ErrorResponse.from_json(message)
self._emit(LiveTranscriptionEvents.Error, error=result)
self._emit(
LiveTranscriptionEvents.Error,
error=result,
kwargs=self.kwargs,
)
case _:
error: ErrorResponse = {
"type": "UnhandledMessage",
"description": "Unknown message type",
"message": f"Unhandle message type: {response_type}",
"variant": "",
}
self._emit(LiveTranscriptionEvents.Error, error)
self.logger.error(
"response_type: %s, data: %s", response_type, data
)
error = ErrorResponse(
type="UnhandledMessage",
description="Unknown message type",
message=f"Unhandle message type: {response_type}",
)
self._emit(LiveTranscriptionEvents.Error, error=error)

except Exception as e:
if e.code == 1000:
Expand Down
71 changes: 41 additions & 30 deletions examples/streaming/async_http/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,44 +20,55 @@
# URL for the realtime streaming audio you would like to transcribe
URL = "http://stream.live.vc.bbcmedia.co.uk/bbc_world_service"

deepgram_api_key = os.getenv("DG_API_KEY")


async def main():
deepgram = DeepgramClient(deepgram_api_key)
deepgram = DeepgramClient()

# Create a websocket connection to Deepgram
try:
dg_connection = await deepgram.listen.asynclive.v("1")(options)
dg_connection = deepgram.listen.asynclive.v("1")

def on_message(self, result, **kwargs):
    """Print the first transcript alternative of a live transcription result.

    Args:
        self: First positional argument supplied to the callback; unused here.
        result: Transcript payload exposing ``channel.alternatives``; a
            ``None`` payload is ignored.
        **kwargs: Extra keyword arguments handed to the callback; ignored.
    """
    if result is None:
        return
    alternatives = result.channel.alternatives
    if not alternatives:
        # Guard the [0] lookup so an empty alternatives list is skipped
        # instead of raising IndexError inside the event handler.
        return
    sentence = alternatives[0].transcript
    if not sentence:
        # Empty (or missing) transcripts carry nothing worth printing.
        return
    print(f"speaker: {sentence}")

def on_metadata(self, metadata, **kwargs):
    """Print a metadata payload padded by blank lines; a ``None`` payload is ignored."""
    if metadata is not None:
        print(f"\n\n{metadata}\n\n")

def on_error(self, error, **kwargs):
    """Print an error payload padded by blank lines; a ``None`` payload is ignored."""
    if error is not None:
        print(f"\n\n{error}\n\n")

dg_connection.on(LiveTranscriptionEvents.Transcript, on_message)
dg_connection.on(LiveTranscriptionEvents.Metadata, on_metadata)
dg_connection.on(LiveTranscriptionEvents.Error, on_error)

# connect to websocket
await dg_connection.start(options)

# Send streaming audio from the URL to Deepgram
async with aiohttp.ClientSession() as session:
async with session.get(URL) as audio:
while True:
data = await audio.content.readany()
# send audio data through the socket
await dg_connection.send(data)
# If no data is being sent from the live stream, then break out of the loop.
if not data:
break

# Indicate that we've finished sending data by sending the {"type": "CloseStream"}
await dg_connection.finish()
except Exception as e:
print(f"Could not open socket: {e}")
return

# Listen for transcripts received from Deepgram and write them to the console
dg_connection.on(LiveTranscriptionEvents.Transcript, print)

# Listen for metadata received from Deepgram and write to the console
dg_connection.on(LiveTranscriptionEvents.Metadata, print)

# Listen for the connection to close
dg_connection.on(
LiveTranscriptionEvents.Close,
lambda c: print(f"Connection closed with code {c}."),
)

# Send streaming audio from the URL to Deepgram
async with aiohttp.ClientSession() as session:
async with session.get(URL) as audio:
while True:
data = await audio.content.readany()
# send audio data through the socket
await dg_connection.send(data)
# If no data is being sent from the live stream, then break out of the loop.
if not data:
break

# Indicate that we've finished sending data by sending the {"type": "CloseStream"}
await dg_connection.finish()


asyncio.run(main())
26 changes: 23 additions & 3 deletions examples/streaming/http/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@

load_dotenv()

options = LiveOptions(model="nova", interim_results=False, language="en-US")

# URL for the realtime streaming audio you would like to transcribe
URL = "http://stream.live.vc.bbcmedia.co.uk/bbc_world_service"


def main():
try:
deepgram = DeepgramClient()
Expand All @@ -40,12 +39,33 @@ def on_error(error=None):

# Create a websocket connection to Deepgram
dg_connection = deepgram.listen.live.v("1")
dg_connection.start(options)

def on_message(self, result, **kwargs):
    """Print the first transcript alternative of a live transcription result.

    Args:
        self: First positional argument supplied to the callback; unused here.
        result: Transcript payload exposing ``channel.alternatives``; a
            ``None`` payload is ignored.
        **kwargs: Extra keyword arguments handed to the callback; ignored.
    """
    if result is None:
        return
    alternatives = result.channel.alternatives
    if not alternatives:
        # Guard the [0] lookup so an empty alternatives list is skipped
        # instead of raising IndexError inside the event handler.
        return
    sentence = alternatives[0].transcript
    if not sentence:
        # Empty (or missing) transcripts carry nothing worth printing.
        return
    print(f"speaker: {sentence}")

def on_metadata(self, metadata, **kwargs):
    """Write the metadata payload to stdout between blank lines, skipping ``None``."""
    if metadata is not None:
        print(f"\n\n{metadata}\n\n")

def on_error(self, error, **kwargs):
    """Write the error payload to stdout between blank lines, skipping ``None``."""
    if error is not None:
        print(f"\n\n{error}\n\n")

dg_connection.on(LiveTranscriptionEvents.Transcript, on_message)
dg_connection.on(LiveTranscriptionEvents.Metadata, on_metadata)
dg_connection.on(LiveTranscriptionEvents.Error, on_error)

# connect to websocket
options = LiveOptions(model="nova", interim_results=False, language="en-US")
dg_connection.start(options)

lock_exit = threading.Lock()
exit = False

Expand Down
Loading