diff --git a/edge_orchestrator/edge_orchestrator/application/api_routes.py b/edge_orchestrator/edge_orchestrator/application/api_routes.py index 72bf22f7..a671580a 100644 --- a/edge_orchestrator/edge_orchestrator/application/api_routes.py +++ b/edge_orchestrator/edge_orchestrator/application/api_routes.py @@ -2,6 +2,10 @@ from typing_extensions import Annotated from edge_orchestrator.application.config import Settings, get_settings, get_supervisor +from edge_orchestrator.domain.ports.metadata_storage import MetadataStorage +from infrastructure.metadata_storage.metadata_storage_factory import ( + MetadataStorageFactory, +) api_router = APIRouter() @@ -16,24 +20,24 @@ async def home(settings: Annotated[Settings, Depends(get_settings)]): @api_router.get("/items") def read_all(settings: Annotated[Settings, Depends(get_settings)]): metadata_storage = get_supervisor( - settings.inventory_path, settings.station_configs_folder, + settings.inventory_path, settings.data_folder, settings.active_config_path, ).metadata_storage return metadata_storage.get_all_items_metadata() -# @api_router.get("/items/{item_id}") -# def get_item( -# item_id: str, -# metadata_storage: MetadataStorage = Depends( -# MetadataStorageFactory.get_metadata_storage -# ), -# ): -# return metadata_storage.get_item_metadata(item_id) -# -# +@api_router.get("/items/{item_id}") +def get_item( + item_id: str, + metadata_storage: MetadataStorage = Depends( + MetadataStorageFactory.get_metadata_storage + ), +): + return metadata_storage.get_item_metadata(item_id) + + # @api_router.get("/items/{item_id}/binaries/{camera_id}") # def get_item_binary( # item_id: str, diff --git a/edge_orchestrator/edge_orchestrator/application/config.py b/edge_orchestrator/edge_orchestrator/application/config.py index 3b6b68f4..a9ec2df8 100644 --- a/edge_orchestrator/edge_orchestrator/application/config.py +++ b/edge_orchestrator/edge_orchestrator/application/config.py @@ -4,6 +4,7 @@ from pydantic_settings import BaseSettings, SettingsConfigDict +from domain.ports.inventory import Inventory from edge_orchestrator.domain.models.edge_station import EdgeStation from edge_orchestrator.domain.use_cases.supervisor import Supervisor from edge_orchestrator.infrastructure.binary_storage.binary_storage_factory import ( @@ -44,13 +45,13 @@ def get_settings(): @lru_cache() def get_supervisor( - inventory_path: Path, station_configs_folder: Path, + inventory_path: Path, data_folder: Path, active_config_path: Path, ) -> Optional[Supervisor]: station_config = get_station_config( - data_folder, get_inventory(inventory_path), station_configs_folder + station_configs_folder, get_inventory(inventory_path), data_folder ) active_config_name = get_active_config_name(active_config_path) @@ -58,18 +59,21 @@ def get_supervisor( station_config.set_station_config(active_config_name) binary_storage = BinaryStorageFactory.get_binary_storage( - station_config.active_config.get("binary_storage")["type"], - **station_config.active_config.get("binary_storage")["params"], + station_config.active_config["binary_storage"].get("type"), + **station_config.active_config["binary_storage"].get("params", {}), ) edge_station = EdgeStation(station_config, data_folder) metadata_storage = MetadataStorageFactory.get_metadata_storage( - station_config.active_config.get("metadata_storage") + station_config.active_config["metadata_storage"]["type"], + **station_config.active_config["metadata_storage"].get("params", {}), ) model_forward = ModelForwardFactory.get_model_forward( - station_config.active_config.get("model_forward") + station_config.active_config["model_forward"]["type"], + **station_config.active_config["model_forward"].get("params", {}), ) telemetry_sink = TelemetrySinkFactory.get_telemetry_sink( - station_config.active_config.get("telemetry_sink") + station_config.active_config["telemetry_sink"]["type"], + **station_config.active_config["telemetry_sink"].get("params", {}), ) return Supervisor( binary_storage, @@ -84,7 +88,9 @@ def get_supervisor( @lru_cache() -def get_station_config(data_folder, inventory, station_configs_folder): +def get_station_config( + station_configs_folder: Path, inventory: Inventory, data_folder: Path +): return JsonStationConfig(station_configs_folder, inventory, data_folder) diff --git a/edge_orchestrator/edge_orchestrator/application/trigger_routes.py b/edge_orchestrator/edge_orchestrator/application/trigger_routes.py index bee31965..71ef6f4c 100644 --- a/edge_orchestrator/edge_orchestrator/application/trigger_routes.py +++ b/edge_orchestrator/edge_orchestrator/application/trigger_routes.py @@ -16,8 +16,8 @@ async def trigger_job( ): item = Item.from_nothing() supervisor = get_supervisor( - settings.inventory_path, settings.station_configs_folder, + settings.inventory_path, settings.data_folder, settings.active_config_path, ) diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/metadata_storage_factory.py b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/metadata_storage_factory.py index b8cd8600..44c6b079 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/metadata_storage_factory.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/metadata_storage/metadata_storage_factory.py @@ -1,5 +1,5 @@ from functools import lru_cache -from typing import Dict, Type +from typing import Dict, Type, Optional, Any from edge_orchestrator.domain.ports.metadata_storage import MetadataStorage from edge_orchestrator.infrastructure.metadata_storage.azure_container_metadata_storage import ( @@ -17,6 +17,7 @@ from edge_orchestrator.infrastructure.metadata_storage.mongo_db_metadata_storage import ( MongoDbMetadataStorage, ) +from infrastructure.filesystem_helpers import get_tmp_path AVAILABLE_METADATA_STORAGES: Dict[str, Type[MetadataStorage]] = { "azure_container": AzureContainerMetadataStorage, @@ -31,11 +32,14 @@ class MetadataStorageFactory: @staticmethod @lru_cache() def get_metadata_storage( - metadata_storage_type: str = "filesystem", **kwargs + metadata_storage_type: Optional[str] = "filesystem", + **metadata_storage_config: Optional[Dict[str, Any]], ) -> MetadataStorage: + if not metadata_storage_type: + metadata_storage_type["src_directory_path"] = get_tmp_path() try: - return AVAILABLE_METADATA_STORAGES[metadata_storage_type]() - # return AVAILABLE_METADATA_STORAGES[metadata_storage_type](kwargs) + # return AVAILABLE_METADATA_STORAGES[metadata_storage_type]() + return AVAILABLE_METADATA_STORAGES[metadata_storage_type](**metadata_storage_config) except KeyError as err: raise ValueError( f"Unknown metadata storage type: {metadata_storage_type}" diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/model_forward/model_forward_factory.py b/edge_orchestrator/edge_orchestrator/infrastructure/model_forward/model_forward_factory.py index 30f0d1d1..8671f26a 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/model_forward/model_forward_factory.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/model_forward/model_forward_factory.py @@ -1,5 +1,5 @@ from functools import lru_cache -from typing import Dict, Type +from typing import Dict, Type, Optional, Any from edge_orchestrator.domain.ports.model_forward import ModelForward from edge_orchestrator.infrastructure.model_forward.fake_model_forward import ( @@ -18,9 +18,10 @@ class ModelForwardFactory: @staticmethod @lru_cache() - def get_model_forward(model_forward_type: str = "fake") -> ModelForward: + def get_model_forward(model_forward_type: Optional[str] = "fake", + **model_forward_config: Optional[Dict[str, Any]]) -> ModelForward: try: - return AVAILABLE_MODEL_FORWARD[model_forward_type]() + return AVAILABLE_MODEL_FORWARD[model_forward_type](**model_forward_config) except KeyError as err: raise ValueError( f"Unknown model forward type: {model_forward_type}" diff --git a/edge_orchestrator/edge_orchestrator/infrastructure/telemetry_sink/telemetry_sink_factory.py b/edge_orchestrator/edge_orchestrator/infrastructure/telemetry_sink/telemetry_sink_factory.py index 03b5e8c9..5f96b6c7 100644 --- a/edge_orchestrator/edge_orchestrator/infrastructure/telemetry_sink/telemetry_sink_factory.py +++ b/edge_orchestrator/edge_orchestrator/infrastructure/telemetry_sink/telemetry_sink_factory.py @@ -1,5 +1,5 @@ from functools import lru_cache -from typing import Dict, Type +from typing import Dict, Type, Optional, Any from edge_orchestrator.domain.ports.telemetry_sink import TelemetrySink from edge_orchestrator.infrastructure.telemetry_sink.azure_iot_hub_telemetry_sink import ( @@ -22,9 +22,10 @@ class TelemetrySinkFactory: @staticmethod @lru_cache() - def get_telemetry_sink(telemetry_sink_type: str = "fake") -> TelemetrySink: + def get_telemetry_sink(telemetry_sink_type: Optional[str] = "fake", + **telemetry_sink_config: Optional[Dict[str, Any]]) -> TelemetrySink: try: - return AVAILABLE_TELEMETRY_SINK[telemetry_sink_type]() + return AVAILABLE_TELEMETRY_SINK[telemetry_sink_type](**telemetry_sink_config) except KeyError as err: raise ValueError( f"Unknown telemetry sink type: {telemetry_sink_type}"