Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Baptiste O'Jeanson committed Aug 1, 2023
1 parent d01856d commit e2663fe
Show file tree
Hide file tree
Showing 19 changed files with 165 additions and 123 deletions.
2 changes: 1 addition & 1 deletion edge_orchestrator/config/.active_config
Original file line number Diff line number Diff line change
@@ -1 +1 @@
marker_classification_with_1_fake_camera
toto
16 changes: 8 additions & 8 deletions edge_orchestrator/edge_orchestrator/application/api_routes.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from fastapi import APIRouter, Depends
from typing_extensions import Annotated

from edge_orchestrator.application.config import Settings, get_settings, get_supervisor
from edge_orchestrator.application.config import (
Settings,
get_settings,
get_metadata_storage,
)
from edge_orchestrator.domain.ports.metadata_storage import MetadataStorage
from infrastructure.metadata_storage.metadata_storage_factory import (
MetadataStorageFactory,
Expand All @@ -18,13 +22,9 @@ async def home(settings: Annotated[Settings, Depends(get_settings)]):


@api_router.get("/items")
def read_all(settings: Annotated[Settings, Depends(get_settings)]):
metadata_storage = get_supervisor(
settings.station_configs_folder,
settings.inventory_path,
settings.data_folder,
settings.active_config_path,
).metadata_storage
def read_all(
metadata_storage: Annotated[MetadataStorage, Depends(get_metadata_storage)]
):
return metadata_storage.get_all_items_metadata()


Expand Down
134 changes: 83 additions & 51 deletions edge_orchestrator/edge_orchestrator/application/config.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from functools import lru_cache
from pathlib import Path
from typing import Optional

from pydantic_settings import BaseSettings, SettingsConfigDict

from domain.ports.inventory import Inventory
from edge_orchestrator import logger
from edge_orchestrator.domain.models.edge_station import EdgeStation
from edge_orchestrator.domain.use_cases.supervisor import Supervisor
from edge_orchestrator.domain.use_cases.uploader import Uploader
from edge_orchestrator.infrastructure.binary_storage.binary_storage_factory import (
BinaryStorageFactory,
)
Expand Down Expand Up @@ -44,59 +44,23 @@ def get_settings():


@lru_cache()
def get_supervisor(
station_configs_folder: Path,
inventory_path: Path,
data_folder: Path,
active_config_path: Path,
) -> Optional[Supervisor]:
station_config = get_station_config(
station_configs_folder, get_inventory(inventory_path), data_folder
def get_supervisor() -> Supervisor:
return Supervisor(
get_binary_storage(),
get_edge_station(),
get_metadata_storage(),
get_model_forward(),
get_station_config(),
get_telemetry_sink(),
)

active_config_name = get_active_config_name(active_config_path)
if active_config_name != "no_active_config":
station_config.set_station_config(active_config_name)

binary_storage = BinaryStorageFactory.get_binary_storage(
station_config.active_config["binary_storage"].get("type"),
**station_config.active_config["binary_storage"].get("params", {}),
)
edge_station = EdgeStation(station_config, data_folder)
metadata_storage = MetadataStorageFactory.get_metadata_storage(
station_config.active_config["metadata_storage"]["type"],
**station_config.active_config["metadata_storage"].get("params", {}),
)
model_forward = ModelForwardFactory.get_model_forward(
station_config.active_config["model_forward"]["type"],
**station_config.active_config["model_forward"].get("params", {}),
)
telemetry_sink = TelemetrySinkFactory.get_telemetry_sink(
station_config.active_config["telemetry_sink"]["type"],
**station_config.active_config["telemetry_sink"].get("params", {}),
)
return Supervisor(
binary_storage,
edge_station,
metadata_storage,
model_forward,
station_config,
telemetry_sink,
)
else:
return None


@lru_cache()
def get_station_config(
station_configs_folder: Path, inventory: Inventory, data_folder: Path
):
return JsonStationConfig(station_configs_folder, inventory, data_folder)


@lru_cache()
def get_inventory(inventory_path):
return JsonInventory(inventory_path)
def get_uploader() -> Uploader:
return Uploader(
get_metadata_storage(),
get_binary_storage(),
)


def get_active_config_name(active_config_path: Path) -> str:
Expand All @@ -105,3 +69,71 @@ def get_active_config_name(active_config_path: Path) -> str:
return active_config.read().strip()
else:
return "no_active_config"


@lru_cache()
def get_inventory():
return JsonInventory(get_settings().inventory_path)


@lru_cache()
def get_station_config():
settings = get_settings()
active_config_name = get_active_config_name(settings.active_config_path)
station_config = JsonStationConfig(
settings.station_configs_folder, get_inventory(), settings.data_folder
)
try:
station_config.set_station_config(active_config_name)
except KeyError:
logger.error(
f"config_name '{active_config_name}' is unknown.\n"
f"Valid configs are: {list(station_config.all_configs.keys())}"
)
return station_config


def get_active_config():
station_config = get_station_config()
return station_config.active_config


@lru_cache()
def get_binary_storage():
active_config = get_active_config()
return BinaryStorageFactory.get_binary_storage(
active_config["binary_storage"].get("type"),
**active_config["binary_storage"].get("params", {}),
)


@lru_cache()
def get_metadata_storage():
active_config = get_active_config()
return MetadataStorageFactory.get_metadata_storage(
active_config["metadata_storage"]["type"],
**active_config["metadata_storage"].get("params", {}),
)


@lru_cache()
def get_edge_station():
return EdgeStation(get_station_config())


@lru_cache()
def get_model_forward():
active_config = get_active_config()
return ModelForwardFactory.get_model_forward(
active_config["model_forward"]["type"],
**active_config["model_forward"].get("params", {}),
)


@lru_cache()
def get_telemetry_sink():
active_config = get_active_config()
return TelemetrySinkFactory.get_telemetry_sink(
active_config["telemetry_sink"]["type"],
**active_config["telemetry_sink"].get("params", {}),
)
60 changes: 36 additions & 24 deletions edge_orchestrator/edge_orchestrator/application/trigger_routes.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,61 @@
from fastapi import APIRouter, BackgroundTasks, Depends, File, UploadFile
from fastapi import APIRouter, BackgroundTasks, File, UploadFile, Depends
from fastapi.responses import JSONResponse
from typing_extensions import Annotated

from edge_orchestrator.application.config import Settings, get_settings, get_supervisor
from edge_orchestrator.application.config import (
get_supervisor,
get_station_config,
get_uploader,
)
from edge_orchestrator.domain.models.item import Item
from edge_orchestrator.domain.ports.station_config import StationConfig

trigger_router = APIRouter()


@trigger_router.post("/trigger")
async def trigger_job(
settings: Annotated[Settings, Depends(get_settings)],
station_config: Annotated[StationConfig, Depends(get_station_config)],
image: UploadFile = None,
background_tasks: BackgroundTasks = None,
):
item = Item.from_nothing()
supervisor = get_supervisor(
settings.station_configs_folder,
settings.inventory_path,
settings.data_folder,
settings.active_config_path,
)
if supervisor is None:
if station_config.active_config:
supervisor = get_supervisor()
item = Item.from_nothing()
if image:
contents = image.file.read()
camera_id = supervisor.station_config.get_cameras()[0]
item.binaries = {camera_id: contents}
background_tasks.add_task(supervisor.inspect, item)
return {"item_id": item.id}
else:
return JSONResponse(
status_code=403,
content={
"message": "No active configuration selected! "
"Set the active station configuration before triggering the inspection."
},
)
else:
if image:
contents = image.file.read()
camera_id = supervisor.station_config.get_cameras()[0]
item.binaries = {camera_id: contents}
background_tasks.add_task(supervisor.inspect, item)
return {"item_id": item.id}


@trigger_router.post("/upload")
async def upload_job(
image: UploadFile = File(...), background_tasks: BackgroundTasks = None
station_config: Annotated[StationConfig, Depends(get_station_config)],
image: UploadFile = File(...),
background_tasks: BackgroundTasks = None,
):
item = Item.from_nothing()
contents = image.file.read()
item.binaries = {"0": contents}
background_tasks.add_task(uploader.upload, item)
return {"item_id": item.id}
if station_config.active_config:
uploader = get_uploader()
item = Item.from_nothing()
contents = image.file.read()
item.binaries = {"0": contents}
background_tasks.add_task(uploader.upload, item)
return {"item_id": item.id}
else:
return JSONResponse(
status_code=403,
content={
"message": "No active configuration selected! "
"Set the active station configuration before uploading the image."
},
)
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
from pathlib import Path
from typing import Dict, Tuple, Any, List

from domain.models.camera import Camera
from edge_orchestrator.domain.ports.station_config import StationConfig


class EdgeStation:
def __init__(self, station_config: StationConfig, storage: Path):
def __init__(self, station_config: StationConfig):
self.station_config = station_config
self.storage = storage
self.cameras = self.register_cameras()

def register_cameras(self) -> List[Camera]:
Expand Down
12 changes: 5 additions & 7 deletions edge_orchestrator/edge_orchestrator/domain/use_cases/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
from collections import OrderedDict
from enum import Enum

from edge_orchestrator.api_config import (
get_binary_storage,
get_metadata_storage,
logger,
)
from domain.ports.binary_storage import BinaryStorage
from domain.ports.metadata_storage import MetadataStorage
from edge_orchestrator import logger
from edge_orchestrator.domain.models.item import Item


Expand All @@ -18,8 +16,8 @@ class UploaderState(Enum):
class Uploader:
def __init__(
self,
metadata_storage=get_metadata_storage(),
binary_storage=get_binary_storage(),
metadata_storage: MetadataStorage,
binary_storage: BinaryStorage,
):
self.metadata_storage = metadata_storage
self.binary_storage = binary_storage
Expand Down
2 changes: 1 addition & 1 deletion edge_orchestrator/edge_orchestrator/environment/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ def __init__(self):
self.inventory,
self.ROOT_PATH / "data",
)
self.edge_station = EdgeStation(self.station_config, self.ROOT_PATH / "data")
self.edge_station = EdgeStation(self.station_config)
self.telemetry_sink = FakeTelemetrySink()
2 changes: 1 addition & 1 deletion edge_orchestrator/edge_orchestrator/environment/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ def __init__(self):
inventory=self.inventory,
data_folder=self.ROOT_PATH / "data",
)
self.edge_station = EdgeStation(self.station_config, self.ROOT_PATH / "data")
self.edge_station = EdgeStation(self.station_config)
self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL)
self.telemetry_sink = PostgresTelemetrySink(self.POSTGRES_DB_URI)
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ def __init__(self):
self.inventory,
self.ROOT_PATH / "data",
)
self.edge_station = EdgeStation(self.station_config, self.ROOT_PATH / "data")
self.edge_station = EdgeStation(self.station_config)
self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL)
self.telemetry_sink = AzureIotHubTelemetrySink()
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ def __init__(self):
self.inventory,
self.ROOT_PATH / "data",
)
self.edge_station = EdgeStation(self.station_config, self.ROOT_PATH / "data")
self.edge_station = EdgeStation(self.station_config)
self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL)
self.telemetry_sink = AzureIotHubTelemetrySink()
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ def __init__(self):
self.inventory,
self.ROOT_PATH / "data",
)
self.edge_station = EdgeStation(self.station_config, self.ROOT_PATH / "data")
self.edge_station = EdgeStation(self.station_config)
self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL)
self.telemetry_sink = AzureIotHubTelemetrySink()
2 changes: 1 addition & 1 deletion edge_orchestrator/edge_orchestrator/environment/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ def __init__(self):
self.station_config = JsonStationConfig(
TEST_STATION_CONFIGS_FOLDER_PATH, self.inventory, TEST_DATA_FOLDER_PATH
)
self.edge_station = EdgeStation(self.station_config, TEST_DATA_FOLDER_PATH)
self.edge_station = EdgeStation(self.station_config)
self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL)
self.telemetry_sink = PostgresTelemetrySink(self.POSTGRES_DB_URI)
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ def __init__(self):
self.inventory,
self.ROOT_PATH / "data",
)
self.edge_station = EdgeStation(self.station_config, self.ROOT_PATH / "data")
self.edge_station = EdgeStation(self.station_config)
self.model_forward = TFServingWrapper(self.SERVING_MODEL_URL)
self.telemetry_sink = FakeTelemetrySink()
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ class MetadataStorageFactory:
@staticmethod
@lru_cache()
def get_metadata_storage(
metadata_storage_type: Optional[str] = "filesystem",
**metadata_storage_config: Optional[Dict[str, Any]],
metadata_storage_type: Optional[str] = "filesystem",
**metadata_storage_config: Optional[Dict[str, Any]],
) -> MetadataStorage:
if not metadata_storage_type:
metadata_storage_type["src_directory_path"] = get_tmp_path()
try:
# return AVAILABLE_METADATA_STORAGES[metadata_storage_type]()
return AVAILABLE_METADATA_STORAGES[metadata_storage_type](**metadata_storage_config)
return AVAILABLE_METADATA_STORAGES[metadata_storage_type](
**metadata_storage_config
)
except KeyError as err:
raise ValueError(
f"Unknown metadata storage type: {metadata_storage_type}"
Expand Down
Loading

0 comments on commit e2663fe

Please sign in to comment.