From 3e83e8b928c281f8dc27a91b6ddf1f11ce295ef9 Mon Sep 17 00:00:00 2001
From: mgrapotte
Date: Fri, 31 Jan 2025 14:43:07 +0100
Subject: [PATCH 01/13] fix(raytune_learner): fixed data init to be within the
 trainable setup; this prevents passing data through the ray object store

---
 src/stimulus/learner/raytune_learner.py | 57 +++++++++++++++----------
 1 file changed, 35 insertions(+), 22 deletions(-)

diff --git a/src/stimulus/learner/raytune_learner.py b/src/stimulus/learner/raytune_learner.py
index 4317511..fd93b4d 100644
--- a/src/stimulus/learner/raytune_learner.py
+++ b/src/stimulus/learner/raytune_learner.py
@@ -111,7 +111,7 @@ def __init__(
     def tuner_initialization(
         self,
-        config_path: str,
+        data_config_path: str,
         data_path: str,
         encoder_loader: EncoderLoader,
         *,
@@ -138,17 +138,15 @@ def tuner_initialization(

         logging.info(f"PER_TRIAL resources -> GPU: {self.gpu_per_trial} CPU: {self.cpu_per_trial}")

-        # Configure trainable with resources and data
-        trainable = tune.with_resources(TuneModel, resources={"cpu": self.cpu_per_trial, "gpu": self.gpu_per_trial})
-        trainable = tune.with_parameters(
-            trainable,
-            training=TorchDataset(config_path=config_path, csv_path=data_path, encoder_loader=encoder_loader, split=0),
-            validation=TorchDataset(
-                config_path=config_path,
-                csv_path=data_path,
+        # Configure trainable with resources and dataset parameters
+        trainable = tune.with_resources(
+            tune.with_parameters(
+                TuneModel,
+                data_config_path=data_config_path,
+                data_path=data_path,
                 encoder_loader=encoder_loader,
-                split=1,
             ),
+            resources={"cpu": self.cpu_per_trial, "gpu": self.gpu_per_trial}
         )

         return tune.Tuner(trainable, tune_config=self.tune_config, param_space=self.config, run_config=self.run_config)
@@ -161,20 +159,18 @@ def tune(self) -> None:

 class TuneModel(Trainable):
     """Trainable model class for Ray Tune."""

-    def setup(self, config: dict[Any, Any]) -> None:
+    def setup(self, config: dict[Any, Any], *, data_config_path: str, data_path: str, encoder_loader: EncoderLoader) -> None:
         """Get the model, loss function(s), optimizer, train and test data from the config."""
-        # set the seeds the second time, first in TuneWrapper initialization. This will make all important seed worker specific.
+        # set the seeds the second time, first in TuneWrapper initialization
         set_general_seeds(self.config["ray_worker_seed"])

         # Initialize model with the config params
         self.model = config["model"](**config["model_params"])

         # Add data path
-        self.data_path = config["data_path"]
+        self.data_path = data_path

         # Get the loss function(s) from the config model params
-        # Note that the loss function(s) are stored in a dictionary,
-        # where the keys are the key of loss_params in the yaml config file and the values are the loss functions associated to such keys.
         self.loss_dict = config["loss_params"]
         for key, loss_fn in self.loss_dict.items():
             try:
@@ -186,23 +182,40 @@ def setup(self, config: dict[Any, Any]) -> None:

         # get the optimizer parameters
         optimizer_lr = config["optimizer_params"]["lr"]
-
-        # get the optimizer from PyTorch
-        self.optimizer = getattr(optim, config["optimizer_params"]["method"])(self.model.parameters(), lr=optimizer_lr)
+        self.optimizer = getattr(optim, config["optimizer_params"]["method"])(
+            self.model.parameters(),
+            lr=optimizer_lr
+        )

         # get step size from the config
         self.step_size = config["tune"]["step_size"]

+        # Initialize datasets using the passed parameters
+        training = TorchDataset(
+            config_path=data_config_path,
+            csv_path=data_path,
+            encoder_loader=encoder_loader,
+            split=0,
+        )
+        validation = TorchDataset(
+            config_path=data_config_path,
+            csv_path=data_path,
+            encoder_loader=encoder_loader,
+            split=1,
+        )
+
         # use dataloader on training/validation data
         self.batch_size = config["data_params"]["batch_size"]
-        training: Dataset = config["training"]
-        validation: Dataset = config["validation"]
         self.training = DataLoader(
             training,
             batch_size=self.batch_size,
             shuffle=True,
-        )  # TODO need to check the reproducibility of this shuffling
-        self.validation = DataLoader(validation, batch_size=self.batch_size, shuffle=True)
+        )
+        self.validation = DataLoader(
+            validation,
+            batch_size=self.batch_size,
+            shuffle=True
+        )

         # debug section, first create a dedicated directory for each worker inside Ray_results/ location
         debug_dir = os.path.join(

From e32f009da2719d2128295990ec0859fda6c23f8f Mon Sep 17 00:00:00 2001
From: mgrapotte
Date: Fri, 31 Jan 2025 14:47:53 +0100
Subject: [PATCH 02/13] perf(raytune_learner): improved performance by loading
 data into the ray object store instead of having each actor load the data.
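The idea in a minimal, self-contained sketch with placeholder data (not the
stimulus datasets), assuming ray.init() has already been called: the driver
materializes the expensive object once and places it in the shared object
store, and every worker fetches it from there instead of re-reading and
re-encoding the CSV itself.

    import ray

    ray.init(ignore_reinit_error=True)

    # Driver: build the expensive object once and place it in the object store.
    datasets_ref = ray.put({"training": list(range(1000)), "validation": list(range(200))})

    @ray.remote
    def trial(refs: dict) -> tuple[int, int]:
        # Worker: fetch the shared object; nothing is re-read or re-encoded per actor.
        datasets = ray.get(refs["datasets"])
        return len(datasets["training"]), len(datasets["validation"])

    # Wrapping the ref in a dict keeps it a reference until the worker calls ray.get;
    # a bare ObjectRef passed as a top-level argument would be resolved automatically.
    futures = [trial.remote({"datasets": datasets_ref}) for _ in range(4)]
    print(ray.get(futures))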
---
 src/stimulus/learner/raytune_learner.py | 45 ++++++++++++++-----------
 1 file changed, 25 insertions(+), 20 deletions(-)

diff --git a/src/stimulus/learner/raytune_learner.py b/src/stimulus/learner/raytune_learner.py
index fd93b4d..8d20703 100644
--- a/src/stimulus/learner/raytune_learner.py
+++ b/src/stimulus/learner/raytune_learner.py
@@ -14,6 +14,7 @@
 from safetensors.torch import save_model as safe_save_model
 from torch import nn, optim
 from torch.utils.data import DataLoader, Dataset
+import ray

 from stimulus.data.handlertorch import TorchDataset
 from stimulus.data.loaders import EncoderLoader
@@ -138,13 +139,31 @@ def tuner_initialization(

         logging.info(f"PER_TRIAL resources -> GPU: {self.gpu_per_trial} CPU: {self.cpu_per_trial}")

+        # Pre-load and encode datasets once, then put them in Ray's object store
+        @ray.remote
+        def create_datasets(data_config_path: str, data_path: str, encoder_loader: EncoderLoader):
+            training = TorchDataset(
+                config_path=data_config_path,
+                csv_path=data_path,
+                encoder_loader=encoder_loader,
+                split=0,
+            )
+            validation = TorchDataset(
+                config_path=data_config_path,
+                csv_path=data_path,
+                encoder_loader=encoder_loader,
+                split=1,
+            )
+            return training, validation
+
+        # Put datasets in Ray's object store
+        datasets_ref = create_datasets.remote(data_config_path, data_path, encoder_loader)
+
         # Configure trainable with resources and dataset parameters
         trainable = tune.with_resources(
             tune.with_parameters(
                 TuneModel,
-                data_config_path=data_config_path,
-                data_path=data_path,
-                encoder_loader=encoder_loader,
+                datasets_ref=datasets_ref,
             ),
             resources={"cpu": self.cpu_per_trial, "gpu": self.gpu_per_trial}
         )
@@ -159,7 +178,7 @@ def tune(self) -> None:
 class TuneModel(Trainable):
     """Trainable model class for Ray Tune."""

-    def setup(self, config: dict[Any, Any], *, data_config_path: str, data_path: str, encoder_loader: EncoderLoader) -> None:
+    def setup(self, config: dict[Any, Any], *, datasets_ref: ray.ObjectRef) -> None:
         """Get the model, loss function(s), optimizer, train and test data from the config."""
         # set the seeds the second time, first in TuneWrapper initialization
         set_general_seeds(self.config["ray_worker_seed"])
@@ -167,9 +186,6 @@ def setup(self, config: dict[Any, Any], *, data_config_path: str, data_path: str
         # Initialize model with the config params
         self.model = config["model"](**config["model_params"])

-        # Add data path
-        self.data_path = data_path
-
         # Get the loss function(s) from the config model params
         self.loss_dict = config["loss_params"]
         for key, loss_fn in self.loss_dict.items():
@@ -190,19 +206,8 @@ def setup(self, config: dict[Any, Any], *, data_config_path: str, data_path: str
         # get step size from the config
         self.step_size = config["tune"]["step_size"]

-        # Initialize datasets using the passed parameters
-        training = TorchDataset(
-            config_path=data_config_path,
-            csv_path=data_path,
-            encoder_loader=encoder_loader,
-            split=0,
-        )
-        validation = TorchDataset(
-            config_path=data_config_path,
-            csv_path=data_path,
-            encoder_loader=encoder_loader,
-            split=1,
-        )
+        # Get datasets from Ray's object store
+        training, validation = ray.get(datasets_ref)

         # use dataloader on training/validation data
         self.batch_size = config["data_params"]["batch_size"]

From 86cd58208f72b460c6b8da283a4590d15c7fd35b Mon Sep 17 00:00:00 2001
From: mgrapotte
Date: Fri, 31 Jan 2025 14:49:49 +0100
Subject: [PATCH 03/13] fix(raytune_learner): fixed arg issue

---
 src/stimulus/learner/raytune_learner.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/stimulus/learner/raytune_learner.py b/src/stimulus/learner/raytune_learner.py
index 8d20703..e57bbda 100644
--- a/src/stimulus/learner/raytune_learner.py
+++ b/src/stimulus/learner/raytune_learner.py
@@ -104,7 +104,7 @@ def __init__(
         self.cpu_per_trial = model_config.tune.cpu_per_trial

         self.tuner = self.tuner_initialization(
-            config_path=data_config_path,
+            data_config_path=data_config_path,
             data_path=data_path,
             encoder_loader=encoder_loader,
             autoscaler=autoscaler,

From 476b322ff6c667bfafcd56aa502695095dd80c8c Mon Sep 17 00:00:00 2001
From: mgrapotte
Date: Fri, 31 Jan 2025 14:57:05 +0100
Subject: [PATCH 04/13] fix(raytune_learner): replace bad model_param keyword
 with network_params in config

---
 src/stimulus/learner/raytune_learner.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/stimulus/learner/raytune_learner.py b/src/stimulus/learner/raytune_learner.py
index e57bbda..11561d5 100644
--- a/src/stimulus/learner/raytune_learner.py
+++ b/src/stimulus/learner/raytune_learner.py
@@ -184,7 +184,7 @@ def setup(self, config: dict[Any, Any], *, datasets_ref: ray.ObjectRef) -> None:
         set_general_seeds(self.config["ray_worker_seed"])

         # Initialize model with the config params
-        self.model = config["model"](**config["model_params"])
+        self.model = config["model"](**config["network_params"])

         # Get the loss function(s) from the config model params
         self.loss_dict = config["loss_params"]

From fe76db1da20b7506ee833a38fc45bdd36a51a05a Mon Sep 17 00:00:00 2001
From: mgrapotte
Date: Fri, 31 Jan 2025 15:42:25 +0100
Subject: [PATCH 05/13] fix(raytune_learner): pass data refs through config
 instead of function def

---
 src/stimulus/learner/raytune_learner.py | 42 +++++++++++++------------
 1 file changed, 22 insertions(+), 20 deletions(-)

diff --git a/src/stimulus/learner/raytune_learner.py b/src/stimulus/learner/raytune_learner.py
index 11561d5..73af5c3 100644
--- a/src/stimulus/learner/raytune_learner.py
+++ b/src/stimulus/learner/raytune_learner.py
@@ -140,30 +140,32 @@ def tuner_initialization(
         logging.info(f"PER_TRIAL resources -> GPU: {self.gpu_per_trial} CPU: {self.cpu_per_trial}")

         # Pre-load and encode datasets once, then put them in Ray's object store
-        @ray.remote
-        def create_datasets(data_config_path: str, data_path: str, encoder_loader: EncoderLoader):
-            training = TorchDataset(
-                config_path=data_config_path,
-                csv_path=data_path,
-                encoder_loader=encoder_loader,
-                split=0,
-            )
-            validation = TorchDataset(
-                config_path=data_config_path,
-                csv_path=data_path,
-                encoder_loader=encoder_loader,
-                split=1,
-            )
-            return training, validation
+
+
+        training = TorchDataset(
+            config_path=data_config_path,
+            csv_path=data_path,
+            encoder_loader=encoder_loader,
+            split=0,
+        )
+        validation = TorchDataset(
+            config_path=data_config_path,
+            csv_path=data_path,
+            encoder_loader=encoder_loader,
+            split=1,
+        )
+
+
+        training_ref = ray.put(training)
+        validation_ref = ray.put(validation)

-        # Put datasets in Ray's object store
-        datasets_ref = create_datasets.remote(data_config_path, data_path, encoder_loader)
+        self.config["_training_ref"] = training_ref
+        self.config["_validation_ref"] = validation_ref

         # Configure trainable with resources and dataset parameters
         trainable = tune.with_resources(
             tune.with_parameters(
                 TuneModel,
-                datasets_ref=datasets_ref,
             ),
             resources={"cpu": self.cpu_per_trial, "gpu": self.gpu_per_trial}
         )
@@ -178,7 +180,7 @@ def tune(self) -> None:

 class TuneModel(Trainable):
     """Trainable model class for Ray Tune."""

-    def setup(self, config: dict[Any, Any], *, datasets_ref: ray.ObjectRef) -> None:
+    def setup(self, config: dict[Any, Any]) -> None:
         """Get the model, loss function(s), optimizer, train and test data from the config."""
         # set the seeds the second time, first in TuneWrapper initialization
         set_general_seeds(self.config["ray_worker_seed"])
@@ -207,7 +209,7 @@ def setup(self, config: dict[Any, Any]) -> None:
         self.step_size = config["tune"]["step_size"]

         # Get datasets from Ray's object store
-        training, validation = ray.get(datasets_ref)
+        training, validation = ray.get(self.config["_training_ref"]), ray.get(self.config["_validation_ref"])

         # use dataloader on training/validation data
         self.batch_size = config["data_params"]["batch_size"]

From 539089db8aad2558b77906b4e4546d4c2ca44252 Mon Sep 17 00:00:00 2001
From: mgrapotte
Date: Fri, 31 Jan 2025 16:26:15 +0100
Subject: [PATCH 06/13] fix(data_handlers): the previous implementation treated
 every index as a slice, i.e. __getitem__(4) returned everything from 0 to 4,
 which crashed the torch DataLoader. Fixed by re-creating a single-row
 dataframe when a single index is requested.

---
 src/stimulus/data/data_handlers.py | 44 ++++++++++++++++++++++--------
 1 file changed, 32 insertions(+), 12 deletions(-)

diff --git a/src/stimulus/data/data_handlers.py b/src/stimulus/data/data_handlers.py
index c58e3dd..bf405a9 100644
--- a/src/stimulus/data/data_handlers.py
+++ b/src/stimulus/data/data_handlers.py
@@ -462,20 +462,40 @@ def __getitem__(self, idx: Any) -> tuple[dict[str, torch.Tensor], dict[str, torc
         Args:
             idx: The index of the data to be returned, it can be a single index, a list of indexes or a slice
         """
+        input_columns = self.dataset_manager.column_categories["input"]
+        label_columns = self.dataset_manager.column_categories["label"]
+        meta_columns = self.dataset_manager.column_categories["meta"]
+
         # Handle different index types
         if isinstance(idx, slice):
-            data_at_index = self.data.slice(idx.start or 0, idx.stop or len(self.data))
+            # Get the actual indices for the slice
+            start = idx.start if idx.start is not None else 0
+            stop = idx.stop if idx.stop is not None else len(self.data)
+            data_at_index = self.data.slice(start, stop - start)
+
+            # Process DataFrame
+            input_data = self.encoder_manager.encode_dataframe(data_at_index[input_columns])
+            label_data = self.encoder_manager.encode_dataframe(data_at_index[label_columns])
+            meta_data = {key: data_at_index[key].to_list() for key in meta_columns}
+
         elif isinstance(idx, int):
-            data_at_index = self.data.slice(idx, idx + 1)
-        else:
-            data_at_index = self.data[idx]
+            # For single row, convert to dict with column names as keys
+            row_dict = {col: val for col, val in zip(self.data.columns, self.data.row(idx))}
+
+            # Create single-row DataFrames for encoding
+            input_df = pl.DataFrame({col: [row_dict[col]] for col in input_columns})
+            label_df = pl.DataFrame({col: [row_dict[col]] for col in label_columns})
+
+            input_data = self.encoder_manager.encode_dataframe(input_df)
+            label_data = self.encoder_manager.encode_dataframe(label_df)
+            meta_data = {key: [row_dict[key]] for key in meta_columns}
+
+        else:  # list or other sequence
+            data_at_index = self.data.select(idx)
+
+            # Process DataFrame
+            input_data = self.encoder_manager.encode_dataframe(data_at_index[input_columns])
+            label_data = self.encoder_manager.encode_dataframe(data_at_index[label_columns])
+            meta_data = {key: data_at_index[key].to_list() for key in meta_columns}

-        input_columns, label_columns, meta_columns = (
-            self.dataset_manager.column_categories["input"],
-            self.dataset_manager.column_categories["label"],
-            self.dataset_manager.column_categories["meta"],
-        )
-        input_data = self.encoder_manager.encode_dataframe(data_at_index[input_columns])
-        label_data = self.encoder_manager.encode_dataframe(data_at_index[label_columns])
-        meta_data = {key: data_at_index[key].to_list() for key in meta_columns}
         return input_data, label_data, meta_data
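Note on the slice branch above: polars' DataFrame.slice(offset, length) takes a
length as its second argument, not a stop index, which is what the stop - start
correction accounts for. A minimal sketch on a hypothetical toy frame:

    import polars as pl

    df = pl.DataFrame({"a": [10, 11, 12, 13, 14]})

    # slice(offset, length): two rows starting at row 2 -> [12, 13]
    print(df.slice(2, 2)["a"].to_list())

    # passing a stop index as the second argument over-selects:
    # offset=2, length=4 -> [12, 13, 14] instead of [12, 13]
    print(df.slice(2, 4)["a"].to_list())

For a single integer index, the fix instead rebuilds a one-row DataFrame from
self.data.row(idx), so the DataLoader receives exactly one sample.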
From 189aab700df7779b4db5fe3fe48e9cac620909a2 Mon Sep 17 00:00:00 2001
From: mgrapotte
Date: Fri, 31 Jan 2025 16:26:39 +0100
Subject: [PATCH 07/13] fix(data_handlers): make format

---
 src/stimulus/data/data_handlers.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/stimulus/data/data_handlers.py b/src/stimulus/data/data_handlers.py
index bf405a9..824f373 100644
--- a/src/stimulus/data/data_handlers.py
+++ b/src/stimulus/data/data_handlers.py
@@ -472,27 +472,27 @@ def __getitem__(self, idx: Any) -> tuple[dict[str, torch.Tensor], dict[str, torc
             start = idx.start if idx.start is not None else 0
             stop = idx.stop if idx.stop is not None else len(self.data)
             data_at_index = self.data.slice(start, stop - start)
-
+
             # Process DataFrame
             input_data = self.encoder_manager.encode_dataframe(data_at_index[input_columns])
             label_data = self.encoder_manager.encode_dataframe(data_at_index[label_columns])
             meta_data = {key: data_at_index[key].to_list() for key in meta_columns}
-
+
         elif isinstance(idx, int):
             # For single row, convert to dict with column names as keys
             row_dict = {col: val for col, val in zip(self.data.columns, self.data.row(idx))}
-
+
             # Create single-row DataFrames for encoding
             input_df = pl.DataFrame({col: [row_dict[col]] for col in input_columns})
             label_df = pl.DataFrame({col: [row_dict[col]] for col in label_columns})
-
+
             input_data = self.encoder_manager.encode_dataframe(input_df)
             label_data = self.encoder_manager.encode_dataframe(label_df)
             meta_data = {key: [row_dict[key]] for key in meta_columns}
-
+
         else:  # list or other sequence
             data_at_index = self.data.select(idx)
-
+
             # Process DataFrame
             input_data = self.encoder_manager.encode_dataframe(data_at_index[input_columns])
             label_data = self.encoder_manager.encode_dataframe(data_at_index[label_columns])
             meta_data = {key: data_at_index[key].to_list() for key in meta_columns}

From b37f099561e929f88d657e4610dc76461549fb60 Mon Sep 17 00:00:00 2001
From: mgrapotte
Date: Fri, 31 Jan 2025 16:39:23 +0100
Subject: [PATCH 08/13] fix(titanic_model): correct tensor shapes.

- stack during forward was not well executed
- target shape was incorrect
---
 tests/test_model/titanic_model.py | 31 ++++++++++++++++++++++---------
 1 file changed, 22 insertions(+), 9 deletions(-)

diff --git a/tests/test_model/titanic_model.py b/tests/test_model/titanic_model.py
index 22a2d21..a546c6a 100644
--- a/tests/test_model/titanic_model.py
+++ b/tests/test_model/titanic_model.py
@@ -47,12 +47,19 @@ def forward(
     ) -> torch.Tensor:
         """Forward pass of the model.

-        It should return the output as a dictionary, with the same keys as `y`.
-
-        NOTE that the final `x` is a torch.Tensor with shape (batch_size, nb_classes). Here nb_classes = 2, so the output is a tensor with two columns, meaning the probabilities for not survived | survived.
+        Args:
+            pclass: Tensor of shape [batch_size, 1]
+            sex: Tensor of shape [batch_size, 1]
+            ...etc
+
+        Returns:
+            Tensor of shape [batch_size, nb_classes] containing class probabilities
         """
-        x = torch.stack((pclass, sex, age, sibsp, parch, fare, embarked), dim=1).float()
+        # Stack features and remove the extra dimension
+        x = torch.stack((pclass, sex, age, sibsp, parch, fare, embarked), dim=1).float()  # [batch_size, 7, 1]
+        x = x.squeeze(-1)  # [batch_size, 7]
+
+        # Pass through layers
         x = self.relu(self.input_layer(x))
         for layer in self.intermediate:
             x = self.relu(layer(x))
@@ -61,11 +68,17 @@ def forward(

     def compute_loss(self, output: torch.Tensor, survived: torch.Tensor, loss_fn: Callable) -> torch.Tensor:
         """Compute the loss.

-        `output` is the output tensor of the forward pass.
-        `survived` is the target tensor -> label column name.
-        `loss_fn` is the loss function to be used.
+        Args:
+            output: Model output tensor of shape [batch_size, nb_classes]
+            survived: Target tensor of shape [batch_size, 1]
+            loss_fn: Loss function (CrossEntropyLoss)
+
+        Returns:
+            Loss value
         """
-        return loss_fn(output, survived)
+        # Squeeze the extra dimension from the target tensor and ensure long dtype
+        target = survived.squeeze(-1).long()
+        return loss_fn(output, target)

     def batch(
         self,
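Context for the compute_loss change: torch.nn.CrossEntropyLoss expects a
[batch_size, nb_classes] prediction tensor together with a 1-D target of class
indices with dtype long, so a [batch_size, 1] float target has to be squeezed
and cast. A minimal sketch with toy tensors (assuming CrossEntropyLoss is the
configured loss):

    import torch
    from torch import nn

    loss_fn = nn.CrossEntropyLoss()
    output = torch.randn(8, 2)                      # [batch_size, nb_classes]
    survived = torch.randint(0, 2, (8, 1)).float()  # [batch_size, 1], as loaded

    # loss_fn(output, survived) raises: the target must be 1-D class indices of dtype long
    target = survived.squeeze(-1).long()            # [batch_size]
    loss = loss_fn(output, target)
    print(loss.item())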
From 4256e88d73a54490b82ede27848f0c5a404ce6f6 Mon Sep 17 00:00:00 2001
From: mgrapotte
Date: Fri, 31 Jan 2025 16:39:46 +0100
Subject: [PATCH 09/13] fix(titanic_model): run make format

---
 tests/test_model/titanic_model.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/test_model/titanic_model.py b/tests/test_model/titanic_model.py
index a546c6a..85aa815 100644
--- a/tests/test_model/titanic_model.py
+++ b/tests/test_model/titanic_model.py
@@ -51,14 +51,14 @@ def forward(
             pclass: Tensor of shape [batch_size, 1]
             sex: Tensor of shape [batch_size, 1]
             ...etc
-
+
         Returns:
             Tensor of shape [batch_size, nb_classes] containing class probabilities
         """
         # Stack features and remove the extra dimension
         x = torch.stack((pclass, sex, age, sibsp, parch, fare, embarked), dim=1).float()  # [batch_size, 7, 1]
         x = x.squeeze(-1)  # [batch_size, 7]
-
+
         # Pass through layers
         x = self.relu(self.input_layer(x))
         for layer in self.intermediate:

From 6ea4bb02cdf84a508650828f4e5780857e896b8d Mon Sep 17 00:00:00 2001
From: mgrapotte
Date: Fri, 31 Jan 2025 17:03:30 +0100
Subject: [PATCH 10/13] fix(data_handlers): fix linting issues

---
 src/stimulus/data/data_handlers.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/stimulus/data/data_handlers.py b/src/stimulus/data/data_handlers.py
index 824f373..e7d6fc9 100644
--- a/src/stimulus/data/data_handlers.py
+++ b/src/stimulus/data/data_handlers.py
@@ -480,7 +480,7 @@ def __getitem__(self, idx: Any) -> tuple[dict[str, torch.Tensor], dict[str, torc
         elif isinstance(idx, int):
             # For single row, convert to dict with column names as keys
-            row_dict = {col: val for col, val in zip(self.data.columns, self.data.row(idx))}
+            row_dict = dict(zip(self.data.columns, self.data.row(idx)))

             # Create single-row DataFrames for encoding
             input_df = pl.DataFrame({col: [row_dict[col]] for col in input_columns})
             label_df = pl.DataFrame({col: [row_dict[col]] for col in label_columns})

From 748b887c23159c3c893a125294df597f1429271a Mon Sep 17 00:00:00 2001
From: mgrapotte
Date: Fri, 31 Jan 2025 17:05:10 +0100
Subject: [PATCH 11/13] fix(raytune_learner): added debug section for checking
 tensor shapes and make format

---
 src/stimulus/learner/raytune_learner.py | 32 +++++++++++++++++--------
 1 file changed, 22 insertions(+), 10 deletions(-)

diff --git a/src/stimulus/learner/raytune_learner.py b/src/stimulus/learner/raytune_learner.py
index 73af5c3..cfc3f71 100644
--- a/src/stimulus/learner/raytune_learner.py
+++ b/src/stimulus/learner/raytune_learner.py
@@ -7,14 +7,14 @@
 from typing import Any, Optional, TypedDict

 import numpy as np
+import ray
 import torch
 from ray import cluster_resources, train, tune
 from ray.tune import Trainable
 from safetensors.torch import load_model as safe_load_model
 from safetensors.torch import save_model as safe_save_model
 from torch import nn, optim
-from torch.utils.data import DataLoader, Dataset
-import ray
+from torch.utils.data import DataLoader

 from stimulus.data.handlertorch import TorchDataset
 from stimulus.data.loaders import EncoderLoader
@@ -140,7 +140,6 @@ def tuner_initialization(
         logging.info(f"PER_TRIAL resources -> GPU: {self.gpu_per_trial} CPU: {self.cpu_per_trial}")

         # Pre-load and encode datasets once, then put them in Ray's object store
-
         training = TorchDataset(
             config_path=data_config_path,
@@ -155,19 +154,32 @@ def tuner_initialization(
             split=1,
         )

+        # log to debug the names of the columns and shapes of tensors for a batch of training
+        # Log shapes of encoded tensors for first batch of training data
+        inputs, labels, meta = training[0:10]
+
+        logging.debug("Training data tensor shapes:")
+        for field, tensor in inputs.items():
+            logging.debug(f"Input field '{field}' shape: {tensor.shape}")
+
+        for field, tensor in labels.items():
+            logging.debug(f"Label field '{field}' shape: {tensor.shape}")
+
+        for field, values in meta.items():
+            logging.debug(f"Meta field '{field}' length: {len(values)}")

         training_ref = ray.put(training)
         validation_ref = ray.put(validation)

         self.config["_training_ref"] = training_ref
         self.config["_validation_ref"] = validation_ref
-
+
         # Configure trainable with resources and dataset parameters
         trainable = tune.with_resources(
             tune.with_parameters(
                 TuneModel,
             ),
-            resources={"cpu": self.cpu_per_trial, "gpu": self.gpu_per_trial}
+            resources={"cpu": self.cpu_per_trial, "gpu": self.gpu_per_trial},
         )

         return tune.Tuner(trainable, tune_config=self.tune_config, param_space=self.config, run_config=self.run_config)
@@ -201,8 +213,8 @@ def setup(self, config: dict[Any, Any]) -> None:
         # get the optimizer parameters
         optimizer_lr = config["optimizer_params"]["lr"]
         self.optimizer = getattr(optim, config["optimizer_params"]["method"])(
-            self.model.parameters(),
-            lr=optimizer_lr
+            self.model.parameters(),
+            lr=optimizer_lr,
         )

         # get step size from the config
         self.step_size = config["tune"]["step_size"]
@@ -219,9 +231,9 @@ def setup(self, config: dict[Any, Any]) -> None:
             shuffle=True,
         )
         self.validation = DataLoader(
-            validation,
-            batch_size=self.batch_size,
-            shuffle=True
+            validation,
+            batch_size=self.batch_size,
+            shuffle=True,
         )

         # debug section, first create a dedicated directory for each worker inside Ray_results/ location

From 200e30afd00bbcd9ca142c4dba2dd05b3af67716 Mon Sep 17 00:00:00 2001
From: mgrapotte
Date: Fri, 31 Jan 2025 17:05:57 +0100
Subject: [PATCH 12/13] fix(performance): added comments and ran make format

---
 src/stimulus/utils/performance.py | 41 ++++++++++++++++++++++---------
 1 file changed, 30 insertions(+), 11 deletions(-)

diff --git a/src/stimulus/utils/performance.py b/src/stimulus/utils/performance.py
index 2ac83df..9564f56 100644
--- a/src/stimulus/utils/performance.py
+++ b/src/stimulus/utils/performance.py
@@ -18,6 +18,7 @@
 # Constants for threshold and number of classes
 BINARY_THRESHOLD = 0.5
 BINARY_CLASS_COUNT = 2
+NON_SQUEEZED_SHAPE_LENGTH = 2


 class Performance:
@@ -89,19 +90,37 @@ def handle_multiclass(
     ) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
         """Handle the case of multiclass classification.

-        TODO currently only two class predictions are handled. Needs to handle the other scenarios.
-        """
-        # if only one columns for labels and predictions
-        if (len(labels.shape) == 1) and (len(predictions.shape) == 1):
-            return labels, predictions
+        Args:
+            labels: Labels array of shape (N,) or (N, 1)
+            predictions: Predictions array of shape (N,) or (N, C) where C is number of classes

-        # if one columns for labels, but two columns for predictions
-        if (len(labels.shape) == 1) and (predictions.shape[1] == BINARY_CLASS_COUNT):
-            predictions = predictions[:, 1]  # assumes the second column is the positive class
-            return labels, predictions
+        Returns:
+            tuple[NDArray[np.float64], NDArray[np.float64]]: Processed labels and predictions

-        # other scenarios not implemented yet
-        raise ValueError(f"Labels have shape {labels.shape} and predictions have shape {predictions.shape}.")
+        Raises:
+            ValueError: If input shapes are not compatible
+        """
+        # Case 1: If labels are 2D with shape (N,1), squeeze to 1D shape (N,)
+        # This handles cases where labels come as column vectors
+        if len(labels.shape) == NON_SQUEEZED_SHAPE_LENGTH and labels.shape[1] == 1:
+            labels = labels.squeeze(-1)
+
+        if len(predictions.shape) == NON_SQUEEZED_SHAPE_LENGTH:
+            # Case 2: Binary classification with shape (N,2)
+            # Take probability of positive class (second column)
+            if predictions.shape[1] == BINARY_CLASS_COUNT:
+                predictions = predictions[:, 1]  # Shape becomes (N,)
+                return labels, predictions
+        # Case 3: Multi-class classification with shape (N,C)
+        # Keep predictions as-is if labels are 1D and batch sizes match
+        if len(labels.shape) == 1 and predictions.shape[0] == labels.shape[0]:
+            return labels, predictions
+
+        # If we get here, the shapes are not compatible
+        raise ValueError(
+            f"Incompatible shapes: labels {labels.shape}, predictions {predictions.shape}. "
+            "Expected labels (N,) or (N, 1) and predictions (N,) or (N, C) where C is number of classes.",
+        )

     def rocauc(self, labels: NDArray[np.float64], predictions: NDArray[np.float64]) -> float:
         """Compute ROC AUC score."""

From 58cc43c3911e84f5be4b55c6d4eeb28af1ac4ebe Mon Sep 17 00:00:00 2001
From: mgrapotte
Date: Fri, 31 Jan 2025 17:06:22 +0100
Subject: [PATCH 13/13] fix(test_raytune_learner): ran make format

---
 tests/learner/test_raytune_learner.py | 52 +++++++++++++++++++++++++--
 1 file changed, 50 insertions(+), 2 deletions(-)

diff --git a/tests/learner/test_raytune_learner.py b/tests/learner/test_raytune_learner.py
index d2dd0c9..54ef6e2 100644
--- a/tests/learner/test_raytune_learner.py
+++ b/tests/learner/test_raytune_learner.py
@@ -7,6 +7,7 @@
 import ray
 import yaml

+from stimulus.data.handlertorch import TorchDataset
 from stimulus.data.loaders import EncoderLoader
 from stimulus.learner.raytune_learner import TuneWrapper
 from stimulus.utils.yaml_data import YamlSubConfigDict
@@ -27,7 +28,20 @@ def encoder_loader() -> EncoderLoader:
     """Load the EncoderLoader configuration."""
     with open("tests/test_data/titanic/titanic_sub_config.yaml") as file:
         data_config = yaml.safe_load(file)
-    return EncoderLoader(YamlSubConfigDict(**data_config).columns)
+    encoder_loader = EncoderLoader()
+    encoder_loader.initialize_column_encoders_from_config(YamlSubConfigDict(**data_config).columns)
+    return encoder_loader
+
+
+@pytest.fixture
+def titanic_dataset(encoder_loader: EncoderLoader) -> TorchDataset:
+    """Create a TorchDataset instance for testing."""
+    return TorchDataset(
+        csv_path="tests/test_data/titanic/titanic_stimulus_split.csv",
+        config_path="tests/test_data/titanic/titanic_sub_config.yaml",
+        encoder_loader=encoder_loader,
+        split=0,
+    )


 def test_tunewrapper_init(ray_config_loader: RayTuneModel, encoder_loader: EncoderLoader) -> None:
@@ -41,7 +55,7 @@ def test_tunewrapper_init(ray_config_loader: RayTuneModel, encoder_loader: Encod
     try:
         tune_wrapper = TuneWrapper(
             model_config=ray_config_loader,
-            model_class=titanic_model,
+            model_class=titanic_model.ModelTitanic,
             data_path="tests/test_data/titanic/titanic_stimulus_split.csv",
             data_config_path="tests/test_data/titanic/titanic_sub_config.yaml",
             encoder_loader=encoder_loader,
@@ -61,3 +75,37 @@ def test_tunewrapper_init(ray_config_loader: RayTuneModel, encoder_loader: Encod
         import shutil

         shutil.rmtree("tests/test_data/titanic/ray_results", ignore_errors=True)
+
+
+def test_tune_wrapper_tune(ray_config_loader: RayTuneModel, encoder_loader: EncoderLoader) -> None:
+    """Test the tune method of TuneWrapper class."""
+    # Filter ResourceWarning during Ray shutdown
+    warnings.filterwarnings("ignore", category=ResourceWarning)
+
+    # Initialize Ray with minimal resources for testing
+    ray.init(ignore_reinit_error=True)
+
+    try:
+        tune_wrapper = TuneWrapper(
+            model_config=ray_config_loader,
+            model_class=titanic_model.ModelTitanic,
+            data_path="tests/test_data/titanic/titanic_stimulus_split.csv",
+            data_config_path="tests/test_data/titanic/titanic_sub_config.yaml",
+            encoder_loader=encoder_loader,
+            seed=42,
+            ray_results_dir=os.path.abspath("tests/test_data/titanic/ray_results"),
+            tune_run_name="test_run",
+            debug=False,
+            autoscaler=False,
+        )
+
+        tune_wrapper.tune()
+
+    finally:
+        # Force cleanup of Ray resources
+        ray.shutdown()
+        # Clear any temporary files
+        if os.path.exists("tests/test_data/titanic/ray_results"):
+            import shutil
+
+            shutil.rmtree("tests/test_data/titanic/ray_results", ignore_errors=True)