Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor database handling in /api/job endpoint #394 #405

Merged
merged 1 commit into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,12 +530,10 @@ def job_statuses(user: User = Depends(current_user)) -> JobsSchema:
"""Returns statuses of all the jobs in the system. May take some time (> 1s)
when there are a lot of them.
"""
jobs = [db.get_job(job) for job in db.get_job_ids()]
jobs = sorted(jobs, key=lambda j: j.submitted, reverse=True)
jobs = [j for j in jobs if j.status != "removed"]
if "can_list_all_queries" not in get_user_roles(user):
jobs = [j for j in jobs if j.rule_author == user.name]

username_filter: Optional[str] = user.name
if "can_list_all_queries" in get_user_roles(user):
username_filter = None
jobs = db.get_valid_jobs(username_filter)
return JobsSchema(jobs=jobs)


Expand Down
34 changes: 25 additions & 9 deletions src/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,15 @@
from redis import StrictRedis
from enum import Enum
from rq import Queue # type: ignore
from sqlmodel import Session, SQLModel, create_engine, select, and_, update
from sqlmodel import (
Session,
SQLModel,
create_engine,
select,
and_,
update,
col,
)

from .models.agentgroup import AgentGroup
from .models.configentry import ConfigEntry
Expand Down Expand Up @@ -51,12 +59,6 @@ def session(self):
with Session(self.engine) as session:
yield session

def get_job_ids(self) -> List[JobId]:
"""Gets IDs of all jobs in the database."""
with self.session() as session:
jobs = session.exec(select(Job)).all()
return [j.id for j in jobs]

def cancel_job(self, job: JobId, error=None) -> None:
"""Sets the job status to cancelled, with optional error message."""
with self.session() as session:
Expand All @@ -80,6 +82,18 @@ def get_job(self, job: JobId) -> Job:
with self.session() as session:
return self.__get_job(session, job)

def get_valid_jobs(self, username_filter: Optional[str]) -> List[Job]:
"""Retrieves valid (accessible and not removed) jobs from the database."""
with self.session() as session:
query = (
select(Job)
.where(Job.status != "removed")
.order_by(col(Job.submitted).desc())
)
if username_filter:
query = query.where(Job.rule_author == username_filter)
return session.exec(query).all()

def remove_query(self, job: JobId) -> None:
"""Sets the job status to removed."""
with self.session() as session:
Expand Down Expand Up @@ -139,7 +153,8 @@ def agent_finish_job(self, job: Job) -> None:

def init_jobagent(self, job: Job, agent_id: int, tasks: int) -> None:
"""Creates a new JobAgent object.
If tasks==0 then finishes job immediately"""
If tasks==0 then finishes job immediately.
"""
with self.session() as session:
obj = JobAgent(
task_in_progress=tasks,
Expand Down Expand Up @@ -293,7 +308,8 @@ def register_active_agent(
active_plugins: List[str],
) -> None:
"""Update or create a Agent information row in the database.
Returns the new or existing agent ID."""
Returns the new or existing agent ID.
"""
# Currently this is done by workers when starting. In the future,
# this should be configured by the admin, and workers should just read
# their configuration from the database.
Expand Down
Loading