-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfaas_server.py
90 lines (70 loc) · 2.79 KB
/
faas_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import uuid
import json
import redis
from fastapi import FastAPI
from fastapi import HTTPException
from pydantic import BaseModel
from redis import StrictRedis
from model import TaskInfo
from config import redis_url, redis_port, redis_password, redis_db, redis_topic, redis_fail
# python3 -m uvicorn faas_server:app --reload
class RegisterFn(BaseModel):
name: str
payload: str
class RegisterFnRep(BaseModel):
function_id: uuid.UUID
class ExecuteFnReq(BaseModel):
function_id: uuid.UUID
payload: str
class ExecuteFnRep(BaseModel):
task_id: uuid.UUID
class TaskStatusRep(BaseModel):
task_id: uuid.UUID
status: str
class TaskResultRep(BaseModel):
task_id: uuid.UUID
status: str
result: str
app = FastAPI()
redis_conn = redis.StrictRedis(host=redis_url, port=redis_port, password=redis_password, db=redis_db)
@app.get("/result/{task_id}", response_model=TaskResultRep)
async def result(task_id):
try:
info = json.loads(redis_conn.get(str(task_id)))
return {'task_id' :str(task_id), 'status' : info["status"], 'result' : info["result"]}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to retrieve result from Redis {e}")
@app.get("/status/{task_id}", response_model=TaskStatusRep)
async def status(task_id):
try:
info = json.loads(redis_conn.get(str(task_id)))
return {'task_id' : task_id, 'status' : info["status"]}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to retrieve status from Redis {e}")
@app.post("/register_function", response_model=RegisterFnRep)
async def register(input : RegisterFn):
try:
uuid_key = uuid.uuid4()
obj = json.dumps(input.dict())
key_set_success = redis_conn.set(str(uuid_key), obj)
if key_set_success:
return {"function_id": uuid_key}
else:
raise HTTPException(status_code=500, detail="Failed to register function in redis")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to register function {e}")
@app.post("/execute_function", response_model=ExecuteFnRep)
async def execute(input : ExecuteFnReq):
try:
uuid_key = input.function_id
function = json.loads(redis_conn.get(str(uuid_key)))
task_id = uuid.uuid4()
info = TaskInfo(fn_payload=function['payload'], param_payload=input.payload,
status="QUEUED", result='')
redis_conn.set(str(task_id), json.dumps(info.dict()))
# A copy of task id needed to be processed
redis_conn.sadd(redis_fail,str(task_id))
redis_conn.publish(redis_topic, str(task_id))
return {"task_id": task_id}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to execute function {e}")