forked from SunoApi/SunoApi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcookie.py
242 lines (201 loc) · 9.83 KB
/
cookie.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# -*- coding:utf-8 -*-
import json
import os
import random
import threading
import time
import urllib
import urllib.parse
from http.cookies import SimpleCookie
from threading import Thread

import requests

from utils import COMMON_HEADERS, local_time, get_page_feed
from sqlite import SqliteTool
# Shared SqliteTool instance; every query and update in this module goes through it.
suno_sqlite = SqliteTool()
class SunoCookie:
    """Per-account state: cookie jar, identity, session id and current token.

    The ``token`` attribute doubles as a status flag: ``update_token`` stores
    the literal string ``"401"`` in it when a refresh is rejected, and other
    functions in this module compare against that value.
    """

    def __init__(self):
        self.cookie = SimpleCookie()
        self.identity = ""
        self.session_id = ""
        self.token = ""

    def load_cookie(self, cookie_str: str) -> None:
        """Merge a cookie header string into the jar using ``SimpleCookie.load``."""
        self.cookie.load(cookie_str)

    def set_cookie(self, cookie_str: str) -> None:
        """Populate the jar from a raw ``name=value; name=value`` cookie string.

        Strings of 500 characters or fewer (and ``None``/empty input) are
        ignored.  NOTE(review): the 500 threshold presumably filters out
        placeholder values that are not a full browser cookie -- confirm.

        Segments without ``=`` or with an empty name are skipped instead of
        raising.  A value that is a JSON-encoded *string* is unwrapped; any
        other value is stored verbatim so it round-trips unchanged.  The
        ``__client`` value is percent-encoded before being stored.
        """
        if not cookie_str or len(cookie_str) <= 500:
            return
        for segment in cookie_str.split(';'):
            key, sep, value = segment.strip().partition('=')
            if not sep or not key:
                # Malformed fragment (e.g. a stray ";"): nothing to store.
                continue
            try:
                decoded = json.loads(value)
            except json.JSONDecodeError:
                decoded = value  # not valid JSON: keep the raw string
            # Only unwrap JSON strings.  Storing a decoded bool/float/dict
            # would put its Python repr ("True", "{'a': 1}") in the jar.
            if isinstance(decoded, str):
                value = decoded
            if key == "__client":
                value = urllib.parse.quote(value)
            self.cookie[key] = value

    def get_cookie(self) -> str:
        """Return the jar as ``name=value`` pairs joined by ``;``."""
        return ";".join(
            f"{name}={morsel.value}" for name, morsel in self.cookie.items()
        )

    def set_identity(self, identity):
        """Store the account identity."""
        self.identity = identity

    def get_identity(self):
        """Return the account identity."""
        return self.identity

    def set_session_id(self, session_id):
        """Store the session id."""
        self.session_id = session_id

    def get_session_id(self):
        """Return the session id."""
        return self.session_id

    def get_token(self):
        """Return the current token (``""`` if none, ``"401"`` if rejected)."""
        return self.token

    def set_token(self, token: str):
        """Store the current token."""
        self.token = token
def update_token(suno_cookie: SunoCookie):
    """Refresh the session token for one account and persist it.

    Posts to the Clerk ``/tokens`` endpoint for the account's session using
    its cookie jar.  On HTTP 200 the jar is updated from the response's
    ``Set-Cookie`` header (or reloaded from itself when there is none) and the
    ``jwt`` field of the JSON body becomes the new token.  On any other status
    the account is flagged by setting its token to ``"401"``.  In both cases
    the ``session`` table row for this identity is updated (with an empty
    token on failure).

    Accounts already flagged ``"401"`` are skipped.  A transport-level failure
    (timeout, connection error) is logged and leaves the account and the
    database untouched so the next refresh cycle can retry.
    """
    if suno_cookie.get_token() == "401":
        return

    session_id = suno_cookie.get_session_id()
    identity = suno_cookie.get_identity()
    headers = {"Cookie": suno_cookie.get_cookie()}
    headers.update(COMMON_HEADERS)

    # NOTE(review): TLS verification is disabled (verify=False) and the
    # resulting warnings are silenced; confirm this is intentional.
    requests.packages.urllib3.disable_warnings()
    try:
        resp = requests.post(
            url=f"https://clerk.suno.com/v1/client/sessions/{session_id}/tokens?_clerk_js_version=5.20.0",
            headers=headers,
            verify=False,
            timeout=(3.05, 21),
        )
    except requests.RequestException as exc:
        # A transient network error must not propagate: the caller runs this
        # in a long-lived thread that would otherwise die.
        print(local_time() + f" ***update_token identity -> {identity} session -> {session_id} request failed -> {exc!r} thread_id: {threading.get_ident()} ***\n")
        return

    token = ""
    if resp.status_code != 200:
        print(local_time() + f" ***update_token identity -> {identity} session -> {session_id} status_code -> {resp.status_code} thread_id: {threading.get_ident()} ***\n")
        suno_cookie.set_token("401")
    else:
        # resp.headers is case-insensitive, unlike a plain dict copy of it.
        set_cookie = resp.headers.get("Set-Cookie") or suno_cookie.get_cookie()
        # NOTE(review): this writes cookie contents to stdout.
        print(local_time() + f" ***update_session identity -> {identity} session -> {session_id} set_cookie -> {set_cookie} ***")
        suno_cookie.load_cookie(set_cookie)
        try:
            payload = resp.json()
        except ValueError:
            payload = None  # body was not JSON: treat as "no token"
        if isinstance(payload, dict):
            token = payload.get("jwt") or ""
        suno_cookie.set_token(token)

    result = suno_sqlite.operate_one("update session set updated=(datetime('now', 'localtime')), token=? where identity =?", (token, identity))
    if result:
        print(local_time() + f" ***update_session identity -> {identity} session -> {session_id} status_code -> {resp.status_code} thread_id: {threading.get_ident()} ***\n")
def _save_feed_row(row):
    """Insert or update the ``music`` record for one feed entry, keyed by its id."""
    metadata = row.get("metadata") or {}
    # Missing fields become None rather than raising KeyError.
    # NOTE(review): assumes the music columns accept NULL -- confirm schema.
    fields = (
        row.get("user_id"),
        row.get("display_name"),
        row.get("image_url"),
        row.get("title"),
        metadata.get("tags"),
        metadata.get("gpt_description_prompt"),
        metadata.get("duration"),
        row.get("status"),
    )
    result = suno_sqlite.query_one("select aid from music where aid =?", (row["id"],))
    print(result)
    print("\n")
    if result:
        result = suno_sqlite.operate_one("update music set data=?, updated=(datetime('now', 'localtime')), sid=?, name=?, image=?, title=?, tags=?, prompt=?, duration=?, status=? where aid =?", (str(row), *fields, row["id"]))
    else:
        result = suno_sqlite.operate_one("insert into music (aid, data, sid, name, image, title, tags, prompt,duration, status, private) values(?,?,?,?,?,?,?,?,?,?,?)", (str(row["id"]), str(row), *fields, 0))
    print(result)
    print("\n")


def page_feed(suno_cookie: SunoCookie):
    """Fetch feed page 0 for an account and mirror its entries into ``music``.

    Does nothing when the account has no token.  If ``get_page_feed`` returns
    a dict it is treated as an error payload: its ``detail`` value (or the
    whole payload when absent) is logged once and nothing is stored.
    Otherwise every dict entry that has an ``id`` is inserted or updated, and
    one log line is printed per entry: the full row when its status is
    ``"complete"``, else its status (or the ``error_message`` from its
    metadata when one is present).
    """
    token = suno_cookie.get_token()
    if token == "":
        return
    identity = suno_cookie.get_identity()
    session_id = suno_cookie.get_session_id()

    resp = get_page_feed(0, token)

    if isinstance(resp, dict):
        # Error payload: iterating it would walk its keys, not feed entries.
        status = resp.get("detail", resp)
        print(local_time() + f" ***get_page_feed identity -> {identity} session -> {session_id} status -> {status} thread_id: {threading.get_ident()} ***\n")
        return

    for row in resp or ():
        if not isinstance(row, dict):
            continue
        if 'id' in row:
            _save_feed_row(row)
        status = row.get("status")
        if status == "complete":
            print(local_time() + f" ***get_page_feed identity -> {identity} session -> {session_id} thread_id: {threading.get_ident()} row -> {row} ***\n")
        else:
            # Prefer the entry's own error message when it carries one.
            error_message = (row.get("metadata") or {}).get("error_message")
            status = error_message or status
            print(local_time() + f" ***get_page_feed identity -> {identity} session -> {session_id} status -> {status} thread_id: {threading.get_ident()} ***\n")
def keep_alive(suno_cookie: SunoCookie):
    """Thread target: call ``update_token`` for the account every 30 seconds, forever.

    Any exception from a refresh is logged and swallowed here, at the thread
    boundary, so that one failed cycle does not end the loop permanently.
    """
    while True:
        try:
            update_token(suno_cookie)
        except Exception as exc:
            print(local_time() + f" ***keep_alive identity -> {suno_cookie.get_identity()} error -> {exc!r} thread_id: {threading.get_ident()} ***\n")
        time.sleep(30)
def get_page(suno_cookie: SunoCookie):
    """Thread target: call ``page_feed`` for the account every 1800 seconds, forever.

    Any exception from a sync is logged and swallowed here, at the thread
    boundary, so that one failed cycle does not end the loop permanently.
    """
    while True:
        try:
            page_feed(suno_cookie)
        except Exception as exc:
            print(local_time() + f" ***get_page identity -> {suno_cookie.get_identity()} error -> {exc!r} thread_id: {threading.get_ident()} ***\n")
        time.sleep(1800)
def clear_task():
    """Thread target: every 3600 seconds delete ``music`` rows that are neither
    ``complete`` nor ``streaming``.

    A truthy result from the delete is logged.  Any exception is logged and
    swallowed here, at the thread boundary, so the cleanup loop keeps running.
    """
    while True:
        try:
            result = suno_sqlite.delete_record("delete from music where status <> 'complete' and status <> 'streaming'")
            if result:
                print(local_time() + f" ***clear_task result -> {result} ***\n")
        except Exception as exc:
            print(local_time() + f" ***clear_task error -> {exc!r} ***\n")
        time.sleep(3600)
# Registry of every account being kept alive; new_suno_auth() and
# start_keep_alive() append to it.
suno_auths = []


def get_suno_auth():
    """Return a random registered account whose token is not flagged ``"401"``.

    Flagged accounts are filtered out *before* the random pick, so a usable
    account is returned whenever at least one exists.  When none is usable
    (or none is registered) a fresh, empty ``SunoCookie`` is returned.
    """
    usable = [auth for auth in suno_auths if auth.get_token() != "401"]
    if usable:
        return random.choice(usable)
    return SunoCookie()
def new_suno_auth(identity, session, cookie):
    """Register one account and start its two background threads.

    Builds a ``SunoCookie`` from the given identity, session id and cookie
    string, appends it to ``suno_auths``, then starts a ``keep_alive`` thread
    followed by a ``get_page`` thread for it and logs the registration.
    """
    account = SunoCookie()
    account.set_identity(identity)
    account.set_session_id(session)
    account.set_cookie(cookie)
    suno_auths.append(account)

    # Start order matters only for logging: token refresh first, then feed sync.
    for worker in (keep_alive, get_page):
        Thread(target=worker, args=(account,)).start()

    print(local_time() + f" ***new_suno_auth identity -> {identity} session -> {session} set_cookie -> {cookie} ***\n")
def start_keep_alive():
    """Load every session with status ``'200'`` and start all background threads.

    For each row returned by the query a ``SunoCookie`` is built from columns
    1-3 (identity, session, cookie), registered in ``suno_auths``, and given a
    ``keep_alive`` and a ``get_page`` thread.  Afterwards the number of
    registered accounts is logged and the ``clear_task`` thread is started.
    """
    result = suno_sqlite.query_many("select id,identity,[session],cookie from session where status='200'")
    print(result)
    print("\n")

    # A falsy result (no rows / failed query) simply registers nothing.
    for row in result or ():
        account = SunoCookie()
        account.set_identity(row[1])
        account.set_session_id(row[2])
        account.set_cookie(row[3])
        suno_auths.append(account)
        print(local_time() + f" ***start_keep_alive suno_cookie set_identity -> {row[1]} set_session_id -> {row[2]} set_cookie -> {row[3]} ***\n")
        for worker in (keep_alive, get_page):
            Thread(target=worker, args=(account,)).start()

    print(local_time() + f" ***start_keep_alive suno_auths -> {len(suno_auths)} ***\n")
    Thread(target=clear_task, args=()).start()
def get_random_token():
    """Return the token of a randomly ordered active session, or ``""``.

    Only sessions with a non-empty token and status ``'200'`` are considered.
    The outcome is logged either way.
    """
    result = suno_sqlite.query_one("select token from session where token != '' and status='200' order by random()")
    if not result:
        print(local_time() + f" ***get_random_token -> {result} ***\n")
        return ""
    print(local_time() + f" ***get_random_token -> {result[0]} ***\n")
    return result[0]
def get_clip_token(token_id=None):
    """Return ``(id, token)`` for an active session, or ``(0, "")`` if none matches.

    With ``token_id`` left as ``None`` the session is taken from a randomly
    ordered query; otherwise the session with that id is looked up.  Only
    sessions with a non-empty token and status ``'200'`` qualify.  The outcome
    is logged either way.
    """
    if token_id is None:
        query_args = ("select id,token from session where token != '' and status='200' order by random()",)
    else:
        query_args = ("select id,token from session where token != '' and status='200' and id =?", (token_id,))
    result = suno_sqlite.query_one(*query_args)

    if not result:
        print(local_time() + f" ***get_clip_token -> {result} ***\n")
        return 0,""
    print(local_time() + f" ***get_clip_token -> {result[0]} {result[1]} ***\n")
    return result[0],result[1]
def get_page_token():
    """Return the token of a randomly ordered active session with ``page=0``, or ``""``.

    Only sessions with a non-empty token and status ``'200'`` are considered.
    The outcome is logged either way.
    """
    result = suno_sqlite.query_one("select token from session where token != '' and status='200' and page=0 order by random()")
    if not result:
        print(local_time() + f" ***get_page_token -> {result} ***\n")
        return ""
    print(local_time() + f" ***get_page_token -> {result[0]} ***\n")
    return result[0]
# start_keep_alive()