From 2297bad2acca2e17ee953eea23c057be95cb72c3 Mon Sep 17 00:00:00 2001
From: Michael Bunsen
Date: Thu, 30 Nov 2023 13:49:57 -0800
Subject: [PATCH] Add limit & shuffle params. Improve status logging.

---
 ami/jobs/models.py        | 96 +++++++++++++++++++++++++++++----------
 ami/ml/models/pipeline.py | 10 ++++
 2 files changed, 82 insertions(+), 24 deletions(-)

diff --git a/ami/jobs/models.py b/ami/jobs/models.py
index f5b3479bd..3d1822f1b 100644
--- a/ami/jobs/models.py
+++ b/ami/jobs/models.py
@@ -1,5 +1,6 @@
 import datetime
 import logging
+import random
 import time
 import typing
 
@@ -59,6 +60,11 @@ def get_status_label(status: JobState, progress: float) -> str:
     return f"{status.name}"
 
 
+def python_slugify(value: str) -> str:
+    # Use underscores instead of dashes so the keys can be used as Python property names
+    return slugify(value, allow_unicode=False).replace("-", "_")
+
+
 ML_API_ENDPOINT = "http://host.docker.internal:2000/pipeline/process/"
 
 
@@ -99,7 +105,7 @@ class JobProgress(pydantic.BaseModel):
 
     def add_stage(self, name: str) -> JobProgressStageDetail:
         stage = JobProgressStageDetail(
-            key=slugify(name),
+            key=python_slugify(name),
             name=name,
         )
         self.stages.append(stage)
@@ -122,7 +128,7 @@ def add_stage_param(self, stage_key: str, name: str, value: typing.Any = None) -
         stage = self.get_stage(stage_key)
         param = ConfigurableStageParam(
             name=name,
-            key=slugify(name),
+            key=python_slugify(name),
             value=value,
         )
         stage.params.append(param)
@@ -130,7 +136,7 @@ def add_stage_param(self, stage_key: str, name: str, value: typing.Any = None) -
 
     def add_or_update_stage_param(self, stage_key: str, name: str, value: typing.Any = None) -> ConfigurableStageParam:
         try:
-            param = self.get_stage_param(stage_key, slugify(name))
+            param = self.get_stage_param(stage_key, python_slugify(name))
             param.value = value
             return param
         except ValueError:
@@ -143,6 +149,7 @@ def update_stage(self, stage_key: str, **stage_parameters) -> JobProgressStageDe
         Will update parameters that are direct attributes of the stage,
         or parameters that are in the stage's params list.
""" + stage_key = python_slugify(stage_key) # Allow both title or key to be used for lookup stage = self.get_stage(stage_key) if stage.key == stage_key: @@ -244,6 +251,10 @@ class Job(BaseModel): result = models.JSONField(null=True, blank=True) task_id = models.CharField(max_length=255, null=True, blank=True) delay = models.IntegerField("Delay in seconds", default=0, help_text="Delay before running the job") + limit = models.IntegerField( + "Limit", null=True, blank=True, default=100, help_text="Limit the number of images to process" + ) + shuffle = models.BooleanField("Shuffle", default=True, help_text="Process images in a random order") project = models.ForeignKey( Project, @@ -302,9 +313,24 @@ def setup(self, save=True): self.progress = self.progress or default_job_progress if self.delay: - self.progress.add_stage("Delay") - self.progress.add_stage_param("delay", "Delay", self.delay) - self.progress.add_stage_param("delay", "Mood", "😴") + delay_stage = self.progress.add_stage("Delay") + self.progress.add_stage_param(delay_stage.key, "Delay", self.delay) + self.progress.add_stage_param(delay_stage.key, "Mood", "😴") + + if self.pipeline: + collect_stage = self.progress.add_stage("Collect") + self.progress.add_stage_param(collect_stage.key, "Total Images", "") + self.progress.add_stage_param(collect_stage.key, "Proccessed", "") + self.progress.add_stage_param(collect_stage.key, "Remaining", "") + + pipeline_stage = self.progress.add_stage("Process") + self.progress.add_stage_param(pipeline_stage.key, "Detections", "") + self.progress.add_stage_param(pipeline_stage.key, "Classifications", "") + + saving_stage = self.progress.add_stage("Results") + self.progress.add_stage_param(saving_stage.key, "Objects created", "") + + self.save() if save: self.save() @@ -347,21 +373,12 @@ def run(self): self.save() if self.pipeline: - pipeline_stage = self.progress.add_stage("Pipeline") - self.progress.add_stage_param(pipeline_stage.key, "Detections", "") - self.progress.add_stage_param(pipeline_stage.key, "Classifications", "") - - saving_stage = self.progress.add_stage("Saving results") - self.progress.add_stage_param(saving_stage.key, "objects_created", "") - self.progress.update_stage( - pipeline_stage.key, + "collect", status=JobState.STARTED, progress=0, ) - self.save() - TEMPORARY_MAX_IMAGES = 100 images = list( # @TODO return generator plus image count # @TODO pass to celery group chain? 
@@ -369,15 +386,35 @@ def run(self):
                     collection=self.source_image_collection,
                     deployment=self.deployment,
                     source_images=[self.source_image_single] if self.source_image_single else None,
+                    job_id=self.pk,
+                    # shuffle=self.shuffle,
                 )
             )
             image_count = len(images)
+            image_count_label = f"{image_count}"
+
+            if self.shuffle:
+                self.logger.info("Shuffling images")
+                random.shuffle(images)
 
-            if image_count > TEMPORARY_MAX_IMAGES:
-                self.logger.warn(f"Limiting number of images to {TEMPORARY_MAX_IMAGES}")
-                images = images[:TEMPORARY_MAX_IMAGES]
+            # @TODO remove this temporary limit
+            TEMPORARY_LIMIT = 200
+            self.limit = self.limit or TEMPORARY_LIMIT
 
-            self.progress.add_stage_param(pipeline_stage.key, "Source images", image_count)
+            if self.limit and image_count > self.limit:
+                self.logger.warning(f"Limiting number of images to {self.limit} (out of {image_count})")
+                images = images[: self.limit]
+                image_count_label = f"{len(images)} (out of {image_count})"
+                image_count = len(images)
+
+            self.progress.update_stage(
+                "collect",
+                status=JobState.SUCCESS,
+                progress=1,
+                total_images=image_count_label,
+                processed=0,
+                remaining=image_count,
+            )
 
             total_detections = 0
             total_classifications = 0
@@ -393,7 +430,7 @@ def run(self):
                 total_detections += len(results.detections)
                 total_classifications += len(results.classifications)
                 self.progress.update_stage(
-                    pipeline_stage.key,
+                    "process",
                     status=JobState.STARTED,
                     progress=(i + 1) / len(chunks),
                     detections=total_detections,
@@ -402,21 +439,28 @@ def run(self):
                 self.save()
                 objects = self.pipeline.save_results(results=results, job_id=self.pk)
                 self.progress.update_stage(
-                    saving_stage.key,
+                    "results",
                     status=JobState.STARTED,
                     progress=(i + 1) / len(chunks),
                     objects_created=len(objects),
                 )
+                self.progress.update_stage(
+                    "collect",
+                    processed=min((i + 1) * CHUNK_SIZE, image_count),
+                    remaining=max(image_count - (i + 1) * CHUNK_SIZE, 0),
+                )
                 self.update_progress()
                 self.save()
 
             self.progress.update_stage(
-                pipeline_stage.key,
+                "process",
                 status=JobState.SUCCESS,
+                progress=1,
             )
             self.progress.update_stage(
-                saving_stage.key,
+                "results",
                 status=JobState.SUCCESS,
+                progress=1,
             )
 
             self.update_status(JobState.SUCCESS)
@@ -469,6 +513,10 @@ def update_progress(self, save=True):
         if not len(self.progress.stages):
             total_progress = 0
         else:
+            for stage in self.progress.stages:
+                if stage.status == JobState.SUCCESS and stage.progress < 1:
+                    # Clamp any stages that are complete but report progress less than 1
+                    stage.progress = 1
             total_progress = sum([stage.progress for stage in self.progress.stages]) / len(self.progress.stages)
         self.progress.summary.progress = total_progress
 
diff --git a/ami/ml/models/pipeline.py b/ami/ml/models/pipeline.py
index 8aafefdc3..64712f245 100644
--- a/ami/ml/models/pipeline.py
+++ b/ami/ml/models/pipeline.py
@@ -29,10 +29,12 @@ def collect_images(
     collection: SourceImageCollection | None = None,
     source_images: list[SourceImage] | None = None,
     deployment: Deployment | None = None,
+    job_id: int | None = None,
 ) -> typing.Iterable[SourceImage]:
     """
     Collect images from a collection, a list of images or a deployment.
""" + # Set source to first argument that is not None if collection: images = collection.images.all() elif source_images: @@ -42,6 +44,12 @@ def collect_images( else: raise ValueError("Must specify a collection, deployment or a list of images") + if job_id: + from ami.jobs.models import Job + + job = Job.objects.get(pk=job_id) + job.logger.info(f"Found {len(images)} images to process") + return images @@ -257,11 +265,13 @@ def collect_images( collection: SourceImageCollection | None = None, source_images: list[SourceImage] | None = None, deployment: Deployment | None = None, + job_id: int | None = None, ) -> typing.Iterable[SourceImage]: return collect_images( collection=collection, source_images=source_images, deployment=deployment, + job_id=job_id, ) def process_images(self, images: typing.Iterable[SourceImage], job_id: int | None = None):