forked from edwin170/downr1n
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwikiproxy.py
181 lines (141 loc) · 5.13 KB
/
wikiproxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#!/usr/bin/env python3
from fastapi import FastAPI
from datetime import datetime
import aiohttp
import asyncio
import re
import time
import ujson
import wikitextparser as wtp
import uvicorn
DEVICE_REGEX = re.compile(r'(iPhone|AppleTV|iPad|iPod)[0-9]+,[0-9]+')
# Only allow 100 simultaneous HTTP requests
HTTP_SEMAPHORE = asyncio.Semaphore(100)
async def get_key_page(
session: aiohttp.ClientSession, identifier: str, buildid: str
) -> str:
params = {
'action': 'query',
'list': 'search',
'srsearch': f'Keys: {buildid} ({identifier})',
'srwhat': 'title',
'srlimit': '1',
'format': 'json',
'srnamespace': '2304',
}
async with HTTP_SEMAPHORE, session.get(
'https://theapplewiki.com/api.php', params=params
) as resp:
if resp.status != 200:
pass # raise error
else:
search = await resp.json()
if search['query']['searchinfo']['totalhits'] == 0:
raise ValueError(
f'No Firmware Keys page for device: {identifier}, buildid: {buildid}.'
)
params = {
'action': 'parse',
'prop': 'wikitext',
'page': search['query']['search'][0]['title'],
'format': 'json',
'formatversion': 2,
}
async with HTTP_SEMAPHORE, session.get(
'https://theapplewiki.com/api.php', params=params
) as resp:
if resp.status != 200:
pass # raise error
data = await resp.json()
return data['parse']['wikitext']
def parse_page(data: str, identifer: str, boardconfig: str = None) -> dict:
# Have to coerce wikitextparser into recognizing it as a table for easy parsing
data = (
' '.join([x for x in data.split(' ') if x != ''])
.replace('{{', '{| class="wikitable"')
.replace('}}', '|}')
)
page = wtp.parse(data)
page_data = {}
for entry in page.tables[0].data()[0]:
key, item = entry.split(' = ')
page_data[key] = item
if boardconfig is not None:
if ('Model' not in page_data.keys()) and ('Model2' not in page_data.keys()):
return page_data
if boardconfig.lower() not in [x.lower() for x in page_data.values()]:
raise ValueError(
f'Boardconfig: {boardconfig} for device: {identifer} is not valid!'
)
if page_data['Model2'].lower() == boardconfig.lower():
for key in page_data:
if '2' in key:
page_data[key.replace('2', '')] = page_data[key]
for key in list(page_data.keys()):
if '2' in key:
del page_data[key]
response = {
'identifier': page_data['Device'],
'buildid': page_data['Build'],
'codename': page_data['Codename'],
'restoreramdiskexists': 'RestoreRamdisk' in page_data,
'updateramdiskexists': 'UpdateRamdisk' in page_data,
'keys': [],
}
for component in page_data:
if component in (
'Version',
'Build',
'Device',
'Model',
'Codename',
'Baseband',
'DownloadURL',
):
continue
if any(component.endswith(x) for x in ('Key', 'IV', 'KBAG')):
continue
image = {
'image': component,
'filename': page_data[component],
'date': datetime.now().isoformat(),
}
if any(component == x for x in ('RootFS', 'RestoreRamdisk', 'UpdateRamdisk')):
image['filename'] += '.dmg'
for key in ('IV', 'Key') if component != 'RootFS' else ('Key',):
if component + key not in page_data.keys():
continue
if all(
x not in page_data[component + key]
for x in ('Unknown', 'Not Encrypted')
):
image[key.lower()] = page_data[component + key]
if (
('iv' not in image.keys())
and ('key' not in image.keys())
and not image['filename'].endswith('.dmg')
):
continue
if 'iv' in image and 'key' in image:
image['kbag'] = image['iv'] + image['key']
response['keys'].append(image)
return response
app = FastAPI()
@app.middleware('http')
async def add_process_time_header(request, call_next):
start_time = time.time()
response = await call_next(request)
response.headers['X-Process-Time'] = str(f'{time.time() - start_time:0.4f} sec')
return response
@app.get('/firmware/{identifier}/{buildid}')
async def get_firmware_keys(identifier: str, buildid: str) -> dict:
async with aiohttp.ClientSession() as session:
page = await get_key_page(session, identifier, buildid)
return parse_page(page, identifier)
@app.get('/firmware/{identifier}/{boardconfig}/{buildid}')
async def get_firmware_keys(identifier: str, boardconfig: str, buildid: str) -> dict:
async with aiohttp.ClientSession() as session:
page = await get_key_page(session, identifier, buildid)
return parse_page(page, identifier, boardconfig)
if __name__ == '__main__':
uvicorn.run(app='__main__:app', host='127.0.0.1', port='8888')