-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathplex
executable file
·298 lines (224 loc) · 8.05 KB
/
plex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
#!/usr/bin/env python3
import re
import click
from plexapi.exceptions import BadRequest, NotFound
from plexapi.myplex import MyPlexAccount
import keyring
from keyring.errors import PasswordDeleteError
from urllib.parse import unquote
import pyperclip
# UTILS
# Ask User for Credentials
def ask_for_credentials():
username = click.prompt('Enter Plex username')
password = click.prompt('Enter Plex password',
hide_input=True)
# Ask user if 2FA is enabled, if so ask for pin
two_factor = click.confirm('Is 2FA enabled?')
if two_factor:
pin = click.prompt('Enter 2FA code')
else:
pin = None
return username, password, pin
# Copy Link to Clipboard
def copy_to_clipboard(url):
try:
pyperclip.copy(url)
except AttributeError:
raise click.ClickException(
'Clipboard not available, please copy the link manually')
except Exception:
raise click.ClickException(
'Could not copy link to clipboard, please copy the link manually')
# Save User Credentials
def save_credentials(token):
keyring.set_password('plexURLGen', 'token', token)
# Get User Credentials
def get_credentials():
token = keyring.get_password('plexURLGen', 'token')
if token:
return token
else:
return "no_token"
# list of servers
def list_servers(account):
return [{'name': server.name, 'id': server.clientIdentifier}
for server in account.resources()]
# Get Plex Account
def get_account():
# Get credentials
credentials = get_credentials()
# If credentials exist
if credentials != "no_token":
# Get account
try:
account = MyPlexAccount(token=credentials)
return account
except Exception as e:
raise BadRequest('Error connecting to Plex account: {}'.format(e))
else:
raise click.ClickException(
'No credentials found, please authenticate first')
def write_m3u8_file(playlist_title, items):
with open(f"{playlist_title}.m3u8", "w") as f:
f.write("#EXTM3U\n")
for item in items:
f.write(f"#EXTINF:{(item[2] / 1000)}, {item[1]}\n{item[0]}\n")
# PLAYLISTS
def list_playlists(account, server_name):
try:
plex = account.resource(server_name).connect()
return plex.playlists()
except Exception as e:
raise click.ClickException(str(e))
def get_playlist_info(account, server_name, playlist_title):
plex = account.resource(server_name).connect()
playlist = plex.playlist(playlist_title)
item_info = []
for item in playlist.items():
base_url = item._server._baseurl
media_url = item.media[0].parts[0].key
token = item._server._token
url = f"{base_url}{media_url}?X-Plex-Token={token}&download=1"
duration = item.duration
file_name = item.media[0].parts[0].file.split("/")[-1]
item_info.append((url, file_name, duration))
return item_info
def get_download_url(web_url):
if not web_url:
raise click.ClickException('No URL provided')
# unquote url
web_url = unquote(web_url)
# Regex
pattern_server_id = r"(?<=server\/)[^\/]+"
pattern_metadata_key = r"(?<=metadata\/)\d+"
server_id = re.search(pattern_server_id, web_url)
metadata_key = re.search(pattern_metadata_key,
web_url)
if not server_id or not metadata_key:
raise click.ClickException("Invalid URL")
server_id = server_id.group(0)
metadata_key = metadata_key.group(0).replace("\\", "")
# if metadata_key or server_id is empty, raise exception
if not server_id or not metadata_key:
raise click.ClickException("Invalid URL")
# Get account
account = get_account()
# Get servers
servers = list_servers(account)
# find the server using a generator expression and the `next` function
server = next(
(server for server in servers if server['id'] == server_id), None)
if not server:
raise BadRequest('Server not found')
# get media and construct the download URL
plex = account.resource(server['name']).connect()
try:
media = plex.fetchItem(int(metadata_key))
except NotFound:
raise click.ClickException('Media not found')
base_url = media._server._baseurl
media_url = media.media[0].parts[0].key
token = media._server._token
url = f"{base_url}{media_url}?X-Plex-Token={token}&download=1"
return url
@click.group()
def plex():
pass
@click.command(name='auth')
@click.option('--username', prompt=False, help='Plex username')
@click.option('--password', prompt=False, hide_input=True, help='Plex password')
@click.option('--pin', prompt=False, help='2FA code')
def authenticate_cli(username, password, pin):
try:
if get_credentials() != "no_token":
click.echo('Already authenticated')
return
# Prompt for credentials if not provided
if not all([username, password]):
username, password, pin = ask_for_credentials()
# Authenticate user
account = MyPlexAccount(username, password, code=pin)
# Get token
token = account.authenticationToken
# Save token
save_credentials(token)
click.echo('Authentication successful')
except Exception as e:
click.echo('Error: {}'.format(e))
@click.command(name='download')
@click.argument('query')
def download_media_cli(query):
try:
url = get_download_url(query)
click.echo(url)
copy_to_clipboard(url)
except Exception as e:
click.echo('Error: {}'.format(e))
@click.command(name='signout')
def signout_cli():
try:
keyring.delete_password('plexURLGen', 'token')
click.echo('Signed out successfully')
except PasswordDeleteError:
click.echo('No credentials found')
@click.command(name='playlist')
@click.option('--m3u', is_flag=True, help='Save as m3u playlist')
def playlist_cli(m3u):
account = get_account()
# List servers
servers = list_servers(account)
click.echo("Choose a server:")
for index, server in enumerate(servers, start=1):
click.echo(f"{index}. {server['name']}")
server_index = click.prompt(
"Enter server number", type=int) - 1
try:
server_name = servers[server_index]['name']
except IndexError:
raise click.ClickException('Invalid server number')
click.echo("Choose a playlist:")
playlists = list_playlists(account, server_name)
# If no playlists, exit
if not playlists:
click.echo("No playlists found")
return
for index, playlist in enumerate(playlists, start=1):
click.echo(f"{index}. {playlist.title}")
playlist_index = click.prompt(
"Enter playlist number", type=int) - 1
try:
playlist_title = playlists[playlist_index].title
except IndexError:
click.echo("Invalid playlist number")
return
media_info = get_playlist_info(account, server_name, playlist_title)
if m3u:
write_m3u8_file(playlist_title, media_info)
click.echo(f"Playlist saved as {playlist_title}.m3u8")
return
# display media Titles
click.echo("0. Display all links")
for index, item in enumerate(media_info, start=1):
click.echo(f"{index}. {item[1]}")
# prompt user for media selection,
media_index = click.prompt(
"Enter media number(s) to download (separated by comma)", type=str)
# if the user enters 0, display all links
if (media_index == "0"):
for item in media_info:
click.echo(f"{item[0]}")
return
selected_media = [int(index) - 1 for index in media_index.split(',')]
for index in selected_media:
try:
copy_to_clipboard(media_info[index][0])
click.echo(f"{media_info[index][0]}")
except IndexError:
click.echo(f"Invalid media number {index}")
plex.add_command(authenticate_cli)
plex.add_command(download_media_cli)
plex.add_command(signout_cli)
plex.add_command(playlist_cli)
if __name__ == '__main__':
plex()