diff --git a/numalogic/models/autoencoder/variants/vanilla.py b/numalogic/models/autoencoder/variants/vanilla.py index 5580f6d8..97dc8e43 100644 --- a/numalogic/models/autoencoder/variants/vanilla.py +++ b/numalogic/models/autoencoder/variants/vanilla.py @@ -59,7 +59,7 @@ def _construct_layers(self, layersizes: Sequence[int]) -> nn.ModuleList: layers.extend( [ nn.Linear(start_layersize, lsize), - nn.BatchNorm1d(self.n_features), + # nn.BatchNorm1d(self.n_features), nn.Tanh(), nn.Dropout(p=self.dropout_p), ] @@ -69,8 +69,10 @@ def _construct_layers(self, layersizes: Sequence[int]) -> nn.ModuleList: layers.extend( [ nn.Linear(start_layersize, layersizes[-1]), - nn.BatchNorm1d(self.n_features), - nn.ReLU(), + # nn.BatchNorm1d(self.n_features), + # nn.ReLU(), + nn.Tanh(), + nn.Dropout(p=self.dropout_p), ] ) return layers @@ -120,7 +122,7 @@ def _construct_layers(self, layersizes: Sequence[int]) -> nn.ModuleList: layers.extend( [ nn.Linear(layersizes[idx], layersizes[idx + 1]), - nn.BatchNorm1d(self.n_features), + # nn.BatchNorm1d(self.n_features), nn.Tanh(), nn.Dropout(p=self.dropout_p), ] @@ -190,14 +192,14 @@ def forward(self, batch: Tensor) -> tuple[Tensor, Tensor]: decoded = self.decoder(encoded) return encoded, torch.swapdims(decoded, 1, 2) - def _get_reconstruction_loss(self, batch: Tensor): + def _get_reconstruction_loss(self, batch: Tensor, reduction="mean") -> Tensor: _, recon = self.forward(batch) - return self.criterion(batch, recon) + return 0.5 * self.criterion(batch, recon, reduction=reduction) def predict_step(self, batch: Tensor, batch_idx: int, dataloader_idx: int = 0): """Returns reconstruction for streaming input.""" recon = self.reconstruction(batch) - return self.criterion(batch, recon, reduction="none") + return 0.5 * self.criterion(batch, recon, reduction="none") class SparseVanillaAE(VanillaAE): diff --git a/numalogic/tools/data.py b/numalogic/tools/data.py index dd38a066..10f956c8 100644 --- a/numalogic/tools/data.py +++ b/numalogic/tools/data.py @@ -191,7 +191,7 @@ def __getitem__(self, idx: Union[int, slice]) -> npt.NDArray[float]: return np.stack(output) if idx >= len(self): raise IndexError(f"{idx} out of bound!") - return self._data[idx : idx + self._seq_len] + return self._data[(idx * self._stride) : (idx * self._stride) + self._seq_len] class StreamingDataLoader(DataLoader): diff --git a/numalogic/udfs/inference.py b/numalogic/udfs/inference.py index 38ea614e..ad2b1807 100644 --- a/numalogic/udfs/inference.py +++ b/numalogic/udfs/inference.py @@ -84,8 +84,7 @@ def compute(cls, model: artifact_t, input_: npt.NDArray[float], **_) -> npt.NDAr model.eval() try: with torch.no_grad(): - _, out = model.forward(x) - recon_err = model.criterion(out, x, reduction="none") + recon_err = model._get_reconstruction_loss(x, reduction="none") except Exception as err: raise RuntimeError("Model forward pass failed!") from err return np.ascontiguousarray(recon_err).squeeze(0) diff --git a/numalogic/udfs/trainer/_base.py b/numalogic/udfs/trainer/_base.py index 8ab91cd3..4fb6527e 100644 --- a/numalogic/udfs/trainer/_base.py +++ b/numalogic/udfs/trainer/_base.py @@ -33,6 +33,7 @@ ) from numalogic.udfs.entities import TrainerPayload from numalogic.udfs.tools import TrainMsgDeduplicator +import torch _struct_log = configure_logger() @@ -120,8 +121,12 @@ def compute( model, train_dataloaders=DataLoader(train_ds, batch_size=trainer_cfg.batch_size) ) train_reconerr = trainer.predict( - model, dataloaders=DataLoader(train_ds, batch_size=trainer_cfg.batch_size) - ).numpy() + model, + dataloaders=DataLoader(train_ds, batch_size=trainer_cfg.batch_size), + unbatch=False, + ) + train_reconerr = torch.mean(train_reconerr, dim=1).numpy() + dict_artifacts["inference"] = KeyedArtifact( dkeys=[numalogic_cfg.model.name], artifact=model, stateful=numalogic_cfg.model.stateful ) @@ -233,12 +238,12 @@ def exec(self, keys: list[str], datum: Datum) -> Messages: _add_summary( summary=NAN_SUMMARY, labels=_metric_label_values, - data=nan_counter, + data=np.sum(nan_counter), ) _add_summary( summary=INF_SUMMARY, labels=_metric_label_values, - data=inf_counter, + data=np.sum(inf_counter), ) # Initialize artifacts @@ -363,7 +368,7 @@ def get_feature_arr( raw_df: pd.DataFrame, metrics: list[str], fill_value: float = 0.0, - ) -> tuple[npt.NDArray[float], float, float]: + ) -> tuple[npt.NDArray[float], pd.Series, pd.Series]: """ Get feature array from the raw dataframe. @@ -378,14 +383,15 @@ def get_feature_arr( nan_counter: Number of nan values inf_counter: Number of inf values """ - nan_counter = 0 - for col in metrics: + nan_counter = np.zeros(len(metrics), dtype=int) + inf_counter = np.zeros(len(metrics), dtype=int) + for idx, col in enumerate(metrics): if col not in raw_df.columns: raw_df[col] = fill_value - nan_counter += len(raw_df) + nan_counter[idx] += len(raw_df) feat_df = raw_df[metrics] - nan_counter += raw_df.isna().sum().all() - inf_counter = np.isinf(feat_df).sum().all() + nan_counter += feat_df.isna().sum() + inf_counter = np.isinf(feat_df).sum() feat_df = feat_df.fillna(fill_value).replace([np.inf, -np.inf], fill_value) return feat_df.to_numpy(dtype=np.float32), nan_counter, inf_counter