Skip to content

Commit

Permalink
re-add WQ support
Browse files Browse the repository at this point in the history
  • Loading branch information
svandenhaute committed Dec 8, 2023
1 parent 08c4e7d commit 8b8b2c4
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 15 deletions.
2 changes: 2 additions & 0 deletions configs/htex.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ ModelTraining:
max_walltime: 1
gpu: true
ReferenceEvaluation:
mpi_command: 'mpirun -np {}' # cp2k on conda-forge comes with OpenMPI (not MPICH as in container)
cores_per_worker: 1
mode: 'htex'
...
15 changes: 15 additions & 0 deletions configs/workqueue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
ModelEvaluation:
cores_per_worker: 1
simulation_engine: 'openmm'
gpu: false
ModelTraining:
cores_per_worker: 4
max_walltime: 1
gpu: true
ReferenceEvaluation:
max_walltime: 0.3
cores_per_worker: 1
mpi_command: 'mpirun -np {}' # cp2k on conda-forge comes with OpenMPI (not MPICH as in container)
mode: 'workqueue'
...
75 changes: 66 additions & 9 deletions psiflow/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

# see https://stackoverflow.com/questions/59904631/python-class-constants-in-dataclasses
from typing import Any, Callable, Optional, Type, Union
from warnings import warn

import parsl
import psutil
Expand All @@ -25,7 +24,7 @@
from parsl.providers.base import ExecutionProvider

from psiflow.models import BaseModel
from psiflow.parsl_utils import ContainerizedLauncher
from psiflow.parsl_utils import ContainerizedLauncher, MyWorkQueueExecutor
from psiflow.reference import BaseReference
from psiflow.utils import resolve_and_check, set_logger

Expand Down Expand Up @@ -63,9 +62,9 @@ def __post_init__(self):
self.max_walltime = 1e9

def generate_parsl_resource_specification(self):
warn("use of the WQ executor is deprecated!")
resource_specification = {}
resource_specification["cores"] = self.cores_per_worker
        # WorkQueue requires explicit disk and memory estimates per task;
        # use conservative placeholder values since exact usage is unknown
resource_specification["disk"] = 1000
memory = 2000 * self.cores_per_worker
resource_specification["memory"] = int(memory)
Expand Down Expand Up @@ -257,8 +256,10 @@ def parse_config(yaml_dict: dict):
"retries": 1,
"strategy": "simple",
"max_idletime": 20,
"htex_address": None,
"default_threads": 1,
"htex_address": None,
# "use_workqueue": False,
"mode": "taskvine",
}
forced = {
"initialize_logging": False, # manual; to move parsl.log one level up
Expand Down Expand Up @@ -328,8 +329,15 @@ def load(

# create main parsl executors
executors = []
mode = psiflow_config.pop("mode")
for definition in definitions:
if not definition.use_threadpool:
if definition.use_threadpool:
executor = ThreadPoolExecutor(
max_threads=definition.cores_per_worker,
working_dir=str(path),
label=definition.name(),
)
elif mode == "htex":
if type(definition.parsl_provider) is LocalProvider: # noqa: F405
cores_available = psutil.cpu_count(logical=False)
max_workers = max(
Expand All @@ -352,12 +360,61 @@ def load(
provider=definition.parsl_provider,
cpu_affinity=definition.cpu_affinity,
)
else:
executor = ThreadPoolExecutor(
max_threads=definition.cores_per_worker,
working_dir=str(path),
elif mode == "workqueue":
worker_options = []
if hasattr(definition.parsl_provider, "cores_per_node"):
worker_options.append(
"--cores={}".format(definition.parsl_provider.cores_per_node),
)
else:
worker_options.append(
"--cores={}".format(psutil.cpu_count(logical=False)),
)
if hasattr(definition.parsl_provider, "walltime"):
walltime_hhmmss = definition.parsl_provider.walltime.split(":")
assert len(walltime_hhmmss) == 3
walltime = 0
walltime += 60 * float(walltime_hhmmss[0])
walltime += float(walltime_hhmmss[1])
walltime += 1 # whatever seconds are present
                        walltime -= (
                            5  # subtract 5 minutes of slack, e.g. for container downloading
                        )
worker_options.append("--wall-time={}".format(walltime * 60))
worker_options.append("--parent-death")
worker_options.append(
"--timeout={}".format(psiflow_config["max_idletime"])
)
# manager_config = TaskVineManagerConfig(
# shared_fs=True,
# max_retries=1,
# autocategory=False,
# enable_peer_transfers=False,
# port=0,
# )
# factory_config = TaskVineFactoryConfig(
# factory_timeout=20,
# worker_options=' '.join(worker_options),
# )
executor = MyWorkQueueExecutor(
label=definition.name(),
working_dir=str(path / definition.name()),
provider=definition.parsl_provider,
shared_fs=True,
autocategory=False,
port=0,
max_retries=0,
coprocess=False,
worker_options=" ".join(worker_options),
)
else:
raise ValueError("Unknown mode {}".format(mode))
# executor = TaskVineExecutor(
# label=definition.name(),
# provider=definition.parsl_provider,
# manager_config=manager_config,
# factory_config=factory_config,
# )
executors.append(executor)

# create default executors
Expand Down
6 changes: 3 additions & 3 deletions psiflow/models/_mace.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class MACEConfig:
)


@typeguard.typechecked
# @typeguard.typechecked
def initialize(
mace_config: dict,
stdout: str = "",
Expand Down Expand Up @@ -174,14 +174,14 @@ def train(
command_write,
"timeout -s 15 {}s psiflow-train-mace".format(max(walltime - 15, 0)),
"--config config.yaml",
"--model {};".format(inputs[0].filepath),
"--model {} || true;".format(inputs[0].filepath),
"ls *;",
"cp model/mace.model {};".format(outputs[0].filepath), # no swa
]
return " ".join(command_list)


@typeguard.typechecked
# @typeguard.typechecked
def deploy(
device: str,
inputs: List[File] = [],
Expand Down
6 changes: 3 additions & 3 deletions psiflow/models/_nequip.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class AllegroConfig(NequIPConfig):
r_max: float = 5.0


@typeguard.typechecked
# @typeguard.typechecked
def initialize(
nequip_config: dict,
stdout: str = "",
Expand Down Expand Up @@ -214,7 +214,7 @@ def initialize(
return " ".join(command_list)


@typeguard.typechecked
# @typeguard.typechecked
def deploy(
nequip_config: dict,
inputs: List[File] = [],
Expand Down Expand Up @@ -265,7 +265,7 @@ def train(
"timeout -s 15 {}s".format(max(walltime - 15, 0)), # 15 s slack
"psiflow-train-nequip",
"--config config.yaml",
"--model {};".format(inputs[0].filepath),
"--model {} || true;".format(inputs[0].filepath),
"ls;",
"cp {}/best_model.pth {}".format(
nequip_config["run_name"], outputs[0].filepath
Expand Down

0 comments on commit 8b8b2c4

Please sign in to comment.