Skip to content

Commit

Permalink
WIP: make server async
Browse files Browse the repository at this point in the history
  • Loading branch information
0xkarmacoma committed Oct 9, 2024
1 parent 5884292 commit 3c1870c
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 55 deletions.
18 changes: 15 additions & 3 deletions src/jsi/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,22 @@ def main(args: list[str] | None = None) -> int:
logger.enable(console=stderr, level=LogLevel.DEBUG)

if config.daemon:
from jsi.server import Server
import asyncio

import daemon

from jsi.server import STDERR_PATH, STDOUT_PATH, Server

async def run_server():
server = Server(config)
await server.start()

stdout_file = open(STDOUT_PATH, "w+") # noqa: SIM115
stderr_file = open(STDERR_PATH, "w+") # noqa: SIM115

with daemon.DaemonContext(stdout=stdout_file, stderr=stderr_file):
asyncio.run(run_server())

server = Server(config)
server.start(detach_process=True)
return 0

with timer("load_config"):
Expand Down
143 changes: 91 additions & 52 deletions src/jsi/server.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio
import contextlib
import os
import signal
import socket
Expand All @@ -19,7 +21,6 @@
set_input_output,
)
from jsi.utils import pid_exists
import contextlib

SERVER_HOME = os.path.expanduser("~/.jsi/daemon")
SOCKET_PATH = os.path.join(SERVER_HOME, "server.sock")
Expand Down Expand Up @@ -58,27 +59,26 @@ def result(self) -> str:
class PIDFile:
def __init__(self, path: str):
self.path = path
self.pid = os.getpid()

def __enter__(self):
if os.path.exists(self.path):
print(f"pid file already exists: {self.path}")

try:
with open(self.path) as fd:
print(f"pid file already exists: {self.path}")
other_pid = fd.read()

if pid_exists(int(other_pid)):
print(f"killing existing daemon ({other_pid=})")
os.kill(int(other_pid), signal.SIGKILL)
except FileNotFoundError:
# pid file doesn't exist, we're good to go
pass

# the file may have been removed on termination by another instance
with contextlib.suppress(FileNotFoundError):
os.remove(self.path)

pid = os.getpid()
print(f"creating pid file: {self.path} ({pid=})")
# overwrite the file if it already exists
with open(self.path, "w") as fd:
fd.write(str(pid))
fd.write(str(self.pid))

print(f"created pid file: {self.path} ({self.pid=})")
return self.path

def __exit__(self, exc_type, exc_value, traceback):
Expand All @@ -99,7 +99,37 @@ def __init__(self, config: Config):
self.solver_definitions = load_definitions(config)
self.available_solvers = find_available_solvers(self.solver_definitions, config)

def solve(self, file: str) -> str:
async def start(self):
server = await asyncio.start_unix_server(
self.handle_client, path=SOCKET_PATH
)

async with server:
await server.serve_forever()

async def handle_client(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
try:
data: bytes = await reader.read(1024)
if data:
message: str = data.decode()
result = await self.solve(message)
writer.write(result.encode())
await writer.drain()
except Exception as e:
print(f"Error handling client: {e}")
finally:
writer.close()
await writer.wait_closed()

async def solve(self, file: str) -> str:
# Assuming solve is CPU-bound, we use run_in_executor
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, self.sync_solve, file)
return result

def sync_solve(self, file: str) -> str:
# initialize the controller
task = Task(name=str(file))

Expand All @@ -123,46 +153,55 @@ def solve(self, file: str) -> str:

return listener.result


def start(self, detach_process: bool | None = None):
if not os.path.exists(SERVER_HOME):
print(f"creating server home: {SERVER_HOME}")
os.makedirs(SERVER_HOME)

stdout_file = open(STDOUT_PATH, "w+") # noqa: SIM115
stderr_file = open(STDERR_PATH, "w+") # noqa: SIM115

print(f"daemonizing... (`tail -f {STDOUT_PATH[:-4]}.{{err,out}}` to view logs)")
with daemon.DaemonContext(
stdout=stdout_file,
stderr=stderr_file,
detach_process=detach_process,
pidfile=PIDFile(PID_PATH),
):
if os.path.exists(SOCKET_PATH):
print(f"removing existing socket: {SOCKET_PATH}")
os.remove(SOCKET_PATH)

print(f"binding socket: {SOCKET_PATH}")
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
server.bind(SOCKET_PATH)
server.listen(1)

while True:
try:
conn, _ = server.accept()
with conn:
try:
data = conn.recv(CONN_BUFFER_SIZE).decode()
if not data:
continue
conn.sendall(self.solve(data).encode())
except ConnectionError as e:
print(f"connection error: {e}")
except SystemExit as e:
print(f"system exit: {e}")
return e.code
# def start(self, detach_process: bool | None = None):
# if not os.path.exists(SERVER_HOME):
# print(f"creating server home: {SERVER_HOME}")
# os.makedirs(SERVER_HOME)

# stdout_file = open(STDOUT_PATH, "w+") # noqa: SIM115
# stderr_file = open(STDERR_PATH, "w+") # noqa: SIM115

# print(f"daemonizing... (`tail -f {STDOUT_PATH[:-4]}.{{err,out}}` to view logs)")
# with daemon.DaemonContext(
# stdout=stdout_file,
# stderr=stderr_file,
# detach_process=detach_process,
# pidfile=PIDFile(PID_PATH),
# ):
# if os.path.exists(SOCKET_PATH):
# print(f"removing existing socket: {SOCKET_PATH}")
# os.remove(SOCKET_PATH)

# print(f"binding socket: {SOCKET_PATH}")
# with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
# server.bind(SOCKET_PATH)
# server.listen(1)

# while True:
# try:
# conn, _ = server.accept()
# with conn:
# try:
# data = conn.recv(CONN_BUFFER_SIZE).decode()
# if not data:
# continue
# print(f"solving: {data}")
# conn.sendall(self.solve(data).encode())
# except ConnectionError as e:
# print(f"connection error: {e}")
# except SystemExit as e:
# print(f"system exit: {e}")
# return e.code


if __name__ == "__main__":
Server(Config()).start()

async def run_server():
server = Server(Config())
await server.start()

stdout_file = open(STDOUT_PATH, "w+") # noqa: SIM115
stderr_file = open(STDERR_PATH, "w+") # noqa: SIM115

with daemon.DaemonContext(stdout=stdout_file, stderr=stderr_file):
asyncio.run(run_server())

0 comments on commit 3c1870c

Please sign in to comment.