diff --git a/docs/appendices/environment_vars.rst b/docs/appendices/environment_vars.rst index 6d7c9246c8..ca604a26f7 100644 --- a/docs/appendices/environment_vars.rst +++ b/docs/appendices/environment_vars.rst @@ -128,6 +128,8 @@ There are several environment variables that affect the way Toil runs. | | access to S3 and SimpleDB, the AWS job store will | | | not be usable. | +----------------------------------+----------------------------------------------------+ +| TOIL_AWS_ANONYMOUS_URL_ACCESS | Whether to access s3:// URLs anonymously. | ++----------------------------------+----------------------------------------------------+ | TOIL_GOOGLE_PROJECTID | The Google project ID to use when generating | | | Google job store names for tests or CWL workflows. | +----------------------------------+----------------------------------------------------+ diff --git a/docs/running/cliOptions.rst b/docs/running/cliOptions.rst index bd47282aeb..62639334a4 100644 --- a/docs/running/cliOptions.rst +++ b/docs/running/cliOptions.rst @@ -311,6 +311,10 @@ Allows configuring Toil's data storage. of all the jobs reading from it at once, and you want to use ``--caching=True`` to make jobs on each node read from node-local cache storage. (Default=True) + --awsAnonymousUrlAccess BOOL + Whether to access AWS S3 URLs anonymously. Useful for + skipping multi-factor authentication when MFA is + configured but unnecessary. **Autoscaling Options** Allows the specification of the minimum and maximum number of nodes in an diff --git a/src/toil/batchSystems/abstractBatchSystem.py b/src/toil/batchSystems/abstractBatchSystem.py index 9ccee35371..fafdd61712 100644 --- a/src/toil/batchSystems/abstractBatchSystem.py +++ b/src/toil/batchSystems/abstractBatchSystem.py @@ -23,7 +23,7 @@ from threading import Condition from typing import Any, ContextManager, NamedTuple, Optional, Union, cast -from toil.batchSystems.options import OptionSetter +from toil.options import OptionSetter from toil.bus import MessageBus, MessageOutbox from toil.common import Config, Toil, cacheDirName from toil.deferred import DeferredFunctionManager @@ -282,16 +282,17 @@ def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None: """ If this batch system provides any command line options, add them to the given parser. """ + pass @classmethod def setOptions(cls, setOption: OptionSetter) -> None: """ Process command line or configuration options relevant to this batch system. - :param setOption: A function with signature - setOption(option_name, parsing_function=None, check_function=None, default=None, env=None) - returning nothing, used to update run configuration as a side effect. + :param setOption: A function taking an option name and returning + nothing, used to update run configuration as a side effect. 
""" + pass def getWorkerContexts(self) -> list[ContextManager[Any]]: """ diff --git a/src/toil/batchSystems/awsBatch.py b/src/toil/batchSystems/awsBatch.py index 613cfd4105..edcfe6c3ec 100644 --- a/src/toil/batchSystems/awsBatch.py +++ b/src/toil/batchSystems/awsBatch.py @@ -48,7 +48,7 @@ ) from toil.batchSystems.cleanup_support import BatchSystemCleanupSupport from toil.batchSystems.contained_executor import pack_job -from toil.batchSystems.options import OptionSetter +from toil.options import OptionSetter from toil.bus import ExternalBatchIdMessage from toil.common import Config, Toil from toil.job import JobDescription, Requirer diff --git a/src/toil/batchSystems/kubernetes.py b/src/toil/batchSystems/kubernetes.py index daa7ea50ae..0a464d3a38 100644 --- a/src/toil/batchSystems/kubernetes.py +++ b/src/toil/batchSystems/kubernetes.py @@ -100,7 +100,7 @@ ) from toil.batchSystems.cleanup_support import BatchSystemCleanupSupport from toil.batchSystems.contained_executor import pack_job -from toil.batchSystems.options import OptionSetter +from toil.options import OptionSetter from toil.common import Config, Toil from toil.job import JobDescription, Requirer from toil.lib.conversions import human2bytes diff --git a/src/toil/batchSystems/mesos/batchSystem.py b/src/toil/batchSystems/mesos/batchSystem.py index 4869c763ac..4452bb53cf 100644 --- a/src/toil/batchSystems/mesos/batchSystem.py +++ b/src/toil/batchSystems/mesos/batchSystem.py @@ -39,7 +39,7 @@ ) from toil.batchSystems.local_support import BatchSystemLocalSupport from toil.batchSystems.mesos import JobQueue, MesosShape, TaskData, ToilJob -from toil.batchSystems.options import OptionSetter +from toil.options import OptionSetter from toil.job import JobDescription from toil.lib.conversions import b_to_mib, mib_to_b from toil.lib.memoize import strict_bool diff --git a/src/toil/batchSystems/options.py b/src/toil/batchSystems/options.py index 7ebc0a6ced..d63ba6f8d5 100644 --- a/src/toil/batchSystems/options.py +++ b/src/toil/batchSystems/options.py @@ -13,7 +13,7 @@ import logging from argparse import ArgumentParser, _ArgumentGroup -from typing import Any, Callable, Optional, Protocol, TypeVar, Union +from typing import Optional, Union from toil.batchSystems.registry import ( DEFAULT_BATCH_SYSTEM, @@ -21,32 +21,12 @@ get_batch_systems, ) from toil.lib.threading import cpu_count +# Need to re-export from here for compatibility with old batch system plugins +from toil.options import OptionSetter as OptionSetter logger = logging.getLogger(__name__) - -class OptionSetter(Protocol): - """ - Protocol for the setOption function we get to let us set up CLI options for - each batch system. - - Actual functionality is defined in the Config class. - """ - - OptionType = TypeVar("OptionType") - - def __call__( - self, - option_name: str, - parsing_function: Optional[Callable[[Any], OptionType]] = None, - check_function: Optional[Callable[[OptionType], Union[None, bool]]] = None, - default: Optional[OptionType] = None, - env: Optional[list[str]] = None, - old_names: Optional[list[str]] = None, - ) -> bool: ... 
- - -def set_batchsystem_options( +def set_batch_system_options( batch_system: Optional[str], set_option: OptionSetter ) -> None: """ @@ -80,7 +60,7 @@ def set_batchsystem_options( set_option("batch_logs_dir") -def add_all_batchsystem_options(parser: Union[ArgumentParser, _ArgumentGroup]) -> None: +def add_all_batch_system_options(parser: Union[ArgumentParser, _ArgumentGroup]) -> None: from toil.options.common import SYS_MAX_SIZE # Do the global cross-batch-system arguments diff --git a/src/toil/batchSystems/singleMachine.py b/src/toil/batchSystems/singleMachine.py index 711b8b5cb3..706cf1c623 100644 --- a/src/toil/batchSystems/singleMachine.py +++ b/src/toil/batchSystems/singleMachine.py @@ -35,7 +35,7 @@ ResourceSet, UpdatedBatchJobInfo, ) -from toil.batchSystems.options import OptionSetter +from toil.options import OptionSetter from toil.bus import ExternalBatchIdMessage from toil.common import Config, Toil from toil.job import ( diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 3ddddff609..cdad79d3c1 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -29,7 +29,7 @@ from toil.batchSystems.abstractGridEngineBatchSystem import ( AbstractGridEngineBatchSystem, ) -from toil.batchSystems.options import OptionSetter +from toil.options import OptionSetter from toil.bus import get_job_kind from toil.common import Config from toil.job import JobDescription, Requirer diff --git a/src/toil/common.py b/src/toil/common.py index 3e5ea16f30..094fd40816 100644 --- a/src/toil/common.py +++ b/src/toil/common.py @@ -55,7 +55,8 @@ from ruamel.yaml.comments import CommentedMap from toil import logProcessContext, lookupEnvVar -from toil.batchSystems.options import set_batchsystem_options +from toil.batchSystems.options import set_batch_system_options +from toil.jobStores.options import set_job_store_options from toil.bus import ( ClusterDesiredSizeMessage, ClusterSizeMessage, @@ -72,6 +73,7 @@ from toil.lib.io import AtomicFileCreate, try_path from toil.lib.retry import retry from toil.lib.threading import ensure_filesystem_lockable +from toil.lib.url import URLAccess from toil.options.common import JOBSTORE_HELP, add_base_toil_options from toil.options.cwl import add_cwl_options from toil.options.runner import add_runner_options @@ -83,7 +85,7 @@ if TYPE_CHECKING: from toil.batchSystems.abstractBatchSystem import AbstractBatchSystem - from toil.batchSystems.options import OptionSetter + from toil.options import OptionSetter from toil.job import AcceleratorRequirement, Job, JobDescription, TemporaryID from toil.jobStores.abstractJobStore import AbstractJobStore from toil.provisioners.abstractProvisioner import AbstractProvisioner @@ -100,7 +102,7 @@ class Config: """Class to represent configuration operations for a toil workflow run.""" - + logFile: Optional[str] logRotating: bool cleanWorkDir: str @@ -332,9 +334,13 @@ def set_option(option_name: str, old_names: Optional[list[str]] = None) -> None: # Batch system options set_option("batchSystem") - set_batchsystem_options( - None, cast("OptionSetter", set_option) - ) # None as that will make set_batchsystem_options iterate through all batch systems and set their corresponding values + option_setter: OptionSetter = set_option + set_batch_system_options( + None, option_setter + ) # None as that will make set_batch_system_options iterate through all batch systems and set their corresponding values + + # Job store options + set_job_store_options(option_setter) # File store options 
set_option("symlinkImports", old_names=["linkImports"]) @@ -891,14 +897,13 @@ def getNodeID() -> str: return nodeID -class Toil(ContextManager["Toil"]): +class Toil(URLAccess, ContextManager["Toil"]): """ A context manager that represents a Toil workflow. Specifically the batch system, job store, and its configuration. """ - config: Config _jobStore: "AbstractJobStore" _batchSystem: "AbstractBatchSystem" _provisioner: Optional["AbstractProvisioner"] @@ -912,11 +917,22 @@ def __init__(self, options: Namespace) -> None: :param options: command line options specified by the user """ - super().__init__() + + # Set up the config so it can be gotten at outside the context manager, + # so workflow code can use command line flags before creating the job + # store storage. + set_logging_from_options(options) + config = Config() + config.setOptions(options) + logger.debug("Set up Toil with configuration: %s", vars(options)) + + # This will set self._config + super().__init__(config) self.options = options self._jobCache: dict[Union[str, "TemporaryID"], "JobDescription"] = {} self._inContextManager: bool = False self._inRestart: bool = False + def __enter__(self) -> "Toil": """ @@ -925,29 +941,25 @@ def __enter__(self) -> "Toil": Then load the job store and, on restart, consolidate the derived configuration with the one from the previous invocation of the workflow. """ - set_logging_from_options(self.options) - config = Config() - config.setOptions(self.options) - logger.debug("Loaded configuration: %s", vars(self.options)) - if config.jobStore is None: + + if self._config.jobStore is None: raise RuntimeError("No jobstore provided!") - jobStore = self.getJobStore(config.jobStore) - if config.caching is None: - config.caching = jobStore.default_caching() + jobStore = self.getJobStore(self._config.jobStore, self._config) + if self._config.caching is None: + self._config.caching = jobStore.default_caching() # Set the caching option because it wasn't set originally, resuming jobstore rebuilds config from CLI options - self.options.caching = config.caching + self.options.caching = self._config.caching - if not config.restart: - config.prepare_start() - jobStore.initialize(config) + if not self._config.restart: + self._config.prepare_start() + jobStore.initialize() else: jobStore.resume() # Merge configuration from job store with command line options - config = jobStore.config - config.prepare_restart() - config.setOptions(self.options) + self._config = jobStore._config + self._config.prepare_restart() + self._config.setOptions(self.options) jobStore.write_config() - self.config = config self._jobStore = jobStore self._inContextManager = True @@ -970,14 +982,14 @@ def __exit__( try: if ( exc_type is not None - and self.config.clean == "onError" + and self._config.clean == "onError" or exc_type is None - and self.config.clean == "onSuccess" - or self.config.clean == "always" + and self._config.clean == "onSuccess" + or self._config.clean == "always" ): try: - if self.config.restart and not self._inRestart: + if self._config.restart and not self._inRestart: pass else: self._jobStore.destroy() @@ -1025,13 +1037,13 @@ def start(self, rootJob: "Job") -> Any: self._jobStore.write_leader_pid() self._jobStore.write_leader_node_id() - if self.config.restart: + if self._config.restart: raise ToilRestartException( "A Toil workflow can only be started once. Use " "Toil.restart() to resume it." 
) - self._batchSystem = self.createBatchSystem(self.config) + self._batchSystem = self.createBatchSystem(self._config) self._setupAutoDeployment(rootJob.getUserScript()) try: self._setBatchSystemEnvVars() @@ -1069,7 +1081,7 @@ def restart(self) -> Any: self._jobStore.write_leader_pid() self._jobStore.write_leader_node_id() - if not self.config.restart: + if not self._config.restart: raise ToilRestartException( "A Toil workflow must be initiated with Toil.start(), " "not restart()." ) @@ -1084,7 +1096,7 @@ def restart(self) -> Any: ) return self._jobStore.get_root_job_return_value() - self._batchSystem = self.createBatchSystem(self.config) + self._batchSystem = self.createBatchSystem(self._config) self._setupAutoDeployment() try: self._setBatchSystemEnvVars() @@ -1097,41 +1109,47 @@ def restart(self) -> Any: self._shutdownBatchSystem() def _setProvisioner(self) -> None: - if self.config.provisioner is None: + if self._config.provisioner is None: self._provisioner = None else: self._provisioner = cluster_factory( - provisioner=self.config.provisioner, + provisioner=self._config.provisioner, clusterName=None, zone=None, # read from instance meta-data - nodeStorage=self.config.nodeStorage, - nodeStorageOverrides=self.config.nodeStorageOverrides, - sseKey=self.config.sseKey, + nodeStorage=self._config.nodeStorage, + nodeStorageOverrides=self._config.nodeStorageOverrides, + sseKey=self._config.sseKey, ) - self._provisioner.setAutoscaledNodeTypes(self.config.nodeTypes) + self._provisioner.setAutoscaledNodeTypes(self._config.nodeTypes) @classmethod - def getJobStore(cls, locator: str) -> "AbstractJobStore": + def getJobStore(cls, locator: str, config: Optional[Config] = None) -> "AbstractJobStore": """ - Create an instance of the concrete job store implementation that matches the given locator. + Create an instance of the concrete job store implementation that + matches the given locator. - :param str locator: The location of the job store to be represent by the instance + Will use the provided config if given, or a default one otherwise. :return: an instance of a concrete subclass of AbstractJobStore """ + + if config is None: + # Fill in a default config. + config = Config() + name, rest = cls.parseLocator(locator) if name == "file": from toil.jobStores.fileJobStore import FileJobStore - return FileJobStore(rest) + return FileJobStore(rest, config) elif name == "aws": from toil.jobStores.aws.jobStore import AWSJobStore - return AWSJobStore(rest) + return AWSJobStore(rest, config) elif name == "google": from toil.jobStores.googleJobStore import GoogleJobStore - return GoogleJobStore(rest) + return GoogleJobStore(rest, config) else: raise RuntimeError("Unknown job store implementation '%s'" % name) @@ -1155,7 +1173,12 @@ def buildLocator(name: str, rest: str) -> str: @classmethod def resumeJobStore(cls, locator: str) -> "AbstractJobStore": + """ + Connect to and resume a job store from just its locator. + """ + # Connect to the job store with a temporary config jobStore = cls.getJobStore(locator) + # Replace the config by resuming jobStore.resume() return jobStore @@ -1221,7 +1244,7 @@ def _setupAutoDeployment( else: if ( self._batchSystem.supportsAutoDeployment() - and not self.config.disableAutoDeployment + and not self._config.disableAutoDeployment ): # Note that by saving the ModuleDescriptor, and not the Resource we allow for # redeploying a potentially modified user script on workflow restarts. 
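
As the hunks above show, getJobStore() now accepts an optional Config, the concrete job store constructors take one explicitly, and initialize() reads the stored config instead of receiving it. A minimal sketch of the resulting construction flow, using only the signatures visible in this patch (the locator and settings below are made up for illustration):

    from toil.common import Config, Toil

    config = Config()                                   # default per-workflow configuration
    store = Toil.getJobStore("file:/tmp/example-job-store", config)
    store.initialize()                                  # persists the config and allocates a workflow ID
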
@@ -1241,7 +1264,7 @@ def _setupAutoDeployment(
             # This branch is hit on restarts
             if (
                 self._batchSystem.supportsAutoDeployment()
-                and not self.config.disableAutoDeployment
+                and not self._config.disableAutoDeployment
             ):
                 # We could deploy a user script
                 from toil.jobStores.abstractJobStore import NoSuchFileException
@@ -1399,7 +1422,7 @@ def normalize_uri(uri: str, check_existence: bool = False) -> str:

     def _setBatchSystemEnvVars(self) -> None:
         """Set the environment variables required by the job store and those passed on command line."""
-        for envDict in (self._jobStore.get_env(), self.config.environment):
+        for envDict in (self._jobStore.get_env(), self._config.environment):
             for k, v in envDict.items():
                 self._batchSystem.setEnv(k, v)

@@ -1613,7 +1636,7 @@ def _runMainLoop(self, rootJob: "JobDescription") -> Any:

         :param rootJob: The root job for the workflow.
         """
-        logProcessContext(self.config)
+        logProcessContext(self._config)

         with RealtimeLogger(
             self._batchSystem,
@@ -1623,7 +1646,7 @@ def _runMainLoop(self, rootJob: "JobDescription") -> Any:
             from toil.leader import Leader

             return Leader(
-                config=self.config,
+                config=self._config,
                 batchSystem=self._batchSystem,
                 provisioner=self._provisioner,
                 jobStore=self._jobStore,
diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py
index e13c97d116..19facec1cc 100644
--- a/src/toil/cwl/cwltoil.py
+++ b/src/toil/cwl/cwltoil.py
@@ -28,6 +28,7 @@
 import json
 import logging
 import os
+import pickle
 import pprint
 import shutil
 import stat
@@ -137,7 +138,7 @@
     FileMetadata,
     WorkerImportJob,
 )
-from toil.jobStores.abstractJobStore import (
+from toil.jobStores.abstractJobStore import (
     AbstractJobStore,
     NoSuchFileException,
     InvalidImportExportUrlException,
@@ -148,6 +149,7 @@
 from toil.jobStores.utils import JobStoreUnavailableException, generate_locator
 from toil.lib.io import mkdtemp
 from toil.lib.threading import ExceptionalThread, global_mutex
+from toil.lib.url import URLAccess
 from toil.statsAndLogging import DEFAULT_LOGLEVEL

 logger = logging.getLogger(__name__)
@@ -585,7 +587,7 @@ def eval_prep(
                     # store for the workflow. In that case, no toilfile:// or
                     # other special URIs will exist in the workflow to be read
                     # from, and ToilFsAccess still supports file:// URIs.
-                    fs_access = functools.partial(ToilFsAccess, file_store=file_store)
+                    fs_access = functools.partial(ToilFsAccess, url_access=file_store.jobStore, file_store=file_store)
                     with fs_access("").open(cast(str, val["location"]), "rb") as f:
                         val["contents"] = cwltool.builder.content_limit_respected_read(
                             f
@@ -1293,7 +1295,8 @@ def encode_directory(contents: DirectoryContents) -> str:

 class ToilFsAccess(StdFsAccess):
     """
-    Custom filesystem access class which handles toil filestore references.
+    Custom filesystem access class which handles toil filestore references and
+    other URLs.

     Normal file paths will be resolved relative to basedir, but 'toilfile:'
     and 'toildir:' URIs will be fulfilled from the Toil file store.
@@ -1304,9 +1307,18 @@ class ToilFsAccess(StdFsAccess):
     def __init__(
         self,
         basedir: str,
+        url_access: URLAccess,
         file_store: Optional[AbstractFileStore] = None,
     ) -> None:
-        """Create a FsAccess object for the given Toil Filestore and basedir."""
+        """
+        Create a FsAccess object.
+
+        :param basedir: Directory for resolving relative paths.
+        :param url_access: Object to use to access URLs.
+        :param file_store: Toil FileStore to access toilfile: and toildir:
+            URLs.
+ """ + self.url_access = url_access self.file_store = file_store # Map encoded directory structures to where we downloaded them, so we @@ -1395,7 +1407,7 @@ def _abs(self, path: str) -> str: destination = path else: # The destination is something else. - if AbstractJobStore.get_is_directory(path): + if self.url_access.get_is_directory(path): # Treat this as a directory if path not in self.dir_to_download: logger.debug( @@ -1405,14 +1417,14 @@ def _abs(self, path: str) -> str: # Recursively fetch all the files in the directory. def download_to(url: str, dest: str) -> None: - if AbstractJobStore.get_is_directory(url): + if self.url_access.get_is_directory(url): os.mkdir(dest) - for part in AbstractJobStore.list_url(url): + for part in self.url_access.list_url(url): download_to( os.path.join(url, part), os.path.join(dest, part) ) else: - AbstractJobStore.read_from_url(url, open(dest, "wb")) + self.url_access.read_from_url(url, open(dest, "wb")) download_to(path, dest_dir) self.dir_to_download[path] = dest_dir @@ -1425,7 +1437,7 @@ def download_to(url: str, dest: str) -> None: # Try to grab it with a jobstore implementation, and save it # somewhere arbitrary. dest_file = NamedTemporaryFile(delete=False) - AbstractJobStore.read_from_url(path, dest_file) + self.url_access.read_from_url(path, dest_file) dest_file.close() self.dir_to_download[path] = dest_file.name destination = self.dir_to_download[path] @@ -1483,7 +1495,7 @@ def open(self, fn: str, mode: str) -> IO[Any]: return open(self._abs(fn), mode) else: # This should be supported by a job store. - byte_stream = AbstractJobStore.open_url(fn) + byte_stream = self.url_access.open_url(fn) if "b" in mode: # Pass stream along in binary return byte_stream @@ -1520,7 +1532,7 @@ def exists(self, path: str) -> bool: return True else: # This should be supported by a job store. - return AbstractJobStore.url_exists(path) + return self.url_access.url_exists(path) def size(self, path: str) -> int: parse = urlparse(path) @@ -1549,7 +1561,7 @@ def size(self, path: str) -> int: ) else: # This should be supported by a job store. - size = AbstractJobStore.get_size(path) + size = self.url_access.get_size(path) if size is None: # get_size can be unimplemented or unavailable raise RuntimeError(f"Could not get size of {path}") @@ -1572,7 +1584,7 @@ def isfile(self, fn: str) -> bool: # TODO: we assume CWL can't call deleteGlobalFile and so the file always exists return isinstance(found, str) else: - return self.exists(fn) and not AbstractJobStore.get_is_directory(fn) + return self.exists(fn) and not self.url_access.get_is_directory(fn) def isdir(self, fn: str) -> bool: logger.debug("ToilFsAccess checking type of %s", fn) @@ -1592,8 +1604,8 @@ def isdir(self, fn: str) -> bool: # TODO: We assume directories can't be deleted. 
return isinstance(found, dict) else: - status = AbstractJobStore.get_is_directory(fn) - logger.debug("AbstractJobStore said: %s", status) + status = self.url_access.get_is_directory(fn) + logger.debug("Job store said: %s", status) return status def listdir(self, fn: str) -> list[str]: @@ -1626,7 +1638,7 @@ def listdir(self, fn: str) -> list[str]: else: return [ os.path.join(fn, entry.rstrip("/")) - for entry in AbstractJobStore.list_url(fn) + for entry in self.url_access.list_url(fn) ] def join(self, path: str, *paths: str) -> str: @@ -1736,7 +1748,7 @@ def write_to_pipe( pipe.write(data) else: # Stream from some other URI - AbstractJobStore.read_from_url(uri, pipe) + file_store.jobStore.read_from_url(uri, pipe) except OSError as e: # The other side of the pipe may have been closed by the # reading thread, which is OK. @@ -1779,7 +1791,7 @@ def write_to_pipe( # Open that path exclusively to make sure we created it with open(src_path, "xb") as fh: # Download into the file - size, executable = AbstractJobStore.read_from_url(uri, fh) + size, executable = file_store.jobStore.read_from_url(uri, fh) if executable: # Set the execute bit in the file's permissions os.chmod(src_path, os.stat(src_path).st_mode | stat.S_IXUSR) @@ -2229,6 +2241,78 @@ def remove_empty_listings(rec: CWLObjectType) -> None: return +# CWL jobs all need a RuntimeContext, and it needs a make_fs_access function. +# But that needs access to the *current* Toil object/job store/config, which we +# can't pickle and bring around with us. So we drop it out of the runtime +# context when we pack it up in each job for pickling, and we put it back in +# when the job runs. +def pack_runtime_context(local_context: cwltool.context.RuntimeContext) -> cwltool.context.RuntimeContext: + """ + Prepare a CWL RuntimeContext for pickling. + """ + + stored_context = local_context.copy() + # Drop the FS access hook + stored_context.make_fs_access = StdFsAccess + + if hasattr(stored_context, "toil_get_file"): + # Drop the toil_get_file hook + delattr(stored_context, "toil_get_file") + + # Drop the path mapper hook + stored_context.path_mapper = PathMapper + + return stored_context + +def unpack_runtime_context(stored_context: cwltool.context.RuntimeContext, url_access: URLAccess, file_store: AbstractFileStore, pipe_threads: list[tuple[Thread, int]], index: dict[str, str], existing: dict[str, str]) -> cwltool.context.RuntimeContext: + """ + Set up a CWL RuntimeContext for the local machine. + + Always needs the file store (for temp files and streaming) even if it is + being bypassed for CWL file access. + + :param pipe_threads: used for keeping track of separate threads launched to + stream files around. Caller should close all the FDs and join all the + threads when done. + :param index: Used to track downloaded files for :meth:`visit_files`. + :param existing: Used to track downloaded files :meth:`visit_files`. + """ + + local_context = stored_context.copy() + + # Add the FS access hook, using the file store if we're not bypassing it + local_context.make_fs_access = make_fs_access = cast( + type[StdFsAccess], + functools.partial(ToilFsAccess, url_access=url_access, file_store=file_store if not getattr(local_context, "bypass_file_store", False) else None), + ) + + if not getattr(local_context, "bypass_file_store", False): + # Hook up the Toil file store access hooks. + + # When the tool makes its own PathMappers with make_path_mapper, they + # will come and grab this function for fetching files from the Toil + # file store. 
pipe_threads is used for keeping track of separate + # threads launched to stream files around. + setattr( + local_context, + "toil_get_file", + functools.partial( + toil_get_file, file_store, index, existing, pipe_threads=pipe_threads + ), + ) + + # Intercept file and directory access not already done through the + # tool's make_path_mapper(), and support the URIs we define for + # stuff in Toil's FileStore. We need to plug in a path_mapper type + # or factory function. + local_context.path_mapper = functools.partial( # type: ignore[assignment] + ToilPathMapper, + get_file=getattr(local_context, "toil_get_file"), + streaming_allowed=local_context.streaming_allowed, + ) + + return local_context + class CWLNamedJob(Job): """ Base class for all CWL jobs that do user work, to give them useful names. @@ -2523,7 +2607,7 @@ def __init__( ) self.cwltool = tool self.cwljob = cwljob - self.runtime_context = runtime_context + self.runtime_context = pack_runtime_context(runtime_context) self.conditional = conditional or Conditional() self.parent_name = parent_name @@ -2531,6 +2615,11 @@ def run(self, file_store: AbstractFileStore) -> Any: """Create a child job with the correct resource requirements set.""" cwljob = resolve_dict_w_promises(self.cwljob, file_store) + pipe_threads: list[tuple[Thread, int]] = [] + index: dict[str, str] = {} + existing: dict[str, str] = {} + runtime_context = unpack_runtime_context(self.runtime_context, file_store.jobStore, file_store, pipe_threads, index, existing) + # Check confitional to license full evaluation of job inputs. if self.conditional.is_false(cwljob): return self.conditional.skipped_outputs() @@ -2538,16 +2627,21 @@ def run(self, file_store: AbstractFileStore) -> Any: fill_in_defaults( self.cwltool.tool["inputs"], cwljob, - self.runtime_context.make_fs_access(self.runtime_context.basedir or ""), + runtime_context.make_fs_access(self.runtime_context.basedir or ""), ) # Don't forward the conditional. We checked it already. realjob = CWLJob( tool=self.cwltool, cwljob=cwljob, - runtime_context=self.runtime_context, + runtime_context=runtime_context, parent_name=self.parent_name, ) self.addChild(realjob) + + for t, fd in pipe_threads: + os.close(fd) + t.join() + return realjob.rv() @@ -2564,38 +2658,14 @@ def __init__( ): """Store the context for later execution.""" self.cwltool = tool + self.cwljob = cwljob self.conditional = conditional or Conditional() - if runtime_context.builder: - self.builder = runtime_context.builder - else: - self.builder = cwltool.builder.Builder( - job=cwljob, - files=[], - bindings=[], - schemaDefs={}, - names=Names(), - requirements=self.cwltool.requirements, - hints=[], - resources={}, - mutation_manager=runtime_context.mutation_manager, - formatgraph=tool.formatgraph, - make_fs_access=cast(type[StdFsAccess], runtime_context.make_fs_access), - fs_access=runtime_context.make_fs_access(""), - job_script_provider=runtime_context.job_script_provider, - timeout=runtime_context.eval_timeout, - debug=runtime_context.debug, - js_console=runtime_context.js_console, - force_docker_pull=False, - loadListing=determine_load_listing(tool), - outdir="", - tmpdir=DEFAULT_TMPDIR, - stagedir="/var/lib/cwl", # TODO: use actual defaults here - cwlVersion=cast(str, self.cwltool.metadata["cwlVersion"]), - container_engine=get_container_engine(runtime_context), - ) + # We need the builder, but it needs an fs_access, so we can't + # make one and send it anywhere. 
+ builder = self.get_builder(runtime_context) - req = tool.evalResources(self.builder, runtime_context) + req = tool.evalResources(builder, runtime_context) tool_own_resources = tool.get_requirement("ResourceRequirement")[0] or {} if "ramMin" in tool_own_resources or "ramMax" in tool_own_resources: @@ -2684,24 +2754,62 @@ def __init__( local=isinstance(tool, cwltool.command_line_tool.ExpressionTool), ) - self.cwljob = cwljob - self.runtime_context = runtime_context + self.runtime_context = pack_runtime_context(runtime_context) self.step_inputs = self.cwltool.tool["inputs"] self.workdir: str = runtime_context.workdir # type: ignore[attr-defined] - def required_env_vars(self, cwljob: Any) -> Iterator[tuple[str, str]]: + def get_builder(self, runtime_context: cwltool.context.RuntimeContext) -> cwltool.builder.Builder: + """ + Get a Builder for the job from the given runtime context. + + Requires that self.cwltool and self.cwljob are set. + + The builder may not leave the node. + """ + + if runtime_context.builder: + return runtime_context.builder + else: + return cwltool.builder.Builder( + job=self.cwljob, + files=[], + bindings=[], + schemaDefs={}, + names=Names(), + requirements=self.cwltool.requirements, + hints=[], + resources={}, + mutation_manager=runtime_context.mutation_manager, + formatgraph=self.cwltool.formatgraph, + make_fs_access=cast(type[StdFsAccess], runtime_context.make_fs_access), + fs_access=runtime_context.make_fs_access(""), + job_script_provider=runtime_context.job_script_provider, + timeout=runtime_context.eval_timeout, + debug=runtime_context.debug, + js_console=runtime_context.js_console, + force_docker_pull=False, + loadListing=determine_load_listing(self.cwltool), + outdir="", + tmpdir=DEFAULT_TMPDIR, + stagedir="/var/lib/cwl", # TODO: use actual defaults here + cwlVersion=cast(str, self.cwltool.metadata["cwlVersion"]), + container_engine=get_container_engine(runtime_context), + ) + + + def required_env_vars(self, cwljob: Any, builder: cwltool.builder.Builder) -> Iterator[tuple[str, str]]: """Yield environment variables from EnvVarRequirement.""" if isinstance(cwljob, dict): if cwljob.get("class") == "EnvVarRequirement": for t in cwljob.get("envDef", {}): - yield t["envName"], cast(str, self.builder.do_eval(t["envValue"])) + yield t["envName"], cast(str, builder.do_eval(t["envValue"])) for v in cwljob.values(): - yield from self.required_env_vars(v) + yield from self.required_env_vars(v, builder) if isinstance(cwljob, list): for env_var in cwljob: - yield from self.required_env_vars(env_var) + yield from self.required_env_vars(env_var, builder) - def populate_env_vars(self, cwljob: CWLObjectType) -> dict[str, str]: + def populate_env_vars(self, cwljob: CWLObjectType, builder: cwltool.builder.Builder) -> dict[str, str]: """ Prepare environment variables necessary at runtime for the job. @@ -2713,10 +2821,10 @@ def populate_env_vars(self, cwljob: CWLObjectType) -> dict[str, str]: same name and replaces their value with that found in the "EnvVarRequirement" env var if it exists. 
""" - self.builder.job = cwljob + builder.job = cwljob required_env_vars = {} # iterate over EnvVarRequirement env vars, if any - for k, v in self.required_env_vars(cwljob): + for k, v in self.required_env_vars(cwljob, builder): required_env_vars[k] = ( v # will tell cwltool which env vars to take from the environment ) @@ -2746,16 +2854,23 @@ def run(self, file_store: AbstractFileStore) -> Any: logger.debug("Loaded order:\n%s", self.cwljob) + pipe_threads: list[tuple[Thread, int]] = [] + index: dict[str, str] = {} + existing: dict[str, str] = {} + runtime_context = unpack_runtime_context(self.runtime_context, file_store.jobStore, file_store, pipe_threads, index, existing) + + builder = self.get_builder(runtime_context) + cwljob = resolve_dict_w_promises(self.cwljob, file_store) if self.conditional.is_false(cwljob): return self.conditional.skipped_outputs() fill_in_defaults( - self.step_inputs, cwljob, self.runtime_context.make_fs_access("") + self.step_inputs, cwljob, runtime_context.make_fs_access("") ) - required_env_vars = self.populate_env_vars(cwljob) + required_env_vars = self.populate_env_vars(cwljob, builder) immobile_cwljob_dict = copy.deepcopy(cwljob) for inp_id in immobile_cwljob_dict.keys(): @@ -2776,28 +2891,10 @@ def run(self, file_store: AbstractFileStore) -> Any: functools.partial(remove_empty_listings), ) - index: dict[str, str] = {} - existing: dict[str, str] = {} - # Prepare the run instructions for cwltool - runtime_context = self.runtime_context.copy() - runtime_context.basedir = os.getcwd() runtime_context.preserve_environment = required_env_vars - # When the tool makes its own PathMappers with make_path_mapper, they - # will come and grab this function for fetching files from the Toil - # file store. pipe_threads is used for keeping track of separate - # threads launched to stream files around. - pipe_threads: list[tuple[Thread, int]] = [] - setattr( - runtime_context, - "toil_get_file", - functools.partial( - toil_get_file, file_store, index, existing, pipe_threads=pipe_threads - ), - ) - if not getattr(runtime_context, "bypass_file_store", False): # Exports temporary directory for batch systems that reset TMPDIR os.environ["TMPDIR"] = os.path.realpath(file_store.getLocalTempDir()) @@ -2817,22 +2914,6 @@ def run(self, file_store: AbstractFileStore) -> Any: runtime_context.tmpdir_prefix = file_store.getLocalTempDir() - # Intercept file and directory access not already done through the - # tool's make_path_mapper(), and support the URIs we define for - # stuff in Toil's FileStore. We need to plug in a make_fs_access - # function and a path_mapper type or factory function. - - runtime_context.make_fs_access = cast( - type[StdFsAccess], - functools.partial(ToilFsAccess, file_store=file_store), - ) - - runtime_context.path_mapper = functools.partial( # type: ignore[assignment] - ToilPathMapper, - get_file=getattr(runtime_context, "toil_get_file"), - streaming_allowed=runtime_context.streaming_allowed, - ) - # Collect standard output and standard error somewhere if they don't go to files. # We need to keep two FDs to these because cwltool will close what we give it. default_stdout = TemporaryFile() @@ -2961,10 +3042,9 @@ def makeRootJob( Actually creates what might be a subgraph of two jobs. The second of which may be the follow on of the first. If only one job is created, it is returned twice. 
- :return: """ if options.run_imports_on_workers: - filenames = extract_workflow_inputs(options, initialized_job_order, tool) + filenames = extract_workflow_inputs(options, initialized_job_order, tool, toil) metadata = get_file_sizes( filenames, toil._jobStore, include_remote_files=options.reference_inputs ) @@ -2993,6 +3073,7 @@ def makeRootJob( tool, path_to_fileid, options.basedir, + toil._jobStore, options.reference_inputs, options.bypass_file_store, ) @@ -3008,6 +3089,7 @@ def makeRootJob( initialized_job_order=initialized_job_order, tool=tool, ) + root_job, followOn = makeJob( tool, jobobj, runtime_context, None, None ) # toplevel, no name needed @@ -3104,7 +3186,7 @@ def __init__( super().__init__(cores=1, memory="1GiB", disk="1MiB", local=True) self.step = step self.cwljob = cwljob - self.runtime_context = runtime_context + self.runtime_context = pack_runtime_context(runtime_context) self.conditional = conditional self.parent_name = parent_name @@ -3175,6 +3257,11 @@ def run(self, file_store: AbstractFileStore) -> list[Promised[CWLObjectType]]: """Generate the follow on scatter jobs.""" cwljob = resolve_dict_w_promises(self.cwljob, file_store) + pipe_threads: list[tuple[Thread, int]] = [] + index: dict[str, str] = {} + existing: dict[str, str] = {} + runtime_context = unpack_runtime_context(self.runtime_context, file_store.jobStore, file_store, pipe_threads, index, existing) + if isinstance(self.step.tool["scatter"], str): scatter = [self.step.tool["scatter"]] else: @@ -3206,7 +3293,7 @@ def valueFromFunc(k: str, v: Any) -> Any: None, {}, context=v, - container_engine=get_container_engine(self.runtime_context), + container_engine=get_container_engine(runtime_context), ) else: return v @@ -3224,7 +3311,7 @@ def valueFromFunc(k: str, v: Any) -> Any: subjob, follow_on = makeJob( tool=self.step.embedded_tool, jobobj=copyjob, - runtime_context=self.runtime_context, + runtime_context=runtime_context, parent_name=f"{self.parent_name}.{i}", conditional=self.conditional, ) @@ -3243,6 +3330,10 @@ def valueFromFunc(k: str, v: Any) -> Any: raise ValidationException( "Must provide scatterMethod to scatter over multiple inputs." 
) + + for t, fd in pipe_threads: + os.close(fd) + t.join() return outputs @@ -3373,7 +3464,7 @@ def __init__( ) self.cwlwf = cwlwf self.cwljob = cwljob - self.runtime_context = runtime_context + self.runtime_context = pack_runtime_context(runtime_context) self.conditional = conditional or Conditional() def run( @@ -3390,8 +3481,13 @@ def run( if self.conditional.is_false(cwljob): return self.conditional.skipped_outputs() + pipe_threads: list[tuple[Thread, int]] = [] + index: dict[str, str] = {} + existing: dict[str, str] = {} + runtime_context = unpack_runtime_context(self.runtime_context, file_store.jobStore, file_store, pipe_threads, index, existing) + # Apply default values set in the workflow - fs_access = ToilFsAccess(self.runtime_context.basedir, file_store=file_store) + fs_access = ToilFsAccess(runtime_context.basedir, url_access=file_store.jobStore, file_store=file_store) fill_in_defaults(self.cwlwf.tool["inputs"], cwljob, fs_access) # `promises` dict @@ -3453,7 +3549,7 @@ def run( inp["valueFrom"], jobobj.get(key, JustAValue(None)), self.cwlwf.requirements, - get_container_engine(self.runtime_context), + get_container_engine(runtime_context), ) logger.debug( @@ -3464,7 +3560,7 @@ def run( expression=step.tool.get("when"), outputs=step.tool["out"], requirements=self.cwlwf.requirements, - container_engine=get_container_engine(self.runtime_context), + container_engine=get_container_engine(runtime_context), ) if "scatter" in step.tool: @@ -3473,7 +3569,7 @@ def run( ] = CWLScatter( step, UnresolvedDict(jobobj), - self.runtime_context, + runtime_context, parent_name=parent_name, conditional=conditional, ) @@ -3490,7 +3586,7 @@ def run( wfjob, followOn = makeJob( tool=step.embedded_tool, jobobj=UnresolvedDict(jobobj), - runtime_context=self.runtime_context, + runtime_context=runtime_context, parent_name=f"{parent_name}.{shortname(step_id)}", conditional=conditional, ) @@ -3559,6 +3655,10 @@ def run( promises=promises, ) + for t, fd in pipe_threads: + os.close(fd) + t.join() + return UnresolvedDict(outobj) @@ -3593,6 +3693,7 @@ def fill_in_files( tool: Process, candidate_to_fileid: dict[str, FileID], basedir: str, + url_access: URLAccess, skip_remote: bool, bypass_file_store: bool, ) -> tuple[Process, CWLObjectType]: @@ -3608,7 +3709,7 @@ def fill_in_file(filename: str) -> FileID: file_convert_function = functools.partial( extract_and_convert_file_to_toil_uri, fill_in_file ) - fs_access = ToilFsAccess(basedir) + fs_access = ToilFsAccess(basedir, url_access) fileindex: dict[str, str] = {} existing: dict[str, str] = {} visit_files( @@ -3659,6 +3760,7 @@ def run(self, file_store: AbstractFileStore) -> Tuple[Process, CWLObjectType]: tool, candidate_to_fileid, self.basedir, + file_store.jobStore, self.skip_remote, self.bypass_file_store, ) @@ -3683,23 +3785,36 @@ def __init__( super().__init__(local=False, disk=options.import_workers_threshold) self.initialized_job_order = initialized_job_order self.tool = tool - self.options = options - self.runtime_context = runtime_context + + # Parse options now to avoid sending options + self.import_workers_threshold = options.import_workers_threshold + self.import_workers_disk = options.import_workers_disk + self.basedir = options.basedir + self.skip_remote = options.reference_inputs + self.bypass_file_store = options.bypass_file_store + + self.runtime_context = pack_runtime_context(runtime_context) self.file_to_data = file_to_data def run(self, file_store: AbstractFileStore) -> Any: + + pipe_threads: list[tuple[Thread, int]] = [] + index: dict[str, 
str] = {} + existing: dict[str, str] = {} + runtime_context = unpack_runtime_context(self.runtime_context, file_store.jobStore, file_store, pipe_threads, index, existing) + imports_job = ImportsJob( self.file_to_data, - self.options.import_workers_threshold, - self.options.import_workers_disk, + self.import_workers_threshold, + self.import_workers_disk, ) self.addChild(imports_job) install_imports_job = CWLInstallImportsJob( initialized_job_order=self.initialized_job_order, tool=self.tool, - basedir=self.options.basedir, - skip_remote=self.options.reference_inputs, - bypass_file_store=self.options.bypass_file_store, + basedir=self.basedir, + skip_remote=self.skip_remote, + bypass_file_store=self.bypass_file_store, import_data=imports_job.rv(0), ) self.addChild(install_imports_job) @@ -3708,11 +3823,15 @@ def run(self, file_store: AbstractFileStore) -> Any: start_job = CWLStartJob( install_imports_job.rv(0), install_imports_job.rv(1), - runtime_context=self.runtime_context, + runtime_context=runtime_context, ) self.addChild(start_job) install_imports_job.addFollowOn(start_job) + for t, fd in pipe_threads: + os.close(fd) + t.join() + return start_job.rv() @@ -3734,21 +3853,32 @@ def __init__( super().__init__(**kwargs) self.tool = tool self.initialized_job_order = initialized_job_order - self.runtime_context = runtime_context + self.runtime_context = pack_runtime_context(runtime_context) def run(self, file_store: AbstractFileStore) -> Any: initialized_job_order = unwrap(self.initialized_job_order) + + pipe_threads: list[tuple[Thread, int]] = [] + index: dict[str, str] = {} + existing: dict[str, str] = {} + runtime_context = unpack_runtime_context(self.runtime_context, file_store.jobStore, file_store, pipe_threads, index, existing) + tool = unwrap(self.tool) cwljob, _ = makeJob( - tool, initialized_job_order, self.runtime_context, None, None + tool, initialized_job_order, runtime_context, None, None ) # toplevel, no name needed cwljob.cwljob = initialized_job_order self.addChild(cwljob) + + for t, fd in pipe_threads: + os.close(fd) + t.join() + return cwljob.rv() def extract_workflow_inputs( - options: Namespace, initialized_job_order: CWLObjectType, tool: Process + options: Namespace, initialized_job_order: CWLObjectType, tool: Process, toil: Toil ) -> list[str]: """ Collect all the workflow input files to import later. @@ -3762,7 +3892,7 @@ def extract_workflow_inputs( # Extract out all the input files' filenames logger.info("Collecting input files...") - fs_access = ToilFsAccess(options.basedir) + fs_access = ToilFsAccess(options.basedir, toil._jobStore) filenames = visit_files( extract_file_uri_once, fs_access, @@ -3825,7 +3955,7 @@ def file_import_function(url: str) -> FileID: # Import all the input files, some of which may be missing optional # files. logger.info("Importing input files...") - fs_access = ToilFsAccess(options.basedir) + fs_access = ToilFsAccess(options.basedir, jobstore) visit_files( import_function, fs_access, @@ -4268,25 +4398,39 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int: # We use the file store to write to buckets, so we can't do this (yet?) logger.error("Cannot export outputs to a bucket when bypassing the file store") return 1 + if options.reference_inputs and options.bypass_file_store: + # We can't do both of these at the same time. + logger.error("Cannot reference inputs when bypassing the file store") + return 1 + + + # Now options are fully ready for Toil. 
Any mutations to the options from + # here on are only for CWL workflow setup. We can start using them to make + # things. + + # Maker the Toil context manager object, which also knows how to access URLs. + toil_object = Toil(options) + if not options.bypass_file_store: # If we're using Toil's filesystem wrappers and the ability to access # URLs implemented by Toil, we need to hook up our own StdFsAccess # replacement early, before we try and set up the main CWL document. # Otherwise, if it takes a File with loadContents from a URL, we won't # be able to load the contents when we need to. - runtime_context.make_fs_access = ToilFsAccess - if options.reference_inputs and options.bypass_file_store: - # We can't do both of these at the same time. - logger.error("Cannot reference inputs when bypassing the file store") - return 1 + # + # And it depends on Toil's URL access machinery, so it needs the Toil object. + runtime_context.make_fs_access = cast(type[StdFsAccess], functools.partial(ToilFsAccess, url_access=toil_object)) + # Set up the CWL loading context loading_context = cwltool.main.setup_loadingContext(None, runtime_context, options) if options.provenance: + # Set up CWL provenance research_obj = cwltool.cwlprov.ro.ResearchObject( temp_prefix_ro=tmp_outdir_prefix, orcid=options.orcid, full_name=options.cwl_full_name, + # TODO: Won't this be un-pickle-able? fsaccess=runtime_context.make_fs_access(""), ) runtime_context.research_obj = research_obj @@ -4294,11 +4438,9 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int: try: if not options.restart: - # Make a version of the config based on the initial options, for - # setting up CWL option stuff - expected_config = Config() - expected_config.setOptions(options) - + # We have the initial Toil config already and we can use it to set + # more options for cwltool. + # Before showing the options to any cwltool stuff that wants to # load the workflow, transform options.cwltool, where our # argument for what to run is, to handle Dockstore workflows. @@ -4309,10 +4451,10 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int: loading_context.hints = [ { "class": "ResourceRequirement", - "coresMin": expected_config.defaultCores, + "coresMin": toil_object._config.defaultCores, # Don't include any RAM requirement because we want to # know when tools don't manually ask for RAM. - "outdirMin": expected_config.defaultDisk / (2**20), + "outdirMin": toil_object._config.defaultDisk / (2**20), "tmpdirMin": 0, } ] @@ -4337,7 +4479,7 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int: # Attempt to prepull the containers if not options.no_prepull and not options.no_container: - try_prepull(uri, runtime_context, expected_config.batchSystem) + try_prepull(uri, runtime_context, toil_object._config.batchSystem) options.tool_help = None options.debug = options.logLevel == "DEBUG" @@ -4474,7 +4616,7 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int: logger.debug("Root tool: %s", tool) tool = remove_pickle_problems(tool) - with Toil(options) as toil: + with toil_object as toil: if options.restart: outobj = toil.restart() else: diff --git a/src/toil/cwl/utils.py b/src/toil/cwl/utils.py index beac3e1c49..28ea9d745c 100644 --- a/src/toil/cwl/utils.py +++ b/src/toil/cwl/utils.py @@ -164,7 +164,7 @@ def download_structure( Guaranteed to fill the structure with real files, and not symlinks out of it to elsewhere. 
File URIs may be toilfile: URIs or any other URI that - Toil's job store system can read. + Toil can read. :param file_store: The Toil file store to download from. @@ -178,6 +178,10 @@ def download_structure( :param into_dir: The directory to download the top-level dict's files into. """ + + # TODO: Have a good way to get a URL access straight from a FileStore + # without dealing with the internal JobStore. + logger.debug("Downloading directory with %s items", len(dir_dict)) for name, value in dir_dict.items(): @@ -208,7 +212,7 @@ def download_structure( ) else: # We need to download from some other kind of URL. - size, executable = AbstractJobStore.read_from_url( + size, executable = file_store.jobStore.read_from_url( value, open(dest_path, "wb") ) if executable: diff --git a/src/toil/fileStores/abstractFileStore.py b/src/toil/fileStores/abstractFileStore.py index 6242dcefb0..19bc7082dc 100644 --- a/src/toil/fileStores/abstractFileStore.py +++ b/src/toil/fileStores/abstractFileStore.py @@ -131,6 +131,12 @@ def __init__( # Holds total bytes of observed disk usage for the last job run under open() self._job_disk_used: Optional[int] = None + def __getstate__(self) -> None: + """ + Make sure file stores cannot themselves be pickled. + """ + raise TypeError("Attempted to pickle file store implementation") + @staticmethod def createFileStore( jobStore: AbstractJobStore, diff --git a/src/toil/job.py b/src/toil/job.py index 27600ddd7e..b5ed0928be 100644 --- a/src/toil/job.py +++ b/src/toil/job.py @@ -3025,10 +3025,16 @@ def _saveJobGraph( serviceJob = self._registry[serviceID] logger.debug("Saving service %s", serviceJob.description) # Pickle the service body, which triggers all the promise stuff - serviceJob.saveBody(jobStore) + try: + serviceJob.saveBody(jobStore) + except Exception as e: + raise RuntimeError("Could not save body of service " + str(serviceJob.description)) from e if job != self or saveSelf: # Now pickle the job itself - job.saveBody(jobStore) + try: + job.saveBody(jobStore) + except Exception as e: + raise RuntimeError("Could not save body of " + str(job.description)) from e # Now that the job data is on disk, commit the JobDescriptions in # reverse execution order, in a batch if supported. diff --git a/src/toil/jobStores/abstractJobStore.py b/src/toil/jobStores/abstractJobStore.py index 6e5f4a3b15..b8ec792242 100644 --- a/src/toil/jobStores/abstractJobStore.py +++ b/src/toil/jobStores/abstractJobStore.py @@ -17,6 +17,7 @@ import re import shutil from abc import ABC, ABCMeta, abstractmethod +from argparse import ArgumentParser, _ArgumentGroup from collections.abc import Iterator, ValuesView from contextlib import closing, contextmanager from datetime import timedelta @@ -47,11 +48,12 @@ ServiceJobDescription, ) from toil.lib.ftp_utils import FtpFsAccess +from toil.lib.url import URLAccess, AbstractURLProtocolImplementation from toil.lib.compatibility import deprecated -from toil.lib.exceptions import UnimplementedURLException from toil.lib.io import WriteWatchingStream from toil.lib.memoize import memoize from toil.lib.retry import ErrorCondition, retry +from toil.options import OptionSetter if TYPE_CHECKING: from toil.job import TemporaryID @@ -161,7 +163,8 @@ def __init__(self, locator: str, prefix: str): ) -class AbstractJobStore(ABC): + +class AbstractJobStore(URLAccess, ABC): """ Represents the physical storage for the jobs and files in a Toil workflow. 
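
Since AbstractJobStore (and, per the common.py changes, Toil itself) now inherits from URLAccess, the URL helpers that used to be classmethods here are called on an instance that carries a per-workflow Config. A rough usage sketch, assuming URLAccess keeps the method names the changed call sites use (the job store locator and URL below are invented):

    from toil.common import Toil
    from toil.job import Job

    options = Job.Runner.getDefaultOptions("file:/tmp/example-job-store")
    with Toil(options) as toil:
        url = "s3://example-bucket/inputs/reads.fastq"  # made-up URL
        if toil.url_exists(url):                        # instance method inherited from URLAccess
            size = toil.get_size(url)
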
@@ -176,7 +179,7 @@ class AbstractJobStore(ABC): :meth:`toil.job.Job.loadJob` with a JobStore and the relevant JobDescription. """ - def __init__(self, locator: str) -> None: + def __init__(self, locator: str, config: Config) -> None: """ Create an instance of the job store. @@ -185,29 +188,31 @@ def __init__(self, locator: str) -> None: be invoked on the object with or without prior invocation of either of these two methods. - Takes and stores the locator string for the job store, which will be - accessible via self.locator. + :param locator: the locator string for the job store, which will be + accessible via self.locator. + + :param config: the Toil configuration to use. May be used to initialize + the job store with :meth:`.initialize`, or replaced with a stored + config with :meth:`.resume`. Will be accessible via self.config. """ - self.__locator = locator + self._locator = locator + super().__init__(config) - def initialize(self, config: Config) -> None: + def initialize(self) -> None: """ Initialize this job store. Create the physical storage for this job store, allocate a workflow ID and persist the given Toil configuration to the store. - :param config: the Toil configuration to initialize this job store with. - The given configuration will be updated with the newly - allocated workflow ID. + Updates the configuration with a newly allocated workflow ID. :raises JobStoreExistsException: if the physical storage for this job store already exists """ - assert config.workflowID is None - config.workflowID = str(uuid4()) - logger.debug("The workflow ID is: '%s'" % config.workflowID) - self.__config = config + assert self._config.workflowID is None + self._config.workflowID = str(uuid4()) + logger.debug("The workflow ID is: '%s'" % self._config.workflowID) self.write_config() @deprecated(new_function_name="write_config") @@ -222,7 +227,7 @@ def write_config(self) -> None: with self.write_shared_file_stream( "config.pickle", encrypted=False ) as fileHandle: - pickle.dump(self.__config, fileHandle, pickle.HIGHEST_PROTOCOL) + pickle.dump(self._config, fileHandle, pickle.HIGHEST_PROTOCOL) def resume(self) -> None: """ @@ -234,12 +239,12 @@ def resume(self) -> None: with self.read_shared_file_stream("config.pickle") as fileHandle: config = safeUnpickleFromStream(fileHandle) assert config.workflowID is not None - self.__config = config + self._config = config @property def config(self) -> Config: """Return the Toil configuration associated with this job store.""" - return self.__config + return self._config @property def locator(self) -> str: @@ -247,7 +252,7 @@ def locator(self) -> str: Get the locator that defines the job store, which can be used to connect to it. """ - return self.__locator + return self._locator rootJobStoreIDFileName = "rootJobStoreID" @@ -324,17 +329,22 @@ def get_root_job_return_value(self) -> Any: @staticmethod @memoize - def _get_job_store_classes() -> list["AbstractJobStore"]: + def _get_job_store_classes() -> list[type["AbstractJobStore"]]: """ A list of concrete AbstractJobStore implementations whose dependencies are installed. :rtype: List[AbstractJobStore] """ + # TODO: Although this method is private from users of job stores, it is + # shared with jobStores/options.py. + + # TODO: Replace with a real plugin/registry system like for batch + # systems. 
+ jobStoreClassNames = ( "toil.jobStores.fileJobStore.FileJobStore", "toil.jobStores.googleJobStore.GoogleJobStore", "toil.jobStores.aws.jobStore.AWSJobStore", - "toil.jobStores.abstractJobStore.JobStoreSupport", ) jobStoreClasses = [] for className in jobStoreClassNames: @@ -355,22 +365,25 @@ def _get_job_store_classes() -> list["AbstractJobStore"]: return jobStoreClasses @classmethod - def _findJobStoreForUrl( - cls, url: ParseResult, export: bool = False - ) -> "AbstractJobStore": + def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None: + """ + If this job store provides any command line options, add them to the given parser. """ - Returns the AbstractJobStore subclass that supports the given URL. - :param ParseResult url: The given URL + # By default, job store implementations have no CLI options. + pass - :param bool export: Determines if the url is supported for exporting + @classmethod + def set_options(cls, set_option: OptionSetter) -> None: + """ + Process command line or configuration options relevant to this job store. - :rtype: toil.jobStore.AbstractJobStore + :param set_option: A function taking an option name and returning + nothing, used to update run configuration as a side effect. """ - for implementation in cls._get_job_store_classes(): - if implementation._supports_url(url, export): - return implementation - raise UnimplementedURLException(url, "export" if export else "import") + + # By default, job store implementations have no CLI options. + pass # Importing a file with a shared file name returns None, but without one it # returns a file ID. Explain this to MyPy. @@ -464,7 +477,7 @@ def import_file( # optimizations that circumvent this, the _import_file method should be overridden by # subclasses of AbstractJobStore. parseResult = urlparse(src_uri) - otherCls = self._findJobStoreForUrl(parseResult) + otherCls = AbstractURLProtocolImplementation.find_url_implementation(parseResult) logger.info("Importing input %s...", src_uri) return self._import_file( otherCls, @@ -476,7 +489,7 @@ def import_file( def _import_file( self, - otherCls: "AbstractJobStore", + otherCls: type[AbstractURLProtocolImplementation], uri: ParseResult, shared_file_name: Optional[str] = None, hardlink: bool = False, @@ -490,8 +503,7 @@ def _import_file( Raises FileNotFoundError if the file does not exist. - :param AbstractJobStore otherCls: The concrete subclass of AbstractJobStore that supports - reading from the given URL and getting the file size from the URL. + :param otherCls: The type that implements accessing this kind of URL. :param ParseResult uri: The location of the file to import. 
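
The new add_options()/set_options() classmethods above mirror the batch system option plumbing; set_job_store_options() imported in common.py presumably calls set_options() on each available job store class. A hypothetical plugin using these hooks might look like the sketch below (the class, option name, and simplified parser type are invented, and the required storage methods are omitted):

    from argparse import ArgumentParser

    from toil.jobStores.abstractJobStore import AbstractJobStore
    from toil.options import OptionSetter


    class ExampleJobStore(AbstractJobStore):
        # Abstract job and file methods omitted from this sketch.

        @classmethod
        def add_options(cls, parser: ArgumentParser) -> None:
            parser.add_argument(
                "--exampleStoreRegion",
                dest="example_store_region",
                default=None,
                help="Region in which to keep example job store data.",
            )

        @classmethod
        def set_options(cls, set_option: OptionSetter) -> None:
            set_option("example_store_region")
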
@@ -503,12 +515,12 @@ def _import_file( if shared_file_name is None: with self.write_file_stream() as (writable, jobStoreFileID): - size, executable = otherCls._read_from_url(uri, writable) + size, executable = otherCls._read_from_url(uri, writable, self._config) return FileID(jobStoreFileID, size, executable) else: self._requireValidSharedFileName(shared_file_name) with self.write_shared_file_stream(shared_file_name) as writable: - otherCls._read_from_url(uri, writable) + otherCls._read_from_url(uri, writable, self._config) return None @deprecated(new_function_name="export_file") @@ -535,19 +547,16 @@ def export_file(self, file_id: FileID, dst_uri: str) -> None: from toil.common import Toil dst_uri = Toil.normalize_uri(dst_uri) parseResult = urlparse(dst_uri) - otherCls = self._findJobStoreForUrl(parseResult, export=True) + otherCls = AbstractURLProtocolImplementation.find_url_implementation(parseResult, export=True) self._export_file(otherCls, file_id, parseResult) def _export_file( - self, otherCls: "AbstractJobStore", jobStoreFileID: FileID, url: ParseResult + self, otherCls: type[AbstractURLProtocolImplementation], jobStoreFileID: FileID, url: ParseResult ) -> None: """ Refer to exportFile docstring for information about this method. - :param AbstractJobStore otherCls: The concrete subclass of AbstractJobStore that supports - exporting to the given URL. Note that the type annotation here is not completely - accurate. This is not an instance, it's a class, but there is no way to reflect - that in :pep:`484` type hints. + :param otherCls: The type that implements accessing this kind of URL. :param str jobStoreFileID: The id of the file that will be exported. @@ -556,15 +565,12 @@ def _export_file( self._default_export_file(otherCls, jobStoreFileID, url) def _default_export_file( - self, otherCls: "AbstractJobStore", jobStoreFileID: FileID, url: ParseResult + self, otherCls: type[AbstractURLProtocolImplementation], jobStoreFileID: FileID, url: ParseResult ) -> None: """ Refer to exportFile docstring for information about this method. - :param AbstractJobStore otherCls: The concrete subclass of AbstractJobStore that supports - exporting to the given URL. Note that the type annotation here is not completely - accurate. This is not an instance, it's a class, but there is no way to reflect - that in :pep:`484` type hints. + :param otherCls: The type that implements accessing this kind of URL. :param str jobStoreFileID: The id of the file that will be exported. @@ -574,218 +580,7 @@ def _default_export_file( with self.read_file_stream(jobStoreFileID) as readable: if getattr(jobStoreFileID, "executable", False): executable = jobStoreFileID.executable - otherCls._write_to_url(readable, url, executable) - - @classmethod - def url_exists(cls, src_uri: str) -> bool: - """ - Return True if the file at the given URI exists, and False otherwise. - - May raise an error if file existence cannot be determined. - - :param src_uri: URL that points to a file or object in the storage - mechanism of a supported URL scheme e.g. a blob in an AWS s3 bucket. - """ - parseResult = urlparse(src_uri) - otherCls = cls._findJobStoreForUrl(parseResult) - return otherCls._url_exists(parseResult) - - @classmethod - def get_size(cls, src_uri: str) -> Optional[int]: - """ - Get the size in bytes of the file at the given URL, or None if it cannot be obtained. - - :param src_uri: URL that points to a file or object in the storage - mechanism of a supported URL scheme e.g. a blob in an AWS s3 bucket. 
- """ - parseResult = urlparse(src_uri) - otherCls = cls._findJobStoreForUrl(parseResult) - return otherCls._get_size(parseResult) - - @classmethod - def get_is_directory(cls, src_uri: str) -> bool: - """ - Return True if the thing at the given URL is a directory, and False if - it is a file. The URL may or may not end in '/'. - """ - parseResult = urlparse(src_uri) - otherCls = cls._findJobStoreForUrl(parseResult) - return otherCls._get_is_directory(parseResult) - - @classmethod - def list_url(cls, src_uri: str) -> list[str]: - """ - List the directory at the given URL. Returned path components can be - joined with '/' onto the passed URL to form new URLs. Those that end in - '/' correspond to directories. The provided URL may or may not end with - '/'. - - Currently supported schemes are: - - - 's3' for objects in Amazon S3 - e.g. s3://bucket/prefix/ - - - 'file' for local files - e.g. file:///local/dir/path/ - - :param str src_uri: URL that points to a directory or prefix in the storage mechanism of a - supported URL scheme e.g. a prefix in an AWS s3 bucket. - - :return: A list of URL components in the given directory, already URL-encoded. - """ - parseResult = urlparse(src_uri) - otherCls = cls._findJobStoreForUrl(parseResult) - return otherCls._list_url(parseResult) - - @classmethod - def read_from_url(cls, src_uri: str, writable: IO[bytes]) -> tuple[int, bool]: - """ - Read the given URL and write its content into the given writable stream. - - Raises FileNotFoundError if the URL doesn't exist. - - :return: The size of the file in bytes and whether the executable permission bit is set - """ - parseResult = urlparse(src_uri) - otherCls = cls._findJobStoreForUrl(parseResult) - return otherCls._read_from_url(parseResult, writable) - - @classmethod - def open_url(cls, src_uri: str) -> IO[bytes]: - """ - Read from the given URI. - - Raises FileNotFoundError if the URL doesn't exist. - - Has a readable stream interface, unlike :meth:`read_from_url` which - takes a writable stream. - """ - parseResult = urlparse(src_uri) - otherCls = cls._findJobStoreForUrl(parseResult) - return otherCls._open_url(parseResult) - - @classmethod - @abstractmethod - def _url_exists(cls, url: ParseResult) -> bool: - """ - Return True if the item at the given URL exists, and Flase otherwise. - - May raise an error if file existence cannot be determined. - """ - raise NotImplementedError(f"No implementation for {url}") - - @classmethod - @abstractmethod - def _get_size(cls, url: ParseResult) -> Optional[int]: - """ - Get the size of the object at the given URL, or None if it cannot be obtained. - """ - raise NotImplementedError(f"No implementation for {url}") - - @classmethod - @abstractmethod - def _get_is_directory(cls, url: ParseResult) -> bool: - """ - Return True if the thing at the given URL is a directory, and False if - it is a file or it is known not to exist. The URL may or may not end in - '/'. - - :param url: URL that points to a file or object, or directory or prefix, - in the storage mechanism of a supported URL scheme e.g. a blob - in an AWS s3 bucket. - """ - raise NotImplementedError(f"No implementation for {url}") - - @classmethod - @abstractmethod - def _read_from_url(cls, url: ParseResult, writable: IO[bytes]) -> tuple[int, bool]: - """ - Reads the contents of the object at the specified location and writes it to the given - writable stream. - - Refer to :func:`~AbstractJobStore.importFile` documentation for currently supported URL schemes. 
- - Raises FileNotFoundError if the thing at the URL is not found. - - :param ParseResult url: URL that points to a file or object in the storage - mechanism of a supported URL scheme e.g. a blob in an AWS s3 bucket. - - :param IO[bytes] writable: a writable stream - - :return: The size of the file in bytes and whether the executable permission bit is set - """ - raise NotImplementedError(f"No implementation for {url}") - - @classmethod - @abstractmethod - def _list_url(cls, url: ParseResult) -> list[str]: - """ - List the contents of the given URL, which may or may not end in '/' - - Returns a list of URL components. Those that end in '/' are meant to be - directories, while those that do not are meant to be files. - - Refer to :func:`~AbstractJobStore.importFile` documentation for currently supported URL schemes. - - :param ParseResult url: URL that points to a directory or prefix in the - storage mechanism of a supported URL scheme e.g. a prefix in an AWS s3 - bucket. - - :return: The children of the given URL, already URL-encoded if - appropriate. (If the URL is a bare path, no encoding is done.) - """ - raise NotImplementedError(f"No implementation for {url}") - - @classmethod - @abstractmethod - def _open_url(cls, url: ParseResult) -> IO[bytes]: - """ - Get a stream of the object at the specified location. - - Refer to :func:`~AbstractJobStore.importFile` documentation for currently supported URL schemes. - - Raises FileNotFoundError if the thing at the URL is not found. - """ - raise NotImplementedError(f"No implementation for {url}") - - @classmethod - @abstractmethod - def _write_to_url( - cls, - readable: Union[IO[bytes], IO[str]], - url: ParseResult, - executable: bool = False, - ) -> None: - """ - Reads the contents of the given readable stream and writes it to the object at the - specified location. Raises FileNotFoundError if the URL doesn't exist.. - - Refer to AbstractJobStore.importFile documentation for currently supported URL schemes. - - :param Union[IO[bytes], IO[str]] readable: a readable stream - - :param ParseResult url: URL that points to a file or object in the storage - mechanism of a supported URL scheme e.g. a blob in an AWS s3 bucket. - - :param bool executable: determines if the file has executable permissions - """ - raise NotImplementedError(f"No implementation for {url}") - - @classmethod - @abstractmethod - def _supports_url(cls, url: ParseResult, export: bool = False) -> bool: - """ - Returns True if the job store supports the URL's scheme. - - Refer to AbstractJobStore.importFile documentation for currently supported URL schemes. - - :param ParseResult url: a parsed URL that may be supported - - :param bool export: Determines if the url is supported for exported - - :return bool: returns true if the cls supports the URL - """ - raise NotImplementedError(f"No implementation for {url}") + otherCls._write_to_url(readable, url, executable, self._config) @abstractmethod def destroy(self) -> None: @@ -1851,10 +1646,12 @@ def _requireValidSharedFileName(cls, sharedFileName: str) -> None: raise ValueError("Not a valid shared file name: '%s'." % sharedFileName) -class JobStoreSupport(AbstractJobStore, metaclass=ABCMeta): +class StandardURLProtocolImplementation(AbstractURLProtocolImplementation, metaclass=ABCMeta): """ - A mostly fake JobStore to access URLs not really associated with real job - stores. + A class to access standard HTTP and FTP URLs. + + File URLs are implemented by + :class:`toil.jobStores.fileJobStore.FileJobStore` instead. 
""" @classmethod @@ -1868,7 +1665,7 @@ def _supports_url(cls, url: ParseResult, export: bool = False) -> bool: return url.scheme.lower() in ("http", "https", "ftp") and not export @classmethod - def _url_exists(cls, url: ParseResult) -> bool: + def _url_exists(cls, url: ParseResult, config: Config) -> bool: # Deal with FTP first to support user/password auth if url.scheme.lower() == "ftp": ftp = cls._setup_ftp() @@ -1892,7 +1689,7 @@ def _url_exists(cls, url: ParseResult) -> bool: ErrorCondition(error=HTTPError, error_codes=[408, 500, 503]), ] ) - def _get_size(cls, url: ParseResult) -> Optional[int]: + def _get_size(cls, url: ParseResult, config: Config) -> Optional[int]: if url.scheme.lower() == "ftp": ftp = cls._setup_ftp() return ftp.size(url.geturl()) @@ -1904,11 +1701,11 @@ def _get_size(cls, url: ParseResult) -> Optional[int]: @classmethod def _read_from_url( - cls, url: ParseResult, writable: Union[IO[bytes], IO[str]] + cls, url: ParseResult, writable: Union[IO[bytes], IO[str]], config: Config ) -> tuple[int, bool]: # We can't actually retry after we start writing. # TODO: Implement retry with byte range requests - with cls._open_url(url) as readable: + with cls._open_url(url, config) as readable: # Make something to count the bytes we get # We need to put the actual count in a container so our # nested function can modify it without creating its own @@ -1932,7 +1729,7 @@ def count(l: int) -> None: ErrorCondition(error=HTTPError, error_codes=[408, 429, 500, 502, 503]), ] ) - def _open_url(cls, url: ParseResult) -> IO[bytes]: + def _open_url(cls, url: ParseResult, config: Config) -> IO[bytes]: # Deal with FTP first so we support user/password auth if url.scheme.lower() == "ftp": ftp = cls._setup_ftp() @@ -1958,11 +1755,11 @@ def _open_url(cls, url: ParseResult) -> IO[bytes]: raise @classmethod - def _get_is_directory(cls, url: ParseResult) -> bool: + def _get_is_directory(cls, url: ParseResult, config: Config) -> bool: # TODO: Implement HTTP index parsing and FTP directory listing return False @classmethod - def _list_url(cls, url: ParseResult) -> list[str]: + def _list_url(cls, url: ParseResult, config: Config) -> list[str]: # TODO: Implement HTTP index parsing and FTP directory listing raise NotImplementedError("HTTP and FTP URLs cannot yet be listed") diff --git a/src/toil/jobStores/aws/jobStore.py b/src/toil/jobStores/aws/jobStore.py index e1f33e7511..9c161c14a4 100644 --- a/src/toil/jobStores/aws/jobStore.py +++ b/src/toil/jobStores/aws/jobStore.py @@ -21,6 +21,7 @@ import stat import time import uuid +from argparse import ArgumentParser, _ArgumentGroup from collections.abc import Generator from contextlib import contextmanager from io import BytesIO @@ -30,10 +31,12 @@ from botocore.exceptions import ClientError import toil.lib.encryption as encryption +from toil.common import Config from toil.fileStores import FileID from toil.job import Job, JobDescription from toil.jobStores.abstractJobStore import ( AbstractJobStore, + AbstractURLProtocolImplementation, ConcurrentFileModificationException, JobStoreExistsException, LocatorException, @@ -69,12 +72,14 @@ retryable_s3_errors, ) from toil.lib.compatibility import compat_bytes +from toil.lib.conversions import opt_strtobool from toil.lib.ec2nodes import EC2Regions from toil.lib.exceptions import panic from toil.lib.io import AtomicFileCreate from toil.lib.memoize import strict_bool from toil.lib.objects import InnerClass from toil.lib.retry import get_error_code, get_error_status, retry +from toil.options import OptionSetter if 
TYPE_CHECKING: from mypy_boto3_sdb.type_defs import ( @@ -86,8 +91,6 @@ UpdateConditionTypeDef, ) - from toil import Config - boto3_session = establish_boto3_session() s3_boto3_resource = boto3_session.resource("s3") s3_boto3_client = boto3_session.client("s3") @@ -110,7 +113,7 @@ def __init__(self, domain_name): super().__init__(f"Expected domain {domain_name} to exist!") -class AWSJobStore(AbstractJobStore): +class AWSJobStore(AbstractJobStore, AbstractURLProtocolImplementation): """ A job store that uses Amazon's S3 for file storage and SimpleDB for storing job info and enforcing strong consistency on the S3 file storage. There will be SDB domains for jobs and @@ -132,7 +135,7 @@ class AWSJobStore(AbstractJobStore): maxNameLen = 10 nameSeparator = "--" - def __init__(self, locator: str, partSize: int = 50 << 20) -> None: + def __init__(self, locator: str, config: Config, partSize: int = 50 << 20) -> None: """ Create a new job store in AWS or load an existing one from there. @@ -140,7 +143,7 @@ def __init__(self, locator: str, partSize: int = 50 << 20) -> None: upload and copy, must be >= 5 MiB but large enough to not exceed 10k parts for the whole file """ - super().__init__(locator) + super().__init__(locator, config) region, namePrefix = locator.split(":") regions = EC2Regions.keys() if region not in regions: @@ -181,7 +184,7 @@ def __init__(self, locator: str, partSize: int = 50 << 20) -> None: self.s3_resource = boto3_session.resource("s3", region_name=self.region) self.s3_client = self.s3_resource.meta.client - def initialize(self, config: "Config") -> None: + def initialize(self) -> None: if self._registered: raise JobStoreExistsException(self.locator, "aws") self._registered = None @@ -191,7 +194,7 @@ def initialize(self, config: "Config") -> None: with panic(logger): self.destroy() else: - super().initialize(config) + super().initialize() # Only register after job store has been full initialized self._registered = True @@ -637,10 +640,16 @@ def _export_file(self, otherCls, file_id: FileID, uri: ParseResult) -> None: else: super()._default_export_file(otherCls, file_id, uri) + ### + # URL access implementation + ### + + # URL access methods aren't used by the rest of the job store methods. 
+ @classmethod - def _url_exists(cls, url: ParseResult) -> bool: + def _url_exists(cls, url: ParseResult, config: Config) -> bool: try: - get_object_for_url(url, existing=True) + get_object_for_url(url, existing=True, anonymous=config.aws_anonymous_url_access) return True except FileNotFoundError: # Not a file @@ -648,18 +657,18 @@ def _url_exists(cls, url: ParseResult) -> bool: return cls._get_is_directory(url) @classmethod - def _get_size(cls, url: ParseResult) -> int: - return get_object_for_url(url, existing=True).content_length + def _get_size(cls, url: ParseResult, config: Config) -> int: + return get_object_for_url(url, existing=True, anonymous=config.aws_anonymous_url_access).content_length @classmethod - def _read_from_url(cls, url: ParseResult, writable): - srcObj = get_object_for_url(url, existing=True) + def _read_from_url(cls, url: ParseResult, writable, config: Config): + srcObj = get_object_for_url(url, existing=True, anonymous=config.aws_anonymous_url_access) srcObj.download_fileobj(writable) return (srcObj.content_length, False) # executable bit is always False @classmethod - def _open_url(cls, url: ParseResult) -> IO[bytes]: - src_obj = get_object_for_url(url, existing=True) + def _open_url(cls, url: ParseResult, config: Config) -> IO[bytes]: + src_obj = get_object_for_url(url, existing=True, anonymous=config.aws_anonymous_url_access) response = src_obj.get() # We should get back a response with a stream in 'Body' if "Body" not in response: @@ -668,9 +677,9 @@ def _open_url(cls, url: ParseResult) -> IO[bytes]: @classmethod def _write_to_url( - cls, readable, url: ParseResult, executable: bool = False + cls, readable, url: ParseResult, executable: bool, config: Config ) -> None: - dstObj = get_object_for_url(url) + dstObj = get_object_for_url(url, anonymous=config.aws_anonymous_url_access) logger.debug("Uploading %s", dstObj.key) # uploadFile takes care of using multipart upload if the file is larger than partSize (default to 5MB) @@ -683,18 +692,54 @@ def _write_to_url( ) @classmethod - def _list_url(cls, url: ParseResult) -> list[str]: - return list_objects_for_url(url) + def _list_url(cls, url: ParseResult, config: Config) -> list[str]: + return list_objects_for_url(url, anonymous=config.aws_anonymous_url_access) @classmethod - def _get_is_directory(cls, url: ParseResult) -> bool: + def _get_is_directory(cls, url: ParseResult, config: Config) -> bool: # We consider it a directory if anything is in it. # TODO: Can we just get the first item and not the whole list? - return len(list_objects_for_url(url)) > 0 + return len(cls._list_url(url, config)) > 0 @classmethod def _supports_url(cls, url: ParseResult, export: bool = False) -> bool: return url.scheme.lower() == "s3" + + #### + + # To set the config flag affecting URL access, we need command-line option support. + + @classmethod + def add_options(cls, parser: Union[ArgumentParser, _ArgumentGroup]) -> None: + """ + If this job store provides any command line options, add them to the given parser. + """ + + parser.add_argument( + "--awsAnonymousUrlAccess", + dest="aws_anonymous_url_access", + default=False, + metavar="BOOL", + env_var="TOIL_AWS_ANONYMOUS_URL_ACCESS", + type=opt_strtobool, + help="Whether to access AWS S3 URLs anonymously. Useful for skipping " + "multi-factor authentication when MFA is configured but unnecessary. " + "(default: %(default)s)", + ) + + @classmethod + def set_options(cls, set_option: OptionSetter) -> None: + """ + Process command line options relevant to this job store. 
+ + :param set_option: A function taking an option name and returning + nothing, used to update run configuration as a side effect. + """ + + # Set the option in the config + set_option("aws_anonymous_url_access") + + ### def write_file( self, local_path: FileID, job_id: Optional[FileID] = None, cleanup: bool = False diff --git a/src/toil/jobStores/fileJobStore.py b/src/toil/jobStores/fileJobStore.py index fa6df1d325..be3976427c 100644 --- a/src/toil/jobStores/fileJobStore.py +++ b/src/toil/jobStores/fileJobStore.py @@ -26,10 +26,12 @@ from typing import IO, Literal, Optional, Union, overload from urllib.parse import ParseResult, quote, unquote +from toil.common import Config from toil.fileStores import FileID from toil.job import TemporaryID from toil.jobStores.abstractJobStore import ( AbstractJobStore, + AbstractURLProtocolImplementation, JobStoreExistsException, NoSuchFileException, NoSuchJobException, @@ -46,7 +48,7 @@ logger = logging.getLogger(__name__) -class FileJobStore(AbstractJobStore): +class FileJobStore(AbstractJobStore, AbstractURLProtocolImplementation): """ A job store that uses a directory on a locally attached file system. To be compatible with distributed batch systems, that file system must be shared by all worker nodes. @@ -81,13 +83,13 @@ def default_caching(self) -> bool: return False - def __init__(self, path: str, fanOut: int = 1000) -> None: + def __init__(self, path: str, config: Config, fanOut: int = 1000) -> None: """ :param path: Path to directory holding the job store :param fanOut: Number of items to have in a directory before making subdirectories """ - super().__init__(path) + super().__init__(path, config) self.jobStoreDir = os.path.abspath(path) logger.debug("Path to job store directory is '%s'.", self.jobStoreDir) @@ -116,7 +118,7 @@ def __init__(self, path: str, fanOut: int = 1000) -> None: def __repr__(self): return f"FileJobStore({self.jobStoreDir})" - def initialize(self, config): + def initialize(self): try: os.mkdir(self.jobStoreDir) except OSError as e: @@ -131,10 +133,10 @@ def initialize(self, config): os.makedirs(self.filesDir, exist_ok=True) os.makedirs(self.jobFilesDir, exist_ok=True) os.makedirs(self.sharedFilesDir, exist_ok=True) - self.linkImports = config.symlinkImports - self.moveExports = config.moveOutputs - self.symlink_job_store_reads = config.symlink_job_store_reads - super().initialize(config) + self.linkImports = self.config.symlinkImports + self.moveExports = self.config.moveOutputs + self.symlink_job_store_reads = self.config.symlink_job_store_reads + super().initialize() def resume(self): if not os.path.isdir(self.jobStoreDir): @@ -394,15 +396,15 @@ def _move_and_linkback(self, srcPath, destPath, executable): os.chmod(destPath, os.stat(destPath).st_mode | stat.S_IXUSR) @classmethod - def _url_exists(cls, url: ParseResult) -> bool: + def _url_exists(cls, url: ParseResult, config: Config) -> bool: return os.path.exists(cls._extract_path_from_url(url)) @classmethod - def _get_size(cls, url): + def _get_size(cls, url: ParseResult, config: Config) -> int: return os.stat(cls._extract_path_from_url(url)).st_size @classmethod - def _read_from_url(cls, url, writable): + def _read_from_url(cls, url: ParseResult, writable: IO[bytes], config: Config) -> tuple[int, bool]: """ Writes the contents of a file to a source (writes url to writable) using a ~10Mb buffer. 
@@ -412,21 +414,21 @@ def _read_from_url(cls, url, writable): """ # we use a ~10Mb buffer to improve speed - with cls._open_url(url) as readable: + with cls._open_url(url, config) as readable: shutil.copyfileobj(readable, writable, length=cls.BUFFER_SIZE) # Return the number of bytes we read when we reached EOF. executable = os.stat(readable.name).st_mode & stat.S_IXUSR return readable.tell(), executable @classmethod - def _open_url(cls, url: ParseResult) -> IO[bytes]: + def _open_url(cls, url: ParseResult, config: Config) -> IO[bytes]: """ Open a file URL as a binary stream. """ return open(cls._extract_path_from_url(url), "rb") @classmethod - def _write_to_url(cls, readable, url, executable=False): + def _write_to_url(cls, readable: IO[bytes], url: ParseResult, executable: bool, config: Config): """ Writes the contents of a file to a source (writes readable to url) using a ~10Mb buffer. @@ -443,7 +445,7 @@ def _write_to_url(cls, readable, url, executable=False): ) @classmethod - def _list_url(cls, url: ParseResult) -> list[str]: + def _list_url(cls, url: ParseResult, config: Config) -> list[str]: path = cls._extract_path_from_url(url) listing = [] for p in os.listdir(path): @@ -456,7 +458,7 @@ def _list_url(cls, url: ParseResult) -> list[str]: return listing @classmethod - def _get_is_directory(cls, url: ParseResult) -> bool: + def _get_is_directory(cls, url: ParseResult, config: Config) -> bool: path = cls._extract_path_from_url(url) return os.path.isdir(path) diff --git a/src/toil/jobStores/googleJobStore.py b/src/toil/jobStores/googleJobStore.py index 1cc2cc2714..7fade50a36 100644 --- a/src/toil/jobStores/googleJobStore.py +++ b/src/toil/jobStores/googleJobStore.py @@ -31,8 +31,10 @@ from google.auth.exceptions import DefaultCredentialsError from google.cloud import exceptions, storage +from toil.common import Config from toil.jobStores.abstractJobStore import ( AbstractJobStore, + AbstractURLProtocolImplementation, JobStoreExistsException, NoSuchFileException, NoSuchJobException, @@ -91,12 +93,12 @@ def wrapper(*args, **kwargs): return wrapper -class GoogleJobStore(AbstractJobStore): +class GoogleJobStore(AbstractJobStore, AbstractURLProtocolImplementation): nodeServiceAccountJson = "/root/service_account.json" - def __init__(self, locator: str) -> None: - super().__init__(locator) + def __init__(self, locator: str, config: Config) -> None: + super().__init__(locator, config) try: projectID, namePrefix = locator.split(":", 1) @@ -173,12 +175,12 @@ def create_client(cls) -> storage.Client: return storage.Client.create_anonymous_client() @google_retry - def initialize(self, config=None): + def initialize(self): try: self.bucket = self.storageClient.create_bucket(self.bucketName) except exceptions.Conflict: raise JobStoreExistsException(self.locator, "google") - super().initialize(config) + super().initialize() # set up sever side encryption after we set up config in super if self.config.sseKey is not None: @@ -439,7 +441,7 @@ def _get_blob_from_url(cls, url, exists=False): return blob @classmethod - def _url_exists(cls, url: ParseResult) -> bool: + def _url_exists(cls, url: ParseResult, config: Config) -> bool: try: cls._get_blob_from_url(url, exists=True) return True @@ -447,17 +449,17 @@ def _url_exists(cls, url: ParseResult) -> bool: return False @classmethod - def _get_size(cls, url): + def _get_size(cls, url, config: Config): return cls._get_blob_from_url(url, exists=True).size @classmethod - def _read_from_url(cls, url, writable): + def _read_from_url(cls, url, writable, config: 
Config): blob = cls._get_blob_from_url(url, exists=True) blob.download_to_file(writable) return blob.size, False @classmethod - def _open_url(cls, url: ParseResult) -> IO[bytes]: + def _open_url(cls, url: ParseResult, config: Config) -> IO[bytes]: blob = cls._get_blob_from_url(url, exists=True) return blob.open("rb") @@ -466,18 +468,18 @@ def _supports_url(cls, url, export=False): return url.scheme.lower() == "gs" @classmethod - def _write_to_url(cls, readable: bytes, url: str, executable: bool = False) -> None: + def _write_to_url(cls, readable: bytes, url: str, executable: bool, config: Config) -> None: blob = cls._get_blob_from_url(url) blob.upload_from_file(readable) @classmethod - def _list_url(cls, url: ParseResult) -> list[str]: + def _list_url(cls, url: ParseResult, config: Config) -> list[str]: raise NotImplementedError( "Listing files in Google buckets is not yet implemented!" ) @classmethod - def _get_is_directory(cls, url: ParseResult) -> bool: + def _get_is_directory(cls, url: ParseResult, config: Config) -> bool: raise NotImplementedError( "Checking directory status in Google buckets is not yet implemented!" ) diff --git a/src/toil/jobStores/options.py b/src/toil/jobStores/options.py new file mode 100644 index 0000000000..eeda1ed6d6 --- /dev/null +++ b/src/toil/jobStores/options.py @@ -0,0 +1,51 @@ +# Copyright (C) 2015-2025 Regents of the University of California +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and + +import logging +from argparse import ArgumentParser, _ArgumentGroup +from typing import Optional, Union + +from toil.options import OptionSetter + +logger = logging.getLogger(__name__) + +def add_all_job_store_options(parser: Union[ArgumentParser, _ArgumentGroup]) -> None: + """ + Add all job stores' options to the parser. + """ + + from toil.jobStores.abstractJobStore import AbstractJobStore + + for job_store_type in AbstractJobStore._get_job_store_classes(): + logger.debug("Add options for %s job store", job_store_type) + job_store_type.add_options(parser) + +def set_job_store_options( + set_option: OptionSetter +) -> None: + """ + Call set_option for all job stores' options. + + Unlike with batch systems, multiple job store classes can be used in one + workflow (for URL access), so we don't allow restricting to just one + implementation. + """ + + from toil.jobStores.abstractJobStore import AbstractJobStore + + for job_store_type in AbstractJobStore._get_job_store_classes(): + job_store_type.set_options(set_option) + + + + diff --git a/src/toil/lib/aws/session.py b/src/toil/lib/aws/session.py index a442648322..f14cd116d2 100644 --- a/src/toil/lib/aws/session.py +++ b/src/toil/lib/aws/session.py @@ -35,6 +35,9 @@ logger = logging.getLogger(__name__) +# You can pass config=ANONYMOUS_CONFIG to make anonymous S3 accesses +ANONYMOUS_CONFIG = Config(signature_version=botocore.UNSIGNED) + # A note on thread safety: # # Boto3 Session: Not thread safe, 1 per thread is required. 
@@ -148,6 +151,7 @@ def resource( region: Optional[str], service_name: Literal["s3"], endpoint_url: Optional[str] = None, + config: Optional[Config] = None, ) -> "S3ServiceResource": ... @overload def resource( @@ -155,6 +159,7 @@ def resource( region: Optional[str], service_name: Literal["iam"], endpoint_url: Optional[str] = None, + config: Optional[Config] = None, ) -> "IAMServiceResource": ... @overload def resource( @@ -162,6 +167,7 @@ def resource( region: Optional[str], service_name: Literal["ec2"], endpoint_url: Optional[str] = None, + config: Optional[Config] = None, ) -> "EC2ServiceResource": ... def resource( @@ -169,6 +175,7 @@ def resource( region: Optional[str], service_name: str, endpoint_url: Optional[str] = None, + config: Optional[Config] = None, ) -> boto3.resources.base.ServiceResource: """ Get the Boto3 Resource to use with the given service (like 'ec2') in the given region. @@ -188,10 +195,10 @@ def resource( # The Boto3 stubs are missing an overload for `resource` that takes # a non-literal string. See # - storage.item = self.session(region).resource(service_name, endpoint_url=endpoint_url) # type: ignore + storage.item = self.session(region).resource(service_name, endpoint_url=endpoint_url, config=config) # type: ignore else: # We might not be able to pass None to Boto3 and have it be the same as no argument. - storage.item = self.session(region).resource(service_name) # type: ignore + storage.item = self.session(region).resource(service_name, config=config) # type: ignore return cast(boto3.resources.base.ServiceResource, storage.item) @@ -369,18 +376,21 @@ def resource( service_name: Literal["s3"], region_name: Optional[str] = None, endpoint_url: Optional[str] = None, + config: Optional[Config] = None, ) -> "S3ServiceResource": ... @overload def resource( service_name: Literal["iam"], region_name: Optional[str] = None, endpoint_url: Optional[str] = None, + config: Optional[Config] = None, ) -> "IAMServiceResource": ... @overload def resource( service_name: Literal["ec2"], region_name: Optional[str] = None, endpoint_url: Optional[str] = None, + config: Optional[Config] = None, ) -> "EC2ServiceResource": ... @@ -388,6 +398,7 @@ def resource( service_name: Literal["s3", "iam", "ec2"], region_name: Optional[str] = None, endpoint_url: Optional[str] = None, + config: Optional[Config] = None, ) -> boto3.resources.base.ServiceResource: """ Get a Boto 3 resource for a particular AWS service, usable by the current thread. @@ -397,5 +408,5 @@ def resource( # Just use a global version of the manager. Note that we change the argument order! 
return _global_manager.resource( - region_name, service_name, endpoint_url=endpoint_url + region_name, service_name, endpoint_url=endpoint_url, config=config ) diff --git a/src/toil/lib/aws/utils.py b/src/toil/lib/aws/utils.py index a6b2a2e0a3..dd608705a7 100644 --- a/src/toil/lib/aws/utils.py +++ b/src/toil/lib/aws/utils.py @@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Any, Callable, ContextManager, Optional, cast from urllib.parse import ParseResult +# To import toil.lib.aws.session, the AWS libraries must be installed from toil.lib.aws import AWSRegionName, AWSServerErrors, session from toil.lib.conversions import strtobool from toil.lib.misc import printq @@ -37,12 +38,7 @@ from mypy_boto3_s3.service_resource import Object as S3Object from mypy_boto3_sdb.type_defs import AttributeTypeDef -try: - from botocore.exceptions import ClientError, EndpointConnectionError -except ImportError: - ClientError = None # type: ignore - EndpointConnectionError = None # type: ignore - # AWS/boto extra is not installed +from botocore.exceptions import ClientError, EndpointConnectionError logger = logging.getLogger(__name__) @@ -232,6 +228,7 @@ def get_bucket_region( bucket_name: str, endpoint_url: Optional[str] = None, only_strategies: Optional[set[int]] = None, + anonymous: Optional[bool] = None ) -> str: """ Get the AWS region name associated with the given S3 bucket, or raise NoBucketLocationError. @@ -243,7 +240,8 @@ def get_bucket_region( :param only_strategies: For testing, use only strategies with 1-based numbers in this set. """ - s3_client = session.client("s3", endpoint_url=endpoint_url) + config = session.ANONYMOUS_CONFIG if anonymous else None + s3_client = session.client("s3", endpoint_url=endpoint_url, config=config) def attempt_get_bucket_location() -> Optional[str]: """ @@ -267,7 +265,7 @@ def attempt_get_bucket_location_from_us_east_1() -> Optional[str]: # It could also be because AWS open data buckets (which we tend to # encounter this problem for) tend to actually themselves be in # us-east-1. - backup_s3_client = session.client("s3", region_name="us-east-1") + backup_s3_client = session.client("s3", region_name="us-east-1", config=config) return backup_s3_client.get_bucket_location(Bucket=bucket_name).get( "LocationConstraint", None ) @@ -346,7 +344,7 @@ def bucket_location_to_region(location: Optional[str]) -> str: return "us-east-1" if location == "" or location is None else location -def get_object_for_url(url: ParseResult, existing: Optional[bool] = None) -> "S3Object": +def get_object_for_url(url: ParseResult, existing: Optional[bool] = None, anonymous: Optional[bool] = None) -> "S3Object": """ Extracts a key (object) from a given parsed s3:// URL. @@ -372,17 +370,18 @@ def get_object_for_url(url: ParseResult, existing: Optional[bool] = None) -> "S3 # TODO: OrdinaryCallingFormat equivalent in boto3? # if botoargs: # botoargs['calling_format'] = boto.s3.connection.OrdinaryCallingFormat() - + + config = session.ANONYMOUS_CONFIG if anonymous else None try: # Get the bucket's region to avoid a redirect per request - region = get_bucket_region(bucket_name, endpoint_url=endpoint_url) - s3 = session.resource("s3", region_name=region, endpoint_url=endpoint_url) + region = get_bucket_region(bucket_name, endpoint_url=endpoint_url, anonymous=anonymous) + s3 = session.resource("s3", region_name=region, endpoint_url=endpoint_url, config=config) except NoBucketLocationError as e: # Probably don't have permission. 
# TODO: check if it is that logger.debug("Couldn't get bucket location: %s", e) logger.debug("Fall back to not specifying location") - s3 = session.resource("s3", endpoint_url=endpoint_url) + s3 = session.resource("s3", endpoint_url=endpoint_url, config=config) obj = s3.Object(bucket_name, key_name) objExists = True @@ -407,11 +406,12 @@ def get_object_for_url(url: ParseResult, existing: Optional[bool] = None) -> "S3 @retry(errors=[AWSServerErrors]) -def list_objects_for_url(url: ParseResult) -> list[str]: +def list_objects_for_url(url: ParseResult, anonymous: Optional[bool] = None) -> list[str]: """ Extracts a key (object) from a given parsed s3:// URL. The URL will be supplemented with a trailing slash if it is missing. """ + key_name = url.path[1:] bucket_name = url.netloc @@ -430,8 +430,9 @@ def list_objects_for_url(url: ParseResult) -> list[str]: protocol = "http" if host: endpoint_url = f"{protocol}://{host}" + f":{port}" if port else "" - - client = session.client("s3", endpoint_url=endpoint_url) + + config = session.ANONYMOUS_CONFIG if anonymous else None + client = session.client("s3", endpoint_url=endpoint_url, config=config) listing = [] diff --git a/src/toil/lib/url.py b/src/toil/lib/url.py new file mode 100644 index 0000000000..b1a12e1fcb --- /dev/null +++ b/src/toil/lib/url.py @@ -0,0 +1,313 @@ +# Copyright (C) 2015-2025 Regents of the University of California +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import os +from abc import ABC, ABCMeta, abstractmethod +from typing import ( + IO, + TYPE_CHECKING, + Any, + Callable, + ContextManager, + Literal, + Optional, + Union, + cast, + overload, +) +from urllib.parse import ParseResult, urlparse + +from toil.lib.exceptions import UnimplementedURLException + +if TYPE_CHECKING: + from toil.common import Config + +logger = logging.getLogger(__name__) + +class URLAccess: + """ + Widget for accessing external storage (URLs). + + Can be instantiated. + """ + + def __init__(self, config: "Config") -> None: + """ + Make a new widget for accessing URLs. + """ + self._config = config + super().__init__() + + def url_exists(self, src_uri: str) -> bool: + """ + Return True if the file at the given URI exists, and False otherwise. + + May raise an error if file existence cannot be determined. + + :param src_uri: URL that points to a file or object in the storage + mechanism of a supported URL scheme e.g. a blob in an AWS s3 bucket. + + """ + parseResult = urlparse(src_uri) + otherCls = AbstractURLProtocolImplementation.find_url_implementation(parseResult) + return otherCls._url_exists(parseResult, self._config) + + def get_size(self, src_uri: str) -> Optional[int]: + """ + Get the size in bytes of the file at the given URL, or None if it cannot be obtained. + + :param src_uri: URL that points to a file or object in the storage + mechanism of a supported URL scheme e.g. a blob in an AWS s3 bucket. 
+ """ + parseResult = urlparse(src_uri) + otherCls = AbstractURLProtocolImplementation.find_url_implementation(parseResult) + return otherCls._get_size(parseResult, self._config) + + def get_is_directory(self, src_uri: str) -> bool: + """ + Return True if the thing at the given URL is a directory, and False if + it is a file. The URL may or may not end in '/'. + """ + parseResult = urlparse(src_uri) + otherCls = AbstractURLProtocolImplementation.find_url_implementation(parseResult) + return otherCls._get_is_directory(parseResult, self._config) + + def list_url(self, src_uri: str) -> list[str]: + """ + List the directory at the given URL. Returned path components can be + joined with '/' onto the passed URL to form new URLs. Those that end in + '/' correspond to directories. The provided URL may or may not end with + '/'. + + Currently supported schemes are: + + - 's3' for objects in Amazon S3 + e.g. s3://bucket/prefix/ + + - 'file' for local files + e.g. file:///local/dir/path/ + + :param str src_uri: URL that points to a directory or prefix in the storage mechanism of a + supported URL scheme e.g. a prefix in an AWS s3 bucket. + + :return: A list of URL components in the given directory, already URL-encoded. + """ + parseResult = urlparse(src_uri) + otherCls = AbstractURLProtocolImplementation.find_url_implementation(parseResult) + return otherCls._list_url(parseResult, self._config) + + def read_from_url(self, src_uri: str, writable: IO[bytes]) -> tuple[int, bool]: + """ + Read the given URL and write its content into the given writable stream. + + Raises FileNotFoundError if the URL doesn't exist. + + :param config: The current Toil configuration object. + + :return: The size of the file in bytes and whether the executable permission bit is set + """ + parseResult = urlparse(src_uri) + otherCls = AbstractURLProtocolImplementation.find_url_implementation(parseResult) + return otherCls._read_from_url(parseResult, writable, self._config) + + def open_url(self, src_uri: str) -> IO[bytes]: + """ + Read from the given URI. + + Raises FileNotFoundError if the URL doesn't exist. + + Has a readable stream interface, unlike :meth:`read_from_url` which + takes a writable stream. + """ + parseResult = urlparse(src_uri) + otherCls = AbstractURLProtocolImplementation.find_url_implementation(parseResult) + return otherCls._open_url(parseResult, self._config) + + +class AbstractURLProtocolImplementation(ABC): + """ + Base class for URL accessor implementations. Also manages finding the + implementation for a URL. + + Many job stores implement this to allow access to URLs on the same kind of + backing storage as that kind of job store. + """ + + @classmethod + def find_url_implementation( + cls, url: ParseResult, export: bool = False + ) -> type["AbstractURLProtocolImplementation"]: + """ + Returns the subclass that supports the given URL. + + :param ParseResult url: The given URL + + :param bool export: Determines if the url is supported for exporting + """ + # TODO: Make pluggable. + + from toil.jobStores.abstractJobStore import AbstractJobStore, StandardURLProtocolImplementation + + if StandardURLProtocolImplementation._supports_url(url, export): + # This is a standard URL scheme + return StandardURLProtocolImplementation + + for implementation in AbstractJobStore._get_job_store_classes(): + if issubclass(implementation, AbstractURLProtocolImplementation) and implementation._supports_url(url, export): + # This scheme belongs to one of the available job store implementations. 
+ return implementation + raise UnimplementedURLException(url, "export" if export else "import") + + # TODO: De-private-ify these methods while allowing a class that provides + # this to also be an instance of URLAccess + + @classmethod + @abstractmethod + def _url_exists(cls, url: ParseResult, config: "Config") -> bool: + """ + Return True if the item at the given URL exists, and False otherwise. + + May raise an error if file existence cannot be determined. + + :param config: The current Toil configuration object. + """ + raise NotImplementedError(f"No implementation for {url}") + + @classmethod + @abstractmethod + def _get_size(cls, url: ParseResult, config: "Config") -> Optional[int]: + """ + Get the size of the object at the given URL, or None if it cannot be obtained. + + :param config: The current Toil configuration object. + """ + raise NotImplementedError(f"No implementation for {url}") + + @classmethod + @abstractmethod + def _get_is_directory(cls, url: ParseResult, config: "Config") -> bool: + """ + Return True if the thing at the given URL is a directory, and False if + it is a file or it is known not to exist. The URL may or may not end in + '/'. + + :param url: URL that points to a file or object, or directory or prefix, + in the storage mechanism of a supported URL scheme e.g. a blob + in an AWS s3 bucket. + + :param config: The current Toil configuration object. + """ + raise NotImplementedError(f"No implementation for {url}") + + @classmethod + @abstractmethod + def _read_from_url(cls, url: ParseResult, writable: IO[bytes], config: "Config") -> tuple[int, bool]: + """ + Reads the contents of the object at the specified location and writes it to the given + writable stream. + + Refer to :func:`~AbstractJobStore.importFile` documentation for currently supported URL schemes. + + Raises FileNotFoundError if the thing at the URL is not found. + + :param ParseResult url: URL that points to a file or object in the storage + mechanism of a supported URL scheme e.g. a blob in an AWS s3 bucket. + + :param IO[bytes] writable: a writable stream + + :param config: The current Toil configuration object. + + :return: The size of the file in bytes and whether the executable permission bit is set + """ + raise NotImplementedError(f"No implementation for {url}") + + @classmethod + @abstractmethod + def _list_url(cls, url: ParseResult, config: "Config") -> list[str]: + """ + List the contents of the given URL, which may or may not end in '/' + + Returns a list of URL components. Those that end in '/' are meant to be + directories, while those that do not are meant to be files. + + Refer to :func:`~AbstractJobStore.importFile` documentation for currently supported URL schemes. + + :param ParseResult url: URL that points to a directory or prefix in the + storage mechanism of a supported URL scheme e.g. a prefix in an AWS s3 + bucket. + + :param config: The current Toil configuration object. + + :return: The children of the given URL, already URL-encoded if + appropriate. (If the URL is a bare path, no encoding is done.) + """ + raise NotImplementedError(f"No implementation for {url}") + + @classmethod + @abstractmethod + def _open_url(cls, url: ParseResult, config: "Config") -> IO[bytes]: + """ + Get a stream of the object at the specified location. + + Refer to :func:`~AbstractJobStore.importFile` documentation for currently supported URL schemes. + + :param config: The current Toil configuration object. + + :raises: FileNotFoundError if the thing at the URL is not found. 
+ """ + raise NotImplementedError(f"No implementation for {url}") + + @classmethod + @abstractmethod + def _write_to_url( + cls, + readable: Union[IO[bytes], IO[str]], + url: ParseResult, + executable: bool, + config: "Config" + ) -> None: + """ + Reads the contents of the given readable stream and writes it to the object at the + specified location. Raises FileNotFoundError if the URL doesn't exist.. + + Refer to AbstractJobStore.importFile documentation for currently supported URL schemes. + + :param Union[IO[bytes], IO[str]] readable: a readable stream + + :param ParseResult url: URL that points to a file or object in the storage + mechanism of a supported URL scheme e.g. a blob in an AWS s3 bucket. + + :param bool executable: determines if the file has executable permissions + + :param config: The current Toil configuration object. + """ + raise NotImplementedError(f"No implementation for {url}") + + @classmethod + @abstractmethod + def _supports_url(cls, url: ParseResult, export: bool = False) -> bool: + """ + Returns True if the job store supports the URL's scheme. + + Refer to AbstractJobStore.importFile documentation for currently supported URL schemes. + + :param ParseResult url: a parsed URL that may be supported + + :param bool export: Determines if the url is supported for export + + :param config: The current Toil configuration object. + + :return bool: returns true if the cls supports the URL + """ + raise NotImplementedError(f"No implementation for {url}") diff --git a/src/toil/options/__init__.py b/src/toil/options/__init__.py index e69de29bb2..ea0843462e 100644 --- a/src/toil/options/__init__.py +++ b/src/toil/options/__init__.py @@ -0,0 +1,33 @@ +# Copyright (C) 2015-2025 Regents of the University of California +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and + +from typing import Any, Optional, Protocol + +class OptionSetter(Protocol): + """ + Protocol for the setOption function we get to let us set up CLI options for + each batch system or job store. + + Used to have more parameters for managing defaults and environment + variables, but now that's all done by the parser. + + Actual functionality is defined in the Config class. + + Looks first at option_name and then at old_names to find the namespace key for the option. + """ + + def __call__( + self, + option_name: str, + old_names: Optional[list[str]] = None, + ) -> None: ... 
diff --git a/src/toil/options/common.py b/src/toil/options/common.py index b32bfba1c0..e9304d67ef 100644 --- a/src/toil/options/common.py +++ b/src/toil/options/common.py @@ -6,7 +6,8 @@ from configargparse import SUPPRESS from ruamel.yaml import YAML -from toil.batchSystems.options import add_all_batchsystem_options +from toil.batchSystems.options import add_all_batch_system_options +from toil.jobStores.options import add_all_job_store_options from toil.lib.conversions import bytes2human, human2bytes, opt_strtobool, strtobool from toil.provisioners import parse_node_types from toil.statsAndLogging import add_logging_options @@ -421,18 +422,18 @@ def is_within(x: Union[int, float]) -> bool: ) # Batch system options - batchsystem_options = parser.add_argument_group( - title="Toil options for specifying the batch system", - description="Allows the specification of the batch system.", + batch_system_options = parser.add_argument_group( + title="Toil options for the batch system", + description="Allows specifying and configuring the batch system.", ) - add_all_batchsystem_options(batchsystem_options) + add_all_batch_system_options(batch_system_options) # File store options - file_store_options = parser.add_argument_group( + storage_options = parser.add_argument_group( title="Toil options for configuring storage", description="Allows configuring Toil's data storage.", ) - link_imports = file_store_options.add_mutually_exclusive_group() + link_imports = storage_options.add_mutually_exclusive_group() link_imports_help = ( "When using a filesystem based job store, CWL input files are by default symlinked in. " "Setting this option to True instead copies the files into the job store, which may protect " @@ -448,7 +449,7 @@ def is_within(x: Union[int, float]) -> bool: metavar="BOOL", help=link_imports_help, ) - move_exports = file_store_options.add_mutually_exclusive_group() + move_exports = storage_options.add_mutually_exclusive_group() move_exports_help = ( "When using a filesystem based job store, output files are by default moved to the " "output directory, and a symlink to the moved exported file is created at the initial " @@ -465,7 +466,7 @@ def is_within(x: Union[int, float]) -> bool: help=move_exports_help, ) - caching = file_store_options.add_mutually_exclusive_group() + caching = storage_options.add_mutually_exclusive_group() caching_help = "Enable or disable caching for your workflow, specifying this overrides default from job store" caching.add_argument( "--caching", @@ -477,7 +478,7 @@ def is_within(x: Union[int, float]) -> bool: ) # default is None according to PR 4299, seems to be generated at runtime - file_store_options.add_argument( + storage_options.add_argument( "--symlinkJobStoreReads", dest="symlink_job_store_reads", type=strtobool, @@ -487,6 +488,10 @@ def is_within(x: Union[int, float]) -> bool: "via symlink. 
default=%(default)s", ) + # Add job store pluggable options to storage section + add_all_job_store_options(storage_options) + + # Auto scaling options autoscaling_options = parser.add_argument_group( title="Toil options for autoscaling the cluster of worker nodes", diff --git a/src/toil/test/batchSystems/batch_system_plugin_test.py b/src/toil/test/batchSystems/batch_system_plugin_test.py index a57ed97eb4..d267782d94 100644 --- a/src/toil/test/batchSystems/batch_system_plugin_test.py +++ b/src/toil/test/batchSystems/batch_system_plugin_test.py @@ -21,7 +21,7 @@ UpdatedBatchJobInfo, ) from toil.batchSystems.cleanup_support import BatchSystemCleanupSupport -from toil.batchSystems.options import OptionSetter +from toil.options import OptionSetter from toil.batchSystems.registry import add_batch_system_factory from toil.common import Toil, addOptions from toil.job import JobDescription @@ -87,4 +87,4 @@ def fake_batch_system_factory() -> type[AbstractBatchSystem]: # try to install a batchsystem plugin with some arguments # if the arguments exists, the values should also exist in the config with Toil(options) as toil: - self.assertEqual(toil.config.fake_argument == "exists", True) + self.assertEqual(toil._config.fake_argument == "exists", True) diff --git a/src/toil/test/jobStores/jobStoreTest.py b/src/toil/test/jobStores/jobStoreTest.py index 1c88c071ab..df1fd947e3 100644 --- a/src/toil/test/jobStores/jobStoreTest.py +++ b/src/toil/test/jobStores/jobStoreTest.py @@ -115,17 +115,24 @@ def _createJobStore(self): def setUp(self): super().setUp() self.namePrefix = "jobstore-test-" + str(uuid.uuid4()) - self.config = self._createConfig() # Jobstores to be used in testing. - # jobstore_initialized is created with a particular configuration, as creating by self._createConfig() - # jobstore_resume_noconfig is created with the resume() method. resume() will look for a previously - # instantiated jobstore, and initialize the jobstore calling it with the found config. In this case, - # jobstore_resume_noconfig will be initialized with the config from jobstore_initialized. + # + # jobstore_initialized is created with a particular configuration, + # as creating by self._createConfig() + # + # jobstore_resumed is created with the resume() method. resume() + # will look for a previously instantiated jobstore, and initialize + # the jobstore calling it with the found config, overriding the one + # from _createConfig(). In this case, jobstore_resumed will be + # initialized with the config from jobstore_initialized. self.jobstore_initialized = self._createJobStore() - self.jobstore_initialized.initialize(self.config) - self.jobstore_resumed_noconfig = self._createJobStore() - self.jobstore_resumed_noconfig.resume() + self.jobstore_initialized._config.test_value = 123 + self.jobstore_initialized.initialize() + self.config = self.jobstore_initialized._config + + self.jobstore_resumed = self._createJobStore() + self.jobstore_resumed.resume() # Requirements for jobs to be created. 
        self.arbitraryRequirements = {
@@ -145,7 +152,7 @@ def setUp(self):
 
     def tearDown(self):
         self.jobstore_initialized.destroy()
-        self.jobstore_resumed_noconfig.destroy()
+        self.jobstore_resumed.destroy()
         super().tearDown()
 
     def testInitialState(self):
@@ -196,8 +203,8 @@ def testConfigEquality(self):
         """
         newJobStore = self._createJobStore()
         newJobStore.resume()
-        self.assertEqual(newJobStore.config, self.config)
-        self.assertIsNot(newJobStore.config, self.config)
+        self.assertEqual(newJobStore._config, self.config)
+        self.assertIsNot(newJobStore._config, self.config)
 
     def testJobLoadEquality(self):
         """Tests that a job created via one JobStore instance can be loaded from another."""
@@ -210,7 +217,7 @@ def testJobLoadEquality(self):
         self.jobstore_initialized.create_job(jobDesc1)
 
         # Load it from the second jobstore
-        jobDesc2 = self.jobstore_resumed_noconfig.load_job(jobDesc1.jobStoreID)
+        jobDesc2 = self.jobstore_resumed.load_job(jobDesc1.jobStoreID)
 
         self.assertEqual(jobDesc1._body, jobDesc2._body)
 
@@ -264,7 +271,7 @@ def testPersistantFilesToDelete(self):
     def testUpdateBehavior(self):
        """Tests the proper behavior during updating jobs."""
         jobstore1 = self.jobstore_initialized
-        jobstore2 = self.jobstore_resumed_noconfig
+        jobstore2 = self.jobstore_resumed
         job1 = JobDescription(
             requirements=self.parentJobReqs, jobName="test1", unitName="onParent"
         )
@@ -380,7 +387,7 @@ def testJobDeletions(self):
     def testSharedFiles(self):
         """Tests the sharing of files."""
         jobstore1 = self.jobstore_initialized
-        jobstore2 = self.jobstore_resumed_noconfig
+        jobstore2 = self.jobstore_resumed
 
         bar = b"bar"
 
@@ -405,7 +412,7 @@ def testSharedFiles(self):
     def testReadWriteSharedFilesTextMode(self):
         """Checks if text mode is compatible for shared file streams."""
         jobstore1 = self.jobstore_initialized
-        jobstore2 = self.jobstore_resumed_noconfig
+        jobstore2 = self.jobstore_resumed
 
         bar = "bar"
 
@@ -446,7 +453,7 @@ def testReadWriteFileStreamTextMode(self):
     def testPerJobFiles(self):
         """Tests the behavior of files on jobs."""
         jobstore1 = self.jobstore_initialized
-        jobstore2 = self.jobstore_resumed_noconfig
+        jobstore2 = self.jobstore_resumed
 
         # Create jobNodeOnJS1
         jobOnJobStore1 = JobDescription(
@@ -525,7 +532,7 @@ def testPerJobFiles(self):
     def testStatsAndLogging(self):
         """Tests behavior of reading and writing stats and logging."""
         jobstore1 = self.jobstore_initialized
-        jobstore2 = self.jobstore_resumed_noconfig
+        jobstore2 = self.jobstore_resumed
         jobOnJobStore1 = JobDescription(
             requirements=self.parentJobReqs, jobName="test1", unitName="onJobStore1"
         )
@@ -1199,7 +1206,7 @@ class FileJobStoreTest(AbstractJobStoreTest.Test):
     def _createJobStore(self):
         # Make a FileJobStore with an artificially low fan out threshold, to
         # make sure to test fan out logic
-        return FileJobStore(self.namePrefix, fanOut=2)
+        return FileJobStore(self.namePrefix, config=self._createConfig(), fanOut=2)
 
     def _corruptJobStore(self):
         assert isinstance(self.jobstore_initialized, FileJobStore)  # type hint
@@ -1253,7 +1260,7 @@ def test_jobstore_init_preserves_symlink_path(self):
         dir_symlinked_to_original_filestore = f"{original_filestore}-am-i-real"
         os.symlink(original_filestore, dir_symlinked_to_original_filestore)
         filejobstore_using_symlink = FileJobStore(
-            dir_symlinked_to_original_filestore, fanOut=2
+            dir_symlinked_to_original_filestore, config=self._createConfig(), fanOut=2
         )
         self.assertEqual(
             dir_symlinked_to_original_filestore,
@@ -1345,9 +1352,10 @@ def test_symlink_read_control(self):
         config = self._createConfig()
         config.symlink_job_store_reads = should_link
         store = FileJobStore(
-            self.namePrefix + ("-link" if should_link else "-nolink")
+            self.namePrefix + ("-link" if should_link else "-nolink"),
+            config=config
         )
-        store.initialize(config)
+        store.initialize()
 
         # Put something in the job store
         src_url, _ = self._prepareTestFile(self._externalStore(), 1)
@@ -1373,7 +1381,7 @@ class GoogleJobStoreTest(AbstractJobStoreTest.Test):
     def _createJobStore(self):
         from toil.jobStores.googleJobStore import GoogleJobStore
 
-        return GoogleJobStore(GoogleJobStoreTest.projectID + ":" + self.namePrefix)
+        return GoogleJobStore(GoogleJobStoreTest.projectID + ":" + self.namePrefix, config=self._createConfig())
 
     def _corruptJobStore(self):
         # The Google job store has only one resource, the bucket, so we can't corrupt it without
@@ -1427,7 +1435,7 @@ def _createJobStore(self):
         from toil.jobStores.aws.jobStore import AWSJobStore
 
         partSize = self._partSize()
-        return AWSJobStore(self.awsRegion() + ":" + self.namePrefix, partSize=partSize)
+        return AWSJobStore(self.awsRegion() + ":" + self.namePrefix, config=self._createConfig(), partSize=partSize)
 
     def _corruptJobStore(self):
         from toil.jobStores.aws.jobStore import AWSJobStore
diff --git a/src/toil/test/options/options.py b/src/toil/test/options/options.py
index a5fbd5a761..c5e2f7179d 100644
--- a/src/toil/test/options/options.py
+++ b/src/toil/test/options/options.py
@@ -1,3 +1,5 @@
+import os
+
 from configargparse import ArgParser
 
 from toil.common import Toil, addOptions
@@ -14,12 +16,13 @@ def test_default_caching_slurm(self):
         Test to ensure that caching will be set to false when running on Slurm
         :return:
         """
+        job_store = os.path.join(self._createTempDir(), "tree")
         parser = ArgParser()
         addOptions(parser, jobstore_as_flag=True, wdl=False, cwl=False)
-        test_args = ["--jobstore=example-jobstore", "--batchSystem=slurm"]
+        test_args = ["--jobstore", job_store, "--batchSystem=slurm"]
         options = parser.parse_args(test_args)
         with Toil(options) as toil:
-            caching_value = toil.config.caching
+            caching_value = toil._config.caching
         self.assertEqual(caching_value, False)
 
     def test_caching_option_priority(self):
@@ -27,16 +30,18 @@ def test_caching_option_priority(self):
         Test to ensure that the --caching option takes priority over the default_caching() return value
         :return:
         """
+        job_store = os.path.join(self._createTempDir(), "tree")
         parser = ArgParser()
         addOptions(parser, jobstore_as_flag=True, wdl=False, cwl=False)
         # the kubernetes batchsystem (and I think all batchsystems including singlemachine) return False
         # for default_caching
         test_args = [
-            "--jobstore=example-jobstore",
+            "--jobstore",
+            job_store,
             "--batchSystem=kubernetes",
             "--caching=True",
         ]
         options = parser.parse_args(test_args)
         with Toil(options) as toil:
-            caching_value = toil.config.caching
+            caching_value = toil._config.caching
         self.assertEqual(caching_value, True)
diff --git a/src/toil/test/provisioners/aws/awsProvisionerTest.py b/src/toil/test/provisioners/aws/awsProvisionerTest.py
index cae25e4dd6..b70d85790e 100644
--- a/src/toil/test/provisioners/aws/awsProvisionerTest.py
+++ b/src/toil/test/provisioners/aws/awsProvisionerTest.py
@@ -661,7 +661,7 @@ def job(job, disk="10M", cores=1, memory="10M", preemptible=True):
             if __name__ == "__main__":
                 options = Job.Runner.getDefaultArgumentParser().parse_args()
                 with Toil(options) as toil:
-                    if toil.config.restart:
+                    if options.restart:
                         toil.restart()
                     else:
                         toil.start(Job.wrapJobFn(job))
diff --git a/src/toil/test/src/autoDeploymentTest.py b/src/toil/test/src/autoDeploymentTest.py
index 380f6a2564..5e28d090a3 100644
--- a/src/toil/test/src/autoDeploymentTest.py
+++ b/src/toil/test/src/autoDeploymentTest.py
@@ -66,7 +66,7 @@ def job(job, disk="10M", cores=1, memory="10M"):
             if __name__ == "__main__":
                 options = Job.Runner.getDefaultArgumentParser().parse_args()
                 with Toil(options) as toil:
-                    if toil.config.restart:
+                    if options.restart:
                         toil.restart()
                     else:
                         toil.start(Job.wrapJobFn(job))
@@ -139,7 +139,7 @@ def job(job, disk="10M", cores=1, memory="10M"):
             if __name__ == "__main__":
                 options = Job.Runner.getDefaultArgumentParser().parse_args()
                 with Toil(options) as toil:
-                    if toil.config.restart:
+                    if options.restart:
                         toil.restart()
                     else:
                         toil.start(Job.wrapJobFn(job))
diff --git a/src/toil/test/src/workerTest.py b/src/toil/test/src/workerTest.py
index 543ded531f..03dfc31ddc 100644
--- a/src/toil/test/src/workerTest.py
+++ b/src/toil/test/src/workerTest.py
@@ -27,10 +27,10 @@ class WorkerTests(ToilTest):
     def setUp(self):
         super().setUp()
         path = self._getTestJobStorePath()
-        self.jobStore = FileJobStore(path)
         self.config = Config()
         self.config.jobStore = "file:%s" % path
-        self.jobStore.initialize(self.config)
+        self.jobStore = FileJobStore(path, self.config)
+        self.jobStore.initialize()
         self.jobNumber = 0
 
     def testNextChainable(self):
diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py
index 5f34031b7d..b2e266fd00 100755
--- a/src/toil/wdl/wdltoil.py
+++ b/src/toil/wdl/wdltoil.py
@@ -494,7 +494,10 @@ def first_mismatch(prefix: str, value: str) -> int:
 
 
 async def toil_read_source(
-    uri: str, path: list[str], importer: WDL.Tree.Document | None
+    uri: str,
+    path: list[str],
+    importer: WDL.Tree.Document | None,
+    job_store: AbstractJobStore
 ) -> ReadSourceResult:
     """
     Implementation of a MiniWDL read_source function that can use any
@@ -513,12 +516,15 @@ async def toil_read_source(
         tried.append(candidate_uri)
         try:
             # TODO: this is probably sync work that would be better as async work here
-            AbstractJobStore.read_from_url(candidate_uri, destination_buffer)
+            job_store.read_from_url(candidate_uri, destination_buffer)
         except Exception as e:
-            # TODO: we need to assume any error is just a not-found,
+            # We need to assume any arbitrary error is just a not-found,
             # because the exceptions thrown by read_from_url()
             # implementations are not specified.
-            logger.debug("Tried to fetch %s from %s but got %s", uri, candidate_uri, e)
+            logger.debug("Tried to fetch %s from %s but got %s: %s", uri, candidate_uri, type(e), e)
+            if isinstance(e, TypeError):
+                # This is probably actually a bug
+                raise
             continue
         # If we get here, we got it probably.
         try:
@@ -1218,7 +1224,10 @@ def _call_eager(
         else:
            # This is some other kind of remote file.
             # We need to get its size from the URI.
-            item_size = AbstractJobStore.get_size(uri)
+            # To access the URI we need to reach inside the stdlib to get a URLAccess.
+            # TODO: Maybe the stdlib should be one???
+            assert isinstance(self.stdlib, ToilWDLStdLibBase)
+            item_size = self.stdlib._file_store.jobStore.get_size(uri)
             if item_size is None:
                 # User asked for the size and we can't figure it out efficiently, so bail out.
                raise RuntimeError(f"Attempt to check the size of {uri} failed")
@@ -1656,7 +1665,7 @@ def _virtualize_file(
            )
 
            file_uri = Toil.normalize_uri(abs_filepath)
-           if not AbstractJobStore.url_exists(file_uri):
+           if not self._file_store.jobStore.url_exists(file_uri):
                logger.debug("File appears nonexistent so marking it nonexistent")
                return set_file_nonexistent(file, True)
        virtualized_filename = self._virtualize_filename(file.value)
@@ -1743,7 +1752,8 @@ def _devirtualize_uri(
            # Open it exclusively
            with open(dest_path, "xb") as dest_file:
                # And save to it
-               size, executable = AbstractJobStore.read_from_url(filename, dest_file)
+               url_access = file_source if isinstance(file_source, Toil) else file_source.jobStore
+               size, executable = url_access.read_from_url(filename, dest_file)
            if executable:
                # Set the execute bit in the file's permissions
                os.chmod(dest_path, os.stat(dest_path).st_mode | stat.S_IXUSR)
@@ -2608,7 +2618,7 @@ def drop_if_missing(
 
    if filename is not None and is_any_url(filename):
        try:
-           if filename.startswith(TOIL_URI_SCHEME) or AbstractJobStore.url_exists(
+           if filename.startswith(TOIL_URI_SCHEME) or standard_library._file_store.jobStore.url_exists(
                filename
            ):
                # We assume anything in the filestore actually exists.
@@ -5449,7 +5459,8 @@ def main() -> None:
        # Load the WDL document
        document: WDL.Tree.Document = WDL.load(
            resolve_workflow(options.wdl_uri, supported_languages={"WDL"}),
-           read_source=toil_read_source,
+           # Smuggle the job store in so we can read URLs according to the config
+           read_source=partial(toil_read_source, job_store=toil._jobStore),
        )
 
        # See if we're going to run a workflow or a task
@@ -5508,7 +5519,7 @@ def main() -> None:
            input_source_uri = input_source
            input_source_name = input_source_uri
            input_json = asyncio.run(
-               toil_read_source(input_source_uri, [], None)
+               toil_read_source(input_source_uri, [], None, toil._jobStore)
            ).source_text
            try:
                inputs = json.loads(input_json)
@@ -5517,7 +5528,7 @@
            # We need the absolute path or URL to raise the error
            if input_source_uri is not None:
-               # If this is a local fike, use that as the abspath.
+               # If this is a local file, use that as the abspath.
                # Otherwise just pass through a URL.
                inputs_abspath = (
                    input_source_uri