Commit

Add limit & shuffle params. Improve status logging.
mihow committed Nov 30, 2023
1 parent 65135bc commit 2297bad
Showing 2 changed files with 82 additions and 24 deletions.
96 changes: 72 additions & 24 deletions ami/jobs/models.py
@@ -1,5 +1,6 @@
import datetime
import logging
+import random
import time
import typing

@@ -59,6 +60,11 @@ def get_status_label(status: JobState, progress: float) -> str:
return f"{status.name}"


+def python_slugify(value: str) -> str:
+    # Use underscore instead of dash so we can use them as python property names
+    return slugify(value, allow_unicode=False).replace("-", "_")
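
A quick check of what the helper produces (a sketch, assuming `slugify` here is Django's `django.utils.text.slugify`): multi-word stage names become valid Python identifiers.

```python
from django.utils.text import slugify

def python_slugify(value: str) -> str:
    return slugify(value, allow_unicode=False).replace("-", "_")

print(python_slugify("Total Images"))    # "total_images", a valid identifier
print(python_slugify("Saving results"))  # "saving_results"
```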


ML_API_ENDPOINT = "http://host.docker.internal:2000/pipeline/process/"


@@ -99,7 +105,7 @@ class JobProgress(pydantic.BaseModel):

def add_stage(self, name: str) -> JobProgressStageDetail:
stage = JobProgressStageDetail(
-            key=slugify(name),
+            key=python_slugify(name),
name=name,
)
self.stages.append(stage)
@@ -122,15 +128,15 @@ def add_stage_param(self, stage_key: str, name: str, value: typing.Any = None) -> ConfigurableStageParam:
stage = self.get_stage(stage_key)
param = ConfigurableStageParam(
name=name,
-            key=slugify(name),
+            key=python_slugify(name),
value=value,
)
stage.params.append(param)
return param

def add_or_update_stage_param(self, stage_key: str, name: str, value: typing.Any = None) -> ConfigurableStageParam:
try:
-            param = self.get_stage_param(stage_key, slugify(name))
+            param = self.get_stage_param(stage_key, python_slugify(name))
param.value = value
return param
except ValueError:
@@ -143,6 +149,7 @@ def update_stage(self, stage_key: str, **stage_parameters) -> JobProgressStageDetail:
Will update parameters that are direct attributes of the stage,
or parameters that are in the stage's params list.
"""
+        stage_key = python_slugify(stage_key)  # Allow both title or key to be used for lookup
stage = self.get_stage(stage_key)

if stage.key == stage_key:
@@ -244,6 +251,10 @@ class Job(BaseModel):
result = models.JSONField(null=True, blank=True)
task_id = models.CharField(max_length=255, null=True, blank=True)
delay = models.IntegerField("Delay in seconds", default=0, help_text="Delay before running the job")
+    limit = models.IntegerField(
+        "Limit", null=True, blank=True, default=100, help_text="Limit the number of images to process"
+    )
+    shuffle = models.BooleanField("Shuffle", default=True, help_text="Process images in a random order")

project = models.ForeignKey(
Project,
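
The two new fields are the params from the commit title. Adding model fields normally requires a Django schema migration, which is not part of this diff; note also that `run()` below substitutes a temporary default of 200 when `limit` is unset. A hypothetical usage sketch (identifiers illustrative, not from this diff):

```python
# Hypothetical sketch: create a job that processes at most 50 images,
# sampled in random order. `project` and `pipeline` are assumed instances.
job = Job.objects.create(
    project=project,
    pipeline=pipeline,
    limit=50,      # cap the number of images to process
    shuffle=True,  # randomize order before the cap is applied
)
```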
@@ -302,9 +313,24 @@ def setup(self, save=True):
self.progress = self.progress or default_job_progress

if self.delay:
-            self.progress.add_stage("Delay")
-            self.progress.add_stage_param("delay", "Delay", self.delay)
-            self.progress.add_stage_param("delay", "Mood", "😴")
+            delay_stage = self.progress.add_stage("Delay")
+            self.progress.add_stage_param(delay_stage.key, "Delay", self.delay)
+            self.progress.add_stage_param(delay_stage.key, "Mood", "😴")

+        if self.pipeline:
+            collect_stage = self.progress.add_stage("Collect")
+            self.progress.add_stage_param(collect_stage.key, "Total Images", "")
+            self.progress.add_stage_param(collect_stage.key, "Processed", "")
+            self.progress.add_stage_param(collect_stage.key, "Remaining", "")
+
+            pipeline_stage = self.progress.add_stage("Process")
+            self.progress.add_stage_param(pipeline_stage.key, "Detections", "")
+            self.progress.add_stage_param(pipeline_stage.key, "Classifications", "")
+
+            saving_stage = self.progress.add_stage("Results")
+            self.progress.add_stage_param(saving_stage.key, "Objects created", "")
+
+        self.save()

if save:
self.save()
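
With this, `setup()` pre-registers every stage before the job runs, so status can be reported against known keys. The names map to keys via `python_slugify`, and `update_stage()` now slugifies its `stage_key` argument, so either form works for lookups:

```python
# Stage names registered in setup() and the keys they slugify to:
#   "Delay"   -> "delay"    (only added when self.delay is set)
#   "Collect" -> "collect"
#   "Process" -> "process"
#   "Results" -> "results"
# run() can therefore pass either "Collect" or "collect" to update_stage().
```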
@@ -347,37 +373,48 @@ def run(self):
self.save()

if self.pipeline:
-            pipeline_stage = self.progress.add_stage("Pipeline")
-            self.progress.add_stage_param(pipeline_stage.key, "Detections", "")
-            self.progress.add_stage_param(pipeline_stage.key, "Classifications", "")
-
-            saving_stage = self.progress.add_stage("Saving results")
-            self.progress.add_stage_param(saving_stage.key, "objects_created", "")

            self.progress.update_stage(
-                pipeline_stage.key,
+                "collect",
status=JobState.STARTED,
progress=0,
)
self.save()

-            TEMPORARY_MAX_IMAGES = 100
images = list(
# @TODO return generator plus image count
# @TODO pass to celery group chain?
self.pipeline.collect_images(
collection=self.source_image_collection,
deployment=self.deployment,
source_images=[self.source_image_single] if self.source_image_single else None,
+                    job_id=self.pk,
+                    # shuffle=self.shuffle,
)
)
image_count = len(images)
+            image_count_label = f"{image_count}"

+            if self.shuffle:
+                self.logger.info("Shuffling images")
+                random.shuffle(images)

-            if image_count > TEMPORARY_MAX_IMAGES:
-                self.logger.warn(f"Limiting number of images to {TEMPORARY_MAX_IMAGES}")
-                images = images[:TEMPORARY_MAX_IMAGES]
+            # @TODO remove this temporary limit
+            TEMPORARY_LIMIT = 200
+            self.limit = self.limit or TEMPORARY_LIMIT

-            self.progress.add_stage_param(pipeline_stage.key, "Source images", image_count)
+            if self.limit and image_count > self.limit:
+                self.logger.warning(f"Limiting number of images to {self.limit} (out of {image_count})")
+                image_count_label = f"{self.limit} (out of {image_count})"
+                images = images[: self.limit]
+                image_count = len(images)

+            self.progress.update_stage(
+                "collect",
+                status=JobState.SUCCESS,
+                progress=1,
+                total_images=image_count_label,
+                processed=0,
+                remaining=image_count,
+            )
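
Because the shuffle happens before the slice, the limit yields a random sample of the collection rather than the first N images. A minimal sketch of that order of operations:

```python
import random

images = list(range(1000))  # stand-in for the collected source images
limit = 100

random.shuffle(images)   # randomize order first...
sample = images[:limit]  # ...so the cap takes a random sample,
                         # not just the first 100 images
```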

total_detections = 0
total_classifications = 0
@@ -393,7 +430,7 @@ def run(self):
total_detections += len(results.detections)
total_classifications += len(results.classifications)
self.progress.update_stage(
-                    pipeline_stage.key,
+                    "process",
status=JobState.STARTED,
progress=(i + 1) / len(chunks),
detections=total_detections,
@@ -402,21 +439,28 @@ def run(self):
self.save()
objects = self.pipeline.save_results(results=results, job_id=self.pk)
self.progress.update_stage(
-                    saving_stage.key,
+                    "results",
status=JobState.STARTED,
progress=(i + 1) / len(chunks),
objects_created=len(objects),
)
+                self.progress.update_stage(
+                    "collect",
+                    processed=min((i + 1) * CHUNK_SIZE, image_count),
+                    remaining=max(image_count - (i + 1) * CHUNK_SIZE, 0),
+                )
self.update_progress()
self.save()
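
Progress within the loop is reported as the fraction of chunks completed, while the collect-stage counters are clamped so the final, possibly partial chunk does not overshoot. A self-contained sketch with hypothetical numbers (`CHUNK_SIZE` is assumed to be defined elsewhere in `run()`):

```python
CHUNK_SIZE = 4
image_count = 10
num_chunks = len(range(0, image_count, CHUNK_SIZE))  # 3 chunks: 4 + 4 + 2 images

for i in range(num_chunks):
    progress = (i + 1) / num_chunks                     # 0.33, 0.67, 1.0
    processed = min((i + 1) * CHUNK_SIZE, image_count)  # 4, 8, 10 (clamped)
    remaining = max(image_count - processed, 0)         # 6, 2, 0
```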

self.progress.update_stage(
-                pipeline_stage.key,
+                "process",
status=JobState.SUCCESS,
progress=1,
)
self.progress.update_stage(
-                saving_stage.key,
+                "results",
status=JobState.SUCCESS,
progress=1,
)

self.update_status(JobState.SUCCESS)
@@ -469,6 +513,10 @@ def update_progress(self, save=True):
if not len(self.progress.stages):
total_progress = 0
else:
+            for stage in self.progress.stages:
+                if stage.status == JobState.SUCCESS and stage.progress < 1:
+                    # Update any stages that are complete but have a progress less than 1
+                    stage.progress = 1
total_progress = sum([stage.progress for stage in self.progress.stages]) / len(self.progress.stages)

self.progress.summary.progress = total_progress
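
Overall job progress is the mean of the per-stage fractions, which is why completed stages are first clamped to 1. A worked example:

```python
# Three stages: two finished, one halfway done.
stage_progress = [1.0, 1.0, 0.5]
total_progress = sum(stage_progress) / len(stage_progress)  # 0.8333...
```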
10 changes: 10 additions & 0 deletions ami/ml/models/pipeline.py
@@ -29,10 +29,12 @@ def collect_images(
collection: SourceImageCollection | None = None,
source_images: list[SourceImage] | None = None,
deployment: Deployment | None = None,
+    job_id: int | None = None,
) -> typing.Iterable[SourceImage]:
"""
Collect images from a collection, a list of images or a deployment.
"""
+    # Set source to first argument that is not None
if collection:
images = collection.images.all()
elif source_images:
@@ -42,6 +44,12 @@
else:
raise ValueError("Must specify a collection, deployment or a list of images")

+    if job_id:
+        # Local import to avoid a circular import between the jobs and ml apps
+        from ami.jobs.models import Job
+
+        job = Job.objects.get(pk=job_id)
+        job.logger.info(f"Found {len(images)} images to process")

return images
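
The new `job_id` parameter lets this standalone helper write to the job's own log stream. A hypothetical call (sketch; the model instances are assumed to exist):

```python
images = collect_images(
    collection=collection,  # an existing SourceImageCollection
    job_id=job.pk,          # lets the helper log "Found N images" via job.logger
)
```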


@@ -257,11 +265,13 @@ def collect_images(
collection: SourceImageCollection | None = None,
source_images: list[SourceImage] | None = None,
deployment: Deployment | None = None,
+        job_id: int | None = None,
) -> typing.Iterable[SourceImage]:
return collect_images(
collection=collection,
source_images=source_images,
deployment=deployment,
+            job_id=job_id,
)

def process_images(self, images: typing.Iterable[SourceImage], job_id: int | None = None):