From 89ae11c6a228e50d134688b7a836f8412f0f06ed Mon Sep 17 00:00:00 2001 From: Nathan Brake <33383515+njbrake@users.noreply.github.com> Date: Wed, 29 Jan 2025 11:05:33 -0500 Subject: [PATCH] Created the concept of "workflows", "experiments_new" route split between "workflows" and "experiments" (#728) Signed-off-by: Nathan Brake <33383515+njbrake@users.noreply.github.com> Co-authored-by: Peter Wilson --- .../python/mzai/backend/backend/api/deps.py | 11 + .../python/mzai/backend/backend/api/router.py | 13 +- .../backend/backend/api/routes/experiments.py | 57 ++++- .../backend/api/routes/experiments_new.py | 78 ------ .../backend/backend/api/routes/workflows.py | 79 +++++++ .../python/mzai/backend/backend/api/tags.py | 6 +- lumigator/python/mzai/backend/backend/main.py | 2 +- .../backend/backend/services/experiments.py | 122 +--------- .../mzai/backend/backend/services/jobs.py | 1 - .../backend/backend/services/workflows.py | 223 ++++++++++++++++++ .../mzai/backend/backend/tests/conftest.py | 2 +- .../api/routes/test_api_workflows.py | 39 +-- .../schemas/lumigator_schemas/experiments.py | 5 + .../mzai/schemas/lumigator_schemas/jobs.py | 6 + .../schemas/lumigator_schemas/workflows.py | 44 ++++ 15 files changed, 460 insertions(+), 228 deletions(-) delete mode 100644 lumigator/python/mzai/backend/backend/api/routes/experiments_new.py create mode 100644 lumigator/python/mzai/backend/backend/api/routes/workflows.py create mode 100644 lumigator/python/mzai/backend/backend/services/workflows.py create mode 100644 lumigator/python/mzai/schemas/lumigator_schemas/workflows.py diff --git a/lumigator/python/mzai/backend/backend/api/deps.py b/lumigator/python/mzai/backend/backend/api/deps.py index 2e5844797..e54b9fd27 100644 --- a/lumigator/python/mzai/backend/backend/api/deps.py +++ b/lumigator/python/mzai/backend/backend/api/deps.py @@ -16,6 +16,7 @@ from backend.services.datasets import DatasetService from backend.services.experiments import ExperimentService from backend.services.jobs import JobService +from backend.services.workflows import WorkflowService from backend.settings import settings @@ -74,6 +75,16 @@ def get_experiment_service( ExperimentServiceDep = Annotated[ExperimentService, Depends(get_experiment_service)] +def get_workflow_service( + session: DBSessionDep, job_service: JobServiceDep, dataset_service: DatasetServiceDep +) -> WorkflowService: + job_repo = JobRepository(session) + return WorkflowService(job_repo, job_service, dataset_service) + + +WorkflowServiceDep = Annotated[WorkflowService, Depends(get_workflow_service)] + + def get_mistral_completion_service() -> MistralCompletionService: return MistralCompletionService() diff --git a/lumigator/python/mzai/backend/backend/api/router.py b/lumigator/python/mzai/backend/backend/api/router.py index d5d7596a8..e7764b15a 100644 --- a/lumigator/python/mzai/backend/backend/api/router.py +++ b/lumigator/python/mzai/backend/backend/api/router.py @@ -1,14 +1,6 @@ from fastapi import APIRouter -from backend.api.routes import ( - completions, - datasets, - experiments, - experiments_new, - health, - jobs, - models, -) +from backend.api.routes import completions, datasets, experiments, health, jobs, models, workflows from backend.api.tags import Tags API_V1_PREFIX = "/api/v1" @@ -20,6 +12,7 @@ api_router.include_router(experiments.router, prefix="/experiments", tags=[Tags.EXPERIMENTS]) api_router.include_router(completions.router, prefix="/completions", tags=[Tags.COMPLETIONS]) api_router.include_router(models.router, prefix="/models", 
tags=[Tags.MODELS]) +# TODO: Workflows route is not yet ready so it is excluded from the OpenAPI schema api_router.include_router( - experiments_new.router, prefix="/experiments_new", tags=[Tags.EXPERIMENTS_NEW] + workflows.router, prefix="/workflows", tags=[Tags.WORKFLOWS], include_in_schema=False ) diff --git a/lumigator/python/mzai/backend/backend/api/routes/experiments.py b/lumigator/python/mzai/backend/backend/api/routes/experiments.py index b3eaf3f99..b66a2d985 100644 --- a/lumigator/python/mzai/backend/backend/api/routes/experiments.py +++ b/lumigator/python/mzai/backend/backend/api/routes/experiments.py @@ -1,8 +1,10 @@ +from http import HTTPStatus from uuid import UUID from fastapi import APIRouter, BackgroundTasks, status from lumigator_schemas.experiments import ( ExperimentCreate, + ExperimentIdCreate, ExperimentResponse, ExperimentResultDownloadResponse, ExperimentResultResponse, ) @@ -12,11 +14,19 @@ JobEvalCreate, ) -from backend.api.deps import JobServiceDep +from backend.api.deps import ExperimentServiceDep, JobServiceDep +from backend.services.exceptions.base_exceptions import ServiceError +from backend.services.exceptions.experiment_exceptions import ExperimentNotFoundError router = APIRouter() +def experiment_exception_mappings() -> dict[type[ServiceError], HTTPStatus]: + return { + ExperimentNotFoundError: status.HTTP_404_NOT_FOUND, + } + + @router.post("/", status_code=status.HTTP_201_CREATED) def create_experiment( service: JobServiceDep, request: ExperimentCreate, background_tasks: BackgroundTasks ) -> ExperimentResponse: @@ -60,3 +70,48 @@ def get_experiment_result_download( return ExperimentResultDownloadResponse.model_validate( service.get_job_result_download(experiment_id).model_dump() ) + + +#################################################################################################### +# "new" routes +#################################################################################################### +# These "experiments_new" routes are not yet ready to be exposed in the OpenAPI schema, +# because they are designed for the API when 'workflows' are supported +# TODO: Eventually this route will become the / route, +# but right now it is a placeholder while we build up the Workflows routes +# It's not included in the OpenAPI schema for now so it's not visible in the docs +@router.post("/new", status_code=status.HTTP_201_CREATED, include_in_schema=False) +def create_experiment_id( + service: ExperimentServiceDep, request: ExperimentIdCreate +) -> ExperimentResponse: + """Create an experiment and return its ID.""" + return ExperimentResponse.model_validate(service.create_experiment(request).model_dump()) + + +# TODO: FIXME this should not need the /all suffix. 
+# See further discussion https://github.com/mozilla-ai/lumigator/pull/728/files/2c960962c365d72e0714a16333884f0f209d214e#r1932176937 +@router.get("/new/all", include_in_schema=False) +def list_experiments_new( + service: ExperimentServiceDep, + skip: int = 0, + limit: int = 100, +) -> ListingResponse[ExperimentResponse]: + """List all experiments.""" + return ListingResponse[ExperimentResponse].model_validate( + service.list_experiments(skip, limit).model_dump() + ) + + +@router.get("/new/{experiment_id}", include_in_schema=False) +def get_experiment_new(service: ExperimentServiceDep, experiment_id: UUID) -> ExperimentResponse: + """Get an experiment by ID.""" + return ExperimentResponse.model_validate(service.get_experiment(experiment_id).model_dump()) + + +@router.get("/new/{experiment_id}/workflows", include_in_schema=False) +def get_workflows(service: ExperimentServiceDep, experiment_id: UUID) -> ListingResponse[UUID]: + """TODO: this endpoint should handle passing in an experiment id and returning a list + of all the workflows associated with that experiment. Until workflows are stored and associated + with experiments, this is not yet implemented. + """ + raise NotImplementedError diff --git a/lumigator/python/mzai/backend/backend/api/routes/experiments_new.py b/lumigator/python/mzai/backend/backend/api/routes/experiments_new.py deleted file mode 100644 index 6ad944f4d..000000000 --- a/lumigator/python/mzai/backend/backend/api/routes/experiments_new.py +++ /dev/null @@ -1,78 +0,0 @@ -from http import HTTPStatus -from uuid import UUID - -from fastapi import APIRouter, BackgroundTasks, status -from lumigator_schemas.experiments import ( - ExperimentCreate, - ExperimentResponse, - ExperimentResultDownloadResponse, - ExperimentResultResponse, -) -from lumigator_schemas.extras import ListingResponse -from lumigator_schemas.jobs import JobResponse - -from backend.api.deps import ExperimentServiceDep, JobServiceDep -from backend.services.exceptions.base_exceptions import ServiceError -from backend.services.exceptions.experiment_exceptions import ExperimentNotFoundError - -router = APIRouter() - - -def experiment_exception_mappings() -> dict[type[ServiceError], HTTPStatus]: - return { - ExperimentNotFoundError: status.HTTP_404_NOT_FOUND, - } - - -@router.post("/", status_code=status.HTTP_201_CREATED) -async def create_experiment( - service: ExperimentServiceDep, request: ExperimentCreate, background_tasks: BackgroundTasks -) -> ExperimentResponse: - return ExperimentResponse.model_validate(service.create_experiment(request, background_tasks)) - - -@router.get("/{experiment_id}") -def get_experiment(service: ExperimentServiceDep, experiment_id: UUID) -> ExperimentResponse: - return ExperimentResponse.model_validate(service.get_experiment(experiment_id).model_dump()) - - -@router.get("/{experiment_id}/jobs") -def get_experiment_jobs( - service: ExperimentServiceDep, experiment_id: UUID -) -> ListingResponse[JobResponse]: - return ListingResponse[JobResponse].model_validate( - service._get_experiment_jobs(experiment_id).model_dump() - ) - - -@router.get("/") -def list_experiments( - service: ExperimentServiceDep, - skip: int = 0, - limit: int = 100, -) -> ListingResponse[ExperimentResponse]: - return ListingResponse[ExperimentResponse].model_validate( - service.list_experiments(skip, limit).model_dump() - ) - - -@router.get("/{experiment_id}/result") -def get_experiment_result( - service: JobServiceDep, - experiment_id: UUID, -) -> ExperimentResultResponse: - """Return experiment results 
metadata if available in the DB.""" - return ExperimentResultResponse.model_validate( - service.get_job_result(experiment_id).model_dump() - ) - - -@router.get("/{experiment_id}/result/download") -def get_experiment_result_download( - service: ExperimentServiceDep, - experiment_id: UUID, -) -> ExperimentResultDownloadResponse: - """Return experiment results file URL for downloading.""" - return ExperimentResultDownloadResponse.model_validate( - service.get_experiment_result_download(experiment_id).model_dump() - ) diff --git a/lumigator/python/mzai/backend/backend/api/routes/workflows.py b/lumigator/python/mzai/backend/backend/api/routes/workflows.py new file mode 100644 index 000000000..54a9df9fa --- /dev/null +++ b/lumigator/python/mzai/backend/backend/api/routes/workflows.py @@ -0,0 +1,79 @@ +from uuid import UUID + +from fastapi import APIRouter, BackgroundTasks, status +from lumigator_schemas.extras import ListingResponse +from lumigator_schemas.jobs import JobResponse +from lumigator_schemas.workflows import ( + WorkflowCreate, + WorkflowDetailsResponse, + WorkflowResponse, + WorkflowResultDownloadResponse, +) + +from backend.api.deps import WorkflowServiceDep + +router = APIRouter() + + +@router.post("/", status_code=status.HTTP_201_CREATED) +async def create_workflow( + service: WorkflowServiceDep, request: WorkflowCreate, background_tasks: BackgroundTasks +) -> WorkflowResponse: + """A workflow is a single execution of an experiment. + A workflow is a collection of one or more jobs. + It must be associated with an experiment id, + which means you must already have created an experiment and have that ID in the request. + """ + return WorkflowResponse.model_validate(service.create_workflow(request, background_tasks)) + + +@router.get("/{workflow_id}") +def get_workflow(service: WorkflowServiceDep, workflow_id: UUID) -> WorkflowResponse: + """TODO: Workflow objects are currently not saved in the database, so they can't be retrieved. + In order to get all the info about a workflow, + you need to get all the jobs for an experiment and make some decisions about how to use them. + This means you can't yet easily compile a list of all workflows for an experiment. + """ + raise NotImplementedError + + +# TODO: currently experiment_id=workflow_id, but this will change +@router.get("/{experiment_id}/jobs", include_in_schema=False) +def get_workflow_jobs( + service: WorkflowServiceDep, experiment_id: UUID ) -> ListingResponse[JobResponse]: + """Get all jobs for a workflow. + TODO: this will likely eventually be merged with the get_workflow endpoint, once implemented + """ + # TODO right now this command expects that the workflow_id is the same as the experiment_id + return ListingResponse[JobResponse].model_validate( + service.get_workflow_jobs(experiment_id).model_dump() + ) + + +@router.get("/{workflow_id}/details") +def get_workflow_details( + service: WorkflowServiceDep, + workflow_id: UUID, +) -> WorkflowDetailsResponse: + """TODO: Return the results metadata for a run if available in the DB. + This should retrieve the metadata for the job or jobs that were run in the workflow and compile + them into a single response that can be used to populate the UI. + Currently this looks like taking the average results for the + inference job (tok/s, gen length, etc) and the + average results for the evaluation job (ROUGE, BLEU, etc) and + returning them in a single response. + For detailed, per-job results you would want to use the result download endpoint. 
+ """ + raise NotImplementedError + + +@router.get("/{workflow_id}/details") +def get_experiment_result_download( + service: WorkflowServiceDep, + workflow_id: UUID, +) -> WorkflowResultDownloadResponse: + """Return experiment results file URL for downloading.""" + return WorkflowResultDownloadResponse.model_validate( + service.get_workflow_result_download(workflow_id).model_dump() + ) diff --git a/lumigator/python/mzai/backend/backend/api/tags.py b/lumigator/python/mzai/backend/backend/api/tags.py index 5531e7c22..74d824d93 100644 --- a/lumigator/python/mzai/backend/backend/api/tags.py +++ b/lumigator/python/mzai/backend/backend/api/tags.py @@ -7,7 +7,7 @@ class Tags(str, Enum): JOBS = "jobs" COMPLETIONS = "completions" EXPERIMENTS = "experiments" - EXPERIMENTS_NEW = "experiments_new" + WORKFLOWS = "workflows" MODELS = "models" @@ -25,8 +25,8 @@ class Tags(str, Enum): "description": "Create and manage experiments.", }, { - "name": Tags.EXPERIMENTS_NEW, - "description": "Create and manage experiments (new).", + "name": Tags.WORKFLOWS, + "description": "Create and manage workflows.", }, { "name": Tags.JOBS, diff --git a/lumigator/python/mzai/backend/backend/main.py b/lumigator/python/mzai/backend/backend/main.py index 0005b261a..19e98d961 100644 --- a/lumigator/python/mzai/backend/backend/main.py +++ b/lumigator/python/mzai/backend/backend/main.py @@ -15,7 +15,7 @@ from backend.api.router import api_router from backend.api.routes.completions import completion_exception_mappings from backend.api.routes.datasets import dataset_exception_mappings -from backend.api.routes.experiments_new import experiment_exception_mappings +from backend.api.routes.experiments import experiment_exception_mappings from backend.api.routes.jobs import job_exception_mappings from backend.api.tags import TAGS_METADATA from backend.services.exceptions.base_exceptions import ServiceError diff --git a/lumigator/python/mzai/backend/backend/services/experiments.py b/lumigator/python/mzai/backend/backend/services/experiments.py index 07bff1ae8..94b1076c5 100644 --- a/lumigator/python/mzai/backend/backend/services/experiments.py +++ b/lumigator/python/mzai/backend/backend/services/experiments.py @@ -1,16 +1,14 @@ -import json from uuid import UUID import loguru -from fastapi import BackgroundTasks from lumigator_schemas.experiments import ( ExperimentCreate, ExperimentResponse, - ExperimentResultDownloadResponse, ) from lumigator_schemas.extras import ListingResponse -from lumigator_schemas.jobs import JobEvalLiteCreate, JobInferenceCreate, JobResponse, JobStatus -from s3fs import S3FileSystem +from lumigator_schemas.jobs import ( + JobStatus, +) from backend.records.jobs import JobRecord from backend.repositories.experiments import ExperimentRepository @@ -18,7 +16,6 @@ from backend.services.datasets import DatasetService from backend.services.exceptions.experiment_exceptions import ExperimentNotFoundError from backend.services.jobs import JobService -from backend.settings import settings class ExperimentService: @@ -41,78 +38,12 @@ def get_all_owned_jobs(self, experiment_id: UUID) -> ListingResponse[UUID]: jobs = [job.id for job in self._get_all_owned_jobs(experiment_id)] return ListingResponse[UUID].model_validate({"total": len(jobs), "items": jobs}) - def _run_eval( - self, - inference_job_id: UUID, - request: ExperimentCreate, - background_tasks: BackgroundTasks, - experiment_id: UUID = None, - ): - # use the inference job id to recover the dataset record - dataset_record = 
self._dataset_service.get_dataset_by_job_id(inference_job_id) - - # prepare the inputs for the evaluation job and pass the id of the new dataset - job_eval_dict = { - "name": f"{request.name}-evaluation", - "model": request.model, - "dataset": dataset_record.id, - "max_samples": request.max_samples, - "skip_inference": True, - } - - # submit the job - self._job_service.create_job( - JobEvalLiteCreate.model_validate(job_eval_dict), - background_tasks, - experiment_id=experiment_id, - ) - - def create_experiment( - self, request: ExperimentCreate, background_tasks: BackgroundTasks - ) -> ExperimentResponse: - # The FastAPI BackgroundTasks object is used to run a function in the background. - # It is a wrapper around Starlette's BackgroundTasks object. - # A background task should be attached to a response, - # and will run only once the response has been sent. - # See here: https://www.starlette.io/background/ - + def create_experiment(self, request: ExperimentCreate) -> ExperimentResponse: experiment_record = self._experiment_repo.create( name=request.name, description=request.description ) loguru.logger.info(f"Created experiment '{request.name}' with ID '{experiment_record.id}'.") - # input is ExperimentCreate, we need to split the configs and generate one - # JobInferenceCreate and one JobEvalCreate - job_inference_dict = { - "name": f"{request.name}-inference", - "model": request.model, - "dataset": request.dataset, - "max_samples": request.max_samples, - "model_url": request.model_url, - "output_field": request.inference_output_field, - "system_prompt": request.system_prompt, - "store_to_dataset": True, - } - - # submit inference job first - job_response = self._job_service.create_job( - JobInferenceCreate.model_validate(job_inference_dict), - background_tasks, - experiment_id=experiment_record.id, - ) - - # run evaluation job afterwards - # (NOTE: tasks in starlette are executed sequentially: https://www.starlette.io/background/) - background_tasks.add_task( - self._job_service.on_job_complete, - job_response.id, - self._run_eval, - job_response.id, - request, - background_tasks, - experiment_record.id, - ) - return ExperimentResponse.model_validate(experiment_record) # TODO Move this into a "composite job" impl @@ -142,51 +73,6 @@ def get_experiment(self, experiment_id: UUID) -> ExperimentResponse: return ExperimentResponse.model_validate(record) - def _get_experiment_jobs(self, experiment_id: UUID) -> ListingResponse[JobResponse]: - records = self._job_repo.get_by_experiment_id(experiment_id) - return ListingResponse( - total=len(records), - items=[JobResponse.model_validate(x) for x in records], - ) - - def get_experiment_result_download( - self, experiment_id: UUID - ) -> ExperimentResultDownloadResponse: - """Return experiment results file URL for downloading.""" - s3 = S3FileSystem() - # get jobs matching this experiment - # have a query returning a list of (two) jobs, one inference and one eval, - # matching the current experiment id. 
Note that each job has its own type baked in - # (per https://github.com/mozilla-ai/lumigator/pull/576) - jobs = self._get_experiment_jobs(experiment_id) - - # iterate on jobs and depending on which job this is, import selected fields - results = {} - - for job in jobs: - # Get the dataset from the S3 bucket - result_key = self._job_service._get_results_s3_key(job.id) - with s3.open(f"{settings.S3_BUCKET}/{result_key}", "r") as f: - job_results = json.loads(f.read()) - - # we just merge the two dictionaries for now - results.update(job_results) - - loguru.logger.error(results) - - # Generate presigned download URL for the object - result_key = self._job_service._get_results_s3_key(experiment_id) - download_url = self._dataset_service.s3_client.generate_presigned_url( - "get_object", - Params={ - "Bucket": settings.S3_BUCKET, - "Key": result_key, - }, - ExpiresIn=settings.S3_URL_EXPIRATION, - ) - - return ExperimentResultDownloadResponse(id=experiment_id, download_url=download_url) - def list_experiments( self, skip: int = 0, limit: int = 100 ) -> ListingResponse[ExperimentResponse]: diff --git a/lumigator/python/mzai/backend/backend/services/jobs.py b/lumigator/python/mzai/backend/backend/services/jobs.py index 94b005afa..f0ae6f46d 100644 --- a/lumigator/python/mzai/backend/backend/services/jobs.py +++ b/lumigator/python/mzai/backend/backend/services/jobs.py @@ -250,7 +250,6 @@ def get_upstream_job_status(self, job_id: UUID) -> str: async def on_job_complete(self, job_id: UUID, task: Callable = None, *args): """Watches a submitted job and, when it terminates successfully, runs a given task. - Inputs: - job_id: the UUID of the job to watch - task: the function to be called after the job completes successfully diff --git a/lumigator/python/mzai/backend/backend/services/workflows.py b/lumigator/python/mzai/backend/backend/services/workflows.py new file mode 100644 index 000000000..a39176263 --- /dev/null +++ b/lumigator/python/mzai/backend/backend/services/workflows.py @@ -0,0 +1,223 @@ +import asyncio +import datetime +import json +import uuid +from collections.abc import Callable +from uuid import UUID + +import loguru +from fastapi import BackgroundTasks +from lumigator_schemas.extras import ListingResponse +from lumigator_schemas.jobs import ( + JobEvalLiteCreate, + JobInferenceCreate, + JobResponse, + JobStatus, +) +from lumigator_schemas.workflows import ( + WorkflowCreate, + WorkflowResponse, + WorkflowResultDownloadResponse, +) +from s3fs import S3FileSystem + +from backend.repositories.jobs import JobRepository +from backend.services.datasets import DatasetService +from backend.services.jobs import JobService +from backend.settings import settings + + +class WorkflowService: + def __init__( + self, + job_repo: JobRepository, + job_service: JobService, + dataset_service: DatasetService, + ): + self._job_repo = job_repo + self._job_service = job_service + self._dataset_service = dataset_service + + NON_TERMINAL_STATUS = [ + JobStatus.CREATED.value, + JobStatus.PENDING.value, + JobStatus.RUNNING.value, + ] + """list: A list of non-terminal job statuses.""" + + # TODO: rely on https://github.com/ray-project/ray/blob/7c2a200ef84f17418666dad43017a82f782596a3/python/ray/dashboard/modules/job/common.py#L53 + TERMINAL_STATUS = [JobStatus.FAILED.value, JobStatus.SUCCEEDED.value] + """list: A list of terminal job statuses.""" + + async def on_job_complete(self, job_id: UUID, task: Callable = None, *args): + """Watches a submitted job and, when it terminates successfully, runs a given task. 
+ + Inputs: + - job_id: the UUID of the job to watch + - task: the function to be called after the job completes successfully + - args: the arguments to be passed to the function `task()` + """ + job_status = self._job_service.get_upstream_job_status(job_id) + + loguru.logger.info(f"Watching {job_id}") + while job_status not in self.TERMINAL_STATUS and job_status in self.NON_TERMINAL_STATUS: + await asyncio.sleep(5) + job_status = self._job_service.get_upstream_job_status(job_id) + + match job_status: + case JobStatus.FAILED.value: + loguru.logger.error(f"Job {job_id} failed: not running task {str(task)}") + case JobStatus.SUCCEEDED.value: + loguru.logger.info(f"Job {job_id} finished successfully.") + if task is not None: + task(*args) + case _: + # NOTE: Consider raising an exception here as we *really* don't expect + # anything other than FAILED or SUCCEEDED. + loguru.logger.error( + f"Job {job_id} has an unexpected status ({job_status}) that is not completed." + ) + + def _run_eval( + self, + inference_job_id: UUID, + request: WorkflowCreate, + background_tasks: BackgroundTasks, + experiment_id: UUID = None, + ): + # use the inference job id to recover the dataset record + dataset_record = self._dataset_service._get_dataset_record_by_job_id(inference_job_id) + + # prepare the inputs for the evaluation job and pass the id of the new dataset + job_eval_dict = { + "name": f"{request.name}-evaluation", + "model": request.model, + "dataset": dataset_record.id, + "max_samples": request.max_samples, + "skip_inference": True, + } + + # submit the job + self._job_service.create_job( + JobEvalLiteCreate.model_validate(job_eval_dict), + background_tasks, + experiment_id=experiment_id, + ) + + def create_workflow( + self, request: WorkflowCreate, background_tasks: BackgroundTasks + ) -> WorkflowResponse: + """Creates a new workflow and submits inference and evaluation jobs. + + Args: + request (WorkflowCreate): The request object containing the workflow configuration. + background_tasks (BackgroundTasks): The background tasks manager for scheduling tasks. + + Returns: + WorkflowResponse: The response object containing the details of the created workflow. + """ + loguru.logger.info( + f"Creating workflow '{request.name}' for experiment ID '{request.experiment_id}'." 
+ ) + + # input is WorkflowCreate, we need to split the configs and generate one + # JobInferenceCreate and one JobEvalCreate + job_inference_dict = { + "name": f"{request.name}-inference", + "model": request.model, + "dataset": request.dataset, + "max_samples": request.max_samples, + "model_url": request.model_url, + "output_field": request.inference_output_field, + "system_prompt": request.system_prompt, + "store_to_dataset": True, + } + + # submit inference job first + job_response = self._job_service.create_job( + JobInferenceCreate.model_validate(job_inference_dict), + background_tasks, + experiment_id=request.experiment_id, + ) + + # run evaluation job afterwards + # (NOTE: tasks in starlette are executed sequentially: https://www.starlette.io/background/) + background_tasks.add_task( + self.on_job_complete, + job_response.id, + self._run_eval, + job_response.id, + request, + background_tasks, + request.experiment_id, + ) + + # TODO create a new workflow object which will be stored + # by the tracking service (aka mlflow) + created_at = datetime.datetime.now() + # TODO right now this workflow_record is not related to the experiment, + # but the 2 jobs created by the workflow are both associated with the experiment, + # which is how we'll retrieve them until we + # have implemented the association of workflows with experiments + workflow_record = { + "id": uuid.uuid4(), + "experiment_id": request.experiment_id, + "name": request.name, + "description": request.description, + "created_at": created_at, + "updated_at": created_at, + } + + # TODO: This part will need to be refactored more: + # once all the jobs are done, the last + # step is to create two jobs inside the workflow_record + # which store the inference and evaluation job output info + # on_job_complete_store_in_tracking_service().... + workflow_record["status"] = JobStatus.CREATED + + return WorkflowResponse.model_validate(workflow_record) + + # TODO: until we have implemented the association of workflows with experiments, + # everything continues to be indexed by experiment_id + def get_workflow_jobs(self, experiment_id: UUID) -> ListingResponse[JobResponse]: + records = self._job_repo.get_by_experiment_id(experiment_id) + return ListingResponse( + total=len(records), + items=[JobResponse.model_validate(x) for x in records], + ) + + def get_workflow_result_download(self, experiment_id: UUID) -> WorkflowResultDownloadResponse: + """Return experiment results file URL for downloading.""" + s3 = S3FileSystem() + # get jobs matching this experiment + # have a query returning a list of (two) jobs, one inference and one eval, + # matching the current experiment id. 
Note that each job has its own type baked in + # (per https://github.com/mozilla-ai/lumigator/pull/576) + jobs = self.get_workflow_jobs(experiment_id) + + # iterate over the jobs and, depending on the job type, import selected fields + results = {} + + for job in jobs.items: + # Get the dataset from the S3 bucket + result_key = self._job_service._get_results_s3_key(job.id) + with s3.open(f"{settings.S3_BUCKET}/{result_key}", "r") as f: + job_results = json.loads(f.read()) + + # we just merge the two dictionaries for now + results.update(job_results) + + loguru.logger.error(results) + + # Generate presigned download URL for the object + result_key = self._job_service._get_results_s3_key(experiment_id) + download_url = self._dataset_service.s3_client.generate_presigned_url( + "get_object", + Params={ + "Bucket": settings.S3_BUCKET, + "Key": result_key, + }, + ExpiresIn=settings.S3_URL_EXPIRATION, + ) + + return WorkflowResultDownloadResponse(id=experiment_id, download_url=download_url) diff --git a/lumigator/python/mzai/backend/backend/tests/conftest.py b/lumigator/python/mzai/backend/backend/tests/conftest.py index c654ee51c..d84cda9a7 100644 --- a/lumigator/python/mzai/backend/backend/tests/conftest.py +++ b/lumigator/python/mzai/backend/backend/tests/conftest.py @@ -95,7 +95,7 @@ def wait_for_experiment(client, experiment_id: UUID) -> bool: succeeded = False timed_out = True for _ in range(1, MAX_POLLS): - get_experiment_response = client.get(f"/experiments_new/{experiment_id}") + get_experiment_response = client.get(f"/experiments/new/{experiment_id}") assert get_experiment_response.status_code == 200 get_experiment_response_model = ExperimentResponse.model_validate(get_experiment_response.json()) if get_experiment_response_model.status == JobStatus.SUCCEEDED.value: diff --git a/lumigator/python/mzai/backend/backend/tests/integration/api/routes/test_api_workflows.py b/lumigator/python/mzai/backend/backend/tests/integration/api/routes/test_api_workflows.py index aec43ad8e..c9f91cafa 100644 --- a/lumigator/python/mzai/backend/backend/tests/integration/api/routes/test_api_workflows.py +++ b/lumigator/python/mzai/backend/backend/tests/integration/api/routes/test_api_workflows.py @@ -225,36 +225,45 @@ def test_full_experiment_launch( "Content-Type": "application/json", } payload = { - "name": "test_run_hugging_face", - "description": "Test run for Huggingface model", - "model": TEST_CAUSAL_MODEL, - "dataset": str(created_dataset.id), - "max_samples": 2, + "name": "test_experiment", + "description": "Test experiment for Huggingface models", } get_ds_response = local_client.get("/datasets/") assert get_ds_response.status_code == 200 get_ds = ListingResponse[DatasetResponse].model_validate(get_ds_response.json()) - create_experiments_response = local_client.post( - "/experiments_new/", headers=headers, json=payload ) - assert create_experiments_response.status_code == 201 + create_experiments_id_response = local_client.post( + "/experiments/new/", headers=headers, json=payload ) + assert create_experiments_id_response.status_code == 201 + experiment_id = create_experiments_id_response.json()["id"] - get_experiments_response = local_client.get("/experiments_new/") + # run a workflow for that experiment + payload = { + "name": "test_run_hugging_face", + "description": "Test workflow for Huggingface model", + "model": TEST_CAUSAL_MODEL, + "dataset": str(created_dataset.id), + "experiment_id": experiment_id, + "max_samples": 2, + } + create_workflow_response = local_client.post("/workflows/", headers=headers, 
json=payload) + assert create_workflow_response.status_code == 201 + get_experiments_response = local_client.get("/experiments/new/all") + assert get_experiments_response.status_code == 200 get_experiments = ListingResponse[ExperimentResponse].model_validate( get_experiments_response.json() ) - assert get_experiments.total > 0 + assert experiment_id in [str(exp.id) for exp in get_experiments.items] + experiment_id = get_experiments.items[0].id - get_experiment_response = local_client.get(f"/experiments_new/{get_experiments.items[0].id}") + get_experiment_response = local_client.get(f"/experiments/new/{experiment_id}") + logger.info(f"--> {get_experiment_response.text}") assert get_experiment_response.status_code == 200 - # response - get_jobs_per_experiment_response = local_client.get( - f"/experiments_new/{get_experiments.items[0].id}/jobs" - ) + get_jobs_per_experiment_response = local_client.get(f"/workflows/{experiment_id}/jobs") experiment_jobs = ListingResponse[JobResponse].model_validate( get_jobs_per_experiment_response.json() diff --git a/lumigator/python/mzai/schemas/lumigator_schemas/experiments.py b/lumigator/python/mzai/schemas/lumigator_schemas/experiments.py index fa2a1ec7f..1c119d50c 100644 --- a/lumigator/python/mzai/schemas/lumigator_schemas/experiments.py +++ b/lumigator/python/mzai/schemas/lumigator_schemas/experiments.py @@ -16,6 +16,11 @@ class ExperimentCreate(BaseModel): config_template: str | None = None +class ExperimentIdCreate(BaseModel): + name: str + description: str = "" + + class ExperimentResponse(BaseModel, from_attributes=True): id: UUID name: str diff --git a/lumigator/python/mzai/schemas/lumigator_schemas/jobs.py b/lumigator/python/mzai/schemas/lumigator_schemas/jobs.py index 760dfe417..3f13f1281 100644 --- a/lumigator/python/mzai/schemas/lumigator_schemas/jobs.py +++ b/lumigator/python/mzai/schemas/lumigator_schemas/jobs.py @@ -140,6 +140,12 @@ class JobResultDownloadResponse(BaseModel): download_url: str +class JobResults(BaseModel): + id: UUID + metric_url: str + artifact_url: str + + class Job(JobResponse, JobSubmissionResponse): """Job represents the composition of JobResponse and JobSubmissionResponse. diff --git a/lumigator/python/mzai/schemas/lumigator_schemas/workflows.py b/lumigator/python/mzai/schemas/lumigator_schemas/workflows.py new file mode 100644 index 000000000..a935b0aa4 --- /dev/null +++ b/lumigator/python/mzai/schemas/lumigator_schemas/workflows.py @@ -0,0 +1,44 @@ +import datetime +from uuid import UUID + +from pydantic import BaseModel + +from lumigator_schemas.jobs import JobResults, JobStatus + + +class WorkflowCreate(BaseModel): + name: str + description: str = "" + experiment_id: UUID + model: str + dataset: UUID + max_samples: int = -1 # set to all samples by default + model_url: str | None = None + system_prompt: str | None = None + inference_output_field: str = "predictions" + config_template: str | None = None + + +class WorkflowResponse(BaseModel, from_attributes=True): + id: UUID + experiment_id: UUID + name: str + description: str + status: JobStatus + created_at: datetime.datetime + updated_at: datetime.datetime | None = None + + +# TODO: This schema will need to be refined when the get_workflow route is implemented +class WorkflowDetailsResponse(BaseModel, from_attributes=True): + workflow_id: UUID + experiment_id: UUID + jobs: list[JobResults] + metrics: dict + artifacts: dict + parameters: dict + + +class WorkflowResultDownloadResponse(BaseModel): + id: UUID + download_url: str
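
A minimal end-to-end sketch of how the split API introduced by this patch is meant to be exercised, assuming a backend serving the /api/v1 prefix at http://localhost:8000 and an already-uploaded dataset; the base URL, model identifier, and dataset UUID below are illustrative placeholders, not values taken from this change.

    import requests

    BASE_URL = "http://localhost:8000/api/v1"  # assumption: local Lumigator backend
    DATASET_ID = "00000000-0000-0000-0000-000000000000"  # placeholder: an existing dataset UUID

    # 1. Create the experiment "container" first (ExperimentIdCreate: name and description only).
    experiment = requests.post(
        f"{BASE_URL}/experiments/new/",
        json={"name": "summarization-eval", "description": "compare summarization models"},
    ).json()

    # 2. Launch a workflow against it (WorkflowCreate). The backend submits the inference
    # job immediately and schedules the evaluation job to run once inference succeeds.
    workflow = requests.post(
        f"{BASE_URL}/workflows/",
        json={
            "name": "bart-run",
            "description": "first run",
            "experiment_id": experiment["id"],
            "model": "facebook/bart-large-cnn",  # placeholder model identifier
            "dataset": DATASET_ID,
            "max_samples": 10,
        },
    ).json()
    print(f"workflow {workflow['id']} created with status {workflow['status']}")

    # 3. List the jobs spawned so far. Until workflows are persisted, this route is keyed
    # by experiment_id (see the TODOs in workflows.py), and the evaluation job only appears
    # once the inference job reaches SUCCEEDED, so poll rather than assume both jobs exist.
    jobs = requests.get(f"{BASE_URL}/workflows/{experiment['id']}/jobs").json()
    print(f"{jobs['total']} job(s) submitted for experiment {experiment['id']}")

The chaining relies on Starlette executing background tasks sequentially: create_workflow returns as soon as the inference job is submitted, and on_job_complete polls the upstream job status every five seconds before submitting the evaluation job.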