-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
60 lines (52 loc) · 1.58 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import asyncio
import websockets
import cv2
import base64
from camera import Camera
from httpserver import HttpServer
import signal
import json
camera = Camera(0)
httpserver = HttpServer(8088)
clients = set()
async def handle_browser_message(message):
"""Handles messages from the browser."""
data = json.loads(message)
if data.get("action") == "start_recording":
camera.start_recording(data.get("path", "./recordings"))
elif data.get("action") == "stop_recording":
camera.stop_recording()
async def handler(websocket):
clients.add(websocket)
try:
async for message in websocket:
await handle_browser_message(message)
finally:
clients.remove(websocket)
async def send(websocket):
frame = camera.get_frame()
if frame is not None:
ret, encoded = cv2.imencode(".png", frame)
if ret:
base64_frame = base64.b64encode(encoded).decode("ascii")
try:
await websocket.send(json.dumps({"frame": base64_frame}))
except websockets.ConnectionClosed:
pass
async def broadcast():
while True:
for websocket in clients:
await send(websocket)
await asyncio.sleep(0.04)
async def main():
signal.signal(signal.SIGINT, signal.SIG_DFL)
try:
httpserver.start()
camera.start()
async with websockets.serve(handler, "", 8089):
await broadcast()
except (KeyboardInterrupt, asyncio.CancelledError):
httpserver.stop()
camera.stop()
if __name__ == "__main__":
asyncio.run(main())