Ray tune learner refactor, explicit config declaration #68

Merged: 12 commits, Jan 29, 2025
3 changes: 3 additions & 0 deletions .gitignore
@@ -23,3 +23,6 @@ uv.lock
.mypy_cache/
.ruff_cache/
__pycache__/

# cursor
.cursorrules
22 changes: 1 addition & 21 deletions CHANGELOG.md
@@ -1,24 +1,4 @@
# Changelog

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## v0.0.1

### Added
- pyproject.toml
- __init__.py files in most folders
- this changelog

### Changed
- moved launchers from
- directory structure to follow PEP8 principle for packaging
- safetensors in requirements

### Fixed
- important path issues related to project structure change

### Removed
- all nextflow specific code [c226ef](https://github.com/mathysgrapotte/stimulus-py/commit/c226efc39844043ca4b24474e3cd552d40797b69)


and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
199 changes: 91 additions & 108 deletions src/stimulus/learner/raytune_learner.py
@@ -8,17 +8,18 @@

import numpy as np
import torch
from ray import cluster_resources, init, is_initialized, shutdown, train, tune
from ray.tune import Trainable, schedulers
from ray import cluster_resources, train, tune
from ray.tune import Trainable
from safetensors.torch import load_model as safe_load_model
from safetensors.torch import save_model as safe_save_model
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset

from stimulus.data.handlertorch import TorchDataset
from stimulus.data.loaders import EncoderLoader
from stimulus.learner.predict import PredictWrapper
from stimulus.utils.generic_utils import set_general_seeds
from stimulus.utils.yaml_model_schema import YamlRayConfigLoader
from stimulus.utils.yaml_model_schema import RayTuneModel


class CheckpointDict(TypedDict):
@@ -32,99 +33,82 @@ class TuneWrapper:

def __init__(
self,
config_path: str,
config: RayTuneModel,
model_class: nn.Module,
data_path: str,
experiment_object: object,
max_gpus: Optional[int] = None,
max_cpus: Optional[int] = None,
max_object_store_mem: Optional[float] = None,
max_mem: Optional[float] = None,
encoder_loader: EncoderLoader,
seed: int,
ray_results_dir: Optional[str] = None,
tune_run_name: Optional[str] = None,
*, # Force debug to be keyword-only
*,
debug: bool = False,
) -> None:
"""Initialize the TuneWrapper with the paths to the config, model, and data."""
self.config = YamlRayConfigLoader(config_path).get_config()
self.config = config.model_dump()

# set all general seeds: python, numpy and torch.
set_general_seeds(self.config["seed"])

self.config["model"] = model_class
self.config["experiment"] = experiment_object

# add the ray method for number generation to the config so it can be passed to the trainable class, that will in turn set per worker seeds in a reproducible mnanner.
self.config["ray_worker_seed"] = tune.randint(0, 1000)

# add the data path to the config so it know where it is during tuning
if not os.path.exists(data_path):
raise ValueError("Data path does not exist. Given path:" + data_path)
self.config["data_path"] = os.path.abspath(data_path)

# build the tune config
self.config["tune"]["tune_params"]["scheduler"] = getattr(schedulers, self.config["tune"]["scheduler"]["name"])(
**self.config["tune"]["scheduler"]["params"],
set_general_seeds(seed)

# build the tune config:
try:
scheduler_class = getattr(tune.schedulers, config.tune.scheduler.name) # todo, do this in RayConfigLoader
except AttributeError as err:
raise ValueError(
f"Invalid scheduler: {config.tune.scheduler.name}, check Ray Tune for documentation on available schedulers",
) from err

scheduler = scheduler_class(**config.tune.scheduler.params)
self.tune_config = tune.TuneConfig(
metric=config.tune.tune_params.metric,
mode=config.tune.tune_params.mode,
num_samples=config.tune.tune_params.num_samples,
scheduler=scheduler,
)
self.tune_config = tune.TuneConfig(**self.config["tune"]["tune_params"])

# set ray cluster total resources (max)
self.max_gpus = max_gpus
self.max_cpus = max_cpus
self.max_object_store_mem = max_object_store_mem # this is a special subset of the total usable memory that ray need for his internal work, by default is set to 30% of total memory usable
self.max_mem = max_mem

# build the run config
self.checkpoint_config = train.CheckpointConfig(checkpoint_at_end=True) # TODO implement checkpoiting
# in case a custom name was not given for tune_run_name, build it like ray would do. to later pass it on the worker for the debug section.
if tune_run_name is None:
tune_run_name = "TuneModel_" + datetime.datetime.now(tz=datetime.timezone.utc).strftime("%Y-%m-%d_%H-%M-%S")
self.run_config = train.RunConfig(
name=tune_run_name,
name=tune_run_name
if tune_run_name is not None
else "TuneModel_" + datetime.datetime.now(tz=datetime.timezone.utc).strftime("%Y-%m-%d_%H-%M-%S"),
storage_path=ray_results_dir,
checkpoint_config=self.checkpoint_config,
**self.config["tune"]["run_params"],
) # TODO maybe put name into config if it was possible to retrieve from tune the name of the result subdir)
checkpoint_config=train.CheckpointConfig(checkpoint_at_end=True),
stop=config.tune.run_params.stop,
)

# add the data path to the config
if not os.path.exists(data_path):
raise ValueError("Data path does not exist. Given path:" + data_path)
self.config["data_path"] = os.path.abspath(data_path)

# working towards the path for the tune_run directory. if ray_results_dir None ray will put it under home so we will do the same here.
# Set up tune_run path
if ray_results_dir is None:
ray_results_dir = os.environ.get("HOME", "")
# then we are able to pass the whole correct tune_run path to the trainable function. so it can use thaqt to place the debug dir under if needed.
self.config["tune_run_path"] = os.path.join(ray_results_dir, tune_run_name)

# pass the debug flag to the config taken fromn tune so it can be used inside the setup of the trainable
self.config["_debug"] = False
if debug:
self.config["_debug"] = True
self.config["tune_run_path"] = os.path.join(
ray_results_dir,
tune_run_name
if tune_run_name is not None
else "TuneModel_" + datetime.datetime.now(tz=datetime.timezone.utc).strftime("%Y-%m-%d_%H-%M-%S"),
)
self.config["_debug"] = debug
self.config["model"] = model_class
self.config["encoder_loader"] = encoder_loader
self.config["ray_worker_seed"] = tune.randint(0, 1000)

self.tuner = self.tuner_initialization()

def tuner_initialization(self) -> tune.Tuner:
"""Prepare the tuner with the configs."""
# in ray 3.0.0 the following issue is fixed. Sometimes it sees that ray is already initialized, so in that case shut it off and start anew. TODO update to ray 3.0.0
if is_initialized():
shutdown()

# initialize the ray cluster with the limiter on CPUs, GPUs or memory if needed, otherwise everything that is available. None is what ray uses to get all resources available for either CPU, GPU or memory.
# memory is split in two for ray. read more at ray.init documentation.
init(
num_cpus=self.max_cpus,
num_gpus=self.max_gpus,
object_store_memory=self.max_object_store_mem,
_memory=self.max_mem,
)
# Get available resources from Ray cluster
cluster_res = cluster_resources()
logging.info(f"CLUSTER resources -> {cluster_res}")

logging.info(f"CLUSTER resources -> {cluster_resources()}")

# check if resources per trial are not exceeding maximum resources. traial = single set/combination of hyperparameter (parrallel actors maximum resources in ray tune gergon).
self.gpu_per_trial = self._chek_per_trial_resources("gpu_per_trial", cluster_resources(), "GPU")
self.cpu_per_trial = self._chek_per_trial_resources("cpu_per_trial", cluster_resources(), "CPU")
# Check per-trial resources
self.gpu_per_trial = self._check_per_trial_resources("gpu_per_trial", cluster_res, "GPU")
self.cpu_per_trial = self._check_per_trial_resources("cpu_per_trial", cluster_res, "CPU")

logging.info(f"PER_TRIAL resources -> GPU: {self.gpu_per_trial} CPU: {self.cpu_per_trial}")

# wrap the trainable with the allowed resources per trial
# also provide the training and validation data to the trainable through with_parameters
# this is a wrapper that passes the data as a object reference (pointer)
# Configure trainable with resources and data
trainable = tune.with_resources(TuneModel, resources={"cpu": self.cpu_per_trial, "gpu": self.gpu_per_trial})
trainable = tune.with_parameters(
trainable,
@@ -138,68 +122,67 @@ def tune(self) -> None:
"""Run the tuning process."""
self.tuner.fit()

def _chek_per_trial_resources(
def _check_per_trial_resources(
self,
resurce_key: str,
resource_key: str,
cluster_max_resources: dict[str, float],
resource_type: str,
) -> float:
"""Helper function that check that user requested per trial resources are not exceeding the available resources for the ray cluster.
"""Check requested per-trial resources against available cluster resources.

If the per trial resources are not asked they are set to a default resoanable ammount.
This function validates and adjusts the requested per-trial resource allocation based on the
available cluster resources. It handles three cases:
1. Resource request is within cluster limits - uses requested amount
2. Resource request exceeds cluster limits - warns and uses maximum available
3. No resource request specified - calculates reasonable default based on cluster capacity

Args:
resurce_key: The key used to look into the self.config["tune"]
cluster_max_resources: The output of the ray.cluster_resources() function. It hold what ray has found to be the available resources for CPU, GPU and Memory
resource_type: The key used to llok into the cluster_resources dict
resource_key: Key in config for the resource (e.g. "gpu_per_trial")
cluster_max_resources: Dictionary of maximum available cluster resources
resource_type: Type of resource being checked ("GPU" or "CPU")

Returns:
The amount of resources per trial to use
float: Number of resources to allocate per trial

Note:
For GPUs, returns 0.0 if no GPUs are available in the cluster.
"""
if resource_type == "GPU" and resource_type not in cluster_resources():
# ray does not have a GPU field also if GPUs were set to zero. So trial GPU resources have to be set to zero.
if self.max_gpus == 0:
return 0.0
# in case GPUs that are not detected raise error. This happens sometimes when max_gpus stay as None and ray.init does not find GPU by itself. not setting max_gpus (None) means to use all available ones. TODO make ray see GPU on None value.
raise SystemError(
"#### ray did not detect any GPU, if you do not want to use GPU set max_gpus=0, or in nextflow --max_gpus 0.",
)
if resource_type == "GPU" and resource_type not in cluster_max_resources:
return 0.0

per_trial_resource: float = 0.0
# if everything is alright, leave the value as it is.

# Check if resource is specified in config and within limits
if (
resurce_key in self.config["tune"]
and self.config["tune"][resurce_key] <= cluster_max_resources[resource_type]
resource_key in self.config["tune"]
and self.config["tune"][resource_key] <= cluster_max_resources[resource_type]
):
per_trial_resource = float(self.config["tune"][resurce_key])
per_trial_resource = float(self.config["tune"][resource_key])

# if per_trial_resource are more than what is avaialble to ray set them to what is available and warn the user
# Warn if requested resources exceed available
elif (
resurce_key in self.config["tune"]
and self.config["tune"][resurce_key] > cluster_max_resources[resource_type]
resource_key in self.config["tune"]
and self.config["tune"][resource_key] > cluster_max_resources[resource_type]
):
# TODO write a better warning
logging.warning(
f"\n\n#### WARNING - {resource_type} per trial are more than what is available. "
f"{resource_type} per trial: {self.config['tune'][resurce_key]} "
f"{resource_type} per trial: {self.config['tune'][resource_key]} "
f"available: {cluster_max_resources[resource_type]} "
"overwriting value to max available",
)
per_trial_resource = float(cluster_max_resources[resource_type])

# if per_trial_resource has not been asked and there is none available set them to zero
elif resurce_key not in self.config["tune"] and cluster_max_resources[resource_type] == 0.0:
per_trial_resource = 0.0

# if per_trial_resource has not been asked and the resource is available set the value to either 1 or number_available resource / num_samples
elif resurce_key not in self.config["tune"] and cluster_max_resources[resource_type] != 0.0:
# TODO maybe set the default to 0.5 instead of 1 ? fractional use in case of GPU? Should this be a mandatory parameter?
per_trial_resource = float(
max(
1,
(cluster_max_resources[resource_type] // self.config["tune"]["tune_params"]["num_samples"]),
),
)
# Set default if not specified
elif resource_key not in self.config["tune"]:
if cluster_max_resources[resource_type] == 0.0:
per_trial_resource = 0.0
else:
per_trial_resource = float(
max(
1,
(cluster_max_resources[resource_type] // self.config["tune"]["tune_params"]["num_samples"]),
),
)

return per_trial_resource

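The constructor refactor above replaces the old `config_path` string with an already-validated `RayTuneModel`, and makes the encoder loader and seed explicit arguments. A minimal usage sketch follows; `model` (a parsed `yaml_model_schema.Model`), `MyNet` (an `nn.Module` subclass), `encoder_loader` (a pre-built `EncoderLoader`), and the data path are hypothetical placeholders, not taken from this PR:

```python
from stimulus.learner.raytune_learner import TuneWrapper
from stimulus.utils.yaml_model_schema import YamlRayConfigLoader

# Placeholders: `model` is a parsed yaml_model_schema.Model, `MyNet` is the
# user's nn.Module subclass, `encoder_loader` is a pre-built EncoderLoader.
ray_config = YamlRayConfigLoader(model).get_config()  # now returns a RayTuneModel

wrapper = TuneWrapper(
    config=ray_config,
    model_class=MyNet,
    data_path="data/train.csv",  # hypothetical path; TuneWrapper raises ValueError if it does not exist
    encoder_loader=encoder_loader,
    seed=42,
    debug=False,
)
wrapper.tune()  # delegates to self.tuner.fit()
```
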
4 changes: 2 additions & 2 deletions src/stimulus/utils/yaml_model_schema.py
@@ -200,13 +200,13 @@ def convert_config_to_ray(self, model: Model) -> RayTuneModel:
tune=model.tune,
)

def get_config(self) -> dict:
def get_config(self) -> RayTuneModel:
"""Return the current configuration.

Returns:
Current configuration dictionary
"""
return self.ray_model.model_dump()
return self.ray_model

@staticmethod
def sampint(sample_space: list, n_space: list) -> list[int]:
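For downstream code, the `get_config` change means the loader now hands back the pydantic `RayTuneModel` itself; callers that still need the old dictionary view can call `model_dump()`, which is what `TuneWrapper` now does internally. A short sketch, with `model` as a placeholder for a parsed `yaml_model_schema.Model`:

```python
from stimulus.utils.yaml_model_schema import RayTuneModel, YamlRayConfigLoader

# Placeholder: `model` is a yaml_model_schema.Model parsed from the model YAML.
loader = YamlRayConfigLoader(model)
config = loader.get_config()

assert isinstance(config, RayTuneModel)             # pydantic model, no longer a plain dict
num_samples = config.tune.tune_params.num_samples   # fields are typed attributes
legacy_dict = config.model_dump()                   # dict view, as TuneWrapper uses internally
```
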
10 changes: 5 additions & 5 deletions tests/utils/test_model_yaml.py
@@ -74,11 +74,11 @@ def test_get_config(titanic_model_yaml_path: str) -> None:
loader = yaml_model_schema.YamlRayConfigLoader(model)

config = loader.get_config()
assert isinstance(config, dict)
assert "network_params" in config
assert "optimizer_params" in config
assert "loss_params" in config
assert "data_params" in config
assert isinstance(config, yaml_model_schema.RayTuneModel)
assert hasattr(config, "network_params")
assert hasattr(config, "optimizer_params")
assert hasattr(config, "loss_params")
assert hasattr(config, "data_params")


def test_sampint() -> None:
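As a closing note on the scheduler handling introduced in `raytune_learner.py`: the scheduler class is now resolved by name from `ray.tune.schedulers`, with an unknown name surfacing as a `ValueError`. A standalone sketch of that pattern, importing the submodule explicitly; the `ASHAScheduler` arguments shown are illustrative only and not taken from this PR:

```python
from ray.tune import schedulers


def resolve_scheduler(name: str, params: dict) -> schedulers.TrialScheduler:
    """Look up a Ray Tune scheduler class by name and instantiate it with `params`."""
    try:
        scheduler_class = getattr(schedulers, name)
    except AttributeError as err:
        raise ValueError(
            f"Invalid scheduler: {name}, check Ray Tune for documentation on available schedulers",
        ) from err
    return scheduler_class(**params)


# Example (illustrative parameters):
scheduler = resolve_scheduler("ASHAScheduler", {"max_t": 100, "grace_period": 1})
```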