Skip to content

Commit

Permalink
prepare workflow input parsing from nested process output
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Oct 28, 2024
1 parent 5c3b294 commit 4c2c023
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 31 deletions.
22 changes: 14 additions & 8 deletions weaver/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,20 @@
from weaver.utils import get_registry, get_settings

if TYPE_CHECKING:
from typing import Optional, Union
from typing import Union

from pyramid.config import Configurator

from weaver.typedefs import AnyRegistryContainer, AnySettingsContainer
from weaver.database.base import DatabaseInterface
from weaver.typedefs import AnyDatabaseContainer, AnyRegistryContainer, AnySettingsContainer

LOGGER = logging.getLogger(__name__)


def get_db(container=None, reset_connection=False):
# type: (Optional[Union[AnyRegistryContainer, AnySettingsContainer]], bool) -> MongoDatabase
def get_db(
container=None, # type: Union[AnyDatabaseContainer, AnyRegistryContainer, AnySettingsContainer, None]
reset_connection=False, # type: bool
): # type: (...) -> DatabaseInterface
"""
Obtains the database connection from configured application settings.
Expand All @@ -29,10 +32,13 @@ def get_db(container=None, reset_connection=False):
It is preferable to provide a registry reference to reuse any available connection whenever possible.
Giving application settings will require establishing a new connection.
"""
if not reset_connection and isinstance(container, Request):
db = getattr(container, "db", None)
if isinstance(db, MongoDatabase):
return db
if not reset_connection:
if isinstance(container, MongoDatabase):
return container
if isinstance(container, Request):
db = getattr(container, "db", None)
if isinstance(db, MongoDatabase):
return db
registry = get_registry(container, nothrow=True)
if not reset_connection and registry and isinstance(getattr(registry, "db", None), MongoDatabase):
return registry.db
Expand Down
2 changes: 1 addition & 1 deletion weaver/processes/esgf_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def __init__(
self,
provider, # type: str
process, # type: str
request, # type: WorkerRequest
request, # type: Optional[WorkerRequest]
update_status, # type: UpdateStatusPartialFunction
): # type: (...) -> None
super().__init__(request=request, update_status=update_status)
Expand Down
69 changes: 64 additions & 5 deletions weaver/processes/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
)
from weaver.processes.types import ProcessType
from weaver.processes.utils import get_process
from weaver.processes.wps_process_base import OGCAPIRemoteProcessBase
from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status
from weaver.store.base import StoreJobs, StoreProcesses
from weaver.utils import (
Expand Down Expand Up @@ -98,6 +99,7 @@
from weaver.status import StatusType
from weaver.typedefs import (
AnyAcceptLanguageHeader,
AnyDatabaseContainer,
AnyHeadersContainer,
AnyProcessRef,
AnyResponseType,
Expand All @@ -112,7 +114,8 @@
JSON,
ProcessExecution,
SettingsType,
Statistics
Statistics,
UpdateStatusPartialFunction
)
from weaver.visibility import AnyVisibility

Expand Down Expand Up @@ -180,7 +183,7 @@ def execute_process(task, job_id, wps_url, headers=None):
# prepare inputs
job.progress = JobProgress.GET_INPUTS
job.save_log(logger=task_logger, message="Fetching job input definitions.")
wps_inputs = parse_wps_inputs(wps_process, job)
wps_inputs = parse_wps_inputs(wps_process, job, database=db)

# prepare outputs
job.progress = JobProgress.GET_OUTPUTS
Expand Down Expand Up @@ -510,8 +513,35 @@ def parse_wps_input_literal(input_value):
return str(input_value)


def parse_wps_inputs(wps_process, job):
# type: (ProcessOWS, Job) -> List[Tuple[str, OWS_Input_Type]]
def log_and_save_update_status_handler(
job, # type: Job
container, # type: AnyDatabaseContainer
): # type: (...) -> UpdateStatusPartialFunction
"""
Creates a :term:`Job` status update function that will immediately reflect the log message in the database.
When log messages are generated and saved in the :term:`Job`, those details are not persisted to the database
until the updated :term:`Job` is entirely pushed to the database store. This causes clients querying the :term:`Job`
endpoints to not receive any latest update from performed operations until the execution returns to the main worker
monitoring loop, which will typically perform a :term:`Job` update "at some point".
Using this handler, each time a message is pushed to the :term:`Job`, that update is also persisted by maintaining
a local database connection handle. However, because updating the entire :term:`Job` each time can become costly
and inefficient for multiple subsequent logs, this operation should be applied only on "important milestones" of
the execution steps. Any intermediate/subsequent logs should use the usual :meth:`Job.save_log` to "accumulate" the
log messages for a following "batch update" of the :term:`Job`.
"""
db = get_db(container)
store = db.get_store(StoreJobs)

def log_and_update_status(message, progress, status, *_, **kwargs):
job.save_log(message=message, progress=progress, status=status, **kwargs)
store.update_job(job)
return log_and_update_status


def parse_wps_inputs(wps_process, job, container=None):
# type: (ProcessOWS, Job, Optional[AnyDatabaseContainer]) -> List[Tuple[str, OWS_Input_Type]]
"""
Parses expected :term:`WPS` process inputs against submitted job input values considering supported definitions.
Expand All @@ -527,6 +557,7 @@ def parse_wps_inputs(wps_process, job):
elif process_input.dataType == WPS_BOUNDINGBOX_DATA:
bbox_inputs[process_input.identifier] = process_input

job_log_update_status_func = log_and_save_update_status_handler(job, container)
try:
wps_inputs = []
# parse both dict and list type inputs
Expand Down Expand Up @@ -574,6 +605,34 @@ def parse_wps_inputs(wps_process, job):
)
for col_file in col_files
])
elif isinstance(input_value, dict) and "process" in input_value:
proc_uri = input_value["process"]
job_log_update_status_func(
message=(
f"Dispatching execution of nested process [{proc_uri}] "
f"for input [{input_id}] of [{job.process}]."
),
logger=LOGGER,
)
process = OGCAPIRemoteProcessBase(
input_value,
proc_uri,
request=None,
update_status=job_log_update_status_func,
)
out_dir = os.path.join(job.tmpdir, "inputs")
results = process.execute(input_value.get("inputs"), out_dir, input_value.get("outputs"))
if not results:
raise ValueError(
f"Abort execution. Cannot map empty outputs from {proc_uri} "
f"to input [{input_id}] of [{job.process}]."
)
if len(results) != 1:
raise ValueError(
f"Abort execution. Cannot map multiple outputs from {proc_uri} "
f"to input [{input_id}] of [{job.process}]."
)
resolved_inputs.append((results[0], input_info))
else:
resolved_inputs.append((input_value, input_info))

Expand All @@ -595,7 +654,7 @@ def parse_wps_inputs(wps_process, job):

# re-validate the resolved data as applicable
if input_data is None:
job.save_log(
job_log_update_status_func(
message=f"Removing [{input_id}] data input from execution request, value was 'null'.",
logger=LOGGER, level=logging.WARNING,
)
Expand Down
15 changes: 9 additions & 6 deletions weaver/processes/ogc_api_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@
from weaver.status import Status

if TYPE_CHECKING:
from typing import Optional

from weaver.typedefs import JSON, UpdateStatusPartialFunction
from weaver.wps.service import WorkerRequest


class OGCAPIRemoteProcess(OGCAPIRemoteProcessBase):
process_type = "OGC API"

def __init__(self,
step_payload, # type: JSON
process, # type: str
request, # type: WorkerRequest
update_status, # type: UpdateStatusPartialFunction
): # type: (...) -> None
def __init__(
self,
step_payload, # type: JSON
process, # type: str
request, # type: Optional[WorkerRequest]
update_status, # type: UpdateStatusPartialFunction
): # type: (...) -> None
super(OGCAPIRemoteProcess, self).__init__(step_payload, process, request, update_status)
self.url = process
self.provider, self.process = process.rsplit("/processes/", 1)
Expand Down
13 changes: 7 additions & 6 deletions weaver/processes/wps1_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ class Wps1RemoteJobProgress(RemoteJobProgress):


class Wps1Process(WpsProcessInterface):
def __init__(self,
provider, # type: str
process, # type: str
request, # type: WorkerRequest
update_status, # type: UpdateStatusPartialFunction
): # type: (...) -> None
def __init__(
self,
provider, # type: str
process, # type: str
request, # type: Optional[WorkerRequest]
update_status, # type: UpdateStatusPartialFunction
): # type: (...) -> None
self.provider = provider
self.process = process
# following are defined after 'prepare' step
Expand Down
4 changes: 2 additions & 2 deletions weaver/processes/wps3_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from weaver.wps_restapi import swagger_definitions as sd

if TYPE_CHECKING:
from typing import Tuple, Union
from typing import Optional, Tuple, Union

from weaver.typedefs import (
AnyHeadersContainer,
Expand Down Expand Up @@ -69,7 +69,7 @@ def __init__(
step_payload, # type: JSON
job_order, # type: CWL_RuntimeInputsMap
process, # type: str
request, # type: WorkerRequest
request, # type: Optional[WorkerRequest]
update_status, # type: UpdateStatusPartialFunction
): # type: (...) -> None
super(Wps3Process, self).__init__(
Expand Down
4 changes: 2 additions & 2 deletions weaver/processes/wps_process_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class WpsProcessInterface(abc.ABC):
"""

def __init__(self, request, update_status):
# type: (WorkerRequest, UpdateStatusPartialFunction) -> None
# type: (Optional[WorkerRequest], UpdateStatusPartialFunction) -> None
self.request = request
self.headers = {"Accept": ContentType.APP_JSON, "Content-Type": ContentType.APP_JSON}
self.settings = get_settings()
Expand Down Expand Up @@ -433,7 +433,7 @@ class OGCAPIRemoteProcessBase(WpsProcessInterface, abc.ABC):
def __init__(self,
step_payload, # type: JSON
process, # type: str
request, # type: WorkerRequest
request, # type: Optional[WorkerRequest]
update_status, # type: UpdateStatusPartialFunction
): # type: (...) -> None
super(OGCAPIRemoteProcessBase, self).__init__(
Expand Down
3 changes: 2 additions & 1 deletion weaver/typedefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
from werkzeug.datastructures.structures import MultiDict as WerkzeugMultiDict
from werkzeug.wrappers import Request as WerkzeugRequest

from weaver.database.base import DatabaseInterface
from weaver.datatype import Process, Service
from weaver.execute import AnyExecuteControlOption, AnyExecuteMode, AnyExecuteResponse, AnyExecuteTransmissionMode
from weaver.formats import AnyContentEncoding, AnyContentType
Expand Down Expand Up @@ -359,7 +360,7 @@ class CWL_SchemaName(Protocol):
SettingsType = Dict[str, SettingValue]
AnySettingsContainer = Union[AnyContainer, SettingsType]
AnyRegistryContainer = AnyContainer
AnyDatabaseContainer = AnyContainer
AnyDatabaseContainer = Union[AnyContainer, DatabaseInterface]

AnyData = Union[str, bytes, bytearray]
AnyDataStream = Union[AnyData, io.IOBase]
Expand Down

0 comments on commit 4c2c023

Please sign in to comment.