From fb4f84860d7835969c9d10b1efae77a9c23c8db3 Mon Sep 17 00:00:00 2001 From: lward Date: Fri, 8 Mar 2024 09:10:37 -0500 Subject: [PATCH] Change "Task" to "Method" to be consistent with queues --- colmena/models/tasks.py | 317 ------------------------- colmena/task_server/base.py | 10 +- colmena/task_server/globus.py | 4 +- colmena/task_server/parsl.py | 14 +- colmena/task_server/tests/test_base.py | 6 +- colmena/tests/test_task_model.py | 14 +- demo_apps/mpi-tasks/sim.py | 4 +- 7 files changed, 26 insertions(+), 343 deletions(-) delete mode 100644 colmena/models/tasks.py diff --git a/colmena/models/tasks.py b/colmena/models/tasks.py deleted file mode 100644 index 1e44f5c..0000000 --- a/colmena/models/tasks.py +++ /dev/null @@ -1,317 +0,0 @@ -"""Base classes used by Colmena to describe functions being executed by a workflow engine""" -import os -import shlex -import logging -import platform -from io import StringIO -from pathlib import Path -from subprocess import run -from tempfile import TemporaryDirectory -from time import perf_counter -from inspect import signature, isgeneratorfunction -from typing import Any, Dict, List, Tuple, Optional, Callable, Generator - -from colmena.models.results import ResourceRequirements, Result, FailureInformation -from colmena.proxy import resolve_proxies_async, store_proxy_stats -from colmena.queue import ColmenaQueues - - -logger = logging.getLogger(__name__) - - -class ColmenaTask: - """Base wrapper for a Python function run as part of a Colmena workflow - - The wrapper handles the parts of running a Colmena task that are beyond running the function, - such as serialization, timing, interfaces to ProxyStore. - """ - - name: str - """Name used to identify the function""" - - @property - def __name__(self): - return self.name - - def function(self, *args, **kwargs) -> Any: - """Function provided by the Colmena user""" - raise NotImplementedError() - - def __call__(self, result: Result, queues: Optional[ColmenaQueues] = None) -> Result: - """Invoke a Colmena task request - - Args: - result: Request, which inclues the arguments and will hold the result - queues: Queues used to send intermediate results back [Not Yet Used] - Returns: - The input result object, populated with the results - """ - # Mark that compute has started on the worker - result.mark_compute_started() - - # Unpack the inputs - result.time.deserialize_inputs = result.deserialize() - - # Start resolving any proxies in the input asynchronously - start_time = perf_counter() - input_proxies = [] - for arg in result.args: - input_proxies.extend(resolve_proxies_async(arg)) - for value in result.kwargs.values(): - input_proxies.extend(resolve_proxies_async(value)) - result.time.async_resolve_proxies = perf_counter() - start_time - - # Execute the function - start_time = perf_counter() - success = True - try: - if '_resources' in result.kwargs: - logger.warning('`_resources` provided as a kwargs. Unexpected things are about to happen') - if '_resources' in signature(self.function).parameters: - output = self.function(*result.args, **result.kwargs, _resources=result.resources) - else: - output = self.function(*result.args, **result.kwargs) - except BaseException as e: - output = None - success = False - result.failure_info = FailureInformation.from_exception(e) - finally: - end_time = perf_counter() - - # Store the results - result.set_result(output, end_time - start_time) - if not success: - result.success = False - - # Add the worker information into the tasks, if available - worker_info = {} - # TODO (wardlt): Move this information into a separate, parsl-specific wrapper - for tag in ['PARSL_WORKER_RANK', 'PARSL_WORKER_POOL_ID']: - if tag in os.environ: - worker_info[tag] = os.environ[tag] - worker_info['hostname'] = platform.node() - result.worker_info = worker_info - - result.mark_compute_ended() - - # Re-pack the results. Will store the proxy statistics - result.time.serialize_results, _ = result.serialize() - - # Get the statistics for the proxy resolution - for proxy in input_proxies: - store_proxy_stats(proxy, result.time.proxy) - - return result - - -class PythonTask(ColmenaTask): - """A Python function to be executed on a single worker - - Args: - function: Generator function to be executed - name: Name of the function. Defaults to `function.__name__` - """ - - function: Callable - - def __init__(self, function: Callable, name: Optional[str] = None) -> None: - if isgeneratorfunction(function): - raise ValueError('Function is a generator function. Use `PythonGeneratorTask` instead.') - self.name = name or function.__name__ - self.function = function - - -class PythonGeneratorTask(ColmenaTask): - """Python task which runs on a single worker and generates results iteratively - - Args: - function: Generator function to be executed - name: Name of the function. Defaults to `function.__name__` - store_return_value: Whether to capture the `return value `_ - of the generator and store it in the Result object. - """ - - def __init__(self, - function: Callable[..., Generator], - name: Optional[str] = None, - store_return_value: bool = False) -> None: - if not isgeneratorfunction(function): - raise ValueError('Function is not a generator function. Use `PythonTask` instead.') - self._function = function - self.name = name or function.__name__ - self.store_return_value = store_return_value - - def function(self, *args, **kwargs) -> Any: - """Run the Colmena task and collect intermediate results to provide as a list""" - - # TODO (wardlt): Have the function push intemediate results back to a function queue - gen = self._function(*args, **kwargs) - iter_results = [] - while True: - try: - iter_results.append(next(gen)) - except StopIteration as e: - if self.store_return_value: - return iter_results, e.value - else: - return iter_results - - -class ExecutableTask(ColmenaTask): - """Task that involves running an executable using a system call. - - Such tasks often include a "pre-processing" step in Python that prepares inputs for the executable - and a "post-processing" step which stores the outputs (either produced from stdout or written to files) - as Python objects. - - Separating the task into these two functions and a system call for launching the program - simplifies development (shorter functions that ar easier to test), and allows some workflow - engines to improve performance by running processing and execution tasks separately. - - Implement a new ExecutableTask by defining the executable, a preprocessing method (:meth:`preprocess`), - and a postprocessing method (:meth:`postprocess`). - - Use the ExecutableTask by instantiating a copy of your new class and then passing it to the task server - as you would with any other function. - - **MPI Executables** - - Launching an MPI executable requires two parts: a path to an executable and a preamble defining how to launch it. - Defining an MPI application using the instructions described above and then set the :attr:`mpi` attribute to ``True``. - This will tell the Colmena task server to look for a "preamble" for how to launch the application. - - You may need to supply an MPI command invocation recipe for your particular cluster, depending on your choice of task server. - Supply a template as the ``mpi_command_string`` field, which will be converted - by `Python's string format function `_ - to produce a version of the command with the specific resource requirements of your task - by the :meth:`render_mpi_launch` method. - The attributes of this class (e.g., ``node_count``, ``total_ranks``) will be used as arguments to `format`. - For example, a template of ``aprun -N {total_ranks} -n {cpu_process}`` will produce ``aprun -N 6 -n 3`` if you - specify ``node_count=2`` and ``cpu_processes=3``. - - Args: - executable: List of executable arguments - name: Name used for the task. Defaults to ``executable[0]`` - mpi: Whether to use MPI to launch the exectuable - mpi_command_string: Template for MPI launcher. See :attr:`mpi_command_string`. - """ - - executable: List[str] - """Command used to launch the executable""" - - mpi: bool = False - """Whether this is an MPI executable""" - - mpi_command_string: Optional[str] = None - """Template string defining how to launch this application using MPI. - Should include placeholders named after the fields in ResourceRequirements marked using {}'s. - Example: `mpirun -np {total_ranks}`""" - - def __init__(self, executable: List[str], name: Optional[str] = None, - mpi: bool = False, mpi_command_string: Optional[str] = None) -> None: - super().__init__() - self.name = name or executable[0] - self.executable = executable - self.mpi = mpi - self.mpi_command_string = mpi_command_string - - def render_mpi_launch(self, resources: ResourceRequirements) -> str: - """Create an MPI launch command given the configuration - - Returns: - MPI launch configuration - """ - return self.mpi_command_string.format(total_ranks=resources.total_ranks, - **resources.dict(exclude={'mpi_command_string'})) - - def preprocess(self, run_dir: Path, args: Tuple[Any], kwargs: Dict[str, Any]) -> Tuple[List[str], Optional[str]]: - """Perform preprocessing steps necessary to prepare for executable to be started. - - These may include writing files to the local directory, creating CLI arguments, - or standard input to be passed to the executable - - Args: - run_dir: Path to a directory in which to write files used by an executable - args: Arguments to the task, control how the run is set up - kwargs: Keyword arguments to the function - Returns: - - Options to be passed as command line arguments to the executable - - Values to pass to the standard in of the executable - """ - raise NotImplementedError() - - def execute(self, run_dir: Path, arguments: List[str], stdin: Optional[str], - resources: Optional[ResourceRequirements] = None) -> float: - """Run an executable - - Args: - run_dir: Directory in which to execute the code - arguments: Command line arguments - stdin: Content to pass in via standard in - resources: Amount of resources to use for the application - Returns: - Runtime (unit: s) - """ - - # Make the shell command to be launched - shell_cmd = self.assemble_shell_cmd(arguments, resources) - logger.debug(f'Launching shell command: {" ".join(shell_cmd)}') - - # Launch it, routing the stdout and stderr as appropriate - start_time = perf_counter() - with open(run_dir / 'colmena.stdout', 'w') as fo, open(run_dir / 'colmena.stderr', 'w') as fe: - if stdin is not None: - stdin = StringIO(stdin) - run(shell_cmd, stdout=fo, stderr=fe, stdin=stdin, cwd=run_dir) - return perf_counter() - start_time - - def assemble_shell_cmd(self, arguments: List[str], resources: ResourceRequirements) -> List[str]: - """Assemble the shell command to be launched - - Args: - arguments: Command line arguments - resources: Resource requirements - Returns: - Components of the shell command - """ - - # If resources are provided and the task is an MPI, generate the MPI executor - if self.mpi: - assert resources is not None, "Resources must be specified for MPI tasks" - preamble = shlex.split(self.render_mpi_launch(resources)) - else: - preamble = [] - - # Get the full shell command - shell_cmd = preamble + self.executable + arguments - return shell_cmd - - def postprocess(self, run_dir: Path) -> Any: - """Extract results after execution completes - - Args: - run_dir: Run directory for the executable. Stdout will be written to `run_dir/colmena.stdout` - and stderr to `run_dir/colmena.stderr` - """ - raise NotImplementedError() - - def function(self, *args, _resources: Optional[ResourceRequirements] = None, **kwargs): - """Execute the function - - Args: - args: Positional arguments - kwargs: Keyword arguments - _resources: Resources available. Optional. Only used for MPI tasks. - """ - # Launch everything inside a temporary directory - with TemporaryDirectory() as run_dir: - run_dir = Path(run_dir) - - # Prepare the run directory - cli_args, stdin = self.preprocess(run_dir, args, kwargs) - - # Execute everything - self.execute(run_dir, cli_args, stdin, resources=_resources) - - # Return the post-processed results - return self.postprocess(run_dir) diff --git a/colmena/task_server/base.py b/colmena/task_server/base.py index 916b8dd..07177e8 100644 --- a/colmena/task_server/base.py +++ b/colmena/task_server/base.py @@ -7,7 +7,7 @@ from typing import Collection, Optional, Callable, Union from colmena.exceptions import KillSignalException, TimeoutException -from colmena.models.tasks import ColmenaTask, PythonGeneratorTask, PythonTask +from colmena.models.methods import ColmenaMethod, PythonGeneratorMethod, PythonMethod from colmena.models import Result, FailureInformation from colmena.queue.base import ColmenaQueues @@ -161,7 +161,7 @@ def process_queue(self, topic: str, task: Result): future.add_done_callback(lambda x: self.perform_callback(x, task, topic)) -def convert_to_colmena_task(function: Union[Callable, ColmenaTask]) -> ColmenaTask: +def convert_to_colmena_method(function: Union[Callable, ColmenaMethod]) -> ColmenaMethod: """Wrap user-supplified functions in the task model wrapper, if needed Args: @@ -170,9 +170,9 @@ def convert_to_colmena_task(function: Union[Callable, ColmenaTask]) -> ColmenaTa Function as appropriate subclasses of Colmena Task wrapper """ - if isinstance(function, ColmenaTask): + if isinstance(function, ColmenaMethod): return function elif isgeneratorfunction(function): - return PythonGeneratorTask(function) + return PythonGeneratorMethod(function) else: - return PythonTask(function) + return PythonMethod(function) diff --git a/colmena/task_server/globus.py b/colmena/task_server/globus.py index ead7ed1..fd1437f 100644 --- a/colmena/task_server/globus.py +++ b/colmena/task_server/globus.py @@ -10,7 +10,7 @@ from globus_compute_sdk import Client, Executor -from colmena.task_server.base import convert_to_colmena_task, FutureBasedTaskServer +from colmena.task_server.base import convert_to_colmena_method, FutureBasedTaskServer from colmena.queue.python import PipeQueues from colmena.models import Result @@ -58,7 +58,7 @@ def __init__(self, self.registered_funcs: Dict[str, Tuple[str, str]] = {} # Function name -> (funcX id, endpoints) for func, endpoint in methods.items(): # Register a wrapped version of the function - task = convert_to_colmena_task(func) + task = convert_to_colmena_method(func) func_fxid = self.fx_client.register_function(task) # Store the information for the function diff --git a/colmena/task_server/parsl.py b/colmena/task_server/parsl.py index c17ca14..986d64c 100644 --- a/colmena/task_server/parsl.py +++ b/colmena/task_server/parsl.py @@ -17,18 +17,18 @@ from parsl.app.bash import BashApp from parsl.config import Config from parsl.app.python import PythonApp -from colmena.models.tasks import ExecutableTask +from colmena.models.methods import ExecutableMethod from colmena.queue.base import ColmenaQueues from colmena.models import Result, FailureInformation, ResourceRequirements from colmena.proxy import resolve_proxies_async -from colmena.task_server.base import convert_to_colmena_task, FutureBasedTaskServer +from colmena.task_server.base import convert_to_colmena_method, FutureBasedTaskServer logger = logging.getLogger(__name__) # Functions related to splitting "ExecutableTasks" into multiple steps -def _execute_preprocess(task: ExecutableTask, result: Result) -> Tuple[Result, Path, Tuple[List[str], Optional[str]]]: +def _execute_preprocess(task: ExecutableMethod, result: Result) -> Tuple[Result, Path, Tuple[List[str], Optional[str]]]: """Perform the pre-processing step for an executable task Must execute on the remote system @@ -85,7 +85,7 @@ def _execute_preprocess(task: ExecutableTask, result: Result) -> Tuple[Result, P return result, temp_dir, output -def _execute_postprocess(task: ExecutableTask, exit_code: int, result: Result, temp_dir: Path, serialized_inputs: str): +def _execute_postprocess(task: ExecutableMethod, exit_code: int, result: Result, temp_dir: Path, serialized_inputs: str): """Execute the post-processing function after an executable completes Args: @@ -134,7 +134,7 @@ def _execute_postprocess(task: ExecutableTask, exit_code: int, result: Result, t return result -def _execute_execute(task: ExecutableTask, task_path: Path, arguments: List[str], +def _execute_execute(task: ExecutableMethod, task_path: Path, arguments: List[str], stdin: Optional[str], cpu_process_type: str, *, stdout: str, stderr: str, pre_exec: str = None, **kwargs) -> str: """Execute the executable step of an executable task @@ -353,11 +353,11 @@ def __init__(self, methods: List[Union[Callable, Tuple[Callable, Dict]]], logger.info(f'Using default executors for {function.__name__}: {default_executors}') # Convert the function to a Colmena task - function = convert_to_colmena_task(method) + function = convert_to_colmena_method(method) name = function.name # If the function is not an executable, submit it as a single task - if not isinstance(function, ExecutableTask): + if not isinstance(function, ExecutableMethod): app = PythonApp(function, **options) self.methods_[name] = (app, 'basic') else: diff --git a/colmena/task_server/tests/test_base.py b/colmena/task_server/tests/test_base.py index de1cf90..c61937f 100644 --- a/colmena/task_server/tests/test_base.py +++ b/colmena/task_server/tests/test_base.py @@ -1,10 +1,10 @@ from typing import Any, Dict, Tuple, List, Optional from pathlib import Path -from colmena.models.tasks import ExecutableTask +from colmena.models.methods import ExecutableMethod -class EchoTask(ExecutableTask): +class EchoTask(ExecutableMethod): def __init__(self): super().__init__(executable=['echo']) @@ -15,7 +15,7 @@ def postprocess(self, run_dir: Path) -> Any: return (run_dir / 'colmena.stdout').read_text() -class FakeMPITask(ExecutableTask): +class FakeMPITask(ExecutableMethod): def __init__(self): super().__init__(executable=['echo', '-n'], name='fakempitask', diff --git a/colmena/tests/test_task_model.py b/colmena/tests/test_task_model.py index b575701..981a681 100644 --- a/colmena/tests/test_task_model.py +++ b/colmena/tests/test_task_model.py @@ -8,11 +8,11 @@ from proxystore.store import register_store from proxystore.store import unregister_store -from colmena.models.tasks import ExecutableTask, PythonTask, PythonGeneratorTask +from colmena.models.methods import ExecutableMethod, PythonMethod, PythonGeneratorMethod from colmena.models import ResourceRequirements, Result, SerializationMethod -class EchoTask(ExecutableTask): +class EchoTask(ExecutableMethod): def __init__(self): super().__init__(executable=['echo']) @@ -53,10 +53,10 @@ def test_python_task(result): """A standard Python function""" # Make sure name resolution functions as intended - task = PythonTask(function=echo) + task = PythonMethod(function=echo) assert task.name == 'echo' - task = PythonTask(function=echo, name='test') + task = PythonMethod(function=echo, name='test') assert task.name == 'test' # Ensure it sets the task timings while running @@ -70,7 +70,7 @@ def test_python_task(result): def test_generator(result): - task = PythonGeneratorTask(function=generator, store_return_value=False) + task = PythonGeneratorMethod(function=generator, store_return_value=False) assert task.name == 'generator' result = task(result) @@ -80,7 +80,7 @@ def test_generator(result): def test_generator_with_return(result): - task = PythonGeneratorTask(function=generator, name='with_return', store_return_value=True) + task = PythonGeneratorMethod(function=generator, name='with_return', store_return_value=True) assert task.name == 'with_return' result = task(result) @@ -131,7 +131,7 @@ def test_run_function(store): result.serialize() # Run the function - task = PythonTask(lambda x: x.upper(), name='upper') + task = PythonMethod(lambda x: x.upper(), name='upper') result = task(result) # Make sure the timings are all set diff --git a/demo_apps/mpi-tasks/sim.py b/demo_apps/mpi-tasks/sim.py index 155d7b7..f836a76 100644 --- a/demo_apps/mpi-tasks/sim.py +++ b/demo_apps/mpi-tasks/sim.py @@ -1,9 +1,9 @@ from pathlib import Path -from colmena.models.tasks import ExecutableTask +from colmena.models.methods import ExecutableMethod -class Simulation(ExecutableTask): +class Simulation(ExecutableMethod): def __init__(self, executable: Path): super().__init__(executable=[executable.absolute()],