Enable users to register Processing Services & Pipelines (#632)
* Create backend model

* Create backend status endpoint

* Return server status and available pipelines

* Use pipeline slug

* Fix .gitignore

* Update backend status endpoint, test pipeline process images

* fix: missing import in ml models

* Add Backend to admin, update pipeline/backend model, register_pipelines action

* Fix type checking

* Add backend id to test pipeline processing

* Constant and Random pipeline processing

* Add test fixture

* Don't use same project id for all tests

* Added Backend created_at and updated_at serializer fields

* Update models and display backends last checked

* Resolve merge conflicts

* Remove unused variables

* Remove unused file

* Register pipelines via frontend

* Add missing fields to backend, fix migration error after merging with main

* Add backend details dialog

* Display backend details

* Fix backend details displayed values

* Select first backend associated with pipeline

* Fix linting errors

* Remove backend_id

* Remove version/version name, fix adding project, make endpoint required

* Use ErrorState component

* Add serializer details

* API test to check that pipelines are created

* Add edit backend default values

* Process images using backend with lowest latency

* Remove projects from ML schemas

* Resolve todos

* Raise exception if no backends online

* Fail the job if no backends are online

* Change MLBackend to ProcessingService

* Change all instances of backend to processing service

* Fix ui formatting, fix tests, and add migrations

* Update comment to processing service

* Update process_images error handling

* Fix last_checked_live and processing services online

* Change Sync to Add Pipelines

* Remove updated at column for processing services

* Display column of num pipelines added

* Change status label of pipelines online to pipelines available

* Use slugify to add processing service

* fix: clean up some logging, type warnings and extra code

* feat: remove slug field, update naming

* fix: update phrasing

* Remove print statements

* Fix log formatting

* Squash migrations

* Filter processing services by project ID

* Button indicates pipeline registration error

* Fix processing service error handling

---------

Co-authored-by: Michael Bunsen <[email protected]>
vanessavmac and mihow authored Jan 26, 2025
1 parent 650a305 commit e5b1aed
Showing 51 changed files with 1,836 additions and 107 deletions.
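
This commit lets users register external Processing Services that host ML pipelines; jobs then route image batches to the online service with the lowest latency. As a minimal usage sketch (not part of the diff, assuming a Django shell and illustrative names and URLs; the model fields and methods match the code below):

from ami.ml.models import Pipeline, ProcessingService

# Register a processing service and attach it to an existing pipeline.
# The service name, endpoint URL, and pipeline slug are made up for illustration.
service = ProcessingService.objects.create(
    name="Example GPU worker",
    endpoint_url="http://ml-worker.example.org:2000",
)
pipeline = Pipeline.objects.get(slug="example-pipeline")
service.pipelines.add(pipeline)
service.projects.add(*pipeline.projects.all())

# At job time, the pipeline checks each registered service's status and picks
# the live one with the lowest latency before posting images to it.
chosen = pipeline.choose_processing_service_for_pipeline(job_id=None, pipeline_name=pipeline.name)
print(chosen.name, chosen.endpoint_url)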
23 changes: 23 additions & 0 deletions ami/jobs/migrations/0011_alter_job_limit.py
@@ -0,0 +1,23 @@
# Generated by Django 4.2.10 on 2024-11-03 23:50

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("jobs", "0010_job_limit_job_shuffle"),
]

operations = [
migrations.AlterField(
model_name="job",
name="limit",
field=models.IntegerField(
blank=True,
default=None,
help_text="Limit the number of images to process",
null=True,
verbose_name="Limit",
),
),
]
@@ -0,0 +1,12 @@
# Generated by Django 4.2.10 on 2024-12-17 22:28

from django.db import migrations


class Migration(migrations.Migration):
dependencies = [
("jobs", "0011_alter_job_limit"),
("jobs", "0012_alter_job_limit"),
]

operations = []
2 changes: 1 addition & 1 deletion ami/main/models.py
@@ -1868,7 +1868,7 @@ class Detection(BaseModel):
null=True,
blank=True,
)
# Time that the detection was created by the algorithm in the ML backend
# Time that the detection was created by the algorithm in the processing service
detection_time = models.DateTimeField(null=True, blank=True)
# @TODO not sure if this detection score is ever used
# I think it was intended to be the score of the detection algorithm (bbox score)
11 changes: 11 additions & 0 deletions ami/ml/admin.py
@@ -4,6 +4,7 @@

from .models.algorithm import Algorithm
from .models.pipeline import Pipeline
from .models.processing_service import ProcessingService


@admin.register(Algorithm)
@@ -57,3 +58,13 @@ class PipelineAdmin(AdminBase):
# See https://pypi.org/project/django-json-widget/
# models.JSONField: {"widget": JSONInput},
}


@admin.register(ProcessingService)
class ProcessingServiceAdmin(AdminBase):
list_display = [
"id",
"name",
"endpoint_url",
"created_at",
]
69 changes: 69 additions & 0 deletions ami/ml/migrations/0007_add_processing_service.py
@@ -0,0 +1,69 @@
# Generated by Django 4.2.10 on 2025-01-17 19:40

import ami.base.schemas
from django.db import migrations, models
import django_pydantic_field.fields


class Migration(migrations.Migration):
replaces = [
("ml", "0007_backend"),
("ml", "0008_remove_pipeline_endpoint_url_pipeline_backend"),
("ml", "0009_remove_pipeline_backend_backend_pipelines"),
("ml", "0010_backend_created_at_backend_updated_at"),
("ml", "0011_alter_pipeline_stages"),
("ml", "0012_backend_last_checked_backend_last_checked_live"),
("ml", "0013_backend_description_backend_name_backend_slug_and_more"),
("ml", "0014_remove_backend_version_remove_backend_version_name_and_more"),
("ml", "0015_processingservice_delete_backend"),
("ml", "0016_alter_processingservice_options"),
("ml", "0017_remove_processingservice_slug_and_more"),
]

dependencies = [
("main", "0038_alter_detection_path_alter_sourceimage_event_and_more"),
("ml", "0006_alter_pipeline_endpoint_url_alter_pipeline_projects"),
]

operations = [
migrations.RemoveField(
model_name="pipeline",
name="endpoint_url",
),
migrations.AlterField(
model_name="pipeline",
name="stages",
field=django_pydantic_field.fields.PydanticSchemaField(
config=None,
default=ami.base.schemas.default_stages,
help_text="The stages of the pipeline. This is mainly for display. The backend implementation of the pipeline may process data in any way.",
schema="list[PipelineStage]",
),
),
migrations.CreateModel(
name="ProcessingService",
fields=[
("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
("created_at", models.DateTimeField(auto_now_add=True)),
("updated_at", models.DateTimeField(auto_now=True)),
("name", models.CharField(max_length=255)),
("description", models.TextField(blank=True)),
("endpoint_url", models.CharField(max_length=1024)),
("last_checked", models.DateTimeField(null=True)),
("last_checked_live", models.BooleanField(null=True)),
(
"pipelines",
models.ManyToManyField(blank=True, related_name="processing_services", to="ml.pipeline"),
),
(
"projects",
models.ManyToManyField(blank=True, related_name="processing_services", to="main.project"),
),
("last_checked_latency", models.FloatField(null=True)),
],
options={
"verbose_name": "Processing Service",
"verbose_name_plural": "Processing Services",
},
),
]
6 changes: 4 additions & 2 deletions ami/ml/models/__init__.py
@@ -1,7 +1,9 @@
from .algorithm import Algorithm
from .pipeline import Pipeline
from ami.ml.models.algorithm import Algorithm
from ami.ml.models.pipeline import Pipeline
from ami.ml.models.processing_service import ProcessingService

__all__ = [
"Algorithm",
"Pipeline",
"ProcessingService",
]
3 changes: 3 additions & 0 deletions ami/ml/models/algorithm.py
@@ -38,6 +38,9 @@ class Meta:
["name", "version"],
]

def __str__(self):
return f'#{self.pk} "{self.name}" ({self.key}) v{self.version}'

def save(self, *args, **kwargs):
if not self.key:
self.key = slugify(self.name)
114 changes: 90 additions & 24 deletions ami/ml/models/pipeline.py
@@ -1,5 +1,13 @@
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from ami.ml.models import ProcessingService

import logging
import typing
from urllib.parse import urljoin

import requests
from django.db import models, transaction
@@ -22,17 +30,16 @@
TaxonRank,
update_calculated_fields_for_events,
)
from ami.ml.models.algorithm import Algorithm
from ami.ml.schemas import PipelineRequest, PipelineResponse, SourceImageRequest, SourceImageResponse
from ami.ml.tasks import celery_app, create_detection_images

from ..schemas import PipelineRequest, PipelineResponse, SourceImageRequest
from .algorithm import Algorithm

logger = logging.getLogger(__name__)


def filter_processed_images(
images: typing.Iterable[SourceImage],
pipeline: "Pipeline",
pipeline: Pipeline,
) -> typing.Iterable[SourceImage]:
"""
Return only images that need to be processed by a given pipeline for the first time (have no detections)
@@ -78,7 +85,7 @@ def collect_images(
source_images: list[SourceImage] | None = None,
deployment: Deployment | None = None,
job_id: int | None = None,
pipeline: "Pipeline | None" = None,
pipeline: Pipeline | None = None,
skip_processed: bool = True,
) -> typing.Iterable[SourceImage]:
"""
@@ -123,7 +130,7 @@ def collect_images(


def process_images(
pipeline: "Pipeline",
pipeline: Pipeline,
endpoint_url: str,
images: typing.Iterable[SourceImage],
job_id: int | None = None,
@@ -157,37 +164,47 @@ def process_images(
detections=[],
total_time=0,
)
task_logger.info(f"Sending {len(images)} images to ML backend {pipeline.slug}")
task_logger.info(f"Sending {len(images)} images to Pipeline {pipeline}")
urls = [source_image.public_url() for source_image in images if source_image.public_url()]

source_images = [
SourceImageRequest(
id=str(source_image.pk),
url=url,
)
for source_image, url in zip(images, urls)
if url
]

request_data = PipelineRequest(
pipeline=pipeline.slug,
source_images=[
SourceImageRequest(
id=str(source_image.pk),
url=url,
)
for source_image, url in zip(images, urls)
if url
],
source_images=source_images,
)

resp = requests.post(endpoint_url, json=request_data.dict())
if not resp.ok:
try:
msg = resp.json()["detail"]
except Exception:
msg = resp.content
msg = str(resp.content)
if job:
job.logger.error(msg)
else:
logger.error(msg)

resp.raise_for_status()
results = PipelineResponse(
pipeline=pipeline.slug,
total_time=0,
source_images=[
SourceImageResponse(id=source_image.id, url=source_image.url) for source_image in source_images
],
detections=[],
errors=msg,
)
return results

results = resp.json()
results = PipelineResponse(**results)

if job:
job.logger.debug(f"Results: {results}")
detections = results.detections
@@ -217,7 +234,7 @@ def save_results(results: PipelineResponse | None = None, results_json: str | No

pipeline, _created = Pipeline.objects.get_or_create(slug=results.pipeline, defaults={"name": results.pipeline})
if _created:
logger.warning(f"Pipeline choice returned by the ML backend was not recognized! {pipeline}")
logger.warning(f"Pipeline choice returned by the Processing Service was not recognized! {pipeline}")
created_objects.append(pipeline)
algorithms_used = set()

@@ -396,7 +413,7 @@ class Pipeline(BaseModel):
version = models.IntegerField(default=1)
version_name = models.CharField(max_length=255, blank=True)
# @TODO the algorithms list should be retrieved by querying the pipeline endpoint
algorithms = models.ManyToManyField(Algorithm, related_name="pipelines")
algorithms = models.ManyToManyField("ml.Algorithm", related_name="pipelines")
stages: list[PipelineStage] = SchemaField(
default=default_stages,
help_text=(
@@ -405,7 +422,7 @@
),
)
projects = models.ManyToManyField("main.Project", related_name="pipelines", blank=True)
endpoint_url = models.CharField(max_length=1024, null=True, blank=True)
processing_services: models.QuerySet[ProcessingService]

class Meta:
ordering = ["name", "version"]
@@ -414,6 +431,9 @@ class Meta:
["name", "version"],
]

def __str__(self):
return f'#{self.pk} "{self.name}" ({self.slug}) v{self.version}'

def collect_images(
self,
collection: SourceImageCollection | None = None,
Expand All @@ -431,11 +451,57 @@ def collect_images(
skip_processed=skip_processed,
)

def choose_processing_service_for_pipeline(self, job_id, pipeline_name) -> ProcessingService:
# @TODO use the cached `last_checked_latency` and a max age to avoid checking every time

job = None
task_logger = logger
if job_id:
from ami.jobs.models import Job

job = Job.objects.get(pk=job_id)
task_logger = job.logger

processing_services = self.processing_services.all()

# check the status of all processing services
timeout = 5 * 60.0 # 5 minutes
lowest_latency = timeout
processing_services_online = False

for processing_service in processing_services:
status_response = processing_service.get_status() # @TODO pass timeout to get_status()
if status_response.server_live:
processing_services_online = True
if status_response.latency < lowest_latency:
lowest_latency = status_response.latency
# pick the processing service that has lowest latency
processing_service_lowest_latency = processing_service

# if all offline then throw error
if not processing_services_online:
msg = f'No processing services are online for the pipeline "{pipeline_name}".'
task_logger.error(msg)

raise Exception(msg)
else:
task_logger.info(
f"Using processing service with latency {round(lowest_latency, 4)}: "
f"{processing_service_lowest_latency}"
)

return processing_service_lowest_latency

def process_images(self, images: typing.Iterable[SourceImage], job_id: int | None = None):
if not self.endpoint_url:
raise ValueError("No endpoint URL configured for this pipeline")
processing_service = self.choose_processing_service_for_pipeline(job_id, self.name)

if not processing_service.endpoint_url:
raise ValueError(
f"No endpoint URL configured for this pipeline's processing service ({processing_service})"
)

return process_images(
endpoint_url=self.endpoint_url,
endpoint_url=urljoin(processing_service.endpoint_url, "/process_images"),
pipeline=self,
images=images,
job_id=job_id,
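
Not part of the diff: a minimal sketch of the request that process_images posts to the chosen service, assuming an illustrative host, pipeline slug, and image URL; the schema classes, payload shape, and urljoin call mirror the code above.

from urllib.parse import urljoin

import requests

from ami.ml.schemas import PipelineRequest, SourceImageRequest

# Build the full endpoint the same way process_images does; the leading slash
# in "/process_images" replaces any existing path on the base URL.
endpoint_url = urljoin("http://ml-worker.example.org:2000", "/process_images")

# The payload identifies the pipeline by slug and lists the images to process.
request_data = PipelineRequest(
    pipeline="example-pipeline",
    source_images=[
        SourceImageRequest(id="42", url="https://example.org/images/0001.jpg"),
    ],
)

resp = requests.post(endpoint_url, json=request_data.dict())
resp.raise_for_status()  # non-2xx responses are logged and surfaced as job errors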
