-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabasemigration.py
91 lines (77 loc) · 2.3 KB
/
databasemigration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import sqlite3
from pydantic import BaseModel
class oldquantumkatdb:
    """Namespace of row models describing tables of the old database.

    Nothing in this file instantiates these models; they serve as a typed
    reference for the row layout. The field order lines up with the
    positional indexes (``row[0]``, ``row[1]``, ...) used by the migration
    loop in this file, so it presumably mirrors the column order of each
    table -- verify against the actual schema before relying on it.
    """

    class authenticated_servers(BaseModel):
        """One row of the old ``authenticated_servers`` table."""

        id: int
        server_id: int
        server_name: str
        authenticated_by_id: int
        authenticated_by_name: str
        # Stored as an int; presumably a 0/1 flag -- TODO confirm.
        is_authenticated: int

    class chat(BaseModel):
        """One row of the old ``chat`` table.

        Unlike the new ``chat`` model, this layout carries ``user_name`` and
        ``server_name`` inline alongside the ids.
        """

        id: int
        user_id: int
        user_name: str
        server_id: int
        server_name: str
        user_message: str
        assistant_message: str
        # Stored as an int; presumably a 0/1 flag -- TODO confirm.
        shared_chat: int
class newquantumkatdb:
    """Namespace of row models describing tables of the new database.

    The migration code in this file builds instances of these models and
    uses the keys of ``.dict()`` as the column list of its INSERT
    statements, so field names must match the destination column names and
    the declaration order determines the column order of the INSERT.
    """

    class chat(BaseModel):
        """One row of the new ``chat`` table (ids only, no inline names)."""

        id: int
        user_id: int
        server_id: int
        user_message: str
        assistant_message: str
        # Stored as an int; presumably a 0/1 flag -- TODO confirm.
        shared_chat: int

    class users(BaseModel):
        """One row of the new ``users`` table."""

        user_id: int
        user_name: str
        # The two fields below are ints; presumably 0/1 flags -- TODO confirm.
        agreed_to_tos: int
        is_banned: int

    class servers(BaseModel):
        """One row of the new ``servers`` table."""

        server_id: int
        server_name: str
        # The two fields below are ints; presumably 0/1 flags -- TODO confirm.
        is_authorized: int
        is_banned: int
def read_db(db: sqlite3.Connection, table_name: str):
    """Return every row of ``table_name`` from ``db``.

    Args:
        db: Open SQLite connection to read from.
        table_name: Name of the table to dump. It is interpolated directly
            into the SQL text (identifiers cannot be bound as parameters),
            so it must come from trusted code, never from user input.

    Returns:
        A list with one tuple per row, as produced by ``fetchall()``.
    """
    query = f"SELECT * FROM {table_name}"
    # Cursor.execute() returns the cursor itself, so the calls can be chained.
    rows = db.cursor().execute(query).fetchall()
    return rows
# ---------------------------------------------------------------------------
# Migration: read rows from the old database, convert them to the new row
# models, then replace the contents of the matching table in the new database.
# ---------------------------------------------------------------------------
old_db = sqlite3.connect("./../oldquantumkat.db")
new_db = sqlite3.connect("./../quantumkat.db")

# Source tables to migrate. Only "chat" is enabled; a converter for
# "authenticated_servers" exists below and runs once that name is added here.
old_db_tables = ["chat"]
# Tables of the new schema (kept for reference; not read by the loop below).
new_db_tables = ["chat", "users", "servers"]

# Old table name -> table in the new database that receives its rows.
# "authenticated_servers" rows are converted to `servers` models, so they
# must be written to "servers", not to a table named after the source.
target_tables = {"authenticated_servers": "servers", "chat": "chat"}

try:
    for table in old_db_tables:
        old_data = read_db(old_db, table)
        new_data = []
        for row in old_data:
            # Rows are plain tuples; the indexes follow the field order of
            # the corresponding model in `oldquantumkatdb`.
            if table == "authenticated_servers":
                new_data.append(
                    newquantumkatdb.servers(
                        server_id=row[1],
                        server_name=row[2],
                        is_authorized=row[5],  # old `is_authenticated`
                        is_banned=0,
                    )
                )
            elif table == "chat":
                new_data.append(
                    newquantumkatdb.chat(
                        id=row[0],
                        user_id=row[1],
                        server_id=row[3],
                        user_message=row[5],
                        assistant_message=row[6],
                        shared_chat=row[7],
                    )
                )

        # Fall back to the source name for tables without an explicit mapping.
        target = target_tables.get(table, table)

        cursor = new_db.cursor()
        # The destination table is emptied before the converted rows go in.
        cursor.execute(f"DELETE FROM {target}")
        for record in new_data:
            # Serialize once per row; keys give the columns, values the data.
            fields = record.dict()
            columns = ",".join(fields)
            placeholders = ",".join(["?"] * len(fields))
            cursor.execute(
                f"INSERT INTO {target} ({columns}) VALUES ({placeholders})",
                list(fields.values()),
            )
        # Commit per table so the DELETE and its INSERTs land together.
        new_db.commit()
finally:
    # Always release both connections; closing without a commit discards
    # any partially applied table if an error interrupted the loop.
    old_db.close()
    new_db.close()