-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
317 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,317 @@ | ||
"""Base classes used by Colmena to describe functions being executed by a workflow engine""" | ||
import os | ||
import shlex | ||
import logging | ||
import platform | ||
from io import StringIO | ||
from pathlib import Path | ||
from subprocess import run | ||
from tempfile import TemporaryDirectory | ||
from time import perf_counter | ||
from inspect import signature, isgeneratorfunction | ||
from typing import Any, Dict, List, Tuple, Optional, Callable, Generator | ||
|
||
from colmena.models.results import ResourceRequirements, Result, FailureInformation | ||
from colmena.proxy import resolve_proxies_async, store_proxy_stats | ||
from colmena.queue import ColmenaQueues | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class ColmenaMethod: | ||
"""Base wrapper for a Python function run as part of a Colmena workflow | ||
The wrapper handles the parts of running a Colmena task that are beyond running the function, | ||
such as serialization, timing, interfaces to ProxyStore. | ||
""" | ||
|
||
name: str | ||
"""Name used to identify the function""" | ||
|
||
@property | ||
def __name__(self): | ||
return self.name | ||
|
||
def function(self, *args, **kwargs) -> Any: | ||
"""Function provided by the Colmena user""" | ||
raise NotImplementedError() | ||
|
||
def __call__(self, result: Result, queues: Optional[ColmenaQueues] = None) -> Result: | ||
"""Invoke a Colmena task request | ||
Args: | ||
result: Request, which inclues the arguments and will hold the result | ||
queues: Queues used to send intermediate results back [Not Yet Used] | ||
Returns: | ||
The input result object, populated with the results | ||
""" | ||
# Mark that compute has started on the worker | ||
result.mark_compute_started() | ||
|
||
# Unpack the inputs | ||
result.time.deserialize_inputs = result.deserialize() | ||
|
||
# Start resolving any proxies in the input asynchronously | ||
start_time = perf_counter() | ||
input_proxies = [] | ||
for arg in result.args: | ||
input_proxies.extend(resolve_proxies_async(arg)) | ||
for value in result.kwargs.values(): | ||
input_proxies.extend(resolve_proxies_async(value)) | ||
result.time.async_resolve_proxies = perf_counter() - start_time | ||
|
||
# Execute the function | ||
start_time = perf_counter() | ||
success = True | ||
try: | ||
if '_resources' in result.kwargs: | ||
logger.warning('`_resources` provided as a kwargs. Unexpected things are about to happen') | ||
if '_resources' in signature(self.function).parameters: | ||
output = self.function(*result.args, **result.kwargs, _resources=result.resources) | ||
else: | ||
output = self.function(*result.args, **result.kwargs) | ||
except BaseException as e: | ||
output = None | ||
success = False | ||
result.failure_info = FailureInformation.from_exception(e) | ||
finally: | ||
end_time = perf_counter() | ||
|
||
# Store the results | ||
result.set_result(output, end_time - start_time) | ||
if not success: | ||
result.success = False | ||
|
||
# Add the worker information into the tasks, if available | ||
worker_info = {} | ||
# TODO (wardlt): Move this information into a separate, parsl-specific wrapper | ||
for tag in ['PARSL_WORKER_RANK', 'PARSL_WORKER_POOL_ID']: | ||
if tag in os.environ: | ||
worker_info[tag] = os.environ[tag] | ||
worker_info['hostname'] = platform.node() | ||
result.worker_info = worker_info | ||
|
||
result.mark_compute_ended() | ||
|
||
# Re-pack the results. Will store the proxy statistics | ||
result.time.serialize_results, _ = result.serialize() | ||
|
||
# Get the statistics for the proxy resolution | ||
for proxy in input_proxies: | ||
store_proxy_stats(proxy, result.time.proxy) | ||
|
||
return result | ||
|
||
|
||
class PythonMethod(ColmenaMethod): | ||
"""A Python function to be executed on a single worker | ||
Args: | ||
function: Generator function to be executed | ||
name: Name of the function. Defaults to `function.__name__` | ||
""" | ||
|
||
function: Callable | ||
|
||
def __init__(self, function: Callable, name: Optional[str] = None) -> None: | ||
if isgeneratorfunction(function): | ||
raise ValueError('Function is a generator function. Use `PythonGeneratorTask` instead.') | ||
self.name = name or function.__name__ | ||
self.function = function | ||
|
||
|
||
class PythonGeneratorMethod(ColmenaMethod): | ||
"""Python function which runs on a single worker and generates results iteratively | ||
Args: | ||
function: Generator function to be executed | ||
name: Name of the function. Defaults to `function.__name__` | ||
store_return_value: Whether to capture the `return value <https://docs.python.org/3/reference/simple_stmts.html#the-return-statement>`_ | ||
of the generator and store it in the Result object. | ||
""" | ||
|
||
def __init__(self, | ||
function: Callable[..., Generator], | ||
name: Optional[str] = None, | ||
store_return_value: bool = False) -> None: | ||
if not isgeneratorfunction(function): | ||
raise ValueError('Function is not a generator function. Use `PythonTask` instead.') | ||
self._function = function | ||
self.name = name or function.__name__ | ||
self.store_return_value = store_return_value | ||
|
||
def function(self, *args, **kwargs) -> Any: | ||
"""Run the Colmena task and collect intermediate results to provide as a list""" | ||
|
||
# TODO (wardlt): Have the function push intemediate results back to a function queue | ||
gen = self._function(*args, **kwargs) | ||
iter_results = [] | ||
while True: | ||
try: | ||
iter_results.append(next(gen)) | ||
except StopIteration as e: | ||
if self.store_return_value: | ||
return iter_results, e.value | ||
else: | ||
return iter_results | ||
|
||
|
||
class ExecutableMethod(ColmenaMethod): | ||
"""Task that involves running an executable using a system call. | ||
Such tasks often include a "pre-processing" step in Python that prepares inputs for the executable | ||
and a "post-processing" step which stores the outputs (either produced from stdout or written to files) | ||
as Python objects. | ||
Separating the task into these two functions and a system call for launching the program | ||
simplifies development (shorter functions that ar easier to test), and allows some workflow | ||
engines to improve performance by running processing and execution tasks separately. | ||
Implement a new ExecutableTask by defining the executable, a preprocessing method (:meth:`preprocess`), | ||
and a postprocessing method (:meth:`postprocess`). | ||
Use the ExecutableTask by instantiating a copy of your new class and then passing it to the task server | ||
as you would with any other function. | ||
**MPI Executables** | ||
Launching an MPI executable requires two parts: a path to an executable and a preamble defining how to launch it. | ||
Defining an MPI application using the instructions described above and then set the :attr:`mpi` attribute to ``True``. | ||
This will tell the Colmena task server to look for a "preamble" for how to launch the application. | ||
You may need to supply an MPI command invocation recipe for your particular cluster, depending on your choice of task server. | ||
Supply a template as the ``mpi_command_string`` field, which will be converted | ||
by `Python's string format function <https://docs.python.org/3/library/string.html#format-string-syntax>`_ | ||
to produce a version of the command with the specific resource requirements of your task | ||
by the :meth:`render_mpi_launch` method. | ||
The attributes of this class (e.g., ``node_count``, ``total_ranks``) will be used as arguments to `format`. | ||
For example, a template of ``aprun -N {total_ranks} -n {cpu_process}`` will produce ``aprun -N 6 -n 3`` if you | ||
specify ``node_count=2`` and ``cpu_processes=3``. | ||
Args: | ||
executable: List of executable arguments | ||
name: Name used for the task. Defaults to ``executable[0]`` | ||
mpi: Whether to use MPI to launch the exectuable | ||
mpi_command_string: Template for MPI launcher. See :attr:`mpi_command_string`. | ||
""" | ||
|
||
executable: List[str] | ||
"""Command used to launch the executable""" | ||
|
||
mpi: bool = False | ||
"""Whether this is an MPI executable""" | ||
|
||
mpi_command_string: Optional[str] = None | ||
"""Template string defining how to launch this application using MPI. | ||
Should include placeholders named after the fields in ResourceRequirements marked using {}'s. | ||
Example: `mpirun -np {total_ranks}`""" | ||
|
||
def __init__(self, executable: List[str], name: Optional[str] = None, | ||
mpi: bool = False, mpi_command_string: Optional[str] = None) -> None: | ||
super().__init__() | ||
self.name = name or executable[0] | ||
self.executable = executable | ||
self.mpi = mpi | ||
self.mpi_command_string = mpi_command_string | ||
|
||
def render_mpi_launch(self, resources: ResourceRequirements) -> str: | ||
"""Create an MPI launch command given the configuration | ||
Returns: | ||
MPI launch configuration | ||
""" | ||
return self.mpi_command_string.format(total_ranks=resources.total_ranks, | ||
**resources.dict(exclude={'mpi_command_string'})) | ||
|
||
def preprocess(self, run_dir: Path, args: Tuple[Any], kwargs: Dict[str, Any]) -> Tuple[List[str], Optional[str]]: | ||
"""Perform preprocessing steps necessary to prepare for executable to be started. | ||
These may include writing files to the local directory, creating CLI arguments, | ||
or standard input to be passed to the executable | ||
Args: | ||
run_dir: Path to a directory in which to write files used by an executable | ||
args: Arguments to the task, control how the run is set up | ||
kwargs: Keyword arguments to the function | ||
Returns: | ||
- Options to be passed as command line arguments to the executable | ||
- Values to pass to the standard in of the executable | ||
""" | ||
raise NotImplementedError() | ||
|
||
def execute(self, run_dir: Path, arguments: List[str], stdin: Optional[str], | ||
resources: Optional[ResourceRequirements] = None) -> float: | ||
"""Run an executable | ||
Args: | ||
run_dir: Directory in which to execute the code | ||
arguments: Command line arguments | ||
stdin: Content to pass in via standard in | ||
resources: Amount of resources to use for the application | ||
Returns: | ||
Runtime (unit: s) | ||
""" | ||
|
||
# Make the shell command to be launched | ||
shell_cmd = self.assemble_shell_cmd(arguments, resources) | ||
logger.debug(f'Launching shell command: {" ".join(shell_cmd)}') | ||
|
||
# Launch it, routing the stdout and stderr as appropriate | ||
start_time = perf_counter() | ||
with open(run_dir / 'colmena.stdout', 'w') as fo, open(run_dir / 'colmena.stderr', 'w') as fe: | ||
if stdin is not None: | ||
stdin = StringIO(stdin) | ||
run(shell_cmd, stdout=fo, stderr=fe, stdin=stdin, cwd=run_dir) | ||
return perf_counter() - start_time | ||
|
||
def assemble_shell_cmd(self, arguments: List[str], resources: ResourceRequirements) -> List[str]: | ||
"""Assemble the shell command to be launched | ||
Args: | ||
arguments: Command line arguments | ||
resources: Resource requirements | ||
Returns: | ||
Components of the shell command | ||
""" | ||
|
||
# If resources are provided and the task is an MPI, generate the MPI executor | ||
if self.mpi: | ||
assert resources is not None, "Resources must be specified for MPI tasks" | ||
preamble = shlex.split(self.render_mpi_launch(resources)) | ||
else: | ||
preamble = [] | ||
|
||
# Get the full shell command | ||
shell_cmd = preamble + self.executable + arguments | ||
return shell_cmd | ||
|
||
def postprocess(self, run_dir: Path) -> Any: | ||
"""Extract results after execution completes | ||
Args: | ||
run_dir: Run directory for the executable. Stdout will be written to `run_dir/colmena.stdout` | ||
and stderr to `run_dir/colmena.stderr` | ||
""" | ||
raise NotImplementedError() | ||
|
||
def function(self, *args, _resources: Optional[ResourceRequirements] = None, **kwargs): | ||
"""Execute the function | ||
Args: | ||
args: Positional arguments | ||
kwargs: Keyword arguments | ||
_resources: Resources available. Optional. Only used for MPI tasks. | ||
""" | ||
# Launch everything inside a temporary directory | ||
with TemporaryDirectory() as run_dir: | ||
run_dir = Path(run_dir) | ||
|
||
# Prepare the run directory | ||
cli_args, stdin = self.preprocess(run_dir, args, kwargs) | ||
|
||
# Execute everything | ||
self.execute(run_dir, cli_args, stdin, resources=_resources) | ||
|
||
# Return the post-processed results | ||
return self.postprocess(run_dir) |