From 334cb29ba25a67f2930c3ac08badbbabe9359818 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 27 Jan 2025 06:53:54 -0800 Subject: [PATCH] fix: snafu from renaming ml_backends to processing_services (#697) --- docker-compose.ci.yml | 8 +- docker-compose.yml | 9 +- processing_services/example/api/api.py | 110 +++++++++++++++++++++++++ 3 files changed, 120 insertions(+), 7 deletions(-) diff --git a/docker-compose.ci.yml b/docker-compose.ci.yml index f3c6d53cc..fc49c5c04 100644 --- a/docker-compose.ci.yml +++ b/docker-compose.ci.yml @@ -8,7 +8,7 @@ services: - postgres - redis - minio-init - - processing_service + - ml_backend env_file: - ./.envs/.ci/.django - ./.envs/.ci/.postgres @@ -43,10 +43,8 @@ services: - ./compose/local/minio/init.sh:/etc/minio/init.sh entrypoint: /etc/minio/init.sh - processing_service: + ml_backend: build: context: ./processing_services/example volumes: - - ./processing_services/example/:/app:processing_service - ports: - - "2000:2000" + - ./processing_services/example/:/app diff --git a/docker-compose.yml b/docker-compose.yml index 478f85101..a29b8a278 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,6 +23,7 @@ services: - postgres - redis - minio-init + - ml_backend volumes: - .:/app:z env_file: @@ -141,8 +142,12 @@ services: ml_backend: build: - context: ./ml_backends/example + context: ./processing_services/example volumes: - - ./ml_backends/example/:/app + - ./processing_services/example/:/app ports: - "2005:2000" + networks: + default: + aliases: + - processing_service diff --git a/processing_services/example/api/api.py b/processing_services/example/api/api.py index e69de29bb..f5ee08e5e 100644 --- a/processing_services/example/api/api.py +++ b/processing_services/example/api/api.py @@ -0,0 +1,110 @@ +""" +FastAPI interface for processing images through the localization and classification pipelines.
+""" + +import logging +import time + +import fastapi + +from .pipelines import ConstantPipeline, Pipeline, RandomPipeline +from .schemas import ( + AlgorithmConfigResponse, + PipelineRequest, + PipelineResultsResponse, + ProcessingServiceInfoResponse, + SourceImage, + SourceImageResponse, +) + +logger = logging.getLogger(__name__) + +app = fastapi.FastAPI() + + +pipelines: list[type[Pipeline]] = [RandomPipeline, ConstantPipeline] +pipeline_choices: dict[str, type[Pipeline]] = {pipeline.config.slug: pipeline for pipeline in pipelines} +algorithm_choices: dict[str, AlgorithmConfigResponse] = { + algorithm.key: algorithm for pipeline in pipelines for algorithm in pipeline.config.algorithms +} + + +@app.get("/") +async def root(): + return fastapi.responses.RedirectResponse("/docs") + + +@app.get("/info", tags=["services"]) +async def info() -> ProcessingServiceInfoResponse: + info = ProcessingServiceInfoResponse( + name="ML Backend Template", + description=( + "A template for an inference API that allows the user to run different sequences of machine learning " + "models and processing methods on images for the Antenna platform." + ), + pipelines=[pipeline.config for pipeline in pipelines], + # algorithms=list(algorithm_choices.values()), + ) + return info + + +# Check if the server is online +@app.get("/livez", tags=["health checks"]) +async def livez(): + return fastapi.responses.JSONResponse(status_code=200, content={"status": True}) + + +# Check if the pipelines are ready to process data +@app.get("/readyz", tags=["health checks"]) +async def readyz(): + """ + Check if the server is ready to process data. + + Returns a list of pipeline slugs that are online and ready to process data. + @TODO may need to simplify this to just return True/False. Pipeline algorithms will likely be loaded into memory + on-demand when the pipeline is selected.
+ """ + if pipeline_choices: + return fastapi.responses.JSONResponse(status_code=200, content={"status": list(pipeline_choices.keys())}) + else: + return fastapi.responses.JSONResponse(status_code=503, content={"status": []}) + + +@app.post("/process", tags=["services"]) +async def process(data: PipelineRequest) -> PipelineResultsResponse: + pipeline_slug = data.pipeline + + source_images = [SourceImage(**image.model_dump()) for image in data.source_images] + source_image_results = [SourceImageResponse(**image.model_dump()) for image in data.source_images] + + start_time = time.time() + + try: + Pipeline = pipeline_choices[pipeline_slug] + except KeyError: + raise fastapi.HTTPException(status_code=422, detail=f"Invalid pipeline choice: {pipeline_slug}") + + pipeline = Pipeline(source_images=source_images) + try: + results = pipeline.run() + except Exception as e: + logger.error(f"Error running pipeline: {e}") + raise fastapi.HTTPException(status_code=422, detail=f"{e}") + + end_time = time.time() + seconds_elapsed = float(end_time - start_time) + + response = PipelineResultsResponse( + pipeline=pipeline_slug, + algorithms={algorithm.key: algorithm for algorithm in pipeline.config.algorithms}, + source_images=source_image_results, + detections=results, + total_time=seconds_elapsed, + ) + return response + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(app, host="0.0.0.0", port=2000)