
Commit 325a7c7
Change all instances of backend to processing service
vanessavmac committed Jan 12, 2025
1 parent e65d210 commit 325a7c7
Showing 46 changed files with 423 additions and 357 deletions.
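
Note that renaming a Django model also requires a schema migration. The migration file is not shown in this excerpt, but a minimal sketch of what it would contain follows (the filename, migration number, and dependency are assumptions; the real file would be generated by makemigrations):

# Hypothetical sketch: ami/ml/migrations/00XX_rename_backend_processingservice.py
from django.db import migrations


class Migration(migrations.Migration):
    dependencies = [
        # Assumed dependency; the actual preceding migration may differ.
        ("ml", "0001_initial"),
    ]

    operations = [
        # RenameModel renames the database table and updates model state
        # without dropping data.
        migrations.RenameModel(old_name="Backend", new_name="ProcessingService"),
    ]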
6 changes: 3 additions & 3 deletions ami/ml/admin.py
@@ -3,8 +3,8 @@
 from ami.main.admin import AdminBase

 from .models.algorithm import Algorithm
-from .models.backend import Backend
 from .models.pipeline import Pipeline
+from .models.processing_service import ProcessingService


 @admin.register(Algorithm)
@@ -60,8 +60,8 @@ class PipelineAdmin(AdminBase):
     }


-@admin.register(Backend)
-class BackendAdmin(AdminBase):
+@admin.register(ProcessingService)
+class ProcessingServiceAdmin(AdminBase):
     list_display = [
         "id",
         "name",
4 changes: 2 additions & 2 deletions ami/ml/models/__init__.py
@@ -1,9 +1,9 @@
 from ami.ml.models.algorithm import Algorithm
-from ami.ml.models.backend import Backend
 from ami.ml.models.pipeline import Pipeline
+from ami.ml.models.processing_service import ProcessingService

 __all__ = [
     "Algorithm",
     "Pipeline",
-    "Backend",
+    "ProcessingService",
 ]
46 changes: 24 additions & 22 deletions ami/ml/models/pipeline.py
@@ -3,7 +3,7 @@
 from typing import TYPE_CHECKING

 if TYPE_CHECKING:
-    from ami.ml.models import Backend
+    from ami.ml.models import ProcessingService

 import logging
 import typing
@@ -164,7 +164,7 @@ def process_images(
             detections=[],
             total_time=0,
         )
-    task_logger.info(f"Sending {len(images)} images to ML backend {pipeline.slug}")
+    task_logger.info(f"Sending {len(images)} images to Processing Service {pipeline.slug}")
     urls = [source_image.public_url() for source_image in images if source_image.public_url()]

     source_images = [
@@ -232,7 +232,7 @@ def save_results(results: PipelineResponse | None = None, results_json: str | No

     pipeline, _created = Pipeline.objects.get_or_create(slug=results.pipeline, defaults={"name": results.pipeline})
     if _created:
-        logger.warning(f"Pipeline choice returned by the ML backend was not recognized! {pipeline}")
+        logger.warning(f"Pipeline choice returned by the Processing Service was not recognized! {pipeline}")
     created_objects.append(pipeline)
     algorithms_used = set()

@@ -420,7 +420,7 @@ class Pipeline(BaseModel):
         ),
     )
     projects = models.ManyToManyField("main.Project", related_name="pipelines", blank=True)
-    backends: models.QuerySet[Backend]
+    processing_services: models.QuerySet[ProcessingService]

     class Meta:
         ordering = ["name", "version"]
@@ -446,32 +446,32 @@ def collect_images(
             skip_processed=skip_processed,
         )

-    def choose_backend_for_pipeline(self, job_id):
+    def choose_processing_service_for_pipeline(self, job_id):
         job = None
         if job_id:
             from ami.jobs.models import Job

             job = Job.objects.get(pk=job_id)

-        backends = self.backends.all()
+        processing_services = self.processing_services.all()

-        # check the status of all backends
-        backend_id_lowest_latency = backends.first().id if backends.exists() else None
+        # check the status of all processing services
+        processing_service_id_lowest_latency = processing_services.first().id if processing_services.exists() else None
         lowest_latency = 10000
-        backends_online = False
+        processing_services_online = False

-        for backend in backends:
-            status_response = backend.get_status()
+        for processing_service in processing_services:
+            status_response = processing_service.get_status()
             if status_response.server_live:
-                backends_online = True
+                processing_services_online = True
                 if status_response.latency < lowest_latency:
                     lowest_latency = status_response.latency
-                    # pick the backend that lowest latency
-                    backend_id_lowest_latency = backend.id
+                    # pick the processing service that has lowest latency
+                    processing_service_id_lowest_latency = processing_service.id

         # if all offline then throw error
-        if not backends_online:
-            msg = "No backends are online."
+        if not processing_services_online:
+            msg = "No processing services are online."

             if job:
                 job.logger.error(msg)
@@ -480,21 +480,23 @@ def choose_backend_for_pipeline(self, job_id):
             raise Exception(msg)
         else:
             if job:
-                job.logger.info(f"Using Backend with ID={backend_id_lowest_latency}")
-            logger.info(f"Using Backend with ID={backend_id_lowest_latency}")
+                job.logger.info(f"Using Processing Service with ID={processing_service_id_lowest_latency}")
+            logger.info(f"Using Processing Service with ID={processing_service_id_lowest_latency}")

-        return backend_id_lowest_latency
+        return processing_service_id_lowest_latency

     def process_images(self, images: typing.Iterable[SourceImage], job_id: int | None = None):
         try:
-            backend_id = self.choose_backend_for_pipeline(job_id)
+            processing_service_id = self.choose_processing_service_for_pipeline(job_id)
         except Exception:
             return

-        if not self.backends.filter(pk=backend_id).first().endpoint_url:
+        if not self.processing_services.filter(pk=processing_service_id).first().endpoint_url:
             raise ValueError("No endpoint URL configured for this pipeline")
         return process_images(
-            endpoint_url=urljoin(self.backends.filter(pk=backend_id).first().endpoint_url, "/process_images"),
+            endpoint_url=urljoin(
+                self.processing_services.filter(pk=processing_service_id).first().endpoint_url, "/process_images"
+            ),
             pipeline=self,
             images=images,
             job_id=job_id,
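
With the related_name changed from "backends" to "processing_services" (see the model diff below), reverse lookups from Pipeline and Project change accordingly. A quick sketch of the new ORM usage, assuming a pipeline with services attached (the slug value is illustrative):

# Hypothetical usage after the rename.
from ami.ml.models import Pipeline

pipeline = Pipeline.objects.get(slug="example-pipeline")  # illustrative slug
# Formerly: pipeline.backends.all()
for service in pipeline.processing_services.all():
    print(service.name, service.endpoint_url)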
14 changes: 7 additions & 7 deletions ami/ml/models/backend.py → ami/ml/models/processing_service.py
@@ -10,30 +10,30 @@
 from ami.base.models import BaseModel
 from ami.ml.models.algorithm import Algorithm
 from ami.ml.models.pipeline import Pipeline
-from ami.ml.schemas import BackendStatusResponse, PipelineRegistrationResponse
+from ami.ml.schemas import PipelineRegistrationResponse, ProcessingServiceStatusResponse

 logger = logging.getLogger(__name__)


 @typing.final
-class Backend(BaseModel):
+class ProcessingService(BaseModel):
     """An ML processing backend"""

     name = models.CharField(max_length=255)
     slug = models.SlugField(max_length=255, unique=True)
     description = models.TextField(blank=True)
-    projects = models.ManyToManyField("main.Project", related_name="backends", blank=True)
+    projects = models.ManyToManyField("main.Project", related_name="processing_services", blank=True)
     endpoint_url = models.CharField(max_length=1024)
-    pipelines = models.ManyToManyField("ml.Pipeline", related_name="backends", blank=True)
+    pipelines = models.ManyToManyField("ml.Pipeline", related_name="processing_services", blank=True)
     last_checked = models.DateTimeField(null=True)
     last_checked_live = models.BooleanField(null=True)

     def __str__(self):
         return self.name

     class Meta:
-        verbose_name = "Backend"
-        verbose_name_plural = "Backends"
+        verbose_name = "ProcessingService"
+        verbose_name_plural = "ProcessingServices"

     def create_pipelines(self):
         # Call the status endpoint and get the pipelines/algorithms
@@ -106,7 +106,7 @@ def get_status(self):
         first_response_time = time.time()
         latency = first_response_time - start_time

-        response = BackendStatusResponse(
+        response = ProcessingServiceStatusResponse(
             timestamp=timestamp,
             request_successful=resp.ok,
             server_live=server_live,
2 changes: 1 addition & 1 deletion ami/ml/schemas.py
@@ -101,7 +101,7 @@ class PipelineConfig(pydantic.BaseModel):
     stages: list[PipelineStage] = []


-class BackendStatusResponse(pydantic.BaseModel):
+class ProcessingServiceStatusResponse(pydantic.BaseModel):
     timestamp: datetime.datetime
     request_successful: bool
     pipeline_configs: list[PipelineConfig] = []
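
For reference, a sketch of constructing the renamed status response, using only fields visible in this diff (timestamp, request_successful, and pipeline_configs above; server_live and latency from pipeline.py and processing_service.py); the full model may define more:

import datetime

from ami.ml.schemas import ProcessingServiceStatusResponse

# Illustrative values; the field list is inferred from this diff.
status = ProcessingServiceStatusResponse(
    timestamp=datetime.datetime.now(),
    request_successful=True,
    server_live=True,
    latency=0.12,
    pipeline_configs=[],
)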
14 changes: 7 additions & 7 deletions ami/ml/serializers.py
@@ -5,8 +5,8 @@
 from ami.main.models import Project

 from .models.algorithm import Algorithm
-from .models.backend import Backend
 from .models.pipeline import Pipeline, PipelineStage
+from .models.processing_service import ProcessingService


 class AlgorithmSerializer(DefaultSerializer):
@@ -41,9 +41,9 @@ class Meta:
         ]


-class BackendNestedSerializer(DefaultSerializer):
+class ProcessingServiceNestedSerializer(DefaultSerializer):
     class Meta:
-        model = Backend
+        model = ProcessingService
         fields = [
             "name",
             "slug",
@@ -60,7 +60,7 @@ class Meta:
 class PipelineSerializer(DefaultSerializer):
     algorithms = AlgorithmSerializer(many=True, read_only=True)
     stages = SchemaField(schema=list[PipelineStage], read_only=True)
-    backends = BackendNestedSerializer(many=True, read_only=True)
+    processing_services = ProcessingServiceNestedSerializer(many=True, read_only=True)

     class Meta:
         model = Pipeline
@@ -72,7 +72,7 @@ class Meta:
             "description",
             "algorithms",
             "stages",
-            "backends",
+            "processing_services",
             "created_at",
             "updated_at",
         ]
@@ -94,7 +94,7 @@ class Meta:
         ]


-class BackendSerializer(DefaultSerializer):
+class ProcessingServiceSerializer(DefaultSerializer):
     pipelines = PipelineNestedSerializer(many=True, read_only=True)
     project = serializers.PrimaryKeyRelatedField(
         write_only=True,
@@ -103,7 +103,7 @@ class BackendSerializer(DefaultSerializer):
     )

     class Meta:
-        model = Backend
+        model = ProcessingService
         fields = [
             "id",
             "details",
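
After this change, a serialized Pipeline exposes a processing_services list in place of backends. A rough sketch of the payload shape, using only fields listed in this diff (values are illustrative, and the truncated field lists may include more):

# Rough shape of a serialized Pipeline after the rename (illustrative values).
example_pipeline_payload = {
    "description": "...",
    "algorithms": [],
    "stages": [],
    "processing_services": [  # formerly "backends"
        {"name": "Example service", "slug": "example-service"},
    ],
    "created_at": "2025-01-12T00:00:00Z",
    "updated_at": "2025-01-12T00:00:00Z",
}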
(Diff truncated; the remaining 40 changed files are not shown.)