diff --git a/src/stimulus/data/data_handlers.py b/src/stimulus/data/data_handlers.py
index c58e3dda..e7d6fc9f 100644
--- a/src/stimulus/data/data_handlers.py
+++ b/src/stimulus/data/data_handlers.py
@@ -462,20 +462,40 @@ def __getitem__(self, idx: Any) -> tuple[dict[str, torch.Tensor], dict[str, torc
         Args:
             idx: The index of the data to be returned, it can be a single index, a list of indexes or a slice
         """
+        input_columns = self.dataset_manager.column_categories["input"]
+        label_columns = self.dataset_manager.column_categories["label"]
+        meta_columns = self.dataset_manager.column_categories["meta"]
+
+        # Handle different index types
         if isinstance(idx, slice):
-            data_at_index = self.data.slice(idx.start or 0, idx.stop or len(self.data))
+            # Get the actual indices for the slice
+            start = idx.start if idx.start is not None else 0
+            stop = idx.stop if idx.stop is not None else len(self.data)
+            data_at_index = self.data.slice(start, stop - start)
+
+            # Process DataFrame
+            input_data = self.encoder_manager.encode_dataframe(data_at_index[input_columns])
+            label_data = self.encoder_manager.encode_dataframe(data_at_index[label_columns])
+            meta_data = {key: data_at_index[key].to_list() for key in meta_columns}
+
         elif isinstance(idx, int):
-            data_at_index = self.data.slice(idx, idx + 1)
-        else:
-            data_at_index = self.data[idx]
+            # For single row, convert to dict with column names as keys
+            row_dict = dict(zip(self.data.columns, self.data.row(idx)))
+
+            # Create single-row DataFrames for encoding
+            input_df = pl.DataFrame({col: [row_dict[col]] for col in input_columns})
+            label_df = pl.DataFrame({col: [row_dict[col]] for col in label_columns})
+
+            input_data = self.encoder_manager.encode_dataframe(input_df)
+            label_data = self.encoder_manager.encode_dataframe(label_df)
+            meta_data = {key: [row_dict[key]] for key in meta_columns}
+
+        else:  # list or other sequence of row indices
+            data_at_index = self.data[idx]  # gather the requested rows
+
+            # Process DataFrame
+            input_data = self.encoder_manager.encode_dataframe(data_at_index[input_columns])
+            label_data = self.encoder_manager.encode_dataframe(data_at_index[label_columns])
+            meta_data = {key: data_at_index[key].to_list() for key in meta_columns}
 
-        input_columns, label_columns, meta_columns = (
-            self.dataset_manager.column_categories["input"],
-            self.dataset_manager.column_categories["label"],
-            self.dataset_manager.column_categories["meta"],
-        )
-        input_data = self.encoder_manager.encode_dataframe(data_at_index[input_columns])
-        label_data = self.encoder_manager.encode_dataframe(data_at_index[label_columns])
-        meta_data = {key: data_at_index[key].to_list() for key in meta_columns}
         return input_data, label_data, meta_data
diff --git a/src/stimulus/learner/raytune_learner.py b/src/stimulus/learner/raytune_learner.py
index 43175110..cfc3f712 100644
--- a/src/stimulus/learner/raytune_learner.py
+++ b/src/stimulus/learner/raytune_learner.py
@@ -7,13 +7,14 @@
 from typing import Any, Optional, TypedDict
 
 import numpy as np
+import ray
 import torch
 from ray import cluster_resources, train, tune
 from ray.tune import Trainable
 from safetensors.torch import load_model as safe_load_model
 from safetensors.torch import save_model as safe_save_model
 from torch import nn, optim
-from torch.utils.data import DataLoader, Dataset
+from torch.utils.data import DataLoader
 
 from stimulus.data.handlertorch import TorchDataset
 from stimulus.data.loaders import EncoderLoader
@@ -103,7 +104,7 @@ def __init__(
         self.cpu_per_trial = model_config.tune.cpu_per_trial
 
         self.tuner = self.tuner_initialization(
-            config_path=data_config_path,
+            data_config_path=data_config_path,
             data_path=data_path,
             encoder_loader=encoder_loader,
             autoscaler=autoscaler,
@@ -111,7 +112,7 @@ def __init__(
 
     def tuner_initialization(
         self,
-        config_path: str,
+        data_config_path: str,
         data_path: str,
         encoder_loader: EncoderLoader,
         *,
@@ -138,17 +139,46 @@ def tuner_initialization(
 
         logging.info(f"PER_TRIAL resources -> GPU: {self.gpu_per_trial} CPU: {self.cpu_per_trial}")
 
-        # Configure trainable with resources and data
-        trainable = tune.with_resources(TuneModel, resources={"cpu": self.cpu_per_trial, "gpu": self.gpu_per_trial})
-        trainable = tune.with_parameters(
-            trainable,
-            training=TorchDataset(config_path=config_path, csv_path=data_path, encoder_loader=encoder_loader, split=0),
-            validation=TorchDataset(
-                config_path=config_path,
-                csv_path=data_path,
-                encoder_loader=encoder_loader,
-                split=1,
+        # Pre-load and encode datasets once, then put them in Ray's object store
+
+        training = TorchDataset(
+            config_path=data_config_path,
+            csv_path=data_path,
+            encoder_loader=encoder_loader,
+            split=0,
+        )
+        validation = TorchDataset(
+            config_path=data_config_path,
+            csv_path=data_path,
+            encoder_loader=encoder_loader,
+            split=1,
+        )
+
+        # Log shapes of encoded tensors for the first batch of training data
+        inputs, labels, meta = training[0:10]
+
+        logging.debug("Training data tensor shapes:")
+        for field, tensor in inputs.items():
+            logging.debug(f"Input field '{field}' shape: {tensor.shape}")
+
+        for field, tensor in labels.items():
+            logging.debug(f"Label field '{field}' shape: {tensor.shape}")
+
+        for field, values in meta.items():
+            logging.debug(f"Meta field '{field}' length: {len(values)}")
+
+        training_ref = ray.put(training)
+        validation_ref = ray.put(validation)
+
+        self.config["_training_ref"] = training_ref
+        self.config["_validation_ref"] = validation_ref
+
+        # Configure trainable with resources and dataset parameters
+        trainable = tune.with_resources(
+            tune.with_parameters(
+                TuneModel,
             ),
+            resources={"cpu": self.cpu_per_trial, "gpu": self.gpu_per_trial},
         )
 
         return tune.Tuner(trainable, tune_config=self.tune_config, param_space=self.config, run_config=self.run_config)
@@ -163,18 +193,13 @@ class TuneModel(Trainable):
 
     def setup(self, config: dict[Any, Any]) -> None:
         """Get the model, loss function(s), optimizer, train and test data from the config."""
-        # set the seeds the second time, first in TuneWrapper initialization. This will make all important seed worker specific.
+        # set the seeds the second time, first in TuneWrapper initialization
         set_general_seeds(self.config["ray_worker_seed"])
 
         # Initialize model with the config params
-        self.model = config["model"](**config["model_params"])
-
-        # Add data path
-        self.data_path = config["data_path"]
+        self.model = config["model"](**config["network_params"])
 
         # Get the loss function(s) from the config model params
-        # Note that the loss function(s) are stored in a dictionary,
-        # where the keys are the key of loss_params in the yaml config file and the values are the loss functions associated to such keys.
self.loss_dict = config["loss_params"] for key, loss_fn in self.loss_dict.items(): try: @@ -186,23 +212,29 @@ def setup(self, config: dict[Any, Any]) -> None: # get the optimizer parameters optimizer_lr = config["optimizer_params"]["lr"] - - # get the optimizer from PyTorch - self.optimizer = getattr(optim, config["optimizer_params"]["method"])(self.model.parameters(), lr=optimizer_lr) + self.optimizer = getattr(optim, config["optimizer_params"]["method"])( + self.model.parameters(), + lr=optimizer_lr, + ) # get step size from the config self.step_size = config["tune"]["step_size"] + # Get datasets from Ray's object store + training, validation = ray.get(self.config["_training_ref"]), ray.get(self.config["_validation_ref"]) + # use dataloader on training/validation data self.batch_size = config["data_params"]["batch_size"] - training: Dataset = config["training"] - validation: Dataset = config["validation"] self.training = DataLoader( training, batch_size=self.batch_size, shuffle=True, - ) # TODO need to check the reproducibility of this shuffling - self.validation = DataLoader(validation, batch_size=self.batch_size, shuffle=True) + ) + self.validation = DataLoader( + validation, + batch_size=self.batch_size, + shuffle=True, + ) # debug section, first create a dedicated directory for each worker inside Ray_results/ location debug_dir = os.path.join( diff --git a/src/stimulus/utils/performance.py b/src/stimulus/utils/performance.py index 2ac83df2..9564f56f 100644 --- a/src/stimulus/utils/performance.py +++ b/src/stimulus/utils/performance.py @@ -18,6 +18,7 @@ # Constants for threshold and number of classes BINARY_THRESHOLD = 0.5 BINARY_CLASS_COUNT = 2 +NON_SQUEEZED_SHAPE_LENGTH = 2 class Performance: @@ -89,19 +90,37 @@ def handle_multiclass( ) -> tuple[NDArray[np.float64], NDArray[np.float64]]: """Handle the case of multiclass classification. - TODO currently only two class predictions are handled. Needs to handle the other scenarios. 
- """ - # if only one columns for labels and predictions - if (len(labels.shape) == 1) and (len(predictions.shape) == 1): - return labels, predictions + Args: + labels: Labels array of shape (N,) or (N, 1) + predictions: Predictions array of shape (N,) or (N, C) where C is number of classes - # if one columns for labels, but two columns for predictions - if (len(labels.shape) == 1) and (predictions.shape[1] == BINARY_CLASS_COUNT): - predictions = predictions[:, 1] # assumes the second column is the positive class - return labels, predictions + Returns: + tuple[NDArray[np.float64], NDArray[np.float64]]: Processed labels and predictions - # other scenarios not implemented yet - raise ValueError(f"Labels have shape {labels.shape} and predictions have shape {predictions.shape}.") + Raises: + ValueError: If input shapes are not compatible + """ + # Case 1: If labels are 2D with shape (N,1), squeeze to 1D shape (N,) + # This handles cases where labels come as column vectors + if len(labels.shape) == NON_SQUEEZED_SHAPE_LENGTH and labels.shape[1] == 1: + labels = labels.squeeze(-1) + + if len(predictions.shape) == NON_SQUEEZED_SHAPE_LENGTH: + # Case 2: Binary classification with shape (N,2) + # Take probability of positive class (second column) + if predictions.shape[1] == BINARY_CLASS_COUNT: + predictions = predictions[:, 1] # Shape becomes (N,) + return labels, predictions + # Case 3: Multi-class classification with shape (N,C) + # Keep predictions as-is if labels are 1D and batch sizes match + if len(labels.shape) == 1 and predictions.shape[0] == labels.shape[0]: + return labels, predictions + + # If we get here, the shapes are not compatible + raise ValueError( + f"Incompatible shapes: labels {labels.shape}, predictions {predictions.shape}. " + "Expected labels (N,) or (N, 1) and predictions (N,) or (N, C) where C is number of classes.", + ) def rocauc(self, labels: NDArray[np.float64], predictions: NDArray[np.float64]) -> float: """Compute ROC AUC score.""" diff --git a/tests/learner/test_raytune_learner.py b/tests/learner/test_raytune_learner.py index d2dd0c95..54ef6e27 100644 --- a/tests/learner/test_raytune_learner.py +++ b/tests/learner/test_raytune_learner.py @@ -7,6 +7,7 @@ import ray import yaml +from stimulus.data.handlertorch import TorchDataset from stimulus.data.loaders import EncoderLoader from stimulus.learner.raytune_learner import TuneWrapper from stimulus.utils.yaml_data import YamlSubConfigDict @@ -27,7 +28,20 @@ def encoder_loader() -> EncoderLoader: """Load the EncoderLoader configuration.""" with open("tests/test_data/titanic/titanic_sub_config.yaml") as file: data_config = yaml.safe_load(file) - return EncoderLoader(YamlSubConfigDict(**data_config).columns) + encoder_loader = EncoderLoader() + encoder_loader.initialize_column_encoders_from_config(YamlSubConfigDict(**data_config).columns) + return encoder_loader + + +@pytest.fixture +def titanic_dataset(encoder_loader: EncoderLoader) -> TorchDataset: + """Create a TorchDataset instance for testing.""" + return TorchDataset( + csv_path="tests/test_data/titanic/titanic_stimulus_split.csv", + config_path="tests/test_data/titanic/titanic_sub_config.yaml", + encoder_loader=encoder_loader, + split=0, + ) def test_tunewrapper_init(ray_config_loader: RayTuneModel, encoder_loader: EncoderLoader) -> None: @@ -41,7 +55,7 @@ def test_tunewrapper_init(ray_config_loader: RayTuneModel, encoder_loader: Encod try: tune_wrapper = TuneWrapper( model_config=ray_config_loader, - model_class=titanic_model, + 
+            model_class=titanic_model.ModelTitanic,
             data_path="tests/test_data/titanic/titanic_stimulus_split.csv",
             data_config_path="tests/test_data/titanic/titanic_sub_config.yaml",
             encoder_loader=encoder_loader,
@@ -61,3 +75,37 @@ def test_tunewrapper_init(ray_config_loader: RayTuneModel, encoder_loader: Encod
             import shutil
 
             shutil.rmtree("tests/test_data/titanic/ray_results", ignore_errors=True)
+
+
+def test_tune_wrapper_tune(ray_config_loader: RayTuneModel, encoder_loader: EncoderLoader) -> None:
+    """Test the tune method of TuneWrapper class."""
+    # Filter ResourceWarning during Ray shutdown
+    warnings.filterwarnings("ignore", category=ResourceWarning)
+
+    # Initialize Ray with minimal resources for testing
+    ray.init(ignore_reinit_error=True)
+
+    try:
+        tune_wrapper = TuneWrapper(
+            model_config=ray_config_loader,
+            model_class=titanic_model.ModelTitanic,
+            data_path="tests/test_data/titanic/titanic_stimulus_split.csv",
+            data_config_path="tests/test_data/titanic/titanic_sub_config.yaml",
+            encoder_loader=encoder_loader,
+            seed=42,
+            ray_results_dir=os.path.abspath("tests/test_data/titanic/ray_results"),
+            tune_run_name="test_run",
+            debug=False,
+            autoscaler=False,
+        )
+
+        tune_wrapper.tune()
+
+    finally:
+        # Force cleanup of Ray resources
+        ray.shutdown()
+        # Clear any temporary files
+        if os.path.exists("tests/test_data/titanic/ray_results"):
+            import shutil
+
+            shutil.rmtree("tests/test_data/titanic/ray_results", ignore_errors=True)
diff --git a/tests/test_model/titanic_model.py b/tests/test_model/titanic_model.py
index 22a2d21b..85aa8152 100644
--- a/tests/test_model/titanic_model.py
+++ b/tests/test_model/titanic_model.py
@@ -47,12 +47,19 @@ def forward(
     ) -> torch.Tensor:
         """Forward pass of the model.
 
-        It should return the output as a dictionary, with the same keys as `y`.
+        Args:
+            pclass: Tensor of shape [batch_size, 1]
+            sex: Tensor of shape [batch_size, 1]
+            age, sibsp, parch, fare, embarked: Tensors of the same shape
 
-        NOTE that the final `x` is a torch.Tensor with shape (batch_size, nb_classes).
-        Here nb_classes = 2, so the output is a tensor with two columns, meaning the probabilities for not survived | survived.
+        Returns:
+            Tensor of shape [batch_size, nb_classes] containing class probabilities
         """
-        x = torch.stack((pclass, sex, age, sibsp, parch, fare, embarked), dim=1).float()
+        # Stack features and remove the extra dimension
+        x = torch.stack((pclass, sex, age, sibsp, parch, fare, embarked), dim=1).float()  # [batch_size, 7, 1]
+        x = x.squeeze(-1)  # [batch_size, 7]
+
+        # Pass through layers
         x = self.relu(self.input_layer(x))
         for layer in self.intermediate:
             x = self.relu(layer(x))
@@ -61,11 +68,17 @@ def forward(
     def compute_loss(self, output: torch.Tensor, survived: torch.Tensor, loss_fn: Callable) -> torch.Tensor:
         """Compute the loss.
 
-        `output` is the output tensor of the forward pass.
-        `survived` is the target tensor -> label column name.
-        `loss_fn` is the loss function to be used.
+        Args:
+            output: Model output tensor of shape [batch_size, nb_classes]
+            survived: Target tensor of shape [batch_size, 1]
+            loss_fn: Loss function (CrossEntropyLoss)
+
+        Returns:
+            Loss value
         """
-        return loss_fn(output, survived)
+        # Squeeze the extra dimension from the target tensor and ensure long dtype
+        target = survived.squeeze(-1).long()
+        return loss_fn(output, target)
 
     def batch(
         self,
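
A note on the __getitem__ change in data_handlers.py: the rewritten method maps each supported index type onto a different polars call. Below is a small standalone sketch of that mapping (not project code; the frame and column names are made up): DataFrame.slice(offset, length) for slices, DataFrame.row() for single integers, and plain df[indices] row gathering for lists.

import polars as pl

df = pl.DataFrame({"age": [22, 38, 26, 35], "fare": [7.25, 71.28, 7.92, 53.10]})

# slice -> DataFrame.slice(offset, length); the second argument is a length, not a stop index
s = slice(1, 3)
start = s.start if s.start is not None else 0
stop = s.stop if s.stop is not None else len(df)
print(df.slice(start, stop - start))     # rows 1 and 2

# int -> a single row as a dict keyed by column name
print(dict(zip(df.columns, df.row(2))))  # {'age': 26, 'fare': 7.92}

# list of row indices -> gather rows with DataFrame.__getitem__
print(df[[0, 3]])                        # rows 0 and 3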
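The tuner_initialization / TuneModel.setup changes in raytune_learner.py share the pre-encoded datasets across trials through Ray's object store instead of injecting them with tune.with_parameters keyword arguments. The sketch below reproduces that put-ref-get round trip in isolation under stated assumptions: ToyDataset, ToyTrainable and the "score" metric are illustrative names (not part of stimulus), and the stop criterion is only there so the toy run terminates.

from typing import Any

import ray
from ray import train, tune
from ray.tune import Trainable


class ToyDataset:
    """Hypothetical stand-in for a pre-encoded TorchDataset."""

    def __init__(self, n: int) -> None:
        self.items = list(range(n))


class ToyTrainable(Trainable):
    def setup(self, config: dict[Any, Any]) -> None:
        # Resolve the ObjectRefs that were stored in the param space.
        self.training = ray.get(config["_training_ref"])
        self.validation = ray.get(config["_validation_ref"])

    def step(self) -> dict[str, float]:
        # Report a dummy metric; a real trainable would run its training loop here.
        return {"score": float(len(self.training.items))}


if __name__ == "__main__":
    ray.init(ignore_reinit_error=True)

    # Put each dataset into the object store once; every trial reuses the same copy.
    param_space = {
        "_training_ref": ray.put(ToyDataset(80)),
        "_validation_ref": ray.put(ToyDataset(20)),
    }

    tuner = tune.Tuner(
        ToyTrainable,
        param_space=param_space,
        run_config=train.RunConfig(stop={"training_iteration": 1}),
    )
    tuner.fit()
    ray.shutdown()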
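The rewritten Performance.handle_multiclass boils down to three shape rules: squeeze (N, 1) labels, reduce (N, 2) binary predictions to the positive-class column, and pass (N,) or (N, C) predictions through when they match 1D labels. A minimal numpy-only sketch of the same rules (the function name normalize_shapes is mine, not the project's):

import numpy as np
from numpy.typing import NDArray

BINARY_CLASS_COUNT = 2
NON_SQUEEZED_SHAPE_LENGTH = 2


def normalize_shapes(
    labels: NDArray[np.float64],
    predictions: NDArray[np.float64],
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    # (N, 1) labels become (N,)
    if len(labels.shape) == NON_SQUEEZED_SHAPE_LENGTH and labels.shape[1] == 1:
        labels = labels.squeeze(-1)
    # (N, 2) binary predictions keep only the positive-class column
    if len(predictions.shape) == NON_SQUEEZED_SHAPE_LENGTH and predictions.shape[1] == BINARY_CLASS_COUNT:
        predictions = predictions[:, 1]
    # (N,) or (N, C) predictions pass through when labels are 1D and lengths match
    if len(labels.shape) == 1 and predictions.shape[0] == labels.shape[0]:
        return labels, predictions
    raise ValueError(f"Incompatible shapes: labels {labels.shape}, predictions {predictions.shape}")


labels = np.array([[0.0], [1.0], [1.0]])                # (3, 1)
preds = np.array([[0.8, 0.2], [0.3, 0.7], [0.1, 0.9]])  # (3, 2)
print(normalize_shapes(labels, preds))
# (array([0., 1., 1.]), array([0.2, 0.7, 0.9]))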
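Finally, the compute_loss change in titanic_model.py exists because nn.CrossEntropyLoss expects class-index targets of shape [batch_size] with an integer dtype, while the label column arrives as a [batch_size, 1] float tensor. A minimal reproduction of that contract (the tensor values are made up for the example):

import torch
from torch import nn

loss_fn = nn.CrossEntropyLoss()

output = torch.randn(4, 2)                              # [batch_size, nb_classes] raw scores
survived = torch.tensor([[0.0], [1.0], [1.0], [0.0]])   # [batch_size, 1] float labels

target = survived.squeeze(-1).long()                    # [batch_size] class indices
print(loss_fn(output, target))                          # scalar loss tensor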