hotfix: snafu from renaming ml_backends to processing_services #697

Merged: 1 commit, Jan 27, 2025
docker-compose.ci.yml (3 additions, 5 deletions)

```diff
@@ -8,7 +8,7 @@ services:
       - postgres
       - redis
       - minio-init
-      - processing_service
+      - ml_backend
     env_file:
       - ./.envs/.ci/.django
       - ./.envs/.ci/.postgres
@@ -43,10 +43,8 @@ services:
       - ./compose/local/minio/init.sh:/etc/minio/init.sh
     entrypoint: /etc/minio/init.sh
 
-  processing_service:
+  ml_backend:
     build:
       context: ./processing_services/example
     volumes:
-      - ./processing_services/example/:/app:processing_service
-    ports:
-      - "2000:2000"
+      - ./processing_services/example/:/app
```
docker-compose.yml (7 additions, 2 deletions)

```diff
@@ -23,6 +23,7 @@ services:
       - postgres
       - redis
       - minio-init
+      - ml_backend
     volumes:
       - .:/app:z
     env_file:
@@ -141,8 +142,12 @@ services:
 
   ml_backend:
     build:
-      context: ./ml_backends/example
+      context: ./processing_services/example
     volumes:
-      - ./ml_backends/example/:/app
+      - ./processing_services/example/:/app
     ports:
       - "2005:2000"
+    networks:
+      default:
+        aliases:
+          - processing_service
```
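With the added `networks` block, the compose `default` network now carries a `processing_service` alias for the `ml_backend` container, so anything still pointing at the old hostname keeps resolving. A minimal smoke-test sketch, assuming it runs from another container on the same compose network (the `/livez` endpoint is defined in the API file below; inside the network the service listens on container port 2000):

```python
# Hypothetical smoke test, not part of this PR. After the hotfix, both
# hostnames should resolve to the same container on the compose network.
import json
import urllib.request

for host in ("ml_backend", "processing_service"):
    with urllib.request.urlopen(f"http://{host}:2000/livez", timeout=5) as resp:
        print(host, json.load(resp))  # expected: {'status': True}
```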
processing_services/example/api/api.py (new file, 110 additions)

```python
"""
Fast API interface for processing images through the localization and classification pipelines.
"""

import logging
import time

import fastapi

from .pipelines import ConstantPipeline, Pipeline, RandomPipeline
from .schemas import (
    AlgorithmConfigResponse,
    PipelineRequest,
    PipelineResultsResponse,
    ProcessingServiceInfoResponse,
    SourceImage,
    SourceImageResponse,
)

logger = logging.getLogger(__name__)

app = fastapi.FastAPI()


pipelines: list[type[Pipeline]] = [RandomPipeline, ConstantPipeline]
pipeline_choices: dict[str, type[Pipeline]] = {pipeline.config.slug: pipeline for pipeline in pipelines}
algorithm_choices: dict[str, AlgorithmConfigResponse] = {
    algorithm.key: algorithm for pipeline in pipelines for algorithm in pipeline.config.algorithms
}


@app.get("/")
async def root():
    return fastapi.responses.RedirectResponse("/docs")


@app.get("/info", tags=["services"])
async def info() -> ProcessingServiceInfoResponse:
    info = ProcessingServiceInfoResponse(
        name="ML Backend Template",
        description=(
            "A template for an inference API that allows the user to run different sequences of machine learning "
            "models and processing methods on images for the Antenna platform."
        ),
        pipelines=[pipeline.config for pipeline in pipelines],
        # algorithms=list(algorithm_choices.values()),
    )
    return info


# Check if the server is online
@app.get("/livez", tags=["health checks"])
async def livez():
    return fastapi.responses.JSONResponse(status_code=200, content={"status": True})


# Check if the pipelines are ready to process data
@app.get("/readyz", tags=["health checks"])
async def readyz():
    """
    Check if the server is ready to process data.

    Returns a list of pipeline slugs that are online and ready to process data.
    @TODO may need to simplify this to just return True/False. Pipeline algorithms will likely be loaded into memory
    on-demand when the pipeline is selected.
    """
    if pipeline_choices:
        return fastapi.responses.JSONResponse(status_code=200, content={"status": list(pipeline_choices.keys())})
    else:
        return fastapi.responses.JSONResponse(status_code=503, content={"status": []})


@app.post("/process", tags=["services"])
async def process(data: PipelineRequest) -> PipelineResultsResponse:
    pipeline_slug = data.pipeline

    source_images = [SourceImage(**image.model_dump()) for image in data.source_images]
    source_image_results = [SourceImageResponse(**image.model_dump()) for image in data.source_images]

    start_time = time.time()

    try:
        Pipeline = pipeline_choices[pipeline_slug]
    except KeyError:
        raise fastapi.HTTPException(status_code=422, detail=f"Invalid pipeline choice: {pipeline_slug}")

    pipeline = Pipeline(source_images=source_images)
    try:
        results = pipeline.run()
    except Exception as e:
        logger.error(f"Error running pipeline: {e}")
        raise fastapi.HTTPException(status_code=422, detail=f"{e}")

    end_time = time.time()
    seconds_elapsed = float(end_time - start_time)

    response = PipelineResultsResponse(
        pipeline=pipeline_slug,
        algorithms={algorithm.key: algorithm for algorithm in pipeline.config.algorithms},
        source_images=source_image_results,
        detections=results,
        total_time=seconds_elapsed,
    )
    return response


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=2000)
```
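The available pipeline slugs come from `.pipelines`, which is not part of this diff, so a client can discover them at runtime instead. A small sketch against a locally running container, assuming the `2005:2000` host-port mapping from docker-compose.yml and that each `PipelineConfigResponse` serializes the `slug` field used in `pipeline_choices` above:

```python
# Query the service for its pipelines and readiness; the endpoints and
# response shapes follow from the /info and /readyz handlers above.
import json
import urllib.request

BASE = "http://localhost:2005"  # host port mapped to container port 2000

with urllib.request.urlopen(f"{BASE}/info") as resp:
    info = json.load(resp)
print(info["name"])  # "ML Backend Template"
print([p["slug"] for p in info["pipelines"]])  # assumes configs expose `slug`

with urllib.request.urlopen(f"{BASE}/readyz") as resp:
    print(json.load(resp)["status"])  # list of pipeline slugs ready to run
```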
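For `/process`, the handler above only confirms that `PipelineRequest` carries a `pipeline` slug and a list of `source_images`; the exact `SourceImage` fields live in `.schemas`, outside this diff. A hypothetical client call, with the `id`/`url` fields and the `"random"` slug assumed for illustration:

```python
# Hypothetical request to /process. Only `pipeline` and `source_images`
# are confirmed by the handler; the SourceImage fields are assumptions.
import json
import urllib.request

payload = {
    "pipeline": "random",  # assumed slug; discover real ones via GET /info
    "source_images": [{"id": "img-1", "url": "https://example.com/test.jpg"}],
}
req = urllib.request.Request(
    "http://localhost:2005/process",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    results = json.load(resp)
print(results["total_time"], len(results["detections"]))
```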