Skip to content

Commit

Permalink
Error handling for jobs (#368)
Browse files Browse the repository at this point in the history
* Management command for updating stale jobs

* Hide failed jobs after 3 days, by default

* Continue if a batch of images fails
  • Loading branch information
mihow authored Mar 25, 2024
1 parent b78d229 commit 66c6736
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 5 deletions.
38 changes: 38 additions & 0 deletions ami/jobs/management/commands/update_stale_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from celery import states
from celery.result import AsyncResult
from django.core.management.base import BaseCommand
from django.utils import timezone

from ami.jobs.models import Job, JobState


class Command(BaseCommand):
    """Management command that refreshes the status of stale jobs.

    A job is treated as stale when its status is one of
    ``JobState.running_states()`` and its ``updated_at`` timestamp is older
    than the ``--hours`` cutoff. For each stale job the current Celery task
    state is looked up and stored; a job with no ``task_id`` is set to
    ``celery.states.FAILURE``.
    """

    help = (
        "Update the status of all jobs that are not in a final state "
        "and have not been updated in the last X hours."
    )

    def add_arguments(self, parser):
        """Register ``--hours``: the age (in hours) after which a job is stale.

        Defaults to ``Job.FAILED_CUTOFF_HOURS``.
        """
        parser.add_argument(
            "--hours",
            type=int,
            default=Job.FAILED_CUTOFF_HOURS,
            help="Number of hours to consider a job stale",
        )

    def handle(self, *args, **options):
        """Update and save every stale job, reporting each change on stdout."""
        # Imported locally so the command does not depend on ``timedelta``
        # happening to be re-exported by ``django.utils.timezone``.
        from datetime import timedelta

        cutoff = timezone.now() - timedelta(hours=options["hours"])
        stale_jobs = Job.objects.filter(
            status__in=JobState.running_states(),
            updated_at__lt=cutoff,
        )

        for job in stale_jobs:
            if job.task_id:
                # Read the task state exactly once: each access queries the
                # result backend, so reading it again for the message could
                # report a different state than the one that was saved.
                new_state = AsyncResult(job.task_id).state
                message = self.style.SUCCESS(f"Updated status of job {job.pk} to {new_state}")
            else:
                new_state = states.FAILURE
                message = self.style.WARNING(
                    f"Job {job.pk} has no associated task, setting status to FAILED"
                )

            job.update_status(new_state, save=False)
            job.save()
            self.stdout.write(message)
29 changes: 25 additions & 4 deletions ami/jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ class JobState(str, OrderedEnum):
RECEIVED = "RECEIVED"
UNKNOWN = "UNKNOWN"

@classmethod
def running_states(cls):
    """Return the list of states grouped as "running".

    The list holds, in order: CREATED, PENDING, STARTED, RETRY, CANCELING
    and UNKNOWN.
    """
    in_progress = [
        cls.CREATED,
        cls.PENDING,
        cls.STARTED,
        cls.RETRY,
        cls.CANCELING,
        cls.UNKNOWN,
    ]
    return in_progress

@classmethod
def final_states(cls):
    """Return the list of states grouped as "final".

    The list holds, in order: SUCCESS, FAILURE and REVOKED.
    """
    finished = [
        cls.SUCCESS,
        cls.FAILURE,
        cls.REVOKED,
    ]
    return finished

@classmethod
def failed_states(cls):
    """Return the list of states grouped as "failed".

    The list holds, in order: FAILURE, REVOKED and UNKNOWN.
    """
    unsuccessful = [
        cls.FAILURE,
        cls.REVOKED,
        cls.UNKNOWN,
    ]
    return unsuccessful


def get_status_label(status: JobState, progress: float) -> str:
"""
Expand Down Expand Up @@ -240,6 +252,9 @@ def emit(self, record):
class Job(BaseModel):
"""A job to be run by the scheduler"""

# Hide old failed jobs after 3 days
FAILED_CUTOFF_HOURS = 24 * 3

name = models.CharField(max_length=255)
queue = models.CharField(max_length=255, default="default")
scheduled_at = models.DateTimeField(null=True, blank=True)
Expand Down Expand Up @@ -421,10 +436,16 @@ def run(self):
chunks = [images[i : i + CHUNK_SIZE] for i in range(0, image_count, CHUNK_SIZE)] # noqa

for i, chunk in enumerate(chunks):
results = self.pipeline.process_images(
images=chunk,
job_id=self.pk,
)
try:
results = self.pipeline.process_images(
images=chunk,
job_id=self.pk,
)
except Exception as e:
# Log error about image batch and continue
self.logger.error(f"Failed to process image batch {i} of {len(chunks)}: {e}")
continue

total_detections += len(results.detections)
total_classifications += len(results.classifications)
self.progress.update_stage(
Expand Down
18 changes: 17 additions & 1 deletion ami/jobs/views.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import logging

from django.db.models.query import QuerySet
from django.forms import IntegerField
from django.utils import timezone
from rest_framework.decorators import action
from rest_framework.response import Response

from ami.main.api.views import DefaultViewSet
from ami.utils.fields import url_boolean_param

from .models import Job
from .models import Job, JobState
from .serializers import JobListSerializer, JobSerializer

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -101,3 +104,16 @@ def perform_create(self, serializer):
if url_boolean_param(self.request, "start_now", default=False):
# job.run()
job.enqueue()

def get_queryset(self) -> QuerySet:
    """Return the jobs queryset with old failed jobs hidden.

    Jobs whose status is one of ``JobState.failed_states()`` AND whose
    ``updated_at`` is older than the cutoff are excluded. The cutoff is read
    from the ``cutoff_hours`` query parameter (a non-negative integer) and
    defaults to ``Job.FAILED_CUTOFF_HOURS``.
    """
    # Imported locally so this method does not depend on ``timedelta``
    # happening to be re-exported by ``django.utils.timezone``.
    from datetime import timedelta

    jobs = super().get_queryset()

    # NOTE(review): ``clean`` raises a Django forms ValidationError for
    # non-integer or negative input — confirm this is surfaced to API clients
    # as a 400 rather than a server error.
    cutoff_hours = IntegerField(required=False, min_value=0).clean(
        self.request.query_params.get("cutoff_hours", Job.FAILED_CUTOFF_HOURS)
    )
    if cutoff_hours is None:
        # With required=False an empty ``?cutoff_hours=`` cleans to None,
        # which timedelta() cannot accept; fall back to the default.
        cutoff_hours = Job.FAILED_CUTOFF_HOURS

    # Hide failed jobs that have not been updated in the last X hours.
    cutoff_datetime = timezone.now() - timedelta(hours=cutoff_hours)
    return jobs.exclude(
        # ``status__in`` is required to match against a list of states; a
        # plain ``status=`` exact lookup against the list never matches.
        status__in=JobState.failed_states(),
        updated_at__lt=cutoff_datetime,
    )

0 comments on commit 66c6736

Please sign in to comment.