From dfbd2124bade8cdaea32c1a08c6742133c96c02e Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Fri, 13 Dec 2024 08:50:02 +0100 Subject: [PATCH 01/24] 1st endpoint --- src/app.py | 12 ++++++++++++ src/db.py | 15 +++++++++++++++ src/schema.py | 10 ++++++++++ 3 files changed, 37 insertions(+) diff --git a/src/app.py b/src/app.py index c07e5f8d..6008a6b1 100644 --- a/src/app.py +++ b/src/app.py @@ -11,6 +11,7 @@ Depends, Header, ) # type: ignore +from pydantic import BaseModel from starlette.requests import Request # type: ignore from starlette.responses import Response, FileResponse, StreamingResponse # type: ignore from starlette.staticfiles import StaticFiles # type: ignore @@ -589,6 +590,17 @@ def serve_index(path: str) -> FileResponse: return FileResponse(Path(__file__).parent / "mqueryfront/dist/index.html") +@app.post( + "/api/queue/{ursadb_id}", + response_model=StatusSchema, + tags=["queue"], + dependencies=[Depends(can_manage_queries)], +) +def list_of_paths_to_index(ursadb_id: str, file_paths: FilePathsSchema = Body(...)): + db.set_queued_file(ursadb_id, file_paths) + + return StatusSchema(status="ok") + @app.get("/recent", include_in_schema=False) @app.get("/status", include_in_schema=False) @app.get("/query", include_in_schema=False) diff --git a/src/db.py b/src/db.py index 4f1eb775..5414d8ef 100644 --- a/src/db.py +++ b/src/db.py @@ -1,3 +1,5 @@ +import logging + from alembic.config import Config from alembic import command from pathlib import Path @@ -25,6 +27,7 @@ from .models.job import Job, JobStatus from .models.jobagent import JobAgent from .models.match import Match +from .models.queuedfile import QueuedFile from .schema import MatchesSchema, ConfigSchema from .config import app_config @@ -474,3 +477,15 @@ def alembic_upgrade(self) -> None: config_file = Path(__file__).parent / "alembic.ini" alembic_cfg = Config(str(config_file)) command.upgrade(alembic_cfg, "head") + + def set_queued_file(self, ursadb_id, file_paths): + with self.session() as session: + obj = QueuedFile( + ursadb_id=ursadb_id, + path=file_paths.path, + index_types=file_paths.index_types, + tags=file_paths.tags + ) + session.add(obj) + session.commit() + diff --git a/src/schema.py b/src/schema.py index 5b2ab8ff..de8bde29 100644 --- a/src/schema.py +++ b/src/schema.py @@ -105,3 +105,13 @@ class ServerSchema(BaseModel): openid_url: Optional[str] openid_client_id: Optional[str] about: str + + +class FilePathsInputSchema(BaseModel): + path: str + index_types: List[str] + tags: List[str] + + +class FilePathsSchema(FilePathsInputSchema): + ursadb_id: str From 4f94d5729a4e2c2ed1a56e9140a6e7b25c6979ea Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Tue, 31 Dec 2024 14:08:09 +0100 Subject: [PATCH 02/24] Different input scheme --- src/app.py | 15 +++++++++++---- src/db.py | 18 +++++++++--------- src/schema.py | 6 +----- 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/app.py b/src/app.py index 6008a6b1..45d809ca 100644 --- a/src/app.py +++ b/src/app.py @@ -11,7 +11,6 @@ Depends, Header, ) # type: ignore -from pydantic import BaseModel from starlette.requests import Request # type: ignore from starlette.responses import Response, FileResponse, StreamingResponse # type: ignore from starlette.staticfiles import StaticFiles # type: ignore @@ -45,6 +44,7 @@ BackendStatusDatasetsSchema, AgentSchema, ServerSchema, + FilePathInputSchema, ) @@ -175,6 +175,8 @@ def __call__(self, user: User = Depends(current_user)): can_manage_queries = RoleChecker([UserRole.can_manage_queries]) can_list_queries = RoleChecker([UserRole.can_list_queries]) can_download_files = RoleChecker([UserRole.can_download_files]) +can_manage_queues = RoleChecker([UserRole.can_manage_queues]) +can_view_queues = RoleChecker([UserRole.can_view_queues]) def get_user_roles(user: User) -> List[UserRole]: @@ -197,12 +199,14 @@ def expand_role(role: UserRole) -> List[UserRole]: UserRole.user, UserRole.can_list_all_queries, UserRole.can_manage_all_queries, + UserRole.can_manage_queues, ], UserRole.user: [ UserRole.can_view_queries, UserRole.can_manage_queries, UserRole.can_list_queries, UserRole.can_download_files, + UserRole.can_view_queues, ], UserRole.can_manage_all_queries: [UserRole.can_manage_queries], UserRole.can_list_all_queries: [UserRole.can_list_queries], @@ -552,7 +556,7 @@ def job_statuses(user: User = Depends(current_user)) -> JobsSchema: @app.delete( "/api/query/{job_id}", response_model=StatusSchema, - dependencies=[Depends(can_manage_queries)], + dependencies=[Depends(can_manage_queues)], ) def query_remove( job_id: str, user: User = Depends(current_user) @@ -594,13 +598,16 @@ def serve_index(path: str) -> FileResponse: "/api/queue/{ursadb_id}", response_model=StatusSchema, tags=["queue"], - dependencies=[Depends(can_manage_queries)], + dependencies=[Depends(can_manage_queues)], ) -def list_of_paths_to_index(ursadb_id: str, file_paths: FilePathsSchema = Body(...)): +def list_of_paths_to_index( + ursadb_id: str, file_paths: List[FilePathInputSchema] = Body(...) +): db.set_queued_file(ursadb_id, file_paths) return StatusSchema(status="ok") + @app.get("/recent", include_in_schema=False) @app.get("/status", include_in_schema=False) @app.get("/query", include_in_schema=False) diff --git a/src/db.py b/src/db.py index 5414d8ef..d72d9e72 100644 --- a/src/db.py +++ b/src/db.py @@ -1,5 +1,3 @@ -import logging - from alembic.config import Config from alembic import command from pathlib import Path @@ -59,6 +57,8 @@ class UserRole(Enum): can_list_queries = auto() can_view_queries = auto() can_download_files = auto() + can_view_queues = auto() + can_manage_queues = auto() # Type alias for Job ids @@ -480,12 +480,12 @@ def alembic_upgrade(self) -> None: def set_queued_file(self, ursadb_id, file_paths): with self.session() as session: - obj = QueuedFile( - ursadb_id=ursadb_id, - path=file_paths.path, - index_types=file_paths.index_types, - tags=file_paths.tags - ) + for file in file_paths: + obj = QueuedFile( + ursadb_id=ursadb_id, + path=file.path, + index_types=file.index_types, + tags=file.tags, + ) session.add(obj) session.commit() - diff --git a/src/schema.py b/src/schema.py index de8bde29..558208fa 100644 --- a/src/schema.py +++ b/src/schema.py @@ -107,11 +107,7 @@ class ServerSchema(BaseModel): about: str -class FilePathsInputSchema(BaseModel): +class FilePathInputSchema(BaseModel): path: str index_types: List[str] tags: List[str] - - -class FilePathsSchema(FilePathsInputSchema): - ursadb_id: str From 61e3d34e4b47b786037db96575e8c8491165de70 Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Tue, 7 Jan 2025 09:58:05 +0100 Subject: [PATCH 03/24] fix .add indentation --- src/db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db.py b/src/db.py index d72d9e72..2c97dc83 100644 --- a/src/db.py +++ b/src/db.py @@ -487,5 +487,5 @@ def set_queued_file(self, ursadb_id, file_paths): index_types=file.index_types, tags=file.tags, ) - session.add(obj) + session.add(obj) session.commit() From 2fe3782954c6fea2ca45f12b4817660ff573e9b7 Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Tue, 7 Jan 2025 10:25:20 +0100 Subject: [PATCH 04/24] add_all version --- src/db.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/db.py b/src/db.py index 2c97dc83..aa6eb695 100644 --- a/src/db.py +++ b/src/db.py @@ -480,12 +480,13 @@ def alembic_upgrade(self) -> None: def set_queued_file(self, ursadb_id, file_paths): with self.session() as session: - for file in file_paths: - obj = QueuedFile( + objects = [ + QueuedFile( ursadb_id=ursadb_id, path=file.path, index_types=file.index_types, - tags=file.tags, - ) - session.add(obj) + tags=file.tags) + for file in file_paths + ] + session.add_all(objects) session.commit() From e455c78fdd44b0402b9bde4886d74d7da1caf0bb Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Tue, 7 Jan 2025 10:42:03 +0100 Subject: [PATCH 05/24] bulk_insert_mapping version --- src/db.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/db.py b/src/db.py index aa6eb695..ae6f77a6 100644 --- a/src/db.py +++ b/src/db.py @@ -480,13 +480,14 @@ def alembic_upgrade(self) -> None: def set_queued_file(self, ursadb_id, file_paths): with self.session() as session: - objects = [ - QueuedFile( - ursadb_id=ursadb_id, - path=file.path, - index_types=file.index_types, - tags=file.tags) - for file in file_paths - ] - session.add_all(objects) + session.bulk_insert_mappings( + QueuedFile, [ + { + "ursadb_id": ursadb_id, + "path": file.path, + "index_types": file.index_types, + "tags": file.tags + } for file in file_paths + ] + ) session.commit() From ae24d88a2eb0b5f0fac2e1d77f4caf467cc97b0c Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Tue, 14 Jan 2025 16:55:56 +0100 Subject: [PATCH 06/24] 3 endpoints --- src/app.py | 62 ++++++++++++++++++++++++++++++++++++++++++++++----- src/db.py | 36 +++++++++++++++++++++++++----- src/schema.py | 19 +++++++++++++--- 3 files changed, 102 insertions(+), 15 deletions(-) diff --git a/src/app.py b/src/app.py index 45d809ca..0583b5ef 100644 --- a/src/app.py +++ b/src/app.py @@ -44,7 +44,8 @@ BackendStatusDatasetsSchema, AgentSchema, ServerSchema, - FilePathInputSchema, + FileToQueueSchema, + QueueStatusSchema, ) @@ -556,7 +557,7 @@ def job_statuses(user: User = Depends(current_user)) -> JobsSchema: @app.delete( "/api/query/{job_id}", response_model=StatusSchema, - dependencies=[Depends(can_manage_queues)], + dependencies=[Depends(can_manage_queries)], ) def query_remove( job_id: str, user: User = Depends(current_user) @@ -600,14 +601,63 @@ def serve_index(path: str) -> FileResponse: tags=["queue"], dependencies=[Depends(can_manage_queues)], ) -def list_of_paths_to_index( - ursadb_id: str, file_paths: List[FilePathInputSchema] = Body(...) -): - db.set_queued_file(ursadb_id, file_paths) +def add_files_to_queue( + ursadb_id: str, file_paths: List[FileToQueueSchema] = Body(...) +) -> StatusSchema: + db.add_queued_file(ursadb_id, file_paths) return StatusSchema(status="ok") +@app.get( + "/api/queue/{ursadb_id}", + response_model=QueueStatusSchema, + tags=["queue"], + dependencies=[Depends(can_view_queues)], +) +def get_queue_status(ursadb_id: str): + queue_files = db.get_queued_files(ursadb_id) + + if queue_files: + oldest_file = min(queue_files, key=lambda x: x.created_at) + newest_file = max(queue_files, key=lambda x: x.created_at) + + return QueueStatusSchema( + ursadb_id=ursadb_id, + size=len(queue_files), + oldest_file={ + "path": oldest_file.path, + "created_at": oldest_file.created_at, + }, + newest_file={ + "path": newest_file.path, + "created_at": newest_file.created_at, + }, + ) + raise HTTPException( + status_code=400, + detail="Ursadb_id not found.", + ) + + +@app.delete( + "/api/queue/{ursadb_id}", + response_model=StatusSchema, + tags=["queue"], + dependencies=[Depends(can_manage_queues)], +) +def delete_queued_by_ursadb_id(ursadb_id: str): + ursadb_exist = db.exist_ursadb(ursadb_id) + if ursadb_exist: + db.delete_queued_file(ursadb_id) + return StatusSchema(status="ok") + + raise HTTPException( + status_code=400, + detail="Ursadb_id not found.", + ) + + @app.get("/recent", include_in_schema=False) @app.get("/status", include_in_schema=False) @app.get("/query", include_in_schema=False) diff --git a/src/db.py b/src/db.py index ae6f77a6..caa59321 100644 --- a/src/db.py +++ b/src/db.py @@ -10,6 +10,7 @@ from redis import StrictRedis from enum import Enum, auto from rq import Queue # type: ignore +from sqlalchemy import exists from sqlmodel import ( Session, create_engine, @@ -26,7 +27,7 @@ from .models.jobagent import JobAgent from .models.match import Match from .models.queuedfile import QueuedFile -from .schema import MatchesSchema, ConfigSchema +from .schema import MatchesSchema, ConfigSchema, FileToQueueSchema from .config import app_config @@ -478,16 +479,39 @@ def alembic_upgrade(self) -> None: alembic_cfg = Config(str(config_file)) command.upgrade(alembic_cfg, "head") - def set_queued_file(self, ursadb_id, file_paths): + def add_queued_file( + self, ursadb_id: str, file_paths: List[FileToQueueSchema] + ): with self.session() as session: session.bulk_insert_mappings( - QueuedFile, [ + QueuedFile, + [ { "ursadb_id": ursadb_id, "path": file.path, "index_types": file.index_types, - "tags": file.tags - } for file in file_paths - ] + "tags": file.tags, + } + for file in file_paths + ], ) session.commit() + + def get_queued_files(self, ursadb_id: str) -> List[QueuedFile]: + with self.session() as session: + queue_files = ( + session.query(QueuedFile).filter_by(ursadb_id=ursadb_id).all() + ) + + return queue_files + + def delete_queued_file(self, ursadb_id: str) -> None: + with self.session() as session: + session.query(QueuedFile).filter_by(ursadb_id=ursadb_id).delete() + session.commit() + + def exist_ursadb(self, ursadb_id: str) -> bool: + with self.session() as session: + return session.query( + exists().where(QueuedFile.ursadb_id == ursadb_id) + ).scalar() diff --git a/src/schema.py b/src/schema.py index 558208fa..833b4091 100644 --- a/src/schema.py +++ b/src/schema.py @@ -1,5 +1,6 @@ +from datetime import datetime from enum import Enum -from typing import List, Dict, Optional, Sequence +from typing import List, Dict, Optional, Sequence, Literal from pydantic import BaseModel, Field # type: ignore from .models.job import JobView from .models.agentgroup import AgentGroupView @@ -107,7 +108,19 @@ class ServerSchema(BaseModel): about: str -class FilePathInputSchema(BaseModel): +class FileToQueueSchema(BaseModel): path: str - index_types: List[str] + index_types: List[Literal["gram3", "text4", "hash4", "wide8"]] tags: List[str] + + +class EdgeOfFileSchema(BaseModel): + path: str + created_at: datetime + + +class QueueStatusSchema(BaseModel): + ursadb_id: str + size: int + oldest_file: EdgeOfFileSchema + newest_file: EdgeOfFileSchema From f381aa650785df7778bc27b2d5a9d05d7ff94b6e Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Tue, 14 Jan 2025 17:02:54 +0100 Subject: [PATCH 07/24] mypy fix --- src/app.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/app.py b/src/app.py index 0583b5ef..d2abb920 100644 --- a/src/app.py +++ b/src/app.py @@ -45,7 +45,7 @@ AgentSchema, ServerSchema, FileToQueueSchema, - QueueStatusSchema, + QueueStatusSchema, EdgeOfFileSchema, ) @@ -625,14 +625,14 @@ def get_queue_status(ursadb_id: str): return QueueStatusSchema( ursadb_id=ursadb_id, size=len(queue_files), - oldest_file={ - "path": oldest_file.path, - "created_at": oldest_file.created_at, - }, - newest_file={ - "path": newest_file.path, - "created_at": newest_file.created_at, - }, + oldest_file=EdgeOfFileSchema( + path=oldest_file.path, + created_at=oldest_file.created_at, + ), + newest_file=EdgeOfFileSchema( + path=oldest_file.path, + created_at=newest_file.created_at, + ), ) raise HTTPException( status_code=400, From dd0347d923be47ac2875ba68b7a136233b45efaf Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Tue, 14 Jan 2025 17:04:50 +0100 Subject: [PATCH 08/24] black fix --- src/app.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/app.py b/src/app.py index d2abb920..74bb4b00 100644 --- a/src/app.py +++ b/src/app.py @@ -45,7 +45,8 @@ AgentSchema, ServerSchema, FileToQueueSchema, - QueueStatusSchema, EdgeOfFileSchema, + QueueStatusSchema, + EdgeOfFileSchema, ) From caaa3047347e682d70c7555039b667ae3c91ce52 Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Tue, 11 Feb 2025 14:26:08 +0100 Subject: [PATCH 09/24] after review --- src/app.py | 42 ++++++++++++------------------------------ src/db.py | 20 ++++++++++++-------- src/schema.py | 9 ++------- 3 files changed, 26 insertions(+), 45 deletions(-) diff --git a/src/app.py b/src/app.py index 74bb4b00..1caa9ef5 100644 --- a/src/app.py +++ b/src/app.py @@ -46,7 +46,6 @@ ServerSchema, FileToQueueSchema, QueueStatusSchema, - EdgeOfFileSchema, ) @@ -605,7 +604,7 @@ def serve_index(path: str) -> FileResponse: def add_files_to_queue( ursadb_id: str, file_paths: List[FileToQueueSchema] = Body(...) ) -> StatusSchema: - db.add_queued_file(ursadb_id, file_paths) + db.add_files_to_queue(ursadb_id, file_paths) return StatusSchema(status="ok") @@ -617,27 +616,13 @@ def add_files_to_queue( dependencies=[Depends(can_view_queues)], ) def get_queue_status(ursadb_id: str): - queue_files = db.get_queued_files(ursadb_id) - - if queue_files: - oldest_file = min(queue_files, key=lambda x: x.created_at) - newest_file = max(queue_files, key=lambda x: x.created_at) - - return QueueStatusSchema( - ursadb_id=ursadb_id, - size=len(queue_files), - oldest_file=EdgeOfFileSchema( - path=oldest_file.path, - created_at=oldest_file.created_at, - ), - newest_file=EdgeOfFileSchema( - path=oldest_file.path, - created_at=newest_file.created_at, - ), - ) - raise HTTPException( - status_code=400, - detail="Ursadb_id not found.", + queue_status = db.get_queue_info(ursadb_id) + logging.error(queue_status) + return QueueStatusSchema( + ursadb_id=ursadb_id, + size=queue_status[0], + oldest_file=queue_status[1], + newest_file=queue_status[2], ) @@ -647,16 +632,13 @@ def get_queue_status(ursadb_id: str): tags=["queue"], dependencies=[Depends(can_manage_queues)], ) -def delete_queued_by_ursadb_id(ursadb_id: str): +def delete_queued_by_id(ursadb_id: str): ursadb_exist = db.exist_ursadb(ursadb_id) + if ursadb_exist: - db.delete_queued_file(ursadb_id) + db.delete_queued_files(ursadb_id) return StatusSchema(status="ok") - - raise HTTPException( - status_code=400, - detail="Ursadb_id not found.", - ) + return StatusSchema(status="ursadb_id not found") @app.get("/recent", include_in_schema=False) diff --git a/src/db.py b/src/db.py index caa59321..46138a9b 100644 --- a/src/db.py +++ b/src/db.py @@ -10,7 +10,7 @@ from redis import StrictRedis from enum import Enum, auto from rq import Queue # type: ignore -from sqlalchemy import exists +from sqlalchemy import exists, func from sqlmodel import ( Session, create_engine, @@ -479,7 +479,7 @@ def alembic_upgrade(self) -> None: alembic_cfg = Config(str(config_file)) command.upgrade(alembic_cfg, "head") - def add_queued_file( + def add_files_to_queue( self, ursadb_id: str, file_paths: List[FileToQueueSchema] ): with self.session() as session: @@ -497,15 +497,19 @@ def add_queued_file( ) session.commit() - def get_queued_files(self, ursadb_id: str) -> List[QueuedFile]: + def get_queue_info(self, ursadb_id: str) -> List[QueuedFile]: + with self.session() as session: - queue_files = ( - session.query(QueuedFile).filter_by(ursadb_id=ursadb_id).all() - ) + query = select( + func.count(QueuedFile.id).label("size"), + func.min(QueuedFile.created_at).label("oldest_file"), + func.max(QueuedFile.created_at).label("newest_file"), + ).where(QueuedFile.ursadb_id == ursadb_id) + queue_info = session.exec(query).one() - return queue_files + return queue_info - def delete_queued_file(self, ursadb_id: str) -> None: + def delete_queued_files(self, ursadb_id: str) -> None: with self.session() as session: session.query(QueuedFile).filter_by(ursadb_id=ursadb_id).delete() session.commit() diff --git a/src/schema.py b/src/schema.py index 833b4091..6e76e3c3 100644 --- a/src/schema.py +++ b/src/schema.py @@ -114,13 +114,8 @@ class FileToQueueSchema(BaseModel): tags: List[str] -class EdgeOfFileSchema(BaseModel): - path: str - created_at: datetime - - class QueueStatusSchema(BaseModel): ursadb_id: str size: int - oldest_file: EdgeOfFileSchema - newest_file: EdgeOfFileSchema + oldest_file: datetime | None + newest_file: datetime | None From e3c4d2736638cb4be04496f145a7626f98d6a1a3 Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Tue, 11 Feb 2025 15:32:38 +0100 Subject: [PATCH 10/24] lint --- src/schema.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/schema.py b/src/schema.py index 6e76e3c3..a71f974e 100644 --- a/src/schema.py +++ b/src/schema.py @@ -117,5 +117,5 @@ class FileToQueueSchema(BaseModel): class QueueStatusSchema(BaseModel): ursadb_id: str size: int - oldest_file: datetime | None - newest_file: datetime | None + oldest_file: Optional[datetime] + newest_file: Optional[datetime] From 59990ea5b3b32be754bcf624189dce123410f3e4 Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Tue, 11 Feb 2025 15:40:59 +0100 Subject: [PATCH 11/24] lint --- src/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/app.py b/src/app.py index 1caa9ef5..3dd1b423 100644 --- a/src/app.py +++ b/src/app.py @@ -617,7 +617,7 @@ def add_files_to_queue( ) def get_queue_status(ursadb_id: str): queue_status = db.get_queue_info(ursadb_id) - logging.error(queue_status) + return QueueStatusSchema( ursadb_id=ursadb_id, size=queue_status[0], From 300b398fcd61131aa028ca49c8b9828b4c4eb052 Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Tue, 11 Feb 2025 15:41:14 +0100 Subject: [PATCH 12/24] lint --- src/db.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/db.py b/src/db.py index 46138a9b..6f8f44e5 100644 --- a/src/db.py +++ b/src/db.py @@ -501,9 +501,9 @@ def get_queue_info(self, ursadb_id: str) -> List[QueuedFile]: with self.session() as session: query = select( - func.count(QueuedFile.id).label("size"), - func.min(QueuedFile.created_at).label("oldest_file"), - func.max(QueuedFile.created_at).label("newest_file"), + func.count(QueuedFile.id), + func.min(QueuedFile.created_at), + func.max(QueuedFile.created_at), ).where(QueuedFile.ursadb_id == ursadb_id) queue_info = session.exec(query).one() From b408a96f403ff57e2574672e88e375cc86ee3037 Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Thu, 20 Feb 2025 09:39:54 +0100 Subject: [PATCH 13/24] fix typing --- src/db.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/db.py b/src/db.py index 6f8f44e5..1ee7bf33 100644 --- a/src/db.py +++ b/src/db.py @@ -1,9 +1,11 @@ +from datetime import datetime + from alembic.config import Config from alembic import command from pathlib import Path from collections import defaultdict from contextlib import contextmanager -from typing import List, Optional, Dict, Any +from typing import List, Optional, Dict, Any, Union, Tuple from time import time import random import string @@ -497,7 +499,7 @@ def add_files_to_queue( ) session.commit() - def get_queue_info(self, ursadb_id: str) -> List[QueuedFile]: + def get_queue_info(self, ursadb_id: str) -> Tuple[int, datetime, datetime]: with self.session() as session: query = select( From 2ad8a10ea669510c9e2b61e7acdd2605d587afb2 Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Thu, 20 Feb 2025 09:45:57 +0100 Subject: [PATCH 14/24] lint --- src/db.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/db.py b/src/db.py index 1ee7bf33..44d23da4 100644 --- a/src/db.py +++ b/src/db.py @@ -5,7 +5,7 @@ from pathlib import Path from collections import defaultdict from contextlib import contextmanager -from typing import List, Optional, Dict, Any, Union, Tuple +from typing import List, Optional, Dict, Any, Tuple from time import time import random import string @@ -499,7 +499,7 @@ def add_files_to_queue( ) session.commit() - def get_queue_info(self, ursadb_id: str) -> Tuple[int, datetime, datetime]: + def get_queue_info(self, ursadb_id: str) -> List[int | datetime | None]: with self.session() as session: query = select( From 51c1ece6037b5a3f8d47d29f869bdfe047cfa30c Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Thu, 20 Feb 2025 10:00:21 +0100 Subject: [PATCH 15/24] lint --- src/app.py | 9 +++++---- src/db.py | 9 +++++++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/app.py b/src/app.py index 3dd1b423..a9dfe3cc 100644 --- a/src/app.py +++ b/src/app.py @@ -617,12 +617,13 @@ def add_files_to_queue( ) def get_queue_status(ursadb_id: str): queue_status = db.get_queue_info(ursadb_id) - + logging.error(queue_status) + logging.error(type(queue_status)) return QueueStatusSchema( ursadb_id=ursadb_id, - size=queue_status[0], - oldest_file=queue_status[1], - newest_file=queue_status[2], + size=queue_status.get("size"), + oldest_file=queue_status.get("oldest_file"), + newest_file=queue_status.get("newest_file"), ) diff --git a/src/db.py b/src/db.py index 44d23da4..f0234bb8 100644 --- a/src/db.py +++ b/src/db.py @@ -5,7 +5,7 @@ from pathlib import Path from collections import defaultdict from contextlib import contextmanager -from typing import List, Optional, Dict, Any, Tuple +from typing import List, Optional, Dict, Any, Union from time import time import random import string @@ -499,7 +499,7 @@ def add_files_to_queue( ) session.commit() - def get_queue_info(self, ursadb_id: str) -> List[int | datetime | None]: + def get_queue_info(self, ursadb_id: str) -> Dict[str, Union[int, datetime | None]]: with self.session() as session: query = select( @@ -509,6 +509,11 @@ def get_queue_info(self, ursadb_id: str) -> List[int | datetime | None]: ).where(QueuedFile.ursadb_id == ursadb_id) queue_info = session.exec(query).one() + queue_info = { + "size": queue_info[0], + "oldest_file": queue_info[1], + "newest_file": queue_info[2], + } return queue_info def delete_queued_files(self, ursadb_id: str) -> None: From af4785e44a78b925e918e064f9b3dc5d598ecd51 Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Thu, 20 Feb 2025 10:14:54 +0100 Subject: [PATCH 16/24] lint --- src/db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db.py b/src/db.py index f0234bb8..7d3dfe5c 100644 --- a/src/db.py +++ b/src/db.py @@ -499,7 +499,7 @@ def add_files_to_queue( ) session.commit() - def get_queue_info(self, ursadb_id: str) -> Dict[str, Union[int, datetime | None]]: + def get_queue_info(self, ursadb_id: str) -> Dict[str, int | Optional[datetime]]: with self.session() as session: query = select( From dd3e61e216d066f81997e4650fe9660f7f13072b Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Thu, 20 Feb 2025 10:25:27 +0100 Subject: [PATCH 17/24] lint --- src/app.py | 6 +++--- src/db.py | 18 ++++++++---------- src/schema.py | 6 ++++++ 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/src/app.py b/src/app.py index a9dfe3cc..defb30fc 100644 --- a/src/app.py +++ b/src/app.py @@ -621,9 +621,9 @@ def get_queue_status(ursadb_id: str): logging.error(type(queue_status)) return QueueStatusSchema( ursadb_id=ursadb_id, - size=queue_status.get("size"), - oldest_file=queue_status.get("oldest_file"), - newest_file=queue_status.get("newest_file"), + size=queue_status.size, + oldest_file=queue_status.oldest_file, + newest_file=queue_status.newest_file, ) diff --git a/src/db.py b/src/db.py index 7d3dfe5c..73a78a0d 100644 --- a/src/db.py +++ b/src/db.py @@ -1,11 +1,9 @@ -from datetime import datetime - from alembic.config import Config from alembic import command from pathlib import Path from collections import defaultdict from contextlib import contextmanager -from typing import List, Optional, Dict, Any, Union +from typing import List, Optional, Dict, Any from time import time import random import string @@ -29,7 +27,12 @@ from .models.jobagent import JobAgent from .models.match import Match from .models.queuedfile import QueuedFile -from .schema import MatchesSchema, ConfigSchema, FileToQueueSchema +from .schema import ( + MatchesSchema, + ConfigSchema, + FileToQueueSchema, + QueueStatusDatasetsSchema, +) from .config import app_config @@ -499,7 +502,7 @@ def add_files_to_queue( ) session.commit() - def get_queue_info(self, ursadb_id: str) -> Dict[str, int | Optional[datetime]]: + def get_queue_info(self, ursadb_id: str) -> QueueStatusDatasetsSchema: with self.session() as session: query = select( @@ -509,11 +512,6 @@ def get_queue_info(self, ursadb_id: str) -> Dict[str, int | Optional[datetime]]: ).where(QueuedFile.ursadb_id == ursadb_id) queue_info = session.exec(query).one() - queue_info = { - "size": queue_info[0], - "oldest_file": queue_info[1], - "newest_file": queue_info[2], - } return queue_info def delete_queued_files(self, ursadb_id: str) -> None: diff --git a/src/schema.py b/src/schema.py index a71f974e..e8315820 100644 --- a/src/schema.py +++ b/src/schema.py @@ -119,3 +119,9 @@ class QueueStatusSchema(BaseModel): size: int oldest_file: Optional[datetime] newest_file: Optional[datetime] + + +class QueueStatusDatasetsSchema(QueueStatusSchema): + size: int + oldest_file: Optional[datetime] + newest_file: Optional[datetime] From 5362cbd7f50ffc3be3e12f6d7406dc37c5ff96a8 Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Thu, 20 Feb 2025 10:39:34 +0100 Subject: [PATCH 18/24] lint --- src/db.py | 10 +++++++--- src/schema.py | 9 +++------ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/db.py b/src/db.py index 73a78a0d..1521c581 100644 --- a/src/db.py +++ b/src/db.py @@ -31,7 +31,7 @@ MatchesSchema, ConfigSchema, FileToQueueSchema, - QueueStatusDatasetsSchema, + QueueStatusDatabaseSchema, ) from .config import app_config @@ -502,7 +502,7 @@ def add_files_to_queue( ) session.commit() - def get_queue_info(self, ursadb_id: str) -> QueueStatusDatasetsSchema: + def get_queue_info(self, ursadb_id: str) -> QueueStatusDatabaseSchema: with self.session() as session: query = select( @@ -512,7 +512,11 @@ def get_queue_info(self, ursadb_id: str) -> QueueStatusDatasetsSchema: ).where(QueuedFile.ursadb_id == ursadb_id) queue_info = session.exec(query).one() - return queue_info + return QueueStatusDatabaseSchema( + size=queue_info[0], + oldest_file=queue_info[1], + newest_file=queue_info[2], + ) def delete_queued_files(self, ursadb_id: str) -> None: with self.session() as session: diff --git a/src/schema.py b/src/schema.py index e8315820..5e48bd53 100644 --- a/src/schema.py +++ b/src/schema.py @@ -114,14 +114,11 @@ class FileToQueueSchema(BaseModel): tags: List[str] -class QueueStatusSchema(BaseModel): - ursadb_id: str +class QueueStatusDatabaseSchema(BaseModel): size: int oldest_file: Optional[datetime] newest_file: Optional[datetime] -class QueueStatusDatasetsSchema(QueueStatusSchema): - size: int - oldest_file: Optional[datetime] - newest_file: Optional[datetime] +class QueueStatusSchema(QueueStatusDatabaseSchema): + ursadb_id: str From 98b2092694f797f8dcba9b35f0b6a16881518b62 Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Thu, 20 Feb 2025 10:53:13 +0100 Subject: [PATCH 19/24] lint --- src/app.py | 3 +-- src/db.py | 6 ++++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/app.py b/src/app.py index defb30fc..a3ac8541 100644 --- a/src/app.py +++ b/src/app.py @@ -617,8 +617,7 @@ def add_files_to_queue( ) def get_queue_status(ursadb_id: str): queue_status = db.get_queue_info(ursadb_id) - logging.error(queue_status) - logging.error(type(queue_status)) + return QueueStatusSchema( ursadb_id=ursadb_id, size=queue_status.size, diff --git a/src/db.py b/src/db.py index 1521c581..c0a3365f 100644 --- a/src/db.py +++ b/src/db.py @@ -510,14 +510,16 @@ def get_queue_info(self, ursadb_id: str) -> QueueStatusDatabaseSchema: func.min(QueuedFile.created_at), func.max(QueuedFile.created_at), ).where(QueuedFile.ursadb_id == ursadb_id) - queue_info = session.exec(query).one() + queue_info = session.exec(query).first() - return QueueStatusDatabaseSchema( + queue_status = QueueStatusDatabaseSchema( size=queue_info[0], oldest_file=queue_info[1], newest_file=queue_info[2], ) + return queue_status + def delete_queued_files(self, ursadb_id: str) -> None: with self.session() as session: session.query(QueuedFile).filter_by(ursadb_id=ursadb_id).delete() From cfb671e8ed3e905d492831800796c00dea6a17a4 Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Thu, 20 Feb 2025 10:55:56 +0100 Subject: [PATCH 20/24] lint --- src/db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db.py b/src/db.py index c0a3365f..bf8963ef 100644 --- a/src/db.py +++ b/src/db.py @@ -510,7 +510,7 @@ def get_queue_info(self, ursadb_id: str) -> QueueStatusDatabaseSchema: func.min(QueuedFile.created_at), func.max(QueuedFile.created_at), ).where(QueuedFile.ursadb_id == ursadb_id) - queue_info = session.exec(query).first() + queue_info = session.exec(query).one() queue_status = QueueStatusDatabaseSchema( size=queue_info[0], From 9bcad8638c652691b615a04628e7af436b555a5d Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Thu, 20 Feb 2025 10:58:01 +0100 Subject: [PATCH 21/24] lint --- src/db.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/db.py b/src/db.py index bf8963ef..1521c581 100644 --- a/src/db.py +++ b/src/db.py @@ -512,14 +512,12 @@ def get_queue_info(self, ursadb_id: str) -> QueueStatusDatabaseSchema: ).where(QueuedFile.ursadb_id == ursadb_id) queue_info = session.exec(query).one() - queue_status = QueueStatusDatabaseSchema( + return QueueStatusDatabaseSchema( size=queue_info[0], oldest_file=queue_info[1], newest_file=queue_info[2], ) - return queue_status - def delete_queued_files(self, ursadb_id: str) -> None: with self.session() as session: session.query(QueuedFile).filter_by(ursadb_id=ursadb_id).delete() From a6ac02355324923eac55ed0e9aa918d10f577c7d Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Thu, 20 Feb 2025 11:11:23 +0100 Subject: [PATCH 22/24] lint --- src/db.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/db.py b/src/db.py index 1521c581..74f54a14 100644 --- a/src/db.py +++ b/src/db.py @@ -10,7 +10,7 @@ from redis import StrictRedis from enum import Enum, auto from rq import Queue # type: ignore -from sqlalchemy import exists, func +from sqlalchemy import exists from sqlmodel import ( Session, create_engine, @@ -19,6 +19,7 @@ update, col, delete, + func, ) from .models.agentgroup import AgentGroup From f45e6efbea986f68fcccce4366814b74de30dbff Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Thu, 20 Feb 2025 11:23:10 +0100 Subject: [PATCH 23/24] lint --- src/db.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/db.py b/src/db.py index 74f54a14..a2cbfacb 100644 --- a/src/db.py +++ b/src/db.py @@ -10,7 +10,7 @@ from redis import StrictRedis from enum import Enum, auto from rq import Queue # type: ignore -from sqlalchemy import exists +from sqlalchemy import exists, func from sqlmodel import ( Session, create_engine, @@ -19,7 +19,6 @@ update, col, delete, - func, ) from .models.agentgroup import AgentGroup @@ -511,7 +510,7 @@ def get_queue_info(self, ursadb_id: str) -> QueueStatusDatabaseSchema: func.min(QueuedFile.created_at), func.max(QueuedFile.created_at), ).where(QueuedFile.ursadb_id == ursadb_id) - queue_info = session.exec(query).one() + queue_info: QueueStatusDatabaseSchema = session.exec(query).one() return QueueStatusDatabaseSchema( size=queue_info[0], From f047c8c63e5c0d19c9785e97741b2a55d7338ef3 Mon Sep 17 00:00:00 2001 From: michalkrzem Date: Thu, 20 Feb 2025 11:28:02 +0100 Subject: [PATCH 24/24] lint --- src/db.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/db.py b/src/db.py index a2cbfacb..6489d415 100644 --- a/src/db.py +++ b/src/db.py @@ -505,12 +505,12 @@ def add_files_to_queue( def get_queue_info(self, ursadb_id: str) -> QueueStatusDatabaseSchema: with self.session() as session: - query = select( + query = select( # type: ignore func.count(QueuedFile.id), func.min(QueuedFile.created_at), func.max(QueuedFile.created_at), ).where(QueuedFile.ursadb_id == ursadb_id) - queue_info: QueueStatusDatabaseSchema = session.exec(query).one() + queue_info = session.exec(query).one() return QueueStatusDatabaseSchema( size=queue_info[0],