diff --git a/ami/jobs/migrations/0011_alter_job_limit.py b/ami/jobs/migrations/0011_alter_job_limit.py new file mode 100644 index 000000000..3ed7e9c06 --- /dev/null +++ b/ami/jobs/migrations/0011_alter_job_limit.py @@ -0,0 +1,23 @@ +# Generated by Django 4.2.10 on 2024-11-03 23:50 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("jobs", "0010_job_limit_job_shuffle"), + ] + + operations = [ + migrations.AlterField( + model_name="job", + name="limit", + field=models.IntegerField( + blank=True, + default=None, + help_text="Limit the number of images to process", + null=True, + verbose_name="Limit", + ), + ), + ] diff --git a/ami/jobs/migrations/0013_merge_0011_alter_job_limit_0012_alter_job_limit.py b/ami/jobs/migrations/0013_merge_0011_alter_job_limit_0012_alter_job_limit.py new file mode 100644 index 000000000..55986cde1 --- /dev/null +++ b/ami/jobs/migrations/0013_merge_0011_alter_job_limit_0012_alter_job_limit.py @@ -0,0 +1,12 @@ +# Generated by Django 4.2.10 on 2024-12-17 22:28 + +from django.db import migrations + + +class Migration(migrations.Migration): + dependencies = [ + ("jobs", "0011_alter_job_limit"), + ("jobs", "0012_alter_job_limit"), + ] + + operations = [] diff --git a/ami/main/models.py b/ami/main/models.py index 770e4c329..a77b39335 100644 --- a/ami/main/models.py +++ b/ami/main/models.py @@ -1868,7 +1868,7 @@ class Detection(BaseModel): null=True, blank=True, ) - # Time that the detection was created by the algorithm in the ML backend + # Time that the detection was created by the algorithm in the processing service detection_time = models.DateTimeField(null=True, blank=True) # @TODO not sure if this detection score is ever used # I think it was intended to be the score of the detection algorithm (bbox score) diff --git a/ami/ml/admin.py b/ami/ml/admin.py index bd5415cf0..3f4784d1b 100644 --- a/ami/ml/admin.py +++ b/ami/ml/admin.py @@ -4,6 +4,7 @@ from .models.algorithm import Algorithm from .models.pipeline import Pipeline +from .models.processing_service import ProcessingService @admin.register(Algorithm) @@ -57,3 +58,13 @@ class PipelineAdmin(AdminBase): # See https://pypi.org/project/django-json-widget/ # models.JSONField: {"widget": JSONInput}, } + + +@admin.register(ProcessingService) +class ProcessingServiceAdmin(AdminBase): + list_display = [ + "id", + "name", + "endpoint_url", + "created_at", + ] diff --git a/ami/ml/migrations/0007_add_processing_service.py b/ami/ml/migrations/0007_add_processing_service.py new file mode 100644 index 000000000..17a9df662 --- /dev/null +++ b/ami/ml/migrations/0007_add_processing_service.py @@ -0,0 +1,69 @@ +# Generated by Django 4.2.10 on 2025-01-17 19:40 + +import ami.base.schemas +from django.db import migrations, models +import django_pydantic_field.fields + + +class Migration(migrations.Migration): + replaces = [ + ("ml", "0007_backend"), + ("ml", "0008_remove_pipeline_endpoint_url_pipeline_backend"), + ("ml", "0009_remove_pipeline_backend_backend_pipelines"), + ("ml", "0010_backend_created_at_backend_updated_at"), + ("ml", "0011_alter_pipeline_stages"), + ("ml", "0012_backend_last_checked_backend_last_checked_live"), + ("ml", "0013_backend_description_backend_name_backend_slug_and_more"), + ("ml", "0014_remove_backend_version_remove_backend_version_name_and_more"), + ("ml", "0015_processingservice_delete_backend"), + ("ml", "0016_alter_processingservice_options"), + ("ml", "0017_remove_processingservice_slug_and_more"), + ] + + dependencies = [ + ("main", 
"0038_alter_detection_path_alter_sourceimage_event_and_more"), + ("ml", "0006_alter_pipeline_endpoint_url_alter_pipeline_projects"), + ] + + operations = [ + migrations.RemoveField( + model_name="pipeline", + name="endpoint_url", + ), + migrations.AlterField( + model_name="pipeline", + name="stages", + field=django_pydantic_field.fields.PydanticSchemaField( + config=None, + default=ami.base.schemas.default_stages, + help_text="The stages of the pipeline. This is mainly for display. The backend implementation of the pipeline may process data in any way.", + schema="list[PipelineStage]", + ), + ), + migrations.CreateModel( + name="ProcessingService", + fields=[ + ("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("created_at", models.DateTimeField(auto_now_add=True)), + ("updated_at", models.DateTimeField(auto_now=True)), + ("name", models.CharField(max_length=255)), + ("description", models.TextField(blank=True)), + ("endpoint_url", models.CharField(max_length=1024)), + ("last_checked", models.DateTimeField(null=True)), + ("last_checked_live", models.BooleanField(null=True)), + ( + "pipelines", + models.ManyToManyField(blank=True, related_name="processing_services", to="ml.pipeline"), + ), + ( + "projects", + models.ManyToManyField(blank=True, related_name="processing_services", to="main.project"), + ), + ("last_checked_latency", models.FloatField(null=True)), + ], + options={ + "verbose_name": "Processing Service", + "verbose_name_plural": "Processing Services", + }, + ), + ] diff --git a/ami/ml/models/__init__.py b/ami/ml/models/__init__.py index ba011027c..a5e716372 100644 --- a/ami/ml/models/__init__.py +++ b/ami/ml/models/__init__.py @@ -1,7 +1,9 @@ -from .algorithm import Algorithm -from .pipeline import Pipeline +from ami.ml.models.algorithm import Algorithm +from ami.ml.models.pipeline import Pipeline +from ami.ml.models.processing_service import ProcessingService __all__ = [ "Algorithm", "Pipeline", + "ProcessingService", ] diff --git a/ami/ml/models/algorithm.py b/ami/ml/models/algorithm.py index fd54f4466..9cc56d4bd 100644 --- a/ami/ml/models/algorithm.py +++ b/ami/ml/models/algorithm.py @@ -38,6 +38,9 @@ class Meta: ["name", "version"], ] + def __str__(self): + return f'#{self.pk} "{self.name}" ({self.key}) v{self.version}' + def save(self, *args, **kwargs): if not self.key: self.key = slugify(self.name) diff --git a/ami/ml/models/pipeline.py b/ami/ml/models/pipeline.py index d61fa446f..c830f69bf 100644 --- a/ami/ml/models/pipeline.py +++ b/ami/ml/models/pipeline.py @@ -1,5 +1,13 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from ami.ml.models import ProcessingService + import logging import typing +from urllib.parse import urljoin import requests from django.db import models, transaction @@ -22,17 +30,16 @@ TaxonRank, update_calculated_fields_for_events, ) +from ami.ml.models.algorithm import Algorithm +from ami.ml.schemas import PipelineRequest, PipelineResponse, SourceImageRequest, SourceImageResponse from ami.ml.tasks import celery_app, create_detection_images -from ..schemas import PipelineRequest, PipelineResponse, SourceImageRequest -from .algorithm import Algorithm - logger = logging.getLogger(__name__) def filter_processed_images( images: typing.Iterable[SourceImage], - pipeline: "Pipeline", + pipeline: Pipeline, ) -> typing.Iterable[SourceImage]: """ Return only images that need to be processed by a given pipeline for the first time (have no detections) @@ -78,7 
+85,7 @@ def collect_images( source_images: list[SourceImage] | None = None, deployment: Deployment | None = None, job_id: int | None = None, - pipeline: "Pipeline | None" = None, + pipeline: Pipeline | None = None, skip_processed: bool = True, ) -> typing.Iterable[SourceImage]: """ @@ -123,7 +130,7 @@ def collect_images( def process_images( - pipeline: "Pipeline", + pipeline: Pipeline, endpoint_url: str, images: typing.Iterable[SourceImage], job_id: int | None = None, @@ -157,19 +164,21 @@ def process_images( detections=[], total_time=0, ) - task_logger.info(f"Sending {len(images)} images to ML backend {pipeline.slug}") + task_logger.info(f"Sending {len(images)} images to Pipeline {pipeline}") urls = [source_image.public_url() for source_image in images if source_image.public_url()] + source_images = [ + SourceImageRequest( + id=str(source_image.pk), + url=url, + ) + for source_image, url in zip(images, urls) + if url + ] + request_data = PipelineRequest( pipeline=pipeline.slug, - source_images=[ - SourceImageRequest( - id=str(source_image.pk), - url=url, - ) - for source_image, url in zip(images, urls) - if url - ], + source_images=source_images, ) resp = requests.post(endpoint_url, json=request_data.dict()) @@ -177,17 +186,25 @@ def process_images( try: msg = resp.json()["detail"] except Exception: - msg = resp.content + msg = str(resp.content) if job: job.logger.error(msg) else: logger.error(msg) - resp.raise_for_status() + results = PipelineResponse( + pipeline=pipeline.slug, + total_time=0, + source_images=[ + SourceImageResponse(id=source_image.id, url=source_image.url) for source_image in source_images + ], + detections=[], + errors=msg, + ) + return results results = resp.json() results = PipelineResponse(**results) - if job: job.logger.debug(f"Results: {results}") detections = results.detections @@ -217,7 +234,7 @@ def save_results(results: PipelineResponse | None = None, results_json: str | No pipeline, _created = Pipeline.objects.get_or_create(slug=results.pipeline, defaults={"name": results.pipeline}) if _created: - logger.warning(f"Pipeline choice returned by the ML backend was not recognized! {pipeline}") + logger.warning(f"Pipeline choice returned by the Processing Service was not recognized! 
{pipeline}") created_objects.append(pipeline) algorithms_used = set() @@ -396,7 +413,7 @@ class Pipeline(BaseModel): version = models.IntegerField(default=1) version_name = models.CharField(max_length=255, blank=True) # @TODO the algorithms list be retrieved by querying the pipeline endpoint - algorithms = models.ManyToManyField(Algorithm, related_name="pipelines") + algorithms = models.ManyToManyField("ml.Algorithm", related_name="pipelines") stages: list[PipelineStage] = SchemaField( default=default_stages, help_text=( @@ -405,7 +422,7 @@ class Pipeline(BaseModel): ), ) projects = models.ManyToManyField("main.Project", related_name="pipelines", blank=True) - endpoint_url = models.CharField(max_length=1024, null=True, blank=True) + processing_services: models.QuerySet[ProcessingService] class Meta: ordering = ["name", "version"] @@ -414,6 +431,9 @@ class Meta: ["name", "version"], ] + def __str__(self): + return f'#{self.pk} "{self.name}" ({self.slug}) v{self.version}' + def collect_images( self, collection: SourceImageCollection | None = None, @@ -431,11 +451,57 @@ def collect_images( skip_processed=skip_processed, ) + def choose_processing_service_for_pipeline(self, job_id, pipeline_name) -> ProcessingService: + # @TODO use the cached `last_checked_latency` and a max age to avoid checking every time + + job = None + task_logger = logger + if job_id: + from ami.jobs.models import Job + + job = Job.objects.get(pk=job_id) + task_logger = job.logger + + processing_services = self.processing_services.all() + + # check the status of all processing services + timeout = 5 * 60.0 # 5 minutes + lowest_latency = timeout + processing_services_online = False + + for processing_service in processing_services: + status_response = processing_service.get_status() # @TODO pass timeout to get_status() + if status_response.server_live: + processing_services_online = True + if status_response.latency < lowest_latency: + lowest_latency = status_response.latency + # pick the processing service that has lowest latency + processing_service_lowest_latency = processing_service + + # if all offline then throw error + if not processing_services_online: + msg = f'No processing services are online for the pipeline "{pipeline_name}".' 
+ task_logger.error(msg) + + raise Exception(msg) + else: + task_logger.info( + f"Using processing service with latency {round(lowest_latency, 4)}: " + f"{processing_service_lowest_latency}" + ) + + return processing_service_lowest_latency + def process_images(self, images: typing.Iterable[SourceImage], job_id: int | None = None): - if not self.endpoint_url: - raise ValueError("No endpoint URL configured for this pipeline") + processing_service = self.choose_processing_service_for_pipeline(job_id, self.name) + + if not processing_service.endpoint_url: + raise ValueError( + f"No endpoint URL configured for this pipeline's processing service ({processing_service})" + ) + return process_images( - endpoint_url=self.endpoint_url, + endpoint_url=urljoin(processing_service.endpoint_url, "/process_images"), pipeline=self, images=images, job_id=job_id, diff --git a/ami/ml/models/processing_service.py b/ami/ml/models/processing_service.py new file mode 100644 index 000000000..fb34c271c --- /dev/null +++ b/ami/ml/models/processing_service.py @@ -0,0 +1,134 @@ +import datetime +import logging +import time +import typing +from urllib.parse import urljoin + +import requests +from django.db import models + +from ami.base.models import BaseModel +from ami.ml.models.algorithm import Algorithm +from ami.ml.models.pipeline import Pipeline +from ami.ml.schemas import PipelineRegistrationResponse, ProcessingServiceStatusResponse + +logger = logging.getLogger(__name__) + + +@typing.final +class ProcessingService(BaseModel): + """An ML processing service""" + + name = models.CharField(max_length=255) + description = models.TextField(blank=True) + projects = models.ManyToManyField("main.Project", related_name="processing_services", blank=True) + endpoint_url = models.CharField(max_length=1024) + pipelines = models.ManyToManyField("ml.Pipeline", related_name="processing_services", blank=True) + last_checked = models.DateTimeField(null=True) + last_checked_live = models.BooleanField(null=True) + last_checked_latency = models.FloatField(null=True) + + def __str__(self): + return f'#{self.pk} "{self.name}" at {self.endpoint_url}' + + class Meta: + verbose_name = "Processing Service" + verbose_name_plural = "Processing Services" + + def create_pipelines(self): + # Call the status endpoint and get the pipelines/algorithms + resp = self.get_status() + if resp.error: + resp.raise_for_status() + + pipelines_to_add = resp.pipeline_configs + pipelines = [] + pipelines_created = [] + algorithms_created = [] + + for pipeline_data in pipelines_to_add: + pipeline, created = Pipeline.objects.get_or_create( + name=pipeline_data.name, + slug=pipeline_data.slug, + version=pipeline_data.version, + description=pipeline_data.description or "", + ) + pipeline.projects.add(*self.projects.all()) + self.pipelines.add(pipeline) + + if created: + logger.info(f"Successfully created pipeline {pipeline.name}.") + pipelines_created.append(pipeline.slug) + else: + logger.info(f"Using existing pipeline {pipeline.name}.") + + for algorithm_data in pipeline_data.algorithms: + algorithm, created = Algorithm.objects.get_or_create(name=algorithm_data.name, key=algorithm_data.key) + pipeline.algorithms.add(algorithm) + + if created: + logger.info(f"Successfully created algorithm {algorithm.name}.") + algorithms_created.append(algorithm.name) + else: + logger.info(f"Using existing algorithm {algorithm.name}.") + + pipeline.save() + pipelines.append(pipeline) + + return PipelineRegistrationResponse( + timestamp=datetime.datetime.now(), + success=True, + 
pipelines=pipelines_to_add, + pipelines_created=pipelines_created, + algorithms_created=algorithms_created, + ) + + def get_status(self): + info_url = urljoin(self.endpoint_url, "info") + start_time = time.time() + error = None + timestamp = datetime.datetime.now() + self.last_checked = timestamp + + try: + resp = requests.get(info_url) + resp.raise_for_status() + except requests.exceptions.RequestException as e: + self.last_checked_live = False + self.save() + error = f"Error connecting to {info_url}: {e}" + logger.error(error) + + first_response_time = time.time() + latency = first_response_time - start_time + + return ProcessingServiceStatusResponse( + timestamp=timestamp, + request_successful=False, + endpoint_url=self.endpoint_url, + error=error, + latency=latency, + ) + + pipeline_configs = resp.json() + server_live = requests.get(urljoin(self.endpoint_url, "livez")).json().get("status") + pipelines_online = requests.get(urljoin(self.endpoint_url, "readyz")).json().get("status") + + first_response_time = time.time() + latency = first_response_time - start_time + self.last_checked_live = server_live + self.last_checked_latency = latency + self.save() + + response = ProcessingServiceStatusResponse( + timestamp=timestamp, + request_successful=resp.ok, + server_live=server_live, + pipelines_online=pipelines_online, + pipeline_configs=pipeline_configs, + endpoint_url=self.endpoint_url, + error=error, + latency=latency, + ) + + return response diff --git a/ami/ml/schemas.py b/ami/ml/schemas.py index ea7266e14..e700e8076 100644 --- a/ami/ml/schemas.py +++ b/ami/ml/schemas.py @@ -62,6 +62,60 @@ class PipelineRequest(pydantic.BaseModel): class PipelineResponse(pydantic.BaseModel): # pipeline: PipelineChoice pipeline: str - total_time: float + total_time: float | None source_images: list[SourceImageResponse] detections: list[DetectionResponse] + errors: list | str | None = None + + +class PipelineStageParam(pydantic.BaseModel): + """A configurable parameter of a stage of a pipeline.""" + + name: str + key: str + category: str = "default" + + +class PipelineStage(pydantic.BaseModel): + """A configurable stage of a pipeline.""" + + key: str + name: str + params: list[PipelineStageParam] = [] + description: str | None = None + + +class AlgorithmConfig(pydantic.BaseModel): + name: str + key: str + + +class PipelineConfig(pydantic.BaseModel): + """A configurable pipeline.""" + + name: str + slug: str + version: int + description: str | None = None + algorithms: list[AlgorithmConfig] = [] + stages: list[PipelineStage] = [] + + +class ProcessingServiceStatusResponse(pydantic.BaseModel): + timestamp: datetime.datetime + request_successful: bool + pipeline_configs: list[PipelineConfig] = [] + error: str | None = None + server_live: bool | None = None + pipelines_online: list[str] | str = "pipelines unavailable" + endpoint_url: str + latency: float + + +class PipelineRegistrationResponse(pydantic.BaseModel): + timestamp: datetime.datetime + success: bool + error: str | None = None + pipelines: list[PipelineConfig] = [] + pipelines_created: list[str] = [] + algorithms_created: list[str] = [] diff --git a/ami/ml/serializers.py b/ami/ml/serializers.py index 30ff8c962..24e08a064 100644 --- a/ami/ml/serializers.py +++ b/ami/ml/serializers.py @@ -1,9 +1,12 @@ from django_pydantic_field.rest_framework import SchemaField +from rest_framework import serializers from ami.main.api.serializers import DefaultSerializer +from ami.main.models import Project from .models.algorithm import Algorithm from .models.pipeline 
import Pipeline, PipelineStage +from .models.processing_service import ProcessingService class AlgorithmSerializer(DefaultSerializer): @@ -33,12 +36,30 @@ class Meta: "key", "version", "version_name", + "created_at", + "updated_at", + ] + + +class ProcessingServiceNestedSerializer(DefaultSerializer): + class Meta: + model = ProcessingService + fields = [ + "name", + "id", + "details", + "endpoint_url", + "last_checked", + "last_checked_live", + "created_at", + "updated_at", ] class PipelineSerializer(DefaultSerializer): algorithms = AlgorithmSerializer(many=True, read_only=True) stages = SchemaField(schema=list[PipelineStage], read_only=True) + processing_services = ProcessingServiceNestedSerializer(many=True, read_only=True) class Meta: model = Pipeline @@ -48,11 +69,9 @@ class Meta: "name", "slug", "description", - "version", - "version_name", "algorithms", "stages", - "endpoint_url", + "processing_services", "created_at", "updated_at", ] @@ -69,4 +88,41 @@ class Meta: "description", "version", "version_name", + "created_at", + "updated_at", ] + + +class ProcessingServiceSerializer(DefaultSerializer): + pipelines = PipelineNestedSerializer(many=True, read_only=True) + project = serializers.PrimaryKeyRelatedField( + write_only=True, + queryset=Project.objects.all(), + required=False, + ) + + class Meta: + model = ProcessingService + fields = [ + "id", + "details", + "name", + "description", + "projects", + "project", + "endpoint_url", + "pipelines", + "created_at", + "updated_at", + "last_checked", + "last_checked_live", + ] + + def create(self, validated_data): + project = validated_data.pop("project", None) + instance = super().create(validated_data) + + if project: + instance.projects.add(project) + + return instance diff --git a/ami/ml/tests.py b/ami/ml/tests.py index c9f9f727c..b2dc5fb70 100644 --- a/ami/ml/tests.py +++ b/ami/ml/tests.py @@ -1,10 +1,12 @@ import datetime from django.test import TestCase +from rest_framework.test import APIRequestFactory, APITestCase from rich import print +from ami.base.serializers import reverse_with_params from ami.main.models import Classification, Detection, Project, SourceImage, SourceImageCollection -from ami.ml.models import Algorithm, Pipeline +from ami.ml.models import Algorithm, Pipeline, ProcessingService from ami.ml.models.pipeline import collect_images, save_results from ami.ml.schemas import ( BoundingBox, @@ -13,19 +15,101 @@ PipelineResponse, SourceImageResponse, ) -from ami.tests.fixtures.main import create_captures_from_files, create_ml_pipeline, setup_test_project +from ami.tests.fixtures.main import create_captures_from_files, create_processing_service, setup_test_project +from ami.users.models import User -class TestPipelineWithMLBackend(TestCase): +class TestProcessingServiceAPI(APITestCase): + """ + Test the Processing Services API endpoints. 
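+    Covers creating a processing service, adding it to a project, and registering its pipelines.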
+ """ + + def setUp(self): + self.project = Project.objects.create(name="Processing Service Test Project") + + self.user = User.objects.create_user( # type: ignore + email="testuser@insectai.org", + is_staff=True, + ) + self.factory = APIRequestFactory() + + def _create_processing_service(self, name: str, endpoint_url: str): + processing_services_create_url = reverse_with_params("api:processingservice-list") + self.client.force_authenticate(user=self.user) + processing_service_data = { + "project": self.project.pk, + "name": name, + "endpoint_url": endpoint_url, + } + resp = self.client.post(processing_services_create_url, processing_service_data) + self.client.force_authenticate(user=None) + self.assertEqual(resp.status_code, 201) + return resp.json() + + def _delete_processing_service(self, processing_service_id: int): + processing_services_delete_url = reverse_with_params( + "api:processing-service-detail", kwargs={"pk": processing_service_id} + ) + self.client.force_authenticate(user=self.user) + resp = self.client.delete(processing_services_delete_url) + self.client.force_authenticate(user=None) + self.assertEqual(resp.status_code, 204) + return resp + + def _register_pipelines(self, processing_service_id): + processing_services_register_pipelines_url = reverse_with_params( + "api:processingservice-register-pipelines", args=[processing_service_id] + ) + self.client.force_authenticate(user=self.user) + resp = self.client.post(processing_services_register_pipelines_url) + data = resp.json() + self.assertEqual(data["success"], True) + return data + + def test_create_processing_service(self): + self._create_processing_service( + name="Processing Service Test", + endpoint_url="http://processing_service:2000", + ) + + def test_project_was_added(self): + response = self._create_processing_service( + name="Processing Service Test", + endpoint_url="http://processing_service:2000", + ) + processing_service_id = response["id"] + processing_service = ProcessingService.objects.get(pk=processing_service_id) + self.assertIn(self.project, processing_service.projects.all()) + + def test_processing_service_pipeline_registration(self): + # register a processing service + response = self._create_processing_service( + name="Processing Service Test", + endpoint_url="http://processing_service:2000", + ) + processing_service_id = response["id"] + + # sync the processing service to create/add the associate pipelines + response = self._register_pipelines(processing_service_id) + processing_service = ProcessingService.objects.get(pk=processing_service_id) + pipelines_queryset = processing_service.pipelines.all() + + self.assertEqual(pipelines_queryset.count(), len(response["pipelines"])) + + +class TestPipelineWithProcessingService(TestCase): def setUp(self): self.project, self.deployment = setup_test_project() self.captures = create_captures_from_files(self.deployment, skip_existing=False) self.test_images = [image for image, frame in self.captures] - self.pipeline = create_ml_pipeline(self.project) + self.processing_service_instance = create_processing_service(self.project) + self.processing_service = self.processing_service_instance + self.pipeline = self.processing_service_instance.pipelines.all().filter(slug="constant").first() def test_run_pipeline(self): - # Send images to ML backend to process and return detections - pipeline_response = self.pipeline.process_images(self.test_images) + # Send images to Processing Service to process and return detections + assert self.pipeline + pipeline_response = 
self.pipeline.process_images(self.test_images, job_id=None) assert pipeline_response.detections @@ -172,7 +256,7 @@ def _test_skip_existing_per_batch_during_processing(self): # @TODO enable test when a pipeline is added to the CI environment in PR #576 pass - def test_unknown_algorithm_returned_by_backend(self): + def test_unknown_algorithm_returned_by_processing_service(self): fake_results = self.fake_pipeline_results(self.test_images, self.pipeline) new_detector_name = "Unknown Detector 5.1b-mobile" @@ -218,7 +302,7 @@ def no_test_reprocessing_after_unknown_algorithm_added(self): # print(fake_results) # print("END FAKE RESULTS") - saved_objects = save_results(fake_results) + saved_objects = save_results(fake_results) or [] saved_detections = [obj for obj in saved_objects if isinstance(obj, Detection)] saved_classifications = [obj for obj in saved_objects if isinstance(obj, Classification)] diff --git a/ami/ml/views.py b/ami/ml/views.py index 2ee4d8dd0..0ffafa525 100644 --- a/ami/ml/views.py +++ b/ami/ml/views.py @@ -1,12 +1,23 @@ +import logging + from django.db.models.query import QuerySet +from django.utils.text import slugify from drf_spectacular.utils import extend_schema +from rest_framework import status +from rest_framework.decorators import action +from rest_framework.request import Request +from rest_framework.response import Response from ami.main.api.views import DefaultViewSet +from ami.main.models import SourceImage from ami.utils.requests import get_active_project, project_id_doc_param from .models.algorithm import Algorithm from .models.pipeline import Pipeline -from .serializers import AlgorithmSerializer, PipelineSerializer +from .models.processing_service import ProcessingService +from .serializers import AlgorithmSerializer, PipelineSerializer, ProcessingServiceSerializer + +logger = logging.getLogger(__name__) class AlgorithmViewSet(DefaultViewSet): @@ -54,3 +65,62 @@ def list(self, request, *args, **kwargs): # Don't enable projects filter until we can use the current users # membership to filter the projects. # filterset_fields = ["projects"] + + @action(detail=True, methods=["post"]) + def test_process(self, request: Request, pk=None) -> Response: + """ + Process images using the pipeline. + """ + pipeline = Pipeline.objects.get(pk=pk) + random_image = ( + SourceImage.objects.all().order_by("?").first() + ) # TODO: Filter images by projects user has access to + if not random_image: + return Response({"error": "No image found to process."}, status=status.HTTP_404_NOT_FOUND) + results = pipeline.process_images(images=[random_image], job_id=None) + return Response(results.dict()) + + +class ProcessingServiceViewSet(DefaultViewSet): + """ + API endpoint that allows processing services to be viewed or edited. 
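+    Includes extra actions to check a service's status and to register its pipelines.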
+ """ + + queryset = ProcessingService.objects.all() + serializer_class = ProcessingServiceSerializer + filterset_fields = ["projects"] + ordering_fields = ["id", "created_at", "updated_at"] + + def get_queryset(self) -> QuerySet: + query_set: QuerySet = super().get_queryset() + project = get_active_project(self.request) + if project: + query_set = query_set.filter(projects=project) + return query_set + + @extend_schema(parameters=[project_id_doc_param]) + def list(self, request, *args, **kwargs): + return super().list(request, *args, **kwargs) + + def create(self, request, *args, **kwargs): + data = request.data.copy() + data["slug"] = slugify(data["name"]) + serializer = self.get_serializer(data=data) + serializer.is_valid(raise_exception=True) + self.perform_create(serializer) + return Response(serializer.data, status=status.HTTP_201_CREATED) + + @action(detail=True, methods=["get"]) + def status(self, request: Request, pk=None) -> Response: + """ + Test the connection to the processing service. + """ + processing_service = ProcessingService.objects.get(pk=pk) + response = processing_service.get_status() + return Response(response.dict()) + + @action(detail=True, methods=["post"]) + def register_pipelines(self, request: Request, pk=None) -> Response: + processing_service = ProcessingService.objects.get(pk=pk) + response = processing_service.create_pipelines() + return Response(response.dict()) diff --git a/ami/tests/fixtures/main.py b/ami/tests/fixtures/main.py index de85bf339..9f4a0613e 100644 --- a/ami/tests/fixtures/main.py +++ b/ami/tests/fixtures/main.py @@ -21,6 +21,7 @@ TaxonRank, group_images_into_events, ) +from ami.ml.models.processing_service import ProcessingService from ami.ml.tasks import create_detection_images from ami.tests.fixtures.storage import GeneratedTestFrame, create_storage_source, populate_bucket @@ -37,44 +38,35 @@ def update_site_settings(**kwargs): return site -def create_ml_pipeline(project): - from ami.ml.models import Algorithm, Pipeline - - pipelines_to_add = [ - { - "name": "ML Dummy Backend", - "slug": "dummy", - "version": 1, - "algorithms": [ - {"name": "Dummy Detector", "key": 1}, - {"name": "Random Detector", "key": 2}, - {"name": "Always Moth Classifier", "key": 3}, - ], - "projects": {"name": project.name}, - "endpoint_url": "http://ml_backend:2000/pipeline/process", - }, - ] - - for pipeline_data in pipelines_to_add: - pipeline, created = Pipeline.objects.get_or_create( - name=pipeline_data["name"], - slug=pipeline_data["slug"], - version=pipeline_data["version"], - endpoint_url=pipeline_data["endpoint_url"], - ) +def create_processing_service(project): + processing_service_to_add = { + "name": "Test Processing Service", + "projects": [{"name": project.name}], + "endpoint_url": "http://processing_service:2000", + } - if created: - logger.info(f'Successfully created {pipeline_data["name"]}.') - else: - logger.info(f'Using existing pipeline {pipeline_data["name"]}.') + processing_service, created = ProcessingService.objects.get_or_create( + name=processing_service_to_add["name"], + endpoint_url=processing_service_to_add["endpoint_url"], + ) + processing_service.save() - for algorithm_data in pipeline_data["algorithms"]: - algorithm, _ = Algorithm.objects.get_or_create(name=algorithm_data["name"], key=algorithm_data["key"]) - pipeline.algorithms.add(algorithm) - pipeline.projects.add(project) - pipeline.save() + if created: + logger.info(f'Successfully created processing service with {processing_service_to_add["endpoint_url"]}.') + else: + 
logger.info(f'Using existing processing service with {processing_service_to_add["endpoint_url"]}.') - return pipeline + for project_data in processing_service_to_add["projects"]: + try: + project = Project.objects.get(name=project_data["name"]) + processing_service.projects.add(project) + processing_service.save() + except Exception: + logger.error(f'Could not find project {project_data["name"]}.') + + processing_service.create_pipelines() + + return processing_service def create_deployment( @@ -108,7 +100,7 @@ def create_test_project(name: str | None) -> Project: project = Project.objects.create(name=name) data_source = create_storage_source(project, f"Test Data Source {short_id}", prefix=f"{short_id}") create_deployment(project, data_source, f"Test Deployment {short_id}") - create_ml_pipeline(project) + create_processing_service(project) return project @@ -273,6 +265,7 @@ def create_detections( timestamp=source_image.timestamp, bbox=bbox, ) + assert source_image.deployment taxon = Taxon.objects.filter(projects=source_image.deployment.project).order_by("?").first() if taxon: detection.classifications.create( diff --git a/config/api_router.py b/config/api_router.py index 8950a3820..0d45dcb50 100644 --- a/config/api_router.py +++ b/config/api_router.py @@ -29,6 +29,7 @@ router.register(r"taxa", views.TaxonViewSet) router.register(r"ml/algorithms", ml_views.AlgorithmViewSet) router.register(r"ml/pipelines", ml_views.PipelineViewSet) +router.register(r"ml/processing_services", ml_views.ProcessingServiceViewSet) router.register(r"classifications", views.ClassificationViewSet) router.register(r"identifications", views.IdentificationViewSet) router.register(r"jobs", job_views.JobViewSet) diff --git a/config/settings/base.py b/config/settings/base.py index 7bf527163..acd2baf5f 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -395,6 +395,8 @@ # Your stuff... 
# ------------------------------------------------------------------------------ +DEFAULT_CONFIDENCE_THRESHOLD = env.float("DEFAULT_CONFIDENCE_THRESHOLD", default=0.6) # type: ignore[no-untyped-call] + S3_TEST_ENDPOINT = env("MINIO_ENDPOINT", default="http://minio:9000") # type: ignore[no-untyped-call] S3_TEST_KEY = env("MINIO_ROOT_USER", default=None) # type: ignore[no-untyped-call] S3_TEST_SECRET = env("MINIO_ROOT_PASSWORD", default=None) # type: ignore[no-untyped-call] diff --git a/docker-compose.ci.yml b/docker-compose.ci.yml index 613a7cb5b..f3c6d53cc 100644 --- a/docker-compose.ci.yml +++ b/docker-compose.ci.yml @@ -8,7 +8,7 @@ services: - postgres - redis - minio-init - - ml_backend + - processing_service env_file: - ./.envs/.ci/.django - ./.envs/.ci/.postgres @@ -43,10 +43,10 @@ services: - ./compose/local/minio/init.sh:/etc/minio/init.sh entrypoint: /etc/minio/init.sh - ml_backend: + processing_service: build: - context: ./ml_backends/example + context: ./processing_services/example volumes: - - ./ml_backends/example/:/app:ml_backend + - ./processing_services/example/:/app:processing_service ports: - "2000:2000" diff --git a/docker-compose.yml b/docker-compose.yml index 4b8878fc4..347c39248 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,7 +23,7 @@ services: - postgres - redis - minio-init - - ml_backend + - processing_service volumes: - .:/app:z env_file: @@ -140,10 +140,10 @@ services: - ./compose/local/minio/init.sh:/etc/minio/init.sh entrypoint: /etc/minio/init.sh - ml_backend: + processing_service: build: - context: ./ml_backends/example + context: ./processing_services/example volumes: - - ./ml_backends/example/:/app:ml_backend + - ./processing_services/example/:/app:processing_service ports: - "2005:2000" diff --git a/ml_backends/docker-compose.yml b/processing_services/docker-compose.yml similarity index 100% rename from ml_backends/docker-compose.yml rename to processing_services/docker-compose.yml diff --git a/ml_backends/example/Dockerfile b/processing_services/example/Dockerfile similarity index 100% rename from ml_backends/example/Dockerfile rename to processing_services/example/Dockerfile diff --git a/ml_backends/example/api/__init__.py b/processing_services/example/api/__init__.py similarity index 100% rename from ml_backends/example/api/__init__.py rename to processing_services/example/api/__init__.py diff --git a/ml_backends/example/api/api.py b/processing_services/example/api/api.py similarity index 59% rename from ml_backends/example/api/api.py rename to processing_services/example/api/api.py index 245aa2862..e7c0d27a8 100644 --- a/ml_backends/example/api/api.py +++ b/processing_services/example/api/api.py @@ -7,13 +7,12 @@ import fastapi -from .pipeline import DummyPipeline +from .pipeline import ConstantPipeline, DummyPipeline from .schemas import ( + AlgorithmConfig, PipelineConfig, PipelineRequest, PipelineResponse, - PipelineStage, - PipelineStageParam, SourceImage, SourceImageResponse, ) @@ -22,19 +21,29 @@ app = fastapi.FastAPI() -pipeline = PipelineConfig( - name="Pipeline 1", - slug="pipeline1", - stages=[ - PipelineStage( - name="Stage 1", - key="stage1", - params=[PipelineStageParam(name="Panama Moths", key="panama", category="Classifier")], - ) +pipeline1 = PipelineConfig( + name="ML Dummy Pipeline", + slug="dummy", + version=1, + algorithms=[ + AlgorithmConfig(name="Dummy Detector", key="1"), + AlgorithmConfig(name="Random Detector", key="2"), + AlgorithmConfig(name="Always Moth Classifier", key="3"), + ], +) + +pipeline2 = 
PipelineConfig( + name="ML Constant Pipeline", + slug="constant", + version=1, + algorithms=[ + AlgorithmConfig(name="Dummy Detector", key="1"), + AlgorithmConfig(name="Random Detector", key="2"), + AlgorithmConfig(name="Always Moth Classifier", key="3"), ], ) -pipelines = [pipeline] +pipelines = [pipeline1, pipeline2] @app.get("/") @@ -50,26 +59,34 @@ async def info() -> list[PipelineConfig]: # Check if the server is online @app.get("/livez", tags=["health checks"]) async def livez(): - return fastapi.responses.JSONResponse(status_code=200, content={"status": "ok"}) + return fastapi.responses.JSONResponse(status_code=200, content={"status": True}) -# Check if the server is ready to process data +# Check if the pipelines are ready to process data @app.get("/readyz", tags=["health checks"]) async def readyz(): if pipelines: - return fastapi.responses.JSONResponse(status_code=200, content={"status": "ok"}) + return fastapi.responses.JSONResponse( + status_code=200, content={"status": [pipeline.slug for pipeline in pipelines]} + ) else: return fastapi.responses.JSONResponse(status_code=503, content={"status": "pipelines unavailable"}) -@app.post("/pipeline/process", tags=["services"]) # @TODO: Future change use @app.post("/{pipeline_name}/process/") +@app.post("/process_images", tags=["services"]) async def process(data: PipelineRequest) -> PipelineResponse: + pipeline_slug = data.pipeline + source_image_results = [SourceImageResponse(**image.model_dump()) for image in data.source_images] source_images = [SourceImage(**image.model_dump()) for image in data.source_images] start_time = time.time() - pipeline = DummyPipeline(source_images=source_images) + if pipeline_slug == "constant": + pipeline = ConstantPipeline(source_images=source_images) # returns same detections + else: + pipeline = DummyPipeline(source_images=source_images) # returns random detections + try: results = pipeline.run() except Exception as e: diff --git a/ml_backends/example/api/pipeline.py b/processing_services/example/api/pipeline.py similarity index 67% rename from ml_backends/example/api/pipeline.py rename to processing_services/example/api/pipeline.py index 51c7acd53..a19076949 100644 --- a/ml_backends/example/api/pipeline.py +++ b/processing_services/example/api/pipeline.py @@ -80,6 +80,36 @@ def make_fake_detections(source_image: SourceImage, num_detections: int = 10): ] +def make_constant_detections(source_image: SourceImage, num_detections: int = 10): + source_image.open(raise_exception=True) + assert source_image.width is not None and source_image.height is not None + + # Define a fixed bounding box size and position relative to image size + box_width, box_height = source_image.width // 4, source_image.height // 4 + start_x, start_y = source_image.width // 8, source_image.height // 8 + bboxes = [BoundingBox(x1=start_x, y1=start_y, x2=start_x + box_width, y2=start_y + box_height)] + timestamp = datetime.datetime.now() + + return [ + Detection( + source_image_id=source_image.id, + bbox=bbox, + timestamp=timestamp, + algorithm="Fixed Detector", + classifications=[ + Classification( + classification="moth", + labels=["moth"], + scores=[0.9], # Constant score for each detection + timestamp=timestamp, + algorithm="Always Moth Classifier", + ) + ], + ) + for bbox in bboxes + ] + + class DummyPipeline: source_images: list[SourceImage] @@ -90,3 +120,15 @@ def run(self) -> list[Detection]: results = [make_fake_detections(source_image) for source_image in self.source_images] # Flatten the list of lists return [item for 
sublist in results for item in sublist] + + +class ConstantPipeline: + source_images: list[SourceImage] + + def __init__(self, source_images: list[SourceImage]): + self.source_images = source_images + + def run(self) -> list[Detection]: + results = [make_constant_detections(source_image) for source_image in self.source_images] + # Flatten the list of lists + return [item for sublist in results for item in sublist] diff --git a/ml_backends/example/api/schemas.py b/processing_services/example/api/schemas.py similarity index 95% rename from ml_backends/example/api/schemas.py rename to processing_services/example/api/schemas.py index 81751d5cd..adb4a16ee 100644 --- a/ml_backends/example/api/schemas.py +++ b/processing_services/example/api/schemas.py @@ -102,7 +102,7 @@ class SourceImageResponse(pydantic.BaseModel): url: str -PipelineChoice = typing.Literal["dummy"] +PipelineChoice = typing.Literal["dummy", "constant"] class PipelineRequest(pydantic.BaseModel): @@ -148,10 +148,17 @@ class PipelineStage(pydantic.BaseModel): description: str | None = None +class AlgorithmConfig(pydantic.BaseModel): + name: str + key: str + + class PipelineConfig(pydantic.BaseModel): """A configurable pipeline.""" name: str slug: str + version: int description: str | None = None + algorithms: list[AlgorithmConfig] = [] stages: list[PipelineStage] = [] diff --git a/ml_backends/example/api/test.py b/processing_services/example/api/test.py similarity index 100% rename from ml_backends/example/api/test.py rename to processing_services/example/api/test.py diff --git a/ml_backends/example/api/utils.py b/processing_services/example/api/utils.py similarity index 100% rename from ml_backends/example/api/utils.py rename to processing_services/example/api/utils.py diff --git a/ml_backends/example/main.py b/processing_services/example/main.py similarity index 100% rename from ml_backends/example/main.py rename to processing_services/example/main.py diff --git a/ml_backends/example/requirements.txt b/processing_services/example/requirements.txt similarity index 100% rename from ml_backends/example/requirements.txt rename to processing_services/example/requirements.txt diff --git a/ui/src/data-services/constants.ts b/ui/src/data-services/constants.ts index 01e5b9b1d..3a7a3f723 100644 --- a/ui/src/data-services/constants.ts +++ b/ui/src/data-services/constants.ts @@ -1,6 +1,7 @@ export const API_URL = '/api/v2' export const API_ROUTES = { + PROCESSING_SERVICES: 'ml/processing_services', ALGORITHM: 'ml/algorithms', CAPTURES: 'captures', COLLECTIONS: 'captures/collections', diff --git a/ui/src/data-services/hooks/processing-services/usePopulateProcessingService.ts b/ui/src/data-services/hooks/processing-services/usePopulateProcessingService.ts new file mode 100644 index 000000000..4e87b9278 --- /dev/null +++ b/ui/src/data-services/hooks/processing-services/usePopulateProcessingService.ts @@ -0,0 +1,26 @@ +import { useMutation, useQueryClient } from '@tanstack/react-query' +import axios from 'axios' +import { API_ROUTES, API_URL } from 'data-services/constants' +import { getAuthHeader } from 'data-services/utils' +import { useUser } from 'utils/user/userContext' + +export const usePopulateProcessingService = () => { + const { user } = useUser() + const queryClient = useQueryClient() + + const { mutateAsync, isLoading, isSuccess, error } = useMutation({ + mutationFn: (id: string) => + axios.post<{ id: number }>( + `${API_URL}/${API_ROUTES.PROCESSING_SERVICES}/${id}/register_pipelines/`, + undefined, + { + headers: 
getAuthHeader(user), + } + ), + onSuccess: () => { + queryClient.invalidateQueries([API_ROUTES.PROCESSING_SERVICES]) + }, + }) + + return { populateProcessingService: mutateAsync, isLoading, isSuccess, error } +} diff --git a/ui/src/data-services/hooks/processing-services/useProcessingServiceDetails.ts b/ui/src/data-services/hooks/processing-services/useProcessingServiceDetails.ts new file mode 100644 index 000000000..66aee8ab8 --- /dev/null +++ b/ui/src/data-services/hooks/processing-services/useProcessingServiceDetails.ts @@ -0,0 +1,37 @@ +import { API_ROUTES, API_URL } from 'data-services/constants' +import { + ProcessingService, + ServerProcessingService, +} from 'data-services/models/processing-service' +import { useMemo } from 'react' +import { useAuthorizedQuery } from '../auth/useAuthorizedQuery' + +const convertServerRecord = (record: ServerProcessingService) => + new ProcessingService(record) + +export const useProcessingServiceDetails = ( + processingServiceId: string +): { + processingService?: ProcessingService + isLoading: boolean + isFetching: boolean + error?: unknown +} => { + const { data, isLoading, isFetching, error } = + useAuthorizedQuery({ + queryKey: [API_ROUTES.PROCESSING_SERVICES, processingServiceId], + url: `${API_URL}/${API_ROUTES.PROCESSING_SERVICES}/${processingServiceId}/`, + }) + + const processingService = useMemo( + () => (data ? convertServerRecord(data) : undefined), + [data] + ) + + return { + processingService: processingService, + isLoading, + isFetching, + error, + } +} diff --git a/ui/src/data-services/hooks/processing-services/useProcessingServices.ts b/ui/src/data-services/hooks/processing-services/useProcessingServices.ts new file mode 100644 index 000000000..d8ee93ab2 --- /dev/null +++ b/ui/src/data-services/hooks/processing-services/useProcessingServices.ts @@ -0,0 +1,49 @@ +import { API_ROUTES } from 'data-services/constants' +import { + ProcessingService, + ServerProcessingService, +} from 'data-services/models/processing-service' +import { FetchParams } from 'data-services/types' +import { getFetchUrl } from 'data-services/utils' +import { useMemo } from 'react' +import { UserPermission } from 'utils/user/types' +import { useAuthorizedQuery } from '../auth/useAuthorizedQuery' + +const convertServerRecord = (record: ServerProcessingService) => + new ProcessingService(record) + +export const useProcessingServices = ( + params?: FetchParams +): { + items?: ProcessingService[] + total: number + userPermissions?: UserPermission[] + isLoading: boolean + isFetching: boolean + error?: unknown +} => { + const fetchUrl = getFetchUrl({ + collection: API_ROUTES.PROCESSING_SERVICES, + params, + }) + + const { data, isLoading, isFetching, error } = useAuthorizedQuery<{ + results: ProcessingService[] + user_permissions?: UserPermission[] + count: number + }>({ + queryKey: [API_ROUTES.PROCESSING_SERVICES, params], + url: fetchUrl, + }) + + const items = useMemo(() => data?.results.map(convertServerRecord), [data]) + + return { + items, + total: data?.count ?? 
0, + userPermissions: data?.user_permissions, + isLoading, + isFetching, + error, + } +} diff --git a/ui/src/data-services/hooks/processing-services/useTestProcessingServiceConnection.ts b/ui/src/data-services/hooks/processing-services/useTestProcessingServiceConnection.ts new file mode 100644 index 000000000..dd7b9861a --- /dev/null +++ b/ui/src/data-services/hooks/processing-services/useTestProcessingServiceConnection.ts @@ -0,0 +1,42 @@ +import { useMutation, useQueryClient } from '@tanstack/react-query' +import axios, { AxiosError } from 'axios' +import { API_ROUTES, API_URL } from 'data-services/constants' +import { APIValidationError } from 'data-services/types' + +interface ResponseData { + request_successful: boolean + server_live: boolean + pipelines_online: [] + error_code: number | null + error_message: string | null + prefix_exists: boolean +} + +export const useTestProcessingServiceConnection = () => { + const queryClient = useQueryClient() + + const { data, mutateAsync, isLoading, isSuccess, error } = useMutation({ + mutationFn: (params: { id: string; subdir?: string; regex?: string }) => + axios.get( + `${API_URL}/${API_ROUTES.PROCESSING_SERVICES}/${params.id}/status/` + ), + onSuccess: () => { + queryClient.invalidateQueries([API_ROUTES.PROCESSING_SERVICES]) + }, + onError: (error: AxiosError) => error, + }) + + let validationError = null + if (error && error.response?.status === 400) { + validationError = error.response?.data as APIValidationError + } + + return { + data: data?.data, + testProcessingServiceConnection: mutateAsync, + isLoading, + isSuccess, + error, + validationError, + } +} diff --git a/ui/src/data-services/models/pipeline.ts b/ui/src/data-services/models/pipeline.ts index 1e72dc224..aa5f62701 100644 --- a/ui/src/data-services/models/pipeline.ts +++ b/ui/src/data-services/models/pipeline.ts @@ -81,4 +81,30 @@ export class Pipeline { date: new Date(this._pipeline.updated_at), }) } + + get processingServicesOnline(): string { + const processingServices = this._pipeline.processing_services + let total_online = 0 + for (const processingService of processingServices) { + if (processingService.last_checked_live) { + total_online += 1 + } + } + + return total_online + '/' + processingServices.length + } + + get processingServicesOnlineLastChecked(): string | undefined { + const processingServices = this._pipeline.processing_services + const last_checked_times = [] + for (const processingService of processingServices) { + last_checked_times.push( + new Date(processingService.last_checked).getTime() + ) + } + + return getFormatedDateTimeString({ + date: new Date(Math.max(...last_checked_times)), + }) + } } diff --git a/ui/src/data-services/models/processing-service.ts b/ui/src/data-services/models/processing-service.ts new file mode 100644 index 000000000..e6bafea72 --- /dev/null +++ b/ui/src/data-services/models/processing-service.ts @@ -0,0 +1,71 @@ +import { getFormatedDateTimeString } from 'utils/date/getFormatedDateTimeString/getFormatedDateTimeString' +import { Entity } from './entity' +import { Pipeline, ServerPipeline } from './pipeline' + +export type ServerProcessingService = any // TODO: Update this type + +export class ProcessingService extends Entity { + protected readonly _processingService: ServerProcessingService + protected readonly _pipelines: Pipeline[] = [] + + public constructor(processingService: ServerProcessingService) { + super(processingService) + this._processingService = processingService + + if (processingService.pipelines) { + 
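+      // Wrap each nested server pipeline record in a Pipeline model instance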
this._pipelines = processingService.pipelines.map( + (pipeline: ServerPipeline) => new Pipeline(pipeline) + ) + } + } + + get pipelines(): Pipeline[] { + return this._pipelines + } + + get createdAt(): string { + return getFormatedDateTimeString({ + date: new Date(this._processingService.created_at), + }) + } + + get id(): string { + return `${this._processingService.id}` + } + + get name(): string { + return `${this._processingService.name}` + } + + get endpointUrl(): string { + return `${this._processingService.endpoint_url}` + } + + get description(): string { + return `${this._processingService.description}` + } + + get updatedAt(): string | undefined { + if (!this._processingService.updated_at) { + return undefined + } + + return getFormatedDateTimeString({ + date: new Date(this._processingService.updated_at), + }) + } + + get lastChecked(): string | undefined { + if (!this._processingService.last_checked) { + return undefined + } + + return getFormatedDateTimeString({ + date: new Date(this._processingService.last_checked), + }) + } + + get num_piplines_added(): number { + return this._pipelines.length + } +} diff --git a/ui/src/pages/overview/entities/details-form/constants.ts b/ui/src/pages/overview/entities/details-form/constants.ts index f4ae75538..3784bf761 100644 --- a/ui/src/pages/overview/entities/details-form/constants.ts +++ b/ui/src/pages/overview/entities/details-form/constants.ts @@ -1,5 +1,6 @@ -import { StorageDetailsForm } from './storage-details-form' import { CollectionDetailsForm } from './collection-details-form' +import { ProcessingServiceDetailsForm } from './processing-service-details-form' +import { StorageDetailsForm } from './storage-details-form' import { DetailsFormProps } from './types' export const customFormMap: { @@ -7,4 +8,5 @@ export const customFormMap: { } = { storage: StorageDetailsForm, collection: CollectionDetailsForm, + service: ProcessingServiceDetailsForm, } diff --git a/ui/src/pages/overview/entities/details-form/processing-service-details-form.tsx b/ui/src/pages/overview/entities/details-form/processing-service-details-form.tsx new file mode 100644 index 000000000..d8968635e --- /dev/null +++ b/ui/src/pages/overview/entities/details-form/processing-service-details-form.tsx @@ -0,0 +1,125 @@ +import { FormField } from 'components/form/form-field' +import { + FormActions, + FormError, + FormRow, + FormSection, +} from 'components/form/layout/layout' +import { FormConfig } from 'components/form/types' +import { ProcessingService } from 'data-services/models/processing-service' +import { Button, ButtonTheme } from 'design-system/components/button/button' +import { IconType } from 'design-system/components/icon/icon' +import { ConnectionStatus } from 'pages/overview/processing-services/connection-status' +import { useForm } from 'react-hook-form' +import { STRING, translate } from 'utils/language' +import { useFormError } from 'utils/useFormError' +import { DetailsFormProps, FormValues } from './types' + +type ProcessingServiceFormValues = FormValues & { + endpoint_url: string +} + +const config: FormConfig = { + name: { + label: translate(STRING.FIELD_LABEL_NAME), + description: 'A descriptive name for internal reference.', + rules: { + required: true, + }, + }, + endpoint_url: { + label: 'Endpoint URL', + description: 'Processing service endpoint.', + rules: { + required: true, + }, + }, + description: { + label: translate(STRING.FIELD_LABEL_DESCRIPTION), + }, +} + +export const ProcessingServiceDetailsForm = ({ + entity, + error, + isLoading, 
+ isSuccess, + onSubmit, +}: DetailsFormProps) => { + const processingService = entity as ProcessingService | undefined + const { + control, + handleSubmit, + setError: setFieldError, + } = useForm({ + defaultValues: { + name: processingService?.name, + endpoint_url: processingService?.endpointUrl, + description: processingService?.description ?? '', + }, + mode: 'onChange', + }) + + const errorMessage = useFormError({ error, setFieldError }) + + return ( +
+ onSubmit({ + name: values.name, + description: values.description, + customFields: { + endpoint_url: values.endpoint_url, + }, + }) + )} + > + {errorMessage && ( + + )} + + + + + + + + + {processingService?.id && ( + + )} + + +