Skip to content

Commit

Permalink
better daemonize + pidfile + rust client
Browse files Browse the repository at this point in the history
  • Loading branch information
0xkarmacoma committed Oct 22, 2024
1 parent 3c1870c commit e6a3151
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 74 deletions.
12 changes: 12 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,15 @@ cython_debug/

# ignore all .out files generated by jsi runs
**/*.out

# Generated by Cargo
# will have compiled files and executables
**/debug/
**/target/

# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock

# These are backup files generated by rustfmt
**/*.rs.bk
9 changes: 9 additions & 0 deletions jsi-client-rs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[package]
name = "jsif"
version = "0.1.0"
edition = "2021"
description = "just solve it fast - rust client for the jsi daemon"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
49 changes: 49 additions & 0 deletions jsi-client-rs/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::env;
use std::io::{Read, Write};
use std::os::unix::net::UnixStream;
use std::path::PathBuf;
use std::process;
use std::time::Instant;

fn get_server_home() -> Option<PathBuf> {
env::var_os("HOME").map(|home| {
let mut path = PathBuf::from(home);
path.push(".jsi");
path.push("daemon");
path
})
}

fn send_command(command: &str) -> Result<(), Box<dyn std::error::Error>> {
let socket_path = get_server_home().unwrap().join("server.sock");
let mut stream = UnixStream::connect(socket_path)?;

// Send the command
stream.write_all(command.as_bytes())?;
stream.flush()?;

// Read the response
let start = Instant::now();
let mut response = String::new();
stream.read_to_string(&mut response)?;

println!("{}", response);
println!("; response time: {:?}", start.elapsed());
Ok(())
}

fn main() {
let args: Vec<String> = env::args().collect();

if args.len() < 2 {
eprintln!("Usage: {} <command>", args[0]);
process::exit(1);
}

let command = args[1..].join(" ");

match send_command(&command) {
Ok(_) => (),
Err(e) => eprintln!("Error: {}", e),
}
}
19 changes: 4 additions & 15 deletions src/jsi/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,22 +260,11 @@ def main(args: list[str] | None = None) -> int:
logger.enable(console=stderr, level=LogLevel.DEBUG)

if config.daemon:
import asyncio

import daemon

from jsi.server import STDERR_PATH, STDOUT_PATH, Server

async def run_server():
server = Server(config)
await server.start()

stdout_file = open(STDOUT_PATH, "w+") # noqa: SIM115
stderr_file = open(STDERR_PATH, "w+") # noqa: SIM115

with daemon.DaemonContext(stdout=stdout_file, stderr=stderr_file):
asyncio.run(run_server())
import jsi.server

# this detaches the server from the current shell,
# this returns immediately, leaving the server running in the background
jsi.server.daemonize(config)
return 0

with timer("load_config"):
Expand Down
135 changes: 77 additions & 58 deletions src/jsi/server.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,34 @@
"""
A daemon that listens for requests on a unix socket.
Can be started with:
# with a command line interface to parse the config:
jsi [options] --daemon
# or with a default config:
python -m jsi.server
These commands return immediately, as a detached daemon runs in the background.
The daemon:
- checks if there is an existing daemon (as indicated by ~/.jsi/daemon/server.pid)
- it kills the existing daemon if found
- it writes its own pid to ~/.jsi/daemon/server.pid
- it outputs logs to ~/.jsi/daemon/server.{err,out}
- it listens for requests on a unix domain socket (by default ~/.jsi/daemon/server.sock)
- each request is a single line of text, the path to a file to solve
- for each request, it runs the sequence of solvers defined in the config
- it returns the output of the solvers, based on the config
- it runs until terminated by the user or another daemon
"""

import asyncio
import contextlib
import os
import signal
import socket
import threading
from pathlib import Path

import daemon # type: ignore

Expand All @@ -20,17 +45,35 @@
base_commands,
set_input_output,
)
from jsi.utils import pid_exists
from jsi.utils import get_consoles, pid_exists, unexpand_home

SERVER_HOME = os.path.expanduser("~/.jsi/daemon")
SOCKET_PATH = os.path.join(SERVER_HOME, "server.sock")
STDOUT_PATH = os.path.join(SERVER_HOME, "server.out")
STDERR_PATH = os.path.join(SERVER_HOME, "server.err")
PID_PATH = os.path.join(SERVER_HOME, "server.pid")
SERVER_HOME = Path.home() / ".jsi" / "daemon"
SOCKET_PATH = SERVER_HOME / "server.sock"
STDOUT_PATH = SERVER_HOME / "server.out"
STDERR_PATH = SERVER_HOME / "server.err"
PID_PATH = SERVER_HOME / "server.pid"
CONN_BUFFER_SIZE = 1024

# TODO: handle signal.SIGCHLD (received when a child process exits)
# TODO: check if there is an existing daemon

unexpanded_pid = unexpand_home(PID_PATH)
server_usage = f"""[bold white]starting daemon...[/]
- tail logs:
[green]tail -f {unexpand_home(STDERR_PATH)[:-4]}.{{err,out}}[/]
- view pid of running daemon:
[green]cat {unexpanded_pid}[/]
- display useful info about current daemon:
[green]ps -o pid,etime,command -p $(cat {unexpanded_pid})[/]
- terminate daemon (gently, with SIGTERM):
[green]kill $(cat {unexpanded_pid})[/]
- terminate daemon (forcefully, with SIGKILL):
[green]kill -9 $(cat {unexpanded_pid})[/]
(use the commands above to monitor the daemon, this process will exit immediately)"""


class ResultListener:
Expand All @@ -57,9 +100,10 @@ def result(self) -> str:


class PIDFile:
def __init__(self, path: str):
def __init__(self, path: Path):
self.path = path
self.pid = os.getpid()
# don't get the pid here, as it may not be the current pid anymore
# by the time we enter the context manager

def __enter__(self):
try:
Expand All @@ -70,18 +114,22 @@ def __enter__(self):
if pid_exists(int(other_pid)):
print(f"killing existing daemon ({other_pid=})")
os.kill(int(other_pid), signal.SIGKILL)

else:
print(f"pid file points to dead daemon ({other_pid=})")
except FileNotFoundError:
# pid file doesn't exist, we're good to go
pass

# overwrite the file if it already exists
pid = os.getpid()
with open(self.path, "w") as fd:
fd.write(str(self.pid))
fd.write(str(pid))

print(f"created pid file: {self.path} ({self.pid=})")
print(f"created pid file: {self.path} ({pid=})")
return self.path

def __exit__(self, exc_type, exc_value, traceback):
def __exit__(self, exc_type, exc_value, traceback): # type: ignore
print(f"removing pid file: {self.path}")

# ignore if the file was already removed
Expand All @@ -101,10 +149,11 @@ def __init__(self, config: Config):

async def start(self):
server = await asyncio.start_unix_server(
self.handle_client, path=SOCKET_PATH
self.handle_client, path=str(SOCKET_PATH)
)

async with server:
print(f"server started on {unexpand_home(SOCKET_PATH)}")
await server.serve_forever()

async def handle_client(
Expand Down Expand Up @@ -153,55 +202,25 @@ def sync_solve(self, file: str) -> str:

return listener.result

# def start(self, detach_process: bool | None = None):
# if not os.path.exists(SERVER_HOME):
# print(f"creating server home: {SERVER_HOME}")
# os.makedirs(SERVER_HOME)

# stdout_file = open(STDOUT_PATH, "w+") # noqa: SIM115
# stderr_file = open(STDERR_PATH, "w+") # noqa: SIM115

# print(f"daemonizing... (`tail -f {STDOUT_PATH[:-4]}.{{err,out}}` to view logs)")
# with daemon.DaemonContext(
# stdout=stdout_file,
# stderr=stderr_file,
# detach_process=detach_process,
# pidfile=PIDFile(PID_PATH),
# ):
# if os.path.exists(SOCKET_PATH):
# print(f"removing existing socket: {SOCKET_PATH}")
# os.remove(SOCKET_PATH)

# print(f"binding socket: {SOCKET_PATH}")
# with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
# server.bind(SOCKET_PATH)
# server.listen(1)

# while True:
# try:
# conn, _ = server.accept()
# with conn:
# try:
# data = conn.recv(CONN_BUFFER_SIZE).decode()
# if not data:
# continue
# print(f"solving: {data}")
# conn.sendall(self.solve(data).encode())
# except ConnectionError as e:
# print(f"connection error: {e}")
# except SystemExit as e:
# print(f"system exit: {e}")
# return e.code


if __name__ == "__main__":
def daemonize(config: Config):
stdout, _ = get_consoles()
stdout.print(server_usage)

async def run_server():
server = Server(Config())
server = Server(config)
await server.start()

stdout_file = open(STDOUT_PATH, "w+") # noqa: SIM115
stderr_file = open(STDERR_PATH, "w+") # noqa: SIM115

with daemon.DaemonContext(stdout=stdout_file, stderr=stderr_file):
with daemon.DaemonContext(
stdout=stdout_file,
stderr=stderr_file,
pidfile=PIDFile(PID_PATH),
):
asyncio.run(run_server())


if __name__ == "__main__":
daemonize(Config())
6 changes: 5 additions & 1 deletion src/jsi/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import time
from datetime import datetime
from enum import Enum

from pathlib import Path

class Closeable:
def close(self) -> None: ...
Expand All @@ -32,6 +32,10 @@ def is_terminal(self) -> bool:
return False


def unexpand_home(path: str | Path) -> str:
return str(path).replace(str(Path.home()), "~")


def is_terminal(file: object) -> bool:
return hasattr(file, "isatty") and file.isatty() # type: ignore

Expand Down

0 comments on commit e6a3151

Please sign in to comment.