Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Baptiste O'Jeanson committed Aug 1, 2023
1 parent bc3086c commit d01856d
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 30 deletions.
26 changes: 15 additions & 11 deletions edge_orchestrator/edge_orchestrator/application/api_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
from typing_extensions import Annotated

from edge_orchestrator.application.config import Settings, get_settings, get_supervisor
from edge_orchestrator.domain.ports.metadata_storage import MetadataStorage
from infrastructure.metadata_storage.metadata_storage_factory import (
MetadataStorageFactory,
)

api_router = APIRouter()

Expand All @@ -16,24 +20,24 @@ async def home(settings: Annotated[Settings, Depends(get_settings)]):
@api_router.get("/items")
def read_all(settings: Annotated[Settings, Depends(get_settings)]):
metadata_storage = get_supervisor(
settings.inventory_path,
settings.station_configs_folder,
settings.inventory_path,
settings.data_folder,
settings.active_config_path,
).metadata_storage
return metadata_storage.get_all_items_metadata()


# @api_router.get("/items/{item_id}")
# def get_item(
# item_id: str,
# metadata_storage: MetadataStorage = Depends(
# MetadataStorageFactory.get_metadata_storage
# ),
# ):
# return metadata_storage.get_item_metadata(item_id)
#
#
@api_router.get("/items/{item_id}")
def get_item(
item_id: str,
metadata_storage: MetadataStorage = Depends(
MetadataStorageFactory.get_metadata_storage
),
):
return metadata_storage.get_item_metadata(item_id)


# @api_router.get("/items/{item_id}/binaries/{camera_id}")
# def get_item_binary(
# item_id: str,
Expand Down
22 changes: 14 additions & 8 deletions edge_orchestrator/edge_orchestrator/application/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from pydantic_settings import BaseSettings, SettingsConfigDict

from domain.ports.inventory import Inventory
from edge_orchestrator.domain.models.edge_station import EdgeStation
from edge_orchestrator.domain.use_cases.supervisor import Supervisor
from edge_orchestrator.infrastructure.binary_storage.binary_storage_factory import (
Expand Down Expand Up @@ -44,32 +45,35 @@ def get_settings():

@lru_cache()
def get_supervisor(
inventory_path: Path,
station_configs_folder: Path,
inventory_path: Path,
data_folder: Path,
active_config_path: Path,
) -> Optional[Supervisor]:
station_config = get_station_config(
data_folder, get_inventory(inventory_path), station_configs_folder
station_configs_folder, get_inventory(inventory_path), data_folder
)

active_config_name = get_active_config_name(active_config_path)
if active_config_name != "no_active_config":
station_config.set_station_config(active_config_name)

binary_storage = BinaryStorageFactory.get_binary_storage(
station_config.active_config.get("binary_storage")["type"],
**station_config.active_config.get("binary_storage")["params"],
station_config.active_config["binary_storage"].get("type"),
**station_config.active_config["binary_storage"].get("params", {}),
)
edge_station = EdgeStation(station_config, data_folder)
metadata_storage = MetadataStorageFactory.get_metadata_storage(
station_config.active_config.get("metadata_storage")
station_config.active_config["metadata_storage"]["type"],
**station_config.active_config["metadata_storage"].get("params", {}),
)
model_forward = ModelForwardFactory.get_model_forward(
station_config.active_config.get("model_forward")
station_config.active_config["model_forward"]["type"],
**station_config.active_config["model_forward"].get("params", {}),
)
telemetry_sink = TelemetrySinkFactory.get_telemetry_sink(
station_config.active_config.get("telemetry_sink")
station_config.active_config["telemetry_sink"]["type"],
**station_config.active_config["telemetry_sink"].get("params", {}),
)
return Supervisor(
binary_storage,
Expand All @@ -84,7 +88,9 @@ def get_supervisor(


@lru_cache()
def get_station_config(data_folder, inventory, station_configs_folder):
def get_station_config(
station_configs_folder: Path, inventory: Inventory, data_folder: Path
):
return JsonStationConfig(station_configs_folder, inventory, data_folder)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ async def trigger_job(
):
item = Item.from_nothing()
supervisor = get_supervisor(
settings.inventory_path,
settings.station_configs_folder,
settings.inventory_path,
settings.data_folder,
settings.active_config_path,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from functools import lru_cache
from typing import Dict, Type
from typing import Dict, Type, Optional, Any

from edge_orchestrator.domain.ports.metadata_storage import MetadataStorage
from edge_orchestrator.infrastructure.metadata_storage.azure_container_metadata_storage import (
Expand All @@ -17,6 +17,7 @@
from edge_orchestrator.infrastructure.metadata_storage.mongo_db_metadata_storage import (
MongoDbMetadataStorage,
)
from infrastructure.filesystem_helpers import get_tmp_path

AVAILABLE_METADATA_STORAGES: Dict[str, Type[MetadataStorage]] = {
"azure_container": AzureContainerMetadataStorage,
Expand All @@ -31,11 +32,14 @@ class MetadataStorageFactory:
@staticmethod
@lru_cache()
def get_metadata_storage(
metadata_storage_type: str = "filesystem", **kwargs
metadata_storage_type: Optional[str] = "filesystem",
**metadata_storage_config: Optional[Dict[str, Any]],
) -> MetadataStorage:
if not metadata_storage_type:
metadata_storage_type["src_directory_path"] = get_tmp_path()
try:
return AVAILABLE_METADATA_STORAGES[metadata_storage_type]()
# return AVAILABLE_METADATA_STORAGES[metadata_storage_type](kwargs)
# return AVAILABLE_METADATA_STORAGES[metadata_storage_type]()
return AVAILABLE_METADATA_STORAGES[metadata_storage_type](**metadata_storage_config)
except KeyError as err:
raise ValueError(
f"Unknown metadata storage type: {metadata_storage_type}"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from functools import lru_cache
from typing import Dict, Type
from typing import Dict, Type, Optional, Any

from edge_orchestrator.domain.ports.model_forward import ModelForward
from edge_orchestrator.infrastructure.model_forward.fake_model_forward import (
Expand All @@ -18,9 +18,10 @@
class ModelForwardFactory:
@staticmethod
@lru_cache()
def get_model_forward(model_forward_type: str = "fake") -> ModelForward:
def get_model_forward(model_forward_type: Optional[str] = "fake",
**model_forward_config: Optional[Dict[str, Any]]) -> ModelForward:
try:
return AVAILABLE_MODEL_FORWARD[model_forward_type]()
return AVAILABLE_MODEL_FORWARD[model_forward_type](**model_forward_config)
except KeyError as err:
raise ValueError(
f"Unknown model forward type: {model_forward_type}"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from functools import lru_cache
from typing import Dict, Type
from typing import Dict, Type, Optional, Any

from edge_orchestrator.domain.ports.telemetry_sink import TelemetrySink
from edge_orchestrator.infrastructure.telemetry_sink.azure_iot_hub_telemetry_sink import (
Expand All @@ -22,9 +22,10 @@
class TelemetrySinkFactory:
@staticmethod
@lru_cache()
def get_telemetry_sink(telemetry_sink_type: str = "fake") -> TelemetrySink:
def get_telemetry_sink(telemetry_sink_type: Optional[str] = "fake",
**telemetry_sink_config: Optional[Dict[str, Any]]) -> TelemetrySink:
try:
return AVAILABLE_TELEMETRY_SINK[telemetry_sink_type]()
return AVAILABLE_TELEMETRY_SINK[telemetry_sink_type](**telemetry_sink_config)
except KeyError as err:
raise ValueError(
f"Unknown telemetry sink type: {telemetry_sink_type}"
Expand Down

0 comments on commit d01856d

Please sign in to comment.