diff --git a/edge_orchestrator/config/.active_config b/edge_orchestrator/config/.active_config index e2115f88..fbcf12d5 100644 --- a/edge_orchestrator/config/.active_config +++ b/edge_orchestrator/config/.active_config @@ -1 +1 @@ -marker_classification_with_1_fake_camera +toto diff --git a/edge_orchestrator/edge_orchestrator/application/api_routes.py b/edge_orchestrator/edge_orchestrator/application/api_routes.py index a671580a..b585ce38 100644 --- a/edge_orchestrator/edge_orchestrator/application/api_routes.py +++ b/edge_orchestrator/edge_orchestrator/application/api_routes.py @@ -1,7 +1,11 @@ from fastapi import APIRouter, Depends from typing_extensions import Annotated -from edge_orchestrator.application.config import Settings, get_settings, get_supervisor +from edge_orchestrator.application.config import ( + Settings, + get_settings, + get_metadata_storage, +) from edge_orchestrator.domain.ports.metadata_storage import MetadataStorage from infrastructure.metadata_storage.metadata_storage_factory import ( MetadataStorageFactory, @@ -18,13 +22,9 @@ async def home(settings: Annotated[Settings, Depends(get_settings)]): @api_router.get("/items") -def read_all(settings: Annotated[Settings, Depends(get_settings)]): - metadata_storage = get_supervisor( - settings.station_configs_folder, - settings.inventory_path, - settings.data_folder, - settings.active_config_path, - ).metadata_storage +def read_all( + metadata_storage: Annotated[MetadataStorage, Depends(get_metadata_storage)] +): return metadata_storage.get_all_items_metadata() diff --git a/edge_orchestrator/edge_orchestrator/application/config.py b/edge_orchestrator/edge_orchestrator/application/config.py index a9ec2df8..462f9064 100644 --- a/edge_orchestrator/edge_orchestrator/application/config.py +++ b/edge_orchestrator/edge_orchestrator/application/config.py @@ -1,12 +1,12 @@ from functools import lru_cache from pathlib import Path -from typing import Optional from pydantic_settings import BaseSettings, SettingsConfigDict -from domain.ports.inventory import Inventory +from edge_orchestrator import logger from edge_orchestrator.domain.models.edge_station import EdgeStation from edge_orchestrator.domain.use_cases.supervisor import Supervisor +from edge_orchestrator.domain.use_cases.uploader import Uploader from edge_orchestrator.infrastructure.binary_storage.binary_storage_factory import ( BinaryStorageFactory, ) @@ -44,59 +44,23 @@ def get_settings(): @lru_cache() -def get_supervisor( - station_configs_folder: Path, - inventory_path: Path, - data_folder: Path, - active_config_path: Path, -) -> Optional[Supervisor]: - station_config = get_station_config( - station_configs_folder, get_inventory(inventory_path), data_folder +def get_supervisor() -> Supervisor: + return Supervisor( + get_binary_storage(), + get_edge_station(), + get_metadata_storage(), + get_model_forward(), + get_station_config(), + get_telemetry_sink(), ) - active_config_name = get_active_config_name(active_config_path) - if active_config_name != "no_active_config": - station_config.set_station_config(active_config_name) - - binary_storage = BinaryStorageFactory.get_binary_storage( - station_config.active_config["binary_storage"].get("type"), - **station_config.active_config["binary_storage"].get("params", {}), - ) - edge_station = EdgeStation(station_config, data_folder) - metadata_storage = MetadataStorageFactory.get_metadata_storage( - station_config.active_config["metadata_storage"]["type"], - **station_config.active_config["metadata_storage"].get("params", {}), - ) - model_forward = ModelForwardFactory.get_model_forward( - station_config.active_config["model_forward"]["type"], - **station_config.active_config["model_forward"].get("params", {}), - ) - telemetry_sink = TelemetrySinkFactory.get_telemetry_sink( - station_config.active_config["telemetry_sink"]["type"], - **station_config.active_config["telemetry_sink"].get("params", {}), - ) - return Supervisor( - binary_storage, - edge_station, - metadata_storage, - model_forward, - station_config, - telemetry_sink, - ) - else: - return None - - -@lru_cache() -def get_station_config( - station_configs_folder: Path, inventory: Inventory, data_folder: Path -): - return JsonStationConfig(station_configs_folder, inventory, data_folder) - @lru_cache() -def get_inventory(inventory_path): - return JsonInventory(inventory_path) +def get_uploader() -> Uploader: + return Uploader( + get_metadata_storage(), + get_binary_storage(), + ) def get_active_config_name(active_config_path: Path) -> str: @@ -105,3 +69,71 @@ def get_active_config_name(active_config_path: Path) -> str: return active_config.read().strip() else: return "no_active_config" + + +@lru_cache() +def get_inventory(): + return JsonInventory(get_settings().inventory_path) + + +@lru_cache() +def get_station_config(): + settings = get_settings() + active_config_name = get_active_config_name(settings.active_config_path) + station_config = JsonStationConfig( + settings.station_configs_folder, get_inventory(), settings.data_folder + ) + try: + station_config.set_station_config(active_config_name) + except KeyError: + logger.error( + f"config_name '{active_config_name}' is unknown.\n" + f"Valid configs are: {list(station_config.all_configs.keys())}" + ) + return station_config + + +def get_active_config(): + station_config = get_station_config() + return station_config.active_config + + +@lru_cache() +def get_binary_storage(): + active_config = get_active_config() + return BinaryStorageFactory.get_binary_storage( + active_config["binary_storage"].get("type"), + **active_config["binary_storage"].get("params", {}), + ) + + +@lru_cache() +def get_metadata_storage(): + active_config = get_active_config() + return MetadataStorageFactory.get_metadata_storage( + active_config["metadata_storage"]["type"], + **active_config["metadata_storage"].get("params", {}), + ) + + +@lru_cache() +def get_edge_station(): + return EdgeStation(get_station_config()) + + +@lru_cache() +def get_model_forward(): + active_config = get_active_config() + return ModelForwardFactory.get_model_forward( + active_config["model_forward"]["type"], + **active_config["model_forward"].get("params", {}), + ) + + +@lru_cache() +def get_telemetry_sink(): + active_config = get_active_config() + return TelemetrySinkFactory.get_telemetry_sink( + active_config["telemetry_sink"]["type"], + **active_config["telemetry_sink"].get("params", {}), + ) diff --git a/edge_orchestrator/edge_orchestrator/application/trigger_routes.py b/edge_orchestrator/edge_orchestrator/application/trigger_routes.py index 71ef6f4c..d2a575e4 100644 --- a/edge_orchestrator/edge_orchestrator/application/trigger_routes.py +++ b/edge_orchestrator/edge_orchestrator/application/trigger_routes.py @@ -1,27 +1,34 @@ -from fastapi import APIRouter, BackgroundTasks, Depends, File, UploadFile +from fastapi import APIRouter, BackgroundTasks, File, UploadFile, Depends from fastapi.responses import JSONResponse from typing_extensions import Annotated -from edge_orchestrator.application.config import Settings, get_settings, get_supervisor +from edge_orchestrator.application.config import ( + get_supervisor, + get_station_config, + get_uploader, +) from edge_orchestrator.domain.models.item import Item +from edge_orchestrator.domain.ports.station_config import StationConfig trigger_router = APIRouter() @trigger_router.post("/trigger") async def trigger_job( - settings: Annotated[Settings, Depends(get_settings)], + station_config: Annotated[StationConfig, Depends(get_station_config)], image: UploadFile = None, background_tasks: BackgroundTasks = None, ): - item = Item.from_nothing() - supervisor = get_supervisor( - settings.station_configs_folder, - settings.inventory_path, - settings.data_folder, - settings.active_config_path, - ) - if supervisor is None: + if station_config.active_config: + supervisor = get_supervisor() + item = Item.from_nothing() + if image: + contents = image.file.read() + camera_id = supervisor.station_config.get_cameras()[0] + item.binaries = {camera_id: contents} + background_tasks.add_task(supervisor.inspect, item) + return {"item_id": item.id} + else: return JSONResponse( status_code=403, content={ @@ -29,21 +36,26 @@ async def trigger_job( "Set the active station configuration before triggering the inspection." }, ) - else: - if image: - contents = image.file.read() - camera_id = supervisor.station_config.get_cameras()[0] - item.binaries = {camera_id: contents} - background_tasks.add_task(supervisor.inspect, item) - return {"item_id": item.id} @trigger_router.post("/upload") async def upload_job( - image: UploadFile = File(...), background_tasks: BackgroundTasks = None + station_config: Annotated[StationConfig, Depends(get_station_config)], + image: UploadFile = File(...), + background_tasks: BackgroundTasks = None, ): - item = Item.from_nothing() - contents = image.file.read() - item.binaries = {"0": contents} - background_tasks.add_task(uploader.upload, item) - return {"item_id": item.id} + if station_config.active_config: + uploader = get_uploader() + item = Item.from_nothing() + contents = image.file.read() + item.binaries = {"0": contents} + background_tasks.add_task(uploader.upload, item) + return {"item_id": item.id} + else: + return JSONResponse( + status_code=403, + content={ + "message": "No active configuration selected! " + "Set the active station configuration before uploading the image." + }, + ) diff --git a/edge_orchestrator/edge_orchestrator/domain/models/edge_station.py b/edge_orchestrator/edge_orchestrator/domain/models/edge_station.py index 9d584344..d227a6e3 100644 --- a/edge_orchestrator/edge_orchestrator/domain/models/edge_station.py +++ b/edge_orchestrator/edge_orchestrator/domain/models/edge_station.py @@ -1,4 +1,3 @@ -from pathlib import Path from typing import Dict, Tuple, Any, List from domain.models.camera import Camera @@ -6,9 +5,8 @@ class EdgeStation: - def __init__(self, station_config: StationConfig, storage: Path): + def __init__(self, station_config: StationConfig): self.station_config = station_config - self.storage = storage self.cameras = self.register_cameras() def register_cameras(self) -> List[Camera]: diff --git a/edge_orchestrator/edge_orchestrator/domain/use_cases/uploader.py b/edge_orchestrator/edge_orchestrator/domain/use_cases/uploader.py index 648fb620..561d6ed4 100644 --- a/edge_orchestrator/edge_orchestrator/domain/use_cases/uploader.py +++ b/edge_orchestrator/edge_orchestrator/domain/use_cases/uploader.py @@ -2,11 +2,9 @@ from collections import OrderedDict from enum import Enum -from edge_orchestrator.api_config import ( - get_binary_storage, - get_metadata_storage, - logger, -) +from domain.ports.binary_storage import BinaryStorage +from domain.ports.metadata_storage import MetadataStorage +from edge_orchestrator import logger from edge_orchestrator.domain.models.item import Item @@ -18,8 +16,8 @@ class UploaderState(Enum): class Uploader: def __init__( self, - metadata_storage=get_metadata_storage(), - binary_storage=get_binary_storage(), + metadata_storage: MetadataStorage, + binary_storage: BinaryStorage, ): self.metadata_storage = metadata_storage self.binary_storage = binary_storage diff --git a/edge_orchestrator/edge_orchestrator/environment/default.py b/edge_orchestrator/edge_orchestrator/environment/default.py index fc3044ff..835888f4 100644 --- a/edge_orchestrator/edge_orchestrator/environment/default.py +++ b/edge_orchestrator/edge_orchestrator/environment/default.py @@ -31,5 +31,5 @@ def __init__(self): self.inventory, self.ROOT_PATH / "data", ) - self.edge_station = EdgeStation(self.station_config, self.ROOT_PATH / "data") + self.edge_station = EdgeStation(self.station_config) self.telemetry_sink = FakeTelemetrySink() diff --git a/edge_orchestrator/edge_orchestrator/environment/docker.py b/edge_orchestrator/edge_orchestrator/environment/docker.py index 0b4645e3..c4ee0ca6 100644 --- a/edge_orchestrator/edge_orchestrator/environment/docker.py +++ b/edge_orchestrator/edge_orchestrator/environment/docker.py @@ -40,6 +40,6 @@ def __init__(self): inventory=self.inventory, data_folder=self.ROOT_PATH / "data", ) - self.edge_station = EdgeStation(self.station_config, self.ROOT_PATH / "data") + self.edge_station = EdgeStation(self.station_config) self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL) self.telemetry_sink = PostgresTelemetrySink(self.POSTGRES_DB_URI) diff --git a/edge_orchestrator/edge_orchestrator/environment/edge_with_azure_container_storage.py b/edge_orchestrator/edge_orchestrator/environment/edge_with_azure_container_storage.py index 40b5c7f9..731258e6 100644 --- a/edge_orchestrator/edge_orchestrator/environment/edge_with_azure_container_storage.py +++ b/edge_orchestrator/edge_orchestrator/environment/edge_with_azure_container_storage.py @@ -34,6 +34,6 @@ def __init__(self): self.inventory, self.ROOT_PATH / "data", ) - self.edge_station = EdgeStation(self.station_config, self.ROOT_PATH / "data") + self.edge_station = EdgeStation(self.station_config) self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL) self.telemetry_sink = AzureIotHubTelemetrySink() diff --git a/edge_orchestrator/edge_orchestrator/environment/edge_with_filesystem_metadata_storage.py b/edge_orchestrator/edge_orchestrator/environment/edge_with_filesystem_metadata_storage.py index 689e781a..b2c5e11b 100644 --- a/edge_orchestrator/edge_orchestrator/environment/edge_with_filesystem_metadata_storage.py +++ b/edge_orchestrator/edge_orchestrator/environment/edge_with_filesystem_metadata_storage.py @@ -38,6 +38,6 @@ def __init__(self): self.inventory, self.ROOT_PATH / "data", ) - self.edge_station = EdgeStation(self.station_config, self.ROOT_PATH / "data") + self.edge_station = EdgeStation(self.station_config) self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL) self.telemetry_sink = AzureIotHubTelemetrySink() diff --git a/edge_orchestrator/edge_orchestrator/environment/edge_with_mongo_db_metadata_storage.py b/edge_orchestrator/edge_orchestrator/environment/edge_with_mongo_db_metadata_storage.py index cec93e22..60edce41 100644 --- a/edge_orchestrator/edge_orchestrator/environment/edge_with_mongo_db_metadata_storage.py +++ b/edge_orchestrator/edge_orchestrator/environment/edge_with_mongo_db_metadata_storage.py @@ -38,6 +38,6 @@ def __init__(self): self.inventory, self.ROOT_PATH / "data", ) - self.edge_station = EdgeStation(self.station_config, self.ROOT_PATH / "data") + self.edge_station = EdgeStation(self.station_config) self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL) self.telemetry_sink = AzureIotHubTelemetrySink() diff --git a/edge_orchestrator/edge_orchestrator/environment/test.py b/edge_orchestrator/edge_orchestrator/environment/test.py index 099a1206..5cf58f2c 100644 --- a/edge_orchestrator/edge_orchestrator/environment/test.py +++ b/edge_orchestrator/edge_orchestrator/environment/test.py @@ -44,6 +44,6 @@ def __init__(self): self.station_config = JsonStationConfig( TEST_STATION_CONFIGS_FOLDER_PATH, self.inventory, TEST_DATA_FOLDER_PATH ) - self.edge_station = EdgeStation(self.station_config, TEST_DATA_FOLDER_PATH) + self.edge_station = EdgeStation(self.station_config) self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL) self.telemetry_sink = PostgresTelemetrySink(self.POSTGRES_DB_URI) diff --git a/edge_orchestrator/edge_orchestrator/environment/upload_with_gcp_bucket.py b/edge_orchestrator/edge_orchestrator/environment/upload_with_gcp_bucket.py index b097e620..f32f13d9 100644 --- a/edge_orchestrator/edge_orchestrator/environment/upload_with_gcp_bucket.py +++ b/edge_orchestrator/edge_orchestrator/environment/upload_with_gcp_bucket.py @@ -35,6 +35,6 @@ def __init__(self): self.inventory, self.ROOT_PATH / "data", ) - self.edge_station = EdgeStation(self.station_config, self.ROOT_PATH / "data") + self.edge_station = EdgeStation(self.station_config) self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL) self.telemetry_sink = FakeTelemetrySink() diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/metadata_storage_factory.py b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/metadata_storage_factory.py index 44c6b079..96f987cd 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/metadata_storage_factory.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/metadata_storage_factory.py @@ -32,14 +32,16 @@ class MetadataStorageFactory: @staticmethod @lru_cache() def get_metadata_storage( - metadata_storage_type: Optional[str] = "filesystem", - **metadata_storage_config: Optional[Dict[str, Any]], + metadata_storage_type: Optional[str] = "filesystem", + **metadata_storage_config: Optional[Dict[str, Any]], ) -> MetadataStorage: if not metadata_storage_type: metadata_storage_type["src_directory_path"] = get_tmp_path() try: # return AVAILABLE_METADATA_STORAGES[metadata_storage_type]() - return AVAILABLE_METADATA_STORAGES[metadata_storage_type](**metadata_storage_config) + return AVAILABLE_METADATA_STORAGES[metadata_storage_type]( + **metadata_storage_config + ) except KeyError as err: raise ValueError( f"Unknown metadata storage type: {metadata_storage_type}" diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/model_forward/model_forward_factory.py b/edge_orchestrator/edge_orchestrator/infrastructure/model_forward/model_forward_factory.py index 8671f26a..8063dba9 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/model_forward/model_forward_factory.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/model_forward/model_forward_factory.py @@ -18,8 +18,10 @@ class ModelForwardFactory: @staticmethod @lru_cache() - def get_model_forward(model_forward_type: Optional[str] = "fake", - **model_forward_config: Optional[Dict[str, Any]]) -> ModelForward: + def get_model_forward( + model_forward_type: Optional[str] = "fake", + **model_forward_config: Optional[Dict[str, Any]], + ) -> ModelForward: try: return AVAILABLE_MODEL_FORWARD[model_forward_type](**model_forward_config) except KeyError as err: diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/station_config/json_station_config.py b/edge_orchestrator/edge_orchestrator/infrastructure/station_config/json_station_config.py index 3911e578..ea3e4ed3 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/station_config/json_station_config.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/station_config/json_station_config.py @@ -51,14 +51,9 @@ def load(self): self._check_station_config_based_on_inventory(content) def set_station_config(self, config_name: str): - try: - self.active_config_name = config_name - self.active_config = self.all_configs[self.active_config_name] - logger.info(f"Activated the configuration {self.active_config_name}") - except KeyError: - raise KeyError( - f"{config_name} is unknown. Valid configs are {list(self.all_configs.keys())}" - ) + self.active_config = self.all_configs[config_name] + self.active_config_name = config_name + logger.info(f"Activated the configuration {self.active_config_name}") def get_model_pipeline_for_camera(self, camera_id: str) -> List[ModelInfos]: model_pipeline = [] diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/telemetry_sink/telemetry_sink_factory.py b/edge_orchestrator/edge_orchestrator/infrastructure/telemetry_sink/telemetry_sink_factory.py index 5f96b6c7..59b1d53f 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/telemetry_sink/telemetry_sink_factory.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/telemetry_sink/telemetry_sink_factory.py @@ -22,10 +22,14 @@ class TelemetrySinkFactory: @staticmethod @lru_cache() - def get_telemetry_sink(telemetry_sink_type: Optional[str] = "fake", - **telemetry_sink_config: Optional[Dict[str, Any]]) -> TelemetrySink: + def get_telemetry_sink( + telemetry_sink_type: Optional[str] = "fake", + **telemetry_sink_config: Optional[Dict[str, Any]], + ) -> TelemetrySink: try: - return AVAILABLE_TELEMETRY_SINK[telemetry_sink_type](**telemetry_sink_config) + return AVAILABLE_TELEMETRY_SINK[telemetry_sink_type]( + **telemetry_sink_config + ) except KeyError as err: raise ValueError( f"Unknown telemetry sink type: {telemetry_sink_type}" diff --git a/edge_orchestrator/tests/unit_tests/domain/models/test_edge_station.py b/edge_orchestrator/tests/unit_tests/domain/models/test_edge_station.py index 7fa6b067..5f1af5a7 100644 --- a/edge_orchestrator/tests/unit_tests/domain/models/test_edge_station.py +++ b/edge_orchestrator/tests/unit_tests/domain/models/test_edge_station.py @@ -8,7 +8,6 @@ from edge_orchestrator.domain.models.edge_station import EdgeStation from edge_orchestrator.domain.ports.station_config import StationConfig from edge_orchestrator.infrastructure.camera.fake_camera import FakeCamera -from tests.conftest import TEST_DATA_FOLDER_PATH class TestEdgeStation: @@ -18,7 +17,7 @@ def test_register_cameras_raises_exception_when_no_active_configuration_is_set( # Given station_config: StationConfig = get_station_config() - edge_station = EdgeStation(station_config, TEST_DATA_FOLDER_PATH) + edge_station = EdgeStation(station_config) # Then with pytest.raises(TypeError) as error: @@ -30,7 +29,7 @@ def test_capture_should_raise_exception_when_cameras_are_not_registered(self): station_config: StationConfig = get_station_config() station_config.set_station_config("station_config_TEST") - edge_station = EdgeStation(station_config, TEST_DATA_FOLDER_PATH) + edge_station = EdgeStation(station_config) # Then with pytest.raises(AttributeError) as error: @@ -57,7 +56,7 @@ def test_capture_should_instantiate_item_with_1_binary( station_config: StationConfig = get_station_config() station_config.set_station_config("station_config_TEST") - edge_station = EdgeStation(station_config, TEST_DATA_FOLDER_PATH) + edge_station = EdgeStation(station_config) # When edge_station.register_cameras() diff --git a/edge_orchestrator/tests/unit_tests/domain/test_supervisor.py b/edge_orchestrator/tests/unit_tests/domain/test_supervisor.py index c103ab65..c6a06a60 100644 --- a/edge_orchestrator/tests/unit_tests/domain/test_supervisor.py +++ b/edge_orchestrator/tests/unit_tests/domain/test_supervisor.py @@ -587,7 +587,7 @@ async def test_set_decision_should_send_final_decision_to_telemetry_sink( station_config.set_station_config("station_config_TEST") supervisor = Supervisor( binary_storage=InMemoryBinaryStorage(), - edge_station=EdgeStation(station_config, TEST_DATA_FOLDER_PATH), + edge_station=EdgeStation(station_config), metadata_storage=InMemoryMetadataStorage(), model_forward=FakeModelForward(), station_config=station_config, @@ -632,7 +632,7 @@ async def test_inspect_should_log_information_about_item_processing( station_config.set_station_config("station_config_TEST") supervisor = Supervisor( binary_storage=InMemoryBinaryStorage(), - edge_station=EdgeStation(station_config, TEST_DATA_FOLDER_PATH), + edge_station=EdgeStation(station_config), metadata_storage=InMemoryMetadataStorage(), model_forward=FakeModelForward(), station_config=station_config,