Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

437 implement api to manage indexing queues #444

Merged
merged 26 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
BackendStatusDatasetsSchema,
AgentSchema,
ServerSchema,
FileToQueueSchema,
QueueStatusSchema,
)


Expand Down Expand Up @@ -174,6 +176,8 @@ def __call__(self, user: User = Depends(current_user)):
# One RoleChecker instance per permission. The queue-related instances are
# passed to Depends(...) in the `dependencies` list of the /api/queue routes.
# NOTE(review): presumably RoleChecker rejects users lacking the listed
# role; its body is not visible in this hunk - confirm.
can_manage_queries = RoleChecker([UserRole.can_manage_queries])
can_list_queries = RoleChecker([UserRole.can_list_queries])
can_download_files = RoleChecker([UserRole.can_download_files])
can_manage_queues = RoleChecker([UserRole.can_manage_queues])
can_view_queues = RoleChecker([UserRole.can_view_queues])


def get_user_roles(user: User) -> List[UserRole]:
Expand All @@ -196,12 +200,14 @@ def expand_role(role: UserRole) -> List[UserRole]:
UserRole.user,
UserRole.can_list_all_queries,
UserRole.can_manage_all_queries,
UserRole.can_manage_queues,
],
UserRole.user: [
UserRole.can_view_queries,
UserRole.can_manage_queries,
UserRole.can_list_queries,
UserRole.can_download_files,
UserRole.can_view_queues,
],
UserRole.can_manage_all_queries: [UserRole.can_manage_queries],
UserRole.can_list_all_queries: [UserRole.can_list_queries],
Expand Down Expand Up @@ -589,6 +595,52 @@ def serve_index(path: str) -> FileResponse:
return FileResponse(Path(__file__).parent / "mqueryfront/dist/index.html")


@app.post(
    "/api/queue/{ursadb_id}",
    response_model=StatusSchema,
    tags=["queue"],
    dependencies=[Depends(can_manage_queues)],
)
def add_files_to_queue(
    ursadb_id: str, file_paths: List[FileToQueueSchema] = Body(...)
) -> StatusSchema:
    """Add the posted files to the indexing queue of `ursadb_id`.

    The request body is a list of FileToQueueSchema items; it is handed
    unchanged to `db.add_files_to_queue`. Guarded by the
    `can_manage_queues` dependency.

    Returns a StatusSchema with status "ok" once the database call returns.
    """
    db.add_files_to_queue(ursadb_id, file_paths)
    response = StatusSchema(status="ok")
    return response


@app.get(
    "/api/queue/{ursadb_id}",
    response_model=QueueStatusSchema,
    tags=["queue"],
    dependencies=[Depends(can_view_queues)],
)
def get_queue_status(ursadb_id: str) -> QueueStatusSchema:
    """Return summary information about the indexing queue of `ursadb_id`.

    The figures come from `db.get_queue_info`; this handler only adds the
    requested `ursadb_id` to them. Guarded by the `can_view_queues`
    dependency.

    Returns a QueueStatusSchema carrying `ursadb_id`, `size`,
    `oldest_file` and `newest_file`.
    """
    info = db.get_queue_info(ursadb_id)

    return QueueStatusSchema(
        ursadb_id=ursadb_id,
        size=info.size,
        oldest_file=info.oldest_file,
        newest_file=info.newest_file,
    )


@app.delete(
    "/api/queue/{ursadb_id}",
    response_model=StatusSchema,
    tags=["queue"],
    dependencies=[Depends(can_manage_queues)],
)
def delete_queued_by_id(ursadb_id: str) -> StatusSchema:
    """Delete every queued file belonging to `ursadb_id`.

    Guarded by the `can_manage_queues` dependency.

    Returns a StatusSchema whose status is "ok" after deletion, or
    "ursadb_id not found" when `db.exist_ursadb` reports nothing for this
    id. The miss is reported in the response body; no exception is raised
    here.
    """
    # Guard clause: nothing to delete when the id is unknown to the queue.
    if not db.exist_ursadb(ursadb_id):
        return StatusSchema(status="ursadb_id not found")

    db.delete_queued_files(ursadb_id)
    return StatusSchema(status="ok")


@app.get("/recent", include_in_schema=False)
@app.get("/status", include_in_schema=False)
@app.get("/query", include_in_schema=False)
Expand Down
56 changes: 55 additions & 1 deletion src/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from redis import StrictRedis
from enum import Enum, auto
from rq import Queue # type: ignore
from sqlalchemy import exists, func
from sqlmodel import (
Session,
create_engine,
Expand All @@ -25,7 +26,13 @@
from .models.job import Job, JobStatus
from .models.jobagent import JobAgent
from .models.match import Match
from .schema import MatchesSchema, ConfigSchema
from .models.queuedfile import QueuedFile
from .schema import (
MatchesSchema,
ConfigSchema,
FileToQueueSchema,
QueueStatusDatabaseSchema,
)
from .config import app_config


Expand Down Expand Up @@ -56,6 +63,8 @@ class UserRole(Enum):
can_list_queries = auto()
can_view_queries = auto()
can_download_files = auto()
can_view_queues = auto()
can_manage_queues = auto()


# Type alias for Job ids
Expand Down Expand Up @@ -474,3 +483,48 @@ def alembic_upgrade(self) -> None:
config_file = Path(__file__).parent / "alembic.ini"
alembic_cfg = Config(str(config_file))
command.upgrade(alembic_cfg, "head")

def add_files_to_queue(
    self, ursadb_id: str, file_paths: List[FileToQueueSchema]
) -> None:
    """Insert one QueuedFile row per entry of `file_paths`.

    Every row gets the given `ursadb_id` plus the entry's `path`,
    `index_types` and `tags`. All rows go through a single
    `bulk_insert_mappings` call followed by one commit.
    """
    # Build the plain-dict mappings first so the session is only held for
    # the insert and commit.
    rows = [
        {
            "ursadb_id": ursadb_id,
            "path": entry.path,
            "index_types": entry.index_types,
            "tags": entry.tags,
        }
        for entry in file_paths
    ]
    with self.session() as session:
        session.bulk_insert_mappings(QueuedFile, rows)
        session.commit()

def get_queue_info(self, ursadb_id: str) -> QueueStatusDatabaseSchema:
    """Summarise the QueuedFile rows stored for `ursadb_id`.

    Runs one aggregate query returning the row count and the minimum and
    maximum `created_at` among rows with a matching `ursadb_id`.

    Returns a QueueStatusDatabaseSchema with `size`, `oldest_file` and
    `newest_file` taken from those three aggregates, in that order.
    """
    aggregates = select(  # type: ignore
        func.count(QueuedFile.id),
        func.min(QueuedFile.created_at),
        func.max(QueuedFile.created_at),
    ).where(QueuedFile.ursadb_id == ursadb_id)

    with self.session() as session:
        size, oldest, newest = session.exec(aggregates).one()

    return QueueStatusDatabaseSchema(
        size=size,
        oldest_file=oldest,
        newest_file=newest,
    )

def delete_queued_files(self, ursadb_id: str) -> None:
    """Delete all QueuedFile rows whose `ursadb_id` matches, then commit."""
    with self.session() as session:
        matching = session.query(QueuedFile).filter_by(ursadb_id=ursadb_id)
        matching.delete()
        session.commit()

def exist_ursadb(self, ursadb_id: str) -> bool:
    """Tell whether any QueuedFile row exists for `ursadb_id`.

    Only the QueuedFile table is consulted, so an id with no queued
    files is reported as not existing.
    """
    has_rows = exists().where(QueuedFile.ursadb_id == ursadb_id)
    with self.session() as session:
        found = session.query(has_rows).scalar()
        return found
19 changes: 18 additions & 1 deletion src/schema.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import datetime
from enum import Enum
from typing import List, Dict, Optional, Sequence
from typing import List, Dict, Optional, Sequence, Literal
from pydantic import BaseModel, Field # type: ignore
from .models.job import JobView
from .models.agentgroup import AgentGroupView
Expand Down Expand Up @@ -105,3 +106,19 @@ class ServerSchema(BaseModel):
openid_url: Optional[str]
openid_client_id: Optional[str]
about: str


# Request item describing one file to add to an indexing queue; the POST
# /api/queue/{ursadb_id} endpoint accepts a list of these as its body.
class FileToQueueSchema(BaseModel):
# Path of the file; stored as the `path` of the QueuedFile row.
path: str
# Requested index types; only the four listed literals are accepted.
index_types: List[Literal["gram3", "text4", "hash4", "wide8"]]
# Tags stored alongside the queued file.
tags: List[str]


# Queue summary as produced by the database layer (see get_queue_info in
# src/db.py).
class QueueStatusDatabaseSchema(BaseModel):
# Number of QueuedFile rows counted for the queried ursadb_id.
size: int
# Minimum `created_at` among those rows.
# NOTE(review): presumably None when the queue is empty - confirm.
oldest_file: Optional[datetime]
# Maximum `created_at` among those rows.
newest_file: Optional[datetime]


# API response for GET /api/queue/{ursadb_id}: the database-level queue
# summary extended with the id it was computed for.
class QueueStatusSchema(QueueStatusDatabaseSchema):
# Identifier of the ursadb instance the summary refers to.
ursadb_id: str