diff --git a/src/app.py b/src/app.py index c07e5f8d..a3ac8541 100644 --- a/src/app.py +++ b/src/app.py @@ -44,6 +44,8 @@ BackendStatusDatasetsSchema, AgentSchema, ServerSchema, + FileToQueueSchema, + QueueStatusSchema, ) @@ -174,6 +176,8 @@ def __call__(self, user: User = Depends(current_user)): can_manage_queries = RoleChecker([UserRole.can_manage_queries]) can_list_queries = RoleChecker([UserRole.can_list_queries]) can_download_files = RoleChecker([UserRole.can_download_files]) +can_manage_queues = RoleChecker([UserRole.can_manage_queues]) +can_view_queues = RoleChecker([UserRole.can_view_queues]) def get_user_roles(user: User) -> List[UserRole]: @@ -196,12 +200,14 @@ def expand_role(role: UserRole) -> List[UserRole]: UserRole.user, UserRole.can_list_all_queries, UserRole.can_manage_all_queries, + UserRole.can_manage_queues, ], UserRole.user: [ UserRole.can_view_queries, UserRole.can_manage_queries, UserRole.can_list_queries, UserRole.can_download_files, + UserRole.can_view_queues, ], UserRole.can_manage_all_queries: [UserRole.can_manage_queries], UserRole.can_list_all_queries: [UserRole.can_list_queries], @@ -589,6 +595,52 @@ def serve_index(path: str) -> FileResponse: return FileResponse(Path(__file__).parent / "mqueryfront/dist/index.html") +@app.post( + "/api/queue/{ursadb_id}", + response_model=StatusSchema, + tags=["queue"], + dependencies=[Depends(can_manage_queues)], +) +def add_files_to_queue( + ursadb_id: str, file_paths: List[FileToQueueSchema] = Body(...) +) -> StatusSchema: + db.add_files_to_queue(ursadb_id, file_paths) + + return StatusSchema(status="ok") + + +@app.get( + "/api/queue/{ursadb_id}", + response_model=QueueStatusSchema, + tags=["queue"], + dependencies=[Depends(can_view_queues)], +) +def get_queue_status(ursadb_id: str): + queue_status = db.get_queue_info(ursadb_id) + + return QueueStatusSchema( + ursadb_id=ursadb_id, + size=queue_status.size, + oldest_file=queue_status.oldest_file, + newest_file=queue_status.newest_file, + ) + + +@app.delete( + "/api/queue/{ursadb_id}", + response_model=StatusSchema, + tags=["queue"], + dependencies=[Depends(can_manage_queues)], +) +def delete_queued_by_id(ursadb_id: str): + ursadb_exist = db.exist_ursadb(ursadb_id) + + if ursadb_exist: + db.delete_queued_files(ursadb_id) + return StatusSchema(status="ok") + return StatusSchema(status="ursadb_id not found") + + @app.get("/recent", include_in_schema=False) @app.get("/status", include_in_schema=False) @app.get("/query", include_in_schema=False) diff --git a/src/db.py b/src/db.py index 4f1eb775..6489d415 100644 --- a/src/db.py +++ b/src/db.py @@ -10,6 +10,7 @@ from redis import StrictRedis from enum import Enum, auto from rq import Queue # type: ignore +from sqlalchemy import exists, func from sqlmodel import ( Session, create_engine, @@ -25,7 +26,13 @@ from .models.job import Job, JobStatus from .models.jobagent import JobAgent from .models.match import Match -from .schema import MatchesSchema, ConfigSchema +from .models.queuedfile import QueuedFile +from .schema import ( + MatchesSchema, + ConfigSchema, + FileToQueueSchema, + QueueStatusDatabaseSchema, +) from .config import app_config @@ -56,6 +63,8 @@ class UserRole(Enum): can_list_queries = auto() can_view_queries = auto() can_download_files = auto() + can_view_queues = auto() + can_manage_queues = auto() # Type alias for Job ids @@ -474,3 +483,48 @@ def alembic_upgrade(self) -> None: config_file = Path(__file__).parent / "alembic.ini" alembic_cfg = Config(str(config_file)) command.upgrade(alembic_cfg, "head") + + def add_files_to_queue( + self, ursadb_id: str, file_paths: List[FileToQueueSchema] + ): + with self.session() as session: + session.bulk_insert_mappings( + QueuedFile, + [ + { + "ursadb_id": ursadb_id, + "path": file.path, + "index_types": file.index_types, + "tags": file.tags, + } + for file in file_paths + ], + ) + session.commit() + + def get_queue_info(self, ursadb_id: str) -> QueueStatusDatabaseSchema: + + with self.session() as session: + query = select( # type: ignore + func.count(QueuedFile.id), + func.min(QueuedFile.created_at), + func.max(QueuedFile.created_at), + ).where(QueuedFile.ursadb_id == ursadb_id) + queue_info = session.exec(query).one() + + return QueueStatusDatabaseSchema( + size=queue_info[0], + oldest_file=queue_info[1], + newest_file=queue_info[2], + ) + + def delete_queued_files(self, ursadb_id: str) -> None: + with self.session() as session: + session.query(QueuedFile).filter_by(ursadb_id=ursadb_id).delete() + session.commit() + + def exist_ursadb(self, ursadb_id: str) -> bool: + with self.session() as session: + return session.query( + exists().where(QueuedFile.ursadb_id == ursadb_id) + ).scalar() diff --git a/src/schema.py b/src/schema.py index 5b2ab8ff..5e48bd53 100644 --- a/src/schema.py +++ b/src/schema.py @@ -1,5 +1,6 @@ +from datetime import datetime from enum import Enum -from typing import List, Dict, Optional, Sequence +from typing import List, Dict, Optional, Sequence, Literal from pydantic import BaseModel, Field # type: ignore from .models.job import JobView from .models.agentgroup import AgentGroupView @@ -105,3 +106,19 @@ class ServerSchema(BaseModel): openid_url: Optional[str] openid_client_id: Optional[str] about: str + + +class FileToQueueSchema(BaseModel): + path: str + index_types: List[Literal["gram3", "text4", "hash4", "wide8"]] + tags: List[str] + + +class QueueStatusDatabaseSchema(BaseModel): + size: int + oldest_file: Optional[datetime] + newest_file: Optional[datetime] + + +class QueueStatusSchema(QueueStatusDatabaseSchema): + ursadb_id: str