-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
213 lines (156 loc) · 6.53 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import uuid
from datetime import datetime
import uvicorn
from fastapi import FastAPI, Request, Response
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from passlib.context import CryptContext
import jwt
from pydantic import BaseModel
from starlette.responses import JSONResponse
import docker
from docker.errors import ContainerError
from db.db import collection, coderunner
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
# @app.get("/checkdb", response_class=HTMLResponse)
# async def db(request: Request):
# users = collection.find()
# return templates.TemplateResponse("testdb.html", {"request": request, "users": users})
@app.get("/", response_class=HTMLResponse)
async def home_get(request: Request): # Provide the initial value of the code variable
return templates.TemplateResponse("index.html", {"request": request})
# Route for handling POST request to the home page
@app.post("/", response_class=HTMLResponse)
async def home_get(request: Request):
token = request.cookies.get("token")
username = None
if token:
try:
decoded_token = jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM])
username = decoded_token.get("username")
print(username)
except jwt.exceptions.DecodeError:
pass
return templates.TemplateResponse("index.html", {"request": request, "username": username})
# JWT configuration
JWT_SECRET_KEY = "%UO;tBrhxoU|[')" # Replace with your secret key
JWT_ALGORITHM = "HS256"
JWT_EXPIRATION_TIME_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@app.get("/login", response_class=HTMLResponse)
async def login_form(request: Request):
return templates.TemplateResponse("login.html", {"request": request})
@app.post("/login", response_class=RedirectResponse)
async def login_submit(request: Request):
form = await request.form()
email = form.get("email")
password = form.get("password")
# Verify user authentication
user = collection.find_one({"email": email})
if user and verify_password(password, user["hashed_password"]):
# Generate JWT token
token = generate_jwt_token(email)
return RedirectResponse(url="/", headers={"Set-Cookie": f"token={token}; HttpOnly"}, status_code=307)
else:
return RedirectResponse(url="/login?error=1") # Redirect back to the login page with an error parameter
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def generate_jwt_token(email: str) -> str:
username = email.split("@")[0] # Extract the username from the email
payload = {"email": email, "username": username}
token = jwt.encode(payload, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)
return token
def verify_jwt_token(token: str) -> bool:
try:
jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM])
return True
except:
return False
@app.get("/signup", response_class=HTMLResponse)
async def signup_form(request: Request):
return templates.TemplateResponse("signup.html", {"request": request})
@app.post("/signup", response_class=RedirectResponse)
async def signup_submit(request: Request):
form = await request.form()
email = form.get("email")
password = form.get("password")
# Check if the email already exists
existing_user = collection.find_one({"email": email})
if existing_user:
return RedirectResponse(url="/signup?error=1") # Redirect back to the signup page with an error parameter
# Hash the password
hashed_password = hash_password(password)
# Save the user details in the database
user = {"email": email, "hashed_password": hashed_password}
collection.insert_one(user)
# Generate JWT token
token = generate_jwt_token(email)
return RedirectResponse(url="/", headers={"Set-Cookie": f"token={token}; HttpOnly"}, status_code=307)
def hash_password(password: str) -> str:
return pwd_context.hash(password)
@app.get("/logout", response_class=RedirectResponse)
async def logout_get(request: Request):
response = RedirectResponse(url="/")
response.delete_cookie("token") # Remove the token cookie
return response
def get_user_id(email: str) -> str:
user = collection.find_one({"email": email}) # Retrieve the user document from the database
if user:
user_id = str(user["_id"]) # Retrieve the user ID from the user document
print(user_id)
return user_id
else:
raise ValueError("User not found in the database")
class CodeRequest(BaseModel):
code: str
@app.post("/submit-code")
async def submit_code(request: Request, code_request: CodeRequest):
code = code_request.code
token = request.cookies.get("token")
if token:
try:
decoded_token = jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM])
email = decoded_token.get("email")
user_id = get_user_id(email)
except (jwt.exceptions.DecodeError, ValueError):
return JSONResponse({"error": "Invalid token"})
else:
return JSONResponse({"error": "Token not found"})
submission = {
"submission_id": generate_submission_id(),
"user_id": user_id,
"code": code,
"output": "",
"timestamp": datetime.now()
}
coderunner.insert_one(submission)
# Execute the code in a sandbox environment
execution_result = execute_code_in_container(code)
# Update the submission with the execution result
coderunner.update_one(
{"submission_id": submission["submission_id"]},
{"$set": {"output": execution_result}}
)
return JSONResponse({"output": execution_result})
def generate_submission_id():
return str(uuid.uuid4())
def execute_code_in_container(code: str) -> str:
# Docker image name
image_name = "sha256:4f1757fa5d3c15bfb1811a676ca3994944f50785394697cdff9912a9cf75d081"
docker_client = docker.from_env()
try:
# Run the Docker container with the code as a command
container = docker_client.containers.run(
image_name,
command=["python", "-c", code],
remove=True
)
output = container.decode("utf-8").strip()
except ContainerError as e:
output = str(e)
return output
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", reload=True)