generated from SteamDeckHomebrew/Plugin-Template
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmain.py
executable file
·369 lines (358 loc) · 16 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
import time
from http.server import ThreadingHTTPServer
from threading import Thread
from http.server import BaseHTTPRequestHandler
from urllib.parse import parse_qs
from urllib.parse import urlencode
from random import choices
import string
import json
from hashlib import sha256
from base64 import b64encode
import socket
import pickle
from os.path import expanduser, dirname
from os import makedirs, remove
SPDCK_HOST_NAME = "0.0.0.0"
SPDCK_PORT_NUMBER = 49983
SPDCK_TOKEN_FILE = expanduser("/home/deck/.config/spdck/token.pickle")
spdck_access_code_cache = {}
spdck_client_id_cache = {}
def Spdck_get_ip():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(0)
try:
# doesn't even have to be reachable
s.connect(('10.255.255.255', 1))
IP = s.getsockname()[0]
except Exception:
IP = '127.0.0.1'
finally:
s.close()
return IP
class Spdck_AccessServerHandler(BaseHTTPRequestHandler):
def do_OPTIONS(self):
self.send_response(200, "ok")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "X-SPDCK-ACCESS-KEY, Content-Type")
self.end_headers()
return
def do_GET(self):
global spdck_access_code_cache
if (self.path == "/redirect"):
if (self.__access_protection_code in spdck_client_id_cache):
self.send_response(302)
self.send_header("Location", "https://accounts.spotify.com/authorize?" + urlencode({
"response_type": "code",
"client_id": spdck_client_id_cache[self.__access_protection_code],
"scope": "user-modify-playback-state user-read-playback-state user-read-currently-playing",
"redirect_uri": "http://localhost:" + str(SPDCK_PORT_NUMBER) + "/callback",
"state": self.__state,
"code_challenge_method": "S256",
"code_challenge": b64encode(sha256(self.__code_challenge.encode()).digest()).decode().replace("=", "").replace("+", "-").replace("/", "_"),
"show_dialog": "false"
}))
self.end_headers()
else:
self.respond("""
<html>
<body>
<h1>Client ID required</h1>
To log in to spotify, a client id is required.<br>
Please <a href="/">click here</a> to try again.
</body>
</html>
"""
, 200, "text/html")
elif (self.path == "/heartbeat"):
self.respond({"status": "ok"})
elif (self.path.startswith("/callback")):
if ("?" not in self.path):
self.respond(
"""
<html>
<body>
<h1>Missing parameters</h1>
Callback was called with missing parameters.<br>
Please <a href="/">click here</a> to try again.
</body>
</html>
"""
, 200, "text/html")
else:
query = parse_qs(self.path.split("?")[1])
if (query["state"][0] != self.__state):
self.respond(
"""
<html>
<body>
<h1>Invalid state</h1>
Callback was called with a state parameter that doesn't match what was expected.<br>
Please <a href="/">click here</a> to try again.
</body>
</html>
"""
, 200, "text/html")
return
if ("error" in query and query["error"][0] != ""):
self.respond(
"""
<html>
<body>
<h1>Error while authorising Spotify</h1>
""" + query["error"][0] + """<br>
Please <a href="/">click here</a> to try again.
</body>
</html>
"""
, 200, "text/html")
return
spdck_access_code_cache[self.__access_protection_code] = query["code"][0]
self.respond(
"""
<html>
<body>
<h1>Spotify authenticated!</h1>
Return back to the quick access panel!<br>
This tab will close in 5 seconds...
<script type='text/javascript'>
setTimeout(window.close, 5000);
</script>
</body>
</html>
"""
, 200, "text/html")
elif (self.path == "/access_code"):
if (self.__access_protection_code != None and self.__access_protection_code != self.headers["X-SPDCK-ACCESS-KEY"]):
self.respond({
"error": "invalid_access_key"
})
return
try:
if (spdck_access_code_cache[self.__access_protection_code] is None):
self.respond({})
else:
self.respond({
"access_code": spdck_access_code_cache[self.__access_protection_code],
"code_challenge": self.__code_challenge,
"client_id": spdck_client_id_cache[self.__access_protection_code]
})
except AttributeError as e:
self.respond(repr(e))
except Exception as e:
self.respond(repr(e))
elif (self.path.startswith("/setclientid")):
if (self.__access_protection_code in spdck_client_id_cache):
self.respond("""
<html>
<body>
<h1>Client ID already set!</h1>
Return back to the steamdeck and finish authorisation!<br>
This tab will close in 5 seconds...
<script type='text/javascript'>
setTimeout(window.close, 5000);
</script>
</body>
</html>
"""
, 200, "text/html")
elif ("?" not in self.path):
self.respond(
"""
<html>
<body>
<h1>Missing parameters</h1>
SetClientId was called with missing parameters.<br>
Please <a href="/">click here</a> to try again.
</body>
</html>
"""
, 200, "text/html")
else:
query = parse_qs(self.path.split("?")[1])
if ("clientid" not in query or query["clientid"][0] == ""):
self.respond(
"""
<html>
<body>
<h1>Missing parameters</h1>
SetClientId was called with missing parameters.<br>
Please <a href="/">click here</a> to try again.
</body>
</html>
"""
, 200, "text/html")
return
spdck_client_id_cache[self.__access_protection_code] = query["clientid"][0]
self.respond("""
<html>
<body>
<h1>Client ID set!</h1>
Return back to the steamdeck and finish authorisation!<br>
You can now close this tab.
<script type='text/javascript'>
setTimeout(window.close, 5000);
</script>
</body>
</html>
"""
, 200, "text/html")
elif (self.path == "/"):
if (self.__access_protection_code in spdck_client_id_cache):
client_id_val = spdck_client_id_cache[self.__access_protection_code]
else:
client_id_val = ""
client_id_form_text = ""
if (client_id_val == ""):
client_id_form_text = """
<form method="GET" action="/setclientid">
<button onClick="window.location.reload();">Refresh Page</button><br>
<br>
<input type="text" name="clientid" placeholder="Client ID" required pattern="[0-9a-f]{"\{32\}"}">
<input type="submit" value="Set token">
</form>
"""
else:
client_id_form_text = """
<br>
<a href="/redirect">Finish login</a>
"""
self.respond(
"""
<html>
<head>
<title>SPDCK</title>
<style type="text/css">
body {
font-family: sans-serif;
font-size: 1em;
background-color: #fafafa;
color: #000;
}
</style>
</head>
<body>
<h1>SPDCK</h1>
<h2>Authorisation</h2>
<p>
The next steps are best performed on a computer.<br>
If you already use a computer browser, or wish to continue on the Steamdeck, follow the steps below.<br>
<br>
Otherwise connect to the same network as this Steamdeck with your computer and browse to the following address:<br>
<code>
http://""" + Spdck_get_ip() + ":" + str(self.server.server_port or 80) + """/
</code>
</p>
<p>
<b>Step 1:</b> <a href="https://developer.spotify.com/dashboard/login" target="_blank">Click here</a> to go to the Spotify Developer website and create an application.<br>
<b>Step 2:</b> Once the application is created, click on <b>USERS AND ACCESS</b> at the top of the page.<br>
<b>Step 3:</b> Click on <b>ADD NEW USER</b>, add your Spotify Account E-Mail and a name.<br>
<b>Step 4:</b> Click on <b>EDIT SETTINGS</b> and add the following redirect URI: <code>http://localhost:49983/callback</code><br>
<b>Step 5:</b> Copy the client id above <b>Users and Access</b> and paste it into the text field below.<br>
<b>Step 6:</b> Click on <b>Set token</b> below to initiate spotify authorisation.<br>
<b>Step 7:</b> Return back to your Steamdeck, refresh the page and finish authorisation.<br>
""" + client_id_form_text + """
</p>
</body>
</html>
"""
, 200, "text/html")
else:
self.respond({
"error": "invalid_route"
}, 404)
def respond(self, content = {}, status = 200, contentType = "application/json"):
self.send_response(status)
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Content-type", contentType)
self.end_headers()
if (contentType == "application/json"):
self.wfile.write(json.dumps(content).encode("utf-8"))
else:
self.wfile.write(content.encode("utf-8"))
return
def set_access_protection_key(self, key = None):
if (key == None):
return
self.__access_protection_code = key
def set_state(self, state: string):
if (state == None):
return
self.__state = state
def set_challenge(self, challenge: string):
if (challenge == None):
return
self.__code_challenge = challenge
def Spdck_createHandler(access_protection_key: string):
handler = Spdck_AccessServerHandler
spdck_access_code_cache[access_protection_key] = None
handler.set_access_protection_key(handler, access_protection_key)
handler.set_state(handler, "".join(choices(string.ascii_letters + string.digits, k=16)))
handler.set_challenge(handler, "".join(choices(string.ascii_letters + string.digits + "_.-", k=128)))
return handler
class Spdck_ThreadedServer(Thread):
__thread = None
__server = None
__serverHandler = None
def run(self, access_protection_key = ""):
self.__serverHandler = Spdck_createHandler(access_protection_key)
self.__server = ThreadingHTTPServer((SPDCK_HOST_NAME, SPDCK_PORT_NUMBER), self.__serverHandler)
self.__thread = Thread(target=self.__server.serve_forever)
self.__thread.start()
def shutdown(self):
self.__server.shutdown()
self.__server.server_close()
class Plugin:
__spdck_token = None
# A normal method. It can be called from JavaScript using call_plugin_function("method_1", argument1, argument2)
async def start_access_server(self, accessProtectionToken ):
if (self.__active_server != None):
self.__active_server.shutdown()
self.__active_server = Spdck_ThreadedServer()
self.__active_server.run(access_protection_key = accessProtectionToken)
return (time.asctime(), "Spdck Login Server UP - %s:%s" % (SPDCK_HOST_NAME, SPDCK_PORT_NUMBER))
# A normal method. It can be called from JavaScript using call_plugin_function("method_2", argument1, argument2)
async def stop_access_server(self, *args):
if (self.__active_server == None):
return False
self.__active_server.shutdown()
self.__active_server = None
return (time.asctime(), "Spdck Login Server DOWN - %s:%s" % (SPDCK_HOST_NAME, SPDCK_PORT_NUMBER))
async def store_token(self, refresh_token = "", client_id = ""):
if (refresh_token == "" or client_id == ""):
return False
self.__spdck_token = refresh_token
self.__client_id = client_id
makedirs(dirname(SPDCK_TOKEN_FILE), exist_ok=True)
pickle.dump({
"token": self.__spdck_token,
"client_id": self.__client_id
}, open(SPDCK_TOKEN_FILE, "wb"))
return True
async def load_token(self):
if (self.__spdck_token != None and self.__client_id != None):
return {
"token": self.__spdck_token,
"client_id": self.__client_id
}
try:
unpickled = pickle.load(open(SPDCK_TOKEN_FILE, "rb"))
self.__spdck_token = unpickled["token"]
self.__client_id = unpickled["client_id"]
return unpickled
except Exception as e:
return {
"error": str(e)
}
async def remove_token(self):
self.__spdck_token = None
self.__client_id = None
try:
remove(SPDCK_TOKEN_FILE)
return True
except:
return False
# Asyncio-compatible long-running code, executed in a task when the plugin is loaded
async def _main(self):
self.__active_server = None