From 4c2c0237e584a7751240faf6466810144a07d924 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Mon, 28 Oct 2024 15:45:36 -0400 Subject: [PATCH] prepare workflow input parsing from nested process output --- weaver/database/__init__.py | 22 +++++---- weaver/processes/esgf_process.py | 2 +- weaver/processes/execution.py | 69 ++++++++++++++++++++++++++-- weaver/processes/ogc_api_process.py | 15 +++--- weaver/processes/wps1_process.py | 13 +++--- weaver/processes/wps3_process.py | 4 +- weaver/processes/wps_process_base.py | 4 +- weaver/typedefs.py | 3 +- 8 files changed, 101 insertions(+), 31 deletions(-) diff --git a/weaver/database/__init__.py b/weaver/database/__init__.py index ad6339686..e944ec64a 100644 --- a/weaver/database/__init__.py +++ b/weaver/database/__init__.py @@ -8,17 +8,20 @@ from weaver.utils import get_registry, get_settings if TYPE_CHECKING: - from typing import Optional, Union + from typing import Union from pyramid.config import Configurator - from weaver.typedefs import AnyRegistryContainer, AnySettingsContainer + from weaver.database.base import DatabaseInterface + from weaver.typedefs import AnyDatabaseContainer, AnyRegistryContainer, AnySettingsContainer LOGGER = logging.getLogger(__name__) -def get_db(container=None, reset_connection=False): - # type: (Optional[Union[AnyRegistryContainer, AnySettingsContainer]], bool) -> MongoDatabase +def get_db( + container=None, # type: Union[AnyDatabaseContainer, AnyRegistryContainer, AnySettingsContainer, None] + reset_connection=False, # type: bool +): # type: (...) -> DatabaseInterface """ Obtains the database connection from configured application settings. @@ -29,10 +32,13 @@ def get_db(container=None, reset_connection=False): It is preferable to provide a registry reference to reuse any available connection whenever possible. Giving application settings will require establishing a new connection. 
""" - if not reset_connection and isinstance(container, Request): - db = getattr(container, "db", None) - if isinstance(db, MongoDatabase): - return db + if not reset_connection: + if isinstance(container, MongoDatabase): + return container + if isinstance(container, Request): + db = getattr(container, "db", None) + if isinstance(db, MongoDatabase): + return db registry = get_registry(container, nothrow=True) if not reset_connection and registry and isinstance(getattr(registry, "db", None), MongoDatabase): return registry.db diff --git a/weaver/processes/esgf_process.py b/weaver/processes/esgf_process.py index 5e42d40ee..f08428e8b 100644 --- a/weaver/processes/esgf_process.py +++ b/weaver/processes/esgf_process.py @@ -65,7 +65,7 @@ def __init__( self, provider, # type: str process, # type: str - request, # type: WorkerRequest + request, # type: Optional[WorkerRequest] update_status, # type: UpdateStatusPartialFunction ): # type: (...) -> None super().__init__(request=request, update_status=update_status) diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index 3747364db..ecdb0a1c4 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -47,6 +47,7 @@ ) from weaver.processes.types import ProcessType from weaver.processes.utils import get_process +from weaver.processes.wps_process_base import OGCAPIRemoteProcessBase from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status from weaver.store.base import StoreJobs, StoreProcesses from weaver.utils import ( @@ -98,6 +99,7 @@ from weaver.status import StatusType from weaver.typedefs import ( AnyAcceptLanguageHeader, + AnyDatabaseContainer, AnyHeadersContainer, AnyProcessRef, AnyResponseType, @@ -112,7 +114,8 @@ JSON, ProcessExecution, SettingsType, - Statistics + Statistics, + UpdateStatusPartialFunction ) from weaver.visibility import AnyVisibility @@ -180,7 +183,7 @@ def execute_process(task, job_id, wps_url, headers=None): # prepare inputs 
job.progress = JobProgress.GET_INPUTS job.save_log(logger=task_logger, message="Fetching job input definitions.") - wps_inputs = parse_wps_inputs(wps_process, job) + wps_inputs = parse_wps_inputs(wps_process, job, container=db) # prepare outputs job.progress = JobProgress.GET_OUTPUTS @@ -510,8 +513,35 @@ def parse_wps_input_literal(input_value): return str(input_value) -def parse_wps_inputs(wps_process, job): - # type: (ProcessOWS, Job) -> List[Tuple[str, OWS_Input_Type]] +def log_and_save_update_status_handler( + job, # type: Job + container, # type: AnyDatabaseContainer +): # type: (...) -> UpdateStatusPartialFunction + """ + Creates a :term:`Job` status update function that will immediately reflect the log message in the database. + + When log messages are generated and saved in the :term:`Job`, those details are not persisted to the database + until the updated :term:`Job` is entirely pushed to the database store. This causes clients querying the :term:`Job` + endpoints to not receive any latest update from performed operations until the execution returns to the main worker + monitoring loop, which will typically perform a :term:`Job` update "at some point". + + Using this handler, each time a message is pushed to the :term:`Job`, that update is also persisted by maintaining + a local database connection handle. However, because updating the entire :term:`Job` each time can become costly + and inefficient for multiple subsequent logs, this operation should be applied only on "important milestones" of + the execution steps. Any intermediate/subsequent logs should use the usual :meth:`Job.save_log` to "accumulate" the + log messages for a following "batch update" of the :term:`Job`. 
+ """ + db = get_db(container) + store = db.get_store(StoreJobs) + + def log_and_update_status(message, progress=None, status=None, *_, **kwargs): + job.save_log(message=message, progress=progress, status=status, **kwargs) + store.update_job(job) + return log_and_update_status + + +def parse_wps_inputs(wps_process, job, container=None): + # type: (ProcessOWS, Job, Optional[AnyDatabaseContainer]) -> List[Tuple[str, OWS_Input_Type]] """ Parses expected :term:`WPS` process inputs against submitted job input values considering supported definitions. @@ -527,6 +557,7 @@ def parse_wps_inputs(wps_process, job): elif process_input.dataType == WPS_BOUNDINGBOX_DATA: bbox_inputs[process_input.identifier] = process_input + job_log_update_status_func = log_and_save_update_status_handler(job, container) try: wps_inputs = [] # parse both dict and list type inputs @@ -574,6 +605,34 @@ def parse_wps_inputs(wps_process, job): ) for col_file in col_files ]) + elif isinstance(input_value, dict) and "process" in input_value: + proc_uri = input_value["process"] + job_log_update_status_func( + message=( + f"Dispatching execution of nested process [{proc_uri}] " + f"for input [{input_id}] of [{job.process}]." + ), + logger=LOGGER, + ) + process = OGCAPIRemoteProcessBase( + input_value, + proc_uri, + request=None, + update_status=job_log_update_status_func, + ) + out_dir = os.path.join(job.tmpdir, "inputs") + results = process.execute(input_value.get("inputs"), out_dir, input_value.get("outputs")) + if not results: + raise ValueError( + f"Abort execution. Cannot map empty outputs from {proc_uri} " + f"to input [{input_id}] of [{job.process}]." + ) + if len(results) != 1: + raise ValueError( + f"Abort execution. Cannot map multiple outputs from {proc_uri} " + f"to input [{input_id}] of [{job.process}]." 
+ ) + resolved_inputs.append((results[0], input_info)) else: resolved_inputs.append((input_value, input_info)) @@ -595,7 +654,7 @@ def parse_wps_inputs(wps_process, job): # re-validate the resolved data as applicable if input_data is None: - job.save_log( + job_log_update_status_func( message=f"Removing [{input_id}] data input from execution request, value was 'null'.", logger=LOGGER, level=logging.WARNING, ) diff --git a/weaver/processes/ogc_api_process.py b/weaver/processes/ogc_api_process.py index 969c2dba1..92b237586 100644 --- a/weaver/processes/ogc_api_process.py +++ b/weaver/processes/ogc_api_process.py @@ -4,6 +4,8 @@ from weaver.status import Status if TYPE_CHECKING: + from typing import Optional + from weaver.typedefs import JSON, UpdateStatusPartialFunction from weaver.wps.service import WorkerRequest @@ -11,12 +13,13 @@ class OGCAPIRemoteProcess(OGCAPIRemoteProcessBase): process_type = "OGC API" - def __init__(self, - step_payload, # type: JSON - process, # type: str - request, # type: WorkerRequest - update_status, # type: UpdateStatusPartialFunction - ): # type: (...) -> None + def __init__( + self, + step_payload, # type: JSON + process, # type: str + request, # type: Optional[WorkerRequest] + update_status, # type: UpdateStatusPartialFunction + ): # type: (...) -> None super(OGCAPIRemoteProcess, self).__init__(step_payload, process, request, update_status) self.url = process self.provider, self.process = process.rsplit("/processes/", 1) diff --git a/weaver/processes/wps1_process.py b/weaver/processes/wps1_process.py index d28fa8046..e3219a3e2 100644 --- a/weaver/processes/wps1_process.py +++ b/weaver/processes/wps1_process.py @@ -52,12 +52,13 @@ class Wps1RemoteJobProgress(RemoteJobProgress): class Wps1Process(WpsProcessInterface): - def __init__(self, - provider, # type: str - process, # type: str - request, # type: WorkerRequest - update_status, # type: UpdateStatusPartialFunction - ): # type: (...) 
-> None + def __init__( + self, + provider, # type: str + process, # type: str + request, # type: Optional[WorkerRequest] + update_status, # type: UpdateStatusPartialFunction + ): # type: (...) -> None self.provider = provider self.process = process # following are defined after 'prepare' step diff --git a/weaver/processes/wps3_process.py b/weaver/processes/wps3_process.py index 5d1a2577e..a814b485a 100644 --- a/weaver/processes/wps3_process.py +++ b/weaver/processes/wps3_process.py @@ -17,7 +17,7 @@ from weaver.wps_restapi import swagger_definitions as sd if TYPE_CHECKING: - from typing import Tuple, Union + from typing import Optional, Tuple, Union from weaver.typedefs import ( AnyHeadersContainer, @@ -69,7 +69,7 @@ def __init__( step_payload, # type: JSON job_order, # type: CWL_RuntimeInputsMap process, # type: str - request, # type: WorkerRequest + request, # type: Optional[WorkerRequest] update_status, # type: UpdateStatusPartialFunction ): # type: (...) -> None super(Wps3Process, self).__init__( diff --git a/weaver/processes/wps_process_base.py b/weaver/processes/wps_process_base.py index 909c74bb0..67b580047 100644 --- a/weaver/processes/wps_process_base.py +++ b/weaver/processes/wps_process_base.py @@ -94,7 +94,7 @@ class WpsProcessInterface(abc.ABC): """ def __init__(self, request, update_status): - # type: (WorkerRequest, UpdateStatusPartialFunction) -> None + # type: (Optional[WorkerRequest], UpdateStatusPartialFunction) -> None self.request = request self.headers = {"Accept": ContentType.APP_JSON, "Content-Type": ContentType.APP_JSON} self.settings = get_settings() @@ -433,7 +433,7 @@ class OGCAPIRemoteProcessBase(WpsProcessInterface, abc.ABC): def __init__(self, step_payload, # type: JSON process, # type: str - request, # type: WorkerRequest + request, # type: Optional[WorkerRequest] update_status, # type: UpdateStatusPartialFunction ): # type: (...) 
-> None super(OGCAPIRemoteProcessBase, self).__init__( diff --git a/weaver/typedefs.py b/weaver/typedefs.py index 76e146120..236732f8f 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -78,6 +78,7 @@ from werkzeug.datastructures.structures import MultiDict as WerkzeugMultiDict from werkzeug.wrappers import Request as WerkzeugRequest + from weaver.database.base import DatabaseInterface from weaver.datatype import Process, Service from weaver.execute import AnyExecuteControlOption, AnyExecuteMode, AnyExecuteResponse, AnyExecuteTransmissionMode from weaver.formats import AnyContentEncoding, AnyContentType @@ -359,7 +360,7 @@ class CWL_SchemaName(Protocol): SettingsType = Dict[str, SettingValue] AnySettingsContainer = Union[AnyContainer, SettingsType] AnyRegistryContainer = AnyContainer - AnyDatabaseContainer = AnyContainer + AnyDatabaseContainer = Union[AnyContainer, DatabaseInterface] AnyData = Union[str, bytes, bytearray] AnyDataStream = Union[AnyData, io.IOBase]