-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmedia_source.py
160 lines (125 loc) · 4.99 KB
/
media_source.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
"""Netatmo Media Source Implementation."""
from __future__ import annotations
import datetime as dt
import logging
import re
from homeassistant.components.media_player import BrowseError, MediaClass, MediaType
from homeassistant.components.media_source import (
BrowseMediaSource,
MediaSource,
MediaSourceError,
MediaSourceItem,
PlayMedia,
Unresolvable,
)
from homeassistant.core import HomeAssistant, callback
from .const import DATA_CAMERAS, DATA_EVENTS, DOMAIN, MANUFACTURER
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Content type handed to PlayMedia for every resolved event URL.
# NOTE(review): presumably the event media URLs are HLS playlists - confirm.
MIME_TYPE = "application/x-mpegURL"
class IncompatibleMediaSource(MediaSourceError):
    """Raised for a media item that can be neither played nor expanded."""
async def async_get_media_source(hass: HomeAssistant) -> NetatmoSource:
    """Create and return the Netatmo media source bound to *hass*."""
    netatmo_source = NetatmoSource(hass)
    return netatmo_source
class NetatmoSource(MediaSource):
    """Provide Netatmo camera recordings as media sources.

    Events are read from ``hass.data[DOMAIN][DATA_EVENTS]`` and are looked up
    first by camera id and then by event id.
    """

    name: str = MANUFACTURER

    def __init__(self, hass: HomeAssistant) -> None:
        """Initialize Netatmo source."""
        super().__init__(DOMAIN)
        self.hass = hass
        self.events = self.hass.data[DOMAIN][DATA_EVENTS]

    async def async_resolve_media(self, item: MediaSourceItem) -> PlayMedia:
        """Resolve media to a url.

        Raises:
            Unresolvable: If the identifier names an unknown source, or if no
                stored event with a ``media_url`` matches it.
        """
        _, camera_id, event_id = async_parse_identifier(item)
        try:
            url = self.events[camera_id][event_id]["media_url"]
        except KeyError as err:
            # A missing camera, event or media_url (including the root
            # identifier, which has neither) must surface as the media source
            # error type rather than as a bare KeyError.
            raise Unresolvable(
                f"No playable media for camera {camera_id!r}, event {event_id!r}."
            ) from err
        return PlayMedia(url, MIME_TYPE)

    async def async_browse_media(self, item: MediaSourceItem) -> BrowseMediaSource:
        """Return media.

        Raises:
            BrowseError: If the identifier cannot be resolved, or the camera
                or event it names does not exist.
            IncompatibleMediaSource: If the requested item can be neither
                played nor expanded.
        """
        try:
            source, camera_id, event_id = async_parse_identifier(item)
        except Unresolvable as err:
            raise BrowseError(str(err)) from err

        return self._browse_media(source, camera_id, event_id)

    def _browse_media(
        self, source: str, camera_id: str, event_id: int | None
    ) -> BrowseMediaSource:
        """Validate that the requested camera/event exist, then build the item.

        Raises:
            BrowseError: If the camera or the event is not known.
        """
        if camera_id and camera_id not in self.events:
            raise BrowseError("Camera does not exist.")

        # The camera membership test also covers an empty camera id combined
        # with an event id, which would otherwise raise KeyError on lookup.
        if event_id and (
            camera_id not in self.events or event_id not in self.events[camera_id]
        ):
            raise BrowseError("Event does not exist.")

        return self._build_item_response(source, camera_id, event_id)

    def _build_item_response(
        self, source: str, camera_id: str, event_id: int | None = None
    ) -> BrowseMediaSource:
        """Build the browse item for the root, a camera, or a single event.

        With no ``event_id`` the item is an expandable directory whose
        children are built recursively: one per camera when ``camera_id`` is
        empty, otherwise one per event of that camera.

        Raises:
            IncompatibleMediaSource: If the item can be neither played nor
                expanded (an event without a ``media_url``).
        """
        if event_id and event_id in self.events[camera_id]:
            event = self.events[camera_id][event_id]
            # NOTE(review): event_time is presumably a POSIX timestamp; the
            # resulting datetime is naive local time.
            created = dt.datetime.fromtimestamp(event["event_time"])
            thumbnail = event.get("snapshot", {}).get("url")
            message = remove_html_tags(event.get("message", ""))
            title = f"{created} - {message}"
        else:
            # Fall back to the manufacturer name when the camera is unknown
            # (this is also what the root item gets for an empty camera id).
            title = self.hass.data[DOMAIN][DATA_CAMERAS].get(camera_id, MANUFACTURER)
            thumbnail = None

        if event_id:
            path = f"{source}/{camera_id}/{event_id}"
        else:
            path = f"{source}/{camera_id}"

        media_class = MediaClass.DIRECTORY if event_id is None else MediaClass.VIDEO

        media = BrowseMediaSource(
            domain=DOMAIN,
            identifier=path,
            media_class=media_class,
            media_content_type=MediaType.VIDEO,
            title=title,
            can_play=bool(
                event_id and self.events[camera_id][event_id].get("media_url")
            ),
            can_expand=event_id is None,
            thumbnail=thumbnail,
        )

        if not media.can_play and not media.can_expand:
            _LOGGER.debug(
                "Camera %s with event %s without media url found", camera_id, event_id
            )
            raise IncompatibleMediaSource

        # Playable leaf items have no children to collect.
        if not media.can_expand:
            return media

        media.children = []
        # Append first level children
        if not camera_id:
            for cid in self.events:
                child = self._build_item_response(source, cid)
                if child:
                    media.children.append(child)
        else:
            for eid in self.events[camera_id]:
                try:
                    child = self._build_item_response(source, camera_id, eid)
                except IncompatibleMediaSource:
                    # Events without a media url are left out of the listing.
                    continue
                if child:
                    media.children.append(child)

        return media
def remove_html_tags(text: str) -> str:
    """Return *text* with every ``<...>`` span stripped out."""
    # Non-greedy match so each tag is removed on its own; "." does not cross
    # newlines, so a tag split over several lines is left in place.
    tag_pattern = "<.*?>"
    return re.compile(tag_pattern).sub("", text)
@callback
def async_parse_identifier(
    item: MediaSourceItem,
) -> tuple[str, str, int | None]:
    """Parse identifier.

    Recognised forms are ``events/<camera_id>`` and
    ``events/<camera_id>/<event_id>``. An empty identifier, or one that has
    no ``/`` once leading slashes are stripped, maps to the root.

    Returns:
        A ``(source, camera_id, event_id)`` tuple. For the root the camera id
        is ``""`` and the event id is ``None``; for a camera the event id is
        ``None``.

    Raises:
        Unresolvable: If the source is not ``events`` or the event id is not
            an integer.
    """
    # Strip leading slashes BEFORE looking for a separator, so inputs such as
    # "/events" or "//" are treated as the root instead of failing to unpack.
    identifier = (item.identifier or "").lstrip("/")
    if "/" not in identifier:
        return "events", "", None

    source, path = identifier.split("/", 1)
    if source != "events":
        raise Unresolvable("Unknown source directory.")

    if "/" not in path:
        return source, path, None

    camera_id, event_id = path.split("/", 1)
    try:
        return source, camera_id, int(event_id)
    except ValueError as err:
        # Callers only handle Unresolvable; do not leak ValueError for ids
        # like "abc" or "1/2".
        raise Unresolvable("Event id must be an integer.") from err