Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve OCP detection + speed up legacy playback start #34

Merged
merged 3 commits into from
Nov 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 63 additions & 35 deletions ocp_pipeline/opm.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dataclasses import dataclass
from os.path import join, dirname
from threading import RLock
from typing import Tuple, Optional, Dict, List, Union
from typing import Tuple, Optional, Dict, List, Union, Any

from langcodes import closest_match
from ovos_bus_client.apis.ocp import ClassicAudioServiceInterface
Expand Down Expand Up @@ -38,6 +38,11 @@ class OCPPlayerProxy:
media_type: MediaType = MediaType.GENERIC


# for easier typing
RawResultsList = List[Union[MediaEntry, Playlist, PluginStream, Dict[str, Any]]]
NormalizedResultsList = List[Union[MediaEntry, Playlist, PluginStream]]


class OCPPipelineMatcher(ConfidenceMatcherPipeline, OVOSAbstractApplication):
intents = ["play.intent", "open.intent", "media_stop.intent",
"next.intent", "prev.intent", "pause.intent", "play_favorites.intent",
Expand Down Expand Up @@ -145,7 +150,6 @@ def register_ocp_api_events(self):
@classmethod
def load_intent_files(cls):
intent_files = cls.load_resource_files()

for lang, intent_data in intent_files.items():
lang = standardize_lang_tag(lang)
cls.intent_matchers[lang] = IntentContainer()
Expand Down Expand Up @@ -756,6 +760,31 @@ def _should_resume(self, phrase: str, lang: str, message: Optional[Message] = No
return False

# search
def _player_sync(self, player: OCPPlayerProxy, message: Optional[Message] = None, timeout=1) -> OCPPlayerProxy:

if not self.config.get("legacy"): # force legacy audio in config
ev = threading.Event()

def handle_m(m):
nonlocal player
s = SessionManager.get(m)
if s.session_id == player.session_id:
player.available_extractors = m.data["SEI"]
player.ocp_available = True
self.update_player_proxy(player)
ev.set()
LOG.info(f"Session: {player.session_id} Available stream extractor plugins: {m.data['SEI']}")

self.bus.on("ovos.common_play.SEI.get.response", handle_m)
message = message or dig_for_message() or Message("") # get message.context to forward
self.bus.emit(message.forward("ovos.common_play.SEI.get"))
ev.wait(timeout)
self.bus.remove("ovos.common_play.SEI.get.response", handle_m)

if not ev.is_set():
LOG.warning(f"Player synchronization timed out after {timeout} seconds")

return player
def get_player(self, message: Optional[Message] = None, timeout=1) -> OCPPlayerProxy:
"""get a PlayerProxy object, containing info such as player state and the available stream extractors from OCP
this is tracked per Session, if needed requests the info from the client"""
Expand All @@ -764,27 +793,15 @@ def get_player(self, message: Optional[Message] = None, timeout=1) -> OCPPlayerP
player = OCPPlayerProxy(available_extractors=available_extractors(),
ocp_available=False,
session_id=sess.session_id)
if not self.config.get("legacy"): # force legacy audio in config
ev = threading.Event()

def handle_m(m):
s = SessionManager.get(m)
if s.session_id == player.session_id:
player.available_extractors = m.data["SEI"]
player.ocp_available = True
ev.set()
LOG.info(f"Session: {player.session_id} Available stream extractor plugins: {m.data['SEI']}")

self.bus.on("ovos.common_play.SEI.get.response", handle_m)
message = message or dig_for_message() or Message("") # get message.context to forward
self.bus.emit(message.forward("ovos.common_play.SEI.get"))
ev.wait(timeout)
self.bus.remove("ovos.common_play.SEI.get.response", handle_m)
self.update_player_proxy(player)

return self.ocp_sessions[sess.session_id]
else:
player = self.ocp_sessions[sess.session_id]
if not player.ocp_available and not self.config.get("legacy"):
# OCP might have loaded meanwhile
player = self._player_sync(player, message, timeout)
return player

def normalize_results(self, results: list) -> List[Union[MediaEntry, Playlist, PluginStream]]:
@staticmethod
def normalize_results(results: RawResultsList) -> NormalizedResultsList:
# support Playlist and MediaEntry objects in tracks
for idx, track in enumerate(results):
if isinstance(track, dict):
Expand Down Expand Up @@ -975,29 +992,40 @@ def select_best(results: list, message: Message) -> Union[MediaEntry, Playlist,

##################
# Legacy Audio subsystem API
def legacy_play(self, results: List[Union[MediaEntry, Playlist, PluginStream]], phrase="",
def legacy_play(self, results: NormalizedResultsList, phrase="",
message: Optional[Message] = None):
res = []
for r in results:
player = self.get_player(message)
player.media_state = MediaState.LOADING_MEDIA
playing = False
for idx, r in enumerate(results):
real_uri = None
if not (r.playback == PlaybackType.AUDIO or r.media_type in OCPQuery.cast2audio):
# we need to filter video results
continue
if isinstance(r, Playlist):
# get internal entries from the playlist
for e in r.entries:
res.append(e.uri)
real_uri = [e.uri for e in r.entries]
elif isinstance(r, MediaEntry):
res.append(r.uri)
real_uri = r.uri
elif isinstance(r, PluginStream):
# for legacy audio service we need to do stream extraction here
res.append(r.extract_uri(video=False))

self.legacy_api.play(res, utterance=phrase, source_message=message)
LOG.debug(f"extracting uri: {r.stream}")
# TODO - apparently it can hang here forever ???
# happens with https://www.cbc.ca/podcasting/includes/hourlynews.xml from news skill
try:
real_uri = r.extract_uri(video=False)
except Exception as e:
LOG.exception(f"extraction failed: {r}")
if not real_uri:
JarbasAl marked this conversation as resolved.
Show resolved Hide resolved
continue
if not playing:
playing = True
self.legacy_api.play(real_uri, utterance=phrase, source_message=message)
player.player_state = PlayerState.PLAYING
self.update_player_proxy(player)
else:
self.legacy_api.queue(real_uri, source_message=message)
JarbasAl marked this conversation as resolved.
Show resolved Hide resolved

player = self.get_player(message)
player.player_state = PlayerState.PLAYING
player.media_state = MediaState.LOADING_MEDIA
self.update_player_proxy(player)

def _handle_legacy_audio_stop(self, message: Message):
player = self.get_player(message)
Expand Down
Loading