Skip to content

Commit

Permalink
bug hunt
Browse files Browse the repository at this point in the history
  • Loading branch information
JarbasAl committed Nov 22, 2024
1 parent d7c872f commit a85f4b0
Showing 1 changed file with 84 additions and 45 deletions.
129 changes: 84 additions & 45 deletions ocp_pipeline/opm.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dataclasses import dataclass
from os.path import join, dirname
from threading import RLock
from typing import Tuple, Optional, Dict, List, Union
from typing import Tuple, Optional, Dict, List, Union, Any

from langcodes import closest_match
from ovos_bus_client.apis.ocp import ClassicAudioServiceInterface
Expand Down Expand Up @@ -38,6 +38,11 @@ class OCPPlayerProxy:
media_type: MediaType = MediaType.GENERIC


# for easier typing
RawResultsList = List[Union[MediaEntry, Playlist, PluginStream, Dict[str, Any]]]
NormalizedResultsList = List[Union[MediaEntry, Playlist, PluginStream]]


class OCPPipelineMatcher(ConfidenceMatcherPipeline, OVOSAbstractApplication):
intents = ["play.intent", "open.intent", "media_stop.intent",
"next.intent", "prev.intent", "pause.intent", "play_favorites.intent",
Expand Down Expand Up @@ -145,7 +150,6 @@ def register_ocp_api_events(self):
@classmethod
def load_intent_files(cls):
intent_files = cls.load_resource_files()

for lang, intent_data in intent_files.items():
lang = standardize_lang_tag(lang)
cls.intent_matchers[lang] = IntentContainer()
Expand Down Expand Up @@ -536,17 +540,26 @@ def handle_play_intent(self, message: Message):
results = [r for r in results if r.as_dict != best.as_dict]
results.insert(0, best)
self.set_context("Playing", origin=OCP_ID)
try:
# ovos-PHAL-plugin-mk1 will display music icon in response to play message
player = self.get_player(message)
LOG.debug(f"OCP player: {player}")
if not player.ocp_available:
LOG.debug(f"OCP legacy play: {results}")
self.legacy_play(results, query, message=message)
else:
LOG.debug(f"OCP play: {results}")
self.ocp_api.play(tracks=[best], utterance=query, source_message=message)

# ovos-PHAL-plugin-mk1 will display music icon in response to play message
player = self.get_player(message)
if not player.ocp_available:
self.legacy_play(results, query, message=message)
else:
self.ocp_api.play(tracks=[best], utterance=query, source_message=message)
self.ocp_api.populate_search_results(tracks=results,
replace=True,
sort_by_conf=False, # already sorted
source_message=message)
LOG.debug(f"OCP populate results: {results}")
self.ocp_api.populate_search_results(tracks=results,
replace=True,
sort_by_conf=False, # already sorted
source_message=message)
except Exception as e:
LOG.exception(f"ERROR: {e}")

LOG.debug(f"OCP handled: {query}")

def handle_open_intent(self, message: Message):
LOG.info("Requesting OCP homescreen")
Expand Down Expand Up @@ -756,6 +769,29 @@ def _should_resume(self, phrase: str, lang: str, message: Optional[Message] = No
return False

# search
def _player_sync(self, player: OCPPlayerProxy, message: Optional[Message] = None, timeout=1) -> OCPPlayerProxy:

if not self.config.get("legacy"): # force legacy audio in config
ev = threading.Event()

def handle_m(m):
nonlocal player
s = SessionManager.get(m)
if s.session_id == player.session_id:
player.available_extractors = m.data["SEI"]
player.ocp_available = True
ev.set()
LOG.info(f"Session: {player.session_id} Available stream extractor plugins: {m.data['SEI']}")

self.bus.on("ovos.common_play.SEI.get.response", handle_m)
message = message or dig_for_message() or Message("") # get message.context to forward
self.bus.emit(message.forward("ovos.common_play.SEI.get"))
ev.wait(timeout)
self.bus.remove("ovos.common_play.SEI.get.response", handle_m)

self.update_player_proxy(player)
return player

def get_player(self, message: Optional[Message] = None, timeout=1) -> OCPPlayerProxy:
"""get a PlayerProxy object, containing info such as player state and the available stream extractors from OCP
this is tracked per Session, if needed requests the info from the client"""
Expand All @@ -764,27 +800,15 @@ def get_player(self, message: Optional[Message] = None, timeout=1) -> OCPPlayerP
player = OCPPlayerProxy(available_extractors=available_extractors(),
ocp_available=False,
session_id=sess.session_id)
if not self.config.get("legacy"): # force legacy audio in config
ev = threading.Event()

def handle_m(m):
s = SessionManager.get(m)
if s.session_id == player.session_id:
player.available_extractors = m.data["SEI"]
player.ocp_available = True
ev.set()
LOG.info(f"Session: {player.session_id} Available stream extractor plugins: {m.data['SEI']}")

self.bus.on("ovos.common_play.SEI.get.response", handle_m)
message = message or dig_for_message() or Message("") # get message.context to forward
self.bus.emit(message.forward("ovos.common_play.SEI.get"))
ev.wait(timeout)
self.bus.remove("ovos.common_play.SEI.get.response", handle_m)
self.update_player_proxy(player)

return self.ocp_sessions[sess.session_id]
else:
player = self.ocp_sessions[sess.session_id]
if not player.ocp_available and not self.config.get("legacy"):
# OCP might have loaded meanwhile
player = self._player_sync(player, message, timeout)
return player

def normalize_results(self, results: list) -> List[Union[MediaEntry, Playlist, PluginStream]]:
@staticmethod
def normalize_results(results: RawResultsList) -> NormalizedResultsList:
# support Playlist and MediaEntry objects in tracks
for idx, track in enumerate(results):
if isinstance(track, dict):
Expand Down Expand Up @@ -975,29 +999,44 @@ def select_best(results: list, message: Message) -> Union[MediaEntry, Playlist,

##################
# Legacy Audio subsystem API
def legacy_play(self, results: List[Union[MediaEntry, Playlist, PluginStream]], phrase="",
def legacy_play(self, results: NormalizedResultsList, phrase="",
message: Optional[Message] = None):
res = []
for r in results:
player = self.get_player(message)
player.media_state = MediaState.LOADING_MEDIA
playing = False
for idx, r in enumerate(results):
LOG.debug(f"result idx: {idx}")
real_uri = None
if not (r.playback == PlaybackType.AUDIO or r.media_type in OCPQuery.cast2audio):
# we need to filter video results
LOG.debug(f"skipping: {r}")
continue
if isinstance(r, Playlist):
# get internal entries from the playlist
for e in r.entries:
res.append(e.uri)
real_uri = [e.uri for e in r.entries]
elif isinstance(r, MediaEntry):
res.append(r.uri)
real_uri = r.uri
elif isinstance(r, PluginStream):
# for legacy audio service we need to do stream extraction here
res.append(r.extract_uri(video=False))

self.legacy_api.play(res, utterance=phrase, source_message=message)
LOG.debug(f"extracting uri: {r.stream}")
# TODO - apparently it can hang here forever ???
# happens with https://www.cbc.ca/podcasting/includes/hourlynews.xml from news skill
try:
real_uri = r.extract_uri(video=False)
except Exception as e:
LOG.exception(f"extraction failed: {r}")
if not real_uri:
continue
if not playing:
playing = True
LOG.debug(f"do play: {real_uri}")
self.legacy_api.play(real_uri, utterance=phrase, source_message=message)
player.player_state = PlayerState.PLAYING
self.update_player_proxy(player)
else:
LOG.debug(f"queue next: {real_uri}")
self.legacy_api.queue(real_uri, source_message=message)

player = self.get_player(message)
player.player_state = PlayerState.PLAYING
player.media_state = MediaState.LOADING_MEDIA
self.update_player_proxy(player)

def _handle_legacy_audio_stop(self, message: Message):
player = self.get_player(message)
Expand Down

0 comments on commit a85f4b0

Please sign in to comment.