-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmysql_websocket_server.py
executable file
·53 lines (46 loc) · 1.54 KB
/
mysql_websocket_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#!/usr/bin/python3 -u
import time # Used for tracking query time taken
import json
import asyncio
import http
import websockets
from urllib.parse import unquote
import mysql.connector
import datetime
def create_db_connection(
    host="localhost",
    user="qonqr_ro",
    password="readonly",
    database="qonqr",
):
    """Open and return a new MySQL connection.

    Called with no arguments this connects exactly as before (the
    ``qonqr`` database on localhost as ``qonqr_ro``); the keyword
    parameters let callers point at a different server or account.

    Args:
        host: MySQL server host name.
        user: Account name used to log in.
        password: Password for ``user``.
        database: Schema selected after connecting.

    Returns:
        The connection object produced by ``mysql.connector.connect``.
        The caller owns it and is responsible for closing it.
    """
    return mysql.connector.connect(
        host=host,
        user=user,
        password=password,
        database=database,
    )
def query(db, query):
    """Run a SQL statement on ``db`` and return a JSON-friendly result.

    Args:
        db: An open connection whose ``cursor(dictionary=True)`` yields
            rows as dicts.
        query: The SQL text to execute. (The parameter name shadows this
            function inside the body; it is kept for call compatibility.)

    Returns:
        ``{"results": rows}`` where ``rows`` is the list of row dicts with
        every date/datetime value replaced by its ``str()`` form, or
        ``{"results": {"error": message}}`` if anything raised.
    """
    started = time.time()
    try:
        cur = db.cursor(dictionary=True)
        try:
            cur.execute(query)
            results = cur.fetchall()
        finally:
            # Always release the cursor, even when execute/fetch fails.
            cur.close()
        for row in results:
            for key, value in row.items():
                # datetime.datetime is a subclass of datetime.date, so one
                # isinstance check covers both; json cannot encode either.
                # NOTE(review): other non-JSON types the driver may return
                # (e.g. Decimal) are left untouched - confirm whether the
                # callers need those converted as well.
                if isinstance(value, datetime.date):
                    row[key] = str(value)
    except Exception as e:
        # Best-effort boundary: report the failure to the client instead
        # of raising, and log it as a failure rather than as "1 results".
        results = {"error": str(e)}
        print(f"Query: {query}. Query failed after {time.time() - started}s: {e}")
    else:
        print(f"""Query: {query}. Query completed in {time.time() - started}s, {len(results)} results""")
    return {"results": results}
async def non_ws(path, request_headers):
    """Serve plain HTTP requests by running the URL path as a SQL query.

    Passed to ``websockets.serve`` as the ``process_request`` hook.

    Args:
        path: Request path. ``/websocket`` is reserved for the WebSocket
            endpoint; any other path is URL-decoded, stripped of leading
            and trailing slashes, and executed as SQL.
        request_headers: Request headers (unused).

    Returns:
        ``None`` for ``/websocket`` so the request is not answered here,
        otherwise an ``(HTTPStatus.OK, headers, body)`` tuple whose body is
        the UTF-8 encoded JSON result of the query.
    """
    if path == "/websocket":
        return None

    # NOTE(review): the path is executed verbatim as SQL; this presumably
    # relies on the database account being read-only - confirm.
    message = unquote(path).strip("/")
    db = create_db_connection()
    try:
        result = query(db, message)
    finally:
        # One connection per HTTP request: release it once the query is
        # done so requests do not accumulate open connections.
        db.close()
    body = json.dumps(result).encode("utf-8")
    return http.HTTPStatus.OK, [("Content-Type", "application/json")], body
async def handle_request(websocket, path):
    """Handle one WebSocket client: run each message as SQL and reply.

    Args:
        websocket: The client connection; iterated for incoming messages
            and used to send each JSON-encoded result back.
        path: Request path of the connection (unused).

    A single database connection is opened per client and reused for every
    message, then closed when the client disconnects or an error ends the
    loop.
    """
    # NOTE(review): the connection is reused across messages; if the
    # driver's autocommit is off, later queries may run inside the
    # transaction opened by the first one - confirm this is intended.
    db = create_db_connection()
    try:
        async for message in websocket:
            result = query(db, message)
            await websocket.send(json.dumps(result))
    finally:
        # Release the per-client connection however the loop exits.
        db.close()
# Build the server: WebSocket clients are served by handle_request, and
# every incoming request is first offered to non_ws via process_request.
start_server = websockets.serve(
    handle_request,
    host="localhost",
    port=8082,
    process_request=non_ws,
)

# Start listening, then keep the event loop running indefinitely.
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(start_server)
event_loop.run_forever()